Merge pull request #835 from tendermint/feature/indexing

Indexing DeliverTx tags
This commit is contained in:
Ethan Buchman 2017-12-01 02:54:30 -05:00 committed by GitHub
commit 74cbfc68a1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
34 changed files with 1005 additions and 258 deletions

View File

@ -16,6 +16,7 @@ type Config struct {
P2P *P2PConfig `mapstructure:"p2p"` P2P *P2PConfig `mapstructure:"p2p"`
Mempool *MempoolConfig `mapstructure:"mempool"` Mempool *MempoolConfig `mapstructure:"mempool"`
Consensus *ConsensusConfig `mapstructure:"consensus"` Consensus *ConsensusConfig `mapstructure:"consensus"`
TxIndex *TxIndexConfig `mapstructure:"tx_index"`
} }
// DefaultConfig returns a default configuration for a Tendermint node // DefaultConfig returns a default configuration for a Tendermint node
@ -26,6 +27,7 @@ func DefaultConfig() *Config {
P2P: DefaultP2PConfig(), P2P: DefaultP2PConfig(),
Mempool: DefaultMempoolConfig(), Mempool: DefaultMempoolConfig(),
Consensus: DefaultConsensusConfig(), Consensus: DefaultConsensusConfig(),
TxIndex: DefaultTxIndexConfig(),
} }
} }
@ -37,6 +39,7 @@ func TestConfig() *Config {
P2P: TestP2PConfig(), P2P: TestP2PConfig(),
Mempool: DefaultMempoolConfig(), Mempool: DefaultMempoolConfig(),
Consensus: TestConsensusConfig(), Consensus: TestConsensusConfig(),
TxIndex: DefaultTxIndexConfig(),
} }
} }
@ -93,9 +96,6 @@ type BaseConfig struct {
// so the app can decide if we should keep the connection or not // so the app can decide if we should keep the connection or not
FilterPeers bool `mapstructure:"filter_peers"` // false FilterPeers bool `mapstructure:"filter_peers"` // false
// What indexer to use for transactions
TxIndex string `mapstructure:"tx_index"`
// Database backend: leveldb | memdb // Database backend: leveldb | memdb
DBBackend string `mapstructure:"db_backend"` DBBackend string `mapstructure:"db_backend"`
@ -115,7 +115,6 @@ func DefaultBaseConfig() BaseConfig {
ProfListenAddress: "", ProfListenAddress: "",
FastSync: true, FastSync: true,
FilterPeers: false, FilterPeers: false,
TxIndex: "kv",
DBBackend: "leveldb", DBBackend: "leveldb",
DBPath: "data", DBPath: "data",
} }
@ -412,6 +411,41 @@ func (c *ConsensusConfig) SetWalFile(walFile string) {
c.walFile = walFile c.walFile = walFile
} }
//-----------------------------------------------------------------------------
// TxIndexConfig
// TxIndexConfig defines the confuguration for the transaction
// indexer, including tags to index.
type TxIndexConfig struct {
// What indexer to use for transactions
//
// Options:
// 1) "null" (default)
// 2) "kv" - the simplest possible indexer, backed by key-value storage (defaults to levelDB; see DBBackend).
Indexer string `mapstructure:"indexer"`
// Comma-separated list of tags to index (by default the only tag is tx hash)
//
// It's recommended to index only a subset of tags due to possible memory
// bloat. This is, of course, depends on the indexer's DB and the volume of
// transactions.
IndexTags string `mapstructure:"index_tags"`
// When set to true, tells indexer to index all tags. Note this may be not
// desirable (see the comment above). IndexTags has a precedence over
// IndexAllTags (i.e. when given both, IndexTags will be indexed).
IndexAllTags bool `mapstructure:"index_all_tags"`
}
// DefaultTxIndexConfig returns a default configuration for the transaction indexer.
func DefaultTxIndexConfig() *TxIndexConfig {
return &TxIndexConfig{
Indexer: "kv",
IndexTags: "",
IndexAllTags: false,
}
}
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
// Utils // Utils

View File

@ -2,6 +2,7 @@ package consensus
import ( import (
"encoding/binary" "encoding/binary"
"fmt"
"testing" "testing"
"time" "time"
@ -188,33 +189,41 @@ func (app *CounterApplication) Info(req abci.RequestInfo) abci.ResponseInfo {
return abci.ResponseInfo{Data: cmn.Fmt("txs:%v", app.txCount)} return abci.ResponseInfo{Data: cmn.Fmt("txs:%v", app.txCount)}
} }
func (app *CounterApplication) DeliverTx(tx []byte) abci.Result { func (app *CounterApplication) DeliverTx(tx []byte) abci.ResponseDeliverTx {
return runTx(tx, &app.txCount) txValue := txAsUint64(tx)
if txValue != uint64(app.txCount) {
return abci.ResponseDeliverTx{
Code: abci.CodeType_BadNonce,
Log: fmt.Sprintf("Invalid nonce. Expected %v, got %v", app.txCount, txValue)}
}
app.txCount += 1
return abci.ResponseDeliverTx{Code: abci.CodeType_OK}
} }
func (app *CounterApplication) CheckTx(tx []byte) abci.Result { func (app *CounterApplication) CheckTx(tx []byte) abci.ResponseCheckTx {
return runTx(tx, &app.mempoolTxCount) txValue := txAsUint64(tx)
if txValue != uint64(app.mempoolTxCount) {
return abci.ResponseCheckTx{
Code: abci.CodeType_BadNonce,
Log: fmt.Sprintf("Invalid nonce. Expected %v, got %v", app.mempoolTxCount, txValue)}
}
app.mempoolTxCount += 1
return abci.ResponseCheckTx{Code: abci.CodeType_OK}
} }
func runTx(tx []byte, countPtr *int) abci.Result { func txAsUint64(tx []byte) uint64 {
count := *countPtr
tx8 := make([]byte, 8) tx8 := make([]byte, 8)
copy(tx8[len(tx8)-len(tx):], tx) copy(tx8[len(tx8)-len(tx):], tx)
txValue := binary.BigEndian.Uint64(tx8) return binary.BigEndian.Uint64(tx8)
if txValue != uint64(count) {
return abci.ErrBadNonce.AppendLog(cmn.Fmt("Invalid nonce. Expected %v, got %v", count, txValue))
}
*countPtr += 1
return abci.OK
} }
func (app *CounterApplication) Commit() abci.Result { func (app *CounterApplication) Commit() abci.ResponseCommit {
app.mempoolTxCount = app.txCount app.mempoolTxCount = app.txCount
if app.txCount == 0 { if app.txCount == 0 {
return abci.OK return abci.ResponseCommit{Code: abci.CodeType_OK}
} else { } else {
hash := make([]byte, 8) hash := make([]byte, 8)
binary.BigEndian.PutUint64(hash, uint64(app.txCount)) binary.BigEndian.PutUint64(hash, uint64(app.txCount))
return abci.NewResultOK(hash, "") return abci.ResponseCommit{Code: abci.CodeType_OK, Data: hash}
} }
} }

View File

@ -236,7 +236,7 @@ func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int, proxyApp p
// If appBlockHeight == 0 it means that we are at genesis and hence should send InitChain // If appBlockHeight == 0 it means that we are at genesis and hence should send InitChain
if appBlockHeight == 0 { if appBlockHeight == 0 {
validators := types.TM2PB.Validators(h.state.Validators) validators := types.TM2PB.Validators(h.state.Validators)
if err := proxyApp.Consensus().InitChainSync(abci.RequestInitChain{validators}); err != nil { if _, err := proxyApp.Consensus().InitChainSync(abci.RequestInitChain{validators}); err != nil {
return nil, err return nil, err
} }
} }
@ -385,21 +385,17 @@ type mockProxyApp struct {
abciResponses *sm.ABCIResponses abciResponses *sm.ABCIResponses
} }
func (mock *mockProxyApp) DeliverTx(tx []byte) abci.Result { func (mock *mockProxyApp) DeliverTx(tx []byte) abci.ResponseDeliverTx {
r := mock.abciResponses.DeliverTx[mock.txCount] r := mock.abciResponses.DeliverTx[mock.txCount]
mock.txCount += 1 mock.txCount += 1
return abci.Result{ return *r
r.Code,
r.Data,
r.Log,
}
} }
func (mock *mockProxyApp) EndBlock(height uint64) abci.ResponseEndBlock { func (mock *mockProxyApp) EndBlock(req abci.RequestEndBlock) abci.ResponseEndBlock {
mock.txCount = 0 mock.txCount = 0
return mock.abciResponses.EndBlock return *mock.abciResponses.EndBlock
} }
func (mock *mockProxyApp) Commit() abci.Result { func (mock *mockProxyApp) Commit() abci.ResponseCommit {
return abci.NewResultOK(mock.appHash, "") return abci.ResponseCommit{Code: abci.CodeType_OK, Data: mock.appHash}
} }

View File

