Support table-update propagation at runtime

This commit is contained in:
Simon Binder 2020-03-04 16:09:01 +01:00
parent c8f4f739e9
commit 3fe6fbfc99
No known key found for this signature in database
GPG Key ID: 7891917E4147B8C0
7 changed files with 114 additions and 16 deletions

View File

@ -13,9 +13,9 @@ abstract class DatabaseAccessor<T extends GeneratedDatabase>
final bool topLevel = true;
/// The main database instance for this dao
@protected
final T db;
@override
final T attachedDatabase;
/// Used internally by moor
DatabaseAccessor(this.db) : super.delegate(db);
DatabaseAccessor(this.attachedDatabase) : super.delegate(attachedDatabase);
}

View File

@ -13,7 +13,10 @@ Map<Type, int> _openedDbCount = {};
abstract class GeneratedDatabase extends DatabaseConnectionUser
with QueryEngine {
@override
final bool topLevel = true;
bool get topLevel => true;
@override
GeneratedDatabase get attachedDatabase => this;
/// Specify the schema version of your database. Whenever you change or add
/// tables, you should bump this field and provide a [migration] strategy.
@ -28,6 +31,14 @@ abstract class GeneratedDatabase extends DatabaseConnectionUser
MigrationStrategy _cachedMigration;
MigrationStrategy get _resolvedMigration => _cachedMigration ??= migration;
/// The collection of update rules contains information on how updates on
/// tables result in other updates, for instance due to a trigger.
///
/// There should be no need to overwrite this field, moor will generate an
/// appropriate implementation automatically.
StreamQueryUpdateRules get streamUpdateRules =>
const StreamQueryUpdateRules.none();
/// A list of tables specified in this database.
Iterable<TableInfo> get allTables;

View File

@ -30,6 +30,10 @@ mixin QueryEngine on DatabaseConnectionUser {
@protected
bool get topLevel => false;
/// The database that this query engine is attached to.
@visibleForOverriding
GeneratedDatabase get attachedDatabase;
/// We can detect when a user called methods on the wrong [QueryEngine]
/// (e.g. calling [QueryEngine.into] in a transaction, where
/// [QueryEngine.into] should have been called instead). See the documentation
@ -52,7 +56,10 @@ mixin QueryEngine on DatabaseConnectionUser {
/// We can then inform all active select-streams on those tables that their
/// snapshot might be out-of-date and needs to be fetched again.
void markTablesUpdated(Set<TableInfo> tables) {
_resolvedEngine.streamQueries.handleTableUpdates(tables);
final withRulesApplied = attachedDatabase.streamUpdateRules
.apply(tables.map((t) => t.actualTableName));
_resolvedEngine.streamQueries.handleTableUpdatesByName(withRulesApplied);
}
/// Starts an [InsertStatement] for a given table. You can use that statement

View File

@ -9,6 +9,7 @@ part 'connection.dart';
part 'db_base.dart';
part 'dao_base.dart';
part 'query_engine.dart';
part 'stream_updates.dart';
/// Defines additional runtime behavior for moor. Changing the fields of this
/// class is rarely necessary.

View File

@ -0,0 +1,69 @@
part of 'runtime_api.dart';
/// Collects a set of [UpdateRule]s which can be used to express how a set of
/// direct updates to a table affects other updates.
///
/// This is used to implement query streams in databases that have triggers.
///
/// Note that all the members in this class are visible for generated code and
/// internal moor code. They don't adhere to Semantic Versioning and should not
/// be used manually.
class StreamQueryUpdateRules {
/// All rules active in a database.
final List<UpdateRule> rules;
/// Creates a [StreamQueryUpdateRules] from the underlying [rules].
const StreamQueryUpdateRules(this.rules);
/// The default implementation, which doesn't have any rules.
const StreamQueryUpdateRules.none() : this(const []);
/// Obtain a set of all tables that might be affected by direct updates to
/// [updatedTables].
///
/// This method should be used in internal moor code only, it does not respect
/// Semantic Versioning and might change at any time.
Set<String> apply(Iterable<String> updatedTables) {
// Most users don't have any update rules, and this check is much faster
// than crawling through all updates.
if (rules.isEmpty) return updatedTables.toSet();
final pending = List.of(updatedTables);
final seen = <String>{};
while (pending.isNotEmpty) {
final updatedTable = pending.removeLast();
seen.add(updatedTable);
for (final rule in rules) {
if (rule is WritePropagation && rule.onTable == updatedTable) {
pending.addAll(rule.updates.where((u) => !seen.contains(u)));
}
}
}
return seen;
}
}
/// Users should not extend or implement this class.
abstract class UpdateRule {
/// Common const constructor so that subclasses can be const.
const UpdateRule();
}
/// An [UpdateRule] for triggers that exist in a database.
///
/// An update on [onTable] implicitly triggers updates on [updates].
///
/// This class is for use by generated or moor-internal code only. It does not
/// adhere to Semantic Versioning and should not be used manually.
class WritePropagation extends UpdateRule {
/// The table name that the trigger is active on.
final String onTable;
/// All tables potentially updated by the trigger.
final Set<String> updates;
/// Default constructor. See [WritePropagation] for details.
const WritePropagation(this.onTable, this.updates);
}

View File

@ -5,13 +5,18 @@ import 'package:moor/src/runtime/executor/stream_queries.dart';
///
/// Moor users should use [QueryEngine.transaction] to use this api.
class Transaction extends DatabaseConnectionUser with QueryEngine {
/// Constructs a transaction executor from the [other] user and the underlying
/// [executor].
Transaction(DatabaseConnectionUser other, TransactionExecutor executor)
final QueryEngine _parent;
@override
GeneratedDatabase get attachedDatabase => _parent.attachedDatabase;
/// Constructs a transaction executor from the [_parent] engine and the
/// underlying [executor].
Transaction(this._parent, TransactionExecutor executor)
: super.delegate(
other,
_parent,
executor: executor,
streamQueries: _TransactionStreamStore(other.streamQueries),
streamQueries: _TransactionStreamStore(_parent.streamQueries),
);
/// Instructs the underlying executor to execute this instructions. Batched
@ -83,8 +88,13 @@ class _TransactionStreamStore extends StreamQueryStore {
/// To use this api, moor users should use the [MigrationStrategy.beforeOpen]
/// parameter inside the [GeneratedDatabase.migration] getter.
class BeforeOpenRunner extends DatabaseConnectionUser with QueryEngine {
/// Creates a [BeforeOpenRunner] from the [database] and the special
final QueryEngine _parent;
@override
GeneratedDatabase get attachedDatabase => _parent.attachedDatabase;
/// Creates a [BeforeOpenRunner] from a [QueryEngine] and the special
/// [executor] running the queries.
BeforeOpenRunner(DatabaseConnectionUser database, QueryExecutor executor)
: super.delegate(database, executor: executor);
BeforeOpenRunner(this._parent, QueryExecutor executor)
: super.delegate(_parent, executor: executor);
}

View File

@ -1447,9 +1447,9 @@ class AllTodosWithCategoryResult {
// **************************************************************************
mixin _$SomeDaoMixin on DatabaseAccessor<TodoDb> {
$UsersTable get users => db.users;
$SharedTodosTable get sharedTodos => db.sharedTodos;
$TodosTableTable get todosTable => db.todosTable;
$UsersTable get users => attachedDatabase.users;
$SharedTodosTable get sharedTodos => attachedDatabase.sharedTodos;
$TodosTableTable get todosTable => attachedDatabase.todosTable;
TodoEntry _rowToTodoEntry(QueryRow row) {
return TodoEntry(
id: row.readInt('id'),