Support stream queries across isolates

This commit is contained in:
Simon Binder 2019-10-30 21:10:53 +01:00
parent a2c7c11abf
commit f3221e09bc
No known key found for this signature in database
GPG Key ID: 7891917E4147B8C0
7 changed files with 60 additions and 3 deletions

View File

@ -103,7 +103,13 @@ class StreamQueryStore {
/// Handles updates on a given table by re-executing all queries that read /// Handles updates on a given table by re-executing all queries that read
/// from that table. /// from that table.
Future<void> handleTableUpdates(Set<TableInfo> tables) async { Future<void> handleTableUpdates(Set<TableInfo> tables) async {
_updatedTableNames.add(tables.map((t) => t.actualTableName).toSet()); handleTableUpdatesByName(tables.map((t) => t.actualTableName).toSet());
}
/// Handles updates on tables by their name. All queries reading from any of
/// the tables in [updatedTableNames] will fetch their data again.
void handleTableUpdatesByName(Set<String> updatedTableNames) {
_updatedTableNames.add(updatedTableNames);
} }
void markAsClosed(QueryStream stream) { void markAsClosed(QueryStream stream) {

View File

@ -3,6 +3,7 @@ part of 'moor_isolate.dart';
class _MoorClient { class _MoorClient {
final IsolateCommunication _channel; final IsolateCommunication _channel;
final SqlTypeSystem typeSystem; final SqlTypeSystem typeSystem;
_IsolateStreamQueryStore _streamStore;
DatabaseConnection _connection; DatabaseConnection _connection;
@ -11,10 +12,12 @@ class _MoorClient {
SqlExecutor get executor => _connection.executor.runCustom; SqlExecutor get executor => _connection.executor.runCustom;
_MoorClient(this._channel, this.typeSystem) { _MoorClient(this._channel, this.typeSystem) {
_streamStore = _IsolateStreamQueryStore(this);
_connection = DatabaseConnection( _connection = DatabaseConnection(
typeSystem, typeSystem,
_IsolateQueryExecutor(this), _IsolateQueryExecutor(this),
null, _streamStore,
); );
_channel.setRequestHandler(_handleRequest); _channel.setRequestHandler(_handleRequest);
} }
@ -48,6 +51,8 @@ class _MoorClient {
} else if (payload is _RunBeforeOpen) { } else if (payload is _RunBeforeOpen) {
return connectedDb.beforeOpenCallback( return connectedDb.beforeOpenCallback(
_connection.executor, payload.details); _connection.executor, payload.details);
} else if (payload is _NotifyTablesUpdated) {
_streamStore.handleTableUpdatesByName(payload.updatedTables.toSet());
} }
} }
} }
@ -119,3 +124,18 @@ class _IsolateQueryExecutor extends QueryExecutor {
return Future.value(); return Future.value();
} }
} }
class _IsolateStreamQueryStore extends StreamQueryStore {
final _MoorClient client;
_IsolateStreamQueryStore(this.client);
@override
Future<void> handleTableUpdates(Set<TableInfo> tables) {
// we're not calling super.handleTableUpdates because the server will send
// a notification of those tables to all clients, including the one who sent
// this. When we get that reply, we update the tables.
return client._channel.request(
_NotifyTablesUpdated(tables.map((t) => t.actualTableName).toList()));
}
}

View File

@ -129,6 +129,7 @@ class IsolateCommunication {
incomingRequests.listen((request) { incomingRequests.listen((request) {
try { try {
final result = handler(request); final result = handler(request);
if (result is Future) { if (result is Future) {
result.then((value) => respond(request, value)); result.then((value) => respond(request, value));
} else { } else {

View File

@ -2,6 +2,7 @@ import 'dart:async';
import 'dart:isolate'; import 'dart:isolate';
import 'package:moor/moor.dart'; import 'package:moor/moor.dart';
import 'package:moor/src/runtime/executor/stream_queries.dart';
import 'communication.dart'; import 'communication.dart';
part 'client.dart'; part 'client.dart';

View File

@ -54,8 +54,19 @@ class _RunOnUpgrade {
_RunOnUpgrade(this.versionBefore, this.versionNow); _RunOnUpgrade(this.versionBefore, this.versionNow);
} }
/// Sent from the server to the client when it should run the before open
/// callback.
class _RunBeforeOpen { class _RunBeforeOpen {
final OpeningDetails details; final OpeningDetails details;
_RunBeforeOpen(this.details); _RunBeforeOpen(this.details);
} }
/// Sent to notify that a previous query has updated some tables. When a server
/// receives this message, it replies with `null` but forwards a new request
/// with this payload to all connected clients.
class _NotifyTablesUpdated {
final List<String> updatedTables;
_NotifyTablesUpdated(this.updatedTables);
}

View File

@ -43,6 +43,10 @@ class _MoorServer {
return null; return null;
} else if (payload is _ExecuteQuery) { } else if (payload is _ExecuteQuery) {
return _runQuery(payload.method, payload.sql, payload.args); return _runQuery(payload.method, payload.sql, payload.args);
} else if (payload is _NotifyTablesUpdated) {
for (var connected in server.currentChannels) {
connected.request(payload);
}
} }
} }

View File

@ -11,7 +11,7 @@ void main() {
setUp(() async { setUp(() async {
isolate = await MoorIsolate.spawn(_backgroundConnection); isolate = await MoorIsolate.spawn(_backgroundConnection);
isolateConnection = await isolate.connect(isolateDebugLog: false); isolateConnection = await isolate.connect(isolateDebugLog: true);
}); });
tearDown(() { tearDown(() {
@ -25,6 +25,20 @@ void main() {
final result = await database.select(database.todosTable).get(); final result = await database.select(database.todosTable).get();
expect(result, isEmpty); expect(result, isEmpty);
}); });
test('stream queries work as expected', () async {
final database = TodoDb.connect(isolateConnection);
final initialCompanion = TodosTableCompanion.insert(content: 'my content');
final stream = database.select(database.todosTable).watchSingle();
final expectation = expectLater(
stream,
emitsInOrder([null, TodoEntry(id: 1, content: 'my content')]),
);
await database.into(database.todosTable).insert(initialCompanion);
await expectation;
});
} }
DatabaseConnection _backgroundConnection() { DatabaseConnection _backgroundConnection() {