Run statements in correct context

This commit is contained in:
Simon Binder 2024-02-03 14:31:58 +01:00
parent 0ec19f4705
commit 7f7d2ab1b0
No known key found for this signature in database
GPG Key ID: 7891917E4147B8C0
9 changed files with 96 additions and 20 deletions

View File

@ -1,8 +1,14 @@
## 2.16.0-dev
- When a migration throws, the database will now block subsequent operations
- When a migration throws, the database will now block subsequent operations
instead of potentially allowing them to operate on a database in an
inconsistent state.
- Statements built through the Dart query builder will now run in the context
active while they are running, instead of the context active at the time they
were created. For instance, creating an `UpdateStatement` with
`database.update` outside of a transaction and then calling
`UpdateStatement.write` inside of a transaction will now perform the update
inside of the transaction, instead of causing a deadlock.
## 2.15.0

View File

@ -57,7 +57,7 @@ abstract class DatabaseConnectionUser {
/// Creates and auto-updating stream from the given select statement. This
/// method should not be used directly.
Stream<List<Map<String, Object?>>> createStream(QueryStreamFetcher stmt) =>
streamQueries.registerStream(stmt);
resolvedEngine.streamQueries.registerStream(stmt);
/// Creates a copy of the table with an alias so that it can be used in the
/// same query more than once.
@ -165,7 +165,7 @@ abstract class DatabaseConnectionUser {
/// Starts an [InsertStatement] for a given table. You can use that statement
/// to write data into the [table] by using [InsertStatement.insert].
InsertStatement<T, D> into<T extends Table, D>(TableInfo<T, D> table) {
return InsertStatement<T, D>(resolvedEngine, table);
return InsertStatement<T, D>(this, table);
}
/// Starts an [UpdateStatement] for the given table. You can use that
@ -173,7 +173,7 @@ abstract class DatabaseConnectionUser {
/// clause on that table and then use [UpdateStatement.write].
UpdateStatement<Tbl, R> update<Tbl extends Table, R>(
TableInfo<Tbl, R> table) =>
UpdateStatement(resolvedEngine, table);
UpdateStatement(this, table);
/// Starts a query on the given table.
///
@ -202,8 +202,7 @@ abstract class DatabaseConnectionUser {
SimpleSelectStatement<T, R> select<T extends HasResultSet, R>(
ResultSetImplementation<T, R> table,
{bool distinct = false}) {
return SimpleSelectStatement<T, R>(resolvedEngine, table,
distinct: distinct);
return SimpleSelectStatement<T, R>(this, table, distinct: distinct);
}
/// Starts a complex statement on [table] that doesn't necessarily use all of
@ -239,8 +238,7 @@ abstract class DatabaseConnectionUser {
JoinedSelectStatement<T, R> selectOnly<T extends HasResultSet, R>(
ResultSetImplementation<T, R> table,
{bool distinct = false}) {
return JoinedSelectStatement<T, R>(
resolvedEngine, table, [], distinct, false, false);
return JoinedSelectStatement<T, R>(this, table, [], distinct, false, false);
}
/// Starts a [DeleteStatement] that can be used to delete rows from a table.
@ -248,7 +246,7 @@ abstract class DatabaseConnectionUser {
/// See the [documentation](https://drift.simonbinder.eu/docs/getting-started/writing_queries/#updates-and-deletes)
/// for more details and example on how delete statements work.
DeleteStatement<T, D> delete<T extends Table, D>(TableInfo<T, D> table) {
return DeleteStatement<T, D>(resolvedEngine, table);
return DeleteStatement<T, D>(this, table);
}
/// Executes a custom delete or update statement and returns the amount of
@ -360,7 +358,7 @@ abstract class DatabaseConnectionUser {
Selectable<QueryRow> customSelect(String query,
{List<Variable> variables = const [],
Set<ResultSetImplementation> readsFrom = const {}}) {
return CustomSelectStatement(query, variables, readsFrom, resolvedEngine);
return CustomSelectStatement(query, variables, readsFrom, this);
}
/// Creates a custom select statement from the given sql [query]. To run the
@ -617,4 +615,9 @@ extension RunWithEngine on DatabaseConnectionUser {
DatabaseConnectionUser user, Future<T> Function() calculation) {
return _runConnectionZoned(user, calculation);
}
Future<T> withCurrentExecutor<T>(Future<T> Function(QueryExecutor e) run) {
final engine = resolvedEngine;
return engine.doWhenOpened(run);
}
}

View File

@ -49,7 +49,7 @@ class DeleteStatement<T extends Table, D> extends Query<T, D>
Future<int> go() async {
final ctx = constructQuery();
return ctx.executor!.doWhenOpened((e) async {
return database.withCurrentExecutor((e) async {
final rows = await e.runDelete(ctx.sql, ctx.boundVariables);
if (rows > 0) {
@ -69,7 +69,7 @@ class DeleteStatement<T extends Table, D> extends Query<T, D>
Future<List<D>> _goReturning() async {
final ctx = constructQuery();
return ctx.executor!.doWhenOpened((e) async {
return database.withCurrentExecutor((e) async {
final rows = await e.runSelect(ctx.sql, ctx.boundVariables);
if (rows.isNotEmpty) {

View File

@ -70,7 +70,7 @@ class InsertStatement<T extends Table, D> {
final ctx = createContext(entity, mode ?? InsertMode.insert,
onConflict: onConflict);
return await database.doWhenOpened((e) async {
return await database.withCurrentExecutor((e) async {
final id = await e.runInsert(ctx.sql, ctx.boundVariables);
database
.notifyUpdates({TableUpdate.onTable(table, kind: UpdateKind.insert)});
@ -131,7 +131,7 @@ class InsertStatement<T extends Table, D> {
..write(' FROM $sourceCte');
_writeOnConflict(ctx, mode, null, onConflict);
return await database.doWhenOpened((e) async {
return await database.withCurrentExecutor((e) async {
await e.runInsert(ctx.sql, ctx.boundVariables);
database
.notifyUpdates({TableUpdate.onTable(table, kind: UpdateKind.insert)});
@ -170,7 +170,7 @@ class InsertStatement<T extends Table, D> {
final ctx = createContext(entity, mode ?? InsertMode.insert,
onConflict: onConflict, returning: true);
return database.doWhenOpened((e) async {
return database.withCurrentExecutor((e) async {
final result = await e.runSelect(ctx.sql, ctx.boundVariables);
if (result.isNotEmpty) {
database.notifyUpdates(

View File

@ -48,7 +48,7 @@ class CustomSelectStatement with Selectable<QueryRow> {
}
Future<List<Map<String, Object?>>> _executeRaw(List<Object?> mappedArgs) {
return _db.doWhenOpened((e) => e.runSelect(query, mappedArgs));
return _db.withCurrentExecutor((e) => e.runSelect(query, mappedArgs));
}
List<QueryRow> _mapDbResponse(List<Map<String, Object?>> rows) {

View File

@ -77,7 +77,7 @@ class SimpleSelectStatement<T extends HasResultSet, D> extends Query<T, D>
}
Future<List<Map<String, Object?>>> _getRaw(GenerationContext ctx) {
return database.doWhenOpened((e) {
return database.withCurrentExecutor((e) {
return e.runSelect(ctx.sql, ctx.boundVariables);
});
}

View File

@ -261,7 +261,7 @@ class JoinedSelectStatement<FirstT extends HasResultSet, FirstD>
}
Future<List<Map<String, Object?>>> _getRaw(GenerationContext ctx) {
return ctx.executor!.doWhenOpened((e) async {
return database.withCurrentExecutor((e) async {
try {
return await e.runSelect(ctx.sql, ctx.boundVariables);
} catch (e, s) {

View File

@ -32,7 +32,7 @@ class UpdateStatement<T extends Table, D> extends Query<T, D>
Future<int> _performQuery() async {
final ctx = constructQuery();
final rows = await ctx.executor!.doWhenOpened((e) async {
final rows = await database.withCurrentExecutor((e) async {
return await e.runUpdate(ctx.sql, ctx.boundVariables);
});
@ -84,7 +84,7 @@ class UpdateStatement<T extends Table, D> extends Query<T, D>
await write(entity, dontExecute: true);
final ctx = constructQuery();
final rows = await ctx.executor!.doWhenOpened((e) {
final rows = await database.withCurrentExecutor((e) {
return e.runSelect(ctx.sql, ctx.boundVariables);
});

View File

@ -255,4 +255,71 @@ void main() {
.having((e) => e.exception, 'exception', rollbackException)),
);
});
group('statements run in correct zone', () {
// Statements used to run in the executor of the zone that created them, but
// they should actually run in the zone that calls the async method starting
// the database operation (https://github.com/simolus3/drift/issues/2873).
test('select', () async {
when(executor.runSelect(any, any))
.thenAnswer((_) => Future.error('should run select in transaction'));
final simpleQuery = db.users.select();
final joinedQuery = db.selectOnly(db.users)..addColumns([db.users.id]);
final customQuery = db.customSelect('SELECT 1');
await db.transaction(() async {
expect(await simpleQuery.get(), isEmpty);
expect(await joinedQuery.get(), isEmpty);
expect(await customQuery.get(), isEmpty);
});
});
test('update', () async {
when(executor.runUpdate(any, any))
.thenAnswer((_) => Future.error('should run update in transaction'));
final stmt = db.update(db.users);
await db.transaction(() async {
await stmt.write(UsersCompanion(isAwesome: Value(true)));
});
verify(executor.transactions
.runUpdate('UPDATE "users" SET "is_awesome" = ?;', [1]));
});
test('delete', () async {
when(executor.runDelete(any, any))
.thenAnswer((_) => Future.error('should run delete in transaction'));
final stmt = db.delete(db.users);
await db.transaction(() async {
await stmt.go();
});
verify(executor.transactions.runDelete('DELETE FROM "users";', []));
});
test('insert', () async {
when(executor.runSelect(any, any))
.thenAnswer((_) => Future.error('should run select in transaction'));
when(executor.runInsert(any, any))
.thenAnswer((_) => Future.error('should run delete in transaction'));
final stmt = db.into(db.categories);
await db.transaction(() async {
await stmt.insert(CategoriesCompanion.insert(description: 'test'));
await stmt.insertReturningOrNull(
CategoriesCompanion.insert(description: 'test2'));
});
verify(executor.transactions
.runInsert('INSERT INTO "categories" ("desc") VALUES (?)', ['test']));
verify(executor.transactions.runSelect(
'INSERT INTO "categories" ("desc") VALUES (?) RETURNING *',
['test2']));
});
});
}