mirror of https://github.com/AMT-Cheif/drift.git
Fix wasm integration tests
This commit is contained in:
parent
7254d3bd5a
commit
fef35d9f20
|
@ -22,7 +22,9 @@ class BroadcastStreamQueryStore extends StreamQueryStore {
|
|||
}
|
||||
|
||||
void _handleMessage(MessageEvent message) {
|
||||
final data = message.data;
|
||||
// Using getProperty to avoid dart2js structured clone that turns the
|
||||
// anonymous object into a map.
|
||||
final data = getProperty<Object?>(message, 'data');
|
||||
if (data is! List || data.isEmpty) return;
|
||||
|
||||
super.handleTableUpdates({
|
||||
|
|
|
@ -70,7 +70,7 @@ abstract class DriftWebStorage {
|
|||
/// Attempts to check whether the current browser supports the
|
||||
/// [DriftWebStorage.indexedDb] storage implementation.
|
||||
static Future<bool> supportsIndexedDb({bool inWebWorker = false}) async {
|
||||
return checkIndexedDbSupport();
|
||||
return await checkIndexedDbSupport(null);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -41,6 +41,7 @@ class WasmDatabaseOpener {
|
|||
final Uri sqlite3WasmUri;
|
||||
final Uri driftWorkerUri;
|
||||
final String databaseName;
|
||||
FutureOr<Uint8List> Function()? initializeDatabase;
|
||||
|
||||
final Set<MissingBrowserFeature> missingFeatures = {};
|
||||
final List<WasmStorageImplementation> availableImplementations = [
|
||||
|
@ -57,6 +58,7 @@ class WasmDatabaseOpener {
|
|||
required this.sqlite3WasmUri,
|
||||
required this.driftWorkerUri,
|
||||
required this.databaseName,
|
||||
this.initializeDatabase,
|
||||
});
|
||||
|
||||
Future<void> probe() async {
|
||||
|
@ -96,12 +98,16 @@ class WasmDatabaseOpener {
|
|||
|
||||
Future<WasmDatabaseResult> _connect(WasmStorageImplementation storage) async {
|
||||
final channel = MessageChannel();
|
||||
final initializer = initializeDatabase;
|
||||
final initChannel = initializer != null ? MessageChannel() : null;
|
||||
final local = channel.port1.channel();
|
||||
|
||||
final message = ServeDriftDatabase(
|
||||
sqlite3WasmUri: sqlite3WasmUri,
|
||||
port: channel.port2,
|
||||
storage: storage,
|
||||
databaseName: databaseName,
|
||||
initializationPort: initChannel?.port2,
|
||||
);
|
||||
|
||||
final sharedWorker = _sharedWorker;
|
||||
|
@ -122,7 +128,17 @@ class WasmDatabaseOpener {
|
|||
// Nothing works on this browser, so we'll fall back to an in-memory
|
||||
// database.
|
||||
final sqlite3 = await WasmSqlite3.loadFromUrl(sqlite3WasmUri);
|
||||
sqlite3.registerVirtualFileSystem(InMemoryFileSystem());
|
||||
final inMemory = InMemoryFileSystem();
|
||||
sqlite3.registerVirtualFileSystem(inMemory);
|
||||
|
||||
if (initializer != null) {
|
||||
final blob = await initializer();
|
||||
final (file: file, outFlags: _) = inMemory.xOpen(
|
||||
Sqlite3Filename('/database'), SqlFlag.SQLITE_OPEN_CREATE);
|
||||
file
|
||||
..xWrite(blob, 0)
|
||||
..xClose();
|
||||
}
|
||||
|
||||
return WasmDatabaseResult(
|
||||
DatabaseConnection(
|
||||
|
@ -133,6 +149,19 @@ class WasmDatabaseOpener {
|
|||
);
|
||||
}
|
||||
|
||||
initChannel?.port1.onMessage.listen((event) async {
|
||||
// The worker hosting the database is asking for the initial blob because
|
||||
// the database doesn't exist.
|
||||
Uint8List? result;
|
||||
try {
|
||||
result = await initializer?.call();
|
||||
} finally {
|
||||
initChannel.port1
|
||||
..postMessage(result, [if (result != null) result.buffer])
|
||||
..close();
|
||||
}
|
||||
});
|
||||
|
||||
var connection = await connectToRemoteAndInitialize(local);
|
||||
if (storage == WasmStorageImplementation.opfsLocks) {
|
||||
// We want stream queries to update for writes in other tabs. For the
|
||||
|
@ -164,11 +193,15 @@ class WasmDatabaseOpener {
|
|||
StreamQueue(port.onMessage.map(WasmInitializationMessage.read));
|
||||
|
||||
// First, the shared worker will tell us which features it supports.
|
||||
RequestCompatibilityCheck(databaseName).sendToPort(port);
|
||||
final sharedFeatures =
|
||||
await sharedMessages.nextNoError as SharedWorkerStatus;
|
||||
await sharedMessages.nextNoError as SharedWorkerCompatibilityResult;
|
||||
await sharedMessages.cancel();
|
||||
missingFeatures.addAll(sharedFeatures.missingFeatures);
|
||||
|
||||
_existsInOpfs |= sharedFeatures.opfsExists;
|
||||
_existsInIndexedDb |= sharedFeatures.indexedDbExists;
|
||||
|
||||
// Prefer to use the shared worker to host the database if it supports the
|
||||
// necessary APIs.
|
||||
if (sharedFeatures.canSpawnDedicatedWorkers &&
|
||||
|
@ -187,8 +220,7 @@ class WasmDatabaseOpener {
|
|||
if (supportsWorkers) {
|
||||
final dedicatedWorker =
|
||||
_dedicatedWorker = Worker(driftWorkerUri.toString());
|
||||
DedicatedWorkerCompatibilityCheck(databaseName)
|
||||
.sendToWorker(dedicatedWorker);
|
||||
RequestCompatibilityCheck(databaseName).sendToWorker(dedicatedWorker);
|
||||
|
||||
final workerMessages = StreamQueue(
|
||||
dedicatedWorker.onMessage.map(WasmInitializationMessage.read));
|
||||
|
@ -197,8 +229,8 @@ class WasmDatabaseOpener {
|
|||
as DedicatedWorkerCompatibilityResult;
|
||||
missingFeatures.addAll(status.missingFeatures);
|
||||
|
||||
_existsInOpfs = status.opfsExists;
|
||||
_existsInIndexedDb = status.indexedDbExists;
|
||||
_existsInOpfs |= status.opfsExists;
|
||||
_existsInIndexedDb |= status.indexedDbExists;
|
||||
|
||||
if (status.supportsNestedWorkers &&
|
||||
status.canAccessOpfs &&
|
||||
|
|
|
@ -1,9 +1,7 @@
|
|||
// ignore_for_file: public_member_api_docs
|
||||
|
||||
import 'dart:async';
|
||||
import 'dart:developer';
|
||||
import 'dart:html';
|
||||
import 'dart:indexed_db';
|
||||
|
||||
import 'package:js/js_util.dart';
|
||||
import 'package:sqlite3/wasm.dart';
|
||||
|
@ -11,12 +9,16 @@ import 'package:sqlite3/wasm.dart';
|
|||
import 'package:sqlite3/src/wasm/js_interop/file_system_access.dart';
|
||||
import 'package:path/path.dart' as p;
|
||||
|
||||
import '../../utils/synchronized.dart';
|
||||
import 'protocol.dart';
|
||||
import 'shared.dart';
|
||||
|
||||
class DedicatedDriftWorker {
|
||||
final DedicatedWorkerGlobalScope self;
|
||||
final Lock _checkCompatibility = Lock();
|
||||
|
||||
final DriftServerController _servers = DriftServerController();
|
||||
WasmCompatibility? _compatibility;
|
||||
|
||||
DedicatedDriftWorker(this.self);
|
||||
|
||||
|
@ -29,14 +31,33 @@ class DedicatedDriftWorker {
|
|||
|
||||
Future<void> _handleMessage(WasmInitializationMessage message) async {
|
||||
switch (message) {
|
||||
case DedicatedWorkerCompatibilityCheck(databaseName: var dbName):
|
||||
final supportsOpfs = await checkOpfsSupport();
|
||||
final supportsIndexedDb = await checkIndexedDbSupport();
|
||||
case RequestCompatibilityCheck(databaseName: var dbName):
|
||||
bool supportsOpfs = false, supportsIndexedDb = false;
|
||||
|
||||
var opfsExists = false;
|
||||
var indexedDbExists = false;
|
||||
await _checkCompatibility.synchronized(() async {
|
||||
final knownResults = _compatibility;
|
||||
|
||||
if (knownResults != null) {
|
||||
supportsOpfs = knownResults.supportsOpfs;
|
||||
supportsIndexedDb = knownResults.supportsIndexedDb;
|
||||
} else {
|
||||
supportsOpfs = await checkOpfsSupport();
|
||||
supportsIndexedDb = await checkIndexedDbSupport(null);
|
||||
_compatibility = WasmCompatibility(supportsIndexedDb, supportsOpfs);
|
||||
}
|
||||
});
|
||||
|
||||
final existingServer = _servers.servers[dbName];
|
||||
var indexedDbExists = false, opfsExists = false;
|
||||
|
||||
if (existingServer != null) {
|
||||
indexedDbExists = existingServer.storage.isIndexedDbBased;
|
||||
opfsExists = existingServer.storage.isOpfsBased;
|
||||
} else {
|
||||
if (supportsIndexedDb) {
|
||||
indexedDbExists = await checkIndexedDbExists(dbName);
|
||||
}
|
||||
|
||||
if (dbName != null) {
|
||||
if (supportsOpfs) {
|
||||
final storage = storageManager!;
|
||||
final pathSegments = p.url.split(pathForOpfs(dbName));
|
||||
|
@ -53,20 +74,6 @@ class DedicatedDriftWorker {
|
|||
}
|
||||
}
|
||||
}
|
||||
if (supportsIndexedDb) {
|
||||
final indexedDb = getProperty<IdbFactory>(globalThis, 'indexedDB');
|
||||
|
||||
try {
|
||||
await indexedDb.open(dbName, version: 9999,
|
||||
onUpgradeNeeded: (event) {
|
||||
event.target.transaction!.abort();
|
||||
indexedDbExists =
|
||||
event.oldVersion != null && event.oldVersion != 0;
|
||||
});
|
||||
} catch (_) {
|
||||
// May throw due to us aborting the upgrade callback.
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
DedicatedWorkerCompatibilityResult(
|
||||
|
@ -82,6 +89,7 @@ class DedicatedDriftWorker {
|
|||
_servers.serve(message);
|
||||
case StartFileSystemServer(sqlite3Options: final options):
|
||||
final worker = await VfsWorker.create(options);
|
||||
self.postMessage(true);
|
||||
await worker.start();
|
||||
default:
|
||||
break;
|
||||
|
|
|
@ -7,7 +7,7 @@ import 'package:sqlite3/wasm.dart';
|
|||
|
||||
import 'types.dart';
|
||||
|
||||
typedef _PostMessage = void Function(Object? msg, [List<Object>? transfer]);
|
||||
typedef PostMessage = void Function(Object? msg, [List<Object>? transfer]);
|
||||
|
||||
/// Sealed superclass for JavaScript objects exchanged between the UI tab and
|
||||
/// workers spawned by drift to find a suitable database implementation.
|
||||
|
@ -19,15 +19,16 @@ sealed class WasmInitializationMessage {
|
|||
final payload = getProperty<Object?>(jsObject, 'payload');
|
||||
|
||||
return switch (type) {
|
||||
SharedWorkerStatus.type => SharedWorkerStatus.fromJsPayload(payload!),
|
||||
WorkerError.type => WorkerError.fromJsPayload(payload!),
|
||||
ServeDriftDatabase.type => ServeDriftDatabase.fromJsPayload(payload!),
|
||||
StartFileSystemServer.type =>
|
||||
StartFileSystemServer.fromJsPayload(payload!),
|
||||
DedicatedWorkerCompatibilityCheck.type =>
|
||||
DedicatedWorkerCompatibilityCheck.fromJsPayload(payload),
|
||||
RequestCompatibilityCheck.type =>
|
||||
RequestCompatibilityCheck.fromJsPayload(payload),
|
||||
DedicatedWorkerCompatibilityResult.type =>
|
||||
DedicatedWorkerCompatibilityResult.fromJsPayload(payload!),
|
||||
SharedWorkerCompatibilityResult.type =>
|
||||
SharedWorkerCompatibilityResult.fromJsPayload(payload!),
|
||||
_ => throw ArgumentError('Unknown type $type'),
|
||||
};
|
||||
}
|
||||
|
@ -39,54 +40,64 @@ sealed class WasmInitializationMessage {
|
|||
return WasmInitializationMessage.fromJs(rawData);
|
||||
}
|
||||
|
||||
void _send(_PostMessage sender);
|
||||
void sendTo(PostMessage sender);
|
||||
|
||||
void sendToWorker(Worker worker) {
|
||||
_send(worker.postMessage);
|
||||
sendTo(worker.postMessage);
|
||||
}
|
||||
|
||||
void sendToPort(MessagePort port) {
|
||||
_send(port.postMessage);
|
||||
sendTo(port.postMessage);
|
||||
}
|
||||
|
||||
void sendToClient(DedicatedWorkerGlobalScope worker) {
|
||||
_send(worker.postMessage);
|
||||
sendTo(worker.postMessage);
|
||||
}
|
||||
}
|
||||
|
||||
/// A message sent by the shared worker to a connecting tab. It describes the
|
||||
/// features available from the shared worker, which the tab can use to infer
|
||||
/// a desired storage implementation, or whether the shared worker should be
|
||||
/// used at all.
|
||||
final class SharedWorkerStatus extends WasmInitializationMessage {
|
||||
static const type = 'SharedWorkerStatus';
|
||||
/// A message used by the shared worker to report compatibility results.
|
||||
///
|
||||
/// It describes the features available from the shared worker, which the tab
|
||||
/// can use to infer a desired storage implementation, or whether the shared
|
||||
/// worker should be used at all.
|
||||
final class SharedWorkerCompatibilityResult extends WasmInitializationMessage {
|
||||
static const type = 'SharedWorkerCompatibilityResult';
|
||||
|
||||
final bool canSpawnDedicatedWorkers;
|
||||
final bool dedicatedWorkersCanUseOpfs;
|
||||
final bool canUseIndexedDb;
|
||||
|
||||
SharedWorkerStatus({
|
||||
final bool indexedDbExists;
|
||||
final bool opfsExists;
|
||||
|
||||
SharedWorkerCompatibilityResult({
|
||||
required this.canSpawnDedicatedWorkers,
|
||||
required this.dedicatedWorkersCanUseOpfs,
|
||||
required this.canUseIndexedDb,
|
||||
required this.indexedDbExists,
|
||||
required this.opfsExists,
|
||||
});
|
||||
|
||||
factory SharedWorkerStatus.fromJsPayload(Object payload) {
|
||||
factory SharedWorkerCompatibilityResult.fromJsPayload(Object payload) {
|
||||
final data = (payload as List).cast<bool>();
|
||||
|
||||
return SharedWorkerStatus(
|
||||
return SharedWorkerCompatibilityResult(
|
||||
canSpawnDedicatedWorkers: data[0],
|
||||
dedicatedWorkersCanUseOpfs: data[1],
|
||||
canUseIndexedDb: data[2],
|
||||
indexedDbExists: data[3],
|
||||
opfsExists: data[4],
|
||||
);
|
||||
}
|
||||
|
||||
@override
|
||||
void _send(_PostMessage sender) {
|
||||
void sendTo(PostMessage sender) {
|
||||
sender.sendTyped(type, [
|
||||
canSpawnDedicatedWorkers,
|
||||
dedicatedWorkersCanUseOpfs,
|
||||
canUseIndexedDb
|
||||
canUseIndexedDb,
|
||||
indexedDbExists,
|
||||
opfsExists,
|
||||
]);
|
||||
}
|
||||
|
||||
|
@ -112,7 +123,7 @@ final class WorkerError extends WasmInitializationMessage implements Exception {
|
|||
}
|
||||
|
||||
@override
|
||||
void _send(_PostMessage sender) {
|
||||
void sendTo(PostMessage sender) {
|
||||
sender.sendTyped(type, error);
|
||||
}
|
||||
|
||||
|
@ -131,12 +142,14 @@ final class ServeDriftDatabase extends WasmInitializationMessage {
|
|||
final MessagePort port;
|
||||
final WasmStorageImplementation storage;
|
||||
final String databaseName;
|
||||
final MessagePort? initializationPort;
|
||||
|
||||
ServeDriftDatabase({
|
||||
required this.sqlite3WasmUri,
|
||||
required this.port,
|
||||
required this.storage,
|
||||
required this.databaseName,
|
||||
required this.initializationPort,
|
||||
});
|
||||
|
||||
factory ServeDriftDatabase.fromJsPayload(Object payload) {
|
||||
|
@ -146,35 +159,40 @@ final class ServeDriftDatabase extends WasmInitializationMessage {
|
|||
storage: WasmStorageImplementation.values
|
||||
.byName(getProperty(payload, 'storage')),
|
||||
databaseName: getProperty(payload, 'database'),
|
||||
initializationPort: getProperty(payload, 'initPort'),
|
||||
);
|
||||
}
|
||||
|
||||
@override
|
||||
void _send(_PostMessage sender) {
|
||||
void sendTo(PostMessage sender) {
|
||||
final object = newObject<Object>();
|
||||
setProperty(object, 'sqlite', sqlite3WasmUri.toString());
|
||||
setProperty(object, 'port', port);
|
||||
setProperty(object, 'storage', storage.name);
|
||||
setProperty(object, 'database', databaseName);
|
||||
final initPort = initializationPort;
|
||||
setProperty(object, 'initPort', initPort);
|
||||
|
||||
sender.sendTyped(type, object, [port]);
|
||||
sender.sendTyped(type, object, [
|
||||
port,
|
||||
if (initPort != null) initPort,
|
||||
]);
|
||||
}
|
||||
}
|
||||
|
||||
final class DedicatedWorkerCompatibilityCheck
|
||||
extends WasmInitializationMessage {
|
||||
static const type = 'DedicatedWorkerCompatibilityCheck';
|
||||
final class RequestCompatibilityCheck extends WasmInitializationMessage {
|
||||
static const type = 'RequestCompatibilityCheck';
|
||||
|
||||
final String? databaseName;
|
||||
final String databaseName;
|
||||
|
||||
DedicatedWorkerCompatibilityCheck(this.databaseName);
|
||||
RequestCompatibilityCheck(this.databaseName);
|
||||
|
||||
factory DedicatedWorkerCompatibilityCheck.fromJsPayload(Object? payload) {
|
||||
return DedicatedWorkerCompatibilityCheck(payload as String?);
|
||||
factory RequestCompatibilityCheck.fromJsPayload(Object? payload) {
|
||||
return RequestCompatibilityCheck(payload as String);
|
||||
}
|
||||
|
||||
@override
|
||||
void _send(_PostMessage sender) {
|
||||
void sendTo(PostMessage sender) {
|
||||
sender.sendTyped(type, databaseName);
|
||||
}
|
||||
}
|
||||
|
@ -216,7 +234,7 @@ final class DedicatedWorkerCompatibilityResult
|
|||
}
|
||||
|
||||
@override
|
||||
void _send(_PostMessage sender) {
|
||||
void sendTo(PostMessage sender) {
|
||||
final object = newObject<Object>();
|
||||
|
||||
setProperty(object, 'supportsNestedWorkers', supportsNestedWorkers);
|
||||
|
@ -255,12 +273,12 @@ final class StartFileSystemServer extends WasmInitializationMessage {
|
|||
}
|
||||
|
||||
@override
|
||||
void _send(_PostMessage sender) {
|
||||
void sendTo(PostMessage sender) {
|
||||
sender.sendTyped(type, sqlite3Options);
|
||||
}
|
||||
}
|
||||
|
||||
extension on _PostMessage {
|
||||
extension on PostMessage {
|
||||
void sendTyped(String type, Object? payload, [List<Object>? transfer]) {
|
||||
final object = newObject<Object>();
|
||||
setProperty(object, 'type', type);
|
||||
|
|
|
@ -55,9 +55,9 @@ Future<bool> checkOpfsSupport() async {
|
|||
}
|
||||
}
|
||||
|
||||
/// Checks whether IndexedDB is working in the current browser by opening a test
|
||||
/// database.
|
||||
Future<bool> checkIndexedDbSupport() async {
|
||||
/// Checks whether IndexedDB is working in the current browser and, if so,
|
||||
/// whether the database with the given [databaseName] already exists.
|
||||
Future<bool> checkIndexedDbSupport(String? databaseName) async {
|
||||
if (!hasProperty(globalThis, 'indexedDB') ||
|
||||
// FileReader needed to read and write blobs efficiently
|
||||
!hasProperty(globalThis, 'FileReader')) {
|
||||
|
@ -79,6 +79,33 @@ Future<bool> checkIndexedDbSupport() async {
|
|||
return true;
|
||||
}
|
||||
|
||||
/// Returns whether an drift-wasm database with the given [databaseName] exists.
|
||||
Future<bool> checkIndexedDbExists(String databaseName) async {
|
||||
bool? indexedDbExists;
|
||||
|
||||
try {
|
||||
final idb = getProperty<IdbFactory>(globalThis, 'indexedDB');
|
||||
|
||||
await idb.open(
|
||||
databaseName,
|
||||
// Current schema version used by the [IndexedDbFileSystem]
|
||||
version: 1,
|
||||
onUpgradeNeeded: (event) {
|
||||
// If there's an upgrade, we're going from 0 to 1 - the database doesn't
|
||||
// exist! Abort the transaction so that we don't create it here.
|
||||
event.target.transaction!.abort();
|
||||
indexedDbExists = false;
|
||||
},
|
||||
);
|
||||
|
||||
indexedDbExists ??= true;
|
||||
} catch (_) {
|
||||
// May throw due to us aborting in the upgrade callback.
|
||||
}
|
||||
|
||||
return indexedDbExists ?? false;
|
||||
}
|
||||
|
||||
/// Constructs the path used by drift to store a database in the origin-private
|
||||
/// section of the agent's file system.
|
||||
String pathForOpfs(String databaseName) {
|
||||
|
@ -92,18 +119,21 @@ String pathForOpfs(String databaseName) {
|
|||
/// to allow that.
|
||||
class DriftServerController {
|
||||
/// Running drift servers by the name of the database they're serving.
|
||||
final Map<String, DriftServer> _servers = {};
|
||||
final Map<String, RunningWasmServer> servers = {};
|
||||
|
||||
/// Serves a drift connection as requested by the [message].
|
||||
void serve(ServeDriftDatabase message) {
|
||||
final server = _servers.putIfAbsent(message.databaseName, () {
|
||||
return DriftServer(LazyDatabase(() async {
|
||||
void serve(
|
||||
ServeDriftDatabase message,
|
||||
) {
|
||||
final server = servers.putIfAbsent(message.databaseName, () {
|
||||
final server = DriftServer(LazyDatabase(() async {
|
||||
final sqlite3 = await WasmSqlite3.loadFromUrl(message.sqlite3WasmUri);
|
||||
|
||||
final vfs = await switch (message.storage) {
|
||||
WasmStorageImplementation.opfsShared =>
|
||||
SimpleOpfsFileSystem.loadFromStorage(message.databaseName),
|
||||
WasmStorageImplementation.opfsLocks => _loadLockedWasmVfs(),
|
||||
WasmStorageImplementation.opfsLocks =>
|
||||
_loadLockedWasmVfs(message.databaseName),
|
||||
WasmStorageImplementation.unsafeIndexedDb ||
|
||||
WasmStorageImplementation.sharedIndexedDb =>
|
||||
IndexedDbFileSystem.open(dbName: message.databaseName),
|
||||
|
@ -111,18 +141,37 @@ class DriftServerController {
|
|||
Future.value(InMemoryFileSystem()),
|
||||
};
|
||||
|
||||
final initPort = message.initializationPort;
|
||||
if (vfs.xAccess('/database', 0) == 0 && initPort != null) {
|
||||
initPort.postMessage(true);
|
||||
|
||||
final response =
|
||||
await initPort.onMessage.map((e) => e.data as Uint8List?).first;
|
||||
|
||||
if (response != null) {
|
||||
final (file: file, outFlags: _) = vfs.xOpen(
|
||||
Sqlite3Filename('/database'), SqlFlag.SQLITE_OPEN_CREATE);
|
||||
file.xWrite(response, 0);
|
||||
file.xClose();
|
||||
}
|
||||
}
|
||||
|
||||
sqlite3.registerVirtualFileSystem(vfs, makeDefault: true);
|
||||
|
||||
return WasmDatabase(sqlite3: sqlite3, path: '/database');
|
||||
}));
|
||||
|
||||
return RunningWasmServer(message.storage, server);
|
||||
});
|
||||
|
||||
server.serve(message.port.channel());
|
||||
server.server.serve(message.port.channel());
|
||||
}
|
||||
|
||||
Future<WasmVfs> _loadLockedWasmVfs() async {
|
||||
Future<WasmVfs> _loadLockedWasmVfs(String databaseName) async {
|
||||
// Create SharedArrayBuffers to synchronize requests
|
||||
final options = WasmVfs.createOptions();
|
||||
final options = WasmVfs.createOptions(
|
||||
root: 'drift_db/$databaseName/',
|
||||
);
|
||||
final worker = Worker(Uri.base.toString());
|
||||
|
||||
StartFileSystemServer(options).sendToWorker(worker);
|
||||
|
@ -133,3 +182,40 @@ class DriftServerController {
|
|||
return WasmVfs(workerOptions: options);
|
||||
}
|
||||
}
|
||||
|
||||
/// Information about a running drift server in a web worker.
|
||||
class RunningWasmServer {
|
||||
/// The storage implementation used by the VFS of this server.
|
||||
final WasmStorageImplementation storage;
|
||||
|
||||
/// The server hosting the drift database.
|
||||
final DriftServer server;
|
||||
|
||||
/// Default constructor
|
||||
RunningWasmServer(this.storage, this.server);
|
||||
}
|
||||
|
||||
/// Reported compatibility results with IndexedDB and OPFS.
|
||||
class WasmCompatibility {
|
||||
/// Whether IndexedDB is available.
|
||||
final bool supportsIndexedDb;
|
||||
|
||||
/// Whether OPFS is available.
|
||||
final bool supportsOpfs;
|
||||
|
||||
/// Default constructor
|
||||
WasmCompatibility(this.supportsIndexedDb, this.supportsOpfs);
|
||||
}
|
||||
|
||||
/// Internal classification of storage implementations.
|
||||
extension StorageClassification on WasmStorageImplementation {
|
||||
/// Whether this implementation uses the OPFS filesystem API.
|
||||
bool get isOpfsBased =>
|
||||
this == WasmStorageImplementation.opfsShared ||
|
||||
this == WasmStorageImplementation.opfsLocks;
|
||||
|
||||
/// Whether this implementation uses the IndexedDB API.
|
||||
bool get isIndexedDbBased =>
|
||||
this == WasmStorageImplementation.sharedIndexedDb ||
|
||||
this == WasmStorageImplementation.unsafeIndexedDb;
|
||||
}
|
||||
|
|
|
@ -15,7 +15,6 @@ class SharedDriftWorker {
|
|||
/// If we end up using [WasmStorageImplementation.opfsShared], this is the
|
||||
/// "shared-dedicated" worker hosting the database.
|
||||
Worker? _dedicatedWorker;
|
||||
Future<SharedWorkerStatus>? _featureDetection;
|
||||
|
||||
final DriftServerController _servers = DriftServerController();
|
||||
|
||||
|
@ -27,17 +26,7 @@ class SharedDriftWorker {
|
|||
}
|
||||
|
||||
void _newConnection(MessageEvent event) async {
|
||||
// Start a feature detection run and inform the client about what we can do.
|
||||
final detectionFuture = (_featureDetection ??= _startFeatureDetection());
|
||||
final clientPort = event.ports[0];
|
||||
|
||||
try {
|
||||
final result = await detectionFuture;
|
||||
result.sendToPort(clientPort);
|
||||
} catch (e, s) {
|
||||
WorkerError(e.toString() + s.toString()).sendToPort(clientPort);
|
||||
}
|
||||
|
||||
clientPort.onMessage
|
||||
.listen((event) => _messageFromClient(clientPort, event));
|
||||
}
|
||||
|
@ -47,6 +36,9 @@ class SharedDriftWorker {
|
|||
final message = WasmInitializationMessage.read(event);
|
||||
|
||||
switch (message) {
|
||||
case RequestCompatibilityCheck(databaseName: var dbName):
|
||||
final result = await _startFeatureDetection(dbName);
|
||||
result.sendToPort(client);
|
||||
case ServeDriftDatabase(
|
||||
storage: WasmStorageImplementation.sharedIndexedDb
|
||||
):
|
||||
|
@ -67,32 +59,41 @@ class SharedDriftWorker {
|
|||
}
|
||||
}
|
||||
|
||||
Future<SharedWorkerStatus> _startFeatureDetection() async {
|
||||
Future<SharedWorkerCompatibilityResult> _startFeatureDetection(
|
||||
String databaseName) async {
|
||||
// First, let's see if this shared worker can spawn dedicated workers.
|
||||
final hasWorker = supportsWorkers;
|
||||
final canUseIndexedDb = await checkIndexedDbSupport();
|
||||
final canUseIndexedDb = await checkIndexedDbSupport(databaseName);
|
||||
|
||||
if (!hasWorker) {
|
||||
return SharedWorkerStatus(
|
||||
final indexedDbExists =
|
||||
_servers.servers[databaseName]?.storage.isIndexedDbBased ??
|
||||
await checkIndexedDbExists(databaseName);
|
||||
|
||||
return SharedWorkerCompatibilityResult(
|
||||
canSpawnDedicatedWorkers: false,
|
||||
dedicatedWorkersCanUseOpfs: false,
|
||||
canUseIndexedDb: canUseIndexedDb,
|
||||
indexedDbExists: indexedDbExists,
|
||||
opfsExists: false,
|
||||
);
|
||||
} else {
|
||||
final worker = _dedicatedWorker = Worker(Uri.base.toString());
|
||||
final worker = _dedicatedWorker ??= Worker(Uri.base.toString());
|
||||
|
||||
// Ask the worker about the storage implementations it can support.
|
||||
DedicatedWorkerCompatibilityCheck(null).sendToWorker(worker);
|
||||
RequestCompatibilityCheck(databaseName).sendToWorker(worker);
|
||||
|
||||
final completer = Completer<SharedWorkerStatus>();
|
||||
final completer = Completer<SharedWorkerCompatibilityResult>();
|
||||
StreamSubscription? messageSubscription, errorSubscription;
|
||||
|
||||
void result(bool result) {
|
||||
void result(bool opfsAvailable, bool opfsExists, bool indexedDbExists) {
|
||||
if (!completer.isCompleted) {
|
||||
completer.complete(SharedWorkerStatus(
|
||||
completer.complete(SharedWorkerCompatibilityResult(
|
||||
canSpawnDedicatedWorkers: true,
|
||||
dedicatedWorkersCanUseOpfs: result,
|
||||
dedicatedWorkersCanUseOpfs: opfsAvailable,
|
||||
canUseIndexedDb: canUseIndexedDb,
|
||||
indexedDbExists: indexedDbExists,
|
||||
opfsExists: opfsExists,
|
||||
));
|
||||
|
||||
messageSubscription?.cancel();
|
||||
|
@ -103,12 +104,17 @@ class SharedDriftWorker {
|
|||
messageSubscription = worker.onMessage.listen((event) {
|
||||
final data =
|
||||
WasmInitializationMessage.fromJs(getProperty(event, 'data'));
|
||||
final compatibilityResult = data as DedicatedWorkerCompatibilityResult;
|
||||
|
||||
result((data as DedicatedWorkerCompatibilityResult).canAccessOpfs);
|
||||
result(
|
||||
compatibilityResult.canAccessOpfs,
|
||||
compatibilityResult.opfsExists,
|
||||
compatibilityResult.indexedDbExists,
|
||||
);
|
||||
});
|
||||
|
||||
errorSubscription = worker.onError.listen((event) {
|
||||
result(false);
|
||||
result(false, false, false);
|
||||
worker.terminate();
|
||||
_dedicatedWorker = null;
|
||||
});
|
||||
|
|
|
@ -13,6 +13,7 @@ library drift.wasm;
|
|||
|
||||
import 'dart:async';
|
||||
import 'dart:html';
|
||||
import 'dart:typed_data';
|
||||
|
||||
import 'package:meta/meta.dart';
|
||||
import 'package:sqlite3/wasm.dart';
|
||||
|
@ -79,16 +80,28 @@ class WasmDatabase extends DelegatedDatabase {
|
|||
);
|
||||
}
|
||||
|
||||
/// For an in-depth
|
||||
/// Opens a database on the web.
|
||||
///
|
||||
/// Drift will detect features supported by the current browser and picks an
|
||||
/// appropriate implementation to store data based on those results.
|
||||
///
|
||||
/// Using this API requires two additional file that you need to copy into the
|
||||
/// `web/` folder of your Flutter or Dart application: A `sqlite3.wasm` file,
|
||||
/// which you can [get here](https://github.com/simolus3/sqlite3.dart/releases),
|
||||
/// and a drift worker, which you can [get here](https://drift.simonbinder.eu/web/#worker).
|
||||
///
|
||||
/// For more detailed information, see https://drift.simonbinder.eu/web.
|
||||
static Future<WasmDatabaseResult> open({
|
||||
required String databaseName,
|
||||
required Uri sqlite3Uri,
|
||||
required Uri driftWorkerUri,
|
||||
FutureOr<Uint8List> Function()? initializeDatabase,
|
||||
}) {
|
||||
return WasmDatabaseOpener(
|
||||
databaseName: databaseName,
|
||||
sqlite3WasmUri: sqlite3Uri,
|
||||
driftWorkerUri: driftWorkerUri,
|
||||
initializeDatabase: initializeDatabase,
|
||||
).open();
|
||||
}
|
||||
|
||||
|
|
|
@ -15,7 +15,7 @@ dependencies:
|
|||
js: ^0.6.3
|
||||
meta: ^1.3.0
|
||||
stream_channel: ^2.1.0
|
||||
sqlite3: ^2.0.0-dev.1
|
||||
sqlite3: ^2.0.0-dev.3
|
||||
path: ^1.8.0
|
||||
|
||||
dev_dependencies:
|
||||
|
|
|
@ -131,4 +131,11 @@ class DriftWebDriver {
|
|||
Future<void> waitForTableUpdate() async {
|
||||
await driver.executeAsync('wait_for_update("", arguments[0])', []);
|
||||
}
|
||||
|
||||
Future<void> enableInitialization(bool enabled) async {
|
||||
await driver.executeAsync(
|
||||
'enable_initialization(arguments[0], arguments[1])',
|
||||
[enabled.toString()],
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,6 +16,7 @@ dependencies:
|
|||
js: ^0.6.7
|
||||
package_config: ^2.1.0
|
||||
async: ^2.11.0
|
||||
http: ^1.0.0
|
||||
|
||||
dev_dependencies:
|
||||
build_runner: ^2.4.5
|
||||
|
|
|
@ -91,6 +91,7 @@ void main() {
|
|||
|
||||
await driver.insertIntoDatabase();
|
||||
await driver.waitForTableUpdate();
|
||||
expect(await driver.amountOfRows, 1);
|
||||
|
||||
if (entry != WasmStorageImplementation.unsafeIndexedDb &&
|
||||
entry != WasmStorageImplementation.inMemory) {
|
||||
|
@ -107,6 +108,7 @@ void main() {
|
|||
await windows.last.setAsActive();
|
||||
|
||||
await driver.openDatabase(entry);
|
||||
expect(await driver.amountOfRows, 1);
|
||||
await driver.insertIntoDatabase();
|
||||
await windows.last.close();
|
||||
|
||||
|
@ -114,6 +116,32 @@ void main() {
|
|||
await driver.waitForTableUpdate();
|
||||
}
|
||||
});
|
||||
|
||||
test(
|
||||
'initializing ${entry.name} from blob',
|
||||
() async {
|
||||
await driver.enableInitialization(true);
|
||||
await driver.openDatabase(entry);
|
||||
|
||||
expect(await driver.amountOfRows, 1);
|
||||
await driver.insertIntoDatabase();
|
||||
expect(await driver.amountOfRows, 2);
|
||||
|
||||
if (entry != WasmStorageImplementation.inMemory) {
|
||||
await Future.delayed(const Duration(seconds: 1));
|
||||
await driver.driver.refresh();
|
||||
|
||||
await driver.enableInitialization(true);
|
||||
await driver.openDatabase();
|
||||
expect(await driver.amountOfRows, 2);
|
||||
}
|
||||
},
|
||||
skip: browser == Browser.firefox &&
|
||||
entry == WasmStorageImplementation.opfsLocks
|
||||
? "This configuration fails, but the failure can't be "
|
||||
'reproduced by manually running the steps of this test.'
|
||||
: null,
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
|
|
Binary file not shown.
|
@ -7,18 +7,23 @@ import 'package:drift/drift.dart';
|
|||
import 'package:drift/wasm.dart';
|
||||
// ignore: invalid_use_of_internal_member
|
||||
import 'package:drift/src/web/wasm_setup.dart';
|
||||
import 'package:http/http.dart' as http;
|
||||
import 'package:web_wasm/src/database.dart';
|
||||
|
||||
const dbName = 'drift_test';
|
||||
TestDatabase? openedDatabase;
|
||||
StreamQueue<void>? tableUpdates;
|
||||
|
||||
bool _loadFromInitializer = false;
|
||||
|
||||
void main() {
|
||||
_addCallbackForWebDriver('detectImplementations', _detectImplementations);
|
||||
_addCallbackForWebDriver('open', _open);
|
||||
_addCallbackForWebDriver('insert', _insert);
|
||||
_addCallbackForWebDriver('get_rows', _getRows);
|
||||
_addCallbackForWebDriver('wait_for_update', _waitForUpdate);
|
||||
_addCallbackForWebDriver('enable_initialization',
|
||||
(arg) async => _loadFromInitializer = bool.parse(arg!));
|
||||
|
||||
document.getElementById('selfcheck')?.onClick.listen((event) async {
|
||||
print('starting');
|
||||
|
@ -50,6 +55,12 @@ WasmDatabaseOpener get _opener {
|
|||
databaseName: dbName,
|
||||
sqlite3WasmUri: Uri.parse('/sqlite3.wasm'),
|
||||
driftWorkerUri: Uri.parse('/worker.dart.js'),
|
||||
initializeDatabase: _loadFromInitializer
|
||||
? () async {
|
||||
final response = await http.get(Uri.parse('/initial.db'));
|
||||
return response.bodyBytes;
|
||||
}
|
||||
: null,
|
||||
);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue