Node: arbitrum watcher redesign

This commit is contained in:
Bruce Riley 2022-10-26 19:20:13 +00:00 committed by Evan Gray
parent ea50bee9d2
commit 6a6c258015
5 changed files with 65 additions and 64 deletions

View File

@ -924,12 +924,14 @@ func runNode(cmd *cobra.Command, args []string) {
//
// NOTE: The "none" is a special indicator to disable a watcher until it is desirable to turn it back on.
var ethWatcher *evm.Watcher
if shouldStart(ethRPC) {
logger.Info("Starting Ethereum watcher")
readiness.RegisterComponent(common.ReadinessEthSyncing)
chainObsvReqC[vaa.ChainIDEthereum] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
ethWatcher = evm.NewEthWatcher(*ethRPC, ethContractAddr, "eth", common.ReadinessEthSyncing, vaa.ChainIDEthereum, lockC, setC, 1, chainObsvReqC[vaa.ChainIDEthereum], false, nil)
if err := supervisor.Run(ctx, "ethwatch",
evm.NewEthWatcher(*ethRPC, ethContractAddr, "eth", common.ReadinessEthSyncing, vaa.ChainIDEthereum, lockC, setC, 1, chainObsvReqC[vaa.ChainIDEthereum], *unsafeDevMode).Run); err != nil {
ethWatcher.Run); err != nil {
return err
}
}
@ -939,7 +941,7 @@ func runNode(cmd *cobra.Command, args []string) {
readiness.RegisterComponent(common.ReadinessBSCSyncing)
chainObsvReqC[vaa.ChainIDBSC] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
if err := supervisor.Run(ctx, "bscwatch",
evm.NewEthWatcher(*bscRPC, bscContractAddr, "bsc", common.ReadinessBSCSyncing, vaa.ChainIDBSC, lockC, nil, 1, chainObsvReqC[vaa.ChainIDBSC], *unsafeDevMode).Run); err != nil {
evm.NewEthWatcher(*bscRPC, bscContractAddr, "bsc", common.ReadinessBSCSyncing, vaa.ChainIDBSC, lockC, nil, 1, chainObsvReqC[vaa.ChainIDBSC], *unsafeDevMode, nil).Run); err != nil {
return err
}
}
@ -953,7 +955,7 @@ func runNode(cmd *cobra.Command, args []string) {
readiness.RegisterComponent(common.ReadinessPolygonSyncing)
chainObsvReqC[vaa.ChainIDPolygon] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
if err := supervisor.Run(ctx, "polygonwatch",
evm.NewEthWatcher(*polygonRPC, polygonContractAddr, "polygon", common.ReadinessPolygonSyncing, vaa.ChainIDPolygon, lockC, nil, polygonMinConfirmations, chainObsvReqC[vaa.ChainIDPolygon], *unsafeDevMode).Run); err != nil {
evm.NewEthWatcher(*polygonRPC, polygonContractAddr, "polygon", common.ReadinessPolygonSyncing, vaa.ChainIDPolygon, lockC, nil, polygonMinConfirmations, chainObsvReqC[vaa.ChainIDPolygon], *unsafeDevMode, nil).Run); err != nil {
// Special case: Polygon can fork like PoW Ethereum, and it's not clear what the safe number of blocks is
//
// Hardcode the minimum number of confirmations to 512 regardless of what the smart contract specifies to protect
@ -967,7 +969,7 @@ func runNode(cmd *cobra.Command, args []string) {
readiness.RegisterComponent(common.ReadinessAvalancheSyncing)
chainObsvReqC[vaa.ChainIDAvalanche] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
if err := supervisor.Run(ctx, "avalanchewatch",
evm.NewEthWatcher(*avalancheRPC, avalancheContractAddr, "avalanche", common.ReadinessAvalancheSyncing, vaa.ChainIDAvalanche, lockC, nil, 1, chainObsvReqC[vaa.ChainIDAvalanche], *unsafeDevMode).Run); err != nil {
evm.NewEthWatcher(*avalancheRPC, avalancheContractAddr, "avalanche", common.ReadinessAvalancheSyncing, vaa.ChainIDAvalanche, lockC, nil, 1, chainObsvReqC[vaa.ChainIDAvalanche], *unsafeDevMode, nil).Run); err != nil {
return err
}
}
@ -975,7 +977,7 @@ func runNode(cmd *cobra.Command, args []string) {
logger.Info("Starting Oasis watcher")
chainObsvReqC[vaa.ChainIDOasis] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
if err := supervisor.Run(ctx, "oasiswatch",
evm.NewEthWatcher(*oasisRPC, oasisContractAddr, "oasis", common.ReadinessOasisSyncing, vaa.ChainIDOasis, lockC, nil, 1, chainObsvReqC[vaa.ChainIDOasis], *unsafeDevMode).Run); err != nil {
evm.NewEthWatcher(*oasisRPC, oasisContractAddr, "oasis", common.ReadinessOasisSyncing, vaa.ChainIDOasis, lockC, nil, 1, chainObsvReqC[vaa.ChainIDOasis], *unsafeDevMode, nil).Run); err != nil {
return err
}
}
@ -984,7 +986,7 @@ func runNode(cmd *cobra.Command, args []string) {
readiness.RegisterComponent(common.ReadinessAuroraSyncing)
chainObsvReqC[vaa.ChainIDAurora] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
if err := supervisor.Run(ctx, "aurorawatch",
evm.NewEthWatcher(*auroraRPC, auroraContractAddr, "aurora", common.ReadinessAuroraSyncing, vaa.ChainIDAurora, lockC, nil, 1, chainObsvReqC[vaa.ChainIDAurora], *unsafeDevMode).Run); err != nil {
evm.NewEthWatcher(*auroraRPC, auroraContractAddr, "aurora", common.ReadinessAuroraSyncing, vaa.ChainIDAurora, lockC, nil, 1, chainObsvReqC[vaa.ChainIDAurora], *unsafeDevMode, nil).Run); err != nil {
return err
}
}
@ -993,7 +995,7 @@ func runNode(cmd *cobra.Command, args []string) {
readiness.RegisterComponent(common.ReadinessFantomSyncing)
chainObsvReqC[vaa.ChainIDFantom] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
if err := supervisor.Run(ctx, "fantomwatch",
evm.NewEthWatcher(*fantomRPC, fantomContractAddr, "fantom", common.ReadinessFantomSyncing, vaa.ChainIDFantom, lockC, nil, 1, chainObsvReqC[vaa.ChainIDFantom], *unsafeDevMode).Run); err != nil {
evm.NewEthWatcher(*fantomRPC, fantomContractAddr, "fantom", common.ReadinessFantomSyncing, vaa.ChainIDFantom, lockC, nil, 1, chainObsvReqC[vaa.ChainIDFantom], *unsafeDevMode, nil).Run); err != nil {
return err
}
}
@ -1002,7 +1004,7 @@ func runNode(cmd *cobra.Command, args []string) {
readiness.RegisterComponent(common.ReadinessKaruraSyncing)
chainObsvReqC[vaa.ChainIDKarura] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
if err := supervisor.Run(ctx, "karurawatch",
evm.NewEthWatcher(*karuraRPC, karuraContractAddr, "karura", common.ReadinessKaruraSyncing, vaa.ChainIDKarura, lockC, nil, 1, chainObsvReqC[vaa.ChainIDKarura], *unsafeDevMode).Run); err != nil {
evm.NewEthWatcher(*karuraRPC, karuraContractAddr, "karura", common.ReadinessKaruraSyncing, vaa.ChainIDKarura, lockC, nil, 1, chainObsvReqC[vaa.ChainIDKarura], *unsafeDevMode, nil).Run); err != nil {
return err
}
}
@ -1011,7 +1013,7 @@ func runNode(cmd *cobra.Command, args []string) {
readiness.RegisterComponent(common.ReadinessAcalaSyncing)
chainObsvReqC[vaa.ChainIDAcala] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
if err := supervisor.Run(ctx, "acalawatch",
evm.NewEthWatcher(*acalaRPC, acalaContractAddr, "acala", common.ReadinessAcalaSyncing, vaa.ChainIDAcala, lockC, nil, 1, chainObsvReqC[vaa.ChainIDAcala], *unsafeDevMode).Run); err != nil {
evm.NewEthWatcher(*acalaRPC, acalaContractAddr, "acala", common.ReadinessAcalaSyncing, vaa.ChainIDAcala, lockC, nil, 1, chainObsvReqC[vaa.ChainIDAcala], *unsafeDevMode, nil).Run); err != nil {
return err
}
}
@ -1020,7 +1022,7 @@ func runNode(cmd *cobra.Command, args []string) {
readiness.RegisterComponent(common.ReadinessKlaytnSyncing)
chainObsvReqC[vaa.ChainIDKlaytn] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
if err := supervisor.Run(ctx, "klaytnwatch",
evm.NewEthWatcher(*klaytnRPC, klaytnContractAddr, "klaytn", common.ReadinessKlaytnSyncing, vaa.ChainIDKlaytn, lockC, nil, 1, chainObsvReqC[vaa.ChainIDKlaytn], *unsafeDevMode).Run); err != nil {
evm.NewEthWatcher(*klaytnRPC, klaytnContractAddr, "klaytn", common.ReadinessKlaytnSyncing, vaa.ChainIDKlaytn, lockC, nil, 1, chainObsvReqC[vaa.ChainIDKlaytn], *unsafeDevMode, nil).Run); err != nil {
return err
}
}
@ -1029,7 +1031,7 @@ func runNode(cmd *cobra.Command, args []string) {
readiness.RegisterComponent(common.ReadinessCeloSyncing)
chainObsvReqC[vaa.ChainIDCelo] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
if err := supervisor.Run(ctx, "celowatch",
evm.NewEthWatcher(*celoRPC, celoContractAddr, "celo", common.ReadinessCeloSyncing, vaa.ChainIDCelo, lockC, nil, 1, chainObsvReqC[vaa.ChainIDCelo], *unsafeDevMode).Run); err != nil {
evm.NewEthWatcher(*celoRPC, celoContractAddr, "celo", common.ReadinessCeloSyncing, vaa.ChainIDCelo, lockC, nil, 1, chainObsvReqC[vaa.ChainIDCelo], *unsafeDevMode, nil).Run); err != nil {
return err
}
}
@ -1038,7 +1040,7 @@ func runNode(cmd *cobra.Command, args []string) {
readiness.RegisterComponent(common.ReadinessMoonbeamSyncing)
chainObsvReqC[vaa.ChainIDMoonbeam] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
if err := supervisor.Run(ctx, "moonbeamwatch",
evm.NewEthWatcher(*moonbeamRPC, moonbeamContractAddr, "moonbeam", common.ReadinessMoonbeamSyncing, vaa.ChainIDMoonbeam, lockC, nil, 1, chainObsvReqC[vaa.ChainIDMoonbeam], *unsafeDevMode).Run); err != nil {
evm.NewEthWatcher(*moonbeamRPC, moonbeamContractAddr, "moonbeam", common.ReadinessMoonbeamSyncing, vaa.ChainIDMoonbeam, lockC, nil, 1, chainObsvReqC[vaa.ChainIDMoonbeam], *unsafeDevMode, nil).Run); err != nil {
return err
}
}
@ -1146,7 +1148,7 @@ func runNode(cmd *cobra.Command, args []string) {
readiness.RegisterComponent(common.ReadinessEthRopstenSyncing)
chainObsvReqC[vaa.ChainIDEthereumRopsten] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
if err := supervisor.Run(ctx, "ethropstenwatch",
evm.NewEthWatcher(*ethRopstenRPC, ethRopstenContractAddr, "ethropsten", common.ReadinessEthRopstenSyncing, vaa.ChainIDEthereumRopsten, lockC, nil, 1, chainObsvReqC[vaa.ChainIDEthereumRopsten], *unsafeDevMode).Run); err != nil {
evm.NewEthWatcher(*ethRopstenRPC, ethRopstenContractAddr, "ethropsten", common.ReadinessEthRopstenSyncing, vaa.ChainIDEthereumRopsten, lockC, nil, 1, chainObsvReqC[vaa.ChainIDEthereumRopsten], *unsafeDevMode, nil).Run); err != nil {
return err
}
}
@ -1155,16 +1157,19 @@ func runNode(cmd *cobra.Command, args []string) {
readiness.RegisterComponent(common.ReadinessNeonSyncing)
chainObsvReqC[vaa.ChainIDNeon] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
if err := supervisor.Run(ctx, "neonwatch",
evm.NewEthWatcher(*neonRPC, neonContractAddr, "neon", common.ReadinessNeonSyncing, vaa.ChainIDNeon, lockC, nil, 32, chainObsvReqC[vaa.ChainIDNeon], *unsafeDevMode).Run); err != nil {
evm.NewEthWatcher(*neonRPC, neonContractAddr, "neon", common.ReadinessNeonSyncing, vaa.ChainIDNeon, lockC, nil, 32, chainObsvReqC[vaa.ChainIDNeon], *unsafeDevMode, nil).Run); err != nil {
return err
}
}
if shouldStart(arbitrumRPC) {
if ethWatcher == nil {
log.Fatalf("if arbitrum is enabled then ethereum must also be enabled.")
}
logger.Info("Starting Arbitrum watcher")
readiness.RegisterComponent(common.ReadinessArbitrumSyncing)
chainObsvReqC[vaa.ChainIDArbitrum] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
if err := supervisor.Run(ctx, "arbitrumwatch",
evm.NewEthWatcher(*arbitrumRPC, arbitrumContractAddr, "arbitrum", common.ReadinessArbitrumSyncing, vaa.ChainIDArbitrum, lockC, nil, 1, chainObsvReqC[vaa.ChainIDArbitrum], *unsafeDevMode).Run); err != nil {
evm.NewEthWatcher(*arbitrumRPC, arbitrumContractAddr, "arbitrum", common.ReadinessArbitrumSyncing, vaa.ChainIDArbitrum, lockC, nil, 1, chainObsvReqC[vaa.ChainIDArbitrum], false, ethWatcher).Run); err != nil {
return err
}
}

View File

@ -15,8 +15,9 @@ import (
)
type NewBlock struct {
Number *big.Int
Hash common.Hash
Number *big.Int
Hash common.Hash
L1BlockNumber *big.Int // This is only populated on some chains (Arbitrum)
}
// Connector exposes Wormhole-specific interactions with an EVM-based network

View File

@ -3,69 +3,44 @@ package finalizers
import (
"context"
"fmt"
"strings"
"github.com/certusone/wormhole/node/pkg/watchers/evm/connectors"
"github.com/certusone/wormhole/node/pkg/watchers/evm/interfaces"
arbitrumAbi "github.com/certusone/wormhole/node/pkg/watchers/evm/connectors/arbitrumabi"
ethBind "github.com/ethereum/go-ethereum/accounts/abi/bind"
ethCommon "github.com/ethereum/go-ethereum/common"
ethClient "github.com/ethereum/go-ethereum/ethclient"
"go.uber.org/zap"
)
// ArbitrumFinalizer implements the finality check for Arbitrum.
// Arbitrum blocks should not be considered finalized until they show up on Ethereum.
// To determine when a block is final, we have to query the NodeInterface precompiled contract on Arbitrum.
// To build the ABI for the NodeInterface precomile, do the following:
// - Download this file to ethereum/contracts/NodeInterface.sol and build the contracts.
// https://developer.offchainlabs.com/assets/files/NodeInterface-1413a19adf5bfcf97f5a5d3207d1b452.sol
// - Edit ethereum/build/contracts/NodeInterface.json and delete all but the bracketed part of “abi:”.
// - cd thirdparty/abigen and do the following to build the abigen tool:
// go build -mod=readonly -o abigen github.com/ethereum/go-ethereum/cmd/abigen
// - cd to the wormhole directory and do the following:
// - mkdir node/pkg/watchers/evm/connectors/arbitrumabi
// - third_party/abigen/abigen --abi ethereum/build/contracts/NodeInterface.json --pkg abi_arbitrum --out node/pkg/watchers/evm/connectors/arbitrumabi/abi.go
// Arbitrum blocks should not be considered finalized until they are finalized on Ethereum.
type ArbitrumFinalizer struct {
logger *zap.Logger
connector connectors.Connector
caller *arbitrumAbi.AbiArbitrumCaller
logger *zap.Logger
connector connectors.Connector
l1Finalizer interfaces.L1Finalizer
}
const (
arbitrumNodeInterfacePrecompileAddr = "0x00000000000000000000000000000000000000C8"
)
func NewArbitrumFinalizer(logger *zap.Logger, connector connectors.Connector, client *ethClient.Client) *ArbitrumFinalizer {
caller, err := arbitrumAbi.NewAbiArbitrumCaller(ethCommon.HexToAddress(arbitrumNodeInterfacePrecompileAddr), client)
if err != nil {
panic(fmt.Errorf("failed to create Arbitrum finalizer: %w", err))
}
func NewArbitrumFinalizer(logger *zap.Logger, connector connectors.Connector, client *ethClient.Client, l1Finalizer interfaces.L1Finalizer) *ArbitrumFinalizer {
return &ArbitrumFinalizer{
logger: logger,
connector: connector,
caller: caller,
logger: logger,
connector: connector,
l1Finalizer: l1Finalizer,
}
}
// IsBlockFinalized queries the NodeInfrastructure precompiled contract to see if the L2 (Arbitrum) block has appeared
// in an L1 (Ethereum) block. We don't really care what L2 block it appeared in, just that it has.
// IsBlockFinalized compares the number of the L1 block containing the Arbitrum block with the latest finalized block on Ethereum.
func (a *ArbitrumFinalizer) IsBlockFinalized(ctx context.Context, block *connectors.NewBlock) (bool, error) {
_, err := a.caller.FindBatchContainingBlock(&ethBind.CallOpts{Context: ctx}, block.Number.Uint64())
if err != nil {
// If it hasn't been published yet, the method returns an error, so we check for that and treat it as
// not finalized, rather than as an error. Here's what that looks like:
// "requested block 430842 is after latest on-chain block 430820 published in batch 4686"
if strings.ContainsAny(err.Error(), "is after latest on-chain block") {
return false, nil
}
return false, err
if block.L1BlockNumber == nil {
return false, fmt.Errorf("l1 block number is nil")
}
return true, nil
latestL1Block := a.l1Finalizer.GetLatestFinalizedBlockNumber()
if latestL1Block == 0 {
// This happens on start up.
return false, nil
}
isFinalized := block.L1BlockNumber.Uint64() <= latestL1Block
return isFinalized, nil
}

View File

@ -0,0 +1,5 @@
package interfaces
type L1Finalizer interface {
GetLatestFinalizedBlockNumber() uint64
}

View File

@ -10,6 +10,7 @@ import (
"github.com/certusone/wormhole/node/pkg/watchers/evm/connectors"
"github.com/certusone/wormhole/node/pkg/watchers/evm/connectors/ethabi"
"github.com/certusone/wormhole/node/pkg/watchers/evm/finalizers"
"github.com/certusone/wormhole/node/pkg/watchers/evm/interfaces"
"github.com/certusone/wormhole/node/pkg/p2p"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
@ -105,6 +106,9 @@ type (
// Interface to the chain specific ethereum library.
ethConn connectors.Connector
unsafeDevMode bool
latestFinalizedBlockNumber uint64
l1Finalizer interfaces.L1Finalizer
}
pendingKey struct {
@ -130,7 +134,9 @@ func NewEthWatcher(
setEvents chan *common.GuardianSet,
minConfirmations uint64,
obsvReqC chan *gossipv1.ObservationRequest,
unsafeDevMode bool) *Watcher {
unsafeDevMode bool,
l1Finalizer interfaces.L1Finalizer,
) *Watcher {
return &Watcher{
url: url,
@ -144,6 +150,7 @@ func NewEthWatcher(
obsvReqC: obsvReqC,
pending: map[pendingKey]*pendingMessage{},
unsafeDevMode: unsafeDevMode,
l1Finalizer: l1Finalizer,
}
}
@ -228,13 +235,16 @@ func (w *Watcher) Run(ctx context.Context) error {
return fmt.Errorf("creating poll connector failed: %w", err)
}
} else if w.chainID == vaa.ChainIDArbitrum && !w.unsafeDevMode {
if w.l1Finalizer == nil {
return fmt.Errorf("unable to create arbitrum watcher because the l1 finalizer is not set")
}
baseConnector, err := connectors.NewEthereumConnector(timeout, w.networkName, w.url, w.contract, logger)
if err != nil {
ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc()
p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
return fmt.Errorf("dialing eth client failed: %w", err)
}
finalizer := finalizers.NewArbitrumFinalizer(logger, baseConnector, baseConnector.Client())
finalizer := finalizers.NewArbitrumFinalizer(logger, baseConnector, baseConnector.Client(), w.l1Finalizer)
pollConnector, err := connectors.NewBlockPollConnector(ctx, baseConnector, finalizer, 250*time.Millisecond, false)
if err != nil {
ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc()
@ -518,6 +528,7 @@ func (w *Watcher) Run(ctx context.Context) error {
blockNumberU := ev.Number.Uint64()
atomic.StoreUint64(&currentBlockNumber, blockNumberU)
atomic.StoreUint64(&w.latestFinalizedBlockNumber, blockNumberU)
for key, pLock := range w.pending {
expectedConfirmations := uint64(pLock.message.ConsistencyLevel)
@ -744,3 +755,7 @@ func (w *Watcher) getAcalaMode(ctx context.Context) (useFinalizedBlocks bool, er
return
}
func (w *Watcher) GetLatestFinalizedBlockNumber() uint64 {
return atomic.LoadUint64(&w.latestFinalizedBlockNumber)
}