From 6a6c258015fac27464c6a4d7e68102f4aad65100 Mon Sep 17 00:00:00 2001 From: Bruce Riley Date: Wed, 26 Oct 2022 19:20:13 +0000 Subject: [PATCH] Node: arbitrum watcher redesign --- node/cmd/guardiand/node.go | 35 +++++----- node/pkg/watchers/evm/connectors/common.go | 5 +- node/pkg/watchers/evm/finalizers/arbitrum.go | 65 ++++++------------- .../watchers/evm/interfaces/l1Finalizer.go | 5 ++ node/pkg/watchers/evm/watcher.go | 19 +++++- 5 files changed, 65 insertions(+), 64 deletions(-) create mode 100644 node/pkg/watchers/evm/interfaces/l1Finalizer.go diff --git a/node/cmd/guardiand/node.go b/node/cmd/guardiand/node.go index 5777bb0a5..fcd772dd6 100644 --- a/node/cmd/guardiand/node.go +++ b/node/cmd/guardiand/node.go @@ -924,12 +924,14 @@ func runNode(cmd *cobra.Command, args []string) { // // NOTE: The "none" is a special indicator to disable a watcher until it is desirable to turn it back on. + var ethWatcher *evm.Watcher if shouldStart(ethRPC) { logger.Info("Starting Ethereum watcher") readiness.RegisterComponent(common.ReadinessEthSyncing) chainObsvReqC[vaa.ChainIDEthereum] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) + ethWatcher = evm.NewEthWatcher(*ethRPC, ethContractAddr, "eth", common.ReadinessEthSyncing, vaa.ChainIDEthereum, lockC, setC, 1, chainObsvReqC[vaa.ChainIDEthereum], false, nil) if err := supervisor.Run(ctx, "ethwatch", - evm.NewEthWatcher(*ethRPC, ethContractAddr, "eth", common.ReadinessEthSyncing, vaa.ChainIDEthereum, lockC, setC, 1, chainObsvReqC[vaa.ChainIDEthereum], *unsafeDevMode).Run); err != nil { + ethWatcher.Run); err != nil { return err } } @@ -939,7 +941,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessBSCSyncing) chainObsvReqC[vaa.ChainIDBSC] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "bscwatch", - evm.NewEthWatcher(*bscRPC, bscContractAddr, "bsc", common.ReadinessBSCSyncing, vaa.ChainIDBSC, lockC, nil, 1, chainObsvReqC[vaa.ChainIDBSC], *unsafeDevMode).Run); err != nil { + evm.NewEthWatcher(*bscRPC, bscContractAddr, "bsc", common.ReadinessBSCSyncing, vaa.ChainIDBSC, lockC, nil, 1, chainObsvReqC[vaa.ChainIDBSC], *unsafeDevMode, nil).Run); err != nil { return err } } @@ -953,7 +955,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessPolygonSyncing) chainObsvReqC[vaa.ChainIDPolygon] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "polygonwatch", - evm.NewEthWatcher(*polygonRPC, polygonContractAddr, "polygon", common.ReadinessPolygonSyncing, vaa.ChainIDPolygon, lockC, nil, polygonMinConfirmations, chainObsvReqC[vaa.ChainIDPolygon], *unsafeDevMode).Run); err != nil { + evm.NewEthWatcher(*polygonRPC, polygonContractAddr, "polygon", common.ReadinessPolygonSyncing, vaa.ChainIDPolygon, lockC, nil, polygonMinConfirmations, chainObsvReqC[vaa.ChainIDPolygon], *unsafeDevMode, nil).Run); err != nil { // Special case: Polygon can fork like PoW Ethereum, and it's not clear what the safe number of blocks is // // Hardcode the minimum number of confirmations to 512 regardless of what the smart contract specifies to protect @@ -967,7 +969,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessAvalancheSyncing) chainObsvReqC[vaa.ChainIDAvalanche] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "avalanchewatch", - evm.NewEthWatcher(*avalancheRPC, avalancheContractAddr, "avalanche", common.ReadinessAvalancheSyncing, vaa.ChainIDAvalanche, lockC, nil, 1, chainObsvReqC[vaa.ChainIDAvalanche], *unsafeDevMode).Run); err != nil { + evm.NewEthWatcher(*avalancheRPC, avalancheContractAddr, "avalanche", common.ReadinessAvalancheSyncing, vaa.ChainIDAvalanche, lockC, nil, 1, chainObsvReqC[vaa.ChainIDAvalanche], *unsafeDevMode, nil).Run); err != nil { return err } } @@ -975,7 +977,7 @@ func runNode(cmd *cobra.Command, args []string) { logger.Info("Starting Oasis watcher") chainObsvReqC[vaa.ChainIDOasis] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "oasiswatch", - evm.NewEthWatcher(*oasisRPC, oasisContractAddr, "oasis", common.ReadinessOasisSyncing, vaa.ChainIDOasis, lockC, nil, 1, chainObsvReqC[vaa.ChainIDOasis], *unsafeDevMode).Run); err != nil { + evm.NewEthWatcher(*oasisRPC, oasisContractAddr, "oasis", common.ReadinessOasisSyncing, vaa.ChainIDOasis, lockC, nil, 1, chainObsvReqC[vaa.ChainIDOasis], *unsafeDevMode, nil).Run); err != nil { return err } } @@ -984,7 +986,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessAuroraSyncing) chainObsvReqC[vaa.ChainIDAurora] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "aurorawatch", - evm.NewEthWatcher(*auroraRPC, auroraContractAddr, "aurora", common.ReadinessAuroraSyncing, vaa.ChainIDAurora, lockC, nil, 1, chainObsvReqC[vaa.ChainIDAurora], *unsafeDevMode).Run); err != nil { + evm.NewEthWatcher(*auroraRPC, auroraContractAddr, "aurora", common.ReadinessAuroraSyncing, vaa.ChainIDAurora, lockC, nil, 1, chainObsvReqC[vaa.ChainIDAurora], *unsafeDevMode, nil).Run); err != nil { return err } } @@ -993,7 +995,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessFantomSyncing) chainObsvReqC[vaa.ChainIDFantom] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "fantomwatch", - evm.NewEthWatcher(*fantomRPC, fantomContractAddr, "fantom", common.ReadinessFantomSyncing, vaa.ChainIDFantom, lockC, nil, 1, chainObsvReqC[vaa.ChainIDFantom], *unsafeDevMode).Run); err != nil { + evm.NewEthWatcher(*fantomRPC, fantomContractAddr, "fantom", common.ReadinessFantomSyncing, vaa.ChainIDFantom, lockC, nil, 1, chainObsvReqC[vaa.ChainIDFantom], *unsafeDevMode, nil).Run); err != nil { return err } } @@ -1002,7 +1004,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessKaruraSyncing) chainObsvReqC[vaa.ChainIDKarura] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "karurawatch", - evm.NewEthWatcher(*karuraRPC, karuraContractAddr, "karura", common.ReadinessKaruraSyncing, vaa.ChainIDKarura, lockC, nil, 1, chainObsvReqC[vaa.ChainIDKarura], *unsafeDevMode).Run); err != nil { + evm.NewEthWatcher(*karuraRPC, karuraContractAddr, "karura", common.ReadinessKaruraSyncing, vaa.ChainIDKarura, lockC, nil, 1, chainObsvReqC[vaa.ChainIDKarura], *unsafeDevMode, nil).Run); err != nil { return err } } @@ -1011,7 +1013,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessAcalaSyncing) chainObsvReqC[vaa.ChainIDAcala] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "acalawatch", - evm.NewEthWatcher(*acalaRPC, acalaContractAddr, "acala", common.ReadinessAcalaSyncing, vaa.ChainIDAcala, lockC, nil, 1, chainObsvReqC[vaa.ChainIDAcala], *unsafeDevMode).Run); err != nil { + evm.NewEthWatcher(*acalaRPC, acalaContractAddr, "acala", common.ReadinessAcalaSyncing, vaa.ChainIDAcala, lockC, nil, 1, chainObsvReqC[vaa.ChainIDAcala], *unsafeDevMode, nil).Run); err != nil { return err } } @@ -1020,7 +1022,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessKlaytnSyncing) chainObsvReqC[vaa.ChainIDKlaytn] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "klaytnwatch", - evm.NewEthWatcher(*klaytnRPC, klaytnContractAddr, "klaytn", common.ReadinessKlaytnSyncing, vaa.ChainIDKlaytn, lockC, nil, 1, chainObsvReqC[vaa.ChainIDKlaytn], *unsafeDevMode).Run); err != nil { + evm.NewEthWatcher(*klaytnRPC, klaytnContractAddr, "klaytn", common.ReadinessKlaytnSyncing, vaa.ChainIDKlaytn, lockC, nil, 1, chainObsvReqC[vaa.ChainIDKlaytn], *unsafeDevMode, nil).Run); err != nil { return err } } @@ -1029,7 +1031,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessCeloSyncing) chainObsvReqC[vaa.ChainIDCelo] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "celowatch", - evm.NewEthWatcher(*celoRPC, celoContractAddr, "celo", common.ReadinessCeloSyncing, vaa.ChainIDCelo, lockC, nil, 1, chainObsvReqC[vaa.ChainIDCelo], *unsafeDevMode).Run); err != nil { + evm.NewEthWatcher(*celoRPC, celoContractAddr, "celo", common.ReadinessCeloSyncing, vaa.ChainIDCelo, lockC, nil, 1, chainObsvReqC[vaa.ChainIDCelo], *unsafeDevMode, nil).Run); err != nil { return err } } @@ -1038,7 +1040,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessMoonbeamSyncing) chainObsvReqC[vaa.ChainIDMoonbeam] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "moonbeamwatch", - evm.NewEthWatcher(*moonbeamRPC, moonbeamContractAddr, "moonbeam", common.ReadinessMoonbeamSyncing, vaa.ChainIDMoonbeam, lockC, nil, 1, chainObsvReqC[vaa.ChainIDMoonbeam], *unsafeDevMode).Run); err != nil { + evm.NewEthWatcher(*moonbeamRPC, moonbeamContractAddr, "moonbeam", common.ReadinessMoonbeamSyncing, vaa.ChainIDMoonbeam, lockC, nil, 1, chainObsvReqC[vaa.ChainIDMoonbeam], *unsafeDevMode, nil).Run); err != nil { return err } } @@ -1146,7 +1148,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessEthRopstenSyncing) chainObsvReqC[vaa.ChainIDEthereumRopsten] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "ethropstenwatch", - evm.NewEthWatcher(*ethRopstenRPC, ethRopstenContractAddr, "ethropsten", common.ReadinessEthRopstenSyncing, vaa.ChainIDEthereumRopsten, lockC, nil, 1, chainObsvReqC[vaa.ChainIDEthereumRopsten], *unsafeDevMode).Run); err != nil { + evm.NewEthWatcher(*ethRopstenRPC, ethRopstenContractAddr, "ethropsten", common.ReadinessEthRopstenSyncing, vaa.ChainIDEthereumRopsten, lockC, nil, 1, chainObsvReqC[vaa.ChainIDEthereumRopsten], *unsafeDevMode, nil).Run); err != nil { return err } } @@ -1155,16 +1157,19 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessNeonSyncing) chainObsvReqC[vaa.ChainIDNeon] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "neonwatch", - evm.NewEthWatcher(*neonRPC, neonContractAddr, "neon", common.ReadinessNeonSyncing, vaa.ChainIDNeon, lockC, nil, 32, chainObsvReqC[vaa.ChainIDNeon], *unsafeDevMode).Run); err != nil { + evm.NewEthWatcher(*neonRPC, neonContractAddr, "neon", common.ReadinessNeonSyncing, vaa.ChainIDNeon, lockC, nil, 32, chainObsvReqC[vaa.ChainIDNeon], *unsafeDevMode, nil).Run); err != nil { return err } } if shouldStart(arbitrumRPC) { + if ethWatcher == nil { + log.Fatalf("if arbitrum is enabled then ethereum must also be enabled.") + } logger.Info("Starting Arbitrum watcher") readiness.RegisterComponent(common.ReadinessArbitrumSyncing) chainObsvReqC[vaa.ChainIDArbitrum] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "arbitrumwatch", - evm.NewEthWatcher(*arbitrumRPC, arbitrumContractAddr, "arbitrum", common.ReadinessArbitrumSyncing, vaa.ChainIDArbitrum, lockC, nil, 1, chainObsvReqC[vaa.ChainIDArbitrum], *unsafeDevMode).Run); err != nil { + evm.NewEthWatcher(*arbitrumRPC, arbitrumContractAddr, "arbitrum", common.ReadinessArbitrumSyncing, vaa.ChainIDArbitrum, lockC, nil, 1, chainObsvReqC[vaa.ChainIDArbitrum], false, ethWatcher).Run); err != nil { return err } } diff --git a/node/pkg/watchers/evm/connectors/common.go b/node/pkg/watchers/evm/connectors/common.go index 3d79e6779..18eea2937 100644 --- a/node/pkg/watchers/evm/connectors/common.go +++ b/node/pkg/watchers/evm/connectors/common.go @@ -15,8 +15,9 @@ import ( ) type NewBlock struct { - Number *big.Int - Hash common.Hash + Number *big.Int + Hash common.Hash + L1BlockNumber *big.Int // This is only populated on some chains (Arbitrum) } // Connector exposes Wormhole-specific interactions with an EVM-based network diff --git a/node/pkg/watchers/evm/finalizers/arbitrum.go b/node/pkg/watchers/evm/finalizers/arbitrum.go index 1a80e636d..830048c0d 100644 --- a/node/pkg/watchers/evm/finalizers/arbitrum.go +++ b/node/pkg/watchers/evm/finalizers/arbitrum.go @@ -3,69 +3,44 @@ package finalizers import ( "context" "fmt" - "strings" "github.com/certusone/wormhole/node/pkg/watchers/evm/connectors" + "github.com/certusone/wormhole/node/pkg/watchers/evm/interfaces" - arbitrumAbi "github.com/certusone/wormhole/node/pkg/watchers/evm/connectors/arbitrumabi" - ethBind "github.com/ethereum/go-ethereum/accounts/abi/bind" - ethCommon "github.com/ethereum/go-ethereum/common" ethClient "github.com/ethereum/go-ethereum/ethclient" "go.uber.org/zap" ) // ArbitrumFinalizer implements the finality check for Arbitrum. -// Arbitrum blocks should not be considered finalized until they show up on Ethereum. -// To determine when a block is final, we have to query the NodeInterface precompiled contract on Arbitrum. - -// To build the ABI for the NodeInterface precomile, do the following: -// - Download this file to ethereum/contracts/NodeInterface.sol and build the contracts. -// https://developer.offchainlabs.com/assets/files/NodeInterface-1413a19adf5bfcf97f5a5d3207d1b452.sol -// - Edit ethereum/build/contracts/NodeInterface.json and delete all but the bracketed part of “abi:”. -// - cd thirdparty/abigen and do the following to build the abigen tool: -// go build -mod=readonly -o abigen github.com/ethereum/go-ethereum/cmd/abigen -// - cd to the wormhole directory and do the following: -// - mkdir node/pkg/watchers/evm/connectors/arbitrumabi -// - third_party/abigen/abigen --abi ethereum/build/contracts/NodeInterface.json --pkg abi_arbitrum --out node/pkg/watchers/evm/connectors/arbitrumabi/abi.go +// Arbitrum blocks should not be considered finalized until they are finalized on Ethereum. type ArbitrumFinalizer struct { - logger *zap.Logger - connector connectors.Connector - caller *arbitrumAbi.AbiArbitrumCaller + logger *zap.Logger + connector connectors.Connector + l1Finalizer interfaces.L1Finalizer } -const ( - arbitrumNodeInterfacePrecompileAddr = "0x00000000000000000000000000000000000000C8" -) - -func NewArbitrumFinalizer(logger *zap.Logger, connector connectors.Connector, client *ethClient.Client) *ArbitrumFinalizer { - caller, err := arbitrumAbi.NewAbiArbitrumCaller(ethCommon.HexToAddress(arbitrumNodeInterfacePrecompileAddr), client) - if err != nil { - panic(fmt.Errorf("failed to create Arbitrum finalizer: %w", err)) - } - +func NewArbitrumFinalizer(logger *zap.Logger, connector connectors.Connector, client *ethClient.Client, l1Finalizer interfaces.L1Finalizer) *ArbitrumFinalizer { return &ArbitrumFinalizer{ - logger: logger, - connector: connector, - caller: caller, + logger: logger, + connector: connector, + l1Finalizer: l1Finalizer, } } -// IsBlockFinalized queries the NodeInfrastructure precompiled contract to see if the L2 (Arbitrum) block has appeared -// in an L1 (Ethereum) block. We don't really care what L2 block it appeared in, just that it has. +// IsBlockFinalized compares the number of the L1 block containing the Arbitrum block with the latest finalized block on Ethereum. func (a *ArbitrumFinalizer) IsBlockFinalized(ctx context.Context, block *connectors.NewBlock) (bool, error) { - _, err := a.caller.FindBatchContainingBlock(ðBind.CallOpts{Context: ctx}, block.Number.Uint64()) - if err != nil { - // If it hasn't been published yet, the method returns an error, so we check for that and treat it as - // not finalized, rather than as an error. Here's what that looks like: - // "requested block 430842 is after latest on-chain block 430820 published in batch 4686" - if strings.ContainsAny(err.Error(), "is after latest on-chain block") { - return false, nil - } - - return false, err + if block.L1BlockNumber == nil { + return false, fmt.Errorf("l1 block number is nil") } - return true, nil + latestL1Block := a.l1Finalizer.GetLatestFinalizedBlockNumber() + if latestL1Block == 0 { + // This happens on start up. + return false, nil + } + + isFinalized := block.L1BlockNumber.Uint64() <= latestL1Block + return isFinalized, nil } diff --git a/node/pkg/watchers/evm/interfaces/l1Finalizer.go b/node/pkg/watchers/evm/interfaces/l1Finalizer.go new file mode 100644 index 000000000..411c9b57a --- /dev/null +++ b/node/pkg/watchers/evm/interfaces/l1Finalizer.go @@ -0,0 +1,5 @@ +package interfaces + +type L1Finalizer interface { + GetLatestFinalizedBlockNumber() uint64 +} diff --git a/node/pkg/watchers/evm/watcher.go b/node/pkg/watchers/evm/watcher.go index c4add38f2..e6371f95f 100644 --- a/node/pkg/watchers/evm/watcher.go +++ b/node/pkg/watchers/evm/watcher.go @@ -10,6 +10,7 @@ import ( "github.com/certusone/wormhole/node/pkg/watchers/evm/connectors" "github.com/certusone/wormhole/node/pkg/watchers/evm/connectors/ethabi" "github.com/certusone/wormhole/node/pkg/watchers/evm/finalizers" + "github.com/certusone/wormhole/node/pkg/watchers/evm/interfaces" "github.com/certusone/wormhole/node/pkg/p2p" gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1" @@ -105,6 +106,9 @@ type ( // Interface to the chain specific ethereum library. ethConn connectors.Connector unsafeDevMode bool + + latestFinalizedBlockNumber uint64 + l1Finalizer interfaces.L1Finalizer } pendingKey struct { @@ -130,7 +134,9 @@ func NewEthWatcher( setEvents chan *common.GuardianSet, minConfirmations uint64, obsvReqC chan *gossipv1.ObservationRequest, - unsafeDevMode bool) *Watcher { + unsafeDevMode bool, + l1Finalizer interfaces.L1Finalizer, +) *Watcher { return &Watcher{ url: url, @@ -144,6 +150,7 @@ func NewEthWatcher( obsvReqC: obsvReqC, pending: map[pendingKey]*pendingMessage{}, unsafeDevMode: unsafeDevMode, + l1Finalizer: l1Finalizer, } } @@ -228,13 +235,16 @@ func (w *Watcher) Run(ctx context.Context) error { return fmt.Errorf("creating poll connector failed: %w", err) } } else if w.chainID == vaa.ChainIDArbitrum && !w.unsafeDevMode { + if w.l1Finalizer == nil { + return fmt.Errorf("unable to create arbitrum watcher because the l1 finalizer is not set") + } baseConnector, err := connectors.NewEthereumConnector(timeout, w.networkName, w.url, w.contract, logger) if err != nil { ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc() p2p.DefaultRegistry.AddErrorCount(w.chainID, 1) return fmt.Errorf("dialing eth client failed: %w", err) } - finalizer := finalizers.NewArbitrumFinalizer(logger, baseConnector, baseConnector.Client()) + finalizer := finalizers.NewArbitrumFinalizer(logger, baseConnector, baseConnector.Client(), w.l1Finalizer) pollConnector, err := connectors.NewBlockPollConnector(ctx, baseConnector, finalizer, 250*time.Millisecond, false) if err != nil { ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc() @@ -518,6 +528,7 @@ func (w *Watcher) Run(ctx context.Context) error { blockNumberU := ev.Number.Uint64() atomic.StoreUint64(¤tBlockNumber, blockNumberU) + atomic.StoreUint64(&w.latestFinalizedBlockNumber, blockNumberU) for key, pLock := range w.pending { expectedConfirmations := uint64(pLock.message.ConsistencyLevel) @@ -744,3 +755,7 @@ func (w *Watcher) getAcalaMode(ctx context.Context) (useFinalizedBlocks bool, er return } + +func (w *Watcher) GetLatestFinalizedBlockNumber() uint64 { + return atomic.LoadUint64(&w.latestFinalizedBlockNumber) +}