diff --git a/bridge/pkg/ethereum/watcher.go b/bridge/pkg/ethereum/watcher.go index cd73a17aa..a9de69aa2 100644 --- a/bridge/pkg/ethereum/watcher.go +++ b/bridge/pkg/ethereum/watcher.go @@ -79,17 +79,17 @@ type ( emitGuardianSet bool // Channel to send new messages to. - lockChan chan *common.MessagePublication + msgChan chan *common.MessagePublication // Channel to send guardian set changes to. setChan chan *common.GuardianSet - pendingLocks map[eth_common.Hash]*pendingLock - pendingLocksGuard sync.Mutex + pending map[eth_common.Hash]*pendingMessage + pendingMu sync.Mutex } - pendingLock struct { - lock *common.MessagePublication - height uint64 + pendingMessage struct { + message *common.MessagePublication + height uint64 } ) @@ -99,7 +99,7 @@ func NewEthBridgeWatcher( networkName string, chainID vaa.ChainID, emitGuardianSet bool, - lockEvents chan *common.MessagePublication, + messageEvents chan *common.MessagePublication, setEvents chan *common.GuardianSet) *EthBridgeWatcher { return &EthBridgeWatcher{ url: url, @@ -107,9 +107,9 @@ func NewEthBridgeWatcher( networkName: networkName, emitGuardianSet: emitGuardianSet, chainID: chainID, - lockChan: lockEvents, + msgChan: messageEvents, setChan: setEvents, - pendingLocks: map[eth_common.Hash]*pendingLock{}} + pending: map[eth_common.Hash]*pendingMessage{}} } func (e *EthBridgeWatcher) Run(ctx context.Context) error { @@ -205,7 +205,7 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error { return } - lock := &common.MessagePublication{ + messsage := &common.MessagePublication{ TxHash: ev.Raw.TxHash, Timestamp: time.Unix(int64(b.Time()), 0), Nonce: ev.Nonce, @@ -221,12 +221,12 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error { ethMessagesObserved.WithLabelValues(e.networkName).Inc() - e.pendingLocksGuard.Lock() - e.pendingLocks[ev.Raw.TxHash] = &pendingLock{ - lock: lock, - height: ev.Raw.BlockNumber, + e.pendingMu.Lock() + e.pending[ev.Raw.TxHash] = &pendingMessage{ + message: messsage, + height: ev.Raw.BlockNumber, } - e.pendingLocksGuard.Unlock() + e.pendingMu.Unlock() case ev := <-guardianSetC: logger.Info("guardian set has changed, fetching new value", zap.Uint32("new_index", ev.Index), zap.String("eth_network", e.networkName)) @@ -285,30 +285,30 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error { BridgeAddress: e.bridge.Hex(), }) - e.pendingLocksGuard.Lock() + e.pendingMu.Lock() blockNumberU := ev.Number.Uint64() - for hash, pLock := range e.pendingLocks { + for hash, pLock := range e.pending { // Transaction was dropped and never picked up again - if pLock.height+4*uint64(pLock.lock.ConsistencyLevel) <= blockNumberU { - logger.Debug("observation timed out", zap.Stringer("tx", pLock.lock.TxHash), + if pLock.height+4*uint64(pLock.message.ConsistencyLevel) <= blockNumberU { + logger.Debug("observation timed out", zap.Stringer("tx", pLock.message.TxHash), zap.Stringer("block", ev.Number), zap.String("eth_network", e.networkName)) - delete(e.pendingLocks, hash) + delete(e.pending, hash) continue } // Transaction is now ready - if pLock.height+uint64(pLock.lock.ConsistencyLevel) <= ev.Number.Uint64() { - logger.Debug("observation confirmed", zap.Stringer("tx", pLock.lock.TxHash), + if pLock.height+uint64(pLock.message.ConsistencyLevel) <= ev.Number.Uint64() { + logger.Debug("observation confirmed", zap.Stringer("tx", pLock.message.TxHash), zap.Stringer("block", ev.Number), zap.String("eth_network", e.networkName)) - delete(e.pendingLocks, hash) - e.lockChan <- pLock.lock + delete(e.pending, hash) + e.msgChan <- pLock.message ethMessagesConfirmed.WithLabelValues(e.networkName).Inc() } } - e.pendingLocksGuard.Unlock() + e.pendingMu.Unlock() logger.Info("processed new header", zap.Stringer("block", ev.Number), zap.Duration("took", time.Since(start)), zap.String("eth_network", e.networkName)) }