Support executing queries over isolates

This commit is contained in:
Simon Binder 2019-10-30 20:32:08 +01:00
parent 5cc1f85441
commit a2c7c11abf
No known key found for this signature in database
GPG Key ID: 7891917E4147B8C0
9 changed files with 146 additions and 29 deletions

5
moor/lib/isolate.dart Normal file
View File

@ -0,0 +1,5 @@
/// Contains utils to run moor databases in a background isolate. This API is
/// not supported on the web.
library isolate;
export 'src/runtime/isolate/moor_isolate.dart';

View File

@ -438,7 +438,9 @@ abstract class GeneratedDatabase extends DatabaseConnectionUser
/// Used by generated code to connect to a database that is already open.
GeneratedDatabase.connect(DatabaseConnection connection)
: super.fromConnection(connection);
: super.fromConnection(connection) {
connection?.executor?.databaseInfo = this;
}
/// Creates a [Migrator] with the provided query executor. Migrators generate
/// sql statements to create or drop tables.

View File

@ -19,9 +19,10 @@ class _MoorClient {
_channel.setRequestHandler(_handleRequest);
}
static Future<_MoorClient> connect(MoorIsolate isolate) async {
final connection =
await IsolateCommunication.connectAsClient(isolate._server);
static Future<_MoorClient> connect(
MoorIsolate isolate, bool isolateDebugLog) async {
final connection = await IsolateCommunication.connectAsClient(
isolate._server, isolateDebugLog);
final typeSystem =
await connection.request<SqlTypeSystem>(_NoArgsRequest.getTypeSystem);
@ -34,19 +35,19 @@ class _MoorClient {
if (payload is _NoArgsRequest) {
switch (payload) {
case _NoArgsRequest.runOnCreate:
connectedDb.handleDatabaseCreation(executor: executor);
return null;
return connectedDb.handleDatabaseCreation(executor: executor);
default:
throw UnsupportedError('This operation must be run on the server');
}
} else if (payload is _RunOnUpgrade) {
connectedDb.handleDatabaseVersionChange(
return connectedDb.handleDatabaseVersionChange(
executor: executor,
from: payload.versionBefore,
to: payload.versionNow,
);
} else if (payload is _RunBeforeOpen) {
connectedDb.beforeOpenCallback(_connection.executor, payload.details);
return connectedDb.beforeOpenCallback(
_connection.executor, payload.details);
}
}
}
@ -111,4 +112,10 @@ class _IsolateQueryExecutor extends QueryExecutor {
Future<List<Map<String, dynamic>>> runSelect(String statement, List args) {
return _runRequest(_StatementMethod.select, statement, args);
}
@override
Future<void> close() {
client._channel.close();
return Future.value();
}
}

View File

@ -10,8 +10,9 @@ class IsolateCommunication {
/// The [SendPort] used to send messages to the peer.
final SendPort sendPort;
/// The [ReceivePort] used to receive messages from the peer.
final ReceivePort receivePort;
/// The input stream of this channel. This could be a [ReceivePort].
final Stream<dynamic> input;
StreamSubscription _inputSubscription;
// note that there are two IsolateCommunication instances in each connection,
// and each of them has an independent _currentRequestId field!
@ -20,8 +21,10 @@ class IsolateCommunication {
final Map<int, Completer> _pendingRequests = {};
final StreamController<Request> _incomingRequests = StreamController();
IsolateCommunication._(this.sendPort, this.receivePort) {
receivePort.listen(_handleMessage);
final bool _debugLog;
IsolateCommunication._(this.sendPort, this.input, [this._debugLog = false]) {
_inputSubscription = input.listen(_handleMessage);
}
/// Returns a future that resolves when this communication channel was closed,
@ -33,25 +36,27 @@ class IsolateCommunication {
/// Establishes an [IsolateCommunication] by connecting to the [Server] which
/// emitted the [key].
static Future<IsolateCommunication> connectAsClient(ServerKey key) async {
static Future<IsolateCommunication> connectAsClient(ServerKey key,
[bool debugLog = false]) async {
final clientReceive = ReceivePort();
final stream = clientReceive.asBroadcastStream();
key.openConnectionPort
.send(_ClientConnectionRequest(clientReceive.sendPort));
final response = (await clientReceive.first) as _ServerConnectionResponse;
final response = (await stream.first) as _ServerConnectionResponse;
return IsolateCommunication._(response.sendPort, clientReceive);
return IsolateCommunication._(response.sendPort, stream, debugLog);
}
/// Closes the connection to the server.
void close() {
sendPort.send(_ConnectionClose());
_send(_ConnectionClose());
_closeLocally();
}
void _closeLocally() {
receivePort.close();
_inputSubscription?.cancel();
_closeCompleter.complete();
for (var pending in _pendingRequests.values) {
@ -61,6 +66,10 @@ class IsolateCommunication {
}
void _handleMessage(dynamic msg) {
if (_debugLog) {
print('[IN]: $msg');
}
if (msg is _ConnectionClose) {
_closeLocally();
} else if (msg is _Response) {
@ -79,6 +88,8 @@ class IsolateCommunication {
_pendingRequests.remove(msg.requestId);
}
} else if (msg is Request) {
_incomingRequests.add(msg);
}
}
@ -89,28 +100,40 @@ class IsolateCommunication {
final completer = Completer<T>();
_pendingRequests[id] = completer;
sendPort.send(Request._(_currentRequestId++, request));
_send(Request._(id, request));
return completer.future;
}
void _send(dynamic msg) {
if (_debugLog) {
print('[OUT]: $msg');
}
sendPort.send(msg);
}
/// Sends a response for a handled [Request].
void respond(Request request, dynamic response) {
sendPort.send(_Response(request.id, response));
_send(_Response(request.id, response));
}
/// Sends an erroneous response for a [Request].
void respondError(Request request, dynamic error, [StackTrace trace]) {
sendPort.send(_ErrorResponse(request.id, error, trace.toString()));
_send(_ErrorResponse(request.id, error, trace.toString()));
}
/// Utility that listens to [incomingRequests] and invokes the [handler] on
/// each request, sending the result back to the originating client. If
/// [handler] throws, the error will be re-directed to the client.
/// [handler] throws, the error will be re-directed to the client. If
/// [handler] returns a [Future], it will be awaited.
void setRequestHandler(dynamic Function(Request) handler) {
incomingRequests.listen((request) {
try {
final result = handler(request);
respond(request, result);
if (result is Future) {
result.then((value) => respond(request, value));
} else {
respond(request, result);
}
} catch (e, s) {
respondError(request, e, s);
}
@ -213,6 +236,11 @@ class Request {
final dynamic payload;
Request._(this.id, this.payload);
@override
String toString() {
return 'request (id = $id): $payload';
}
}
class _Response {
@ -220,6 +248,11 @@ class _Response {
final dynamic response;
_Response(this.requestId, this.response);
@override
String toString() {
return 'response (id = $requestId): $response';
}
}
class _ErrorResponse extends _Response {
@ -229,4 +262,9 @@ class _ErrorResponse extends _Response {
_ErrorResponse(int requestId, dynamic error, [this.stackTrace])
: super(requestId, error);
@override
String toString() {
return 'error response (id = $requestId): $error at $stackTrace';
}
}

View File

@ -2,7 +2,6 @@ import 'dart:async';
import 'dart:isolate';
import 'package:moor/moor.dart';
import 'package:moor/moor_web.dart';
import 'communication.dart';
part 'client.dart';
@ -34,15 +33,23 @@ class MoorIsolate {
/// Identifier for the server isolate that we can connect to.
final ServerKey _server;
MoorIsolate._(this._server);
final Isolate _isolate;
MoorIsolate._(this._server, this._isolate);
/// Connects to this [MoorIsolate] from another isolate. All operations on the
/// returned [DatabaseConnection] will be executed on a background isolate.
Future<DatabaseConnection> connect() async {
final client = await _MoorClient.connect(this);
/// Setting the [isolateDebugLog] is only helpful when debugging moor itself.
Future<DatabaseConnection> connect({bool isolateDebugLog = false}) async {
final client = await _MoorClient.connect(this, isolateDebugLog);
return client._connection;
}
/// Calls [Isolate.kill] on the underlying isolate.
void kill() {
_isolate.kill();
}
/// Creates a new [MoorIsolate] on a background thread.
///
/// The [opener] function will be used to open the [DatabaseConnection] used
@ -57,9 +64,10 @@ class MoorIsolate {
final receiveServer = ReceivePort();
final keyFuture = receiveServer.first;
await Isolate.spawn(_startMoorIsolate, [receiveServer.sendPort, opener]);
final isolate = await Isolate.spawn(
_startMoorIsolate, [receiveServer.sendPort, opener]);
final key = await keyFuture as ServerKey;
return MoorIsolate._(key);
return MoorIsolate._(key, isolate);
}
/// Creates a [MoorIsolate] in the [Isolate.current] isolate. The returned
@ -68,7 +76,7 @@ class MoorIsolate {
/// connection which operations are all executed on this isolate.
static MoorIsolate inCurrent(DatabaseOpener opener) {
final server = _MoorServer(opener);
return MoorIsolate._(server.key);
return MoorIsolate._(server.key, Isolate.current);
}
}

View File

@ -30,6 +30,11 @@ class _ExecuteQuery {
final List<dynamic> args;
_ExecuteQuery(this.method, this.sql, this.args);
@override
String toString() {
return '$method: $sql with $args';
}
}
/// Sent from the client to notify the server of the

View File

@ -41,8 +41,26 @@ class _MoorServer {
} else if (payload is _SetSchemaVersion) {
_fakeDb.schemaVersion = payload.schemaVersion;
return null;
} else if (payload is _ExecuteQuery) {
return _runQuery(payload.method, payload.sql, payload.args);
}
}
Future<dynamic> _runQuery(_StatementMethod method, String sql, List args) {
final executor = connection.executor;
switch (method) {
case _StatementMethod.custom:
return executor.runCustom(sql, args);
case _StatementMethod.deleteOrUpdate:
return executor.runDelete(sql, args);
case _StatementMethod.insert:
return executor.runInsert(sql, args);
case _StatementMethod.select:
return executor.runSelect(sql, args);
}
throw AssertionError("Unknown _StatementMethod, this can't happen.");
}
}
/// A mock database so that the [QueryExecutor] which is running on a background

View File

@ -20,6 +20,8 @@ dependencies:
dev_dependencies:
moor_generator: ^2.0.0
moor_ffi: # Used to run some tests
path: ../moor_ffi
build_runner: '>=1.3.0 <2.0.0'
build_test: ^0.10.8
test: ^1.6.4

View File

@ -0,0 +1,32 @@
import 'package:moor/isolate.dart';
import 'package:moor/moor.dart';
import 'package:moor_ffi/moor_ffi.dart';
import 'package:test/test.dart';
import 'data/tables/todos.dart';
void main() {
MoorIsolate isolate;
DatabaseConnection isolateConnection;
setUp(() async {
isolate = await MoorIsolate.spawn(_backgroundConnection);
isolateConnection = await isolate.connect(isolateDebugLog: false);
});
tearDown(() {
isolateConnection.executor.close();
isolate.kill();
});
test('can open database and send requests', () async {
final database = TodoDb.connect(isolateConnection);
final result = await database.select(database.todosTable).get();
expect(result, isEmpty);
});
}
DatabaseConnection _backgroundConnection() {
return DatabaseConnection.fromExecutor(VmDatabase.memory());
}