Allow more specific updates in stream queries

This commit is contained in:
Simon Binder 2020-03-04 20:28:31 +01:00
parent 0a09f3411c
commit 0b0d5792fd
No known key found for this signature in database
GPG Key ID: 7891917E4147B8C0
12 changed files with 248 additions and 57 deletions

View File

@ -56,10 +56,20 @@ mixin QueryEngine on DatabaseConnectionUser {
/// We can then inform all active select-streams on those tables that their
/// snapshot might be out-of-date and needs to be fetched again.
void markTablesUpdated(Set<TableInfo> tables) {
final withRulesApplied = attachedDatabase.streamUpdateRules
.apply(tables.map((t) => t.actualTableName));
notifyUpdates(
{for (final table in tables) TableUpdate(table.actualTableName)},
);
}
_resolvedEngine.streamQueries.handleTableUpdatesByName(withRulesApplied);
/// Dispatches the set of [updates] to the stream query manager.
///
/// Internally, moor will call this method whenever a update, delete or insert
/// statement is issued on the database. We can then inform all active select-
/// streams affected that their snapshot might be out-of-date and needs to be
/// fetched again.
void notifyUpdates(Set<TableUpdate> updates) {
final withRulesApplied = attachedDatabase.streamUpdateRules.apply(updates);
_resolvedEngine.streamQueries.handleTableUpdates(withRulesApplied);
}
/// Starts an [InsertStatement] for a given table. You can use that statement
@ -171,9 +181,15 @@ mixin QueryEngine on DatabaseConnectionUser {
@visibleForTesting
Future<int> customUpdate(String query,
{List<Variable> variables = const [], Set<TableInfo> updates}) async {
return _customWrite(query, variables, updates, (executor, sql, vars) {
return executor.runUpdate(sql, vars);
});
return _customWrite(
query,
variables,
updates,
null, // could be delete or update, so don't specify kind
(executor, sql, vars) {
return executor.runUpdate(sql, vars);
},
);
}
/// Executes a custom insert statement and returns the last inserted rowid.
@ -185,16 +201,27 @@ mixin QueryEngine on DatabaseConnectionUser {
@visibleForTesting
Future<int> customInsert(String query,
{List<Variable> variables = const [], Set<TableInfo> updates}) {
return _customWrite(query, variables, updates, (executor, sql, vars) {
return executor.runInsert(sql, vars);
});
return _customWrite(
query,
variables,
updates,
UpdateKind.insert,
(executor, sql, vars) {
return executor.runInsert(sql, vars);
},
);
}
/// Common logic for [customUpdate] and [customInsert] which takes care of
/// mapping the variables, running the query and optionally informing the
/// stream-queries.
Future<T> _customWrite<T>(String query, List<Variable> variables,
Set<TableInfo> updates, _CustomWriter<T> writer) async {
Future<T> _customWrite<T>(
String query,
List<Variable> variables,
Set<TableInfo> updates,
UpdateKind updateKind,
_CustomWriter<T> writer,
) async {
final engine = _resolvedEngine;
final executor = engine.executor;
@ -205,7 +232,10 @@ mixin QueryEngine on DatabaseConnectionUser {
await executor.doWhenOpened((e) => writer(e, query, mappedArgs));
if (updates != null) {
await engine.streamQueries.handleTableUpdates(updates);
engine.notifyUpdates({
for (final table in updates)
TableUpdate(table.actualTableName, kind: updateKind),
});
}
return result;

View File

@ -4,10 +4,6 @@ part of 'runtime_api.dart';
/// direct updates to a table affects other updates.
///
/// This is used to implement query streams in databases that have triggers.
///
/// Note that all the members in this class are visible for generated code and
/// internal moor code. They don't adhere to Semantic Versioning and should not
/// be used manually.
class StreamQueryUpdateRules {
/// All rules active in a database.
final List<UpdateRule> rules;
@ -18,25 +14,22 @@ class StreamQueryUpdateRules {
/// The default implementation, which doesn't have any rules.
const StreamQueryUpdateRules.none() : this(const []);
/// Obtain a set of all tables that might be affected by direct updates to
/// [updatedTables].
///
/// This method should be used in internal moor code only, it does not respect
/// Semantic Versioning and might change at any time.
Set<String> apply(Iterable<String> updatedTables) {
/// Obtain a set of all tables that might be affected by direct updates in
/// [input].
Set<TableUpdate> apply(Iterable<TableUpdate> input) {
// Most users don't have any update rules, and this check is much faster
// than crawling through all updates.
if (rules.isEmpty) return updatedTables.toSet();
if (rules.isEmpty) return input.toSet();
final pending = List.of(updatedTables);
final seen = <String>{};
final pending = List.of(input);
final seen = <TableUpdate>{};
while (pending.isNotEmpty) {
final updatedTable = pending.removeLast();
seen.add(updatedTable);
final update = pending.removeLast();
seen.add(update);
for (final rule in rules) {
if (rule is WritePropagation && rule.onTable == updatedTable) {
pending.addAll(rule.updates.where((u) => !seen.contains(u)));
if (rule is WritePropagation && rule.on.matches(update)) {
pending.addAll(rule.result.where((u) => !seen.contains(u)));
}
}
}
@ -45,25 +38,89 @@ class StreamQueryUpdateRules {
}
}
/// A common rule that describes how a [TableUpdate] has other [TableUpdate]s.
///
/// Users should not extend or implement this class.
abstract class UpdateRule {
/// Common const constructor so that subclasses can be const.
const UpdateRule();
const UpdateRule._();
}
/// An [UpdateRule] for triggers that exist in a database.
///
/// An update on [onTable] implicitly triggers updates on [updates].
/// An update on [on] implicitly triggers updates on [result].
///
/// This class is for use by generated or moor-internal code only. It does not
/// adhere to Semantic Versioning and should not be used manually.
class WritePropagation extends UpdateRule {
/// The table name that the trigger is active on.
final String onTable;
/// The updates that cause further writes in [result].
final TableUpdateQuery on;
/// All tables potentially updated by the trigger.
final Set<String> updates;
/// All updates that will be performed by the trigger listening on [on].
final Set<TableUpdate> result;
/// Default constructor. See [WritePropagation] for details.
const WritePropagation(this.onTable, this.updates);
const WritePropagation(this.on, this.result) : super._();
}
/// Classifies a [TableUpdate] by what kind of write happened - an insert, an
/// update or a delete operation.
enum UpdateKind {
/// An insert statement ran on the affected table.
insert,
/// An update statement ran on the affected table.
update,
/// A delete statement ran on the affected table.
delete
}
/// Contains information on how a table was updated, which can be used to find
/// queries that are affected by this.
class TableUpdate {
/// What kind of update was applied to the [table].
///
/// Can be null, which indicates that the update is not known.
final UpdateKind /*?*/ kind;
/// Name of the table that was updated.
final String table;
/// Default constant constructor.
const TableUpdate(this.table, {this.kind});
@override
int get hashCode => $mrjf($mrjc(kind.hashCode, table.hashCode));
@override
bool operator ==(dynamic other) {
return other is TableUpdate && other.kind == kind && other.table == table;
}
}
/// A table update query describes information to listen for [TableUpdate]s.
///
/// Users should not extend implement this class.
abstract class TableUpdateQuery {
/// Default const constructor so that subclasses can have constant
/// constructors.
const TableUpdateQuery();
/// A query that listens for all table updates in a database.
const factory TableUpdateQuery.any() = AnyUpdateQuery;
/// A query that listens for all updates that match any query in [queries].
const factory TableUpdateQuery.allOf(Set<TableUpdateQuery> queries) =
MultipleUpdateQuery;
/// A query that listens for all updates on a specific [table].
///
/// The optional [limitUpdateKind] parameter can be used to limit the updates
/// to a certain kind.
const factory TableUpdateQuery.onTable(String table,
{UpdateKind limitUpdateKind}) = SpecificUpdateQuery;
/// Determines whether the [update] would be picked up by this query.
bool matches(TableUpdate update);
}

View File

@ -108,8 +108,8 @@ class StreamQueryStore {
/// Handles updates on a given table by re-executing all queries that read
/// from that table.
Future<void> handleTableUpdates(Set<TableInfo> tables) async {
handleTableUpdatesByName(tables.map((t) => t.actualTableName).toSet());
Future<void> handleTableUpdates(Set<TableUpdate> updates) async {
handleTableUpdatesByName(updates.map((t) => t.table).toSet());
}
/// Handles updates on tables by their name. All queries reading from any of
@ -263,3 +263,38 @@ class QueryStream<T> {
_controller.close();
}
}
// Note: These classes are here because we want them to be public, but not
// exposed without an src import.
class AnyUpdateQuery extends TableUpdateQuery {
const AnyUpdateQuery();
@override
bool matches(TableUpdate update) => true;
}
class MultipleUpdateQuery extends TableUpdateQuery {
final Set<TableUpdateQuery> queries;
const MultipleUpdateQuery(this.queries);
@override
bool matches(TableUpdate update) => queries.any((q) => q.matches(update));
}
class SpecificUpdateQuery extends TableUpdateQuery {
final UpdateKind limitUpdateKind;
final String table;
const SpecificUpdateQuery(this.table, {this.limitUpdateKind});
@override
bool matches(TableUpdate update) {
if (update.table != table) return false;
return update.kind == null ||
limitUpdateKind == null ||
update.kind == limitUpdateKind;
}
}

View File

@ -99,6 +99,6 @@ void main() {
b.update(db.users, const UsersCompanion(name: Value('new user name')));
});
verify(streamQueries.handleTableUpdates({db.users}));
verify(streamQueries.handleTableUpdates({const TableUpdate('users')}));
});
}

View File

@ -64,7 +64,7 @@ void main() {
variables: [Variable.withString('hi')], updates: {db.users});
verify(executor.runUpdate('UPDATE tbl SET a = ?', ['hi']));
verify(streamQueries.handleTableUpdates({db.users}));
verify(streamQueries.handleTableUpdates({const TableUpdate('users')}));
});
test('custom insert', () async {

View File

@ -1274,7 +1274,9 @@ abstract class _$CustomTablesDb extends GeneratedDatabase {
];
@override
StreamQueryUpdateRules get streamUpdateRules => const StreamQueryUpdateRules([
WritePropagation('config', {'with_defaults'})
WritePropagation(
TableUpdateQuery.onTable('config', limitUpdateKind: null),
{TableUpdate('with_defaults', kind: null)})
]);
}

View File

@ -57,7 +57,8 @@ void main() {
await db.delete(db.users).go();
verify(streamQueries.handleTableUpdates({db.users}));
verify(streamQueries.handleTableUpdates(
{const TableUpdate('users', kind: UpdateKind.delete)}));
});
test('are not issued when no data was changed', () async {

View File

@ -64,7 +64,8 @@ void main() {
profilePicture: Value(Uint8List(0)),
));
verify(streamQueries.handleTableUpdates({db.users}));
verify(streamQueries.handleTableUpdates(
{const TableUpdate('users', kind: UpdateKind.insert)}));
});
test('enforces data integrity', () async {

View File

@ -105,7 +105,7 @@ void main() {
content: Value('Updated content'),
));
verify(streamQueries.handleTableUpdates({db.todosTable}));
verify(streamQueries.handleTableUpdates({const TableUpdate('todos')}));
});
test('are not issued when no data was changed', () async {
@ -147,7 +147,8 @@ void main() {
test('informs about updated tables', () async {
await db.customUpdate('', updates: {db.users, db.todosTable});
verify(streamQueries.handleTableUpdates({db.users, db.todosTable}));
verify(streamQueries.handleTableUpdates(
{const TableUpdate('users'), const TableUpdate('todos')}));
});
});
}

View File

@ -12,8 +12,11 @@ class FindStreamUpdateRules {
for (final trigger in db.entities.whereType<MoorTrigger>()) {
rules.add(
WritePropagation(
trigger.on.sqlName,
trigger.bodyUpdates.map((t) => t.sqlName).toSet(),
TableUpdateQuery.onTable(trigger.on.sqlName),
{
for (final update in trigger.bodyUpdates)
TableUpdate(update.sqlName)
},
),
);
}

View File

@ -1,4 +1,6 @@
import 'package:moor/moor.dart';
// ignore: implementation_imports
import 'package:moor/src/runtime/executor/stream_queries.dart';
import 'package:moor_generator/moor_generator.dart';
import 'package:moor_generator/src/services/find_stream_update_rules.dart';
import 'package:moor_generator/src/utils/string_escaper.dart';
@ -120,18 +122,77 @@ class DatabaseWriter {
schemaScope.write(', ');
}
isFirst = false;
if (rule is WritePropagation) {
final updateNames = rule.updates.map(asDartLiteral).join(', ');
schemaScope.write('WritePropagation('
'${asDartLiteral(rule.onTable)}, {$updateNames})');
}
rule.writeConstructor(schemaScope);
}
schemaScope.write(']);');
schemaScope.write(']);\n');
}
// close the class
schemaScope.write('}');
schemaScope.write('}\n');
}
}
const _kindToDartExpr = {
UpdateKind.delete: 'UpdateKind.delete',
UpdateKind.insert: 'UpdateKind.insert',
UpdateKind.update: 'UpdateKind.update',
null: 'null',
};
extension on UpdateRule {
void writeConstructor(StringBuffer buffer) {
if (this is WritePropagation) {
final write = this as WritePropagation;
buffer.write('WritePropagation(');
write.on.writeConstructor(buffer);
buffer.write(', {');
var isFirst = true;
for (final update in write.result) {
if (!isFirst) {
buffer.write(', ');
}
isFirst = false;
update.writeConstructor(buffer);
}
buffer.write('})');
}
}
}
extension on TableUpdate {
void writeConstructor(StringBuffer buffer) {
buffer.write(
'TableUpdate(${asDartLiteral(table)}, kind: ${_kindToDartExpr[kind]})');
}
}
extension on TableUpdateQuery {
void writeConstructor(StringBuffer buffer) {
if (this is AnyUpdateQuery) {
buffer.write('TableUpdateQuery.any()');
} else if (this is SpecificUpdateQuery) {
final query = this as SpecificUpdateQuery;
buffer.write('TableUpdateQuery.onTable(${asDartLiteral(query.table)}, '
'limitUpdateKind: ${_kindToDartExpr[query.limitUpdateKind]})');
} else if (this is MultipleUpdateQuery) {
final queries = (this as MultipleUpdateQuery).queries;
var isFirst = true;
buffer.write('TableUpdateQuery.allOf({');
for (final query in queries) {
if (!isFirst) {
buffer.write(', ');
}
isFirst = false;
query.writeConstructor(buffer);
}
buffer.write('})');
}
}
}

View File

@ -39,8 +39,8 @@ class MyDatabase {}
expect(
rules.rules.single,
isA<WritePropagation>()
.having((e) => e.onTable, 'onTable', 'users')
.having((e) => e.updates, 'updates', {'users'}),
.having((e) => e.on, 'on', const TableUpdateQuery.onTable('users'))
.having((e) => e.result, 'result', {const TableUpdate('users')}),
);
});
}