Always return the same stream for equal queries

This commit is contained in:
Simon Binder 2019-03-20 12:11:57 +01:00
parent 9ea3c2d711
commit aa6582ae79
No known key found for this signature in database
GPG Key ID: B807FDF954BA00CF
14 changed files with 182 additions and 83 deletions

View File

@ -46,26 +46,17 @@ abstract class DatabaseConnectionUser {
executor = executor ?? other.executor,
streamQueries = streamQueries ?? other.streamQueries;
/// Marks the table as updated. This method will be called internally whenever
/// a update, delete or insert statement is issued on the database. We can
/// then inform all active select-streams on that table that their snapshot
/// might be out-of-date and needs to be fetched again.
@Deprecated('Use markTablesUpdated instead')
void markTableUpdated(String tableName) {
markTablesUpdated({tableName});
}
/// Marks the tables as updated. This method will be called internally
/// whenever a update, delete or insert statement is issued on the database.
/// We can then inform all active select-streams on those tables that their
/// snapshot might be out-of-date and needs to be fetched again.
void markTablesUpdated(Set<String> tables) {
void markTablesUpdated(Set<TableInfo> tables) {
streamQueries.handleTableUpdates(tables);
}
/// Creates and auto-updating stream from the given select statement. This
/// method should not be used directly.
Stream<List<T>> createStream<T>(TableChangeListener<List<T>> stmt) =>
Stream<T> createStream<T>(QueryStreamFetcher<T> stmt) =>
streamQueries.registerStream(stmt);
}
@ -120,8 +111,7 @@ mixin QueryEngine on DatabaseConnectionUser {
executor.doWhenOpened((_) => executor.runUpdate(query, mappedArgs));
if (updates != null) {
await streamQueries
.handleTableUpdates(updates.map((t) => t.$tableName).toSet());
await streamQueries.handleTableUpdates(updates);
}
return affectedRows;
@ -132,7 +122,8 @@ mixin QueryEngine on DatabaseConnectionUser {
/// value.
Future<List<QueryRow>> customSelect(String query,
{List<Variable> variables = const []}) async {
return CustomSelectStatement(query, variables, <TableInfo>{}, this).read();
return CustomSelectStatement(query, variables, <TableInfo>{}, this)
.execute();
}
/// Creates a stream from a custom select statement.To use the variables, mark
@ -143,7 +134,8 @@ mixin QueryEngine on DatabaseConnectionUser {
Stream<List<QueryRow>> customSelectStream(String query,
{List<Variable> variables = const [], Set<TableInfo> readsFrom}) {
final tables = readsFrom ?? <TableInfo>{};
return createStream(CustomSelectStatement(query, variables, tables, this));
final statement = CustomSelectStatement(query, variables, tables, this);
return createStream(statement.constructFetcher());
}
}

View File

@ -1,38 +1,95 @@
import 'dart:async';
import 'package:collection/collection.dart';
import 'package:meta/meta.dart';
import 'package:moor/moor.dart';
/// Internal interface to mark classes that respond to table changes
abstract class TableChangeListener<T> {
/// Called to check if this listener should update after any table who's name
/// is in the set has changed.
bool isAffectedBy(Set<String> tables);
const _listEquality = ListEquality<dynamic>();
/// Called to reload data from the table after it has changed.
Future<T> handleDataChanged();
/// Representation of a select statement that knows from which tables the
/// statement is reading its data and how to execute the query.
class QueryStreamFetcher<T> {
/// The set of tables this query reads from. If any of these tables changes,
/// the stream must fetch its again.
final Set<TableInfo> readsFrom;
/// Key that can be used to check whether two fetchers will yield the same
/// result when operating on the same data.
final StreamKey key;
/// Function that asynchronously fetches the latest set of data.
final Future<T> Function() fetchData;
QueryStreamFetcher(
{@required this.readsFrom, this.key, @required this.fetchData});
}
/// Key that uniquely identifies a select statement. If two keys created from
/// two select statements are equal, the statements are equal as well.
///
/// As two equal statements always yield the same result when operating on the
/// same data, this can make streams more efficient as we can return the same
/// stream for two equivalent queries.
class StreamKey {
final String sql;
final List<dynamic> variables;
/// Used to differentiate between custom streams, which return a [QueryRow],
/// and regular streams, which return an instance of a generated data class.
final Type returnType;
StreamKey(this.sql, this.variables, this.returnType);
@override
int get hashCode {
return (sql.hashCode * 31 + _listEquality.hash(variables)) * 31 +
returnType.hashCode;
}
@override
bool operator ==(other) {
return identical(this, other) ||
(other is StreamKey &&
other.sql == sql &&
_listEquality.equals(other.variables, variables) &&
other.returnType == returnType);
}
}
/// Keeps track of active streams created from [SelectStatement]s and updates
/// them when needed.
class StreamQueryStore {
final List<QueryStream> _activeStreams = [];
// todo cache streams (return same instance for same sql + variables)
final List<QueryStream> _activeStreamsWithoutKey = [];
final Map<StreamKey, QueryStream> _activeKeyStreams = {};
StreamQueryStore();
Iterable<QueryStream> get _activeStreams {
return _activeKeyStreams.values.followedBy(_activeStreamsWithoutKey);
}
/// Creates a new stream from the select statement.
Stream<List<T>> registerStream<T>(TableChangeListener<List<T>> statement) {
final stream = QueryStream(statement, this);
_activeStreams.add(stream);
Stream<T> registerStream<T>(QueryStreamFetcher<T> fetcher) {
final key = fetcher.key;
if (key == null) {
final stream = QueryStream(fetcher, this);
_activeStreamsWithoutKey.add(stream);
return stream.stream;
} else {
final stream = _activeKeyStreams.putIfAbsent(key, () {
return QueryStream<T>(fetcher, this);
});
return (stream as QueryStream<T>).stream;
}
}
/// Handles updates on a given table by re-executing all queries that read
/// from that table.
Future<void> handleTableUpdates(Set<String> tables) async {
Future<void> handleTableUpdates(Set<TableInfo> tables) async {
final affectedStreams = _activeStreams
.where((stream) => stream.isAffectedByTableChange(tables));
.where((stream) => stream._fetcher.readsFrom.any(tables.contains));
for (var stream in affectedStreams) {
await stream.fetchAndEmitData();
@ -40,15 +97,22 @@ class StreamQueryStore {
}
void markAsClosed(QueryStream stream) {
_activeStreams.remove(stream);
final key = stream._fetcher.key;
if (key == null) {
_activeStreamsWithoutKey.remove(stream);
} else {
_activeKeyStreams.remove(key);
}
}
}
class QueryStream<T> {
final TableChangeListener<T> listener;
final QueryStreamFetcher<T> _fetcher;
final StreamQueryStore _store;
StreamController<T> _controller;
// caching the stream so that the stream getter always returns the same stream
Stream<T> _stream;
Stream<T> get stream {
_controller ??= StreamController.broadcast(
@ -56,10 +120,10 @@ class QueryStream<T> {
onCancel: _onCancel,
);
return _controller.stream;
return _stream ??= _controller.stream;
}
QueryStream(this.listener, this._store);
QueryStream(this._fetcher, this._store);
void _onListen() {
// first listener added, fetch query
@ -80,13 +144,10 @@ class QueryStream<T> {
// Fetch data if it's needed, publish that data if it's possible.
if (!_controller.hasListener) return;
final data = await listener.handleDataChanged();
final data = await _fetcher.fetchData();
if (!_controller.isClosed) {
_controller.add(data);
}
}
bool isAffectedByTableChange(Set<String> tables) =>
listener.isAffectedBy(tables);
}

View File

@ -21,18 +21,18 @@ class Transaction extends DatabaseConnectionUser with QueryEngine {
/// updates to the outer stream query store when the transaction is completed.
class _TransactionStreamStore extends StreamQueryStore {
final StreamQueryStore parent;
final Set<String> affectedTables = <String>{};
final Set<TableInfo> affectedTables = <TableInfo>{};
_TransactionStreamStore(this.parent);
@override
Stream<List<T>> registerStream<T>(TableChangeListener<List<T>> statement) {
Stream<T> registerStream<T>(QueryStreamFetcher<T> statement) {
throw StateError('Streams cannot be created inside a transaction. See the '
'documentation of GeneratedDatabase.transaction for details.');
}
@override
Future handleTableUpdates(Set<String> tables) {
Future handleTableUpdates(Set<TableInfo> tables) {
affectedTables.addAll(tables);
return Future.value(null);
}

View File

@ -36,7 +36,7 @@ class DeleteStatement<T, D> extends Query<T, D> {
await ctx.database.executor.runDelete(ctx.sql, ctx.boundVariables);
if (rows > 0) {
database.markTablesUpdated({table.$tableName});
database.markTablesUpdated({table});
}
return rows;
});

View File

@ -56,7 +56,7 @@ class InsertStatement<DataClass> {
await database.executor.doWhenOpened((e) async {
await database.executor.runInsert(ctx.sql, ctx.boundVariables);
database.markTablesUpdated({table.$tableName});
database.markTablesUpdated({table});
});
}

View File

@ -10,8 +10,7 @@ import 'package:moor/src/runtime/structure/table_info.dart';
typedef OrderingTerm OrderClauseGenerator<T>(T tbl);
class SelectStatement<T, D> extends Query<T, D>
implements TableChangeListener<List<D>> {
class SelectStatement<T, D> extends Query<T, D> {
SelectStatement(QueryEngine database, TableInfo<T, D> table)
: super(database, table);
@ -23,7 +22,10 @@ class SelectStatement<T, D> extends Query<T, D>
/// Loads and returns all results from this select query.
Future<List<D>> get() async {
final ctx = constructQuery();
return _getWithQuery(ctx);
}
Future<List<D>> _getWithQuery(GenerationContext ctx) async {
final results = await ctx.database.executor.doWhenOpened((e) async {
return await ctx.database.executor.runSelect(ctx.sql, ctx.boundVariables);
});
@ -47,21 +49,18 @@ class SelectStatement<T, D> extends Query<T, D>
/// Creates an auto-updating stream that emits new items whenever this table
/// changes.
Stream<List<D>> watch() {
return database.createStream(this);
}
final query = constructQuery();
final fetcher = QueryStreamFetcher<List<D>>(
readsFrom: {table},
fetchData: () => _getWithQuery(query),
key: StreamKey(query.sql, query.boundVariables, D),
);
@override
Future<List<D>> handleDataChanged() {
return get();
}
@override
bool isAffectedBy(Set<String> tables) {
return tables.contains(super.table.$tableName);
return database.createStream(fetcher);
}
}
class CustomSelectStatement implements TableChangeListener<List<QueryRow>> {
class CustomSelectStatement {
/// Tables this select statement reads from
final Set<TableInfo> tables;
final String query;
@ -70,26 +69,35 @@ class CustomSelectStatement implements TableChangeListener<List<QueryRow>> {
CustomSelectStatement(this.query, this.variables, this.tables, this.db);
Future<List<QueryRow>> read() => handleDataChanged();
QueryStreamFetcher<List<QueryRow>> constructFetcher() {
final args = _mapArgs();
@override
Future<List<QueryRow>> handleDataChanged() async {
return QueryStreamFetcher<List<QueryRow>>(
readsFrom: tables,
fetchData: () => _executeWithMappedArgs(args),
key: StreamKey(query, args, QueryRow),
);
}
Future<List<QueryRow>> execute() async {
return _executeWithMappedArgs(_mapArgs());
}
List<dynamic> _mapArgs() {
final ctx = GenerationContext(db);
final mappedArgs = variables.map((v) => v.mapToSimpleValue(ctx)).toList();
return variables.map((v) => v.mapToSimpleValue(ctx)).toList();
}
Future<List<QueryRow>> _executeWithMappedArgs(
List<dynamic> mappedArgs) async {
final result =
await db.executor.doWhenOpened((e) => e.runSelect(query, mappedArgs));
return result.map((row) => QueryRow(row, db)).toList();
}
@override
bool isAffectedBy(Set<String> tables) {
return this.tables.intersection(tables).isNotEmpty;
}
}
/// For custom select statement, represents a row in the result set.
/// For custom select statements, represents a row in the result set.
class QueryRow {
final Map<String, dynamic> data;
final QueryEngine _db;

View File

@ -36,7 +36,7 @@ class UpdateStatement<T, D> extends Query<T, D> {
});
if (rows > 0) {
database.markTablesUpdated({table.$tableName});
database.markTablesUpdated({table});
}
return rows;

View File

@ -11,6 +11,7 @@ environment:
dependencies:
meta: '>= 1.0.0 <2.0.0'
collection: '>= 1.0.0 <2.0.0'
dev_dependencies:
moor_generator: ^1.1.1
@ -21,11 +22,11 @@ dev_dependencies:
mockito: ^4.0.0
grinder: ^0.8.0
coverage: ^0.12.0
coveralls: ^5.1.0
dependency_overrides:
moor_generator:
path: ../moor_generator
coveralls: ^5.1.0
# Temporarily use my fork because it can collect coverage when running tests with the test runner
coverage:
git: https://github.com/simolus3/coverage.git

View File

@ -56,7 +56,7 @@ void main() {
await db.delete(db.users).go();
verify(streamQueries.handleTableUpdates({'users'}));
verify(streamQueries.handleTableUpdates({db.users}));
});
test('are not issued when no data was changed', () async {

View File

@ -42,7 +42,7 @@ void main() {
profilePicture: Uint8List(0),
));
verify(streamQueries.handleTableUpdates({'users'}));
verify(streamQueries.handleTableUpdates({db.users}));
});
test('enforces data integrety', () {

View File

@ -1,3 +1,5 @@
import 'package:moor/moor.dart';
import 'package:moor/src/runtime/executor/stream_queries.dart';
import 'package:test_api/test_api.dart';
import 'data/tables/todos.dart';
@ -24,17 +26,52 @@ void main() {
test('streams fetch when the underlying data changes', () {
db.select(db.users).watch().listen((_) {});
db.markTablesUpdated({'users'});
db.markTablesUpdated({db.users});
// twice: Once because the listener attached, once because the data changed
verify(executor.runSelect(any, any)).called(2);
});
test('equal statements yield identical streams', () {
final firstStream = (db.select(db.users).watch())..listen((_) {});
final secondStream = (db.select(db.users).watch())..listen((_) {});
expect(identical(firstStream, secondStream), true);
});
group('stream keys', () {
final keyA = StreamKey('SELECT * FROM users;', [], User);
final keyB = StreamKey('SELECT * FROM users;', [], User);
final keyCustom = StreamKey('SELECT * FROM users;', [], QueryRow);
final keyCustomTodos = StreamKey('SELECT * FROM todos;', [], QueryRow);
final keyArgs = StreamKey('SELECT * FROM users;', ['name'], User);
test('are equal for same parameters', () {
expect(keyA, equals(keyB));
expect(keyA.hashCode, keyB.hashCode);
});
test('are not equal for different queries', () {
expect(keyCustomTodos, isNot(keyCustom));
expect(keyCustomTodos.hashCode, isNot(keyCustom.hashCode));
});
test('are not equal for different variables', () {
expect(keyArgs, isNot(keyA));
expect(keyArgs.hashCode, isNot(keyA.hashCode));
});
test('are not equal for different types', () {
expect(keyCustom, isNot(keyA));
expect(keyCustom.hashCode, isNot(keyA.hashCode));
});
});
group("streams don't fetch", () {
test('when no listeners were attached', () {
db.select(db.users).watch();
db.markTablesUpdated({'users'});
db.markTablesUpdated({db.users});
verifyNever(executor.runSelect(any, any));
});
@ -44,7 +81,7 @@ void main() {
clearInteractions(executor);
subscription.cancel();
db.markTablesUpdated({'users'});
db.markTablesUpdated({db.users});
verifyNever(executor.runSelect(any, any));
});

View File

@ -37,7 +37,7 @@ void main() {
});
// After the transaction completes, the queries should be updated
verify(streamQueries.handleTableUpdates({'users'})).called(1);
verify(streamQueries.handleTableUpdates({db.users})).called(1);
verify(executor.transactions.send());
});
}

View File

@ -67,7 +67,7 @@ void main() {
content: 'Updated content',
));
verify(streamQueries.handleTableUpdates({'todos'}));
verify(streamQueries.handleTableUpdates({db.todosTable}));
});
test('are not issued when no data was changed', () async {
@ -109,7 +109,7 @@ void main() {
test('informs about updated tables', () async {
await db.customUpdate('', updates: {db.users, db.todosTable});
verify(streamQueries.handleTableUpdates({'users', 'todos'}));
verify(streamQueries.handleTableUpdates({db.users, db.todosTable}));
});
});
}

View File

@ -92,8 +92,8 @@ class TableParser extends ParserBase {
if (expression is SetOrMapLiteral) {
for (var entry in expression.elements2) {
if (entry is Identifier) {
final column = columns.singleWhere(
(column) => column.dartGetterName == entry.name);
final column = columns
.singleWhere((column) => column.dartGetterName == entry.name);
parsedPrimaryKey.add(column);
} else {
// Don't add an error, these features aren't on a stable dart release