Don't await StreamController.close() when closing streams

This commit is contained in:
Simon Binder 2020-02-19 20:01:24 +01:00
parent 2b780492d3
commit 23585ad920
No known key found for this signature in database
GPG Key ID: 7891917E4147B8C0
4 changed files with 29 additions and 4 deletions

View File

@ -15,6 +15,8 @@
- __Breaking__: Remove `customSelectStream` from `QueryEngine`. The `customSelect`
method now returns an `Selectable` (like `customSelectQuery`, which in turn has been deprecated).
- Experimentally support IndexedDB to store sqlite data on the web
- Moor will no longer wait for query stream listeners to receive a done event when closing a database
or transaction.
## 2.4.0

View File

@ -5,6 +5,7 @@ import 'package:collection/collection.dart';
import 'package:meta/meta.dart';
import 'package:moor/moor.dart';
import 'package:moor/src/utils/start_with_value_transformer.dart';
import 'package:pedantic/pedantic.dart';
const _listEquality = ListEquality<dynamic>();
@ -161,8 +162,15 @@ class StreamQueryStore {
_isShuttingDown = true;
for (final stream in _activeKeyStreams.values) {
await stream._controller.close();
// Note: StreamController.close waits until the done event has been
// received by a subscriber. If there is a paused StreamSubscription on
// a query stream, this would pause forever. In particular, this is
// causing deadlocks in tests.
// https://github.com/dart-lang/test/issues/1183#issuecomment-588357154
unawaited(stream._controller.close());
}
// awaiting this is fine - the stream is never exposed to users and we don't
// pause any subscriptions on it.
await _updatedTableNames.close();
while (_pendingTimers.isNotEmpty) {
@ -251,7 +259,7 @@ class QueryStream<T> {
}
}
Future<void> close() {
return _controller.close();
void close() {
_controller.close();
}
}

View File

@ -72,7 +72,9 @@ class _TransactionStreamStore extends StreamQueryStore {
parent.handleTableUpdatesByName(affectedTables);
await super.close();
await Future.wait(_queriesWithoutKey.map((e) => e.close()));
for (final query in _queriesWithoutKey) {
query.close();
}
}
}

View File

@ -162,6 +162,19 @@ void main() {
expectLater(result, throwsA(exception));
});
test('database can be closed when a stream has a paused subscription',
() async {
// this test is more relevant than it seems - some test stream matchers
// leave the stream in an empty state.
final stream = db.select(db.users).watch();
final subscription = stream.listen((_) {})..pause();
await db.close();
subscription.resume();
await subscription.cancel();
});
group('stream keys', () {
final keyA = StreamKey('SELECT * FROM users;', [], User);
final keyB = StreamKey('SELECT * FROM users;', [], User);