storage: store full transactions and decouple storage from rpc
This commit is contained in:
parent
f35e72923a
commit
5c2e5479a3
|
@ -5,8 +5,6 @@ import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/golang/protobuf/proto"
|
|
||||||
"github.com/gtank/ctxd/rpc"
|
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -36,9 +34,28 @@ func CreateTables(conn *sql.DB) error {
|
||||||
);
|
);
|
||||||
`
|
`
|
||||||
_, err = conn.Exec(blockTable)
|
_, err = conn.Exec(blockTable)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
txTable := `
|
||||||
|
CREATE TABLE IF NOT EXISTS transactions (
|
||||||
|
block_height INTEGER,
|
||||||
|
block_hash TEXT,
|
||||||
|
tx_index INTEGER,
|
||||||
|
tx_hash TEXT,
|
||||||
|
tx_bytes BLOB,
|
||||||
|
FOREIGN KEY (block_height) REFERENCES blocks (height),
|
||||||
|
FOREIGN KEY (block_hash) REFERENCES blocks (hash)
|
||||||
|
);
|
||||||
|
`
|
||||||
|
_, err = conn.Exec(txTable)
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO consider max/count queries instead of state table. bit of a coupling assumption though.
|
||||||
|
|
||||||
func GetCurrentHeight(ctx context.Context, db *sql.DB) (int, error) {
|
func GetCurrentHeight(ctx context.Context, db *sql.DB) (int, error) {
|
||||||
var height int
|
var height int
|
||||||
query := "SELECT current_height FROM state WHERE rowid = 1"
|
query := "SELECT current_height FROM state WHERE rowid = 1"
|
||||||
|
@ -46,6 +63,71 @@ func GetCurrentHeight(ctx context.Context, db *sql.DB) (int, error) {
|
||||||
return height, err
|
return height, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func GetBlock(ctx context.Context, db *sql.DB, height int) ([]byte, error) {
|
||||||
|
var blockBytes []byte // avoid a copy with *RawBytes
|
||||||
|
query := "SELECT compact_encoding from blocks WHERE height = ?"
|
||||||
|
err := db.QueryRowContext(ctx, query, height).Scan(&blockBytes)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return blockBytes, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetBlockByHash(ctx context.Context, db *sql.DB, hash string) ([]byte, error) {
|
||||||
|
var blockBytes []byte // avoid a copy with *RawBytes
|
||||||
|
query := "SELECT compact_encoding from blocks WHERE hash = ?"
|
||||||
|
err := db.QueryRowContext(ctx, query, hash).Scan(&blockBytes)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, fmt.Sprintf("getting block with hash %s", hash))
|
||||||
|
}
|
||||||
|
return blockBytes, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// [start, end]
|
||||||
|
func GetBlockRange(conn *sql.DB, start, end int) ([][]byte, error) {
|
||||||
|
// TODO sanity check range bounds
|
||||||
|
query := "SELECT compact_encoding from blocks WHERE (height BETWEEN ? AND ?)"
|
||||||
|
result, err := conn.Query(query, start, end)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer result.Close()
|
||||||
|
|
||||||
|
compactBlocks := make([][]byte, 0, (end-start)+1)
|
||||||
|
for result.Next() {
|
||||||
|
var blockBytes []byte // avoid a copy with *RawBytes
|
||||||
|
err = result.Scan(&blockBytes)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
compactBlocks = append(compactBlocks, blockBytes)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = result.Err()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(compactBlocks) == 0 {
|
||||||
|
return nil, ErrBadRange
|
||||||
|
}
|
||||||
|
return compactBlocks, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func StoreBlock(conn *sql.DB, height int, hash string, sapling bool, encoded []byte) error {
|
||||||
|
insertBlock := "INSERT INTO blocks (height, hash, has_sapling_tx, compact_encoding) values (?, ?, ?, ?)"
|
||||||
|
_, err := conn.Exec(insertBlock, height, hash, sapling, encoded)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, fmt.Sprintf("storing compact block %d", height))
|
||||||
|
}
|
||||||
|
|
||||||
|
currentHeight, err := GetCurrentHeight(context.Background(), conn)
|
||||||
|
if err != nil || height > currentHeight {
|
||||||
|
err = SetCurrentHeight(conn, height)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
func SetCurrentHeight(conn *sql.DB, height int) error {
|
func SetCurrentHeight(conn *sql.DB, height int) error {
|
||||||
update := "UPDATE state SET current_height=?, timestamp=CURRENT_TIMESTAMP WHERE rowid = 1"
|
update := "UPDATE state SET current_height=?, timestamp=CURRENT_TIMESTAMP WHERE rowid = 1"
|
||||||
result, err := conn.Exec(update, height)
|
result, err := conn.Exec(update, height)
|
||||||
|
@ -74,76 +156,44 @@ func SetCurrentHeight(conn *sql.DB, height int) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetBlock(ctx context.Context, db *sql.DB, height int) (*rpc.CompactBlock, error) {
|
func StoreFullTx(db *sql.DB, blockHeight int, blockHash string, txIndex int, txHash string, txBytes []byte) error {
|
||||||
var blockBytes []byte // avoid a copy with *RawBytes
|
insertTx := "INSERT INTO transactions (block_height, block_hash, tx_index, tx_hash, tx_bytes) VALUES (?,?,?,?,?)"
|
||||||
query := "SELECT compact_encoding from blocks WHERE height = ?"
|
_, err := db.Exec(insertTx, blockHeight, blockHash, txIndex, txHash, txBytes)
|
||||||
err := db.QueryRowContext(ctx, query, height).Scan(&blockBytes)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return errors.Wrap(err, fmt.Sprintf("storing tx %x", txHash))
|
||||||
}
|
}
|
||||||
compactBlock := &rpc.CompactBlock{}
|
return nil
|
||||||
err = proto.Unmarshal(blockBytes, compactBlock)
|
|
||||||
return compactBlock, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetBlockByHash(ctx context.Context, db *sql.DB, hash string) (*rpc.CompactBlock, error) {
|
// GetFulTxByHash retrieves a full transaction by its little-endian hash.
|
||||||
var blockBytes []byte // avoid a copy with *RawBytes
|
func GetFullTxByHash(ctx context.Context, db *sql.DB, txHash string) ([]byte, error) {
|
||||||
query := "SELECT compact_encoding from blocks WHERE hash = ?"
|
var txBytes []byte // avoid a copy with *RawBytes
|
||||||
err := db.QueryRowContext(ctx, query, hash).Scan(&blockBytes)
|
query := "SELECT bytes from transactions WHERE tx_hash = ?"
|
||||||
|
err := db.QueryRowContext(ctx, query, txHash).Scan(&txBytes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrap(err, fmt.Sprintf("getting block with hash %s", hash))
|
return nil, errors.Wrap(err, fmt.Sprintf("getting tx with hash %s", txHash))
|
||||||
}
|
}
|
||||||
compactBlock := &rpc.CompactBlock{}
|
return txBytes, nil
|
||||||
err = proto.Unmarshal(blockBytes, compactBlock)
|
|
||||||
return compactBlock, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// [start, end]
|
// GetFullTxByHeightAndIndex retrieves a full transaction by its parent block height and index
|
||||||
func GetBlockRange(conn *sql.DB, start, end int) ([]*rpc.CompactBlock, error) {
|
func GetFullTxByHeightAndIndex(ctx context.Context, db *sql.DB, blockHeight, txIndex int) ([]byte, error) {
|
||||||
// TODO sanity check range bounds
|
var txBytes []byte // avoid a copy with *RawBytes
|
||||||
query := "SELECT compact_encoding from blocks WHERE (height BETWEEN ? AND ?)"
|
query := "SELECT bytes from transactions WHERE (block_height = ? AND tx_index = ?)"
|
||||||
result, err := conn.Query(query, start, end)
|
err := db.QueryRowContext(ctx, query, blockHeight, txIndex).Scan(&txBytes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, errors.Wrap(err, fmt.Sprintf("getting tx (%d, %d)", blockHeight, txIndex))
|
||||||
}
|
}
|
||||||
defer result.Close()
|
return txBytes, nil
|
||||||
|
|
||||||
compactBlocks := make([]*rpc.CompactBlock, 0, (end-start)+1)
|
|
||||||
for result.Next() {
|
|
||||||
var blockBytes []byte // avoid a copy with *RawBytes
|
|
||||||
err = result.Scan(&blockBytes)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
newBlock := &rpc.CompactBlock{}
|
|
||||||
err = proto.Unmarshal(blockBytes, newBlock)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
compactBlocks = append(compactBlocks, newBlock)
|
|
||||||
}
|
|
||||||
|
|
||||||
err = result.Err()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(compactBlocks) == 0 {
|
|
||||||
return nil, ErrBadRange
|
|
||||||
}
|
|
||||||
return compactBlocks, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func StoreBlock(conn *sql.DB, height int, hash string, sapling bool, encoded []byte) error {
|
// GetFullTxByHashAndIndex retrieves a full transaction by its parent block hash and index
|
||||||
insertBlock := "INSERT INTO blocks (height, hash, has_sapling_tx, compact_encoding) values (?, ?, ?, ?)"
|
func GetFullTxByHashAndIndex(ctx context.Context, db *sql.DB, blockHash string, txIndex int) ([]byte, error) {
|
||||||
_, err := conn.Exec(insertBlock, height, hash, sapling, encoded)
|
var txBytes []byte // avoid a copy with *RawBytes
|
||||||
|
query := "SELECT bytes from transactions WHERE (block_hash = ? AND tx_index = ?)"
|
||||||
|
err := db.QueryRowContext(ctx, query, blockHash, txIndex).Scan(&txBytes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, fmt.Sprintf("storing compact block %d", height))
|
return nil, errors.Wrap(err, fmt.Sprintf("getting tx (%x, %d)", blockHash, txIndex))
|
||||||
}
|
}
|
||||||
|
return txBytes, nil
|
||||||
currentHeight, err := GetCurrentHeight(context.Background(), conn)
|
|
||||||
if err != nil || height > currentHeight {
|
|
||||||
err = SetCurrentHeight(conn, height)
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,10 +9,12 @@ import (
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
protobuf "github.com/golang/protobuf/proto"
|
"github.com/golang/protobuf/proto"
|
||||||
"github.com/gtank/ctxd/parser"
|
|
||||||
_ "github.com/mattn/go-sqlite3"
|
_ "github.com/mattn/go-sqlite3"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
|
||||||
|
"github.com/gtank/ctxd/parser"
|
||||||
|
"github.com/gtank/ctxd/rpc"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestSqliteStorage(t *testing.T) {
|
func TestSqliteStorage(t *testing.T) {
|
||||||
|
@ -58,7 +60,7 @@ func TestSqliteStorage(t *testing.T) {
|
||||||
hash := hex.EncodeToString(block.GetEncodableHash())
|
hash := hex.EncodeToString(block.GetEncodableHash())
|
||||||
hasSapling := block.HasSaplingTransactions()
|
hasSapling := block.HasSaplingTransactions()
|
||||||
protoBlock := block.ToCompact()
|
protoBlock := block.ToCompact()
|
||||||
marshaled, _ := protobuf.Marshal(protoBlock)
|
marshaled, _ := proto.Marshal(protoBlock)
|
||||||
|
|
||||||
err = StoreBlock(conn, height, hash, hasSapling, marshaled)
|
err = StoreBlock(conn, height, hash, hasSapling, marshaled)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -92,8 +94,13 @@ func TestSqliteStorage(t *testing.T) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(errors.Wrap(err, "retrieving stored block"))
|
t.Error(errors.Wrap(err, "retrieving stored block"))
|
||||||
}
|
}
|
||||||
|
cblock := &rpc.CompactBlock{}
|
||||||
|
err = proto.Unmarshal(retBlock, cblock)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
if int(retBlock.Height) != lastBlockTest.BlockHeight {
|
if int(cblock.Height) != lastBlockTest.BlockHeight {
|
||||||
t.Error("incorrect retrieval")
|
t.Error("incorrect retrieval")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue