diff --git a/sally/README.md b/sally/README.md index 1b1e166b..08324622 100644 --- a/sally/README.md +++ b/sally/README.md @@ -103,7 +103,6 @@ create an issue. - Custom primary keys - Stabilize all end-user APIs and document them extensively - Support default values and expressions, auto-increment -- Auto-updating streams for select statements ##### Definitely planned for the future - Allow using DAOs instead of having to put everything in the main database class. diff --git a/sally/lib/src/runtime/executor/executor.dart b/sally/lib/src/runtime/executor/executor.dart index c707b85d..5a56fb4d 100644 --- a/sally/lib/src/runtime/executor/executor.dart +++ b/sally/lib/src/runtime/executor/executor.dart @@ -1,5 +1,6 @@ import 'package:meta/meta.dart'; import 'package:sally/sally.dart'; +import 'package:sally/src/runtime/executor/stream_queries.dart'; import 'package:sally/src/runtime/executor/type_system.dart'; import 'package:sally/src/runtime/migration.dart'; import 'package:sally/src/runtime/statements/delete.dart'; @@ -10,9 +11,9 @@ import 'package:sally/src/runtime/statements/update.dart'; abstract class GeneratedDatabase { final SqlTypeSystem typeSystem; final QueryExecutor executor; + final StreamQueryStore streamQueries = StreamQueryStore(); int get schemaVersion; - MigrationStrategy get migration; List get allTables; @@ -24,6 +25,10 @@ abstract class GeneratedDatabase { /// before that executor is ready. Migrator _createMigrator(SqlExecutor executor) => Migrator(this, executor); + void markTableUpdated(String tableName) { + streamQueries.handleTableUpdates(tableName); + } + Future handleDatabaseCreation({@required SqlExecutor executor}) { final migrator = _createMigrator(executor); return migration.onCreate(migrator); diff --git a/sally/lib/src/runtime/executor/stream_queries.dart b/sally/lib/src/runtime/executor/stream_queries.dart new file mode 100644 index 00000000..53dbafb9 --- /dev/null +++ b/sally/lib/src/runtime/executor/stream_queries.dart @@ -0,0 +1,71 @@ +import 'dart:async'; + +import 'package:sally/sally.dart'; + +class StreamQueryStore { + final List<_QueryStream> _activeStreams = []; + + Stream> registerStream(SelectStatement statement) { + final stream = _QueryStream(statement, this); + _activeStreams.add(stream); + return stream.stream; + } + + Future handleTableUpdates(String table) async { + final affectedStreams = _activeStreams.where((stream) => stream.isAffectedByTableChange(table)); + + for (var stream in affectedStreams) { + await stream.fetchAndEmitData(); + } + } + + void _markAsClosed(_QueryStream stream) { + _activeStreams.remove(stream); + } +} + +class _QueryStream { + final SelectStatement query; + final StreamQueryStore _store; + + StreamController> _controller; + + Stream> get stream { + _controller ??= StreamController.broadcast( + onListen: _onListen, + onCancel: _onCancel, + ); + + return _controller.stream; + } + + _QueryStream(this.query, this._store); + + void _onListen() { + // first listener added, fetch query + fetchAndEmitData(); + } + + void _onCancel() { + // last listener gone, dispose + _controller.close(); + // todo this removes the stream from the list so that it can be garbage + // collected. When a stream is never listened to, we have a memory leak as + // this will never be called. Maybe an Expando would help here? + _store._markAsClosed(this); + } + + Future fetchAndEmitData() async { + if (!_controller.hasListener) return; + + final data = await query.get(); + + if (!_controller.isClosed) { + _controller.add(data); + } + } + + bool isAffectedByTableChange(String table) { + return table == query.table.$tableName; + } +} diff --git a/sally/lib/src/runtime/statements/delete.dart b/sally/lib/src/runtime/statements/delete.dart index 66bf5b86..a554f74e 100644 --- a/sally/lib/src/runtime/statements/delete.dart +++ b/sally/lib/src/runtime/statements/delete.dart @@ -16,6 +16,12 @@ class DeleteStatement extends Query { Future go() async { final ctx = constructQuery(); - return await ctx.database.executor.runDelete(ctx.sql, ctx.boundVariables); + final rows = await ctx.database.executor.runDelete(ctx.sql, ctx.boundVariables); + + if (rows > 0) { + database.markTableUpdated(table.$tableName); + } + + return rows; } } diff --git a/sally/lib/src/runtime/statements/insert.dart b/sally/lib/src/runtime/statements/insert.dart index b548f0ce..a2894e91 100644 --- a/sally/lib/src/runtime/statements/insert.dart +++ b/sally/lib/src/runtime/statements/insert.dart @@ -37,7 +37,8 @@ class InsertStatement { ctx.buffer.write(')'); - return database.executor.runInsert(ctx.sql, ctx.boundVariables); + await database.executor.runInsert(ctx.sql, ctx.boundVariables); + database.markTableUpdated(table.$tableName); } // TODO insert multiple values diff --git a/sally/lib/src/runtime/statements/query.dart b/sally/lib/src/runtime/statements/query.dart index cb694a27..48dfaadc 100644 --- a/sally/lib/src/runtime/statements/query.dart +++ b/sally/lib/src/runtime/statements/query.dart @@ -13,7 +13,6 @@ import 'package:sally/src/runtime/structure/table_info.dart'; abstract class Query { @protected GeneratedDatabase database; - @protected TableInfo table; Query(this.database, this.table); diff --git a/sally/lib/src/runtime/statements/select.dart b/sally/lib/src/runtime/statements/select.dart index 4674a863..5322537b 100644 --- a/sally/lib/src/runtime/statements/select.dart +++ b/sally/lib/src/runtime/statements/select.dart @@ -20,4 +20,10 @@ class SelectStatement extends Query { await ctx.database.executor.runSelect(ctx.sql, ctx.boundVariables); return results.map(table.map).toList(); } + + /// Creates an auto-updating stream that emits new items whenever this table + /// changes. + Stream> watch() { + return database.streamQueries.registerStream(this); + } } diff --git a/sally/lib/src/runtime/statements/update.dart b/sally/lib/src/runtime/statements/update.dart index b83d84cc..ba5edfa2 100644 --- a/sally/lib/src/runtime/statements/update.dart +++ b/sally/lib/src/runtime/statements/update.dart @@ -36,11 +36,17 @@ class UpdateStatement extends Query { /// that match the set [where] and [limit] constraints. Warning: That also /// means that, when you're not setting a where or limit expression /// explicitly, this method will update all rows in the specific table. - Future write(D entity) { + Future write(D entity) async { _updateReference = entity; table.validateIntegrity(_updateReference, false); final ctx = constructQuery(); - return ctx.database.executor.runUpdate(ctx.sql, ctx.boundVariables); + final rows = await ctx.database.executor.runUpdate(ctx.sql, ctx.boundVariables); + + if (rows > 0) { + database.markTableUpdated(table.$tableName); + } + + return rows; } } diff --git a/sally/test/queries_test.dart b/sally/test/queries_test.dart index 89efa98e..382975dc 100644 --- a/sally/test/queries_test.dart +++ b/sally/test/queries_test.dart @@ -15,6 +15,9 @@ void main() { db = TestDatabase(executor); when(executor.runSelect(any, any)).thenAnswer((_) => Future.value([])); + when(executor.runUpdate(any, any)).thenAnswer((_) => Future.value(0)); + when(executor.runDelete(any, any)).thenAnswer((_) => Future.value(0)); + when(executor.runInsert(any, any)).thenAnswer((_) => Future.value(0)); }); group('Generates SELECT statements', () { @@ -54,6 +57,17 @@ void main() { }); }); + group('Streams for queries', () { + test('update correctly', () { + final stream = db.select(db.users).watch(); + stream.listen((_) => null); + + db.markTableUpdated('users'); + + verify(executor.runSelect('SELECT * FROM users;', argThat(isEmpty))).called(2); + }); + }); + group('Generates DELETE statements', () { test('without any constraints', () { db.delete(db.users).go();