ffi: Remove isolate proxy

This commit is contained in:
Simon Binder 2019-11-14 18:00:55 +01:00
parent e83464df28
commit d39f2d9769
No known key found for this signature in database
GPG Key ID: 7891917E4147B8C0
17 changed files with 132 additions and 564 deletions

View File

@ -2,6 +2,7 @@
- Remove the `background` flag from the moor apis provided by this package. Use the moor isolate api
instead.
- Remove builtin support for background execution from the low-level `Database` api
- Support Dart 2.6, drop support for older versions
## 0.0.1

View File

@ -1,28 +0,0 @@
import 'package:moor_ffi/database.dart';
const _createTable = r'''
CREATE TABLE frameworks (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
name VARCHAR NOT NULL
);
''';
void main() async {
final db = await IsolateDb.openMemory();
await db.execute(_createTable);
final insertStmt =
await db.prepare('INSERT INTO frameworks(name) VALUES (?)');
await insertStmt.execute(['Flutter']);
await insertStmt.execute(['AngularDart']);
await insertStmt.close();
final selectStmt = await db.prepare('SELECT * FROM frameworks ORDER BY name');
final result = await selectStmt.select();
for (var row in result) {
print('${row['id']}: ${row['name']}');
}
await selectStmt.close();
await db.close();
}

View File

@ -1,11 +1,9 @@
/// Exports the low-level [Database] and [IsolateDb] classes to run operations
/// on a sqflite database.
/// Exports the low-level [Database] class to run operations on a sqlite
/// database via `dart:ffi`.
library database;
import 'package:moor_ffi/src/bindings/types.dart';
import 'src/impl/isolate/isolate_db.dart';
export 'src/api/database.dart';
export 'src/api/result.dart';
export 'src/impl/database.dart' show SqliteException, Database;
export 'src/impl/isolate/isolate_db.dart';
export 'src/impl/database.dart'
show SqliteException, Database, PreparedStatement;

View File

@ -1,44 +0,0 @@
import 'dart:async';
import 'package:moor_ffi/database.dart';
/// A opened sqlite database.
abstract class BaseDatabase {
/// Closes this database connection and releases the resources it uses. If
/// an error occurs while closing the database, an exception will be thrown.
/// The allocated memory will be freed either way.
FutureOr<void> close();
/// Executes the [sql] statement and ignores the result. Will throw if an
/// error occurs while executing.
FutureOr<void> execute(String sql);
/// Prepares the [sql] statement.
FutureOr<BasePreparedStatement> prepare(String sql);
/// Get the application defined version of this database.
FutureOr<int> userVersion();
/// Update the application defined version of this database.
FutureOr<void> setUserVersion(int version);
/// Returns the amount of rows affected by the last INSERT, UPDATE or DELETE
/// statement.
FutureOr<int> getUpdatedRows();
/// Returns the row-id of the last inserted row.
FutureOr<int> getLastInsertId();
}
/// A prepared statement that can be executed multiple times.
abstract class BasePreparedStatement {
/// Executes this prepared statement as a select statement. The returned rows
/// will be returned.
FutureOr<Result> select([List<dynamic> args]);
/// Executes this prepared statement.
FutureOr<void> execute([List<dynamic> params]);
/// Closes this prepared statement and releases its resources.
FutureOr<void> close();
}

View File

