From 357764a1b720794e99218d3a9c3daae85a2292ab Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 31 Mar 2021 21:49:34 +0200 Subject: [PATCH 1/8] Initial support for query cancellations --- moor/lib/src/runtime/cancellation_zone.dart | 67 +++++++++++++++++++ .../src/runtime/executor/helpers/engines.dart | 14 ++-- .../src/runtime/executor/stream_queries.dart | 16 ++++- 3 files changed, 91 insertions(+), 6 deletions(-) create mode 100644 moor/lib/src/runtime/cancellation_zone.dart diff --git a/moor/lib/src/runtime/cancellation_zone.dart b/moor/lib/src/runtime/cancellation_zone.dart new file mode 100644 index 00000000..6cce8378 --- /dev/null +++ b/moor/lib/src/runtime/cancellation_zone.dart @@ -0,0 +1,67 @@ +import 'dart:async'; + +import 'package:meta/meta.dart'; + +const _key = #moor.runtime.cancellation; + +/// Runs an asynchronous operation with support for cancellations. +/// +/// The [CancellationToken] can be used to cancel the operation and to get the +/// eventual result. +CancellationToken runCancellable( + Future Function() operation, +) { + final token = CancellationToken(); + runZonedGuarded( + () => operation().then(token._resultCompleter.complete), + (error, trace) { + final completer = token._resultCompleter; + + if (error is CancellationException) { + completer.complete(null); + } else { + completer.completeError(error, trace); + } + }, + zoneValues: {_key: token}, + ); + + return token; +} + +/// A token that can be used to cancel an asynchronous operation running in a +/// child zone. +@internal +class CancellationToken { + final Completer _resultCompleter = Completer.sync(); + bool _cancellationRequested = false; + + /// Loads the result for the cancellable operation. + /// + /// When the operation is cancelled, the future might complete with `null`. + Future get result => _resultCompleter.future; + + /// Requests the inner asynchronous operation to be cancelled. + void cancel() => _cancellationRequested = true; +} + +/// Thrown inside a cancellation zone when it has been cancelled. +@internal +class CancellationException implements Exception { + /// Default const constructor + const CancellationException(); + + @override + String toString() { + return 'Operation was cancelled'; + } +} + +/// Checks whether the active zone is a cancellation zone that has been +/// cancelled. If it is, a [CancellationException] will be thrown. +void checkIfCancelled() { + final token = Zone.current[_key]; + if (token is CancellationToken && token._cancellationRequested) { + throw const CancellationException(); + } +} diff --git a/moor/lib/src/runtime/executor/helpers/engines.dart b/moor/lib/src/runtime/executor/helpers/engines.dart index e2afef4f..0cd47eb8 100644 --- a/moor/lib/src/runtime/executor/helpers/engines.dart +++ b/moor/lib/src/runtime/executor/helpers/engines.dart @@ -4,25 +4,31 @@ import 'package:moor/moor.dart'; import 'package:moor/src/utils/synchronized.dart'; import 'package:pedantic/pedantic.dart'; +import '../../cancellation_zone.dart'; import 'delegates.dart'; mixin _ExecutorWithQueryDelegate on QueryExecutor { + final Lock _lock = Lock(); + QueryDelegate get impl; bool get isSequential => false; + bool get logStatements => false; - final Lock _lock = Lock(); /// Used to provide better error messages when calling operations without /// calling [ensureOpen] before. bool _ensureOpenCalled = false; - Future _synchronized(FutureOr Function() action) async { + Future _synchronized(Future Function() action) { if (isSequential) { - return await _lock.synchronized(action); + return _lock.synchronized(() { + checkIfCancelled(); + return action(); + }); } else { // support multiple operations in parallel, so just run right away - return await action(); + return action(); } } diff --git a/moor/lib/src/runtime/executor/stream_queries.dart b/moor/lib/src/runtime/executor/stream_queries.dart index 415b63af..60d30b7f 100644 --- a/moor/lib/src/runtime/executor/stream_queries.dart +++ b/moor/lib/src/runtime/executor/stream_queries.dart @@ -7,6 +7,8 @@ import 'package:moor/moor.dart'; import 'package:moor/src/utils/start_with_value_transformer.dart'; import 'package:pedantic/pedantic.dart'; +import '../cancellation_zone.dart'; + const _listEquality = ListEquality(); // This is an internal moor library that's never exported to users. @@ -192,6 +194,7 @@ class QueryStream { StreamSubscription? _tablesChangedSubscription; List>? _lastData; + final List _runningOperations = []; Stream>> get stream { return _controller.stream.transform(StartWithValueTransformer(_cachedData)); @@ -236,14 +239,21 @@ class QueryStream { // case _lastData = null; _tablesChangedSubscription = null; + + for (final op in _runningOperations) { + op.cancel(); + } }); } Future fetchAndEmitData() async { - List> data; + final operation = runCancellable(_fetcher.fetchData); + _runningOperations.add(operation); try { - data = await _fetcher.fetchData(); + final data = await operation.result; + if (data == null) return; + _lastData = data; if (!_controller.isClosed) { _controller.add(data); @@ -252,6 +262,8 @@ class QueryStream { if (!_controller.isClosed) { _controller.addError(e, s); } + } finally { + _runningOperations.remove(operation); } } From 04a8bb5694dbee5ec6f159de07f7df15a81dd182 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 31 Mar 2021 22:55:35 +0200 Subject: [PATCH 2/8] Support cancellations across isolates --- moor/CHANGELOG.md | 5 ++ moor/example/stream_cancellation_test.dart | 35 +++++++++ moor/lib/src/runtime/cancellation_zone.dart | 19 ++++- .../src/runtime/executor/stream_queries.dart | 3 +- moor/lib/src/runtime/remote/client_impl.dart | 17 ++++- .../lib/src/runtime/remote/communication.dart | 20 +++++- moor/lib/src/runtime/remote/protocol.dart | 32 +++++++++ moor/lib/src/runtime/remote/server_impl.dart | 20 +++++- .../integration_tests/cancellation_test.dart | 71 +++++++++++++++++++ 9 files changed, 213 insertions(+), 9 deletions(-) create mode 100644 moor/example/stream_cancellation_test.dart create mode 100644 moor/test/integration_tests/cancellation_test.dart diff --git a/moor/CHANGELOG.md b/moor/CHANGELOG.md index 95dc1858..c7d46afb 100644 --- a/moor/CHANGELOG.md +++ b/moor/CHANGELOG.md @@ -1,3 +1,8 @@ +## 4.3.0-dev + +- On supported platforms, cancel ongoing stream selects when the stream is disposed + - At the moment, `moor/ffi` on a background isolate supports this feature + ## 4.2.1 - Deprecate `readBool`, `readString`, `readInt`, `readDouble`, `readDateTime` diff --git a/moor/example/stream_cancellation_test.dart b/moor/example/stream_cancellation_test.dart new file mode 100644 index 00000000..2a6f5118 --- /dev/null +++ b/moor/example/stream_cancellation_test.dart @@ -0,0 +1,35 @@ +import 'package:moor/ffi.dart'; +import 'package:moor/isolate.dart'; +import 'package:moor/moor.dart'; +import 'package:test/test.dart'; + +DatabaseConnection createConnection() => + DatabaseConnection.fromExecutor(VmDatabase.memory(logStatements: true)); + +class EmptyDb extends GeneratedDatabase { + EmptyDb.connect(DatabaseConnection c) : super.connect(c); + @override + final List allTables = const []; + @override + final int schemaVersion = 1; +} + +void main() async { + final isolate = await MoorIsolate.spawn(createConnection); + final db = EmptyDb.connect(await isolate.connect(isolateDebugLog: true)); + + var i = 0; + String slowQuery() => ''' + with recursive slow(x) as (values(1) union all select x+1 from slow where x < 1000000) + select ${i++} from slow; + '''; // ^ to get different `StreamKey`s + + await db.doWhenOpened((e) {}); + + final subscriptions = List.generate( + 4, (_) => db.customSelect(slowQuery()).watch().listen(null)); + await pumpEventQueue(); + await Future.wait(subscriptions.map((e) => e.cancel())); + + await db.customSelect('select 1').getSingle(); +} diff --git a/moor/lib/src/runtime/cancellation_zone.dart b/moor/lib/src/runtime/cancellation_zone.dart index 6cce8378..b5996d45 100644 --- a/moor/lib/src/runtime/cancellation_zone.dart +++ b/moor/lib/src/runtime/cancellation_zone.dart @@ -34,6 +34,7 @@ CancellationToken runCancellable( @internal class CancellationToken { final Completer _resultCompleter = Completer.sync(); + final List _cancellationCallbacks = []; bool _cancellationRequested = false; /// Loads the result for the cancellable operation. @@ -42,7 +43,14 @@ class CancellationToken { Future get result => _resultCompleter.future; /// Requests the inner asynchronous operation to be cancelled. - void cancel() => _cancellationRequested = true; + void cancel() { + if (_cancellationRequested) return; + + for (final callback in _cancellationCallbacks) { + callback(); + } + _cancellationRequested = true; + } } /// Thrown inside a cancellation zone when it has been cancelled. @@ -65,3 +73,12 @@ void checkIfCancelled() { throw const CancellationException(); } } + +/// Requests the [callback] to be invoked when the enclosing asynchronous +/// operation is cancelled. +void doOnCancellation(void Function() callback) { + final token = Zone.current[_key]; + if (token is CancellationToken) { + token._cancellationCallbacks.add(callback); + } +} diff --git a/moor/lib/src/runtime/executor/stream_queries.dart b/moor/lib/src/runtime/executor/stream_queries.dart index 60d30b7f..de11a318 100644 --- a/moor/lib/src/runtime/executor/stream_queries.dart +++ b/moor/lib/src/runtime/executor/stream_queries.dart @@ -127,7 +127,8 @@ class StreamQueryStore { final key = stream._fetcher.key; _keysPendingRemoval.add(key); - final completer = Completer(); + // sync because it's only triggered after the timer + final completer = Completer.sync(); _pendingTimers.add(completer); // Hey there! If you're sent here because your Flutter tests fail, please diff --git a/moor/lib/src/runtime/remote/client_impl.dart b/moor/lib/src/runtime/remote/client_impl.dart index 577191c8..49a0e90f 100644 --- a/moor/lib/src/runtime/remote/client_impl.dart +++ b/moor/lib/src/runtime/remote/client_impl.dart @@ -6,6 +6,7 @@ import 'package:moor/src/runtime/executor/stream_queries.dart'; import 'package:moor/src/runtime/types/sql_types.dart'; import 'package:stream_channel/stream_channel.dart'; +import '../cancellation_zone.dart'; import 'communication.dart'; import 'protocol.dart'; @@ -58,8 +59,20 @@ abstract class _BaseExecutor extends QueryExecutor { Future _runRequest( StatementMethod method, String sql, List? args) { - return client._channel - .request(ExecuteQuery(method, sql, args ?? const [], _executorId)); + // fast path: If the operation has already been cancelled, don't bother + // sending a request in the first place + checkIfCancelled(); + + final id = client._channel.newRequestId(); + // otherwise, send the request now and cancel it later, if that's desired + doOnCancellation(() { + client._channel.request(RequestCancellation(id)); + }); + + return client._channel.request( + ExecuteQuery(method, sql, args ?? const [], _executorId), + requestId: id, + ); } @override diff --git a/moor/lib/src/runtime/remote/communication.dart b/moor/lib/src/runtime/remote/communication.dart index 0f34b97d..8f03d6a5 100644 --- a/moor/lib/src/runtime/remote/communication.dart +++ b/moor/lib/src/runtime/remote/communication.dart @@ -2,6 +2,7 @@ import 'dart:async'; import 'package:stream_channel/stream_channel.dart'; +import '../cancellation_zone.dart'; import 'protocol.dart'; /// Wrapper around a two-way communication channel to support requests and @@ -40,6 +41,9 @@ class MoorCommunication { /// A stream of requests coming from the other peer. Stream get incomingRequests => _incomingRequests.stream; + /// Returns a new request id to be used for the next request. + int newRequestId() => _currentRequestId++; + /// Closes the connection to the server. void close() { if (isClosed) return; @@ -77,13 +81,19 @@ class MoorCommunication { _pendingRequests.remove(msg.requestId); } else if (msg is Request) { _incomingRequests.add(msg); + } else if (msg is CancelledResponse) { + final completer = _pendingRequests[msg.requestId]; + completer?.completeError(const CancellationException()); } } /// Sends a request and waits for the peer to reply with a value that is /// assumed to be of type [T]. - Future request(Object? request) { - final id = _currentRequestId++; + /// + /// The [requestId] parameter can be used to set a fixed request id for the + /// request. + Future request(Object? request, {int? requestId}) { + final id = requestId ?? newRequestId(); final completer = Completer(); _pendingRequests[id] = completer; @@ -113,7 +123,11 @@ class MoorCommunication { // sending a message while closed will throw, so don't even try. if (isClosed) return; - _send(ErrorResponse(request.id, error.toString(), trace.toString())); + if (error is CancellationException) { + _send(CancelledResponse(request.id)); + } else { + _send(ErrorResponse(request.id, error.toString(), trace.toString())); + } } /// Utility that listens to [incomingRequests] and invokes the [handler] on diff --git a/moor/lib/src/runtime/remote/protocol.dart b/moor/lib/src/runtime/remote/protocol.dart index baadbf02..dbca8e2c 100644 --- a/moor/lib/src/runtime/remote/protocol.dart +++ b/moor/lib/src/runtime/remote/protocol.dart @@ -9,6 +9,7 @@ class MoorProtocol { static const _tag_Request = 0; static const _tag_Response_success = 1; static const _tag_Response_error = 2; + static const _tag_Response_cancelled = 3; static const _tag_NoArgsRequest_getTypeSystem = 0; static const _tag_NoArgsRequest_terminateAll = 1; @@ -22,6 +23,7 @@ class MoorProtocol { static const _tag_DefaultSqlTypeSystem = 9; static const _tag_DirectValue = 10; static const _tag_SelectResult = 11; + static const _tag_RequestCancellation = 12; Object? serialize(Message message) { if (message is Request) { @@ -43,6 +45,8 @@ class MoorProtocol { message.requestId, encodePayload(message.response), ]; + } else if (message is CancelledResponse) { + return [_tag_Response_cancelled, message.requestId]; } } @@ -59,6 +63,8 @@ class MoorProtocol { return ErrorResponse(id, message[2] as Object, message[3] as String); case _tag_Response_success: return SuccessResponse(id, decodePayload(message[2])); + case _tag_Response_cancelled: + return CancelledResponse(id); } throw const FormatException('Unknown tag'); @@ -136,6 +142,8 @@ class MoorProtocol { } return result; } + } else if (payload is RequestCancellation) { + return [_tag_RequestCancellation, payload.originalRequestId]; } else { return [_tag_DirectValue, payload]; } @@ -223,6 +231,8 @@ class MoorProtocol { }); } return SelectResult(result); + case _tag_RequestCancellation: + return RequestCancellation(readInt(1)); case _tag_DirectValue: return encoded[1]; } @@ -286,6 +296,12 @@ class ErrorResponse extends Message { } } +class CancelledResponse extends Message { + final int requestId; + + CancelledResponse(this.requestId); +} + /// A request without further parameters enum NoArgsRequest { /// Sent from the client to the server. The server will reply with the @@ -323,6 +339,22 @@ class ExecuteQuery { } } +/// Requests a previous request to be cancelled. +/// +/// Whether this is supported or not depends on the server and its internal +/// state. This request will be immediately be acknowledged with a null +/// response, which does not indicate whether a cancellation actually happened. +class RequestCancellation { + final int originalRequestId; + + RequestCancellation(this.originalRequestId); + + @override + String toString() { + return 'Cancel previous request $originalRequestId'; + } +} + /// Sent from the client to run [BatchedStatements] class ExecuteBatchedStatement { final BatchedStatements stmts; diff --git a/moor/lib/src/runtime/remote/server_impl.dart b/moor/lib/src/runtime/remote/server_impl.dart index 2ffc7141..78d5195e 100644 --- a/moor/lib/src/runtime/remote/server_impl.dart +++ b/moor/lib/src/runtime/remote/server_impl.dart @@ -4,6 +4,7 @@ import 'package:moor/moor.dart'; import 'package:moor/remote.dart'; import 'package:stream_channel/stream_channel.dart'; +import '../cancellation_zone.dart'; import 'communication.dart'; import 'protocol.dart'; @@ -19,6 +20,8 @@ class ServerImplementation implements MoorServer { final Map _managedExecutors = {}; int _currentExecutorId = 0; + final Map _cancellableOperations = {}; + /// when a transaction is active, all queries that don't operate on another /// query executor have to wait! /// @@ -88,8 +91,13 @@ class ServerImplementation implements MoorServer { } else if (payload is EnsureOpen) { return _handleEnsureOpen(payload); } else if (payload is ExecuteQuery) { - return _runQuery( - payload.method, payload.sql, payload.args, payload.executorId); + final token = runCancellable(() => _runQuery( + payload.method, payload.sql, payload.args, payload.executorId)); + _cancellableOperations[request.id] = token; + return token.result + // If we get a null response the operation was cancelled + .then((value) => value ?? Future.error(const CancellationException())) + .whenComplete(() => _cancellableOperations.remove(request.id)); } else if (payload is ExecuteBatchedStatement) { return _runBatched(payload.stmts, payload.executorId); } else if (payload is NotifyTablesUpdated) { @@ -98,6 +106,9 @@ class ServerImplementation implements MoorServer { } } else if (payload is RunTransactionAction) { return _transactionControl(payload.control, payload.executorId); + } else if (payload is RequestCancellation) { + _cancellableOperations[payload.originalRequestId]?.cancel(); + return null; } } @@ -110,6 +121,11 @@ class ServerImplementation implements MoorServer { Future _runQuery(StatementMethod method, String sql, List args, int? transactionId) async { + // Wait for one event loop iteration until we run the request. This helps + // with cancellations for synchronous database implementations like moor/ffi + // since we don't have a chance to react to cancellation requests otherwise. + await Future.delayed(Duration.zero); + final executor = await _loadExecutor(transactionId); switch (method) { diff --git a/moor/test/integration_tests/cancellation_test.dart b/moor/test/integration_tests/cancellation_test.dart new file mode 100644 index 00000000..35528987 --- /dev/null +++ b/moor/test/integration_tests/cancellation_test.dart @@ -0,0 +1,71 @@ +@Tags(['integration']) +import 'package:moor/ffi.dart'; +import 'package:moor/isolate.dart'; +import 'package:moor/moor.dart'; +import 'package:test/test.dart'; + +DatabaseConnection createConnection() { + var counter = 0; + + return DatabaseConnection.fromExecutor( + VmDatabase.memory( + setup: (rawDb) { + rawDb.createFunction( + functionName: 'increment_counter', + function: (args) => counter++, + ); + rawDb.createFunction( + functionName: 'get_counter', + function: (args) => counter, + ); + }, + ), + ); +} + +class EmptyDb extends GeneratedDatabase { + EmptyDb.connect(DatabaseConnection c) : super.connect(c); + @override + final List allTables = const []; + @override + final int schemaVersion = 1; +} + +void main() { + late EmptyDb db; + late MoorIsolate isolate; + + setUp(() async { + isolate = await MoorIsolate.spawn(createConnection); + db = EmptyDb.connect(await isolate.connect()); + + // Avoid delays caused by opening the database to interfere with the + // cancellation mechanism (we need to react to cancellations quicker if the + // db is already open, which is what we want to test) + await db.doWhenOpened((e) {}); + }); + + tearDown(() => isolate.shutdownAll()); + + var i = 0; + String slowQuery() => ''' + with recursive slow(x) as (values(increment_counter()) union all select x+1 from slow where x < 1000000) + select ${i++} from slow; + '''; // ^ to get different `StreamKey`s + + test('stream queries are aborted on cancellations', () async { + final subscriptions = List.generate( + 4, (_) => db.customSelect(slowQuery()).watch().listen(null)); + await pumpEventQueue(); + await Future.wait(subscriptions.map((e) => e.cancel())); + + final amountOfSlowQueries = await db + .customSelect('select get_counter() r') + .map((row) => row.read('r')) + .getSingle(); + + // One slow query is ok if the cancellation wasn't quick enough, we just + // shouldn't run all 4 of them. + expect(amountOfSlowQueries, anyOf(0, 1)); + }); +} From edb219ecfece6af744ec0dedd17326b46f5fec45 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 31 Mar 2021 23:09:23 +0200 Subject: [PATCH 3/8] Use Future.delayed in cancellation zones only --- moor/CHANGELOG.md | 2 +- moor/lib/src/runtime/cancellation_zone.dart | 3 +++ moor/lib/src/runtime/executor/helpers/engines.dart | 11 +++++++++-- moor/lib/src/runtime/remote/server_impl.dart | 5 ----- 4 files changed, 13 insertions(+), 8 deletions(-) diff --git a/moor/CHANGELOG.md b/moor/CHANGELOG.md index c7d46afb..97cd8eea 100644 --- a/moor/CHANGELOG.md +++ b/moor/CHANGELOG.md @@ -1,7 +1,7 @@ ## 4.3.0-dev - On supported platforms, cancel ongoing stream selects when the stream is disposed - - At the moment, `moor/ffi` on a background isolate supports this feature + - At the moment, `moor/ffi` and isolate databases are supported ## 4.2.1 diff --git a/moor/lib/src/runtime/cancellation_zone.dart b/moor/lib/src/runtime/cancellation_zone.dart index b5996d45..11bea0d8 100644 --- a/moor/lib/src/runtime/cancellation_zone.dart +++ b/moor/lib/src/runtime/cancellation_zone.dart @@ -65,6 +65,9 @@ class CancellationException implements Exception { } } +/// Returns true if we're currently in a zone that can be cancelled. +bool get isInCancellationZone => Zone.current[_key] != null; + /// Checks whether the active zone is a cancellation zone that has been /// cancelled. If it is, a [CancellationException] will be thrown. void checkIfCancelled() { diff --git a/moor/lib/src/runtime/executor/helpers/engines.dart b/moor/lib/src/runtime/executor/helpers/engines.dart index 0cd47eb8..3bb5c68d 100644 --- a/moor/lib/src/runtime/executor/helpers/engines.dart +++ b/moor/lib/src/runtime/executor/helpers/engines.dart @@ -22,8 +22,15 @@ mixin _ExecutorWithQueryDelegate on QueryExecutor { Future _synchronized(Future Function() action) { if (isSequential) { - return _lock.synchronized(() { - checkIfCancelled(); + return _lock.synchronized(() async { + if (isInCancellationZone) { + // Popular sequential database implementations (especially moor/ffi) + // are synchronous. Wait for one event loop iteration to give + // cancellations time to come in. + await Future.delayed(Duration.zero); + checkIfCancelled(); + } + return action(); }); } else { diff --git a/moor/lib/src/runtime/remote/server_impl.dart b/moor/lib/src/runtime/remote/server_impl.dart index 78d5195e..c010bf10 100644 --- a/moor/lib/src/runtime/remote/server_impl.dart +++ b/moor/lib/src/runtime/remote/server_impl.dart @@ -121,11 +121,6 @@ class ServerImplementation implements MoorServer { Future _runQuery(StatementMethod method, String sql, List args, int? transactionId) async { - // Wait for one event loop iteration until we run the request. This helps - // with cancellations for synchronous database implementations like moor/ffi - // since we don't have a chance to react to cancellation requests otherwise. - await Future.delayed(Duration.zero); - final executor = await _loadExecutor(transactionId); switch (method) { From 8a9981154247a03a1706cc45b835ad9d27748503 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 31 Mar 2021 23:22:50 +0200 Subject: [PATCH 4/8] Test cancellations on the same isolate --- .../integration_tests/cancellation_test.dart | 38 +++++++++++-------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/moor/test/integration_tests/cancellation_test.dart b/moor/test/integration_tests/cancellation_test.dart index 35528987..6ced4853 100644 --- a/moor/test/integration_tests/cancellation_test.dart +++ b/moor/test/integration_tests/cancellation_test.dart @@ -32,20 +32,7 @@ class EmptyDb extends GeneratedDatabase { } void main() { - late EmptyDb db; - late MoorIsolate isolate; - - setUp(() async { - isolate = await MoorIsolate.spawn(createConnection); - db = EmptyDb.connect(await isolate.connect()); - - // Avoid delays caused by opening the database to interfere with the - // cancellation mechanism (we need to react to cancellations quicker if the - // db is already open, which is what we want to test) - await db.doWhenOpened((e) {}); - }); - - tearDown(() => isolate.shutdownAll()); + moorRuntimeOptions.dontWarnAboutMultipleDatabases = true; var i = 0; String slowQuery() => ''' @@ -53,10 +40,14 @@ void main() { select ${i++} from slow; '''; // ^ to get different `StreamKey`s - test('stream queries are aborted on cancellations', () async { + Future runTest(EmptyDb db) async { + // Avoid delays caused by opening the database to interfere with the + // cancellation mechanism (we need to react to cancellations quicker if the + // db is already open, which is what we want to test) + await db.doWhenOpened((e) {}); + final subscriptions = List.generate( 4, (_) => db.customSelect(slowQuery()).watch().listen(null)); - await pumpEventQueue(); await Future.wait(subscriptions.map((e) => e.cancel())); final amountOfSlowQueries = await db @@ -67,5 +58,20 @@ void main() { // One slow query is ok if the cancellation wasn't quick enough, we just // shouldn't run all 4 of them. expect(amountOfSlowQueries, anyOf(0, 1)); + } + + group('stream queries are aborted on cancellations', () { + test('on the same isolate', () async { + final db = EmptyDb.connect(createConnection()); + await runTest(db); + }); + + test('on a background isolate', () async { + final isolate = await MoorIsolate.spawn(createConnection); + addTearDown(isolate.shutdownAll); + + final db = EmptyDb.connect(await isolate.connect()); + await runTest(db); + }); }); } From 2fa3b472355b0a14b41fe9fd0c9a798bec57c08a Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 31 Mar 2021 23:30:18 +0200 Subject: [PATCH 5/8] Actually, don't cancel on the same isolate --- moor/CHANGELOG.md | 2 +- moor/lib/src/runtime/cancellation_zone.dart | 3 --- moor/lib/src/runtime/executor/helpers/engines.dart | 11 ++--------- moor/lib/src/runtime/remote/server_impl.dart | 4 ++++ moor/test/integration_tests/cancellation_test.dart | 6 +----- 5 files changed, 8 insertions(+), 18 deletions(-) diff --git a/moor/CHANGELOG.md b/moor/CHANGELOG.md index 97cd8eea..23009ba1 100644 --- a/moor/CHANGELOG.md +++ b/moor/CHANGELOG.md @@ -1,7 +1,7 @@ ## 4.3.0-dev - On supported platforms, cancel ongoing stream selects when the stream is disposed - - At the moment, `moor/ffi` and isolate databases are supported + - At the moment, only `moor/ffi` on a background isolate is supported ## 4.2.1 diff --git a/moor/lib/src/runtime/cancellation_zone.dart b/moor/lib/src/runtime/cancellation_zone.dart index 11bea0d8..b5996d45 100644 --- a/moor/lib/src/runtime/cancellation_zone.dart +++ b/moor/lib/src/runtime/cancellation_zone.dart @@ -65,9 +65,6 @@ class CancellationException implements Exception { } } -/// Returns true if we're currently in a zone that can be cancelled. -bool get isInCancellationZone => Zone.current[_key] != null; - /// Checks whether the active zone is a cancellation zone that has been /// cancelled. If it is, a [CancellationException] will be thrown. void checkIfCancelled() { diff --git a/moor/lib/src/runtime/executor/helpers/engines.dart b/moor/lib/src/runtime/executor/helpers/engines.dart index 3bb5c68d..0cd47eb8 100644 --- a/moor/lib/src/runtime/executor/helpers/engines.dart +++ b/moor/lib/src/runtime/executor/helpers/engines.dart @@ -22,15 +22,8 @@ mixin _ExecutorWithQueryDelegate on QueryExecutor { Future _synchronized(Future Function() action) { if (isSequential) { - return _lock.synchronized(() async { - if (isInCancellationZone) { - // Popular sequential database implementations (especially moor/ffi) - // are synchronous. Wait for one event loop iteration to give - // cancellations time to come in. - await Future.delayed(Duration.zero); - checkIfCancelled(); - } - + return _lock.synchronized(() { + checkIfCancelled(); return action(); }); } else { diff --git a/moor/lib/src/runtime/remote/server_impl.dart b/moor/lib/src/runtime/remote/server_impl.dart index c010bf10..800eb750 100644 --- a/moor/lib/src/runtime/remote/server_impl.dart +++ b/moor/lib/src/runtime/remote/server_impl.dart @@ -122,6 +122,10 @@ class ServerImplementation implements MoorServer { Future _runQuery(StatementMethod method, String sql, List args, int? transactionId) async { final executor = await _loadExecutor(transactionId); + checkIfCancelled(); + + // Give cancellations more time to come in + await Future.delayed(Duration.zero); switch (method) { case StatementMethod.custom: diff --git a/moor/test/integration_tests/cancellation_test.dart b/moor/test/integration_tests/cancellation_test.dart index 6ced4853..5195c10c 100644 --- a/moor/test/integration_tests/cancellation_test.dart +++ b/moor/test/integration_tests/cancellation_test.dart @@ -48,6 +48,7 @@ void main() { final subscriptions = List.generate( 4, (_) => db.customSelect(slowQuery()).watch().listen(null)); + await pumpEventQueue(); await Future.wait(subscriptions.map((e) => e.cancel())); final amountOfSlowQueries = await db @@ -61,11 +62,6 @@ void main() { } group('stream queries are aborted on cancellations', () { - test('on the same isolate', () async { - final db = EmptyDb.connect(createConnection()); - await runTest(db); - }); - test('on a background isolate', () async { final isolate = await MoorIsolate.spawn(createConnection); addTearDown(isolate.shutdownAll); From cc93fa8238f7f386312ed95e3c80fd12611f4a07 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 1 Apr 2021 17:29:03 +0200 Subject: [PATCH 6/8] Fix nullability issues with cancellations --- moor/lib/src/runtime/cancellation_zone.dart | 38 +++++++++++++------ .../src/runtime/executor/stream_queries.dart | 2 +- moor/lib/src/runtime/remote/protocol.dart | 5 +++ moor/lib/src/runtime/remote/server_impl.dart | 2 - moor/test/isolate_test.dart | 5 +-- 5 files changed, 34 insertions(+), 18 deletions(-) diff --git a/moor/lib/src/runtime/cancellation_zone.dart b/moor/lib/src/runtime/cancellation_zone.dart index b5996d45..37172df9 100644 --- a/moor/lib/src/runtime/cancellation_zone.dart +++ b/moor/lib/src/runtime/cancellation_zone.dart @@ -14,15 +14,7 @@ CancellationToken runCancellable( final token = CancellationToken(); runZonedGuarded( () => operation().then(token._resultCompleter.complete), - (error, trace) { - final completer = token._resultCompleter; - - if (error is CancellationException) { - completer.complete(null); - } else { - completer.completeError(error, trace); - } - }, + token._resultCompleter.completeError, zoneValues: {_key: token}, ); @@ -33,14 +25,15 @@ CancellationToken runCancellable( /// child zone. @internal class CancellationToken { - final Completer _resultCompleter = Completer.sync(); + final Completer _resultCompleter = Completer(); final List _cancellationCallbacks = []; bool _cancellationRequested = false; /// Loads the result for the cancellable operation. /// - /// When the operation is cancelled, the future might complete with `null`. - Future get result => _resultCompleter.future; + /// When a cancellation has been requested and was honored, the future will + /// complete with a [CancellationException]. + Future get result => _resultCompleter.future; /// Requests the inner asynchronous operation to be cancelled. void cancel() { @@ -53,6 +46,27 @@ class CancellationToken { } } +/// Extensions that can be used on cancellable operations if they return a non- +/// nullable value. +extension NonNullableCancellationExtension + on CancellationToken { + /// Wait for the result, or return `null` if the operation was cancelled. + /// + /// To avoid situations where `null` could be a valid result from an async + /// operation, this getter is only available on non-nullable operations. This + /// avoids ambiguity. + /// + /// The future will still complete with an error if anything but a + /// [CancellationException] is thrown in [result]. + Future get resultOrNullIfCancelled async { + try { + return await result; + } on CancellationException { + return null; + } + } +} + /// Thrown inside a cancellation zone when it has been cancelled. @internal class CancellationException implements Exception { diff --git a/moor/lib/src/runtime/executor/stream_queries.dart b/moor/lib/src/runtime/executor/stream_queries.dart index de11a318..9119abbf 100644 --- a/moor/lib/src/runtime/executor/stream_queries.dart +++ b/moor/lib/src/runtime/executor/stream_queries.dart @@ -252,7 +252,7 @@ class QueryStream { _runningOperations.add(operation); try { - final data = await operation.result; + final data = await operation.resultOrNullIfCancelled; if (data == null) return; _lastData = data; diff --git a/moor/lib/src/runtime/remote/protocol.dart b/moor/lib/src/runtime/remote/protocol.dart index dbca8e2c..ad13b9ee 100644 --- a/moor/lib/src/runtime/remote/protocol.dart +++ b/moor/lib/src/runtime/remote/protocol.dart @@ -300,6 +300,11 @@ class CancelledResponse extends Message { final int requestId; CancelledResponse(this.requestId); + + @override + String toString() { + return 'Previous request $requestId was cancelled'; + } } /// A request without further parameters diff --git a/moor/lib/src/runtime/remote/server_impl.dart b/moor/lib/src/runtime/remote/server_impl.dart index 800eb750..2547aafc 100644 --- a/moor/lib/src/runtime/remote/server_impl.dart +++ b/moor/lib/src/runtime/remote/server_impl.dart @@ -95,8 +95,6 @@ class ServerImplementation implements MoorServer { payload.method, payload.sql, payload.args, payload.executorId)); _cancellableOperations[request.id] = token; return token.result - // If we get a null response the operation was cancelled - .then((value) => value ?? Future.error(const CancellationException())) .whenComplete(() => _cancellableOperations.remove(request.id)); } else if (payload is ExecuteBatchedStatement) { return _runBatched(payload.stmts, payload.executorId); diff --git a/moor/test/isolate_test.dart b/moor/test/isolate_test.dart index ffbd56a2..e2a3856d 100644 --- a/moor/test/isolate_test.dart +++ b/moor/test/isolate_test.dart @@ -1,4 +1,3 @@ -//@dart=2.9 @TestOn('vm') import 'dart:async'; import 'dart:isolate'; @@ -83,8 +82,8 @@ void main() { void _runTests( FutureOr Function() spawner, bool terminateIsolate) { - MoorIsolate isolate; - TodoDb database; + late MoorIsolate isolate; + late TodoDb database; setUp(() async { isolate = await spawner(); From 12ab64a33ef4ea6e34b6efc8b7b286e093dcdfb5 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 1 Apr 2021 17:43:23 +0200 Subject: [PATCH 7/8] Support cancellations in moor_flutter --- extras/encryption/lib/encrypted_moor.dart | 8 ++++++++ moor_flutter/CHANGELOG.md | 4 ++++ moor_flutter/lib/moor_flutter.dart | 7 +++++++ moor_flutter/pubspec.yaml | 2 +- 4 files changed, 20 insertions(+), 1 deletion(-) diff --git a/extras/encryption/lib/encrypted_moor.dart b/extras/encryption/lib/encrypted_moor.dart index e697065d..da73fefb 100644 --- a/extras/encryption/lib/encrypted_moor.dart +++ b/extras/encryption/lib/encrypted_moor.dart @@ -37,6 +37,7 @@ class _SqfliteDelegate extends DatabaseDelegate with _SqfliteExecutor { }); DbVersionDelegate? _delegate; + @override DbVersionDelegate get versionDelegate { return _delegate ??= _SqfliteVersionDelegate(db); @@ -219,4 +220,11 @@ class EncryptedExecutor extends DelegatedDatabase { final sqfliteDelegate = delegate as _SqfliteDelegate; return sqfliteDelegate.isOpen ? sqfliteDelegate.db : null; } + + @override + // We're not really required to be sequential since sqflite has an internal + // lock to bring statements into a sequential order. + // Setting isSequential here helps with moor cancellations in stream queries + // though. + bool get isSequential => true; } diff --git a/moor_flutter/CHANGELOG.md b/moor_flutter/CHANGELOG.md index 97545cc1..0da07d24 100644 --- a/moor_flutter/CHANGELOG.md +++ b/moor_flutter/CHANGELOG.md @@ -1,3 +1,7 @@ +## 4.1.0-dev + +- Support query cancellations introduced in moor 4.3.0 + ## 4.0.0 - Support moor version 4 diff --git a/moor_flutter/lib/moor_flutter.dart b/moor_flutter/lib/moor_flutter.dart index 3a971d69..9082b50d 100644 --- a/moor_flutter/lib/moor_flutter.dart +++ b/moor_flutter/lib/moor_flutter.dart @@ -206,4 +206,11 @@ class FlutterQueryExecutor extends DelegatedDatabase { final sqfliteDelegate = delegate as _SqfliteDelegate; return sqfliteDelegate.isOpen ? sqfliteDelegate.db : null; } + + @override + // We're not really required to be sequential since sqflite has an internal + // lock to bring statements into a sequential order. + // Setting isSequential here helps with moor cancellations in stream queries + // though. + bool get isSequential => true; } diff --git a/moor_flutter/pubspec.yaml b/moor_flutter/pubspec.yaml index 79f1c73a..12c43ab6 100644 --- a/moor_flutter/pubspec.yaml +++ b/moor_flutter/pubspec.yaml @@ -1,6 +1,6 @@ name: moor_flutter description: Flutter implementation of moor, a safe and reactive persistence library for Dart applications -version: 4.0.0 +version: 4.1.0-dev repository: https://github.com/simolus3/moor homepage: https://moor.simonbinder.eu/ issue_tracker: https://github.com/simolus3/moor/issues From 925080bd3527d5cce050c22e51647111ce706840 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 15 Apr 2021 20:52:43 +0200 Subject: [PATCH 8/8] Improve cancellations, test with switchMap --- moor/lib/src/runtime/remote/client_impl.dart | 4 +- moor/lib/src/runtime/remote/server_impl.dart | 2 +- .../integration_tests/cancellation_test.dart | 61 ++++++++++++++++--- 3 files changed, 57 insertions(+), 10 deletions(-) diff --git a/moor/lib/src/runtime/remote/client_impl.dart b/moor/lib/src/runtime/remote/client_impl.dart index 49a0e90f..954d5f00 100644 --- a/moor/lib/src/runtime/remote/client_impl.dart +++ b/moor/lib/src/runtime/remote/client_impl.dart @@ -114,6 +114,7 @@ class _RemoteQueryExecutor extends _BaseExecutor { : super(client, executorId); Completer? _setSchemaVersion; + Future? _serverIsOpen; @override TransactionExecutor beginTransaction() { @@ -127,7 +128,8 @@ class _RemoteQueryExecutor extends _BaseExecutor { await _setSchemaVersion!.future; _setSchemaVersion = null; } - return client._channel + + return _serverIsOpen ??= client._channel .request(EnsureOpen(user.schemaVersion, _executorId)); } diff --git a/moor/lib/src/runtime/remote/server_impl.dart b/moor/lib/src/runtime/remote/server_impl.dart index 2547aafc..5b21413f 100644 --- a/moor/lib/src/runtime/remote/server_impl.dart +++ b/moor/lib/src/runtime/remote/server_impl.dart @@ -120,10 +120,10 @@ class ServerImplementation implements MoorServer { Future _runQuery(StatementMethod method, String sql, List args, int? transactionId) async { final executor = await _loadExecutor(transactionId); - checkIfCancelled(); // Give cancellations more time to come in await Future.delayed(Duration.zero); + checkIfCancelled(); switch (method) { case StatementMethod.custom: diff --git a/moor/test/integration_tests/cancellation_test.dart b/moor/test/integration_tests/cancellation_test.dart index 5195c10c..d27688d5 100644 --- a/moor/test/integration_tests/cancellation_test.dart +++ b/moor/test/integration_tests/cancellation_test.dart @@ -2,10 +2,12 @@ import 'package:moor/ffi.dart'; import 'package:moor/isolate.dart'; import 'package:moor/moor.dart'; +import 'package:rxdart/rxdart.dart'; import 'package:test/test.dart'; DatabaseConnection createConnection() { var counter = 0; + final loggedValues = []; return DatabaseConnection.fromExecutor( VmDatabase.memory( @@ -18,6 +20,19 @@ DatabaseConnection createConnection() { functionName: 'get_counter', function: (args) => counter, ); + + rawDb.createFunction( + functionName: 'log_value', + function: (args) { + final value = args.single as int; + loggedValues.add(value); + return value; + }, + ); + rawDb.createFunction( + functionName: 'get_values', + function: (args) => loggedValues.join(','), + ); }, ), ); @@ -34,20 +49,19 @@ class EmptyDb extends GeneratedDatabase { void main() { moorRuntimeOptions.dontWarnAboutMultipleDatabases = true; - var i = 0; - String slowQuery() => ''' - with recursive slow(x) as (values(increment_counter()) union all select x+1 from slow where x < 1000000) - select ${i++} from slow; - '''; // ^ to get different `StreamKey`s - Future runTest(EmptyDb db) async { + String slowQuery(int i) => ''' + with recursive slow(x) as (values(increment_counter()) union all select x+1 from slow where x < 1000000) + select $i from slow; + '''; // ^ to get different `StreamKey`s + // Avoid delays caused by opening the database to interfere with the // cancellation mechanism (we need to react to cancellations quicker if the // db is already open, which is what we want to test) await db.doWhenOpened((e) {}); final subscriptions = List.generate( - 4, (_) => db.customSelect(slowQuery()).watch().listen(null)); + 4, (i) => db.customSelect(slowQuery(i)).watch().listen(null)); await pumpEventQueue(); await Future.wait(subscriptions.map((e) => e.cancel())); @@ -58,7 +72,7 @@ void main() { // One slow query is ok if the cancellation wasn't quick enough, we just // shouldn't run all 4 of them. - expect(amountOfSlowQueries, anyOf(0, 1)); + expect(amountOfSlowQueries, isNot(4)); } group('stream queries are aborted on cancellations', () { @@ -70,4 +84,35 @@ void main() { await runTest(db); }); }); + + test('together with switchMap', () async { + String slowQuery(int i) => ''' + with recursive slow(x) as (values(log_value($i)) union all select x+1 from slow where x < 1000000) + select $i from slow; + '''; + + final isolate = await MoorIsolate.spawn(createConnection); + addTearDown(isolate.shutdownAll); + + final db = EmptyDb.connect(await isolate.connect()); + await db.customSelect('select 1').getSingle(); + + final filter = BehaviorSubject(); + addTearDown(filter.close); + filter + .switchMap((value) => db.customSelect(slowQuery(value)).watch()) + .listen(null); + + for (var i = 0; i < 4; i++) { + filter.add(i); + await pumpEventQueue(); + } + + final values = await db + .customSelect('select get_values() r') + .map((row) => row.read('r')) + .getSingle(); + + expect(values, '0,3'); + }); }