mirror of https://github.com/AMT-Cheif/drift.git
Open transaction callback in right zone (#1881)
This commit is contained in:
parent
429d36ff9c
commit
dc2511c808
|
@ -125,6 +125,25 @@ without awaiting every statement in it.''');
|
||||||
|
|
||||||
class _TransactionExecutor extends _BaseExecutor
|
class _TransactionExecutor extends _BaseExecutor
|
||||||
implements TransactionExecutor {
|
implements TransactionExecutor {
|
||||||
|
// We're doing some async hacks for database implementations which manage
|
||||||
|
// transactions for us (e.g. sqflite where we do `transaction((t) => ...)`)
|
||||||
|
// and can only use the transaction in that callback.
|
||||||
|
// Since drift's executor API works somewhat differently, our callback starts
|
||||||
|
// a completer which we await in that callback. Outside of that callback, we
|
||||||
|
// use the transaction and finally complete the completer with a bogus value
|
||||||
|
// or with an exception if we want to commit or rollback the transaction.
|
||||||
|
//
|
||||||
|
// This works fine, but there's a rare problem since `ensureOpen` is called by
|
||||||
|
// the first operation _inside_ drift's `transaction` block, NOT by the
|
||||||
|
// transaction block itself. In particular, if that first operation is a
|
||||||
|
// select, the zone calling `ensureOpen` is a cancellable error zone. This
|
||||||
|
// means that, in the case of a rollback (sent from an outer zone), an error
|
||||||
|
// event would cross error zone boundaries. This is blocked by Dart's async
|
||||||
|
// implementation, which replaces it with an uncaught error handler.
|
||||||
|
// We _do_ want to handle those errors though, so we make sure that this
|
||||||
|
// wrapping hack in `ensureOpen` runs in the zone that created this
|
||||||
|
// transaction runner and not in the zone that does the first operation.
|
||||||
|
final Zone _createdIn = Zone.current;
|
||||||
final DelegatedDatabase _db;
|
final DelegatedDatabase _db;
|
||||||
|
|
||||||
@override
|
@override
|
||||||
|
@ -194,16 +213,18 @@ class _TransactionExecutor extends _BaseExecutor
|
||||||
await _sendCalled.future;
|
await _sendCalled.future;
|
||||||
}));
|
}));
|
||||||
} else if (transactionManager is SupportedTransactionDelegate) {
|
} else if (transactionManager is SupportedTransactionDelegate) {
|
||||||
transactionManager.startTransaction((transaction) async {
|
_createdIn.run(() {
|
||||||
impl = transaction;
|
transactionManager.startTransaction((transaction) async {
|
||||||
// specs say that the db implementation will perform a rollback when
|
impl = transaction;
|
||||||
// this future completes with an error.
|
// specs say that the db implementation will perform a rollback when
|
||||||
_sendFakeErrorOnRollback = true;
|
// this future completes with an error.
|
||||||
transactionStarted.complete();
|
_sendFakeErrorOnRollback = true;
|
||||||
|
transactionStarted.complete();
|
||||||
|
|
||||||
// this callback must be running as long as the transaction, so we do
|
// this callback must be running as long as the transaction, so we do
|
||||||
// that until send() was called.
|
// that until send() was called.
|
||||||
await _sendCalled.future;
|
await _sendCalled.future;
|
||||||
|
});
|
||||||
});
|
});
|
||||||
} else if (transactionManager is WrappedTransactionDelegate) {
|
} else if (transactionManager is WrappedTransactionDelegate) {
|
||||||
unawaited(_db._synchronized(() async {
|
unawaited(_db._synchronized(() async {
|
||||||
|
|
|
@ -55,7 +55,42 @@ Future<void> main() async {
|
||||||
);
|
);
|
||||||
final database = Database.executor(executor);
|
final database = Database.executor(executor);
|
||||||
await database.executor.ensureOpen(database);
|
await database.executor.ensureOpen(database);
|
||||||
|
addTearDown(database.close);
|
||||||
|
|
||||||
expect(didCallCreator, isTrue);
|
expect(didCallCreator, isTrue);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
test('can rollback transactions', () async {
|
||||||
|
final executor = SqfliteQueryExecutor(path: ':memory:');
|
||||||
|
final database = EmptyDb(executor);
|
||||||
|
addTearDown(database.close);
|
||||||
|
|
||||||
|
final expectedException = Exception('oops');
|
||||||
|
|
||||||
|
try {
|
||||||
|
await database
|
||||||
|
.customSelect('select 1')
|
||||||
|
.getSingle(); // ensure database is open/created
|
||||||
|
|
||||||
|
await database.transaction(() async {
|
||||||
|
await database.customSelect('select 1').watchSingle().first;
|
||||||
|
throw expectedException;
|
||||||
|
});
|
||||||
|
} catch (e) {
|
||||||
|
expect(e, expectedException);
|
||||||
|
} finally {
|
||||||
|
await database.customSelect('select 1').getSingle().timeout(
|
||||||
|
const Duration(milliseconds: 500),
|
||||||
|
onTimeout: () => fail('deadlock?'),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}, timeout: const Timeout.factor(100));
|
||||||
|
}
|
||||||
|
|
||||||
|
class EmptyDb extends GeneratedDatabase {
|
||||||
|
EmptyDb(QueryExecutor q) : super(SqlTypeSystem.defaultInstance, q);
|
||||||
|
@override
|
||||||
|
final List<TableInfo> allTables = const [];
|
||||||
|
@override
|
||||||
|
final schemaVersion = 1;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue