VM: transactions and before open callback

This commit is contained in:
Simon Binder 2019-07-20 22:55:27 +02:00
parent eb0edb8ac0
commit 4b9cd084b5
No known key found for this signature in database
GPG Key ID: 7891917E4147B8C0
5 changed files with 171 additions and 100 deletions

View File

@ -3,9 +3,11 @@
@experimental @experimental
library moor_vm; library moor_vm;
import 'dart:async';
import 'dart:io'; import 'dart:io';
import 'package:meta/meta.dart'; import 'package:meta/meta.dart';
import 'package:synchronized/synchronized.dart';
import 'moor.dart'; import 'moor.dart';
import 'src/vm/api/database.dart'; import 'src/vm/api/database.dart';

View File

@ -85,8 +85,28 @@ class BatchedStatement {
} }
/// A [QueryExecutor] that runs multiple queries atomically. /// A [QueryExecutor] that runs multiple queries atomically.
abstract class TransactionExecutor extends QueryExecutor { mixin TransactionExecutor on QueryExecutor {
/// Completes the transaction. No further queries may be sent to to this /// Completes the transaction. No further queries may be sent to to this
/// [QueryExecutor] after this method was called. /// [QueryExecutor] after this method was called.
Future<void> send(); Future<void> send();
@override
TransactionExecutor beginTransaction() {
throw UnsupportedError(
'Transactions cannot be created inside a transaction');
}
}
/// Used internally by moor. Responsible for executing the `beforeOpen`
/// callback.
mixin BeforeOpenMixin on QueryExecutor {
@override
Future<bool> ensureOpen() {
return Future.value(true);
}
@override
TransactionExecutor beginTransaction() {
throw UnsupportedError('Transactions cannot be created inside beforeOpen!');
}
} }

View File

