Merge branch 'develop' into beta

Simon Binder 2019-10-03 13:58:05 +02:00
commit ab12380ec1
GPG Key ID: 7891917E4147B8C0
5 changed files with 85 additions and 20 deletions

View File

@@ -263,8 +263,8 @@ mixin QueryEngine on DatabaseConnectionUser {
   /// Executes the custom sql [statement] on the database.
   @protected
   @visibleForTesting
-  Future<void> customStatement(String statement) {
-    return _resolvedEngine.executor.runCustom(statement);
+  Future<void> customStatement(String statement, [List<dynamic> args]) {
+    return _resolvedEngine.executor.runCustom(statement, args);
   }

   /// Executes [action] in a transaction, which means that all its queries and
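For illustration, a minimal sketch of how the new optional argument list could be used from a generated database class (the `log_entries` table and the helper name are hypothetical, not part of this change):

  // Hypothetical helper on a generated database class:
  Future<void> logMessage(String message) {
    // The second argument is passed to the executor as bound variables for
    // the `?` placeholder, so the value is not interpolated into the sql.
    return customStatement(
      'INSERT INTO log_entries (message) VALUES (?)',
      [message],
    );
  }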
@@ -366,15 +366,24 @@ abstract class GeneratedDatabase extends DatabaseConnectionUser
     executor?.databaseInfo = this;
   }

-  /// Creates a migrator with the provided query executor. We sometimes can't
-  /// use the regular [GeneratedDatabase.executor] because migration happens
-  /// before that executor is ready.
-  Migrator _createMigrator(SqlExecutor executor) => Migrator(this, executor);
+  /// Creates a [Migrator] with the provided query executor. Migrators generate
+  /// sql statements to create or drop tables.
+  ///
+  /// This api is mainly used internally in moor, for instance in
+  /// [handleDatabaseCreation] and [handleDatabaseVersionChange]. However, it
+  /// can also be used if you need to create tables manually and outside of a
+  /// [MigrationStrategy]. For almost all use cases, overriding [migration]
+  /// should suffice.
+  @protected
+  Migrator createMigrator([SqlExecutor executor]) {
+    final actualExecutor = executor ?? customStatement;
+    return Migrator(this, actualExecutor);
+  }

   /// Handles database creation by delegating the work to the [migration]
   /// strategy. This method should not be called by users.
   Future<void> handleDatabaseCreation({@required SqlExecutor executor}) {
-    final migrator = _createMigrator(executor);
+    final migrator = createMigrator(executor);
     return _resolvedMigration.onCreate(migrator);
   }
@@ -382,7 +391,7 @@ abstract class GeneratedDatabase extends DatabaseConnectionUser
   /// strategy. This method should not be called by users.
   Future<void> handleDatabaseVersionChange(
       {@required SqlExecutor executor, int from, int to}) {
-    final migrator = _createMigrator(executor);
+    final migrator = createMigrator(executor);
     return _resolvedMigration.onUpgrade(migrator, from, to);
   }
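For the manual use case mentioned in the new doc comment (creating tables outside of a [MigrationStrategy]), a sketch along these lines could live in a database class. The `todoEntries` getter stands in for a generated table and is not part of this change:

  // Hypothetical method on a generated database class:
  Future<void> createTodoTableOnDemand() async {
    // Without an explicit executor, the migrator falls back to
    // customStatement, so this can run on the regular connection.
    final migrator = createMigrator();
    await migrator.createTable(todoEntries);
  }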

View File

@@ -61,8 +61,14 @@ class StreamKey {
 /// them when needed.
 class StreamQueryStore {
   final Map<StreamKey, QueryStream> _activeKeyStreams = {};

+  // Why is this stream synchronous? We want to dispatch table updates before
+  // the future from the query completes. This allows streams to invalidate
+  // their cached data before the user can send another query.
+  // There shouldn't be a problem as this stream is not exposed in any user-
+  // facing api.
   final StreamController<Set<String>> _updatedTableNames =
-      StreamController.broadcast();
+      StreamController.broadcast(sync: true);

   StreamQueryStore();
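To illustrate why the controller is synchronous, a small plain-Dart sketch (not moor code): with `sync: true`, listeners run during `add`, so cached data can be invalidated before the future that triggered the update completes.

import 'dart:async';

Future<void> main() async {
  final updates = StreamController<Set<String>>.broadcast(sync: true);
  var cacheValid = true;
  updates.stream.listen((_) => cacheValid = false);

  Future<void> simulatedInsert() async {
    // With sync: true, the listener above runs right here, before the
    // future returned by simulatedInsert completes.
    updates.add({'users'});
  }

  await simulatedInsert();
  // By the time the caller can issue the next query, the cache is stale.
  print(cacheValid); // false
}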
@@ -151,7 +157,11 @@ class QueryStream<T> {
     final names = _fetcher.readsFrom.map((t) => t.actualTableName).toSet();
     _tablesChangedSubscription = _store._updatedTableNames.stream
         .where((changed) => changed.any(names.contains))
-        .listen((_) => fetchAndEmitData());
+        .listen((_) {
+      // table has changed, invalidate cache
+      _lastData = null;
+      fetchAndEmitData();
+    });
   }

   void _onCancel() {
@@ -159,6 +169,12 @@ class QueryStream<T> {
     _tablesChangedSubscription?.cancel();
     _tablesChangedSubscription = null;

+    // we don't listen for table updates anymore, and we're guaranteed to
+    // re-fetch data after a new listener comes in. We can't know if the table
+    // was updated in the meantime, but let's delete the cached data just in
+    // case
+    _lastData = null;
+
     _store.markAsClosed(this);
   }

View File

@@ -56,7 +56,9 @@ class Migrator {
   /// Creates all tables specified for the database, if they don't exist
   Future<void> createAllTables() async {
-    return Future.wait(_db.allTables.map(createTable));
+    for (var table in _db.allTables) {
+      await createTable(table);
+    }
   }

   GenerationContext _createContext() {
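The tables are now created one after the other instead of concurrently through `Future.wait`. For context, `createAllTables` is usually reached through a [MigrationStrategy]; a sketch of the typical wiring in a database class (illustrative, and roughly what the default strategy already does):

  // Inside a generated database class:
  @override
  MigrationStrategy get migration => MigrationStrategy(
        onCreate: (Migrator m) => m.createAllTables(),
      );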

View File

@@ -21,16 +21,18 @@ class StartWithValueTransformer<T> extends StreamTransformerBase<T, T> {
     controller
       ..onListen = () {
-        final data = _value();
-        if (data != null) {
-          // Dart's stream contract specifies that listeners are only notified
-          // after the .listen() code completes. So, we add the initial data in
-          // a later microtask.
-          scheduleMicrotask(() {
+        // Dart's stream contract specifies that listeners are only notified
+        // after the .listen() code completes. So, we add the initial data in
+        // a later microtask.
+        scheduleMicrotask(() {
+          final data = _value();
+          if (data != null) {
             controller.add(data);
-          });
-        }
+          }
+        });

         // the .listen will run in a later microtask, so the cached data would
         // still be added first.
         subscription = stream.listen(
           controller.add,
           onError: controller.addError,
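The change above moves the `_value()` read inside the scheduled microtask, so the cached value is read at delivery time rather than when `.listen()` is called. A standalone sketch of the pattern in plain Dart (simplified, not moor's actual transformer api):

import 'dart:async';

Stream<T> startWithCached<T>(T Function() cached, Stream<T> source) {
  final controller = StreamController<T>();
  controller.onListen = () {
    // Listeners must only be notified after their .listen() call completes,
    // so the cached value is added in a later microtask. Reading the cache
    // inside the microtask also skips values invalidated in the meantime.
    scheduleMicrotask(() {
      final data = cached();
      if (data != null) {
        controller.add(data);
      }
    });
    source.listen(controller.add,
        onError: controller.addError, onDone: controller.close);
  };
  return controller.stream;
}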

View File

@@ -39,6 +39,7 @@ void main() {
     final second = db.alias(db.users, 'two');

     db.select(first).watch().listen((_) {});
+    await pumpEventQueue(times: 1);

     db.markTablesUpdated({second});
     await pumpEventQueue(times: 1);
@@ -57,12 +58,46 @@ void main() {
     final second = (db.select(db.users).watch());
     expect(second, emits(isEmpty));

     await pumpEventQueue(times: 1);

     // calling executor.dialect is ok, it's needed to construct the statement
     verify(executor.dialect);
     verifyNoMoreInteractions(executor);
   });

+  group('updating clears cached data', () {
+    test('when an older stream is no longer listened to', () async {
+      when(executor.runSelect(any, any)).thenAnswer((_) => Future.value([]));
+      final first = db.select(db.categories).watch();
+      await first.first; // subscribe to first stream, then drop subscription
+
+      when(executor.runSelect(any, any)).thenAnswer((_) => Future.value([
+            {'id': 1, 'description': 'd'}
+          ]));
+      await db
+          .into(db.categories)
+          .insert(CategoriesCompanion.insert(description: 'd'));
+
+      final second = db.select(db.categories).watch();
+      expect(second.first, completion(isNotEmpty));
+    });
+
+    test('when an older stream is still listened to', () async {
+      when(executor.runSelect(any, any)).thenAnswer((_) => Future.value([]));
+      final first = db.select(db.categories).watch();
+      final subscription = first.listen((_) {});
+
+      when(executor.runSelect(any, any)).thenAnswer((_) => Future.value([
+            {'id': 1, 'description': 'd'}
+          ]));
+      await db
+          .into(db.categories)
+          .insert(CategoriesCompanion.insert(description: 'd'));
+
+      final second = db.select(db.categories).watch();
+      expect(second.first, completion(isNotEmpty));
+      await subscription.cancel();
+    });
+  });
test('every stream instance can be listened to', () async {
when(executor.runSelect(any, any)).thenAnswer((_) => Future.value([]));
@@ -99,6 +134,7 @@ void main() {
     await stream.first; // listen to stream, then cancel
     await pumpEventQueue(); // should remove the stream from the cache

     await stream.first; // listen again
+    await pumpEventQueue(times: 1);
     verify(executor.runSelect(any, any)).called(2);
   });