mirror of https://github.com/AMT-Cheif/drift.git
Initial support for nested transactions
This commit is contained in:
parent
5317d0a33b
commit
8def7055a0
|
@ -7,6 +7,7 @@
|
||||||
- __Breaking__: Mapping methods on type converters are now called `toSql` and `fromSql`.
|
- __Breaking__: Mapping methods on type converters are now called `toSql` and `fromSql`.
|
||||||
- Consistently handle transaction errors like a failing `BEGIN` or `COMMIT`
|
- Consistently handle transaction errors like a failing `BEGIN` or `COMMIT`
|
||||||
across database implementations.
|
across database implementations.
|
||||||
|
- Support nested transactions.
|
||||||
- Fix nullability of `min`, `max` and `avg` in the Dart query builder.
|
- Fix nullability of `min`, `max` and `avg` in the Dart query builder.
|
||||||
|
|
||||||
## 1.7.0
|
## 1.7.0
|
||||||
|
|
|
@ -162,6 +162,9 @@ class _RemoteTransactionExecutor extends _BaseExecutor
|
||||||
@override
|
@override
|
||||||
SqlDialect get dialect => SqlDialect.sqlite;
|
SqlDialect get dialect => SqlDialect.sqlite;
|
||||||
|
|
||||||
|
@override
|
||||||
|
bool get supportsNestedTransactions => false;
|
||||||
|
|
||||||
@override
|
@override
|
||||||
TransactionExecutor beginTransaction() {
|
TransactionExecutor beginTransaction() {
|
||||||
throw UnsupportedError('Nested transactions');
|
throw UnsupportedError('Nested transactions');
|
||||||
|
|
|
@ -427,16 +427,49 @@ abstract class DatabaseConnectionUser {
|
||||||
/// successful or not, streams created in it will close. Writes happening
|
/// successful or not, streams created in it will close. Writes happening
|
||||||
/// outside of this transaction will not affect the stream.
|
/// outside of this transaction will not affect the stream.
|
||||||
///
|
///
|
||||||
/// Please note that nested transactions are not supported. Creating another
|
/// Starting from drift version 2.0, nested transactions are supported on most
|
||||||
/// transaction inside a transaction returns the parent transaction.
|
/// database implementations (including `NativeDatabase`, TODO list). When
|
||||||
|
/// calling [transaction] inside a [transaction] block on supported database
|
||||||
|
/// implementations, a new transaction will be started.
|
||||||
|
/// For backwards-compatibility, the current transaction will be re-used if
|
||||||
|
/// a nested transaction is started with a database implementation not
|
||||||
|
/// supporting nested transactions. The [requireNew] parameter can be set to
|
||||||
|
/// instead turn this case into a runtime error.
|
||||||
|
///
|
||||||
|
/// Nested transactions are conceptionally similar to regular, top-level
|
||||||
|
/// transactions in the sense that their writes are not seen by users outside
|
||||||
|
/// of the transaction until it is commited. However, their behavior around
|
||||||
|
/// completions is different:
|
||||||
|
///
|
||||||
|
/// - When a nested transaction completes, nothing is being persisted right
|
||||||
|
/// away. The parent transaction can now see changes from the child
|
||||||
|
/// transaction and continues to run. When the outermost transaction
|
||||||
|
/// completes, its changes (including changes from child transactions) are
|
||||||
|
/// written to the database.
|
||||||
|
/// - When a nested transaction is aborted (which happens due to exceptions),
|
||||||
|
/// only changes in that inner transaction are reverted. The outer
|
||||||
|
/// transaction can continue to run if it catched the exception thrown by
|
||||||
|
/// the inner transaction when it aborted.
|
||||||
///
|
///
|
||||||
/// See also:
|
/// See also:
|
||||||
/// - the docs on [transactions](https://drift.simonbinder.eu/docs/transactions/)
|
/// - the docs on [transactions](https://drift.simonbinder.eu/docs/transactions/)
|
||||||
Future<T> transaction<T>(Future<T> Function() action) async {
|
Future<T> transaction<T>(Future<T> Function() action,
|
||||||
|
{bool requireNew = false}) async {
|
||||||
final resolved = resolvedEngine;
|
final resolved = resolvedEngine;
|
||||||
|
|
||||||
|
// Are we about to start a nested transaction?
|
||||||
if (resolved is Transaction) {
|
if (resolved is Transaction) {
|
||||||
|
final executor = resolved.executor as TransactionExecutor;
|
||||||
|
if (!executor.supportsNestedTransactions) {
|
||||||
|
if (requireNew) {
|
||||||
|
throw UnsupportedError('The current database implementation does '
|
||||||
|
'not support nested transactions.');
|
||||||
|
} else {
|
||||||
|
// Just run the block in the current transaction zone.
|
||||||
return action();
|
return action();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return await resolved.doWhenOpened((executor) {
|
return await resolved.doWhenOpened((executor) {
|
||||||
final transactionExecutor = executor.beginTransaction();
|
final transactionExecutor = executor.beginTransaction();
|
||||||
|
|
|
@ -143,6 +143,10 @@ class ArgumentsForBatchedStatement {
|
||||||
|
|
||||||
/// A [QueryExecutor] that runs multiple queries atomically.
|
/// A [QueryExecutor] that runs multiple queries atomically.
|
||||||
abstract class TransactionExecutor extends QueryExecutor {
|
abstract class TransactionExecutor extends QueryExecutor {
|
||||||
|
/// Whether this transaction executor supports nesting transactions by calling
|
||||||
|
/// [beginTransaction] on it.
|
||||||
|
bool get supportsNestedTransactions;
|
||||||
|
|
||||||
/// Completes the transaction. No further queries may be sent to to this
|
/// Completes the transaction. No further queries may be sent to to this
|
||||||
/// [QueryExecutor] after this method was called.
|
/// [QueryExecutor] after this method was called.
|
||||||
///
|
///
|
||||||
|
|
|
@ -152,6 +152,9 @@ abstract class _TransactionExecutor extends _BaseExecutor
|
||||||
|
|
||||||
@override
|
@override
|
||||||
bool get isSequential => _db.isSequential;
|
bool get isSequential => _db.isSequential;
|
||||||
|
|
||||||
|
@override
|
||||||
|
bool get supportsNestedTransactions => false;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A transaction implementation that sends `BEGIN` and `COMMIT` statements
|
/// A transaction implementation that sends `BEGIN` and `COMMIT` statements
|
||||||
|
@ -162,7 +165,25 @@ class _StatementBasedTransactionExecutor extends _TransactionExecutor {
|
||||||
Completer<bool>? _opened;
|
Completer<bool>? _opened;
|
||||||
final Completer<void> _done = Completer();
|
final Completer<void> _done = Completer();
|
||||||
|
|
||||||
_StatementBasedTransactionExecutor(super._db, this._delegate);
|
final _StatementBasedTransactionExecutor? _parent;
|
||||||
|
|
||||||
|
final String _startCommand;
|
||||||
|
final String _commitCommand;
|
||||||
|
final String _rollbackCommand;
|
||||||
|
|
||||||
|
_StatementBasedTransactionExecutor(super._db, this._delegate)
|
||||||
|
: _startCommand = _delegate.start,
|
||||||
|
_commitCommand = _delegate.commit,
|
||||||
|
_rollbackCommand = _delegate.rollback,
|
||||||
|
_parent = null;
|
||||||
|
|
||||||
|
_StatementBasedTransactionExecutor.nested(
|
||||||
|
_StatementBasedTransactionExecutor this._parent, int depth)
|
||||||
|
: _delegate = _parent._delegate,
|
||||||
|
_startCommand = 'SAVEPOINT s$depth',
|
||||||
|
_commitCommand = 'RELEASE s$depth',
|
||||||
|
_rollbackCommand = 'ROLLBACK TO s$depth',
|
||||||
|
super(_parent._db);
|
||||||
|
|
||||||
@override
|
@override
|
||||||
Future<bool> ensureOpen(QueryExecutorUser user) {
|
Future<bool> ensureOpen(QueryExecutorUser user) {
|
||||||
|
@ -171,10 +192,11 @@ class _StatementBasedTransactionExecutor extends _TransactionExecutor {
|
||||||
|
|
||||||
if (opened == null) {
|
if (opened == null) {
|
||||||
opened = _opened = Completer();
|
opened = _opened = Completer();
|
||||||
// Block the main database interface while this transaction is active.
|
// Block the main database or the parent transaction while this
|
||||||
unawaited(_db._synchronized(() async {
|
// transaction is active.
|
||||||
|
unawaited((_parent ?? _db)._synchronized(() async {
|
||||||
try {
|
try {
|
||||||
await runCustom(_delegate.start);
|
await runCustom(_startCommand);
|
||||||
_db.delegate.isInTransaction = true;
|
_db.delegate.isInTransaction = true;
|
||||||
_opened!.complete(true);
|
_opened!.complete(true);
|
||||||
} catch (e, s) {
|
} catch (e, s) {
|
||||||
|
@ -192,15 +214,28 @@ class _StatementBasedTransactionExecutor extends _TransactionExecutor {
|
||||||
@override
|
@override
|
||||||
QueryDelegate get impl => _db.delegate;
|
QueryDelegate get impl => _db.delegate;
|
||||||
|
|
||||||
|
@override
|
||||||
|
bool get supportsNestedTransactions => true;
|
||||||
|
|
||||||
|
@override
|
||||||
|
TransactionExecutor beginTransaction() {
|
||||||
|
var ownDepth = 0;
|
||||||
|
var ancestor = _parent;
|
||||||
|
while (ancestor != null) {
|
||||||
|
ownDepth++;
|
||||||
|
ancestor = ancestor._parent;
|
||||||
|
}
|
||||||
|
|
||||||
|
return _StatementBasedTransactionExecutor.nested(this, ownDepth);
|
||||||
|
}
|
||||||
|
|
||||||
@override
|
@override
|
||||||
Future<void> send() async {
|
Future<void> send() async {
|
||||||
// don't do anything if the transaction completes before it was opened
|
// don't do anything if the transaction completes before it was opened
|
||||||
if (!_ensureOpenCalled) return;
|
if (!_ensureOpenCalled) return;
|
||||||
|
|
||||||
await runCustom(_delegate.commit, const []);
|
await runCustom(_commitCommand, const []);
|
||||||
_db.delegate.isInTransaction = false;
|
_afterCommitOrRollback();
|
||||||
_done.complete();
|
|
||||||
_closed = true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@override
|
@override
|
||||||
|
@ -208,7 +243,7 @@ class _StatementBasedTransactionExecutor extends _TransactionExecutor {
|
||||||
if (!_ensureOpenCalled) return;
|
if (!_ensureOpenCalled) return;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await runCustom(_delegate.rollback, const []);
|
await runCustom(_rollbackCommand, const []);
|
||||||
} finally {
|
} finally {
|
||||||
// Note: When send() is called and throws an exception, we don't mark this
|
// Note: When send() is called and throws an exception, we don't mark this
|
||||||
// transaction is closed (as the commit should either be retried or the
|
// transaction is closed (as the commit should either be retried or the
|
||||||
|
@ -216,13 +251,19 @@ class _StatementBasedTransactionExecutor extends _TransactionExecutor {
|
||||||
// When aborting fails too, something is seriously wrong already. Let's
|
// When aborting fails too, something is seriously wrong already. Let's
|
||||||
// at least make sure that we don't block the rest of the db by pretending
|
// at least make sure that we don't block the rest of the db by pretending
|
||||||
// the transaction is still open.
|
// the transaction is still open.
|
||||||
|
_afterCommitOrRollback();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void _afterCommitOrRollback() {
|
||||||
|
if (_parent == null) {
|
||||||
_db.delegate.isInTransaction = false;
|
_db.delegate.isInTransaction = false;
|
||||||
|
}
|
||||||
|
|
||||||
_done.complete();
|
_done.complete();
|
||||||
_closed = true;
|
_closed = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
class _WrappingTransactionExecutor extends _TransactionExecutor {
|
class _WrappingTransactionExecutor extends _TransactionExecutor {
|
||||||
static final _artificialRollback =
|
static final _artificialRollback =
|
||||||
|
|
|
@ -88,7 +88,11 @@ void main() {
|
||||||
expect(stream, emitsDone);
|
expect(stream, emitsDone);
|
||||||
});
|
});
|
||||||
|
|
||||||
test('nested transactions use the outer transaction', () async {
|
group('nested transactions', () {
|
||||||
|
test('are no-ops if not supported', () async {
|
||||||
|
final transactions = executor.transactions;
|
||||||
|
when(transactions.supportsNestedTransactions).thenReturn(false);
|
||||||
|
|
||||||
await db.transaction(() async {
|
await db.transaction(() async {
|
||||||
await db.transaction(() async {
|
await db.transaction(() async {
|
||||||
// todo how can we test that these are really equal?
|
// todo how can we test that these are really equal?
|
||||||
|
@ -98,7 +102,68 @@ void main() {
|
||||||
verifyNever(executor.transactions.send());
|
verifyNever(executor.transactions.send());
|
||||||
});
|
});
|
||||||
|
|
||||||
verify(executor.transactions.send());
|
verify(transactions.send());
|
||||||
|
verify(executor.beginTransaction());
|
||||||
|
verifyNever(transactions.beginTransaction());
|
||||||
|
});
|
||||||
|
|
||||||
|
test('can throw if not supported', () async {
|
||||||
|
final transactions = executor.transactions;
|
||||||
|
when(transactions.supportsNestedTransactions).thenReturn(false);
|
||||||
|
|
||||||
|
await db.transaction(() async {
|
||||||
|
await expectLater(
|
||||||
|
db.transaction(() async {
|
||||||
|
fail('Should not be called');
|
||||||
|
}, requireNew: true),
|
||||||
|
throwsUnsupportedError,
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
verify(transactions.send());
|
||||||
|
verifyNever(transactions.beginTransaction());
|
||||||
|
});
|
||||||
|
|
||||||
|
test('are committed separately', () async {
|
||||||
|
final outerTransactions = executor.transactions;
|
||||||
|
final innerTransactions = outerTransactions.transactions;
|
||||||
|
|
||||||
|
await db.transaction(() async {
|
||||||
|
verify(executor.beginTransaction());
|
||||||
|
|
||||||
|
await db.transaction(() async {
|
||||||
|
await db.select(db.todosTable).get();
|
||||||
|
});
|
||||||
|
|
||||||
|
verify(outerTransactions.beginTransaction());
|
||||||
|
verify(innerTransactions.ensureOpen(any));
|
||||||
|
verify(innerTransactions.send());
|
||||||
|
});
|
||||||
|
|
||||||
|
verify(outerTransactions.send());
|
||||||
|
});
|
||||||
|
|
||||||
|
test('are rolled back after exceptions', () async {
|
||||||
|
final outerTransactions = executor.transactions;
|
||||||
|
final innerTransactions = outerTransactions.transactions;
|
||||||
|
|
||||||
|
await db.transaction(() async {
|
||||||
|
verify(executor.beginTransaction());
|
||||||
|
final cause = Exception('revert inner');
|
||||||
|
|
||||||
|
await expectLater(db.transaction(() async {
|
||||||
|
// Some bogus query so that the transaction is actually opened.
|
||||||
|
await db.select(db.todosTable).get();
|
||||||
|
throw cause;
|
||||||
|
}), throwsA(cause));
|
||||||
|
|
||||||
|
verify(outerTransactions.beginTransaction());
|
||||||
|
verify(innerTransactions.ensureOpen(any));
|
||||||
|
verify(innerTransactions.rollback());
|
||||||
|
});
|
||||||
|
|
||||||
|
verify(outerTransactions.send());
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
test('code in callback uses transaction', () async {
|
test('code in callback uses transaction', () async {
|
||||||
|
|
|
@ -42,4 +42,54 @@ void main() {
|
||||||
await expectLater(
|
await expectLater(
|
||||||
driftDb.select(driftDb.categories).get(), completion(hasLength(1)));
|
driftDb.select(driftDb.categories).get(), completion(hasLength(1)));
|
||||||
});
|
});
|
||||||
|
|
||||||
|
group('nested transactions', () {
|
||||||
|
test(
|
||||||
|
'outer transaction does not see inner writes after rollback',
|
||||||
|
() async {
|
||||||
|
final db = TodoDb(NativeDatabase.memory());
|
||||||
|
|
||||||
|
await db.transaction(() async {
|
||||||
|
await db
|
||||||
|
.into(db.categories)
|
||||||
|
.insert(CategoriesCompanion.insert(description: 'outer'));
|
||||||
|
|
||||||
|
try {
|
||||||
|
await db.transaction(() async {
|
||||||
|
await db
|
||||||
|
.into(db.categories)
|
||||||
|
.insert(CategoriesCompanion.insert(description: 'inner'));
|
||||||
|
|
||||||
|
expect(await db.select(db.categories).get(), hasLength(2));
|
||||||
|
throw Exception('rollback inner');
|
||||||
|
});
|
||||||
|
} on Exception {
|
||||||
|
// Expected rollback, let's continue
|
||||||
|
}
|
||||||
|
|
||||||
|
final categories = await db.select(db.categories).get();
|
||||||
|
expect(categories, hasLength(1));
|
||||||
|
expect(categories.single.description, 'outer');
|
||||||
|
});
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
test('inner writes are visible after completion', () async {
|
||||||
|
final db = TodoDb(NativeDatabase.memory());
|
||||||
|
|
||||||
|
await db.transaction(() async {
|
||||||
|
await db
|
||||||
|
.into(db.categories)
|
||||||
|
.insert(CategoriesCompanion.insert(description: 'outer'));
|
||||||
|
|
||||||
|
await db.transaction(() async {
|
||||||
|
await db
|
||||||
|
.into(db.categories)
|
||||||
|
.insert(CategoriesCompanion.insert(description: 'inner'));
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(await db.select(db.categories).get(), hasLength(2));
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
|
@ -98,10 +98,16 @@ class MockExecutor extends Mock implements QueryExecutor {
|
||||||
class MockTransactionExecutor extends MockExecutor
|
class MockTransactionExecutor extends MockExecutor
|
||||||
implements TransactionExecutor {
|
implements TransactionExecutor {
|
||||||
MockTransactionExecutor() {
|
MockTransactionExecutor() {
|
||||||
|
when(supportsNestedTransactions).thenReturn(true);
|
||||||
when(send()).thenAnswer((_) => Future.value(null));
|
when(send()).thenAnswer((_) => Future.value(null));
|
||||||
when(rollback()).thenAnswer((_) => Future.value(null));
|
when(rollback()).thenAnswer((_) => Future.value(null));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
bool get supportsNestedTransactions {
|
||||||
|
return _nsm(Invocation.getter(#supportsNestedTransactions), true);
|
||||||
|
}
|
||||||
|
|
||||||
@override
|
@override
|
||||||
Future<void> send() {
|
Future<void> send() {
|
||||||
return _nsm(Invocation.method(#send, []), Future.value(null));
|
return _nsm(Invocation.method(#send, []), Future.value(null));
|
||||||
|
|
Loading…
Reference in New Issue