From 49b3b6ab610f6d293b0b4b64954faf50c87fd0da Mon Sep 17 00:00:00 2001 From: bruce-riley <96066700+bruce-riley@users.noreply.github.com> Date: Tue, 25 Oct 2022 15:13:36 -0500 Subject: [PATCH] Node: make karura/acala watcher use finalized blocks (#1747) * Node: acala watcher use finalized blocks * node: acala not in safe mode assume finalized mode * Remove unused function * Changes signature of checkForSafeMode() * Beef up the safe mode check * Remove unnecessary function --- node/pkg/watchers/evm/connectors/poller.go | 35 ++++------- node/pkg/watchers/evm/watcher.go | 67 ++++++++++++++-------- 2 files changed, 55 insertions(+), 47 deletions(-) diff --git a/node/pkg/watchers/evm/connectors/poller.go b/node/pkg/watchers/evm/connectors/poller.go index 15683a9f7..734f3a9ac 100644 --- a/node/pkg/watchers/evm/connectors/poller.go +++ b/node/pkg/watchers/evm/connectors/poller.go @@ -23,22 +23,19 @@ type PollFinalizer interface { // finalizer which will be used to only return finalized blocks on subscriptions. type BlockPollConnector struct { Connector - Delay time.Duration - isEthPoS bool - hasEthSwitchedToPoS bool - finalizer PollFinalizer - - blockFeed ethEvent.Feed - errFeed ethEvent.Feed + Delay time.Duration + useFinalized bool + finalizer PollFinalizer + blockFeed ethEvent.Feed + errFeed ethEvent.Feed } -func NewBlockPollConnector(ctx context.Context, baseConnector Connector, finalizer PollFinalizer, delay time.Duration, isEthPoS bool) (*BlockPollConnector, error) { +func NewBlockPollConnector(ctx context.Context, baseConnector Connector, finalizer PollFinalizer, delay time.Duration, useFinalized bool) (*BlockPollConnector, error) { connector := &BlockPollConnector{ - Connector: baseConnector, - Delay: delay, - isEthPoS: isEthPoS, - hasEthSwitchedToPoS: false, - finalizer: finalizer, + Connector: baseConnector, + Delay: delay, + useFinalized: useFinalized, + finalizer: finalizer, } err := supervisor.Run(ctx, "blockPoller", connector.run) if err != nil { @@ -134,10 +131,6 @@ func (b *BlockPollConnector) pollBlocks(ctx context.Context, logger *zap.Logger, return } -func (b *BlockPollConnector) SetEthSwitched() { - b.hasEthSwitchedToPoS = true -} - func (b *BlockPollConnector) SubscribeForBlocks(ctx context.Context, sink chan<- *NewBlock) (ethereum.Subscription, error) { sub := NewPollSubscription() blockSub := b.blockFeed.Subscribe(sink) @@ -171,7 +164,7 @@ func (b *BlockPollConnector) getBlock(ctx context.Context, logger *zap.Logger, n var numStr string if number != nil { numStr = ethHexUtils.EncodeBig(number) - } else if b.hasEthSwitchedToPoS { + } else if b.useFinalized { numStr = "finalized" } else { numStr = "latest" @@ -196,12 +189,6 @@ func (b *BlockPollConnector) getBlock(ctx context.Context, logger *zap.Logger, n ) return nil, fmt.Errorf("failed to unmarshal block: Number is nil") } - d := big.Int(*m.Difficulty) - if b.isEthPoS && !b.hasEthSwitchedToPoS && d.Cmp(big.NewInt(0)) == 0 { - logger.Info("switching from latest to finalized", zap.Duration("delay", b.Delay)) - b.SetEthSwitched() - return b.getBlock(ctx, logger, number) - } n := big.Int(*m.Number) return &NewBlock{ Number: &n, diff --git a/node/pkg/watchers/evm/watcher.go b/node/pkg/watchers/evm/watcher.go index 10c024d9c..c4add38f2 100644 --- a/node/pkg/watchers/evm/watcher.go +++ b/node/pkg/watchers/evm/watcher.go @@ -19,6 +19,7 @@ import ( "github.com/prometheus/client_golang/prometheus" eth_common "github.com/ethereum/go-ethereum/common" + eth_hexutil "github.com/ethereum/go-ethereum/common/hexutil" "go.uber.org/zap" "github.com/certusone/wormhole/node/pkg/common" @@ -102,9 +103,8 @@ type ( minConfirmations uint64 // Interface to the chain specific ethereum library. - ethConn connectors.Connector - shouldCheckSafeMode bool - unsafeDevMode bool + ethConn connectors.Connector + unsafeDevMode bool } pendingKey struct { @@ -133,28 +133,33 @@ func NewEthWatcher( unsafeDevMode bool) *Watcher { return &Watcher{ - url: url, - contract: contract, - networkName: networkName, - readiness: readiness, - minConfirmations: minConfirmations, - chainID: chainID, - msgChan: messageEvents, - setChan: setEvents, - obsvReqC: obsvReqC, - pending: map[pendingKey]*pendingMessage{}, - shouldCheckSafeMode: (chainID == vaa.ChainIDKarura || chainID == vaa.ChainIDAcala) && (!unsafeDevMode), - unsafeDevMode: unsafeDevMode, + url: url, + contract: contract, + networkName: networkName, + readiness: readiness, + minConfirmations: minConfirmations, + chainID: chainID, + msgChan: messageEvents, + setChan: setEvents, + obsvReqC: obsvReqC, + pending: map[pendingKey]*pendingMessage{}, + unsafeDevMode: unsafeDevMode, } } func (w *Watcher) Run(ctx context.Context) error { logger := supervisor.Logger(ctx) - if w.shouldCheckSafeMode { - if err := w.checkForSafeMode(ctx); err != nil { + useFinalizedBlocks := (w.chainID == vaa.ChainIDEthereum && (!w.unsafeDevMode)) + if (w.chainID == vaa.ChainIDKarura || w.chainID == vaa.ChainIDAcala) && (!w.unsafeDevMode) { + ufb, err := w.getAcalaMode(ctx) + if err != nil { return err } + + if ufb { + useFinalizedBlocks = true + } } // Initialize gossip metrics (we want to broadcast the address even if we're not yet syncing) @@ -175,7 +180,8 @@ func (w *Watcher) Run(ctx context.Context) error { p2p.DefaultRegistry.AddErrorCount(w.chainID, 1) return fmt.Errorf("dialing eth client failed: %w", err) } - } else if w.chainID == vaa.ChainIDEthereum && !w.unsafeDevMode { + } else if useFinalizedBlocks { + logger.Info("using finalized blocks") baseConnector, err := connectors.NewEthereumConnector(timeout, w.networkName, w.url, w.contract, logger) if err != nil { ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc() @@ -702,24 +708,39 @@ func fetchCurrentGuardianSet(ctx context.Context, ethConn connectors.Connector) return currentIndex, &gs, nil } -func (w *Watcher) checkForSafeMode(ctx context.Context) error { +func (w *Watcher) getAcalaMode(ctx context.Context) (useFinalizedBlocks bool, errRet error) { timeout, cancel := context.WithTimeout(ctx, 15*time.Second) defer cancel() c, err := rpc.DialContext(timeout, w.url) if err != nil { - return fmt.Errorf("failed to connect to url %s to check for safe mode: %w", w.url, err) + errRet = fmt.Errorf("failed to connect to url %s to check acala mode: %w", w.url, err) + return } + // First check to see if polling for finalized blocks is suported. + type Marshaller struct { + Number *eth_hexutil.Big + } + + var m Marshaller + err = c.CallContext(ctx, &m, "eth_getBlockByNumber", "finalized", false) + if err == nil { + useFinalizedBlocks = true + return + } + + // If finalized blocks are not supported, then we had better be in safe mode! var safe bool err = c.CallContext(ctx, &safe, "net_isSafeMode") if err != nil { - return fmt.Errorf("check for safe mode for url %s failed: %w", w.url, err) + errRet = fmt.Errorf("check for safe mode for url %s failed: %w", w.url, err) + return } if !safe { - return fmt.Errorf("url %s is not using safe mode", w.url) + errRet = fmt.Errorf("url %s does not support finalized blocks and is not using safe mode", w.url) } - return nil + return }