ingest: make zmq client store raw transaction data

This commit is contained in:
George Tankersley 2018-12-14 21:54:33 -05:00
parent d4d991a191
commit 71c0624abe
4 changed files with 54 additions and 21 deletions

View File

@ -188,13 +188,13 @@ func handleBlock(db *sql.DB, sequence int, blockData []byte) {
return return
} }
displayHash := hex.EncodeToString(block.GetEncodableHash()) blockHash := hex.EncodeToString(block.GetEncodableHash())
marshaledBlock, _ := proto.Marshal(block.ToCompact()) marshaledBlock, _ := proto.Marshal(block.ToCompact())
err = storage.StoreBlock( err = storage.StoreBlock(
db, db,
block.GetHeight(), block.GetHeight(),
displayHash, blockHash,
block.HasSaplingTransactions(), block.HasSaplingTransactions(),
marshaledBlock, marshaledBlock,
) )
@ -202,10 +202,10 @@ func handleBlock(db *sql.DB, sequence int, blockData []byte) {
entry := log.WithFields(logrus.Fields{ entry := log.WithFields(logrus.Fields{
"seqnum": sequence, "seqnum": sequence,
"block_height": block.GetHeight(), "block_height": block.GetHeight(),
"block_hash": displayHash, "block_hash": blockHash,
"block_version": block.GetVersion(), "block_version": block.GetVersion(),
"tx_count": block.GetTxCount(), "tx_count": block.GetTxCount(),
"has_sapling_tx": block.HasSaplingTransactions(), "has_sapling": block.HasSaplingTransactions(),
"error": err, "error": err,
}) })
@ -214,4 +214,28 @@ func handleBlock(db *sql.DB, sequence int, blockData []byte) {
} else { } else {
entry.Info("received new block") entry.Info("received new block")
} }
for index, tx := range block.Transactions() {
txHash := hex.EncodeToString(tx.GetEncodableHash())
err = storage.StoreTransaction(
db,
block.GetHeight(),
blockHash,
index,
txHash,
tx.Bytes(),
)
entry = log.WithFields(logrus.Fields{
"block_height": block.GetHeight(),
"block_hash": blockHash,
"tx_index": index,
"has_sapling": tx.HasSaplingTransactions(),
"error": err,
})
if err != nil {
entry.Error("error storing tx")
} else {
entry.Debug("storing tx")
}
}
} }

View File

@ -10,7 +10,7 @@ import (
type block struct { type block struct {
hdr *blockHeader hdr *blockHeader
vtx []*transaction vtx []*Transaction
} }
func NewBlock() *block { func NewBlock() *block {
@ -25,6 +25,11 @@ func (b *block) GetTxCount() int {
return len(b.vtx) return len(b.vtx)
} }
func (b *block) Transactions() []*Transaction {
// TODO: these should NOT be mutable
return b.vtx
}
// GetDisplayHash returns the block hash in big-endian display order. // GetDisplayHash returns the block hash in big-endian display order.
func (b *block) GetDisplayHash() []byte { func (b *block) GetDisplayHash() []byte {
return b.hdr.GetDisplayHash() return b.hdr.GetDisplayHash()
@ -101,7 +106,7 @@ func (b *block) ParseFromSlice(data []byte) (rest []byte, err error) {
} }
data = []byte(s) data = []byte(s)
vtx := make([]*transaction, 0, txCount) vtx := make([]*Transaction, 0, txCount)
for i := 0; len(data) > 0; i++ { for i := 0; len(data) > 0; i++ {
tx := NewTransaction() tx := NewTransaction()
data, err = tx.ParseFromSlice(data) data, err = tx.ParseFromSlice(data)

View File

@ -263,14 +263,14 @@ func (p *joinSplit) ParseFromSlice(data []byte) ([]byte, error) {
return []byte(s), nil return []byte(s), nil
} }
type transaction struct { type Transaction struct {
*rawTransaction *rawTransaction
rawBytes []byte rawBytes []byte
txId []byte txId []byte
} }
// GetDisplayHash returns the transaction hash in big-endian display order. // GetDisplayHash returns the transaction hash in big-endian display order.
func (tx *transaction) GetDisplayHash() []byte { func (tx *Transaction) GetDisplayHash() []byte {
if tx.txId != nil { if tx.txId != nil {
return tx.txId return tx.txId
} }
@ -290,17 +290,21 @@ func (tx *transaction) GetDisplayHash() []byte {
} }
// GetEncodableHash returns the transaction hash in little-endian wire format order. // GetEncodableHash returns the transaction hash in little-endian wire format order.
func (tx *transaction) GetEncodableHash() []byte { func (tx *Transaction) GetEncodableHash() []byte {
digest := sha256.Sum256(tx.rawBytes) digest := sha256.Sum256(tx.rawBytes)
digest = sha256.Sum256(digest[:]) digest = sha256.Sum256(digest[:])
return digest[:] return digest[:]
} }
func (tx *transaction) HasSaplingTransactions() bool { func (tx *Transaction) Bytes() []byte {
return tx.rawBytes
}
func (tx *Transaction) HasSaplingTransactions() bool {
return tx.version >= 4 && (len(tx.shieldedSpends)+len(tx.shieldedOutputs)) > 0 return tx.version >= 4 && (len(tx.shieldedSpends)+len(tx.shieldedOutputs)) > 0
} }
func (tx *transaction) ToCompact(index int) *rpc.CompactTx { func (tx *Transaction) ToCompact(index int) *rpc.CompactTx {
ctx := &rpc.CompactTx{ ctx := &rpc.CompactTx{
Index: uint64(index), // index is contextual Index: uint64(index), // index is contextual
Hash: tx.GetEncodableHash(), Hash: tx.GetEncodableHash(),
@ -317,7 +321,7 @@ func (tx *transaction) ToCompact(index int) *rpc.CompactTx {
return ctx return ctx
} }
func (tx *transaction) ParseFromSlice(data []byte) ([]byte, error) { func (tx *Transaction) ParseFromSlice(data []byte) ([]byte, error) {
s := bytestring.String(data) s := bytestring.String(data)
// declare here to prevent shadowing problems in cryptobyte assignments // declare here to prevent shadowing problems in cryptobyte assignments
@ -465,8 +469,8 @@ func (tx *transaction) ParseFromSlice(data []byte) ([]byte, error) {
return []byte(s), nil return []byte(s), nil
} }
func NewTransaction() *transaction { func NewTransaction() *Transaction {
return &transaction{ return &Transaction{
rawTransaction: new(rawTransaction), rawTransaction: new(rawTransaction),
} }
} }

View File

@ -225,7 +225,7 @@ func TestSproutTransactionParser(t *testing.T) {
} }
} }
func subTestCommonBlockMeta(tt *txTestVector, tx *transaction, t *testing.T, caseNum int) bool { func subTestCommonBlockMeta(tt *txTestVector, tx *Transaction, t *testing.T, caseNum int) bool {
headerBytes, _ := hex.DecodeString(tt.header) headerBytes, _ := hex.DecodeString(tt.header)
header := binary.LittleEndian.Uint32(headerBytes) header := binary.LittleEndian.Uint32(headerBytes)
if (header >> 31) == 1 != tx.fOverwintered { if (header >> 31) == 1 != tx.fOverwintered {