From 5e3a23df6d9d8e886970e3e17d65fad523cde30b Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Mon, 14 May 2018 11:10:59 +0400 Subject: [PATCH] simplify indexer service main loop --- node/node.go | 1 + state/txindex/indexer_service.go | 30 +++++++++++------------------- 2 files changed, 12 insertions(+), 19 deletions(-) diff --git a/node/node.go b/node/node.go index fdc46669..1bd382eb 100644 --- a/node/node.go +++ b/node/node.go @@ -343,6 +343,7 @@ func NewNode(config *cfg.Config, } indexerService := txindex.NewIndexerService(txIndexer, eventBus) + indexerService.SetLogger(logger.With("module", "txindex")) // run the profile server profileHost := config.ProfListenAddress diff --git a/state/txindex/indexer_service.go b/state/txindex/indexer_service.go index edcb362e..93e6269e 100644 --- a/state/txindex/indexer_service.go +++ b/state/txindex/indexer_service.go @@ -41,32 +41,24 @@ func (is *IndexerService) OnStart() error { } go func() { - var numTxs, got int64 - var batch *Batch for { - select { - case e, ok := <-blockHeadersCh: + e, ok := <-blockHeadersCh + if !ok { + return + } + header := e.(types.EventDataNewBlockHeader).Header + batch := NewBatch(header.NumTxs) + for i := int64(0); i < header.NumTxs; i++ { + e, ok := <-txsCh if !ok { + is.Logger.Error("Failed to index all transactions due to closed transactions channel", "height", header.Height, "numTxs", header.NumTxs, "numProcessed", i) return } - numTxs = e.(types.EventDataNewBlockHeader).Header.NumTxs - batch = NewBatch(numTxs) - case e, ok := <-txsCh: - if !ok { - return - } - if batch == nil { - panic("Expected pubsub to send block header first, but got tx event") - } txResult := e.(types.EventDataTx).TxResult batch.Add(&txResult) - got++ - if numTxs == got { - is.idr.AddBatch(batch) - batch = nil - got = 0 - } } + is.idr.AddBatch(batch) + is.Logger.Info("Indexed block", "height", header.Height) } }() return nil