storage: small fixes and sqlite3-specific tuning
This commit is contained in:
parent
9a0639761b
commit
09007ad856
|
@ -73,7 +73,8 @@ func main() {
|
||||||
})
|
})
|
||||||
|
|
||||||
// Initialize database
|
// Initialize database
|
||||||
db, err := sql.Open("sqlite3", opts.dbPath)
|
db, err := sql.Open("sqlite3", fmt.Sprintf("file:%s?_busy_timeout=10000&cache=shared", opts.dbPath))
|
||||||
|
db.SetMaxOpenConns(1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithFields(logrus.Fields{
|
log.WithFields(logrus.Fields{
|
||||||
"db_path": opts.dbPath,
|
"db_path": opts.dbPath,
|
||||||
|
@ -117,6 +118,7 @@ func main() {
|
||||||
}
|
}
|
||||||
|
|
||||||
connString := fmt.Sprintf("tcp://%s", opts.zmqAddr)
|
connString := fmt.Sprintf("tcp://%s", opts.zmqAddr)
|
||||||
|
|
||||||
err = sock.Connect(connString)
|
err = sock.Connect(connString)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithFields(logrus.Fields{
|
log.WithFields(logrus.Fields{
|
||||||
|
@ -155,11 +157,6 @@ func main() {
|
||||||
switch string(topic) {
|
switch string(topic) {
|
||||||
|
|
||||||
case opts.zmqTopic:
|
case opts.zmqTopic:
|
||||||
log.WithFields(logrus.Fields{
|
|
||||||
"seqnum": sequence,
|
|
||||||
"header": fmt.Sprintf("%x", body[:80]),
|
|
||||||
}).Debug("got block")
|
|
||||||
|
|
||||||
// there's an implicit mutex here
|
// there's an implicit mutex here
|
||||||
go handleBlock(db, sequence, body)
|
go handleBlock(db, sequence, body)
|
||||||
|
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/golang/protobuf/proto"
|
"github.com/golang/protobuf/proto"
|
||||||
|
@ -25,7 +26,8 @@ type SqlStreamer struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSQLiteStreamer(dbPath string) (rpc.CompactTxStreamerServer, error) {
|
func NewSQLiteStreamer(dbPath string) (rpc.CompactTxStreamerServer, error) {
|
||||||
db, err := sql.Open("sqlite3", dbPath)
|
db, err := sql.Open("sqlite3", fmt.Sprintf("file:%s?_busy_timeout=10000&cache=shared", dbPath))
|
||||||
|
db.SetMaxOpenConns(1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,7 +17,7 @@ func CreateTables(conn *sql.DB) error {
|
||||||
CREATE TABLE IF NOT EXISTS state (
|
CREATE TABLE IF NOT EXISTS state (
|
||||||
current_height INTEGER,
|
current_height INTEGER,
|
||||||
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
|
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
|
||||||
FOREIGN KEY (current_height) REFERENCES blocks (height)
|
FOREIGN KEY (current_height) REFERENCES blocks (block_height)
|
||||||
);
|
);
|
||||||
`
|
`
|
||||||
_, err := conn.Exec(stateTable)
|
_, err := conn.Exec(stateTable)
|
||||||
|
@ -45,8 +45,8 @@ func CreateTables(conn *sql.DB) error {
|
||||||
tx_index INTEGER,
|
tx_index INTEGER,
|
||||||
tx_hash TEXT,
|
tx_hash TEXT,
|
||||||
tx_bytes BLOB,
|
tx_bytes BLOB,
|
||||||
FOREIGN KEY (block_height) REFERENCES blocks (height),
|
FOREIGN KEY (block_height) REFERENCES blocks (block_height),
|
||||||
FOREIGN KEY (block_hash) REFERENCES blocks (hash)
|
FOREIGN KEY (block_hash) REFERENCES blocks (block_hash)
|
||||||
);
|
);
|
||||||
`
|
`
|
||||||
_, err = conn.Exec(txTable)
|
_, err = conn.Exec(txTable)
|
||||||
|
@ -137,7 +137,7 @@ func StoreBlock(conn *sql.DB, height int, hash string, sapling bool, encoded []b
|
||||||
}
|
}
|
||||||
|
|
||||||
func SetCurrentHeight(conn *sql.DB, height int) error {
|
func SetCurrentHeight(conn *sql.DB, height int) error {
|
||||||
update := "UPDATE state SET current_height=? WHERE rowid = 1"
|
update := "UPDATE state SET current_height=?, timestamp=CURRENT_TIMESTAMP WHERE rowid = 1"
|
||||||
result, err := conn.Exec(update, height)
|
result, err := conn.Exec(update, height)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "updating state row")
|
return errors.Wrap(err, "updating state row")
|
||||||
|
@ -176,7 +176,7 @@ func StoreTransaction(db *sql.DB, blockHeight int, blockHash string, txIndex int
|
||||||
// GetTxByHash retrieves a full transaction by its little-endian hash.
|
// GetTxByHash retrieves a full transaction by its little-endian hash.
|
||||||
func GetTxByHash(ctx context.Context, db *sql.DB, txHash string) ([]byte, error) {
|
func GetTxByHash(ctx context.Context, db *sql.DB, txHash string) ([]byte, error) {
|
||||||
var txBytes []byte // avoid a copy with *RawBytes
|
var txBytes []byte // avoid a copy with *RawBytes
|
||||||
query := "SELECT bytes from transactions WHERE tx_hash = ?"
|
query := "SELECT tx_bytes from transactions WHERE tx_hash = ?"
|
||||||
err := db.QueryRowContext(ctx, query, txHash).Scan(&txBytes)
|
err := db.QueryRowContext(ctx, query, txHash).Scan(&txBytes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrap(err, fmt.Sprintf("getting tx with hash %s", txHash))
|
return nil, errors.Wrap(err, fmt.Sprintf("getting tx with hash %s", txHash))
|
||||||
|
@ -187,7 +187,7 @@ func GetTxByHash(ctx context.Context, db *sql.DB, txHash string) ([]byte, error)
|
||||||
// GetTxByHeightAndIndex retrieves a full transaction by its parent block height and index
|
// GetTxByHeightAndIndex retrieves a full transaction by its parent block height and index
|
||||||
func GetTxByHeightAndIndex(ctx context.Context, db *sql.DB, blockHeight, txIndex int) ([]byte, error) {
|
func GetTxByHeightAndIndex(ctx context.Context, db *sql.DB, blockHeight, txIndex int) ([]byte, error) {
|
||||||
var txBytes []byte // avoid a copy with *RawBytes
|
var txBytes []byte // avoid a copy with *RawBytes
|
||||||
query := "SELECT bytes from transactions WHERE (block_height = ? AND tx_index = ?)"
|
query := "SELECT tx_bytes from transactions WHERE (block_height = ? AND tx_index = ?)"
|
||||||
err := db.QueryRowContext(ctx, query, blockHeight, txIndex).Scan(&txBytes)
|
err := db.QueryRowContext(ctx, query, blockHeight, txIndex).Scan(&txBytes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrap(err, fmt.Sprintf("getting tx (%d, %d)", blockHeight, txIndex))
|
return nil, errors.Wrap(err, fmt.Sprintf("getting tx (%d, %d)", blockHeight, txIndex))
|
||||||
|
@ -198,7 +198,7 @@ func GetTxByHeightAndIndex(ctx context.Context, db *sql.DB, blockHeight, txIndex
|
||||||
// GetTxByHashAndIndex retrieves a full transaction by its parent block hash and index
|
// GetTxByHashAndIndex retrieves a full transaction by its parent block hash and index
|
||||||
func GetTxByHashAndIndex(ctx context.Context, db *sql.DB, blockHash string, txIndex int) ([]byte, error) {
|
func GetTxByHashAndIndex(ctx context.Context, db *sql.DB, blockHash string, txIndex int) ([]byte, error) {
|
||||||
var txBytes []byte // avoid a copy with *RawBytes
|
var txBytes []byte // avoid a copy with *RawBytes
|
||||||
query := "SELECT bytes from transactions WHERE (block_hash = ? AND tx_index = ?)"
|
query := "SELECT tx_bytes from transactions WHERE (block_hash = ? AND tx_index = ?)"
|
||||||
err := db.QueryRowContext(ctx, query, blockHash, txIndex).Scan(&txBytes)
|
err := db.QueryRowContext(ctx, query, blockHash, txIndex).Scan(&txBytes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrap(err, fmt.Sprintf("getting tx (%x, %d)", blockHash, txIndex))
|
return nil, errors.Wrap(err, fmt.Sprintf("getting tx (%x, %d)", blockHash, txIndex))
|
||||||
|
|
Loading…
Reference in New Issue