mirror of https://github.com/AMT-Cheif/drift.git
Migrate VM database to use the new delegate api
This commit is contained in:
parent
2500e5ff20
commit
56f8e447bd
|
@ -7,7 +7,7 @@ import 'dart:async';
|
||||||
import 'dart:io';
|
import 'dart:io';
|
||||||
|
|
||||||
import 'package:meta/meta.dart';
|
import 'package:meta/meta.dart';
|
||||||
import 'package:synchronized/synchronized.dart';
|
import 'backends.dart';
|
||||||
import 'moor.dart';
|
import 'moor.dart';
|
||||||
|
|
||||||
import 'src/vm/api/database.dart';
|
import 'src/vm/api/database.dart';
|
||||||
|
|
|
@ -321,4 +321,10 @@ abstract class GeneratedDatabase extends DatabaseConnectionUser
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Closes this database instance and released the resources associated with
|
||||||
|
/// it.
|
||||||
|
void close() {
|
||||||
|
executor.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -89,28 +89,8 @@ class BatchedStatement {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A [QueryExecutor] that runs multiple queries atomically.
|
/// A [QueryExecutor] that runs multiple queries atomically.
|
||||||
mixin TransactionExecutor on QueryExecutor {
|
abstract class TransactionExecutor extends QueryExecutor {
|
||||||
/// Completes the transaction. No further queries may be sent to to this
|
/// Completes the transaction. No further queries may be sent to to this
|
||||||
/// [QueryExecutor] after this method was called.
|
/// [QueryExecutor] after this method was called.
|
||||||
Future<void> send();
|
Future<void> send();
|
||||||
|
|
||||||
@override
|
|
||||||
TransactionExecutor beginTransaction() {
|
|
||||||
throw UnsupportedError(
|
|
||||||
'Transactions cannot be created inside a transaction');
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Used internally by moor. Responsible for executing the `beforeOpen`
|
|
||||||
/// callback.
|
|
||||||
mixin BeforeOpenMixin on QueryExecutor {
|
|
||||||
@override
|
|
||||||
Future<bool> ensureOpen() {
|
|
||||||
return Future.value(true);
|
|
||||||
}
|
|
||||||
|
|
||||||
@override
|
|
||||||
TransactionExecutor beginTransaction() {
|
|
||||||
throw UnsupportedError('Transactions cannot be created inside beforeOpen!');
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -217,6 +217,12 @@ class DelegatedDatabase extends QueryExecutor with _ExecutorWithQueryDelegate {
|
||||||
final alreadyOpen = await delegate.isOpen;
|
final alreadyOpen = await delegate.isOpen;
|
||||||
if (alreadyOpen) return true;
|
if (alreadyOpen) return true;
|
||||||
|
|
||||||
|
// could have changed already, see https://github.com/dart-lang/linter/issues/1384
|
||||||
|
// ignore: invariant_booleans
|
||||||
|
if (_openingCompleter != null) {
|
||||||
|
return _openingCompleter.future;
|
||||||
|
}
|
||||||
|
|
||||||
// not already open or opening. Open the database now!
|
// not already open or opening. Open the database now!
|
||||||
_openingCompleter = Completer();
|
_openingCompleter = Completer();
|
||||||
await delegate.open(databaseInfo);
|
await delegate.open(databaseInfo);
|
||||||
|
|
|
@ -6,9 +6,9 @@ class Result extends Iterable<Row> {
|
||||||
// a result set can have multiple columns with the same name, but that's rare
|
// a result set can have multiple columns with the same name, but that's rare
|
||||||
// and users usually use a name as index. So we cache that for O(1) lookups
|
// and users usually use a name as index. So we cache that for O(1) lookups
|
||||||
Map<String, int> _calculatedIndexes;
|
Map<String, int> _calculatedIndexes;
|
||||||
final List<List<dynamic>> _rows;
|
final List<List<dynamic>> rows;
|
||||||
|
|
||||||
Result(this.columnNames, this._rows) {
|
Result(this.columnNames, this.rows) {
|
||||||
_calculatedIndexes = {
|
_calculatedIndexes = {
|
||||||
for (var column in columnNames) column: columnNames.lastIndexOf(column),
|
for (var column in columnNames) column: columnNames.lastIndexOf(column),
|
||||||
};
|
};
|
||||||
|
@ -28,7 +28,7 @@ class Row extends MapMixin<String, dynamic>
|
||||||
|
|
||||||
/// Returns the value stored in the [i]-th column in this row (zero-indexed).
|
/// Returns the value stored in the [i]-th column in this row (zero-indexed).
|
||||||
dynamic columnAt(int i) {
|
dynamic columnAt(int i) {
|
||||||
return _result._rows[_rowIndex][i];
|
return _result.rows[_rowIndex][i];
|
||||||
}
|
}
|
||||||
|
|
||||||
@override
|
@override
|
||||||
|
@ -57,6 +57,6 @@ class _ResultIterator extends Iterator<Row> {
|
||||||
@override
|
@override
|
||||||
bool moveNext() {
|
bool moveNext() {
|
||||||
index++;
|
index++;
|
||||||
return index < result._rows.length;
|
return index < result.rows.length;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,96 +1,47 @@
|
||||||
part of 'package:moor/moor_vm.dart';
|
part of 'package:moor/moor_vm.dart';
|
||||||
|
|
||||||
class _DbState {
|
/// A moor database that runs on the Dart VM.
|
||||||
final bool logStatements;
|
class VMDatabase extends DelegatedDatabase {
|
||||||
final File file;
|
VMDatabase._(DatabaseDelegate delegate, bool logStatements)
|
||||||
final Lock lock = Lock();
|
: super(delegate, isSequential: true, logStatements: logStatements);
|
||||||
|
|
||||||
Database db;
|
/// Creates a database that will store its result in the [file], creating it
|
||||||
|
/// if it doesn't exist.
|
||||||
|
factory VMDatabase(File file, {bool logStatements = false}) {
|
||||||
|
return VMDatabase._(_VmDelegate(file), logStatements);
|
||||||
|
}
|
||||||
|
|
||||||
_DbState(this.logStatements, this.file);
|
/// Creates a database won't persist its changes on disk.
|
||||||
|
factory VMDatabase.memory({bool logStatements = false}) {
|
||||||
|
return VMDatabase._(_VmDelegate(null), logStatements);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
abstract class _DatabaseUser extends QueryExecutor {
|
class _VmDelegate extends DatabaseDelegate {
|
||||||
final _DbState _state;
|
Database _db;
|
||||||
|
|
||||||
bool get _bypassLock => false;
|
final File file;
|
||||||
Database get _db => _state.db;
|
|
||||||
|
|
||||||
_DatabaseUser(this._state);
|
_VmDelegate(this.file);
|
||||||
|
|
||||||
void _logStmt(String statement, List<dynamic> args) {
|
|
||||||
if (_state.logStatements) {
|
|
||||||
print('Executing $statement with variables $args');
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Future<T> _synchronized<T>(FutureOr<T> computation()) async {
|
|
||||||
final lock = _state.lock;
|
|
||||||
if (_bypassLock) {
|
|
||||||
return await computation();
|
|
||||||
}
|
|
||||||
|
|
||||||
return await lock.synchronized(computation);
|
|
||||||
}
|
|
||||||
|
|
||||||
@override
|
@override
|
||||||
Future<void> runCustom(String statement) {
|
final TransactionDelegate transactionDelegate = const NoTransactionDelegate();
|
||||||
return _synchronized(() {
|
|
||||||
_logStmt(statement, const []);
|
|
||||||
_db.execute(statement);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
void _runWithArgs(String statement, List<dynamic> args) {
|
@override
|
||||||
_logStmt(statement, args);
|
DbVersionDelegate versionDelegate;
|
||||||
|
|
||||||
if (args.isEmpty) {
|
@override
|
||||||
_db.execute(statement);
|
Future<bool> get isOpen => Future.value(_db != null);
|
||||||
|
|
||||||
|
@override
|
||||||
|
Future<void> open([GeneratedDatabase db]) {
|
||||||
|
if (file != null) {
|
||||||
|
_db = Database.openFile(file);
|
||||||
} else {
|
} else {
|
||||||
_db.prepare(statement)
|
_db = Database.memory();
|
||||||
..execute(args)
|
|
||||||
..close();
|
|
||||||
}
|
}
|
||||||
}
|
versionDelegate = _VmVersionDelegate(_db);
|
||||||
|
return Future.value();
|
||||||
Future<int> _runAndReturnAffected(String statement, List<dynamic> args) {
|
|
||||||
_runWithArgs(statement, args);
|
|
||||||
return Future.value(_db.updatedRows);
|
|
||||||
}
|
|
||||||
|
|
||||||
@override
|
|
||||||
Future<int> runDelete(String statement, List<dynamic> args) {
|
|
||||||
return _synchronized(() {
|
|
||||||
return _runAndReturnAffected(statement, args);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
@override
|
|
||||||
Future<int> runUpdate(String statement, List<dynamic> args) {
|
|
||||||
return _synchronized(() {
|
|
||||||
return _runAndReturnAffected(statement, args);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
@override
|
|
||||||
Future<int> runInsert(String statement, List<dynamic> args) {
|
|
||||||
return _synchronized(() {
|
|
||||||
_runWithArgs(statement, args);
|
|
||||||
return Future.value(_db.lastInsertId);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
@override
|
|
||||||
Future<List<Map<String, dynamic>>> runSelect(
|
|
||||||
String statement, List<dynamic> args) {
|
|
||||||
return _synchronized(() {
|
|
||||||
_logStmt(statement, args);
|
|
||||||
final stmt = _db.prepare(statement);
|
|
||||||
final result = stmt.select(args);
|
|
||||||
stmt.close();
|
|
||||||
|
|
||||||
return Future.value(result.toList());
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@override
|
@override
|
||||||
|
@ -107,103 +58,56 @@ abstract class _DatabaseUser extends QueryExecutor {
|
||||||
|
|
||||||
return Future.value();
|
return Future.value();
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
class VMDatabase extends _DatabaseUser {
|
void _runWithArgs(String statement, List<dynamic> args) {
|
||||||
VMDatabase(File file, {bool logStatements = false})
|
if (args.isEmpty) {
|
||||||
: super(_DbState(logStatements, file));
|
_db.execute(statement);
|
||||||
|
|
||||||
VMDatabase.memory({bool logStatements = false})
|
|
||||||
: this(null, logStatements: logStatements);
|
|
||||||
|
|
||||||
@override
|
|
||||||
Future<bool> ensureOpen() async {
|
|
||||||
if (_db == null) {
|
|
||||||
_state.db = _openInternal();
|
|
||||||
await _runMigrations();
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
Database _openInternal() {
|
|
||||||
if (_state.file == null) {
|
|
||||||
return Database.memory();
|
|
||||||
} else {
|
} else {
|
||||||
return Database.openFile(_state.file);
|
_db.prepare(statement)
|
||||||
|
..execute(args)
|
||||||
|
..close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Future _runMigrations() async {
|
|
||||||
final current = _db.userVersion;
|
|
||||||
final target = databaseInfo.schemaVersion;
|
|
||||||
|
|
||||||
if (current == 0) {
|
|
||||||
await databaseInfo.handleDatabaseCreation(executor: runCustom);
|
|
||||||
} else if (current < target) {
|
|
||||||
await databaseInfo.handleDatabaseVersionChange(
|
|
||||||
executor: null, from: current, to: target);
|
|
||||||
}
|
|
||||||
|
|
||||||
_db.userVersion = target;
|
|
||||||
|
|
||||||
await _synchronized(() {
|
|
||||||
databaseInfo.beforeOpenCallback(
|
|
||||||
_BeforeOpenExecutor(_state), OpeningDetails(current, target));
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
@override
|
@override
|
||||||
Future<void> close() {
|
Future<void> runCustom(String statement, List args) {
|
||||||
_db?.close();
|
_runWithArgs(statement, args);
|
||||||
return Future.value();
|
return Future.value();
|
||||||
}
|
}
|
||||||
|
|
||||||
@override
|
@override
|
||||||
TransactionExecutor beginTransaction() {
|
Future<int> runInsert(String statement, List args) {
|
||||||
final transactionReady = Completer<bool>();
|
_runWithArgs(statement, args);
|
||||||
final executor = _TransactionExecutor(_state, transactionReady.future);
|
return Future.value(_db.lastInsertId);
|
||||||
|
|
||||||
_synchronized(() async {
|
|
||||||
// we have the lock, so start the transaction
|
|
||||||
transactionReady.complete(true);
|
|
||||||
await executor.completed;
|
|
||||||
});
|
|
||||||
|
|
||||||
return executor;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
class _BeforeOpenExecutor extends _DatabaseUser with BeforeOpenMixin {
|
|
||||||
@override
|
|
||||||
final bool _bypassLock = true;
|
|
||||||
_BeforeOpenExecutor(_DbState state) : super(state);
|
|
||||||
}
|
|
||||||
|
|
||||||
class _TransactionExecutor extends _DatabaseUser with TransactionExecutor {
|
|
||||||
@override
|
|
||||||
final bool _bypassLock = true;
|
|
||||||
final Future<bool> _openingFuture;
|
|
||||||
bool _sentBeginTransaction = false;
|
|
||||||
|
|
||||||
final Completer<void> _completer = Completer();
|
|
||||||
Future<void> get completed => _completer.future;
|
|
||||||
|
|
||||||
_TransactionExecutor(_DbState state, this._openingFuture) : super(state);
|
|
||||||
|
|
||||||
@override
|
|
||||||
Future<bool> ensureOpen() async {
|
|
||||||
await _openingFuture;
|
|
||||||
if (!_sentBeginTransaction) {
|
|
||||||
_db.execute('BEGIN TRANSACTION');
|
|
||||||
_sentBeginTransaction = true;
|
|
||||||
}
|
|
||||||
return Future.value(true);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@override
|
@override
|
||||||
Future<void> send() {
|
Future<int> runUpdate(String statement, List args) {
|
||||||
_db.execute('COMMIT TRANSACTION;');
|
_runWithArgs(statement, args);
|
||||||
_completer.complete();
|
return Future.value(_db.updatedRows);
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
Future<QueryResult> runSelect(String statement, List args) {
|
||||||
|
final stmt = _db.prepare(statement);
|
||||||
|
final result = stmt.select(args);
|
||||||
|
stmt.close();
|
||||||
|
|
||||||
|
return Future.value(QueryResult(result.columnNames, result.rows));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class _VmVersionDelegate extends DynamicVersionDelegate {
|
||||||
|
final Database database;
|
||||||
|
|
||||||
|
_VmVersionDelegate(this.database);
|
||||||
|
|
||||||
|
@override
|
||||||
|
Future<int> get schemaVersion => Future.value(database.userVersion);
|
||||||
|
|
||||||
|
@override
|
||||||
|
Future<void> setSchemaVersion(int version) {
|
||||||
|
database.userVersion = version;
|
||||||
return Future.value();
|
return Future.value();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -228,14 +228,23 @@ class WebDatabase extends _DatabaseUser {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class _BeforeOpenExecutor extends _DatabaseUser with BeforeOpenMixin {
|
class _BeforeOpenExecutor extends _DatabaseUser {
|
||||||
_BeforeOpenExecutor(_DbState state) : super(state);
|
_BeforeOpenExecutor(_DbState state) : super(state);
|
||||||
|
|
||||||
@override
|
@override
|
||||||
final bool _bypassLock = true;
|
final bool _bypassLock = true;
|
||||||
|
|
||||||
|
@override
|
||||||
|
TransactionExecutor beginTransaction() {
|
||||||
|
throw Error();
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
Future<bool> ensureOpen() => Future.value(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
class _TransactionExecutor extends _DatabaseUser with TransactionExecutor {
|
class _TransactionExecutor extends _DatabaseUser
|
||||||
|
implements TransactionExecutor {
|
||||||
_TransactionExecutor(_DbState state, this._openingFuture) : super(state);
|
_TransactionExecutor(_DbState state, this._openingFuture) : super(state);
|
||||||
|
|
||||||
@override
|
@override
|
||||||
|
@ -270,4 +279,9 @@ class _TransactionExecutor extends _DatabaseUser with TransactionExecutor {
|
||||||
_completer.complete();
|
_completer.complete();
|
||||||
return Future.value();
|
return Future.value();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
TransactionExecutor beginTransaction() {
|
||||||
|
throw Error();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,4 +1,7 @@
|
||||||
|
import 'dart:async';
|
||||||
|
|
||||||
import 'package:moor/moor.dart';
|
import 'package:moor/moor.dart';
|
||||||
|
import 'package:pedantic/pedantic.dart';
|
||||||
import 'package:test_api/test_api.dart';
|
import 'package:test_api/test_api.dart';
|
||||||
import 'package:moor/moor_vm.dart';
|
import 'package:moor/moor_vm.dart';
|
||||||
|
|
||||||
|
@ -32,6 +35,29 @@ void main() {
|
||||||
final readUpdated = await db.select(db.todosTable).getSingle();
|
final readUpdated = await db.select(db.todosTable).getSingle();
|
||||||
expect(readUpdated.content, 'Updated content');
|
expect(readUpdated.content, 'Updated content');
|
||||||
});
|
});
|
||||||
|
|
||||||
|
test('Transactions test', () async {
|
||||||
|
db = TodoDb(VMDatabase.memory(logStatements: false));
|
||||||
|
|
||||||
|
final completedOperations = StreamController<String>();
|
||||||
|
|
||||||
|
unawaited(db.transaction((_) async {
|
||||||
|
await insertCategory();
|
||||||
|
completedOperations.add('transaction');
|
||||||
|
await pumpEventQueue();
|
||||||
|
}));
|
||||||
|
|
||||||
|
unawaited(insertUser().then((_) {
|
||||||
|
completedOperations.add('regular');
|
||||||
|
}));
|
||||||
|
|
||||||
|
await expectLater(
|
||||||
|
completedOperations.stream, emitsInOrder(['transaction', 'regular']));
|
||||||
|
|
||||||
|
// call .getSingle to verify both rows have been written
|
||||||
|
await db.select(db.users).getSingle();
|
||||||
|
await db.select(db.categories).getSingle();
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
Future insertCategory() async {
|
Future insertCategory() async {
|
||||||
|
|
Loading…
Reference in New Issue