diff --git a/drift/lib/src/runtime/executor/helpers/engines.dart b/drift/lib/src/runtime/executor/helpers/engines.dart index a3259c64..b7112899 100644 --- a/drift/lib/src/runtime/executor/helpers/engines.dart +++ b/drift/lib/src/runtime/executor/helpers/engines.dart @@ -125,6 +125,25 @@ without awaiting every statement in it.'''); class _TransactionExecutor extends _BaseExecutor implements TransactionExecutor { + // We're doing some async hacks for database implementations which manage + // transactions for us (e.g. sqflite where we do `transaction((t) => ...)`) + // and can only use the transaction in that callback. + // Since drift's executor API works somewhat differently, our callback starts + // a completer which we await in that callback. Outside of that callback, we + // use the transaction and finally complete the completer with a bogus value + // or with an exception if we want to commit or rollback the transaction. + // + // This works fine, but there's a rare problem since `ensureOpen` is called by + // the first operation _inside_ drift's `transaction` block, NOT by the + // transaction block itself. In particular, if that first operation is a + // select, the zone calling `ensureOpen` is a cancellable error zone. This + // means that, in the case of a rollback (sent from an outer zone), an error + // event would cross error zone boundaries. This is blocked by Dart's async + // implementation, which replaces it with an uncaught error handler. + // We _do_ want to handle those errors though, so we make sure that this + // wrapping hack in `ensureOpen` runs in the zone that created this + // transaction runner and not in the zone that does the first operation. + final Zone _createdIn = Zone.current; final DelegatedDatabase _db; @override @@ -194,16 +213,18 @@ class _TransactionExecutor extends _BaseExecutor await _sendCalled.future; })); } else if (transactionManager is SupportedTransactionDelegate) { - transactionManager.startTransaction((transaction) async { - impl = transaction; - // specs say that the db implementation will perform a rollback when - // this future completes with an error. - _sendFakeErrorOnRollback = true; - transactionStarted.complete(); + _createdIn.run(() { + transactionManager.startTransaction((transaction) async { + impl = transaction; + // specs say that the db implementation will perform a rollback when + // this future completes with an error. + _sendFakeErrorOnRollback = true; + transactionStarted.complete(); - // this callback must be running as long as the transaction, so we do - // that until send() was called. - await _sendCalled.future; + // this callback must be running as long as the transaction, so we do + // that until send() was called. + await _sendCalled.future; + }); }); } else if (transactionManager is WrappedTransactionDelegate) { unawaited(_db._synchronized(() async { diff --git a/extras/integration_tests/sqflite/lib/drift_sqflite.dart b/extras/integration_tests/sqflite/lib/drift_sqflite.dart index 7c74f74e..7499653b 100644 --- a/extras/integration_tests/sqflite/lib/drift_sqflite.dart +++ b/extras/integration_tests/sqflite/lib/drift_sqflite.dart @@ -55,7 +55,42 @@ Future main() async { ); final database = Database.executor(executor); await database.executor.ensureOpen(database); + addTearDown(database.close); expect(didCallCreator, isTrue); }); + + test('can rollback transactions', () async { + final executor = SqfliteQueryExecutor(path: ':memory:'); + final database = EmptyDb(executor); + addTearDown(database.close); + + final expectedException = Exception('oops'); + + try { + await database + .customSelect('select 1') + .getSingle(); // ensure database is open/created + + await database.transaction(() async { + await database.customSelect('select 1').watchSingle().first; + throw expectedException; + }); + } catch (e) { + expect(e, expectedException); + } finally { + await database.customSelect('select 1').getSingle().timeout( + const Duration(milliseconds: 500), + onTimeout: () => fail('deadlock?'), + ); + } + }, timeout: const Timeout.factor(100)); +} + +class EmptyDb extends GeneratedDatabase { + EmptyDb(QueryExecutor q) : super(SqlTypeSystem.defaultInstance, q); + @override + final List allTables = const []; + @override + final schemaVersion = 1; }