From 9ccf5e593ec553147dc290fc9059b114557d3365 Mon Sep 17 00:00:00 2001 From: bruce-riley <96066700+bruce-riley@users.noreply.github.com> Date: Wed, 6 Dec 2023 16:54:03 -0600 Subject: [PATCH] Node/EVM: Retry block time read (#3577) --- node/pkg/watchers/evm/watcher.go | 203 +++++++++++++++++++++---------- 1 file changed, 142 insertions(+), 61 deletions(-) diff --git a/node/pkg/watchers/evm/watcher.go b/node/pkg/watchers/evm/watcher.go index c6214c8da..d102c5f2b 100644 --- a/node/pkg/watchers/evm/watcher.go +++ b/node/pkg/watchers/evm/watcher.go @@ -21,6 +21,7 @@ import ( "github.com/prometheus/client_golang/prometheus" + ethereum "github.com/ethereum/go-ethereum" eth_common "github.com/ethereum/go-ethereum/common" eth_hexutil "github.com/ethereum/go-ethereum/common/hexutil" "go.uber.org/zap" @@ -484,73 +485,19 @@ func (w *Watcher) Run(parentCtx context.Context) error { p2p.DefaultRegistry.AddErrorCount(w.chainID, 1) return nil case ev := <-messageC: - // Request timestamp for block - msm := time.Now() - timeout, cancel := context.WithTimeout(ctx, 15*time.Second) - blockTime, err := w.ethConn.TimeOfBlockByHash(timeout, ev.Raw.BlockHash) - cancel() - queryLatency.WithLabelValues(w.networkName, "block_by_number").Observe(time.Since(msm).Seconds()) - + blockTime, err := w.getBlockTime(ctx, ev.Raw.BlockHash) if err != nil { ethConnectionErrors.WithLabelValues(w.networkName, "block_by_number_error").Inc() + if err == ethereum.NotFound { + go w.waitForBlockTime(ctx, logger, errC, ev) + continue + } p2p.DefaultRegistry.AddErrorCount(w.chainID, 1) - errC <- fmt.Errorf("failed to request timestamp for block %d, hash %s: %w", - ev.Raw.BlockNumber, ev.Raw.BlockHash.String(), err) + errC <- fmt.Errorf("failed to request timestamp for block %d, hash %s: %w", ev.Raw.BlockNumber, ev.Raw.BlockHash.String(), err) return nil } - message := &common.MessagePublication{ - TxHash: ev.Raw.TxHash, - Timestamp: time.Unix(int64(blockTime), 0), - Nonce: ev.Nonce, - Sequence: ev.Sequence, - EmitterChain: w.chainID, - EmitterAddress: PadAddress(ev.Sender), - Payload: ev.Payload, - ConsistencyLevel: ev.ConsistencyLevel, - } - - ethMessagesObserved.WithLabelValues(w.networkName).Inc() - - if message.ConsistencyLevel == vaa.ConsistencyLevelPublishImmediately { - logger.Info("found new message publication transaction, publishing it immediately", - zap.Stringer("tx", ev.Raw.TxHash), - zap.Uint64("block", ev.Raw.BlockNumber), - zap.Stringer("blockhash", ev.Raw.BlockHash), - zap.Uint64("blockTime", blockTime), - zap.Uint64("Sequence", ev.Sequence), - zap.Uint32("Nonce", ev.Nonce), - zap.Uint8("ConsistencyLevel", ev.ConsistencyLevel), - zap.String("eth_network", w.networkName)) - - w.msgC <- message - ethMessagesConfirmed.WithLabelValues(w.networkName).Inc() - continue - } - - logger.Info("found new message publication transaction", - zap.Stringer("tx", ev.Raw.TxHash), - zap.Uint64("block", ev.Raw.BlockNumber), - zap.Stringer("blockhash", ev.Raw.BlockHash), - zap.Uint64("blockTime", blockTime), - zap.Uint64("Sequence", ev.Sequence), - zap.Uint32("Nonce", ev.Nonce), - zap.Uint8("ConsistencyLevel", ev.ConsistencyLevel), - zap.String("eth_network", w.networkName)) - - key := pendingKey{ - TxHash: message.TxHash, - BlockHash: ev.Raw.BlockHash, - EmitterAddress: message.EmitterAddress, - Sequence: message.Sequence, - } - - w.pendingMu.Lock() - w.pending[key] = &pendingMessage{ - message: message, - height: ev.Raw.BlockNumber, - } - w.pendingMu.Unlock() + w.postMessage(logger, ev, blockTime) } } }) @@ -917,3 +864,137 @@ func (w *Watcher) updateNetworkStats(stats *gossipv1.Heartbeat_Network) { ContractAddress: w.contract.Hex(), }) } + +// getBlockTime reads the time of a block. +func (w *Watcher) getBlockTime(ctx context.Context, blockHash eth_common.Hash) (uint64, error) { + msm := time.Now() + timeout, cancel := context.WithTimeout(ctx, 15*time.Second) + blockTime, err := w.ethConn.TimeOfBlockByHash(timeout, blockHash) + cancel() + queryLatency.WithLabelValues(w.networkName, "block_by_number").Observe(time.Since(msm).Seconds()) + return blockTime, err +} + +// postMessage creates a message object from a log event and adds it to the pending list for processing. +func (w *Watcher) postMessage(logger *zap.Logger, ev *ethabi.AbiLogMessagePublished, blockTime uint64) { + message := &common.MessagePublication{ + TxHash: ev.Raw.TxHash, + Timestamp: time.Unix(int64(blockTime), 0), + Nonce: ev.Nonce, + Sequence: ev.Sequence, + EmitterChain: w.chainID, + EmitterAddress: PadAddress(ev.Sender), + Payload: ev.Payload, + ConsistencyLevel: ev.ConsistencyLevel, + } + + ethMessagesObserved.WithLabelValues(w.networkName).Inc() + + if message.ConsistencyLevel == vaa.ConsistencyLevelPublishImmediately { + logger.Info("found new message publication transaction, publishing it immediately", + zap.Stringer("tx", ev.Raw.TxHash), + zap.Uint64("block", ev.Raw.BlockNumber), + zap.Stringer("blockhash", ev.Raw.BlockHash), + zap.Uint64("blockTime", blockTime), + zap.Uint64("Sequence", ev.Sequence), + zap.Uint32("Nonce", ev.Nonce), + zap.Uint8("ConsistencyLevel", ev.ConsistencyLevel), + zap.String("eth_network", w.networkName)) + + w.msgC <- message + ethMessagesConfirmed.WithLabelValues(w.networkName).Inc() + return + } + + logger.Info("found new message publication transaction", + zap.Stringer("tx", ev.Raw.TxHash), + zap.Uint64("block", ev.Raw.BlockNumber), + zap.Stringer("blockhash", ev.Raw.BlockHash), + zap.Uint64("blockTime", blockTime), + zap.Uint64("Sequence", ev.Sequence), + zap.Uint32("Nonce", ev.Nonce), + zap.Uint8("ConsistencyLevel", ev.ConsistencyLevel), + zap.String("eth_network", w.networkName)) + + key := pendingKey{ + TxHash: message.TxHash, + BlockHash: ev.Raw.BlockHash, + EmitterAddress: message.EmitterAddress, + Sequence: message.Sequence, + } + + w.pendingMu.Lock() + w.pending[key] = &pendingMessage{ + message: message, + height: ev.Raw.BlockNumber, + } + w.pendingMu.Unlock() +} + +// waitForBlockTime is a go routine that repeatedly attempts to read the block time for a single log event. It is used when the initial attempt to read +// the block time fails. If it is finally able to read the block time, it posts the event for processing. Otherwise, it will eventually give up. +func (w *Watcher) waitForBlockTime(ctx context.Context, logger *zap.Logger, errC chan error, ev *ethabi.AbiLogMessagePublished) { + logger.Warn("found new message publication transaction but failed to look up block time, deferring processing", + zap.Stringer("tx", ev.Raw.TxHash), + zap.Uint64("block", ev.Raw.BlockNumber), + zap.Stringer("blockhash", ev.Raw.BlockHash), + zap.Uint64("Sequence", ev.Sequence), + zap.Uint32("Nonce", ev.Nonce), + zap.Uint8("ConsistencyLevel", ev.ConsistencyLevel), + zap.String("eth_network", w.networkName)) + + const RetryInterval = 5 * time.Second + const MaxRetries = 3 + start := time.Now() + t := time.NewTimer(RetryInterval) + defer t.Stop() + retries := 1 + for { + select { + case <-ctx.Done(): + return + case <-t.C: + blockTime, err := w.getBlockTime(ctx, ev.Raw.BlockHash) + if err == nil { + logger.Info("retry of block time query succeeded, posting transaction", + zap.Stringer("tx", ev.Raw.TxHash), + zap.Uint64("block", ev.Raw.BlockNumber), + zap.Stringer("blockhash", ev.Raw.BlockHash), + zap.Uint64("blockTime", blockTime), + zap.Uint64("Sequence", ev.Sequence), + zap.Uint32("Nonce", ev.Nonce), + zap.Uint8("ConsistencyLevel", ev.ConsistencyLevel), + zap.Stringer("startTime", start), + zap.Int("retries", retries), + zap.String("eth_network", w.networkName)) + + w.postMessage(logger, ev, blockTime) + return + } + + ethConnectionErrors.WithLabelValues(w.networkName, "block_by_number_error").Inc() + if err != ethereum.NotFound { + p2p.DefaultRegistry.AddErrorCount(w.chainID, 1) + errC <- fmt.Errorf("failed to request timestamp for block %d, hash %s: %w", ev.Raw.BlockNumber, ev.Raw.BlockHash.String(), err) + return + } + if retries >= MaxRetries { + logger.Error("repeatedly failed to look up block time, giving up", + zap.Stringer("tx", ev.Raw.TxHash), + zap.Uint64("block", ev.Raw.BlockNumber), + zap.Stringer("blockhash", ev.Raw.BlockHash), + zap.Uint64("Sequence", ev.Sequence), + zap.Uint32("Nonce", ev.Nonce), + zap.Uint8("ConsistencyLevel", ev.ConsistencyLevel), + zap.Stringer("startTime", start), + zap.Int("retries", retries), + zap.String("eth_network", w.networkName)) + + return + } + + retries++ + t.Reset(RetryInterval) + } + } +}