Migrate wasm internals to new interop

This commit is contained in:
Simon Binder 2024-02-22 17:23:08 +01:00
parent 27877a72e9
commit f9d5443a8e
No known key found for this signature in database
GPG Key ID: 7891917E4147B8C0
17 changed files with 405 additions and 237 deletions

View File

@ -12,6 +12,7 @@
- Improve stack traces for errors happening on drift isolates (which includes
usages of `NativeDatabase.createInBackground`).
- Don't cache `EXPLAIN` statements, avoiding schema locks.
- Migrate `WasmDatabase` to `dart:js_interop` and `package:web`.
## 2.15.0

View File

@ -162,7 +162,7 @@ class GeneratedColumn<T extends Object> extends Column<T> {
// these custom constraints refer to builtin constraints from drift
if (!isSerial && _defaultConstraints != null) {
_defaultConstraints!(into);
_defaultConstraints(into);
}
} else if ($customConstraints?.isNotEmpty == true) {
into.buffer

View File

@ -348,7 +348,7 @@ class InsertStatement<T extends Table, D> {
if (onConflict._where != null) {
ctx.writeWhitespace();
final where = onConflict._where!(
final where = onConflict._where(
table.asDslTable, table.createAlias('excluded').asDslTable);
where.writeInto(ctx);
}

View File

@ -1,34 +1,47 @@
@JS()
library;
import 'dart:async';
import 'dart:html';
import 'dart:js_interop';
import 'dart:js_interop_unsafe';
import 'package:drift/src/runtime/api/runtime_api.dart';
import 'package:drift/src/runtime/executor/stream_queries.dart';
import 'package:js/js.dart';
import 'package:js/js_util.dart';
import 'package:web/web.dart' as web;
@JS('Array')
extension type _ArrayWrapper._(JSArray _) implements JSObject {
external static JSBoolean isArray(JSAny? value);
}
/// A [StreamQueryStore] using [web broadcast] APIs
///
/// [web broadcast]: https://developer.mozilla.org/en-US/docs/Web/API/Broadcast_Channel_API
class BroadcastStreamQueryStore extends StreamQueryStore {
final BroadcastChannel _channel;
StreamSubscription<MessageEvent>? _messageFromChannel;
final web.BroadcastChannel _channel;
StreamSubscription<web.MessageEvent>? _messageFromChannel;
/// Constructs a broadcast query store with the given [identifier].
///
/// All query stores with the same identifier will share stream query updates.
BroadcastStreamQueryStore(String identifier)
: _channel = BroadcastChannel('drift_updates_$identifier') {
_messageFromChannel = _channel.onMessage.listen(_handleMessage);
: _channel = web.BroadcastChannel('drift_updates_$identifier') {
_messageFromChannel = web.EventStreamProviders.messageEvent
.forTarget(_channel)
.listen(_handleMessage);
}
void _handleMessage(MessageEvent message) {
// Using getProperty to avoid dart2js structured clone that turns the
// anonymous object into a map.
final data = getProperty<Object?>(message, 'data');
if (data is! List || data.isEmpty) return;
void _handleMessage(web.MessageEvent message) {
final data = message.data;
if (!_ArrayWrapper.isArray(data).toDart) {
return;
}
final asList = (data as JSArray).toDart;
if (asList.isEmpty) return;
super.handleTableUpdates({
for (final entry in data.cast<_SerializedTableUpdate>())
for (final entry in asList.cast<_SerializedTableUpdate>())
entry.toTableUpdate,
});
}
@ -39,7 +52,7 @@ class BroadcastStreamQueryStore extends StreamQueryStore {
_channel.postMessage([
for (final update in updates) _SerializedTableUpdate.of(update),
]);
].toJS);
}
@override
@ -50,34 +63,31 @@ class BroadcastStreamQueryStore extends StreamQueryStore {
}
/// Whether the current JavaScript context supports broadcast channels.
static bool get supported => hasProperty(globalThis, 'BroadcastChannel');
static bool get supported => globalContext.has('BroadcastChannel');
}
@JS()
@anonymous
@staticInterop
class _SerializedTableUpdate {
extension type _SerializedTableUpdate._(JSObject _) implements JSObject {
external factory _SerializedTableUpdate({
required String? kind,
required String table,
required JSString? kind,
required JSString table,
});
factory _SerializedTableUpdate.of(TableUpdate update) {
return _SerializedTableUpdate(kind: update.kind?.name, table: update.table);
return _SerializedTableUpdate(
kind: update.kind?.name.toJS,
table: update.table.toJS,
);
}
}
extension on _SerializedTableUpdate {
@JS()
external String? get kind;
@JS()
external String get table;
external JSString? get kind;
external JSString get table;
TableUpdate get toTableUpdate {
final updateKind = _updateKindByName[kind];
final updateKind = _updateKindByName[kind?.toDart];
return TableUpdate(table, kind: updateKind);
return TableUpdate(table.toDart, kind: updateKind);
}
static final _updateKindByName = UpdateKind.values.asNameMap();

View File

@ -0,0 +1,50 @@
import 'dart:js_interop';
import 'package:stream_channel/stream_channel.dart';
import 'package:web/web.dart' as web;
/// Extension to transform a raw [MessagePort] from web workers into a Dart
/// [StreamChannel].
extension WebPortToChannel on web.MessagePort {
static const _disconnectMessage = '_disconnect';
/// Converts this port to a two-way communication channel, exposed as a
/// [StreamChannel].
///
/// This can be used to implement a remote database connection over service
/// workers.
///
/// The [explicitClose] parameter can be used to control whether a close
/// message should be sent through the channel when it is closed. This will
/// cause it to be closed on the other end as well. Note that this is not a
/// reliable way of determining channel closures though, as there is no event
/// for channels being closed due to a tab or worker being closed.
/// Both "ends" of a JS channel calling [channel] on their part must use the
/// value for [explicitClose].
StreamChannel<Object?> channel({bool explicitClose = false}) {
final controller = StreamChannelController<Object?>();
onmessage = (web.MessageEvent event) {
final message = event.data;
if (explicitClose && message == _disconnectMessage.toJS) {
// Other end has closed the connection
controller.local.sink.close();
} else {
controller.local.sink.add(message.dartify());
}
}.toJS;
controller.local.stream.listen((e) => postMessage(e.jsify()), onDone: () {
// Closed locally, inform the other end.
if (explicitClose) {
postMessage(_disconnectMessage.toJS);
}
close();
});
return controller.foreign;
}
}

View File

@ -7,22 +7,23 @@
/// asynchronous
// ignore_for_file: public_member_api_docs
@internal
@JS()
library;
import 'dart:async';
import 'dart:html';
import 'dart:js_interop';
import 'dart:js_interop_unsafe';
import 'package:async/async.dart';
import 'package:drift/drift.dart';
import 'package:drift/remote.dart';
import 'package:drift/wasm.dart';
import 'package:js/js.dart';
import 'package:js/js_util.dart';
import 'package:meta/meta.dart';
import 'package:sqlite3/wasm.dart';
import 'package:web/web.dart' as web;
import 'broadcast_stream_queries.dart';
import 'channel.dart';
import 'new_channel.dart';
import 'wasm_setup/shared.dart';
import 'wasm_setup/protocol.dart';
@ -32,10 +33,10 @@ import 'wasm_setup/protocol.dart';
external bool get crossOriginIsolated;
/// Whether shared workers can be constructed in the current context.
bool get supportsSharedWorkers => hasProperty(globalThis, 'SharedWorker');
bool get supportsSharedWorkers => globalContext.has('SharedWorker');
/// Whether dedicated workers can be constructed in the current context.
bool get supportsWorkers => hasProperty(globalThis, 'Worker');
bool get supportsWorkers => globalContext.has('Worker');
class WasmDatabaseOpener {
final Uri sqlite3WasmUri;
@ -107,7 +108,7 @@ class WasmDatabaseOpener {
Future<void> _probeDedicated() async {
if (supportsWorkers) {
final dedicatedWorker = _dedicatedWorker =
_DriftWorker.dedicated(Worker(driftWorkerUri.toString()));
_DriftWorker.dedicated(web.Worker(driftWorkerUri.toString()));
_createCompatibilityCheck().sendTo(dedicatedWorker.send);
final status = await dedicatedWorker.workerMessages.nextNoError
@ -133,8 +134,8 @@ class WasmDatabaseOpener {
Future<void> _probeShared() async {
if (supportsSharedWorkers) {
final sharedWorker =
SharedWorker(driftWorkerUri.toString(), 'drift worker');
final port = sharedWorker.port!;
web.SharedWorker(driftWorkerUri.toString(), 'drift worker'.toJS);
final port = sharedWorker.port;
final shared = _sharedWorker = _DriftWorker.shared(sharedWorker, port);
// First, the shared worker will tell us which features it supports.
@ -161,40 +162,38 @@ class WasmDatabaseOpener {
}
final class _DriftWorker {
final AbstractWorker worker;
/// Either a [web.SharedWorker] or a [web.Worker].
final JSObject worker;
ProtocolVersion version = ProtocolVersion.legacy;
/// The message port to communicate with the worker, if it's a shared worker.
final MessagePort? portForShared;
final web.MessagePort? portForShared;
final StreamQueue<WasmInitializationMessage> workerMessages;
_DriftWorker.dedicated(Worker this.worker)
_DriftWorker.dedicated(web.Worker this.worker)
: portForShared = null,
workerMessages =
StreamQueue(_readMessages(worker.onMessage, worker.onError));
workerMessages = StreamQueue(_readMessages(worker, worker));
_DriftWorker.shared(SharedWorker this.worker, this.portForShared)
: workerMessages =
StreamQueue(_readMessages(worker.port!.onMessage, worker.onError));
_DriftWorker.shared(web.SharedWorker this.worker, this.portForShared)
: workerMessages = StreamQueue(_readMessages(worker.port, worker)) {
(worker as web.SharedWorker).port.start();
}
void send(Object? msg, [List<Object>? transfer]) {
switch (worker) {
case final Worker worker:
worker.postMessage(msg, transfer);
case SharedWorker():
portForShared!.postMessage(msg, transfer);
void send(JSAny? msg, List<JSObject>? transfer) {
if (portForShared case final port?) {
port.postMessage(msg, (transfer ?? const []).toJS);
} else {
(worker as web.Worker).postMessage(msg, (transfer ?? const []).toJS);
}
}
void close() {
workerMessages.cancel();
switch (worker) {
case final Worker dedicated:
dedicated.terminate();
case SharedWorker():
portForShared!.close();
if (portForShared case final port?) {
port.close();
} else {
(worker as web.Worker).terminate();
}
}
}
@ -225,9 +224,9 @@ final class _ProbeResult implements WasmProbeResult {
FutureOr<Uint8List?> Function()? initializeDatabase,
WasmDatabaseSetup? localSetup,
}) async {
final channel = MessageChannel();
final channel = web.MessageChannel();
final initializer = initializeDatabase;
final initChannel = initializer != null ? MessageChannel() : null;
final initChannel = initializer != null ? web.MessageChannel() : null;
ServeDriftDatabase message;
final sharedWorker = opener._sharedWorker;
@ -276,18 +275,24 @@ final class _ProbeResult implements WasmProbeResult {
initializeDatabase, localSetup);
}
initChannel?.port1.onMessage.listen((event) async {
// The worker hosting the database is asking for the initial blob because
// the database doesn't exist.
Uint8List? result;
try {
result = await initializer?.call();
} finally {
initChannel.port1
..postMessage(result, [if (result != null) result.buffer])
..close();
}
});
if (initChannel != null) {
initChannel.port1.start();
web.EventStreamProviders.messageEvent
.forTarget(initChannel.port1)
.listen((event) async {
// The worker hosting the database is asking for the initial blob because
// the database doesn't exist.
Uint8List? result;
try {
result = await initializer?.call();
} finally {
initChannel.port1
..postMessage(
result?.toJS, [if (result != null) result.buffer.toJS].toJS)
..close();
}
});
}
final local = channel.port1
.channel(explicitClose: message.protocolVersion >= ProtocolVersion.v1);
@ -350,7 +355,13 @@ final class _ProbeResult implements WasmProbeResult {
}
Stream<WasmInitializationMessage> _readMessages(
Stream<MessageEvent> messages, Stream<Event> errors) {
web.EventTarget messageTarget,
web.EventTarget errorTarget,
) {
final messages =
web.EventStreamProviders.messageEvent.forTarget(messageTarget);
final errors = web.EventStreamProviders.errorEvent.forTarget(errorTarget);
final mappedMessages = messages.map(WasmInitializationMessage.read);
return Stream.multi((listener) {

View File

@ -1,10 +1,12 @@
// ignore_for_file: public_member_api_docs
import 'dart:async';
import 'dart:html';
import 'dart:js_interop';
import 'dart:js_interop_unsafe';
import 'package:js/js_util.dart';
import 'package:sqlite3/wasm.dart';
import 'package:web/web.dart'
show DedicatedWorkerGlobalScope, EventStreamProviders;
import '../../utils/synchronized.dart';
import 'protocol.dart';
@ -22,7 +24,7 @@ class DedicatedDriftWorker {
: _servers = DriftServerController(setup);
void start() {
self.onMessage.listen((event) {
EventStreamProviders.messageEvent.forTarget(self).listen((event) {
final message = WasmInitializationMessage.read(event);
_handleMessage(message);
});
@ -69,11 +71,10 @@ class DedicatedDriftWorker {
}
DedicatedWorkerCompatibilityResult(
supportsNestedWorkers: hasProperty(globalThis, 'Worker'),
supportsNestedWorkers: globalContext.has('Worker'),
canAccessOpfs: supportsOpfs,
supportsIndexedDb: supportsIndexedDb,
supportsSharedArrayBuffers:
hasProperty(globalThis, 'SharedArrayBuffer'),
supportsSharedArrayBuffers: globalContext.has('SharedArrayBuffer'),
opfsExists: opfsExists,
indexedDbExists: indexedDbExists,
existingDatabases: existingDatabases,
@ -83,7 +84,7 @@ class DedicatedDriftWorker {
_servers.serve(message);
case StartFileSystemServer(sqlite3Options: final options):
final worker = await VfsWorker.create(options);
self.postMessage(true);
self.postMessage(true.toJS);
await worker.start();
case DeleteDatabase(database: (final storage, final name)):
try {

View File

@ -1,9 +1,9 @@
// ignore_for_file: public_member_api_docs
import 'dart:html';
import 'dart:js';
import 'dart:js_interop';
import 'dart:js_interop_unsafe';
import 'package:js/js_util.dart';
import 'package:web/web.dart' hide WorkerOptions;
import 'package:sqlite3/wasm.dart';
import 'types.dart';
@ -18,8 +18,8 @@ class ProtocolVersion {
const ProtocolVersion._(this.versionCode);
void writeToJs(Object object) {
setProperty(object, 'v', versionCode);
void writeToJs(JSObject object) {
object['v'] = versionCode.toJS;
}
bool operator >=(ProtocolVersion other) {
@ -36,9 +36,9 @@ class ProtocolVersion {
};
}
static ProtocolVersion fromJsObject(Object object) {
if (hasProperty(object, 'v')) {
return negotiate(getProperty<int>(object, 'v'));
static ProtocolVersion fromJsObject(JSObject object) {
if (object.has('v')) {
return negotiate((object['v'] as JSNumber).toDartInt);
} else {
return legacy;
}
@ -58,52 +58,56 @@ class ProtocolVersion {
static const current = v1;
}
typedef PostMessage = void Function(Object? msg, [List<Object>? transfer]);
typedef PostMessage = void Function(JSObject? msg, List<JSObject>? transfer);
/// Sealed superclass for JavaScript objects exchanged between the UI tab and
/// workers spawned by drift to find a suitable database implementation.
sealed class WasmInitializationMessage {
WasmInitializationMessage();
factory WasmInitializationMessage.fromJs(Object jsObject) {
final type = getProperty<String>(jsObject, 'type');
final payload = getProperty<Object?>(jsObject, 'payload');
factory WasmInitializationMessage.fromJs(JSObject jsObject) {
final type = (jsObject['type'] as JSString).toDart;
final payload = jsObject['payload'];
return switch (type) {
WorkerError.type => WorkerError.fromJsPayload(payload!),
ServeDriftDatabase.type => ServeDriftDatabase.fromJsPayload(payload!),
WorkerError.type => WorkerError.fromJsPayload(payload as JSObject),
ServeDriftDatabase.type =>
ServeDriftDatabase.fromJsPayload(payload as JSObject),
StartFileSystemServer.type =>
StartFileSystemServer.fromJsPayload(payload!),
StartFileSystemServer.fromJsPayload(payload as JSObject),
RequestCompatibilityCheck.type =>
RequestCompatibilityCheck.fromJsPayload(payload),
DedicatedWorkerCompatibilityResult.type =>
DedicatedWorkerCompatibilityResult.fromJsPayload(payload!),
DedicatedWorkerCompatibilityResult.fromJsPayload(payload as JSObject),
SharedWorkerCompatibilityResult.type =>
SharedWorkerCompatibilityResult.fromJsPayload(payload!),
DeleteDatabase.type => DeleteDatabase.fromJsPayload(payload!),
SharedWorkerCompatibilityResult.fromJsPayload(payload as JSArray),
DeleteDatabase.type => DeleteDatabase.fromJsPayload(payload as JSAny),
_ => throw ArgumentError('Unknown type $type'),
};
}
factory WasmInitializationMessage.read(MessageEvent event) {
// Not using event.data because we don't want the SDK to dartify the raw JS
// object we're passing around.
final rawData = getProperty<Object>(event, 'data');
return WasmInitializationMessage.fromJs(rawData);
return WasmInitializationMessage.fromJs(event.data as JSObject);
}
void sendTo(PostMessage sender);
void sendToWorker(Worker worker) {
sendTo(worker.postMessage);
sendTo((msg, transfer) {
worker.postMessage(msg, (transfer ?? const []).toJS);
});
}
void sendToPort(MessagePort port) {
sendTo(port.postMessage);
sendTo((msg, transfer) {
port.postMessage(msg, (transfer ?? const []).toJS);
});
}
void sendToClient(DedicatedWorkerGlobalScope worker) {
sendTo(worker.postMessage);
sendTo((msg, transfer) {
worker.postMessage(msg, (transfer ?? const []).toJS);
});
}
}
@ -156,16 +160,15 @@ final class SharedWorkerCompatibilityResult extends CompatibilityResult {
required super.version,
});
factory SharedWorkerCompatibilityResult.fromJsPayload(Object payload) {
final asList = payload as List;
factory SharedWorkerCompatibilityResult.fromJsPayload(JSArray payload) {
final asList = payload.toDart;
final asBooleans = asList.cast<bool>();
final List<ExistingDatabase> existingDatabases;
var version = ProtocolVersion.legacy;
if (asList.length > 5) {
existingDatabases =
EncodeLocations.readFromJs(asList[5] as List<dynamic>);
existingDatabases = EncodeLocations.readFromJs(asList[5] as JSArray);
if (asList.length > 6) {
version = ProtocolVersion.negotiate(asList[6] as int);
@ -187,15 +190,17 @@ final class SharedWorkerCompatibilityResult extends CompatibilityResult {
@override
void sendTo(PostMessage sender) {
sender.sendTyped(type, [
canSpawnDedicatedWorkers,
dedicatedWorkersCanUseOpfs,
canUseIndexedDb,
indexedDbExists,
opfsExists,
existingDatabases.encodeToJs(),
version.versionCode,
]);
sender.sendTyped(
type,
[
canSpawnDedicatedWorkers.toJS,
dedicatedWorkersCanUseOpfs.toJS,
canUseIndexedDb.toJS,
indexedDbExists.toJS,
opfsExists.toJS,
existingDatabases.encodeToJs(),
version.versionCode.toJS,
].toJS);
}
@override
@ -216,13 +221,13 @@ final class WorkerError extends WasmInitializationMessage implements Exception {
WorkerError(this.error);
factory WorkerError.fromJsPayload(Object payload) {
return WorkerError(payload as String);
factory WorkerError.fromJsPayload(JSObject payload) {
return WorkerError((payload as JSString).toDart);
}
@override
void sendTo(PostMessage sender) {
sender.sendTyped(type, error);
sender.sendTyped(type, error.toJS);
}
@override
@ -252,32 +257,32 @@ final class ServeDriftDatabase extends WasmInitializationMessage {
required this.protocolVersion,
});
factory ServeDriftDatabase.fromJsPayload(Object payload) {
factory ServeDriftDatabase.fromJsPayload(JSObject payload) {
return ServeDriftDatabase(
sqlite3WasmUri: Uri.parse(getProperty(payload, 'sqlite')),
port: getProperty(payload, 'port'),
sqlite3WasmUri: Uri.parse((payload['sqlite'] as JSString).toDart),
port: payload['port'] as MessagePort,
storage: WasmStorageImplementation.values
.byName(getProperty(payload, 'storage')),
databaseName: getProperty(payload, 'database'),
initializationPort: getProperty(payload, 'initPort'),
.byName((payload['storage'] as JSString).toDart),
databaseName: (payload['database'] as JSString).toDart,
initializationPort: payload['initPort'] as MessagePort?,
protocolVersion: ProtocolVersion.fromJsObject(payload),
);
}
@override
void sendTo(PostMessage sender) {
final object = newObject<Object>();
setProperty(object, 'sqlite', sqlite3WasmUri.toString());
setProperty(object, 'port', port);
setProperty(object, 'storage', storage.name);
setProperty(object, 'database', databaseName);
final initPort = initializationPort;
setProperty(object, 'initPort', initPort);
final object = JSObject()
..['sqlite'] = sqlite3WasmUri.toString().toJS
..['port'] = port
..['storage'] = storage.name.toJS
..['database'] = databaseName.toJS
..['initPort'] = initializationPort;
protocolVersion.writeToJs(object);
sender.sendTyped(type, object, [
port,
if (initPort != null) initPort,
if (initializationPort != null) initializationPort!,
]);
}
}
@ -293,13 +298,13 @@ final class RequestCompatibilityCheck extends WasmInitializationMessage {
RequestCompatibilityCheck(this.databaseName);
factory RequestCompatibilityCheck.fromJsPayload(Object? payload) {
return RequestCompatibilityCheck(payload as String);
factory RequestCompatibilityCheck.fromJsPayload(JSAny? payload) {
return RequestCompatibilityCheck((payload as JSString).toDart);
}
@override
void sendTo(PostMessage sender) {
sender.sendTyped(type, databaseName);
sender.sendTyped(type, databaseName.toJS);
}
}
@ -322,22 +327,23 @@ final class DedicatedWorkerCompatibilityResult extends CompatibilityResult {
required super.version,
});
factory DedicatedWorkerCompatibilityResult.fromJsPayload(Object payload) {
factory DedicatedWorkerCompatibilityResult.fromJsPayload(JSObject payload) {
final existingDatabases = <ExistingDatabase>[];
if (hasProperty(payload, 'existing')) {
if (payload.has('existing')) {
existingDatabases
.addAll(EncodeLocations.readFromJs(getProperty(payload, 'existing')));
.addAll(EncodeLocations.readFromJs(payload['existing'] as JSArray));
}
return DedicatedWorkerCompatibilityResult(
supportsNestedWorkers: getProperty(payload, 'supportsNestedWorkers'),
canAccessOpfs: getProperty(payload, 'canAccessOpfs'),
supportsNestedWorkers:
(payload['supportsNestedWorkers'] as JSBoolean).toDart,
canAccessOpfs: (payload['canAccessOpfs'] as JSBoolean).toDart,
supportsSharedArrayBuffers:
getProperty(payload, 'supportsSharedArrayBuffers'),
supportsIndexedDb: getProperty(payload, 'supportsIndexedDb'),
indexedDbExists: getProperty(payload, 'indexedDbExists'),
opfsExists: getProperty(payload, 'opfsExists'),
(payload['supportsSharedArrayBuffers'] as JSBoolean).toDart,
supportsIndexedDb: (payload['supportsIndexedDb'] as JSBoolean).toDart,
indexedDbExists: (payload['indexedDbExists'] as JSBoolean).toDart,
opfsExists: (payload['opfsExists'] as JSBoolean).toDart,
existingDatabases: existingDatabases,
version: ProtocolVersion.fromJsObject(payload),
);
@ -345,16 +351,14 @@ final class DedicatedWorkerCompatibilityResult extends CompatibilityResult {
@override
void sendTo(PostMessage sender) {
final object = newObject<Object>();
setProperty(object, 'supportsNestedWorkers', supportsNestedWorkers);
setProperty(object, 'canAccessOpfs', canAccessOpfs);
setProperty(object, 'supportsIndexedDb', supportsIndexedDb);
setProperty(
object, 'supportsSharedArrayBuffers', supportsSharedArrayBuffers);
setProperty(object, 'indexedDbExists', indexedDbExists);
setProperty(object, 'opfsExists', opfsExists);
setProperty(object, 'existing', existingDatabases.encodeToJs());
final object = JSObject()
..['supportsNestedWorkers'] = supportsNestedWorkers.toJS
..['canAccessOpfs'] = canAccessOpfs.toJS
..['supportsIndexedDb'] = supportsIndexedDb.toJS
..['supportsSharedArrayBuffers'] = supportsSharedArrayBuffers.toJS
..['indexedDbExists'] = indexedDbExists.toJS
..['opfsExists'] = opfsExists.toJS
..['existing'] = existingDatabases.encodeToJs();
version.writeToJs(object);
sender.sendTyped(type, object);
@ -381,13 +385,13 @@ final class StartFileSystemServer extends WasmInitializationMessage {
StartFileSystemServer(this.sqlite3Options);
factory StartFileSystemServer.fromJsPayload(Object payload) {
factory StartFileSystemServer.fromJsPayload(JSObject payload) {
return StartFileSystemServer(payload as WorkerOptions);
}
@override
void sendTo(PostMessage sender) {
sender.sendTyped(type, sqlite3Options);
sender.sendTyped(type, sqlite3Options as JSObject);
}
}
@ -398,53 +402,51 @@ final class DeleteDatabase extends WasmInitializationMessage {
DeleteDatabase(this.database);
factory DeleteDatabase.fromJsPayload(Object payload) {
final asList = payload as List<Object?>;
factory DeleteDatabase.fromJsPayload(JSAny payload) {
final asList = (payload as JSArray).toDart;
return DeleteDatabase((
WebStorageApi.byName[asList[0] as String]!,
asList[1] as String,
WebStorageApi.byName[(asList[0] as JSString).toDart]!,
(asList[1] as JSString).toDart,
));
}
@override
void sendTo(PostMessage sender) {
sender.sendTyped(type, [database.$1.name, database.$2]);
sender.sendTyped(type, [database.$1.name.toJS, database.$2.toJS].toJS);
}
}
extension EncodeLocations on List<ExistingDatabase> {
static List<ExistingDatabase> readFromJs(List<Object?> object) {
static List<ExistingDatabase> readFromJs(JSArray object) {
final existing = <ExistingDatabase>[];
for (final entry in object) {
for (final entry in object.toDart.cast<JSObject>()) {
existing.add((
WebStorageApi.byName[getProperty(entry as Object, 'l')]!,
getProperty(entry, 'n'),
WebStorageApi.byName[(entry['l'] as JSString).toDart]!,
(entry['n'] as JSString).toDart,
));
}
return existing;
}
Object encodeToJs() {
final existing = JsArray<Object>();
JSObject encodeToJs() {
final existing = <JSObject>[];
for (final entry in this) {
final object = newObject<Object>();
setProperty(object, 'l', entry.$1.name);
setProperty(object, 'n', entry.$2);
existing.add(object);
existing.add(JSObject()
..['l'] = entry.$1.name.toJS
..['n'] = entry.$2.toJS);
}
return existing;
return existing.toJS;
}
}
extension on PostMessage {
void sendTyped(String type, Object? payload, [List<Object>? transfer]) {
final object = newObject<Object>();
setProperty(object, 'type', type);
setProperty(object, 'payload', payload);
void sendTyped(String type, JSAny? payload, [List<JSObject>? transfer]) {
final object = JSObject()
..['type'] = type.toJS
..['payload'] = payload;
call(object, transfer);
}

View File

@ -1,17 +1,25 @@
import 'dart:async';
import 'dart:html';
import 'dart:indexed_db';
import 'dart:js_interop';
import 'dart:js_interop_unsafe';
import 'package:drift/drift.dart';
import 'package:drift/remote.dart';
import 'package:drift/wasm.dart';
import 'package:js/js_util.dart';
import 'package:web/web.dart'
show
Worker,
IDBFactory,
IDBRequest,
IDBDatabase,
IDBVersionChangeEvent,
EventStreamProviders,
MessageEvent;
// ignore: implementation_imports
import 'package:sqlite3/src/wasm/js_interop/file_system_access.dart';
import 'package:sqlite3/wasm.dart';
import 'package:stream_channel/stream_channel.dart';
import '../channel.dart';
import '../new_channel.dart';
import 'protocol.dart';
/// Checks whether the OPFS API is likely to be correctly implemented in the
@ -38,10 +46,10 @@ Future<bool> checkOpfsSupport() async {
// In earlier versions of the OPFS standard, some methods like `getSize()`
// on a sync file handle have actually been asynchronous. We don't support
// Browsers that implement the outdated spec.
final getSizeResult = callMethod<Object?>(openedFile, 'getSize', []);
if (typeofEquals<Object?>(getSizeResult, 'object')) {
final getSizeResult = (openedFile as JSObject).callMethod('getSize'.toJS);
if (getSizeResult.typeofEquals('object')) {
// Returned a promise, that's no good.
await promiseToFuture<Object?>(getSizeResult!);
await (getSizeResult as JSPromise).toDart;
return false;
}
@ -61,18 +69,18 @@ Future<bool> checkOpfsSupport() async {
/// Checks whether IndexedDB is working in the current browser.
Future<bool> checkIndexedDbSupport() async {
if (!hasProperty(globalThis, 'indexedDB') ||
if (!globalContext.has('indexedDB') ||
// FileReader needed to read and write blobs efficiently
!hasProperty(globalThis, 'FileReader')) {
!globalContext.has('FileReader')) {
return false;
}
final idb = getProperty<IdbFactory>(globalThis, 'indexedDB');
final idb = globalContext['indexedDB'] as IDBFactory;
try {
const name = 'drift_mock_db';
final mockDb = await idb.open(name);
final mockDb = await idb.open(name).complete<IDBDatabase>();
mockDb.close();
idb.deleteDatabase(name);
} catch (error) {
@ -87,19 +95,16 @@ Future<bool> checkIndexedDbExists(String databaseName) async {
bool? indexedDbExists;
try {
final idb = getProperty<IdbFactory>(globalThis, 'indexedDB');
final idb = globalContext['indexedDB'] as IDBFactory;
final database = await idb.open(
databaseName,
// Current schema version used by the [IndexedDbFileSystem]
version: 1,
onUpgradeNeeded: (event) {
// If there's an upgrade, we're going from 0 to 1 - the database doesn't
// exist! Abort the transaction so that we don't create it here.
event.target.transaction!.abort();
indexedDbExists = false;
},
);
final openRequest = idb.open(databaseName, 1);
openRequest.onupgradeneeded = (IDBVersionChangeEvent event) {
// If there's an upgrade, we're going from 0 to 1 - the database doesn't
// exist! Abort the transaction so that we don't create it here.
openRequest.transaction!.abort();
indexedDbExists = false;
}.toJS;
final database = await openRequest.complete<IDBDatabase>();
indexedDbExists ??= true;
database.close();
@ -112,9 +117,9 @@ Future<bool> checkIndexedDbExists(String databaseName) async {
/// Deletes a database from IndexedDb if supported.
Future<void> deleteDatabaseInIndexedDb(String databaseName) async {
final idb = window.indexedDB;
if (idb != null) {
await idb.deleteDatabase(databaseName);
if (globalContext.has('indexedDB')) {
final idb = globalContext['indexedDB'] as IDBFactory;
await idb.deleteDatabase(databaseName).complete<JSAny?>();
}
}
@ -181,12 +186,16 @@ class DriftServerController {
final initPort = message.initializationPort;
final initializer = initPort != null
? () async {
initPort.postMessage(true);
? () {
final completer = Completer<Uint8List?>();
initPort.postMessage(true.toJS);
return await initPort.onMessage
.map((e) => e.data as Uint8List?)
.first;
initPort.onmessage = (MessageEvent e) {
final data = (e.data as JSUint8Array?);
completer.complete(data?.toDart);
}.toJS;
return completer.future;
}
: null;
@ -269,7 +278,7 @@ class DriftServerController {
StartFileSystemServer(options).sendToWorker(worker);
// Wait for the server worker to report that it's ready
await worker.onMessage.first;
await EventStreamProviders.messageEvent.forTarget(worker).first;
return WasmVfs(workerOptions: options);
}
@ -349,3 +358,21 @@ extension StorageClassification on WasmStorageImplementation {
this == WasmStorageImplementation.sharedIndexedDb ||
this == WasmStorageImplementation.unsafeIndexedDb;
}
/// Utilities to complete an IndexedDB request.
extension CompleteIdbRequest on IDBRequest {
/// Turns this request into a Dart future that completes with the first
/// success or error event.
Future<T> complete<T extends JSAny?>() {
final completer = Completer<T>.sync();
EventStreamProviders.successEvent.forTarget(this).listen((event) {
completer.complete(result as T);
});
EventStreamProviders.errorEvent.forTarget(this).listen((event) {
completer.completeError(error ?? event);
});
return completer.future;
}
}

View File

@ -1,8 +1,8 @@
// ignore_for_file: public_member_api_docs
import 'dart:async';
import 'dart:html';
import 'dart:js_interop';
import 'package:js/js_util.dart';
import 'package:web/web.dart';
import '../wasm_setup.dart';
import 'protocol.dart';
@ -22,13 +22,15 @@ class SharedDriftWorker {
: _servers = DriftServerController(setup);
void start() {
const event = EventStreamProvider<MessageEvent>('connect');
event.forTarget(self).listen(_newConnection);
const event = EventStreamProviders.connectEvent;
event.forTarget(self).listen((e) => _newConnection(e as MessageEvent));
}
void _newConnection(MessageEvent event) async {
final clientPort = event.ports[0];
clientPort.onMessage
final clientPort = event.ports.toDart[0];
clientPort.start();
EventStreamProviders.messageEvent
.forTarget(clientPort)
.listen((event) => _messageFromClient(clientPort, event));
}
@ -111,9 +113,9 @@ class SharedDriftWorker {
}
}
messageSubscription = worker.onMessage.listen((event) {
final data =
WasmInitializationMessage.fromJs(getProperty(event, 'data'));
messageSubscription =
EventStreamProviders.messageEvent.forTarget(worker).listen((event) {
final data = WasmInitializationMessage.read(event);
final compatibilityResult = data as DedicatedWorkerCompatibilityResult;
result(
@ -124,7 +126,8 @@ class SharedDriftWorker {
);
});
errorSubscription = worker.onError.listen((event) {
errorSubscription =
EventStreamProviders.errorEvent.forTarget(worker).listen((event) {
result(false, false, false, const []);
worker.terminate();
_dedicatedWorker = null;

View File

@ -7,15 +7,17 @@
library drift.wasm;
import 'dart:async';
import 'dart:html';
import 'dart:js_interop';
import 'dart:typed_data';
import 'package:collection/collection.dart';
import 'package:drift/src/web/wasm_setup.dart';
import 'package:web/web.dart'
show DedicatedWorkerGlobalScope, SharedWorkerGlobalScope;
import 'package:sqlite3/wasm.dart';
import 'backends.dart';
import 'src/sqlite3/database.dart';
import 'src/web/wasm_setup.dart';
import 'src/web/wasm_setup/dedicated_worker.dart';
import 'src/web/wasm_setup/shared_worker.dart';
import 'src/web/wasm_setup/types.dart';
@ -205,12 +207,15 @@ class WasmDatabase extends DelegatedDatabase {
static void workerMainForOpen({
WasmDatabaseSetup? setupAllDatabases,
}) {
final self = WorkerGlobalScope.instance;
final self = globalContext;
if (self is DedicatedWorkerGlobalScope) {
DedicatedDriftWorker(self, setupAllDatabases).start();
} else if (self is SharedWorkerGlobalScope) {
SharedDriftWorker(self, setupAllDatabases).start();
if (self.instanceOfString('DedicatedWorkerGlobalScope')) {
DedicatedDriftWorker(
self as DedicatedWorkerGlobalScope, setupAllDatabases)
.start();
} else if (self.instanceOfString('SharedWorkerGlobalScope')) {
SharedDriftWorker(self as SharedWorkerGlobalScope, setupAllDatabases)
.start();
}
}
}

View File

@ -10,4 +10,4 @@ import 'package:meta/meta.dart';
export 'src/web/sql_js.dart';
export 'src/web/storage.dart' hide CustomSchemaVersionSave;
export 'src/web/web_db.dart';
export 'src/web/channel.dart';
export 'src/web/channel.dart' show PortToChannel;

View File

@ -6,21 +6,23 @@ homepage: https://drift.simonbinder.eu/
issue_tracker: https://github.com/simolus3/drift/issues
environment:
sdk: '>=3.0.0 <4.0.0'
sdk: '>=3.3.0 <4.0.0'
dependencies:
async: ^2.5.0
convert: ^3.0.0
collection: ^1.15.0
js: ^0.6.3
js: '>=0.6.3 <0.8.0'
meta: ^1.3.0
stream_channel: ^2.1.0
sqlite3: ^2.4.0
path: ^1.8.0
stack_trace: ^1.11.1
web: ^0.5.0
dev_dependencies:
archive: ^3.3.1
analyzer: ^6.4.1
build_test: ^2.0.0
build_runner_core: ^7.0.0
build_verify: ^3.0.0

View File

@ -0,0 +1,56 @@
@TestOn('vm')
import 'dart:io';
import 'package:analyzer/dart/ast/ast.dart';
import 'package:analyzer/dart/analysis/utilities.dart';
import 'package:test/test.dart';
void main() {
test('drift does not import legacy JS interop files', () {
// The old web APIs can't be used in dart2wasm, so we shouldn't use them in
// web-specific drift code.
// Legacy APIs (involving `WebDatabase`) are excempt from this.
const allowedLegacyCode = [
'lib/web/worker.dart', // Wasm uses a different worker
'lib/src/web/channel.dart',
'lib/src/web/storage.dart',
'lib/src/web/sql_js.dart',
];
final failures = <(String, String)>[];
void check(FileSystemEntity e) {
switch (e) {
case File():
if (allowedLegacyCode.contains(e.path)) return;
final text = e.readAsStringSync();
final parsed = parseString(content: text).unit;
for (final directive in parsed.directives) {
if (directive is ImportDirective) {
final uri = directive.uri.stringValue!;
if (uri.contains('package:js') ||
uri == 'dart:js' ||
uri == 'dart:js_util' ||
uri == 'dart:html' ||
uri == 'dart:indexeddb') {
failures.add((e.path, directive.toString()));
}
}
}
case Directory():
for (final entry in e.listSync()) {
check(entry);
}
}
}
final root = Directory('lib/');
check(root);
expect(failures, isEmpty,
reason: 'Drift should not import legacy JS code.');
});
}

View File

@ -109,7 +109,7 @@ class _GeneratesSqlMatcher extends Matcher {
final argsMatchState = <String, Object?>{};
if (_matchVariables != null &&
!_matchVariables!.matches(ctx.boundVariables, argsMatchState)) {
!_matchVariables.matches(ctx.boundVariables, argsMatchState)) {
matchState['vars'] = ctx.boundVariables;
matchState['vars_match'] = argsMatchState;
matches = false;

View File

@ -150,7 +150,7 @@ Future<void> _open(String? implementationName) async {
db.createFunction(
functionName: 'database_host',
function: (args) => 'document',
argumentCount: const AllowedArgumentCount(1),
argumentCount: const AllowedArgumentCount(0),
);
},
);

View File

@ -6,7 +6,7 @@ void main() {
db.createFunction(
functionName: 'database_host',
function: (args) => 'worker',
argumentCount: const AllowedArgumentCount(1),
argumentCount: const AllowedArgumentCount(0),
);
});
}