From 95fd892bec6849a3a76d949482acee2a10dfa774 Mon Sep 17 00:00:00 2001 From: bruce-riley <96066700+bruce-riley@users.noreply.github.com> Date: Mon, 28 Nov 2022 07:55:35 -0600 Subject: [PATCH] Node: Support for EVM watcher safe blocks (#1727) * Node: Support for EVM watcher safe blocks * Fix merge error * Add check of publishSafeBlocks if not useFinalized --- node/pkg/watchers/evm/connectors/common.go | 1 + node/pkg/watchers/evm/connectors/poller.go | 71 +++++++++++----- node/pkg/watchers/evm/connectors/polygon.go | 2 +- node/pkg/watchers/evm/watcher.go | 94 +++++++++++++++++---- sdk/vaa/structs.go | 1 + 5 files changed, 132 insertions(+), 37 deletions(-) diff --git a/node/pkg/watchers/evm/connectors/common.go b/node/pkg/watchers/evm/connectors/common.go index 18eea2937..7b3c81396 100644 --- a/node/pkg/watchers/evm/connectors/common.go +++ b/node/pkg/watchers/evm/connectors/common.go @@ -18,6 +18,7 @@ type NewBlock struct { Number *big.Int Hash common.Hash L1BlockNumber *big.Int // This is only populated on some chains (Arbitrum) + Safe bool } // Connector exposes Wormhole-specific interactions with an EVM-based network diff --git a/node/pkg/watchers/evm/connectors/poller.go b/node/pkg/watchers/evm/connectors/poller.go index 933c1e2a8..9130fb317 100644 --- a/node/pkg/watchers/evm/connectors/poller.go +++ b/node/pkg/watchers/evm/connectors/poller.go @@ -23,19 +23,24 @@ type PollFinalizer interface { // finalizer which will be used to only return finalized blocks on subscriptions. type BlockPollConnector struct { Connector - Delay time.Duration - useFinalized bool - finalizer PollFinalizer - blockFeed ethEvent.Feed - errFeed ethEvent.Feed + Delay time.Duration + useFinalized bool + publishSafeBlocks bool + finalizer PollFinalizer + blockFeed ethEvent.Feed + errFeed ethEvent.Feed } -func NewBlockPollConnector(ctx context.Context, baseConnector Connector, finalizer PollFinalizer, delay time.Duration, useFinalized bool) (*BlockPollConnector, error) { +func NewBlockPollConnector(ctx context.Context, baseConnector Connector, finalizer PollFinalizer, delay time.Duration, useFinalized bool, publishSafeBlocks bool) (*BlockPollConnector, error) { + if publishSafeBlocks && !useFinalized { + return nil, fmt.Errorf("publishSafeBlocks may only be enabled if useFinalized is enabled") + } connector := &BlockPollConnector{ - Connector: baseConnector, - Delay: delay, - useFinalized: useFinalized, - finalizer: finalizer, + Connector: baseConnector, + Delay: delay, + useFinalized: useFinalized, + publishSafeBlocks: publishSafeBlocks, + finalizer: finalizer, } err := supervisor.Run(ctx, "blockPoller", connector.run) if err != nil { @@ -47,11 +52,19 @@ func NewBlockPollConnector(ctx context.Context, baseConnector Connector, finaliz func (b *BlockPollConnector) run(ctx context.Context) error { logger := supervisor.Logger(ctx).With(zap.String("eth_network", b.Connector.NetworkName())) - lastBlock, err := b.getBlock(ctx, logger, nil) + lastBlock, err := b.getBlock(ctx, logger, nil, false) if err != nil { return err } + var lastSafeBlock *NewBlock + if b.publishSafeBlocks { + lastSafeBlock, err = b.getBlock(ctx, logger, nil, true) + if err != nil { + return err + } + } + timer := time.NewTimer(time.Millisecond) // Start immediately. supervisor.Signal(ctx, supervisor.SignalHealthy) @@ -62,17 +75,30 @@ func (b *BlockPollConnector) run(ctx context.Context) error { return ctx.Err() case <-timer.C: for count := 0; count < 3; count++ { - lastBlock, err = b.pollBlocks(ctx, logger, lastBlock) + lastBlock, err = b.pollBlocks(ctx, logger, lastBlock, false) if err == nil { break } - logger.Error("polling encountered an error", zap.Error(err)) + logger.Error("polling of block encountered an error", zap.Error(err)) // Wait an interval before trying again. We stay in this loop so that we // try up to three times before causing the watcher to restart. time.Sleep(b.Delay) } + if err == nil && b.publishSafeBlocks { + for count := 0; count < 3; count++ { + lastSafeBlock, err = b.pollBlocks(ctx, logger, lastSafeBlock, true) + if err == nil { + break + } + logger.Error("polling of safe block encountered an error", zap.Error(err)) + + // Same wait as above. + time.Sleep(b.Delay) + } + } + if err != nil { b.errFeed.Send(fmt.Sprint("polling encountered an error: ", err)) } @@ -81,7 +107,7 @@ func (b *BlockPollConnector) run(ctx context.Context) error { } } -func (b *BlockPollConnector) pollBlocks(ctx context.Context, logger *zap.Logger, lastBlock *NewBlock) (lastPublishedBlock *NewBlock, retErr error) { +func (b *BlockPollConnector) pollBlocks(ctx context.Context, logger *zap.Logger, lastBlock *NewBlock, safe bool) (lastPublishedBlock *NewBlock, retErr error) { // Some of the testnet providers (like the one we are using for Arbitrum) limit how many transactions we can do. When that happens, the call hangs. // Use a timeout so that the call will fail and the runable will get restarted. This should not happen in mainnet, but if it does, we will need to // investigate why the runable is dying and fix the underlying problem. @@ -94,7 +120,7 @@ func (b *BlockPollConnector) pollBlocks(ctx context.Context, logger *zap.Logger, // Fetch the latest block on the chain // We could do this on every iteration such that if a new block is created while this function is being executed, // it would automatically fetch new blocks but in order to reduce API load this will be done on the next iteration. - latestBlock, err := b.getBlock(timeout, logger, nil) + latestBlock, err := b.getBlock(timeout, logger, nil, safe) if err != nil { logger.Error("failed to look up latest block", zap.Uint64("lastSeenBlock", lastBlock.Number.Uint64()), zap.Error(err)) @@ -108,7 +134,7 @@ func (b *BlockPollConnector) pollBlocks(ctx context.Context, logger *zap.Logger, // Try to fetch the next block between lastBlock and latestBlock nextBlockNumber := new(big.Int).Add(lastPublishedBlock.Number, big.NewInt(1)) - block, err := b.getBlock(timeout, logger, nextBlockNumber) + block, err := b.getBlock(timeout, logger, nextBlockNumber, safe) if err != nil { logger.Error("failed to fetch next block", zap.Uint64("block", nextBlockNumber.Uint64()), zap.Error(err)) @@ -164,17 +190,21 @@ func (b *BlockPollConnector) SubscribeForBlocks(ctx context.Context, sink chan<- return sub, nil } -func (b *BlockPollConnector) getBlock(ctx context.Context, logger *zap.Logger, number *big.Int) (*NewBlock, error) { - return getBlock(ctx, logger, b.Connector, number, b.useFinalized) +func (b *BlockPollConnector) getBlock(ctx context.Context, logger *zap.Logger, number *big.Int, safe bool) (*NewBlock, error) { + return getBlock(ctx, logger, b.Connector, number, b.useFinalized, safe) } // getBlock is a free function that can be called from other connectors to get a single block. -func getBlock(ctx context.Context, logger *zap.Logger, conn Connector, number *big.Int, useFinalized bool) (*NewBlock, error) { +func getBlock(ctx context.Context, logger *zap.Logger, conn Connector, number *big.Int, useFinalized bool, safe bool) (*NewBlock, error) { var numStr string if number != nil { numStr = ethHexUtils.EncodeBig(number) } else if useFinalized { - numStr = "finalized" + if safe { + numStr = "safe" + } else { + numStr = "finalized" + } } else { numStr = "latest" } @@ -213,5 +243,6 @@ func getBlock(ctx context.Context, logger *zap.Logger, conn Connector, number *b Number: &n, Hash: m.Hash, L1BlockNumber: l1bn, + Safe: safe, }, nil } diff --git a/node/pkg/watchers/evm/connectors/polygon.go b/node/pkg/watchers/evm/connectors/polygon.go index afaa80393..4c29a646e 100644 --- a/node/pkg/watchers/evm/connectors/polygon.go +++ b/node/pkg/watchers/evm/connectors/polygon.go @@ -147,7 +147,7 @@ func (c *PolygonConnector) postBlock(ctx context.Context, blockNum *big.Int, sin return fmt.Errorf("blockNum is nil") } - block, err := getBlock(ctx, c.logger, c.Connector, blockNum, false) + block, err := getBlock(ctx, c.logger, c.Connector, blockNum, false, false) if err != nil { return fmt.Errorf("failed to get block %s: %w", blockNum.String(), err) } diff --git a/node/pkg/watchers/evm/watcher.go b/node/pkg/watchers/evm/watcher.go index 4dfb1bd6c..1017173bb 100644 --- a/node/pkg/watchers/evm/watcher.go +++ b/node/pkg/watchers/evm/watcher.go @@ -186,6 +186,8 @@ func (w *Watcher) Run(ctx context.Context) error { timeout, cancel := context.WithTimeout(ctx, 15*time.Second) defer cancel() + safeBlocksSupported := false + var err error if w.chainID == vaa.ChainIDCelo && !w.unsafeDevMode { // When we are running in mainnet or testnet, we need to use the Celo ethereum library rather than go-ethereum. @@ -197,14 +199,20 @@ func (w *Watcher) Run(ctx context.Context) error { return fmt.Errorf("dialing eth client failed: %w", err) } } else if useFinalizedBlocks { - logger.Info("using finalized blocks") + if w.chainID == vaa.ChainIDEthereum && !w.unsafeDevMode { + safeBlocksSupported = true + logger.Info("using finalized blocks, will publish safe blocks") + } else { + logger.Info("using finalized blocks") + } + baseConnector, err := connectors.NewEthereumConnector(timeout, w.networkName, w.url, w.contract, logger) if err != nil { ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc() p2p.DefaultRegistry.AddErrorCount(w.chainID, 1) return fmt.Errorf("dialing eth client failed: %w", err) } - w.ethConn, err = connectors.NewBlockPollConnector(ctx, baseConnector, finalizers.NewDefaultFinalizer(), 250*time.Millisecond, true) + w.ethConn, err = connectors.NewBlockPollConnector(ctx, baseConnector, finalizers.NewDefaultFinalizer(), 250*time.Millisecond, true, safeBlocksSupported) if err != nil { ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc() p2p.DefaultRegistry.AddErrorCount(w.chainID, 1) @@ -218,7 +226,7 @@ func (w *Watcher) Run(ctx context.Context) error { return fmt.Errorf("dialing eth client failed: %w", err) } finalizer := finalizers.NewMoonbeamFinalizer(logger, baseConnector) - w.ethConn, err = connectors.NewBlockPollConnector(ctx, baseConnector, finalizer, 250*time.Millisecond, false) + w.ethConn, err = connectors.NewBlockPollConnector(ctx, baseConnector, finalizer, 250*time.Millisecond, false, false) if err != nil { ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc() p2p.DefaultRegistry.AddErrorCount(w.chainID, 1) @@ -235,7 +243,7 @@ func (w *Watcher) Run(ctx context.Context) error { return fmt.Errorf("dialing eth client failed: %w", err) } finalizer := finalizers.NewNeonFinalizer(logger, baseConnector, baseConnector.Client(), w.l1Finalizer) - pollConnector, err := connectors.NewBlockPollConnector(ctx, baseConnector, finalizer, 250*time.Millisecond, false) + pollConnector, err := connectors.NewBlockPollConnector(ctx, baseConnector, finalizer, 250*time.Millisecond, false, false) if err != nil { ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc() p2p.DefaultRegistry.AddErrorCount(w.chainID, 1) @@ -258,7 +266,7 @@ func (w *Watcher) Run(ctx context.Context) error { return fmt.Errorf("dialing eth client failed: %w", err) } finalizer := finalizers.NewArbitrumFinalizer(logger, baseConnector, baseConnector.Client(), w.l1Finalizer) - pollConnector, err := connectors.NewBlockPollConnector(ctx, baseConnector, finalizer, 250*time.Millisecond, false) + pollConnector, err := connectors.NewBlockPollConnector(ctx, baseConnector, finalizer, 250*time.Millisecond, false, false) if err != nil { ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc() p2p.DefaultRegistry.AddErrorCount(w.chainID, 1) @@ -281,7 +289,7 @@ func (w *Watcher) Run(ctx context.Context) error { return fmt.Errorf("dialing eth client failed: %w", err) } finalizer := finalizers.NewOptimismFinalizer(timeout, logger, baseConnector, w.l1Finalizer) - w.ethConn, err = connectors.NewBlockPollConnector(ctx, baseConnector, finalizer, 250*time.Millisecond, false) + w.ethConn, err = connectors.NewBlockPollConnector(ctx, baseConnector, finalizer, 250*time.Millisecond, false, false) if err != nil { ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc() p2p.DefaultRegistry.AddErrorCount(w.chainID, 1) @@ -348,9 +356,10 @@ func (w *Watcher) Run(ctx context.Context) error { } }() - // Track the current block number so we can compare it to the block number of + // Track the current block numbers so we can compare it to the block number of // the message publication for observation requests. var currentBlockNumber uint64 + var currentSafeBlockNumber uint64 go func() { for { @@ -377,11 +386,7 @@ func (w *Watcher) Run(ctx context.Context) error { // always sends the head before it sends the logs (implicit synchronization // by relying on the same websocket connection). blockNumberU := atomic.LoadUint64(¤tBlockNumber) - if blockNumberU == 0 { - logger.Error("no block number available, ignoring observation request", - zap.String("eth_network", w.networkName)) - continue - } + safeBlockNumberU := atomic.LoadUint64(¤tSafeBlockNumber) timeout, cancel := context.WithTimeout(ctx, 5*time.Second) blockNumber, msgs, err := MessageEventsForTransaction(timeout, w.ethConn, w.contract, w.chainID, tx) @@ -407,6 +412,43 @@ func (w *Watcher) Run(ctx context.Context) error { continue } + if msg.ConsistencyLevel == vaa.ConsistencyLevelSafe && safeBlocksSupported { + if safeBlockNumberU == 0 { + logger.Error("no safe block number available, ignoring observation request", + zap.String("eth_network", w.networkName)) + continue + } + + if blockNumber <= safeBlockNumberU { + logger.Info("re-observed message publication transaction", + zap.Stringer("tx", msg.TxHash), + zap.Stringer("emitter_address", msg.EmitterAddress), + zap.Uint64("sequence", msg.Sequence), + zap.Uint64("current_safe_block", safeBlockNumberU), + zap.Uint64("observed_block", blockNumber), + zap.String("eth_network", w.networkName), + ) + w.msgChan <- msg + } else { + logger.Info("ignoring re-observed message publication transaction", + zap.Stringer("tx", msg.TxHash), + zap.Stringer("emitter_address", msg.EmitterAddress), + zap.Uint64("sequence", msg.Sequence), + zap.Uint64("current_safe_block", safeBlockNumberU), + zap.Uint64("observed_block", blockNumber), + zap.String("eth_network", w.networkName), + ) + } + + continue + } + + if blockNumberU == 0 { + logger.Error("no block number available, ignoring observation request", + zap.String("eth_network", w.networkName)) + continue + } + var expectedConfirmations uint64 if w.waitForConfirmations { expectedConfirmations = uint64(msg.ConsistencyLevel) @@ -554,7 +596,7 @@ func (w *Watcher) Run(ctx context.Context) error { continue } if ev.Number == nil { - logger.Error("new header block number is nil", zap.String("eth_network", w.networkName)) + logger.Error("new header block number is nil", zap.String("eth_network", w.networkName), zap.Bool("is_safe_block", ev.Safe)) continue } @@ -563,6 +605,7 @@ func (w *Watcher) Run(ctx context.Context) error { logger.Info("processing new header", zap.Stringer("current_block", ev.Number), zap.Stringer("current_blockhash", currentHash), + zap.Bool("is_safe_block", ev.Safe), zap.String("eth_network", w.networkName)) currentEthHeight.WithLabelValues(w.networkName).Set(float64(ev.Number.Int64())) readiness.SetReady(w.readiness) @@ -574,12 +617,24 @@ func (w *Watcher) Run(ctx context.Context) error { w.pendingMu.Lock() blockNumberU := ev.Number.Uint64() - atomic.StoreUint64(¤tBlockNumber, blockNumberU) - atomic.StoreUint64(&w.latestFinalizedBlockNumber, blockNumberU) + if ev.Safe { + atomic.StoreUint64(¤tSafeBlockNumber, blockNumberU) + } else { + atomic.StoreUint64(¤tBlockNumber, blockNumberU) + atomic.StoreUint64(&w.latestFinalizedBlockNumber, blockNumberU) + } for key, pLock := range w.pending { + // If this block is safe, only process messages wanting safe. + // If it's not safe, only process messages wanting finalized. + if safeBlocksSupported { + if ev.Safe != (pLock.message.ConsistencyLevel == vaa.ConsistencyLevelSafe) { + continue + } + } + var expectedConfirmations uint64 - if w.waitForConfirmations { + if w.waitForConfirmations && !ev.Safe { expectedConfirmations = uint64(pLock.message.ConsistencyLevel) } @@ -591,6 +646,7 @@ func (w *Watcher) Run(ctx context.Context) error { zap.Stringer("emitter_address", key.EmitterAddress), zap.Uint64("sequence", key.Sequence), zap.Stringer("current_block", ev.Number), + zap.Bool("is_safe_block", ev.Safe), zap.Stringer("current_blockhash", currentHash), zap.String("eth_network", w.networkName), zap.Uint64("expectedConfirmations", expectedConfirmations), @@ -622,6 +678,7 @@ func (w *Watcher) Run(ctx context.Context) error { zap.Stringer("emitter_address", key.EmitterAddress), zap.Uint64("sequence", key.Sequence), zap.Stringer("current_block", ev.Number), + zap.Bool("is_safe_block", ev.Safe), zap.Stringer("current_blockhash", currentHash), zap.String("eth_network", w.networkName), zap.Error(err)) @@ -640,6 +697,7 @@ func (w *Watcher) Run(ctx context.Context) error { zap.Stringer("emitter_address", key.EmitterAddress), zap.Uint64("sequence", key.Sequence), zap.Stringer("current_block", ev.Number), + zap.Bool("is_safe_block", ev.Safe), zap.Stringer("current_blockhash", currentHash), zap.String("eth_network", w.networkName), zap.Error(err)) @@ -656,6 +714,7 @@ func (w *Watcher) Run(ctx context.Context) error { zap.Stringer("emitter_address", key.EmitterAddress), zap.Uint64("sequence", key.Sequence), zap.Stringer("current_block", ev.Number), + zap.Bool("is_safe_block", ev.Safe), zap.Stringer("current_blockhash", currentHash), zap.String("eth_network", w.networkName), zap.Error(err)) @@ -672,6 +731,7 @@ func (w *Watcher) Run(ctx context.Context) error { zap.Stringer("emitter_address", key.EmitterAddress), zap.Uint64("sequence", key.Sequence), zap.Stringer("current_block", ev.Number), + zap.Bool("is_safe_block", ev.Safe), zap.Stringer("current_blockhash", currentHash), zap.String("eth_network", w.networkName)) delete(w.pending, key) @@ -685,6 +745,7 @@ func (w *Watcher) Run(ctx context.Context) error { zap.Stringer("emitter_address", key.EmitterAddress), zap.Uint64("sequence", key.Sequence), zap.Stringer("current_block", ev.Number), + zap.Bool("is_safe_block", ev.Safe), zap.Stringer("current_blockhash", currentHash), zap.String("eth_network", w.networkName)) delete(w.pending, key) @@ -696,6 +757,7 @@ func (w *Watcher) Run(ctx context.Context) error { w.pendingMu.Unlock() logger.Info("processed new header", zap.Stringer("current_block", ev.Number), + zap.Bool("is_safe_block", ev.Safe), zap.Stringer("current_blockhash", currentHash), zap.Duration("took", time.Since(start)), zap.String("eth_network", w.networkName)) diff --git a/sdk/vaa/structs.go b/sdk/vaa/structs.go index 53190e710..666d56e6e 100644 --- a/sdk/vaa/structs.go +++ b/sdk/vaa/structs.go @@ -116,6 +116,7 @@ type ( const ( ConsistencyLevelPublishImmediately = uint8(200) + ConsistencyLevelSafe = uint8(201) ) func (a Address) MarshalJSON() ([]byte, error) {