From a74cc3b6244a688507e35bc38d81a1ba43f23190 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 2 Oct 2019 22:49:34 +0200 Subject: [PATCH 1/3] Create tables sequentially, manually create migrators (#165) --- moor/lib/src/runtime/database.dart | 25 +++++++++++++++++-------- moor/lib/src/runtime/migration.dart | 4 +++- 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/moor/lib/src/runtime/database.dart b/moor/lib/src/runtime/database.dart index cb307d7f..e3c9b243 100644 --- a/moor/lib/src/runtime/database.dart +++ b/moor/lib/src/runtime/database.dart @@ -263,8 +263,8 @@ mixin QueryEngine on DatabaseConnectionUser { /// Executes the custom sql [statement] on the database. @protected @visibleForTesting - Future customStatement(String statement) { - return _resolvedEngine.executor.runCustom(statement); + Future customStatement(String statement, [List args]) { + return _resolvedEngine.executor.runCustom(statement, args); } /// Executes [action] in a transaction, which means that all its queries and @@ -366,15 +366,24 @@ abstract class GeneratedDatabase extends DatabaseConnectionUser executor?.databaseInfo = this; } - /// Creates a migrator with the provided query executor. We sometimes can't - /// use the regular [GeneratedDatabase.executor] because migration happens - /// before that executor is ready. - Migrator _createMigrator(SqlExecutor executor) => Migrator(this, executor); + /// Creates a [Migrator] with the provided query executor. Migrators generate + /// sql statements to create or drop tables. + /// + /// This api is mainly used internally in moor, for instance in + /// [handleDatabaseCreation] and [handleDatabaseVersionChange]. However, it + /// can also be used if you need to create tables manually and outside of a + /// [MigrationStrategy]. For almost all use cases, overriding [migration] + /// should suffice. + @protected + Migrator createMigrator([SqlExecutor executor]) { + final actualExecutor = executor ?? customStatement; + return Migrator(this, actualExecutor); + } /// Handles database creation by delegating the work to the [migration] /// strategy. This method should not be called by users. Future handleDatabaseCreation({@required SqlExecutor executor}) { - final migrator = _createMigrator(executor); + final migrator = createMigrator(executor); return _resolvedMigration.onCreate(migrator); } @@ -382,7 +391,7 @@ abstract class GeneratedDatabase extends DatabaseConnectionUser /// strategy. This method should not be called by users. Future handleDatabaseVersionChange( {@required SqlExecutor executor, int from, int to}) { - final migrator = _createMigrator(executor); + final migrator = createMigrator(executor); return _resolvedMigration.onUpgrade(migrator, from, to); } diff --git a/moor/lib/src/runtime/migration.dart b/moor/lib/src/runtime/migration.dart index 40ca77aa..6df9809c 100644 --- a/moor/lib/src/runtime/migration.dart +++ b/moor/lib/src/runtime/migration.dart @@ -56,7 +56,9 @@ class Migrator { /// Creates all tables specified for the database, if they don't exist Future createAllTables() async { - return Future.wait(_db.allTables.map(createTable)); + for (var table in _db.allTables) { + await createTable(table); + } } GenerationContext _createContext() { From c2845cb2486107dfa63199747d027971162e4606 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 3 Oct 2019 10:40:31 +0200 Subject: [PATCH 2/3] Invalidate cached stream data on update (#166) --- .../src/runtime/executor/stream_queries.dart | 20 +++++++- .../utils/start_with_value_transformer.dart | 26 +++++----- moor/test/select_test.dart | 13 +++-- moor/test/streams_test.dart | 48 +++++++++++++++++-- 4 files changed, 82 insertions(+), 25 deletions(-) diff --git a/moor/lib/src/runtime/executor/stream_queries.dart b/moor/lib/src/runtime/executor/stream_queries.dart index 0508d364..f2b8a885 100644 --- a/moor/lib/src/runtime/executor/stream_queries.dart +++ b/moor/lib/src/runtime/executor/stream_queries.dart @@ -61,8 +61,14 @@ class StreamKey { /// them when needed. class StreamQueryStore { final Map _activeKeyStreams = {}; + + // Why is this stream synchronous? We want to dispatch table updates before + // the future from the query completes. This allows streams to invalidate + // their cached data before the user can send another query. + // There shouldn't be a problem as this stream is not exposed in any user- + // facing api. final StreamController> _updatedTableNames = - StreamController.broadcast(); + StreamController.broadcast(sync: true); StreamQueryStore(); @@ -151,7 +157,11 @@ class QueryStream { final names = _fetcher.readsFrom.map((t) => t.actualTableName).toSet(); _tablesChangedSubscription = _store._updatedTableNames.stream .where((changed) => changed.any(names.contains)) - .listen((_) => fetchAndEmitData()); + .listen((_) { + // table has changed, invalidate cache + _lastData = null; + fetchAndEmitData(); + }); } void _onCancel() { @@ -159,6 +169,12 @@ class QueryStream { _tablesChangedSubscription?.cancel(); _tablesChangedSubscription = null; + // we don't listen for table updates anymore, and we're guaranteed to + // re-fetch data after a new listener comes in. We can't know if the table + // was updated in the meantime, but let's delete the cached data just in + // case + _lastData = null; + _store.markAsClosed(this); } diff --git a/moor/lib/src/utils/start_with_value_transformer.dart b/moor/lib/src/utils/start_with_value_transformer.dart index 6fbae560..d80d1d7f 100644 --- a/moor/lib/src/utils/start_with_value_transformer.dart +++ b/moor/lib/src/utils/start_with_value_transformer.dart @@ -21,21 +21,21 @@ class StartWithValueTransformer extends StreamTransformerBase { controller ..onListen = () { - final data = _value(); - if (data != null) { - // Dart's stream contract specifies that listeners are only notified - // after the .listen() code completes. So, we add the initial data in - // a later microtask. - scheduleMicrotask(() { + // Dart's stream contract specifies that listeners are only notified + // after the .listen() code completes. So, we add the initial data in + // a later microtask. + scheduleMicrotask(() { + final data = _value(); + if (data != null) { controller.add(data); - }); - } + } - subscription = stream.listen( - controller.add, - onError: controller.addError, - onDone: controller.close, - ); + subscription = stream.listen( + controller.add, + onError: controller.addError, + onDone: controller.close, + ); + }); } ..onCancel = () { // not using a tear-off here because subscription.cancel is null before diff --git a/moor/test/select_test.dart b/moor/test/select_test.dart index fd34f066..e374fac2 100644 --- a/moor/test/select_test.dart +++ b/moor/test/select_test.dart @@ -129,7 +129,7 @@ void main() { expect(db.select(db.todosTable).getSingle(), completion(_todoEntry)); }); - test('get multiple times', () { + test('get multiple times', () async { final resultRows = >>[ [_dataOfTodoEntry], [], @@ -141,12 +141,15 @@ void main() { return Future.value(resultRows[_currentRow++]); }); - expectLater(db.select(db.todosTable).watchSingle(), + final expectation = expectLater(db.select(db.todosTable).watchSingle(), emitsInOrder([_todoEntry, null, emitsError(anything)])); - db - ..markTablesUpdated({db.todosTable}) - ..markTablesUpdated({db.todosTable}); + await pumpEventQueue(times: 1); + db.markTablesUpdated({db.todosTable}); + await pumpEventQueue(times: 1); + db.markTablesUpdated({db.todosTable}); + + await expectation; }); }); } diff --git a/moor/test/streams_test.dart b/moor/test/streams_test.dart index 52e276b6..c827e6c1 100644 --- a/moor/test/streams_test.dart +++ b/moor/test/streams_test.dart @@ -14,18 +14,20 @@ void main() { db = TodoDb(executor); }); - test('streams fetch when the first listener attaches', () { + test('streams fetch when the first listener attaches', () async { final stream = db.select(db.users).watch(); verifyNever(executor.runSelect(any, any)); stream.listen((_) {}); + await pumpEventQueue(times: 1); verify(executor.runSelect(any, any)).called(1); }); test('streams fetch when the underlying data changes', () async { db.select(db.users).watch().listen((_) {}); + await pumpEventQueue(times: 1); db.markTablesUpdated({db.users}); await pumpEventQueue(times: 1); @@ -39,6 +41,7 @@ void main() { final second = db.alias(db.users, 'two'); db.select(first).watch().listen((_) {}); + await pumpEventQueue(times: 1); db.markTablesUpdated({second}); await pumpEventQueue(times: 1); @@ -50,19 +53,53 @@ void main() { when(executor.runSelect(any, any)).thenAnswer((_) => Future.value([])); final first = (db.select(db.users).watch()); - expect(first, emits(isEmpty)); + await expectLater(first, emits(isEmpty)); clearInteractions(executor); final second = (db.select(db.users).watch()); - expect(second, emits(isEmpty)); + await expectLater(second, emits(isEmpty)); - await pumpEventQueue(times: 1); // calling executor.dialect is ok, it's needed to construct the statement verify(executor.dialect); verifyNoMoreInteractions(executor); }); + group('updating clears cached data', () { + test('when an older stream is no longer listened to', () async { + when(executor.runSelect(any, any)).thenAnswer((_) => Future.value([])); + final first = db.select(db.categories).watch(); + await first.first; // subscribe to first stream, then drop subscription + + when(executor.runSelect(any, any)).thenAnswer((_) => Future.value([ + {'id': 1, 'description': 'd'} + ])); + await db + .into(db.categories) + .insert(CategoriesCompanion.insert(description: 'd')); + + final second = db.select(db.categories).watch(); + expect(second.first, completion(isNotEmpty)); + }); + + test('when an older stream is still listened to', () async { + when(executor.runSelect(any, any)).thenAnswer((_) => Future.value([])); + final first = db.select(db.categories).watch(); + final subscription = first.listen((_) {}); + + when(executor.runSelect(any, any)).thenAnswer((_) => Future.value([ + {'id': 1, 'description': 'd'} + ])); + await db + .into(db.categories) + .insert(CategoriesCompanion.insert(description: 'd')); + + final second = db.select(db.categories).watch(); + expect(second.first, completion(isNotEmpty)); + await subscription.cancel(); + }); + }); + test('every stream instance can be listened to', () async { when(executor.runSelect(any, any)).thenAnswer((_) => Future.value([])); @@ -72,7 +109,7 @@ void main() { await first.first; // will listen to stream, then cancel await pumpEventQueue(times: 1); // give cancel event time to propagate - final checkEmits = expectLater(second, emitsInOrder([[], []])); + final checkEmits = expectLater(second, emitsInOrder([[]])); db.markTablesUpdated({db.users}); await pumpEventQueue(times: 1); @@ -99,6 +136,7 @@ void main() { await stream.first; // listen to stream, then cancel await pumpEventQueue(); // should remove the stream from the cache await stream.first; // listen again + await pumpEventQueue(times: 1); verify(executor.runSelect(any, any)).called(2); }); From b0d69f346f0aa7c11a3fc8a2ffc82d767de719f5 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 3 Oct 2019 11:39:12 +0200 Subject: [PATCH 3/3] Make StartWithValueTransformer subscribe in same microtask --- .../src/utils/start_with_value_transformer.dart | 14 ++++++++------ moor/test/select_test.dart | 13 +++++-------- moor/test/streams_test.dart | 10 ++++------ 3 files changed, 17 insertions(+), 20 deletions(-) diff --git a/moor/lib/src/utils/start_with_value_transformer.dart b/moor/lib/src/utils/start_with_value_transformer.dart index d80d1d7f..a4c7f238 100644 --- a/moor/lib/src/utils/start_with_value_transformer.dart +++ b/moor/lib/src/utils/start_with_value_transformer.dart @@ -29,13 +29,15 @@ class StartWithValueTransformer extends StreamTransformerBase { if (data != null) { controller.add(data); } - - subscription = stream.listen( - controller.add, - onError: controller.addError, - onDone: controller.close, - ); }); + + // the .listen will run in a later microtask, so the cached data would + // still be added first. + subscription = stream.listen( + controller.add, + onError: controller.addError, + onDone: controller.close, + ); } ..onCancel = () { // not using a tear-off here because subscription.cancel is null before diff --git a/moor/test/select_test.dart b/moor/test/select_test.dart index e374fac2..fd34f066 100644 --- a/moor/test/select_test.dart +++ b/moor/test/select_test.dart @@ -129,7 +129,7 @@ void main() { expect(db.select(db.todosTable).getSingle(), completion(_todoEntry)); }); - test('get multiple times', () async { + test('get multiple times', () { final resultRows = >>[ [_dataOfTodoEntry], [], @@ -141,15 +141,12 @@ void main() { return Future.value(resultRows[_currentRow++]); }); - final expectation = expectLater(db.select(db.todosTable).watchSingle(), + expectLater(db.select(db.todosTable).watchSingle(), emitsInOrder([_todoEntry, null, emitsError(anything)])); - await pumpEventQueue(times: 1); - db.markTablesUpdated({db.todosTable}); - await pumpEventQueue(times: 1); - db.markTablesUpdated({db.todosTable}); - - await expectation; + db + ..markTablesUpdated({db.todosTable}) + ..markTablesUpdated({db.todosTable}); }); }); } diff --git a/moor/test/streams_test.dart b/moor/test/streams_test.dart index c827e6c1..a83d9350 100644 --- a/moor/test/streams_test.dart +++ b/moor/test/streams_test.dart @@ -14,20 +14,18 @@ void main() { db = TodoDb(executor); }); - test('streams fetch when the first listener attaches', () async { + test('streams fetch when the first listener attaches', () { final stream = db.select(db.users).watch(); verifyNever(executor.runSelect(any, any)); stream.listen((_) {}); - await pumpEventQueue(times: 1); verify(executor.runSelect(any, any)).called(1); }); test('streams fetch when the underlying data changes', () async { db.select(db.users).watch().listen((_) {}); - await pumpEventQueue(times: 1); db.markTablesUpdated({db.users}); await pumpEventQueue(times: 1); @@ -53,12 +51,12 @@ void main() { when(executor.runSelect(any, any)).thenAnswer((_) => Future.value([])); final first = (db.select(db.users).watch()); - await expectLater(first, emits(isEmpty)); + expect(first, emits(isEmpty)); clearInteractions(executor); final second = (db.select(db.users).watch()); - await expectLater(second, emits(isEmpty)); + expect(second, emits(isEmpty)); // calling executor.dialect is ok, it's needed to construct the statement verify(executor.dialect); @@ -109,7 +107,7 @@ void main() { await first.first; // will listen to stream, then cancel await pumpEventQueue(times: 1); // give cancel event time to propagate - final checkEmits = expectLater(second, emitsInOrder([[]])); + final checkEmits = expectLater(second, emitsInOrder([[], []])); db.markTablesUpdated({db.users}); await pumpEventQueue(times: 1);