From 4b9cd084b5c8466ac17d438b05b69f567b7ebd3f Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Sat, 20 Jul 2019 22:55:27 +0200 Subject: [PATCH] VM: transactions and before open callback --- moor/lib/moor_vm.dart | 2 + moor/lib/src/runtime/executor/executor.dart | 22 +- moor/lib/src/vm/vm_database.dart | 210 ++++++++++++++------ moor/lib/src/web/web_db.dart | 19 +- moor_flutter/lib/moor_flutter.dart | 18 +- 5 files changed, 171 insertions(+), 100 deletions(-) diff --git a/moor/lib/moor_vm.dart b/moor/lib/moor_vm.dart index 1a0648e3..8a5abfcc 100644 --- a/moor/lib/moor_vm.dart +++ b/moor/lib/moor_vm.dart @@ -3,9 +3,11 @@ @experimental library moor_vm; +import 'dart:async'; import 'dart:io'; import 'package:meta/meta.dart'; +import 'package:synchronized/synchronized.dart'; import 'moor.dart'; import 'src/vm/api/database.dart'; diff --git a/moor/lib/src/runtime/executor/executor.dart b/moor/lib/src/runtime/executor/executor.dart index 56097e01..584ba5b0 100644 --- a/moor/lib/src/runtime/executor/executor.dart +++ b/moor/lib/src/runtime/executor/executor.dart @@ -85,8 +85,28 @@ class BatchedStatement { } /// A [QueryExecutor] that runs multiple queries atomically. -abstract class TransactionExecutor extends QueryExecutor { +mixin TransactionExecutor on QueryExecutor { /// Completes the transaction. No further queries may be sent to to this /// [QueryExecutor] after this method was called. Future send(); + + @override + TransactionExecutor beginTransaction() { + throw UnsupportedError( + 'Transactions cannot be created inside a transaction'); + } +} + +/// Used internally by moor. Responsible for executing the `beforeOpen` +/// callback. +mixin BeforeOpenMixin on QueryExecutor { + @override + Future ensureOpen() { + return Future.value(true); + } + + @override + TransactionExecutor beginTransaction() { + throw UnsupportedError('Transactions cannot be created inside beforeOpen!'); + } } diff --git a/moor/lib/src/vm/vm_database.dart b/moor/lib/src/vm/vm_database.dart index 1cad00f0..fde16c31 100644 --- a/moor/lib/src/vm/vm_database.dart +++ b/moor/lib/src/vm/vm_database.dart @@ -1,24 +1,44 @@ part of 'package:moor/moor_vm.dart'; -abstract class _DatabaseUser extends QueryExecutor { +class _DbState { final bool logStatements; - final File dbFile; + final File file; + final Lock lock = Lock(); - Database _db; + Database db; - _DatabaseUser(this.logStatements, this.dbFile); + _DbState(this.logStatements, this.file); +} + +abstract class _DatabaseUser extends QueryExecutor { + final _DbState _state; + + bool get _bypassLock => false; + Database get _db => _state.db; + + _DatabaseUser(this._state); void _logStmt(String statement, List args) { - if (logStatements) { + if (_state.logStatements) { print('Executing $statement with variables $args'); } } + Future _synchronized(FutureOr computation()) async { + final lock = _state.lock; + if (_bypassLock) { + return await computation(); + } + + return await lock.synchronized(computation); + } + @override Future runCustom(String statement) { - _logStmt(statement, const []); - _db.execute(statement); - return Future.value(); + return _synchronized(() { + _logStmt(statement, const []); + _db.execute(statement); + }); } void _runWithArgs(String statement, List args) { @@ -40,78 +60,37 @@ abstract class _DatabaseUser extends QueryExecutor { @override Future runDelete(String statement, List args) { - return _runAndReturnAffected(statement, args); + return _synchronized(() { + return _runAndReturnAffected(statement, args); + }); } @override Future runUpdate(String statement, List args) { - return _runAndReturnAffected(statement, args); + return _synchronized(() { + return _runAndReturnAffected(statement, args); + }); } @override Future runInsert(String statement, List args) { - _runWithArgs(statement, args); - return Future.value(_db.lastInsertId); + return _synchronized(() { + _runWithArgs(statement, args); + return Future.value(_db.lastInsertId); + }); } @override Future>> runSelect( String statement, List args) { - _logStmt(statement, args); - final stmt = _db.prepare(statement); - final result = stmt.select(args); - stmt.close(); + return _synchronized(() { + _logStmt(statement, args); + final stmt = _db.prepare(statement); + final result = stmt.select(args); + stmt.close(); - return Future.value(result.toList()); - } - - @override - Future close() { - _db?.close(); - return Future.value(); - } -} - -class VMDatabase extends _DatabaseUser { - VMDatabase(File file, {bool logStatements = false}) - : super(logStatements, file); - - VMDatabase.memory({bool logStatements = false}) : super(logStatements, null); - - @override - Future ensureOpen() async { - if (_db == null) { - _db = _openInternal(); - await _runMigrations(); - } - return true; - } - - Database _openInternal() { - if (dbFile == null) { - return Database.memory(); - } else { - return Database.openFile(dbFile); - } - } - - Future _runMigrations() async { - final current = _db.userVersion; - final target = databaseInfo.schemaVersion; - - if (current == 0) { - await databaseInfo.handleDatabaseCreation(executor: runCustom); - } else if (current < target) { - await databaseInfo.handleDatabaseVersionChange( - executor: null, from: current, to: target); - } - - _db.userVersion = target; - } - - @override - TransactionExecutor beginTransaction() { - throw UnsupportedError('Transactions are not yet supported on the Dart VM'); + return Future.value(result.toList()); + }); } @override @@ -129,3 +108,102 @@ class VMDatabase extends _DatabaseUser { return Future.value(); } } + +class VMDatabase extends _DatabaseUser { + VMDatabase(File file, {bool logStatements = false}) + : super(_DbState(logStatements, file)); + + VMDatabase.memory({bool logStatements = false}) + : this(null, logStatements: logStatements); + + @override + Future ensureOpen() async { + if (_db == null) { + _state.db = _openInternal(); + await _runMigrations(); + } + return true; + } + + Database _openInternal() { + if (_state.file == null) { + return Database.memory(); + } else { + return Database.openFile(_state.file); + } + } + + Future _runMigrations() async { + final current = _db.userVersion; + final target = databaseInfo.schemaVersion; + + if (current == 0) { + await databaseInfo.handleDatabaseCreation(executor: runCustom); + } else if (current < target) { + await databaseInfo.handleDatabaseVersionChange( + executor: null, from: current, to: target); + } + + _db.userVersion = target; + + await _synchronized(() { + databaseInfo.beforeOpenCallback( + _BeforeOpenExecutor(_state), OpeningDetails(current, target)); + }); + } + + @override + Future close() { + _db?.close(); + return Future.value(); + } + + @override + TransactionExecutor beginTransaction() { + final transactionReady = Completer(); + final executor = _TransactionExecutor(_state, transactionReady.future); + + _synchronized(() async { + // we have the lock, so start the transaction + transactionReady.complete(true); + await executor.completed; + }); + + return executor; + } +} + +class _BeforeOpenExecutor extends _DatabaseUser with BeforeOpenMixin { + @override + final bool _bypassLock = true; + _BeforeOpenExecutor(_DbState state) : super(state); +} + +class _TransactionExecutor extends _DatabaseUser with TransactionExecutor { + @override + final bool _bypassLock = true; + final Future _openingFuture; + bool _sentBeginTransaction = false; + + final Completer _completer = Completer(); + Future get completed => _completer.future; + + _TransactionExecutor(_DbState state, this._openingFuture) : super(state); + + @override + Future ensureOpen() async { + await _openingFuture; + if (!_sentBeginTransaction) { + _db.execute('BEGIN TRANSACTION'); + _sentBeginTransaction = true; + } + return Future.value(true); + } + + @override + Future send() { + _db.execute('COMMIT TRANSACTION;'); + _completer.complete(); + return Future.value(); + } +} diff --git a/moor/lib/src/web/web_db.dart b/moor/lib/src/web/web_db.dart index fd72ba53..6e3dc966 100644 --- a/moor/lib/src/web/web_db.dart +++ b/moor/lib/src/web/web_db.dart @@ -228,24 +228,14 @@ class WebDatabase extends _DatabaseUser { } } -class _BeforeOpenExecutor extends _DatabaseUser { +class _BeforeOpenExecutor extends _DatabaseUser with BeforeOpenMixin { _BeforeOpenExecutor(_DbState state) : super(state); @override final bool _bypassLock = true; - - @override - TransactionExecutor beginTransaction() { - throw UnsupportedError( - "Transactions aren't supported in the before open callback"); - } - - @override - Future ensureOpen() => Future.value(true); } -class _TransactionExecutor extends _DatabaseUser - implements TransactionExecutor { +class _TransactionExecutor extends _DatabaseUser with TransactionExecutor { _TransactionExecutor(_DbState state, this._openingFuture) : super(state); @override @@ -264,11 +254,6 @@ class _TransactionExecutor extends _DatabaseUser _needsSave = true; } - @override - TransactionExecutor beginTransaction() { - throw UnsupportedError('Cannot have nested transactions'); - } - @override Future ensureOpen() async { await _openingFuture; diff --git a/moor_flutter/lib/moor_flutter.dart b/moor_flutter/lib/moor_flutter.dart index 8e9d528d..26ff54e6 100644 --- a/moor_flutter/lib/moor_flutter.dart +++ b/moor_flutter/lib/moor_flutter.dart @@ -160,7 +160,7 @@ class FlutterQueryExecutor extends _DatabaseOwner { } class _SqfliteTransactionExecutor extends _DatabaseOwner - implements TransactionExecutor { + with TransactionExecutor { @override s.Transaction db; @@ -194,11 +194,6 @@ class _SqfliteTransactionExecutor extends _DatabaseOwner openingCompleter.future, actionCompleter, sendFuture, db.logStatements); } - @override - TransactionExecutor beginTransaction() { - throw StateError('Transactions cannot create another transaction!'); - } - @override Future ensureOpen() => _open.then((_) => true); @@ -209,18 +204,9 @@ class _SqfliteTransactionExecutor extends _DatabaseOwner } } -class _BeforeOpenExecutor extends _DatabaseOwner { +class _BeforeOpenExecutor extends _DatabaseOwner with BeforeOpenMixin { @override final s.DatabaseExecutor db; _BeforeOpenExecutor(this.db, bool logStatements) : super(logStatements); - - @override - TransactionExecutor beginTransaction() { - throw UnsupportedError( - "Transactions can't be started in the befoeOpen callback"); - } - - @override - Future ensureOpen() => Future.value(true); }