@ -411,7 +411,7 @@ func buildAppStateFromChain(proxyApp proxy.AppConns,
} }
validators := types.TM2PB.Validators(state.Validators) validators := types.TM2PB.Validators(state.Validators)
if err := proxyApp.Consensus().InitChainSync(abci.RequestInitChain{validators}); err != nil { if _, err := proxyApp.Consensus().InitChainSync(abci.RequestInitChain{validators}); err != nil {
panic(err) panic(err)
} }
@ -447,7 +447,7 @@ func buildTMStateFromChain(config *cfg.Config, state *sm.State, chain []*types.B
defer proxyApp.Stop() defer proxyApp.Stop()
validators := types.TM2PB.Validators(state.Validators) validators := types.TM2PB.Validators(state.Validators)
if err := proxyApp.Consensus().InitChainSync(abci.RequestInitChain{validators}); err != nil { if _, err := proxyApp.Consensus().InitChainSync(abci.RequestInitChain{validators}); err != nil {
panic(err) panic(err)
} }

12
glide.lock generated
View File

@ -1,5 +1,5 @@
hash: 223d8e42a118e7861cb673ea58a035e99d3a98c94e4b71fb52998d320f9c3b49 hash: 8c38726da2666831affa40474117d3cef5dad083176e81fb013d7e8493b83e6f
updated: 2017-11-25T22:00:24.612202481-08:00 updated: 2017-12-01T02:14:22.08770964Z
imports: imports:
- name: github.com/btcsuite/btcd - name: github.com/btcsuite/btcd
version: 8cea3866d0f7fb12d567a20744942c0d078c7d15 version: 8cea3866d0f7fb12d567a20744942c0d078c7d15
@ -98,7 +98,7 @@ imports:
- leveldb/table - leveldb/table
- leveldb/util - leveldb/util
- name: github.com/tendermint/abci - name: github.com/tendermint/abci
version: 76ef8a0697c6179220a74c479b36c27a5b53008a version: 22b491bb1952125dd2fb0730d6ca8e59e310547c
subpackages: subpackages:
- client - client
- example/counter - example/counter
@ -113,7 +113,7 @@ imports:
- name: github.com/tendermint/go-crypto - name: github.com/tendermint/go-crypto
version: dd20358a264c772b4a83e477b0cfce4c88a7001d version: dd20358a264c772b4a83e477b0cfce4c88a7001d
- name: github.com/tendermint/go-wire - name: github.com/tendermint/go-wire
version: 7d50b38b3815efe313728de77e2995c8813ce13f version: 5ab49b4c6ad674da6b81442911cf713ef0afb544
subpackages: subpackages:
- data - data
- data/base58 - data/base58
@ -123,7 +123,7 @@ imports:
subpackages: subpackages:
- iavl - iavl
- name: github.com/tendermint/tmlibs - name: github.com/tendermint/tmlibs
version: 1e12754b3a3b5f1c23bf44c2d882faae688fb2e8 version: 21fb7819891997c96838308b4eba5a50b07ff03f
subpackages: subpackages:
- autofile - autofile
- cli - cli
@ -160,6 +160,8 @@ imports:
- trace - trace
- name: golang.org/x/sys - name: golang.org/x/sys
version: b98136db334ff9cb24f28a68e3be3cb6608f7630 version: b98136db334ff9cb24f28a68e3be3cb6608f7630
subpackages:
- unix
- name: golang.org/x/text - name: golang.org/x/text
version: 88f656faf3f37f690df1a32515b479415e1a6769 version: 88f656faf3f37f690df1a32515b479415e1a6769
subpackages: subpackages:

View File

@ -18,7 +18,7 @@ import:
- package: github.com/spf13/viper - package: github.com/spf13/viper
version: v1.0.0 version: v1.0.0
- package: github.com/tendermint/abci - package: github.com/tendermint/abci
version: ~0.7.0 version: 22b491bb1952125dd2fb0730d6ca8e59e310547c
subpackages: subpackages:
- client - client
- example/dummy - example/dummy
@ -34,7 +34,7 @@ import:
subpackages: subpackages:
- iavl - iavl
- package: github.com/tendermint/tmlibs - package: github.com/tendermint/tmlibs
version: 1e12754b3a3b5f1c23bf44c2d882faae688fb2e8 version: develop
subpackages: subpackages:
- autofile - autofile
- cli - cli

View File

@ -13,6 +13,7 @@ import (
"github.com/tendermint/abci/example/counter" "github.com/tendermint/abci/example/counter"
"github.com/tendermint/abci/example/dummy" "github.com/tendermint/abci/example/dummy"
abci "github.com/tendermint/abci/types"
"github.com/tendermint/tmlibs/log" "github.com/tendermint/tmlibs/log"
cfg "github.com/tendermint/tendermint/config" cfg "github.com/tendermint/tendermint/config"
@ -115,7 +116,7 @@ func TestTxsAvailable(t *testing.T) {
func TestSerialReap(t *testing.T) { func TestSerialReap(t *testing.T) {
app := counter.NewCounterApplication(true) app := counter.NewCounterApplication(true)
app.SetOption("serial", "on") app.SetOption(abci.RequestSetOption{"serial", "on"})
cc := proxy.NewLocalClientCreator(app) cc := proxy.NewLocalClientCreator(app)
mempool := newMempoolWithApp(cc) mempool := newMempoolWithApp(cc)
@ -172,13 +173,19 @@ func TestSerialReap(t *testing.T) {
for i := start; i < end; i++ { for i := start; i < end; i++ {
txBytes := make([]byte, 8) txBytes := make([]byte, 8)
binary.BigEndian.PutUint64(txBytes, uint64(i)) binary.BigEndian.PutUint64(txBytes, uint64(i))
res := appConnCon.DeliverTxSync(txBytes) res, err := appConnCon.DeliverTxSync(txBytes)
if !res.IsOK() { if err != nil {
t.Errorf("Client error committing tx: %v", err)
}
if res.IsErr() {
t.Errorf("Error committing tx. Code:%v result:%X log:%v", t.Errorf("Error committing tx. Code:%v result:%X log:%v",
res.Code, res.Data, res.Log) res.Code, res.Data, res.Log)
} }
} }
res := appConnCon.CommitSync() res, err := appConnCon.CommitSync()
if err != nil {
t.Errorf("Client error committing: %v", err)
}
if len(res.Data) != 8 { if len(res.Data) != 8 {
t.Errorf("Error committing. Hash:%X log:%v", res.Data, res.Log) t.Errorf("Error committing. Hash:%X log:%v", res.Data, res.Log)
} }

View File

@ -111,6 +111,7 @@ type Node struct {
proxyApp proxy.AppConns // connection to the application proxyApp proxy.AppConns // connection to the application
rpcListeners []net.Listener // rpc servers rpcListeners []net.Listener // rpc servers
txIndexer txindex.TxIndexer txIndexer txindex.TxIndexer
indexerService *txindex.IndexerService
} }
// NewNode returns a new, ready to go, Tendermint Node. // NewNode returns a new, ready to go, Tendermint Node.
@ -173,20 +174,6 @@ func NewNode(config *cfg.Config,
state = sm.LoadState(stateDB) state = sm.LoadState(stateDB)
state.SetLogger(stateLogger) state.SetLogger(stateLogger)
// Transaction indexing
var txIndexer txindex.TxIndexer
switch config.TxIndex {
case "kv":
store, err := dbProvider(&DBContext{"tx_index", config})
if err != nil {
return nil, err
}
txIndexer = kv.NewTxIndex(store)
default:
txIndexer = &null.TxIndex{}
}
state.TxIndexer = txIndexer
// Generate node PrivKey // Generate node PrivKey
privKey := crypto.GenPrivKeyEd25519() privKey := crypto.GenPrivKeyEd25519()
@ -293,6 +280,27 @@ func NewNode(config *cfg.Config,
bcReactor.SetEventBus(eventBus) bcReactor.SetEventBus(eventBus)
consensusReactor.SetEventBus(eventBus) consensusReactor.SetEventBus(eventBus)
// Transaction indexing
var txIndexer txindex.TxIndexer
switch config.TxIndex.Indexer {
case "kv":
store, err := dbProvider(&DBContext{"tx_index", config})
if err != nil {
return nil, err
}
if config.TxIndex.IndexTags != "" {
txIndexer = kv.NewTxIndex(store, kv.IndexTags(strings.Split(config.TxIndex.IndexTags, ",")))
} else if config.TxIndex.IndexAllTags {
txIndexer = kv.NewTxIndex(store, kv.IndexAllTags())
} else {
txIndexer = kv.NewTxIndex(store)
}
default:
txIndexer = &null.TxIndex{}
}
indexerService := txindex.NewIndexerService(txIndexer, eventBus)
// run the profile server // run the profile server
profileHost := config.ProfListenAddress profileHost := config.ProfListenAddress
if profileHost != "" { if profileHost != "" {
@ -318,6 +326,7 @@ func NewNode(config *cfg.Config,
consensusReactor: consensusReactor, consensusReactor: consensusReactor,
proxyApp: proxyApp, proxyApp: proxyApp,
txIndexer: txIndexer, txIndexer: txIndexer,
indexerService: indexerService,
eventBus: eventBus, eventBus: eventBus,
} }
node.BaseService = *cmn.NewBaseService(logger, "Node", node) node.BaseService = *cmn.NewBaseService(logger, "Node", node)
@ -363,6 +372,12 @@ func (n *Node) OnStart() error {
} }
} }
// start tx indexer
err = n.indexerService.Start()
if err != nil {
return err
}
return nil return nil
} }
@ -382,6 +397,8 @@ func (n *Node) OnStop() {
} }
n.eventBus.Stop() n.eventBus.Stop()
n.indexerService.Stop()
} }
// RunForever waits for an interrupt signal and stops the node. // RunForever waits for an interrupt signal and stops the node.

View File

@ -12,12 +12,12 @@ type AppConnConsensus interface {
SetResponseCallback(abcicli.Callback) SetResponseCallback(abcicli.Callback)
Error() error Error() error
InitChainSync(types.RequestInitChain) (err error) InitChainSync(types.RequestInitChain) (*types.ResponseInitChain, error)
BeginBlockSync(types.RequestBeginBlock) (err error) BeginBlockSync(types.RequestBeginBlock) (*types.ResponseBeginBlock, error)
DeliverTxAsync(tx []byte) *abcicli.ReqRes DeliverTxAsync(tx []byte) *abcicli.ReqRes
EndBlockSync(height uint64) (types.ResponseEndBlock, error) EndBlockSync(types.RequestEndBlock) (*types.ResponseEndBlock, error)
CommitSync() (res types.Result) CommitSync() (*types.ResponseCommit, error)
} }
type AppConnMempool interface { type AppConnMempool interface {
@ -33,9 +33,9 @@ type AppConnMempool interface {
type AppConnQuery interface { type AppConnQuery interface {
Error() error Error() error
EchoSync(string) (res types.Result) EchoSync(string) (*types.ResponseEcho, error)
InfoSync(types.RequestInfo) (types.ResponseInfo, error) InfoSync(types.RequestInfo) (*types.ResponseInfo, error)
QuerySync(types.RequestQuery) (types.ResponseQuery, error) QuerySync(types.RequestQuery) (*types.ResponseQuery, error)
// SetOptionSync(key string, value string) (res types.Result) // SetOptionSync(key string, value string) (res types.Result)
} }
@ -61,11 +61,11 @@ func (app *appConnConsensus) Error() error {
return app.appConn.Error() return app.appConn.Error()
} }
func (app *appConnConsensus) InitChainSync(req types.RequestInitChain) (err error) { func (app *appConnConsensus) InitChainSync(req types.RequestInitChain) (*types.ResponseInitChain, error) {
return app.appConn.InitChainSync(req) return app.appConn.InitChainSync(req)
} }
func (app *appConnConsensus) BeginBlockSync(req types.RequestBeginBlock) (err error) { func (app *appConnConsensus) BeginBlockSync(req types.RequestBeginBlock) (*types.ResponseBeginBlock, error) {
return app.appConn.BeginBlockSync(req) return app.appConn.BeginBlockSync(req)
} }
@ -73,11 +73,11 @@ func (app *appConnConsensus) DeliverTxAsync(tx []byte) *abcicli.ReqRes {
return app.appConn.DeliverTxAsync(tx) return app.appConn.DeliverTxAsync(tx)
} }
func (app *appConnConsensus) EndBlockSync(height uint64) (types.ResponseEndBlock, error) { func (app *appConnConsensus) EndBlockSync(req types.RequestEndBlock) (*types.ResponseEndBlock, error) {
return app.appConn.EndBlockSync(height) return app.appConn.EndBlockSync(req)
} }
func (app *appConnConsensus) CommitSync() (res types.Result) { func (app *appConnConsensus) CommitSync() (*types.ResponseCommit, error) {
return app.appConn.CommitSync() return app.appConn.CommitSync()
} }
@ -131,14 +131,14 @@ func (app *appConnQuery) Error() error {
return app.appConn.Error() return app.appConn.Error()
} }
func (app *appConnQuery) EchoSync(msg string) (res types.Result) { func (app *appConnQuery) EchoSync(msg string) (*types.ResponseEcho, error) {
return app.appConn.EchoSync(msg) return app.appConn.EchoSync(msg)
} }
func (app *appConnQuery) InfoSync(req types.RequestInfo) (types.ResponseInfo, error) { func (app *appConnQuery) InfoSync(req types.RequestInfo) (*types.ResponseInfo, error) {
return app.appConn.InfoSync(req) return app.appConn.InfoSync(req)
} }
func (app *appConnQuery) QuerySync(reqQuery types.RequestQuery) (types.ResponseQuery, error) { func (app *appConnQuery) QuerySync(reqQuery types.RequestQuery) (*types.ResponseQuery, error) {
return app.appConn.QuerySync(reqQuery) return app.appConn.QuerySync(reqQuery)
} }

View File

