From 8bed506e0d42f92896d71c7411955616d48a1caa Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Fri, 19 Apr 2019 22:45:38 +0200 Subject: [PATCH] Batched insert statements Closes #15 --- moor/lib/src/runtime/executor/executor.dart | 38 ++++++++++++ moor/lib/src/runtime/statements/insert.dart | 66 +++++++++++++++++---- moor/test/insert_test.dart | 25 +++++++- moor_flutter/lib/moor_flutter.dart | 19 ++++-- 4 files changed, 131 insertions(+), 17 deletions(-) diff --git a/moor/lib/src/runtime/executor/executor.dart b/moor/lib/src/runtime/executor/executor.dart index 5eaab1ba..c6d83078 100644 --- a/moor/lib/src/runtime/executor/executor.dart +++ b/moor/lib/src/runtime/executor/executor.dart @@ -1,6 +1,8 @@ import 'dart:async'; +import 'package:collection/collection.dart'; import 'package:moor/src/runtime/database.dart'; +import 'package:moor/src/utils/hash.dart'; /// A query executor is responsible for executing statements on a database and /// return their results in a raw form. @@ -36,10 +38,46 @@ abstract class QueryExecutor { /// statement will be ignored. Future runCustom(String statement); + /// Prepares the [statements] and then executes each one with all of the + /// [BatchedStatement.variables]. + Future runBatched(List statements); + /// Starts a [TransactionExecutor]. TransactionExecutor beginTransaction(); } +/// A statement that should be executed in a batch. Used internally by moor. +class BatchedStatement { + static const _nestedListEquality = ListEquality(ListEquality()); + + /// The raw sql that needs to be executed + final String sql; + + /// The variables to be used for the statement. Each entry holds a list of + /// variables that should be bound to the [sql] statement. + final List> variables; + + BatchedStatement(this.sql, this.variables); + + @override + int get hashCode { + return $mrjf($mrjc(sql.hashCode, const ListEquality().hash(variables))); + } + + @override + bool operator ==(other) { + return identical(this, other) || + (other is BatchedStatement && + other.sql == sql && + _nestedListEquality.equals(variables, other.variables)); + } + + @override + String toString() { + return 'BatchedStatement($sql, $variables)'; + } +} + /// A [QueryExecutor] that runs multiple queries atomically. abstract class TransactionExecutor extends QueryExecutor { /// Completes the transaction. No further queries may be sent to to this diff --git a/moor/lib/src/runtime/statements/insert.dart b/moor/lib/src/runtime/statements/insert.dart index 98cc7502..261ee2de 100644 --- a/moor/lib/src/runtime/statements/insert.dart +++ b/moor/lib/src/runtime/statements/insert.dart @@ -26,12 +26,18 @@ class InsertStatement { /// If the table contains an auto-increment column, the generated value will /// be returned. Future insert(DataClass entity) async { - if (!table.validateIntegrity(entity, true)) { - throw InvalidDataException( - 'Invalid data: $entity cannot be written into ${table.$tableName}'); - } + _validateIntegrity(entity); + final ctx = _createContext(entity, _orReplace); - final map = table.entityToSql(entity) + return await database.executor.doWhenOpened((e) async { + final id = await database.executor.runInsert(ctx.sql, ctx.boundVariables); + database.markTablesUpdated({table}); + return id; + }); + } + + GenerationContext _createContext(DataClass entry, bool replace) { + final map = table.entityToSql(entry) ..removeWhere((_, value) => value == null); final ctx = GenerationContext(database); @@ -56,15 +62,51 @@ class InsertStatement { } ctx.buffer.write(')'); - - return await database.executor.doWhenOpened((e) async { - final id = await database.executor.runInsert(ctx.sql, ctx.boundVariables); - database.markTablesUpdated({table}); - return id; - }); + return ctx; } - // TODO insert multiple values + void _validateIntegrity(DataClass d) { + if (d == null) { + throw InvalidDataException( + 'Cannot writee null row into ${table.$tableName}'); + } + if (!table.validateIntegrity(d, true)) { + throw InvalidDataException( + 'Invalid data: $d cannot be written into ${table.$tableName}'); + } + } + + /// Inserts all [rows] into the table. + /// + /// All fields in a row that don't have a default value or auto-increment + /// must be set and non-null. Otherwise, an [InvalidDataException] will be + /// thrown. + /// When a row with the same primary or unique key already exists in the + /// database, the insert will fail. Use [orReplace] to replace rows that + /// already exist. + Future insertAll(List rows, {bool orReplace}) async { + final statements = >{}; + + // Not every insert has the same sql, as fields which are set to null are + // not included. So, we have a map for sql -> list of variables which we can + // then turn into prepared statements + for (var row in rows) { + _validateIntegrity(row); + + final ctx = _createContext(row, orReplace); + statements.putIfAbsent(ctx.sql, () => []).add(ctx); + } + + final batchedStatements = statements.entries.map((e) { + final vars = e.value.map((context) => context.boundVariables).toList(); + return BatchedStatement(e.key, vars); + }).toList(growable: false); + + await database.executor.doWhenOpened((e) async { + await e.runBatched(batchedStatements); + }); + database.markTablesUpdated({table}); + } /// Updates the row with the same primary key in the database or creates one /// if it doesn't exist. diff --git a/moor/test/insert_test.dart b/moor/test/insert_test.dart index c4dffbf7..f3f92e5a 100644 --- a/moor/test/insert_test.dart +++ b/moor/test/insert_test.dart @@ -35,6 +35,29 @@ void main() { [113, 'Done'])); }); + test('runs bulk inserts', () async { + await db.into(db.todosTable).insertAll([ + TodoEntry(content: 'a'), + TodoEntry(title: 'title', content: 'b'), + TodoEntry(title: 'title', content: 'c'), + ]); + + final insertSimple = 'INSERT INTO todos (content) VALUES (?)'; + final insertTitle = 'INSERT INTO todos (title, content) VALUES (?, ?)'; + + verify(executor.runBatched([ + BatchedStatement(insertSimple, [ + ['a'] + ]), + BatchedStatement(insertTitle, [ + ['title', 'b'], + ['title', 'c'] + ]), + ])); + + verify(streamQueries.handleTableUpdates({db.todosTable})); + }); + test('notifies stream queries on inserts', () async { await db.into(db.users).insert(User( name: 'User McUserface', @@ -45,7 +68,7 @@ void main() { verify(streamQueries.handleTableUpdates({db.users})); }); - test('enforces data integrety', () { + test('enforces data integrity', () { expect( db.into(db.todosTable).insert( TodoEntry( diff --git a/moor_flutter/lib/moor_flutter.dart b/moor_flutter/lib/moor_flutter.dart index 5bdce677..fb872a73 100644 --- a/moor_flutter/lib/moor_flutter.dart +++ b/moor_flutter/lib/moor_flutter.dart @@ -57,6 +57,20 @@ abstract class _DatabaseOwner extends QueryExecutor { _log(statement, null); return db.execute(statement); } + + @override + Future runBatched(List statements) async { + final batch = db.batch(); + + for (var statement in statements) { + for (var boundVariables in statement.variables) { + _log(statement.sql, boundVariables); + batch.execute(statement.sql, boundVariables); + } + } + + await batch.commit(noResult: true); + } } /// A query executor that uses sqflite internally. @@ -115,10 +129,7 @@ class FlutterQueryExecutor extends _DatabaseOwner { SqlExecutor _migrationExecutor(s.Database db) { return (sql) { - if (logStatements) { - _log(sql); - } - + _log(sql); db.execute(sql); }; }