diff --git a/drift/lib/src/runtime/executor/stream_queries.dart b/drift/lib/src/runtime/executor/stream_queries.dart index 7810808c..98aad1b7 100644 --- a/drift/lib/src/runtime/executor/stream_queries.dart +++ b/drift/lib/src/runtime/executor/stream_queries.dart @@ -182,49 +182,49 @@ class QueryStream { final QueryStreamFetcher _fetcher; final StreamQueryStore _store; - final List> _activeListeners = []; + final List<_QueryStreamListener> _listeners = []; int _pausedListeners = 0; + int get _activeListeners => _listeners.length - _pausedListeners; + // We're using a Stream.multi to implement a broadcast-ish stream with per- // subscription pauses. late final Stream<_Row> stream = Stream.multi( (listener) { + final queryListener = _QueryStreamListener(listener); + if (_isClosed) { listener.closeSync(); return; } - var isPaused = false; - // When this callback is called we have a new listener, so invoke the // handler now. - _activeListeners.add(listener); - _onListenOrResume(listener); + _listeners.add(queryListener); + _onListenOrResume(queryListener); listener ..onPause = () { - assert(!isPaused); - isPaused = true; + assert(!queryListener.isPaused); + queryListener.isPaused = true; _pausedListeners++; - _activeListeners.remove(listener); _onCancelOrPause(); } ..onCancel = () { - if (isPaused) { + if (queryListener.isPaused) { _pausedListeners--; } - _activeListeners.remove(listener); + _listeners.remove(queryListener); _onCancelOrPause(); } ..onResume = () { - assert(isPaused); - isPaused = false; + assert(queryListener.isPaused); + queryListener.isPaused = false; _pausedListeners--; - _activeListeners.add(listener); - _onListenOrResume(listener); + _onListenOrResume(queryListener); }; }, isBroadcast: true, @@ -240,7 +240,7 @@ class QueryStream { QueryStream(this._fetcher, this._store); - void _onListenOrResume(MultiStreamController<_Row> newListener) { + void _onListenOrResume(_QueryStreamListener newListener) { // First listener, start fetching data _store.markAsOpened(this); @@ -261,7 +261,7 @@ class QueryStream { // that case, we still want to invalidate cached data but there's no // point in fetching new data now. We'll load the query again after // a listener unpauses. - if (_activeListeners.isNotEmpty) { + if (_activeListeners > 0) { fetchAndEmitData(); } }); @@ -278,7 +278,8 @@ class QueryStream { } void _onCancelOrPause() { - if (_activeListeners.isEmpty && _pausedListeners == 0) { + if (_listeners.isEmpty) { + // Last listener has stopped listening properly (not just a pause) _store.markAsClosed(this, () { // last listener gone, dispose _tablesChangedSubscription?.cancel(); @@ -306,12 +307,16 @@ class QueryStream { if (data == null) return; _lastData = data; - for (final listener in _activeListeners) { - listener.add(data); + for (final listener in _listeners) { + if (!listener.isPaused) { + listener.add(data); + } } } catch (e, s) { - for (final listener in _activeListeners) { - listener.addError(e, s); + for (final listener in _listeners) { + if (!listener.isPaused) { + listener.controller.addError(e, s); + } } } finally { _runningOperations.remove(operation); @@ -320,10 +325,26 @@ class QueryStream { void close() { _isClosed = true; - for (final listener in _activeListeners) { - listener.close(); + for (final listener in _listeners) { + listener.controller.close(); + } + _listeners.clear(); + } +} + +class _QueryStreamListener { + final MultiStreamController<_Row> controller; + _Row? lastEvent; + bool isPaused = false; + + _QueryStreamListener(this.controller); + + void add(_Row row) { + // Don't emit events that have already been dispatched to this listener. + if (!identical(row, lastEvent)) { + lastEvent = row; + controller.add(row); } - _activeListeners.clear(); } } diff --git a/drift/test/streams_test.dart b/drift/test/streams_test.dart index ce12e8b0..d64485c0 100644 --- a/drift/test/streams_test.dart +++ b/drift/test/streams_test.dart @@ -81,6 +81,50 @@ void main() { verifyNever(executor.runSelect(any, any)); }); + test('does not emit cached data when resuming and data did not change', + () async { + final stream = db.select(db.users).watch(); + final completer = Completer(); + + final subscription = stream.listen(expectAsync1((data) { + completer.complete(); + })); + + // The stream should emit a first event as we have a new subscription + await completer.future; + + subscription.pause(); + await pumpEventQueue(); + + // Resume and wait for a bit. The stream should not emit a second event. + subscription.resume(); + await pumpEventQueue(); + await subscription.cancel(); + }); + + test('emits new data if it changed during a paused subscription', () async { + final stream = db.select(db.users).watch(); + final completer = Completer(); + + final subscription = stream.listen(expectAsync1((data) { + if (!completer.isCompleted) completer.complete(); + }, count: 2)); + + // The stream should emit a first event as we have a new subscription + await completer.future; + + subscription.pause(); + await pumpEventQueue(); + db.markTablesUpdated({db.users}); + await pumpEventQueue(); + + // Resume and wait for a bit. The stream should update as one of its sources + // has changed. + subscription.resume(); + await pumpEventQueue(); + await subscription.cancel(); + }); + group('updating clears cached data', () { test('when an older stream is no longer listened to', () async { when(executor.runSelect(any, any)).thenAnswer((_) => Future.value([]));