Auto-updating streams for queries

This commit is contained in:
Simon Binder 2019-02-14 15:50:54 +01:00
parent 5909b0d3a2
commit 624d0980e0
9 changed files with 114 additions and 7 deletions

View File

@ -103,7 +103,6 @@ create an issue.
- Custom primary keys
- Stabilize all end-user APIs and document them extensively
- Support default values and expressions, auto-increment
- Auto-updating streams for select statements
##### Definitely planned for the future
- Allow using DAOs instead of having to put everything in the main database
class.

View File

@ -1,5 +1,6 @@
import 'package:meta/meta.dart';
import 'package:sally/sally.dart';
import 'package:sally/src/runtime/executor/stream_queries.dart';
import 'package:sally/src/runtime/executor/type_system.dart';
import 'package:sally/src/runtime/migration.dart';
import 'package:sally/src/runtime/statements/delete.dart';
@ -10,9 +11,9 @@ import 'package:sally/src/runtime/statements/update.dart';
abstract class GeneratedDatabase {
final SqlTypeSystem typeSystem;
final QueryExecutor executor;
final StreamQueryStore streamQueries = StreamQueryStore();
int get schemaVersion;
MigrationStrategy get migration;
List<TableInfo> get allTables;
@ -24,6 +25,10 @@ abstract class GeneratedDatabase {
/// before that executor is ready.
Migrator _createMigrator(SqlExecutor executor) => Migrator(this, executor);
void markTableUpdated(String tableName) {
streamQueries.handleTableUpdates(tableName);
}
Future<void> handleDatabaseCreation({@required SqlExecutor executor}) {
final migrator = _createMigrator(executor);
return migration.onCreate(migrator);

View File

@ -0,0 +1,71 @@
import 'dart:async';
import 'package:sally/sally.dart';
class StreamQueryStore {
final List<_QueryStream> _activeStreams = [];
Stream<List<T>> registerStream<T>(SelectStatement<dynamic, T> statement) {
final stream = _QueryStream(statement, this);
_activeStreams.add(stream);
return stream.stream;
}
Future<void> handleTableUpdates(String table) async {
final affectedStreams = _activeStreams.where((stream) => stream.isAffectedByTableChange(table));
for (var stream in affectedStreams) {
await stream.fetchAndEmitData();
}
}
void _markAsClosed(_QueryStream stream) {
_activeStreams.remove(stream);
}
}
class _QueryStream<T, D> {
final SelectStatement<T, D> query;
final StreamQueryStore _store;
StreamController<List<D>> _controller;
Stream<List<D>> get stream {
_controller ??= StreamController.broadcast(
onListen: _onListen,
onCancel: _onCancel,
);
return _controller.stream;
}
_QueryStream(this.query, this._store);
void _onListen() {
// first listener added, fetch query
fetchAndEmitData();
}
void _onCancel() {
// last listener gone, dispose
_controller.close();
// todo this removes the stream from the list so that it can be garbage
// collected. When a stream is never listened to, we have a memory leak as
// this will never be called. Maybe an Expando would help here?
_store._markAsClosed(this);
}
Future<void> fetchAndEmitData() async {
if (!_controller.hasListener) return;
final data = await query.get();
if (!_controller.isClosed) {
_controller.add(data);
}
}
bool isAffectedByTableChange(String table) {
return table == query.table.$tableName;
}
}

View File

@ -16,6 +16,12 @@ class DeleteStatement<UserTable> extends Query<UserTable, dynamic> {
Future<int> go() async {
final ctx = constructQuery();
return await ctx.database.executor.runDelete(ctx.sql, ctx.boundVariables);
final rows = await ctx.database.executor.runDelete(ctx.sql, ctx.boundVariables);
if (rows > 0) {
database.markTableUpdated(table.$tableName);
}
return rows;
}
}

View File

@ -37,7 +37,8 @@ class InsertStatement<DataClass> {
ctx.buffer.write(')');
return database.executor.runInsert(ctx.sql, ctx.boundVariables);
await database.executor.runInsert(ctx.sql, ctx.boundVariables);
database.markTableUpdated(table.$tableName);
}
// TODO insert multiple values

View File

@ -13,7 +13,6 @@ import 'package:sally/src/runtime/structure/table_info.dart';
abstract class Query<Table, DataClass> {
@protected
GeneratedDatabase database;
@protected
TableInfo<Table, DataClass> table;
Query(this.database, this.table);

View File

@ -20,4 +20,10 @@ class SelectStatement<T, D> extends Query<T, D> {
await ctx.database.executor.runSelect(ctx.sql, ctx.boundVariables);
return results.map(table.map).toList();
}
/// Creates an auto-updating stream that emits new items whenever this table
/// changes.
Stream<List<D>> watch() {
return database.streamQueries.registerStream(this);
}
}

View File

@ -36,11 +36,17 @@ class UpdateStatement<T, D> extends Query<T, D> {
/// that match the set [where] and [limit] constraints. Warning: That also
/// means that, when you're not setting a where or limit expression
/// explicitly, this method will update all rows in the specific table.
Future<int> write(D entity) {
Future<int> write(D entity) async {
_updateReference = entity;
table.validateIntegrity(_updateReference, false);
final ctx = constructQuery();
return ctx.database.executor.runUpdate(ctx.sql, ctx.boundVariables);
final rows = await ctx.database.executor.runUpdate(ctx.sql, ctx.boundVariables);
if (rows > 0) {
database.markTableUpdated(table.$tableName);
}
return rows;
}
}

View File

@ -15,6 +15,9 @@ void main() {
db = TestDatabase(executor);
when(executor.runSelect(any, any)).thenAnswer((_) => Future.value([]));
when(executor.runUpdate(any, any)).thenAnswer((_) => Future.value(0));
when(executor.runDelete(any, any)).thenAnswer((_) => Future.value(0));
when(executor.runInsert(any, any)).thenAnswer((_) => Future.value(0));
});
group('Generates SELECT statements', () {
@ -54,6 +57,17 @@ void main() {
});
});
group('Streams for queries', () {
test('update correctly', () {
final stream = db.select(db.users).watch();
stream.listen((_) => null);
db.markTableUpdated('users');
verify(executor.runSelect('SELECT * FROM users;', argThat(isEmpty))).called(2);
});
});
group('Generates DELETE statements', () {
test('without any constraints', () {
db.delete(db.users).go();