Expand batch api to update statements (#221)

This commit is contained in:
Simon Binder 2019-11-02 21:48:37 +01:00
parent b189a2bcb2
commit 9f1aafbcef
No known key found for this signature in database
GPG Key ID: 7891917E4147B8C0
7 changed files with 261 additions and 9 deletions

View File

@ -2,6 +2,7 @@
- Fix crash when `customStatement` is the first operation used on a database ([#199](https://github.com/simolus3/moor/issues/199))
- Allow transactions inside a `beforeOpen` callback
- New `batch` method on generated databases to execute multiple queries in a single batch
## 2.0.1

View File

@ -0,0 +1,107 @@
part of 'database.dart';
/// Contains operations to run queries in a batched mode. This can be much more
/// efficient when running a lot of similar queries at the same time, making
/// this api suitable for bulk updates.
class Batch {
final Map<String, List<List<dynamic>>> _createdStatements = {};
final QueryEngine _engine;
final Set<TableInfo> _affectedTables = {};
Batch._(this._engine);
/// Inserts a row constructed from the fields in [row].
///
/// All fields in the entity that don't have a default value or auto-increment
/// must be set and non-null. Otherwise, an [InvalidDataException] will be
/// thrown.
///
/// By default, an exception will be thrown if another row with the same
/// primary key already exists. This behavior can be overridden with [mode],
/// for instance by using [InsertMode.replace] or [InsertMode.insertOrIgnore].
///
/// See also:
/// - [InsertStatement.insert], which would be used outside a [Batch].
void insert<D extends DataClass>(TableInfo<Table, D> table, Insertable<D> row,
{InsertMode mode}) {
_affectedTables.add(table);
final actualMode = mode ?? InsertMode.insert;
final context =
InsertStatement<D>(_engine, table).createContext(row, actualMode);
_addContext(context);
}
/// Inserts all [rows] into the [table].
///
/// All fields in a row that don't have a default value or auto-increment
/// must be set and non-null. Otherwise, an [InvalidDataException] will be
/// thrown.
/// By default, an exception will be thrown if another row with the same
/// primary key already exists. This behavior can be overridden with [mode],
/// for instance by using [InsertMode.replace] or [InsertMode.insertOrIgnore].
void insertAll<D extends DataClass>(
TableInfo<Table, D> table, List<Insertable<D>> rows,
{InsertMode mode}) {
for (var row in rows) {
insert<D>(table, row, mode: mode);
}
}
/// Writes all present columns from the [row] into all rows in the [table]
/// that match the [where] clause.
///
/// For more details on how updates work in moor, check out
/// [UpdateStatement.write] or the [documentation with examples](https://moor.simonbinder.eu/docs/getting-started/writing_queries/#updates-and-deletes)
void update<T extends Table, D extends DataClass>(
TableInfo<T, D> table, Insertable<D> row,
{Expression<bool, BoolType> Function(T table) where}) {
_affectedTables.add(table);
final stmt = UpdateStatement(_engine, table);
if (where != null) stmt.where(where);
stmt.write(row, dontExecute: true);
final context = stmt.constructQuery();
_addContext(context);
}
/// Replaces the [row] from the [table] with the updated values. The row in
/// the table with the same primary key will be replaced.
///
/// See also:
/// - [UpdateStatement.replace], which is what would be used outside of a
/// [Batch].
void replace<T extends Table, D extends DataClass>(
TableInfo<T, D> table,
Insertable<D> row,
) {
_affectedTables.add(table);
final stmt = UpdateStatement(_engine, table)
..replace(row, dontExecute: true);
_addContext(stmt.constructQuery());
}
/// Helper that calls [replace] for all [rows].
void replaceAll<T extends Table, D extends DataClass>(
TableInfo<T, D> table, List<Insertable<D>> rows) {
for (var row in rows) {
replace(table, row);
}
}
void _addContext(GenerationContext ctx) {
final sql = ctx.sql;
final variableSet = _createdStatements.putIfAbsent(sql, () => []);
variableSet.add(ctx.boundVariables);
}
Future<void> _commit() {
return _engine.executor.doWhenOpened((executor) async {
final statements = _createdStatements.entries.map((entry) {
return BatchedStatement(entry.key, entry.value);
}).toList();
await executor.runBatched(statements);
_engine.markTablesUpdated(_affectedTables);
});
}
}

View File

@ -4,6 +4,8 @@ import 'package:meta/meta.dart';
import 'package:moor/moor.dart';
import 'package:moor/src/runtime/executor/stream_queries.dart';
part 'batch.dart';
const _zoneRootUserKey = #DatabaseConnectionUser;
typedef _CustomWriter<T> = Future<T> Function(
@ -379,6 +381,43 @@ mixin QueryEngine on DatabaseConnectionUser {
});
}
/// Runs statements inside a batch.
///
/// A batch can only run a subset of statements, and those statements must be
/// called on the [Batch] instance. The statements aren't executed with a call
/// to [Batch]. Instead, all generated queries are queued up and are then run
/// and executed atomically.
/// Typically, running bulk updates (so a lot of similar statements) over a
/// [Batch] is much faster than running them via the [GeneratedDatabase]
/// directly.
///
/// An example that inserts users in a batch:
/// ```dart
/// await batch((b) {
/// b.insertAll(
/// todos,
/// [
/// TodosCompanion.insert(content: 'Use batches'),
/// TodosCompanion.insert(content: 'Have fun'),
/// ],
/// );
/// });
/// ```
@protected
@visibleForTesting
Future<void> batch(Function(Batch) runInBatch) {
final resolved = _resolvedEngine;
if (resolved is Transaction) {
// we use runBatched in the implementation, which is always run as top
// level with sqflite.
throw UnsupportedError('Batches cannot be used inside a transaction');
}
final batch = Batch._(resolved);
runInBatch(batch);
return batch._commit();
}
/// Runs [calculation] in a forked [Zone] that has its [_resolvedEngine] set
/// to the [engine].
///

View File

@ -38,8 +38,7 @@ class InsertStatement<D extends DataClass> {
'If the mode parameter is set on insertAll, orReplace must be null or '
'false',
);
_validateIntegrity(entity);
final ctx = _createContext(entity, _resolveMode(mode, orReplace));
final ctx = createContext(entity, _resolveMode(mode, orReplace));
return await database.executor.doWhenOpened((e) async {
final id = await database.executor.runInsert(ctx.sql, ctx.boundVariables);
@ -56,6 +55,7 @@ class InsertStatement<D extends DataClass> {
/// By default, an exception will be thrown if another row with the same
/// primary key already exists. This behavior can be overridden with [mode],
/// for instance by using [InsertMode.replace] or [InsertMode.insertOrIgnore].
@Deprecated('Call batch() on a generated database, then use Batch.insertAll')
Future<void> insertAll(
List<Insertable<D>> rows, {
@Deprecated('Use mode: InsertMode.replace instead') bool orReplace = false,
@ -72,9 +72,7 @@ class InsertStatement<D extends DataClass> {
// not included. So, we have a map for sql -> list of variables which we can
// then turn into prepared statements
for (var row in rows) {
_validateIntegrity(row);
final ctx = _createContext(row, _resolveMode(mode, orReplace));
final ctx = createContext(row, _resolveMode(mode, orReplace));
statements.putIfAbsent(ctx.sql, () => []).add(ctx);
}
@ -89,7 +87,12 @@ class InsertStatement<D extends DataClass> {
database.markTablesUpdated({table});
}
GenerationContext _createContext(Insertable<D> entry, InsertMode mode) {
/// Creates a [GenerationContext] which contains the sql necessary to run an
/// insert statement fro the [entry] with the [mode].
///
/// This method is used internally by moor. Consider using [insert] instead.
GenerationContext createContext(Insertable<D> entry, InsertMode mode) {
_validateIntegrity(entry);
final map = table.entityToSql(entry.createCompanion(true))
..removeWhere((_, value) => value == null);

View File

@ -32,7 +32,10 @@ abstract class Query<T extends Table, D extends DataClass> {
void writeStartPart(GenerationContext ctx);
/// Constructs the query that can then be sent to the database executor.
@protected
///
/// This is used internally by moor to run the query. Users should use the
/// other methods explained in the [documentation][moor-docs].
/// [moor-docs]: https://moor.simonbinder.eu/docs/getting-started/writing_queries/
GenerationContext constructQuery() {
final ctx = GenerationContext.fromDb(database);
var needsWhitespace = false;

View File

@ -50,11 +50,15 @@ class UpdateStatement<T extends Table, D extends DataClass> extends Query<T, D>
/// The fields that are null on the [entity] object will not be changed by
/// this operation, they will be ignored.
///
/// When [dontExecute] is true (defaults to false), the query will __NOT__ be
/// run, but all the validations are still in place. This is mainly used
/// internally by moor.
///
/// Returns the amount of rows that have been affected by this operation.
///
/// See also: [replace], which does not require [where] statements and
/// supports setting fields back to null.
Future<int> write(Insertable<D> entity) async {
Future<int> write(Insertable<D> entity, {bool dontExecute = false}) async {
final companion = entity.createCompanion(true);
table.validateIntegrity(companion).throwIfInvalid(entity);
@ -66,6 +70,7 @@ class UpdateStatement<T extends Table, D extends DataClass> extends Query<T, D>
return Future.value(0);
}
if (dontExecute) return -1;
return await _performQuery();
}
@ -80,6 +85,10 @@ class UpdateStatement<T extends Table, D extends DataClass> extends Query<T, D>
/// will be reset to null. This behavior is different to [write], which simply
/// ignores such fields without changing them in the database.
///
/// When [dontExecute] is true (defaults to false), the query will __NOT__ be
/// run, but all the validations are still in place. This is mainly used
/// internally by moor.
///
/// Returns true if a row was affected by this operation.
///
/// See also:
@ -87,7 +96,7 @@ class UpdateStatement<T extends Table, D extends DataClass> extends Query<T, D>
/// null values in the entity.
/// - [InsertStatement.insert] with the `orReplace` parameter, which behaves
/// similar to this method but creates a new row if none exists.
Future<bool> replace(Insertable<D> entity) async {
Future<bool> replace(Insertable<D> entity, {bool dontExecute = false}) async {
// We don't turn nulls to absent values here (as opposed to a regular
// update, where only non-null fields will be written).
final companion = entity.createCompanion(false);
@ -122,6 +131,7 @@ class UpdateStatement<T extends Table, D extends DataClass> extends Query<T, D>
// Don't update the primary key
_updatedFields.removeWhere((key, _) => primaryKeys.contains(key));
if (dontExecute) return false;
final updatedRows = await _performQuery();
return updatedRows != 0;
}

89
moor/test/batch_test.dart Normal file
View File

@ -0,0 +1,89 @@
import 'package:moor/moor.dart';
import 'package:test/test.dart';
import 'data/tables/todos.dart';
import 'data/utils/mocks.dart';
void main() {
TodoDb db;
MockExecutor executor;
MockStreamQueries streamQueries;
setUp(() {
executor = MockExecutor();
streamQueries = MockStreamQueries();
db = TodoDb.connect(createConnection(executor, streamQueries));
});
test('runs generated statements', () async {
await db.batch((b) {
b.insertAll(
db.todosTable,
[
TodosTableCompanion.insert(content: 'first'),
TodosTableCompanion.insert(content: 'second'),
],
);
b.update(db.users, const UsersCompanion(name: Value('new name')));
b.update(
db.users,
const UsersCompanion(name: Value('Another')),
where: (Users row) => row.name.equals('old'),
);
b.replaceAll(db.categories, const [
CategoriesCompanion(id: Value(1), description: Value('new1')),
CategoriesCompanion(id: Value(2), description: Value('new2')),
]);
});
verify(executor.runBatched([
BatchedStatement(
'INSERT INTO todos (content) VALUES (?)',
[
['first'],
['second'],
],
),
BatchedStatement(
'UPDATE users SET name = ?;',
[
['new name']
],
),
BatchedStatement(
'UPDATE users SET name = ? WHERE name = ?;',
[
['Another', 'old']
],
),
BatchedStatement(
'UPDATE categories SET desc = ? WHERE id = ?;',
[
['new1', 1],
['new2', 2],
],
),
]));
});
test("doesn't work inside a transaction", () {
expectLater(() {
return db.transaction(() async {
await db.batch((b) {});
});
}, throwsA(const TypeMatcher<UnsupportedError>()));
verifyNever(executor.runBatched(any));
});
test('updates stream queries', () async {
await db.batch((b) {
b.update(db.users, const UsersCompanion(name: Value('new user name')));
});
verify(streamQueries.handleTableUpdates({db.users}));
});
}