New API to create custom backends easier

Also ported the Flutter backend to that API
This commit is contained in:
Simon Binder 2019-07-22 11:30:39 +02:00
parent 6acc81c88b
commit 48f87330d6
No known key found for this signature in database
GPG Key ID: 7891917E4147B8C0
9 changed files with 580 additions and 182 deletions

View File

@ -0,0 +1,8 @@
---
layout: guide
title: Custom databases
nav_order: 8
permalink: /custom_backend
---
TBD

8
moor/lib/backends.dart Normal file
View File

@ -0,0 +1,8 @@
/// Utility classes to implement custom database backends that work together
/// with moor.
library backends;
export 'src/runtime/executor/executor.dart';
export 'src/runtime/executor/helpers/delegates.dart';
export 'src/runtime/executor/helpers/engines.dart';
export 'src/runtime/executor/helpers/results.dart';

View File

@ -177,8 +177,8 @@ mixin QueryEngine on DatabaseConnectionUser {
final ctx = GenerationContext.fromDb(engine);
final mappedArgs = variables.map((v) => v.mapToSimpleValue(ctx)).toList();
final affectedRows =
executor.doWhenOpened((_) => executor.runUpdate(query, mappedArgs));
final affectedRows = await executor
.doWhenOpened((_) => executor.runUpdate(query, mappedArgs));
if (updates != null) {
await engine.streamQueries.handleTableUpdates(updates);

View File

@ -6,6 +6,10 @@ import 'package:moor/src/utils/hash.dart';
/// A query executor is responsible for executing statements on a database and
/// return their results in a raw form.
///
/// This is an internal api of moor, which can break often. If you want to
/// implement custom database backends, consider using a delegate as described
/// [here](https://moor.simonbinder.eu/custom_backend)
abstract class QueryExecutor {
GeneratedDatabase databaseInfo;

View File

@ -0,0 +1,154 @@
import 'dart:typed_data' show Uint8List;
import 'package:moor/moor.dart';
import 'package:moor/src/runtime/executor/helpers/results.dart';
/// An interface that supports sending database queries. Used as a backend for
/// moor.
///
/// Database implementations should support the following types both for
/// variables and result sets:
/// - [int]
/// - [double]
/// - [String]
/// - [Uint8List]
abstract class DatabaseDelegate implements QueryDelegate {
/// Returns an appropriate class to resolve the current schema version in
/// this database.
///
/// Common implementations will be:
/// - [NoVersionDelegate] for databases without a schema version (such as an
/// MySql server we connect to)
/// - [OnOpenVersionDelegate] for databases whose schema version can only be
/// set while opening it (such as sqflite)
/// - [DynamicVersionDelegate] for databases where moor can set the schema
/// version at any time (used for the web and VM implementation)
DbVersionDelegate get versionDelegate;
/// The way this database engine starts transactions.
TransactionDelegate get transactionDelegate;
/// A future that completes with `true` when this database is open and with
/// `false` when its not. The future may never complete with an error or with
/// null. It should return relatively quickly, as moor queries it before each
/// statement it sends to the database.
Future<bool> get isOpen;
/// Opens the database. Moor will only call this when [isOpen] has returned
/// false before. Further, moor will not attempt to open a database multiple
/// times, so you don't have to worry about a connection being created
/// multiple times.
///
/// The [GeneratedDatabase] is the user-defined database annotated with
/// [UseMoor]. It might be useful to read the [GeneratedDatabase.schemaVersion]
/// if that information is required while opening the database.
Future<void> open([GeneratedDatabase db]);
}
/// An interface which can execute sql statements.
abstract class QueryDelegate {
/// Prepares and executes the [statement], binding the variables to [args].
/// Its safe to assume that the [statement] is a select statement, the
/// [QueryResult] that it returns should be returned from here.
///
/// If the statement can't be executed, an exception should be thrown. See
/// the class documentation of [DatabaseDelegate] on what types are supported.
Future<QueryResult> runSelect(String statement, List<dynamic> args);
/// Prepares and executes the [statement] with the variables bound to [args].
/// The statement will either be an `UPDATE` or `DELETE` statement.
///
/// If the statement completes successfully, the amount of changed rows should
/// be returned, or `0` if no rows where updated. Should throw if the
/// statement can't be executed.
Future<int> runUpdate(String statement, List<dynamic> args);
/// Prepares and executes the [statement] with the variables bound to [args].
/// The statement will be an `INSERT` statement.
///
/// If the statement completes successfully, the insert id of the row can be
/// returned. If that information is not available, `null` can be returned.
/// The method should throw if the statement can't be executed.
Future<int> runInsert(String statement, List<dynamic> args);
/// Runs a custom [statement] with the given [args]. Ignores all results, but
/// throws when the statement can't be executed.
Future<void> runCustom(String statement, List<dynamic> args);
/// Runs each of the [statements] with all set of variables in each
/// [BatchedStatement.variables]. For database APIs that support preparing
/// statements, this allows us to only prepare a statement once for each
/// [BatchedStatement], which can be executed multiple times.
Future<void> runBatched(List<BatchedStatement> statements) async {
// default, inefficient implementation
for (var stmt in statements) {
for (var boundVars in stmt.variables) {
await runCustom(stmt.sql, boundVars);
}
}
}
}
/// An interface to start and manage transactions.
///
/// Clients may not extend, implement or mix-in this class directly.
abstract class TransactionDelegate {
const TransactionDelegate();
}
/// A [TransactionDelegate] for database APIs which don't already support
/// creating transactions. Moor will send a `BEGIN TRANSACTION` statement at the
/// beginning, then block the database, and finally send a `COMMIT` statement
/// at the end.
class NoTransactionDelegate extends TransactionDelegate {
/// The statement that starts a transaction on this database engine.
final String start;
/// The statement that commits a transaction on this database engine.
final String commit;
const NoTransactionDelegate(
{this.start = 'BEGIN TRANSACTION', this.commit = 'COMMIT TRANSACTION'});
}
/// A [TransactionDelegate] for database APIs which do support creating and
/// managing transactions themselves.
abstract class SupportedTransactionDelegate extends TransactionDelegate {
const SupportedTransactionDelegate();
/// Start a transaction, which we assume implements [QueryEngine], and call
/// [run] with the transaction.
///
/// If [run] completes with an error, rollback. Otherwise, commit.
void startTransaction(Future Function(QueryDelegate) run);
}
/// An interface that supports setting the database version.
///
/// Clients may not extend, implement or mix-in this class directly.
abstract class DbVersionDelegate {
const DbVersionDelegate();
}
/// A database that doesn't support setting schema versions.
class NoVersionDelegate extends DbVersionDelegate {
const NoVersionDelegate();
}
/// A database that only support setting the schema version while being opened.
class OnOpenVersionDelegate extends DbVersionDelegate {
/// Function that returns with the current schema version.
final Future<int> Function() loadSchemaVersion;
const OnOpenVersionDelegate(this.loadSchemaVersion);
}
/// A database that supports setting the schema version at any time.
abstract class DynamicVersionDelegate extends DbVersionDelegate {
const DynamicVersionDelegate();
/// Load the current schema version stored in this database.
Future<int> get schemaVersion;
/// Writes the schema [version] to the database.
Future<void> setSchemaVersion(int version);
}

View File

@ -0,0 +1,277 @@
import 'dart:async';
import 'package:moor/moor.dart';
import 'package:pedantic/pedantic.dart';
import 'package:synchronized/synchronized.dart';
import 'delegates.dart';
mixin _ExecutorWithQueryDelegate on QueryExecutor {
QueryDelegate get impl;
bool get isSequential => false;
bool get logStatements => false;
final Lock _lock = Lock();
Future<T> _synchronized<T>(FutureOr<T> Function() action) async {
if (isSequential) {
return await _lock.synchronized(action);
} else {
// support multiple operations in parallel, so just run right away
return await action();
}
}
void _log(String sql, List<dynamic> args) {
if (logStatements) {
print('Moor: Sent $sql with args $args');
}
}
@override
Future<List<Map<String, dynamic>>> runSelect(
String statement, List args) async {
final result = await _synchronized(() {
_log(statement, args);
return impl.runSelect(statement, args);
});
return result.asMap.toList();
}
@override
Future<int> runUpdate(String statement, List args) {
return _synchronized(() {
_log(statement, args);
return impl.runUpdate(statement, args);
});
}
@override
Future<int> runDelete(String statement, List args) {
return _synchronized(() {
_log(statement, args);
return impl.runUpdate(statement, args);
});
}
@override
Future<int> runInsert(String statement, List args) {
return _synchronized(() {
_log(statement, args);
return impl.runInsert(statement, args);
});
}
@override
Future<void> runCustom(String statement) {
return _synchronized(() {
_log(statement, const []);
return impl.runCustom(statement, const []);
});
}
@override
Future<void> runBatched(List<BatchedStatement> statements) {
return _synchronized(() {
if (logStatements) {
print('Moor: Executing $statements in a batch');
}
return impl.runBatched(statements);
});
}
}
class _TransactionExecutor extends TransactionExecutor
with _ExecutorWithQueryDelegate {
final DelegatedDatabase _db;
@override
QueryDelegate impl;
@override
bool get isSequential => _db.isSequential;
@override
bool get logStatements => _db.logStatements;
final Completer<void> _sendCalled = Completer();
Completer<bool> _openingCompleter;
String _sendOnCommit;
Future get completed => _sendCalled.future;
_TransactionExecutor(this._db);
@override
TransactionExecutor beginTransaction() {
throw Exception("Nested transactions aren't supported");
}
@override
Future<bool> ensureOpen() async {
if (_openingCompleter != null) {
return await _openingCompleter.future;
}
_openingCompleter = Completer();
final transactionManager = _db.delegate.transactionDelegate;
final transactionStarted = Completer();
if (transactionManager is NoTransactionDelegate) {
assert(
_db.isSequential,
'When using the default NoTransactionDelegate, the database must be'
'sequential.');
// run all the commands on the main database, which we block while the
// transaction is running.
unawaited(_db._synchronized(() async {
impl = _db.delegate;
await impl.runCustom(transactionManager.start, const []);
_sendOnCommit = transactionManager.commit;
transactionStarted.complete();
// release the database lock after the transaction completes
await _sendCalled.future;
}));
} else if (transactionManager is SupportedTransactionDelegate) {
transactionManager.startTransaction((transaction) async {
impl = transaction;
transactionStarted.complete();
// this callback must be running as long as the transaction, so we do
// that until send() was called.
await _sendCalled.future;
});
} else {
throw Exception('Invalid delegate: Has unknown transaction delegate');
}
await transactionStarted.future;
_openingCompleter.complete(true);
return true;
}
@override
Future<void> send() async {
if (_sendOnCommit != null) {
await impl.runCustom(_sendOnCommit, const []);
}
_sendCalled.complete();
}
}
class _BeforeOpeningExecutor extends QueryExecutor
with _ExecutorWithQueryDelegate {
final DelegatedDatabase db;
@override
QueryDelegate get impl => db.delegate;
@override
bool get isSequential => db.isSequential;
@override
bool get logStatements => db.logStatements;
_BeforeOpeningExecutor(this.db);
@override
TransactionExecutor beginTransaction() {
throw Exception(
"Transactions can't be started in the before open callback");
}
@override
Future<bool> ensureOpen() {
return Future.value(true);
}
}
class DelegatedDatabase extends QueryExecutor with _ExecutorWithQueryDelegate {
final DatabaseDelegate delegate;
Completer<bool> _openingCompleter;
@override
final bool logStatements;
@override
final bool isSequential;
@override
QueryDelegate get impl => delegate;
DelegatedDatabase(this.delegate,
{this.logStatements = false, this.isSequential = false});
@override
Future<bool> ensureOpen() async {
// if we're already opening the database or if its already open, return that
// status
if (_openingCompleter != null) {
return _openingCompleter.future;
}
final alreadyOpen = await delegate.isOpen;
if (alreadyOpen) return true;
// not already open or opening. Open the database now!
_openingCompleter = Completer();
await delegate.open(databaseInfo);
await _runMigrations();
_openingCompleter.complete(true);
_openingCompleter = null;
return true;
}
Future<void> _runMigrations() async {
final versionDelegate = delegate.versionDelegate;
int oldVersion;
final currentVersion = databaseInfo.schemaVersion;
if (versionDelegate is NoVersionDelegate) {
// this one is easy. There is no version mechanism, so we don't run any
// migrations. Assume database is on latest version.
oldVersion = databaseInfo.schemaVersion;
} else if (versionDelegate is OnOpenVersionDelegate) {
// version has already been set during open
oldVersion = await versionDelegate.loadSchemaVersion();
} else if (versionDelegate is DynamicVersionDelegate) {
// set version now
oldVersion = await versionDelegate.schemaVersion;
await versionDelegate.setSchemaVersion(currentVersion);
} else {
throw Exception('Invalid delegate: $delegate. The versionDelegate getter '
'must not subclass DBVersionDelegate directly');
}
if (oldVersion == 0) {
// some database implementations use version 0 to indicate that the
// database was just created. We normalize that to null.
oldVersion = null;
}
final dbCreated = oldVersion == null;
if (dbCreated) {
await databaseInfo.handleDatabaseCreation(executor: runCustom);
} else if (oldVersion != currentVersion) {
await databaseInfo.handleDatabaseVersionChange(
executor: runCustom, from: oldVersion, to: currentVersion);
}
await _runBeforeOpen(OpeningDetails(oldVersion, currentVersion));
}
@override
TransactionExecutor beginTransaction() {
return _TransactionExecutor(this);
}
Future<void> _runBeforeOpen(OpeningDetails d) {
return databaseInfo.beforeOpenCallback(_BeforeOpeningExecutor(this), d);
}
}

View File

@ -0,0 +1,41 @@
/// A result from an select statement.
class QueryResult {
/// Names of the columns returned by the select statement.
final List<String> columnNames;
/// The data returned by the select statement. Each list represents a row,
/// which has the data in the same order as [columnNames].
final List<List<dynamic>> rows;
Map<String, int> _columnIndexes;
QueryResult(this.columnNames, this.rows) {
_columnIndexes = {
for (var column in columnNames) column: columnNames.lastIndexOf(column)
};
}
factory QueryResult.fromRows(List<Map<String, dynamic>> rows) {
if (rows.isEmpty) {
return QueryResult(const [], const []);
}
final keys = rows.first.keys.toList();
final mappedRows = [
for (var row in rows) [for (var key in keys) row[key]]
];
return QueryResult(keys, mappedRows);
}
/// Returns a "list of maps" representation of this result set. Each map has
/// the same keys - the [columnNames]. The values are the actual values in
/// the row.
Iterable<Map<String, dynamic>> get asMap {
return rows.map((row) {
return {
for (var column in columnNames) column: row[_columnIndexes[column]],
};
});
}
}

View File

@ -15,6 +15,7 @@ dependencies:
meta: '>= 1.0.0 <2.0.0'
collection: '>= 1.0.0 <2.0.0'
synchronized: ^2.1.0
pedantic: any
dev_dependencies:
moor_generator: ^1.6.0

View File

@ -8,55 +8,82 @@ import 'dart:async';
import 'package:meta/meta.dart';
import 'package:path/path.dart';
import 'package:moor/moor.dart';
import 'package:moor/backends.dart';
import 'package:sqflite/sqflite.dart' as s;
export 'package:moor_flutter/src/animated_list.dart';
export 'package:moor/moor.dart';
abstract class _DatabaseOwner extends QueryExecutor {
_DatabaseOwner(this.logStatements);
class _SqfliteDelegate extends DatabaseDelegate with _SqfliteExecutor {
int _loadedSchemaVersion;
@override
s.Database db;
@visibleForOverriding
s.DatabaseExecutor get db;
final bool inDbFolder;
final String path;
final bool logStatements;
_SqfliteDelegate(this.inDbFolder, this.path);
void _log(String sql, [List args]) {
if (logStatements == true) {
final formattedArgs = (args?.isEmpty ?? true) ? ' no variables' : args;
print('moor: $sql with $formattedArgs');
@override
DbVersionDelegate get versionDelegate {
return OnOpenVersionDelegate(() => Future.value(_loadedSchemaVersion));
}
@override
TransactionDelegate get transactionDelegate =>
_SqfliteTransactionDelegate(this);
@override
Future<bool> get isOpen => Future.value(db != null);
@override
Future<void> open([GeneratedDatabase db]) async {
String resolvedPath;
if (inDbFolder) {
resolvedPath = join(await s.getDatabasesPath(), path);
} else {
resolvedPath = path;
}
// default value when no migration happened
_loadedSchemaVersion = db.schemaVersion;
this.db = await s.openDatabase(
resolvedPath,
version: db.schemaVersion,
onCreate: (db, version) {
_loadedSchemaVersion = 0;
},
onUpgrade: (db, from, to) {
_loadedSchemaVersion = from;
},
);
}
}
class _SqfliteTransactionDelegate extends SupportedTransactionDelegate {
final _SqfliteDelegate delegate;
_SqfliteTransactionDelegate(this.delegate);
@override
Future<int> runDelete(String statement, List args) {
_log(statement, args);
return db.rawDelete(statement, args);
void startTransaction(Future<void> Function(QueryDelegate) run) {
delegate.db.transaction((transaction) async {
final executor = _SqfliteTransactionExecutor(transaction);
await run(executor);
});
}
}
class _SqfliteTransactionExecutor extends QueryDelegate with _SqfliteExecutor {
@override
Future<int> runInsert(String statement, List args) {
_log(statement, args);
return db.rawInsert(statement, args);
}
final s.DatabaseExecutor db;
@override
Future<List<Map<String, dynamic>>> runSelect(String statement, List args) {
_log(statement, args);
return db.rawQuery(statement, args);
}
_SqfliteTransactionExecutor(this.db);
}
@override
Future<int> runUpdate(String statement, List args) {
_log(statement, args);
return db.rawUpdate(statement, args);
}
@override
Future<void> runCustom(String statement) {
_log(statement, null);
return db.execute(statement);
}
mixin _SqfliteExecutor on QueryDelegate {
s.DatabaseExecutor get db;
@override
Future<void> runBatched(List<BatchedStatement> statements) async {
@ -64,163 +91,41 @@ abstract class _DatabaseOwner extends QueryExecutor {
for (var statement in statements) {
for (var boundVariables in statement.variables) {
_log(statement.sql, boundVariables);
batch.execute(statement.sql, boundVariables);
}
}
await batch.commit(noResult: true);
}
@override
Future<void> runCustom(String statement, List args) {
return db.execute(statement);
}
@override
Future<int> runInsert(String statement, List args) {
return db.rawInsert(statement, args);
}
@override
Future<QueryResult> runSelect(String statement, List args) async {
final result = await db.rawQuery(statement, args);
return QueryResult.fromRows(result);
}
@override
Future<int> runUpdate(String statement, List args) {
return db.rawUpdate(statement, args);
}
}
/// A query executor that uses sqflite internally.
class FlutterQueryExecutor extends _DatabaseOwner {
final bool _inDbPath;
final String path;
@override
s.Database db;
Completer<void> _openingCompleter;
bool _hadMigration = false;
int _versionBefore;
FlutterQueryExecutor({@required this.path, bool logStatements})
: _inDbPath = false,
super(logStatements);
class FlutterQueryExecutor extends DelegatedDatabase {
FlutterQueryExecutor({@required String path, bool logStatements})
: super(_SqfliteDelegate(false, path), logStatements: logStatements);
FlutterQueryExecutor.inDatabaseFolder(
{@required this.path, bool logStatements})
: _inDbPath = true,
super(logStatements);
@override
Future<bool> ensureOpen() async {
// mechanism to ensure that _openDatabase is only called once, even if we
// have many queries calling ensureOpen() repeatedly. _openingCompleter is
// set if we're currently in the process of opening the database.
if (_openingCompleter != null) {
// already opening, wait for that to finish and don't open the database
// again
await _openingCompleter.future;
return true;
}
if (db != null && db.isOpen) {
// database is opened and ready
return true;
}
// alright, opening the database
_openingCompleter = Completer();
await _openDatabase();
_openingCompleter.complete();
return true;
}
Future _openDatabase() async {
String resolvedPath;
if (_inDbPath) {
resolvedPath = join(await s.getDatabasesPath(), path);
} else {
resolvedPath = path;
}
db = await s.openDatabase(resolvedPath, version: databaseInfo.schemaVersion,
onCreate: (db, version) {
_hadMigration = true;
return databaseInfo.handleDatabaseCreation(
executor: _migrationExecutor(db),
);
}, onUpgrade: (db, from, to) {
_hadMigration = true;
_versionBefore = from;
return databaseInfo.handleDatabaseVersionChange(
executor: _migrationExecutor(db), from: from, to: to);
}, onOpen: (db) async {
final versionNow = await db.getVersion();
final resolvedPrevious = _hadMigration ? _versionBefore : versionNow;
final details = OpeningDetails(resolvedPrevious, versionNow);
await databaseInfo.beforeOpenCallback(
_BeforeOpenExecutor(db, logStatements), details);
});
}
SqlExecutor _migrationExecutor(s.Database db) {
return (sql) {
_log(sql);
return db.execute(sql);
};
}
@override
TransactionExecutor beginTransaction() {
return _SqfliteTransactionExecutor.startFromDb(this);
}
}
class _SqfliteTransactionExecutor extends _DatabaseOwner
implements TransactionExecutor {
@override
s.Transaction db;
/// This future should complete with the transaction once the transaction has
/// been created.
final Future<s.Transaction> _open;
// This completer will complete when send() is called. We use it because
// sqflite expects a future in the db.transaction() method. The transaction
// will be executed when that future completes.
final Completer _actionCompleter;
/// This future should complete when the call to db.transaction completes.
final Future _sendFuture;
_SqfliteTransactionExecutor(
this._open, this._actionCompleter, this._sendFuture, bool logStatements)
: super(logStatements) {
_open.then((transaction) => db = transaction);
}
factory _SqfliteTransactionExecutor.startFromDb(FlutterQueryExecutor db) {
final actionCompleter = Completer();
final openingCompleter = Completer<s.Transaction>();
final sendFuture = db.db.transaction((t) {
openingCompleter.complete(t);
return actionCompleter.future;
});
return _SqfliteTransactionExecutor(
openingCompleter.future, actionCompleter, sendFuture, db.logStatements);
}
@override
TransactionExecutor beginTransaction() {
throw StateError('Transactions cannot create another transaction!');
}
@override
Future<bool> ensureOpen() => _open.then((_) => true);
@override
Future<void> send() {
_actionCompleter.complete(null);
return _sendFuture;
}
}
class _BeforeOpenExecutor extends _DatabaseOwner {
@override
final s.DatabaseExecutor db;
_BeforeOpenExecutor(this.db, bool logStatements) : super(logStatements);
@override
TransactionExecutor beginTransaction() {
throw UnsupportedError(
"Transactions can't be started in the befoeOpen callback");
}
@override
Future<bool> ensureOpen() => Future.value(true);
{@required String path, bool logStatements})
: super(_SqfliteDelegate(true, path), logStatements: logStatements);
}