Improve cancellations, test with switchMap

This commit is contained in:
Simon Binder 2021-04-15 20:52:43 +02:00
parent 12ab64a33e
commit 925080bd35
No known key found for this signature in database
GPG Key ID: 7891917E4147B8C0
3 changed files with 57 additions and 10 deletions

View File

@ -114,6 +114,7 @@ class _RemoteQueryExecutor extends _BaseExecutor {
: super(client, executorId);
Completer<void>? _setSchemaVersion;
Future<bool>? _serverIsOpen;
@override
TransactionExecutor beginTransaction() {
@ -127,7 +128,8 @@ class _RemoteQueryExecutor extends _BaseExecutor {
await _setSchemaVersion!.future;
_setSchemaVersion = null;
}
return client._channel
return _serverIsOpen ??= client._channel
.request<bool>(EnsureOpen(user.schemaVersion, _executorId));
}

View File

@ -120,10 +120,10 @@ class ServerImplementation implements MoorServer {
Future<dynamic> _runQuery(StatementMethod method, String sql,
List<Object?> args, int? transactionId) async {
final executor = await _loadExecutor(transactionId);
checkIfCancelled();
// Give cancellations more time to come in
await Future.delayed(Duration.zero);
checkIfCancelled();
switch (method) {
case StatementMethod.custom:

View File

@ -2,10 +2,12 @@
import 'package:moor/ffi.dart';
import 'package:moor/isolate.dart';
import 'package:moor/moor.dart';
import 'package:rxdart/rxdart.dart';
import 'package:test/test.dart';
DatabaseConnection createConnection() {
var counter = 0;
final loggedValues = <int>[];
return DatabaseConnection.fromExecutor(
VmDatabase.memory(
@ -18,6 +20,19 @@ DatabaseConnection createConnection() {
functionName: 'get_counter',
function: (args) => counter,
);
rawDb.createFunction(
functionName: 'log_value',
function: (args) {
final value = args.single as int;
loggedValues.add(value);
return value;
},
);
rawDb.createFunction(
functionName: 'get_values',
function: (args) => loggedValues.join(','),
);
},
),
);
@ -34,20 +49,19 @@ class EmptyDb extends GeneratedDatabase {
void main() {
moorRuntimeOptions.dontWarnAboutMultipleDatabases = true;
var i = 0;
String slowQuery() => '''
with recursive slow(x) as (values(increment_counter()) union all select x+1 from slow where x < 1000000)
select ${i++} from slow;
'''; // ^ to get different `StreamKey`s
Future<void> runTest(EmptyDb db) async {
String slowQuery(int i) => '''
with recursive slow(x) as (values(increment_counter()) union all select x+1 from slow where x < 1000000)
select $i from slow;
'''; // ^ to get different `StreamKey`s
// Avoid delays caused by opening the database to interfere with the
// cancellation mechanism (we need to react to cancellations quicker if the
// db is already open, which is what we want to test)
await db.doWhenOpened((e) {});
final subscriptions = List.generate(
4, (_) => db.customSelect(slowQuery()).watch().listen(null));
4, (i) => db.customSelect(slowQuery(i)).watch().listen(null));
await pumpEventQueue();
await Future.wait(subscriptions.map((e) => e.cancel()));
@ -58,7 +72,7 @@ void main() {
// One slow query is ok if the cancellation wasn't quick enough, we just
// shouldn't run all 4 of them.
expect(amountOfSlowQueries, anyOf(0, 1));
expect(amountOfSlowQueries, isNot(4));
}
group('stream queries are aborted on cancellations', () {
@ -70,4 +84,35 @@ void main() {
await runTest(db);
});
});
test('together with switchMap', () async {
String slowQuery(int i) => '''
with recursive slow(x) as (values(log_value($i)) union all select x+1 from slow where x < 1000000)
select $i from slow;
''';
final isolate = await MoorIsolate.spawn(createConnection);
addTearDown(isolate.shutdownAll);
final db = EmptyDb.connect(await isolate.connect());
await db.customSelect('select 1').getSingle();
final filter = BehaviorSubject<int>();
addTearDown(filter.close);
filter
.switchMap((value) => db.customSelect(slowQuery(value)).watch())
.listen(null);
for (var i = 0; i < 4; i++) {
filter.add(i);
await pumpEventQueue();
}
final values = await db
.customSelect('select get_values() r')
.map((row) => row.read<String>('r'))
.getSingle();
expect(values, '0,3');
});
}