Add single-client mode for remote connections

This commit is contained in:
Simon Binder 2022-11-25 13:47:08 +01:00
parent 1d02ad9cd9
commit 097f941f88
No known key found for this signature in database
GPG Key ID: 7891917E4147B8C0
10 changed files with 119 additions and 65 deletions

View File

@ -50,8 +50,8 @@ void main() async {
// internally. This is NOT what we returned from _backgroundConnection, drift
// uses an internal proxy class for isolate communication.
// As long as the isolate is used by only one database (it is here), we can
// use `shutdownOnClose` to dispose the isolate after closing the connection.
final connection = await isolate.connect(shutdownOnClose: true);
// use `singleClientMode` to dispose the isolate after closing the connection.
final connection = await isolate.connect(singleClientMode: true);
final db = TodoDb.connect(connection);
@ -69,7 +69,7 @@ void connectSynchronously() {
TodoDb.connect(
DatabaseConnection.delayed(Future.sync(() async {
final isolate = await DriftIsolate.spawn(_backgroundConnection);
return isolate.connect(shutdownOnClose: true);
return isolate.connect(singleClientMode: true);
})),
);
// #enddocregion delayed
@ -123,7 +123,7 @@ class _IsolateStartRequest {
DatabaseConnection createDriftIsolateAndConnect() {
return DatabaseConnection.delayed(Future.sync(() async {
final isolate = await _createDriftIsolate();
return await isolate.connect(shutdownOnClose: true);
return await isolate.connect(singleClientMode: true);
}));
}
// #enddocregion init_connect

View File

@ -117,7 +117,7 @@ This call will release all resources used by the drift isolate.
In many cases, you know that only a single client will connect to the
`DriftIsolate` (for instance because you're spawning a new `DriftIsolate` when
opening a database). In this case, you can set the `shutdownOnClose: true`
opening a database). In this case, you can set the `singleClientMode: true`
parameter on `connect()`.
With this parameter, closing the single connection will also fully dispose the
drift isolate.

View File

@ -6,8 +6,8 @@
They can be used to compare the column against a list of Dart expressions that
will be mapped through a type converter.
- Add `TableStatements.insertAll` to atomically insert multiple rows.
- Add `shutdownOnClose` to `remote()` and `DriftIsolate` connections to shutdown
a drift server or isolate after closing a database connection.
- Add `singleClientMode` to `remote()` and `DriftIsolate` connections to make
the common case with one client more efficient.
- Fix a concurrency issues around transactions.
## 2.2.0

View File

@ -88,23 +88,26 @@ class DriftIsolate {
/// All operations on the returned [DatabaseConnection] will be executed on a
/// background isolate.
///
/// When [shutdownOnClose] is enabled (it defaults to `false`), the drift
/// server on the remote isolate will be shut down when this database
/// connection is closed. This option can be enabled when it is known that the
/// drift isolate will only ever serve one client.
/// When [singleClientMode] is enabled (it defaults to `false`), drift assumes
/// that the isolate will only be connected to once. In this mode, drift will
/// shutdown the remote isolate once the returned [DatabaseConnection] is
/// closed.
/// Also, stream queries are more efficient when this mode is enables since we
/// don't have to synchronize table updates to other clients (since there are
/// none).
///
/// Setting the [isolateDebugLog] is only helpful when debugging drift itself.
/// It will print messages exchanged between the two isolates.
// todo: breaking: Make synchronous in drift 2
Future<DatabaseConnection> connect({
bool isolateDebugLog = false,
bool shutdownOnClose = false,
bool singleClientMode = false,
}) async {
return remote(
_open(),
debugLog: isolateDebugLog,
serialize: serialize,
shutdownOnClose: shutdownOnClose,
singleClientMode: singleClientMode,
);
}

View File

@ -119,11 +119,12 @@ abstract class DriftServer {
/// The other end of the [channel] must be attached to a drift server with
/// [DriftServer.serve] for this setup to work.
///
/// The [shutdownOnClose] parameter controls whether [shutdown] is called
/// after closing the returned database connection. By default, only this
/// connection will be closed and the server will continue to run. When enabled,
/// the server will shutdown when this connection is closed. This is useful when
/// it is known that the server will only serve a single connection.
/// If it is known that only a single client will connect to this database
/// server, [singleClientMode] can be enabled.
/// When enabled, [shutdown] is implicitly called when the database connection
/// is closed. This may make it easier to dispose the remote isolate or server.
/// Also, update notifications for table updates don't have to be sent which
/// reduces load on the connection.
///
/// If [serialize] is true, drift will only send [bool], [int], [double],
/// [Uint8List], [String] or [List]'s thereof over the channel. Otherwise,
@ -137,9 +138,9 @@ DatabaseConnection remote(
StreamChannel<Object?> channel, {
bool debugLog = false,
bool serialize = true,
bool shutdownOnClose = false,
bool singleClientMode = false,
}) {
final client = DriftClient(channel, debugLog, serialize, shutdownOnClose);
final client = DriftClient(channel, debugLog, serialize, singleClientMode);
return client.connection;
}

View File

@ -13,7 +13,12 @@ import 'protocol.dart';
/// The client part of a remote drift communication scheme.
class DriftClient {
final DriftCommunication _channel;
final bool _shutdownOnClose;
/// Whether we know that only a single client will use the database server.
///
/// In this case, we shutdown the server after the client disconnects and
/// can avoid forwarding stream query updaten notifications.
final bool _singleClientMode;
late final _RemoteStreamQueryStore _streamStore =
_RemoteStreamQueryStore(this);
@ -32,7 +37,7 @@ class DriftClient {
StreamChannel<Object?> channel,
bool debugLog,
bool serialize,
this._shutdownOnClose,
this._singleClientMode,
) : _channel = DriftCommunication(channel,
debugLog: debugLog, serialize: serialize) {
_channel.setRequestHandler(_handleRequest);
@ -149,7 +154,7 @@ class _RemoteQueryExecutor extends _BaseExecutor {
final channel = client._channel;
if (!channel.isClosed) {
if (client._shutdownOnClose) {
if (client._singleClientMode) {
return channel
.request(NoArgsRequest.terminateAll)
.whenComplete(channel.close);
@ -233,11 +238,13 @@ class _RemoteStreamQueryStore extends StreamQueryStore {
@override
void handleTableUpdates(Set<TableUpdate> updates,
[bool comesFromServer = false]) {
if (comesFromServer) {
if (comesFromServer || _client._singleClientMode) {
super.handleTableUpdates(updates);
} else {
// requests are async, but the function is synchronous. We await that
// future in close()
// Also notify the server (so that queries on other connections have a
// chance to update as well). Since this method is synchronous but the
// connection isn't, we store this request in a completer and await
// pending operations in close() (which is async).
final completer = Completer<void>();
_awaitingUpdates.add(completer);

View File

@ -11,50 +11,67 @@ void main() {
for (final existingListener in [true, false]) {
for (final useIsolate in [true, false]) {
for (final useTransaction in [true, false]) {
final vars = 'existingListener=$existingListener, '
'useIsolate=$useIsolate, '
'useTransaction=$useTransaction';
test('can read-your-writes ($vars)', () async {
final db = useIsolate
? _SomeDb.connect(await _connect())
: _SomeDb(NativeDatabase.memory());
addTearDown(db.close);
await db.into(db.someTable).insert(_SomeTableCompanion());
Stream<_SomeTableData> getRow() =>
db.select(db.someTable).watchSingle();
Future<void> readYourWrite() async {
final update = _SomeTableCompanion(name: Value('x'));
await db.update(db.someTable).write(update);
// await pumpEventQueue();
final row = await getRow().first;
expect(row.name, equals('x'),
reason: 'should be able to read the row we just wrote');
}
if (existingListener) {
getRow().listen(null);
}
await (useTransaction
? db.transaction(readYourWrite)
: readYourWrite());
});
for (final singleClientMode in [true, false]) {
_defineTest(
existingListener, useIsolate, useTransaction, singleClientMode);
}
}
}
}
}
Future<DatabaseConnection> _connect() async {
void _defineTest(
bool existingListener,
bool useIsolate,
bool useTransaction,
bool singleClientMode,
) {
final vars = 'existingListener=$existingListener, '
'useIsolate=$useIsolate, '
'useTransaction=$useTransaction, '
'singleClientMode=$singleClientMode';
test('can read-your-writes ($vars)', () async {
final isolate = useIsolate ? await _spawnIsolate() : null;
final db = useIsolate
? _SomeDb.connect(await isolate!.connect())
: _SomeDb(NativeDatabase.memory());
addTearDown(() async {
await db.close();
if (!singleClientMode && useIsolate) {
await isolate!.shutdownAll();
}
});
await db.into(db.someTable).insert(_SomeTableCompanion());
Stream<_SomeTableData> getRow() => db.select(db.someTable).watchSingle();
Future<void> readYourWrite() async {
final update = _SomeTableCompanion(name: Value('x'));
await db.update(db.someTable).write(update);
// await pumpEventQueue();
final row = await getRow().first;
expect(row.name, equals('x'),
reason: 'should be able to read the row we just wrote');
}
if (existingListener) {
getRow().listen(null);
}
await (useTransaction ? db.transaction(readYourWrite) : readYourWrite());
});
}
Future<DriftIsolate> _spawnIsolate() async {
final out = ReceivePort();
final args = out.sendPort;
await Isolate.spawn(_isolateEntrypoint, args);
final isolate = (await out.first) as DriftIsolate;
return isolate.connect();
return (await out.first) as DriftIsolate;
}
void _isolateEntrypoint(SendPort out) {

View File

@ -127,7 +127,7 @@ void main() {
expect(done.first, completion(anything));
final drift = await spawned.first as DriftIsolate;
final db = TodoDb.connect(await drift.connect(shutdownOnClose: true));
final db = TodoDb.connect(await drift.connect(singleClientMode: true));
await db.close();
}, tags: 'background_isolate');

View File

@ -1,6 +1,7 @@
@TestOn('vm')
import 'package:async/async.dart';
import 'package:drift/drift.dart';
import 'package:drift/native.dart';
import 'package:drift/remote.dart';
import 'package:drift/src/remote/protocol.dart';
import 'package:mockito/mockito.dart';
@ -30,7 +31,7 @@ void main() {
server.serve(controller.foreign);
final client =
remote(controller.local.expectedToClose, shutdownOnClose: true);
remote(controller.local.expectedToClose, singleClientMode: true);
final db = TodoDb.connect(client);
await db.todosTable.select().get();
@ -39,6 +40,31 @@ void main() {
expect(server.done, completes);
});
test(
'does not send table update notifications in single client mode',
() async {
final server =
DriftServer(testInMemoryDatabase(), allowRemoteShutdown: true);
final controller = StreamChannelController();
server.serve(controller.foreign, serialize: false);
final client = remote(
controller.local.transformSink(StreamSinkTransformer.fromHandlers(
handleData: (data, out) {
expect(data, isNot(isA<NotifyTablesUpdated>()));
out.add(data);
},
)),
serialize: false,
singleClientMode: true,
);
final db = TodoDb.connect(client);
await db.todosTable.select().get();
await db.close();
},
);
test('Uint8Lists are mapped from and to Uint8Lists', () async {
const protocol = DriftProtocol();

View File

@ -29,7 +29,7 @@ DatabaseConnection connect() {
// Each connect() spawns a new isolate which is only used for one
// connection, so we shutdown the isolate when the database is closed.
return driftIsolate.connect(shutdownOnClose: true);
return driftIsolate.connect(singleClientMode: true);
}));
}