Only send primitive objects over isolates (#399)

This commit is contained in:
Simon Binder 2020-04-23 21:03:50 +02:00
parent 1fc38f8434
commit 9300010890
No known key found for this signature in database
GPG Key ID: 7891917E4147B8C0
6 changed files with 316 additions and 23 deletions

View File

@ -23,7 +23,7 @@ class _MoorClient {
static Future<_MoorClient> connect(
MoorIsolate isolate, bool isolateDebugLog) async {
final connection = await IsolateCommunication.connectAsClient(
isolate.connectPort, isolateDebugLog);
isolate.connectPort, const _MoorCodec(), isolateDebugLog);
final typeSystem =
await connection.request<SqlTypeSystem>(_NoArgsRequest.getTypeSystem);
@ -56,7 +56,7 @@ abstract class _BaseExecutor extends QueryExecutor {
Future<T> _runRequest<T>(_StatementMethod method, String sql, List args) {
return client._channel
.request<T>(_ExecuteQuery(method, sql, args, _executorId));
.request<T>(_ExecuteQuery(method, sql, args ?? const [], _executorId));
}
@override

View File

@ -14,6 +14,11 @@ class IsolateCommunication {
/// The input stream of this channel. This could be a [ReceivePort].
final Stream<dynamic> input;
/// The coded responsible to transform application-specific messages into
/// primitive objects.
final MessageCodec messageCodec;
StreamSubscription _inputSubscription;
// note that there are two IsolateCommunication instances in each connection,
@ -25,7 +30,8 @@ class IsolateCommunication {
final bool _debugLog;
IsolateCommunication._(this.sendPort, this.input, [this._debugLog = false]) {
IsolateCommunication._(this.sendPort, this.input, this.messageCodec,
[this._debugLog = false]) {
_inputSubscription = input.listen(_handleMessage);
}
@ -44,17 +50,19 @@ class IsolateCommunication {
/// The server must listen for incoming connections on the receiving end of
/// [openConnectionPort].
static Future<IsolateCommunication> connectAsClient(
SendPort openConnectionPort,
SendPort openConnectionPort, MessageCodec messageCodec,
[bool debugLog = false]) async {
final clientReceive = ReceivePort();
final stream = clientReceive.asBroadcastStream();
openConnectionPort.send(_ClientConnectionRequest(clientReceive.sendPort));
openConnectionPort.send(messageCodec
._encodeMessage(_ClientConnectionRequest(clientReceive.sendPort)));
final response = await stream.first as _ServerConnectionResponse;
final response = messageCodec._decodeMessage(await stream.first)
as _ServerConnectionResponse;
final communication =
IsolateCommunication._(response.sendPort, stream, debugLog);
final communication = IsolateCommunication._(
response.sendPort, stream, messageCodec, debugLog);
unawaited(communication.closed.then((_) => clientReceive.close()));
@ -65,7 +73,7 @@ class IsolateCommunication {
void close() {
if (isClosed) return;
_send(_ConnectionClose());
_send(const _ConnectionClose());
_closeLocally();
}
@ -80,6 +88,8 @@ class IsolateCommunication {
}
void _handleMessage(dynamic msg) {
msg = messageCodec._decodeMessage(msg);
if (_debugLog) {
print('[IN]: $msg');
}
@ -118,7 +128,7 @@ class IsolateCommunication {
return completer.future;
}
void _send(dynamic msg) {
void _send(IsolateMessage msg) {
if (isClosed) {
throw StateError('Tried to send $msg over isolate channel, but the '
'connection was closed!');
@ -127,7 +137,7 @@ class IsolateCommunication {
if (_debugLog) {
print('[OUT]: $msg');
}
sendPort.send(msg);
sendPort.send(messageCodec._encodeMessage(msg));
}
/// Sends a response for a handled [Request].
@ -176,6 +186,10 @@ class Server {
final ReceivePort _openConnectionPort = ReceivePort();
final StreamController<IsolateCommunication> _opened = StreamController();
/// The coded responsible to transform application-specific messages into
/// primitive objects.
final MessageCodec messageCodec;
/// The port that should be used by new clients when they want to establish
/// a new connection.
SendPort get portToOpenConnection => _openConnectionPort.sendPort;
@ -188,7 +202,7 @@ class Server {
Stream<IsolateCommunication> get openedConnections => _opened.stream;
/// Opens a server in the current isolate.
Server() {
Server(this.messageCodec) {
_openConnectionPort.listen(_handleMessageOnConnectionPort);
}
@ -203,16 +217,21 @@ class Server {
}
void _handleMessageOnConnectionPort(dynamic message) {
message = messageCodec._decodeMessage(message);
if (message is _ClientConnectionRequest) {
final receiveFromClient = ReceivePort();
final communication =
IsolateCommunication._(message.sendPort, receiveFromClient);
final communication = IsolateCommunication._(
message.sendPort,
receiveFromClient,
messageCodec,
);
currentChannels.add(communication);
_opened.add(communication);
final response = _ServerConnectionResponse(receiveFromClient.sendPort);
message.sendPort.send(response);
message.sendPort.send(messageCodec._encodeMessage(response));
communication.closed.whenComplete(() {
currentChannels.remove(communication);
@ -222,8 +241,94 @@ class Server {
}
}
/// Class used to encode and decode messages.
///
/// As explained in [SendPort.send], we can only send some objects across
/// isolates, notably:
/// - primitive types (null, num, bool, double, String)
/// - instances of [SendPort]
/// - [TransferableTypedData]
/// - lists and maps thereof.
///
/// This class is used to ensure we only send those types over isolates.
abstract class MessageCodec {
/// Default constant constructor so that subclasses can be constant.
const MessageCodec();
dynamic _encodeMessage(IsolateMessage message) {
if (message is _ClientConnectionRequest) {
return [_ClientConnectionRequest._tag, message.sendPort];
} else if (message is _ServerConnectionResponse) {
return [_ServerConnectionResponse._tag, message.sendPort];
} else if (message is _ConnectionClose) {
return _ConnectionClose._tag;
} else if (message is Request) {
return [Request._tag, message.id, encodePayload(message.payload)];
} else if (message is _ErrorResponse) {
return [
_ErrorResponse._tag,
message.requestId,
Error.safeToString(message.error),
message.stackTrace,
];
} else if (message is _Response) {
return [
_Response._tag,
message.requestId,
encodePayload(message.response),
];
}
throw AssertionError('Unknown message: $message');
}
IsolateMessage _decodeMessage(dynamic encoded) {
if (encoded is int) {
// _ConnectionClosed is the only message only consisting of a tag
assert(encoded == _ConnectionClose._tag);
return const _ConnectionClose();
}
final components = encoded as List;
final tag = components.first as int;
switch (tag) {
case _ClientConnectionRequest._tag:
return _ClientConnectionRequest(components[1] as SendPort);
case _ServerConnectionResponse._tag:
return _ServerConnectionResponse(components[1] as SendPort);
case Request._tag:
return Request._(components[1] as int, decodePayload(components[2]));
case _ErrorResponse._tag:
return _ErrorResponse(
components[1] as int, // request id
components[2], // error
components[3] as String /*?*/,
);
case _Response._tag:
return _Response(components[1] as int, decodePayload(encoded[2]));
}
throw AssertionError('Unrecognized message: $encoded');
}
/// Encodes an application-specific [payload], which can be any Dart object,
/// so that it can be sent via [SendPort.send].
dynamic encodePayload(dynamic payload);
/// Counter-part of [encodePayload], which should decode a payload encoded by
/// that function.
dynamic decodePayload(dynamic encoded);
}
/// Marker interface for classes that can be sent over this communication
/// protocol.
abstract class IsolateMessage {}
/// Sent from a client to a server in order to establish a connection.
class _ClientConnectionRequest {
class _ClientConnectionRequest implements IsolateMessage {
static const _tag = 1;
/// The [SendPort] for server to client communication.
final SendPort sendPort;
@ -232,7 +337,9 @@ class _ClientConnectionRequest {
/// Reply from a [Server] to a [_ClientConnectionRequest] to indicate that the
/// connection has been established.
class _ServerConnectionResponse {
class _ServerConnectionResponse implements IsolateMessage {
static const _tag = 2;
/// The [SendPort] used by the client to send further messages to the
/// [Server].
final SendPort sendPort;
@ -241,11 +348,17 @@ class _ServerConnectionResponse {
}
/// Sent from any peer to close the connection.
class _ConnectionClose {}
class _ConnectionClose implements IsolateMessage {
static const _tag = 3;
const _ConnectionClose();
}
/// A request sent over an isolate connection. It is expected that the other
/// peer eventually answers with a matching response.
class Request {
class Request implements IsolateMessage {
static const _tag = 4;
/// The id of this request, generated by the sender.
final int id;
@ -260,7 +373,9 @@ class Request {
}
}
class _Response {
class _Response implements IsolateMessage {
static const _tag = 5;
final int requestId;
final dynamic response;
@ -273,6 +388,8 @@ class _Response {
}
class _ErrorResponse extends _Response {
static const _tag = 6;
final String stackTrace;
dynamic get error => response;

View File

@ -55,7 +55,8 @@ class MoorIsolate {
/// If you only want to disconnect a database connection created via
/// [connect], use [GeneratedDatabase.close] instead.
Future<void> shutdownAll() async {
final connection = await IsolateCommunication.connectAsClient(connectPort);
final connection = await IsolateCommunication.connectAsClient(
connectPort, const _MoorCodec());
unawaited(connection.request(_NoArgsRequest.terminateAll).then((_) {},
onError: (_) {
// the background isolate is closed before it gets a chance to reply

View File

@ -1,5 +1,180 @@
part of 'moor_isolate.dart';
// ignore_for_file: constant_identifier_names
class _MoorCodec extends MessageCodec {
const _MoorCodec();
static const _tag_NoArgsRequest_getTypeSystem = 0;
static const _tag_NoArgsRequest_startTransaction = 1;
static const _tag_NoArgsRequest_terminateAll = 2;
static const _tag_ExecuteQuery = 3;
static const _tag_ExecuteBatchedStatement = 4;
static const _tag_RunTransactionAction = 5;
static const _tag_EnsureOpen = 6;
static const _tag_RunBeforeOpen = 7;
static const _tag_NotifyTablesUpdated = 8;
static const _tag_DefaultSqlTypeSystem = 9;
static const _tag_DirectValue = 10;
@override
dynamic encodePayload(dynamic payload) {
if (payload == null || payload is bool) return payload;
if (payload is _NoArgsRequest) {
return payload.index;
} else if (payload is _ExecuteQuery) {
return [
_tag_ExecuteQuery,
payload.method.index,
payload.sql,
[for (final arg in payload.args) _encodeDbValue(arg)],
payload.executorId,
];
} else if (payload is _ExecuteBatchedStatement) {
return [
_tag_ExecuteBatchedStatement,
for (final stmt in payload.stmts) _encodeBatchedStatement(stmt),
payload.executorId,
];
} else if (payload is _RunTransactionAction) {
return [
_tag_RunTransactionAction,
payload.control.index,
payload.transactionId,
];
} else if (payload is _EnsureOpen) {
return [_tag_EnsureOpen, payload.schemaVersion, payload.executorId];
} else if (payload is _RunBeforeOpen) {
return [
_tag_RunBeforeOpen,
payload.details.versionBefore,
payload.details.versionNow,
payload.createdExecutor,
];
} else if (payload is _NotifyTablesUpdated) {
return [
_tag_NotifyTablesUpdated,
for (final update in payload.updates)
[
update.table,
update.kind.index,
]
];
} else if (payload is SqlTypeSystem) {
// assume connection uses SqlTypeSystem.defaultInstance, this can't
// possibly be encoded.
return _tag_DefaultSqlTypeSystem;
} else {
return [_tag_DirectValue, payload];
}
}
@override
dynamic decodePayload(dynamic encoded) {
if (encoded == null || encoded is bool) return encoded;
int tag;
List fullMessage;
if (encoded is int) {
tag = encoded;
} else {
fullMessage = encoded as List;
tag = fullMessage[0] as int;
}
int readInt(int index) => fullMessage[index] as int;
switch (tag) {
case _tag_NoArgsRequest_getTypeSystem:
return _NoArgsRequest.getTypeSystem;
case _tag_NoArgsRequest_startTransaction:
return _NoArgsRequest.startTransaction;
case _tag_NoArgsRequest_terminateAll:
return _NoArgsRequest.terminateAll;
case _tag_ExecuteQuery:
final method = _StatementMethod.values[readInt(1)];
final sql = fullMessage[2] as String;
final args = (fullMessage[3] as List).map(_decodeDbValue).toList();
final executorId = fullMessage[4] as int /*?*/;
return _ExecuteQuery(method, sql, args, executorId);
case _tag_ExecuteBatchedStatement:
final stmts = <BatchedStatement>[];
for (var i = 1; i < fullMessage.length - 1; i++) {
stmts.add(_decodeBatchedStatement(fullMessage[i] as List));
}
final executorId = fullMessage.last as int;
return _ExecuteBatchedStatement(stmts, executorId);
case _tag_RunTransactionAction:
final control = _TransactionControl.values[readInt(1)];
return _RunTransactionAction(control, readInt(2));
case _tag_EnsureOpen:
return _EnsureOpen(readInt(1), readInt(2));
case _tag_RunBeforeOpen:
return _RunBeforeOpen(
OpeningDetails(readInt(1), readInt(2)),
readInt(3),
);
case _tag_DefaultSqlTypeSystem:
return SqlTypeSystem.defaultInstance;
case _tag_NotifyTablesUpdated:
final updates = <TableUpdate>[];
for (var i = 1; i < fullMessage.length; i++) {
final encodedUpdate = fullMessage[i] as List;
updates.add(
TableUpdate(encodedUpdate[0] as String,
kind: UpdateKind.values[encodedUpdate[1] as int]),
);
}
return _NotifyTablesUpdated(updates);
case _tag_DirectValue:
return encoded[1];
}
throw ArgumentError.value(tag, 'tag', 'Tag was unknown');
}
dynamic _encodeDbValue(dynamic variable) {
if (variable is List<int>) {
return TransferableTypedData.fromList([Uint8List.fromList(variable)]);
} else {
return variable;
}
}
dynamic _decodeDbValue(dynamic encoded) {
if (encoded is TransferableTypedData) {
return encoded.materialize().asUint8List();
} else {
return encoded;
}
}
dynamic _encodeBatchedStatement(BatchedStatement stmt) {
return [
stmt.sql,
for (final variableSet in stmt.variables)
[
for (final variable in variableSet) _encodeDbValue(variable),
],
];
}
BatchedStatement _decodeBatchedStatement(List encoded) {
final sql = encoded[0] as String;
final args = <List>[];
for (var i = 1; i < encoded.length; i++) {
final encodedVariableSet = encoded[i] as List;
args.add(encodedVariableSet.map(_decodeDbValue).toList());
}
return BatchedStatement(sql, args);
}
}
/// A request without further parameters
enum _NoArgsRequest {
/// Sent from the client to the server. The server will reply with the

View File

@ -23,7 +23,7 @@ class _MoorServer {
SendPort get portToOpenConnection => server.portToOpenConnection;
_MoorServer(DatabaseOpener opener) : server = Server() {
_MoorServer(DatabaseOpener opener) : server = Server(const _MoorCodec()) {
server.openedConnections.listen((connection) {
connection.setRequestHandler(_handleRequest);
});

View File

@ -68,7 +68,7 @@ void _runTests(
setUp(() async {
isolate = await spawner();
isolateConnection = await isolate.connect(isolateDebugLog: false);
isolateConnection = await isolate.connect(isolateDebugLog: true);
});
tearDown(() {