Don't serialize messages for isolates

This commit is contained in:
Simon Binder 2021-12-09 23:40:29 +01:00
parent 19bd92bfd9
commit 1c128ea885
No known key found for this signature in database
GPG Key ID: 7891917E4147B8C0
10 changed files with 37 additions and 51 deletions

View File

@ -12,6 +12,8 @@
- Add `OrderingTerm.random` to fetch rows in a random order.
- Improved support for pausing query stream subscriptions. Instead of buffering events,
query streams will suspend fetching data if all listeners are paused.
- Drift isolates no longer serialize messages into a primitive format. This will reduce
the overhead of using isolates with Drift.
## 1.0.0

View File

@ -51,11 +51,9 @@ class DriftIsolate {
StreamChannelController(allowForeignErrors: false, sync: true);
receive.listen((message) {
if (message is SendPort) {
controller.local.stream
.map(prepareForTransport)
.listen(message.send, onDone: receive.close);
controller.local.stream.listen(message.send, onDone: receive.close);
} else {
controller.local.sink.add(decodeAfterTransport(message));
controller.local.sink.add(message);
}
});
@ -67,9 +65,9 @@ class DriftIsolate {
/// All operations on the returned [DatabaseConnection] will be executed on a
/// background isolate. Setting the [isolateDebugLog] is only helpful when
/// debugging drift itself.
// todo: breaking: Make synchronous in drift 5
// todo: breaking: Make synchronous in drift 2
Future<DatabaseConnection> connect({bool isolateDebugLog = false}) async {
return remote(_open(), debugLog: isolateDebugLog);
return remote(_open(), debugLog: isolateDebugLog, serialize: false);
}
/// Stops the background isolate and disconnects all [DatabaseConnection]s
@ -77,7 +75,7 @@ class DriftIsolate {
/// If you only want to disconnect a database connection created via
/// [connect], use [GeneratedDatabase.close] instead.
Future<void> shutdownAll() {
return shutdown(_open());
return shutdown(_open(), serialize: false);
}
/// Creates a new [DriftIsolate] on a background thread.

View File

@ -91,11 +91,15 @@ abstract class DriftServer {
/// requests. Drift assumes full control over the [channel]. Manually sending
/// messages over it, or closing it prematurely, can disrupt the server.
///
/// If [serialize] is true, drift will only send [bool], [int], [double],
/// [Uint8List], [String] or [List]'s thereof over the channel. Otherwise,
/// the message may be any Dart object.
///
/// __Warning__: As long as this library is marked experimental, the protocol
/// might change with every drift version. For this reason, make sure that
/// your server and clients are using the exact same version of the drift
/// package to avoid conflicts.
void serve(StreamChannel<Object?> channel);
void serve(StreamChannel<Object?> channel, {bool serialize = true});
/// Shuts this server down.
///
@ -111,11 +115,17 @@ abstract class DriftServer {
/// On the remote side, the corresponding [channel] must have been passed to
/// [DriftServer.serve] for this setup to work.
///
/// If [serialize] is true, drift will only send [bool], [int], [double],
/// [Uint8List], [String] or [List]'s thereof over the channel. Otherwise,
/// the message may be any Dart object.
/// The value of [serialize] for [remote] should be the same value passed to
/// [DriftServer.serve].
///
/// The optional [debugLog] can be enabled to print incoming and outgoing
/// messages.
DatabaseConnection remote(StreamChannel<Object?> channel,
{bool debugLog = false}) {
final client = DriftClient(channel, debugLog);
{bool debugLog = false, bool serialize = true}) {
final client = DriftClient(channel, debugLog, serialize);
return client.connection;
}
@ -124,7 +134,7 @@ DatabaseConnection remote(StreamChannel<Object?> channel,
/// On the remote side, the corresponding channel must have been passed to
/// [DriftServer.serve] for this setup to work.
/// Also, the [DriftServer] must have been configured to allow remote-shutdowns.
Future<void> shutdown(StreamChannel<Object?> channel) {
final comm = DriftCommunication(channel);
Future<void> shutdown(StreamChannel<Object?> channel, {bool serialize = true}) {
final comm = DriftCommunication(channel, false, serialize);
return comm.request(NoArgsRequest.terminateAll);
}

View File

@ -1,6 +1,5 @@
import 'dart:isolate';
import 'package:async/async.dart';
import 'package:meta/meta.dart';
import 'package:stream_channel/isolate_channel.dart';
@ -29,15 +28,9 @@ class RunningDriftServer {
final receiveForConnection =
ReceivePort('drift channel #${_counter++}');
message.send(receiveForConnection.sendPort);
final channel = IsolateChannel(receiveForConnection, message)
.changeStream((source) => source.map(decodeAfterTransport))
.transformSink(
StreamSinkTransformer.fromHandlers(
handleData: (data, sink) =>
sink.add(prepareForTransport(data))),
);
final channel = IsolateChannel(receiveForConnection, message);
server.serve(channel);
server.serve(channel, serialize: false);
}
});
@ -48,23 +41,3 @@ class RunningDriftServer {
});
}
}
Object? prepareForTransport(Object? source) {
if (source is! List) return source;
if (source is Uint8List) {
return TransferableTypedData.fromList([source]);
}
return source.map(prepareForTransport).toList();
}
Object? decodeAfterTransport(Object? source) {
if (source is TransferableTypedData) {
return source.materialize().asUint8List();
} else if (source is List) {
return source.map(decodeAfterTransport).toList();
} else {
return source;
}
}

View File

@ -28,8 +28,8 @@ class DriftClient {
late QueryExecutorUser _connectedDb;
/// Starts relaying database operations over the request channel.
DriftClient(StreamChannel<Object?> channel, bool debugLog)
: _channel = DriftCommunication(channel, debugLog) {
DriftClient(StreamChannel<Object?> channel, bool debugLog, bool serialize)
: _channel = DriftCommunication(channel, debugLog, serialize) {
_channel.setRequestHandler(_handleRequest);
}

View File

@ -13,6 +13,7 @@ class DriftCommunication {
final StreamChannel<Object?> _channel;
final bool _debugLog;
final bool _serialize;
StreamSubscription? _inputSubscription;
@ -25,7 +26,8 @@ class DriftCommunication {
StreamController(sync: true);
/// Starts a drift communication channel over a raw [StreamChannel].
DriftCommunication(this._channel, [this._debugLog = false]) {
DriftCommunication(this._channel,
[this._debugLog = false, this._serialize = true]) {
_inputSubscription = _channel.stream.listen(
_handleMessage,
onDone: _closeCompleter.complete,
@ -63,7 +65,7 @@ class DriftCommunication {
}
void _handleMessage(Object? msg) {
msg = _protocol.deserialize(msg!);
if (_serialize) msg = _protocol.deserialize(msg!);
if (_debugLog) {
driftRuntimeOptions.debugPrint('[IN]: $msg');
@ -111,7 +113,8 @@ class DriftCommunication {
if (_debugLog) {
driftRuntimeOptions.debugPrint('[OUT]: $msg');
}
_channel.sink.add(_protocol.serialize(msg));
_channel.sink.add(_serialize ? _protocol.serialize(msg) : msg);
}
/// Sends a response for a handled [Request].

View File

@ -45,12 +45,12 @@ class ServerImplementation implements DriftServer {
Future<void> get done => _done.future;
@override
void serve(StreamChannel<Object?> channel) {
void serve(StreamChannel<Object?> channel, {bool serialize = true}) {
if (_isShuttingDown) {
throw StateError('Cannot add new channels after shutdown() was called');
}
final comm = DriftCommunication(channel);
final comm = DriftCommunication(channel, serialize, serialize);
comm.setRequestHandler((request) => _handleRequest(comm, request));
_activeChannels.add(comm);

View File

@ -25,7 +25,7 @@ CancellationToken<T> runCancellable<T>(
/// child zone.
@internal
class CancellationToken<T> {
final Completer<T> _resultCompleter = Completer();
final Completer<T> _resultCompleter = Completer.sync();
final List<void Function()> _cancellationCallbacks = [];
bool _cancellationRequested = false;

View File

@ -6,7 +6,7 @@ homepage: https://drift.simonbinder.eu/
issue_tracker: https://github.com/simolus3/moor/issues
environment:
sdk: '>=2.14.0 <3.0.0'
sdk: '>=2.15.0 <3.0.0'
dependencies:
async: ^2.5.0

View File

@ -83,7 +83,7 @@ void main() {
final db = EmptyDb.connect(await isolate.connect());
await runTest(db);
});
});
}, skip: 'todo: Cancellations are currently broken on Dart 2.15');
test('together with switchMap', () async {
String slowQuery(int i) => '''