From a2c7c11abfa3e49782c20aab3a4fe8045bc349ec Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 30 Oct 2019 20:32:08 +0100 Subject: [PATCH] Support executing queries over isolates --- moor/lib/isolate.dart | 5 ++ moor/lib/src/runtime/database.dart | 4 +- moor/lib/src/runtime/isolate/client.dart | 21 ++++-- .../src/runtime/isolate/communication.dart | 66 +++++++++++++++---- .../lib/src/runtime/isolate/moor_isolate.dart | 22 +++++-- moor/lib/src/runtime/isolate/protocol.dart | 5 ++ moor/lib/src/runtime/isolate/server.dart | 18 +++++ moor/pubspec.yaml | 2 + moor/test/isolate_test.dart | 32 +++++++++ 9 files changed, 146 insertions(+), 29 deletions(-) create mode 100644 moor/lib/isolate.dart create mode 100644 moor/test/isolate_test.dart diff --git a/moor/lib/isolate.dart b/moor/lib/isolate.dart new file mode 100644 index 00000000..a1369f6e --- /dev/null +++ b/moor/lib/isolate.dart @@ -0,0 +1,5 @@ +/// Contains utils to run moor databases in a background isolate. This API is +/// not supported on the web. +library isolate; + +export 'src/runtime/isolate/moor_isolate.dart'; diff --git a/moor/lib/src/runtime/database.dart b/moor/lib/src/runtime/database.dart index f97ca720..ac76f740 100644 --- a/moor/lib/src/runtime/database.dart +++ b/moor/lib/src/runtime/database.dart @@ -438,7 +438,9 @@ abstract class GeneratedDatabase extends DatabaseConnectionUser /// Used by generated code to connect to a database that is already open. GeneratedDatabase.connect(DatabaseConnection connection) - : super.fromConnection(connection); + : super.fromConnection(connection) { + connection?.executor?.databaseInfo = this; + } /// Creates a [Migrator] with the provided query executor. Migrators generate /// sql statements to create or drop tables. diff --git a/moor/lib/src/runtime/isolate/client.dart b/moor/lib/src/runtime/isolate/client.dart index 0b867848..072a6b11 100644 --- a/moor/lib/src/runtime/isolate/client.dart +++ b/moor/lib/src/runtime/isolate/client.dart @@ -19,9 +19,10 @@ class _MoorClient { _channel.setRequestHandler(_handleRequest); } - static Future<_MoorClient> connect(MoorIsolate isolate) async { - final connection = - await IsolateCommunication.connectAsClient(isolate._server); + static Future<_MoorClient> connect( + MoorIsolate isolate, bool isolateDebugLog) async { + final connection = await IsolateCommunication.connectAsClient( + isolate._server, isolateDebugLog); final typeSystem = await connection.request(_NoArgsRequest.getTypeSystem); @@ -34,19 +35,19 @@ class _MoorClient { if (payload is _NoArgsRequest) { switch (payload) { case _NoArgsRequest.runOnCreate: - connectedDb.handleDatabaseCreation(executor: executor); - return null; + return connectedDb.handleDatabaseCreation(executor: executor); default: throw UnsupportedError('This operation must be run on the server'); } } else if (payload is _RunOnUpgrade) { - connectedDb.handleDatabaseVersionChange( + return connectedDb.handleDatabaseVersionChange( executor: executor, from: payload.versionBefore, to: payload.versionNow, ); } else if (payload is _RunBeforeOpen) { - connectedDb.beforeOpenCallback(_connection.executor, payload.details); + return connectedDb.beforeOpenCallback( + _connection.executor, payload.details); } } } @@ -111,4 +112,10 @@ class _IsolateQueryExecutor extends QueryExecutor { Future>> runSelect(String statement, List args) { return _runRequest(_StatementMethod.select, statement, args); } + + @override + Future close() { + client._channel.close(); + return Future.value(); + } } diff --git a/moor/lib/src/runtime/isolate/communication.dart b/moor/lib/src/runtime/isolate/communication.dart index 0e7ba5ef..39db8f2b 100644 --- a/moor/lib/src/runtime/isolate/communication.dart +++ b/moor/lib/src/runtime/isolate/communication.dart @@ -10,8 +10,9 @@ class IsolateCommunication { /// The [SendPort] used to send messages to the peer. final SendPort sendPort; - /// The [ReceivePort] used to receive messages from the peer. - final ReceivePort receivePort; + /// The input stream of this channel. This could be a [ReceivePort]. + final Stream input; + StreamSubscription _inputSubscription; // note that there are two IsolateCommunication instances in each connection, // and each of them has an independent _currentRequestId field! @@ -20,8 +21,10 @@ class IsolateCommunication { final Map _pendingRequests = {}; final StreamController _incomingRequests = StreamController(); - IsolateCommunication._(this.sendPort, this.receivePort) { - receivePort.listen(_handleMessage); + final bool _debugLog; + + IsolateCommunication._(this.sendPort, this.input, [this._debugLog = false]) { + _inputSubscription = input.listen(_handleMessage); } /// Returns a future that resolves when this communication channel was closed, @@ -33,25 +36,27 @@ class IsolateCommunication { /// Establishes an [IsolateCommunication] by connecting to the [Server] which /// emitted the [key]. - static Future connectAsClient(ServerKey key) async { + static Future connectAsClient(ServerKey key, + [bool debugLog = false]) async { final clientReceive = ReceivePort(); + final stream = clientReceive.asBroadcastStream(); key.openConnectionPort .send(_ClientConnectionRequest(clientReceive.sendPort)); - final response = (await clientReceive.first) as _ServerConnectionResponse; + final response = (await stream.first) as _ServerConnectionResponse; - return IsolateCommunication._(response.sendPort, clientReceive); + return IsolateCommunication._(response.sendPort, stream, debugLog); } /// Closes the connection to the server. void close() { - sendPort.send(_ConnectionClose()); + _send(_ConnectionClose()); _closeLocally(); } void _closeLocally() { - receivePort.close(); + _inputSubscription?.cancel(); _closeCompleter.complete(); for (var pending in _pendingRequests.values) { @@ -61,6 +66,10 @@ class IsolateCommunication { } void _handleMessage(dynamic msg) { + if (_debugLog) { + print('[IN]: $msg'); + } + if (msg is _ConnectionClose) { _closeLocally(); } else if (msg is _Response) { @@ -79,6 +88,8 @@ class IsolateCommunication { _pendingRequests.remove(msg.requestId); } + } else if (msg is Request) { + _incomingRequests.add(msg); } } @@ -89,28 +100,40 @@ class IsolateCommunication { final completer = Completer(); _pendingRequests[id] = completer; - sendPort.send(Request._(_currentRequestId++, request)); + _send(Request._(id, request)); return completer.future; } + void _send(dynamic msg) { + if (_debugLog) { + print('[OUT]: $msg'); + } + sendPort.send(msg); + } + /// Sends a response for a handled [Request]. void respond(Request request, dynamic response) { - sendPort.send(_Response(request.id, response)); + _send(_Response(request.id, response)); } /// Sends an erroneous response for a [Request]. void respondError(Request request, dynamic error, [StackTrace trace]) { - sendPort.send(_ErrorResponse(request.id, error, trace.toString())); + _send(_ErrorResponse(request.id, error, trace.toString())); } /// Utility that listens to [incomingRequests] and invokes the [handler] on /// each request, sending the result back to the originating client. If - /// [handler] throws, the error will be re-directed to the client. + /// [handler] throws, the error will be re-directed to the client. If + /// [handler] returns a [Future], it will be awaited. void setRequestHandler(dynamic Function(Request) handler) { incomingRequests.listen((request) { try { final result = handler(request); - respond(request, result); + if (result is Future) { + result.then((value) => respond(request, value)); + } else { + respond(request, result); + } } catch (e, s) { respondError(request, e, s); } @@ -213,6 +236,11 @@ class Request { final dynamic payload; Request._(this.id, this.payload); + + @override + String toString() { + return 'request (id = $id): $payload'; + } } class _Response { @@ -220,6 +248,11 @@ class _Response { final dynamic response; _Response(this.requestId, this.response); + + @override + String toString() { + return 'response (id = $requestId): $response'; + } } class _ErrorResponse extends _Response { @@ -229,4 +262,9 @@ class _ErrorResponse extends _Response { _ErrorResponse(int requestId, dynamic error, [this.stackTrace]) : super(requestId, error); + + @override + String toString() { + return 'error response (id = $requestId): $error at $stackTrace'; + } } diff --git a/moor/lib/src/runtime/isolate/moor_isolate.dart b/moor/lib/src/runtime/isolate/moor_isolate.dart index 6ac5a88f..85042b27 100644 --- a/moor/lib/src/runtime/isolate/moor_isolate.dart +++ b/moor/lib/src/runtime/isolate/moor_isolate.dart @@ -2,7 +2,6 @@ import 'dart:async'; import 'dart:isolate'; import 'package:moor/moor.dart'; -import 'package:moor/moor_web.dart'; import 'communication.dart'; part 'client.dart'; @@ -34,15 +33,23 @@ class MoorIsolate { /// Identifier for the server isolate that we can connect to. final ServerKey _server; - MoorIsolate._(this._server); + final Isolate _isolate; + + MoorIsolate._(this._server, this._isolate); /// Connects to this [MoorIsolate] from another isolate. All operations on the /// returned [DatabaseConnection] will be executed on a background isolate. - Future connect() async { - final client = await _MoorClient.connect(this); + /// Setting the [isolateDebugLog] is only helpful when debugging moor itself. + Future connect({bool isolateDebugLog = false}) async { + final client = await _MoorClient.connect(this, isolateDebugLog); return client._connection; } + /// Calls [Isolate.kill] on the underlying isolate. + void kill() { + _isolate.kill(); + } + /// Creates a new [MoorIsolate] on a background thread. /// /// The [opener] function will be used to open the [DatabaseConnection] used @@ -57,9 +64,10 @@ class MoorIsolate { final receiveServer = ReceivePort(); final keyFuture = receiveServer.first; - await Isolate.spawn(_startMoorIsolate, [receiveServer.sendPort, opener]); + final isolate = await Isolate.spawn( + _startMoorIsolate, [receiveServer.sendPort, opener]); final key = await keyFuture as ServerKey; - return MoorIsolate._(key); + return MoorIsolate._(key, isolate); } /// Creates a [MoorIsolate] in the [Isolate.current] isolate. The returned @@ -68,7 +76,7 @@ class MoorIsolate { /// connection which operations are all executed on this isolate. static MoorIsolate inCurrent(DatabaseOpener opener) { final server = _MoorServer(opener); - return MoorIsolate._(server.key); + return MoorIsolate._(server.key, Isolate.current); } } diff --git a/moor/lib/src/runtime/isolate/protocol.dart b/moor/lib/src/runtime/isolate/protocol.dart index 37dd542f..4e1ba6f1 100644 --- a/moor/lib/src/runtime/isolate/protocol.dart +++ b/moor/lib/src/runtime/isolate/protocol.dart @@ -30,6 +30,11 @@ class _ExecuteQuery { final List args; _ExecuteQuery(this.method, this.sql, this.args); + + @override + String toString() { + return '$method: $sql with $args'; + } } /// Sent from the client to notify the server of the diff --git a/moor/lib/src/runtime/isolate/server.dart b/moor/lib/src/runtime/isolate/server.dart index f9ff7cca..d5973b70 100644 --- a/moor/lib/src/runtime/isolate/server.dart +++ b/moor/lib/src/runtime/isolate/server.dart @@ -41,8 +41,26 @@ class _MoorServer { } else if (payload is _SetSchemaVersion) { _fakeDb.schemaVersion = payload.schemaVersion; return null; + } else if (payload is _ExecuteQuery) { + return _runQuery(payload.method, payload.sql, payload.args); } } + + Future _runQuery(_StatementMethod method, String sql, List args) { + final executor = connection.executor; + switch (method) { + case _StatementMethod.custom: + return executor.runCustom(sql, args); + case _StatementMethod.deleteOrUpdate: + return executor.runDelete(sql, args); + case _StatementMethod.insert: + return executor.runInsert(sql, args); + case _StatementMethod.select: + return executor.runSelect(sql, args); + } + + throw AssertionError("Unknown _StatementMethod, this can't happen."); + } } /// A mock database so that the [QueryExecutor] which is running on a background diff --git a/moor/pubspec.yaml b/moor/pubspec.yaml index abdf1ec8..c81dcdfc 100644 --- a/moor/pubspec.yaml +++ b/moor/pubspec.yaml @@ -20,6 +20,8 @@ dependencies: dev_dependencies: moor_generator: ^2.0.0 + moor_ffi: # Used to run some tests + path: ../moor_ffi build_runner: '>=1.3.0 <2.0.0' build_test: ^0.10.8 test: ^1.6.4 diff --git a/moor/test/isolate_test.dart b/moor/test/isolate_test.dart new file mode 100644 index 00000000..433ddf6b --- /dev/null +++ b/moor/test/isolate_test.dart @@ -0,0 +1,32 @@ +import 'package:moor/isolate.dart'; +import 'package:moor/moor.dart'; +import 'package:moor_ffi/moor_ffi.dart'; +import 'package:test/test.dart'; + +import 'data/tables/todos.dart'; + +void main() { + MoorIsolate isolate; + DatabaseConnection isolateConnection; + + setUp(() async { + isolate = await MoorIsolate.spawn(_backgroundConnection); + isolateConnection = await isolate.connect(isolateDebugLog: false); + }); + + tearDown(() { + isolateConnection.executor.close(); + isolate.kill(); + }); + + test('can open database and send requests', () async { + final database = TodoDb.connect(isolateConnection); + + final result = await database.select(database.todosTable).get(); + expect(result, isEmpty); + }); +} + +DatabaseConnection _backgroundConnection() { + return DatabaseConnection.fromExecutor(VmDatabase.memory()); +}