diff --git a/moor/CHANGELOG.md b/moor/CHANGELOG.md index 95dc1858..c7d46afb 100644 --- a/moor/CHANGELOG.md +++ b/moor/CHANGELOG.md @@ -1,3 +1,8 @@ +## 4.3.0-dev + +- On supported platforms, cancel ongoing stream selects when the stream is disposed + - At the moment, `moor/ffi` on a background isolate supports this feature + ## 4.2.1 - Deprecate `readBool`, `readString`, `readInt`, `readDouble`, `readDateTime` diff --git a/moor/example/stream_cancellation_test.dart b/moor/example/stream_cancellation_test.dart new file mode 100644 index 00000000..2a6f5118 --- /dev/null +++ b/moor/example/stream_cancellation_test.dart @@ -0,0 +1,35 @@ +import 'package:moor/ffi.dart'; +import 'package:moor/isolate.dart'; +import 'package:moor/moor.dart'; +import 'package:test/test.dart'; + +DatabaseConnection createConnection() => + DatabaseConnection.fromExecutor(VmDatabase.memory(logStatements: true)); + +class EmptyDb extends GeneratedDatabase { + EmptyDb.connect(DatabaseConnection c) : super.connect(c); + @override + final List allTables = const []; + @override + final int schemaVersion = 1; +} + +void main() async { + final isolate = await MoorIsolate.spawn(createConnection); + final db = EmptyDb.connect(await isolate.connect(isolateDebugLog: true)); + + var i = 0; + String slowQuery() => ''' + with recursive slow(x) as (values(1) union all select x+1 from slow where x < 1000000) + select ${i++} from slow; + '''; // ^ to get different `StreamKey`s + + await db.doWhenOpened((e) {}); + + final subscriptions = List.generate( + 4, (_) => db.customSelect(slowQuery()).watch().listen(null)); + await pumpEventQueue(); + await Future.wait(subscriptions.map((e) => e.cancel())); + + await db.customSelect('select 1').getSingle(); +} diff --git a/moor/lib/src/runtime/cancellation_zone.dart b/moor/lib/src/runtime/cancellation_zone.dart index 6cce8378..b5996d45 100644 --- a/moor/lib/src/runtime/cancellation_zone.dart +++ b/moor/lib/src/runtime/cancellation_zone.dart @@ -34,6 +34,7 @@ CancellationToken runCancellable( @internal class CancellationToken { final Completer _resultCompleter = Completer.sync(); + final List _cancellationCallbacks = []; bool _cancellationRequested = false; /// Loads the result for the cancellable operation. @@ -42,7 +43,14 @@ class CancellationToken { Future get result => _resultCompleter.future; /// Requests the inner asynchronous operation to be cancelled. - void cancel() => _cancellationRequested = true; + void cancel() { + if (_cancellationRequested) return; + + for (final callback in _cancellationCallbacks) { + callback(); + } + _cancellationRequested = true; + } } /// Thrown inside a cancellation zone when it has been cancelled. @@ -65,3 +73,12 @@ void checkIfCancelled() { throw const CancellationException(); } } + +/// Requests the [callback] to be invoked when the enclosing asynchronous +/// operation is cancelled. +void doOnCancellation(void Function() callback) { + final token = Zone.current[_key]; + if (token is CancellationToken) { + token._cancellationCallbacks.add(callback); + } +} diff --git a/moor/lib/src/runtime/executor/stream_queries.dart b/moor/lib/src/runtime/executor/stream_queries.dart index 60d30b7f..de11a318 100644 --- a/moor/lib/src/runtime/executor/stream_queries.dart +++ b/moor/lib/src/runtime/executor/stream_queries.dart @@ -127,7 +127,8 @@ class StreamQueryStore { final key = stream._fetcher.key; _keysPendingRemoval.add(key); - final completer = Completer(); + // sync because it's only triggered after the timer + final completer = Completer.sync(); _pendingTimers.add(completer); // Hey there! If you're sent here because your Flutter tests fail, please diff --git a/moor/lib/src/runtime/remote/client_impl.dart b/moor/lib/src/runtime/remote/client_impl.dart index 577191c8..49a0e90f 100644 --- a/moor/lib/src/runtime/remote/client_impl.dart +++ b/moor/lib/src/runtime/remote/client_impl.dart @@ -6,6 +6,7 @@ import 'package:moor/src/runtime/executor/stream_queries.dart'; import 'package:moor/src/runtime/types/sql_types.dart'; import 'package:stream_channel/stream_channel.dart'; +import '../cancellation_zone.dart'; import 'communication.dart'; import 'protocol.dart'; @@ -58,8 +59,20 @@ abstract class _BaseExecutor extends QueryExecutor { Future _runRequest( StatementMethod method, String sql, List? args) { - return client._channel - .request(ExecuteQuery(method, sql, args ?? const [], _executorId)); + // fast path: If the operation has already been cancelled, don't bother + // sending a request in the first place + checkIfCancelled(); + + final id = client._channel.newRequestId(); + // otherwise, send the request now and cancel it later, if that's desired + doOnCancellation(() { + client._channel.request(RequestCancellation(id)); + }); + + return client._channel.request( + ExecuteQuery(method, sql, args ?? const [], _executorId), + requestId: id, + ); } @override diff --git a/moor/lib/src/runtime/remote/communication.dart b/moor/lib/src/runtime/remote/communication.dart index 0f34b97d..8f03d6a5 100644 --- a/moor/lib/src/runtime/remote/communication.dart +++ b/moor/lib/src/runtime/remote/communication.dart @@ -2,6 +2,7 @@ import 'dart:async'; import 'package:stream_channel/stream_channel.dart'; +import '../cancellation_zone.dart'; import 'protocol.dart'; /// Wrapper around a two-way communication channel to support requests and @@ -40,6 +41,9 @@ class MoorCommunication { /// A stream of requests coming from the other peer. Stream get incomingRequests => _incomingRequests.stream; + /// Returns a new request id to be used for the next request. + int newRequestId() => _currentRequestId++; + /// Closes the connection to the server. void close() { if (isClosed) return; @@ -77,13 +81,19 @@ class MoorCommunication { _pendingRequests.remove(msg.requestId); } else if (msg is Request) { _incomingRequests.add(msg); + } else if (msg is CancelledResponse) { + final completer = _pendingRequests[msg.requestId]; + completer?.completeError(const CancellationException()); } } /// Sends a request and waits for the peer to reply with a value that is /// assumed to be of type [T]. - Future request(Object? request) { - final id = _currentRequestId++; + /// + /// The [requestId] parameter can be used to set a fixed request id for the + /// request. + Future request(Object? request, {int? requestId}) { + final id = requestId ?? newRequestId(); final completer = Completer(); _pendingRequests[id] = completer; @@ -113,7 +123,11 @@ class MoorCommunication { // sending a message while closed will throw, so don't even try. if (isClosed) return; - _send(ErrorResponse(request.id, error.toString(), trace.toString())); + if (error is CancellationException) { + _send(CancelledResponse(request.id)); + } else { + _send(ErrorResponse(request.id, error.toString(), trace.toString())); + } } /// Utility that listens to [incomingRequests] and invokes the [handler] on diff --git a/moor/lib/src/runtime/remote/protocol.dart b/moor/lib/src/runtime/remote/protocol.dart index baadbf02..dbca8e2c 100644 --- a/moor/lib/src/runtime/remote/protocol.dart +++ b/moor/lib/src/runtime/remote/protocol.dart @@ -9,6 +9,7 @@ class MoorProtocol { static const _tag_Request = 0; static const _tag_Response_success = 1; static const _tag_Response_error = 2; + static const _tag_Response_cancelled = 3; static const _tag_NoArgsRequest_getTypeSystem = 0; static const _tag_NoArgsRequest_terminateAll = 1; @@ -22,6 +23,7 @@ class MoorProtocol { static const _tag_DefaultSqlTypeSystem = 9; static const _tag_DirectValue = 10; static const _tag_SelectResult = 11; + static const _tag_RequestCancellation = 12; Object? serialize(Message message) { if (message is Request) { @@ -43,6 +45,8 @@ class MoorProtocol { message.requestId, encodePayload(message.response), ]; + } else if (message is CancelledResponse) { + return [_tag_Response_cancelled, message.requestId]; } } @@ -59,6 +63,8 @@ class MoorProtocol { return ErrorResponse(id, message[2] as Object, message[3] as String); case _tag_Response_success: return SuccessResponse(id, decodePayload(message[2])); + case _tag_Response_cancelled: + return CancelledResponse(id); } throw const FormatException('Unknown tag'); @@ -136,6 +142,8 @@ class MoorProtocol { } return result; } + } else if (payload is RequestCancellation) { + return [_tag_RequestCancellation, payload.originalRequestId]; } else { return [_tag_DirectValue, payload]; } @@ -223,6 +231,8 @@ class MoorProtocol { }); } return SelectResult(result); + case _tag_RequestCancellation: + return RequestCancellation(readInt(1)); case _tag_DirectValue: return encoded[1]; } @@ -286,6 +296,12 @@ class ErrorResponse extends Message { } } +class CancelledResponse extends Message { + final int requestId; + + CancelledResponse(this.requestId); +} + /// A request without further parameters enum NoArgsRequest { /// Sent from the client to the server. The server will reply with the @@ -323,6 +339,22 @@ class ExecuteQuery { } } +/// Requests a previous request to be cancelled. +/// +/// Whether this is supported or not depends on the server and its internal +/// state. This request will be immediately be acknowledged with a null +/// response, which does not indicate whether a cancellation actually happened. +class RequestCancellation { + final int originalRequestId; + + RequestCancellation(this.originalRequestId); + + @override + String toString() { + return 'Cancel previous request $originalRequestId'; + } +} + /// Sent from the client to run [BatchedStatements] class ExecuteBatchedStatement { final BatchedStatements stmts; diff --git a/moor/lib/src/runtime/remote/server_impl.dart b/moor/lib/src/runtime/remote/server_impl.dart index 2ffc7141..78d5195e 100644 --- a/moor/lib/src/runtime/remote/server_impl.dart +++ b/moor/lib/src/runtime/remote/server_impl.dart @@ -4,6 +4,7 @@ import 'package:moor/moor.dart'; import 'package:moor/remote.dart'; import 'package:stream_channel/stream_channel.dart'; +import '../cancellation_zone.dart'; import 'communication.dart'; import 'protocol.dart'; @@ -19,6 +20,8 @@ class ServerImplementation implements MoorServer { final Map _managedExecutors = {}; int _currentExecutorId = 0; + final Map _cancellableOperations = {}; + /// when a transaction is active, all queries that don't operate on another /// query executor have to wait! /// @@ -88,8 +91,13 @@ class ServerImplementation implements MoorServer { } else if (payload is EnsureOpen) { return _handleEnsureOpen(payload); } else if (payload is ExecuteQuery) { - return _runQuery( - payload.method, payload.sql, payload.args, payload.executorId); + final token = runCancellable(() => _runQuery( + payload.method, payload.sql, payload.args, payload.executorId)); + _cancellableOperations[request.id] = token; + return token.result + // If we get a null response the operation was cancelled + .then((value) => value ?? Future.error(const CancellationException())) + .whenComplete(() => _cancellableOperations.remove(request.id)); } else if (payload is ExecuteBatchedStatement) { return _runBatched(payload.stmts, payload.executorId); } else if (payload is NotifyTablesUpdated) { @@ -98,6 +106,9 @@ class ServerImplementation implements MoorServer { } } else if (payload is RunTransactionAction) { return _transactionControl(payload.control, payload.executorId); + } else if (payload is RequestCancellation) { + _cancellableOperations[payload.originalRequestId]?.cancel(); + return null; } } @@ -110,6 +121,11 @@ class ServerImplementation implements MoorServer { Future _runQuery(StatementMethod method, String sql, List args, int? transactionId) async { + // Wait for one event loop iteration until we run the request. This helps + // with cancellations for synchronous database implementations like moor/ffi + // since we don't have a chance to react to cancellation requests otherwise. + await Future.delayed(Duration.zero); + final executor = await _loadExecutor(transactionId); switch (method) { diff --git a/moor/test/integration_tests/cancellation_test.dart b/moor/test/integration_tests/cancellation_test.dart new file mode 100644 index 00000000..35528987 --- /dev/null +++ b/moor/test/integration_tests/cancellation_test.dart @@ -0,0 +1,71 @@ +@Tags(['integration']) +import 'package:moor/ffi.dart'; +import 'package:moor/isolate.dart'; +import 'package:moor/moor.dart'; +import 'package:test/test.dart'; + +DatabaseConnection createConnection() { + var counter = 0; + + return DatabaseConnection.fromExecutor( + VmDatabase.memory( + setup: (rawDb) { + rawDb.createFunction( + functionName: 'increment_counter', + function: (args) => counter++, + ); + rawDb.createFunction( + functionName: 'get_counter', + function: (args) => counter, + ); + }, + ), + ); +} + +class EmptyDb extends GeneratedDatabase { + EmptyDb.connect(DatabaseConnection c) : super.connect(c); + @override + final List allTables = const []; + @override + final int schemaVersion = 1; +} + +void main() { + late EmptyDb db; + late MoorIsolate isolate; + + setUp(() async { + isolate = await MoorIsolate.spawn(createConnection); + db = EmptyDb.connect(await isolate.connect()); + + // Avoid delays caused by opening the database to interfere with the + // cancellation mechanism (we need to react to cancellations quicker if the + // db is already open, which is what we want to test) + await db.doWhenOpened((e) {}); + }); + + tearDown(() => isolate.shutdownAll()); + + var i = 0; + String slowQuery() => ''' + with recursive slow(x) as (values(increment_counter()) union all select x+1 from slow where x < 1000000) + select ${i++} from slow; + '''; // ^ to get different `StreamKey`s + + test('stream queries are aborted on cancellations', () async { + final subscriptions = List.generate( + 4, (_) => db.customSelect(slowQuery()).watch().listen(null)); + await pumpEventQueue(); + await Future.wait(subscriptions.map((e) => e.cancel())); + + final amountOfSlowQueries = await db + .customSelect('select get_counter() r') + .map((row) => row.read('r')) + .getSingle(); + + // One slow query is ok if the cancellation wasn't quick enough, we just + // shouldn't run all 4 of them. + expect(amountOfSlowQueries, anyOf(0, 1)); + }); +}