@ -16,7 +16,8 @@ part 'prepared_statement.dart';
const _openingFlags = Flags.SQLITE_OPEN_READWRITE | Flags.SQLITE_OPEN_CREATE;
class Database implements BaseDatabase {
/// A opened sqlite database.
class Database {
final Pointer<types.Database> _db;
final List<PreparedStatement> _preparedStmt = [];
bool _isClosed = false;
@ -56,7 +57,9 @@ class Database implements BaseDatabase {
}
}
@override
/// Closes this database connection and releases the resources it uses. If
/// an error occurs while closing the database, an exception will be thrown.
/// The allocated memory will be freed either way.
void close() {
// close all prepared statements first
_isClosed = true;
@ -84,7 +87,8 @@ class Database implements BaseDatabase {
}
}
@override
/// Executes the [sql] statement and ignores the result. Will throw if an
/// error occurs while executing.
void execute(String sql) {
_ensureOpen();
@ -111,7 +115,7 @@ class Database implements BaseDatabase {
}
}
@override
/// Prepares the [sql] statement.
PreparedStatement prepare(String sql) {
_ensureOpen();
@ -137,7 +141,7 @@ class Database implements BaseDatabase {
return prepared;
}
@override
/// Get the application defined version of this database.
int userVersion() {
final stmt = prepare('PRAGMA user_version');
final result = stmt.select();
@ -146,18 +150,19 @@ class Database implements BaseDatabase {
return result.first.columnAt(0) as int;
}
@override
/// Update the application defined version of this database.
void setUserVersion(int version) {
execute('PRAGMA user_version = $version');
}
@override
/// Returns the amount of rows affected by the last INSERT, UPDATE or DELETE
/// statement.
int getUpdatedRows() {
_ensureOpen();
return bindings.sqlite3_changes(_db);
}
@override
/// Returns the row-id of the last inserted row.
int getLastInsertId() {
_ensureOpen();
return bindings.sqlite3_last_insert_rowid(_db);

View File

@ -1,194 +0,0 @@
import 'dart:async';
import 'dart:isolate';
import 'package:moor_ffi/database.dart';
import 'package:moor_ffi/src/impl/database.dart';
enum IsolateCommandType {
openDatabase,
closeDatabase,
executeSqlDirectly,
prepareStatement,
getUserVersion,
setUserVersion,
getUpdatedRows,
getLastInsertId,
preparedSelect,
preparedExecute,
preparedClose
}
class IsolateCommand {
final int requestId;
final IsolateCommandType type;
final dynamic data;
/// If this command operates on a prepared statement, contains the id of that
/// statement as sent by the background isolate.
int preparedStatementId;
IsolateCommand(this.requestId, this.type, this.data);
}
class IsolateResponse {
final int requestId;
final dynamic response;
final dynamic error;
IsolateResponse(this.requestId, this.response, this.error);
}
/// Communicates with a background isolate over an RPC-like api.
class DbOperationProxy {
/// Stream of messages received by the background isolate.
final StreamController<dynamic> backgroundMsgs;
final ReceivePort _receivePort;
final Map<int, Completer> _pendingRequests = {};
final SendPort send;
final Isolate isolate;
var closed = false;
int _currentRequestId = 0;
DbOperationProxy(
this.backgroundMsgs, this._receivePort, this.send, this.isolate) {
backgroundMsgs.stream.listen(_handleResponse);
}
Future<dynamic> sendRequest(IsolateCommandType type, dynamic data,
{int preparedStmtId}) {
if (closed) {
throw StateError('Tried to call a database method after .close()');
}
final id = _currentRequestId++;
final cmd = IsolateCommand(id, type, data)
..preparedStatementId = preparedStmtId;
final completer = Completer();
_pendingRequests[id] = completer;
send.send(cmd);
return completer.future;
}
void _handleResponse(dynamic response) {
if (response is IsolateResponse) {
final completer = _pendingRequests.remove(response.requestId);
if (response.error != null) {
completer.completeError(response.error);
} else {
completer.complete(response.response);
}
}
}
void close() {
closed = true;
_receivePort.close();
backgroundMsgs.close();
isolate.kill();
}
static Future<DbOperationProxy> spawn() async {
final foregroundReceive = ReceivePort();
final backgroundSend = foregroundReceive.sendPort;
final isolate = await Isolate.spawn(_entryPoint, backgroundSend,
debugName: 'moor_ffi background isolate');
final controller = StreamController.broadcast();
foregroundReceive.listen(controller.add);
final foregroundSend = await controller.stream
.firstWhere((msg) => msg is SendPort) as SendPort;
return DbOperationProxy(
controller, foregroundReceive, foregroundSend, isolate);
}
static void _entryPoint(SendPort backgroundSend) {
final backgroundReceive = ReceivePort();
final foregroundSend = backgroundReceive.sendPort;
// inform the main isolate about the created send port
backgroundSend.send(foregroundSend);
BackgroundIsolateRunner(backgroundReceive, backgroundSend).start();
}
}
class BackgroundIsolateRunner {
final ReceivePort receive;
final SendPort send;
Database db;
List<PreparedStatement> stmts = [];
BackgroundIsolateRunner(this.receive, this.send);
void start() {
receive.listen((data) {
if (data is IsolateCommand) {
try {
final response = _handleCommand(data);
send.send(IsolateResponse(data.requestId, response, null));
} catch (e) {
if (e is Error) {
// errors contain a StackTrace, which cannot be sent. So we just
// send the description of that stacktrace.
final exception =
Exception('Error in background isolate: $e\n${e.stackTrace}');
send.send(IsolateResponse(data.requestId, null, exception));
} else {
send.send(IsolateResponse(data.requestId, null, e));
}
}
}
});
}
dynamic _handleCommand(IsolateCommand cmd) {
switch (cmd.type) {
case IsolateCommandType.openDatabase:
assert(db == null);
db = Database.open(cmd.data as String);
break;
case IsolateCommandType.closeDatabase:
db?.close();
stmts.clear();
db = null;
break;
case IsolateCommandType.executeSqlDirectly:
db.execute(cmd.data as String);
break;
case IsolateCommandType.prepareStatement:
final stmt = db.prepare(cmd.data as String);
stmts.add(stmt);
return stmts.length - 1;
case IsolateCommandType.getUserVersion:
return db.userVersion();
case IsolateCommandType.setUserVersion:
final version = cmd.data as int;
db.setUserVersion(version);
break;
case IsolateCommandType.getUpdatedRows:
return db.getUpdatedRows();
case IsolateCommandType.getLastInsertId:
return db.getLastInsertId();
case IsolateCommandType.preparedSelect:
final stmt = stmts[cmd.preparedStatementId];
return stmt.select(cmd.data as List);
case IsolateCommandType.preparedExecute:
final stmt = stmts[cmd.preparedStatementId];
stmt.execute(cmd.data as List);
break;
case IsolateCommandType.preparedClose:
final index = cmd.preparedStatementId;
stmts[index].close();
stmts.removeAt(index);
break;
}
}
}

View File

@ -1,105 +0,0 @@
import 'dart:async';
import 'dart:io';
import 'package:moor_ffi/database.dart';
import 'package:moor_ffi/src/impl/database.dart';
import 'package:moor_ffi/src/impl/isolate/background.dart';
class IsolateDb implements BaseDatabase {
/// Spawns a background isolate and opens the [file] on that isolate. The file
/// will be created if it doesn't exist.
static Future<IsolateDb> openFile(File file) => open(file.absolute.path);
/// Opens a in-memory database on a background isolates.
///
/// If you're not using extensive queries, a synchronous [Database] will
/// provide better performance for in-memory databases!
static Future<IsolateDb> openMemory() => open(':memory:');
/// Spawns a background isolate and opens a sqlite3 database from its
/// filename.
static Future<IsolateDb> open(String path) async {
final proxy = await DbOperationProxy.spawn();
final isolate = IsolateDb._(proxy);
await isolate._open(path);
return isolate;
}
final DbOperationProxy _proxy;
IsolateDb._(this._proxy);
Future<int> _sendAndAssumeInt(IsolateCommandType type, [dynamic data]) async {
return await _proxy.sendRequest(type, data) as int;
}
Future<void> _open(String path) {
return _proxy.sendRequest(IsolateCommandType.openDatabase, path);
}
@override
Future<void> close() async {
await _proxy.sendRequest(IsolateCommandType.closeDatabase, null);
_proxy.close();
}
@override
Future<void> execute(String sql) async {
await _proxy.sendRequest(IsolateCommandType.executeSqlDirectly, sql);
}
@override
Future<int> getLastInsertId() async {
return _sendAndAssumeInt(IsolateCommandType.getLastInsertId);
}
@override
Future<int> getUpdatedRows() async {
return _sendAndAssumeInt(IsolateCommandType.getUpdatedRows);
}
@override
FutureOr<BasePreparedStatement> prepare(String sql) async {
final id =
await _sendAndAssumeInt(IsolateCommandType.prepareStatement, sql);
return IsolatePreparedStatement(this, id);
}
@override
Future<void> setUserVersion(int version) async {
await _proxy.sendRequest(IsolateCommandType.setUserVersion, version);
}
@override
Future<int> userVersion() async {
return _sendAndAssumeInt(IsolateCommandType.getUserVersion);
}
}
class IsolatePreparedStatement implements BasePreparedStatement {
final IsolateDb _db;
final int _id;
IsolatePreparedStatement(this._db, this._id);
@override
Future<void> close() async {
await _db._proxy.sendRequest(IsolateCommandType.preparedClose, null,
preparedStmtId: _id);
}
@override
Future<void> execute([List params]) async {
await _db._proxy.sendRequest(IsolateCommandType.preparedExecute, params,
preparedStmtId: _id);
}
@override
Future<Result> select([List params]) async {
final response = await _db._proxy.sendRequest(
IsolateCommandType.preparedSelect, params,
preparedStmtId: _id);
return response as Result;
}
}

View File

@ -1,6 +1,7 @@
part of 'database.dart';
class PreparedStatement implements BasePreparedStatement {
/// A prepared statement that can be executed multiple times.
class PreparedStatement {
final Pointer<types.Statement> _stmt;
final Database _db;
bool _closed = false;
@ -10,7 +11,7 @@ class PreparedStatement implements BasePreparedStatement {
PreparedStatement._(this._stmt, this._db);
@override
/// Closes this prepared statement and releases its resources.
void close() {
if (!_closed) {
_reset();
@ -26,7 +27,8 @@ class PreparedStatement implements BasePreparedStatement {
}
}
@override
/// Executes this prepared statement as a select statement. The returned rows
/// will be returned.
Result select([List<dynamic> params]) {
_ensureNotFinalized();
_reset();
@ -71,7 +73,7 @@ class PreparedStatement implements BasePreparedStatement {
}
}
@override
/// Executes this prepared statement.
void execute([List<dynamic> params]) {
_ensureNotFinalized();
_reset();

View File

@ -0,0 +1,16 @@
import 'package:moor_ffi/database.dart';
import 'package:test/test.dart';
void main() {
test('insert statements report their id', () {
final opened = Database.memory();
opened.execute('CREATE TABLE tbl(a INTEGER PRIMARY KEY AUTOINCREMENT)');
for (var i = 0; i < 5; i++) {
opened.execute('INSERT INTO tbl DEFAULT VALUES');
expect(opened.getLastInsertId(), i + 1);
}
opened.close();
});
}

View File

@ -0,0 +1,43 @@
import 'package:moor_ffi/database.dart';
import 'package:test/test.dart';
void main() {
test('prepared statements can be used multiple times', () {
final opened = Database.memory();
opened.execute('CREATE TABLE tbl (a TEXT);');
final stmt = opened.prepare('INSERT INTO tbl(a) VALUES(?)');
stmt.execute(['a']);
stmt.execute(['b']);
stmt.close();
final select = opened.prepare('SELECT * FROM tbl ORDER BY a');
final result = select.select();
expect(result, hasLength(2));
expect(result.map((row) => row['a']), ['a', 'b']);
select.close();
opened.close();
});
test('prepared statements cannot be used after close', () {
final opened = Database.memory();
final stmt = opened.prepare('SELECT ?');
stmt.close();
expect(stmt.select, throwsA(anything));
opened.close();
});
test('prepared statements cannot be used after db is closed', () {
final opened = Database.memory();
final stmt = opened.prepare('SELECT 1');
opened.close();
expect(stmt.select, throwsA(anything));
});
}

View File

@ -0,0 +1,20 @@
import 'package:moor_ffi/database.dart';
import 'package:test/test.dart';
void main() {
test('select statements return expected value', () {
final opened = Database.memory();
final prepared = opened.prepare('SELECT ?');
final result1 = prepared.select([1]);
expect(result1.columnNames, ['?']);
expect(result1.single.columnAt(0), 1);
final result2 = prepared.select([2]);
expect(result2.columnNames, ['?']);
expect(result2.single.columnAt(0), 2);
opened.close();
});
}

View File

@ -0,0 +1,29 @@
import 'dart:io';
import 'package:moor_ffi/database.dart';
import 'package:test/test.dart';
import 'package:path/path.dart' as p;
void main() {
test('can set the user version on a database', () {
final file = File(p.join(
Directory.systemTemp.absolute.path, 'moor_ffi_test_user_version.db'));
final opened = Database.openFile(file);
var version = opened.userVersion();
expect(version, 0);
opened.setUserVersion(3);
version = opened.userVersion();
expect(version, 3);
// ensure that the version is stored on file
opened.close();
final another = Database.openFile(file);
expect(another.userVersion(), 3);
another.close();
file.deleteSync();
});
}

View File

@ -1,68 +0,0 @@
import 'dart:async';
import 'dart:io';
import 'package:path/path.dart' as p;
import 'package:test/test.dart';
import 'package:moor_ffi/database.dart';
import 'suite/insert.dart' as insert;
import 'suite/prepared_statements.dart' as prepared_statements;
import 'suite/select.dart' as select;
import 'suite/user_version.dart' as user_version;
var _tempFileCounter = 0;
List<File> _createdFiles = [];
File temporaryFile() {
final count = _tempFileCounter++;
final path =
p.join(Directory.systemTemp.absolute.path, 'moor_ffi_test_$count.db');
final file = File(path);
_createdFiles.add(file);
return file;
}
abstract class TestedDatabase {
FutureOr<BaseDatabase> openFile(File file);
FutureOr<BaseDatabase> openMemory();
}
class TestRegularDatabase implements TestedDatabase {
@override
BaseDatabase openFile(File file) => Database.openFile(file);
@override
BaseDatabase openMemory() => Database.memory();
}
class TestIsolateDatabase implements TestedDatabase {
@override
Future<BaseDatabase> openFile(File file) => IsolateDb.openFile(file);
@override
FutureOr<BaseDatabase> openMemory() => IsolateDb.openMemory();
}
void main() {
group('regular database', () {
_declareAll(TestRegularDatabase());
});
group('isolate database', () {
_declareAll(TestIsolateDatabase());
});
tearDownAll(() async {
for (var file in _createdFiles) {
if (await file.exists()) {
await file.delete();
}
}
});
}
void _declareAll(TestedDatabase db) {
insert.main(db);
prepared_statements.main(db);
select.main(db);
user_version.main(db);
}

View File

@ -1,18 +0,0 @@
import 'package:test/test.dart';
import '../ffi_test.dart';
void main(TestedDatabase db) {
test('insert statements report their id', () async {
final opened = await db.openMemory();
await opened
.execute('CREATE TABLE tbl(a INTEGER PRIMARY KEY AUTOINCREMENT)');
for (var i = 0; i < 5; i++) {
await opened.execute('INSERT INTO tbl DEFAULT VALUES');
expect(await opened.getLastInsertId(), i + 1);
}
await opened.close();
});
}

View File

@ -1,44 +0,0 @@
import 'package:test/test.dart';
import '../ffi_test.dart';
void main(TestedDatabase db) {
test('prepared statements can be used multiple times', () async {
final opened = await db.openMemory();
await opened.execute('CREATE TABLE tbl (a TEXT);');
final stmt = await opened.prepare('INSERT INTO tbl(a) VALUES(?)');
await stmt.execute(['a']);
await stmt.execute(['b']);
await stmt.close();
final select = await opened.prepare('SELECT * FROM tbl ORDER BY a');
final result = await select.select();
expect(result, hasLength(2));
expect(result.map((row) => row['a']), ['a', 'b']);
await select.close();
await opened.close();
});
test('prepared statements cannot be used after close', () async {
final opened = await db.openMemory();
final stmt = await opened.prepare('SELECT ?');
await stmt.close();
expect(stmt.select, throwsA(anything));
await opened.close();
});
test('prepared statements cannot be used after db is closed', () async {
final opened = await db.openMemory();
final stmt = await opened.prepare('SELECT 1');
await opened.close();
expect(stmt.select, throwsA(anything));
});
}

View File

@ -1,21 +0,0 @@
import 'package:test/test.dart';
import '../ffi_test.dart';
void main(TestedDatabase db) {
test('select statements return expected value', () async {
final opened = await db.openMemory();
final prepared = await opened.prepare('SELECT ?');
final result1 = await prepared.select([1]);
expect(result1.columnNames, ['?']);
expect(result1.single.columnAt(0), 1);
final result2 = await prepared.select([2]);
expect(result2.columnNames, ['?']);
expect(result2.single.columnAt(0), 2);
await opened.close();
});
}

View File

@ -1,24 +0,0 @@
import 'package:test/test.dart';
import '../ffi_test.dart';
void main(TestedDatabase db) {
test('can set the user version on a database', () async {
final file = temporaryFile();
final opened = await db.openFile(file);
var version = await opened.userVersion();
expect(version, 0);
await opened.setUserVersion(3);
version = await opened.userVersion();
expect(version, 3);
// ensure that the version is stored on file
await opened.close();
final another = await db.openFile(file);
expect(await another.userVersion(), 3);
await another.close();
});
}