Transactions for the web backend

This commit is contained in:
Simon Binder 2019-07-05 09:17:46 +02:00
parent 1c423d9d7b
commit 54fb22d970
No known key found for this signature in database
GPG Key ID: 7891917E4147B8C0
3 changed files with 226 additions and 113 deletions

View File

@ -11,6 +11,7 @@ import 'dart:indexed_db';
import 'package:meta/meta.dart'; import 'package:meta/meta.dart';
import 'package:meta/dart2js.dart'; import 'package:meta/dart2js.dart';
import 'package:synchronized/synchronized.dart';
import 'moor.dart'; import 'moor.dart';
import 'src/web/sql_js.dart'; import 'src/web/sql_js.dart';

View File

@ -1,20 +1,172 @@
part of 'package:moor/moor_web.dart'; part of 'package:moor/moor_web.dart';
class _DbState {
final String name;
final bool logStatements;
final Lock lock = Lock();
SqlJsDatabase db;
_DbState(this.name, this.logStatements);
}
abstract class _DatabaseUser extends QueryExecutor {
final _DbState _state;
String get name => _state.name;
bool get logStatements => _state.logStatements;
SqlJsDatabase get _db => _state.db;
bool get _bypassLock => false;
String get _persistenceKey => 'moor_db_str_$name';
_DatabaseUser(this._state);
Future<T> _synchronized<T>(FutureOr<T> computation()) async {
final lock = _state.lock;
if (_bypassLock) {
return await computation();
}
return await lock.synchronized(computation);
}
// todo base64 works, but is very slow. Figure out why bin2str is broken
Uint8List _restoreDb() {
final raw = window.localStorage[_persistenceKey];
if (raw != null) {
return base64.decode(raw);
}
return null;
}
void _storeDb() {
final data = _db.export();
final binStr = base64.encode(data);
window.localStorage[_persistenceKey] = binStr;
}
@tryInline
void _log(String sql, List<dynamic> variables) {
if (logStatements) {
print('[moor_web]: Running $sql with bound args: $variables');
}
}
/// Executes [sql] with the bound [variables], and ignores the result.
Future _runSimple(String sql, List<dynamic> variables) {
return _synchronized(() {
_log(sql, variables);
_db.runWithArgs(sql, variables);
});
}
Future<void> _runWithoutArgs(String query) {
return _synchronized(() {
_db.run(query);
});
}
@override
Future<void> runCustom(String statement) {
return _runWithoutArgs(statement);
}
@override
Future<int> runDelete(String statement, List args) {
_runSimple(statement, args);
return _handlePotentialUpdate();
}
@override
Future<int> runUpdate(String statement, List args) {
_runSimple(statement, args);
return _handlePotentialUpdate();
}
@override
Future<void> runBatched(List<BatchedStatement> statements) async {
await _synchronized(() {
for (var stmt in statements) {
final prepared = _db.prepare(stmt.sql);
for (var args in stmt.variables) {
prepared.executeWith(args);
}
}
});
await _handlePotentialUpdate();
}
/// Saves the database if the last statement changed rows. As a side-effect,
/// saving the database resets the `last_insert_id` counter in sqlite.
Future<int> _handlePotentialUpdate() {
final modified = _db.lastModifiedRows();
if (modified > 0) {
_storeDb();
}
return Future.value(modified);
}
@override
Future<int> runInsert(String statement, List args) async {
await _runSimple(statement, args);
final insertId = _db.lastInsertId();
await _handlePotentialUpdate();
return insertId;
}
@override
Future<List<Map<String, dynamic>>> runSelect(String statement, List args) {
_log(statement, args);
return _synchronized(() async {
// todo at least for stream queries we should cache prepared statements.
final stmt = _db.prepare(statement)..executeWith(args);
List<String> columnNames;
final rows = <Map<String, dynamic>>[];
while (stmt.step()) {
columnNames ??= stmt.columnNames();
final row = stmt.currentRow();
rows.add({for (var i = 0; i < row.length; i++) columnNames[i]: row[i]});
}
stmt.free();
return rows;
});
}
}
/// Experimental moor backend for the web. To use this platform, you need to /// Experimental moor backend for the web. To use this platform, you need to
/// include the latest version of `sql.js` in your html. /// include the latest version of `sql.js` in your html.
class WebDatabase extends QueryExecutor { class WebDatabase extends _DatabaseUser {
final bool logStatements;
final String name;
Completer<bool> _openingCompleter; Completer<bool> _openingCompleter;
SqlJsDatabase _db;
WebDatabase(this.name, {this.logStatements = false}); WebDatabase(String name, {bool logStatements = false})
: super(_DbState(name, logStatements));
@override @override
TransactionExecutor beginTransaction() { TransactionExecutor beginTransaction() {
throw StateError( final transactionReady = Completer<bool>();
'Transactions are not currently supported with the sql.js backend'); final executor = _TransactionExecutor(_state, transactionReady.future);
_synchronized(() async {
// we have the lock -> start the transaction
transactionReady.complete(true);
// wait until the transaction is done, then release the lock
await executor.completed;
if (executor._needsSave) {
_storeDb();
}
});
return executor;
} }
@override @override
@ -54,7 +206,7 @@ class WebDatabase extends QueryExecutor {
final module = await initSqlJs(); final module = await initSqlJs();
final restored = _restoreDb(); final restored = _restoreDb();
_db = module.createDatabase(restored); _state.db = module.createDatabase(restored);
if (upgradeNeeded) { if (upgradeNeeded) {
if (version == null || version < 1) { if (version == null || version < 1) {
@ -66,110 +218,69 @@ class WebDatabase extends QueryExecutor {
to: databaseInfo.schemaVersion); to: databaseInfo.schemaVersion);
} }
} }
}
String get _persistenceKey => 'moor_db_str_$name'; await _synchronized(() {
return databaseInfo.beforeOpenCallback(_BeforeOpenExecutor(_state),
// todo base64 works, but is very slow. Figure out why bin2str is broken OpeningDetails(version, databaseInfo.schemaVersion));
});
Uint8List _restoreDb() { }
final raw = window.localStorage[_persistenceKey]; }
if (raw != null) {
return base64.decode(raw); class _BeforeOpenExecutor extends _DatabaseUser {
} _BeforeOpenExecutor(_DbState state) : super(state);
return null;
} @override
final bool _bypassLock = true;
void _storeDb() {
final data = _db.export(); @override
final binStr = base64.encode(data); TransactionExecutor beginTransaction() {
window.localStorage[_persistenceKey] = binStr; throw UnsupportedError(
} "Transactions aren't supported in the before open callback");
}
@tryInline
void _log(String sql, List<dynamic> variables) { @override
if (logStatements) { Future<bool> ensureOpen() => Future.value(true);
print('[moor_web]: Running $sql with bound args: $variables'); }
}
} class _TransactionExecutor extends _DatabaseUser
implements TransactionExecutor {
/// Executes [sql] with the bound [variables], and ignores the result. _TransactionExecutor(_DbState state, this._openingFuture) : super(state);
void _runSimple(String sql, List<dynamic> variables) {
_log(sql, variables); @override
_db.runWithArgs(sql, variables); final bool _bypassLock = true;
}
final Future<bool> _openingFuture;
Future<void> _runWithoutArgs(String query) { bool _sentBeginTransaction = false;
_db.run(query);
return Future.value(null); final Completer<void> _completer = Completer();
} Future<void> get completed => _completer.future;
bool _needsSave = false;
@override
Future<void> runCustom(String statement) { @override
return _runWithoutArgs(statement); void _storeDb() {
} // no-op inside a transaction. Store the database when we it's done!
_needsSave = true;
@override }
Future<int> runDelete(String statement, List args) {
_runSimple(statement, args); @override
return _handlePotentialUpdate(); TransactionExecutor beginTransaction() {
} throw UnsupportedError('Cannot have nested transactions');
}
@override
Future<int> runUpdate(String statement, List args) { @override
_runSimple(statement, args); Future<bool> ensureOpen() async {
return _handlePotentialUpdate(); await _openingFuture;
} if (!_sentBeginTransaction) {
_db.run('BEGIN TRANSACTION');
@override _sentBeginTransaction = true;
Future<void> runBatched(List<BatchedStatement> statements) async { }
for (var stmt in statements) { return Future.value(true);
final prepared = _db.prepare(stmt.sql); }
for (var args in stmt.variables) { @override
prepared.executeWith(args); Future<void> send() {
} _db.run('COMMIT TRANSACTION;');
} _completer.complete();
return Future.value();
await _handlePotentialUpdate();
}
/// Saves the database if the last statement changed rows. As a side-effect,
/// saving the database resets the `last_insert_id` counter in sqlite.
Future<int> _handlePotentialUpdate() {
final modified = _db.lastModifiedRows();
if (modified > 0) {
_storeDb();
}
return Future.value(modified);
}
@override
Future<int> runInsert(String statement, List args) async {
_runSimple(statement, args);
final insertId = _db.lastInsertId();
await _handlePotentialUpdate();
return insertId;
}
@override
Future<List<Map<String, dynamic>>> runSelect(
String statement, List args) async {
_log(statement, args);
// todo at least for stream queries we should cache prepared statements.
final stmt = _db.prepare(statement)..executeWith(args);
List<String> columnNames;
final rows = <Map<String, dynamic>>[];
while (stmt.step()) {
columnNames ??= stmt.columnNames();
final row = stmt.currentRow();
rows.add({for (var i = 0; i < row.length; i++) columnNames[i]: row[i]});
}
stmt.free();
return rows;
} }
} }

View File

@ -14,6 +14,7 @@ environment:
dependencies: dependencies:
meta: '>= 1.0.0 <2.0.0' meta: '>= 1.0.0 <2.0.0'
collection: '>= 1.0.0 <2.0.0' collection: '>= 1.0.0 <2.0.0'
synchronized: ^2.1.0
dev_dependencies: dev_dependencies:
moor_generator: ^1.5.0 moor_generator: ^1.5.0