From 09007ad8567ab817d197149362b8817268f71cd2 Mon Sep 17 00:00:00 2001 From: George Tankersley Date: Sat, 15 Dec 2018 20:01:28 +0000 Subject: [PATCH] storage: small fixes and sqlite3-specific tuning --- cmd/ingest/main.go | 9 +++------ frontend/service.go | 4 +++- storage/sqlite3.go | 14 +++++++------- 3 files changed, 13 insertions(+), 14 deletions(-) diff --git a/cmd/ingest/main.go b/cmd/ingest/main.go index 9d85a28..85ccae6 100644 --- a/cmd/ingest/main.go +++ b/cmd/ingest/main.go @@ -73,7 +73,8 @@ func main() { }) // Initialize database - db, err := sql.Open("sqlite3", opts.dbPath) + db, err := sql.Open("sqlite3", fmt.Sprintf("file:%s?_busy_timeout=10000&cache=shared", opts.dbPath)) + db.SetMaxOpenConns(1) if err != nil { log.WithFields(logrus.Fields{ "db_path": opts.dbPath, @@ -117,6 +118,7 @@ func main() { } connString := fmt.Sprintf("tcp://%s", opts.zmqAddr) + err = sock.Connect(connString) if err != nil { log.WithFields(logrus.Fields{ @@ -155,11 +157,6 @@ func main() { switch string(topic) { case opts.zmqTopic: - log.WithFields(logrus.Fields{ - "seqnum": sequence, - "header": fmt.Sprintf("%x", body[:80]), - }).Debug("got block") - // there's an implicit mutex here go handleBlock(db, sequence, body) diff --git a/frontend/service.go b/frontend/service.go index f446026..3ba7b75 100644 --- a/frontend/service.go +++ b/frontend/service.go @@ -5,6 +5,7 @@ import ( "database/sql" "encoding/hex" "errors" + "fmt" "time" "github.com/golang/protobuf/proto" @@ -25,7 +26,8 @@ type SqlStreamer struct { } func NewSQLiteStreamer(dbPath string) (rpc.CompactTxStreamerServer, error) { - db, err := sql.Open("sqlite3", dbPath) + db, err := sql.Open("sqlite3", fmt.Sprintf("file:%s?_busy_timeout=10000&cache=shared", dbPath)) + db.SetMaxOpenConns(1) if err != nil { return nil, err } diff --git a/storage/sqlite3.go b/storage/sqlite3.go index f0fdf4b..f971647 100644 --- a/storage/sqlite3.go +++ b/storage/sqlite3.go @@ -17,7 +17,7 @@ func CreateTables(conn *sql.DB) error { CREATE TABLE IF NOT EXISTS state ( current_height INTEGER, timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, - FOREIGN KEY (current_height) REFERENCES blocks (height) + FOREIGN KEY (current_height) REFERENCES blocks (block_height) ); ` _, err := conn.Exec(stateTable) @@ -45,8 +45,8 @@ func CreateTables(conn *sql.DB) error { tx_index INTEGER, tx_hash TEXT, tx_bytes BLOB, - FOREIGN KEY (block_height) REFERENCES blocks (height), - FOREIGN KEY (block_hash) REFERENCES blocks (hash) + FOREIGN KEY (block_height) REFERENCES blocks (block_height), + FOREIGN KEY (block_hash) REFERENCES blocks (block_hash) ); ` _, err = conn.Exec(txTable) @@ -137,7 +137,7 @@ func StoreBlock(conn *sql.DB, height int, hash string, sapling bool, encoded []b } func SetCurrentHeight(conn *sql.DB, height int) error { - update := "UPDATE state SET current_height=? WHERE rowid = 1" + update := "UPDATE state SET current_height=?, timestamp=CURRENT_TIMESTAMP WHERE rowid = 1" result, err := conn.Exec(update, height) if err != nil { return errors.Wrap(err, "updating state row") @@ -176,7 +176,7 @@ func StoreTransaction(db *sql.DB, blockHeight int, blockHash string, txIndex int // GetTxByHash retrieves a full transaction by its little-endian hash. func GetTxByHash(ctx context.Context, db *sql.DB, txHash string) ([]byte, error) { var txBytes []byte // avoid a copy with *RawBytes - query := "SELECT bytes from transactions WHERE tx_hash = ?" + query := "SELECT tx_bytes from transactions WHERE tx_hash = ?" err := db.QueryRowContext(ctx, query, txHash).Scan(&txBytes) if err != nil { return nil, errors.Wrap(err, fmt.Sprintf("getting tx with hash %s", txHash)) @@ -187,7 +187,7 @@ func GetTxByHash(ctx context.Context, db *sql.DB, txHash string) ([]byte, error) // GetTxByHeightAndIndex retrieves a full transaction by its parent block height and index func GetTxByHeightAndIndex(ctx context.Context, db *sql.DB, blockHeight, txIndex int) ([]byte, error) { var txBytes []byte // avoid a copy with *RawBytes - query := "SELECT bytes from transactions WHERE (block_height = ? AND tx_index = ?)" + query := "SELECT tx_bytes from transactions WHERE (block_height = ? AND tx_index = ?)" err := db.QueryRowContext(ctx, query, blockHeight, txIndex).Scan(&txBytes) if err != nil { return nil, errors.Wrap(err, fmt.Sprintf("getting tx (%d, %d)", blockHeight, txIndex)) @@ -198,7 +198,7 @@ func GetTxByHeightAndIndex(ctx context.Context, db *sql.DB, blockHeight, txIndex // GetTxByHashAndIndex retrieves a full transaction by its parent block hash and index func GetTxByHashAndIndex(ctx context.Context, db *sql.DB, blockHash string, txIndex int) ([]byte, error) { var txBytes []byte // avoid a copy with *RawBytes - query := "SELECT bytes from transactions WHERE (block_hash = ? AND tx_index = ?)" + query := "SELECT tx_bytes from transactions WHERE (block_hash = ? AND tx_index = ?)" err := db.QueryRowContext(ctx, query, blockHash, txIndex).Scan(&txBytes) if err != nil { return nil, errors.Wrap(err, fmt.Sprintf("getting tx (%x, %d)", blockHash, txIndex))