From c2ec06c1de063a3a1c1a05476e6a31c773ed0938 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 7 Jan 2020 14:57:42 +0100 Subject: [PATCH] Fix transaction behavior on isolates (#324) --- moor/lib/src/runtime/isolate/client.dart | 25 +++--- moor/lib/src/runtime/isolate/protocol.dart | 6 +- moor/lib/src/runtime/isolate/server.dart | 88 ++++++++++++++++++---- moor/test/isolate_test.dart | 29 +++++++ 4 files changed, 121 insertions(+), 27 deletions(-) diff --git a/moor/lib/src/runtime/isolate/client.dart b/moor/lib/src/runtime/isolate/client.dart index 41ad4a78..fee995d5 100644 --- a/moor/lib/src/runtime/isolate/client.dart +++ b/moor/lib/src/runtime/isolate/client.dart @@ -65,16 +65,22 @@ abstract class _BaseExecutor extends QueryExecutor { @override Future runBatched(List statements) { - return client._channel.request(_ExecuteBatchedStatement(statements)); + return client._channel + .request(_ExecuteBatchedStatement(statements, _transactionId)); } Future _runRequest(_StatementMethod method, String sql, List args) { - return client._channel.request(_ExecuteQuery(method, sql, args)); + return client._channel + .request(_ExecuteQuery(method, sql, args, _transactionId)); } @override Future runCustom(String statement, [List args]) { - return _runRequest(_StatementMethod.custom, statement, args); + return _runRequest( + _StatementMethod.custom, + statement, + args, + ); } @override @@ -128,7 +134,7 @@ class _TransactionIsolateExecutor extends _BaseExecutor implements TransactionExecutor { _TransactionIsolateExecutor(_MoorClient client) : super(client); - bool _pendingOpen = false; + Completer _pendingOpen; // nested transactions aren't supported @override @@ -136,17 +142,14 @@ class _TransactionIsolateExecutor extends _BaseExecutor @override Future ensureOpen() { - if (_transactionId == null && !_pendingOpen) { - _pendingOpen = true; - return _openAtServer().then((_) => true); - } - return Future.value(true); + _pendingOpen ??= Completer()..complete(_openAtServer()); + return _pendingOpen.future; } - Future _openAtServer() async { + Future _openAtServer() async { _transactionId = await client._channel.request(_NoArgsRequest.startTransaction) as int; - _pendingOpen = false; + return true; } Future _sendAction(_TransactionControl action) { diff --git a/moor/lib/src/runtime/isolate/protocol.dart b/moor/lib/src/runtime/isolate/protocol.dart index 6d5f7858..f8a0902f 100644 --- a/moor/lib/src/runtime/isolate/protocol.dart +++ b/moor/lib/src/runtime/isolate/protocol.dart @@ -48,6 +48,9 @@ class _ExecuteQuery { @override String toString() { + if (transactionId != null) { + return '$method: $sql with $args (@$transactionId)'; + } return '$method: $sql with $args'; } } @@ -55,8 +58,9 @@ class _ExecuteQuery { /// Sent from the client to run a list of [BatchedStatement]s. class _ExecuteBatchedStatement { final List stmts; + final int transactionId; - _ExecuteBatchedStatement(this.stmts); + _ExecuteBatchedStatement(this.stmts, [this.transactionId]); } /// Sent from the client to commit or rollback a transaction diff --git a/moor/lib/src/runtime/isolate/server.dart b/moor/lib/src/runtime/isolate/server.dart index 1447d357..fec0f17f 100644 --- a/moor/lib/src/runtime/isolate/server.dart +++ b/moor/lib/src/runtime/isolate/server.dart @@ -4,8 +4,21 @@ class _MoorServer { final Server server; DatabaseConnection connection; + final Map _transactions = {}; int _currentTransaction = 0; + + /// when a transaction is active, all queries that don't operate on another + /// query executor have to wait! + /// + /// When this list is empty, the top-level executor is active. When not, the + /// first transaction id in the backlog is active at the moment. Whenever a + /// transaction completes, we emit an item on [_backlogUpdated]. This can be + /// used to implement a lock. + final List _transactionBacklog = []; + final StreamController _backlogUpdated = + StreamController.broadcast(sync: true); + _FakeDatabase _fakeDb; ServerKey get key => server.key; @@ -38,6 +51,7 @@ class _MoorServer { case _NoArgsRequest.startTransaction: return _spawnTransaction(); case _NoArgsRequest.terminateAll: + _backlogUpdated.close(); connection.executor.close(); server.close(); Isolate.current.kill(); @@ -54,7 +68,7 @@ class _MoorServer { return _runQuery( payload.method, payload.sql, payload.args, payload.transactionId); } else if (payload is _ExecuteBatchedStatement) { - return connection.executor.runBatched(payload.stmts); + return _runBatched(payload.stmts, payload.transactionId); } else if (payload is _NotifyTablesUpdated) { for (final connected in server.currentChannels) { connected.request(payload); @@ -65,10 +79,8 @@ class _MoorServer { } Future _runQuery( - _StatementMethod method, String sql, List args, int transactionId) { - final executor = transactionId != null - ? _transactions[transactionId] - : connection.executor; + _StatementMethod method, String sql, List args, int transactionId) async { + final executor = await _loadExecutor(transactionId); switch (method) { case _StatementMethod.custom: @@ -84,23 +96,69 @@ class _MoorServer { throw AssertionError("Unknown _StatementMethod, this can't happen."); } - int _spawnTransaction() { + Future _runBatched( + List stmts, int transactionId) async { + final executor = await _loadExecutor(transactionId); + await executor.runBatched(stmts); + } + + Future _loadExecutor(int transactionId) async { + await _waitForTurn(transactionId); + return transactionId != null + ? _transactions[transactionId] + : connection.executor; + } + + Future _spawnTransaction() async { final id = _currentTransaction++; - _transactions[id] = connection.executor.beginTransaction(); + final transaction = connection.executor.beginTransaction(); + + _transactions[id] = transaction; + _transactionBacklog.add(id); + await transaction.ensureOpen(); return id; } Future _transactionControl( - _TransactionControl action, int transactionId) { + _TransactionControl action, int transactionId) async { final transaction = _transactions[transactionId]; - _transactions.remove(transactionId); - switch (action) { - case _TransactionControl.commit: - return transaction.send(); - case _TransactionControl.rollback: - return transaction.rollback(); + + try { + switch (action) { + case _TransactionControl.commit: + await transaction.send(); + break; + case _TransactionControl.rollback: + await transaction.rollback(); + break; + } + } finally { + _transactions.remove(transactionId); + _transactionBacklog.remove(transactionId); + _notifyTransactionsUpdated(); + } + } + + Future _waitForTurn(int transactionId) { + bool idIsActive() { + if (transactionId == null) { + return _transactionBacklog.isEmpty; + } else { + return _transactionBacklog.isNotEmpty && + _transactionBacklog.first == transactionId; + } + } + + // Don't wait for a backlog update if the current transaction id is active + if (idIsActive()) return Future.value(null); + + return _backlogUpdated.stream.firstWhere((_) => idIsActive()); + } + + void _notifyTransactionsUpdated() { + if (!_backlogUpdated.isClosed) { + _backlogUpdated.add(null); } - throw AssertionError("Can't happen"); } } diff --git a/moor/test/isolate_test.dart b/moor/test/isolate_test.dart index ee762c87..c51c6381 100644 --- a/moor/test/isolate_test.dart +++ b/moor/test/isolate_test.dart @@ -113,6 +113,35 @@ void _runTests( final result = await database.select(database.todosTable).get(); expect(result, isNotEmpty); }); + + test('transactions have an isolate view on data', () async { + // regression test for https://github.com/simolus3/moor/issues/324 + final db = TodoDb.connect(isolateConnection); + + await db + .customStatement('create table tbl (id integer primary key not null)'); + + Future expectRowCount(TodoDb db, int count) async { + final rows = await db.customSelectQuery('select * from tbl').get(); + expect(rows, hasLength(count)); + } + + final rowInserted = Completer(); + final runTransaction = db.transaction(() async { + await db.customInsert('insert into tbl default values'); + await expectRowCount(db, 1); + rowInserted.complete(); + // Hold transaction open for expectRowCount() outside the transaction to + // finish + await Future.delayed(const Duration(seconds: 1)); + await db.customStatement('delete from tbl'); + await expectRowCount(db, 0); + }); + + await rowInserted.future; + await expectRowCount(db, 0); + await runTransaction; // wait for the transaction to complete + }); } DatabaseConnection _backgroundConnection() {