diff --git a/node/pkg/ethereum/watcher.go b/node/pkg/ethereum/watcher.go index 99f298cd4..594de36ea 100644 --- a/node/pkg/ethereum/watcher.go +++ b/node/pkg/ethereum/watcher.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/certusone/wormhole/node/pkg/p2p" gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1" + "github.com/ethereum/go-ethereum/rpc" "github.com/prometheus/client_golang/prometheus/promauto" "math/big" "sync" @@ -37,6 +38,11 @@ var ( Name: "wormhole_eth_messages_observed_total", Help: "Total number of Eth messages observed (pre-confirmation)", }, []string{"eth_network"}) + ethMessagesOrphaned = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "wormhole_eth_messages_orphaned_total", + Help: "Total number of Eth messages dropped (orphaned)", + }, []string{"eth_network"}) ethMessagesConfirmed = promauto.NewCounterVec( prometheus.CounterOpts{ Name: "wormhole_eth_messages_confirmed_total", @@ -89,9 +95,10 @@ type ( } pendingKey struct { - TxHash eth_common.Hash + TxHash eth_common.Hash + BlockHash eth_common.Hash EmitterAddress vaa.Address - Sequence uint64 + Sequence uint64 } pendingMessage struct { @@ -225,6 +232,7 @@ func (e *Watcher) Run(ctx context.Context) error { key := pendingKey{ TxHash: message.TxHash, + BlockHash: ev.Raw.BlockHash, EmitterAddress: message.EmitterAddress, Sequence: message.Sequence, } @@ -265,7 +273,7 @@ func (e *Watcher) Run(ctx context.Context) error { currentEthHeight.WithLabelValues(e.networkName).Set(float64(ev.Number.Int64())) readiness.SetReady(e.readiness) p2p.DefaultRegistry.SetNetworkStats(e.chainID, &gossipv1.Heartbeat_Network{ - Height: ev.Number.Int64(), + Height: ev.Number.Int64(), ContractAddress: e.contract.Hex(), }) @@ -284,6 +292,28 @@ func (e *Watcher) Run(ctx context.Context) error { // Transaction is now ready if pLock.height+uint64(pLock.message.ConsistencyLevel) <= ev.Number.Uint64() { + timeout, cancel = context.WithTimeout(ctx, 15*time.Second) + tx, err := c.TransactionReceipt(timeout, pLock.message.TxHash) + cancel() + if err != nil && err != rpc.ErrNoResult { + logger.Warn("transaction could not be fetched", zap.Stringer("tx", pLock.message.TxHash), + zap.Stringer("block", ev.Number), zap.String("eth_network", e.networkName)) + continue + } + if tx == nil { + logger.Info("tx was orphaned", zap.Stringer("tx", pLock.message.TxHash), + zap.Stringer("block", ev.Number), zap.String("eth_network", e.networkName)) + delete(e.pending, key) + ethMessagesOrphaned.WithLabelValues(e.networkName).Inc() + continue + } + if tx.BlockHash != key.BlockHash { + logger.Info("tx got dropped and mined in a later block; the message should have been reobserved", zap.Stringer("tx", pLock.message.TxHash), + zap.Stringer("block", ev.Number), zap.String("eth_network", e.networkName)) + delete(e.pending, key) + ethMessagesOrphaned.WithLabelValues(e.networkName).Inc() + continue + } logger.Debug("observation confirmed", zap.Stringer("tx", pLock.message.TxHash), zap.Stringer("block", ev.Number), zap.String("eth_network", e.networkName)) delete(e.pending, key)