@ -17,7 +17,7 @@ import (
type AppConnTest interface { type AppConnTest interface {
EchoAsync(string) *abcicli.ReqRes EchoAsync(string) *abcicli.ReqRes
FlushSync() error FlushSync() error
InfoSync(types.RequestInfo) (types.ResponseInfo, error) InfoSync(types.RequestInfo) (*types.ResponseInfo, error)
} }
type appConnTest struct { type appConnTest struct {
@ -36,7 +36,7 @@ func (app *appConnTest) FlushSync() error {
return app.appConn.FlushSync() return app.appConn.FlushSync()
} }
func (app *appConnTest) InfoSync(req types.RequestInfo) (types.ResponseInfo, error) { func (app *appConnTest) InfoSync(req types.RequestInfo) (*types.ResponseInfo, error) {
return app.appConn.InfoSync(req) return app.appConn.InfoSync(req)
} }

View File

@ -100,7 +100,7 @@ func TestTxEventsSentWithBroadcastTxAsync(t *testing.T) {
require.True(ok, "%d: %#v", i, evt) require.True(ok, "%d: %#v", i, evt)
// make sure this is the proper tx // make sure this is the proper tx
require.EqualValues(tx, txe.Tx) require.EqualValues(tx, txe.Tx)
require.True(txe.Code.IsOK()) require.True(txe.Result.Code.IsOK())
} }
} }
@ -132,6 +132,6 @@ func TestTxEventsSentWithBroadcastTxSync(t *testing.T) {
require.True(ok, "%d: %#v", i, evt) require.True(ok, "%d: %#v", i, evt)
// make sure this is the proper tx // make sure this is the proper tx
require.EqualValues(tx, txe.Tx) require.EqualValues(tx, txe.Tx)
require.True(txe.Code.IsOK()) require.True(txe.Result.Code.IsOK())
} }
} }

View File