@ -1,24 +1,44 @@
part of 'package:moor/moor_vm.dart'; part of 'package:moor/moor_vm.dart';
abstract class _DatabaseUser extends QueryExecutor { class _DbState {
final bool logStatements; final bool logStatements;
final File dbFile; final File file;
final Lock lock = Lock();
Database _db; Database db;
_DatabaseUser(this.logStatements, this.dbFile); _DbState(this.logStatements, this.file);
}
abstract class _DatabaseUser extends QueryExecutor {
final _DbState _state;
bool get _bypassLock => false;
Database get _db => _state.db;
_DatabaseUser(this._state);
void _logStmt(String statement, List<dynamic> args) { void _logStmt(String statement, List<dynamic> args) {
if (logStatements) { if (_state.logStatements) {
print('Executing $statement with variables $args'); print('Executing $statement with variables $args');
} }
} }
Future<T> _synchronized<T>(FutureOr<T> computation()) async {
final lock = _state.lock;
if (_bypassLock) {
return await computation();
}
return await lock.synchronized(computation);
}
@override @override
Future<void> runCustom(String statement) { Future<void> runCustom(String statement) {
return _synchronized(() {
_logStmt(statement, const []); _logStmt(statement, const []);
_db.execute(statement); _db.execute(statement);
return Future.value(); });
} }
void _runWithArgs(String statement, List<dynamic> args) { void _runWithArgs(String statement, List<dynamic> args) {
@ -40,78 +60,37 @@ abstract class _DatabaseUser extends QueryExecutor {
@override @override
Future<int> runDelete(String statement, List<dynamic> args) { Future<int> runDelete(String statement, List<dynamic> args) {
return _synchronized(() {
return _runAndReturnAffected(statement, args); return _runAndReturnAffected(statement, args);
});
} }
@override @override
Future<int> runUpdate(String statement, List<dynamic> args) { Future<int> runUpdate(String statement, List<dynamic> args) {
return _synchronized(() {
return _runAndReturnAffected(statement, args); return _runAndReturnAffected(statement, args);
});
} }
@override @override
Future<int> runInsert(String statement, List<dynamic> args) { Future<int> runInsert(String statement, List<dynamic> args) {
return _synchronized(() {
_runWithArgs(statement, args); _runWithArgs(statement, args);
return Future.value(_db.lastInsertId); return Future.value(_db.lastInsertId);
});
} }
@override @override
Future<List<Map<String, dynamic>>> runSelect( Future<List<Map<String, dynamic>>> runSelect(
String statement, List<dynamic> args) { String statement, List<dynamic> args) {
return _synchronized(() {
_logStmt(statement, args); _logStmt(statement, args);
final stmt = _db.prepare(statement); final stmt = _db.prepare(statement);
final result = stmt.select(args); final result = stmt.select(args);
stmt.close(); stmt.close();
return Future.value(result.toList()); return Future.value(result.toList());
} });
@override
Future<void> close() {
_db?.close();
return Future.value();
}
}
class VMDatabase extends _DatabaseUser {
VMDatabase(File file, {bool logStatements = false})
: super(logStatements, file);
VMDatabase.memory({bool logStatements = false}) : super(logStatements, null);
@override
Future<bool> ensureOpen() async {
if (_db == null) {
_db = _openInternal();
await _runMigrations();
}
return true;
}
Database _openInternal() {
if (dbFile == null) {
return Database.memory();
} else {
return Database.openFile(dbFile);
}
}
Future _runMigrations() async {
final current = _db.userVersion;
final target = databaseInfo.schemaVersion;
if (current == 0) {
await databaseInfo.handleDatabaseCreation(executor: runCustom);
} else if (current < target) {
await databaseInfo.handleDatabaseVersionChange(
executor: null, from: current, to: target);
}
_db.userVersion = target;
}
@override
TransactionExecutor beginTransaction() {
throw UnsupportedError('Transactions are not yet supported on the Dart VM');
} }
@override @override
@ -129,3 +108,102 @@ class VMDatabase extends _DatabaseUser {
return Future.value(); return Future.value();
} }
} }
class VMDatabase extends _DatabaseUser {
VMDatabase(File file, {bool logStatements = false})
: super(_DbState(logStatements, file));
VMDatabase.memory({bool logStatements = false})
: this(null, logStatements: logStatements);
@override
Future<bool> ensureOpen() async {
if (_db == null) {
_state.db = _openInternal();
await _runMigrations();
}
return true;
}
Database _openInternal() {
if (_state.file == null) {
return Database.memory();
} else {
return Database.openFile(_state.file);
}
}
Future _runMigrations() async {
final current = _db.userVersion;
final target = databaseInfo.schemaVersion;
if (current == 0) {
await databaseInfo.handleDatabaseCreation(executor: runCustom);
} else if (current < target) {
await databaseInfo.handleDatabaseVersionChange(
executor: null, from: current, to: target);
}
_db.userVersion = target;
await _synchronized(() {
databaseInfo.beforeOpenCallback(
_BeforeOpenExecutor(_state), OpeningDetails(current, target));
});
}
@override
Future<void> close() {
_db?.close();
return Future.value();
}
@override
TransactionExecutor beginTransaction() {
final transactionReady = Completer<bool>();
final executor = _TransactionExecutor(_state, transactionReady.future);
_synchronized(() async {
// we have the lock, so start the transaction
transactionReady.complete(true);
await executor.completed;
});
return executor;
}
}
class _BeforeOpenExecutor extends _DatabaseUser with BeforeOpenMixin {
@override
final bool _bypassLock = true;
_BeforeOpenExecutor(_DbState state) : super(state);
}
class _TransactionExecutor extends _DatabaseUser with TransactionExecutor {
@override
final bool _bypassLock = true;
final Future<bool> _openingFuture;
bool _sentBeginTransaction = false;
final Completer<void> _completer = Completer();
Future<void> get completed => _completer.future;
_TransactionExecutor(_DbState state, this._openingFuture) : super(state);
@override
Future<bool> ensureOpen() async {
await _openingFuture;
if (!_sentBeginTransaction) {
_db.execute('BEGIN TRANSACTION');
_sentBeginTransaction = true;
}
return Future.value(true);
}
@override
Future<void> send() {
_db.execute('COMMIT TRANSACTION;');
_completer.complete();
return Future.value();
}
}

View File

@ -228,24 +228,14 @@ class WebDatabase extends _DatabaseUser {
} }
} }
class _BeforeOpenExecutor extends _DatabaseUser { class _BeforeOpenExecutor extends _DatabaseUser with BeforeOpenMixin {
_BeforeOpenExecutor(_DbState state) : super(state); _BeforeOpenExecutor(_DbState state) : super(state);
@override @override
final bool _bypassLock = true; final bool _bypassLock = true;
@override
TransactionExecutor beginTransaction() {
throw UnsupportedError(
"Transactions aren't supported in the before open callback");
} }
@override class _TransactionExecutor extends _DatabaseUser with TransactionExecutor {
Future<bool> ensureOpen() => Future.value(true);
}
class _TransactionExecutor extends _DatabaseUser
implements TransactionExecutor {
_TransactionExecutor(_DbState state, this._openingFuture) : super(state); _TransactionExecutor(_DbState state, this._openingFuture) : super(state);
@override @override
@ -264,11 +254,6 @@ class _TransactionExecutor extends _DatabaseUser
_needsSave = true; _needsSave = true;
} }
@override
TransactionExecutor beginTransaction() {
throw UnsupportedError('Cannot have nested transactions');
}
@override @override
Future<bool> ensureOpen() async { Future<bool> ensureOpen() async {
await _openingFuture; await _openingFuture;

View File

@ -160,7 +160,7 @@ class FlutterQueryExecutor extends _DatabaseOwner {
} }
class _SqfliteTransactionExecutor extends _DatabaseOwner class _SqfliteTransactionExecutor extends _DatabaseOwner
implements TransactionExecutor { with TransactionExecutor {
@override @override
s.Transaction db; s.Transaction db;
@ -194,11 +194,6 @@ class _SqfliteTransactionExecutor extends _DatabaseOwner
openingCompleter.future, actionCompleter, sendFuture, db.logStatements); openingCompleter.future, actionCompleter, sendFuture, db.logStatements);
} }
@override
TransactionExecutor beginTransaction() {
throw StateError('Transactions cannot create another transaction!');
}
@override @override
Future<bool> ensureOpen() => _open.then((_) => true); Future<bool> ensureOpen() => _open.then((_) => true);
@ -209,18 +204,9 @@ class _SqfliteTransactionExecutor extends _DatabaseOwner
} }
} }
class _BeforeOpenExecutor extends _DatabaseOwner { class _BeforeOpenExecutor extends _DatabaseOwner with BeforeOpenMixin {
@override @override
final s.DatabaseExecutor db; final s.DatabaseExecutor db;
_BeforeOpenExecutor(this.db, bool logStatements) : super(logStatements); _BeforeOpenExecutor(this.db, bool logStatements) : super(logStatements);
@override
TransactionExecutor beginTransaction() {
throw UnsupportedError(
"Transactions can't be started in the befoeOpen callback");
}
@override
Future<bool> ensureOpen() => Future.value(true);
} }