Fix unnecessary re-query when subscriptions switch (#329)

This commit is contained in:
Simon Binder 2020-01-09 16:14:20 +01:00
parent 5c82b2f2a7
commit 1b60879a09
No known key found for this signature in database
GPG Key ID: 7891917E4147B8C0
6 changed files with 112 additions and 56 deletions

View File

@ -14,6 +14,7 @@
- Support for moor-file queries that run on initialization ([#280](https://github.com/simolus3/moor/issues/280)) - Support for moor-file queries that run on initialization ([#280](https://github.com/simolus3/moor/issues/280))
Declare them like this `@create: INSERT INTO users VALUES ('default', 'user')` Declare them like this `@create: INSERT INTO users VALUES ('default', 'user')`
- Support deletes in batches ([#325](https://github.com/simolus3/moor/issues/325)) - Support deletes in batches ([#325](https://github.com/simolus3/moor/issues/325))
- Reduce unnecessary queries when a stream is unsubscribed and then re-subscribed ([#329](https://github.com/simolus3/moor/issues/329))
## 2.2.0 ## 2.2.0

View File

@ -132,6 +132,7 @@ abstract class GeneratedDatabase extends DatabaseConnectionUser
/// Closes this database and releases associated resources. /// Closes this database and releases associated resources.
Future<void> close() async { Future<void> close() async {
await executor.close(); await executor.close();
await streamQueries.close();
if (_openedDbCount[runtimeType] != null) { if (_openedDbCount[runtimeType] != null) {
_openedDbCount[runtimeType]--; _openedDbCount[runtimeType]--;

View File

@ -67,6 +67,11 @@ class StreamQueryStore {
final Map<StreamKey, QueryStream> _activeKeyStreams = {}; final Map<StreamKey, QueryStream> _activeKeyStreams = {};
final HashSet<StreamKey> _keysPendingRemoval = HashSet<StreamKey>(); final HashSet<StreamKey> _keysPendingRemoval = HashSet<StreamKey>();
bool _isShuttingDown = false;
// we track pending timers since Flutter throws an exception when timers
// remain after a test run.
final Set<Completer> _pendingTimers = {};
// Why is this stream synchronous? We want to dispatch table updates before // Why is this stream synchronous? We want to dispatch table updates before
// the future from the query completes. This allows streams to invalidate // the future from the query completes. This allows streams to invalidate
// their cached data before the user can send another query. // their cached data before the user can send another query.
@ -112,16 +117,31 @@ class StreamQueryStore {
_updatedTableNames.add(updatedTableNames); _updatedTableNames.add(updatedTableNames);
} }
void markAsClosed(QueryStream stream) { void markAsClosed(QueryStream stream, Function() whenRemoved) {
if (_isShuttingDown) return;
final key = stream._fetcher.key; final key = stream._fetcher.key;
_keysPendingRemoval.add(key); _keysPendingRemoval.add(key);
scheduleMicrotask(() { final completer = Completer<void>();
_pendingTimers.add(completer);
// Hey there! If you're sent here because your Flutter tests fail, please
// call and await Database.close() in your Flutter widget tests!
// Moor uses timers internally so that after you stopped listening to a
// stream, it can keep its cache just a bit longer. When you listen to
// streams a lot, this helps reduce duplicate statements, especially with
// Flutter's StreamBuilder.
Timer.run(() {
completer.complete();
_pendingTimers.remove(completer);
// if no other subscriber was found during this event iteration, remove // if no other subscriber was found during this event iteration, remove
// the stream from the cache. // the stream from the cache.
if (_keysPendingRemoval.contains(key)) { if (_keysPendingRemoval.contains(key)) {
_keysPendingRemoval.remove(key); _keysPendingRemoval.remove(key);
_activeKeyStreams.remove(key); _activeKeyStreams.remove(key);
whenRemoved();
} }
}); });
} }
@ -134,16 +154,27 @@ class StreamQueryStore {
_activeKeyStreams[key] = stream; _activeKeyStreams[key] = stream;
} }
} }
Future<void> close() async {
_isShuttingDown = true;
for (final stream in _activeKeyStreams.values) {
await stream._controller.close();
}
await _updatedTableNames.close();
while (_pendingTimers.isNotEmpty) {
await _pendingTimers.first.future;
}
_activeKeyStreams.clear();
}
} }
class QueryStream<T> { class QueryStream<T> {
final QueryStreamFetcher<T> _fetcher; final QueryStreamFetcher<T> _fetcher;
final StreamQueryStore _store; final StreamQueryStore _store;
// todo this controller is not disposed because it can be listened to at any
// time, so we have to rely on GC to clean this up.
// In a future release, we should implement a dispose method and encourage
// users to call it. See the comment at registerStream and https://github.com/simolus3/moor/issues/75
StreamController<T> _controller; StreamController<T> _controller;
StreamSubscription _tablesChangedSubscription; StreamSubscription _tablesChangedSubscription;
@ -165,41 +196,42 @@ class QueryStream<T> {
T _cachedData() => _lastData; T _cachedData() => _lastData;
void _onListen() { void _onListen() {
// first listener added, fetch query
fetchAndEmitData();
_store.markAsOpened(this); _store.markAsOpened(this);
// fetch new data whenever any table referenced in this stream changes its // fetch new data whenever any table referenced in this stream updates.
// name // It could be that we have an outstanding subscription when the
assert(_tablesChangedSubscription == null); // stream was closed but another listener attached quickly enough. In that
final names = _fetcher.readsFrom.map((t) => t.actualTableName).toSet(); // case we don't have to re-send the query
_tablesChangedSubscription = _store._updatedTableNames.stream if (_tablesChangedSubscription == null) {
.where((changed) => changed.any(names.contains)) // first listener added, fetch query
.listen((_) {
// table has changed, invalidate cache
_lastData = null;
fetchAndEmitData(); fetchAndEmitData();
});
final names = _fetcher.readsFrom.map((t) => t.actualTableName).toSet();
_tablesChangedSubscription = _store._updatedTableNames.stream
.where((changed) => changed.any(names.contains))
.listen((_) {
// table has changed, invalidate cache
_lastData = null;
fetchAndEmitData();
});
}
} }
void _onCancel() { void _onCancel() {
// last listener gone, dispose _store.markAsClosed(this, () {
_tablesChangedSubscription?.cancel(); // last listener gone, dispose
_tablesChangedSubscription = null; _tablesChangedSubscription?.cancel();
// we don't listen for table updates anymore, and we're guaranteed to // we don't listen for table updates anymore, and we're guaranteed to
// re-fetch data after a new listener comes in. We can't know if the table // re-fetch data after a new listener comes in. We can't know if the table
// was updated in the meantime, but let's delete the cached data just in // was updated in the meantime, but let's delete the cached data just in
// case // case
_lastData = null; _lastData = null;
_tablesChangedSubscription = null;
_store.markAsClosed(this); });
} }
Future<void> fetchAndEmitData() async { Future<void> fetchAndEmitData() async {
// Fetch data if it's needed, publish that data if it's possible.
if (!_controller.hasListener) return;
T data; T data;
try { try {

View File

@ -63,6 +63,18 @@ void main() {
verifyNoMoreInteractions(executor); verifyNoMoreInteractions(executor);
}); });
test('same stream emits cached data when listening twice', () async {
when(executor.runSelect(any, any)).thenAnswer((_) => Future.value([]));
final stream = db.select(db.users).watch();
expect(await stream.first, isEmpty);
clearInteractions(executor);
await stream.first;
verifyNever(executor.runSelect(any, any));
});
group('updating clears cached data', () { group('updating clears cached data', () {
test('when an older stream is no longer listened to', () async { test('when an older stream is no longer listened to', () async {
when(executor.runSelect(any, any)).thenAnswer((_) => Future.value([])); when(executor.runSelect(any, any)).thenAnswer((_) => Future.value([]));
@ -187,11 +199,17 @@ void main() {
verifyNever(executor.runSelect(any, any)); verifyNever(executor.runSelect(any, any));
}); });
test('when the data updates after the listener has detached', () { test('when the data updates after the listener has detached', () async {
final subscription = db.select(db.users).watch().listen((_) {}); final subscription = db.select(db.users).watch().listen((_) {});
clearInteractions(executor); clearInteractions(executor);
subscription.cancel(); await subscription.cancel();
// The stream is kept open for the rest of this event iteration
final completer = Completer.sync();
Timer.run(completer.complete);
await completer.future;
db.markTablesUpdated({db.users}); db.markTablesUpdated({db.users});
verifyNever(executor.runSelect(any, any)); verifyNever(executor.runSelect(any, any));

View File

@ -34,7 +34,8 @@ class TodoEntry extends DataClass implements Insertable<TodoEntry> {
); );
} }
factory TodoEntry.fromJson(Map<String, dynamic> json, factory TodoEntry.fromJson(Map<String, dynamic> json,
{ValueSerializer serializer = const ValueSerializer.defaults()}) { {ValueSerializer serializer}) {
serializer ??= moorRuntimeOptions.defaultSerializer;
return TodoEntry( return TodoEntry(
id: serializer.fromJson<int>(json['id']), id: serializer.fromJson<int>(json['id']),
content: serializer.fromJson<String>(json['content']), content: serializer.fromJson<String>(json['content']),
@ -43,9 +44,9 @@ class TodoEntry extends DataClass implements Insertable<TodoEntry> {
); );
} }
@override @override
Map<String, dynamic> toJson( Map<String, dynamic> toJson({ValueSerializer serializer}) {
{ValueSerializer serializer = const ValueSerializer.defaults()}) { serializer ??= moorRuntimeOptions.defaultSerializer;
return { return <String, dynamic>{
'id': serializer.toJson<int>(id), 'id': serializer.toJson<int>(id),
'content': serializer.toJson<String>(content), 'content': serializer.toJson<String>(content),
'targetDate': serializer.toJson<DateTime>(targetDate), 'targetDate': serializer.toJson<DateTime>(targetDate),
@ -92,7 +93,7 @@ class TodoEntry extends DataClass implements Insertable<TodoEntry> {
int get hashCode => $mrjf($mrjc(id.hashCode, int get hashCode => $mrjf($mrjc(id.hashCode,
$mrjc(content.hashCode, $mrjc(targetDate.hashCode, category.hashCode)))); $mrjc(content.hashCode, $mrjc(targetDate.hashCode, category.hashCode))));
@override @override
bool operator ==(other) => bool operator ==(dynamic other) =>
identical(this, other) || identical(this, other) ||
(other is TodoEntry && (other is TodoEntry &&
other.id == this.id && other.id == this.id &&
@ -193,26 +194,20 @@ class $TodosTable extends Todos with TableInfo<$TodosTable, TodoEntry> {
final context = VerificationContext(); final context = VerificationContext();
if (d.id.present) { if (d.id.present) {
context.handle(_idMeta, id.isAcceptableValue(d.id.value, _idMeta)); context.handle(_idMeta, id.isAcceptableValue(d.id.value, _idMeta));
} else if (id.isRequired && isInserting) {
context.missing(_idMeta);
} }
if (d.content.present) { if (d.content.present) {
context.handle(_contentMeta, context.handle(_contentMeta,
content.isAcceptableValue(d.content.value, _contentMeta)); content.isAcceptableValue(d.content.value, _contentMeta));
} else if (content.isRequired && isInserting) { } else if (isInserting) {
context.missing(_contentMeta); context.missing(_contentMeta);
} }
if (d.targetDate.present) { if (d.targetDate.present) {
context.handle(_targetDateMeta, context.handle(_targetDateMeta,
targetDate.isAcceptableValue(d.targetDate.value, _targetDateMeta)); targetDate.isAcceptableValue(d.targetDate.value, _targetDateMeta));
} else if (targetDate.isRequired && isInserting) {
context.missing(_targetDateMeta);
} }
if (d.category.present) { if (d.category.present) {
context.handle(_categoryMeta, context.handle(_categoryMeta,
category.isAcceptableValue(d.category.value, _categoryMeta)); category.isAcceptableValue(d.category.value, _categoryMeta));
} else if (category.isRequired && isInserting) {
context.missing(_categoryMeta);
} }
return context; return context;
} }
@ -265,16 +260,17 @@ class Category extends DataClass implements Insertable<Category> {
); );
} }
factory Category.fromJson(Map<String, dynamic> json, factory Category.fromJson(Map<String, dynamic> json,
{ValueSerializer serializer = const ValueSerializer.defaults()}) { {ValueSerializer serializer}) {
serializer ??= moorRuntimeOptions.defaultSerializer;
return Category( return Category(
id: serializer.fromJson<int>(json['id']), id: serializer.fromJson<int>(json['id']),
description: serializer.fromJson<String>(json['description']), description: serializer.fromJson<String>(json['description']),
); );
} }
@override @override
Map<String, dynamic> toJson( Map<String, dynamic> toJson({ValueSerializer serializer}) {
{ValueSerializer serializer = const ValueSerializer.defaults()}) { serializer ??= moorRuntimeOptions.defaultSerializer;
return { return <String, dynamic>{
'id': serializer.toJson<int>(id), 'id': serializer.toJson<int>(id),
'description': serializer.toJson<String>(description), 'description': serializer.toJson<String>(description),
}; };
@ -306,7 +302,7 @@ class Category extends DataClass implements Insertable<Category> {
@override @override
int get hashCode => $mrjf($mrjc(id.hashCode, description.hashCode)); int get hashCode => $mrjf($mrjc(id.hashCode, description.hashCode));
@override @override
bool operator ==(other) => bool operator ==(dynamic other) =>
identical(this, other) || identical(this, other) ||
(other is Category && (other is Category &&
other.id == this.id && other.id == this.id &&
@ -374,13 +370,11 @@ class $CategoriesTable extends Categories
final context = VerificationContext(); final context = VerificationContext();
if (d.id.present) { if (d.id.present) {
context.handle(_idMeta, id.isAcceptableValue(d.id.value, _idMeta)); context.handle(_idMeta, id.isAcceptableValue(d.id.value, _idMeta));
} else if (id.isRequired && isInserting) {
context.missing(_idMeta);
} }
if (d.description.present) { if (d.description.present) {
context.handle(_descriptionMeta, context.handle(_descriptionMeta,
description.isAcceptableValue(d.description.value, _descriptionMeta)); description.isAcceptableValue(d.description.value, _descriptionMeta));
} else if (description.isRequired && isInserting) { } else if (isInserting) {
context.missing(_descriptionMeta); context.missing(_descriptionMeta);
} }
return context; return context;
@ -434,15 +428,25 @@ abstract class _$Database extends GeneratedDatabase {
); );
} }
Selectable<CategoriesWithCountResult> _categoriesWithCount() { Selectable<CategoriesWithCountResult> _categoriesWithCountQuery() {
return customSelectQuery( return customSelectQuery(
'SELECT\n c.id,\n c.desc,\n (SELECT COUNT(*) FROM todos WHERE category = c.id) AS amount\n FROM categories c\n UNION ALL\n SELECT null, null, (SELECT COUNT(*) FROM todos WHERE category IS NULL)', 'SELECT\n c.id,\n c.desc,\n (SELECT COUNT(*) FROM todos WHERE category = c.id) AS amount\n FROM categories c\n UNION ALL\n SELECT null, null, (SELECT COUNT(*) FROM todos WHERE category IS NULL)',
variables: [], variables: [],
readsFrom: {categories, todos}).map(_rowToCategoriesWithCountResult); readsFrom: {categories, todos}).map(_rowToCategoriesWithCountResult);
} }
Future<List<CategoriesWithCountResult>> _categoriesWithCount() {
return _categoriesWithCountQuery().get();
}
Stream<List<CategoriesWithCountResult>> _watchCategoriesWithCount() {
return _categoriesWithCountQuery().watch();
}
@override @override
List<TableInfo> get allTables => [todos, categories]; Iterable<TableInfo> get allTables => allSchemaEntities.whereType<TableInfo>();
@override
List<DatabaseSchemaEntity> get allSchemaEntities => [todos, categories];
} }
class CategoriesWithCountResult { class CategoriesWithCountResult {

View File

@ -23,7 +23,7 @@ dependencies:
# Moor-specific analysis # Moor-specific analysis
moor: ^2.0.1 moor: ^2.0.1
sqlparser: ^0.4.0 sqlparser: ^0.5.0
# Dart analysis # Dart analysis
analyzer: '>=0.36.4 <0.40.0' analyzer: '>=0.36.4 <0.40.0'