@ -163,17 +163,30 @@ func (c *HTTP) Commit(height *int) (*ctypes.ResultCommit, error) {
func (c *HTTP) Tx(hash []byte, prove bool) (*ctypes.ResultTx, error) { func (c *HTTP) Tx(hash []byte, prove bool) (*ctypes.ResultTx, error) {
result := new(ctypes.ResultTx) result := new(ctypes.ResultTx)
query := map[string]interface{}{ params := map[string]interface{}{
"hash": hash, "hash": hash,
"prove": prove, "prove": prove,
} }
_, err := c.rpc.Call("tx", query, result) _, err := c.rpc.Call("tx", params, result)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "Tx") return nil, errors.Wrap(err, "Tx")
} }
return result, nil return result, nil
} }
func (c *HTTP) TxSearch(query string, prove bool) ([]*ctypes.ResultTx, error) {
results := new([]*ctypes.ResultTx)
params := map[string]interface{}{
"query": query,
"prove": prove,
}
_, err := c.rpc.Call("tx_search", params, results)
if err != nil {
return nil, errors.Wrap(err, "TxSearch")
}
return *results, nil
}
func (c *HTTP) Validators(height *int) (*ctypes.ResultValidators, error) { func (c *HTTP) Validators(height *int) (*ctypes.ResultValidators, error) {
result := new(ctypes.ResultValidators) result := new(ctypes.ResultValidators)
_, err := c.rpc.Call("validators", map[string]interface{}{"height": height}, result) _, err := c.rpc.Call("validators", map[string]interface{}{"height": height}, result)

View File

@ -50,6 +50,7 @@ type SignClient interface {
Commit(height *int) (*ctypes.ResultCommit, error) Commit(height *int) (*ctypes.ResultCommit, error)
Validators(height *int) (*ctypes.ResultValidators, error) Validators(height *int) (*ctypes.ResultValidators, error)
Tx(hash []byte, prove bool) (*ctypes.ResultTx, error) Tx(hash []byte, prove bool) (*ctypes.ResultTx, error)
TxSearch(query string, prove bool) ([]*ctypes.ResultTx, error)
} }
// HistoryClient shows us data from genesis to now in large chunks. // HistoryClient shows us data from genesis to now in large chunks.

View File

@ -124,6 +124,10 @@ func (Local) Tx(hash []byte, prove bool) (*ctypes.ResultTx, error) {
return core.Tx(hash, prove) return core.Tx(hash, prove)
} }
func (Local) TxSearch(query string, prove bool) ([]*ctypes.ResultTx, error) {
return core.TxSearch(query, prove)
}
func (c *Local) Subscribe(ctx context.Context, query string, out chan<- interface{}) error { func (c *Local) Subscribe(ctx context.Context, query string, out chan<- interface{}) error {
q, err := tmquery.New(query) q, err := tmquery.New(query)
if err != nil { if err != nil {

View File

@ -38,7 +38,7 @@ func (a ABCIApp) ABCIQueryWithOptions(path string, data data.Bytes, opts client.
func (a ABCIApp) BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { func (a ABCIApp) BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
res := ctypes.ResultBroadcastTxCommit{} res := ctypes.ResultBroadcastTxCommit{}
res.CheckTx = a.App.CheckTx(tx) res.CheckTx = a.App.CheckTx(tx)
if !res.CheckTx.IsOK() { if res.CheckTx.IsErr() {
return &res, nil return &res, nil
} }
res.DeliverTx = a.App.DeliverTx(tx) res.DeliverTx = a.App.DeliverTx(tx)
@ -48,7 +48,7 @@ func (a ABCIApp) BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit
func (a ABCIApp) BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { func (a ABCIApp) BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
c := a.App.CheckTx(tx) c := a.App.CheckTx(tx)
// and this gets written in a background thread... // and this gets written in a background thread...
if c.IsOK() { if !c.IsErr() {
go func() { a.App.DeliverTx(tx) }() // nolint: errcheck go func() { a.App.DeliverTx(tx) }() // nolint: errcheck
} }
return &ctypes.ResultBroadcastTx{c.Code, c.Data, c.Log, tx.Hash()}, nil return &ctypes.ResultBroadcastTx{c.Code, c.Data, c.Log, tx.Hash()}, nil
@ -57,7 +57,7 @@ func (a ABCIApp) BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error
func (a ABCIApp) BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { func (a ABCIApp) BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
c := a.App.CheckTx(tx) c := a.App.CheckTx(tx)
// and this gets written in a background thread... // and this gets written in a background thread...
if c.IsOK() { if !c.IsErr() {
go func() { a.App.DeliverTx(tx) }() // nolint: errcheck go func() { a.App.DeliverTx(tx) }() // nolint: errcheck
} }
return &ctypes.ResultBroadcastTx{c.Code, c.Data, c.Log, tx.Hash()}, nil return &ctypes.ResultBroadcastTx{c.Code, c.Data, c.Log, tx.Hash()}, nil

View File

@ -37,8 +37,8 @@ func TestABCIMock(t *testing.T) {
BroadcastCommit: mock.Call{ BroadcastCommit: mock.Call{
Args: goodTx, Args: goodTx,
Response: &ctypes.ResultBroadcastTxCommit{ Response: &ctypes.ResultBroadcastTxCommit{
CheckTx: abci.Result{Data: data.Bytes("stand")}, CheckTx: abci.ResponseCheckTx{Data: data.Bytes("stand")},
DeliverTx: abci.Result{Data: data.Bytes("deliver")}, DeliverTx: abci.ResponseDeliverTx{Data: data.Bytes("deliver")},
}, },
Error: errors.New("bad tx"), Error: errors.New("bad tx"),
}, },

View File

@ -1,6 +1,7 @@
package client_test package client_test
import ( import (
"fmt"
"strings" "strings"
"testing" "testing"
@ -104,7 +105,7 @@ func TestABCIQuery(t *testing.T) {
k, v, tx := MakeTxKV() k, v, tx := MakeTxKV()
bres, err := c.BroadcastTxCommit(tx) bres, err := c.BroadcastTxCommit(tx)
require.Nil(t, err, "%d: %+v", i, err) require.Nil(t, err, "%d: %+v", i, err)
apph := bres.Height + 1 // this is where the tx will be applied to the state apph := int(bres.Height) + 1 // this is where the tx will be applied to the state
// wait before querying // wait before querying
client.WaitForHeight(c, apph, nil) client.WaitForHeight(c, apph, nil)
@ -136,7 +137,7 @@ func TestAppCalls(t *testing.T) {
bres, err := c.BroadcastTxCommit(tx) bres, err := c.BroadcastTxCommit(tx)
require.Nil(err, "%d: %+v", i, err) require.Nil(err, "%d: %+v", i, err)
require.True(bres.DeliverTx.Code.IsOK()) require.True(bres.DeliverTx.Code.IsOK())
txh := bres.Height txh := int(bres.Height)
apph := txh + 1 // this is where the tx will be applied to the state apph := txh + 1 // this is where the tx will be applied to the state
// wait before querying // wait before querying
@ -153,7 +154,7 @@ func TestAppCalls(t *testing.T) {
// ptx, err := c.Tx(bres.Hash, true) // ptx, err := c.Tx(bres.Hash, true)
ptx, err := c.Tx(bres.Hash, true) ptx, err := c.Tx(bres.Hash, true)
require.Nil(err, "%d: %+v", i, err) require.Nil(err, "%d: %+v", i, err)
assert.Equal(txh, ptx.Height) assert.EqualValues(txh, ptx.Height)
assert.EqualValues(tx, ptx.Tx) assert.EqualValues(tx, ptx.Tx)
// and we can even check the block is added // and we can even check the block is added
@ -280,9 +281,9 @@ func TestTx(t *testing.T) {
require.NotNil(err) require.NotNil(err)
} else { } else {
require.Nil(err, "%+v", err) require.Nil(err, "%+v", err)
assert.Equal(txHeight, ptx.Height) assert.EqualValues(txHeight, ptx.Height)
assert.EqualValues(tx, ptx.Tx) assert.EqualValues(tx, ptx.Tx)
assert.Equal(0, ptx.Index) assert.Zero(ptx.Index)
assert.True(ptx.TxResult.Code.IsOK()) assert.True(ptx.TxResult.Code.IsOK())
// time to verify the proof // time to verify the proof
@ -294,3 +295,50 @@ func TestTx(t *testing.T) {
} }
} }
} }
func TestTxSearch(t *testing.T) {
// first we broadcast a tx
c := getHTTPClient()
_, _, tx := MakeTxKV()
bres, err := c.BroadcastTxCommit(tx)
require.Nil(t, err, "%+v", err)
txHeight := bres.Height
txHash := bres.Hash
anotherTxHash := types.Tx("a different tx").Hash()
for i, c := range GetClients() {
t.Logf("client %d", i)
// now we query for the tx.
// since there's only one tx, we know index=0.
results, err := c.TxSearch(fmt.Sprintf("tx.hash='%v'", txHash), true)
require.Nil(t, err, "%+v", err)
require.Len(t, results, 1)
ptx := results[0]
assert.EqualValues(t, txHeight, ptx.Height)
assert.EqualValues(t, tx, ptx.Tx)
assert.Zero(t, ptx.Index)
assert.True(t, ptx.TxResult.Code.IsOK())
// time to verify the proof
proof := ptx.Proof
if assert.EqualValues(t, tx, proof.Data) {
assert.True(t, proof.Proof.Verify(proof.Index, proof.Total, txHash, proof.RootHash))
}
// we query for non existing tx
results, err = c.TxSearch(fmt.Sprintf("tx.hash='%X'", anotherTxHash), false)
require.Nil(t, err, "%+v", err)
require.Len(t, results, 0)
// we query using a tag (see dummy application)
results, err = c.TxSearch("app.creator='jae'", false)
require.Nil(t, err, "%+v", err)
if len(results) == 0 {
t.Fatal("expected a lot of transactions")
}
}
}

View File

@ -93,5 +93,5 @@ func ABCIInfo() (*ctypes.ResultABCIInfo, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &ctypes.ResultABCIInfo{resInfo}, nil return &ctypes.ResultABCIInfo{*resInfo}, nil
} }

View File

@ -154,7 +154,7 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout) ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout)
defer cancel() defer cancel()
deliverTxResCh := make(chan interface{}) deliverTxResCh := make(chan interface{})
q := types.EventQueryTx(tx) q := types.EventQueryTxFor(tx)
err := eventBus.Subscribe(ctx, "mempool", q, deliverTxResCh) err := eventBus.Subscribe(ctx, "mempool", q, deliverTxResCh)
if err != nil { if err != nil {
err = errors.Wrap(err, "failed to subscribe to tx") err = errors.Wrap(err, "failed to subscribe to tx")
@ -177,8 +177,8 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
if checkTxR.Code != abci.CodeType_OK { if checkTxR.Code != abci.CodeType_OK {
// CheckTx failed! // CheckTx failed!
return &ctypes.ResultBroadcastTxCommit{ return &ctypes.ResultBroadcastTxCommit{
CheckTx: checkTxR.Result(), CheckTx: *checkTxR,
DeliverTx: abci.Result{}, DeliverTx: abci.ResponseDeliverTx{},
Hash: tx.Hash(), Hash: tx.Hash(),
}, nil }, nil
} }
@ -191,28 +191,22 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
case deliverTxResMsg := <-deliverTxResCh: case deliverTxResMsg := <-deliverTxResCh:
deliverTxRes := deliverTxResMsg.(types.TMEventData).Unwrap().(types.EventDataTx) deliverTxRes := deliverTxResMsg.(types.TMEventData).Unwrap().(types.EventDataTx)
// The tx was included in a block. // The tx was included in a block.
deliverTxR := &abci.ResponseDeliverTx{ deliverTxR := deliverTxRes.Result
Code: deliverTxRes.Code,
Data: deliverTxRes.Data,
Log: deliverTxRes.Log,
}
logger.Info("DeliverTx passed ", "tx", data.Bytes(tx), "response", deliverTxR) logger.Info("DeliverTx passed ", "tx", data.Bytes(tx), "response", deliverTxR)
return &ctypes.ResultBroadcastTxCommit{ return &ctypes.ResultBroadcastTxCommit{
CheckTx: checkTxR.Result(), CheckTx: *checkTxR,
DeliverTx: deliverTxR.Result(), DeliverTx: deliverTxR,
Hash: tx.Hash(), Hash: tx.Hash(),
Height: deliverTxRes.Height, Height: deliverTxRes.Height,
}, nil }, nil
case <-timer.C: case <-timer.C:
logger.Error("failed to include tx") logger.Error("failed to include tx")
return &ctypes.ResultBroadcastTxCommit{ return &ctypes.ResultBroadcastTxCommit{
CheckTx: checkTxR.Result(), CheckTx: *checkTxR,
DeliverTx: abci.Result{}, DeliverTx: abci.ResponseDeliverTx{},
Hash: tx.Hash(), Hash: tx.Hash(),
}, fmt.Errorf("Timed out waiting for transaction to be included in a block") }, fmt.Errorf("Timed out waiting for transaction to be included in a block")
} }
panic("Should never happen!")
} }
// Get unconfirmed transactions including their number. // Get unconfirmed transactions including their number.

View File

@ -19,6 +19,7 @@ var Routes = map[string]*rpc.RPCFunc{
"block": rpc.NewRPCFunc(Block, "height"), "block": rpc.NewRPCFunc(Block, "height"),
"commit": rpc.NewRPCFunc(Commit, "height"), "commit": rpc.NewRPCFunc(Commit, "height"),
"tx": rpc.NewRPCFunc(Tx, "hash,prove"), "tx": rpc.NewRPCFunc(Tx, "hash,prove"),
"tx_search": rpc.NewRPCFunc(TxSearch, "query,prove"),
"validators": rpc.NewRPCFunc(Validators, "height"), "validators": rpc.NewRPCFunc(Validators, "height"),
"dump_consensus_state": rpc.NewRPCFunc(DumpConsensusState, ""), "dump_consensus_state": rpc.NewRPCFunc(DumpConsensusState, ""),
"unconfirmed_txs": rpc.NewRPCFunc(UnconfirmedTxs, ""), "unconfirmed_txs": rpc.NewRPCFunc(UnconfirmedTxs, ""),

View File

@ -6,6 +6,7 @@ import (
ctypes "github.com/tendermint/tendermint/rpc/core/types" ctypes "github.com/tendermint/tendermint/rpc/core/types"
"github.com/tendermint/tendermint/state/txindex/null" "github.com/tendermint/tendermint/state/txindex/null"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
tmquery "github.com/tendermint/tmlibs/pubsub/query"
) )
// Tx allows you to query the transaction results. `nil` could mean the // Tx allows you to query the transaction results. `nil` could mean the
@ -82,20 +83,123 @@ func Tx(hash []byte, prove bool) (*ctypes.ResultTx, error) {
return nil, fmt.Errorf("Tx (%X) not found", hash) return nil, fmt.Errorf("Tx (%X) not found", hash)
} }
height := int(r.Height) // XXX height := r.Height
index := int(r.Index) index := r.Index
var proof types.TxProof var proof types.TxProof
if prove { if prove {
block := blockStore.LoadBlock(height) // TODO: handle overflow
proof = block.Data.Txs.Proof(index) block := blockStore.LoadBlock(int(height))
proof = block.Data.Txs.Proof(int(index))
} }
return &ctypes.ResultTx{ return &ctypes.ResultTx{
Height: height, Height: height,
Index: index, Index: index,
TxResult: r.Result.Result(), TxResult: r.Result,
Tx: r.Tx, Tx: r.Tx,
Proof: proof, Proof: proof,
}, nil }, nil
} }
// TxSearch allows you to query for multiple transactions results.
//
// ```shell
// curl "localhost:46657/tx_search?query=\"account.owner='Ivan'\"&prove=true"
// ```
//
// ```go
// client := client.NewHTTP("tcp://0.0.0.0:46657", "/websocket")
// q, err := tmquery.New("account.owner='Ivan'")
// tx, err := client.TxSearch(q, true)
// ```
//
// > The above command returns JSON structured like this:
//
// ```json
// {
// "result": [
// {
// "proof": {
// "Proof": {
// "aunts": [
// "J3LHbizt806uKnABNLwG4l7gXCA=",
// "iblMO/M1TnNtlAefJyNCeVhjAb0=",
// "iVk3ryurVaEEhdeS0ohAJZ3wtB8=",
// "5hqMkTeGqpct51ohX0lZLIdsn7Q=",
// "afhsNxFnLlZgFDoyPpdQSe0bR8g="
// ]
// },
// "Data": "mvZHHa7HhZ4aRT0xMDA=",
// "RootHash": "F6541223AA46E428CB1070E9840D2C3DF3B6D776",
// "Total": 32,
// "Index": 31
// },
// "tx": "mvZHHa7HhZ4aRT0xMDA=",
// "tx_result": {},
// "index": 31,
// "height": 12
// }
// ],
// "id": "",
// "jsonrpc": "2.0"
// }
// ```
//
// Returns transactions matching the given query.
//
// ### Query Parameters
//
// | Parameter | Type | Default | Required | Description |
// |-----------+--------+---------+----------+-----------------------------------------------------------|
// | query | string | "" | true | Query |
// | prove | bool | false | false | Include proofs of the transactions inclusion in the block |
//
// ### Returns
//
// - `proof`: the `types.TxProof` object
// - `tx`: `[]byte` - the transaction
// - `tx_result`: the `abci.Result` object
// - `index`: `int` - index of the transaction
// - `height`: `int` - height of the block where this transaction was in
func TxSearch(query string, prove bool) ([]*ctypes.ResultTx, error) {
// if index is disabled, return error
if _, ok := txIndexer.(*null.TxIndex); ok {
return nil, fmt.Errorf("Transaction indexing is disabled.")
}
q, err := tmquery.New(query)
if err != nil {
return nil, err
}
results, err := txIndexer.Search(q)
if err != nil {
return nil, err
}
// TODO: we may want to consider putting a maximum on this length and somehow
// informing the user that things were truncated.
apiResults := make([]*ctypes.ResultTx, len(results))
var proof types.TxProof
for i, r := range results {
height := r.Height
index := r.Index
if prove {
// TODO: handle overflow
block := blockStore.LoadBlock(int(height))
proof = block.Data.Txs.Proof(int(index))
}
apiResults[i] = &ctypes.ResultTx{
Height: height,
Index: index,
TxResult: r.Result,
Tx: r.Tx,
Proof: proof,
}
}
return apiResults, nil
}

View File

@ -104,18 +104,18 @@ type ResultBroadcastTx struct {
} }
type ResultBroadcastTxCommit struct { type ResultBroadcastTxCommit struct {
CheckTx abci.Result `json:"check_tx"` CheckTx abci.ResponseCheckTx `json:"check_tx"`
DeliverTx abci.Result `json:"deliver_tx"` DeliverTx abci.ResponseDeliverTx `json:"deliver_tx"`
Hash data.Bytes `json:"hash"` Hash data.Bytes `json:"hash"`
Height int `json:"height"` Height uint64 `json:"height"`
} }
type ResultTx struct { type ResultTx struct {
Height int `json:"height"` Height uint64 `json:"height"`
Index int `json:"index"` Index uint32 `json:"index"`
TxResult abci.Result `json:"tx_result"` TxResult abci.ResponseDeliverTx `json:"tx_result"`
Tx types.Tx `json:"tx"` Tx types.Tx `json:"tx"`
Proof types.TxProof `json:"proof,omitempty"` Proof types.TxProof `json:"proof,omitempty"`
} }
type ResultUnconfirmedTxs struct { type ResultUnconfirmedTxs struct {

View File

@ -80,6 +80,7 @@ func GetConfig() *cfg.Config {
globalConfig.P2P.ListenAddress = tm globalConfig.P2P.ListenAddress = tm
globalConfig.RPC.ListenAddress = rpc globalConfig.RPC.ListenAddress = rpc
globalConfig.RPC.GRPCListenAddress = grpc globalConfig.RPC.GRPCListenAddress = grpc
globalConfig.TxIndex.IndexTags = "app.creator" // see dummy application
} }
return globalConfig return globalConfig
} }

View File

