node: eth watcher multi-message fix

This commit is contained in:
Evan Gray 2021-12-23 19:23:06 +00:00 committed by Evan Gray
parent cc2c310ba6
commit e1257f123a
1 changed files with 20 additions and 8 deletions

View File

@ -81,13 +81,19 @@ type (
// the governance mechanism lives there), // the governance mechanism lives there),
setChan chan *common.GuardianSet setChan chan *common.GuardianSet
pending map[eth_common.Hash]*pendingMessage pending map[pendingKey]*pendingMessage
pendingMu sync.Mutex pendingMu sync.Mutex
// 0 is a valid guardian set, so we need a nil value here // 0 is a valid guardian set, so we need a nil value here
currentGuardianSet *uint32 currentGuardianSet *uint32
} }
pendingKey struct {
TxHash eth_common.Hash
EmitterAddress vaa.Address
Sequence uint64
}
pendingMessage struct { pendingMessage struct {
message *common.MessagePublication message *common.MessagePublication
height uint64 height uint64
@ -110,7 +116,7 @@ func NewEthWatcher(
chainID: chainID, chainID: chainID,
msgChan: messageEvents, msgChan: messageEvents,
setChan: setEvents, setChan: setEvents,
pending: map[eth_common.Hash]*pendingMessage{}} pending: map[pendingKey]*pendingMessage{}}
} }
func (e *Watcher) Run(ctx context.Context) error { func (e *Watcher) Run(ctx context.Context) error {
@ -201,7 +207,7 @@ func (e *Watcher) Run(ctx context.Context) error {
return return
} }
messsage := &common.MessagePublication{ message := &common.MessagePublication{
TxHash: ev.Raw.TxHash, TxHash: ev.Raw.TxHash,
Timestamp: time.Unix(int64(b.Time()), 0), Timestamp: time.Unix(int64(b.Time()), 0),
Nonce: ev.Nonce, Nonce: ev.Nonce,
@ -217,9 +223,15 @@ func (e *Watcher) Run(ctx context.Context) error {
ethMessagesObserved.WithLabelValues(e.networkName).Inc() ethMessagesObserved.WithLabelValues(e.networkName).Inc()
key := pendingKey{
TxHash: message.TxHash,
EmitterAddress: message.EmitterAddress,
Sequence: message.Sequence,
}
e.pendingMu.Lock() e.pendingMu.Lock()
e.pending[ev.Raw.TxHash] = &pendingMessage{ e.pending[key] = &pendingMessage{
message: messsage, message: message,
height: ev.Raw.BlockNumber, height: ev.Raw.BlockNumber,
} }
e.pendingMu.Unlock() e.pendingMu.Unlock()
@ -260,13 +272,13 @@ func (e *Watcher) Run(ctx context.Context) error {
e.pendingMu.Lock() e.pendingMu.Lock()
blockNumberU := ev.Number.Uint64() blockNumberU := ev.Number.Uint64()
for hash, pLock := range e.pending { for key, pLock := range e.pending {
// Transaction was dropped and never picked up again // Transaction was dropped and never picked up again
if pLock.height+4*uint64(pLock.message.ConsistencyLevel) <= blockNumberU { if pLock.height+4*uint64(pLock.message.ConsistencyLevel) <= blockNumberU {
logger.Debug("observation timed out", zap.Stringer("tx", pLock.message.TxHash), logger.Debug("observation timed out", zap.Stringer("tx", pLock.message.TxHash),
zap.Stringer("block", ev.Number), zap.String("eth_network", e.networkName)) zap.Stringer("block", ev.Number), zap.String("eth_network", e.networkName))
delete(e.pending, hash) delete(e.pending, key)
continue continue
} }
@ -274,7 +286,7 @@ func (e *Watcher) Run(ctx context.Context) error {
if pLock.height+uint64(pLock.message.ConsistencyLevel) <= ev.Number.Uint64() { if pLock.height+uint64(pLock.message.ConsistencyLevel) <= ev.Number.Uint64() {
logger.Debug("observation confirmed", zap.Stringer("tx", pLock.message.TxHash), logger.Debug("observation confirmed", zap.Stringer("tx", pLock.message.TxHash),
zap.Stringer("block", ev.Number), zap.String("eth_network", e.networkName)) zap.Stringer("block", ev.Number), zap.String("eth_network", e.networkName))
delete(e.pending, hash) delete(e.pending, key)
e.msgChan <- pLock.message e.msgChan <- pLock.message
ethMessagesConfirmed.WithLabelValues(e.networkName).Inc() ethMessagesConfirmed.WithLabelValues(e.networkName).Inc()
} }