Fix stream queries in `DelayedStreamQueryStore`

This commit is contained in:
Simon Binder 2022-11-19 18:40:36 +01:00
parent 72a85b799c
commit f41cff5fc6
No known key found for this signature in database
GPG Key ID: 7891917E4147B8C0
3 changed files with 46 additions and 7 deletions

View File

@ -72,7 +72,9 @@ abstract class _BaseExecutor extends QueryExecutor {
final id = client._channel.newRequestId();
// otherwise, send the request now and cancel it later, if that's desired
doOnCancellation(() {
client._channel.request(RequestCancellation(id));
client._channel.request<void>(RequestCancellation(id)).onError((_, __) {
// Couldn't be cancelled. Ok then.
});
});
return client._channel.request<T>(

View File

@ -36,18 +36,35 @@ class DelayedStreamQueryStore implements StreamQueryStore {
throw UnimplementedError('The stream will call this on the delegate');
}
Stream<T> _delegateStream<T>(
Stream<T> Function(StreamQueryStore store) createStream) {
if (_resolved != null) {
return createStream(_resolved!);
} else {
// Note: We can't use Stream.fromFuture(...).asyncExpand() since it is a
// single-subscription stream.
// `.asBroadcastStream()` doesn't work either because the internal caching
// breaks query streams which need to know about live subscribers.
return Stream.multi(
(listener) async {
final store = await _delegate;
if (!listener.isClosed) {
await listener.addStream(createStream(store));
}
},
isBroadcast: true,
);
}
}
@override
Stream<List<Map<String, Object?>>> registerStream(
QueryStreamFetcher fetcher) {
return Stream.fromFuture(_delegate)
.asyncExpand((resolved) => resolved.registerStream(fetcher))
.asBroadcastStream();
return _delegateStream((store) => store.registerStream(fetcher));
}
@override
Stream<Set<TableUpdate>> updatesForSync(TableUpdateQuery query) {
return Stream.fromFuture(_delegate)
.asyncExpand((resolved) => resolved.updatesForSync(query))
.asBroadcastStream();
return _delegateStream((store) => store.updatesForSync(query));
}
}

View File

@ -193,6 +193,26 @@ void _runTests(FutureOr<DriftIsolate> Function() spawner, bool terminateIsolate,
);
});
test('stream queries can be listened to multiple times', () async {
// Regression test for https://github.com/simolus3/drift/issues/2158
final stream = database
.customSelect('select 1 as x')
.map((x) => x.read<int>('x'))
.watchSingle();
Future<void> listenThenCancel() async {
final result = await stream.first.timeout(
const Duration(seconds: 2),
onTimeout: () => fail('timed out!'),
);
expect(result, equals(1));
}
await listenThenCancel();
await pumpEventQueue();
await listenThenCancel(); // times out here when using DatabaseConnection.delayed
});
test('can start transactions', () async {
final initialCompanion = TodosTableCompanion.insert(content: 'my content');