Avoid internal uses of `asyncMap`

This commit is contained in:
Simon Binder 2023-05-26 20:40:03 +02:00
parent 3b68386992
commit 0177175878
No known key found for this signature in database
GPG Key ID: 7891917E4147B8C0
6 changed files with 55 additions and 41 deletions

View File

@ -15,6 +15,7 @@ import 'package:drift/src/runtime/exceptions.dart';
import 'package:drift/src/runtime/executor/stream_queries.dart'; import 'package:drift/src/runtime/executor/stream_queries.dart';
import 'package:drift/src/runtime/types/converters.dart'; import 'package:drift/src/runtime/types/converters.dart';
import 'package:drift/src/runtime/types/mapping.dart'; import 'package:drift/src/runtime/types/mapping.dart';
import 'package:drift/src/utils/async_map.dart';
import 'package:drift/src/utils/single_transformer.dart'; import 'package:drift/src/utils/single_transformer.dart';
import 'package:meta/meta.dart'; import 'package:meta/meta.dart';

View File

@ -304,41 +304,8 @@ class _AsyncMappedSelectable<S, T> extends Selectable<T> {
@override @override
Stream<List<T>> watch() { Stream<List<T>> watch() {
final source = _source.watch(); return AsyncMapPerSubscription(_source.watch())
.asyncMapPerSubscription(_mapResults);
// The easiest thing to do here would be to just
// `source.watch().asyncMap(_mapResults)`. However, since _source is
// typically a broadcast stream, asyncMap also uses a broadcast stream
// controller internally which will not generally call `onListen` multiple
// times for multiple stream subscriptions.
// Drift streams are broadcast streams (since they can be listened too
// multiple times), but also special since each subscription receives the
// current snapshot when it gets added. The `asyncMap` implementation in the
// SDK breaks this because listen events don't get forwarded.
//
// So, this small implementation of asyncMap does the same thing while making
// sure the stream returned by this function behaves like one would expect
// drift streams to behave.
return Stream.multi(
(listener) {
late StreamSubscription<List<S>> subscription;
void onData(List<S> original) {
subscription.pause();
_mapResults(original)
.then(listener.addSync, onError: listener.addErrorSync)
.whenComplete(subscription.resume);
}
subscription = source.listen(
onData,
onError: listener.addErrorSync,
onDone: listener.closeSync,
cancelOnError: false, // Determined by downstream subscription
);
},
isBroadcast: source.isBroadcast,
);
} }
Future<List<T>> _mapResults(List<S> results) async { Future<List<T>> _mapResults(List<S> results) async {

View File

@ -73,7 +73,7 @@ class SimpleSelectStatement<T extends HasResultSet, D> extends Query<T, D>
key: StreamKey(query.sql, query.boundVariables), key: StreamKey(query.sql, query.boundVariables),
); );
return database.createStream(fetcher).asyncMap(_mapResponse); return database.createStream(fetcher).asyncMapPerSubscription(_mapResponse);
} }
Future<List<Map<String, Object?>>> _getRaw(GenerationContext ctx) { Future<List<Map<String, Object?>>> _getRaw(GenerationContext ctx) {

View File

@ -233,7 +233,7 @@ class JoinedSelectStatement<FirstT extends HasResultSet, FirstD>
return database return database
.createStream(fetcher) .createStream(fetcher)
.asyncMap((rows) => _mapResponse(ctx, rows)); .asyncMapPerSubscription((rows) => _mapResponse(ctx, rows));
} }
@override @override

View File

@ -0,0 +1,45 @@
import 'dart:async';
/// Extension to make the drift-specific version of [asyncMap] available.
extension AsyncMapPerSubscription<S> on Stream<S> {
/// A variant of [Stream.asyncMap] that forwards each subscription of the
/// returned stream to the source (`this`).
///
/// The `asyncMap` implementation from the SDK uses a broadcast controller
/// when given an input stream that [Stream.isBroadcast]. As broadcast
/// controllers only call `onListen` once, these subscriptions aren't
/// forwarded to the original stream.
///
/// Drift query streams send the current snapshot to each attaching listener,
/// a behavior that is lost when wrapping these streams in a broadcast stream
/// controller. Since we need the behavior of `asyncMap` internally though, we
/// re-implement it in a simple variant that transforms each subscription
/// individually.
Stream<T> asyncMapPerSubscription<T>(Future<T> Function(S) mapper) {
return Stream.multi(
(listener) {
late StreamSubscription<S> subscription;
void onData(S original) {
subscription.pause();
mapper(original)
.then(listener.addSync, onError: listener.addErrorSync)
.whenComplete(subscription.resume);
}
subscription = listen(
onData,
onError: listener.addErrorSync,
onDone: listener.closeSync,
cancelOnError: false, // Determined by downstream subscription
);
listener
..onPause = subscription.pause
..onResume = subscription.resume
..onCancel = subscription.cancel;
},
isBroadcast: isBroadcast,
);
}
}

View File

@ -1,5 +1,6 @@
import 'dart:async'; import 'dart:async';
import 'package:async/async.dart';
import 'package:drift/drift.dart'; import 'package:drift/drift.dart';
import 'package:drift/src/runtime/api/runtime_api.dart'; import 'package:drift/src/runtime/api/runtime_api.dart';
import 'package:drift/src/runtime/executor/stream_queries.dart'; import 'package:drift/src/runtime/executor/stream_queries.dart';
@ -177,13 +178,13 @@ void main() {
await first.first; // will listen to stream, then cancel await first.first; // will listen to stream, then cancel
await pumpEventQueue(times: 1); // give cancel event time to propagate await pumpEventQueue(times: 1); // give cancel event time to propagate
final checkEmits = final listener = StreamQueue(second);
expectLater(second, emitsInOrder([<Object?>[], <Object?>[]])); await expectLater(listener, emits(isEmpty));
db.markTablesUpdated({db.users}); db.markTablesUpdated({db.users});
await pumpEventQueue(times: 1); await expectLater(listener, emits(isEmpty));
await checkEmits; await listener.cancel();
}); });
test('same stream instance can be listened to multiple times', () async { test('same stream instance can be listened to multiple times', () async {