diff --git a/moor/lib/src/runtime/executor/stream_queries.dart b/moor/lib/src/runtime/executor/stream_queries.dart index 4f2479bf..691587bf 100644 --- a/moor/lib/src/runtime/executor/stream_queries.dart +++ b/moor/lib/src/runtime/executor/stream_queries.dart @@ -103,7 +103,13 @@ class StreamQueryStore { /// Handles updates on a given table by re-executing all queries that read /// from that table. Future handleTableUpdates(Set tables) async { - _updatedTableNames.add(tables.map((t) => t.actualTableName).toSet()); + handleTableUpdatesByName(tables.map((t) => t.actualTableName).toSet()); + } + + /// Handles updates on tables by their name. All queries reading from any of + /// the tables in [updatedTableNames] will fetch their data again. + void handleTableUpdatesByName(Set updatedTableNames) { + _updatedTableNames.add(updatedTableNames); } void markAsClosed(QueryStream stream) { diff --git a/moor/lib/src/runtime/isolate/client.dart b/moor/lib/src/runtime/isolate/client.dart index 072a6b11..4f84ef83 100644 --- a/moor/lib/src/runtime/isolate/client.dart +++ b/moor/lib/src/runtime/isolate/client.dart @@ -3,6 +3,7 @@ part of 'moor_isolate.dart'; class _MoorClient { final IsolateCommunication _channel; final SqlTypeSystem typeSystem; + _IsolateStreamQueryStore _streamStore; DatabaseConnection _connection; @@ -11,10 +12,12 @@ class _MoorClient { SqlExecutor get executor => _connection.executor.runCustom; _MoorClient(this._channel, this.typeSystem) { + _streamStore = _IsolateStreamQueryStore(this); + _connection = DatabaseConnection( typeSystem, _IsolateQueryExecutor(this), - null, + _streamStore, ); _channel.setRequestHandler(_handleRequest); } @@ -48,6 +51,8 @@ class _MoorClient { } else if (payload is _RunBeforeOpen) { return connectedDb.beforeOpenCallback( _connection.executor, payload.details); + } else if (payload is _NotifyTablesUpdated) { + _streamStore.handleTableUpdatesByName(payload.updatedTables.toSet()); } } } @@ -119,3 +124,18 @@ class _IsolateQueryExecutor extends QueryExecutor { return Future.value(); } } + +class _IsolateStreamQueryStore extends StreamQueryStore { + final _MoorClient client; + + _IsolateStreamQueryStore(this.client); + + @override + Future handleTableUpdates(Set tables) { + // we're not calling super.handleTableUpdates because the server will send + // a notification of those tables to all clients, including the one who sent + // this. When we get that reply, we update the tables. + return client._channel.request( + _NotifyTablesUpdated(tables.map((t) => t.actualTableName).toList())); + } +} diff --git a/moor/lib/src/runtime/isolate/communication.dart b/moor/lib/src/runtime/isolate/communication.dart index 39db8f2b..ce0bffc1 100644 --- a/moor/lib/src/runtime/isolate/communication.dart +++ b/moor/lib/src/runtime/isolate/communication.dart @@ -129,6 +129,7 @@ class IsolateCommunication { incomingRequests.listen((request) { try { final result = handler(request); + if (result is Future) { result.then((value) => respond(request, value)); } else { diff --git a/moor/lib/src/runtime/isolate/moor_isolate.dart b/moor/lib/src/runtime/isolate/moor_isolate.dart index 85042b27..3ebbbc40 100644 --- a/moor/lib/src/runtime/isolate/moor_isolate.dart +++ b/moor/lib/src/runtime/isolate/moor_isolate.dart @@ -2,6 +2,7 @@ import 'dart:async'; import 'dart:isolate'; import 'package:moor/moor.dart'; +import 'package:moor/src/runtime/executor/stream_queries.dart'; import 'communication.dart'; part 'client.dart'; diff --git a/moor/lib/src/runtime/isolate/protocol.dart b/moor/lib/src/runtime/isolate/protocol.dart index 4e1ba6f1..8f3e6b79 100644 --- a/moor/lib/src/runtime/isolate/protocol.dart +++ b/moor/lib/src/runtime/isolate/protocol.dart @@ -54,8 +54,19 @@ class _RunOnUpgrade { _RunOnUpgrade(this.versionBefore, this.versionNow); } +/// Sent from the server to the client when it should run the before open +/// callback. class _RunBeforeOpen { final OpeningDetails details; _RunBeforeOpen(this.details); } + +/// Sent to notify that a previous query has updated some tables. When a server +/// receives this message, it replies with `null` but forwards a new request +/// with this payload to all connected clients. +class _NotifyTablesUpdated { + final List updatedTables; + + _NotifyTablesUpdated(this.updatedTables); +} diff --git a/moor/lib/src/runtime/isolate/server.dart b/moor/lib/src/runtime/isolate/server.dart index d5973b70..04a1f9a3 100644 --- a/moor/lib/src/runtime/isolate/server.dart +++ b/moor/lib/src/runtime/isolate/server.dart @@ -43,6 +43,10 @@ class _MoorServer { return null; } else if (payload is _ExecuteQuery) { return _runQuery(payload.method, payload.sql, payload.args); + } else if (payload is _NotifyTablesUpdated) { + for (var connected in server.currentChannels) { + connected.request(payload); + } } } diff --git a/moor/test/isolate_test.dart b/moor/test/isolate_test.dart index 433ddf6b..adceeebc 100644 --- a/moor/test/isolate_test.dart +++ b/moor/test/isolate_test.dart @@ -11,7 +11,7 @@ void main() { setUp(() async { isolate = await MoorIsolate.spawn(_backgroundConnection); - isolateConnection = await isolate.connect(isolateDebugLog: false); + isolateConnection = await isolate.connect(isolateDebugLog: true); }); tearDown(() { @@ -25,6 +25,20 @@ void main() { final result = await database.select(database.todosTable).get(); expect(result, isEmpty); }); + + test('stream queries work as expected', () async { + final database = TodoDb.connect(isolateConnection); + final initialCompanion = TodosTableCompanion.insert(content: 'my content'); + + final stream = database.select(database.todosTable).watchSingle(); + final expectation = expectLater( + stream, + emitsInOrder([null, TodoEntry(id: 1, content: 'my content')]), + ); + + await database.into(database.todosTable).insert(initialCompanion); + await expectation; + }); } DatabaseConnection _backgroundConnection() {