near/fix-stats: Fix network stats for near (#2077)

* near/fix-stats: Fix network stats for near
This commit is contained in:
jumpsiegel 2022-12-05 07:24:44 -06:00 committed by GitHub
parent e31f701b61
commit 759550715a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 19 additions and 47 deletions

View File

@ -4,10 +4,7 @@ import (
"context"
"time"
"github.com/certusone/wormhole/node/pkg/common"
"github.com/certusone/wormhole/node/pkg/p2p"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/certusone/wormhole/node/pkg/readiness"
"github.com/certusone/wormhole/node/pkg/supervisor"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
@ -32,12 +29,6 @@ func (e *Watcher) runMetrics(ctx context.Context) error {
wormholeMsgCounter := 0
var wormholeMsgTotalTime time.Duration = 0
currentNearHeight := promauto.With(reg).NewGauge(
prometheus.GaugeOpts{
Name: "wormhole_near_current_height",
Help: "Height of the highest block that has been processed. (Transactions from prior blocks may still be waiting).",
})
wormholeTxAvgDuration := promauto.With(reg).NewGauge(
prometheus.GaugeOpts{
Name: "wormhole_near_tx_avg_duration",
@ -74,8 +65,6 @@ func (e *Watcher) runMetrics(ctx context.Context) error {
Help: "NEAR RPC Error Counter",
})
var highestBlockHeightProcessed uint64 = 0
metricsIntervalTimer := time.NewTicker(metricsInterval) // this is just one ms for the first iteration.
for {
@ -90,19 +79,6 @@ func (e *Watcher) runMetrics(ctx context.Context) error {
chunkqueueLen.Set(float64(l2))
logger.Info("metrics", zap.Int64("txqueueLen", l1), zap.Int("chunkqueueLen", l2))
case height := <-e.eventChanBlockProcessedHeight:
if highestBlockHeightProcessed < height {
highestBlockHeightProcessed = height
currentNearHeight.Set(float64(height))
p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDNear, &gossipv1.Heartbeat_Network{
Height: int64(height),
ContractAddress: e.wormholeAccount,
})
readiness.SetReady(common.ReadinessNearSyncing)
logger.Info("newHeight", zap.String("log_msg_type", "height"), zap.Uint64("height", height))
}
case event := <-e.eventChan:
switch event {
case EVENT_FINALIZED_CACHE_MISS:

View File

@ -245,9 +245,6 @@ func (e *Watcher) processWormholeLog(logger *zap.Logger, ctx context.Context, jo
// tell everyone about it
job.hasWormholeMsg = true
if pubEvent.BlockHeight > job.wormholeMsgBlockHeight {
job.wormholeMsgBlockHeight = pubEvent.BlockHeight
}
e.eventChan <- EVENT_NEAR_MESSAGE_CONFIRMED

View File

@ -58,8 +58,7 @@ type (
delay time.Duration
// set during processing
hasWormholeMsg bool // set during processing; whether this transaction emitted a Wormhole message
wormholeMsgBlockHeight uint64 // highest block height of a wormhole message in this transaction
hasWormholeMsg bool // set during processing; whether this transaction emitted a Wormhole message
}
Watcher struct {
@ -77,9 +76,8 @@ type (
chunkProcessingQueue chan nearapi.ChunkHeader
// events channels
eventChanBlockProcessedHeight chan uint64 // whenever a block is processed, post the height here
eventChanTxProcessedDuration chan time.Duration
eventChan chan eventType // whenever a messages is confirmed, post true in here
eventChanTxProcessedDuration chan time.Duration
eventChan chan eventType // whenever a messages is confirmed, post true in here
// sub-components
finalizer Finalizer
@ -96,16 +94,15 @@ func NewWatcher(
mainnet bool,
) *Watcher {
return &Watcher{
mainnet: mainnet,
wormholeAccount: wormholeContract,
nearRPC: nearRPC,
msgC: msgC,
obsvReqC: obsvReqC,
transactionProcessingQueue: make(chan *transactionProcessingJob),
chunkProcessingQueue: make(chan nearapi.ChunkHeader, queueSize),
eventChanBlockProcessedHeight: make(chan uint64, 10),
eventChanTxProcessedDuration: make(chan time.Duration, 10),
eventChan: make(chan eventType, 10),
mainnet: mainnet,
wormholeAccount: wormholeContract,
nearRPC: nearRPC,
msgC: msgC,
obsvReqC: obsvReqC,
transactionProcessingQueue: make(chan *transactionProcessingJob),
chunkProcessingQueue: make(chan nearapi.ChunkHeader, queueSize),
eventChanTxProcessedDuration: make(chan time.Duration, 10),
eventChan: make(chan eventType, 10),
}
}
@ -117,7 +114,6 @@ func newTransactionProcessingJob(txHash string, senderAccountId string) *transac
0,
initialTxProcDelay,
false,
0,
}
}
@ -147,6 +143,13 @@ func (e *Watcher) runBlockPoll(ctx context.Context) error {
if err != nil {
logger.Warn("NEAR poll error", zap.String("log_msg_type", "block_poll_error"), zap.String("error", err.Error()))
}
p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDNear, &gossipv1.Heartbeat_Network{
Height: int64(highestFinalBlockHeightObserved),
ContractAddress: e.wormholeAccount,
})
readiness.SetReady(common.ReadinessNearSyncing)
timer.Reset(blockPollInterval)
}
}
@ -241,9 +244,6 @@ func (e *Watcher) runTxProcessor(ctx context.Context) error {
// report how long it took to process this transaction
e.eventChanTxProcessedDuration <- time.Since(job.creationTime)
}
// tell everyone about successful processing
e.eventChanBlockProcessedHeight <- job.wormholeMsgBlockHeight
}
}
@ -291,7 +291,6 @@ func (e *Watcher) Run(ctx context.Context) error {
}
}
readiness.SetReady(common.ReadinessNearSyncing)
supervisor.Signal(ctx, supervisor.SignalHealthy)
<-ctx.Done()