mirror of https://github.com/AMT-Cheif/drift.git
Don't use types in StreamQueryStore
This commit is contained in:
parent
8f069c86f4
commit
786bdfa2fc
|
@ -52,7 +52,7 @@ abstract class DatabaseConnectionUser {
|
|||
|
||||
/// Creates and auto-updating stream from the given select statement. This
|
||||
/// method should not be used directly.
|
||||
Stream<T> createStream<T>(QueryStreamFetcher<T> stmt) =>
|
||||
Stream<List<Map<String, Object?>>> createStream(QueryStreamFetcher stmt) =>
|
||||
streamQueries.registerStream(stmt);
|
||||
|
||||
/// Creates a copy of the table with an alias so that it can be used in the
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
import 'package:meta/meta.dart';
|
||||
import 'package:moor/src/runtime/api/runtime_api.dart';
|
||||
|
||||
import 'stream_queries.dart';
|
||||
|
@ -6,6 +7,7 @@ import 'stream_queries.dart';
|
|||
/// available delegate.
|
||||
/// This class is internal and should not be exposed to moor users. It's used
|
||||
/// through a delayed database connection.
|
||||
@internal
|
||||
class DelayedStreamQueryStore implements StreamQueryStore {
|
||||
late Future<StreamQueryStore> _delegate;
|
||||
StreamQueryStore? _resolved;
|
||||
|
@ -35,7 +37,8 @@ class DelayedStreamQueryStore implements StreamQueryStore {
|
|||
}
|
||||
|
||||
@override
|
||||
Stream<T> registerStream<T>(QueryStreamFetcher<T> fetcher) {
|
||||
Stream<List<Map<String, Object?>>> registerStream(
|
||||
QueryStreamFetcher fetcher) {
|
||||
return Stream.fromFuture(_delegate)
|
||||
.asyncExpand((resolved) => resolved.registerStream(fetcher))
|
||||
.asBroadcastStream();
|
||||
|
|
|
@ -2,18 +2,20 @@ import 'dart:async';
|
|||
import 'dart:collection';
|
||||
|
||||
import 'package:collection/collection.dart';
|
||||
import 'package:meta/meta.dart';
|
||||
import 'package:moor/moor.dart';
|
||||
import 'package:moor/src/utils/start_with_value_transformer.dart';
|
||||
import 'package:pedantic/pedantic.dart';
|
||||
|
||||
const _listEquality = ListEquality<dynamic>();
|
||||
const _listEquality = ListEquality<Object?>();
|
||||
|
||||
// This is an internal moor library that's never exported to users.
|
||||
// ignore_for_file: public_member_api_docs
|
||||
|
||||
/// Representation of a select statement that knows from which tables the
|
||||
/// statement is reading its data and how to execute the query.
|
||||
class QueryStreamFetcher<T> {
|
||||
@internal
|
||||
class QueryStreamFetcher {
|
||||
/// Table updates that will affect this stream.
|
||||
///
|
||||
/// If any of these tables changes, the stream must fetch its data again.
|
||||
|
@ -24,7 +26,7 @@ class QueryStreamFetcher<T> {
|
|||
final StreamKey? key;
|
||||
|
||||
/// Function that asynchronously fetches the latest set of data.
|
||||
final Future<T> Function() fetchData;
|
||||
final Future<List<Map<String, Object?>>> Function() fetchData;
|
||||
|
||||
QueryStreamFetcher(
|
||||
{required this.readsFrom, this.key, required this.fetchData});
|
||||
|
@ -36,20 +38,16 @@ class QueryStreamFetcher<T> {
|
|||
/// As two equal statements always yield the same result when operating on the
|
||||
/// same data, this can make streams more efficient as we can return the same
|
||||
/// stream for two equivalent queries.
|
||||
@internal
|
||||
class StreamKey {
|
||||
final String sql;
|
||||
final List<dynamic> variables;
|
||||
|
||||
/// Used to differentiate between custom streams, which return a [QueryRow],
|
||||
/// and regular streams, which return an instance of a generated data class.
|
||||
final Type returnType;
|
||||
|
||||
StreamKey(this.sql, this.variables, this.returnType);
|
||||
StreamKey(this.sql, this.variables);
|
||||
|
||||
@override
|
||||
int get hashCode {
|
||||
return (sql.hashCode * 31 + _listEquality.hash(variables)) * 31 +
|
||||
returnType.hashCode;
|
||||
return $mrjf($mrjc(sql.hashCode, _listEquality.hash(variables)));
|
||||
}
|
||||
|
||||
@override
|
||||
|
@ -57,18 +55,19 @@ class StreamKey {
|
|||
return identical(this, other) ||
|
||||
(other is StreamKey &&
|
||||
other.sql == sql &&
|
||||
_listEquality.equals(other.variables, variables) &&
|
||||
other.returnType == returnType);
|
||||
_listEquality.equals(other.variables, variables));
|
||||
}
|
||||
}
|
||||
|
||||
/// Keeps track of active streams created from [SimpleSelectStatement]s and
|
||||
/// updates them when needed.
|
||||
@internal
|
||||
class StreamQueryStore {
|
||||
final Map<StreamKey, QueryStream> _activeKeyStreams = {};
|
||||
final HashSet<StreamKey?> _keysPendingRemoval = HashSet<StreamKey?>();
|
||||
|
||||
bool _isShuttingDown = false;
|
||||
|
||||
// we track pending timers since Flutter throws an exception when timers
|
||||
// remain after a test run.
|
||||
final Set<Completer> _pendingTimers = {};
|
||||
|
@ -84,19 +83,20 @@ class StreamQueryStore {
|
|||
StreamQueryStore();
|
||||
|
||||
/// Creates a new stream from the select statement.
|
||||
Stream<T> registerStream<T>(QueryStreamFetcher<T> fetcher) {
|
||||
Stream<List<Map<String, Object?>>> registerStream(
|
||||
QueryStreamFetcher fetcher) {
|
||||
final key = fetcher.key;
|
||||
|
||||
if (key != null) {
|
||||
final cached = _activeKeyStreams[key];
|
||||
if (cached != null) {
|
||||
return (cached as QueryStream<T>).stream;
|
||||
return cached.stream;
|
||||
}
|
||||
}
|
||||
|
||||
// no cached instance found, create a new stream and register it so later
|
||||
// requests with the same key can be cached.
|
||||
final stream = QueryStream<T>(fetcher, this);
|
||||
final stream = QueryStream(fetcher, this);
|
||||
// todo this adds the stream to a map, where it will only be removed when
|
||||
// somebody listens to it and later calls .cancel(). Failing to do so will
|
||||
// cause a memory leak. Is there any way we can work around it? Perhaps a
|
||||
|
@ -180,19 +180,20 @@ class StreamQueryStore {
|
|||
}
|
||||
}
|
||||
|
||||
class QueryStream<T> {
|
||||
final QueryStreamFetcher<T> _fetcher;
|
||||
class QueryStream {
|
||||
final QueryStreamFetcher _fetcher;
|
||||
final StreamQueryStore _store;
|
||||
|
||||
late final StreamController<T> _controller = StreamController.broadcast(
|
||||
late final StreamController<List<Map<String, Object?>>> _controller =
|
||||
StreamController.broadcast(
|
||||
onListen: _onListen,
|
||||
onCancel: _onCancel,
|
||||
);
|
||||
StreamSubscription? _tablesChangedSubscription;
|
||||
|
||||
T? _lastData;
|
||||
List<Map<String, Object?>>? _lastData;
|
||||
|
||||
Stream<T> get stream {
|
||||
Stream<List<Map<String, Object?>>> get stream {
|
||||
return _controller.stream.transform(StartWithValueTransformer(_cachedData));
|
||||
}
|
||||
|
||||
|
@ -202,7 +203,7 @@ class QueryStream<T> {
|
|||
|
||||
/// Called when we have a new listener, makes the stream query behave similar
|
||||
/// to an `BehaviorSubject` from rxdart.
|
||||
T? _cachedData() => _lastData;
|
||||
List<Map<String, Object?>>? _cachedData() => _lastData;
|
||||
|
||||
void _onListen() {
|
||||
_store.markAsOpened(this);
|
||||
|
@ -239,7 +240,7 @@ class QueryStream<T> {
|
|||
}
|
||||
|
||||
Future<void> fetchAndEmitData() async {
|
||||
T data;
|
||||
List<Map<String, Object?>> data;
|
||||
|
||||
try {
|
||||
data = await _fetcher.fetchData();
|
||||
|
|
|
@ -22,24 +22,24 @@ class CustomSelectStatement with Selectable<QueryRow> {
|
|||
|
||||
/// Constructs a fetcher for this query. The fetcher is responsible for
|
||||
/// updating a stream at the right moment.
|
||||
QueryStreamFetcher<List<QueryRow>> _constructFetcher() {
|
||||
QueryStreamFetcher _constructFetcher() {
|
||||
final args = _mapArgs();
|
||||
|
||||
return QueryStreamFetcher<List<QueryRow>>(
|
||||
return QueryStreamFetcher(
|
||||
readsFrom: TableUpdateQuery.onAllTables(tables),
|
||||
fetchData: () => _executeWithMappedArgs(args),
|
||||
key: StreamKey(query, args, QueryRow),
|
||||
fetchData: () => _executeRaw(args),
|
||||
key: StreamKey(query, args),
|
||||
);
|
||||
}
|
||||
|
||||
@override
|
||||
Future<List<QueryRow>> get() async {
|
||||
return _executeWithMappedArgs(_mapArgs());
|
||||
Future<List<QueryRow>> get() {
|
||||
return _executeRaw(_mapArgs()).then(_mapDbResponse);
|
||||
}
|
||||
|
||||
@override
|
||||
Stream<List<QueryRow>> watch() {
|
||||
return _db.createStream(_constructFetcher());
|
||||
return _db.createStream(_constructFetcher()).map(_mapDbResponse);
|
||||
}
|
||||
|
||||
List<dynamic> _mapArgs() {
|
||||
|
@ -47,12 +47,12 @@ class CustomSelectStatement with Selectable<QueryRow> {
|
|||
return variables.map((v) => v.mapToSimpleValue(ctx)).toList();
|
||||
}
|
||||
|
||||
Future<List<QueryRow>> _executeWithMappedArgs(
|
||||
List<dynamic> mappedArgs) async {
|
||||
final result =
|
||||
await _db.doWhenOpened((e) => e.runSelect(query, mappedArgs));
|
||||
Future<List<Map<String, Object?>>> _executeRaw(List<Object?> mappedArgs) {
|
||||
return _db.doWhenOpened((e) => e.runSelect(query, mappedArgs));
|
||||
}
|
||||
|
||||
return result.map((row) => QueryRow(row, _db)).toList();
|
||||
List<QueryRow> _mapDbResponse(List<Map<String, Object?>> rows) {
|
||||
return rows.map((row) => QueryRow(row, _db)).toList();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -44,16 +44,31 @@ class SimpleSelectStatement<T extends Table, D extends DataClass>
|
|||
}
|
||||
|
||||
@override
|
||||
Future<List<D>> get() async {
|
||||
Future<List<D>> get() {
|
||||
final ctx = constructQuery();
|
||||
return _getWithQuery(ctx);
|
||||
return _getRaw(ctx).then(_mapResponse);
|
||||
}
|
||||
|
||||
Future<List<D>> _getWithQuery(GenerationContext ctx) async {
|
||||
final results = await ctx.executor!.doWhenOpened((e) async {
|
||||
return await e.runSelect(ctx.sql, ctx.boundVariables);
|
||||
@override
|
||||
Stream<List<D>> watch() {
|
||||
final query = constructQuery();
|
||||
final fetcher = QueryStreamFetcher(
|
||||
readsFrom: TableUpdateQuery.onAllTables(query.watchedTables),
|
||||
fetchData: () => _getRaw(query),
|
||||
key: StreamKey(query.sql, query.boundVariables),
|
||||
);
|
||||
|
||||
return database.createStream(fetcher).map(_mapResponse);
|
||||
}
|
||||
|
||||
Future<List<Map<String, Object?>>> _getRaw(GenerationContext ctx) {
|
||||
return database.doWhenOpened((e) {
|
||||
return e.runSelect(ctx.sql, ctx.boundVariables);
|
||||
});
|
||||
return results.map(table.map).toList();
|
||||
}
|
||||
|
||||
List<D> _mapResponse(List<Map<String, Object?>> rows) {
|
||||
return rows.map(table.map).toList();
|
||||
}
|
||||
|
||||
/// Creates a select statement that operates on more than one table by
|
||||
|
@ -118,18 +133,6 @@ class SimpleSelectStatement<T extends Table, D extends DataClass>
|
|||
void orderBy(List<OrderClauseGenerator<T>> clauses) {
|
||||
orderByExpr = OrderBy(clauses.map((t) => t(table.asDslTable)).toList());
|
||||
}
|
||||
|
||||
@override
|
||||
Stream<List<D>> watch() {
|
||||
final query = constructQuery();
|
||||
final fetcher = QueryStreamFetcher<List<D>>(
|
||||
readsFrom: TableUpdateQuery.onAllTables(query.watchedTables),
|
||||
fetchData: () => _getWithQuery(query),
|
||||
key: StreamKey(query.sql, query.boundVariables, D),
|
||||
);
|
||||
|
||||
return database.createStream(fetcher);
|
||||
}
|
||||
}
|
||||
|
||||
String _beginOfSelect(bool distinct) {
|
||||
|
|
|
@ -181,23 +181,26 @@ class JoinedSelectStatement<FirstT extends Table, FirstD extends DataClass>
|
|||
@override
|
||||
Stream<List<TypedResult>> watch() {
|
||||
final ctx = constructQuery();
|
||||
final fetcher = QueryStreamFetcher<List<TypedResult>>(
|
||||
final fetcher = QueryStreamFetcher(
|
||||
readsFrom: TableUpdateQuery.onAllTables(ctx.watchedTables),
|
||||
fetchData: () => _getWithQuery(ctx),
|
||||
key: StreamKey(ctx.sql, ctx.boundVariables, TypedResult),
|
||||
fetchData: () => _getRaw(ctx),
|
||||
key: StreamKey(ctx.sql, ctx.boundVariables),
|
||||
);
|
||||
|
||||
return database.createStream(fetcher);
|
||||
return database
|
||||
.createStream(fetcher)
|
||||
.map((rows) => _mapResponse(ctx, rows));
|
||||
}
|
||||
|
||||
@override
|
||||
Future<List<TypedResult>> get() async {
|
||||
final ctx = constructQuery();
|
||||
return _getWithQuery(ctx);
|
||||
final raw = await _getRaw(ctx);
|
||||
return _mapResponse(ctx, raw);
|
||||
}
|
||||
|
||||
Future<List<TypedResult>> _getWithQuery(GenerationContext ctx) async {
|
||||
final results = await ctx.executor!.doWhenOpened((e) async {
|
||||
Future<List<Map<String, Object?>>> _getRaw(GenerationContext ctx) {
|
||||
return ctx.executor!.doWhenOpened((e) async {
|
||||
try {
|
||||
return await e.runSelect(ctx.sql, ctx.boundVariables);
|
||||
} catch (e, s) {
|
||||
|
@ -211,8 +214,11 @@ class JoinedSelectStatement<FirstT extends Table, FirstD extends DataClass>
|
|||
rethrow;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
return results.map((row) {
|
||||
List<TypedResult> _mapResponse(
|
||||
GenerationContext ctx, List<Map<String, Object?>> rows) {
|
||||
return rows.map((row) {
|
||||
final readTables = <TableInfo, dynamic>{};
|
||||
final readColumns = <Expression, dynamic>{};
|
||||
|
||||
|
|
|
@ -56,33 +56,42 @@ class MockExecutor extends Mock implements QueryExecutor {
|
|||
@override
|
||||
SqlDialect get dialect =>
|
||||
_nsm(Invocation.getter(#dialect), SqlDialect.sqlite);
|
||||
|
||||
@override
|
||||
Future<bool> ensureOpen(QueryExecutorUser? user) =>
|
||||
_nsm(Invocation.method(#ensureOpen, [user]), Future.value(true));
|
||||
|
||||
@override
|
||||
Future<List<Map<String, Object?>>> runSelect(
|
||||
String? statement, List<Object?>? args) =>
|
||||
_nsm(Invocation.method(#runSelect, [statement, args]),
|
||||
Future.value(<Map<String, Object?>>[]));
|
||||
|
||||
@override
|
||||
Future<int> runInsert(String? statement, List<Object?>? args) =>
|
||||
_nsm(Invocation.method(#runInsert, [statement, args]), Future.value(0));
|
||||
|
||||
@override
|
||||
Future<int> runUpdate(String? statement, List<Object?>? args) =>
|
||||
_nsm(Invocation.method(#runUpdate, [statement, args]), Future.value(0));
|
||||
|
||||
@override
|
||||
Future<int> runDelete(String? statement, List<Object?>? args) =>
|
||||
_nsm(Invocation.method(#runDelete, [statement, args]), Future.value(0));
|
||||
|
||||
@override
|
||||
Future<void> runCustom(String? statement, [List<Object?>? args]) => _nsm(
|
||||
Invocation.method(#runCustom, [statement, args]), Future.value(null));
|
||||
|
||||
@override
|
||||
Future<void> runBatched(BatchedStatements? statements) =>
|
||||
_nsm(Invocation.method(#runBatched, [statements]), Future.value(null));
|
||||
|
||||
@override
|
||||
TransactionExecutor beginTransaction() =>
|
||||
_nsm(Invocation.method(#beginTransaction, []), transactions) ??
|
||||
transactions;
|
||||
|
||||
@override
|
||||
Future<void> close() =>
|
||||
_nsm(Invocation.method(#close, []), Future.value(null));
|
||||
|
@ -107,22 +116,28 @@ class MockTransactionExecutor extends MockExecutor
|
|||
|
||||
class MockStreamQueries extends Mock implements StreamQueryStore {
|
||||
@override
|
||||
Stream<T> registerStream<T>(QueryStreamFetcher<T>? fetcher) =>
|
||||
_nsm(Invocation.method(#registerStream, [fetcher]), Stream<T>.empty());
|
||||
Stream<List<Map<String, Object?>>> registerStream(
|
||||
QueryStreamFetcher? fetcher) =>
|
||||
_nsm(Invocation.method(#registerStream, [fetcher]),
|
||||
const Stream<Never>.empty());
|
||||
|
||||
@override
|
||||
Stream<Null?> updatesForSync(TableUpdateQuery? query) => _nsm(
|
||||
Invocation.method(#updatesForSync, [query]), const Stream<Never>.empty());
|
||||
|
||||
@override
|
||||
void handleTableUpdates(Set<TableUpdate>? updates) =>
|
||||
super.noSuchMethod(Invocation.method(#handleTableUpdates, [updates]));
|
||||
|
||||
@override
|
||||
void markAsClosed(
|
||||
QueryStream<dynamic>? stream, dynamic Function()? whenRemoved) =>
|
||||
void markAsClosed(QueryStream? stream, dynamic Function()? whenRemoved) =>
|
||||
super.noSuchMethod(
|
||||
Invocation.method(#markAsClosed, [stream, whenRemoved]));
|
||||
|
||||
@override
|
||||
void markAsOpened(QueryStream<dynamic>? stream) =>
|
||||
void markAsOpened(QueryStream? stream) =>
|
||||
super.noSuchMethod(Invocation.method(#markAsOpened, [stream]));
|
||||
|
||||
@override
|
||||
Future<void> close() =>
|
||||
_nsm(Invocation.method(#close, []), Future.value(null));
|
||||
|
|
|
@ -182,11 +182,10 @@ void main() {
|
|||
});
|
||||
|
||||
group('stream keys', () {
|
||||
final keyA = StreamKey('SELECT * FROM users;', [], User);
|
||||
final keyB = StreamKey('SELECT * FROM users;', [], User);
|
||||
final keyCustom = StreamKey('SELECT * FROM users;', [], QueryRow);
|
||||
final keyCustomTodos = StreamKey('SELECT * FROM todos;', [], QueryRow);
|
||||
final keyArgs = StreamKey('SELECT * FROM users;', ['name'], User);
|
||||
final keyA = StreamKey('SELECT * FROM users;', []);
|
||||
final keyB = StreamKey('SELECT * FROM users;', []);
|
||||
final keyTodos = StreamKey('SELECT * FROM todos;', []);
|
||||
final keyArgs = StreamKey('SELECT * FROM users;', ['name']);
|
||||
|
||||
test('are equal for same parameters', () {
|
||||
expect(keyA, equals(keyB));
|
||||
|
@ -194,19 +193,14 @@ void main() {
|
|||
});
|
||||
|
||||
test('are not equal for different queries', () {
|
||||
expect(keyCustomTodos, isNot(keyCustom));
|
||||
expect(keyCustomTodos.hashCode, isNot(keyCustom.hashCode));
|
||||
expect(keyA, isNot(keyTodos));
|
||||
expect(keyA.hashCode, isNot(keyTodos.hashCode));
|
||||
});
|
||||
|
||||
test('are not equal for different variables', () {
|
||||
expect(keyArgs, isNot(keyA));
|
||||
expect(keyArgs.hashCode, isNot(keyA.hashCode));
|
||||
});
|
||||
|
||||
test('are not equal for different types', () {
|
||||
expect(keyCustom, isNot(keyA));
|
||||
expect(keyCustom.hashCode, isNot(keyA.hashCode));
|
||||
});
|
||||
});
|
||||
|
||||
group("streams don't fetch", () {
|
||||
|
|
|
@ -6,7 +6,7 @@ homepage: https://moor.simonbinder.eu/
|
|||
issue_tracker: https://github.com/simolus3/moor/issues
|
||||
|
||||
environment:
|
||||
sdk: '>=2.12.0-0 <3.0.0'
|
||||
sdk: '>=2.12.0 <3.0.0'
|
||||
|
||||
dependencies:
|
||||
moor: ^4.0.0
|
||||
|
|
Loading…
Reference in New Issue