Exchange dialect information for remote database

This commit is contained in:
Simon Binder 2022-12-18 22:02:49 +01:00
parent 4644bce9dd
commit 18c6139eb3
No known key found for this signature in database
GPG Key ID: 7891917E4147B8C0
10 changed files with 108 additions and 18 deletions

View File

@ -98,12 +98,11 @@ class DriftIsolate {
/// ///
/// Setting the [isolateDebugLog] is only helpful when debugging drift itself. /// Setting the [isolateDebugLog] is only helpful when debugging drift itself.
/// It will print messages exchanged between the two isolates. /// It will print messages exchanged between the two isolates.
// todo: breaking: Make synchronous in drift 2
Future<DatabaseConnection> connect({ Future<DatabaseConnection> connect({
bool isolateDebugLog = false, bool isolateDebugLog = false,
bool singleClientMode = false, bool singleClientMode = false,
}) async { }) async {
return remote( return connectToRemoteAndInitialize(
_open(), _open(),
debugLog: isolateDebugLog, debugLog: isolateDebugLog,
serialize: serialize, serialize: serialize,

View File

@ -97,7 +97,7 @@ abstract class DriftServer {
/// the message may be any Dart object. /// the message may be any Dart object.
/// ///
/// After calling [serve], you can obtain a [DatabaseConnection] on the other /// After calling [serve], you can obtain a [DatabaseConnection] on the other
/// end of the [channel] by calling [remote]. /// end of the [channel] by calling [connectToRemoteAndInitialize].
/// ///
/// __Warning__: As long as this library is marked experimental, the protocol /// __Warning__: As long as this library is marked experimental, the protocol
/// might change with every drift version. For this reason, make sure that /// might change with every drift version. For this reason, make sure that
@ -134,6 +134,17 @@ abstract class DriftServer {
/// ///
/// The optional [debugLog] can be enabled to print incoming and outgoing /// The optional [debugLog] can be enabled to print incoming and outgoing
/// messages. /// messages.
///
/// __NOTE__: This synchronous method has a flaw, as its [QueryExecutor.dialect]
/// is always going to be [SqlDialect.sqlite]. While this not a problem in most
/// scenarios where that is the actual database, it makes it harder to use with
/// other database clients. The [connectToRemoteAndInitialize] method does not
/// have this issue.
///
/// Due to this problem, it is recommended to avoid [remote] altogether. If you
/// know the dialect beforehand, you can wrap [connectToRemoteAndInitialize] in
/// a [DatabaseConnection.delayed] to get a connection sychronously.
@Deprecated('Use the asynchronous `connectToRemoteAndInitialize` instead')
DatabaseConnection remote( DatabaseConnection remote(
StreamChannel<Object?> channel, { StreamChannel<Object?> channel, {
bool debugLog = false, bool debugLog = false,
@ -144,6 +155,37 @@ DatabaseConnection remote(
return client.connection; return client.connection;
} }
/// Connects to a remote server over a two-way communication channel.
///
/// The other end of the [channel] must be attached to a drift server with
/// [DriftServer.serve] for this setup to work.
///
/// If it is known that only a single client will connect to this database
/// server, [singleClientMode] can be enabled.
/// When enabled, [shutdown] is implicitly called when the database connection
/// is closed. This may make it easier to dispose the remote isolate or server.
/// Also, update notifications for table updates don't have to be sent which
/// reduces load on the connection.
///
/// If [serialize] is true, drift will only send [bool], [int], [double],
/// [Uint8List], [String] or [List]'s thereof over the channel. Otherwise,
/// the message may be any Dart object.
/// The value of [serialize] for [connectToRemoteAndInitialize] must be the same
/// value passed to [DriftServer.serve].
///
/// The optional [debugLog] can be enabled to print incoming and outgoing
/// messages.
Future<DatabaseConnection> connectToRemoteAndInitialize(
StreamChannel<Object?> channel, {
bool debugLog = false,
bool serialize = true,
bool singleClientMode = false,
}) async {
final client = DriftClient(channel, debugLog, serialize, singleClientMode);
await client.serverInfo;
return client.connection;
}
/// Sends a shutdown request over a channel. /// Sends a shutdown request over a channel.
/// ///
/// On the remote side, the corresponding channel must have been passed to /// On the remote side, the corresponding channel must have been passed to

View File

@ -14,6 +14,12 @@ import 'protocol.dart';
class DriftClient { class DriftClient {
final DriftCommunication _channel; final DriftCommunication _channel;
SqlDialect _serverDialect = SqlDialect.sqlite;
final Completer<ServerInfo> _serverInfo = Completer();
/// Waits for the first [ServerInfo] message to this client.
Future<ServerInfo> get serverInfo => _serverInfo.future;
/// Whether we know that only a single client will use the database server. /// Whether we know that only a single client will use the database server.
/// ///
/// In this case, we shutdown the server after the client disconnects and /// In this case, we shutdown the server after the client disconnects and
@ -51,6 +57,9 @@ class DriftClient {
return _connectedDb.beforeOpen(executor, payload.details); return _connectedDb.beforeOpen(executor, payload.details);
} else if (payload is NotifyTablesUpdated) { } else if (payload is NotifyTablesUpdated) {
_streamStore.handleTableUpdates(payload.updates.toSet(), true); _streamStore.handleTableUpdates(payload.updates.toSet(), true);
} else if (payload is ServerInfo) {
_serverDialect = payload.dialect;
_serverInfo.complete(payload);
} }
} }
} }
@ -62,6 +71,9 @@ abstract class _BaseExecutor extends QueryExecutor {
// ignore: unused_element, https://github.com/dart-lang/sdk/issues/49007 // ignore: unused_element, https://github.com/dart-lang/sdk/issues/49007
_BaseExecutor(this.client, [this._executorId]); _BaseExecutor(this.client, [this._executorId]);
@override
SqlDialect get dialect => client._serverDialect;
@override @override
Future<void> runBatched(BatchedStatements statements) { Future<void> runBatched(BatchedStatements statements) {
return client._channel return client._channel
@ -129,9 +141,6 @@ class _RemoteQueryExecutor extends _BaseExecutor {
Completer<void>? _setSchemaVersion; Completer<void>? _setSchemaVersion;
Future<bool>? _serverIsOpen; Future<bool>? _serverIsOpen;
@override
SqlDialect get dialect => SqlDialect.sqlite;
@override @override
TransactionExecutor beginTransaction() { TransactionExecutor beginTransaction() {
return _RemoteTransactionExecutor(client, _executorId); return _RemoteTransactionExecutor(client, _executorId);

View File

@ -22,6 +22,8 @@ class DriftProtocol {
static const _tag_DirectValue = 10; static const _tag_DirectValue = 10;
static const _tag_SelectResult = 11; static const _tag_SelectResult = 11;
static const _tag_RequestCancellation = 12; static const _tag_RequestCancellation = 12;
static const _tag_ServerInfo = 13;
static const _tag_BigInt = 'bigint'; static const _tag_BigInt = 'bigint';
Object? serialize(Message message) { Object? serialize(Message message) {
@ -103,6 +105,11 @@ class DriftProtocol {
]; ];
} else if (payload is EnsureOpen) { } else if (payload is EnsureOpen) {
return [_tag_EnsureOpen, payload.schemaVersion, payload.executorId]; return [_tag_EnsureOpen, payload.schemaVersion, payload.executorId];
} else if (payload is ServerInfo) {
return [
_tag_ServerInfo,
payload.dialect.name,
];
} else if (payload is RunBeforeOpen) { } else if (payload is RunBeforeOpen) {
return [ return [
_tag_RunBeforeOpen, _tag_RunBeforeOpen,
@ -189,6 +196,8 @@ class DriftProtocol {
return RunTransactionAction(control, readNullableInt(2)); return RunTransactionAction(control, readNullableInt(2));
case _tag_EnsureOpen: case _tag_EnsureOpen:
return EnsureOpen(readInt(1), readNullableInt(2)); return EnsureOpen(readInt(1), readNullableInt(2));
case _tag_ServerInfo:
return ServerInfo(SqlDialect.values.byName(fullMessage![1] as String));
case _tag_RunBeforeOpen: case _tag_RunBeforeOpen:
return RunBeforeOpen( return RunBeforeOpen(
OpeningDetails(readNullableInt(1), readInt(2)), OpeningDetails(readNullableInt(1), readInt(2)),
@ -408,6 +417,17 @@ class EnsureOpen {
} }
} }
class ServerInfo {
final SqlDialect dialect;
ServerInfo(this.dialect);
@override
String toString() {
return 'ServerInfo($dialect)';
}
}
/// Sent from the server to the client when it should run the before open /// Sent from the server to the client when it should run the before open
/// callback. /// callback.
class RunBeforeOpen { class RunBeforeOpen {

View File

@ -54,6 +54,7 @@ class ServerImplementation implements DriftServer {
final comm = DriftCommunication(channel, serialize: serialize); final comm = DriftCommunication(channel, serialize: serialize);
comm.setRequestHandler((request) => _handleRequest(comm, request)); comm.setRequestHandler((request) => _handleRequest(comm, request));
comm.request(ServerInfo(connection.executor.dialect));
_activeChannels.add(comm); _activeChannels.add(comm);
comm.closed.then((_) => _activeChannels.remove(comm)); comm.closed.then((_) => _activeChannels.remove(comm));

View File

@ -44,15 +44,17 @@ class DatabaseConnection {
/// } /// }
/// } /// }
/// ``` /// ```
factory DatabaseConnection.delayed(FutureOr<DatabaseConnection> connection) { factory DatabaseConnection.delayed(FutureOr<DatabaseConnection> connection,
{SqlDialect dialect = SqlDialect.sqlite}) {
if (connection is DatabaseConnection) { if (connection is DatabaseConnection) {
return connection; return connection;
} }
return DatabaseConnection( return DatabaseConnection(
LazyDatabase(() async => (await connection).executor), LazyDatabase(() async => (await connection).executor, dialect: dialect),
streamQueries: DelayedStreamQueryStore( streamQueries: DelayedStreamQueryStore(
connection.then((conn) => conn.streamQueries)), connection.then((conn) => conn.streamQueries),
),
); );
} }

View File

@ -29,8 +29,9 @@ void main() {
DriftServer(testInMemoryDatabase(), allowRemoteShutdown: true); DriftServer(testInMemoryDatabase(), allowRemoteShutdown: true);
server.serve(controller.foreign); server.serve(controller.foreign);
final client = final client = await connectToRemoteAndInitialize(
remote(controller.local.expectedToClose, singleClientMode: true); controller.local.expectedToClose,
singleClientMode: true);
final db = TodoDb.connect(client); final db = TodoDb.connect(client);
await db.todosTable.select().get(); await db.todosTable.select().get();
@ -47,7 +48,7 @@ void main() {
final controller = StreamChannelController(); final controller = StreamChannelController();
server.serve(controller.foreign, serialize: false); server.serve(controller.foreign, serialize: false);
final client = remote( final client = await connectToRemoteAndInitialize(
controller.local.transformSink(StreamSinkTransformer.fromHandlers( controller.local.transformSink(StreamSinkTransformer.fromHandlers(
handleData: (data, out) { handleData: (data, out) {
expect(data, isNot(isA<NotifyTablesUpdated>())); expect(data, isNot(isA<NotifyTablesUpdated>()));
@ -117,7 +118,7 @@ void main() {
server.serve(channelController.foreign.changeStream(_checkStreamOfSimple), server.serve(channelController.foreign.changeStream(_checkStreamOfSimple),
serialize: true); serialize: true);
final connection = remote( final connection = await connectToRemoteAndInitialize(
channelController.local channelController.local
.changeStream(_checkStreamOfSimple) .changeStream(_checkStreamOfSimple)
.expectedToClose, .expectedToClose,
@ -162,7 +163,8 @@ void main() {
server.serve(controller.foreign); server.serve(controller.foreign);
addTearDown(server.shutdown); addTearDown(server.shutdown);
final db = TodoDb.connect(remote(controller.local)); final db =
TodoDb.connect(await connectToRemoteAndInitialize(controller.local));
addTearDown(db.close); addTearDown(db.close);
await db.transaction(() async { await db.transaction(() async {
@ -190,6 +192,19 @@ void main() {
verify(innerTransactions[1].send()); verify(innerTransactions[1].send());
verify(outerTransaction.send()); verify(outerTransaction.send());
}); });
test('reports correct dialect of remote', () async {
final executor = MockExecutor();
when(executor.dialect).thenReturn(SqlDialect.postgres);
final controller = StreamChannelController();
final server = DriftServer(DatabaseConnection(executor))
..serve(controller.foreign);
final client = await connectToRemoteAndInitialize(controller.local);
await server.shutdown();
expect(client.executor.dialect, SqlDialect.postgres);
});
} }
Stream<Object?> _checkStreamOfSimple(Stream<Object?> source) { Stream<Object?> _checkStreamOfSimple(Stream<Object?> source) {

View File

@ -15,7 +15,8 @@ const _useWorker = true;
DatabaseConnection connect({bool isInWebWorker = false}) { DatabaseConnection connect({bool isInWebWorker = false}) {
if (_useWorker && !isInWebWorker) { if (_useWorker && !isInWebWorker) {
final worker = SharedWorker('shared_worker.dart.js'); final worker = SharedWorker('shared_worker.dart.js');
return remote(worker.port!.channel()); return DatabaseConnection.delayed(
connectToRemoteAndInitialize(worker.port!.channel()));
} else { } else {
return DatabaseConnection.delayed(Future.sync(() async { return DatabaseConnection.delayed(Future.sync(() async {
// We're using the experimental wasm support in Drift because this gives // We're using the experimental wasm support in Drift because this gives

View File

@ -16,6 +16,7 @@ class PlatformInterface {
static DatabaseConnection _connectToWorker(String databaseName) { static DatabaseConnection _connectToWorker(String databaseName) {
final worker = SharedWorker( final worker = SharedWorker(
kReleaseMode ? 'worker.dart.min.js' : 'worker.dart.js', databaseName); kReleaseMode ? 'worker.dart.min.js' : 'worker.dart.js', databaseName);
return remote(worker.port!.channel()); return DatabaseConnection.delayed(
connectToRemoteAndInitialize(worker.port!.channel()));
} }
} }

View File

@ -4,9 +4,9 @@ import 'package:drift/remote.dart';
import 'package:drift/web.dart'; import 'package:drift/web.dart';
import 'package:web_worker_example/database.dart'; import 'package:web_worker_example/database.dart';
void main() { void main() async {
final worker = SharedWorker('worker.dart.js'); final worker = SharedWorker('worker.dart.js');
final connection = remote(worker.port!.channel()); final connection = await connectToRemoteAndInitialize(worker.port!.channel());
final db = MyDatabase(connection); final db = MyDatabase(connection);
final output = document.getElementById('output')!; final output = document.getElementById('output')!;