diff --git a/extras/encryption/lib/encrypted_moor.dart b/extras/encryption/lib/encrypted_moor.dart index e697065d..da73fefb 100644 --- a/extras/encryption/lib/encrypted_moor.dart +++ b/extras/encryption/lib/encrypted_moor.dart @@ -37,6 +37,7 @@ class _SqfliteDelegate extends DatabaseDelegate with _SqfliteExecutor { }); DbVersionDelegate? _delegate; + @override DbVersionDelegate get versionDelegate { return _delegate ??= _SqfliteVersionDelegate(db); @@ -219,4 +220,11 @@ class EncryptedExecutor extends DelegatedDatabase { final sqfliteDelegate = delegate as _SqfliteDelegate; return sqfliteDelegate.isOpen ? sqfliteDelegate.db : null; } + + @override + // We're not really required to be sequential since sqflite has an internal + // lock to bring statements into a sequential order. + // Setting isSequential here helps with moor cancellations in stream queries + // though. + bool get isSequential => true; } diff --git a/moor/CHANGELOG.md b/moor/CHANGELOG.md index 56c9d2c3..472f10c1 100644 --- a/moor/CHANGELOG.md +++ b/moor/CHANGELOG.md @@ -3,6 +3,9 @@ - Support custom, existing classes for rows! See the `@UseRowClass` annotation for details. - Add `CASE WHEN` expressions with the `caseMatch` method on `Expression` +- On supported platforms, cancel pending stream selects when the stream is disposed + - `moor_flutter` is supported + - `moor/ffi` is supported when used on a background isolate ## 4.2.1 diff --git a/moor/example/stream_cancellation_test.dart b/moor/example/stream_cancellation_test.dart new file mode 100644 index 00000000..2a6f5118 --- /dev/null +++ b/moor/example/stream_cancellation_test.dart @@ -0,0 +1,35 @@ +import 'package:moor/ffi.dart'; +import 'package:moor/isolate.dart'; +import 'package:moor/moor.dart'; +import 'package:test/test.dart'; + +DatabaseConnection createConnection() => + DatabaseConnection.fromExecutor(VmDatabase.memory(logStatements: true)); + +class EmptyDb extends GeneratedDatabase { + EmptyDb.connect(DatabaseConnection c) : super.connect(c); + @override + final List allTables = const []; + @override + final int schemaVersion = 1; +} + +void main() async { + final isolate = await MoorIsolate.spawn(createConnection); + final db = EmptyDb.connect(await isolate.connect(isolateDebugLog: true)); + + var i = 0; + String slowQuery() => ''' + with recursive slow(x) as (values(1) union all select x+1 from slow where x < 1000000) + select ${i++} from slow; + '''; // ^ to get different `StreamKey`s + + await db.doWhenOpened((e) {}); + + final subscriptions = List.generate( + 4, (_) => db.customSelect(slowQuery()).watch().listen(null)); + await pumpEventQueue(); + await Future.wait(subscriptions.map((e) => e.cancel())); + + await db.customSelect('select 1').getSingle(); +} diff --git a/moor/lib/src/runtime/cancellation_zone.dart b/moor/lib/src/runtime/cancellation_zone.dart new file mode 100644 index 00000000..37172df9 --- /dev/null +++ b/moor/lib/src/runtime/cancellation_zone.dart @@ -0,0 +1,98 @@ +import 'dart:async'; + +import 'package:meta/meta.dart'; + +const _key = #moor.runtime.cancellation; + +/// Runs an asynchronous operation with support for cancellations. +/// +/// The [CancellationToken] can be used to cancel the operation and to get the +/// eventual result. +CancellationToken runCancellable( + Future Function() operation, +) { + final token = CancellationToken(); + runZonedGuarded( + () => operation().then(token._resultCompleter.complete), + token._resultCompleter.completeError, + zoneValues: {_key: token}, + ); + + return token; +} + +/// A token that can be used to cancel an asynchronous operation running in a +/// child zone. +@internal +class CancellationToken { + final Completer _resultCompleter = Completer(); + final List _cancellationCallbacks = []; + bool _cancellationRequested = false; + + /// Loads the result for the cancellable operation. + /// + /// When a cancellation has been requested and was honored, the future will + /// complete with a [CancellationException]. + Future get result => _resultCompleter.future; + + /// Requests the inner asynchronous operation to be cancelled. + void cancel() { + if (_cancellationRequested) return; + + for (final callback in _cancellationCallbacks) { + callback(); + } + _cancellationRequested = true; + } +} + +/// Extensions that can be used on cancellable operations if they return a non- +/// nullable value. +extension NonNullableCancellationExtension + on CancellationToken { + /// Wait for the result, or return `null` if the operation was cancelled. + /// + /// To avoid situations where `null` could be a valid result from an async + /// operation, this getter is only available on non-nullable operations. This + /// avoids ambiguity. + /// + /// The future will still complete with an error if anything but a + /// [CancellationException] is thrown in [result]. + Future get resultOrNullIfCancelled async { + try { + return await result; + } on CancellationException { + return null; + } + } +} + +/// Thrown inside a cancellation zone when it has been cancelled. +@internal +class CancellationException implements Exception { + /// Default const constructor + const CancellationException(); + + @override + String toString() { + return 'Operation was cancelled'; + } +} + +/// Checks whether the active zone is a cancellation zone that has been +/// cancelled. If it is, a [CancellationException] will be thrown. +void checkIfCancelled() { + final token = Zone.current[_key]; + if (token is CancellationToken && token._cancellationRequested) { + throw const CancellationException(); + } +} + +/// Requests the [callback] to be invoked when the enclosing asynchronous +/// operation is cancelled. +void doOnCancellation(void Function() callback) { + final token = Zone.current[_key]; + if (token is CancellationToken) { + token._cancellationCallbacks.add(callback); + } +} diff --git a/moor/lib/src/runtime/executor/helpers/engines.dart b/moor/lib/src/runtime/executor/helpers/engines.dart index e2afef4f..0cd47eb8 100644 --- a/moor/lib/src/runtime/executor/helpers/engines.dart +++ b/moor/lib/src/runtime/executor/helpers/engines.dart @@ -4,25 +4,31 @@ import 'package:moor/moor.dart'; import 'package:moor/src/utils/synchronized.dart'; import 'package:pedantic/pedantic.dart'; +import '../../cancellation_zone.dart'; import 'delegates.dart'; mixin _ExecutorWithQueryDelegate on QueryExecutor { + final Lock _lock = Lock(); + QueryDelegate get impl; bool get isSequential => false; + bool get logStatements => false; - final Lock _lock = Lock(); /// Used to provide better error messages when calling operations without /// calling [ensureOpen] before. bool _ensureOpenCalled = false; - Future _synchronized(FutureOr Function() action) async { + Future _synchronized(Future Function() action) { if (isSequential) { - return await _lock.synchronized(action); + return _lock.synchronized(() { + checkIfCancelled(); + return action(); + }); } else { // support multiple operations in parallel, so just run right away - return await action(); + return action(); } } diff --git a/moor/lib/src/runtime/executor/stream_queries.dart b/moor/lib/src/runtime/executor/stream_queries.dart index 415b63af..9119abbf 100644 --- a/moor/lib/src/runtime/executor/stream_queries.dart +++ b/moor/lib/src/runtime/executor/stream_queries.dart @@ -7,6 +7,8 @@ import 'package:moor/moor.dart'; import 'package:moor/src/utils/start_with_value_transformer.dart'; import 'package:pedantic/pedantic.dart'; +import '../cancellation_zone.dart'; + const _listEquality = ListEquality(); // This is an internal moor library that's never exported to users. @@ -125,7 +127,8 @@ class StreamQueryStore { final key = stream._fetcher.key; _keysPendingRemoval.add(key); - final completer = Completer(); + // sync because it's only triggered after the timer + final completer = Completer.sync(); _pendingTimers.add(completer); // Hey there! If you're sent here because your Flutter tests fail, please @@ -192,6 +195,7 @@ class QueryStream { StreamSubscription? _tablesChangedSubscription; List>? _lastData; + final List _runningOperations = []; Stream>> get stream { return _controller.stream.transform(StartWithValueTransformer(_cachedData)); @@ -236,14 +240,21 @@ class QueryStream { // case _lastData = null; _tablesChangedSubscription = null; + + for (final op in _runningOperations) { + op.cancel(); + } }); } Future fetchAndEmitData() async { - List> data; + final operation = runCancellable(_fetcher.fetchData); + _runningOperations.add(operation); try { - data = await _fetcher.fetchData(); + final data = await operation.resultOrNullIfCancelled; + if (data == null) return; + _lastData = data; if (!_controller.isClosed) { _controller.add(data); @@ -252,6 +263,8 @@ class QueryStream { if (!_controller.isClosed) { _controller.addError(e, s); } + } finally { + _runningOperations.remove(operation); } } diff --git a/moor/lib/src/runtime/remote/client_impl.dart b/moor/lib/src/runtime/remote/client_impl.dart index 577191c8..954d5f00 100644 --- a/moor/lib/src/runtime/remote/client_impl.dart +++ b/moor/lib/src/runtime/remote/client_impl.dart @@ -6,6 +6,7 @@ import 'package:moor/src/runtime/executor/stream_queries.dart'; import 'package:moor/src/runtime/types/sql_types.dart'; import 'package:stream_channel/stream_channel.dart'; +import '../cancellation_zone.dart'; import 'communication.dart'; import 'protocol.dart'; @@ -58,8 +59,20 @@ abstract class _BaseExecutor extends QueryExecutor { Future _runRequest( StatementMethod method, String sql, List? args) { - return client._channel - .request(ExecuteQuery(method, sql, args ?? const [], _executorId)); + // fast path: If the operation has already been cancelled, don't bother + // sending a request in the first place + checkIfCancelled(); + + final id = client._channel.newRequestId(); + // otherwise, send the request now and cancel it later, if that's desired + doOnCancellation(() { + client._channel.request(RequestCancellation(id)); + }); + + return client._channel.request( + ExecuteQuery(method, sql, args ?? const [], _executorId), + requestId: id, + ); } @override @@ -101,6 +114,7 @@ class _RemoteQueryExecutor extends _BaseExecutor { : super(client, executorId); Completer? _setSchemaVersion; + Future? _serverIsOpen; @override TransactionExecutor beginTransaction() { @@ -114,7 +128,8 @@ class _RemoteQueryExecutor extends _BaseExecutor { await _setSchemaVersion!.future; _setSchemaVersion = null; } - return client._channel + + return _serverIsOpen ??= client._channel .request(EnsureOpen(user.schemaVersion, _executorId)); } diff --git a/moor/lib/src/runtime/remote/communication.dart b/moor/lib/src/runtime/remote/communication.dart index 0f34b97d..8f03d6a5 100644 --- a/moor/lib/src/runtime/remote/communication.dart +++ b/moor/lib/src/runtime/remote/communication.dart @@ -2,6 +2,7 @@ import 'dart:async'; import 'package:stream_channel/stream_channel.dart'; +import '../cancellation_zone.dart'; import 'protocol.dart'; /// Wrapper around a two-way communication channel to support requests and @@ -40,6 +41,9 @@ class MoorCommunication { /// A stream of requests coming from the other peer. Stream get incomingRequests => _incomingRequests.stream; + /// Returns a new request id to be used for the next request. + int newRequestId() => _currentRequestId++; + /// Closes the connection to the server. void close() { if (isClosed) return; @@ -77,13 +81,19 @@ class MoorCommunication { _pendingRequests.remove(msg.requestId); } else if (msg is Request) { _incomingRequests.add(msg); + } else if (msg is CancelledResponse) { + final completer = _pendingRequests[msg.requestId]; + completer?.completeError(const CancellationException()); } } /// Sends a request and waits for the peer to reply with a value that is /// assumed to be of type [T]. - Future request(Object? request) { - final id = _currentRequestId++; + /// + /// The [requestId] parameter can be used to set a fixed request id for the + /// request. + Future request(Object? request, {int? requestId}) { + final id = requestId ?? newRequestId(); final completer = Completer(); _pendingRequests[id] = completer; @@ -113,7 +123,11 @@ class MoorCommunication { // sending a message while closed will throw, so don't even try. if (isClosed) return; - _send(ErrorResponse(request.id, error.toString(), trace.toString())); + if (error is CancellationException) { + _send(CancelledResponse(request.id)); + } else { + _send(ErrorResponse(request.id, error.toString(), trace.toString())); + } } /// Utility that listens to [incomingRequests] and invokes the [handler] on diff --git a/moor/lib/src/runtime/remote/protocol.dart b/moor/lib/src/runtime/remote/protocol.dart index baadbf02..ad13b9ee 100644 --- a/moor/lib/src/runtime/remote/protocol.dart +++ b/moor/lib/src/runtime/remote/protocol.dart @@ -9,6 +9,7 @@ class MoorProtocol { static const _tag_Request = 0; static const _tag_Response_success = 1; static const _tag_Response_error = 2; + static const _tag_Response_cancelled = 3; static const _tag_NoArgsRequest_getTypeSystem = 0; static const _tag_NoArgsRequest_terminateAll = 1; @@ -22,6 +23,7 @@ class MoorProtocol { static const _tag_DefaultSqlTypeSystem = 9; static const _tag_DirectValue = 10; static const _tag_SelectResult = 11; + static const _tag_RequestCancellation = 12; Object? serialize(Message message) { if (message is Request) { @@ -43,6 +45,8 @@ class MoorProtocol { message.requestId, encodePayload(message.response), ]; + } else if (message is CancelledResponse) { + return [_tag_Response_cancelled, message.requestId]; } } @@ -59,6 +63,8 @@ class MoorProtocol { return ErrorResponse(id, message[2] as Object, message[3] as String); case _tag_Response_success: return SuccessResponse(id, decodePayload(message[2])); + case _tag_Response_cancelled: + return CancelledResponse(id); } throw const FormatException('Unknown tag'); @@ -136,6 +142,8 @@ class MoorProtocol { } return result; } + } else if (payload is RequestCancellation) { + return [_tag_RequestCancellation, payload.originalRequestId]; } else { return [_tag_DirectValue, payload]; } @@ -223,6 +231,8 @@ class MoorProtocol { }); } return SelectResult(result); + case _tag_RequestCancellation: + return RequestCancellation(readInt(1)); case _tag_DirectValue: return encoded[1]; } @@ -286,6 +296,17 @@ class ErrorResponse extends Message { } } +class CancelledResponse extends Message { + final int requestId; + + CancelledResponse(this.requestId); + + @override + String toString() { + return 'Previous request $requestId was cancelled'; + } +} + /// A request without further parameters enum NoArgsRequest { /// Sent from the client to the server. The server will reply with the @@ -323,6 +344,22 @@ class ExecuteQuery { } } +/// Requests a previous request to be cancelled. +/// +/// Whether this is supported or not depends on the server and its internal +/// state. This request will be immediately be acknowledged with a null +/// response, which does not indicate whether a cancellation actually happened. +class RequestCancellation { + final int originalRequestId; + + RequestCancellation(this.originalRequestId); + + @override + String toString() { + return 'Cancel previous request $originalRequestId'; + } +} + /// Sent from the client to run [BatchedStatements] class ExecuteBatchedStatement { final BatchedStatements stmts; diff --git a/moor/lib/src/runtime/remote/server_impl.dart b/moor/lib/src/runtime/remote/server_impl.dart index 2ffc7141..5b21413f 100644 --- a/moor/lib/src/runtime/remote/server_impl.dart +++ b/moor/lib/src/runtime/remote/server_impl.dart @@ -4,6 +4,7 @@ import 'package:moor/moor.dart'; import 'package:moor/remote.dart'; import 'package:stream_channel/stream_channel.dart'; +import '../cancellation_zone.dart'; import 'communication.dart'; import 'protocol.dart'; @@ -19,6 +20,8 @@ class ServerImplementation implements MoorServer { final Map _managedExecutors = {}; int _currentExecutorId = 0; + final Map _cancellableOperations = {}; + /// when a transaction is active, all queries that don't operate on another /// query executor have to wait! /// @@ -88,8 +91,11 @@ class ServerImplementation implements MoorServer { } else if (payload is EnsureOpen) { return _handleEnsureOpen(payload); } else if (payload is ExecuteQuery) { - return _runQuery( - payload.method, payload.sql, payload.args, payload.executorId); + final token = runCancellable(() => _runQuery( + payload.method, payload.sql, payload.args, payload.executorId)); + _cancellableOperations[request.id] = token; + return token.result + .whenComplete(() => _cancellableOperations.remove(request.id)); } else if (payload is ExecuteBatchedStatement) { return _runBatched(payload.stmts, payload.executorId); } else if (payload is NotifyTablesUpdated) { @@ -98,6 +104,9 @@ class ServerImplementation implements MoorServer { } } else if (payload is RunTransactionAction) { return _transactionControl(payload.control, payload.executorId); + } else if (payload is RequestCancellation) { + _cancellableOperations[payload.originalRequestId]?.cancel(); + return null; } } @@ -112,6 +121,10 @@ class ServerImplementation implements MoorServer { List args, int? transactionId) async { final executor = await _loadExecutor(transactionId); + // Give cancellations more time to come in + await Future.delayed(Duration.zero); + checkIfCancelled(); + switch (method) { case StatementMethod.custom: return executor.runCustom(sql, args); diff --git a/moor/test/integration_tests/cancellation_test.dart b/moor/test/integration_tests/cancellation_test.dart new file mode 100644 index 00000000..d27688d5 --- /dev/null +++ b/moor/test/integration_tests/cancellation_test.dart @@ -0,0 +1,118 @@ +@Tags(['integration']) +import 'package:moor/ffi.dart'; +import 'package:moor/isolate.dart'; +import 'package:moor/moor.dart'; +import 'package:rxdart/rxdart.dart'; +import 'package:test/test.dart'; + +DatabaseConnection createConnection() { + var counter = 0; + final loggedValues = []; + + return DatabaseConnection.fromExecutor( + VmDatabase.memory( + setup: (rawDb) { + rawDb.createFunction( + functionName: 'increment_counter', + function: (args) => counter++, + ); + rawDb.createFunction( + functionName: 'get_counter', + function: (args) => counter, + ); + + rawDb.createFunction( + functionName: 'log_value', + function: (args) { + final value = args.single as int; + loggedValues.add(value); + return value; + }, + ); + rawDb.createFunction( + functionName: 'get_values', + function: (args) => loggedValues.join(','), + ); + }, + ), + ); +} + +class EmptyDb extends GeneratedDatabase { + EmptyDb.connect(DatabaseConnection c) : super.connect(c); + @override + final List allTables = const []; + @override + final int schemaVersion = 1; +} + +void main() { + moorRuntimeOptions.dontWarnAboutMultipleDatabases = true; + + Future runTest(EmptyDb db) async { + String slowQuery(int i) => ''' + with recursive slow(x) as (values(increment_counter()) union all select x+1 from slow where x < 1000000) + select $i from slow; + '''; // ^ to get different `StreamKey`s + + // Avoid delays caused by opening the database to interfere with the + // cancellation mechanism (we need to react to cancellations quicker if the + // db is already open, which is what we want to test) + await db.doWhenOpened((e) {}); + + final subscriptions = List.generate( + 4, (i) => db.customSelect(slowQuery(i)).watch().listen(null)); + await pumpEventQueue(); + await Future.wait(subscriptions.map((e) => e.cancel())); + + final amountOfSlowQueries = await db + .customSelect('select get_counter() r') + .map((row) => row.read('r')) + .getSingle(); + + // One slow query is ok if the cancellation wasn't quick enough, we just + // shouldn't run all 4 of them. + expect(amountOfSlowQueries, isNot(4)); + } + + group('stream queries are aborted on cancellations', () { + test('on a background isolate', () async { + final isolate = await MoorIsolate.spawn(createConnection); + addTearDown(isolate.shutdownAll); + + final db = EmptyDb.connect(await isolate.connect()); + await runTest(db); + }); + }); + + test('together with switchMap', () async { + String slowQuery(int i) => ''' + with recursive slow(x) as (values(log_value($i)) union all select x+1 from slow where x < 1000000) + select $i from slow; + '''; + + final isolate = await MoorIsolate.spawn(createConnection); + addTearDown(isolate.shutdownAll); + + final db = EmptyDb.connect(await isolate.connect()); + await db.customSelect('select 1').getSingle(); + + final filter = BehaviorSubject(); + addTearDown(filter.close); + filter + .switchMap((value) => db.customSelect(slowQuery(value)).watch()) + .listen(null); + + for (var i = 0; i < 4; i++) { + filter.add(i); + await pumpEventQueue(); + } + + final values = await db + .customSelect('select get_values() r') + .map((row) => row.read('r')) + .getSingle(); + + expect(values, '0,3'); + }); +} diff --git a/moor/test/isolate_test.dart b/moor/test/isolate_test.dart index ffbd56a2..e2a3856d 100644 --- a/moor/test/isolate_test.dart +++ b/moor/test/isolate_test.dart @@ -1,4 +1,3 @@ -//@dart=2.9 @TestOn('vm') import 'dart:async'; import 'dart:isolate'; @@ -83,8 +82,8 @@ void main() { void _runTests( FutureOr Function() spawner, bool terminateIsolate) { - MoorIsolate isolate; - TodoDb database; + late MoorIsolate isolate; + late TodoDb database; setUp(() async { isolate = await spawner(); diff --git a/moor_flutter/CHANGELOG.md b/moor_flutter/CHANGELOG.md index 97545cc1..0da07d24 100644 --- a/moor_flutter/CHANGELOG.md +++ b/moor_flutter/CHANGELOG.md @@ -1,3 +1,7 @@ +## 4.1.0-dev + +- Support query cancellations introduced in moor 4.3.0 + ## 4.0.0 - Support moor version 4 diff --git a/moor_flutter/lib/moor_flutter.dart b/moor_flutter/lib/moor_flutter.dart index 3a971d69..9082b50d 100644 --- a/moor_flutter/lib/moor_flutter.dart +++ b/moor_flutter/lib/moor_flutter.dart @@ -206,4 +206,11 @@ class FlutterQueryExecutor extends DelegatedDatabase { final sqfliteDelegate = delegate as _SqfliteDelegate; return sqfliteDelegate.isOpen ? sqfliteDelegate.db : null; } + + @override + // We're not really required to be sequential since sqflite has an internal + // lock to bring statements into a sequential order. + // Setting isSequential here helps with moor cancellations in stream queries + // though. + bool get isSequential => true; } diff --git a/moor_flutter/pubspec.yaml b/moor_flutter/pubspec.yaml index 79f1c73a..12c43ab6 100644 --- a/moor_flutter/pubspec.yaml +++ b/moor_flutter/pubspec.yaml @@ -1,6 +1,6 @@ name: moor_flutter description: Flutter implementation of moor, a safe and reactive persistence library for Dart applications -version: 4.0.0 +version: 4.1.0-dev repository: https://github.com/simolus3/moor homepage: https://moor.simonbinder.eu/ issue_tracker: https://github.com/simolus3/moor/issues