Fix streams still emitting data after cancel (#766)

This commit is contained in:
Simon Binder 2020-08-15 21:19:29 +02:00
parent d0b13c43eb
commit 9ed52f8d1d
No known key found for this signature in database
GPG Key ID: 7891917E4147B8C0
2 changed files with 90 additions and 19 deletions

View File

@ -31,42 +31,53 @@ class _StartWithValueStream<T> extends Stream<T> {
@override @override
StreamSubscription<T> listen(void Function(T event) onData, StreamSubscription<T> listen(void Function(T event) onData,
{Function onError, void Function() onDone, bool cancelOnError}) { {Function onError, void Function() onDone, bool cancelOnError}) {
// We do cancel this subscription when the wrapper is cancelled.
// ignore: cancel_subscriptions
final subscription = _inner.listen(onData,
onError: onError, onDone: onDone, cancelOnError: cancelOnError);
final data = _value(); final data = _value();
return _StartWithValueSubscription(subscription, data, onData); return _StartWithValueSubscription(_inner, data, onData,
onError: onError, onDone: onDone, cancelOnError: cancelOnError);
} }
} }
class _StartWithValueSubscription<T> extends StreamSubscription<T> { class _StartWithValueSubscription<T> extends StreamSubscription<T> {
final StreamSubscription<T> _inner; StreamSubscription<T> _inner;
final T initialData; final T initialData;
bool receivedDataFromInner = false; bool needsInitialData = true;
void Function(T data) _onData; void Function(T data) _onData;
_StartWithValueSubscription(this._inner, this.initialData, this._onData) { _StartWithValueSubscription(
Stream<T> innerStream, this.initialData, this._onData,
{Function onError, void Function() onDone, bool cancelOnError}) {
_inner = innerStream.listen(_wrappedDataCallback(_onData),
onError: onError, onDone: onDone, cancelOnError: cancelOnError);
// Dart's stream contract specifies that listeners are only notified // Dart's stream contract specifies that listeners are only notified
// after the .listen() code completes. So, we add the initial data in // after the .listen() code completes. So, we add the initial data in
// a later microtask. // a later microtask.
if (initialData != null) { if (initialData != null) {
scheduleMicrotask(() { scheduleMicrotask(() {
if (!receivedDataFromInner) { if (needsInitialData) {
_onData?.call(initialData); _onData?.call(initialData);
receivedDataFromInner = true; needsInitialData = false;
} }
}); });
} }
} }
void Function(T data) _wrappedDataCallback(void Function(T data) onData) {
return (event) {
needsInitialData = false;
onData?.call(event);
};
}
@override @override
Future<E> asFuture<E>([E futureValue]) => _inner.asFuture(); Future<E> asFuture<E>([E futureValue]) => _inner.asFuture();
@override @override
Future<void> cancel() => _inner.cancel(); Future<void> cancel() {
needsInitialData = false;
return _inner.cancel();
}
@override @override
bool get isPaused => _inner.isPaused; bool get isPaused => _inner.isPaused;
@ -75,12 +86,7 @@ class _StartWithValueSubscription<T> extends StreamSubscription<T> {
void onData(void Function(T data) handleData) { void onData(void Function(T data) handleData) {
_onData = handleData; _onData = handleData;
void wrappedCallback(T event) { _inner.onData(_wrappedDataCallback(handleData));
receivedDataFromInner = true;
handleData?.call(event);
}
_inner.onData(wrappedCallback);
} }
@override @override
@ -90,7 +96,10 @@ class _StartWithValueSubscription<T> extends StreamSubscription<T> {
void onError(Function handleError) => _inner.onError(handleError); void onError(Function handleError) => _inner.onError(handleError);
@override @override
void pause([Future<void> resumeSignal]) => _inner.pause(resumeSignal); void pause([Future<void> resumeSignal]) {
needsInitialData = false;
_inner.pause(resumeSignal);
}
@override @override
void resume() => _inner.resume(); void resume() => _inner.resume();

View File

@ -0,0 +1,62 @@
import 'dart:async';
import 'package:moor/src/utils/start_with_value_transformer.dart';
import 'package:test/test.dart';
void main() {
/// Create a stream emitting a single event after one event loop iteration.
Stream streamForTests() {
return Stream.fromFuture(Future.delayed(Duration.zero, () => 1));
}
test('emits initial data after one microtask', () {
final stream =
streamForTests().transform(StartWithValueTransformer(() => 0));
final events = [];
stream.listen(events.add);
expect(events, isEmpty);
final testCompleter = Completer();
scheduleMicrotask(() {
expect(events, isNotEmpty);
expect(events.first, 0);
testCompleter.complete();
});
// Don't finish the test until the microtask fired
return testCompleter.future;
});
test('does not emit data if the source stream is faster', () {
final stream = Future.sync(() => 1)
.asStream()
.transform(StartWithValueTransformer(() => 0));
expect(stream.first, completion(1));
});
group('does not emit initial data', () {
test('if the subscription was cancelled', () async {
final stream =
streamForTests().transform(StartWithValueTransformer(() => 0));
final events = [];
await stream.listen(events.add).cancel();
await pumpEventQueue();
expect(events, isEmpty);
});
test('if the subscription is paused', () async {
final stream =
streamForTests().transform(StartWithValueTransformer(() => 0));
final events = [];
stream.listen(events.add).pause();
await pumpEventQueue();
expect(events, isEmpty);
});
});
}