mirror of https://github.com/AMT-Cheif/drift.git
Invalidate cached stream data on update (#166)
This commit is contained in:
parent
a74cc3b624
commit
c2845cb248
|
@ -61,8 +61,14 @@ class StreamKey {
|
|||
/// them when needed.
|
||||
class StreamQueryStore {
|
||||
final Map<StreamKey, QueryStream> _activeKeyStreams = {};
|
||||
|
||||
// Why is this stream synchronous? We want to dispatch table updates before
|
||||
// the future from the query completes. This allows streams to invalidate
|
||||
// their cached data before the user can send another query.
|
||||
// There shouldn't be a problem as this stream is not exposed in any user-
|
||||
// facing api.
|
||||
final StreamController<Set<String>> _updatedTableNames =
|
||||
StreamController.broadcast();
|
||||
StreamController.broadcast(sync: true);
|
||||
|
||||
StreamQueryStore();
|
||||
|
||||
|
@ -151,7 +157,11 @@ class QueryStream<T> {
|
|||
final names = _fetcher.readsFrom.map((t) => t.actualTableName).toSet();
|
||||
_tablesChangedSubscription = _store._updatedTableNames.stream
|
||||
.where((changed) => changed.any(names.contains))
|
||||
.listen((_) => fetchAndEmitData());
|
||||
.listen((_) {
|
||||
// table has changed, invalidate cache
|
||||
_lastData = null;
|
||||
fetchAndEmitData();
|
||||
});
|
||||
}
|
||||
|
||||
void _onCancel() {
|
||||
|
@ -159,6 +169,12 @@ class QueryStream<T> {
|
|||
_tablesChangedSubscription?.cancel();
|
||||
_tablesChangedSubscription = null;
|
||||
|
||||
// we don't listen for table updates anymore, and we're guaranteed to
|
||||
// re-fetch data after a new listener comes in. We can't know if the table
|
||||
// was updated in the meantime, but let's delete the cached data just in
|
||||
// case
|
||||
_lastData = null;
|
||||
|
||||
_store.markAsClosed(this);
|
||||
}
|
||||
|
||||
|
|
|
@ -21,14 +21,13 @@ class StartWithValueTransformer<T> extends StreamTransformerBase<T, T> {
|
|||
|
||||
controller
|
||||
..onListen = () {
|
||||
final data = _value();
|
||||
if (data != null) {
|
||||
// Dart's stream contract specifies that listeners are only notified
|
||||
// after the .listen() code completes. So, we add the initial data in
|
||||
// a later microtask.
|
||||
scheduleMicrotask(() {
|
||||
final data = _value();
|
||||
if (data != null) {
|
||||
controller.add(data);
|
||||
});
|
||||
}
|
||||
|
||||
subscription = stream.listen(
|
||||
|
@ -36,6 +35,7 @@ class StartWithValueTransformer<T> extends StreamTransformerBase<T, T> {
|
|||
onError: controller.addError,
|
||||
onDone: controller.close,
|
||||
);
|
||||
});
|
||||
}
|
||||
..onCancel = () {
|
||||
// not using a tear-off here because subscription.cancel is null before
|
||||
|
|
|
@ -129,7 +129,7 @@ void main() {
|
|||
expect(db.select(db.todosTable).getSingle(), completion(_todoEntry));
|
||||
});
|
||||
|
||||
test('get multiple times', () {
|
||||
test('get multiple times', () async {
|
||||
final resultRows = <List<Map<String, dynamic>>>[
|
||||
[_dataOfTodoEntry],
|
||||
[],
|
||||
|
@ -141,12 +141,15 @@ void main() {
|
|||
return Future.value(resultRows[_currentRow++]);
|
||||
});
|
||||
|
||||
expectLater(db.select(db.todosTable).watchSingle(),
|
||||
final expectation = expectLater(db.select(db.todosTable).watchSingle(),
|
||||
emitsInOrder([_todoEntry, null, emitsError(anything)]));
|
||||
|
||||
db
|
||||
..markTablesUpdated({db.todosTable})
|
||||
..markTablesUpdated({db.todosTable});
|
||||
await pumpEventQueue(times: 1);
|
||||
db.markTablesUpdated({db.todosTable});
|
||||
await pumpEventQueue(times: 1);
|
||||
db.markTablesUpdated({db.todosTable});
|
||||
|
||||
await expectation;
|
||||
});
|
||||
});
|
||||
}
|
||||
|
|
|
@ -14,18 +14,20 @@ void main() {
|
|||
db = TodoDb(executor);
|
||||
});
|
||||
|
||||
test('streams fetch when the first listener attaches', () {
|
||||
test('streams fetch when the first listener attaches', () async {
|
||||
final stream = db.select(db.users).watch();
|
||||
|
||||
verifyNever(executor.runSelect(any, any));
|
||||
|
||||
stream.listen((_) {});
|
||||
|
||||
await pumpEventQueue(times: 1);
|
||||
verify(executor.runSelect(any, any)).called(1);
|
||||
});
|
||||
|
||||
test('streams fetch when the underlying data changes', () async {
|
||||
db.select(db.users).watch().listen((_) {});
|
||||
await pumpEventQueue(times: 1);
|
||||
|
||||
db.markTablesUpdated({db.users});
|
||||
await pumpEventQueue(times: 1);
|
||||
|
@ -39,6 +41,7 @@ void main() {
|
|||
final second = db.alias(db.users, 'two');
|
||||
|
||||
db.select(first).watch().listen((_) {});
|
||||
await pumpEventQueue(times: 1);
|
||||
|
||||
db.markTablesUpdated({second});
|
||||
await pumpEventQueue(times: 1);
|
||||
|
@ -50,19 +53,53 @@ void main() {
|
|||
when(executor.runSelect(any, any)).thenAnswer((_) => Future.value([]));
|
||||
|
||||
final first = (db.select(db.users).watch());
|
||||
expect(first, emits(isEmpty));
|
||||
await expectLater(first, emits(isEmpty));
|
||||
|
||||
clearInteractions(executor);
|
||||
|
||||
final second = (db.select(db.users).watch());
|
||||
expect(second, emits(isEmpty));
|
||||
await expectLater(second, emits(isEmpty));
|
||||
|
||||
await pumpEventQueue(times: 1);
|
||||
// calling executor.dialect is ok, it's needed to construct the statement
|
||||
verify(executor.dialect);
|
||||
verifyNoMoreInteractions(executor);
|
||||
});
|
||||
|
||||
group('updating clears cached data', () {
|
||||
test('when an older stream is no longer listened to', () async {
|
||||
when(executor.runSelect(any, any)).thenAnswer((_) => Future.value([]));
|
||||
final first = db.select(db.categories).watch();
|
||||
await first.first; // subscribe to first stream, then drop subscription
|
||||
|
||||
when(executor.runSelect(any, any)).thenAnswer((_) => Future.value([
|
||||
{'id': 1, 'description': 'd'}
|
||||
]));
|
||||
await db
|
||||
.into(db.categories)
|
||||
.insert(CategoriesCompanion.insert(description: 'd'));
|
||||
|
||||
final second = db.select(db.categories).watch();
|
||||
expect(second.first, completion(isNotEmpty));
|
||||
});
|
||||
|
||||
test('when an older stream is still listened to', () async {
|
||||
when(executor.runSelect(any, any)).thenAnswer((_) => Future.value([]));
|
||||
final first = db.select(db.categories).watch();
|
||||
final subscription = first.listen((_) {});
|
||||
|
||||
when(executor.runSelect(any, any)).thenAnswer((_) => Future.value([
|
||||
{'id': 1, 'description': 'd'}
|
||||
]));
|
||||
await db
|
||||
.into(db.categories)
|
||||
.insert(CategoriesCompanion.insert(description: 'd'));
|
||||
|
||||
final second = db.select(db.categories).watch();
|
||||
expect(second.first, completion(isNotEmpty));
|
||||
await subscription.cancel();
|
||||
});
|
||||
});
|
||||
|
||||
test('every stream instance can be listened to', () async {
|
||||
when(executor.runSelect(any, any)).thenAnswer((_) => Future.value([]));
|
||||
|
||||
|
@ -72,7 +109,7 @@ void main() {
|
|||
await first.first; // will listen to stream, then cancel
|
||||
await pumpEventQueue(times: 1); // give cancel event time to propagate
|
||||
|
||||
final checkEmits = expectLater(second, emitsInOrder([[], []]));
|
||||
final checkEmits = expectLater(second, emitsInOrder([[]]));
|
||||
|
||||
db.markTablesUpdated({db.users});
|
||||
await pumpEventQueue(times: 1);
|
||||
|
@ -99,6 +136,7 @@ void main() {
|
|||
await stream.first; // listen to stream, then cancel
|
||||
await pumpEventQueue(); // should remove the stream from the cache
|
||||
await stream.first; // listen again
|
||||
await pumpEventQueue(times: 1);
|
||||
|
||||
verify(executor.runSelect(any, any)).called(2);
|
||||
});
|
||||
|
|
Loading…
Reference in New Issue