Fix transaction behavior on isolates (#324)

This commit is contained in:
Simon Binder 2020-01-07 14:57:42 +01:00
parent a3708b7230
commit c2ec06c1de
No known key found for this signature in database
GPG Key ID: 7891917E4147B8C0
4 changed files with 121 additions and 27 deletions

View File

@ -65,16 +65,22 @@ abstract class _BaseExecutor extends QueryExecutor {
@override
Future<void> runBatched(List<BatchedStatement> statements) {
return client._channel.request(_ExecuteBatchedStatement(statements));
return client._channel
.request(_ExecuteBatchedStatement(statements, _transactionId));
}
Future<T> _runRequest<T>(_StatementMethod method, String sql, List args) {
return client._channel.request<T>(_ExecuteQuery(method, sql, args));
return client._channel
.request<T>(_ExecuteQuery(method, sql, args, _transactionId));
}
@override
Future<void> runCustom(String statement, [List args]) {
return _runRequest(_StatementMethod.custom, statement, args);
return _runRequest(
_StatementMethod.custom,
statement,
args,
);
}
@override
@ -128,7 +134,7 @@ class _TransactionIsolateExecutor extends _BaseExecutor
implements TransactionExecutor {
_TransactionIsolateExecutor(_MoorClient client) : super(client);
bool _pendingOpen = false;
Completer<bool> _pendingOpen;
// nested transactions aren't supported
@override
@ -136,17 +142,14 @@ class _TransactionIsolateExecutor extends _BaseExecutor
@override
Future<bool> ensureOpen() {
if (_transactionId == null && !_pendingOpen) {
_pendingOpen = true;
return _openAtServer().then((_) => true);
}
return Future.value(true);
_pendingOpen ??= Completer()..complete(_openAtServer());
return _pendingOpen.future;
}
Future _openAtServer() async {
Future<bool> _openAtServer() async {
_transactionId =
await client._channel.request(_NoArgsRequest.startTransaction) as int;
_pendingOpen = false;
return true;
}
Future<void> _sendAction(_TransactionControl action) {

View File

@ -48,6 +48,9 @@ class _ExecuteQuery {
@override
String toString() {
if (transactionId != null) {
return '$method: $sql with $args (@$transactionId)';
}
return '$method: $sql with $args';
}
}
@ -55,8 +58,9 @@ class _ExecuteQuery {
/// Sent from the client to run a list of [BatchedStatement]s.
class _ExecuteBatchedStatement {
final List<BatchedStatement> stmts;
final int transactionId;
_ExecuteBatchedStatement(this.stmts);
_ExecuteBatchedStatement(this.stmts, [this.transactionId]);
}
/// Sent from the client to commit or rollback a transaction

View File

@ -4,8 +4,21 @@ class _MoorServer {
final Server server;
DatabaseConnection connection;
final Map<int, TransactionExecutor> _transactions = {};
int _currentTransaction = 0;
/// when a transaction is active, all queries that don't operate on another
/// query executor have to wait!
///
/// When this list is empty, the top-level executor is active. When not, the
/// first transaction id in the backlog is active at the moment. Whenever a
/// transaction completes, we emit an item on [_backlogUpdated]. This can be
/// used to implement a lock.
final List<int> _transactionBacklog = [];
final StreamController<void> _backlogUpdated =
StreamController.broadcast(sync: true);
_FakeDatabase _fakeDb;
ServerKey get key => server.key;
@ -38,6 +51,7 @@ class _MoorServer {
case _NoArgsRequest.startTransaction:
return _spawnTransaction();
case _NoArgsRequest.terminateAll:
_backlogUpdated.close();
connection.executor.close();
server.close();
Isolate.current.kill();
@ -54,7 +68,7 @@ class _MoorServer {
return _runQuery(
payload.method, payload.sql, payload.args, payload.transactionId);
} else if (payload is _ExecuteBatchedStatement) {
return connection.executor.runBatched(payload.stmts);
return _runBatched(payload.stmts, payload.transactionId);
} else if (payload is _NotifyTablesUpdated) {
for (final connected in server.currentChannels) {
connected.request(payload);
@ -65,10 +79,8 @@ class _MoorServer {
}
Future<dynamic> _runQuery(
_StatementMethod method, String sql, List args, int transactionId) {
final executor = transactionId != null
? _transactions[transactionId]
: connection.executor;
_StatementMethod method, String sql, List args, int transactionId) async {
final executor = await _loadExecutor(transactionId);
switch (method) {
case _StatementMethod.custom:
@ -84,23 +96,69 @@ class _MoorServer {
throw AssertionError("Unknown _StatementMethod, this can't happen.");
}
int _spawnTransaction() {
Future<void> _runBatched(
List<BatchedStatement> stmts, int transactionId) async {
final executor = await _loadExecutor(transactionId);
await executor.runBatched(stmts);
}
Future<QueryExecutor> _loadExecutor(int transactionId) async {
await _waitForTurn(transactionId);
return transactionId != null
? _transactions[transactionId]
: connection.executor;
}
Future<int> _spawnTransaction() async {
final id = _currentTransaction++;
_transactions[id] = connection.executor.beginTransaction();
final transaction = connection.executor.beginTransaction();
_transactions[id] = transaction;
_transactionBacklog.add(id);
await transaction.ensureOpen();
return id;
}
Future<void> _transactionControl(
_TransactionControl action, int transactionId) {
_TransactionControl action, int transactionId) async {
final transaction = _transactions[transactionId];
_transactions.remove(transactionId);
switch (action) {
case _TransactionControl.commit:
return transaction.send();
case _TransactionControl.rollback:
return transaction.rollback();
try {
switch (action) {
case _TransactionControl.commit:
await transaction.send();
break;
case _TransactionControl.rollback:
await transaction.rollback();
break;
}
} finally {
_transactions.remove(transactionId);
_transactionBacklog.remove(transactionId);
_notifyTransactionsUpdated();
}
}
Future<void> _waitForTurn(int transactionId) {
bool idIsActive() {
if (transactionId == null) {
return _transactionBacklog.isEmpty;
} else {
return _transactionBacklog.isNotEmpty &&
_transactionBacklog.first == transactionId;
}
}
// Don't wait for a backlog update if the current transaction id is active
if (idIsActive()) return Future.value(null);
return _backlogUpdated.stream.firstWhere((_) => idIsActive());
}
void _notifyTransactionsUpdated() {
if (!_backlogUpdated.isClosed) {
_backlogUpdated.add(null);
}
throw AssertionError("Can't happen");
}
}

View File

@ -113,6 +113,35 @@ void _runTests(
final result = await database.select(database.todosTable).get();
expect(result, isNotEmpty);
});
test('transactions have an isolate view on data', () async {
// regression test for https://github.com/simolus3/moor/issues/324
final db = TodoDb.connect(isolateConnection);
await db
.customStatement('create table tbl (id integer primary key not null)');
Future<void> expectRowCount(TodoDb db, int count) async {
final rows = await db.customSelectQuery('select * from tbl').get();
expect(rows, hasLength(count));
}
final rowInserted = Completer<void>();
final runTransaction = db.transaction(() async {
await db.customInsert('insert into tbl default values');
await expectRowCount(db, 1);
rowInserted.complete();
// Hold transaction open for expectRowCount() outside the transaction to
// finish
await Future.delayed(const Duration(seconds: 1));
await db.customStatement('delete from tbl');
await expectRowCount(db, 0);
});
await rowInserted.future;
await expectRowCount(db, 0);
await runTransaction; // wait for the transaction to complete
});
}
DatabaseConnection _backgroundConnection() {