Make query streams emit their last item for new listeners

This commit is contained in:
Simon Binder 2019-07-02 21:46:04 +02:00
parent 3024157ec9
commit 59235783c3
No known key found for this signature in database
GPG Key ID: 7891917E4147B8C0
3 changed files with 56 additions and 1 deletions

View File

@ -1,3 +1,6 @@
## 1.5.1
- Fixed an issue where streams would behave inconsistently when transformed.
## 1.5.0
This version introduces some new concepts and features, which are explained in more detail below.
Here is a quick overview of the new features:

View File

@ -3,6 +3,7 @@ import 'dart:async';
import 'package:collection/collection.dart';
import 'package:meta/meta.dart';
import 'package:moor/moor.dart';
import 'package:moor/src/utils/start_with_value_transformer.dart';
const _listEquality = ListEquality<dynamic>();
@ -120,17 +121,24 @@ class QueryStream<T> {
// caching the stream so that the stream getter always returns the same stream
Stream<T> _stream;
T _lastData;
Stream<T> get stream {
_controller ??= StreamController.broadcast(
onListen: _onListen,
onCancel: _onCancel,
);
return _stream ??= _controller.stream;
return _stream ??=
_controller.stream.transform(StartWithValueTransformer(_cachedData));
}
QueryStream(this._fetcher, this._store);
/// Called when we have a new listener, makes the stream query behave similar
/// to an `BehaviorSubject` from rxdart.
T _cachedData() => _lastData;
void _onListen() {
// first listener added, fetch query
fetchAndEmitData();
@ -151,6 +159,7 @@ class QueryStream<T> {
if (!_controller.hasListener) return;
final data = await _fetcher.fetchData();
_lastData = data;
if (!_controller.isClosed) {
_controller.add(data);

View File

@ -0,0 +1,43 @@
import 'dart:async';
/// Signature of a function that returns the latest current value of a
/// [StartWithValueTransformer].
typedef LatestValue<T> = T Function();
/// Lightweight implementation that turns a [StreamController] into a behavior
/// subject (we try to avoid depending on rxdart because of its size).
class StartWithValueTransformer<T> extends StreamTransformerBase<T, T> {
final LatestValue<T> _value;
StartWithValueTransformer(this._value);
@override
Stream<T> bind(Stream<T> stream) {
// we're setting sync to true because we're proxying events
final controller = StreamController<T>.broadcast(sync: true);
// ignore: cancel_subscriptions
StreamSubscription subscription;
controller
..onListen = () {
final data = _value();
if (data != null) {
controller.add(data);
}
subscription = stream.listen(
controller.add,
onError: controller.addError,
onDone: controller.close,
);
}
..onCancel = () {
// not using a tear-off here because subscription.cancel is null before
// onListen has been called
subscription?.cancel();
};
return controller.stream;
}
}