Node/EVM: Retry block time read (#3577)

This commit is contained in:
bruce-riley 2023-12-06 16:54:03 -06:00 committed by GitHub
parent cab4419a4f
commit 9ccf5e593e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed file with 142 additions and 61 deletions

View File

@ -21,6 +21,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
ethereum "github.com/ethereum/go-ethereum"
eth_common "github.com/ethereum/go-ethereum/common"
eth_hexutil "github.com/ethereum/go-ethereum/common/hexutil"
"go.uber.org/zap"
@ -484,73 +485,19 @@ func (w *Watcher) Run(parentCtx context.Context) error {
p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
return nil
case ev := <-messageC:
// Request timestamp for block
msm := time.Now()
timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
blockTime, err := w.ethConn.TimeOfBlockByHash(timeout, ev.Raw.BlockHash)
cancel()
queryLatency.WithLabelValues(w.networkName, "block_by_number").Observe(time.Since(msm).Seconds())
blockTime, err := w.getBlockTime(ctx, ev.Raw.BlockHash)
if err != nil {
ethConnectionErrors.WithLabelValues(w.networkName, "block_by_number_error").Inc()
if err == ethereum.NotFound {
go w.waitForBlockTime(ctx, logger, errC, ev)
continue
}
p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
errC <- fmt.Errorf("failed to request timestamp for block %d, hash %s: %w",
ev.Raw.BlockNumber, ev.Raw.BlockHash.String(), err)
errC <- fmt.Errorf("failed to request timestamp for block %d, hash %s: %w", ev.Raw.BlockNumber, ev.Raw.BlockHash.String(), err)
return nil
}
message := &common.MessagePublication{
TxHash: ev.Raw.TxHash,
Timestamp: time.Unix(int64(blockTime), 0),
Nonce: ev.Nonce,
Sequence: ev.Sequence,
EmitterChain: w.chainID,
EmitterAddress: PadAddress(ev.Sender),
Payload: ev.Payload,
ConsistencyLevel: ev.ConsistencyLevel,
}
ethMessagesObserved.WithLabelValues(w.networkName).Inc()
if message.ConsistencyLevel == vaa.ConsistencyLevelPublishImmediately {
logger.Info("found new message publication transaction, publishing it immediately",
zap.Stringer("tx", ev.Raw.TxHash),
zap.Uint64("block", ev.Raw.BlockNumber),
zap.Stringer("blockhash", ev.Raw.BlockHash),
zap.Uint64("blockTime", blockTime),
zap.Uint64("Sequence", ev.Sequence),
zap.Uint32("Nonce", ev.Nonce),
zap.Uint8("ConsistencyLevel", ev.ConsistencyLevel),
zap.String("eth_network", w.networkName))
w.msgC <- message
ethMessagesConfirmed.WithLabelValues(w.networkName).Inc()
continue
}
logger.Info("found new message publication transaction",
zap.Stringer("tx", ev.Raw.TxHash),
zap.Uint64("block", ev.Raw.BlockNumber),
zap.Stringer("blockhash", ev.Raw.BlockHash),
zap.Uint64("blockTime", blockTime),
zap.Uint64("Sequence", ev.Sequence),
zap.Uint32("Nonce", ev.Nonce),
zap.Uint8("ConsistencyLevel", ev.ConsistencyLevel),
zap.String("eth_network", w.networkName))
key := pendingKey{
TxHash: message.TxHash,
BlockHash: ev.Raw.BlockHash,
EmitterAddress: message.EmitterAddress,
Sequence: message.Sequence,
}
w.pendingMu.Lock()
w.pending[key] = &pendingMessage{
message: message,
height: ev.Raw.BlockNumber,
}
w.pendingMu.Unlock()
w.postMessage(logger, ev, blockTime)
}
}
})
@ -917,3 +864,137 @@ func (w *Watcher) updateNetworkStats(stats *gossipv1.Heartbeat_Network) {
ContractAddress: w.contract.Hex(),
})
}
// getBlockTime looks up the timestamp of the block identified by blockHash.
//
// The query runs under a 15 second timeout derived from ctx. How long the query took is
// recorded in the queryLatency metric whether or not it succeeded. Note that the metric
// label is "block_by_number" even though the lookup is done by hash.
//
// The block time and error produced by TimeOfBlockByHash are returned unchanged.
func (w *Watcher) getBlockTime(ctx context.Context, blockHash eth_common.Hash) (uint64, error) {
	const queryTimeout = 15 * time.Second

	started := time.Now()
	queryCtx, release := context.WithTimeout(ctx, queryTimeout)
	ts, err := w.ethConn.TimeOfBlockByHash(queryCtx, blockHash)
	// Release the timeout context as soon as the query returns.
	release()

	observer := queryLatency.WithLabelValues(w.networkName, "block_by_number")
	observer.Observe(time.Since(started).Seconds())

	return ts, err
}
// postMessage builds a MessagePublication from a log event and hands it off for processing.
//
// The message timestamp is blockTime interpreted as Unix seconds. If the event's consistency
// level is vaa.ConsistencyLevelPublishImmediately, the message is written directly to w.msgC.
// Otherwise it is stored in w.pending (under w.pendingMu), keyed by transaction hash, block
// hash, emitter address and sequence, together with the block number it was observed at.
func (w *Watcher) postMessage(logger *zap.Logger, ev *ethabi.AbiLogMessagePublished, blockTime uint64) {
	msg := &common.MessagePublication{
		TxHash:           ev.Raw.TxHash,
		Timestamp:        time.Unix(int64(blockTime), 0),
		Nonce:            ev.Nonce,
		Sequence:         ev.Sequence,
		EmitterChain:     w.chainID,
		EmitterAddress:   PadAddress(ev.Sender),
		Payload:          ev.Payload,
		ConsistencyLevel: ev.ConsistencyLevel,
	}

	ethMessagesObserved.WithLabelValues(w.networkName).Inc()

	// Both log lines below carry exactly the same fields, so build them once.
	logFields := []zap.Field{
		zap.Stringer("tx", ev.Raw.TxHash),
		zap.Uint64("block", ev.Raw.BlockNumber),
		zap.Stringer("blockhash", ev.Raw.BlockHash),
		zap.Uint64("blockTime", blockTime),
		zap.Uint64("Sequence", ev.Sequence),
		zap.Uint32("Nonce", ev.Nonce),
		zap.Uint8("ConsistencyLevel", ev.ConsistencyLevel),
		zap.String("eth_network", w.networkName),
	}

	if msg.ConsistencyLevel == vaa.ConsistencyLevelPublishImmediately {
		logger.Info("found new message publication transaction, publishing it immediately", logFields...)
		w.msgC <- msg
		ethMessagesConfirmed.WithLabelValues(w.networkName).Inc()
		return
	}

	logger.Info("found new message publication transaction", logFields...)

	pk := pendingKey{
		TxHash:         msg.TxHash,
		BlockHash:      ev.Raw.BlockHash,
		EmitterAddress: msg.EmitterAddress,
		Sequence:       msg.Sequence,
	}
	entry := &pendingMessage{
		message: msg,
		height:  ev.Raw.BlockNumber,
	}

	w.pendingMu.Lock()
	w.pending[pk] = entry
	w.pendingMu.Unlock()
}
// waitForBlockTime is a goroutine that repeatedly attempts to read the block time for a single log event.
// It is used when the initial attempt to read the block time fails. If it is finally able to read the
// block time, it posts the event for processing. Otherwise, it will eventually give up.
//
// Behavior:
//   - It waits RetryInterval before each attempt and makes at most MaxRetries attempts.
//   - On success the event is handed to postMessage and the goroutine exits.
//   - On any error other than ethereum.NotFound, the error is reported on errC and the goroutine exits.
//   - After MaxRetries NotFound results, an error is logged and the event is dropped.
//   - If ctx is cancelled, the goroutine exits without posting or reporting anything.
func (w *Watcher) waitForBlockTime(ctx context.Context, logger *zap.Logger, errC chan error, ev *ethabi.AbiLogMessagePublished) {
	logger.Warn("found new message publication transaction but failed to look up block time, deferring processing",
		zap.Stringer("tx", ev.Raw.TxHash),
		zap.Uint64("block", ev.Raw.BlockNumber),
		zap.Stringer("blockhash", ev.Raw.BlockHash),
		zap.Uint64("Sequence", ev.Sequence),
		zap.Uint32("Nonce", ev.Nonce),
		zap.Uint8("ConsistencyLevel", ev.ConsistencyLevel),
		zap.String("eth_network", w.networkName))

	const RetryInterval = 5 * time.Second
	const MaxRetries = 3

	start := time.Now()
	t := time.NewTimer(RetryInterval)
	defer t.Stop()

	// retries is the number of the attempt about to be made, so it starts at one.
	retries := 1
	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			blockTime, err := w.getBlockTime(ctx, ev.Raw.BlockHash)
			if err == nil {
				logger.Info("retry of block time query succeeded, posting transaction",
					zap.Stringer("tx", ev.Raw.TxHash),
					zap.Uint64("block", ev.Raw.BlockNumber),
					zap.Stringer("blockhash", ev.Raw.BlockHash),
					zap.Uint64("blockTime", blockTime),
					zap.Uint64("Sequence", ev.Sequence),
					zap.Uint32("Nonce", ev.Nonce),
					zap.Uint8("ConsistencyLevel", ev.ConsistencyLevel),
					zap.Stringer("startTime", start),
					zap.Int("retries", retries),
					zap.String("eth_network", w.networkName))

				w.postMessage(logger, ev, blockTime)
				return
			}

			ethConnectionErrors.WithLabelValues(w.networkName, "block_by_number_error").Inc()
			if err != ethereum.NotFound {
				p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
				// Do not block forever on errC: once ctx is cancelled there may be nobody left
				// to receive the error, and an unconditional send would leak this goroutine.
				select {
				case errC <- fmt.Errorf("failed to request timestamp for block %d, hash %s: %w", ev.Raw.BlockNumber, ev.Raw.BlockHash.String(), err):
				case <-ctx.Done():
				}
				return
			}

			if retries >= MaxRetries {
				logger.Error("repeatedly failed to look up block time, giving up",
					zap.Stringer("tx", ev.Raw.TxHash),
					zap.Uint64("block", ev.Raw.BlockNumber),
					zap.Stringer("blockhash", ev.Raw.BlockHash),
					zap.Uint64("Sequence", ev.Sequence),
					zap.Uint32("Nonce", ev.Nonce),
					zap.Uint8("ConsistencyLevel", ev.ConsistencyLevel),
					zap.Stringer("startTime", start),
					zap.Int("retries", retries),
					zap.String("eth_network", w.networkName))

				return
			}

			retries++
			// The timer has already fired and its channel was drained by this case, so Reset is safe here.
			t.Reset(RetryInterval)
		}
	}
}