Support batches in transactions (#271)

This commit is contained in:
Simon Binder 2019-12-16 15:50:12 +01:00
parent ffe4bb8c82
commit 76ac7c7ab3
No known key found for this signature in database
GPG Key ID: 7891917E4147B8C0
5 changed files with 41 additions and 27 deletions

View File

@ -105,9 +105,9 @@ class Database extends _$Database {
if (details.wasCreated) { if (details.wasCreated) {
// make sure that transactions can be used in the beforeOpen callback. // make sure that transactions can be used in the beforeOpen callback.
await transaction(() async { await transaction(() async {
for (final user in [people.dash, people.duke, people.gopher]) { batch((batch) {
await into(users).insert(user); batch.insertAll(users, [people.dash, people.duke, people.gopher]);
} });
}); });
} }
}, },

View File

@ -11,7 +11,7 @@
``` ```
- Provide Dart apis for the json1 extension in the `package:moor/extensions/json1.dart` library. Note that - Provide Dart apis for the json1 extension in the `package:moor/extensions/json1.dart` library. Note that
json1 is not supported on most platforms. json1 is not supported on most platforms.
- Batches are now always sent in a transaction, this used to be implementation specific before - Standardized behavior of batches in transactions across backends
## 2.1.0 ## 2.1.0

View File

@ -7,9 +7,12 @@ class Batch {
final Map<String, List<List<dynamic>>> _createdStatements = {}; final Map<String, List<List<dynamic>>> _createdStatements = {};
final QueryEngine _engine; final QueryEngine _engine;
/// Whether we should start a transaction when completing.
final bool _startTransaction;
final Set<TableInfo> _affectedTables = {}; final Set<TableInfo> _affectedTables = {};
Batch._(this._engine); Batch._(this._engine, this._startTransaction);
/// Inserts a row constructed from the fields in [row]. /// Inserts a row constructed from the fields in [row].
/// ///
@ -100,14 +103,29 @@ class Batch {
Future<void> _commit() async { Future<void> _commit() async {
await _engine.executor.ensureOpen(); await _engine.executor.ensureOpen();
final transaction = _engine.executor.beginTransaction(); if (_startTransaction) {
await transaction.doWhenOpened((executor) async { TransactionExecutor transaction;
try {
transaction = _engine.executor.beginTransaction();
await transaction.doWhenOpened(_runWith);
await transaction.send();
} catch (e) {
await transaction.rollback();
rethrow;
}
} else {
return _runWith(_engine.executor);
}
_engine.markTablesUpdated(_affectedTables);
}
Future<void> _runWith(QueryExecutor executor) async {
final statements = _createdStatements.entries.map((entry) { final statements = _createdStatements.entries.map((entry) {
return BatchedStatement(entry.key, entry.value); return BatchedStatement(entry.key, entry.value);
}).toList(); }).toList();
await executor.runBatched(statements); await executor.runBatched(statements);
});
await transaction.send();
_engine.markTablesUpdated(_affectedTables);
} }
} }

View File

@ -259,6 +259,9 @@ mixin QueryEngine on DatabaseConnectionUser {
/// called on the [Batch] instance. The statements aren't executed with a call /// called on the [Batch] instance. The statements aren't executed with a call
/// to [Batch]. Instead, all generated queries are queued up and are then run /// to [Batch]. Instead, all generated queries are queued up and are then run
/// and executed atomically in a transaction. /// and executed atomically in a transaction.
/// If [batch] is called outside of a [transaction] call, it will implicitly
/// start a transaction. Otherwise, the batch will re-use the transaction,
/// and will have an effect when the transaction completes.
/// Typically, running bulk updates (so a lot of similar statements) over a /// Typically, running bulk updates (so a lot of similar statements) over a
/// [Batch] is much faster than running them via the [GeneratedDatabase] /// [Batch] is much faster than running them via the [GeneratedDatabase]
/// directly. /// directly.
@ -278,14 +281,9 @@ mixin QueryEngine on DatabaseConnectionUser {
@protected @protected
@visibleForTesting @visibleForTesting
Future<void> batch(Function(Batch) runInBatch) { Future<void> batch(Function(Batch) runInBatch) {
final resolved = _resolvedEngine; final engine = _resolvedEngine;
if (resolved is Transaction) {
// we use runBatched in the implementation, which is always run as top
// level with sqflite.
throw UnsupportedError('Batches cannot be used inside a transaction');
}
final batch = Batch._(resolved); final batch = Batch._(engine, engine is! Transaction);
runInBatch(batch); runInBatch(batch);
return batch._commit(); return batch._commit();
} }

View File

@ -70,15 +70,13 @@ void main() {
])); ]));
}); });
test("doesn't work inside a transaction", () { test('can re-use an outer transaction', () async {
expectLater(() { await db.transaction(() async {
return db.transaction(() async {
await db.batch((b) {}); await db.batch((b) {});
}); });
}, throwsA(const TypeMatcher<UnsupportedError>()));
verifyNever(executor.runBatched(any)); verifyNever(executor.runBatched(any));
verifyNever(executor.transactions.runBatched(any)); verify(executor.transactions.runBatched(any));
}); });
test('updates stream queries', () async { test('updates stream queries', () async {