extract indexing goroutine to a separate indexer service
This commit is contained in:
parent
91f2184003
commit
686e0eea9f
21
node/node.go
21
node/node.go
|
@ -111,6 +111,7 @@ type Node struct {
|
|||
proxyApp proxy.AppConns // connection to the application
|
||||
rpcListeners []net.Listener // rpc servers
|
||||
txIndexer txindex.TxIndexer
|
||||
indexerService *txindex.IndexerService
|
||||
}
|
||||
|
||||
// NewNode returns a new, ready to go, Tendermint Node.
|
||||
|
@ -292,16 +293,7 @@ func NewNode(config *cfg.Config,
|
|||
txIndexer = &null.TxIndex{}
|
||||
}
|
||||
|
||||
// subscribe for all transactions and index them by tags
|
||||
ch := make(chan interface{})
|
||||
eventBus.Subscribe(context.Background(), "tx_index", types.EventQueryTx, ch)
|
||||
go func() {
|
||||
for event := range ch {
|
||||
// XXX: may be not perfomant to write one event at a time
|
||||
txResult := event.(types.TMEventData).Unwrap().(types.EventDataTx).TxResult
|
||||
txIndexer.Index(&txResult)
|
||||
}
|
||||
}()
|
||||
indexerService := txindex.NewIndexerService(txIndexer, eventBus)
|
||||
|
||||
// run the profile server
|
||||
profileHost := config.ProfListenAddress
|
||||
|
@ -328,6 +320,7 @@ func NewNode(config *cfg.Config,
|
|||
consensusReactor: consensusReactor,
|
||||
proxyApp: proxyApp,
|
||||
txIndexer: txIndexer,
|
||||
indexerService: indexerService,
|
||||
eventBus: eventBus,
|
||||
}
|
||||
node.BaseService = *cmn.NewBaseService(logger, "Node", node)
|
||||
|
@ -373,6 +366,12 @@ func (n *Node) OnStart() error {
|
|||
}
|
||||
}
|
||||
|
||||
// start tx indexer
|
||||
_, err = n.indexerService.Start()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -392,6 +391,8 @@ func (n *Node) OnStop() {
|
|||
}
|
||||
|
||||
n.eventBus.Stop()
|
||||
|
||||
n.indexerService.Stop()
|
||||
}
|
||||
|
||||
// RunForever waits for an interrupt signal and stops the node.
|
||||
|
|
|
@ -0,0 +1,48 @@
|
|||
package txindex
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/tendermint/tendermint/types"
|
||||
cmn "github.com/tendermint/tmlibs/common"
|
||||
)
|
||||
|
||||
const (
|
||||
subscriber = "IndexerService"
|
||||
)
|
||||
|
||||
type IndexerService struct {
|
||||
cmn.BaseService
|
||||
|
||||
idr TxIndexer
|
||||
eventBus *types.EventBus
|
||||
}
|
||||
|
||||
func NewIndexerService(idr TxIndexer, eventBus *types.EventBus) *IndexerService {
|
||||
is := &IndexerService{idr: idr, eventBus: eventBus}
|
||||
is.BaseService = *cmn.NewBaseService(nil, "IndexerService", is)
|
||||
return is
|
||||
}
|
||||
|
||||
// OnStart implements cmn.Service by subscribing for all transactions
|
||||
// and indexing them by tags.
|
||||
func (is *IndexerService) OnStart() error {
|
||||
ch := make(chan interface{})
|
||||
if err := is.eventBus.Subscribe(context.Background(), subscriber, types.EventQueryTx, ch); err != nil {
|
||||
return err
|
||||
}
|
||||
go func() {
|
||||
for event := range ch {
|
||||
// TODO: may be not perfomant to write one event at a time
|
||||
txResult := event.(types.TMEventData).Unwrap().(types.EventDataTx).TxResult
|
||||
is.idr.Index(&txResult)
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (is *IndexerService) OnStop() {
|
||||
if is.eventBus.IsRunning() {
|
||||
_ = is.eventBus.UnsubscribeAll(context.Background(), subscriber)
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue