node/pkg/ethereum: rename lockup to message

Change-Id: I914d348940ac3f0c359a4d5175cbe5861e9fe1c2
This commit is contained in:
Leo 2021-07-28 15:08:38 +02:00 committed by Leopold Schabel
parent 863e0e69ec
commit f717262282
1 changed files with 25 additions and 25 deletions

View File

@ -79,17 +79,17 @@ type (
emitGuardianSet bool emitGuardianSet bool
// Channel to send new messages to. // Channel to send new messages to.
lockChan chan *common.MessagePublication msgChan chan *common.MessagePublication
// Channel to send guardian set changes to. // Channel to send guardian set changes to.
setChan chan *common.GuardianSet setChan chan *common.GuardianSet
pendingLocks map[eth_common.Hash]*pendingLock pending map[eth_common.Hash]*pendingMessage
pendingLocksGuard sync.Mutex pendingMu sync.Mutex
} }
pendingLock struct { pendingMessage struct {
lock *common.MessagePublication message *common.MessagePublication
height uint64 height uint64
} }
) )
@ -99,7 +99,7 @@ func NewEthBridgeWatcher(
networkName string, networkName string,
chainID vaa.ChainID, chainID vaa.ChainID,
emitGuardianSet bool, emitGuardianSet bool,
lockEvents chan *common.MessagePublication, messageEvents chan *common.MessagePublication,
setEvents chan *common.GuardianSet) *EthBridgeWatcher { setEvents chan *common.GuardianSet) *EthBridgeWatcher {
return &EthBridgeWatcher{ return &EthBridgeWatcher{
url: url, url: url,
@ -107,9 +107,9 @@ func NewEthBridgeWatcher(
networkName: networkName, networkName: networkName,
emitGuardianSet: emitGuardianSet, emitGuardianSet: emitGuardianSet,
chainID: chainID, chainID: chainID,
lockChan: lockEvents, msgChan: messageEvents,
setChan: setEvents, setChan: setEvents,
pendingLocks: map[eth_common.Hash]*pendingLock{}} pending: map[eth_common.Hash]*pendingMessage{}}
} }
func (e *EthBridgeWatcher) Run(ctx context.Context) error { func (e *EthBridgeWatcher) Run(ctx context.Context) error {
@ -205,7 +205,7 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error {
return return
} }
lock := &common.MessagePublication{ messsage := &common.MessagePublication{
TxHash: ev.Raw.TxHash, TxHash: ev.Raw.TxHash,
Timestamp: time.Unix(int64(b.Time()), 0), Timestamp: time.Unix(int64(b.Time()), 0),
Nonce: ev.Nonce, Nonce: ev.Nonce,
@ -221,12 +221,12 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error {
ethMessagesObserved.WithLabelValues(e.networkName).Inc() ethMessagesObserved.WithLabelValues(e.networkName).Inc()
e.pendingLocksGuard.Lock() e.pendingMu.Lock()
e.pendingLocks[ev.Raw.TxHash] = &pendingLock{ e.pending[ev.Raw.TxHash] = &pendingMessage{
lock: lock, message: messsage,
height: ev.Raw.BlockNumber, height: ev.Raw.BlockNumber,
} }
e.pendingLocksGuard.Unlock() e.pendingMu.Unlock()
case ev := <-guardianSetC: case ev := <-guardianSetC:
logger.Info("guardian set has changed, fetching new value", logger.Info("guardian set has changed, fetching new value",
zap.Uint32("new_index", ev.Index), zap.String("eth_network", e.networkName)) zap.Uint32("new_index", ev.Index), zap.String("eth_network", e.networkName))
@ -285,30 +285,30 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error {
BridgeAddress: e.bridge.Hex(), BridgeAddress: e.bridge.Hex(),
}) })
e.pendingLocksGuard.Lock() e.pendingMu.Lock()
blockNumberU := ev.Number.Uint64() blockNumberU := ev.Number.Uint64()
for hash, pLock := range e.pendingLocks { for hash, pLock := range e.pending {
// Transaction was dropped and never picked up again // Transaction was dropped and never picked up again
if pLock.height+4*uint64(pLock.lock.ConsistencyLevel) <= blockNumberU { if pLock.height+4*uint64(pLock.message.ConsistencyLevel) <= blockNumberU {
logger.Debug("observation timed out", zap.Stringer("tx", pLock.lock.TxHash), logger.Debug("observation timed out", zap.Stringer("tx", pLock.message.TxHash),
zap.Stringer("block", ev.Number), zap.String("eth_network", e.networkName)) zap.Stringer("block", ev.Number), zap.String("eth_network", e.networkName))
delete(e.pendingLocks, hash) delete(e.pending, hash)
continue continue
} }
// Transaction is now ready // Transaction is now ready
if pLock.height+uint64(pLock.lock.ConsistencyLevel) <= ev.Number.Uint64() { if pLock.height+uint64(pLock.message.ConsistencyLevel) <= ev.Number.Uint64() {
logger.Debug("observation confirmed", zap.Stringer("tx", pLock.lock.TxHash), logger.Debug("observation confirmed", zap.Stringer("tx", pLock.message.TxHash),
zap.Stringer("block", ev.Number), zap.String("eth_network", e.networkName)) zap.Stringer("block", ev.Number), zap.String("eth_network", e.networkName))
delete(e.pendingLocks, hash) delete(e.pending, hash)
e.lockChan <- pLock.lock e.msgChan <- pLock.message
ethMessagesConfirmed.WithLabelValues(e.networkName).Inc() ethMessagesConfirmed.WithLabelValues(e.networkName).Inc()
} }
} }
e.pendingLocksGuard.Unlock() e.pendingMu.Unlock()
logger.Info("processed new header", zap.Stringer("block", ev.Number), logger.Info("processed new header", zap.Stringer("block", ev.Number),
zap.Duration("took", time.Since(start)), zap.String("eth_network", e.networkName)) zap.Duration("took", time.Since(start)), zap.String("eth_network", e.networkName))
} }