Keep using the same storage implementation

This commit is contained in:
Simon Binder 2023-06-04 20:52:08 +02:00
parent c79f95ac03
commit 0b52bee48c
No known key found for this signature in database
GPG Key ID: 7891917E4147B8C0
10 changed files with 217 additions and 140 deletions

View File

@ -6,7 +6,7 @@ void main() async {
final btn = querySelector('#drift-compat-btn')!;
final results = querySelector('#drift-compat-results')!;
await for (final click in btn.onClick) {
await for (final _ in btn.onClick) {
btn.attributes['disabled'] = 'true';
results.innerText = '';

View File

@ -5,17 +5,21 @@
/// implementation of a persistence solution. Being a C library, sqlite3 expects
/// synchronous access to a file system, which is tricky to implement with
/// asynchronous
// ignore_for_file: public_member_api_docs
@internal
library;
import 'dart:async';
import 'dart:html';
import 'package:async/async.dart';
import 'package:collection/collection.dart';
import 'package:drift/drift.dart';
import 'package:drift/remote.dart';
import 'package:drift/wasm.dart';
import 'package:js/js.dart';
import 'package:js/js_util.dart';
import 'package:meta/meta.dart';
import 'package:sqlite3/wasm.dart';
import 'channel.dart';
@ -29,92 +33,164 @@ external bool get crossOriginIsolated;
/// Whether shared workers can be constructed in the current context.
bool get supportsSharedWorkers => hasProperty(globalThis, 'SharedWorker');
Future<WasmDatabaseResult> openWasmDatabase({
required Uri sqlite3WasmUri,
required Uri driftWorkerUri,
required String databaseName,
}) async {
final missingFeatures = <MissingBrowserFeature>{};
/// Whether dedicated workers can be constructed in the current context.
bool get supportsWorkers => hasProperty(globalThis, 'Worker');
Future<WasmDatabaseResult> connect(WasmStorageImplementation impl,
void Function(WasmInitializationMessage) send) async {
class WasmDatabaseOpener {
final Uri sqlite3WasmUri;
final Uri driftWorkerUri;
final String databaseName;
final Set<MissingBrowserFeature> _missingFeatures = {};
final List<WasmStorageImplementation> _availableImplementations = [
WasmStorageImplementation.inMemory,
];
bool _existsInIndexedDb = false;
bool _existsInOpfs = false;
MessagePort? _sharedWorker;
Worker? _dedicatedWorker;
WasmDatabaseOpener({
required this.sqlite3WasmUri,
required this.driftWorkerUri,
required this.databaseName,
});
Future<WasmDatabaseResult> open() async {
await _probeShared();
await _probeDedicated();
// If we have an existing database in storage, we want to keep using that
// format to avoid data loss (e.g. after a browser update that enables a
// otherwise preferred storage implementation). In the future, we might want
// to consider migrating between storage implementations as well.
if (_existsInIndexedDb) {
_availableImplementations.removeWhere((element) =>
element != WasmStorageImplementation.sharedIndexedDb &&
element != WasmStorageImplementation.unsafeIndexedDb);
} else if (_existsInOpfs) {
_availableImplementations.removeWhere((element) =>
element != WasmStorageImplementation.opfsShared &&
element != WasmStorageImplementation.opfsLocks);
}
// Enum values are ordered by preferrability, so just pick the best option
// left.
_availableImplementations.sortBy<num>((element) => element.index);
return await _connect(_availableImplementations.firstOrNull ??
WasmStorageImplementation.inMemory);
}
void _closeSharedWorker() {
_sharedWorker?.close();
}
void _closeDedicatedWorker() {
_dedicatedWorker?.terminate();
}
Future<WasmDatabaseResult> _connect(WasmStorageImplementation storage) async {
final channel = MessageChannel();
final local = channel.port1.channel();
final message = ServeDriftDatabase(
sqlite3WasmUri: sqlite3WasmUri,
port: channel.port2,
storage: impl,
storage: storage,
databaseName: databaseName,
);
send(message);
switch (storage) {
case WasmStorageImplementation.opfsShared:
case WasmStorageImplementation.sharedIndexedDb:
message.sendToPort(_sharedWorker!);
// These are handled by the shared worker, so we can close the dedicated
// worker used for feature detection.
_closeDedicatedWorker();
case WasmStorageImplementation.opfsLocks:
case WasmStorageImplementation.unsafeIndexedDb:
_closeSharedWorker();
message.sendToWorker(_dedicatedWorker!);
case WasmStorageImplementation.inMemory:
// Nothing works on this browser, so we'll fall back to an in-memory
// database.
final sqlite3 = await WasmSqlite3.loadFromUrl(sqlite3WasmUri);
sqlite3.registerVirtualFileSystem(InMemoryFileSystem());
return WasmDatabaseResult(
DatabaseConnection(
WasmDatabase(sqlite3: sqlite3, path: '/database'),
),
WasmStorageImplementation.inMemory,
_missingFeatures,
);
}
final connection = await connectToRemoteAndInitialize(local);
return WasmDatabaseResult(connection, impl, missingFeatures);
return WasmDatabaseResult(connection, storage, _missingFeatures);
}
// First, let's see if we can spawn dedicated workers in shared workers, which
// would enable us to efficiently share a OPFS database.
if (supportsSharedWorkers) {
final sharedWorker =
SharedWorker(driftWorkerUri.toString(), 'drift worker');
final port = sharedWorker.port!;
Future<void> _probeShared() async {
if (supportsSharedWorkers) {
final sharedWorker =
SharedWorker(driftWorkerUri.toString(), 'drift worker');
final port = _sharedWorker = sharedWorker.port!;
final sharedMessages =
StreamQueue(port.onMessage.map(WasmInitializationMessage.read));
final sharedMessages =
StreamQueue(port.onMessage.map(WasmInitializationMessage.read));
// First, the shared worker will tell us which features it supports.
final sharedFeatures =
await sharedMessages.nextNoError as SharedWorkerStatus;
missingFeatures.addAll(sharedFeatures.missingFeatures);
// Prefer to use the shared worker to host the database if it supports the
// necessary APIs.
if (sharedFeatures.canSpawnDedicatedWorkers &&
sharedFeatures.dedicatedWorkersCanUseOpfs) {
return connect(
WasmStorageImplementation.opfsShared, (msg) => msg.sendToPort(port));
} else if (sharedFeatures.canUseIndexedDb) {
return connect(WasmStorageImplementation.sharedIndexedDb,
(msg) => msg.sendToPort(port));
} else {
// First, the shared worker will tell us which features it supports.
final sharedFeatures =
await sharedMessages.nextNoError as SharedWorkerStatus;
await sharedMessages.cancel();
port.close();
_missingFeatures.addAll(sharedFeatures.missingFeatures);
// Prefer to use the shared worker to host the database if it supports the
// necessary APIs.
if (sharedFeatures.canSpawnDedicatedWorkers &&
sharedFeatures.dedicatedWorkersCanUseOpfs) {
_availableImplementations.add(WasmStorageImplementation.opfsShared);
} else if (sharedFeatures.canUseIndexedDb) {
_availableImplementations
.add(WasmStorageImplementation.sharedIndexedDb);
} else {
port.close();
}
} else {
_missingFeatures.add(MissingBrowserFeature.sharedWorkers);
}
} else {
missingFeatures.add(MissingBrowserFeature.sharedWorkers);
}
final dedicatedWorker = Worker(driftWorkerUri.toString());
DedicatedWorkerCompatibilityCheck().sendToWorker(dedicatedWorker);
Future<void> _probeDedicated() async {
if (supportsWorkers) {
final dedicatedWorker = Worker(driftWorkerUri.toString());
DedicatedWorkerCompatibilityCheck(databaseName)
.sendToWorker(dedicatedWorker);
final workerMessages = StreamQueue(
dedicatedWorker.onMessage.map(WasmInitializationMessage.read));
final workerMessages = StreamQueue(
dedicatedWorker.onMessage.map(WasmInitializationMessage.read));
final status =
await workerMessages.nextNoError as DedicatedWorkerCompatibilityResult;
missingFeatures.addAll(status.missingFeatures);
final status = await workerMessages.nextNoError
as DedicatedWorkerCompatibilityResult;
_missingFeatures.addAll(status.missingFeatures);
if (status.supportsNestedWorkers &&
status.canAccessOpfs &&
status.supportsSharedArrayBuffers) {
return connect(WasmStorageImplementation.opfsLocks,
(msg) => msg.sendToWorker(dedicatedWorker));
} else if (status.supportsIndexedDb) {
return connect(WasmStorageImplementation.unsafeIndexedDb,
(msg) => msg.sendToWorker(dedicatedWorker));
} else {
// Nothing works on this browser, so we'll fall back to an in-memory
// database.
final sqlite3 = await WasmSqlite3.loadFromUrl(sqlite3WasmUri);
sqlite3.registerVirtualFileSystem(InMemoryFileSystem());
_existsInOpfs = status.opfsExists;
_existsInIndexedDb = status.indexedDbExists;
return WasmDatabaseResult(
DatabaseConnection(
WasmDatabase(sqlite3: sqlite3, path: '/database'),
),
WasmStorageImplementation.inMemory,
missingFeatures,
);
if (status.supportsNestedWorkers &&
status.canAccessOpfs &&
status.supportsSharedArrayBuffers) {
_availableImplementations.add(WasmStorageImplementation.opfsLocks);
}
if (status.supportsIndexedDb) {
_availableImplementations
.add(WasmStorageImplementation.sharedIndexedDb);
}
} else {
_missingFeatures.add(MissingBrowserFeature.dedicatedWorkers);
}
}
}

View File

@ -2,9 +2,13 @@
import 'dart:async';
import 'dart:html';
import 'dart:indexed_db';
import 'package:js/js_util.dart';
import 'package:sqlite3/wasm.dart';
// ignore: implementation_imports
import 'package:sqlite3/src/wasm/js_interop/file_system_access.dart';
import 'package:path/path.dart' as p;
import 'protocol.dart';
import 'shared.dart';
@ -24,16 +28,48 @@ class DedicatedDriftWorker {
Future<void> _handleMessage(WasmInitializationMessage message) async {
switch (message) {
case DedicatedWorkerCompatibilityCheck():
case DedicatedWorkerCompatibilityCheck(databaseName: var dbName):
final supportsOpfs = await checkOpfsSupport();
final supportsIndexedDb = await checkIndexedDbSupport();
var opfsExists = false;
var indexedDbExists = false;
if (dbName != null) {
if (supportsOpfs) {
final storage = storageManager!;
final pathSegments = p.url.split(pathForOpfs(dbName));
var directory = await storage.directory;
opfsExists = true;
for (final segment in pathSegments) {
try {
directory = await directory.getDirectory(segment);
} on Object {
opfsExists = false;
break;
}
}
} else if (supportsIndexedDb) {
final indexedDb = getProperty<IdbFactory>(globalThis, 'indexedDB');
await indexedDb.open(dbName, version: 1, onUpgradeNeeded: (event) {
event.target.transaction!.abort();
indexedDbExists =
event.oldVersion != null && event.oldVersion != 0;
});
}
}
DedicatedWorkerCompatibilityResult(
supportsNestedWorkers: hasProperty(globalThis, 'Worker'),
canAccessOpfs: supportsOpfs,
supportsIndexedDb: supportsIndexedDb,
supportsSharedArrayBuffers:
hasProperty(globalThis, 'SharedArrayBuffer'),
opfsExists: opfsExists,
indexedDbExists: indexedDbExists,
).sendToClient(self);
case ServeDriftDatabase():
_servers.serve(message);

View File

@ -1,52 +0,0 @@
import 'dart:typed_data';
import 'package:sqlite3/wasm.dart';
import 'shared.dart';
const paths = {'/database', '/database-journal'};
/// Migrates the drift database identified by [databaseName] from the IndexedDB
/// storage implementation to the OPFS storage implementation.
///
/// Must be called in a dedicated worker, as only those have access to OPFS.
Future<void> migrateFromIndexedDbToOpfs(String databaseName) async {
}
/// Migrates the drift database identified by [databaseName] from the OPFS
/// storage implementation back to IndexedDB.
///
/// Must be called in a dedicated worker, as only those have access to OPFS.
Future<void> migrateFromOpfsToIndexedDb(String databaseName) async {
final opfs =
await SimpleOpfsFileSystem.loadFromStorage(pathForOpfs(databaseName));
final indexedDb = await IndexedDbFileSystem.open(dbName: databaseName);
await _migrate(opfs, indexedDb);
}
Future<void> _migrate(
VirtualFileSystem source, VirtualFileSystem target) async {
for (final path in paths) {
if (target.xAccess(path, 0) != 0) {
target.xDelete(path, 0);
}
if (source.xAccess(path, 0) != 0) {
final (file: sourceFile, outFlags: _) =
source.xOpen(Sqlite3Filename(path), SqlFlag.SQLITE_OPEN_CREATE);
final (file: targetFile, outFlags: _) =
target.xOpen(Sqlite3Filename(path), SqlFlag.SQLITE_OPEN_CREATE);
final buffer = Uint8List(sourceFile.xFileSize());
sourceFile.xRead(buffer, 0);
targetFile.xWrite(buffer, 0);
sourceFile.xClose();
targetFile.xClose();
}
}
}

View File

@ -15,18 +15,18 @@ sealed class WasmInitializationMessage {
factory WasmInitializationMessage.fromJs(Object jsObject) {
final type = getProperty<String>(jsObject, 'type');
final payload = getProperty<Object>(jsObject, 'payload');
final payload = getProperty<Object?>(jsObject, 'payload');
return switch (type) {
SharedWorkerStatus.type => SharedWorkerStatus.fromJsPayload(payload),
WorkerError.type => WorkerError.fromJsPayload(payload),
ServeDriftDatabase.type => ServeDriftDatabase.fromJsPayload(payload),
SharedWorkerStatus.type => SharedWorkerStatus.fromJsPayload(payload!),
WorkerError.type => WorkerError.fromJsPayload(payload!),
ServeDriftDatabase.type => ServeDriftDatabase.fromJsPayload(payload!),
StartFileSystemServer.type =>
StartFileSystemServer.fromJsPayload(payload),
StartFileSystemServer.fromJsPayload(payload!),
DedicatedWorkerCompatibilityCheck.type =>
DedicatedWorkerCompatibilityCheck.fromJsPayload(payload),
DedicatedWorkerCompatibilityResult.type =>
DedicatedWorkerCompatibilityResult.fromJsPayload(payload),
DedicatedWorkerCompatibilityResult.fromJsPayload(payload!),
_ => throw ArgumentError('Unknown type $type'),
};
}
@ -164,15 +164,17 @@ final class DedicatedWorkerCompatibilityCheck
extends WasmInitializationMessage {
static const type = 'DedicatedWorkerCompatibilityCheck';
DedicatedWorkerCompatibilityCheck();
final String? databaseName;
factory DedicatedWorkerCompatibilityCheck.fromJsPayload(Object payload) {
return DedicatedWorkerCompatibilityCheck();
DedicatedWorkerCompatibilityCheck(this.databaseName);
factory DedicatedWorkerCompatibilityCheck.fromJsPayload(Object? payload) {
return DedicatedWorkerCompatibilityCheck(payload as String?);
}
@override
void _send(_PostMessage sender) {
sender.sendTyped(type, newObject());
sender.sendTyped(type, databaseName);
}
}
@ -185,11 +187,19 @@ final class DedicatedWorkerCompatibilityResult
final bool supportsSharedArrayBuffers;
final bool supportsIndexedDb;
/// Whether an IndexedDb database under the desired name exists already.
final bool indexedDbExists;
/// Whether an OPFS database under the desired name exists already.
final bool opfsExists;
DedicatedWorkerCompatibilityResult({
required this.supportsNestedWorkers,
required this.canAccessOpfs,
required this.supportsSharedArrayBuffers,
required this.supportsIndexedDb,
required this.indexedDbExists,
required this.opfsExists,
});
factory DedicatedWorkerCompatibilityResult.fromJsPayload(Object payload) {
@ -199,6 +209,8 @@ final class DedicatedWorkerCompatibilityResult
supportsSharedArrayBuffers:
getProperty(payload, 'supportsSharedArrayBuffers'),
supportsIndexedDb: getProperty(payload, 'supportsIndexedDb'),
indexedDbExists: getProperty(payload, 'indexedDbExists'),
opfsExists: getProperty(payload, 'opfsExists'),
);
}
@ -211,6 +223,8 @@ final class DedicatedWorkerCompatibilityResult
setProperty(object, 'supportsIndexedDb', supportsIndexedDb);
setProperty(
object, 'supportsSharedArrayBuffers', supportsSharedArrayBuffers);
setProperty(object, 'indexedDbExists', indexedDbExists);
setProperty(object, 'opfsExists', opfsExists);
sender.sendTyped(type, object);
}

View File

@ -80,7 +80,7 @@ Future<bool> checkIndexedDbSupport() async {
}
String pathForOpfs(String databaseName) {
return '/drift_db/${databaseName}';
return 'drift_db/$databaseName';
}
class DriftServerController {

View File

@ -3,8 +3,10 @@ import 'dart:async';
import 'dart:html';
import 'package:drift/wasm.dart';
import 'package:js/js.dart';
import 'package:js/js_util.dart';
import '../wasm_setup.dart';
import 'protocol.dart';
import 'shared.dart';
@ -68,7 +70,7 @@ class SharedDriftWorker {
Future<SharedWorkerStatus> _startFeatureDetection() async {
// First, let's see if this shared worker can spawn dedicated workers.
final hasWorker = hasProperty(self, 'Worker');
final hasWorker = supportsWorkers;
final canUseIndexedDb = await checkIndexedDbSupport();
if (!hasWorker) {
@ -81,7 +83,7 @@ class SharedDriftWorker {
final worker = _dedicatedWorker = Worker(Uri.base.toString());
// Ask the worker about the storage implementations it can support.
DedicatedWorkerCompatibilityCheck().sendToWorker(worker);
DedicatedWorkerCompatibilityCheck(null).sendToWorker(worker);
final completer = Completer<SharedWorkerStatus>();
StreamSubscription? messageSubscription, errorSubscription;

View File

@ -11,6 +11,7 @@
@experimental
library drift.wasm;
import 'dart:async';
import 'dart:html';
import 'package:meta/meta.dart';
@ -75,11 +76,11 @@ class WasmDatabase extends DelegatedDatabase {
required Uri sqlite3Uri,
required Uri driftWorkerUri,
}) {
return openWasmDatabase(
return WasmDatabaseOpener(
databaseName: databaseName,
sqlite3WasmUri: sqlite3Uri,
driftWorkerUri: driftWorkerUri,
);
).open();
}
static void workerMainForOpen() {
@ -233,6 +234,7 @@ enum WasmStorageImplementation {
enum MissingBrowserFeature {
sharedWorkers,
dedicatedWorkers,
dedicatedWorkersInSharedWorkers,
nestedDedicatedWorkers,
fileSystemAccess,

View File

@ -16,6 +16,7 @@ dependencies:
meta: ^1.3.0
stream_channel: ^2.1.0
sqlite3: ^2.0.0-dev.1
path: ^1.8.0
dev_dependencies:
archive: ^3.3.1
@ -29,7 +30,6 @@ dev_dependencies:
http: ^0.13.4
lints: ^2.0.0
uuid: ^3.0.0
path: ^1.8.0
build_runner: ^2.0.0
test: ^1.17.0
mockito: ^5.0.7

View File

@ -5,7 +5,7 @@ import 'package:sqlite3/wasm.dart';
import 'package:test/test.dart';
class DriftWasmExecutor extends TestExecutor {
final FileSystem fs;
final InMemoryFileSystem fs;
final WasmSqlite3 Function() sqlite3;
DriftWasmExecutor(this.fs, this.sqlite3);
@ -24,12 +24,12 @@ class DriftWasmExecutor extends TestExecutor {
@override
Future<void> deleteData() async {
fs.clear();
fs.fileData.clear();
}
}
void main() {
final fs = FileSystem.inMemory();
final fs = InMemoryFileSystem();
late WasmSqlite3 sqlite3;
setUpAll(() async {
@ -37,9 +37,8 @@ void main() {
final port = await channel.stream.first as int;
sqlite3 = await WasmSqlite3.loadFromUrl(
Uri.parse('http://localhost:$port/sqlite3.wasm'),
environment: SqliteEnvironment(fileSystem: fs),
);
Uri.parse('http://localhost:$port/sqlite3.wasm'));
sqlite3.registerVirtualFileSystem(fs, makeDefault: true);
});
runAllTests(DriftWasmExecutor(fs, () => sqlite3));