Scaffold layout for multi-isolate implementation

This commit is contained in:
Simon Binder 2019-10-19 14:27:15 +02:00
parent 263004fe7b
commit 0c2362a625
No known key found for this signature in database
GPG Key ID: 7891917E4147B8C0
5 changed files with 222 additions and 10 deletions

View File

@ -34,9 +34,13 @@ abstract class DatabaseAccessor<T extends GeneratedDatabase>
DatabaseAccessor(this.db) : super.delegate(db);
}
/// Manages a [QueryExecutor] and optionally an own [SqlTypeSystem] or
/// [StreamQueryStore] to send queries to the database.
abstract class DatabaseConnectionUser {
/// A database connection managed by moor. Contains three components:
/// - a [SqlTypeSystem], which is responsible to map between Dart types and
/// values understood by the database engine.
/// - a [QueryExecutor], which runs sql commands
/// - a [StreamQueryStore], which dispatches table changes to listening queries,
/// on which the auto-updating queries are based.
class DatabaseConnection {
/// The type system to use with this database. The type system is responsible
/// for mapping Dart objects into sql expressions and vice-versa.
final SqlTypeSystem typeSystem;
@ -44,16 +48,43 @@ abstract class DatabaseConnectionUser {
/// The executor to use when queries are executed.
final QueryExecutor executor;
/// Manages active streams from select statements.
final StreamQueryStore streamQueries;
/// Constructs a raw database connection from the three components.
DatabaseConnection(this.typeSystem, this.executor, this.streamQueries);
/// Constructs a [DatabaseConnection] from the [QueryExecutor] by using the
/// default type system and a new [StreamQueryStore].
DatabaseConnection.fromExecutor(this.executor)
: typeSystem = SqlTypeSystem.defaultInstance,
streamQueries = StreamQueryStore();
}
/// Manages a [DatabaseConnection] to send queries to the database.
abstract class DatabaseConnectionUser {
/// The database connection used by this [DatabaseConnectionUser].
@protected
final DatabaseConnection connection;
/// The type system to use with this database. The type system is responsible
/// for mapping Dart objects into sql expressions and vice-versa.
SqlTypeSystem get typeSystem => connection.typeSystem;
/// The executor to use when queries are executed.
QueryExecutor get executor => connection.executor;
/// Manages active streams from select statements.
@visibleForTesting
@protected
StreamQueryStore streamQueries;
StreamQueryStore get streamQueries => connection.streamQueries;
/// Constructs a database connection user, which is responsible to store query
/// streams, wrap the underlying executor and perform type mapping.
DatabaseConnectionUser(this.typeSystem, this.executor, {this.streamQueries}) {
streamQueries ??= StreamQueryStore();
}
DatabaseConnectionUser(SqlTypeSystem typeSystem, QueryExecutor executor,
{StreamQueryStore streamQueries})
: connection = DatabaseConnection(
typeSystem, executor, streamQueries ?? StreamQueryStore());
/// Creates another [DatabaseConnectionUser] by referencing the implementation
/// from the [other] user.
@ -61,9 +92,15 @@ abstract class DatabaseConnectionUser {
{SqlTypeSystem typeSystem,
QueryExecutor executor,
StreamQueryStore streamQueries})
: typeSystem = typeSystem ?? other.typeSystem,
executor = executor ?? other.executor,
streamQueries = streamQueries ?? other.streamQueries;
: connection = DatabaseConnection(
typeSystem ?? other.connection.typeSystem,
executor ?? other.connection.executor,
streamQueries ?? other.connection.streamQueries,
);
/// Constructs a [DatabaseConnectionUser] that will use the provided
/// [DatabaseConnection].
DatabaseConnectionUser.fromConnection(this.connection);
/// Marks the tables as updated. This method will be called internally
/// whenever a update, delete or insert statement is issued on the database.
@ -422,4 +459,10 @@ abstract class GeneratedDatabase extends DatabaseConnectionUser
Future<void> close() async {
await executor.close();
}
/// Creates another instance of this [GeneratedDatabase] that uses the
/// [connection] instead of the current connection.
GeneratedDatabase cloneWith(DatabaseConnection connection) {
throw UnimplementedError();
}
}

View File

@ -0,0 +1,46 @@
part of 'moor_isolate.dart';
class _Client {
int _requestId = 0;
final ReceivePort _receive = ReceivePort();
SendPort _send;
Completer _initConnectionCompleter;
final Map<int, Completer<_Response>> _pendingRequests = {};
_Client() {
_receive.listen(_handleResponse);
_receive.close();
}
Future<T> _sendRequest<T extends _Response>(_Request request) {
final id = _requestId++;
final completer = Completer<_Response>();
_pendingRequests[id] = completer;
_send.send(request);
return completer.future.then((r) => r as T);
}
Future<T> _connectVia<T extends GeneratedDatabase>(
MoorIsolate isolate) async {
_initConnectionCompleter = Completer();
final initialSendPort = isolate._connectToDb;
initialSendPort.send(_ClientHello(_receive.sendPort));
await _initConnectionCompleter.future;
// todo construct new database by forking
return null;
}
void _handleResponse(dynamic response) {
if (response is _ServerHello) {
_send = response.sendToServer;
_initConnectionCompleter.complete();
} else if (response is _Response) {
_pendingRequests[response]?.complete(response);
}
}
}

View File

@ -0,0 +1,44 @@
import 'dart:async';
import 'dart:isolate';
import 'package:moor/moor.dart';
part 'client.dart';
part 'protocol.dart';
part 'server.dart';
/// Signature of a function that opens a database connection.
typedef DatabaseOpener = DatabaseConnection Function();
/// Defines utilities to run moor in a background isolate. In the operation mode
/// created by these utilities, there's a single background isolate doing all
/// the work. Any other isolate can use the [connect] method to obtain an
/// instance of a [GeneratedDatabase] class that will delegate its work onto a
/// background isolate. Auto-updating queries, and transactions work across
/// isolates, and the user facing api is exactly the same.
///
/// Please note that, while running moor in a background isolate can reduce
/// latency in foreground isolates (thus reducing UI lags), the overall
/// performance is going to be much worse as data has to be serialized and
/// deserialized to be sent over isolates.
/// Also, be aware that this api is not available on the web.
///
/// See also:
/// - [Isolate], for general information on multi threading in Dart.
/// - TODO: Write documentation tutorial for this on the website
class MoorIsolate {
/// The [SendPort] created by the background isolate running the db. We'll use
/// this port to initialize a connection to the background isolate. Further
/// communication happens across a port that is specific for each client
/// isolate.
SendPort _connectToDb;
static Future<MoorIsolate> spawn() {}
static MoorIsolate inCurrent() {}
Future<T> connect<T extends GeneratedDatabase>() {
final client = _Client();
return client._connectVia(this);
}
}

View File

@ -0,0 +1,29 @@
part of 'moor_isolate.dart';
abstract class _Message {}
abstract class _Request extends _Message {
/// An id for this request that is unique per client.
int id;
}
abstract class _Response extends _Message {
/// The [_Request.id] from the request this is response to.
int id;
}
/// A notification is only sent from the server
abstract class _Notification extends _Message {}
class _ClientHello extends _Message {
/// The [SendPort] used by the server to send messages to this client.
final SendPort sendMsgToClient;
_ClientHello(this.sendMsgToClient);
}
class _ServerHello extends _Message {
final SendPort sendToServer;
_ServerHello(this.sendToServer);
}

View File

@ -0,0 +1,50 @@
part of 'moor_isolate.dart';
/// A "server" runs in an [Isolate] and takes requests from "client" isolates.
class _Server {
/// The [ReceivePort] used to establish connections with new clients. This is
/// the pendant to [MoorIsolate._connectToDb].
ReceivePort _connectionRequest;
DatabaseConnection _connection;
final List<_ConnectedClient> _clients = [];
_Server(DatabaseOpener opener, SendPort sendPort) {
_connection = opener();
_connectionRequest = ReceivePort();
_connectionRequest.listen(_handleConnectionRequest);
sendPort.send(_connectionRequest.sendPort);
}
void _handleConnectionRequest(dynamic message) {
if (message is! _ClientHello) {
throw AssertionError('Unexpected initial message from client: $message');
// we can't replay this to the client because we don't have a SendPort
}
final sendToClient = (message as _ClientHello).sendMsgToClient;
final receive = ReceivePort();
final client = _ConnectedClient(receive, sendToClient);
receive.listen((data) {
if (data is _Request) {
_handleRequest(client, data);
}
// todo send error message to client when it sends something that isn't
// a request
});
sendToClient.send(_ServerHello(receive.sendPort));
}
void _handleRequest(_ConnectedClient client, _Request request) {}
}
class _ConnectedClient {
final ReceivePort receiveFromClient;
final SendPort sendToClient;
_ConnectedClient(this.receiveFromClient, this.sendToClient);
}