Remove coupling between QueryExecutor and generated db

Closes #372
This commit is contained in:
Simon Binder 2020-03-15 14:55:02 +01:00
parent 4ff3d438f8
commit 60d3bf05e1
No known key found for this signature in database
GPG Key ID: 7891917E4147B8C0
38 changed files with 349 additions and 494 deletions

View File

@ -47,7 +47,7 @@ class _SqfliteDelegate extends DatabaseDelegate with _SqfliteExecutor {
bool get isOpen => db != null;
@override
Future<void> open([GeneratedDatabase db]) async {
Future<void> open(QueryExecutorUser user) async {
String resolvedPath;
if (inDbFolder) {
resolvedPath = join(await s.getDatabasesPath(), path);
@ -61,11 +61,11 @@ class _SqfliteDelegate extends DatabaseDelegate with _SqfliteExecutor {
}
// default value when no migration happened
_loadedSchemaVersion = db.schemaVersion;
_loadedSchemaVersion = user.schemaVersion;
this.db = await s.openDatabase(
db = await s.openDatabase(
resolvedPath,
version: db.schemaVersion,
version: user.schemaVersion,
password: password,
onCreate: (db, version) {
_loadedSchemaVersion = 0;

View File

@ -44,7 +44,6 @@ Future<void> main() async {
await file.delete();
}
var didCallCreator = false;
final executor = FlutterQueryExecutor.inDatabaseFolder(
path: dbNameInDevice,
@ -56,7 +55,7 @@ Future<void> main() async {
},
);
final database = Database(executor);
await database.executor.ensureOpen();
await database.executor.ensureOpen(database);
expect(didCallCreator, isTrue);
});

View File

@ -33,11 +33,11 @@ void migrationTests(TestExecutor executor) {
test('runs the migrator when downgrading', () async {
var database = Database(executor.createExecutor(), schemaVersion: 2);
await database.executor.ensureOpen(); // Create the database
await database.executor.ensureOpen(database); // Create the database
await database.close();
database = Database(executor.createExecutor(), schemaVersion: 1);
await database.executor.ensureOpen(); // Let the migrator run
await database.executor.ensureOpen(database); // Let the migrator run
expect(database.schemaVersionChangedFrom, 2);
expect(database.schemaVersionChangedTo, 1);

View File

@ -194,23 +194,23 @@ void main() {
Future<void> _testWith(MoorWebStorage storage) async {
var didCallInitializer = false;
final db = WebDatabase.withStorage(storage, initializer: () async {
final executor = WebDatabase.withStorage(storage, initializer: () async {
didCallInitializer = true;
return base64.decode(_rawDataBase64.replaceAll('\n', ''));
});
moorRuntimeOptions.dontWarnAboutMultipleDatabases = true;
db.databaseInfo = _FakeDatabase(db);
final attachedDb = _FakeDatabase(executor);
await db.ensureOpen();
await executor.ensureOpen(attachedDb);
expect(didCallInitializer, isTrue);
final result = await db.runSelect('SELECT * FROM foo', const []);
final result = await executor.runSelect('SELECT * FROM foo', const []);
expect(result, [
{'name': 'hello world'}
]);
await db.close();
await executor.close();
}
class _FakeDatabase extends GeneratedDatabase {

View File

@ -75,7 +75,7 @@ class _MySqlDelegate extends DatabaseDelegate with _MySqlExecutor {
SqlDialect get dialect => SqlDialect.mysql;
@override
Future<void> open([GeneratedDatabase db]) async {
Future<void> open(_) async {
_connection = await MySqlConnection.connect(_settings);
}

View File

@ -122,14 +122,17 @@ class Batch {
}
Future<void> _commit() async {
await _engine.executor.ensureOpen();
await _engine.executor.ensureOpen(_engine.attachedDatabase);
if (_startTransaction) {
TransactionExecutor transaction;
try {
transaction = _engine.executor.beginTransaction();
await transaction.doWhenOpened(_runWith);
await transaction.ensureOpen(null);
await _runWith(transaction);
await transaction.send();
} catch (e) {
await transaction.rollback();

View File

@ -11,7 +11,8 @@ Map<Type, int> _openedDbCount = {};
/// A base class for all generated databases.
abstract class GeneratedDatabase extends DatabaseConnectionUser
with QueryEngine {
with QueryEngine
implements QueryExecutorUser {
@override
bool get topLevel => true;
@ -20,6 +21,7 @@ abstract class GeneratedDatabase extends DatabaseConnectionUser
/// Specify the schema version of your database. Whenever you change or add
/// tables, you should bump this field and provide a [migration] strategy.
@override
int get schemaVersion;
/// Defines the migration strategy that will determine how to deal with an
@ -58,14 +60,12 @@ abstract class GeneratedDatabase extends DatabaseConnectionUser
GeneratedDatabase(SqlTypeSystem types, QueryExecutor executor,
{StreamQueryStore streamStore})
: super(types, executor, streamQueries: streamStore) {
executor?.databaseInfo = this;
assert(_handleInstantiated());
}
/// Used by generated code to connect to a database that is already open.
GeneratedDatabase.connect(DatabaseConnection connection)
: super.fromConnection(connection) {
connection?.executor?.databaseInfo = this;
assert(_handleInstantiated());
}
@ -98,46 +98,31 @@ abstract class GeneratedDatabase extends DatabaseConnectionUser
/// Creates a [Migrator] with the provided query executor. Migrators generate
/// sql statements to create or drop tables.
///
/// This api is mainly used internally in moor, for instance in
/// [handleDatabaseCreation] and [handleDatabaseVersionChange]. However, it
/// can also be used if you need to create tables manually and outside of a
/// [MigrationStrategy]. For almost all use cases, overriding [migration]
/// should suffice.
/// This api is mainly used internally in moor, especially to implement the
/// [beforeOpen] callback from the database site.
/// However, it can also be used if yuo need to create tables manually and
/// outside of a [MigrationStrategy]. For almost all use cases, overriding
/// [migration] should suffice.
@protected
Migrator createMigrator([SqlExecutor executor]) {
final actualExecutor = executor ?? customStatement;
return Migrator(this, actualExecutor);
@visibleForTesting
Migrator createMigrator() {
return Migrator(this, _resolvedEngine);
}
/// Handles database creation by delegating the work to the [migration]
/// strategy. This method should not be called by users.
Future<void> handleDatabaseCreation({@required SqlExecutor executor}) {
final migrator = createMigrator(executor);
return _resolvedMigration.onCreate(migrator);
@override
Future<void> beforeOpen(QueryExecutor executor, OpeningDetails details) {
return _runEngineZoned(BeforeOpenRunner(this, executor), () async {
if (details.wasCreated) {
final migrator = createMigrator();
await _resolvedMigration.onCreate(migrator);
} else if (details.hadUpgrade) {
final migrator = createMigrator();
await _resolvedMigration.onUpgrade(
migrator, details.versionBefore, details.versionNow);
}
/// Handles database updates by delegating the work to the [migration]
/// strategy. This method should not be called by users.
Future<void> handleDatabaseVersionChange(
{@required SqlExecutor executor, int from, int to}) {
final migrator = createMigrator(executor);
return _resolvedMigration.onUpgrade(migrator, from, to);
}
/// Handles the before opening callback as set in the [migration]. This method
/// is used internally by database implementations and should not be called by
/// users.
Future<void> beforeOpenCallback(
QueryExecutor executor, OpeningDetails details) {
final migration = _resolvedMigration;
if (migration.beforeOpen != null) {
return _runEngineZoned(
BeforeOpenRunner(this, executor),
() => migration.beforeOpen(details),
);
}
return Future.value();
await _resolvedMigration.beforeOpen?.call(details);
});
}
/// Closes this database and releases associated resources.

View File

@ -89,6 +89,15 @@ mixin QueryEngine on DatabaseConnectionUser {
});
}
/// Performs the async [fn] after this executor is ready, or directly if it's
/// already ready.
///
/// Calling this method directly might circumvent the current transaction. For
/// that reason, it should only be called inside moor.
Future<T> doWhenOpened<T>(FutureOr<T> Function(QueryExecutor e) fn) {
return executor.ensureOpen(attachedDatabase).then((_) => fn(executor));
}
/// Starts an [InsertStatement] for a given table. You can use that statement
/// to write data into the [table] by using [InsertStatement.insert].
@protected
@ -247,13 +256,12 @@ mixin QueryEngine on DatabaseConnectionUser {
_CustomWriter<T> writer,
) async {
final engine = _resolvedEngine;
final executor = engine.executor;
final ctx = GenerationContext.fromDb(engine);
final mappedArgs = variables.map((v) => v.mapToSimpleValue(ctx)).toList();
final result =
await executor.doWhenOpened((e) => writer(e, query, mappedArgs));
await engine.doWhenOpened((e) => writer(e, query, mappedArgs));
if (updates != null) {
engine.notifyUpdates({
@ -305,7 +313,7 @@ mixin QueryEngine on DatabaseConnectionUser {
Future<void> customStatement(String statement, [List<dynamic> args]) {
final engine = _resolvedEngine;
return engine.executor.doWhenOpened((executor) {
return engine.doWhenOpened((executor) {
return executor.runCustom(statement, args);
});
}
@ -340,8 +348,7 @@ mixin QueryEngine on DatabaseConnectionUser {
return action();
}
final executor = resolved.executor;
return await executor.doWhenOpened((executor) {
return await resolved.doWhenOpened((executor) {
final transactionExecutor = executor.beginTransaction();
final transaction = Transaction(this, transactionExecutor);

View File

@ -24,19 +24,11 @@ class _MultiExecutorImpl extends MultiExecutor {
_MultiExecutorImpl(this._reads, this._writes) : super._();
@override
set databaseInfo(GeneratedDatabase database) {
super.databaseInfo = database;
_writes.databaseInfo = database;
_reads.databaseInfo = _NoMigrationsWrapper(database);
}
@override
Future<bool> ensureOpen() async {
Future<bool> ensureOpen(QueryExecutorUser user) async {
// note: It's crucial that we open the writes first. The reading connection
// doesn't run migrations, but has to set the user version.
await _writes.ensureOpen();
await _reads.ensureOpen();
await _writes.ensureOpen(user);
await _reads.ensureOpen(_NoMigrationsWrapper(user));
return true;
}
@ -84,30 +76,17 @@ class _MultiExecutorImpl extends MultiExecutor {
}
}
// query executors are responsible for starting the migration process on
// a database after they open. We don't want to run migrations twice, so
// we give the reading executor a database handle that doesn't do any
// migrations.
class _NoMigrationsWrapper extends GeneratedDatabase {
final GeneratedDatabase _inner;
class _NoMigrationsWrapper extends QueryExecutorUser {
final QueryExecutorUser inner;
_NoMigrationsWrapper(this._inner)
: super(const SqlTypeSystem.withDefaults(), null);
_NoMigrationsWrapper(this.inner);
@override
Iterable<TableInfo<Table, DataClass>> get allTables => const [];
int get schemaVersion => inner.schemaVersion;
@override
int get schemaVersion => _inner.schemaVersion;
@override
Future<void> handleDatabaseCreation({@required SqlExecutor executor}) async {}
@override
Future<void> handleDatabaseVersionChange(
{@required SqlExecutor executor, int from, int to}) async {}
@override
Future<void> beforeOpenCallback(
QueryExecutor executor, OpeningDetails details) async {}
Future<void> beforeOpen(
QueryExecutor executor, OpeningDetails details) async {
// don't run any migrations
}
}

View File

@ -2,7 +2,7 @@ import 'dart:async';
import 'package:collection/collection.dart';
import 'package:moor/backends.dart';
import 'package:moor/moor.dart' show GeneratedDatabase;
import 'package:moor/moor.dart' show OpeningDetails;
import 'package:moor/src/utils/hash.dart';
/// A query executor is responsible for executing statements on a database and
@ -15,22 +15,11 @@ import 'package:moor/src/utils/hash.dart';
/// engine to use with moor and run into issues, please consider creating an
/// issue.
abstract class QueryExecutor {
/// The higher-level database class attached to this executor. This
/// information can be used to read the [GeneratedDatabase.schemaVersion] when
/// opening the database.
GeneratedDatabase databaseInfo;
/// The [SqlDialect] to use for this database engine.
SqlDialect get dialect => SqlDialect.sqlite;
/// Performs the async [fn] after this executor is ready, or directly if it's
/// already ready.
Future<T> doWhenOpened<T>(FutureOr<T> Function(QueryExecutor e) fn) {
return ensureOpen().then((_) => fn(this));
}
/// Opens the executor, if it has not yet been opened.
Future<bool> ensureOpen();
Future<bool> ensureOpen(QueryExecutorUser user);
/// Runs a select statement with the given variables and returns the raw
/// results.
@ -67,6 +56,23 @@ abstract class QueryExecutor {
}
}
/// Callbacks passed to [QueryExecutor.ensureOpen] to run schema migrations when
/// the database is first opened.
abstract class QueryExecutorUser {
/// The schema version to set on the database when it's opened.
int get schemaVersion;
/// A callbacks that runs after the database connection has been established,
/// but before any other query is sent.
///
/// The query executor will wait for this future to complete before running
/// any other query. Queries running on the [executor] are an exception to
/// this, they can be used to run migrations.
/// No matter how often [QueryExecutor.ensureOpen] is called, this method will
/// not be called more than once.
Future<void> beforeOpen(QueryExecutor executor, OpeningDetails details);
}
/// A statement that should be executed in a batch. Used internally by moor.
class BatchedStatement {
static const _nestedListEquality = ListEquality(ListEquality());

View File

@ -46,11 +46,11 @@ abstract class DatabaseDelegate implements QueryDelegate {
/// times, so you don't have to worry about a connection being created
/// multiple times.
///
/// The [GeneratedDatabase] is the user-defined database annotated with
/// The [QueryExecutorUser] is the user-defined database annotated with
/// [UseMoor]. It might be useful to read the
/// [GeneratedDatabase.schemaVersion] if that information is required while
/// [QueryExecutorUser.schemaVersion] if that information is required while
/// opening the database.
Future<void> open([GeneratedDatabase db]);
Future<void> open(QueryExecutorUser db);
/// Closes this database. When the future completes, all resources used
/// by this database should have been disposed.

View File

@ -122,7 +122,7 @@ class _TransactionExecutor extends TransactionExecutor
}
@override
Future<bool> ensureOpen() async {
Future<bool> ensureOpen(_) async {
_ensureOpenCalled = true;
if (_openingCompleter != null) {
return await _openingCompleter.future;
@ -233,7 +233,7 @@ class DelegatedDatabase extends QueryExecutor with _ExecutorWithQueryDelegate {
}
@override
Future<bool> ensureOpen() {
Future<bool> ensureOpen(QueryExecutorUser user) {
_ensureOpenCalled = true;
return _openingLock.synchronized(() async {
final alreadyOpen = await delegate.isOpen;
@ -241,23 +241,21 @@ class DelegatedDatabase extends QueryExecutor with _ExecutorWithQueryDelegate {
return true;
}
assert(databaseInfo != null,
'A databaseInfo needs to be set to use a QueryExeuctor');
await delegate.open(databaseInfo);
await _runMigrations();
await delegate.open(user);
await _runMigrations(user);
return true;
});
}
Future<void> _runMigrations() async {
Future<void> _runMigrations(QueryExecutorUser user) async {
final versionDelegate = delegate.versionDelegate;
int oldVersion;
final currentVersion = databaseInfo.schemaVersion;
final currentVersion = user.schemaVersion;
if (versionDelegate is NoVersionDelegate) {
// this one is easy. There is no version mechanism, so we don't run any
// migrations. Assume database is on latest version.
oldVersion = databaseInfo.schemaVersion;
oldVersion = user.schemaVersion;
} else if (versionDelegate is OnOpenVersionDelegate) {
// version has already been set during open
oldVersion = await versionDelegate.loadSchemaVersion();
@ -276,17 +274,9 @@ class DelegatedDatabase extends QueryExecutor with _ExecutorWithQueryDelegate {
oldVersion = null;
}
final dbCreated = oldVersion == null;
if (dbCreated) {
await databaseInfo.handleDatabaseCreation(executor: runCustom);
} else if (oldVersion != currentVersion) {
await databaseInfo.handleDatabaseVersionChange(
executor: runCustom, from: oldVersion, to: currentVersion);
}
final openingDetails = OpeningDetails(oldVersion, currentVersion);
await _runBeforeOpen(openingDetails);
await user.beforeOpen(_BeforeOpeningExecutor(this), openingDetails);
delegate.notifyDatabaseOpened(openingDetails);
}
@ -295,10 +285,6 @@ class DelegatedDatabase extends QueryExecutor with _ExecutorWithQueryDelegate {
return _TransactionExecutor(this);
}
Future<void> _runBeforeOpen(OpeningDetails d) {
return databaseInfo.beforeOpenCallback(_BeforeOpeningExecutor(this), d);
}
@override
Future<void> close() {
return delegate.close();
@ -320,7 +306,7 @@ class _BeforeOpeningExecutor extends QueryExecutor
TransactionExecutor beginTransaction() => _base.beginTransaction();
@override
Future<bool> ensureOpen() {
Future<bool> ensureOpen(_) {
_ensureOpenCalled = true;
return Future.value(true);
}

View File

@ -7,9 +7,7 @@ class _MoorClient {
DatabaseConnection _connection;
GeneratedDatabase get connectedDb => _connection.executor.databaseInfo;
SqlExecutor get executor => _connection.executor.runCustom;
QueryExecutorUser _connectedDb;
_MoorClient(this._channel, this.typeSystem) {
_streamStore = _IsolateStreamQueryStore(this);
@ -35,22 +33,9 @@ class _MoorClient {
dynamic _handleRequest(Request request) {
final payload = request.payload;
if (payload is _NoArgsRequest) {
switch (payload) {
case _NoArgsRequest.runOnCreate:
return connectedDb.handleDatabaseCreation(executor: executor);
default:
throw UnsupportedError('This operation must be run on the server');
}
} else if (payload is _RunOnUpgrade) {
return connectedDb.handleDatabaseVersionChange(
executor: executor,
from: payload.versionBefore,
to: payload.versionNow,
);
} else if (payload is _RunBeforeOpen) {
return connectedDb.beforeOpenCallback(
_connection.executor, payload.details);
if (payload is _RunBeforeOpen) {
final executor = _IsolateQueryExecutor(this, payload.createdExecutor);
return _connectedDb.beforeOpen(executor, payload.details);
} else if (payload is _NotifyTablesUpdated) {
_streamStore.handleTableUpdates(payload.updates.toSet(), true);
}
@ -59,19 +44,19 @@ class _MoorClient {
abstract class _BaseExecutor extends QueryExecutor {
final _MoorClient client;
int _transactionId;
int _executorId;
_BaseExecutor(this.client);
_BaseExecutor(this.client, [this._executorId]);
@override
Future<void> runBatched(List<BatchedStatement> statements) {
return client._channel
.request(_ExecuteBatchedStatement(statements, _transactionId));
.request(_ExecuteBatchedStatement(statements, _executorId));
}
Future<T> _runRequest<T>(_StatementMethod method, String sql, List args) {
return client._channel
.request<T>(_ExecuteQuery(method, sql, args, _transactionId));
.request<T>(_ExecuteQuery(method, sql, args, _executorId));
}
@override
@ -105,31 +90,25 @@ abstract class _BaseExecutor extends QueryExecutor {
}
class _IsolateQueryExecutor extends _BaseExecutor {
_IsolateQueryExecutor(_MoorClient client) : super(client);
_IsolateQueryExecutor(_MoorClient client, [int executorId])
: super(client, executorId);
Completer<void> _setSchemaVersion;
@override
set databaseInfo(GeneratedDatabase db) {
super.databaseInfo = db;
_setSchemaVersion = Completer();
_setSchemaVersion
.complete(client._channel.request(_SetSchemaVersion(db.schemaVersion)));
}
@override
TransactionExecutor beginTransaction() {
return _TransactionIsolateExecutor(client);
}
@override
Future<bool> ensureOpen() async {
Future<bool> ensureOpen(QueryExecutorUser user) async {
client._connectedDb = user;
if (_setSchemaVersion != null) {
await _setSchemaVersion.future;
_setSchemaVersion = null;
}
return client._channel.request<bool>(_NoArgsRequest.ensureOpen);
return client._channel
.request<bool>(_EnsureOpen(user.schemaVersion, _executorId));
}
@override
@ -153,20 +132,19 @@ class _TransactionIsolateExecutor extends _BaseExecutor
TransactionExecutor beginTransaction() => null;
@override
Future<bool> ensureOpen() {
Future<bool> ensureOpen(_) {
_pendingOpen ??= Completer()..complete(_openAtServer());
return _pendingOpen.future;
}
Future<bool> _openAtServer() async {
_transactionId =
_executorId =
await client._channel.request(_NoArgsRequest.startTransaction) as int;
return true;
}
Future<void> _sendAction(_TransactionControl action) {
return client._channel
.request(_RunTransactionAction(action, _transactionId));
return client._channel.request(_RunTransactionAction(action, _executorId));
}
@override

View File

@ -6,17 +6,9 @@ enum _NoArgsRequest {
/// [SqlTypeSystem] of the [_MoorServer.connection] it's managing.
getTypeSystem,
/// Sent from the client to the server. The server will reply with
/// [QueryExecutor.ensureOpen], based on the [_MoorServer.connection].
ensureOpen,
/// Sent from the server to a client. The client should run the on create
/// method of the attached database
runOnCreate,
/// Sent from the client to start a transaction. The server must reply with an
/// integer, which serves as an identifier for the transaction in
/// [_ExecuteQuery.transactionId].
/// [_ExecuteQuery.executorId].
startTransaction,
/// Close the background isolate, disconnect all clients, release all
@ -42,14 +34,14 @@ class _ExecuteQuery {
final _StatementMethod method;
final String sql;
final List<dynamic> args;
final int transactionId;
final int executorId;
_ExecuteQuery(this.method, this.sql, this.args, [this.transactionId]);
_ExecuteQuery(this.method, this.sql, this.args, [this.executorId]);
@override
String toString() {
if (transactionId != null) {
return '$method: $sql with $args (@$transactionId)';
if (executorId != null) {
return '$method: $sql with $args (@$executorId)';
}
return '$method: $sql with $args';
}
@ -58,9 +50,9 @@ class _ExecuteQuery {
/// Sent from the client to run a list of [BatchedStatement]s.
class _ExecuteBatchedStatement {
final List<BatchedStatement> stmts;
final int transactionId;
final int executorId;
_ExecuteBatchedStatement(this.stmts, [this.transactionId]);
_ExecuteBatchedStatement(this.stmts, [this.executorId]);
}
/// Sent from the client to commit or rollback a transaction
@ -71,29 +63,22 @@ class _RunTransactionAction {
_RunTransactionAction(this.control, this.transactionId);
}
/// Sent from the client to notify the server of the
/// [GeneratedDatabase.schemaVersion] used by the attached database.
class _SetSchemaVersion {
/// Sent from the client to the server. The server should open the underlying
/// database connection, using the [schemaVersion].
class _EnsureOpen {
final int schemaVersion;
final int executorId;
_SetSchemaVersion(this.schemaVersion);
}
/// Sent from the server to the client. The client should run a database upgrade
/// migration.
class _RunOnUpgrade {
final int versionBefore;
final int versionNow;
_RunOnUpgrade(this.versionBefore, this.versionNow);
_EnsureOpen(this.schemaVersion, this.executorId);
}
/// Sent from the server to the client when it should run the before open
/// callback.
class _RunBeforeOpen {
final OpeningDetails details;
final int createdExecutor;
_RunBeforeOpen(this.details);
_RunBeforeOpen(this.details, this.createdExecutor);
}
/// Sent to notify that a previous query has updated some tables. When a server

View File

@ -5,8 +5,8 @@ class _MoorServer {
DatabaseConnection connection;
final Map<int, TransactionExecutor> _transactions = {};
int _currentTransaction = 0;
final Map<int, QueryExecutor> _managedExecutors = {};
int _currentExecutorId = 0;
/// when a transaction is active, all queries that don't operate on another
/// query executor have to wait!
@ -15,11 +15,11 @@ class _MoorServer {
/// first transaction id in the backlog is active at the moment. Whenever a
/// transaction completes, we emit an item on [_backlogUpdated]. This can be
/// used to implement a lock.
final List<int> _transactionBacklog = [];
final List<int> _executorBacklog = [];
final StreamController<void> _backlogUpdated =
StreamController.broadcast(sync: true);
_FakeDatabase _fakeDb;
_IsolateDelegatedUser _dbUser;
ServerKey get key => server.key;
@ -28,9 +28,7 @@ class _MoorServer {
connection.setRequestHandler(_handleRequest);
});
connection = opener();
_fakeDb = _FakeDatabase(connection, this);
connection.executor.databaseInfo = _fakeDb;
_dbUser = _IsolateDelegatedUser(this);
}
/// Returns the first connected client, or null if no client is connected.
@ -46,8 +44,6 @@ class _MoorServer {
switch (payload) {
case _NoArgsRequest.getTypeSystem:
return connection.typeSystem;
case _NoArgsRequest.ensureOpen:
return connection.executor.ensureOpen();
case _NoArgsRequest.startTransaction:
return _spawnTransaction();
case _NoArgsRequest.terminateAll:
@ -56,19 +52,14 @@ class _MoorServer {
server.close();
Isolate.current.kill();
break;
// the following are requests which are handled on the client side
case _NoArgsRequest.runOnCreate:
throw UnsupportedError(
'This operation needs to be run on the client');
}
} else if (payload is _SetSchemaVersion) {
_fakeDb.schemaVersion = payload.schemaVersion;
return null;
} else if (payload is _EnsureOpen) {
return _handleEnsureOpen(payload);
} else if (payload is _ExecuteQuery) {
return _runQuery(
payload.method, payload.sql, payload.args, payload.transactionId);
payload.method, payload.sql, payload.args, payload.executorId);
} else if (payload is _ExecuteBatchedStatement) {
return _runBatched(payload.stmts, payload.transactionId);
return _runBatched(payload.stmts, payload.executorId);
} else if (payload is _NotifyTablesUpdated) {
for (final connected in server.currentChannels) {
connected.request(payload);
@ -78,6 +69,13 @@ class _MoorServer {
}
}
Future<bool> _handleEnsureOpen(_EnsureOpen open) async {
_dbUser.schemaVersion = open.schemaVersion;
final executor = await _loadExecutor(open.executorId);
return await executor.ensureOpen(_dbUser);
}
Future<dynamic> _runQuery(
_StatementMethod method, String sql, List args, int transactionId) async {
final executor = await _loadExecutor(transactionId);
@ -105,23 +103,34 @@ class _MoorServer {
Future<QueryExecutor> _loadExecutor(int transactionId) async {
await _waitForTurn(transactionId);
return transactionId != null
? _transactions[transactionId]
? _managedExecutors[transactionId]
: connection.executor;
}
Future<int> _spawnTransaction() async {
final id = _currentTransaction++;
final transaction = connection.executor.beginTransaction();
final id = _putExecutor(transaction);
_transactions[id] = transaction;
_transactionBacklog.add(id);
await transaction.ensureOpen();
await transaction.ensureOpen(_dbUser);
return id;
}
int _putExecutor(QueryExecutor executor) {
final id = _currentExecutorId++;
_managedExecutors[id] = executor;
_executorBacklog.add(id);
return id;
}
Future<void> _transactionControl(
_TransactionControl action, int transactionId) async {
final transaction = _transactions[transactionId];
final executor = _managedExecutors[transactionId];
if (executor is! TransactionExecutor) {
throw ArgumentError.value(
transactionId, 'transactionId', 'Does not reference a transaction');
}
final transaction = executor as TransactionExecutor;
try {
switch (action) {
@ -133,19 +142,23 @@ class _MoorServer {
break;
}
} finally {
_transactions.remove(transactionId);
_transactionBacklog.remove(transactionId);
_notifyTransactionsUpdated();
_releaseExecutor(transactionId);
}
}
void _releaseExecutor(int id) {
_managedExecutors.remove(id);
_executorBacklog.remove(id);
_notifyActiveExecutorUpdated();
}
Future<void> _waitForTurn(int transactionId) {
bool idIsActive() {
if (transactionId == null) {
return _transactionBacklog.isEmpty;
return _executorBacklog.isEmpty;
} else {
return _transactionBacklog.isNotEmpty &&
_transactionBacklog.first == transactionId;
return _executorBacklog.isNotEmpty &&
_executorBacklog.first == transactionId;
}
}
@ -155,43 +168,29 @@ class _MoorServer {
return _backlogUpdated.stream.firstWhere((_) => idIsActive());
}
void _notifyTransactionsUpdated() {
void _notifyActiveExecutorUpdated() {
if (!_backlogUpdated.isClosed) {
_backlogUpdated.add(null);
}
}
}
/// A mock database so that the [QueryExecutor] which is running on a background
/// isolate can have the [QueryExecutor.databaseInfo] set. The query executor
/// uses that to set the schema version and to run migration callbacks. For a
/// server, all of that is delegated via clients.
class _FakeDatabase extends GeneratedDatabase {
class _IsolateDelegatedUser implements QueryExecutorUser {
final _MoorServer server;
_FakeDatabase(DatabaseConnection connection, this.server)
: super.connect(connection);
@override
int schemaVersion = 0;
_IsolateDelegatedUser(this.server); // will be overridden by client requests
@override
final List<TableInfo<Table, DataClass>> allTables = const [];
@override
int schemaVersion = 0; // will be overridden by client requests
@override
Future<void> handleDatabaseCreation({SqlExecutor executor}) {
return server.firstClient.request(_NoArgsRequest.runOnCreate);
}
@override
Future<void> handleDatabaseVersionChange(
{SqlExecutor executor, int from, int to}) {
return server.firstClient.request(_RunOnUpgrade(from, to));
}
@override
Future<void> beforeOpenCallback(
QueryExecutor executor, OpeningDetails details) {
return server.firstClient.request(_RunBeforeOpen(details));
Future<void> beforeOpen(
QueryExecutor executor, OpeningDetails details) async {
final id = server._putExecutor(executor);
try {
await server.firstClient.request(_RunBeforeOpen(details, id));
} finally {
server._releaseExecutor(id);
}
}
}

View File

@ -15,8 +15,8 @@ class GenerationContext {
/// The [SqlDialect] that should be respected when generating the query.
final SqlDialect dialect;
/// The actual [QueryExecutor] that's going to execute the generated query.
final QueryExecutor executor;
/// The actual [QueryEngine] that's going to execute the generated query.
final QueryEngine executor;
final List<dynamic> _boundVariables = [];
@ -39,10 +39,9 @@ class GenerationContext {
/// Constructs a [GenerationContext] by copying the relevant fields from the
/// database.
GenerationContext.fromDb(QueryEngine database)
: typeSystem = database.typeSystem,
executor = database.executor,
dialect = database.executor?.dialect ?? SqlDialect.sqlite;
GenerationContext.fromDb(this.executor)
: typeSystem = executor.typeSystem,
dialect = executor.executor?.dialect ?? SqlDialect.sqlite;
/// Constructs a custom [GenerationContext] by setting the fields manually.
/// See [GenerationContext.fromDb] for a more convenient factory.

View File

@ -35,7 +35,7 @@ class MigrationStrategy {
/// and all migrations ran), but before any other queries will be sent. This
/// makes it a suitable place to populate data after the database has been
/// created or set sqlite `PRAGMAS` that you need.
final OnBeforeOpen beforeOpen;
final OnBeforeOpen /*?*/ beforeOpen;
/// Construct a migration strategy from the provided [onCreate] and
/// [onUpgrade] methods.
@ -46,16 +46,13 @@ class MigrationStrategy {
});
}
/// A function that executes queries and ignores what they return.
typedef SqlExecutor = Future<void> Function(String sql, [List<dynamic> args]);
/// Runs migrations declared by a [MigrationStrategy].
class Migrator {
final GeneratedDatabase _db;
final SqlExecutor _executor;
final QueryEngine _resolvedEngineForMigrations;
/// Used internally by moor when opening the database.
Migrator(this._db, this._executor);
Migrator(this._db, this._resolvedEngineForMigrations);
/// Creates all tables specified for the database, if they don't exist
@Deprecated('Use createAll() instead')
@ -84,10 +81,7 @@ class Migrator {
}
GenerationContext _createContext() {
return GenerationContext(
_db.typeSystem,
_SimpleSqlAsQueryExecutor(_executor),
);
return GenerationContext.fromDb(_db);
}
/// Creates the given table if it doesn't exist
@ -194,7 +188,9 @@ class Migrator {
/// Executes the custom query.
Future<void> issueCustomQuery(String sql, [List<dynamic> args]) async {
return _executor(sql, args);
await _resolvedEngineForMigrations.doWhenOpened(
(executor) => executor.runCustom(sql, args),
);
}
}
@ -217,49 +213,3 @@ class OpeningDetails {
/// Used internally by moor when opening a database.
const OpeningDetails(this.versionBefore, this.versionNow);
}
class _SimpleSqlAsQueryExecutor extends QueryExecutor {
final SqlExecutor executor;
_SimpleSqlAsQueryExecutor(this.executor);
@override
TransactionExecutor beginTransaction() {
throw UnsupportedError('Not supported for migrations');
}
@override
Future<bool> ensureOpen() {
return Future.value(true);
}
@override
Future<void> runBatched(List<BatchedStatement> statements) {
throw UnsupportedError('Not supported for migrations');
}
@override
Future<void> runCustom(String statement, [List<dynamic> args]) {
return executor(statement, args);
}
@override
Future<int> runDelete(String statement, List args) {
throw UnsupportedError('Not supported for migrations');
}
@override
Future<int> runInsert(String statement, List args) {
throw UnsupportedError('Not supported for migrations');
}
@override
Future<List<Map<String, dynamic>>> runSelect(String statement, List args) {
throw UnsupportedError('Not supported for migrations');
}
@override
Future<int> runUpdate(String statement, List args) {
throw UnsupportedError('Not supported for migrations');
}
}

View File

@ -29,7 +29,7 @@ class DeleteStatement<T extends Table, D extends DataClass> extends Query<T, D>
final ctx = constructQuery();
return ctx.executor.doWhenOpened((e) async {
final rows = await ctx.executor.runDelete(ctx.sql, ctx.boundVariables);
final rows = await e.runDelete(ctx.sql, ctx.boundVariables);
if (rows > 0) {
database.notifyUpdates(

View File

@ -34,8 +34,8 @@ class InsertStatement<D extends DataClass> {
}) async {
final ctx = createContext(entity, mode ?? InsertMode.insert);
return await database.executor.doWhenOpened((e) async {
final id = await database.executor.runInsert(ctx.sql, ctx.boundVariables);
return await database.doWhenOpened((e) async {
final id = await e.runInsert(ctx.sql, ctx.boundVariables);
database
.notifyUpdates({TableUpdate.onTable(table, kind: UpdateKind.insert)});
return id;

View File

@ -50,7 +50,7 @@ class CustomSelectStatement with Selectable<QueryRow> {
Future<List<QueryRow>> _executeWithMappedArgs(
List<dynamic> mappedArgs) async {
final result =
await _db.executor.doWhenOpened((e) => e.runSelect(query, mappedArgs));
await _db.doWhenOpened((e) => e.runSelect(query, mappedArgs));
return result.map((row) => QueryRow(row, _db)).toList();
}

View File

@ -20,12 +20,6 @@ class LazyDatabase extends QueryExecutor {
/// first requested to be opened.
LazyDatabase(this.opener);
@override
set databaseInfo(GeneratedDatabase db) {
super.databaseInfo = db;
_delegate?.databaseInfo = db;
}
Future<void> _awaitOpened() {
if (_delegate != null) {
return Future.value();
@ -35,7 +29,6 @@ class LazyDatabase extends QueryExecutor {
_openDelegate = Completer();
Future.value(opener()).then((database) {
_delegate = database;
_delegate.databaseInfo = databaseInfo;
_openDelegate.complete();
});
return _openDelegate.future;
@ -46,8 +39,8 @@ class LazyDatabase extends QueryExecutor {
TransactionExecutor beginTransaction() => _delegate.beginTransaction();
@override
Future<bool> ensureOpen() {
return _awaitOpened().then((_) => _delegate.ensureOpen());
Future<bool> ensureOpen(QueryExecutorUser user) {
return _awaitOpened().then((_) => _delegate.ensureOpen(user));
}
@override

View File

@ -62,7 +62,7 @@ class _WebDelegate extends DatabaseDelegate {
bool get isOpen => _db != null;
@override
Future<void> open([GeneratedDatabase db]) async {
Future<void> open([QueryExecutorUser db]) async {
final dbVersion = db.schemaVersion;
assert(dbVersion >= 1, 'Database schema version needs to be at least 1');

View File

@ -1,6 +1,6 @@
name: moor
description: Moor is a safe and reactive persistence library for Dart applications
version: 2.4.1
version: 3.0.0-dev
repository: https://github.com/simolus3/moor
homepage: https://moor.simonbinder.eu/
issue_tracker: https://github.com/simolus3/moor/issues

View File

@ -6,8 +6,6 @@ import 'package:moor/src/runtime/executor/stream_queries.dart';
export 'package:mockito/mockito.dart';
typedef _EnsureOpenAction<T> = Future<T> Function(QueryExecutor e);
class MockExecutor extends Mock implements QueryExecutor {
final MockTransactionExecutor transactions = MockTransactionExecutor();
var _opened = false;
@ -38,18 +36,11 @@ class MockExecutor extends Mock implements QueryExecutor {
return transactions;
});
when(ensureOpen()).thenAnswer((i) {
when(ensureOpen(any)).thenAnswer((i) {
_opened = true;
return Future.value(true);
});
when(doWhenOpened(any)).thenAnswer((i) {
_opened = true;
final action = i.positionalArguments.single as _EnsureOpenAction;
return action(this);
});
when(close()).thenAnswer((_) async {
_opened = false;
});
@ -62,11 +53,7 @@ class MockTransactionExecutor extends Mock implements TransactionExecutor {
when(runUpdate(any, any)).thenAnswer((_) => Future.value(0));
when(runDelete(any, any)).thenAnswer((_) => Future.value(0));
when(runInsert(any, any)).thenAnswer((_) => Future.value(0));
when(doWhenOpened(any)).thenAnswer((i) {
final action = i.positionalArguments.single as _EnsureOpenAction;
return action(this);
});
when(ensureOpen(any)).thenAnswer((_) => Future.value());
when(send()).thenAnswer((_) => Future.value(null));
when(rollback()).thenAnswer((_) => Future.value(null));
@ -75,13 +62,6 @@ class MockTransactionExecutor extends Mock implements TransactionExecutor {
class MockStreamQueries extends Mock implements StreamQueryStore {}
// used so that we can mock the SqlExecutor typedef
abstract class SqlExecutorAsClass {
Future<void> call(String sql, [List<dynamic> args]);
}
class MockQueryExecutor extends Mock implements SqlExecutorAsClass {}
DatabaseConnection createConnection(QueryExecutor executor,
[StreamQueryStore streams]) {
return DatabaseConnection(

View File

@ -45,27 +45,24 @@ void main() {
group('callbacks', () {
_FakeDb db;
MockExecutor executor;
MockQueryExecutor queryExecutor;
setUp(() {
executor = MockExecutor();
queryExecutor = MockQueryExecutor();
db = _FakeDb(SqlTypeSystem.defaultInstance, executor);
});
test('onCreate', () async {
await db.handleDatabaseCreation(executor: queryExecutor);
verify(queryExecutor.call('created'));
await db.beforeOpen(executor, const OpeningDetails(null, 1));
verify(executor.runCustom('created', any));
});
test('onUpgrade', () async {
await db.handleDatabaseVersionChange(
executor: queryExecutor, from: 2, to: 3);
verify(queryExecutor.call('updated from 2 to 3'));
await db.beforeOpen(executor, const OpeningDetails(2, 3));
verify(executor.runCustom('updated from 2 to 3', any));
});
test('beforeOpen', () async {
await db.beforeOpenCallback(executor, const OpeningDetails(3, 4));
await db.beforeOpen(executor, const OpeningDetails(3, 4));
verify(executor.runSelect('opened: 3 to 4', []));
});
});

View File

@ -18,17 +18,14 @@ void main() {
});
test('opens delegated executors when opening', () async {
await multi.ensureOpen();
await multi.ensureOpen(db);
verify(write.databaseInfo = db);
verify(read.databaseInfo = any);
verify(read.ensureOpen());
verify(write.ensureOpen());
verify(read.ensureOpen(argThat(isNot(db))));
verify(write.ensureOpen(db));
});
test('runs selects on the reading executor', () async {
await multi.ensureOpen();
await multi.ensureOpen(db);
when(read.runSelect(any, any)).thenAnswer((_) async {
return [
@ -47,7 +44,7 @@ void main() {
});
test('runs updates on the writing executor', () async {
await multi.ensureOpen();
await multi.ensureOpen(db);
await multi.runUpdate('update', []);
await multi.runInsert('insert', []);
@ -61,15 +58,14 @@ void main() {
});
test('runs transactions on the writing executor', () async {
await multi.ensureOpen();
await multi.ensureOpen(db);
final transation = multi.beginTransaction();
await transation.doWhenOpened((e) async {
await e.runSelect('select', []);
});
final transaction = multi.beginTransaction();
await transaction.ensureOpen(db);
await transaction.runSelect('select', []);
verify(write.beginTransaction());
verify(write.transactions.doWhenOpened(any));
verify(write.transactions.ensureOpen(any));
verify(write.transactions.runSelect('select', []));
});
}

View File

@ -13,6 +13,15 @@ class _MockDynamicVersionDelegate extends Mock
class _MockTransactionDelegate extends Mock
implements SupportedTransactionDelegate {}
class _FakeExecutorUser extends QueryExecutorUser {
@override
Future<void> beforeOpen(
QueryExecutor executor, OpeningDetails details) async {}
@override
int get schemaVersion => 1;
}
void main() {
_MockDelegate delegate;
setUp(() {
@ -31,14 +40,13 @@ void main() {
void _runTests(bool sequential) {
test('when sequential = $sequential', () async {
final db = DelegatedDatabase(delegate, isSequential: sequential);
await db.ensureOpen(_FakeExecutorUser());
await db.doWhenOpened((_) async {
expect(await db.runSelect(null, null), isEmpty);
expect(await db.runUpdate(null, null), 3);
expect(await db.runInsert(null, null), 4);
await db.runCustom(null);
await db.runBatched(null);
});
verifyInOrder([
delegate.isOpen,
@ -63,29 +71,26 @@ void main() {
when(userDb.schemaVersion).thenReturn(3);
when(delegate.isOpen).thenAnswer((_) => Future.value(false));
db = DelegatedDatabase(delegate)..databaseInfo = userDb;
db = DelegatedDatabase(delegate);
when(userDb.handleDatabaseCreation(executor: anyNamed('executor')))
.thenAnswer((i) async {
final executor = i.namedArguments.values.single as SqlExecutor;
await executor('created', []);
});
when(userDb.beforeOpen(any, any)).thenAnswer((i) async {
final executor = i.positionalArguments[0] as QueryExecutor;
final details = i.positionalArguments[1] as OpeningDetails;
when(userDb.handleDatabaseVersionChange(
executor: anyNamed('executor'),
from: anyNamed('from'),
to: anyNamed('to'),
)).thenAnswer((i) async {
final executor = i.namedArguments[#executor] as SqlExecutor;
final from = i.namedArguments[#from] as int;
final to = i.namedArguments[#to] as int;
await executor('upgraded', [from, to]);
await executor.ensureOpen(userDb);
if (details.wasCreated) {
await executor.runCustom('created', []);
} else if (details.hadUpgrade) {
await executor.runCustom(
'updated', [details.versionBefore, details.versionNow]);
}
});
});
test('when the database does not support versions', () async {
when(delegate.versionDelegate).thenReturn(const NoVersionDelegate());
await db.doWhenOpened((_) async {});
await db.ensureOpen(userDb);
verify(delegate.open(userDb));
verifyNever(delegate.runCustom(any, any));
@ -94,7 +99,7 @@ void main() {
test('when the database supports versions at opening', () async {
when(delegate.versionDelegate)
.thenReturn(OnOpenVersionDelegate(() => Future.value(3)));
await db.doWhenOpened((_) async {});
await db.ensureOpen(userDb);
verify(delegate.open(userDb));
verifyNever(delegate.runCustom(any, any));
@ -105,7 +110,7 @@ void main() {
when(version.schemaVersion).thenAnswer((_) => Future.value(3));
when(delegate.versionDelegate).thenReturn(version);
await db.doWhenOpened((_) async {});
await db.ensureOpen(userDb);
verify(delegate.open(userDb));
verifyNever(delegate.runCustom(any, any));
@ -116,7 +121,7 @@ void main() {
test('handles database creations', () async {
when(delegate.versionDelegate)
.thenReturn(OnOpenVersionDelegate(() => Future.value(0)));
await db.doWhenOpened((_) async {});
await db.ensureOpen(userDb);
verify(delegate.runCustom('created', []));
});
@ -124,9 +129,9 @@ void main() {
test('handles database upgrades', () async {
when(delegate.versionDelegate)
.thenReturn(OnOpenVersionDelegate(() => Future.value(1)));
await db.doWhenOpened((_) async {});
await db.ensureOpen(userDb);
verify(delegate.runCustom('upgraded', [1, 3]));
verify(delegate.runCustom('updated', argThat(equals([1, 3]))));
});
});
@ -140,14 +145,12 @@ void main() {
test('when the delegate does not support transactions', () async {
when(delegate.transactionDelegate)
.thenReturn(const NoTransactionDelegate());
await db.doWhenOpened((_) async {
final transaction = db.beginTransaction();
await transaction.doWhenOpened((e) async {
await e.runSelect(null, null);
await db.ensureOpen(_FakeExecutorUser());
final transaction = db.beginTransaction();
await transaction.ensureOpen(_FakeExecutorUser());
await transaction.runSelect(null, null);
await transaction.send();
});
});
verifyInOrder([
delegate.runCustom('BEGIN TRANSACTION', []),
@ -157,23 +160,19 @@ void main() {
});
test('when the database supports transactions', () async {
final transaction = _MockTransactionDelegate();
when(transaction.startTransaction(any)).thenAnswer((i) {
final transactionDelegate = _MockTransactionDelegate();
when(transactionDelegate.startTransaction(any)).thenAnswer((i) {
(i.positionalArguments.single as Function(QueryDelegate))(delegate);
});
when(delegate.transactionDelegate).thenReturn(transaction);
when(delegate.transactionDelegate).thenReturn(transactionDelegate);
await db.doWhenOpened((_) async {
await db.ensureOpen(_FakeExecutorUser());
final transaction = db.beginTransaction();
await transaction.doWhenOpened((e) async {
await e.runSelect(null, null);
await transaction.ensureOpen(_FakeExecutorUser());
await transaction.send();
});
});
verify(transaction.startTransaction(any));
verify(transactionDelegate.startTransaction(any));
});
});
}

View File

@ -45,35 +45,34 @@ void main() {
// see ../data/tables/tables.moor
test('creates everything as specified in .moor files', () async {
final mockExecutor = MockExecutor();
final mockQueryExecutor = MockQueryExecutor();
final db = CustomTablesDb(mockExecutor);
await Migrator(db, mockQueryExecutor).createAll();
await db.createMigrator().createAll();
verify(mockQueryExecutor.call(_createNoIds, []));
verify(mockQueryExecutor.call(_createWithDefaults, []));
verify(mockQueryExecutor.call(_createWithConstraints, []));
verify(mockQueryExecutor.call(_createConfig, []));
verify(mockQueryExecutor.call(_createMyTable, []));
verify(mockQueryExecutor.call(_createEmail, []));
verify(mockQueryExecutor.call(_createMyTrigger, []));
verify(mockQueryExecutor.call(_createValueIndex, []));
verify(mockQueryExecutor.call(_defaultInsert, []));
verify(mockExecutor.runCustom(_createNoIds, []));
verify(mockExecutor.runCustom(_createWithDefaults, []));
verify(mockExecutor.runCustom(_createWithConstraints, []));
verify(mockExecutor.runCustom(_createConfig, []));
verify(mockExecutor.runCustom(_createMyTable, []));
verify(mockExecutor.runCustom(_createEmail, []));
verify(mockExecutor.runCustom(_createMyTrigger, []));
verify(mockExecutor.runCustom(_createValueIndex, []));
verify(mockExecutor.runCustom(_defaultInsert, []));
});
test('can create trigger manually', () async {
final mockQueryExecutor = MockQueryExecutor();
final db = CustomTablesDb(MockExecutor());
final mockExecutor = MockExecutor();
final db = CustomTablesDb(mockExecutor);
await Migrator(db, mockQueryExecutor).createTrigger(db.myTrigger);
verify(mockQueryExecutor.call(_createMyTrigger, []));
await db.createMigrator().createTrigger(db.myTrigger);
verify(mockExecutor.runCustom(_createMyTrigger, []));
});
test('can create index manually', () async {
final mockQueryExecutor = MockQueryExecutor();
final db = CustomTablesDb(MockExecutor());
final mockExecutor = MockExecutor();
final db = CustomTablesDb(mockExecutor);
await Migrator(db, mockQueryExecutor).createIndex(db.valueIdx);
verify(mockQueryExecutor.call(_createValueIndex, []));
await db.createMigrator().createIndex(db.valueIdx);
verify(mockExecutor.runCustom(_createValueIndex, []));
});
test('infers primary keys correctly', () async {

View File

@ -6,34 +6,32 @@ import 'data/utils/mocks.dart';
void main() {
TodoDb db;
MockQueryExecutor mockQueryExecutor;
QueryExecutor mockExecutor;
setUp(() {
mockQueryExecutor = MockQueryExecutor();
mockExecutor = MockExecutor();
db = TodoDb(mockExecutor);
});
group('Migrations', () {
test('creates all tables', () async {
await db.handleDatabaseCreation(executor: mockQueryExecutor);
await db.beforeOpen(mockExecutor, const OpeningDetails(null, 1));
// should create todos, categories, users and shared_todos table
verify(mockQueryExecutor.call(
verify(mockExecutor.runCustom(
'CREATE TABLE IF NOT EXISTS todos '
'(id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, title VARCHAR NULL, '
'content VARCHAR NOT NULL, target_date INTEGER NULL, '
'category INTEGER NULL);',
[]));
verify(mockQueryExecutor.call(
verify(mockExecutor.runCustom(
'CREATE TABLE IF NOT EXISTS categories '
'(id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, '
'`desc` VARCHAR NOT NULL UNIQUE);',
[]));
verify(mockQueryExecutor.call(
verify(mockExecutor.runCustom(
'CREATE TABLE IF NOT EXISTS users '
'(id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, '
'name VARCHAR NOT NULL, '
@ -43,7 +41,7 @@ void main() {
"DEFAULT (strftime('%s', CURRENT_TIMESTAMP)));",
[]));
verify(mockQueryExecutor.call(
verify(mockExecutor.runCustom(
'CREATE TABLE IF NOT EXISTS shared_todos ('
'todo INTEGER NOT NULL, '
'user INTEGER NOT NULL, '
@ -53,7 +51,7 @@ void main() {
');',
[]));
verify(mockQueryExecutor.call(
verify(mockExecutor.runCustom(
'CREATE TABLE IF NOT EXISTS '
'table_without_p_k ('
'not_really_an_id INTEGER NOT NULL, '
@ -64,9 +62,9 @@ void main() {
});
test('creates individual tables', () async {
await Migrator(db, mockQueryExecutor).createTable(db.users);
await db.createMigrator().createTable(db.users);
verify(mockQueryExecutor.call(
verify(mockExecutor.runCustom(
'CREATE TABLE IF NOT EXISTS users '
'(id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, '
'name VARCHAR NOT NULL, '
@ -78,16 +76,15 @@ void main() {
});
test('drops tables', () async {
await Migrator(db, mockQueryExecutor).deleteTable('users');
await db.createMigrator().deleteTable('users');
verify(mockQueryExecutor.call('DROP TABLE IF EXISTS users;'));
verify(mockExecutor.runCustom('DROP TABLE IF EXISTS users;'));
});
test('adds columns', () async {
await Migrator(db, mockQueryExecutor)
.addColumn(db.users, db.users.isAwesome);
await db.createMigrator().addColumn(db.users, db.users.isAwesome);
verify(mockQueryExecutor.call('ALTER TABLE users ADD COLUMN '
verify(mockExecutor.runCustom('ALTER TABLE users ADD COLUMN '
'is_awesome INTEGER NOT NULL DEFAULT 1 '
'CHECK (is_awesome in (0, 1));'));
});
@ -101,9 +98,9 @@ void main() {
test('upgrading a database without schema migration throws', () async {
final db = _DefaultDb(MockExecutor());
expect(
() => db.handleDatabaseVersionChange(
executor: MockQueryExecutor(), from: 1, to: 2),
throwsA(const TypeMatcher<Exception>()));
() => db.beforeOpen(db.executor, const OpeningDetails(2, 3)),
throwsA(const TypeMatcher<Exception>()),
);
});
}

View File

@ -30,14 +30,14 @@ void main() {
});
group('SELECT statements are generated', () {
test('for simple statements', () {
db.select(db.users, distinct: true).get();
test('for simple statements', () async {
await db.select(db.users, distinct: true).get();
verify(executor.runSelect(
'SELECT DISTINCT * FROM users;', argThat(isEmpty)));
});
test('with limit statements', () {
(db.select(db.users)..limit(10, offset: 0)).get();
test('with limit statements', () async {
await (db.select(db.users)..limit(10, offset: 0)).get();
verify(executor.runSelect(
'SELECT * FROM users LIMIT 10 OFFSET 0;', argThat(isEmpty)));
});
@ -49,8 +49,8 @@ void main() {
'SELECT * FROM users LIMIT 10;', argThat(isEmpty)));
});
test('with like expressions', () {
(db.select(db.users)..where((u) => u.name.like('Dash%'))).get();
test('with like expressions', () async {
await (db.select(db.users)..where((u) => u.name.like('Dash%'))).get();
verify(executor
.runSelect('SELECT * FROM users WHERE name LIKE ?;', ['Dash%']));
});
@ -69,8 +69,8 @@ void main() {
argThat(isEmpty)));
});
test('with complex predicates', () {
(db.select(db.users)
test('with complex predicates', () async {
await (db.select(db.users)
..where((u) =>
u.name.equals('Dash').not() & u.id.isBiggerThanValue(12)))
.get();
@ -80,8 +80,8 @@ void main() {
['Dash', 12]));
});
test('with expressions from boolean columns', () {
(db.select(db.users)..where((u) => u.isAwesome)).get();
test('with expressions from boolean columns', () async {
await (db.select(db.users)..where((u) => u.isAwesome)).get();
verify(executor.runSelect(
'SELECT * FROM users WHERE is_awesome;', argThat(isEmpty)));

View File

@ -16,12 +16,13 @@ void main() {
db = TodoDb(executor);
});
test('streams fetch when the first listener attaches', () {
test('streams fetch when the first listener attaches', () async {
final stream = db.select(db.users).watch();
verifyNever(executor.runSelect(any, any));
stream.listen((_) {});
await pumpEventQueue(times: 1);
verify(executor.runSelect(any, any)).called(1);
});
@ -216,9 +217,9 @@ void main() {
test('when the data updates after the listener has detached', () async {
final subscription = db.select(db.users).watch().listen((_) {});
clearInteractions(executor);
await subscription.cancel();
clearInteractions(executor);
// The stream is kept open for the rest of this event iteration
final completer = Completer.sync();

View File

@ -164,7 +164,7 @@ void main() {
test('the database is opened before starting a transaction', () async {
await db.transaction(() async {
verify(executor.doWhenOpened(any));
verify(executor.ensureOpen(db));
});
});

View File

@ -5,12 +5,23 @@ import 'package:test/test.dart';
import '../data/tables/todos.dart';
import '../data/utils/mocks.dart';
class _LazyQueryUserForTest extends QueryExecutorUser {
@override
int get schemaVersion => 1;
@override
Future<void> beforeOpen(QueryExecutor executor, OpeningDetails details) {
// do nothing
return Future.value();
}
}
void main() {
test('lazy database delegates work', () async {
final inner = MockExecutor();
final lazy = LazyDatabase(() => inner);
await lazy.ensureOpen();
await lazy.ensureOpen(_LazyQueryUserForTest());
clearInteractions(inner);
lazy.beginTransaction();
@ -39,31 +50,33 @@ void main() {
return inner;
});
final user = _LazyQueryUserForTest();
for (var i = 0; i < 10; i++) {
unawaited(lazy.ensureOpen());
unawaited(lazy.ensureOpen(user));
}
await pumpEventQueue();
expect(openCount, 1);
});
test('sets generated database property', () async {
test('opens the inner database with the outer user', () async {
final inner = MockExecutor();
final db = TodoDb(LazyDatabase(() => inner));
// run a statement to make sure the database has been opened
await db.customSelect('custom_select').get();
verify(inner.databaseInfo = db);
verify(inner.ensureOpen(db));
});
test('returns the existing delegate if it was open', () async {
final inner = MockExecutor();
final lazy = LazyDatabase(() => inner);
final user = _LazyQueryUserForTest();
await lazy.ensureOpen();
await lazy.ensureOpen();
await lazy.ensureOpen(user);
await lazy.ensureOpen(user);
verify(inner.ensureOpen());
verify(inner.ensureOpen(user));
});
}

View File

@ -34,7 +34,7 @@ class _VmDelegate extends DatabaseDelegate {
Future<bool> get isOpen => Future.value(_db != null);
@override
Future<void> open([GeneratedDatabase db]) async {
Future<void> open(QueryExecutorUser user) async {
if (file != null) {
_db = Database.openFile(file);
} else {

View File

@ -1,6 +1,6 @@
name: moor_ffi
description: "Provides sqlite bindings using dart:ffi, including a moor executor"
version: 0.4.0
version: 0.5.0-dev
homepage: https://github.com/simolus3/moor/tree/develop/moor_ffi
issue_tracker: https://github.com/simolus3/moor/issues
@ -8,7 +8,7 @@ environment:
sdk: ">=2.6.0 <3.0.0"
dependencies:
moor: ">=1.7.0 <3.0.0"
moor: ^3.0.0
ffi: ^0.1.3
collection: ^1.0.0
meta: ^1.0.2
@ -17,6 +17,10 @@ dev_dependencies:
test: ^1.6.0
path: ^1.6.0
dependency_overrides:
moor:
path: ../moor
flutter:
plugin:
# the flutter.plugin key needs to exists so that this project gets recognized as a plugin when imported. We need to

View File

@ -48,7 +48,7 @@ class _SqfliteDelegate extends DatabaseDelegate with _SqfliteExecutor {
bool get isOpen => db != null;
@override
Future<void> open([GeneratedDatabase db]) async {
Future<void> open(QueryExecutorUser user) async {
String resolvedPath;
if (inDbFolder) {
resolvedPath = join(await s.getDatabasesPath(), path);
@ -62,11 +62,11 @@ class _SqfliteDelegate extends DatabaseDelegate with _SqfliteExecutor {
}
// default value when no migration happened
_loadedSchemaVersion = db.schemaVersion;
_loadedSchemaVersion = user.schemaVersion;
this.db = await s.openDatabase(
db = await s.openDatabase(
resolvedPath,
version: db.schemaVersion,
version: user.schemaVersion,
onCreate: (db, version) {
_loadedSchemaVersion = 0;
},

View File

@ -94,7 +94,7 @@ packages:
path: "../moor"
relative: true
source: path
version: "2.4.0"
version: "2.4.1"
path:
dependency: "direct main"
description:

View File

@ -1,6 +1,6 @@
name: moor_flutter
description: Flutter implementation of moor, a safe and reactive persistence library for Dart applications
version: 2.1.1
version: 3.0.0
repository: https://github.com/simolus3/moor
homepage: https://moor.simonbinder.eu/
issue_tracker: https://github.com/simolus3/moor/issues
@ -9,7 +9,7 @@ environment:
sdk: ">=2.0.0-dev.68.0 <3.0.0"
dependencies:
moor: ^2.0.0
moor: ^3.0.0
sqflite: ^1.1.6+5
meta: ^1.0.0
path: ^1.0.0