Don't emit stale data for resuming subscriptions

This commit is contained in:
Simon Binder 2021-11-22 21:20:00 +01:00
parent 1eb36eaaea
commit 871f1e4198
No known key found for this signature in database
GPG Key ID: 7891917E4147B8C0
2 changed files with 89 additions and 24 deletions

View File

@ -182,49 +182,49 @@ class QueryStream {
final QueryStreamFetcher _fetcher;
final StreamQueryStore _store;
final List<MultiStreamController<_Row>> _activeListeners = [];
final List<_QueryStreamListener> _listeners = [];
int _pausedListeners = 0;
int get _activeListeners => _listeners.length - _pausedListeners;
// We're using a Stream.multi to implement a broadcast-ish stream with per-
// subscription pauses.
late final Stream<_Row> stream = Stream.multi(
(listener) {
final queryListener = _QueryStreamListener(listener);
if (_isClosed) {
listener.closeSync();
return;
}
var isPaused = false;
// When this callback is called we have a new listener, so invoke the
// handler now.
_activeListeners.add(listener);
_onListenOrResume(listener);
_listeners.add(queryListener);
_onListenOrResume(queryListener);
listener
..onPause = () {
assert(!isPaused);
isPaused = true;
assert(!queryListener.isPaused);
queryListener.isPaused = true;
_pausedListeners++;
_activeListeners.remove(listener);
_onCancelOrPause();
}
..onCancel = () {
if (isPaused) {
if (queryListener.isPaused) {
_pausedListeners--;
}
_activeListeners.remove(listener);
_listeners.remove(queryListener);
_onCancelOrPause();
}
..onResume = () {
assert(isPaused);
isPaused = false;
assert(queryListener.isPaused);
queryListener.isPaused = false;
_pausedListeners--;
_activeListeners.add(listener);
_onListenOrResume(listener);
_onListenOrResume(queryListener);
};
},
isBroadcast: true,
@ -240,7 +240,7 @@ class QueryStream {
QueryStream(this._fetcher, this._store);
void _onListenOrResume(MultiStreamController<_Row> newListener) {
void _onListenOrResume(_QueryStreamListener newListener) {
// First listener, start fetching data
_store.markAsOpened(this);
@ -261,7 +261,7 @@ class QueryStream {
// that case, we still want to invalidate cached data but there's no
// point in fetching new data now. We'll load the query again after
// a listener unpauses.
if (_activeListeners.isNotEmpty) {
if (_activeListeners > 0) {
fetchAndEmitData();
}
});
@ -278,7 +278,8 @@ class QueryStream {
}
void _onCancelOrPause() {
if (_activeListeners.isEmpty && _pausedListeners == 0) {
if (_listeners.isEmpty) {
// Last listener has stopped listening properly (not just a pause)
_store.markAsClosed(this, () {
// last listener gone, dispose
_tablesChangedSubscription?.cancel();
@ -306,12 +307,16 @@ class QueryStream {
if (data == null) return;
_lastData = data;
for (final listener in _activeListeners) {
listener.add(data);
for (final listener in _listeners) {
if (!listener.isPaused) {
listener.add(data);
}
}
} catch (e, s) {
for (final listener in _activeListeners) {
listener.addError(e, s);
for (final listener in _listeners) {
if (!listener.isPaused) {
listener.controller.addError(e, s);
}
}
} finally {
_runningOperations.remove(operation);
@ -320,10 +325,26 @@ class QueryStream {
void close() {
_isClosed = true;
for (final listener in _activeListeners) {
listener.close();
for (final listener in _listeners) {
listener.controller.close();
}
_listeners.clear();
}
}
class _QueryStreamListener {
final MultiStreamController<_Row> controller;
_Row? lastEvent;
bool isPaused = false;
_QueryStreamListener(this.controller);
void add(_Row row) {
// Don't emit events that have already been dispatched to this listener.
if (!identical(row, lastEvent)) {
lastEvent = row;
controller.add(row);
}
_activeListeners.clear();
}
}

View File

@ -81,6 +81,50 @@ void main() {
verifyNever(executor.runSelect(any, any));
});
test('does not emit cached data when resuming and data did not change',
() async {
final stream = db.select(db.users).watch();
final completer = Completer();
final subscription = stream.listen(expectAsync1((data) {
completer.complete();
}));
// The stream should emit a first event as we have a new subscription
await completer.future;
subscription.pause();
await pumpEventQueue();
// Resume and wait for a bit. The stream should not emit a second event.
subscription.resume();
await pumpEventQueue();
await subscription.cancel();
});
test('emits new data if it changed during a paused subscription', () async {
final stream = db.select(db.users).watch();
final completer = Completer();
final subscription = stream.listen(expectAsync1((data) {
if (!completer.isCompleted) completer.complete();
}, count: 2));
// The stream should emit a first event as we have a new subscription
await completer.future;
subscription.pause();
await pumpEventQueue();
db.markTablesUpdated({db.users});
await pumpEventQueue();
// Resume and wait for a bit. The stream should update as one of its sources
// has changed.
subscription.resume();
await pumpEventQueue();
await subscription.cancel();
});
group('updating clears cached data', () {
test('when an older stream is no longer listened to', () async {
when(executor.runSelect(any, any)).thenAnswer((_) => Future.value([]));