From e1257f123aeb7a6bb3c142bd9871eedf3106b4b4 Mon Sep 17 00:00:00 2001 From: Evan Gray Date: Thu, 23 Dec 2021 19:23:06 +0000 Subject: [PATCH] node: eth watcher multi-message fix --- node/pkg/ethereum/watcher.go | 28 ++++++++++++++++++++-------- 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/node/pkg/ethereum/watcher.go b/node/pkg/ethereum/watcher.go index 588acca9c..99f298cd4 100644 --- a/node/pkg/ethereum/watcher.go +++ b/node/pkg/ethereum/watcher.go @@ -81,13 +81,19 @@ type ( // the governance mechanism lives there), setChan chan *common.GuardianSet - pending map[eth_common.Hash]*pendingMessage + pending map[pendingKey]*pendingMessage pendingMu sync.Mutex // 0 is a valid guardian set, so we need a nil value here currentGuardianSet *uint32 } + pendingKey struct { + TxHash eth_common.Hash + EmitterAddress vaa.Address + Sequence uint64 + } + pendingMessage struct { message *common.MessagePublication height uint64 @@ -110,7 +116,7 @@ func NewEthWatcher( chainID: chainID, msgChan: messageEvents, setChan: setEvents, - pending: map[eth_common.Hash]*pendingMessage{}} + pending: map[pendingKey]*pendingMessage{}} } func (e *Watcher) Run(ctx context.Context) error { @@ -201,7 +207,7 @@ func (e *Watcher) Run(ctx context.Context) error { return } - messsage := &common.MessagePublication{ + message := &common.MessagePublication{ TxHash: ev.Raw.TxHash, Timestamp: time.Unix(int64(b.Time()), 0), Nonce: ev.Nonce, @@ -217,9 +223,15 @@ func (e *Watcher) Run(ctx context.Context) error { ethMessagesObserved.WithLabelValues(e.networkName).Inc() + key := pendingKey{ + TxHash: message.TxHash, + EmitterAddress: message.EmitterAddress, + Sequence: message.Sequence, + } + e.pendingMu.Lock() - e.pending[ev.Raw.TxHash] = &pendingMessage{ - message: messsage, + e.pending[key] = &pendingMessage{ + message: message, height: ev.Raw.BlockNumber, } e.pendingMu.Unlock() @@ -260,13 +272,13 @@ func (e *Watcher) Run(ctx context.Context) error { e.pendingMu.Lock() blockNumberU := ev.Number.Uint64() - for hash, pLock := range e.pending { + for key, pLock := range e.pending { // Transaction was dropped and never picked up again if pLock.height+4*uint64(pLock.message.ConsistencyLevel) <= blockNumberU { logger.Debug("observation timed out", zap.Stringer("tx", pLock.message.TxHash), zap.Stringer("block", ev.Number), zap.String("eth_network", e.networkName)) - delete(e.pending, hash) + delete(e.pending, key) continue } @@ -274,7 +286,7 @@ func (e *Watcher) Run(ctx context.Context) error { if pLock.height+uint64(pLock.message.ConsistencyLevel) <= ev.Number.Uint64() { logger.Debug("observation confirmed", zap.Stringer("tx", pLock.message.TxHash), zap.Stringer("block", ev.Number), zap.String("eth_network", e.networkName)) - delete(e.pending, hash) + delete(e.pending, key) e.msgChan <- pLock.message ethMessagesConfirmed.WithLabelValues(e.networkName).Inc() }