Support deleting databases

This commit is contained in:
Simon Binder 2023-08-09 18:13:45 +02:00
parent b774290b3a
commit 9d4d3cd2a4
No known key found for this signature in database
GPG Key ID: 7891917E4147B8C0
9 changed files with 246 additions and 67 deletions

View File

@ -23,6 +23,7 @@ import 'package:sqlite3/wasm.dart';
import 'broadcast_stream_queries.dart'; import 'broadcast_stream_queries.dart';
import 'channel.dart'; import 'channel.dart';
import 'wasm_setup/shared.dart';
import 'wasm_setup/protocol.dart'; import 'wasm_setup/protocol.dart';
/// Whether the `crossOriginIsolated` JavaScript property is true in the current /// Whether the `crossOriginIsolated` JavaScript property is true in the current
@ -46,10 +47,9 @@ class WasmDatabaseOpener {
final List<WasmStorageImplementation> availableImplementations = [ final List<WasmStorageImplementation> availableImplementations = [
WasmStorageImplementation.inMemory, WasmStorageImplementation.inMemory,
]; ];
final Set<(DatabaseLocation, String)> existingDatabases = {}; final Set<ExistingDatabase> existingDatabases = {};
MessagePort? _sharedWorker; _DriftWorker? _sharedWorker, _dedicatedWorker;
Worker? _dedicatedWorker;
WasmDatabaseOpener( WasmDatabaseOpener(
this.sqlite3WasmUri, this.sqlite3WasmUri,
@ -78,10 +78,10 @@ class WasmDatabaseOpener {
// database name and can interpret the opfsExists and indexedDbExists // database name and can interpret the opfsExists and indexedDbExists
// fields we're getting from older workers accordingly. // fields we're getting from older workers accordingly.
if (result.opfsExists) { if (result.opfsExists) {
existingDatabases.add((DatabaseLocation.opfs, databaseName)); existingDatabases.add((WebStorageApi.opfs, databaseName));
} }
if (result.indexedDbExists) { if (result.indexedDbExists) {
existingDatabases.add((DatabaseLocation.indexedDb, databaseName)); existingDatabases.add((WebStorageApi.indexedDb, databaseName));
} }
} }
} }
@ -96,7 +96,7 @@ class WasmDatabaseOpener {
try { try {
await _probeDedicated(); await _probeDedicated();
} on Object { } on Object {
_dedicatedWorker?.terminate(); _dedicatedWorker?.close();
_dedicatedWorker = null; _dedicatedWorker = null;
} }
@ -106,14 +106,11 @@ class WasmDatabaseOpener {
Future<void> _probeDedicated() async { Future<void> _probeDedicated() async {
if (supportsWorkers) { if (supportsWorkers) {
final dedicatedWorker = final dedicatedWorker = _dedicatedWorker =
_dedicatedWorker = Worker(driftWorkerUri.toString()); _DriftWorker.dedicated(Worker(driftWorkerUri.toString()));
_createCompatibilityCheck().sendToWorker(dedicatedWorker); _createCompatibilityCheck().sendTo(dedicatedWorker.send);
final workerMessages = StreamQueue( final status = await dedicatedWorker.workerMessages.nextNoError
_readMessages(dedicatedWorker.onMessage, dedicatedWorker.onError));
final status = await workerMessages.nextNoError
as DedicatedWorkerCompatibilityResult; as DedicatedWorkerCompatibilityResult;
_handleCompatibilityResult(status); _handleCompatibilityResult(status);
@ -136,16 +133,13 @@ class WasmDatabaseOpener {
if (supportsSharedWorkers) { if (supportsSharedWorkers) {
final sharedWorker = final sharedWorker =
SharedWorker(driftWorkerUri.toString(), 'drift worker'); SharedWorker(driftWorkerUri.toString(), 'drift worker');
final port = _sharedWorker = sharedWorker.port!; final port = sharedWorker.port!;
final shared = _sharedWorker = _DriftWorker.shared(sharedWorker, port);
final sharedMessages =
StreamQueue(_readMessages(port.onMessage, sharedWorker.onError));
// First, the shared worker will tell us which features it supports. // First, the shared worker will tell us which features it supports.
_createCompatibilityCheck().sendToPort(port); _createCompatibilityCheck().sendToPort(port);
final sharedFeatures = final sharedFeatures = await shared.workerMessages.nextNoError
await sharedMessages.nextNoError as SharedWorkerCompatibilityResult; as SharedWorkerCompatibilityResult;
await sharedMessages.cancel();
_handleCompatibilityResult(sharedFeatures); _handleCompatibilityResult(sharedFeatures);
@ -164,12 +158,50 @@ class WasmDatabaseOpener {
} }
} }
final class _DriftWorker {
final AbstractWorker worker;
/// The message port to communicate with the worker, if it's a shared worker.
final MessagePort? portForShared;
final StreamQueue<WasmInitializationMessage> workerMessages;
_DriftWorker.dedicated(Worker this.worker)
: portForShared = null,
workerMessages =
StreamQueue(_readMessages(worker.onMessage, worker.onError));
_DriftWorker.shared(SharedWorker this.worker, this.portForShared)
: workerMessages =
StreamQueue(_readMessages(worker.port!.onMessage, worker.onError));
void send(Object? msg, [List<Object>? transfer]) {
switch (worker) {
case final Worker worker:
worker.postMessage(msg, transfer);
case SharedWorker():
portForShared!.postMessage(msg, transfer);
}
}
void close() {
workerMessages.cancel();
switch (worker) {
case final Worker dedicated:
dedicated.terminate();
case SharedWorker():
portForShared!.close();
}
}
}
final class _ProbeResult implements WasmProbeResult { final class _ProbeResult implements WasmProbeResult {
@override @override
final List<WasmStorageImplementation> availableStorages; final List<WasmStorageImplementation> availableStorages;
@override @override
final List<(DatabaseLocation, String)> existingDatabases; final List<ExistingDatabase> existingDatabases;
@override @override
final Set<MissingBrowserFeature> missingFeatures; final Set<MissingBrowserFeature> missingFeatures;
@ -208,16 +240,12 @@ final class _ProbeResult implements WasmProbeResult {
switch (implementation) { switch (implementation) {
case WasmStorageImplementation.opfsShared: case WasmStorageImplementation.opfsShared:
case WasmStorageImplementation.sharedIndexedDb: case WasmStorageImplementation.sharedIndexedDb:
// These are handled by the shared worker, so we can close the dedicated // Forward connection request to shared worker.
// worker used for feature detection. message.sendTo(sharedWorker!.send);
dedicatedWorker?.terminate();
message.sendToPort(sharedWorker!);
case WasmStorageImplementation.opfsLocks: case WasmStorageImplementation.opfsLocks:
case WasmStorageImplementation.unsafeIndexedDb: case WasmStorageImplementation.unsafeIndexedDb:
sharedWorker?.close();
if (dedicatedWorker != null) { if (dedicatedWorker != null) {
message.sendToWorker(dedicatedWorker); message.sendTo(dedicatedWorker.send);
} else { } else {
// Workers seem to be broken, but we don't need them with this storage // Workers seem to be broken, but we don't need them with this storage
// mode. // mode.
@ -291,10 +319,21 @@ final class _ProbeResult implements WasmProbeResult {
} }
@override @override
Future<void> deleteDatabase( Future<void> deleteDatabase(ExistingDatabase database) async {
DatabaseLocation implementation, String name) async { switch (database.$1) {
// TODO: implement deleteDatabase case WebStorageApi.indexedDb:
throw UnimplementedError(); await deleteDatabaseInIndexedDb(database.$2);
case WebStorageApi.opfs:
final dedicated = opener._dedicatedWorker;
if (dedicated != null) {
DeleteDatabase(database).sendTo(dedicated.send);
await dedicated.workerMessages.nextNoError;
} else {
throw StateError(
'No dedicated worker available to delete OPFS database');
}
}
} }
} }

View File

@ -48,11 +48,11 @@ class DedicatedDriftWorker {
final existingServer = _servers.servers[dbName]; final existingServer = _servers.servers[dbName];
var indexedDbExists = false, opfsExists = false; var indexedDbExists = false, opfsExists = false;
final existingDatabases = <(DatabaseLocation, String)>[]; final existingDatabases = <ExistingDatabase>[];
if (supportsOpfs) { if (supportsOpfs) {
for (final database in await opfsDatabases()) { for (final database in await opfsDatabases()) {
existingDatabases.add((DatabaseLocation.opfs, database)); existingDatabases.add((WebStorageApi.opfs, database));
if (database == dbName) { if (database == dbName) {
opfsExists = true; opfsExists = true;
@ -83,6 +83,22 @@ class DedicatedDriftWorker {
final worker = await VfsWorker.create(options); final worker = await VfsWorker.create(options);
self.postMessage(true); self.postMessage(true);
await worker.start(); await worker.start();
case DeleteDatabase(database: (final storage, final name)):
try {
switch (storage) {
case WebStorageApi.indexedDb:
await deleteDatabaseInIndexedDb(name);
case WebStorageApi.opfs:
await deleteDatabaseInOpfs(name);
}
// Send the request back to indicate a successful delete.
message.sendToClient(self);
} catch (e) {
WorkerError(e.toString()).sendToClient(self);
}
break;
default: default:
break; break;
} }

View File

@ -30,6 +30,7 @@ sealed class WasmInitializationMessage {
DedicatedWorkerCompatibilityResult.fromJsPayload(payload!), DedicatedWorkerCompatibilityResult.fromJsPayload(payload!),
SharedWorkerCompatibilityResult.type => SharedWorkerCompatibilityResult.type =>
SharedWorkerCompatibilityResult.fromJsPayload(payload!), SharedWorkerCompatibilityResult.fromJsPayload(payload!),
DeleteDatabase.type => DeleteDatabase.fromJsPayload(payload!),
_ => throw ArgumentError('Unknown type $type'), _ => throw ArgumentError('Unknown type $type'),
}; };
} }
@ -62,7 +63,7 @@ sealed class CompatibilityResult extends WasmInitializationMessage {
/// This list is only reported by the drift worker shipped with drift 2.11. /// This list is only reported by the drift worker shipped with drift 2.11.
/// When an older worker is used, only [indexedDbExists] and [opfsExists] can /// When an older worker is used, only [indexedDbExists] and [opfsExists] can
/// be used to check whether the database exists. /// be used to check whether the database exists.
final List<(DatabaseLocation, String)> existingDatabases; final List<ExistingDatabase> existingDatabases;
final bool indexedDbExists; final bool indexedDbExists;
final bool opfsExists; final bool opfsExists;
@ -101,7 +102,7 @@ final class SharedWorkerCompatibilityResult extends CompatibilityResult {
final asList = payload as List; final asList = payload as List;
final asBooleans = asList.cast<bool>(); final asBooleans = asList.cast<bool>();
final List<(DatabaseLocation, String)> existingDatabases; final List<ExistingDatabase> existingDatabases;
if (asList.length > 5) { if (asList.length > 5) {
existingDatabases = existingDatabases =
EncodeLocations.readFromJs(asList[5] as List<dynamic>); EncodeLocations.readFromJs(asList[5] as List<dynamic>);
@ -251,7 +252,7 @@ final class DedicatedWorkerCompatibilityResult extends CompatibilityResult {
}); });
factory DedicatedWorkerCompatibilityResult.fromJsPayload(Object payload) { factory DedicatedWorkerCompatibilityResult.fromJsPayload(Object payload) {
final existingDatabases = <(DatabaseLocation, String)>[]; final existingDatabases = <ExistingDatabase>[];
if (hasProperty(payload, 'existing')) { if (hasProperty(payload, 'existing')) {
existingDatabases existingDatabases
@ -300,13 +301,51 @@ final class DedicatedWorkerCompatibilityResult extends CompatibilityResult {
} }
} }
extension EncodeLocations on List<(DatabaseLocation, String)> { final class StartFileSystemServer extends WasmInitializationMessage {
static List<(DatabaseLocation, String)> readFromJs(List<Object?> object) { static const type = 'StartFileSystemServer';
final existing = <(DatabaseLocation, String)>[];
final WorkerOptions sqlite3Options;
StartFileSystemServer(this.sqlite3Options);
factory StartFileSystemServer.fromJsPayload(Object payload) {
return StartFileSystemServer(payload as WorkerOptions);
}
@override
void sendTo(PostMessage sender) {
sender.sendTyped(type, sqlite3Options);
}
}
final class DeleteDatabase extends WasmInitializationMessage {
static const type = 'DeleteDatabase';
final ExistingDatabase database;
DeleteDatabase(this.database);
factory DeleteDatabase.fromJsPayload(Object payload) {
final asList = payload as List<Object?>;
return DeleteDatabase((
WebStorageApi.byName[asList[0] as String]!,
asList[1] as String,
));
}
@override
void sendTo(PostMessage sender) {
sender.sendTyped(type, [database.$1.name, database.$2]);
}
}
extension EncodeLocations on List<ExistingDatabase> {
static List<ExistingDatabase> readFromJs(List<Object?> object) {
final existing = <ExistingDatabase>[];
for (final entry in object) { for (final entry in object) {
existing.add(( existing.add((
DatabaseLocation.byName[getProperty(entry as Object, 'l')]!, WebStorageApi.byName[getProperty(entry as Object, 'l')]!,
getProperty(entry, 'n'), getProperty(entry, 'n'),
)); ));
} }
@ -328,23 +367,6 @@ extension EncodeLocations on List<(DatabaseLocation, String)> {
} }
} }
final class StartFileSystemServer extends WasmInitializationMessage {
static const type = 'StartFileSystemServer';
final WorkerOptions sqlite3Options;
StartFileSystemServer(this.sqlite3Options);
factory StartFileSystemServer.fromJsPayload(Object payload) {
return StartFileSystemServer(payload as WorkerOptions);
}
@override
void sendTo(PostMessage sender) {
sender.sendTyped(type, sqlite3Options);
}
}
extension on PostMessage { extension on PostMessage {
void sendTyped(String type, Object? payload, [List<Object>? transfer]) { void sendTyped(String type, Object? payload, [List<Object>? transfer]) {
final object = newObject<Object>(); final object = newObject<Object>();

View File

@ -87,7 +87,7 @@ Future<bool> checkIndexedDbExists(String databaseName) async {
try { try {
final idb = getProperty<IdbFactory>(globalThis, 'indexedDB'); final idb = getProperty<IdbFactory>(globalThis, 'indexedDB');
await idb.open( final database = await idb.open(
databaseName, databaseName,
// Current schema version used by the [IndexedDbFileSystem] // Current schema version used by the [IndexedDbFileSystem]
version: 1, version: 1,
@ -100,6 +100,7 @@ Future<bool> checkIndexedDbExists(String databaseName) async {
); );
indexedDbExists ??= true; indexedDbExists ??= true;
database.close();
} catch (_) { } catch (_) {
// May throw due to us aborting in the upgrade callback. // May throw due to us aborting in the upgrade callback.
} }
@ -107,6 +108,14 @@ Future<bool> checkIndexedDbExists(String databaseName) async {
return indexedDbExists ?? false; return indexedDbExists ?? false;
} }
/// Deletes a database from IndexedDb if supported.
Future<void> deleteDatabaseInIndexedDb(String databaseName) async {
final idb = window.indexedDB;
if (idb != null) {
await idb.deleteDatabase(databaseName);
}
}
/// Constructs the path used by drift to store a database in the origin-private /// Constructs the path used by drift to store a database in the origin-private
/// section of the agent's file system. /// section of the agent's file system.
String pathForOpfs(String databaseName) { String pathForOpfs(String databaseName) {
@ -132,6 +141,22 @@ Future<List<String>> opfsDatabases() async {
]; ];
} }
/// Deletes the OPFS folder storing a database with the given [databaseName] if
/// such folder exists.
Future<void> deleteDatabaseInOpfs(String databaseName) async {
final storage = storageManager;
if (storage == null) return;
var directory = await storage.directory;
try {
directory = await directory.getDirectory('drift_db');
await directory.removeEntry(databaseName, recursive: true);
} on Object {
// fine, an error probably means that the database didn't exist in the first
// place.
}
}
/// Manages drift servers. /// Manages drift servers.
/// ///
/// When using a shared worker, multiple clients may want to use different drift /// When using a shared worker, multiple clients may want to use different drift
@ -151,7 +176,8 @@ class DriftServerController {
final vfs = await switch (message.storage) { final vfs = await switch (message.storage) {
WasmStorageImplementation.opfsShared => WasmStorageImplementation.opfsShared =>
SimpleOpfsFileSystem.loadFromStorage(message.databaseName), SimpleOpfsFileSystem.loadFromStorage(
pathForOpfs(message.databaseName)),
WasmStorageImplementation.opfsLocks => WasmStorageImplementation.opfsLocks =>
_loadLockedWasmVfs(message.databaseName), _loadLockedWasmVfs(message.databaseName),
WasmStorageImplementation.unsafeIndexedDb || WasmStorageImplementation.unsafeIndexedDb ||
@ -190,7 +216,7 @@ class DriftServerController {
Future<WasmVfs> _loadLockedWasmVfs(String databaseName) async { Future<WasmVfs> _loadLockedWasmVfs(String databaseName) async {
// Create SharedArrayBuffers to synchronize requests // Create SharedArrayBuffers to synchronize requests
final options = WasmVfs.createOptions( final options = WasmVfs.createOptions(
root: 'drift_db/$databaseName/', root: pathForOpfs(databaseName),
); );
final worker = Worker(Uri.base.toString()); final worker = Worker(Uri.base.toString());

View File

@ -82,11 +82,17 @@ enum WasmStorageImplementation {
inMemory, inMemory,
} }
enum DatabaseLocation { /// The storage API used by drift to store a database.
enum WebStorageApi {
/// The database is stored in the origin-private section of the user's file
/// system via the FileSystem Access API.
opfs, opfs,
/// The database is stored in IndexedDb.
indexedDb; indexedDb;
static final byName = DatabaseLocation.values.asNameMap(); /// Cached [EnumByName.asNameMap] for [values].
static final byName = WebStorageApi.values.asNameMap();
} }
/// An enumeration of features not supported by the current browsers. /// An enumeration of features not supported by the current browsers.
@ -127,8 +133,20 @@ enum MissingBrowserFeature {
sharedArrayBuffers, sharedArrayBuffers,
} }
typedef ExistingDatabase = (DatabaseLocation, String); /// Information about an existing web database, consisting of its
/// storage API ([WebStorageApi]) and its name.
typedef ExistingDatabase = (WebStorageApi, String);
/// The result of probing the current browser for wasm compatibility.
///
/// This reports available storage implementations ([availableStorages]) and
/// [missingFeatures] that contributed to some storage implementations not being
/// available.
///
/// In addition, [existingDatabases] reports a list of existing databases. Note
/// that databases stored in IndexedDb can't be listed reliably. Only databases
/// with the name given in [WasmDatabase.probe] are listed. Databases stored in
/// OPFS are always listed.
abstract interface class WasmProbeResult { abstract interface class WasmProbeResult {
/// All available [WasmStorageImplementation]s supported by the current /// All available [WasmStorageImplementation]s supported by the current
/// browsing context. /// browsing context.
@ -150,13 +168,21 @@ abstract interface class WasmProbeResult {
/// Missing browser features limit the available storage implementations. /// Missing browser features limit the available storage implementations.
Set<MissingBrowserFeature> get missingFeatures; Set<MissingBrowserFeature> get missingFeatures;
/// Opens a connection to a database via the chosen [implementation].
///
/// When this database doesn't exist, [initializeDatabase] is invoked to
/// optionally return the initial bytes of the database.
Future<DatabaseConnection> open( Future<DatabaseConnection> open(
WasmStorageImplementation implementation, WasmStorageImplementation implementation,
String name, { String name, {
FutureOr<Uint8List?> Function()? initializeDatabase, FutureOr<Uint8List?> Function()? initializeDatabase,
}); });
Future<void> deleteDatabase(DatabaseLocation implementation, String name); /// Deletes an [ExistingDatabase] from storage.
///
/// This method should not be called while a connection to the database is
/// opened.
Future<void> deleteDatabase(ExistingDatabase database);
} }
/// The result of opening a WASM database with default options. /// The result of opening a WASM database with default options.

View File

@ -108,11 +108,11 @@ class WasmDatabase extends DelegatedDatabase {
for (final (location, name) in probed.existingDatabases) { for (final (location, name) in probed.existingDatabases) {
if (name == databaseName) { if (name == databaseName) {
final implementationsForStorage = switch (location) { final implementationsForStorage = switch (location) {
DatabaseLocation.indexedDb => const [ WebStorageApi.indexedDb => const [
WasmStorageImplementation.sharedIndexedDb, WasmStorageImplementation.sharedIndexedDb,
WasmStorageImplementation.unsafeIndexedDb WasmStorageImplementation.unsafeIndexedDb
], ],
DatabaseLocation.opfs => const [ WebStorageApi.opfs => const [
WasmStorageImplementation.opfsShared, WasmStorageImplementation.opfsShared,
WasmStorageImplementation.opfsLocks, WasmStorageImplementation.opfsLocks,
], ],

View File

@ -105,6 +105,7 @@ class DriftWebDriver {
({ ({
Set<WasmStorageImplementation> storages, Set<WasmStorageImplementation> storages,
Set<MissingBrowserFeature> missingFeatures, Set<MissingBrowserFeature> missingFeatures,
List<ExistingDatabase> existing,
})> probeImplementations() async { })> probeImplementations() async {
final rawResult = await driver final rawResult = await driver
.executeAsync('detectImplementations("", arguments[0])', []); .executeAsync('detectImplementations("", arguments[0])', []);
@ -119,6 +120,13 @@ class DriftWebDriver {
for (final entry in result['missing']) for (final entry in result['missing'])
MissingBrowserFeature.values.byName(entry) MissingBrowserFeature.values.byName(entry)
}, },
existing: <ExistingDatabase>[
for (final entry in result['existing'])
(
WebStorageApi.byName[entry[0] as String]!,
entry[1] as String,
),
],
); );
} }
@ -149,4 +157,10 @@ class DriftWebDriver {
throw 'Could not set initialization mode'; throw 'Could not set initialization mode';
} }
} }
Future<void> deleteDatabase(WebStorageApi storageApi, String name) async {
await driver.executeAsync('delete_database(arguments[0], arguments[1])', [
json.encode([storageApi.name, name]),
]);
}
} }

View File

@ -118,6 +118,29 @@ void main() {
} }
}); });
if (entry != WasmStorageImplementation.inMemory) {
test('delete', () async {
final impl = await driver.probeImplementations();
expect(impl.existing, isEmpty);
await driver.openDatabase(entry);
await driver.insertIntoDatabase();
await driver.waitForTableUpdate();
await driver.driver.refresh(); // Reset JS state
final newImpls = await driver.probeImplementations();
expect(newImpls.existing, hasLength(1));
final existing = newImpls.existing[0];
await driver.deleteDatabase(existing.$1, existing.$2);
await driver.driver.refresh();
final finalImpls = await driver.probeImplementations();
expect(finalImpls.existing, isEmpty);
});
}
group( group(
'initialization from ', 'initialization from ',
() { () {

View File

@ -29,6 +29,18 @@ void main() {
initializationMode = InitializationMode.values.byName(arg!); initializationMode = InitializationMode.values.byName(arg!);
return true; return true;
}); });
_addCallbackForWebDriver('delete_database', (arg) async {
final result = await WasmDatabase.probe(
sqlite3Uri: sqlite3WasmUri,
driftWorkerUri: driftWorkerUri,
);
final decoded = json.decode(arg!);
await result.deleteDatabase(
(WebStorageApi.byName[decoded[0] as String]!, decoded[1] as String),
);
});
document.getElementById('selfcheck')?.onClick.listen((event) async { document.getElementById('selfcheck')?.onClick.listen((event) async {
print('starting'); print('starting');
@ -104,6 +116,7 @@ Future<String> _detectImplementations(String? _) async {
return json.encode({ return json.encode({
'impls': result.availableStorages.map((r) => r.name).toList(), 'impls': result.availableStorages.map((r) => r.name).toList(),
'missing': result.missingFeatures.map((r) => r.name).toList(), 'missing': result.missingFeatures.map((r) => r.name).toList(),
'existing': result.existingDatabases.map((r) => [r.$1.name, r.$2]).toList(),
}); });
} }