@ -8,7 +8,6 @@ import (
abci "github.com/tendermint/abci/types" abci "github.com/tendermint/abci/types"
crypto "github.com/tendermint/go-crypto" crypto "github.com/tendermint/go-crypto"
"github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/proxy"
"github.com/tendermint/tendermint/state/txindex"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
cmn "github.com/tendermint/tmlibs/common" cmn "github.com/tendermint/tmlibs/common"
"github.com/tendermint/tmlibs/log" "github.com/tendermint/tmlibs/log"
@ -54,36 +53,31 @@ func execBlockOnProxyApp(txEventPublisher types.TxEventPublisher, proxyAppConn p
// TODO: make use of this info // TODO: make use of this info
// Blocks may include invalid txs. // Blocks may include invalid txs.
// reqDeliverTx := req.(abci.RequestDeliverTx) // reqDeliverTx := req.(abci.RequestDeliverTx)
txError := ""
txResult := r.DeliverTx txResult := r.DeliverTx
if txResult.Code == abci.CodeType_OK { if txResult.Code == abci.CodeType_OK {
validTxs++ validTxs++
} else { } else {
logger.Debug("Invalid tx", "code", txResult.Code, "log", txResult.Log) logger.Debug("Invalid tx", "code", txResult.Code, "log", txResult.Log)
invalidTxs++ invalidTxs++
txError = txResult.Code.String()
} }
abciResponses.DeliverTx[txIndex] = txResult
txIndex++
// NOTE: if we count we can access the tx from the block instead of // NOTE: if we count we can access the tx from the block instead of
// pulling it from the req // pulling it from the req
event := types.EventDataTx{ txEventPublisher.PublishEventTx(types.EventDataTx{types.TxResult{
Height: block.Height, Height: uint64(block.Height),
Index: uint32(txIndex),
Tx: types.Tx(req.GetDeliverTx().Tx), Tx: types.Tx(req.GetDeliverTx().Tx),
Data: txResult.Data, Result: *txResult,
Code: txResult.Code, }})
Log: txResult.Log,
Error: txError, abciResponses.DeliverTx[txIndex] = txResult
} txIndex++
txEventPublisher.PublishEventTx(event)
} }
} }
proxyAppConn.SetResponseCallback(proxyCb) proxyAppConn.SetResponseCallback(proxyCb)
// Begin block // Begin block
err := proxyAppConn.BeginBlockSync(abci.RequestBeginBlock{ _, err := proxyAppConn.BeginBlockSync(abci.RequestBeginBlock{
block.Hash(), block.Hash(),
types.TM2PB.Header(block.Header), types.TM2PB.Header(block.Header),
}) })
@ -101,7 +95,7 @@ func execBlockOnProxyApp(txEventPublisher types.TxEventPublisher, proxyAppConn p
} }
// End block // End block
abciResponses.EndBlock, err = proxyAppConn.EndBlockSync(uint64(block.Height)) abciResponses.EndBlock, err = proxyAppConn.EndBlockSync(abci.RequestEndBlock{uint64(block.Height)})
if err != nil { if err != nil {
logger.Error("Error in proxyAppConn.EndBlock", "err", err) logger.Error("Error in proxyAppConn.EndBlock", "err", err)
return nil, err return nil, err
@ -210,7 +204,6 @@ func (s *State) validateBlock(block *types.Block) error {
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
// ApplyBlock validates & executes the block, updates state w/ ABCI responses, // ApplyBlock validates & executes the block, updates state w/ ABCI responses,
// then commits and updates the mempool atomically, then saves state. // then commits and updates the mempool atomically, then saves state.
// Transaction results are optionally indexed.
// ApplyBlock validates the block against the state, executes it against the app, // ApplyBlock validates the block against the state, executes it against the app,
// commits it, and saves the block and state. It's the only function that needs to be called // commits it, and saves the block and state. It's the only function that needs to be called
@ -225,9 +218,6 @@ func (s *State) ApplyBlock(txEventPublisher types.TxEventPublisher, proxyAppConn
fail.Fail() // XXX fail.Fail() // XXX
// index txs. This could run in the background
s.indexTxs(abciResponses)
// save the results before we commit // save the results before we commit
s.SaveABCIResponses(abciResponses) s.SaveABCIResponses(abciResponses)
@ -258,7 +248,11 @@ func (s *State) CommitStateUpdateMempool(proxyAppConn proxy.AppConnConsensus, bl
defer mempool.Unlock() defer mempool.Unlock()
// Commit block, get hash back // Commit block, get hash back
res := proxyAppConn.CommitSync() res, err := proxyAppConn.CommitSync()
if err != nil {
s.logger.Error("Client error during proxyAppConn.CommitSync", "err", err)
return err
}
if res.IsErr() { if res.IsErr() {
s.logger.Error("Error in proxyAppConn.CommitSync", "err", res) s.logger.Error("Error in proxyAppConn.CommitSync", "err", res)
return res return res
@ -276,26 +270,6 @@ func (s *State) CommitStateUpdateMempool(proxyAppConn proxy.AppConnConsensus, bl
return mempool.Update(block.Height, block.Txs) return mempool.Update(block.Height, block.Txs)
} }
func (s *State) indexTxs(abciResponses *ABCIResponses) {
// save the tx results using the TxIndexer
// NOTE: these may be overwriting, but the values should be the same.
batch := txindex.NewBatch(len(abciResponses.DeliverTx))
for i, d := range abciResponses.DeliverTx {
tx := abciResponses.txs[i]
if err := batch.Add(types.TxResult{
Height: uint64(abciResponses.Height),
Index: uint32(i),
Tx: tx,
Result: *d,
}); err != nil {
s.logger.Error("Error with batch.Add", "err", err)
}
}
if err := s.TxIndexer.AddBatch(batch); err != nil {
s.logger.Error("Error adding batch", "err", err)
}
}
// ExecCommitBlock executes and commits a block on the proxyApp without validating or mutating the state. // ExecCommitBlock executes and commits a block on the proxyApp without validating or mutating the state.
// It returns the application root hash (result of abci.Commit). // It returns the application root hash (result of abci.Commit).
func ExecCommitBlock(appConnConsensus proxy.AppConnConsensus, block *types.Block, logger log.Logger) ([]byte, error) { func ExecCommitBlock(appConnConsensus proxy.AppConnConsensus, block *types.Block, logger log.Logger) ([]byte, error) {
@ -305,7 +279,11 @@ func ExecCommitBlock(appConnConsensus proxy.AppConnConsensus, block *types.Block
return nil, err return nil, err
} }
// Commit block, get hash back // Commit block, get hash back
res := appConnConsensus.CommitSync() res, err := appConnConsensus.CommitSync()
if err != nil {
logger.Error("Client error during proxyAppConn.CommitSync", "err", res)
return nil, err
}
if res.IsErr() { if res.IsErr() {
logger.Error("Error in proxyAppConn.CommitSync", "err", res) logger.Error("Error in proxyAppConn.CommitSync", "err", res)
return nil, res return nil, res

View File

@ -3,13 +3,11 @@ package state
import ( import (
"testing" "testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/tendermint/abci/example/dummy" "github.com/tendermint/abci/example/dummy"
crypto "github.com/tendermint/go-crypto" crypto "github.com/tendermint/go-crypto"
"github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/proxy"
"github.com/tendermint/tendermint/state/txindex"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
dbm "github.com/tendermint/tmlibs/db" dbm "github.com/tendermint/tmlibs/db"
"github.com/tendermint/tmlibs/log" "github.com/tendermint/tmlibs/log"
@ -31,8 +29,6 @@ func TestApplyBlock(t *testing.T) {
state := state() state := state()
state.SetLogger(log.TestingLogger()) state.SetLogger(log.TestingLogger())
indexer := &dummyIndexer{0}
state.TxIndexer = indexer
// make block // make block
block := makeBlock(1, state) block := makeBlock(1, state)
@ -40,7 +36,6 @@ func TestApplyBlock(t *testing.T) {
err = state.ApplyBlock(types.NopEventBus{}, proxyApp.Consensus(), block, block.MakePartSet(testPartSize).Header(), types.MockMempool{}) err = state.ApplyBlock(types.NopEventBus{}, proxyApp.Consensus(), block, block.MakePartSet(testPartSize).Header(), types.MockMempool{})
require.Nil(t, err) require.Nil(t, err)
assert.Equal(t, nTxsPerBlock, indexer.Indexed) // test indexing works
// TODO check state and mempool // TODO check state and mempool
} }
@ -75,16 +70,3 @@ func makeBlock(num int, state *State) *types.Block {
prevBlockID, valHash, state.AppHash, testPartSize) prevBlockID, valHash, state.AppHash, testPartSize)
return block return block
} }
// dummyIndexer increments counter every time we index transaction.
type dummyIndexer struct {
Indexed int
}
func (indexer *dummyIndexer) Get(hash []byte) (*types.TxResult, error) {
return nil, nil
}
func (indexer *dummyIndexer) AddBatch(batch *txindex.Batch) error {
indexer.Indexed += batch.Size()
return nil
}

View File

@ -15,8 +15,6 @@ import (
wire "github.com/tendermint/go-wire" wire "github.com/tendermint/go-wire"
"github.com/tendermint/tendermint/state/txindex"
"github.com/tendermint/tendermint/state/txindex/null"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
@ -61,9 +59,6 @@ type State struct {
// AppHash is updated after Commit // AppHash is updated after Commit
AppHash []byte AppHash []byte
// TxIndexer indexes transactions
TxIndexer txindex.TxIndexer `json:"-"`
logger log.Logger logger log.Logger
} }
@ -95,7 +90,7 @@ func loadState(db dbm.DB, key []byte) *State {
return nil return nil
} }
s := &State{db: db, TxIndexer: &null.TxIndex{}} s := &State{db: db}
r, n, err := bytes.NewReader(buf), new(int), new(error) r, n, err := bytes.NewReader(buf), new(int), new(error)
wire.ReadBinaryPtr(&s, r, 0, n, err) wire.ReadBinaryPtr(&s, r, 0, n, err)
if *err != nil { if *err != nil {
@ -114,8 +109,6 @@ func (s *State) SetLogger(l log.Logger) {
} }
// Copy makes a copy of the State for mutating. // Copy makes a copy of the State for mutating.
// NOTE: Does not create a copy of TxIndexer. It creates a new pointer that points to the same
// underlying TxIndexer.
func (s *State) Copy() *State { func (s *State) Copy() *State {
return &State{ return &State{
db: s.db, db: s.db,
@ -125,7 +118,6 @@ func (s *State) Copy() *State {
Validators: s.Validators.Copy(), Validators: s.Validators.Copy(),
LastValidators: s.LastValidators.Copy(), LastValidators: s.LastValidators.Copy(),
AppHash: s.AppHash, AppHash: s.AppHash,
TxIndexer: s.TxIndexer,
LastHeightValidatorsChanged: s.LastHeightValidatorsChanged, LastHeightValidatorsChanged: s.LastHeightValidatorsChanged,
logger: s.logger, logger: s.logger,
ChainID: s.ChainID, ChainID: s.ChainID,
@ -287,7 +279,7 @@ type ABCIResponses struct {
Height int Height int
DeliverTx []*abci.ResponseDeliverTx DeliverTx []*abci.ResponseDeliverTx
EndBlock abci.ResponseEndBlock EndBlock *abci.ResponseEndBlock
txs types.Txs // reference for indexing results by hash txs types.Txs // reference for indexing results by hash
} }
@ -368,7 +360,6 @@ func MakeGenesisState(db dbm.DB, genDoc *types.GenesisDoc) (*State, error) {
} }
} }
// we do not need indexer during replay and in tests
return &State{ return &State{
db: db, db: db,
@ -381,7 +372,6 @@ func MakeGenesisState(db dbm.DB, genDoc *types.GenesisDoc) (*State, error) {
Validators: types.NewValidatorSet(validators), Validators: types.NewValidatorSet(validators),
LastValidators: types.NewValidatorSet(nil), LastValidators: types.NewValidatorSet(nil),
AppHash: genDoc.AppHash, AppHash: genDoc.AppHash,
TxIndexer: &null.TxIndex{},
LastHeightValidatorsChanged: 1, LastHeightValidatorsChanged: 1,
}, nil }, nil
} }

View File

@ -78,9 +78,9 @@ func TestABCIResponsesSaveLoad(t *testing.T) {
// build mock responses // build mock responses
block := makeBlock(2, state) block := makeBlock(2, state)
abciResponses := NewABCIResponses(block) abciResponses := NewABCIResponses(block)
abciResponses.DeliverTx[0] = &abci.ResponseDeliverTx{Data: []byte("foo")} abciResponses.DeliverTx[0] = &abci.ResponseDeliverTx{Data: []byte("foo"), Tags: []*abci.KVPair{}}
abciResponses.DeliverTx[1] = &abci.ResponseDeliverTx{Data: []byte("bar"), Log: "ok"} abciResponses.DeliverTx[1] = &abci.ResponseDeliverTx{Data: []byte("bar"), Log: "ok", Tags: []*abci.KVPair{}}
abciResponses.EndBlock = abci.ResponseEndBlock{Diffs: []*abci.Validator{ abciResponses.EndBlock = &abci.ResponseEndBlock{Diffs: []*abci.Validator{
{ {
PubKey: crypto.GenPrivKeyEd25519().PubKey().Bytes(), PubKey: crypto.GenPrivKeyEd25519().PubKey().Bytes(),
Power: 10, Power: 10,
@ -198,12 +198,13 @@ func makeHeaderPartsResponses(state *State, height int,
block := makeBlock(height, state) block := makeBlock(height, state)
_, val := state.Validators.GetByIndex(0) _, val := state.Validators.GetByIndex(0)
abciResponses := &ABCIResponses{ abciResponses := &ABCIResponses{
Height: height, Height: height,
EndBlock: &abci.ResponseEndBlock{Diffs: []*abci.Validator{}},
} }
// if the pubkey is new, remove the old and add the new // if the pubkey is new, remove the old and add the new
if !bytes.Equal(pubkey.Bytes(), val.PubKey.Bytes()) { if !bytes.Equal(pubkey.Bytes(), val.PubKey.Bytes()) {
abciResponses.EndBlock = abci.ResponseEndBlock{ abciResponses.EndBlock = &abci.ResponseEndBlock{
Diffs: []*abci.Validator{ Diffs: []*abci.Validator{
{val.PubKey.Bytes(), 0}, {val.PubKey.Bytes(), 0},
{pubkey.Bytes(), 10}, {pubkey.Bytes(), 10},

View File

@ -4,20 +4,24 @@ import (
"errors" "errors"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
"github.com/tendermint/tmlibs/pubsub/query"
) )
// TxIndexer interface defines methods to index and search transactions. // TxIndexer interface defines methods to index and search transactions.
type TxIndexer interface { type TxIndexer interface {
// AddBatch analyzes, indexes or stores a batch of transactions. // AddBatch analyzes, indexes and stores a batch of transactions.
// NOTE: We do not specify Index method for analyzing a single transaction
// here because it bears heavy performance losses. Almost all advanced indexers
// support batching.
AddBatch(b *Batch) error AddBatch(b *Batch) error
// Index analyzes, indexes and stores a single transaction.
Index(result *types.TxResult) error
// Get returns the transaction specified by hash or nil if the transaction is not indexed // Get returns the transaction specified by hash or nil if the transaction is not indexed
// or stored. // or stored.
Get(hash []byte) (*types.TxResult, error) Get(hash []byte) (*types.TxResult, error)
// Search allows you to query for transactions.
Search(q *query.Query) ([]*types.TxResult, error)
} }
//---------------------------------------------------- //----------------------------------------------------
@ -26,18 +30,18 @@ type TxIndexer interface {
// Batch groups together multiple Index operations to be performed at the same time. // Batch groups together multiple Index operations to be performed at the same time.
// NOTE: Batch is NOT thread-safe and must not be modified after starting its execution. // NOTE: Batch is NOT thread-safe and must not be modified after starting its execution.
type Batch struct { type Batch struct {
Ops []types.TxResult Ops []*types.TxResult
} }
// NewBatch creates a new Batch. // NewBatch creates a new Batch.
func NewBatch(n int) *Batch { func NewBatch(n int) *Batch {
return &Batch{ return &Batch{
Ops: make([]types.TxResult, n), Ops: make([]*types.TxResult, n),
} }
} }
// Add or update an entry for the given result.Index. // Add or update an entry for the given result.Index.
func (b *Batch) Add(result types.TxResult) error { func (b *Batch) Add(result *types.TxResult) error {
b.Ops[result.Index] = result b.Ops[result.Index] = result
return nil return nil
} }

View File

@ -0,0 +1,49 @@
package txindex
import (
"context"
"github.com/tendermint/tendermint/types"
cmn "github.com/tendermint/tmlibs/common"
)
const (
subscriber = "IndexerService"
)
type IndexerService struct {
cmn.BaseService
idr TxIndexer
eventBus *types.EventBus
}
func NewIndexerService(idr TxIndexer, eventBus *types.EventBus) *IndexerService {
is := &IndexerService{idr: idr, eventBus: eventBus}
is.BaseService = *cmn.NewBaseService(nil, "IndexerService", is)
return is
}
// OnStart implements cmn.Service by subscribing for all transactions
// and indexing them by tags.
func (is *IndexerService) OnStart() error {
ch := make(chan interface{})
if err := is.eventBus.Subscribe(context.Background(), subscriber, types.EventQueryTx, ch); err != nil {
return err
}
go func() {
for event := range ch {
// TODO: may be not perfomant to write one event at a time
txResult := event.(types.TMEventData).Unwrap().(types.EventDataTx).TxResult
is.idr.Index(&txResult)
}
}()
return nil
}
// OnStop implements cmn.Service by unsubscribing from all transactions.
func (is *IndexerService) OnStop() {
if is.eventBus.IsRunning() {
_ = is.eventBus.UnsubscribeAll(context.Background(), subscriber)
}
}

View File

@ -2,25 +2,57 @@ package kv
import ( import (
"bytes" "bytes"
"encoding/hex"
"fmt" "fmt"
"strconv"
"strings"
"time"
"github.com/tendermint/go-wire" "github.com/pkg/errors"
db "github.com/tendermint/tmlibs/db"
abci "github.com/tendermint/abci/types"
wire "github.com/tendermint/go-wire"
"github.com/tendermint/tendermint/state/txindex" "github.com/tendermint/tendermint/state/txindex"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
cmn "github.com/tendermint/tmlibs/common"
db "github.com/tendermint/tmlibs/db"
"github.com/tendermint/tmlibs/pubsub/query"
) )
// TxIndex is the simplest possible indexer, backed by Key-Value storage (levelDB). const (
// It can only index transaction by its identifier. tagKeySeparator = "/"
)
var _ txindex.TxIndexer = (*TxIndex)(nil)
// TxIndex is the simplest possible indexer, backed by key-value storage (levelDB).
type TxIndex struct { type TxIndex struct {
store db.DB store db.DB
tagsToIndex []string
indexAllTags bool
} }
// NewTxIndex returns new instance of TxIndex. // NewTxIndex creates new KV indexer.
func NewTxIndex(store db.DB) *TxIndex { func NewTxIndex(store db.DB, options ...func(*TxIndex)) *TxIndex {
return &TxIndex{store: store} txi := &TxIndex{store: store, tagsToIndex: make([]string, 0), indexAllTags: false}
for _, o := range options {
o(txi)
}
return txi
}
// IndexTags is an option for setting which tags to index.
func IndexTags(tags []string) func(*TxIndex) {
return func(txi *TxIndex) {
txi.tagsToIndex = tags
}
}
// IndexAllTags is an option for indexing all tags.
func IndexAllTags() func(*TxIndex) {
return func(txi *TxIndex) {
txi.indexAllTags = true
}
} }
// Get gets transaction from the TxIndex storage and returns it or nil if the // Get gets transaction from the TxIndex storage and returns it or nil if the
@ -46,13 +78,328 @@ func (txi *TxIndex) Get(hash []byte) (*types.TxResult, error) {
return txResult, nil return txResult, nil
} }
// AddBatch writes a batch of transactions into the TxIndex storage. // AddBatch indexes a batch of transactions using the given list of tags.
func (txi *TxIndex) AddBatch(b *txindex.Batch) error { func (txi *TxIndex) AddBatch(b *txindex.Batch) error {
storeBatch := txi.store.NewBatch() storeBatch := txi.store.NewBatch()
for _, result := range b.Ops { for _, result := range b.Ops {
rawBytes := wire.BinaryBytes(&result) hash := result.Tx.Hash()
storeBatch.Set(result.Tx.Hash(), rawBytes)
// index tx by tags
for _, tag := range result.Result.Tags {
if txi.indexAllTags || cmn.StringInSlice(tag.Key, txi.tagsToIndex) {
storeBatch.Set(keyForTag(tag, result), hash)
}
}
// index tx by hash
rawBytes := wire.BinaryBytes(result)
storeBatch.Set(hash, rawBytes)
} }
storeBatch.Write() storeBatch.Write()
return nil return nil
} }
// Index indexes a single transaction using the given list of tags.
func (txi *TxIndex) Index(result *types.TxResult) error {
b := txi.store.NewBatch()
hash := result.Tx.Hash()
// index tx by tags
for _, tag := range result.Result.Tags {
if txi.indexAllTags || cmn.StringInSlice(tag.Key, txi.tagsToIndex) {
b.Set(keyForTag(tag, result), hash)
}
}
// index tx by hash
rawBytes := wire.BinaryBytes(result)
b.Set(hash, rawBytes)
b.Write()
return nil
}
// Search performs a search using the given query. It breaks the query into
// conditions (like "tx.height > 5"). For each condition, it queries the DB
// index. One special use cases here: (1) if "tx.hash" is found, it returns tx
// result for it (2) for range queries it is better for the client to provide
// both lower and upper bounds, so we are not performing a full scan. Results
// from querying indexes are then intersected and returned to the caller.
func (txi *TxIndex) Search(q *query.Query) ([]*types.TxResult, error) {
var hashes [][]byte
var hashesInitialized bool
// get a list of conditions (like "tx.height > 5")
conditions := q.Conditions()
// if there is a hash condition, return the result immediately
hash, err, ok := lookForHash(conditions)
if err != nil {
return nil, errors.Wrap(err, "error during searching for a hash in the query")
} else if ok {
res, err := txi.Get(hash)
if res == nil {
return []*types.TxResult{}, nil
} else {
return []*types.TxResult{res}, errors.Wrap(err, "error while retrieving the result")
}
}
// conditions to skip because they're handled before "everything else"
skipIndexes := make([]int, 0)
// if there is a height condition ("tx.height=3"), extract it for faster lookups
height, heightIndex := lookForHeight(conditions)
if heightIndex >= 0 {
skipIndexes = append(skipIndexes, heightIndex)
}
// extract ranges
// if both upper and lower bounds exist, it's better to get them in order not
// no iterate over kvs that are not within range.
ranges, rangeIndexes := lookForRanges(conditions)
if len(ranges) > 0 {
skipIndexes = append(skipIndexes, rangeIndexes...)
for _, r := range ranges {
if !hashesInitialized {
hashes = txi.matchRange(r, startKeyForRange(r, height))
hashesInitialized = true
} else {
hashes = intersect(hashes, txi.matchRange(r, startKeyForRange(r, height)))
}
}
}
// for all other conditions
for i, c := range conditions {
if cmn.IntInSlice(i, skipIndexes) {
continue
}
if !hashesInitialized {
hashes = txi.match(c, startKey(c, height))
hashesInitialized = true
} else {
hashes = intersect(hashes, txi.match(c, startKey(c, height)))
}
}
results := make([]*types.TxResult, len(hashes))
i := 0
for _, h := range hashes {
results[i], err = txi.Get(h)
if err != nil {
return nil, errors.Wrapf(err, "failed to get Tx{%X}", h)
}
i++
}
return results, nil
}
func lookForHash(conditions []query.Condition) (hash []byte, err error, ok bool) {
for _, c := range conditions {
if c.Tag == types.TxHashKey {
decoded, err := hex.DecodeString(c.Operand.(string))
return decoded, err, true
}
}
return
}
func lookForHeight(conditions []query.Condition) (height uint64, index int) {
for i, c := range conditions {
if c.Tag == types.TxHeightKey {
return uint64(c.Operand.(int64)), i
}
}
return 0, -1
}
// special map to hold range conditions
// Example: account.number => queryRange{lowerBound: 1, upperBound: 5}
type queryRanges map[string]queryRange
type queryRange struct {
key string
lowerBound interface{} // int || time.Time
includeLowerBound bool
upperBound interface{} // int || time.Time
includeUpperBound bool
}
func lookForRanges(conditions []query.Condition) (ranges queryRanges, indexes []int) {
ranges = make(queryRanges)
for i, c := range conditions {
if isRangeOperation(c.Op) {
r, ok := ranges[c.Tag]
if !ok {
r = queryRange{key: c.Tag}
}
switch c.Op {
case query.OpGreater:
r.lowerBound = c.Operand
case query.OpGreaterEqual:
r.includeLowerBound = true
r.lowerBound = c.Operand
case query.OpLess:
r.upperBound = c.Operand
case query.OpLessEqual:
r.includeUpperBound = true
r.upperBound = c.Operand
}
ranges[c.Tag] = r
indexes = append(indexes, i)
}
}
return ranges, indexes
}
func isRangeOperation(op query.Operator) bool {
switch op {
case query.OpGreater, query.OpGreaterEqual, query.OpLess, query.OpLessEqual:
return true
default:
return false
}
}
func (txi *TxIndex) match(c query.Condition, startKey []byte) (hashes [][]byte) {
if c.Op == query.OpEqual {
it := txi.store.IteratorPrefix(startKey)
defer it.Release()
for it.Next() {
hashes = append(hashes, it.Value())
}
} else if c.Op == query.OpContains {
// XXX: doing full scan because startKey does not apply here
// For example, if startKey = "account.owner=an" and search query = "accoutn.owner CONSISTS an"
// we can't iterate with prefix "account.owner=an" because we might miss keys like "account.owner=Ulan"
it := txi.store.Iterator()
defer it.Release()
for it.Next() {
if !isTagKey(it.Key()) {
continue
}
if strings.Contains(extractValueFromKey(it.Key()), c.Operand.(string)) {
hashes = append(hashes, it.Value())
}
}
} else {
panic("other operators should be handled already")
}
return
}
func (txi *TxIndex) matchRange(r queryRange, startKey []byte) (hashes [][]byte) {
it := txi.store.IteratorPrefix(startKey)
defer it.Release()
LOOP:
for it.Next() {
if !isTagKey(it.Key()) {
continue
}
if r.upperBound != nil {
// no other way to stop iterator other than checking for upperBound
switch (r.upperBound).(type) {
case int64:
v, err := strconv.ParseInt(extractValueFromKey(it.Key()), 10, 64)
if err == nil && v == r.upperBound {
if r.includeUpperBound {
hashes = append(hashes, it.Value())
}
break LOOP
}
// XXX: passing time in a ABCI Tags is not yet implemented
// case time.Time:
// v := strconv.ParseInt(extractValueFromKey(it.Key()), 10, 64)
// if v == r.upperBound {
// break
// }
}
}
hashes = append(hashes, it.Value())
}
return
}
///////////////////////////////////////////////////////////////////////////////
// Keys
func startKey(c query.Condition, height uint64) []byte {
var key string
if height > 0 {
key = fmt.Sprintf("%s/%v/%d", c.Tag, c.Operand, height)
} else {
key = fmt.Sprintf("%s/%v", c.Tag, c.Operand)
}
return []byte(key)
}
func startKeyForRange(r queryRange, height uint64) []byte {
if r.lowerBound == nil {
return []byte(fmt.Sprintf("%s", r.key))
}
var lowerBound interface{}
if r.includeLowerBound {
lowerBound = r.lowerBound
} else {
switch t := r.lowerBound.(type) {
case int64:
lowerBound = t + 1
case time.Time:
lowerBound = t.Unix() + 1
default:
panic("not implemented")
}
}
var key string
if height > 0 {
key = fmt.Sprintf("%s/%v/%d", r.key, lowerBound, height)
} else {
key = fmt.Sprintf("%s/%v", r.key, lowerBound)
}
return []byte(key)
}
func isTagKey(key []byte) bool {
return strings.Count(string(key), tagKeySeparator) == 3
}
func extractValueFromKey(key []byte) string {
parts := strings.SplitN(string(key), tagKeySeparator, 3)
return parts[1]
}
func keyForTag(tag *abci.KVPair, result *types.TxResult) []byte {
switch tag.ValueType {
case abci.KVPair_STRING:
return []byte(fmt.Sprintf("%s/%v/%d/%d", tag.Key, tag.ValueString, result.Height, result.Index))
case abci.KVPair_INT:
return []byte(fmt.Sprintf("%s/%v/%d/%d", tag.Key, tag.ValueInt, result.Height, result.Index))
// case abci.KVPair_TIME:
// return []byte(fmt.Sprintf("%s/%d/%d/%d", tag.Key, tag.ValueTime.Unix(), result.Height, result.Index))
default:
panic(fmt.Sprintf("Undefined value type: %v", tag.ValueType))
}
}
///////////////////////////////////////////////////////////////////////////////
// Utils
func intersect(as, bs [][]byte) [][]byte {
i := make([][]byte, 0, cmn.MinInt(len(as), len(bs)))
for _, a := range as {
for _, b := range bs {
if bytes.Equal(a, b) {
i = append(i, a)
}
}
}
return i
}

View File

@ -1,6 +1,7 @@
package kv package kv
import ( import (
"fmt"
"io/ioutil" "io/ioutil"
"os" "os"
"testing" "testing"
@ -11,30 +12,145 @@ import (
"github.com/tendermint/tendermint/state/txindex" "github.com/tendermint/tendermint/state/txindex"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
db "github.com/tendermint/tmlibs/db" db "github.com/tendermint/tmlibs/db"
"github.com/tendermint/tmlibs/pubsub/query"
) )
func TestTxIndex(t *testing.T) { func TestTxIndex(t *testing.T) {
indexer := &TxIndex{store: db.NewMemDB()} indexer := NewTxIndex(db.NewMemDB())
tx := types.Tx("HELLO WORLD") tx := types.Tx("HELLO WORLD")
txResult := &types.TxResult{1, 0, tx, abci.ResponseDeliverTx{Data: []byte{0}, Code: abci.CodeType_OK, Log: ""}} txResult := &types.TxResult{1, 0, tx, abci.ResponseDeliverTx{Data: []byte{0}, Code: abci.CodeType_OK, Log: "", Tags: []*abci.KVPair{}}}
hash := tx.Hash() hash := tx.Hash()
batch := txindex.NewBatch(1) batch := txindex.NewBatch(1)
if err := batch.Add(*txResult); err != nil { if err := batch.Add(txResult); err != nil {
t.Error(err) t.Error(err)
} }
err := indexer.AddBatch(batch) err := indexer.AddBatch(batch)
require.Nil(t, err) require.NoError(t, err)
loadedTxResult, err := indexer.Get(hash) loadedTxResult, err := indexer.Get(hash)
require.Nil(t, err) require.NoError(t, err)
assert.Equal(t, txResult, loadedTxResult) assert.Equal(t, txResult, loadedTxResult)
tx2 := types.Tx("BYE BYE WORLD")
txResult2 := &types.TxResult{1, 0, tx2, abci.ResponseDeliverTx{Data: []byte{0}, Code: abci.CodeType_OK, Log: "", Tags: []*abci.KVPair{}}}
hash2 := tx2.Hash()
err = indexer.Index(txResult2)
require.NoError(t, err)
loadedTxResult2, err := indexer.Get(hash2)
require.NoError(t, err)
assert.Equal(t, txResult2, loadedTxResult2)
}
func TestTxSearch(t *testing.T) {
allowedTags := []string{"account.number", "account.owner", "account.date"}
indexer := NewTxIndex(db.NewMemDB(), IndexTags(allowedTags))
txResult := txResultWithTags([]*abci.KVPair{
{Key: "account.number", ValueType: abci.KVPair_INT, ValueInt: 1},
{Key: "account.owner", ValueType: abci.KVPair_STRING, ValueString: "Ivan"},
{Key: "not_allowed", ValueType: abci.KVPair_STRING, ValueString: "Vlad"},
})
hash := txResult.Tx.Hash()
err := indexer.Index(txResult)
require.NoError(t, err)
testCases := []struct {
q string
resultsLength int
}{
// search by hash
{fmt.Sprintf("tx.hash = '%X'", hash), 1},
// search by exact match (one tag)
{"account.number = 1", 1},
// search by exact match (two tags)
{"account.number = 1 AND account.owner = 'Ivan'", 1},
// search by exact match (two tags)
{"account.number = 1 AND account.owner = 'Vlad'", 0},
// search by range
{"account.number >= 1 AND account.number <= 5", 1},
// search by range (lower bound)
{"account.number >= 1", 1},
// search by range (upper bound)
{"account.number <= 5", 1},
// search using not allowed tag
{"not_allowed = 'boom'", 0},
// search for not existing tx result
{"account.number >= 2 AND account.number <= 5", 0},
// search using not existing tag
{"account.date >= TIME 2013-05-03T14:45:00Z", 0},
// search using CONTAINS
{"account.owner CONTAINS 'an'", 1},
// search using CONTAINS
{"account.owner CONTAINS 'Vlad'", 0},
}
for _, tc := range testCases {
t.Run(tc.q, func(t *testing.T) {
results, err := indexer.Search(query.MustParse(tc.q))
assert.NoError(t, err)
assert.Len(t, results, tc.resultsLength)
if tc.resultsLength > 0 {
assert.Equal(t, []*types.TxResult{txResult}, results)
}
})
}
}
func TestTxSearchOneTxWithMultipleSameTagsButDifferentValues(t *testing.T) {
allowedTags := []string{"account.number"}
indexer := NewTxIndex(db.NewMemDB(), IndexTags(allowedTags))
txResult := txResultWithTags([]*abci.KVPair{
{Key: "account.number", ValueType: abci.KVPair_INT, ValueInt: 1},
{Key: "account.number", ValueType: abci.KVPair_INT, ValueInt: 2},
})
err := indexer.Index(txResult)
require.NoError(t, err)
results, err := indexer.Search(query.MustParse("account.number >= 1"))
assert.NoError(t, err)
assert.Len(t, results, 1)
assert.Equal(t, []*types.TxResult{txResult}, results)
}
func TestIndexAllTags(t *testing.T) {
indexer := NewTxIndex(db.NewMemDB(), IndexAllTags())
txResult := txResultWithTags([]*abci.KVPair{
abci.KVPairString("account.owner", "Ivan"),
abci.KVPairInt("account.number", 1),
})
err := indexer.Index(txResult)
require.NoError(t, err)
results, err := indexer.Search(query.MustParse("account.number >= 1"))
assert.NoError(t, err)
assert.Len(t, results, 1)
assert.Equal(t, []*types.TxResult{txResult}, results)
results, err = indexer.Search(query.MustParse("account.owner = 'Ivan'"))
assert.NoError(t, err)
assert.Len(t, results, 1)
assert.Equal(t, []*types.TxResult{txResult}, results)
}
func txResultWithTags(tags []*abci.KVPair) *types.TxResult {
tx := types.Tx("HELLO WORLD")
return &types.TxResult{1, 0, tx, abci.ResponseDeliverTx{Data: []byte{0}, Code: abci.CodeType_OK, Log: "", Tags: tags}}
} }
func benchmarkTxIndex(txsCount int, b *testing.B) { func benchmarkTxIndex(txsCount int, b *testing.B) {
tx := types.Tx("HELLO WORLD") tx := types.Tx("HELLO WORLD")
txResult := &types.TxResult{1, 0, tx, abci.ResponseDeliverTx{Data: []byte{0}, Code: abci.CodeType_OK, Log: ""}} txResult := &types.TxResult{1, 0, tx, abci.ResponseDeliverTx{Data: []byte{0}, Code: abci.CodeType_OK, Log: "", Tags: []*abci.KVPair{}}}
dir, err := ioutil.TempDir("", "tx_index_db") dir, err := ioutil.TempDir("", "tx_index_db")
if err != nil { if err != nil {
@ -43,11 +159,11 @@ func benchmarkTxIndex(txsCount int, b *testing.B) {
defer os.RemoveAll(dir) // nolint: errcheck defer os.RemoveAll(dir) // nolint: errcheck
store := db.NewDB("tx_index", "leveldb", dir) store := db.NewDB("tx_index", "leveldb", dir)
indexer := &TxIndex{store: store} indexer := NewTxIndex(store)
batch := txindex.NewBatch(txsCount) batch := txindex.NewBatch(txsCount)
for i := 0; i < txsCount; i++ { for i := 0; i < txsCount; i++ {
if err := batch.Add(*txResult); err != nil { if err := batch.Add(txResult); err != nil {
b.Fatal(err) b.Fatal(err)
} }
txResult.Index += 1 txResult.Index += 1

View File

@ -5,8 +5,11 @@ import (
"github.com/tendermint/tendermint/state/txindex" "github.com/tendermint/tendermint/state/txindex"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
"github.com/tendermint/tmlibs/pubsub/query"
) )
var _ txindex.TxIndexer = (*TxIndex)(nil)
// TxIndex acts as a /dev/null. // TxIndex acts as a /dev/null.
type TxIndex struct{} type TxIndex struct{}
@ -19,3 +22,12 @@ func (txi *TxIndex) Get(hash []byte) (*types.TxResult, error) {
func (txi *TxIndex) AddBatch(batch *txindex.Batch) error { func (txi *TxIndex) AddBatch(batch *txindex.Batch) error {
return nil return nil
} }
// Index is a noop and always returns nil.
func (txi *TxIndex) Index(result *types.TxResult) error {
return nil
}
func (txi *TxIndex) Search(q *query.Query) ([]*types.TxResult, error) {
return []*types.TxResult{}, nil
}

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"fmt" "fmt"
abci "github.com/tendermint/abci/types"
cmn "github.com/tendermint/tmlibs/common" cmn "github.com/tendermint/tmlibs/common"
"github.com/tendermint/tmlibs/log" "github.com/tendermint/tmlibs/log"
tmpubsub "github.com/tendermint/tmlibs/pubsub" tmpubsub "github.com/tendermint/tmlibs/pubsub"
@ -67,63 +68,101 @@ func (b *EventBus) Publish(eventType string, eventData TMEventData) error {
//--- block, tx, and vote events //--- block, tx, and vote events
func (b *EventBus) PublishEventNewBlock(block EventDataNewBlock) error { func (b *EventBus) PublishEventNewBlock(event EventDataNewBlock) error {
return b.Publish(EventNewBlock, TMEventData{block}) return b.Publish(EventNewBlock, TMEventData{event})
} }
func (b *EventBus) PublishEventNewBlockHeader(header EventDataNewBlockHeader) error { func (b *EventBus) PublishEventNewBlockHeader(event EventDataNewBlockHeader) error {
return b.Publish(EventNewBlockHeader, TMEventData{header}) return b.Publish(EventNewBlockHeader, TMEventData{event})
} }
func (b *EventBus) PublishEventVote(vote EventDataVote) error { func (b *EventBus) PublishEventVote(event EventDataVote) error {
return b.Publish(EventVote, TMEventData{vote}) return b.Publish(EventVote, TMEventData{event})
} }
func (b *EventBus) PublishEventTx(tx EventDataTx) error { // PublishEventTx publishes tx event with tags from Result. Note it will add
// predefined tags (EventTypeKey, TxHashKey). Existing tags with the same names
// will be overwritten.
func (b *EventBus) PublishEventTx(event EventDataTx) error {
// no explicit deadline for publishing events // no explicit deadline for publishing events
ctx := context.Background() ctx := context.Background()
b.pubsub.PublishWithTags(ctx, TMEventData{tx}, map[string]interface{}{EventTypeKey: EventTx, TxHashKey: fmt.Sprintf("%X", tx.Tx.Hash())})
tags := make(map[string]interface{})
// validate and fill tags from tx result
for _, tag := range event.Result.Tags {
// basic validation
if tag.Key == "" {
b.Logger.Info("Got tag with an empty key (skipping)", "tag", tag, "tx", event.Tx)
continue
}
switch tag.ValueType {
case abci.KVPair_STRING:
tags[tag.Key] = tag.ValueString
case abci.KVPair_INT:
tags[tag.Key] = tag.ValueInt
}
}
// add predefined tags
logIfTagExists(EventTypeKey, tags, b.Logger)
tags[EventTypeKey] = EventTx
logIfTagExists(TxHashKey, tags, b.Logger)
tags[TxHashKey] = fmt.Sprintf("%X", event.Tx.Hash())
logIfTagExists(TxHeightKey, tags, b.Logger)
tags[TxHeightKey] = event.Height
b.pubsub.PublishWithTags(ctx, TMEventData{event}, tags)
return nil return nil
} }
func (b *EventBus) PublishEventProposalHeartbeat(ph EventDataProposalHeartbeat) error { func (b *EventBus) PublishEventProposalHeartbeat(event EventDataProposalHeartbeat) error {
return b.Publish(EventProposalHeartbeat, TMEventData{ph}) return b.Publish(EventProposalHeartbeat, TMEventData{event})
} }
//--- EventDataRoundState events //--- EventDataRoundState events
func (b *EventBus) PublishEventNewRoundStep(rs EventDataRoundState) error { func (b *EventBus) PublishEventNewRoundStep(event EventDataRoundState) error {
return b.Publish(EventNewRoundStep, TMEventData{rs}) return b.Publish(EventNewRoundStep, TMEventData{event})
} }
func (b *EventBus) PublishEventTimeoutPropose(rs EventDataRoundState) error { func (b *EventBus) PublishEventTimeoutPropose(event EventDataRoundState) error {
return b.Publish(EventTimeoutPropose, TMEventData{rs}) return b.Publish(EventTimeoutPropose, TMEventData{event})
} }
func (b *EventBus) PublishEventTimeoutWait(rs EventDataRoundState) error { func (b *EventBus) PublishEventTimeoutWait(event EventDataRoundState) error {
return b.Publish(EventTimeoutWait, TMEventData{rs}) return b.Publish(EventTimeoutWait, TMEventData{event})
} }
func (b *EventBus) PublishEventNewRound(rs EventDataRoundState) error { func (b *EventBus) PublishEventNewRound(event EventDataRoundState) error {
return b.Publish(EventNewRound, TMEventData{rs}) return b.Publish(EventNewRound, TMEventData{event})
} }
func (b *EventBus) PublishEventCompleteProposal(rs EventDataRoundState) error { func (b *EventBus) PublishEventCompleteProposal(event EventDataRoundState) error {
return b.Publish(EventCompleteProposal, TMEventData{rs}) return b.Publish(EventCompleteProposal, TMEventData{event})
} }
func (b *EventBus) PublishEventPolka(rs EventDataRoundState) error { func (b *EventBus) PublishEventPolka(event EventDataRoundState) error {
return b.Publish(EventPolka, TMEventData{rs}) return b.Publish(EventPolka, TMEventData{event})
} }
func (b *EventBus) PublishEventUnlock(rs EventDataRoundState) error { func (b *EventBus) PublishEventUnlock(event EventDataRoundState) error {
return b.Publish(EventUnlock, TMEventData{rs}) return b.Publish(EventUnlock, TMEventData{event})
} }
func (b *EventBus) PublishEventRelock(rs EventDataRoundState) error { func (b *EventBus) PublishEventRelock(event EventDataRoundState) error {
return b.Publish(EventRelock, TMEventData{rs}) return b.Publish(EventRelock, TMEventData{event})
} }
func (b *EventBus) PublishEventLock(rs EventDataRoundState) error { func (b *EventBus) PublishEventLock(event EventDataRoundState) error {
return b.Publish(EventLock, TMEventData{rs}) return b.Publish(EventLock, TMEventData{event})
}
func logIfTagExists(tag string, tags map[string]interface{}, logger log.Logger) {
if value, ok := tags[tag]; ok {
logger.Error("Found predefined tag (value will be overwritten)", "tag", tag, "value", value)
}
} }

View File

@ -3,7 +3,6 @@ package types
import ( import (
"fmt" "fmt"
abci "github.com/tendermint/abci/types"
"github.com/tendermint/go-wire/data" "github.com/tendermint/go-wire/data"
tmpubsub "github.com/tendermint/tmlibs/pubsub" tmpubsub "github.com/tendermint/tmlibs/pubsub"
tmquery "github.com/tendermint/tmlibs/pubsub/query" tmquery "github.com/tendermint/tmlibs/pubsub/query"
@ -110,12 +109,7 @@ type EventDataNewBlockHeader struct {
// All txs fire EventDataTx // All txs fire EventDataTx
type EventDataTx struct { type EventDataTx struct {
Height int `json:"height"` TxResult
Tx Tx `json:"tx"`
Data data.Bytes `json:"data"`
Log string `json:"log"`
Code abci.CodeType `json:"code"`
Error string `json:"error"` // this is redundant information for now
} }
type EventDataProposalHeartbeat struct { type EventDataProposalHeartbeat struct {
@ -142,10 +136,13 @@ type EventDataVote struct {
const ( const (
// EventTypeKey is a reserved key, used to specify event type in tags. // EventTypeKey is a reserved key, used to specify event type in tags.
EventTypeKey = "tm.events.type" EventTypeKey = "tm.event"
// TxHashKey is a reserved key, used to specify transaction's hash. // TxHashKey is a reserved key, used to specify transaction's hash.
// see EventBus#PublishEventTx // see EventBus#PublishEventTx
TxHashKey = "tx.hash" TxHashKey = "tx.hash"
// TxHeightKey is a reserved key, used to specify transaction block's height.
// see EventBus#PublishEventTx
TxHeightKey = "tx.height"
) )
var ( var (
@ -167,9 +164,10 @@ var (
EventQueryTimeoutWait = queryForEvent(EventTimeoutWait) EventQueryTimeoutWait = queryForEvent(EventTimeoutWait)
EventQueryVote = queryForEvent(EventVote) EventQueryVote = queryForEvent(EventVote)
EventQueryProposalHeartbeat = queryForEvent(EventProposalHeartbeat) EventQueryProposalHeartbeat = queryForEvent(EventProposalHeartbeat)
EventQueryTx = queryForEvent(EventTx)
) )
func EventQueryTx(tx Tx) tmpubsub.Query { func EventQueryTxFor(tx Tx) tmpubsub.Query {
return tmquery.MustParse(fmt.Sprintf("%s='%s' AND %s='%X'", EventTypeKey, EventTx, TxHashKey, tx.Hash())) return tmquery.MustParse(fmt.Sprintf("%s='%s' AND %s='%X'", EventTypeKey, EventTx, TxHashKey, tx.Hash()))
} }