Make streams emit cached data for multiple listeners

Fixes #178
This commit is contained in:
Simon Binder 2019-10-09 22:39:19 +02:00
parent 720ca43cc4
commit 47f8dbb90d
No known key found for this signature in database
GPG Key ID: 7891917E4147B8C0
4 changed files with 52 additions and 40 deletions

View File

@ -1,4 +1,5 @@
import 'dart:async'; import 'dart:async';
import 'dart:collection';
import 'package:collection/collection.dart'; import 'package:collection/collection.dart';
import 'package:meta/meta.dart'; import 'package:meta/meta.dart';
@ -64,6 +65,7 @@ class StreamKey {
/// them when needed. /// them when needed.
class StreamQueryStore { class StreamQueryStore {
final Map<StreamKey, QueryStream> _activeKeyStreams = {}; final Map<StreamKey, QueryStream> _activeKeyStreams = {};
final HashSet<StreamKey> _keysPendingRemoval = HashSet<StreamKey>();
// Why is this stream synchronous? We want to dispatch table updates before // Why is this stream synchronous? We want to dispatch table updates before
// the future from the query completes. This allows streams to invalidate // the future from the query completes. This allows streams to invalidate
@ -106,16 +108,23 @@ class StreamQueryStore {
void markAsClosed(QueryStream stream) { void markAsClosed(QueryStream stream) {
final key = stream._fetcher.key; final key = stream._fetcher.key;
_keysPendingRemoval.add(key);
scheduleMicrotask(() { scheduleMicrotask(() {
// if no other subscriber was found during this event iteration, remove // if no other subscriber was found during this event iteration, remove
// the stream from the cache. // the stream from the cache.
if (_keysPendingRemoval.contains(key)) {
_keysPendingRemoval.remove(key);
_activeKeyStreams.remove(key); _activeKeyStreams.remove(key);
}
}); });
} }
void markAsOpened(QueryStream stream) { void markAsOpened(QueryStream stream) {
final key = stream._fetcher.key; final key = stream._fetcher.key;
if (key != null) { if (key != null) {
_keysPendingRemoval.remove(key);
_activeKeyStreams[key] = stream; _activeKeyStreams[key] = stream;
} }
} }

View File

@ -15,38 +15,39 @@ class StartWithValueTransformer<T> extends StreamTransformerBase<T, T> {
@override @override
Stream<T> bind(Stream<T> stream) { Stream<T> bind(Stream<T> stream) {
// we're setting sync to true because we're proxying events return _StartWithValueStream(_value, stream);
final controller = StreamController<T>.broadcast(sync: true); }
}
// ignore: cancel_subscriptions class _StartWithValueStream<T> extends Stream<T> {
StreamSubscription subscription; final LatestValue<T> _value;
final Stream<T> _inner;
controller _StartWithValueStream(this._value, this._inner);
..onListen = () {
@override
StreamSubscription<T> listen(void Function(T event) onData,
{Function onError, void Function() onDone, bool cancelOnError}) {
var didReceiveData = false;
final wrappedCallback = (T event) {
didReceiveData = true;
onData?.call(event);
};
final subscription = _inner.listen(wrappedCallback,
onError: onError, onDone: onDone, cancelOnError: cancelOnError);
final data = _value();
// Dart's stream contract specifies that listeners are only notified // Dart's stream contract specifies that listeners are only notified
// after the .listen() code completes. So, we add the initial data in // after the .listen() code completes. So, we add the initial data in
// a later microtask. // a later microtask.
scheduleMicrotask(() { scheduleMicrotask(() {
final data = _value(); if (data != null && !didReceiveData) {
if (data != null) { onData?.call(data);
controller.add(data); didReceiveData = true;
} }
}); });
// the .listen will run in a later microtask, so the cached data would return subscription;
// still be added first.
subscription = stream.listen(
controller.add,
onError: controller.addError,
onDone: controller.close,
);
}
..onCancel = () {
// not using a tear-off here because subscription.cancel is null before
// onListen has been called
subscription?.cancel();
};
return controller.stream;
} }
} }

View File

@ -1,6 +1,6 @@
import 'dart:async'; import 'dart:async';
import 'package:moor/moor.dart'; import 'package:moor/moor.dart' hide isNull;
import 'package:test/test.dart'; import 'package:test/test.dart';
import 'data/tables/todos.dart'; import 'data/tables/todos.dart';
@ -142,7 +142,7 @@ void main() {
}); });
expectLater(db.select(db.todosTable).watchSingle(), expectLater(db.select(db.todosTable).watchSingle(),
emitsInOrder([_todoEntry, null, emitsError(anything)])); emitsInOrder([_todoEntry, isNull, emitsError(anything)]));
db db
..markTablesUpdated({db.todosTable}) ..markTablesUpdated({db.todosTable})

View File

@ -115,15 +115,17 @@ void main() {
await checkEmits; await checkEmits;
}); });
test('streams can be reused after a listener detaches', () async { test('same stream instance can be listened to multiple times', () async {
when(executor.runSelect(any, any)).thenAnswer((_) => Future.value([])); when(executor.runSelect(any, any)).thenAnswer((_) => Future.value([]));
final stream = db.select(db.users).watch(); final stream = db.select(db.users).watch();
await stream.first; // listen to stream, then cancel final firstSub = stream.take(2).listen(null); // will listen forever
await stream.first; // listen again final second = await stream.first;
verify(executor.runSelect(any, any)).called(1); // cached, only called once expect(second, isEmpty);
verify(executor.runSelect(any, any)).called(1);
await firstSub.cancel();
}); });
test('streams are disposed when not listening for a while', () async { test('streams are disposed when not listening for a while', () async {