Implementation for isolate databases over rpc

This commit is contained in:
Simon Binder 2019-10-30 19:12:38 +01:00
parent ebc22c8382
commit c5d4e38ea9
No known key found for this signature in database
GPG Key ID: 7891917E4147B8C0
7 changed files with 498 additions and 97 deletions

View File

@ -79,7 +79,7 @@ abstract class Table {
BoolColumnBuilder boolean() => null;
/// Use this as the body of a getter to declare a column that holds date and
/// time.
/// time. Note that [DateTime] values are stored on a second-accuracy.
/// Example (inside the body of a table class):
/// ```
/// DateTimeColumn get accountCreatedAt => dateTime()();

View File

@ -436,6 +436,10 @@ abstract class GeneratedDatabase extends DatabaseConnectionUser
executor?.databaseInfo = this;
}
/// Used by generated code to connect to a database that is already open.
GeneratedDatabase.connect(DatabaseConnection connection)
: super.fromConnection(connection);
/// Creates a [Migrator] with the provided query executor. Migrators generate
/// sql statements to create or drop tables.
///

View File

@ -1,46 +1,114 @@
part of 'moor_isolate.dart';
class _Client {
int _requestId = 0;
final ReceivePort _receive = ReceivePort();
class _MoorClient {
final IsolateCommunication _channel;
final SqlTypeSystem typeSystem;
SendPort _send;
Completer _initConnectionCompleter;
DatabaseConnection _connection;
final Map<int, Completer<_Response>> _pendingRequests = {};
GeneratedDatabase get connectedDb => _connection.executor.databaseInfo;
_Client() {
_receive.listen(_handleResponse);
_receive.close();
SqlExecutor get executor => _connection.executor.runCustom;
_MoorClient(this._channel, this.typeSystem) {
_connection = DatabaseConnection(
typeSystem,
_IsolateQueryExecutor(this),
null,
);
_channel.setRequestHandler(_handleRequest);
}
Future<T> _sendRequest<T extends _Response>(_Request request) {
final id = _requestId++;
final completer = Completer<_Response>();
_pendingRequests[id] = completer;
static Future<_MoorClient> connect(MoorIsolate isolate) async {
final connection =
await IsolateCommunication.connectAsClient(isolate._server);
_send.send(request);
return completer.future.then((r) => r as T);
final typeSystem =
await connection.request<SqlTypeSystem>(_NoArgsRequest.getTypeSystem);
return _MoorClient(connection, typeSystem);
}
Future<T> _connectVia<T extends GeneratedDatabase>(
MoorIsolate isolate) async {
_initConnectionCompleter = Completer();
dynamic _handleRequest(Request request) {
final payload = request.payload;
final initialSendPort = isolate._connectToDb;
initialSendPort.send(_ClientHello(_receive.sendPort));
await _initConnectionCompleter.future;
// todo construct new database by forking
return null;
}
void _handleResponse(dynamic response) {
if (response is _ServerHello) {
_send = response.sendToServer;
_initConnectionCompleter.complete();
} else if (response is _Response) {
_pendingRequests[response]?.complete(response);
if (payload is _NoArgsRequest) {
switch (payload) {
case _NoArgsRequest.runOnCreate:
connectedDb.handleDatabaseCreation(executor: executor);
return null;
default:
throw UnsupportedError('This operation must be run on the server');
}
} else if (payload is _RunOnUpgrade) {
connectedDb.handleDatabaseVersionChange(
executor: executor,
from: payload.versionBefore,
to: payload.versionNow,
);
} else if (payload is _RunBeforeOpen) {
connectedDb.beforeOpenCallback(_connection.executor, payload.details);
}
}
}
class _IsolateQueryExecutor extends QueryExecutor {
final _MoorClient client;
_IsolateQueryExecutor(this.client);
@override
set databaseInfo(GeneratedDatabase db) {
super.databaseInfo = db;
client._channel.request(_SetSchemaVersion(db.schemaVersion));
}
@override
TransactionExecutor beginTransaction() {
throw UnsupportedError(
'Transactions are not currently supported over isolates');
}
@override
Future<bool> ensureOpen() {
return client._channel.request<bool>(_NoArgsRequest.ensureOpen);
}
@override
Future<void> runBatched(List<BatchedStatement> statements) async {
// todo optimize this case
for (var stmt in statements) {
for (var boundArgs in stmt.variables) {
await runCustom(stmt.sql, boundArgs);
}
}
}
Future<T> _runRequest<T>(_StatementMethod method, String sql, List args) {
return client._channel.request<T>(_ExecuteQuery(method, sql, args));
}
@override
Future<void> runCustom(String statement, [List args]) {
return _runRequest(_StatementMethod.custom, statement, args);
}
@override
Future<int> runDelete(String statement, List args) {
return _runRequest(_StatementMethod.deleteOrUpdate, statement, args);
}
@override
Future<int> runUpdate(String statement, List args) {
return _runRequest(_StatementMethod.deleteOrUpdate, statement, args);
}
@override
Future<int> runInsert(String statement, List args) {
return _runRequest(_StatementMethod.insert, statement, args);
}
@override
Future<List<Map<String, dynamic>>> runSelect(String statement, List args) {
return _runRequest(_StatementMethod.select, statement, args);
}
}

View File

@ -0,0 +1,232 @@
import 'dart:async';
import 'dart:isolate';
/// An isolate communication setup where there's a single "server" isolate that
/// communicates with a varying amount of "client" isolates.
///
/// Each communication is bi-directional, meaning that both the server and the
/// client can send requests to each other and expect responses for that.
class IsolateCommunication {
/// The [SendPort] used to send messages to the peer.
final SendPort sendPort;
/// The [ReceivePort] used to receive messages from the peer.
final ReceivePort receivePort;
// note that there are two IsolateCommunication instances in each connection,
// and each of them has an independent _currentRequestId field!
int _currentRequestId = 0;
final Completer<void> _closeCompleter = Completer();
final Map<int, Completer> _pendingRequests = {};
final StreamController<Request> _incomingRequests = StreamController();
IsolateCommunication._(this.sendPort, this.receivePort) {
receivePort.listen(_handleMessage);
}
/// Returns a future that resolves when this communication channel was closed,
/// either via a call to [close] from this isolate or from the other isolate.
Future<void> get closed => _closeCompleter.future;
/// A stream of requests coming from the other peer.
Stream<Request> get incomingRequests => _incomingRequests.stream;
/// Establishes an [IsolateCommunication] by connecting to the [Server] which
/// emitted the [key].
static Future<IsolateCommunication> connectAsClient(ServerKey key) async {
final clientReceive = ReceivePort();
key.openConnectionPort
.send(_ClientConnectionRequest(clientReceive.sendPort));
final response = (await clientReceive.first) as _ServerConnectionResponse;
return IsolateCommunication._(response.sendPort, clientReceive);
}
/// Closes the connection to the server.
void close() {
sendPort.send(_ConnectionClose());
_closeLocally();
}
void _closeLocally() {
receivePort.close();
_closeCompleter.complete();
for (var pending in _pendingRequests.values) {
pending.completeError(StateError('connection closed'));
}
_pendingRequests.clear();
}
void _handleMessage(dynamic msg) {
if (msg is _ConnectionClose) {
_closeLocally();
} else if (msg is _Response) {
final completer = _pendingRequests[msg.requestId];
if (completer != null) {
if (msg is _ErrorResponse) {
final trace = msg.stackTrace != null
? StackTrace.fromString(msg.stackTrace)
: null;
completer.completeError(msg.error, trace);
} else {
completer.complete(msg.response);
}
_pendingRequests.remove(msg.requestId);
}
}
}
/// Sends a request and waits for the peer to reply with a value that is
/// assumed to be of type [T].
Future<T> request<T>(dynamic request) {
final id = _currentRequestId++;
final completer = Completer<T>();
_pendingRequests[id] = completer;
sendPort.send(Request._(_currentRequestId++, request));
return completer.future;
}
/// Sends a response for a handled [Request].
void respond(Request request, dynamic response) {
sendPort.send(_Response(request.id, response));
}
/// Sends an erroneous response for a [Request].
void respondError(Request request, dynamic error, [StackTrace trace]) {
sendPort.send(_ErrorResponse(request.id, error, trace.toString()));
}
/// Utility that listens to [incomingRequests] and invokes the [handler] on
/// each request, sending the result back to the originating client. If
/// [handler] throws, the error will be re-directed to the client.
void setRequestHandler(dynamic Function(Request) handler) {
incomingRequests.listen((request) {
try {
final result = handler(request);
respond(request, result);
} catch (e, s) {
respondError(request, e, s);
}
});
}
}
/// A key generated by the server than can be sent across isolates. A client can
/// connect to a server by its key.
class ServerKey {
/// The [SendPort] used by clients to establish an [IsolateCommunication] with
/// this server.
final SendPort openConnectionPort;
ServerKey._(this.openConnectionPort);
}
/// Contains logic to implement the server isolate as described in
/// [IsolateCommunication]. Note that an instance of this class should not be
/// sent across isolates, use the []
class Server {
final ReceivePort _openConnectionPort = ReceivePort();
final StreamController<IsolateCommunication> _opened = StreamController();
ServerKey _key;
/// Returns all communication channels currently opened to this server.
final List<IsolateCommunication> currentChannels = [];
/// An identifier of this [Server] allowing a client to
/// [IsolateCommunication.connectAsClient].
ServerKey get key => _key;
/// A stream of established [IsolateCommunication] channels after they were
/// opened by the client. This is not a broadcast stream.
Stream<IsolateCommunication> get openedConnections => _opened.stream;
/// Opens a server in the current isolate.
Server() {
_key = ServerKey._(_openConnectionPort.sendPort);
_openConnectionPort.listen(_handleMessageOnConnectionPort);
}
/// Closes this server instance and disposes associated resources.
void close() {
_openConnectionPort.close();
_opened.close();
}
void _handleMessageOnConnectionPort(dynamic message) {
if (message is _ClientConnectionRequest) {
final receiveFromClient = ReceivePort();
final communication =
IsolateCommunication._(message.sendPort, receiveFromClient);
currentChannels.add(communication);
_opened.add(communication);
final response = _ServerConnectionResponse(receiveFromClient.sendPort);
message.sendPort.send(response);
communication.closed.whenComplete(() {
currentChannels.remove(communication);
});
}
}
}
/// Sent from a client to a [ServerKey.openConnectionPort] in order to
/// establish a connection.
class _ClientConnectionRequest {
/// The [SendPort] to use by a [Server] to send messages to the client
/// sending the connection request.
final SendPort sendPort;
_ClientConnectionRequest(this.sendPort);
}
/// Reply from a [Server] to a [_ClientConnectionRequest] to indicate that the
/// connection has been established.
class _ServerConnectionResponse {
/// The [SendPort] used by the client to send further messages to the
/// [Server].
final SendPort sendPort;
_ServerConnectionResponse(this.sendPort);
}
/// Sent from any peer to close the connection.
class _ConnectionClose {}
/// A request sent over an isolate connection. It is expected that the other
/// peer eventually answers with a matching response.
class Request {
/// The id of this request, generated by the sender.
final int id;
/// The payload associated with this request
final dynamic payload;
Request._(this.id, this.payload);
}
class _Response {
final int requestId;
final dynamic response;
_Response(this.requestId, this.response);
}
class _ErrorResponse extends _Response {
final String stackTrace;
dynamic get error => response;
_ErrorResponse(int requestId, dynamic error, [this.stackTrace])
: super(requestId, error);
}

View File

@ -2,6 +2,8 @@ import 'dart:async';
import 'dart:isolate';
import 'package:moor/moor.dart';
import 'package:moor/moor_web.dart';
import 'communication.dart';
part 'client.dart';
part 'protocol.dart';
@ -26,19 +28,57 @@ typedef DatabaseOpener = DatabaseConnection Function();
/// See also:
/// - [Isolate], for general information on multi threading in Dart.
/// - TODO: Write documentation tutorial for this on the website
/// also todo: Is MoorIsolate really a name we want to keep? It's not really
/// an isolate
class MoorIsolate {
/// The [SendPort] created by the background isolate running the db. We'll use
/// this port to initialize a connection to the background isolate. Further
/// communication happens across a port that is specific for each client
/// isolate.
SendPort _connectToDb;
/// Identifier for the server isolate that we can connect to.
final ServerKey _server;
static Future<MoorIsolate> spawn() {}
MoorIsolate._(this._server);
static MoorIsolate inCurrent() {}
/// Connects to this [MoorIsolate] from another isolate. All operations on the
/// returned [DatabaseConnection] will be executed on a background isolate.
Future<DatabaseConnection> connect() async {
final client = await _MoorClient.connect(this);
return client._connection;
}
Future<T> connect<T extends GeneratedDatabase>() {
final client = _Client();
return client._connectVia(this);
/// Creates a new [MoorIsolate] on a background thread.
///
/// The [opener] function will be used to open the [DatabaseConnection] used
/// by the isolate. Most implementations are likely to use
/// [DatabaseConnection.fromExecutor] instead of providing stream queries and
/// the type system manually.
///
/// Because [opener] will be called on another isolate with its own memory,
/// it must either be a top-level member or a static class method.
static Future<MoorIsolate> spawn(DatabaseOpener opener) async {
// todo: API to terminate the spawned isolate?
final receiveServer = ReceivePort();
final keyFuture = receiveServer.first;
await Isolate.spawn(_startMoorIsolate, [receiveServer.sendPort, opener]);
final key = await keyFuture as ServerKey;
return MoorIsolate._(key);
}
/// Creates a [MoorIsolate] in the [Isolate.current] isolate. The returned
/// [MoorIsolate] is an object than can be sent across isolates - any other
/// isolate can then use [MoorIsolate.connect] to obtain a special database
/// connection which operations are all executed on this isolate.
static MoorIsolate inCurrent(DatabaseOpener opener) {
final server = _MoorServer(opener);
return MoorIsolate._(server.key);
}
}
/// Creates a [_MoorServer] and sends the resulting [ServerKey] over a
/// [SendPort]. The [args] param must have two parameters, the first one being
/// a [SendPort] and the second one being a [DatabaseOpener].
void _startMoorIsolate(List args) {
final sendPort = args[0] as SendPort;
final opener = args[1] as DatabaseOpener;
final server = _MoorServer(opener);
sendPort.send(server.key);
}

View File

@ -1,29 +1,56 @@
part of 'moor_isolate.dart';
abstract class _Message {}
/// A request without further parameters
enum _NoArgsRequest {
/// Sent from the client to the server. The server will reply with the
/// [SqlTypeSystem] of the [_MoorServer.connection] it's managing.
getTypeSystem,
abstract class _Request extends _Message {
/// An id for this request that is unique per client.
int id;
/// Sent from the client to the server. The server will reply with
/// [QueryExecutor.ensureOpen], based on the [_MoorServer.connection].
ensureOpen,
/// Sent from the server to a client. The client should run the on create
/// method of the attached database
runOnCreate,
}
abstract class _Response extends _Message {
/// The [_Request.id] from the request this is response to.
int id;
enum _StatementMethod {
custom,
deleteOrUpdate,
insert,
select,
}
/// A notification is only sent from the server
abstract class _Notification extends _Message {}
/// Sent from the client to run a sql query. The server replies with the
/// result.
class _ExecuteQuery {
final _StatementMethod method;
final String sql;
final List<dynamic> args;
class _ClientHello extends _Message {
/// The [SendPort] used by the server to send messages to this client.
final SendPort sendMsgToClient;
_ClientHello(this.sendMsgToClient);
_ExecuteQuery(this.method, this.sql, this.args);
}
class _ServerHello extends _Message {
final SendPort sendToServer;
/// Sent from the client to notify the server of the
/// [GeneratedDatabase.schemaVersion] used by the attached database.
class _SetSchemaVersion {
final int schemaVersion;
_ServerHello(this.sendToServer);
_SetSchemaVersion(this.schemaVersion);
}
/// Sent from the server to the client. The client should run a database upgrade
/// migration.
class _RunOnUpgrade {
final int versionBefore;
final int versionNow;
_RunOnUpgrade(this.versionBefore, this.versionNow);
}
class _RunBeforeOpen {
final OpeningDetails details;
_RunBeforeOpen(this.details);
}

View File

@ -1,50 +1,80 @@
part of 'moor_isolate.dart';
/// A "server" runs in an [Isolate] and takes requests from "client" isolates.
class _Server {
/// The [ReceivePort] used to establish connections with new clients. This is
/// the pendant to [MoorIsolate._connectToDb].
ReceivePort _connectionRequest;
class _MoorServer {
final Server server;
DatabaseConnection _connection;
DatabaseConnection connection;
_FakeDatabase _fakeDb;
final List<_ConnectedClient> _clients = [];
ServerKey get key => server.key;
_Server(DatabaseOpener opener, SendPort sendPort) {
_connection = opener();
_connectionRequest = ReceivePort();
_connectionRequest.listen(_handleConnectionRequest);
sendPort.send(_connectionRequest.sendPort);
}
void _handleConnectionRequest(dynamic message) {
if (message is! _ClientHello) {
throw AssertionError('Unexpected initial message from client: $message');
// we can't replay this to the client because we don't have a SendPort
}
final sendToClient = (message as _ClientHello).sendMsgToClient;
final receive = ReceivePort();
final client = _ConnectedClient(receive, sendToClient);
receive.listen((data) {
if (data is _Request) {
_handleRequest(client, data);
}
// todo send error message to client when it sends something that isn't
// a request
_MoorServer(DatabaseOpener opener) : server = Server() {
server.openedConnections.listen((connection) {
connection.setRequestHandler(_handleRequest);
});
connection = opener();
sendToClient.send(_ServerHello(receive.sendPort));
_fakeDb = _FakeDatabase(connection, this);
connection.executor.databaseInfo = _fakeDb;
}
void _handleRequest(_ConnectedClient client, _Request request) {}
/// Returns the first connected client, or null if no client is connected.
IsolateCommunication get firstClient {
final channels = server.currentChannels;
return channels.isEmpty ? null : channels.first;
}
dynamic _handleRequest(Request r) {
final payload = r.payload;
if (payload is _NoArgsRequest) {
switch (payload) {
case _NoArgsRequest.getTypeSystem:
return connection.typeSystem;
case _NoArgsRequest.ensureOpen:
return connection.executor.ensureOpen();
// the following are requests which are handled on the client side
case _NoArgsRequest.runOnCreate:
throw UnsupportedError(
'This operation needs to be run on the client');
}
} else if (payload is _SetSchemaVersion) {
_fakeDb.schemaVersion = payload.schemaVersion;
return null;
}
}
}
class _ConnectedClient {
final ReceivePort receiveFromClient;
final SendPort sendToClient;
/// A mock database so that the [QueryExecutor] which is running on a background
/// isolate can have the [QueryExecutor.databaseInfo] set. The query executor
/// uses that to set the schema version and to run migration callbacks. For a
/// server, all of that is delegated via clients.
class _FakeDatabase extends GeneratedDatabase {
final _MoorServer server;
_ConnectedClient(this.receiveFromClient, this.sendToClient);
_FakeDatabase(DatabaseConnection connection, this.server)
: super.connect(connection);
@override
final List<TableInfo<Table, DataClass>> allTables = const [];
@override
int schemaVersion = 0; // will be overridden by client requests
@override
Future<void> handleDatabaseCreation({SqlExecutor executor}) {
return server.firstClient.request(_NoArgsRequest.runOnCreate);
}
@override
Future<void> handleDatabaseVersionChange(
{SqlExecutor executor, int from, int to}) {
return server.firstClient.request(_RunOnUpgrade(from, to));
}
@override
Future<void> beforeOpenCallback(
QueryExecutor executor, OpeningDetails details) {
return server.firstClient.request(_RunBeforeOpen(details));
}
}