Make StartWithValueTransformer subscribe in same microtask

This commit is contained in:
Simon Binder 2019-10-03 11:39:12 +02:00
parent c2845cb248
commit b0d69f346f
No known key found for this signature in database
GPG Key ID: 7891917E4147B8C0
3 changed files with 17 additions and 20 deletions

View File

@ -29,13 +29,15 @@ class StartWithValueTransformer<T> extends StreamTransformerBase<T, T> {
if (data != null) {
controller.add(data);
}
subscription = stream.listen(
controller.add,
onError: controller.addError,
onDone: controller.close,
);
});
// the .listen will run in a later microtask, so the cached data would
// still be added first.
subscription = stream.listen(
controller.add,
onError: controller.addError,
onDone: controller.close,
);
}
..onCancel = () {
// not using a tear-off here because subscription.cancel is null before

View File

@ -129,7 +129,7 @@ void main() {
expect(db.select(db.todosTable).getSingle(), completion(_todoEntry));
});
test('get multiple times', () async {
test('get multiple times', () {
final resultRows = <List<Map<String, dynamic>>>[
[_dataOfTodoEntry],
[],
@ -141,15 +141,12 @@ void main() {
return Future.value(resultRows[_currentRow++]);
});
final expectation = expectLater(db.select(db.todosTable).watchSingle(),
expectLater(db.select(db.todosTable).watchSingle(),
emitsInOrder([_todoEntry, null, emitsError(anything)]));
await pumpEventQueue(times: 1);
db.markTablesUpdated({db.todosTable});
await pumpEventQueue(times: 1);
db.markTablesUpdated({db.todosTable});
await expectation;
db
..markTablesUpdated({db.todosTable})
..markTablesUpdated({db.todosTable});
});
});
}

View File

@ -14,20 +14,18 @@ void main() {
db = TodoDb(executor);
});
test('streams fetch when the first listener attaches', () async {
test('streams fetch when the first listener attaches', () {
final stream = db.select(db.users).watch();
verifyNever(executor.runSelect(any, any));
stream.listen((_) {});
await pumpEventQueue(times: 1);
verify(executor.runSelect(any, any)).called(1);
});
test('streams fetch when the underlying data changes', () async {
db.select(db.users).watch().listen((_) {});
await pumpEventQueue(times: 1);
db.markTablesUpdated({db.users});
await pumpEventQueue(times: 1);
@ -53,12 +51,12 @@ void main() {
when(executor.runSelect(any, any)).thenAnswer((_) => Future.value([]));
final first = (db.select(db.users).watch());
await expectLater(first, emits(isEmpty));
expect(first, emits(isEmpty));
clearInteractions(executor);
final second = (db.select(db.users).watch());
await expectLater(second, emits(isEmpty));
expect(second, emits(isEmpty));
// calling executor.dialect is ok, it's needed to construct the statement
verify(executor.dialect);
@ -109,7 +107,7 @@ void main() {
await first.first; // will listen to stream, then cancel
await pumpEventQueue(times: 1); // give cancel event time to propagate
final checkEmits = expectLater(second, emitsInOrder([[]]));
final checkEmits = expectLater(second, emitsInOrder([[], []]));
db.markTablesUpdated({db.users});
await pumpEventQueue(times: 1);