mirror of https://github.com/AMT-Cheif/drift.git
Implement custom queries
This commit is contained in:
parent
44297c8deb
commit
a85481b21a
|
@ -57,7 +57,7 @@ abstract class GeneratedDatabase {
|
|||
|
||||
/// Creates and auto-updating stream from the given select statement. This
|
||||
/// method should not be used directly.
|
||||
Stream<List<T>> createStream<T>(SelectStatement<dynamic, T> stmt) =>
|
||||
Stream<List<T>> createStream<T>(TableChangeListener<List<T>> stmt) =>
|
||||
streamQueries.registerStream(stmt);
|
||||
|
||||
/// Handles database creation by delegating the work to the [migration]
|
||||
|
@ -112,12 +112,13 @@ abstract class GeneratedDatabase {
|
|||
/// You can use the [updates] parameter so that sally knows which tables are
|
||||
/// affected by your query. All select streams that depend on a table
|
||||
/// specified there will then issue another query.
|
||||
Future<int> updateCustom(String query,
|
||||
Future<int> customUpdate(String query,
|
||||
{List<Variable> variables = const [], Set<TableInfo> updates}) async {
|
||||
final ctx = GenerationContext(this);
|
||||
final mappedArgs = variables.map((v) => v.mapToSimpleValue(ctx)).toList();
|
||||
|
||||
final affectedRows = await executor.runUpdate(query, mappedArgs);
|
||||
final affectedRows =
|
||||
executor.doWhenOpened((_) => executor.runUpdate(query, mappedArgs));
|
||||
|
||||
if (updates != null) {
|
||||
for (var table in updates) {
|
||||
|
@ -127,6 +128,25 @@ abstract class GeneratedDatabase {
|
|||
|
||||
return affectedRows;
|
||||
}
|
||||
|
||||
/// Executes a custom select statement once. To use the variables, mark them
|
||||
/// with a "?" in your [query]. They will then be changed to the appropriate
|
||||
/// value.
|
||||
Future<List<QueryRow>> customSelect(String query,
|
||||
{List<Variable> variables = const []}) async {
|
||||
return CustomSelectStatement(query, variables, Set(), this).read();
|
||||
}
|
||||
|
||||
/// Creates a stream from a custom select statement.To use the variables, mark
|
||||
/// them with a "?" in your [query]. They will then be changed to the
|
||||
/// appropriate value. The stream will re-emit items when any table in
|
||||
/// [readsFrom] changes, so be sure to set it to the set of tables your query
|
||||
/// reads data from.
|
||||
Stream<List<QueryRow>> customSelectStream(String query,
|
||||
{List<Variable> variables = const [], Set<TableInfo> readsFrom}) {
|
||||
final tables = readsFrom ?? Set();
|
||||
return createStream(CustomSelectStatement(query, variables, tables, this));
|
||||
}
|
||||
}
|
||||
|
||||
/// A query executor is responsible for executing statements on a database and
|
||||
|
|
|
@ -2,6 +2,16 @@ import 'dart:async';
|
|||
|
||||
import 'package:sally/sally.dart';
|
||||
|
||||
/// Internal interface to mark classes that respond to table changes
|
||||
abstract class TableChangeListener<T> {
|
||||
/// Called to check if this listener should update after the table with the
|
||||
/// given name has changed.
|
||||
bool isAffectedBy(String table);
|
||||
|
||||
/// Called to reload data from the table after it has changed.
|
||||
Future<T> handleDataChanged();
|
||||
}
|
||||
|
||||
/// Keeps track of active streams created from [SelectStatement]s and updates
|
||||
/// them when needed.
|
||||
class StreamQueryStore {
|
||||
|
@ -12,7 +22,7 @@ class StreamQueryStore {
|
|||
StreamQueryStore();
|
||||
|
||||
/// Creates a new stream from the select statement.
|
||||
Stream<List<T>> registerStream<T>(SelectStatement<dynamic, T> statement) {
|
||||
Stream<List<T>> registerStream<T>(TableChangeListener<List<T>> statement) {
|
||||
final stream = _QueryStream(statement, this);
|
||||
_activeStreams.add(stream);
|
||||
return stream.stream;
|
||||
|
@ -34,13 +44,13 @@ class StreamQueryStore {
|
|||
}
|
||||
}
|
||||
|
||||
class _QueryStream<T, D> {
|
||||
final SelectStatement<T, D> query;
|
||||
class _QueryStream<T> {
|
||||
final TableChangeListener<T> listener;
|
||||
final StreamQueryStore _store;
|
||||
|
||||
StreamController<List<D>> _controller;
|
||||
StreamController<T> _controller;
|
||||
|
||||
Stream<List<D>> get stream {
|
||||
Stream<T> get stream {
|
||||
_controller ??= StreamController.broadcast(
|
||||
onListen: _onListen,
|
||||
onCancel: _onCancel,
|
||||
|
@ -49,7 +59,7 @@ class _QueryStream<T, D> {
|
|||
return _controller.stream;
|
||||
}
|
||||
|
||||
_QueryStream(this.query, this._store);
|
||||
_QueryStream(this.listener, this._store);
|
||||
|
||||
void _onListen() {
|
||||
// first listener added, fetch query
|
||||
|
@ -70,14 +80,12 @@ class _QueryStream<T, D> {
|
|||
// Fetch data if it's needed, publish that data if it's possible.
|
||||
if (!_controller.hasListener) return;
|
||||
|
||||
final data = await query.get();
|
||||
final data = await listener.handleDataChanged();
|
||||
|
||||
if (!_controller.isClosed) {
|
||||
_controller.add(data);
|
||||
}
|
||||
}
|
||||
|
||||
bool isAffectedByTableChange(String table) {
|
||||
return table == query.table.$tableName;
|
||||
}
|
||||
bool isAffectedByTableChange(String table) => listener.isAffectedBy(table);
|
||||
}
|
||||
|
|
|
@ -4,12 +4,14 @@ import 'package:sally/sally.dart';
|
|||
import 'package:sally/src/runtime/components/component.dart';
|
||||
import 'package:sally/src/runtime/components/limit.dart';
|
||||
import 'package:sally/src/runtime/executor/executor.dart';
|
||||
import 'package:sally/src/runtime/executor/stream_queries.dart';
|
||||
import 'package:sally/src/runtime/statements/query.dart';
|
||||
import 'package:sally/src/runtime/structure/table_info.dart';
|
||||
|
||||
typedef OrderingTerm OrderClauseGenerator<T>(T tbl);
|
||||
|
||||
class SelectStatement<T, D> extends Query<T, D> {
|
||||
class SelectStatement<T, D> extends Query<T, D>
|
||||
implements TableChangeListener<List<D>> {
|
||||
SelectStatement(GeneratedDatabase database, TableInfo<T, D> table)
|
||||
: super(database, table);
|
||||
|
||||
|
@ -47,4 +49,71 @@ class SelectStatement<T, D> extends Query<T, D> {
|
|||
Stream<List<D>> watch() {
|
||||
return database.createStream(this);
|
||||
}
|
||||
|
||||
@override
|
||||
Future<List<D>> handleDataChanged() {
|
||||
return get();
|
||||
}
|
||||
|
||||
@override
|
||||
bool isAffectedBy(String table) {
|
||||
return table == super.table.$tableName;
|
||||
}
|
||||
}
|
||||
|
||||
class CustomSelectStatement implements TableChangeListener<List<QueryRow>> {
|
||||
/// Tables this select statement reads from
|
||||
final Set<TableInfo> tables;
|
||||
final String query;
|
||||
final List<Variable> variables;
|
||||
final GeneratedDatabase db;
|
||||
|
||||
CustomSelectStatement(this.query, this.variables, this.tables, this.db);
|
||||
|
||||
Future<List<QueryRow>> read() => handleDataChanged();
|
||||
|
||||
@override
|
||||
Future<List<QueryRow>> handleDataChanged() async {
|
||||
final ctx = GenerationContext(db);
|
||||
final mappedArgs = variables.map((v) => v.mapToSimpleValue(ctx)).toList();
|
||||
|
||||
final result =
|
||||
await db.executor.doWhenOpened((e) => e.runSelect(query, mappedArgs));
|
||||
|
||||
return result.map((row) => QueryRow(row, db)).toList();
|
||||
}
|
||||
|
||||
@override
|
||||
bool isAffectedBy(String table) {
|
||||
return tables.any((t) => t.$tableName == table);
|
||||
}
|
||||
}
|
||||
|
||||
/// For custom select statement, represents a row in the result set.
|
||||
class QueryRow {
|
||||
final Map<String, dynamic> _data;
|
||||
final GeneratedDatabase _db;
|
||||
|
||||
QueryRow(this._data, this._db);
|
||||
|
||||
/// Reads an arbitrary value from the row and maps it to a fitting dart type.
|
||||
/// The dart type [T] must be supported by the type system of the database
|
||||
/// used (mostly contains booleans, strings, integers and dates).
|
||||
T read<T>(String key) {
|
||||
final type = _db.typeSystem.forDartType<T>();
|
||||
|
||||
return type.mapFromDatabaseResponse(_data[key]);
|
||||
}
|
||||
|
||||
/// Reads a bool from the column named [key].
|
||||
bool readBool(String key) => read<bool>(key);
|
||||
|
||||
/// Reads a string from the column named [key].
|
||||
String readString(String key) => read<String>(key);
|
||||
|
||||
/// Reads a int from the column named [key].
|
||||
int readInt(String key) => read<int>(key);
|
||||
|
||||
/// Reads a [DateTime] from the column named [key].
|
||||
DateTime readDateTime(String key) => read<DateTime>(key);
|
||||
}
|
||||
|
|
|
@ -2,8 +2,11 @@ import 'package:sally/sally.dart';
|
|||
import 'package:sally/src/runtime/components/component.dart';
|
||||
import 'package:test_api/test_api.dart';
|
||||
|
||||
import '../data/tables/todos.dart';
|
||||
|
||||
void main() {
|
||||
final expression = GeneratedIntColumn('col', false);
|
||||
final db = TodoDb(null);
|
||||
|
||||
final comparisons = {
|
||||
expression.isSmallerThan: '<',
|
||||
|
@ -24,7 +27,7 @@ void main() {
|
|||
|
||||
comparisons.forEach((fn, value) {
|
||||
test('for operator $value', () {
|
||||
final ctx = GenerationContext(null);
|
||||
final ctx = GenerationContext(db);
|
||||
|
||||
fn(compare).writeInto(ctx);
|
||||
|
||||
|
@ -36,7 +39,7 @@ void main() {
|
|||
group('can compare with values', () {
|
||||
comparisonsVal.forEach((fn, value) {
|
||||
test('for operator $value', () {
|
||||
final ctx = GenerationContext(null);
|
||||
final ctx = GenerationContext(db);
|
||||
|
||||
fn(12).writeInto(ctx);
|
||||
|
||||
|
|
|
@ -65,13 +65,13 @@ void main() {
|
|||
|
||||
group('custom updates', () {
|
||||
test('execute the correct sql', () async {
|
||||
await db.updateCustom('DELETE FROM users');
|
||||
await db.customUpdate('DELETE FROM users');
|
||||
|
||||
verify(executor.runUpdate('DELETE FROM users', []));
|
||||
});
|
||||
|
||||
test('map the variables correctly', () async {
|
||||
await db.updateCustom(
|
||||
await db.customUpdate(
|
||||
'DELETE FROM users WHERE name = ? AND birthdate < ?',
|
||||
variables: [
|
||||
Variable.withString('Name'),
|
||||
|
@ -87,11 +87,11 @@ void main() {
|
|||
test('returns information from executor', () async {
|
||||
when(executor.runUpdate(any, any)).thenAnswer((_) => Future.value(10));
|
||||
|
||||
expect(await db.updateCustom(''), 10);
|
||||
expect(await db.customUpdate(''), 10);
|
||||
});
|
||||
|
||||
test('informs about updated tables', () async {
|
||||
await db.updateCustom('', updates: Set.of([db.users, db.todosTable]));
|
||||
await db.customUpdate('', updates: Set.of([db.users, db.todosTable]));
|
||||
|
||||
verify(streamQueries.handleTableUpdates('users'));
|
||||
verify(streamQueries.handleTableUpdates('todos'));
|
||||
|
|
|
@ -189,6 +189,8 @@ If a column is nullable or has a default value (this includes auto-increments),
|
|||
can be omitted. All other fields must be set and non-null. The `insert` method will throw
|
||||
otherwise.
|
||||
|
||||
'
|
||||
|
||||
## Migrations
|
||||
Sally provides a migration API that can be used to gradually apply schema changes after bumping
|
||||
the `schemaVersion` getter inside the `Database` class. To use it, override the `migration`
|
||||
|
@ -226,6 +228,8 @@ You can also add individual tables or drop them.
|
|||
|
||||
## TODO-List and current limitations
|
||||
### Limitations (at the moment)
|
||||
Please note that a workaround for most on this list exists with custom statements.
|
||||
|
||||
- No joins
|
||||
- No `group by` or window functions
|
||||
- Custom primary key support is very limited
|
||||
|
|
|
@ -23,6 +23,13 @@ class Categories extends Table {
|
|||
TextColumn get description => text().named('desc')();
|
||||
}
|
||||
|
||||
class CategoryWithCount {
|
||||
final Category category;
|
||||
final int count; // amount of entries in this category
|
||||
|
||||
CategoryWithCount(this.category, this.count);
|
||||
}
|
||||
|
||||
@UseSally(tables: [Todos, Categories])
|
||||
class Database extends _$Database {
|
||||
Database()
|
||||
|
|
Loading…
Reference in New Issue