Support stream queries in transactions (#365)

This commit is contained in:
Simon Binder 2020-01-30 22:08:18 +01:00
parent d1f837c481
commit 68e2b716fe
No known key found for this signature in database
GPG Key ID: 7891917E4147B8C0
7 changed files with 157 additions and 39 deletions

View File

@ -36,15 +36,24 @@ There are a couple of things that should be kept in mind when working with trans
1. __Await all calls__: All queries inside the transaction must be `await`-ed. The transaction
will complete when the inner method completes. Without `await`, some queries might be operating
on the transaction after it has been closed! This can cause data loss or runtime crashes.
2. __No select streams in transactions__: Inside a `transaction` callback, select statements can't
be `.watch()`ed. The reasons behind this is that it's unclear how a stream should behave when a
transaction completes. Should the stream complete as well? Update to data changes made outside of the
transaction? Both seem inconsistent, so moor forbids this.
2. __Different behavior of stream queries__: Inside a `transaction` callback, stream queries behave
differently. If you're creating streams inside a transaction, check the next section to learn how
they behave.
## Transactions and query streams
Query streams that have been created outside a transaction work nicely together with
updates made in a transaction: All changes to tables will only be reported after the
transaction completes. Updates inside a transaction don't have an immediate effect on
streams, so your data will always be consistent.
streams, so your data will always be consistent and there aren't any uneccessary updates.
However, as mentioned above, note that streams can't be created inside a `transaction` block.
With streams created _inside_ a `transaction` block (or a nested call in there), it's
a different story. Notably, they
- reflect on changes made in the transaction immediatly
- complete when the transaction completes
This behavior is useful if you're collapsing streams inside a transaction, for instance by
calling `first` or `fold`.
However, we recommend that streams created _inside_ a transaction are not listened to
_outside_ of a transaction. While it's possible, it defeats the isolation principle
of transactions as its state is exposed through the stream.

View File

@ -3,6 +3,7 @@
- Support aggregate expressions and `group by` in the Dart api
- Support type converters in moor files! The [documentation](https://moor.simonbinder.eu/docs/advanced-features/type_converters/)
has been updated to explain how to use them.
- Support stream queries in transactions ([#356](https://github.com/simolus3/moor/issues/365))
- Support table-valued functions (like `json_each` and `json_tree`) in moor files
[#260](https://github.com/simolus3/moor/issues/260).
- Fix a crash when opening a transaction without using it ([#361](https://github.com/simolus3/moor/issues/361))

View File

@ -68,14 +68,6 @@ abstract class DatabaseConnectionUser {
/// [DatabaseConnection].
DatabaseConnectionUser.fromConnection(this.connection);
/// Marks the tables as updated. This method will be called internally
/// whenever a update, delete or insert statement is issued on the database.
/// We can then inform all active select-streams on those tables that their
/// snapshot might be out-of-date and needs to be fetched again.
void markTablesUpdated(Set<TableInfo> tables) {
streamQueries.handleTableUpdates(tables);
}
/// Creates and auto-updating stream from the given select statement. This
/// method should not be used directly.
Stream<T> createStream<T>(QueryStreamFetcher<T> stmt) =>

View File

@ -47,6 +47,14 @@ mixin QueryEngine on DatabaseConnectionUser {
}
}
/// Marks the tables as updated. This method will be called internally
/// whenever a update, delete or insert statement is issued on the database.
/// We can then inform all active select-streams on those tables that their
/// snapshot might be out-of-date and needs to be fetched again.
void markTablesUpdated(Set<TableInfo> tables) {
_resolvedEngine.streamQueries.handleTableUpdates(tables);
}
/// Starts an [InsertStatement] for a given table. You can use that statement
/// to write data into the [table] by using [InsertStatement.insert].
@protected
@ -253,14 +261,27 @@ mixin QueryEngine on DatabaseConnectionUser {
/// Executes [action] in a transaction, which means that all its queries and
/// updates will be called atomically.
///
/// Please be aware of the following limitations of transactions:
/// 1. Inside a transaction, auto-updating streams cannot be created. This
/// operation will throw at runtime. The reason behind this is that a
/// stream might have a longer lifespan than a transaction, but it still
/// needs to know about the transaction because the data in a transaction
/// might be different than that of the "global" database instance.
/// 2. Nested transactions are not supported. Creating another transaction
/// inside a transaction returns the parent transaction.
/// Returns the value of [action].
/// When [action] throws an exception, the transaction will be reset and no
/// changes will be applied to the databases. The exception will be rethrown
/// by [transaction].
///
/// The behavior of stream queries in transactions depends on where the stream
/// was created:
///
/// - streams created outside of a [transaction] block: The stream will update
/// with the tables modified in the transaction after it completes
/// successfully. If the transaction fails, the stream will not update.
/// - streams created inside a [transaction] block: The stream will update for
/// each write in the transaction. When the transaction completes,
/// successful or not, streams created in it will close. Writes happening
/// outside of this transaction will not affect the stream.
///
/// Please note that nested transactions are not supported. Creating another
/// transaction inside a transaction returns the parent transaction.
///
/// See also:
/// - the docs on [transactions](https://moor.simonbinder.eu/docs/transactions/)
Future<T> transaction<T>(Future<T> Function() action) async {
final resolved = _resolvedEngine;
if (resolved is Transaction) {
@ -288,6 +309,7 @@ mixin QueryEngine on DatabaseConnectionUser {
// complete() will also take care of committing the transaction
await transaction.complete();
}
await transaction.disposeChildStreams();
}
});
});

View File

@ -189,6 +189,8 @@ class QueryStream<T> {
return _controller.stream.transform(StartWithValueTransformer(_cachedData));
}
bool get hasKey => _fetcher.key != null;
QueryStream(this._fetcher, this._store);
/// Called when we have a new listener, makes the stream query behave similar
@ -246,4 +248,8 @@ class QueryStream<T> {
}
}
}
Future<void> close() {
return _controller.close();
}
}

View File

@ -17,10 +17,14 @@ class Transaction extends DatabaseConnectionUser with QueryEngine {
/// Instructs the underlying executor to execute this instructions. Batched
/// table updates will also be send to the stream query store.
Future complete() async {
final streams = streamQueries as _TransactionStreamStore;
await (executor as TransactionExecutor).send();
}
await streams.dispatchUpdates();
/// Closes all streams created in this transactions and applies table updates
/// to the main stream store.
Future<void> disposeChildStreams() async {
final streams = streamQueries as _TransactionStreamStore;
await streams._dispatchAndClose();
}
}
@ -28,24 +32,47 @@ class Transaction extends DatabaseConnectionUser with QueryEngine {
/// updates to the outer stream query store when the transaction is completed.
class _TransactionStreamStore extends StreamQueryStore {
final StreamQueryStore parent;
final Set<TableInfo> affectedTables = <TableInfo>{};
final Set<String> affectedTables = <String>{};
final Set<QueryStream> _queriesWithoutKey = {};
_TransactionStreamStore(this.parent);
@override
Stream<T> registerStream<T>(QueryStreamFetcher<T> statement) {
throw StateError('Streams cannot be created inside a transaction. See the '
'documentation of GeneratedDatabase.transaction for details.');
void handleTableUpdatesByName(Set<String> tables) {
affectedTables.addAll(tables);
super.handleTableUpdatesByName(tables);
}
// Override lifecycle hooks for each stream. The regular StreamQueryStore
// keeps track of created streams if they have a key. It also takes care of
// closing the underlying stream controllers when calling close(), which we
// do.
// However, it doesn't keep track of keyless queries, as those can't be
// cached and keeping a reference would leak. A transaction is usually
// completed quickly, so we can keep a list and close that too.
@override
void markAsOpened(QueryStream stream) {
super.markAsOpened(stream);
if (!stream.hasKey) {
_queriesWithoutKey.add(stream);
}
}
@override
Future handleTableUpdates(Set<TableInfo> tables) {
affectedTables.addAll(tables);
return Future.value(null);
void markAsClosed(QueryStream stream, Function() whenRemoved) {
super.markAsClosed(stream, whenRemoved);
_queriesWithoutKey.add(stream);
}
Future dispatchUpdates() {
return parent.handleTableUpdates(affectedTables);
Future _dispatchAndClose() async {
parent.handleTableUpdatesByName(affectedTables);
await super.close();
await Future.wait(_queriesWithoutKey.map((e) => e.close()));
}
}

View File

@ -1,3 +1,5 @@
import 'dart:async';
@TestOn('!browser') // todo: Figure out why this doesn't run in js
// ignore_for_file: lines_longer_than_80_chars
@ -35,12 +37,71 @@ void main() {
db = TodoDb.connect(connection);
});
test("transactions don't allow creating streams", () {
expect(() async {
test('streams in transactions are isolated and scoped', () async {
// create a database without mocked stream queries
db = TodoDb(MockExecutor());
Stream<int> stream;
final didSetUpStream = Completer<void>();
final makeUpdate = Completer<void>();
final complete = Completer<void>();
final transaction = db.transaction(() async {
stream = db
.customSelectQuery(
'SELECT _mocked_',
readsFrom: {db.users},
)
.map((r) => r.readInt('_mocked_'))
.watchSingle();
didSetUpStream.complete();
await makeUpdate.future;
db.markTablesUpdated({db.users});
await complete.future;
});
final emittedValues = <dynamic>[];
var didComplete = false;
// wait for the transaction to setup the stream
await didSetUpStream.future;
stream.listen(emittedValues.add, onDone: () => didComplete = true);
// Stream should emit initial select
await pumpEventQueue();
expect(emittedValues, hasLength(1));
// update tables inside the transaction -> stream should emit another value
makeUpdate.complete();
await pumpEventQueue();
expect(emittedValues, hasLength(2));
// update tables outside of the transaction -> stream should NOT update
db.markTablesUpdated({db.users});
await pumpEventQueue();
expect(emittedValues, hasLength(2));
complete.complete();
await transaction;
expect(didComplete, isTrue, reason: 'Stream must complete');
});
test('stream queries terminate on exceptional transaction', () async {
Stream stream;
try {
await db.transaction(() async {
db.select(db.users).watch();
stream = db.select(db.users).watch();
throw Exception();
});
}, throwsStateError);
} on Exception {
// ignore
}
expect(stream, emitsDone);
});
test('nested transactions use the outer transaction', () async {
@ -90,11 +151,11 @@ void main() {
// Even though we just wrote to users, this only happened inside the
// transaction, so the top level stream queries should not be updated.
verifyNever(streamQueries.handleTableUpdates(any));
verifyZeroInteractions(streamQueries);
});
// After the transaction completes, the queries should be updated
verify(streamQueries.handleTableUpdates({db.users})).called(1);
verify(streamQueries.handleTableUpdatesByName({'users'})).called(1);
verify(executor.transactions.send());
});