diff --git a/moor/lib/src/runtime/database.dart b/moor/lib/src/runtime/database.dart index a2369f33..34829ead 100644 --- a/moor/lib/src/runtime/database.dart +++ b/moor/lib/src/runtime/database.dart @@ -46,26 +46,17 @@ abstract class DatabaseConnectionUser { executor = executor ?? other.executor, streamQueries = streamQueries ?? other.streamQueries; - /// Marks the table as updated. This method will be called internally whenever - /// a update, delete or insert statement is issued on the database. We can - /// then inform all active select-streams on that table that their snapshot - /// might be out-of-date and needs to be fetched again. - @Deprecated('Use markTablesUpdated instead') - void markTableUpdated(String tableName) { - markTablesUpdated({tableName}); - } - /// Marks the tables as updated. This method will be called internally /// whenever a update, delete or insert statement is issued on the database. /// We can then inform all active select-streams on those tables that their /// snapshot might be out-of-date and needs to be fetched again. - void markTablesUpdated(Set tables) { + void markTablesUpdated(Set tables) { streamQueries.handleTableUpdates(tables); } /// Creates and auto-updating stream from the given select statement. This /// method should not be used directly. - Stream> createStream(TableChangeListener> stmt) => + Stream createStream(QueryStreamFetcher stmt) => streamQueries.registerStream(stmt); } @@ -120,8 +111,7 @@ mixin QueryEngine on DatabaseConnectionUser { executor.doWhenOpened((_) => executor.runUpdate(query, mappedArgs)); if (updates != null) { - await streamQueries - .handleTableUpdates(updates.map((t) => t.$tableName).toSet()); + await streamQueries.handleTableUpdates(updates); } return affectedRows; @@ -132,7 +122,8 @@ mixin QueryEngine on DatabaseConnectionUser { /// value. Future> customSelect(String query, {List variables = const []}) async { - return CustomSelectStatement(query, variables, {}, this).read(); + return CustomSelectStatement(query, variables, {}, this) + .execute(); } /// Creates a stream from a custom select statement.To use the variables, mark @@ -143,7 +134,8 @@ mixin QueryEngine on DatabaseConnectionUser { Stream> customSelectStream(String query, {List variables = const [], Set readsFrom}) { final tables = readsFrom ?? {}; - return createStream(CustomSelectStatement(query, variables, tables, this)); + final statement = CustomSelectStatement(query, variables, tables, this); + return createStream(statement.constructFetcher()); } } diff --git a/moor/lib/src/runtime/executor/stream_queries.dart b/moor/lib/src/runtime/executor/stream_queries.dart index 443f20d7..74c1c42e 100644 --- a/moor/lib/src/runtime/executor/stream_queries.dart +++ b/moor/lib/src/runtime/executor/stream_queries.dart @@ -1,38 +1,95 @@ import 'dart:async'; +import 'package:collection/collection.dart'; +import 'package:meta/meta.dart'; import 'package:moor/moor.dart'; -/// Internal interface to mark classes that respond to table changes -abstract class TableChangeListener { - /// Called to check if this listener should update after any table who's name - /// is in the set has changed. - bool isAffectedBy(Set tables); +const _listEquality = ListEquality(); - /// Called to reload data from the table after it has changed. - Future handleDataChanged(); +/// Representation of a select statement that knows from which tables the +/// statement is reading its data and how to execute the query. +class QueryStreamFetcher { + /// The set of tables this query reads from. If any of these tables changes, + /// the stream must fetch its again. + final Set readsFrom; + + /// Key that can be used to check whether two fetchers will yield the same + /// result when operating on the same data. + final StreamKey key; + + /// Function that asynchronously fetches the latest set of data. + final Future Function() fetchData; + + QueryStreamFetcher( + {@required this.readsFrom, this.key, @required this.fetchData}); +} + +/// Key that uniquely identifies a select statement. If two keys created from +/// two select statements are equal, the statements are equal as well. +/// +/// As two equal statements always yield the same result when operating on the +/// same data, this can make streams more efficient as we can return the same +/// stream for two equivalent queries. +class StreamKey { + final String sql; + final List variables; + + /// Used to differentiate between custom streams, which return a [QueryRow], + /// and regular streams, which return an instance of a generated data class. + final Type returnType; + + StreamKey(this.sql, this.variables, this.returnType); + + @override + int get hashCode { + return (sql.hashCode * 31 + _listEquality.hash(variables)) * 31 + + returnType.hashCode; + } + + @override + bool operator ==(other) { + return identical(this, other) || + (other is StreamKey && + other.sql == sql && + _listEquality.equals(other.variables, variables) && + other.returnType == returnType); + } } /// Keeps track of active streams created from [SelectStatement]s and updates /// them when needed. class StreamQueryStore { - final List _activeStreams = []; - - // todo cache streams (return same instance for same sql + variables) + final List _activeStreamsWithoutKey = []; + final Map _activeKeyStreams = {}; StreamQueryStore(); + Iterable get _activeStreams { + return _activeKeyStreams.values.followedBy(_activeStreamsWithoutKey); + } + /// Creates a new stream from the select statement. - Stream> registerStream(TableChangeListener> statement) { - final stream = QueryStream(statement, this); - _activeStreams.add(stream); - return stream.stream; + Stream registerStream(QueryStreamFetcher fetcher) { + final key = fetcher.key; + + if (key == null) { + final stream = QueryStream(fetcher, this); + _activeStreamsWithoutKey.add(stream); + return stream.stream; + } else { + final stream = _activeKeyStreams.putIfAbsent(key, () { + return QueryStream(fetcher, this); + }); + + return (stream as QueryStream).stream; + } } /// Handles updates on a given table by re-executing all queries that read /// from that table. - Future handleTableUpdates(Set tables) async { + Future handleTableUpdates(Set tables) async { final affectedStreams = _activeStreams - .where((stream) => stream.isAffectedByTableChange(tables)); + .where((stream) => stream._fetcher.readsFrom.any(tables.contains)); for (var stream in affectedStreams) { await stream.fetchAndEmitData(); @@ -40,15 +97,22 @@ class StreamQueryStore { } void markAsClosed(QueryStream stream) { - _activeStreams.remove(stream); + final key = stream._fetcher.key; + if (key == null) { + _activeStreamsWithoutKey.remove(stream); + } else { + _activeKeyStreams.remove(key); + } } } class QueryStream { - final TableChangeListener listener; + final QueryStreamFetcher _fetcher; final StreamQueryStore _store; StreamController _controller; + // caching the stream so that the stream getter always returns the same stream + Stream _stream; Stream get stream { _controller ??= StreamController.broadcast( @@ -56,10 +120,10 @@ class QueryStream { onCancel: _onCancel, ); - return _controller.stream; + return _stream ??= _controller.stream; } - QueryStream(this.listener, this._store); + QueryStream(this._fetcher, this._store); void _onListen() { // first listener added, fetch query @@ -80,13 +144,10 @@ class QueryStream { // Fetch data if it's needed, publish that data if it's possible. if (!_controller.hasListener) return; - final data = await listener.handleDataChanged(); + final data = await _fetcher.fetchData(); if (!_controller.isClosed) { _controller.add(data); } } - - bool isAffectedByTableChange(Set tables) => - listener.isAffectedBy(tables); } diff --git a/moor/lib/src/runtime/executor/transactions.dart b/moor/lib/src/runtime/executor/transactions.dart index e9568a13..1451387b 100644 --- a/moor/lib/src/runtime/executor/transactions.dart +++ b/moor/lib/src/runtime/executor/transactions.dart @@ -21,18 +21,18 @@ class Transaction extends DatabaseConnectionUser with QueryEngine { /// updates to the outer stream query store when the transaction is completed. class _TransactionStreamStore extends StreamQueryStore { final StreamQueryStore parent; - final Set affectedTables = {}; + final Set affectedTables = {}; _TransactionStreamStore(this.parent); @override - Stream> registerStream(TableChangeListener> statement) { + Stream registerStream(QueryStreamFetcher statement) { throw StateError('Streams cannot be created inside a transaction. See the ' 'documentation of GeneratedDatabase.transaction for details.'); } @override - Future handleTableUpdates(Set tables) { + Future handleTableUpdates(Set tables) { affectedTables.addAll(tables); return Future.value(null); } diff --git a/moor/lib/src/runtime/statements/delete.dart b/moor/lib/src/runtime/statements/delete.dart index 23e07f90..c6fe6793 100644 --- a/moor/lib/src/runtime/statements/delete.dart +++ b/moor/lib/src/runtime/statements/delete.dart @@ -36,7 +36,7 @@ class DeleteStatement extends Query { await ctx.database.executor.runDelete(ctx.sql, ctx.boundVariables); if (rows > 0) { - database.markTablesUpdated({table.$tableName}); + database.markTablesUpdated({table}); } return rows; }); diff --git a/moor/lib/src/runtime/statements/insert.dart b/moor/lib/src/runtime/statements/insert.dart index 9745d62d..3c7e3290 100644 --- a/moor/lib/src/runtime/statements/insert.dart +++ b/moor/lib/src/runtime/statements/insert.dart @@ -56,7 +56,7 @@ class InsertStatement { await database.executor.doWhenOpened((e) async { await database.executor.runInsert(ctx.sql, ctx.boundVariables); - database.markTablesUpdated({table.$tableName}); + database.markTablesUpdated({table}); }); } diff --git a/moor/lib/src/runtime/statements/select.dart b/moor/lib/src/runtime/statements/select.dart index 0deef0f1..9fab4405 100644 --- a/moor/lib/src/runtime/statements/select.dart +++ b/moor/lib/src/runtime/statements/select.dart @@ -10,8 +10,7 @@ import 'package:moor/src/runtime/structure/table_info.dart'; typedef OrderingTerm OrderClauseGenerator(T tbl); -class SelectStatement extends Query - implements TableChangeListener> { +class SelectStatement extends Query { SelectStatement(QueryEngine database, TableInfo table) : super(database, table); @@ -23,7 +22,10 @@ class SelectStatement extends Query /// Loads and returns all results from this select query. Future> get() async { final ctx = constructQuery(); + return _getWithQuery(ctx); + } + Future> _getWithQuery(GenerationContext ctx) async { final results = await ctx.database.executor.doWhenOpened((e) async { return await ctx.database.executor.runSelect(ctx.sql, ctx.boundVariables); }); @@ -47,21 +49,18 @@ class SelectStatement extends Query /// Creates an auto-updating stream that emits new items whenever this table /// changes. Stream> watch() { - return database.createStream(this); - } + final query = constructQuery(); + final fetcher = QueryStreamFetcher>( + readsFrom: {table}, + fetchData: () => _getWithQuery(query), + key: StreamKey(query.sql, query.boundVariables, D), + ); - @override - Future> handleDataChanged() { - return get(); - } - - @override - bool isAffectedBy(Set tables) { - return tables.contains(super.table.$tableName); + return database.createStream(fetcher); } } -class CustomSelectStatement implements TableChangeListener> { +class CustomSelectStatement { /// Tables this select statement reads from final Set tables; final String query; @@ -70,26 +69,35 @@ class CustomSelectStatement implements TableChangeListener> { CustomSelectStatement(this.query, this.variables, this.tables, this.db); - Future> read() => handleDataChanged(); + QueryStreamFetcher> constructFetcher() { + final args = _mapArgs(); - @override - Future> handleDataChanged() async { + return QueryStreamFetcher>( + readsFrom: tables, + fetchData: () => _executeWithMappedArgs(args), + key: StreamKey(query, args, QueryRow), + ); + } + + Future> execute() async { + return _executeWithMappedArgs(_mapArgs()); + } + + List _mapArgs() { final ctx = GenerationContext(db); - final mappedArgs = variables.map((v) => v.mapToSimpleValue(ctx)).toList(); + return variables.map((v) => v.mapToSimpleValue(ctx)).toList(); + } + Future> _executeWithMappedArgs( + List mappedArgs) async { final result = await db.executor.doWhenOpened((e) => e.runSelect(query, mappedArgs)); return result.map((row) => QueryRow(row, db)).toList(); } - - @override - bool isAffectedBy(Set tables) { - return this.tables.intersection(tables).isNotEmpty; - } } -/// For custom select statement, represents a row in the result set. +/// For custom select statements, represents a row in the result set. class QueryRow { final Map data; final QueryEngine _db; diff --git a/moor/lib/src/runtime/statements/update.dart b/moor/lib/src/runtime/statements/update.dart index 8c39c1b9..62077971 100644 --- a/moor/lib/src/runtime/statements/update.dart +++ b/moor/lib/src/runtime/statements/update.dart @@ -36,7 +36,7 @@ class UpdateStatement extends Query { }); if (rows > 0) { - database.markTablesUpdated({table.$tableName}); + database.markTablesUpdated({table}); } return rows; diff --git a/moor/pubspec.yaml b/moor/pubspec.yaml index 5bb0492e..ce3f7db3 100644 --- a/moor/pubspec.yaml +++ b/moor/pubspec.yaml @@ -11,6 +11,7 @@ environment: dependencies: meta: '>= 1.0.0 <2.0.0' + collection: '>= 1.0.0 <2.0.0' dev_dependencies: moor_generator: ^1.1.1 @@ -21,11 +22,11 @@ dev_dependencies: mockito: ^4.0.0 grinder: ^0.8.0 coverage: ^0.12.0 + coveralls: ^5.1.0 dependency_overrides: moor_generator: path: ../moor_generator - coveralls: ^5.1.0 # Temporarily use my fork because it can collect coverage when running tests with the test runner coverage: git: https://github.com/simolus3/coverage.git diff --git a/moor/test/delete_test.dart b/moor/test/delete_test.dart index 4c89b082..3361fe53 100644 --- a/moor/test/delete_test.dart +++ b/moor/test/delete_test.dart @@ -56,7 +56,7 @@ void main() { await db.delete(db.users).go(); - verify(streamQueries.handleTableUpdates({'users'})); + verify(streamQueries.handleTableUpdates({db.users})); }); test('are not issued when no data was changed', () async { diff --git a/moor/test/insert_test.dart b/moor/test/insert_test.dart index ceeaecf1..4872dda4 100644 --- a/moor/test/insert_test.dart +++ b/moor/test/insert_test.dart @@ -42,7 +42,7 @@ void main() { profilePicture: Uint8List(0), )); - verify(streamQueries.handleTableUpdates({'users'})); + verify(streamQueries.handleTableUpdates({db.users})); }); test('enforces data integrety', () { diff --git a/moor/test/streams_test.dart b/moor/test/streams_test.dart index a85ec68c..33627269 100644 --- a/moor/test/streams_test.dart +++ b/moor/test/streams_test.dart @@ -1,3 +1,5 @@ +import 'package:moor/moor.dart'; +import 'package:moor/src/runtime/executor/stream_queries.dart'; import 'package:test_api/test_api.dart'; import 'data/tables/todos.dart'; @@ -24,17 +26,52 @@ void main() { test('streams fetch when the underlying data changes', () { db.select(db.users).watch().listen((_) {}); - db.markTablesUpdated({'users'}); + db.markTablesUpdated({db.users}); // twice: Once because the listener attached, once because the data changed verify(executor.runSelect(any, any)).called(2); }); + test('equal statements yield identical streams', () { + final firstStream = (db.select(db.users).watch())..listen((_) {}); + final secondStream = (db.select(db.users).watch())..listen((_) {}); + + expect(identical(firstStream, secondStream), true); + }); + + group('stream keys', () { + final keyA = StreamKey('SELECT * FROM users;', [], User); + final keyB = StreamKey('SELECT * FROM users;', [], User); + final keyCustom = StreamKey('SELECT * FROM users;', [], QueryRow); + final keyCustomTodos = StreamKey('SELECT * FROM todos;', [], QueryRow); + final keyArgs = StreamKey('SELECT * FROM users;', ['name'], User); + + test('are equal for same parameters', () { + expect(keyA, equals(keyB)); + expect(keyA.hashCode, keyB.hashCode); + }); + + test('are not equal for different queries', () { + expect(keyCustomTodos, isNot(keyCustom)); + expect(keyCustomTodos.hashCode, isNot(keyCustom.hashCode)); + }); + + test('are not equal for different variables', () { + expect(keyArgs, isNot(keyA)); + expect(keyArgs.hashCode, isNot(keyA.hashCode)); + }); + + test('are not equal for different types', () { + expect(keyCustom, isNot(keyA)); + expect(keyCustom.hashCode, isNot(keyA.hashCode)); + }); + }); + group("streams don't fetch", () { test('when no listeners were attached', () { db.select(db.users).watch(); - db.markTablesUpdated({'users'}); + db.markTablesUpdated({db.users}); verifyNever(executor.runSelect(any, any)); }); @@ -44,7 +81,7 @@ void main() { clearInteractions(executor); subscription.cancel(); - db.markTablesUpdated({'users'}); + db.markTablesUpdated({db.users}); verifyNever(executor.runSelect(any, any)); }); diff --git a/moor/test/transactions_test.dart b/moor/test/transactions_test.dart index 46a3e868..d501e9a2 100644 --- a/moor/test/transactions_test.dart +++ b/moor/test/transactions_test.dart @@ -37,7 +37,7 @@ void main() { }); // After the transaction completes, the queries should be updated - verify(streamQueries.handleTableUpdates({'users'})).called(1); + verify(streamQueries.handleTableUpdates({db.users})).called(1); verify(executor.transactions.send()); }); } diff --git a/moor/test/update_test.dart b/moor/test/update_test.dart index 0d4385e7..ca977010 100644 --- a/moor/test/update_test.dart +++ b/moor/test/update_test.dart @@ -67,7 +67,7 @@ void main() { content: 'Updated content', )); - verify(streamQueries.handleTableUpdates({'todos'})); + verify(streamQueries.handleTableUpdates({db.todosTable})); }); test('are not issued when no data was changed', () async { @@ -109,7 +109,7 @@ void main() { test('informs about updated tables', () async { await db.customUpdate('', updates: {db.users, db.todosTable}); - verify(streamQueries.handleTableUpdates({'users', 'todos'})); + verify(streamQueries.handleTableUpdates({db.users, db.todosTable})); }); }); } diff --git a/moor_generator/lib/src/parser/table_parser.dart b/moor_generator/lib/src/parser/table_parser.dart index cd7bb7e2..f1392cf9 100644 --- a/moor_generator/lib/src/parser/table_parser.dart +++ b/moor_generator/lib/src/parser/table_parser.dart @@ -92,8 +92,8 @@ class TableParser extends ParserBase { if (expression is SetOrMapLiteral) { for (var entry in expression.elements2) { if (entry is Identifier) { - final column = columns.singleWhere( - (column) => column.dartGetterName == entry.name); + final column = columns + .singleWhere((column) => column.dartGetterName == entry.name); parsedPrimaryKey.add(column); } else { // Don't add an error, these features aren't on a stable dart release @@ -101,19 +101,19 @@ class TableParser extends ParserBase { print('Unexpected entry in expression.elements2: $entry'); } } - // ignore: deprecated_member_use + // ignore: deprecated_member_use } else if (expression is MapLiteral) { for (var entry in expression.entries) { final key = entry.key as Identifier; final column = - columns.singleWhere((column) => column.dartGetterName == key.name); + columns.singleWhere((column) => column.dartGetterName == key.name); parsedPrimaryKey.add(column); } - // ignore: deprecated_member_use + // ignore: deprecated_member_use } else if (expression is SetLiteral) { for (var entry in expression.elements) { final column = columns.singleWhere( - (column) => column.dartGetterName == (entry as Identifier).name); + (column) => column.dartGetterName == (entry as Identifier).name); parsedPrimaryKey.add(column); } } else {