searching transaction results
This commit is contained in:
parent
16cf7a5e0a
commit
ea0b205455
|
@ -1,5 +1,5 @@
|
|||
hash: 223d8e42a118e7861cb673ea58a035e99d3a98c94e4b71fb52998d320f9c3b49
|
||||
updated: 2017-11-25T22:00:24.612202481-08:00
|
||||
hash: e279cca35a5cc9a68bb266015dc6a57da749b28dabca3994b2c5dbe02309f470
|
||||
updated: 2017-11-28T00:53:04.816567531Z
|
||||
imports:
|
||||
- name: github.com/btcsuite/btcd
|
||||
version: 8cea3866d0f7fb12d567a20744942c0d078c7d15
|
||||
|
|
|
@ -299,7 +299,7 @@ func NewNode(config *cfg.Config,
|
|||
for event := range ch {
|
||||
// XXX: may be not perfomant to write one event at a time
|
||||
txResult := event.(types.TMEventData).Unwrap().(types.EventDataTx).TxResult
|
||||
txIndexer.Index(&txResult)
|
||||
txIndexer.Index(&txResult, strings.Split(config.TxIndex.IndexTags, ","))
|
||||
}
|
||||
}()
|
||||
|
||||
|
|
|
@ -10,10 +10,10 @@ import (
|
|||
type TxIndexer interface {
|
||||
|
||||
// AddBatch analyzes, indexes and stores a batch of transactions.
|
||||
AddBatch(b *Batch) error
|
||||
AddBatch(b *Batch, allowedTags []string) error
|
||||
|
||||
// Index analyzes, indexes and stores a single transaction.
|
||||
Index(result *types.TxResult) error
|
||||
Index(result *types.TxResult, allowedTags []string) error
|
||||
|
||||
// Get returns the transaction specified by hash or nil if the transaction is not indexed
|
||||
// or stored.
|
||||
|
@ -26,18 +26,18 @@ type TxIndexer interface {
|
|||
// Batch groups together multiple Index operations to be performed at the same time.
|
||||
// NOTE: Batch is NOT thread-safe and must not be modified after starting its execution.
|
||||
type Batch struct {
|
||||
Ops []types.TxResult
|
||||
Ops []*types.TxResult
|
||||
}
|
||||
|
||||
// NewBatch creates a new Batch.
|
||||
func NewBatch(n int) *Batch {
|
||||
return &Batch{
|
||||
Ops: make([]types.TxResult, n),
|
||||
Ops: make([]*types.TxResult, n),
|
||||
}
|
||||
}
|
||||
|
||||
// Add or update an entry for the given result.Index.
|
||||
func (b *Batch) Add(result types.TxResult) error {
|
||||
func (b *Batch) Add(result *types.TxResult) error {
|
||||
b.Ops[result.Index] = result
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -2,16 +2,24 @@ package kv
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
abci "github.com/tendermint/abci/types"
|
||||
wire "github.com/tendermint/go-wire"
|
||||
|
||||
db "github.com/tendermint/tmlibs/db"
|
||||
|
||||
"github.com/tendermint/tendermint/state/txindex"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
db "github.com/tendermint/tmlibs/db"
|
||||
"github.com/tendermint/tmlibs/pubsub/query"
|
||||
)
|
||||
|
||||
var _ txindex.TxIndexer = (*TxIndex)(nil)
|
||||
|
||||
// TxIndex is the simplest possible indexer, backed by Key-Value storage (levelDB).
|
||||
// It can only index transaction by its identifier.
|
||||
type TxIndex struct {
|
||||
|
@ -46,20 +54,322 @@ func (txi *TxIndex) Get(hash []byte) (*types.TxResult, error) {
|
|||
return txResult, nil
|
||||
}
|
||||
|
||||
// AddBatch writes a batch of transactions into the TxIndex storage.
|
||||
func (txi *TxIndex) AddBatch(b *txindex.Batch) error {
|
||||
// AddBatch indexes a batch of transactions using the given list of tags.
|
||||
func (txi *TxIndex) AddBatch(b *txindex.Batch, allowedTags []string) error {
|
||||
storeBatch := txi.store.NewBatch()
|
||||
|
||||
for _, result := range b.Ops {
|
||||
rawBytes := wire.BinaryBytes(&result)
|
||||
storeBatch.Set(result.Tx.Hash(), rawBytes)
|
||||
hash := result.Tx.Hash()
|
||||
|
||||
// index tx by tags
|
||||
for _, tag := range result.Result.Tags {
|
||||
if stringInSlice(tag.Key, allowedTags) {
|
||||
storeBatch.Set(keyForTag(tag, result), hash)
|
||||
}
|
||||
}
|
||||
|
||||
// index tx by hash
|
||||
rawBytes := wire.BinaryBytes(result)
|
||||
storeBatch.Set(hash, rawBytes)
|
||||
}
|
||||
|
||||
storeBatch.Write()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Index writes a single transaction into the TxIndex storage.
|
||||
func (txi *TxIndex) Index(result *types.TxResult) error {
|
||||
rawBytes := wire.BinaryBytes(result)
|
||||
txi.store.Set(result.Tx.Hash(), rawBytes)
|
||||
return nil
|
||||
// Index indexes a single transaction using the given list of tags.
|
||||
func (txi *TxIndex) Index(result *types.TxResult, allowedTags []string) error {
|
||||
batch := txindex.NewBatch(1)
|
||||
batch.Add(result)
|
||||
return txi.AddBatch(batch, allowedTags)
|
||||
}
|
||||
|
||||
func (txi *TxIndex) Search(q *query.Query) ([]*types.TxResult, error) {
|
||||
hashes := make(map[string][]byte) // key - (base 16, upper-case hash)
|
||||
|
||||
// get a list of conditions (like "tx.height > 5")
|
||||
conditions := q.Conditions()
|
||||
|
||||
// if there is a hash condition, return the result immediately
|
||||
hash, err, ok := lookForHash(conditions)
|
||||
if err != nil {
|
||||
return []*types.TxResult{}, errors.Wrap(err, "error during searching for a hash in the query")
|
||||
} else if ok {
|
||||
res, err := txi.Get(hash)
|
||||
return []*types.TxResult{res}, errors.Wrap(err, "error while retrieving the result")
|
||||
}
|
||||
|
||||
// conditions to skip
|
||||
skipIndexes := make([]int, 0)
|
||||
|
||||
// if there is a height condition ("tx.height=3"), extract it for faster lookups
|
||||
height, heightIndex := lookForHeight(conditions)
|
||||
if heightIndex >= 0 {
|
||||
skipIndexes = append(skipIndexes, heightIndex)
|
||||
}
|
||||
|
||||
var hashes2 [][]byte
|
||||
|
||||
// extract ranges
|
||||
// if both upper and lower bounds exist, it's better to get them in order not
|
||||
// no iterate over kvs that are not within range.
|
||||
ranges, rangeIndexes := lookForRanges(conditions)
|
||||
if len(ranges) > 0 {
|
||||
skipIndexes = append(skipIndexes, rangeIndexes...)
|
||||
}
|
||||
for _, r := range ranges {
|
||||
hashes2 = txi.matchRange(r, startKeyForRange(r, height, heightIndex > 0))
|
||||
|
||||
// initialize hashes if we're running the first time
|
||||
if len(hashes) == 0 {
|
||||
for _, h := range hashes2 {
|
||||
hashes[hashKey(h)] = h
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// no matches
|
||||
if len(hashes2) == 0 {
|
||||
hashes = make(map[string][]byte)
|
||||
} else {
|
||||
// perform intersection as we go
|
||||
for _, h := range hashes2 {
|
||||
k := hashKey(h)
|
||||
if _, ok := hashes[k]; !ok {
|
||||
delete(hashes, k)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// for all other conditions
|
||||
for i, c := range conditions {
|
||||
if intInSlice(i, skipIndexes) {
|
||||
continue
|
||||
}
|
||||
|
||||
hashes2 = txi.match(c, startKey(c, height, heightIndex > 0))
|
||||
|
||||
// initialize hashes if we're running the first time
|
||||
if len(hashes) == 0 {
|
||||
for _, h := range hashes2 {
|
||||
hashes[hashKey(h)] = h
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// no matches
|
||||
if len(hashes2) == 0 {
|
||||
hashes = make(map[string][]byte)
|
||||
} else {
|
||||
// perform intersection as we go
|
||||
for _, h := range hashes2 {
|
||||
k := hashKey(h)
|
||||
if _, ok := hashes[k]; !ok {
|
||||
delete(hashes, k)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
results := make([]*types.TxResult, len(hashes))
|
||||
i := 0
|
||||
for _, h := range hashes {
|
||||
results[i], err = txi.Get(h)
|
||||
if err != nil {
|
||||
return []*types.TxResult{}, errors.Wrapf(err, "failed to get Tx{%X}", h)
|
||||
}
|
||||
i++
|
||||
}
|
||||
|
||||
return results, nil
|
||||
}
|
||||
|
||||
func lookForHash(conditions []query.Condition) (hash []byte, err error, ok bool) {
|
||||
for _, c := range conditions {
|
||||
if c.Tag == types.TxHashKey {
|
||||
decoded, err := hex.DecodeString(c.Operand.(string))
|
||||
return decoded, err, true
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func lookForHeight(conditions []query.Condition) (height uint64, index int) {
|
||||
for i, c := range conditions {
|
||||
if c.Tag == types.TxHeightKey {
|
||||
return uint64(c.Operand.(int64)), i
|
||||
}
|
||||
}
|
||||
return 0, -1
|
||||
}
|
||||
|
||||
type queryRanges map[string]queryRange
|
||||
|
||||
type queryRange struct {
|
||||
key string
|
||||
lowerBound interface{} // int || time.Time
|
||||
includeLowerBound bool
|
||||
upperBound interface{} // int || time.Time
|
||||
includeUpperBound bool
|
||||
}
|
||||
|
||||
func lookForRanges(conditions []query.Condition) (ranges queryRanges, indexes []int) {
|
||||
ranges = make(queryRanges)
|
||||
for i, c := range conditions {
|
||||
if isRangeOperation(c.Op) {
|
||||
r, ok := ranges[c.Tag]
|
||||
if !ok {
|
||||
r = queryRange{key: c.Tag}
|
||||
}
|
||||
switch c.Op {
|
||||
case query.OpGreater:
|
||||
r.lowerBound = c.Operand
|
||||
case query.OpGreaterEqual:
|
||||
r.includeLowerBound = true
|
||||
r.lowerBound = c.Operand
|
||||
case query.OpLess:
|
||||
r.upperBound = c.Operand
|
||||
case query.OpLessEqual:
|
||||
r.includeUpperBound = true
|
||||
r.upperBound = c.Operand
|
||||
}
|
||||
ranges[c.Tag] = r
|
||||
indexes = append(indexes, i)
|
||||
}
|
||||
}
|
||||
return ranges, indexes
|
||||
}
|
||||
|
||||
func isRangeOperation(op query.Operator) bool {
|
||||
switch op {
|
||||
case query.OpGreater, query.OpGreaterEqual, query.OpLess, query.OpLessEqual:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
func (txi *TxIndex) match(c query.Condition, startKey []byte) (hashes [][]byte) {
|
||||
if c.Op == query.OpEqual {
|
||||
it := txi.store.IteratorPrefix(startKey)
|
||||
for it.Next() {
|
||||
hashes = append(hashes, it.Value())
|
||||
}
|
||||
} else if c.Op == query.OpContains {
|
||||
// XXX: full scan
|
||||
it := txi.store.Iterator()
|
||||
for it.Next() {
|
||||
// if it is a hash key, continue
|
||||
if !strings.Contains(string(it.Key()), "/") {
|
||||
continue
|
||||
}
|
||||
if strings.Contains(extractValueFromKey(it.Key()), c.Operand.(string)) {
|
||||
hashes = append(hashes, it.Value())
|
||||
}
|
||||
}
|
||||
} else {
|
||||
panic("other operators should be handled already")
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func startKey(c query.Condition, height uint64, heightSpecified bool) []byte {
|
||||
var key string
|
||||
if heightSpecified {
|
||||
key = fmt.Sprintf("%s/%v/%d", c.Tag, c.Operand, height)
|
||||
} else {
|
||||
key = fmt.Sprintf("%s/%v", c.Tag, c.Operand)
|
||||
}
|
||||
return []byte(key)
|
||||
}
|
||||
|
||||
func startKeyForRange(r queryRange, height uint64, heightSpecified bool) []byte {
|
||||
var lowerBound interface{}
|
||||
if r.includeLowerBound {
|
||||
lowerBound = r.lowerBound
|
||||
} else {
|
||||
switch t := r.lowerBound.(type) {
|
||||
case int64:
|
||||
lowerBound = t + 1
|
||||
case time.Time:
|
||||
lowerBound = t.Unix() + 1
|
||||
default:
|
||||
panic("not implemented")
|
||||
}
|
||||
}
|
||||
var key string
|
||||
if heightSpecified {
|
||||
key = fmt.Sprintf("%s/%v/%d", r.key, lowerBound, height)
|
||||
} else {
|
||||
key = fmt.Sprintf("%s/%v", r.key, lowerBound)
|
||||
}
|
||||
return []byte(key)
|
||||
}
|
||||
|
||||
func (txi *TxIndex) matchRange(r queryRange, startKey []byte) (hashes [][]byte) {
|
||||
it := txi.store.IteratorPrefix(startKey)
|
||||
defer it.Release()
|
||||
for it.Next() {
|
||||
// no other way to stop iterator other than checking for upperBound
|
||||
switch (r.upperBound).(type) {
|
||||
case int64:
|
||||
v, err := strconv.ParseInt(extractValueFromKey(it.Key()), 10, 64)
|
||||
if err == nil && v == r.upperBound {
|
||||
if r.includeUpperBound {
|
||||
hashes = append(hashes, it.Value())
|
||||
}
|
||||
break
|
||||
}
|
||||
// XXX: passing time in a ABCI Tags is not yet implemented
|
||||
// case time.Time:
|
||||
// v := strconv.ParseInt(extractValueFromKey(it.Key()), 10, 64)
|
||||
// if v == r.upperBound {
|
||||
// break
|
||||
// }
|
||||
}
|
||||
hashes = append(hashes, it.Value())
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func extractValueFromKey(key []byte) string {
|
||||
s := string(key)
|
||||
parts := strings.SplitN(s, "/", 3)
|
||||
return parts[1]
|
||||
}
|
||||
|
||||
func keyForTag(tag *abci.KVPair, result *types.TxResult) []byte {
|
||||
switch tag.ValueType {
|
||||
case abci.KVPair_STRING:
|
||||
return []byte(fmt.Sprintf("%s/%v/%d/%d", tag.Key, tag.ValueString, result.Height, result.Index))
|
||||
case abci.KVPair_INT:
|
||||
return []byte(fmt.Sprintf("%s/%v/%d/%d", tag.Key, tag.ValueInt, result.Height, result.Index))
|
||||
// case abci.KVPair_TIME:
|
||||
// return []byte(fmt.Sprintf("%s/%d/%d/%d", tag.Key, tag.ValueTime.Unix(), result.Height, result.Index))
|
||||
default:
|
||||
panic(fmt.Sprintf("Undefined value type: %v", tag.ValueType))
|
||||
}
|
||||
}
|
||||
|
||||
func hashKey(hash []byte) string {
|
||||
return fmt.Sprintf("%X", hash)
|
||||
}
|
||||
|
||||
func stringInSlice(a string, list []string) bool {
|
||||
for _, b := range list {
|
||||
if b == a {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func intInSlice(a int, list []int) bool {
|
||||
for _, b := range list {
|
||||
if b == a {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package kv
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"testing"
|
||||
|
@ -11,6 +12,7 @@ import (
|
|||
"github.com/tendermint/tendermint/state/txindex"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
db "github.com/tendermint/tmlibs/db"
|
||||
"github.com/tendermint/tmlibs/pubsub/query"
|
||||
)
|
||||
|
||||
func TestTxIndex(t *testing.T) {
|
||||
|
@ -21,28 +23,89 @@ func TestTxIndex(t *testing.T) {
|
|||
hash := tx.Hash()
|
||||
|
||||
batch := txindex.NewBatch(1)
|
||||
if err := batch.Add(*txResult); err != nil {
|
||||
if err := batch.Add(txResult); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
err := indexer.AddBatch(batch)
|
||||
require.Nil(t, err)
|
||||
err := indexer.AddBatch(batch, []string{})
|
||||
require.NoError(t, err)
|
||||
|
||||
loadedTxResult, err := indexer.Get(hash)
|
||||
require.Nil(t, err)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, txResult, loadedTxResult)
|
||||
|
||||
tx2 := types.Tx("BYE BYE WORLD")
|
||||
txResult2 := &types.TxResult{1, 0, tx2, abci.ResponseDeliverTx{Data: []byte{0}, Code: abci.CodeType_OK, Log: "", Tags: []*abci.KVPair{}}}
|
||||
hash2 := tx2.Hash()
|
||||
|
||||
err = indexer.Index(txResult2)
|
||||
require.Nil(t, err)
|
||||
err = indexer.Index(txResult2, []string{})
|
||||
require.NoError(t, err)
|
||||
|
||||
loadedTxResult2, err := indexer.Get(hash2)
|
||||
require.Nil(t, err)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, txResult2, loadedTxResult2)
|
||||
}
|
||||
|
||||
func TestTxSearch(t *testing.T) {
|
||||
indexer := &TxIndex{store: db.NewMemDB()}
|
||||
|
||||
tx := types.Tx("HELLO WORLD")
|
||||
tags := []*abci.KVPair{
|
||||
&abci.KVPair{Key: "account.number", ValueType: abci.KVPair_INT, ValueInt: 1},
|
||||
&abci.KVPair{Key: "account.owner", ValueType: abci.KVPair_STRING, ValueString: "Ivan"},
|
||||
&abci.KVPair{Key: "not_allowed", ValueType: abci.KVPair_STRING, ValueString: "Vlad"},
|
||||
}
|
||||
txResult := &types.TxResult{1, 0, tx, abci.ResponseDeliverTx{Data: []byte{0}, Code: abci.CodeType_OK, Log: "", Tags: tags}}
|
||||
hash := tx.Hash()
|
||||
|
||||
allowedTags := []string{"account.number", "account.owner", "account.date"}
|
||||
err := indexer.Index(txResult, allowedTags)
|
||||
require.NoError(t, err)
|
||||
|
||||
testCases := []struct {
|
||||
q string
|
||||
expectError bool
|
||||
resultsLength int
|
||||
results []*types.TxResult
|
||||
}{
|
||||
// search by hash
|
||||
{fmt.Sprintf("tx.hash = '%X'", hash), false, 1, []*types.TxResult{txResult}},
|
||||
// search by exact match (one tag)
|
||||
{"account.number = 1", false, 1, []*types.TxResult{txResult}},
|
||||
// search by exact match (two tags)
|
||||
{"account.number = 1 AND account.owner = 'Ivan'", false, 1, []*types.TxResult{txResult}},
|
||||
// search by exact match (two tags)
|
||||
{"account.number = 1 AND account.owner = 'Vlad'", false, 0, []*types.TxResult{}},
|
||||
// search by range
|
||||
{"account.number >= 1 AND account.number <= 5", false, 1, []*types.TxResult{txResult}},
|
||||
// search using not allowed tag
|
||||
{"not_allowed = 'boom'", false, 0, []*types.TxResult{}},
|
||||
// search for not existing tx result
|
||||
{"account.number >= 2 AND account.number <= 5", false, 0, []*types.TxResult{}},
|
||||
// search using not existing tag
|
||||
{"account.date >= TIME 2013-05-03T14:45:00Z", false, 0, []*types.TxResult{}},
|
||||
// search using CONTAINS
|
||||
{"account.owner CONTAINS 'an'", false, 1, []*types.TxResult{txResult}},
|
||||
// search using CONTAINS
|
||||
{"account.owner CONTAINS 'Vlad'", false, 0, []*types.TxResult{}},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.q, func(t *testing.T) {
|
||||
results, err := indexer.Search(query.MustParse(tc.q))
|
||||
if tc.expectError {
|
||||
assert.Error(t, err)
|
||||
} else {
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
assert.Len(t, results, tc.resultsLength)
|
||||
if tc.resultsLength > 0 {
|
||||
assert.Equal(t, tc.results, results)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func benchmarkTxIndex(txsCount int, b *testing.B) {
|
||||
tx := types.Tx("HELLO WORLD")
|
||||
txResult := &types.TxResult{1, 0, tx, abci.ResponseDeliverTx{Data: []byte{0}, Code: abci.CodeType_OK, Log: "", Tags: []*abci.KVPair{}}}
|
||||
|
@ -58,7 +121,7 @@ func benchmarkTxIndex(txsCount int, b *testing.B) {
|
|||
|
||||
batch := txindex.NewBatch(txsCount)
|
||||
for i := 0; i < txsCount; i++ {
|
||||
if err := batch.Add(*txResult); err != nil {
|
||||
if err := batch.Add(txResult); err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
txResult.Index += 1
|
||||
|
@ -67,7 +130,7 @@ func benchmarkTxIndex(txsCount int, b *testing.B) {
|
|||
b.ResetTimer()
|
||||
|
||||
for n := 0; n < b.N; n++ {
|
||||
err = indexer.AddBatch(batch)
|
||||
err = indexer.AddBatch(batch, []string{})
|
||||
}
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
|
|
|
@ -5,8 +5,11 @@ import (
|
|||
|
||||
"github.com/tendermint/tendermint/state/txindex"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
"github.com/tendermint/tmlibs/pubsub/query"
|
||||
)
|
||||
|
||||
var _ txindex.TxIndexer = (*TxIndex)(nil)
|
||||
|
||||
// TxIndex acts as a /dev/null.
|
||||
type TxIndex struct{}
|
||||
|
||||
|
@ -16,11 +19,15 @@ func (txi *TxIndex) Get(hash []byte) (*types.TxResult, error) {
|
|||
}
|
||||
|
||||
// AddBatch is a noop and always returns nil.
|
||||
func (txi *TxIndex) AddBatch(batch *txindex.Batch) error {
|
||||
func (txi *TxIndex) AddBatch(batch *txindex.Batch, allowedTags []string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Index is a noop and always returns nil.
|
||||
func (txi *TxIndex) Index(result *types.TxResult) error {
|
||||
func (txi *TxIndex) Index(result *types.TxResult, allowedTags []string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (txi *TxIndex) Search(q *query.Query) ([]*types.TxResult, error) {
|
||||
return []*types.TxResult{}, nil
|
||||
}
|
||||
|
|
|
@ -116,6 +116,16 @@ func (b *EventBus) PublishEventTx(event EventDataTx) error {
|
|||
}
|
||||
tags[TxHashKey] = fmt.Sprintf("%X", event.Tx.Hash())
|
||||
|
||||
if tag, ok := tags[TxHeightKey]; ok {
|
||||
b.Logger.Error("Found predefined tag (value will be overwritten)", "tag", tag)
|
||||
}
|
||||
tags[TxHeightKey] = event.Height
|
||||
|
||||
if tag, ok := tags[TxIndexKey]; ok {
|
||||
b.Logger.Error("Found predefined tag (value will be overwritten)", "tag", tag)
|
||||
}
|
||||
tags[TxIndexKey] = event.Index
|
||||
|
||||
b.pubsub.PublishWithTags(ctx, TMEventData{event}, tags)
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -140,6 +140,12 @@ const (
|
|||
// TxHashKey is a reserved key, used to specify transaction's hash.
|
||||
// see EventBus#PublishEventTx
|
||||
TxHashKey = "tx.hash"
|
||||
// TxHeightKey is a reserved key, used to specify transaction block's height.
|
||||
// see EventBus#PublishEventTx
|
||||
TxHeightKey = "tx.height"
|
||||
// TxIndexKey is a reserved key, used to specify transaction's index within the block.
|
||||
// see EventBus#PublishEventTx
|
||||
TxIndexKey = "tx.index"
|
||||
)
|
||||
|
||||
var (
|
||||
|
|
Loading…
Reference in New Issue