Close vfs when wasm database is closed

This commit is contained in:
Simon Binder 2024-01-06 18:06:01 +01:00
parent 42a0a40055
commit 09c6cf0b4e
No known key found for this signature in database
GPG Key ID: 7891917E4147B8C0
10 changed files with 214 additions and 27 deletions

View File

@ -1,6 +1,7 @@
## 2.15.0-dev
- Methods in the query builder API now respect custom types.
- Close wasm databases hosted in workers after the last client disconnects.
## 2.14.1

View File

@ -2,6 +2,8 @@ import 'dart:html';
import 'package:stream_channel/stream_channel.dart';
const _disconnectMessage = '_disconnect';
/// Extension to transform a raw [MessagePort] from web workers into a Dart
/// [StreamChannel].
extension PortToChannel on MessagePort {
@ -10,10 +12,35 @@ extension PortToChannel on MessagePort {
///
/// This can be used to implement a remote database connection over service
/// workers.
StreamChannel<Object?> channel() {
///
/// The [explicitClose] parameter can be used to control whether a close
/// message should be sent through the channel when it is closed. This will
/// cause it to be closed on the other end as well. Note that this is not a
/// reliable way of determining channel closures though, as there is no event
/// for channels being closed due to a tab or worker being closed.
/// Both "ends" of a JS channel calling [channel] on their part must use the
/// value for [explicitClose].
StreamChannel<Object?> channel({bool explicitClose = false}) {
final controller = StreamChannelController<Object?>();
onMessage.map((event) => event.data).pipe(controller.local.sink);
controller.local.stream.listen(postMessage, onDone: close);
onMessage.listen((event) {
final message = event.data;
if (explicitClose && message == _disconnectMessage) {
// Other end has closed the connection
controller.local.sink.close();
} else {
controller.local.sink.add(message);
}
});
controller.local.stream.listen(postMessage, onDone: () {
// Closed locally, inform the other end.
if (explicitClose) {
postMessage(_disconnectMessage);
}
close();
});
return controller.foreign;
}

View File

@ -114,6 +114,7 @@ class WasmDatabaseOpener {
as DedicatedWorkerCompatibilityResult;
_handleCompatibilityResult(status);
dedicatedWorker.version = status.version;
if (status.supportsNestedWorkers &&
status.canAccessOpfs &&
@ -142,6 +143,7 @@ class WasmDatabaseOpener {
as SharedWorkerCompatibilityResult;
_handleCompatibilityResult(sharedFeatures);
shared.version = sharedFeatures.version;
// Prefer to use the shared worker to host the database if it supports the
// necessary APIs.
@ -160,6 +162,7 @@ class WasmDatabaseOpener {
final class _DriftWorker {
final AbstractWorker worker;
ProtocolVersion version = ProtocolVersion.legacy;
/// The message port to communicate with the worker, if it's a shared worker.
final MessagePort? portForShared;
@ -225,16 +228,8 @@ final class _ProbeResult implements WasmProbeResult {
final channel = MessageChannel();
final initializer = initializeDatabase;
final initChannel = initializer != null ? MessageChannel() : null;
final local = channel.port1.channel();
final message = ServeDriftDatabase(
sqlite3WasmUri: opener.sqlite3WasmUri,
port: channel.port2,
storage: implementation,
databaseName: name,
initializationPort: initChannel?.port2,
);
ServeDriftDatabase message;
final sharedWorker = opener._sharedWorker;
final dedicatedWorker = opener._dedicatedWorker;
@ -242,10 +237,28 @@ final class _ProbeResult implements WasmProbeResult {
case WasmStorageImplementation.opfsShared:
case WasmStorageImplementation.sharedIndexedDb:
// Forward connection request to shared worker.
message.sendTo(sharedWorker!.send);
message = ServeDriftDatabase(
sqlite3WasmUri: opener.sqlite3WasmUri,
port: channel.port2,
storage: implementation,
databaseName: name,
initializationPort: initChannel?.port2,
protocolVersion: sharedWorker!.version,
);
message.sendTo(sharedWorker.send);
case WasmStorageImplementation.opfsLocks:
case WasmStorageImplementation.unsafeIndexedDb:
if (dedicatedWorker != null) {
message = ServeDriftDatabase(
sqlite3WasmUri: opener.sqlite3WasmUri,
port: channel.port2,
storage: implementation,
databaseName: name,
initializationPort: initChannel?.port2,
protocolVersion: dedicatedWorker.version,
);
message.sendTo(dedicatedWorker.send);
} else {
// Workers seem to be broken, but we don't need them with this storage
@ -276,6 +289,9 @@ final class _ProbeResult implements WasmProbeResult {
}
});
final local = channel.port1
.channel(explicitClose: message.protocolVersion >= ProtocolVersion.v1);
var connection = await connectToRemoteAndInitialize(local);
if (implementation == WasmStorageImplementation.opfsLocks) {
// We want stream queries to update for writes in other tabs. For the

View File

@ -77,6 +77,7 @@ class DedicatedDriftWorker {
opfsExists: opfsExists,
indexedDbExists: indexedDbExists,
existingDatabases: existingDatabases,
version: ProtocolVersion.current,
).sendToClient(self);
case ServeDriftDatabase():
_servers.serve(message);

View File

@ -8,6 +8,56 @@ import 'package:sqlite3/wasm.dart';
import 'types.dart';
/// Due to in-browser caching or users not updating their `drift_worker.dart`
/// file after updating drift, the main web app and the workers may be compiled
/// with different versions of drift. To avoid inconsistencies in the
/// communication channel between them, they compare their versions in a
/// handshake and only use features supported by both.
class ProtocolVersion {
final int versionCode;
const ProtocolVersion._(this.versionCode);
void writeToJs(Object object) {
setProperty(object, 'v', versionCode);
}
bool operator >=(ProtocolVersion other) {
return versionCode >= other.versionCode;
}
static ProtocolVersion negotiate(int? versionCode) {
return switch (versionCode) {
null => legacy,
<= 0 => legacy,
1 => v1,
> 1 => current,
_ => throw AssertionError(),
};
}
static ProtocolVersion fromJsObject(Object object) {
if (hasProperty(object, 'v')) {
return negotiate(getProperty<int>(object, 'v'));
} else {
return legacy;
}
}
/// The protocol version used for drift versions up to 2.14 - these don't have
/// a version marker anywhere.
static const legacy = ProtocolVersion._(0);
/// This version makes workers report their supported protocol version.
///
/// When both the client and the involved worker support this version, an
/// explicit close notification is sent from clients to workers when closing
/// databases. This allows workers to release resources more effieciently.
static const v1 = ProtocolVersion._(1);
static const current = v1;
}
typedef PostMessage = void Function(Object? msg, [List<Object>? transfer]);
/// Sealed superclass for JavaScript objects exchanged between the UI tab and
@ -65,6 +115,12 @@ sealed class CompatibilityResult extends WasmInitializationMessage {
/// be used to check whether the database exists.
final List<ExistingDatabase> existingDatabases;
/// The latest protocol version spoken by the worker.
///
/// Workers only started to report their version in drift 2.15, we assume
/// [ProtocolVersion.legacy] for workers that don't report their version.
final ProtocolVersion version;
final bool indexedDbExists;
final bool opfsExists;
@ -74,6 +130,7 @@ sealed class CompatibilityResult extends WasmInitializationMessage {
required this.existingDatabases,
required this.indexedDbExists,
required this.opfsExists,
required this.version,
});
}
@ -96,6 +153,7 @@ final class SharedWorkerCompatibilityResult extends CompatibilityResult {
required super.indexedDbExists,
required super.opfsExists,
required super.existingDatabases,
required super.version,
});
factory SharedWorkerCompatibilityResult.fromJsPayload(Object payload) {
@ -103,9 +161,15 @@ final class SharedWorkerCompatibilityResult extends CompatibilityResult {
final asBooleans = asList.cast<bool>();
final List<ExistingDatabase> existingDatabases;
var version = ProtocolVersion.legacy;
if (asList.length > 5) {
existingDatabases =
EncodeLocations.readFromJs(asList[5] as List<dynamic>);
if (asList.length > 6) {
version = ProtocolVersion.negotiate(asList[6] as int);
}
} else {
existingDatabases = const [];
}
@ -117,6 +181,7 @@ final class SharedWorkerCompatibilityResult extends CompatibilityResult {
indexedDbExists: asBooleans[3],
opfsExists: asBooleans[4],
existingDatabases: existingDatabases,
version: version,
);
}
@ -129,6 +194,7 @@ final class SharedWorkerCompatibilityResult extends CompatibilityResult {
indexedDbExists,
opfsExists,
existingDatabases.encodeToJs(),
version.versionCode,
]);
}
@ -175,6 +241,7 @@ final class ServeDriftDatabase extends WasmInitializationMessage {
final WasmStorageImplementation storage;
final String databaseName;
final MessagePort? initializationPort;
final ProtocolVersion protocolVersion;
ServeDriftDatabase({
required this.sqlite3WasmUri,
@ -182,6 +249,7 @@ final class ServeDriftDatabase extends WasmInitializationMessage {
required this.storage,
required this.databaseName,
required this.initializationPort,
required this.protocolVersion,
});
factory ServeDriftDatabase.fromJsPayload(Object payload) {
@ -192,6 +260,7 @@ final class ServeDriftDatabase extends WasmInitializationMessage {
.byName(getProperty(payload, 'storage')),
databaseName: getProperty(payload, 'database'),
initializationPort: getProperty(payload, 'initPort'),
protocolVersion: ProtocolVersion.fromJsObject(payload),
);
}
@ -204,6 +273,7 @@ final class ServeDriftDatabase extends WasmInitializationMessage {
setProperty(object, 'database', databaseName);
final initPort = initializationPort;
setProperty(object, 'initPort', initPort);
protocolVersion.writeToJs(object);
sender.sendTyped(type, object, [
port,
@ -249,6 +319,7 @@ final class DedicatedWorkerCompatibilityResult extends CompatibilityResult {
required super.indexedDbExists,
required super.opfsExists,
required super.existingDatabases,
required super.version,
});
factory DedicatedWorkerCompatibilityResult.fromJsPayload(Object payload) {
@ -268,6 +339,7 @@ final class DedicatedWorkerCompatibilityResult extends CompatibilityResult {
indexedDbExists: getProperty(payload, 'indexedDbExists'),
opfsExists: getProperty(payload, 'opfsExists'),
existingDatabases: existingDatabases,
version: ProtocolVersion.fromJsObject(payload),
);
}
@ -283,6 +355,7 @@ final class DedicatedWorkerCompatibilityResult extends CompatibilityResult {
setProperty(object, 'indexedDbExists', indexedDbExists);
setProperty(object, 'opfsExists', opfsExists);
setProperty(object, 'existing', existingDatabases.encodeToJs());
version.writeToJs(object);
sender.sendTyped(type, object);
}

View File

@ -9,6 +9,7 @@ import 'package:js/js_util.dart';
// ignore: implementation_imports
import 'package:sqlite3/src/wasm/js_interop/file_system_access.dart';
import 'package:sqlite3/wasm.dart';
import 'package:stream_channel/stream_channel.dart';
import '../channel.dart';
import 'protocol.dart';
@ -196,15 +197,21 @@ class DriftServerController {
initializer: initializer,
)));
return RunningWasmServer(message.storage, server);
final wasmServer = RunningWasmServer(message.storage, server);
wasmServer.lastClientDisconnected.whenComplete(() {
servers.remove(message.databaseName);
wasmServer.server.shutdown();
});
return wasmServer;
});
server.server.serve(message.port.channel());
server.serve(message.port
.channel(explicitClose: message.protocolVersion >= ProtocolVersion.v1));
}
/// Loads a new sqlite3 WASM module, registers an appropriate VFS for [storage]
/// and finally opens a database, creating it if it doesn't exist.
Future<WasmDatabase> openConnection({
Future<QueryExecutor> openConnection({
required Uri sqlite3WasmUri,
required String databaseName,
required WasmStorageImplementation storage,
@ -212,15 +219,24 @@ class DriftServerController {
}) async {
final sqlite3 = await WasmSqlite3.loadFromUrl(sqlite3WasmUri);
final vfs = await switch (storage) {
WasmStorageImplementation.opfsShared =>
SimpleOpfsFileSystem.loadFromStorage(pathForOpfs(databaseName)),
WasmStorageImplementation.opfsLocks => _loadLockedWasmVfs(databaseName),
WasmStorageImplementation.unsafeIndexedDb ||
WasmStorageImplementation.sharedIndexedDb =>
IndexedDbFileSystem.open(dbName: databaseName),
WasmStorageImplementation.inMemory => Future.value(InMemoryFileSystem()),
};
VirtualFileSystem vfs;
void Function()? close;
switch (storage) {
case WasmStorageImplementation.opfsShared:
final simple = vfs = await SimpleOpfsFileSystem.loadFromStorage(
pathForOpfs(databaseName));
close = simple.close;
case WasmStorageImplementation.opfsLocks:
final locks = vfs = await _loadLockedWasmVfs(databaseName);
close = locks.close;
case WasmStorageImplementation.unsafeIndexedDb:
case WasmStorageImplementation.sharedIndexedDb:
final idb = vfs = await IndexedDbFileSystem.open(dbName: databaseName);
close = idb.close;
case WasmStorageImplementation.inMemory:
vfs = InMemoryFileSystem();
}
if (initializer != null && vfs.xAccess('/database', 0) == 0) {
final response = await initializer();
@ -234,7 +250,13 @@ class DriftServerController {
}
sqlite3.registerVirtualFileSystem(vfs, makeDefault: true);
return WasmDatabase(sqlite3: sqlite3, path: '/database', setup: _setup);
var db = WasmDatabase(sqlite3: sqlite3, path: '/database', setup: _setup);
if (close != null) {
return db.interceptWith(_CloseVfsOnClose(close));
} else {
return db;
}
}
Future<WasmVfs> _loadLockedWasmVfs(String databaseName) async {
@ -253,6 +275,20 @@ class DriftServerController {
}
}
class _CloseVfsOnClose extends QueryInterceptor {
final FutureOr<void> Function() _close;
_CloseVfsOnClose(this._close);
@override
Future<void> close(QueryExecutor inner) async {
await inner.close();
if (inner is! TransactionExecutor) {
await _close();
}
}
}
/// Information about a running drift server in a web worker.
class RunningWasmServer {
/// The storage implementation used by the VFS of this server.
@ -261,8 +297,32 @@ class RunningWasmServer {
/// The server hosting the drift database.
final DriftServer server;
int _connectedClients = 0;
final Completer<void> _lastClientDisconnected = Completer.sync();
/// A future that completes synchronously after all [serve]d connections have
/// closed.
Future<void> get lastClientDisconnected => _lastClientDisconnected.future;
/// Default constructor
RunningWasmServer(this.storage, this.server);
/// Tracks a new connection and serves drift database requests over it.
void serve(StreamChannel<Object?> channel) {
_connectedClients++;
server.serve(
channel.transformStream(StreamTransformer.fromHandlers(
handleDone: (sink) {
if (--_connectedClients == 0) {
_lastClientDisconnected.complete();
}
sink.close();
},
)),
);
}
}
/// Reported compatibility results with IndexedDB and OPFS.

View File

@ -78,6 +78,7 @@ class SharedDriftWorker {
indexedDbExists: indexedDbExists,
opfsExists: false,
existingDatabases: const [],
version: ProtocolVersion.current,
);
} else {
final worker = _dedicatedWorker ??= Worker(Uri.base.toString());
@ -102,6 +103,7 @@ class SharedDriftWorker {
indexedDbExists: indexedDbExists,
opfsExists: opfsExists,
existingDatabases: databases,
version: ProtocolVersion.current,
));
messageSubscription?.cancel();

View File

@ -135,6 +135,10 @@ class DriftWebDriver {
'open(arguments[0], arguments[1])', [implementation?.name]);
}
Future<void> closeDatabase() async {
await driver.executeAsync("close('', arguments[0])", []);
}
Future<void> insertIntoDatabase() async {
await driver.executeAsync('insert("", arguments[0])', []);
}

View File

@ -144,7 +144,7 @@ void main() {
await driver.insertIntoDatabase();
await driver.waitForTableUpdate();
await driver.driver.refresh(); // Reset JS state
await driver.closeDatabase();
final newImpls = await driver.probeImplementations();
expect(newImpls.existing, hasLength(1));

View File

@ -22,6 +22,9 @@ InitializationMode initializationMode = InitializationMode.none;
void main() {
_addCallbackForWebDriver('detectImplementations', _detectImplementations);
_addCallbackForWebDriver('open', _open);
_addCallbackForWebDriver('close', (arg) async {
await openedDatabase?.close();
});
_addCallbackForWebDriver('insert', _insert);
_addCallbackForWebDriver('get_rows', _getRows);
_addCallbackForWebDriver('wait_for_update', _waitForUpdate);