Batched insert statements

Closes #15
This commit is contained in:
Simon Binder 2019-04-19 22:45:38 +02:00
parent 333e71f0a5
commit 8bed506e0d
No known key found for this signature in database
GPG Key ID: B807FDF954BA00CF
4 changed files with 131 additions and 17 deletions

View File

@ -1,6 +1,8 @@
import 'dart:async';
import 'package:collection/collection.dart';
import 'package:moor/src/runtime/database.dart';
import 'package:moor/src/utils/hash.dart';
/// A query executor is responsible for executing statements on a database and
/// return their results in a raw form.
@ -36,10 +38,46 @@ abstract class QueryExecutor {
/// statement will be ignored.
Future<void> runCustom(String statement);
/// Prepares the [statements] and then executes each one with all of the
/// [BatchedStatement.variables].
Future<void> runBatched(List<BatchedStatement> statements);
/// Starts a [TransactionExecutor].
TransactionExecutor beginTransaction();
}
/// A statement that should be executed in a batch. Used internally by moor.
class BatchedStatement {
static const _nestedListEquality = ListEquality(ListEquality());
/// The raw sql that needs to be executed
final String sql;
/// The variables to be used for the statement. Each entry holds a list of
/// variables that should be bound to the [sql] statement.
final List<List<dynamic>> variables;
BatchedStatement(this.sql, this.variables);
@override
int get hashCode {
return $mrjf($mrjc(sql.hashCode, const ListEquality().hash(variables)));
}
@override
bool operator ==(other) {
return identical(this, other) ||
(other is BatchedStatement &&
other.sql == sql &&
_nestedListEquality.equals(variables, other.variables));
}
@override
String toString() {
return 'BatchedStatement($sql, $variables)';
}
}
/// A [QueryExecutor] that runs multiple queries atomically.
abstract class TransactionExecutor extends QueryExecutor {
/// Completes the transaction. No further queries may be sent to to this

View File

@ -26,12 +26,18 @@ class InsertStatement<DataClass> {
/// If the table contains an auto-increment column, the generated value will
/// be returned.
Future<int> insert(DataClass entity) async {
if (!table.validateIntegrity(entity, true)) {
throw InvalidDataException(
'Invalid data: $entity cannot be written into ${table.$tableName}');
}
_validateIntegrity(entity);
final ctx = _createContext(entity, _orReplace);
final map = table.entityToSql(entity)
return await database.executor.doWhenOpened((e) async {
final id = await database.executor.runInsert(ctx.sql, ctx.boundVariables);
database.markTablesUpdated({table});
return id;
});
}
GenerationContext _createContext(DataClass entry, bool replace) {
final map = table.entityToSql(entry)
..removeWhere((_, value) => value == null);
final ctx = GenerationContext(database);
@ -56,15 +62,51 @@ class InsertStatement<DataClass> {
}
ctx.buffer.write(')');
return await database.executor.doWhenOpened((e) async {
final id = await database.executor.runInsert(ctx.sql, ctx.boundVariables);
database.markTablesUpdated({table});
return id;
});
return ctx;
}
// TODO insert multiple values
void _validateIntegrity(DataClass d) {
if (d == null) {
throw InvalidDataException(
'Cannot writee null row into ${table.$tableName}');
}
if (!table.validateIntegrity(d, true)) {
throw InvalidDataException(
'Invalid data: $d cannot be written into ${table.$tableName}');
}
}
/// Inserts all [rows] into the table.
///
/// All fields in a row that don't have a default value or auto-increment
/// must be set and non-null. Otherwise, an [InvalidDataException] will be
/// thrown.
/// When a row with the same primary or unique key already exists in the
/// database, the insert will fail. Use [orReplace] to replace rows that
/// already exist.
Future<void> insertAll(List<DataClass> rows, {bool orReplace}) async {
final statements = <String, List<GenerationContext>>{};
// Not every insert has the same sql, as fields which are set to null are
// not included. So, we have a map for sql -> list of variables which we can
// then turn into prepared statements
for (var row in rows) {
_validateIntegrity(row);
final ctx = _createContext(row, orReplace);
statements.putIfAbsent(ctx.sql, () => []).add(ctx);
}
final batchedStatements = statements.entries.map((e) {
final vars = e.value.map((context) => context.boundVariables).toList();
return BatchedStatement(e.key, vars);
}).toList(growable: false);
await database.executor.doWhenOpened((e) async {
await e.runBatched(batchedStatements);
});
database.markTablesUpdated({table});
}
/// Updates the row with the same primary key in the database or creates one
/// if it doesn't exist.

View File

@ -35,6 +35,29 @@ void main() {
[113, 'Done']));
});
test('runs bulk inserts', () async {
await db.into(db.todosTable).insertAll([
TodoEntry(content: 'a'),
TodoEntry(title: 'title', content: 'b'),
TodoEntry(title: 'title', content: 'c'),
]);
final insertSimple = 'INSERT INTO todos (content) VALUES (?)';
final insertTitle = 'INSERT INTO todos (title, content) VALUES (?, ?)';
verify(executor.runBatched([
BatchedStatement(insertSimple, [
['a']
]),
BatchedStatement(insertTitle, [
['title', 'b'],
['title', 'c']
]),
]));
verify(streamQueries.handleTableUpdates({db.todosTable}));
});
test('notifies stream queries on inserts', () async {
await db.into(db.users).insert(User(
name: 'User McUserface',
@ -45,7 +68,7 @@ void main() {
verify(streamQueries.handleTableUpdates({db.users}));
});
test('enforces data integrety', () {
test('enforces data integrity', () {
expect(
db.into(db.todosTable).insert(
TodoEntry(

View File

@ -57,6 +57,20 @@ abstract class _DatabaseOwner extends QueryExecutor {
_log(statement, null);
return db.execute(statement);
}
@override
Future<void> runBatched(List<BatchedStatement> statements) async {
final batch = db.batch();
for (var statement in statements) {
for (var boundVariables in statement.variables) {
_log(statement.sql, boundVariables);
batch.execute(statement.sql, boundVariables);
}
}
await batch.commit(noResult: true);
}
}
/// A query executor that uses sqflite internally.
@ -115,10 +129,7 @@ class FlutterQueryExecutor extends _DatabaseOwner {
SqlExecutor _migrationExecutor(s.Database db) {
return (sql) {
if (logStatements) {
_log(sql);
}
_log(sql);
db.execute(sql);
};
}