Move isolate implementation into lib/src

This commit is contained in:
Simon Binder 2021-09-30 17:11:14 +02:00
parent 9266fdffe2
commit 66046894d8
No known key found for this signature in database
GPG Key ID: 7891917E4147B8C0
2 changed files with 74 additions and 63 deletions

View File

@ -4,12 +4,11 @@ library isolate;
import 'dart:isolate';
import 'package:async/async.dart';
import 'package:stream_channel/isolate_channel.dart';
import 'package:stream_channel/stream_channel.dart';
import 'moor.dart';
import 'remote.dart';
import 'src/isolate.dart';
/// Signature of a function that opens a database connection.
typedef DatabaseOpener = DatabaseConnection Function();
@ -52,10 +51,10 @@ class MoorIsolate {
receive.listen((message) {
if (message is SendPort) {
controller.local.stream
.map(_prepareForTransport)
.map(prepareForTransport)
.listen(message.send, onDone: receive.close);
} else {
controller.local.sink.add(_decodeAfterTransport(message));
controller.local.sink.add(decodeAfterTransport(message));
}
});
@ -108,7 +107,7 @@ class MoorIsolate {
/// to call [inCurrent] will be killed.
factory MoorIsolate.inCurrent(DatabaseOpener opener,
{bool killIsolateWhenDone = false}) {
final server = _RunningMoorServer(Isolate.current, opener(),
final server = RunningMoorServer(Isolate.current, opener(),
killIsolateWhenDone: killIsolateWhenDone);
return MoorIsolate.fromConnectPort(server.portToOpenConnection);
}
@ -125,63 +124,6 @@ void _startMoorIsolate(List args) {
final sendPort = args[0] as SendPort;
final opener = args[1] as DatabaseOpener;
final server = _RunningMoorServer(Isolate.current, opener());
final server = RunningMoorServer(Isolate.current, opener());
sendPort.send(server.portToOpenConnection);
}
class _RunningMoorServer {
final Isolate self;
final bool killIsolateWhenDone;
final MoorServer server;
final ReceivePort connectPort = ReceivePort('moor connect');
int _counter = 0;
SendPort get portToOpenConnection => connectPort.sendPort;
_RunningMoorServer(this.self, DatabaseConnection connection,
{this.killIsolateWhenDone = true})
: server = MoorServer(connection, allowRemoteShutdown: true) {
final subscription = connectPort.listen((message) {
if (message is SendPort) {
final receiveForConnection = ReceivePort('moor channel #${_counter++}');
message.send(receiveForConnection.sendPort);
final channel = IsolateChannel(receiveForConnection, message)
.changeStream((source) => source.map(_decodeAfterTransport))
.transformSink(
StreamSinkTransformer.fromHandlers(
handleData: (data, sink) =>
sink.add(_prepareForTransport(data))),
);
server.serve(channel);
}
});
server.done.then((_) {
subscription.cancel();
connectPort.close();
if (killIsolateWhenDone) self.kill();
});
}
}
Object? _prepareForTransport(Object? source) {
if (source is! List) return source;
if (source is Uint8List) {
return TransferableTypedData.fromList([source]);
}
return source.map(_prepareForTransport).toList();
}
Object? _decodeAfterTransport(Object? source) {
if (source is TransferableTypedData) {
return source.materialize().asUint8List();
} else if (source is List) {
return source.map(_decodeAfterTransport).toList();
} else {
return source;
}
}

69
moor/lib/src/isolate.dart Normal file
View File

@ -0,0 +1,69 @@
import 'dart:isolate';
import 'package:async/async.dart';
import 'package:meta/meta.dart';
import 'package:stream_channel/isolate_channel.dart';
import '../moor.dart';
import '../remote.dart';
// All of this is moor-internal and not exported, so:
// ignore_for_file: public_member_api_docs
@internal
class RunningMoorServer {
final Isolate self;
final bool killIsolateWhenDone;
final MoorServer server;
final ReceivePort connectPort = ReceivePort('moor connect');
int _counter = 0;
SendPort get portToOpenConnection => connectPort.sendPort;
RunningMoorServer(this.self, DatabaseConnection connection,
{this.killIsolateWhenDone = true})
: server = MoorServer(connection, allowRemoteShutdown: true) {
final subscription = connectPort.listen((message) {
if (message is SendPort) {
final receiveForConnection = ReceivePort('moor channel #${_counter++}');
message.send(receiveForConnection.sendPort);
final channel = IsolateChannel(receiveForConnection, message)
.changeStream((source) => source.map(decodeAfterTransport))
.transformSink(
StreamSinkTransformer.fromHandlers(
handleData: (data, sink) =>
sink.add(prepareForTransport(data))),
);
server.serve(channel);
}
});
server.done.then((_) {
subscription.cancel();
connectPort.close();
if (killIsolateWhenDone) self.kill();
});
}
}
Object? prepareForTransport(Object? source) {
if (source is! List) return source;
if (source is Uint8List) {
return TransferableTypedData.fromList([source]);
}
return source.map(prepareForTransport).toList();
}
Object? decodeAfterTransport(Object? source) {
if (source is TransferableTypedData) {
return source.materialize().asUint8List();
} else if (source is List) {
return source.map(decodeAfterTransport).toList();
} else {
return source;
}
}