mirror of https://github.com/AMT-Cheif/drift.git
Implement single() version for get() and watch()
This commit is contained in:
parent
110d775a90
commit
067a33adec
|
@ -142,8 +142,7 @@ mixin QueryEngine on DatabaseConnectionUser {
|
|||
/// value.
|
||||
Future<List<QueryRow>> customSelect(String query,
|
||||
{List<Variable> variables = const []}) async {
|
||||
return CustomSelectStatement(query, variables, <TableInfo>{}, this)
|
||||
.execute();
|
||||
return CustomSelectStatement(query, variables, <TableInfo>{}, this).get();
|
||||
}
|
||||
|
||||
/// Creates a stream from a custom select statement.To use the variables, mark
|
||||
|
@ -155,7 +154,7 @@ mixin QueryEngine on DatabaseConnectionUser {
|
|||
{List<Variable> variables = const [], Set<TableInfo> readsFrom}) {
|
||||
final tables = readsFrom ?? <TableInfo>{};
|
||||
final statement = CustomSelectStatement(query, variables, tables, this);
|
||||
return createStream(statement.constructFetcher());
|
||||
return statement.watch();
|
||||
}
|
||||
|
||||
/// Executes [action] in a transaction, which means that all its queries and
|
||||
|
|
|
@ -10,6 +10,7 @@ import 'package:moor/src/runtime/expressions/custom.dart';
|
|||
import 'package:moor/src/runtime/expressions/expression.dart';
|
||||
import 'package:moor/src/types/sql_types.dart';
|
||||
import 'package:moor/src/runtime/structure/table_info.dart';
|
||||
import 'package:moor/src/utils/single_transformer.dart';
|
||||
|
||||
/// Statement that operates with data that already exists (select, delete,
|
||||
/// update).
|
||||
|
@ -68,6 +69,47 @@ abstract class Query<T extends Table, DataClass> {
|
|||
}
|
||||
}
|
||||
|
||||
/// Abstract class for queries which can return one-time values or a stream
|
||||
/// of values.
|
||||
abstract class Selectable<T> {
|
||||
/// Executes this statement and returns the result.
|
||||
Future<List<T>> get();
|
||||
|
||||
/// Creates an auto-updating stream of the result that emits new items
|
||||
/// whenever any table used in this statement changes.
|
||||
Stream<List<T>> watch();
|
||||
|
||||
/// Executes this statement, like [get], but only returns one value. If the
|
||||
/// result has no or too many values, this method will throw.
|
||||
///
|
||||
/// Be aware that this operation won't put a limit clause on this statement,
|
||||
/// if that's needed you would have to do that yourself.
|
||||
Future<T> getSingle() async {
|
||||
final list = await get();
|
||||
final iterator = list.iterator;
|
||||
|
||||
if (!iterator.moveNext()) {
|
||||
throw StateError('Expected exactly one result, but actually there were '
|
||||
'none!');
|
||||
}
|
||||
final element = iterator.current;
|
||||
if (iterator.moveNext()) {
|
||||
throw StateError('Expected exactly one result, but found more than one!');
|
||||
}
|
||||
|
||||
return element;
|
||||
}
|
||||
|
||||
/// Creates an auto-updating stream of this statement, similar to [watch].
|
||||
/// However, it is assumed that the query will only emit one result, so
|
||||
/// instead of returning a [Stream<List<T>>], this returns a [Stream<T>]. If
|
||||
/// the query emits more than one row at some point, an error will be emitted
|
||||
/// to the stream instead.
|
||||
Stream<T> watchSingle() {
|
||||
return watch().transform(singleElements());
|
||||
}
|
||||
}
|
||||
|
||||
mixin SingleTableQueryMixin<T extends Table, DataClass> on Query<T, DataClass> {
|
||||
void where(Expression<bool, BoolType> filter(T tbl)) {
|
||||
final predicate = filter(table.asDslTable);
|
||||
|
|
|
@ -14,7 +14,8 @@ import 'package:moor/src/runtime/structure/table_info.dart';
|
|||
typedef OrderingTerm OrderClauseGenerator<T>(T tbl);
|
||||
|
||||
class JoinedSelectStatement<FirstT extends Table, FirstD>
|
||||
extends Query<FirstT, FirstD> with LimitContainerMixin {
|
||||
extends Query<FirstT, FirstD>
|
||||
with LimitContainerMixin, Selectable<TypedResult> {
|
||||
JoinedSelectStatement(
|
||||
QueryEngine database, TableInfo<FirstT, FirstD> table, this._joins)
|
||||
: super(database, table);
|
||||
|
@ -97,8 +98,7 @@ class JoinedSelectStatement<FirstT extends Table, FirstD>
|
|||
orderByExpr = OrderBy(terms);
|
||||
}
|
||||
|
||||
/// Creates an auto-updating stream of the result that emits new items
|
||||
/// whenever any table of this statement changes.
|
||||
@override
|
||||
Stream<List<TypedResult>> watch() {
|
||||
final ctx = constructQuery();
|
||||
final fetcher = QueryStreamFetcher<List<TypedResult>>(
|
||||
|
@ -110,7 +110,7 @@ class JoinedSelectStatement<FirstT extends Table, FirstD>
|
|||
return database.createStream(fetcher);
|
||||
}
|
||||
|
||||
/// Executes this statement and returns the result.
|
||||
@override
|
||||
Future<List<TypedResult>> get() async {
|
||||
final ctx = constructQuery();
|
||||
return _getWithQuery(ctx);
|
||||
|
@ -143,7 +143,7 @@ class JoinedSelectStatement<FirstT extends Table, FirstD>
|
|||
|
||||
/// A select statement that doesn't use joins
|
||||
class SimpleSelectStatement<T extends Table, D> extends Query<T, D>
|
||||
with SingleTableQueryMixin<T, D>, LimitContainerMixin<T, D> {
|
||||
with SingleTableQueryMixin<T, D>, LimitContainerMixin<T, D>, Selectable<D> {
|
||||
SimpleSelectStatement(QueryEngine database, TableInfo<T, D> table)
|
||||
: super(database, table);
|
||||
|
||||
|
@ -155,7 +155,7 @@ class SimpleSelectStatement<T extends Table, D> extends Query<T, D>
|
|||
ctx.buffer.write('SELECT * FROM ${table.tableWithAlias}');
|
||||
}
|
||||
|
||||
/// Loads and returns all results from this select query.
|
||||
@override
|
||||
Future<List<D>> get() async {
|
||||
final ctx = constructQuery();
|
||||
return _getWithQuery(ctx);
|
||||
|
@ -212,8 +212,7 @@ class SimpleSelectStatement<T extends Table, D> extends Query<T, D>
|
|||
orderByExpr = OrderBy(clauses.map((t) => t(table.asDslTable)).toList());
|
||||
}
|
||||
|
||||
/// Creates an auto-updating stream that emits new items whenever this table
|
||||
/// changes.
|
||||
@override
|
||||
Stream<List<D>> watch() {
|
||||
final query = constructQuery();
|
||||
final fetcher = QueryStreamFetcher<List<D>>(
|
||||
|
@ -228,7 +227,7 @@ class SimpleSelectStatement<T extends Table, D> extends Query<T, D>
|
|||
|
||||
/// A select statement that is constructed with a raw sql prepared statement
|
||||
/// instead of the high-level moor api.
|
||||
class CustomSelectStatement {
|
||||
class CustomSelectStatement with Selectable<QueryRow> {
|
||||
/// Tables this select statement reads from. When turning this select query
|
||||
/// into an auto-updating stream, that stream will emit new items whenever
|
||||
/// any of these tables changes.
|
||||
|
@ -242,12 +241,21 @@ class CustomSelectStatement {
|
|||
final List<Variable> variables;
|
||||
final QueryEngine _db;
|
||||
|
||||
/// Constructs a new
|
||||
/// Constructs a new custom select statement for the query, the variables,
|
||||
/// the affected tables and the database.
|
||||
CustomSelectStatement(this.query, this.variables, this.tables, this._db);
|
||||
|
||||
/// Constructs a fetcher for this query. The fetcher is responsible for
|
||||
/// updating a stream at the right moment.
|
||||
@Deprecated(
|
||||
'There is no need to use this method. Please use watch() directly')
|
||||
QueryStreamFetcher<List<QueryRow>> constructFetcher() {
|
||||
return _constructFetcher();
|
||||
}
|
||||
|
||||
/// Constructs a fetcher for this query. The fetcher is responsible for
|
||||
/// updating a stream at the right moment.
|
||||
QueryStreamFetcher<List<QueryRow>> _constructFetcher() {
|
||||
final args = _mapArgs();
|
||||
|
||||
return QueryStreamFetcher<List<QueryRow>>(
|
||||
|
@ -257,11 +265,22 @@ class CustomSelectStatement {
|
|||
);
|
||||
}
|
||||
|
||||
/// Executes this query and returns the result.
|
||||
Future<List<QueryRow>> execute() async {
|
||||
@override
|
||||
Future<List<QueryRow>> get() async {
|
||||
return _executeWithMappedArgs(_mapArgs());
|
||||
}
|
||||
|
||||
@override
|
||||
Stream<List<QueryRow>> watch() {
|
||||
return _db.createStream(_constructFetcher());
|
||||
}
|
||||
|
||||
/// Executes this query and returns the result.
|
||||
@Deprecated('Use get() instead')
|
||||
Future<List<QueryRow>> execute() async {
|
||||
return get();
|
||||
}
|
||||
|
||||
List<dynamic> _mapArgs() {
|
||||
final ctx = GenerationContext.fromDb(_db);
|
||||
return variables.map((v) => v.mapToSimpleValue(ctx)).toList();
|
||||
|
|
|
@ -0,0 +1,14 @@
|
|||
import 'dart:async';
|
||||
|
||||
/// Transforms a stream of lists into a stream of single elements, assuming
|
||||
/// that each list is a singleton.
|
||||
StreamTransformer<List<T>, T> singleElements<T>() {
|
||||
return StreamTransformer.fromHandlers(handleData: (data, sink) {
|
||||
try {
|
||||
sink.add(data.single);
|
||||
} catch (e) {
|
||||
sink.addError(
|
||||
StateError('Expected exactly one element, but got ${data.length}'));
|
||||
}
|
||||
});
|
||||
}
|
|
@ -6,6 +6,20 @@ import 'package:test_api/test_api.dart';
|
|||
import 'data/tables/todos.dart';
|
||||
import 'data/utils/mocks.dart';
|
||||
|
||||
final _dataOfTodoEntry = {
|
||||
'id': 10,
|
||||
'title': 'A todo title',
|
||||
'content': 'Content',
|
||||
'category': 3
|
||||
};
|
||||
|
||||
final _todoEntry = TodoEntry(
|
||||
id: 10,
|
||||
title: 'A todo title',
|
||||
content: 'Content',
|
||||
category: 3,
|
||||
);
|
||||
|
||||
void main() {
|
||||
TodoDb db;
|
||||
MockExecutor executor;
|
||||
|
@ -78,20 +92,10 @@ void main() {
|
|||
|
||||
group('SELECT results are parsed', () {
|
||||
test('when all fields are non-null', () {
|
||||
final data = [
|
||||
{'id': 10, 'title': 'A todo title', 'content': 'Content', 'category': 3}
|
||||
];
|
||||
final resolved = TodoEntry(
|
||||
id: 10,
|
||||
title: 'A todo title',
|
||||
content: 'Content',
|
||||
category: 3,
|
||||
);
|
||||
|
||||
when(executor.runSelect('SELECT * FROM todos;', any))
|
||||
.thenAnswer((_) => Future.value(data));
|
||||
.thenAnswer((_) => Future.value([_dataOfTodoEntry]));
|
||||
|
||||
expect(db.select(db.todosTable).get(), completion([resolved]));
|
||||
expect(db.select(db.todosTable).get(), completion([_todoEntry]));
|
||||
});
|
||||
|
||||
test('when some fields are null', () {
|
||||
|
@ -116,4 +120,35 @@ void main() {
|
|||
expect(db.select(db.todosTable).get(), completion([resolved]));
|
||||
});
|
||||
});
|
||||
|
||||
group('queries for a single row', () {
|
||||
test('get once', () {
|
||||
when(executor.runSelect('SELECT * FROM todos;', any))
|
||||
.thenAnswer((_) => Future.value([_dataOfTodoEntry]));
|
||||
|
||||
expect(db.select(db.todosTable).getSingle(), completion(_todoEntry));
|
||||
});
|
||||
|
||||
test('get multiple times', () {
|
||||
final resultRows = <List<Map<String, dynamic>>>[
|
||||
[_dataOfTodoEntry],
|
||||
[],
|
||||
[_dataOfTodoEntry, _dataOfTodoEntry],
|
||||
];
|
||||
var _currentRow = 0;
|
||||
|
||||
when(executor.runSelect('SELECT * FROM todos;', any)).thenAnswer((_) {
|
||||
return Future.value(resultRows[_currentRow++]);
|
||||
});
|
||||
|
||||
expectLater(
|
||||
db.select(db.todosTable).watchSingle(),
|
||||
emitsInOrder(
|
||||
[_todoEntry, emitsError(anything), emitsError(anything)]));
|
||||
|
||||
db
|
||||
..markTablesUpdated({db.todosTable})
|
||||
..markTablesUpdated({db.todosTable});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
|
|
@ -0,0 +1,25 @@
|
|||
import 'dart:async';
|
||||
|
||||
import 'package:moor/src/utils/single_transformer.dart';
|
||||
import 'package:test_api/test_api.dart';
|
||||
|
||||
void main() {
|
||||
test('transforms simple values', () {
|
||||
final controller = StreamController<List<int>>();
|
||||
final stream = controller.stream.transform(singleElements());
|
||||
|
||||
expectLater(stream, emitsInOrder([1, 2, 3, 4]));
|
||||
|
||||
controller..add([1])..add([2])..add([3])..add([4]);
|
||||
});
|
||||
|
||||
test('emits errors for invalid lists', () {
|
||||
final controller = StreamController<List<int>>();
|
||||
final stream = controller.stream.transform(singleElements());
|
||||
|
||||
expectLater(stream,
|
||||
emitsInOrder([1, emitsError(anything), 2, emitsError(anything)]));
|
||||
|
||||
controller..add([1])..add([2, 3])..add([2])..add([]);
|
||||
});
|
||||
}
|
Loading…
Reference in New Issue