From 01771758789070c9deb8401fe72b925a7087dba6 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Fri, 26 May 2023 20:40:03 +0200 Subject: [PATCH] Avoid internal uses of `asyncMap` --- .../runtime/query_builder/query_builder.dart | 1 + .../query_builder/statements/query.dart | 37 +-------------- .../statements/select/select.dart | 2 +- .../statements/select/select_with_join.dart | 2 +- drift/lib/src/utils/async_map.dart | 45 +++++++++++++++++++ drift/test/database/streams_test.dart | 9 ++-- 6 files changed, 55 insertions(+), 41 deletions(-) create mode 100644 drift/lib/src/utils/async_map.dart diff --git a/drift/lib/src/runtime/query_builder/query_builder.dart b/drift/lib/src/runtime/query_builder/query_builder.dart index 152644a8..2150aa1a 100644 --- a/drift/lib/src/runtime/query_builder/query_builder.dart +++ b/drift/lib/src/runtime/query_builder/query_builder.dart @@ -15,6 +15,7 @@ import 'package:drift/src/runtime/exceptions.dart'; import 'package:drift/src/runtime/executor/stream_queries.dart'; import 'package:drift/src/runtime/types/converters.dart'; import 'package:drift/src/runtime/types/mapping.dart'; +import 'package:drift/src/utils/async_map.dart'; import 'package:drift/src/utils/single_transformer.dart'; import 'package:meta/meta.dart'; diff --git a/drift/lib/src/runtime/query_builder/statements/query.dart b/drift/lib/src/runtime/query_builder/statements/query.dart index 54b2d298..542c7bc9 100644 --- a/drift/lib/src/runtime/query_builder/statements/query.dart +++ b/drift/lib/src/runtime/query_builder/statements/query.dart @@ -304,41 +304,8 @@ class _AsyncMappedSelectable extends Selectable { @override Stream> watch() { - final source = _source.watch(); - - // The easiest thing to do here would be to just - // `source.watch().asyncMap(_mapResults)`. However, since _source is - // typically a broadcast stream, asyncMap also uses a broadcast stream - // controller internally which will not generally call `onListen` multiple - // times for multiple stream subscriptions. - // Drift streams are broadcast streams (since they can be listened too - // multiple times), but also special since each subscription receives the - // current snapshot when it gets added. The `asyncMap` implementation in the - // SDK breaks this because listen events don't get forwarded. - // - // So, this small implementation of asyncMap does the same thing while making - // sure the stream returned by this function behaves like one would expect - // drift streams to behave. - return Stream.multi( - (listener) { - late StreamSubscription> subscription; - - void onData(List original) { - subscription.pause(); - _mapResults(original) - .then(listener.addSync, onError: listener.addErrorSync) - .whenComplete(subscription.resume); - } - - subscription = source.listen( - onData, - onError: listener.addErrorSync, - onDone: listener.closeSync, - cancelOnError: false, // Determined by downstream subscription - ); - }, - isBroadcast: source.isBroadcast, - ); + return AsyncMapPerSubscription(_source.watch()) + .asyncMapPerSubscription(_mapResults); } Future> _mapResults(List results) async { diff --git a/drift/lib/src/runtime/query_builder/statements/select/select.dart b/drift/lib/src/runtime/query_builder/statements/select/select.dart index ccdbac1e..7c179024 100644 --- a/drift/lib/src/runtime/query_builder/statements/select/select.dart +++ b/drift/lib/src/runtime/query_builder/statements/select/select.dart @@ -73,7 +73,7 @@ class SimpleSelectStatement extends Query key: StreamKey(query.sql, query.boundVariables), ); - return database.createStream(fetcher).asyncMap(_mapResponse); + return database.createStream(fetcher).asyncMapPerSubscription(_mapResponse); } Future>> _getRaw(GenerationContext ctx) { diff --git a/drift/lib/src/runtime/query_builder/statements/select/select_with_join.dart b/drift/lib/src/runtime/query_builder/statements/select/select_with_join.dart index 034239bf..036ec079 100644 --- a/drift/lib/src/runtime/query_builder/statements/select/select_with_join.dart +++ b/drift/lib/src/runtime/query_builder/statements/select/select_with_join.dart @@ -233,7 +233,7 @@ class JoinedSelectStatement return database .createStream(fetcher) - .asyncMap((rows) => _mapResponse(ctx, rows)); + .asyncMapPerSubscription((rows) => _mapResponse(ctx, rows)); } @override diff --git a/drift/lib/src/utils/async_map.dart b/drift/lib/src/utils/async_map.dart new file mode 100644 index 00000000..ad8adcb9 --- /dev/null +++ b/drift/lib/src/utils/async_map.dart @@ -0,0 +1,45 @@ +import 'dart:async'; + +/// Extension to make the drift-specific version of [asyncMap] available. +extension AsyncMapPerSubscription on Stream { + /// A variant of [Stream.asyncMap] that forwards each subscription of the + /// returned stream to the source (`this`). + /// + /// The `asyncMap` implementation from the SDK uses a broadcast controller + /// when given an input stream that [Stream.isBroadcast]. As broadcast + /// controllers only call `onListen` once, these subscriptions aren't + /// forwarded to the original stream. + /// + /// Drift query streams send the current snapshot to each attaching listener, + /// a behavior that is lost when wrapping these streams in a broadcast stream + /// controller. Since we need the behavior of `asyncMap` internally though, we + /// re-implement it in a simple variant that transforms each subscription + /// individually. + Stream asyncMapPerSubscription(Future Function(S) mapper) { + return Stream.multi( + (listener) { + late StreamSubscription subscription; + + void onData(S original) { + subscription.pause(); + mapper(original) + .then(listener.addSync, onError: listener.addErrorSync) + .whenComplete(subscription.resume); + } + + subscription = listen( + onData, + onError: listener.addErrorSync, + onDone: listener.closeSync, + cancelOnError: false, // Determined by downstream subscription + ); + + listener + ..onPause = subscription.pause + ..onResume = subscription.resume + ..onCancel = subscription.cancel; + }, + isBroadcast: isBroadcast, + ); + } +} diff --git a/drift/test/database/streams_test.dart b/drift/test/database/streams_test.dart index 91efdd7a..f82216ec 100644 --- a/drift/test/database/streams_test.dart +++ b/drift/test/database/streams_test.dart @@ -1,5 +1,6 @@ import 'dart:async'; +import 'package:async/async.dart'; import 'package:drift/drift.dart'; import 'package:drift/src/runtime/api/runtime_api.dart'; import 'package:drift/src/runtime/executor/stream_queries.dart'; @@ -177,13 +178,13 @@ void main() { await first.first; // will listen to stream, then cancel await pumpEventQueue(times: 1); // give cancel event time to propagate - final checkEmits = - expectLater(second, emitsInOrder([[], []])); + final listener = StreamQueue(second); + await expectLater(listener, emits(isEmpty)); db.markTablesUpdated({db.users}); - await pumpEventQueue(times: 1); + await expectLater(listener, emits(isEmpty)); - await checkEmits; + await listener.cancel(); }); test('same stream instance can be listened to multiple times', () async {