Fix connection pool causing unhandled exceptions

This commit is contained in:
Simon Binder 2023-02-27 17:00:05 +01:00
parent 46390a2da5
commit b137b4065e
No known key found for this signature in database
GPG Key ID: 7891917E4147B8C0
2 changed files with 30 additions and 27 deletions

View File

@ -29,23 +29,13 @@ abstract class MultiExecutor extends QueryExecutor {
MultiExecutor._(); MultiExecutor._();
} }
class _ExecutorCompleter { class _PendingSelect {
_ExecutorCompleter(this.statement, this.args) _PendingSelect(this.statement, this.args)
: _completer = Completer<List<Map<String, Object?>>>(); : completer = Completer<List<Map<String, Object?>>>();
final String statement; final String statement;
final List<Object?> args; final List<Object?> args;
final Completer<List<Map<String, Object?>>> _completer; final Completer<List<Map<String, Object?>>> completer;
Future<List<Map<String, Object?>>> get future => _completer.future;
void complete([FutureOr<List<Map<String, Object?>>>? value]) {
_completer.complete(value);
}
void completeError(Object error, [StackTrace? stackTrace]) {
_completer.completeError(error, stackTrace);
}
} }
class _QueryExecutorPool { class _QueryExecutorPool {
@ -54,8 +44,8 @@ class _QueryExecutorPool {
final List<QueryExecutor> _executors; final List<QueryExecutor> _executors;
final List<QueryExecutor> _idleExecutors; final List<QueryExecutor> _idleExecutors;
final List<_ExecutorCompleter> _queue = []; final List<_PendingSelect> _queue = [];
final List<_ExecutorCompleter> _running = []; final List<_PendingSelect> _running = [];
Future<bool> ensureOpen(QueryExecutorUser user) async { Future<bool> ensureOpen(QueryExecutorUser user) async {
final result = await Future.wait( final result = await Future.wait(
@ -72,10 +62,10 @@ class _QueryExecutorPool {
return _executors.single.runSelect(statement, args); return _executors.single.runSelect(statement, args);
} }
final executorCompleter = _ExecutorCompleter(statement, args); final executorCompleter = _PendingSelect(statement, args);
_queue.add(executorCompleter); _queue.add(executorCompleter);
_run(); _run();
return executorCompleter.future; return executorCompleter.completer.future;
} }
void _run() { void _run() {
@ -87,15 +77,15 @@ class _QueryExecutorPool {
_running.add(completer); _running.add(completer);
completer.future.whenComplete(() { completer.completer.complete(Future.sync(() async {
_running.remove(completer); try {
_idleExecutors.add(executor); return await executor.runSelect(completer.statement, completer.args);
_run(); } finally {
}); _running.remove(completer);
_idleExecutors.add(executor);
executor _run();
.runSelect(completer.statement, completer.args) }
.then(completer.complete, onError: completer.completeError); }));
} }
} }

View File

@ -115,4 +115,17 @@ void main() {
verify(write.transactions.ensureOpen(any)); verify(write.transactions.ensureOpen(any));
verify(write.transactions.runSelect('select', [])); verify(write.transactions.runSelect('select', []));
}); });
test('select failure does not cause an unhandled exception', () async {
// https://github.com/simolus3/drift/issues/2323
final read2 = MockExecutor();
final multi =
MultiExecutor.withReadPool(reads: [read2, read], write: write);
when(read2.runSelect(any, any)).thenThrow('bang!');
await multi.ensureOpen(db);
expect(multi.runSelect('select 1', []), throwsA(isA<String>()));
});
} }