New tableUpdates stream method on QueryEngine (#394)

This commit is contained in:
Simon Binder 2020-03-04 22:43:41 +01:00
parent 2811d91fa1
commit 3002d87bcb
No known key found for this signature in database
GPG Key ID: 7891917E4147B8C0
14 changed files with 118 additions and 21 deletions

View File

@ -19,6 +19,8 @@
or transaction.
- Updated stream queries: They now take triggers into account and more accurately detect when an update
is necessary.
- New `tableUpdates` method that can be used to listen for a subset of table updates outside of
a query.
## 2.4.1

View File

@ -72,6 +72,23 @@ mixin QueryEngine on DatabaseConnectionUser {
_resolvedEngine.streamQueries.handleTableUpdates(withRulesApplied);
}
/// Creates a stream that emits `null` each time a table that would affect
/// [query] is changed.
///
/// When called inside a transaction, the stream will close when the
/// transaction completes or is rolled back. Otherwise, the stream will
/// complete as the database is closed.
Stream<Null> tableUpdates(
[TableUpdateQuery query = const TableUpdateQuery.any()]) {
return _resolvedEngine.streamQueries
.updatesForSync(query)
.asyncMap((event) async {
// streamQueries.updatesForSync is a synchronous stream - make it
// asynchronous by awaiting null for each event.
return await null;
});
}
/// Starts an [InsertStatement] for a given table. You can use that statement
/// to write data into the [table] by using [InsertStatement.insert].
@protected

View File

@ -93,7 +93,7 @@ class TableUpdate {
/// Creates a [TableUpdate] instance based on a [TableInfo] instead of the raw
/// name.
factory TableUpdate.fromTable(TableInfo table, {UpdateKind kind}) {
factory TableUpdate.onTable(TableInfo table, {UpdateKind kind}) {
return TableUpdate(table.actualTableName, kind: kind);
}
@ -126,21 +126,32 @@ abstract class TableUpdateQuery {
const factory TableUpdateQuery.allOf(List<TableUpdateQuery> queries) =
MultipleUpdateQuery;
/// A query that listens for all updates on a specific [table] by its name.
///
/// The optional [limitUpdateKind] parameter can be used to limit the updates
/// to a certain kind.
const factory TableUpdateQuery.onTableName(String table,
{UpdateKind limitUpdateKind}) = SpecificUpdateQuery;
/// A query that listens for all updates on a specific [table].
///
/// The optional [limitUpdateKind] parameter can be used to limit the updates
/// to a certain kind.
const factory TableUpdateQuery.onTable(String table,
{UpdateKind limitUpdateKind}) = SpecificUpdateQuery;
factory TableUpdateQuery.onTable(TableInfo table,
{UpdateKind limitUpdateKind}) {
return TableUpdateQuery.onTableName(
table.actualTableName,
limitUpdateKind: limitUpdateKind,
);
}
/// A query that listens for any change on any table in [tables].
factory TableUpdateQuery.onAllTables(Iterable<TableInfo> tables) {
// analyzer bug, remove when Dart 2.8 is stable
// ignore: prefer_const_constructors
return TableUpdateQuery.allOf([
for (final table in tables)
TableUpdateQuery.onTable(table.actualTableName)
]);
return TableUpdateQuery.allOf(
[for (final table in tables) TableUpdateQuery.onTable(table)],
);
}
/// Determines whether the [update] would be picked up by this query.

View File

@ -107,7 +107,7 @@ class StreamQueryStore {
return stream.stream;
}
Stream<Null> _updatesFor(TableUpdateQuery query) {
Stream<Null> updatesForSync(TableUpdateQuery query) {
return _tableUpdates.stream
.where((e) => e.any(query.matches))
.map((_) => null);
@ -219,7 +219,7 @@ class QueryStream<T> {
fetchAndEmitData();
_tablesChangedSubscription =
_store._updatesFor(_fetcher.readsFrom).listen((_) {
_store.updatesForSync(_fetcher.readsFrom).listen((_) {
// table has changed, invalidate cache
_lastData = null;
fetchAndEmitData();

View File

@ -33,7 +33,7 @@ class DeleteStatement<T extends Table, D extends DataClass> extends Query<T, D>
if (rows > 0) {
database.notifyUpdates(
{TableUpdate.fromTable(table, kind: UpdateKind.delete)});
{TableUpdate.onTable(table, kind: UpdateKind.delete)});
}
return rows;
});

View File

@ -36,8 +36,8 @@ class InsertStatement<D extends DataClass> {
return await database.executor.doWhenOpened((e) async {
final id = await database.executor.runInsert(ctx.sql, ctx.boundVariables);
database.notifyUpdates(
{TableUpdate.fromTable(table, kind: UpdateKind.insert)});
database
.notifyUpdates({TableUpdate.onTable(table, kind: UpdateKind.insert)});
return id;
});
}

View File

@ -36,8 +36,8 @@ class UpdateStatement<T extends Table, D extends DataClass> extends Query<T, D>
});
if (rows > 0) {
database.notifyUpdates(
{TableUpdate.fromTable(table, kind: UpdateKind.update)});
database
.notifyUpdates({TableUpdate.onTable(table, kind: UpdateKind.update)});
}
return rows;

View File

@ -1276,7 +1276,7 @@ abstract class _$CustomTablesDb extends GeneratedDatabase {
StreamQueryUpdateRules get streamUpdateRules => const StreamQueryUpdateRules(
[
WritePropagation(
on: TableUpdateQuery.onTable('config',
on: TableUpdateQuery.onTableName('config',
limitUpdateKind: UpdateKind.insert),
result: [
TableUpdate('with_defaults', kind: UpdateKind.insert),

View File

@ -1,5 +1,6 @@
import 'dart:async';
import 'package:moor/moor.dart';
import 'package:moor/src/runtime/api/runtime_api.dart';
import 'package:moor/src/runtime/executor/stream_queries.dart';
import 'package:test/test.dart';
@ -250,4 +251,69 @@ void main() {
verify(executor.runSelect(any, any)).called(1);
});
group('listen for table updates', () {
test('any', () async {
var counter = 0;
db.tableUpdates().listen((event) => counter++);
db.markTablesUpdated({db.todosTable});
await pumpEventQueue(times: 1);
expect(counter, 1);
db.markTablesUpdated({db.users});
await pumpEventQueue(times: 1);
expect(counter, 2);
});
test('stream is async', () {
var counter = 0;
db.tableUpdates().listen((event) => counter++);
db.markTablesUpdated({});
// no wait here, the counter should not be updated yet.
expect(counter, 0);
});
test('specific table', () async {
var counter = 0;
db
.tableUpdates(TableUpdateQuery.onTable(db.users))
.listen((event) => counter++);
db.markTablesUpdated({db.todosTable});
await pumpEventQueue(times: 1);
expect(counter, 0);
db.markTablesUpdated({db.users});
await pumpEventQueue(times: 1);
expect(counter, 1);
db.markTablesUpdated({db.categories});
await pumpEventQueue(times: 1);
expect(counter, 1);
});
test('specific table and update kind', () async {
var counter = 0;
db
.tableUpdates(TableUpdateQuery.onTable(db.users,
limitUpdateKind: UpdateKind.update))
.listen((event) => counter++);
db.markTablesUpdated({db.todosTable});
await pumpEventQueue(times: 1);
expect(counter, 0);
db.notifyUpdates(
{TableUpdate.onTable(db.users, kind: UpdateKind.update)});
await pumpEventQueue(times: 1);
expect(counter, 1);
db.notifyUpdates(
{TableUpdate.onTable(db.users, kind: UpdateKind.delete)});
await pumpEventQueue(times: 1);
expect(counter, 1);
});
});
}

View File

@ -157,7 +157,7 @@ void main() {
// After the transaction completes, the queries should be updated
verify(
streamQueries.handleTableUpdates(
{TableUpdate.fromTable(db.users, kind: UpdateKind.update)}),
{TableUpdate.onTable(db.users, kind: UpdateKind.update)}),
).called(1);
verify(executor.transactions.send());
});

View File

@ -106,7 +106,7 @@ void main() {
));
verify(streamQueries.handleTableUpdates(
{TableUpdate.fromTable(db.todosTable, kind: UpdateKind.update)}));
{TableUpdate.onTable(db.todosTable, kind: UpdateKind.update)}));
});
test('are not issued when no data was changed', () async {

View File

@ -23,7 +23,7 @@ class FindStreamUpdateRules {
rules.add(
WritePropagation(
on: TableUpdateQuery.onTable(
on: TableUpdateQuery.onTableName(
trigger.on.sqlName,
limitUpdateKind: targetKind,
),

View File

@ -168,7 +168,8 @@ extension on TableUpdateQuery {
buffer.write('TableUpdateQuery.any()');
} else if (this is SpecificUpdateQuery) {
final query = this as SpecificUpdateQuery;
buffer.write('TableUpdateQuery.onTable(${asDartLiteral(query.table)}, '
buffer
.write('TableUpdateQuery.onTableName(${asDartLiteral(query.table)}, '
'limitUpdateKind: ${_kindToDartExpr[query.limitUpdateKind]})');
} else if (this is MultipleUpdateQuery) {
final queries = (this as MultipleUpdateQuery).queries;

View File

@ -42,7 +42,7 @@ class MyDatabase {}
.having(
(e) => e.on,
'on',
const TableUpdateQuery.onTable('users',
const TableUpdateQuery.onTableName('users',
limitUpdateKind: UpdateKind.insert))
.having((e) => e.result, 'result', {const TableUpdate('users')}),
);