mirror of https://github.com/AMT-Cheif/drift.git
356 lines
11 KiB
Dart
356 lines
11 KiB
Dart
import 'dart:async';
|
|
|
|
import 'package:mockito/mockito.dart';
|
|
import 'package:moor/moor.dart';
|
|
import 'package:moor/src/runtime/api/runtime_api.dart';
|
|
import 'package:moor/src/runtime/executor/stream_queries.dart';
|
|
import 'package:rxdart/rxdart.dart';
|
|
import 'package:test/test.dart';
|
|
|
|
import 'data/tables/custom_tables.dart';
|
|
import 'data/tables/todos.dart';
|
|
import 'data/utils/mocks.dart';
|
|
|
|
void main() {
|
|
late TodoDb db;
|
|
late MockExecutor executor;
|
|
setUp(() {
|
|
executor = MockExecutor();
|
|
db = TodoDb(executor);
|
|
});
|
|
|
|
test('streams fetch when the first listener attaches', () async {
|
|
final stream = db.select(db.users).watch();
|
|
|
|
verifyNever(executor.runSelect(any, any));
|
|
|
|
stream.listen((_) {});
|
|
await pumpEventQueue(times: 1);
|
|
|
|
verify(executor.runSelect(any, any)).called(1);
|
|
});
|
|
|
|
test('streams fetch when the underlying data changes', () async {
|
|
db.select(db.users).watch().listen((_) {});
|
|
|
|
db.markTablesUpdated({db.users});
|
|
await pumpEventQueue(times: 1);
|
|
|
|
// twice: Once because the listener attached, once because the data changed
|
|
verify(executor.runSelect(any, any)).called(2);
|
|
});
|
|
|
|
test('streams recognize aliased tables', () async {
|
|
final first = db.alias(db.users, 'one');
|
|
final second = db.alias(db.users, 'two');
|
|
|
|
db.select(first).watch().listen((_) {});
|
|
await pumpEventQueue(times: 1);
|
|
|
|
db.markTablesUpdated({second});
|
|
await pumpEventQueue(times: 1);
|
|
|
|
verify(executor.runSelect(any, any)).called(2);
|
|
});
|
|
|
|
test('streams emit cached data when a new listener attaches', () async {
|
|
when(executor.runSelect(any, any)).thenAnswer((_) => Future.value([]));
|
|
|
|
final first = db.select(db.users).watch();
|
|
expect(first, emits(isEmpty));
|
|
|
|
clearInteractions(executor);
|
|
|
|
final second = db.select(db.users).watch();
|
|
expect(second, emits(isEmpty));
|
|
|
|
// calling executor.dialect is ok, it's needed to construct the statement
|
|
verify(executor.dialect);
|
|
verifyNoMoreInteractions(executor);
|
|
});
|
|
|
|
test('same stream emits cached data when listening twice', () async {
|
|
when(executor.runSelect(any, any)).thenAnswer((_) => Future.value([]));
|
|
|
|
final stream = db.select(db.users).watch();
|
|
expect(await stream.first, isEmpty);
|
|
|
|
clearInteractions(executor);
|
|
|
|
await stream.first;
|
|
verifyNever(executor.runSelect(any, any));
|
|
});
|
|
|
|
group('updating clears cached data', () {
|
|
test('when an older stream is no longer listened to', () async {
|
|
when(executor.runSelect(any, any)).thenAnswer((_) => Future.value([]));
|
|
final first = db.select(db.categories).watch();
|
|
await first.first; // subscribe to first stream, then drop subscription
|
|
|
|
when(executor.runSelect(any, any)).thenAnswer((_) => Future.value([
|
|
{'id': 1, 'desc': 'd', 'priority': 0}
|
|
]));
|
|
await db
|
|
.into(db.categories)
|
|
.insert(CategoriesCompanion.insert(description: 'd'));
|
|
|
|
final second = db.select(db.categories).watch();
|
|
expect(second.first, completion(isNotEmpty));
|
|
});
|
|
|
|
test('when an older stream is still listened to', () async {
|
|
when(executor.runSelect(any, any)).thenAnswer((_) => Future.value([]));
|
|
final first = db.select(db.categories).watch();
|
|
final subscription = first.listen((_) {});
|
|
|
|
when(executor.runSelect(any, any)).thenAnswer((_) => Future.value([
|
|
{'id': 1, 'desc': 'd', 'priority': 0}
|
|
]));
|
|
await db
|
|
.into(db.categories)
|
|
.insert(CategoriesCompanion.insert(description: 'd'));
|
|
|
|
final second = db.select(db.categories).watch();
|
|
expect(second.first, completion(isNotEmpty));
|
|
await subscription.cancel();
|
|
});
|
|
});
|
|
|
|
test('every stream instance can be listened to', () async {
|
|
when(executor.runSelect(any, any)).thenAnswer((_) => Future.value([]));
|
|
|
|
final first = db.select(db.users).watch();
|
|
final second = db.select(db.users).watch();
|
|
|
|
await first.first; // will listen to stream, then cancel
|
|
await pumpEventQueue(times: 1); // give cancel event time to propagate
|
|
|
|
final checkEmits = expectLater(second, emitsInOrder([[], []]));
|
|
|
|
db.markTablesUpdated({db.users});
|
|
await pumpEventQueue(times: 1);
|
|
|
|
await checkEmits;
|
|
});
|
|
|
|
test('same stream instance can be listened to multiple times', () async {
|
|
when(executor.runSelect(any, any)).thenAnswer((_) => Future.value([]));
|
|
|
|
final stream = db.select(db.users).watch();
|
|
|
|
final firstSub = stream.take(2).listen(null); // will listen forever
|
|
final second = await stream.first;
|
|
|
|
expect(second, isEmpty);
|
|
verify(executor.runSelect(any, any)).called(1);
|
|
await firstSub.cancel();
|
|
});
|
|
|
|
test('streams are disposed when not listening for a while', () async {
|
|
when(executor.runSelect(any, any)).thenAnswer((_) => Future.value([]));
|
|
|
|
final stream = db.select(db.users).watch();
|
|
|
|
await stream.first; // listen to stream, then cancel
|
|
await pumpEventQueue(); // should remove the stream from the cache
|
|
await stream.first; // listen again
|
|
await pumpEventQueue(times: 1);
|
|
|
|
verify(executor.runSelect(any, any)).called(2);
|
|
});
|
|
|
|
test('stream emits error when loading the query throws', () {
|
|
final exception = Exception('stub');
|
|
when(executor.runSelect(any, any))
|
|
.thenAnswer((_) => Future.error(exception));
|
|
|
|
final result = db.customSelect('select 1').watch().first;
|
|
expectLater(result, throwsA(exception));
|
|
});
|
|
|
|
test('database can be closed when a stream has a paused subscription',
|
|
() async {
|
|
// this test is more relevant than it seems - some test stream matchers
|
|
// leave the stream in an empty state.
|
|
final stream = db.select(db.users).watch();
|
|
final subscription = stream.listen((_) {})..pause();
|
|
|
|
await db.close();
|
|
|
|
subscription.resume();
|
|
await subscription.cancel();
|
|
});
|
|
|
|
group('stream keys', () {
|
|
final keyA = StreamKey('SELECT * FROM users;', [], User);
|
|
final keyB = StreamKey('SELECT * FROM users;', [], User);
|
|
final keyCustom = StreamKey('SELECT * FROM users;', [], QueryRow);
|
|
final keyCustomTodos = StreamKey('SELECT * FROM todos;', [], QueryRow);
|
|
final keyArgs = StreamKey('SELECT * FROM users;', ['name'], User);
|
|
|
|
test('are equal for same parameters', () {
|
|
expect(keyA, equals(keyB));
|
|
expect(keyA.hashCode, keyB.hashCode);
|
|
});
|
|
|
|
test('are not equal for different queries', () {
|
|
expect(keyCustomTodos, isNot(keyCustom));
|
|
expect(keyCustomTodos.hashCode, isNot(keyCustom.hashCode));
|
|
});
|
|
|
|
test('are not equal for different variables', () {
|
|
expect(keyArgs, isNot(keyA));
|
|
expect(keyArgs.hashCode, isNot(keyA.hashCode));
|
|
});
|
|
|
|
test('are not equal for different types', () {
|
|
expect(keyCustom, isNot(keyA));
|
|
expect(keyCustom.hashCode, isNot(keyA.hashCode));
|
|
});
|
|
});
|
|
|
|
group("streams don't fetch", () {
|
|
test('when no listeners were attached', () {
|
|
db.select(db.users).watch();
|
|
|
|
db.markTablesUpdated({db.users});
|
|
|
|
verifyNever(executor.runSelect(any, any));
|
|
});
|
|
|
|
test('when the data updates after the listener has detached', () async {
|
|
final subscription = db.select(db.users).watch().listen((_) {});
|
|
|
|
await subscription.cancel();
|
|
clearInteractions(executor);
|
|
|
|
// The stream is kept open for the rest of this event iteration
|
|
final completer = Completer.sync();
|
|
Timer.run(completer.complete);
|
|
await completer.future;
|
|
|
|
db.markTablesUpdated({db.users});
|
|
|
|
verifyNever(executor.runSelect(any, any));
|
|
});
|
|
});
|
|
|
|
// note: There's a trigger on config inserts that updates with_defaults
|
|
test('updates streams for updates caused by triggers', () async {
|
|
final db = CustomTablesDb(executor);
|
|
db.select(db.withDefaults).watch().listen((_) {});
|
|
|
|
db.notifyUpdates({const TableUpdate('config', kind: UpdateKind.insert)});
|
|
await pumpEventQueue(times: 1);
|
|
|
|
verify(executor.runSelect(any, any)).called(2);
|
|
});
|
|
|
|
test('limits trigger propagation to the target type of trigger', () async {
|
|
final db = CustomTablesDb(executor);
|
|
db.select(db.withDefaults).watch().listen((_) {});
|
|
|
|
db.notifyUpdates({const TableUpdate('config', kind: UpdateKind.delete)});
|
|
await pumpEventQueue(times: 1);
|
|
|
|
verify(executor.runSelect(any, any)).called(1);
|
|
});
|
|
|
|
group('listen for table updates', () {
|
|
test('any', () async {
|
|
var counter = 0;
|
|
db.tableUpdates().listen((event) => counter++);
|
|
|
|
db.markTablesUpdated({db.todosTable});
|
|
await pumpEventQueue(times: 1);
|
|
expect(counter, 1);
|
|
|
|
db.markTablesUpdated({db.users});
|
|
await pumpEventQueue(times: 1);
|
|
expect(counter, 2);
|
|
});
|
|
|
|
test('stream is async', () {
|
|
var counter = 0;
|
|
db.tableUpdates().listen((event) => counter++);
|
|
|
|
db.markTablesUpdated({});
|
|
// no wait here, the counter should not be updated yet.
|
|
expect(counter, 0);
|
|
});
|
|
|
|
test('specific table', () async {
|
|
var counter = 0;
|
|
db
|
|
.tableUpdates(TableUpdateQuery.onTable(db.users))
|
|
.listen((event) => counter++);
|
|
|
|
db.markTablesUpdated({db.todosTable});
|
|
await pumpEventQueue(times: 1);
|
|
expect(counter, 0);
|
|
|
|
db.markTablesUpdated({db.users});
|
|
await pumpEventQueue(times: 1);
|
|
expect(counter, 1);
|
|
|
|
db.markTablesUpdated({db.categories});
|
|
await pumpEventQueue(times: 1);
|
|
expect(counter, 1);
|
|
});
|
|
|
|
test('specific table and update kind', () async {
|
|
var counter = 0;
|
|
db
|
|
.tableUpdates(TableUpdateQuery.onTable(db.users,
|
|
limitUpdateKind: UpdateKind.update))
|
|
.listen((event) => counter++);
|
|
|
|
db.markTablesUpdated({db.todosTable});
|
|
await pumpEventQueue(times: 1);
|
|
expect(counter, 0);
|
|
|
|
db.notifyUpdates(
|
|
{TableUpdate.onTable(db.users, kind: UpdateKind.update)});
|
|
await pumpEventQueue(times: 1);
|
|
expect(counter, 1);
|
|
|
|
db.notifyUpdates(
|
|
{TableUpdate.onTable(db.users, kind: UpdateKind.delete)});
|
|
await pumpEventQueue(times: 1);
|
|
expect(counter, 1);
|
|
});
|
|
});
|
|
|
|
test('stream queries are broadcasts', () {
|
|
expect(db.customSelect('SELECT 1').watch().isBroadcast, isTrue);
|
|
expect(db.customSelect('SELECT 1').watchSingle().isBroadcast, isTrue);
|
|
});
|
|
|
|
test('moor streams can be used with switchMap in rxdart', () async {
|
|
// Regression test for https://github.com/simolus3/moor/issues/500
|
|
when(executor.runSelect(any, any)).thenAnswer((i) async {
|
|
final sql = i.positionalArguments.first as String;
|
|
|
|
return [
|
|
if (sql.contains("'a'")) {'a': 'a'} else {'b': 'b'}
|
|
];
|
|
});
|
|
|
|
final a = db
|
|
.customSelect("select 'a' as a")
|
|
.map(($) => $.readString('a'))
|
|
.watchSingle();
|
|
final b = db
|
|
.customSelect("select 'b' as b")
|
|
.map(($) => $.readString('b'))
|
|
.watchSingle();
|
|
final c = a.switchMap((_) => b);
|
|
expect(await a.first, 'a');
|
|
expect(await a.first, 'a');
|
|
expect(await b.first, 'b');
|
|
expect(await b.first, 'b');
|
|
expect(await c.first, 'b');
|
|
expect(await c.first, 'b');
|
|
});
|
|
}
|