From 66046894d82d04f5c2ca4eb465180b6a15ff1f2e Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 30 Sep 2021 17:11:14 +0200 Subject: [PATCH] Move isolate implementation into lib/src --- moor/lib/isolate.dart | 68 +++----------------------------------- moor/lib/src/isolate.dart | 69 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 74 insertions(+), 63 deletions(-) create mode 100644 moor/lib/src/isolate.dart diff --git a/moor/lib/isolate.dart b/moor/lib/isolate.dart index 9b80af7d..347bde42 100644 --- a/moor/lib/isolate.dart +++ b/moor/lib/isolate.dart @@ -4,12 +4,11 @@ library isolate; import 'dart:isolate'; -import 'package:async/async.dart'; -import 'package:stream_channel/isolate_channel.dart'; import 'package:stream_channel/stream_channel.dart'; import 'moor.dart'; import 'remote.dart'; +import 'src/isolate.dart'; /// Signature of a function that opens a database connection. typedef DatabaseOpener = DatabaseConnection Function(); @@ -52,10 +51,10 @@ class MoorIsolate { receive.listen((message) { if (message is SendPort) { controller.local.stream - .map(_prepareForTransport) + .map(prepareForTransport) .listen(message.send, onDone: receive.close); } else { - controller.local.sink.add(_decodeAfterTransport(message)); + controller.local.sink.add(decodeAfterTransport(message)); } }); @@ -108,7 +107,7 @@ class MoorIsolate { /// to call [inCurrent] will be killed. factory MoorIsolate.inCurrent(DatabaseOpener opener, {bool killIsolateWhenDone = false}) { - final server = _RunningMoorServer(Isolate.current, opener(), + final server = RunningMoorServer(Isolate.current, opener(), killIsolateWhenDone: killIsolateWhenDone); return MoorIsolate.fromConnectPort(server.portToOpenConnection); } @@ -125,63 +124,6 @@ void _startMoorIsolate(List args) { final sendPort = args[0] as SendPort; final opener = args[1] as DatabaseOpener; - final server = _RunningMoorServer(Isolate.current, opener()); + final server = RunningMoorServer(Isolate.current, opener()); sendPort.send(server.portToOpenConnection); } - -class _RunningMoorServer { - final Isolate self; - final bool killIsolateWhenDone; - - final MoorServer server; - final ReceivePort connectPort = ReceivePort('moor connect'); - int _counter = 0; - - SendPort get portToOpenConnection => connectPort.sendPort; - - _RunningMoorServer(this.self, DatabaseConnection connection, - {this.killIsolateWhenDone = true}) - : server = MoorServer(connection, allowRemoteShutdown: true) { - final subscription = connectPort.listen((message) { - if (message is SendPort) { - final receiveForConnection = ReceivePort('moor channel #${_counter++}'); - message.send(receiveForConnection.sendPort); - final channel = IsolateChannel(receiveForConnection, message) - .changeStream((source) => source.map(_decodeAfterTransport)) - .transformSink( - StreamSinkTransformer.fromHandlers( - handleData: (data, sink) => - sink.add(_prepareForTransport(data))), - ); - - server.serve(channel); - } - }); - - server.done.then((_) { - subscription.cancel(); - connectPort.close(); - if (killIsolateWhenDone) self.kill(); - }); - } -} - -Object? _prepareForTransport(Object? source) { - if (source is! List) return source; - - if (source is Uint8List) { - return TransferableTypedData.fromList([source]); - } - - return source.map(_prepareForTransport).toList(); -} - -Object? _decodeAfterTransport(Object? source) { - if (source is TransferableTypedData) { - return source.materialize().asUint8List(); - } else if (source is List) { - return source.map(_decodeAfterTransport).toList(); - } else { - return source; - } -} diff --git a/moor/lib/src/isolate.dart b/moor/lib/src/isolate.dart new file mode 100644 index 00000000..92fe70b5 --- /dev/null +++ b/moor/lib/src/isolate.dart @@ -0,0 +1,69 @@ +import 'dart:isolate'; + +import 'package:async/async.dart'; +import 'package:meta/meta.dart'; +import 'package:stream_channel/isolate_channel.dart'; + +import '../moor.dart'; +import '../remote.dart'; + +// All of this is moor-internal and not exported, so: +// ignore_for_file: public_member_api_docs + +@internal +class RunningMoorServer { + final Isolate self; + final bool killIsolateWhenDone; + + final MoorServer server; + final ReceivePort connectPort = ReceivePort('moor connect'); + int _counter = 0; + + SendPort get portToOpenConnection => connectPort.sendPort; + + RunningMoorServer(this.self, DatabaseConnection connection, + {this.killIsolateWhenDone = true}) + : server = MoorServer(connection, allowRemoteShutdown: true) { + final subscription = connectPort.listen((message) { + if (message is SendPort) { + final receiveForConnection = ReceivePort('moor channel #${_counter++}'); + message.send(receiveForConnection.sendPort); + final channel = IsolateChannel(receiveForConnection, message) + .changeStream((source) => source.map(decodeAfterTransport)) + .transformSink( + StreamSinkTransformer.fromHandlers( + handleData: (data, sink) => + sink.add(prepareForTransport(data))), + ); + + server.serve(channel); + } + }); + + server.done.then((_) { + subscription.cancel(); + connectPort.close(); + if (killIsolateWhenDone) self.kill(); + }); + } +} + +Object? prepareForTransport(Object? source) { + if (source is! List) return source; + + if (source is Uint8List) { + return TransferableTypedData.fromList([source]); + } + + return source.map(prepareForTransport).toList(); +} + +Object? decodeAfterTransport(Object? source) { + if (source is TransferableTypedData) { + return source.materialize().asUint8List(); + } else if (source is List) { + return source.map(decodeAfterTransport).toList(); + } else { + return source; + } +}