mirror of https://github.com/AMT-Cheif/drift.git
Support cancellations across isolates
This commit is contained in:
parent
357764a1b7
commit
04a8bb5694
|
@ -1,3 +1,8 @@
|
|||
## 4.3.0-dev
|
||||
|
||||
- On supported platforms, cancel ongoing stream selects when the stream is disposed
|
||||
- At the moment, `moor/ffi` on a background isolate supports this feature
|
||||
|
||||
## 4.2.1
|
||||
|
||||
- Deprecate `readBool`, `readString`, `readInt`, `readDouble`, `readDateTime`
|
||||
|
|
|
@ -0,0 +1,35 @@
|
|||
import 'package:moor/ffi.dart';
|
||||
import 'package:moor/isolate.dart';
|
||||
import 'package:moor/moor.dart';
|
||||
import 'package:test/test.dart';
|
||||
|
||||
DatabaseConnection createConnection() =>
|
||||
DatabaseConnection.fromExecutor(VmDatabase.memory(logStatements: true));
|
||||
|
||||
class EmptyDb extends GeneratedDatabase {
|
||||
EmptyDb.connect(DatabaseConnection c) : super.connect(c);
|
||||
@override
|
||||
final List<TableInfo> allTables = const [];
|
||||
@override
|
||||
final int schemaVersion = 1;
|
||||
}
|
||||
|
||||
void main() async {
|
||||
final isolate = await MoorIsolate.spawn(createConnection);
|
||||
final db = EmptyDb.connect(await isolate.connect(isolateDebugLog: true));
|
||||
|
||||
var i = 0;
|
||||
String slowQuery() => '''
|
||||
with recursive slow(x) as (values(1) union all select x+1 from slow where x < 1000000)
|
||||
select ${i++} from slow;
|
||||
'''; // ^ to get different `StreamKey`s
|
||||
|
||||
await db.doWhenOpened((e) {});
|
||||
|
||||
final subscriptions = List.generate(
|
||||
4, (_) => db.customSelect(slowQuery()).watch().listen(null));
|
||||
await pumpEventQueue();
|
||||
await Future.wait(subscriptions.map((e) => e.cancel()));
|
||||
|
||||
await db.customSelect('select 1').getSingle();
|
||||
}
|
|
@ -34,6 +34,7 @@ CancellationToken<T> runCancellable<T>(
|
|||
@internal
|
||||
class CancellationToken<T> {
|
||||
final Completer<T?> _resultCompleter = Completer.sync();
|
||||
final List<void Function()> _cancellationCallbacks = [];
|
||||
bool _cancellationRequested = false;
|
||||
|
||||
/// Loads the result for the cancellable operation.
|
||||
|
@ -42,7 +43,14 @@ class CancellationToken<T> {
|
|||
Future<T?> get result => _resultCompleter.future;
|
||||
|
||||
/// Requests the inner asynchronous operation to be cancelled.
|
||||
void cancel() => _cancellationRequested = true;
|
||||
void cancel() {
|
||||
if (_cancellationRequested) return;
|
||||
|
||||
for (final callback in _cancellationCallbacks) {
|
||||
callback();
|
||||
}
|
||||
_cancellationRequested = true;
|
||||
}
|
||||
}
|
||||
|
||||
/// Thrown inside a cancellation zone when it has been cancelled.
|
||||
|
@ -65,3 +73,12 @@ void checkIfCancelled() {
|
|||
throw const CancellationException();
|
||||
}
|
||||
}
|
||||
|
||||
/// Requests the [callback] to be invoked when the enclosing asynchronous
|
||||
/// operation is cancelled.
|
||||
void doOnCancellation(void Function() callback) {
|
||||
final token = Zone.current[_key];
|
||||
if (token is CancellationToken) {
|
||||
token._cancellationCallbacks.add(callback);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -127,7 +127,8 @@ class StreamQueryStore {
|
|||
final key = stream._fetcher.key;
|
||||
_keysPendingRemoval.add(key);
|
||||
|
||||
final completer = Completer<void>();
|
||||
// sync because it's only triggered after the timer
|
||||
final completer = Completer<void>.sync();
|
||||
_pendingTimers.add(completer);
|
||||
|
||||
// Hey there! If you're sent here because your Flutter tests fail, please
|
||||
|
|
|
@ -6,6 +6,7 @@ import 'package:moor/src/runtime/executor/stream_queries.dart';
|
|||
import 'package:moor/src/runtime/types/sql_types.dart';
|
||||
import 'package:stream_channel/stream_channel.dart';
|
||||
|
||||
import '../cancellation_zone.dart';
|
||||
import 'communication.dart';
|
||||
import 'protocol.dart';
|
||||
|
||||
|
@ -58,8 +59,20 @@ abstract class _BaseExecutor extends QueryExecutor {
|
|||
|
||||
Future<T> _runRequest<T>(
|
||||
StatementMethod method, String sql, List<Object?>? args) {
|
||||
return client._channel
|
||||
.request<T>(ExecuteQuery(method, sql, args ?? const [], _executorId));
|
||||
// fast path: If the operation has already been cancelled, don't bother
|
||||
// sending a request in the first place
|
||||
checkIfCancelled();
|
||||
|
||||
final id = client._channel.newRequestId();
|
||||
// otherwise, send the request now and cancel it later, if that's desired
|
||||
doOnCancellation(() {
|
||||
client._channel.request(RequestCancellation(id));
|
||||
});
|
||||
|
||||
return client._channel.request<T>(
|
||||
ExecuteQuery(method, sql, args ?? const [], _executorId),
|
||||
requestId: id,
|
||||
);
|
||||
}
|
||||
|
||||
@override
|
||||
|
|
|
@ -2,6 +2,7 @@ import 'dart:async';
|
|||
|
||||
import 'package:stream_channel/stream_channel.dart';
|
||||
|
||||
import '../cancellation_zone.dart';
|
||||
import 'protocol.dart';
|
||||
|
||||
/// Wrapper around a two-way communication channel to support requests and
|
||||
|
@ -40,6 +41,9 @@ class MoorCommunication {
|
|||
/// A stream of requests coming from the other peer.
|
||||
Stream<Request> get incomingRequests => _incomingRequests.stream;
|
||||
|
||||
/// Returns a new request id to be used for the next request.
|
||||
int newRequestId() => _currentRequestId++;
|
||||
|
||||
/// Closes the connection to the server.
|
||||
void close() {
|
||||
if (isClosed) return;
|
||||
|
@ -77,13 +81,19 @@ class MoorCommunication {
|
|||
_pendingRequests.remove(msg.requestId);
|
||||
} else if (msg is Request) {
|
||||
_incomingRequests.add(msg);
|
||||
} else if (msg is CancelledResponse) {
|
||||
final completer = _pendingRequests[msg.requestId];
|
||||
completer?.completeError(const CancellationException());
|
||||
}
|
||||
}
|
||||
|
||||
/// Sends a request and waits for the peer to reply with a value that is
|
||||
/// assumed to be of type [T].
|
||||
Future<T> request<T>(Object? request) {
|
||||
final id = _currentRequestId++;
|
||||
///
|
||||
/// The [requestId] parameter can be used to set a fixed request id for the
|
||||
/// request.
|
||||
Future<T> request<T>(Object? request, {int? requestId}) {
|
||||
final id = requestId ?? newRequestId();
|
||||
final completer = Completer<T>();
|
||||
|
||||
_pendingRequests[id] = completer;
|
||||
|
@ -113,7 +123,11 @@ class MoorCommunication {
|
|||
// sending a message while closed will throw, so don't even try.
|
||||
if (isClosed) return;
|
||||
|
||||
_send(ErrorResponse(request.id, error.toString(), trace.toString()));
|
||||
if (error is CancellationException) {
|
||||
_send(CancelledResponse(request.id));
|
||||
} else {
|
||||
_send(ErrorResponse(request.id, error.toString(), trace.toString()));
|
||||
}
|
||||
}
|
||||
|
||||
/// Utility that listens to [incomingRequests] and invokes the [handler] on
|
||||
|
|
|
@ -9,6 +9,7 @@ class MoorProtocol {
|
|||
static const _tag_Request = 0;
|
||||
static const _tag_Response_success = 1;
|
||||
static const _tag_Response_error = 2;
|
||||
static const _tag_Response_cancelled = 3;
|
||||
|
||||
static const _tag_NoArgsRequest_getTypeSystem = 0;
|
||||
static const _tag_NoArgsRequest_terminateAll = 1;
|
||||
|
@ -22,6 +23,7 @@ class MoorProtocol {
|
|||
static const _tag_DefaultSqlTypeSystem = 9;
|
||||
static const _tag_DirectValue = 10;
|
||||
static const _tag_SelectResult = 11;
|
||||
static const _tag_RequestCancellation = 12;
|
||||
|
||||
Object? serialize(Message message) {
|
||||
if (message is Request) {
|
||||
|
@ -43,6 +45,8 @@ class MoorProtocol {
|
|||
message.requestId,
|
||||
encodePayload(message.response),
|
||||
];
|
||||
} else if (message is CancelledResponse) {
|
||||
return [_tag_Response_cancelled, message.requestId];
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -59,6 +63,8 @@ class MoorProtocol {
|
|||
return ErrorResponse(id, message[2] as Object, message[3] as String);
|
||||
case _tag_Response_success:
|
||||
return SuccessResponse(id, decodePayload(message[2]));
|
||||
case _tag_Response_cancelled:
|
||||
return CancelledResponse(id);
|
||||
}
|
||||
|
||||
throw const FormatException('Unknown tag');
|
||||
|
@ -136,6 +142,8 @@ class MoorProtocol {
|
|||
}
|
||||
return result;
|
||||
}
|
||||
} else if (payload is RequestCancellation) {
|
||||
return [_tag_RequestCancellation, payload.originalRequestId];
|
||||
} else {
|
||||
return [_tag_DirectValue, payload];
|
||||
}
|
||||
|
@ -223,6 +231,8 @@ class MoorProtocol {
|
|||
});
|
||||
}
|
||||
return SelectResult(result);
|
||||
case _tag_RequestCancellation:
|
||||
return RequestCancellation(readInt(1));
|
||||
case _tag_DirectValue:
|
||||
return encoded[1];
|
||||
}
|
||||
|
@ -286,6 +296,12 @@ class ErrorResponse extends Message {
|
|||
}
|
||||
}
|
||||
|
||||
class CancelledResponse extends Message {
|
||||
final int requestId;
|
||||
|
||||
CancelledResponse(this.requestId);
|
||||
}
|
||||
|
||||
/// A request without further parameters
|
||||
enum NoArgsRequest {
|
||||
/// Sent from the client to the server. The server will reply with the
|
||||
|
@ -323,6 +339,22 @@ class ExecuteQuery {
|
|||
}
|
||||
}
|
||||
|
||||
/// Requests a previous request to be cancelled.
|
||||
///
|
||||
/// Whether this is supported or not depends on the server and its internal
|
||||
/// state. This request will be immediately be acknowledged with a null
|
||||
/// response, which does not indicate whether a cancellation actually happened.
|
||||
class RequestCancellation {
|
||||
final int originalRequestId;
|
||||
|
||||
RequestCancellation(this.originalRequestId);
|
||||
|
||||
@override
|
||||
String toString() {
|
||||
return 'Cancel previous request $originalRequestId';
|
||||
}
|
||||
}
|
||||
|
||||
/// Sent from the client to run [BatchedStatements]
|
||||
class ExecuteBatchedStatement {
|
||||
final BatchedStatements stmts;
|
||||
|
|
|
@ -4,6 +4,7 @@ import 'package:moor/moor.dart';
|
|||
import 'package:moor/remote.dart';
|
||||
import 'package:stream_channel/stream_channel.dart';
|
||||
|
||||
import '../cancellation_zone.dart';
|
||||
import 'communication.dart';
|
||||
import 'protocol.dart';
|
||||
|
||||
|
@ -19,6 +20,8 @@ class ServerImplementation implements MoorServer {
|
|||
final Map<int, QueryExecutor> _managedExecutors = {};
|
||||
int _currentExecutorId = 0;
|
||||
|
||||
final Map<int, CancellationToken> _cancellableOperations = {};
|
||||
|
||||
/// when a transaction is active, all queries that don't operate on another
|
||||
/// query executor have to wait!
|
||||
///
|
||||
|
@ -88,8 +91,13 @@ class ServerImplementation implements MoorServer {
|
|||
} else if (payload is EnsureOpen) {
|
||||
return _handleEnsureOpen(payload);
|
||||
} else if (payload is ExecuteQuery) {
|
||||
return _runQuery(
|
||||
payload.method, payload.sql, payload.args, payload.executorId);
|
||||
final token = runCancellable(() => _runQuery(
|
||||
payload.method, payload.sql, payload.args, payload.executorId));
|
||||
_cancellableOperations[request.id] = token;
|
||||
return token.result
|
||||
// If we get a null response the operation was cancelled
|
||||
.then((value) => value ?? Future.error(const CancellationException()))
|
||||
.whenComplete(() => _cancellableOperations.remove(request.id));
|
||||
} else if (payload is ExecuteBatchedStatement) {
|
||||
return _runBatched(payload.stmts, payload.executorId);
|
||||
} else if (payload is NotifyTablesUpdated) {
|
||||
|
@ -98,6 +106,9 @@ class ServerImplementation implements MoorServer {
|
|||
}
|
||||
} else if (payload is RunTransactionAction) {
|
||||
return _transactionControl(payload.control, payload.executorId);
|
||||
} else if (payload is RequestCancellation) {
|
||||
_cancellableOperations[payload.originalRequestId]?.cancel();
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -110,6 +121,11 @@ class ServerImplementation implements MoorServer {
|
|||
|
||||
Future<dynamic> _runQuery(StatementMethod method, String sql,
|
||||
List<Object?> args, int? transactionId) async {
|
||||
// Wait for one event loop iteration until we run the request. This helps
|
||||
// with cancellations for synchronous database implementations like moor/ffi
|
||||
// since we don't have a chance to react to cancellation requests otherwise.
|
||||
await Future.delayed(Duration.zero);
|
||||
|
||||
final executor = await _loadExecutor(transactionId);
|
||||
|
||||
switch (method) {
|
||||
|
|
|
@ -0,0 +1,71 @@
|
|||
@Tags(['integration'])
|
||||
import 'package:moor/ffi.dart';
|
||||
import 'package:moor/isolate.dart';
|
||||
import 'package:moor/moor.dart';
|
||||
import 'package:test/test.dart';
|
||||
|
||||
DatabaseConnection createConnection() {
|
||||
var counter = 0;
|
||||
|
||||
return DatabaseConnection.fromExecutor(
|
||||
VmDatabase.memory(
|
||||
setup: (rawDb) {
|
||||
rawDb.createFunction(
|
||||
functionName: 'increment_counter',
|
||||
function: (args) => counter++,
|
||||
);
|
||||
rawDb.createFunction(
|
||||
functionName: 'get_counter',
|
||||
function: (args) => counter,
|
||||
);
|
||||
},
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
class EmptyDb extends GeneratedDatabase {
|
||||
EmptyDb.connect(DatabaseConnection c) : super.connect(c);
|
||||
@override
|
||||
final List<TableInfo> allTables = const [];
|
||||
@override
|
||||
final int schemaVersion = 1;
|
||||
}
|
||||
|
||||
void main() {
|
||||
late EmptyDb db;
|
||||
late MoorIsolate isolate;
|
||||
|
||||
setUp(() async {
|
||||
isolate = await MoorIsolate.spawn(createConnection);
|
||||
db = EmptyDb.connect(await isolate.connect());
|
||||
|
||||
// Avoid delays caused by opening the database to interfere with the
|
||||
// cancellation mechanism (we need to react to cancellations quicker if the
|
||||
// db is already open, which is what we want to test)
|
||||
await db.doWhenOpened((e) {});
|
||||
});
|
||||
|
||||
tearDown(() => isolate.shutdownAll());
|
||||
|
||||
var i = 0;
|
||||
String slowQuery() => '''
|
||||
with recursive slow(x) as (values(increment_counter()) union all select x+1 from slow where x < 1000000)
|
||||
select ${i++} from slow;
|
||||
'''; // ^ to get different `StreamKey`s
|
||||
|
||||
test('stream queries are aborted on cancellations', () async {
|
||||
final subscriptions = List.generate(
|
||||
4, (_) => db.customSelect(slowQuery()).watch().listen(null));
|
||||
await pumpEventQueue();
|
||||
await Future.wait(subscriptions.map((e) => e.cancel()));
|
||||
|
||||
final amountOfSlowQueries = await db
|
||||
.customSelect('select get_counter() r')
|
||||
.map((row) => row.read<int>('r'))
|
||||
.getSingle();
|
||||
|
||||
// One slow query is ok if the cancellation wasn't quick enough, we just
|
||||
// shouldn't run all 4 of them.
|
||||
expect(amountOfSlowQueries, anyOf(0, 1));
|
||||
});
|
||||
}
|
Loading…
Reference in New Issue