Properly check for orphaned txs

Change-Id: I2c3d1f638f6e6ab22c4dfcbbe0a0f5f6fd62f730
This commit is contained in:
Hendrik Hofstadt 2022-01-06 12:01:33 +01:00 committed by Evan Gray
parent 2998031b16
commit 409b5ca5bf
1 changed files with 33 additions and 3 deletions

View File

@ -5,6 +5,7 @@ import (
"fmt"
"github.com/certusone/wormhole/node/pkg/p2p"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/ethereum/go-ethereum/rpc"
"github.com/prometheus/client_golang/prometheus/promauto"
"math/big"
"sync"
@ -37,6 +38,11 @@ var (
Name: "wormhole_eth_messages_observed_total",
Help: "Total number of Eth messages observed (pre-confirmation)",
}, []string{"eth_network"})
ethMessagesOrphaned = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "wormhole_eth_messages_orphaned_total",
Help: "Total number of Eth messages dropped (orphaned)",
}, []string{"eth_network"})
ethMessagesConfirmed = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "wormhole_eth_messages_confirmed_total",
@ -89,9 +95,10 @@ type (
}
pendingKey struct {
TxHash eth_common.Hash
TxHash eth_common.Hash
BlockHash eth_common.Hash
EmitterAddress vaa.Address
Sequence uint64
Sequence uint64
}
pendingMessage struct {
@ -225,6 +232,7 @@ func (e *Watcher) Run(ctx context.Context) error {
key := pendingKey{
TxHash: message.TxHash,
BlockHash: ev.Raw.BlockHash,
EmitterAddress: message.EmitterAddress,
Sequence: message.Sequence,
}
@ -265,7 +273,7 @@ func (e *Watcher) Run(ctx context.Context) error {
currentEthHeight.WithLabelValues(e.networkName).Set(float64(ev.Number.Int64()))
readiness.SetReady(e.readiness)
p2p.DefaultRegistry.SetNetworkStats(e.chainID, &gossipv1.Heartbeat_Network{
Height: ev.Number.Int64(),
Height: ev.Number.Int64(),
ContractAddress: e.contract.Hex(),
})
@ -284,6 +292,28 @@ func (e *Watcher) Run(ctx context.Context) error {
// Transaction is now ready
if pLock.height+uint64(pLock.message.ConsistencyLevel) <= ev.Number.Uint64() {
timeout, cancel = context.WithTimeout(ctx, 15*time.Second)
tx, err := c.TransactionReceipt(timeout, pLock.message.TxHash)
cancel()
if err != nil && err != rpc.ErrNoResult {
logger.Warn("transaction could not be fetched", zap.Stringer("tx", pLock.message.TxHash),
zap.Stringer("block", ev.Number), zap.String("eth_network", e.networkName))
continue
}
if tx == nil {
logger.Info("tx was orphaned", zap.Stringer("tx", pLock.message.TxHash),
zap.Stringer("block", ev.Number), zap.String("eth_network", e.networkName))
delete(e.pending, key)
ethMessagesOrphaned.WithLabelValues(e.networkName).Inc()
continue
}
if tx.BlockHash != key.BlockHash {
logger.Info("tx got dropped and mined in a later block; the message should have been reobserved", zap.Stringer("tx", pLock.message.TxHash),
zap.Stringer("block", ev.Number), zap.String("eth_network", e.networkName))
delete(e.pending, key)
ethMessagesOrphaned.WithLabelValues(e.networkName).Inc()
continue
}
logger.Debug("observation confirmed", zap.Stringer("tx", pLock.message.TxHash),
zap.Stringer("block", ev.Number), zap.String("eth_network", e.networkName))
delete(e.pending, key)