From 40279fe668d89ab0209ece15c58c96150ed2509b Mon Sep 17 00:00:00 2001 From: Paul Noel <35237584+panoel@users.noreply.github.com> Date: Thu, 20 Oct 2022 11:43:43 -0500 Subject: [PATCH] node/cmd - refactor (#1762) * node/cmd - refactor * node/cmd - add logging * node/cmd - refactor * node/cmd - add logging * node/cmd: address inspection comments --- node/cmd/guardiand/node.go | 373 ++++++++++++++++++++----------------- 1 file changed, 204 insertions(+), 169 deletions(-) diff --git a/node/cmd/guardiand/node.go b/node/cmd/guardiand/node.go index 1a52ccdd3..fb23be1f1 100644 --- a/node/cmd/guardiand/node.go +++ b/node/cmd/guardiand/node.go @@ -397,52 +397,6 @@ func runNode(cmd *cobra.Command, args []string) { // Override the default go-log config, which uses a magic environment variable. ipfslog.SetAllLoggers(lvl) - // Register components for readiness checks. - readiness.RegisterComponent(common.ReadinessEthSyncing) - if *solanaRPC != "" { - readiness.RegisterComponent(common.ReadinessSolanaSyncing) - } - if *pythnetRPC != "" { - readiness.RegisterComponent(common.ReadinessPythNetSyncing) - } - if *terraWS != "" { - readiness.RegisterComponent(common.ReadinessTerraSyncing) - } - if *terra2WS != "" { - readiness.RegisterComponent(common.ReadinessTerra2Syncing) - } - if *algorandIndexerRPC != "" { - readiness.RegisterComponent(common.ReadinessAlgorandSyncing) - } - if *nearRPC != "" { - readiness.RegisterComponent(common.ReadinessNearSyncing) - } - if *xplaWS != "" { - readiness.RegisterComponent(common.ReadinessXplaSyncing) - } - if *wormchainWS != "" { - readiness.RegisterComponent(common.ReadinessWormchainSyncing) - } - readiness.RegisterComponent(common.ReadinessBSCSyncing) - readiness.RegisterComponent(common.ReadinessPolygonSyncing) - readiness.RegisterComponent(common.ReadinessAvalancheSyncing) - readiness.RegisterComponent(common.ReadinessOasisSyncing) - readiness.RegisterComponent(common.ReadinessAuroraSyncing) - readiness.RegisterComponent(common.ReadinessFantomSyncing) - readiness.RegisterComponent(common.ReadinessKaruraSyncing) - readiness.RegisterComponent(common.ReadinessAcalaSyncing) - readiness.RegisterComponent(common.ReadinessKlaytnSyncing) - readiness.RegisterComponent(common.ReadinessCeloSyncing) - readiness.RegisterComponent(common.ReadinessMoonbeamSyncing) - readiness.RegisterComponent(common.ReadinessAptosSyncing) - - if *testnetMode { - readiness.RegisterComponent(common.ReadinessEthRopstenSyncing) - readiness.RegisterComponent(common.ReadinessNeonSyncing) - readiness.RegisterComponent(common.ReadinessInjectiveSyncing) - readiness.RegisterComponent(common.ReadinessArbitrumSyncing) - } - if *statusAddr != "" { // Use a custom routing instead of using http.DefaultServeMux directly to avoid accidentally exposing packages // that register themselves with it by default (like pprof). @@ -592,6 +546,13 @@ func runNode(cmd *cobra.Command, args []string) { } else if *xplaLCD != "" || *xplaContract != "" { logger.Fatal("If --xplaWS is not specified, then --xplaLCD and --xplaContract must not be specified") } + if *wormchainWS != "" { + if *wormchainLCD == "" { + logger.Fatal("If --wormchainWS is specified, then --wormchainLCD must be specified") + } + } else if *wormchainLCD != "" { + logger.Fatal("If --wormchainWS is not specified, then --wormchainLCD must not be specified") + } if *aptosRPC != "" { if *aptosAccount == "" { @@ -851,40 +812,6 @@ func runNode(cmd *cobra.Command, args []string) { // Per-chain observation requests chainObsvReqC := make(map[vaa.ChainID]chan *gossipv1.ObservationRequest) - // Observation request channel for each chain supporting observation requests. - chainObsvReqC[vaa.ChainIDSolana] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - chainObsvReqC[vaa.ChainIDEthereum] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - chainObsvReqC[vaa.ChainIDTerra] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - chainObsvReqC[vaa.ChainIDTerra2] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - chainObsvReqC[vaa.ChainIDWormchain] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - chainObsvReqC[vaa.ChainIDBSC] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - chainObsvReqC[vaa.ChainIDPolygon] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - chainObsvReqC[vaa.ChainIDAvalanche] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - chainObsvReqC[vaa.ChainIDOasis] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - chainObsvReqC[vaa.ChainIDAlgorand] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - if *nearRPC != "" { - chainObsvReqC[vaa.ChainIDNear] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - } - if *aptosRPC != "" { - chainObsvReqC[vaa.ChainIDAptos] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - } - chainObsvReqC[vaa.ChainIDAurora] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - chainObsvReqC[vaa.ChainIDFantom] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - chainObsvReqC[vaa.ChainIDKarura] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - chainObsvReqC[vaa.ChainIDAcala] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - chainObsvReqC[vaa.ChainIDKlaytn] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - chainObsvReqC[vaa.ChainIDCelo] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - chainObsvReqC[vaa.ChainIDPythNet] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - chainObsvReqC[vaa.ChainIDMoonbeam] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - chainObsvReqC[vaa.ChainIDXpla] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - if *testnetMode { - chainObsvReqC[vaa.ChainIDNeon] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - chainObsvReqC[vaa.ChainIDEthereumRopsten] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - chainObsvReqC[vaa.ChainIDInjective] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - chainObsvReqC[vaa.ChainIDArbitrum] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - } - go handleReobservationRequests(rootCtx, clock.New(), logger, obsvReqC, chainObsvReqC) - var notifier *discord.DiscordNotifier if *discordToken != "" { notifier, err = discord.NewDiscordNotifier(*discordToken, *discordChannel, logger) @@ -989,120 +916,175 @@ func runNode(cmd *cobra.Command, args []string) { return err } - if err := supervisor.Run(ctx, "ethwatch", - evm.NewEthWatcher(*ethRPC, ethContractAddr, "eth", common.ReadinessEthSyncing, vaa.ChainIDEthereum, lockC, setC, 1, chainObsvReqC[vaa.ChainIDEthereum], *unsafeDevMode).Run); err != nil { - return err - } + // For each chain that wants a watcher, we: + // - create and register a component for readiness checks. + // - create an observation request channel. + // - create the watcher. + // + // NOTE: The "none" is a special indicator to disable a watcher until it is desirable to turn it back on. - if err := supervisor.Run(ctx, "bscwatch", - evm.NewEthWatcher(*bscRPC, bscContractAddr, "bsc", common.ReadinessBSCSyncing, vaa.ChainIDBSC, lockC, nil, 1, chainObsvReqC[vaa.ChainIDBSC], *unsafeDevMode).Run); err != nil { - return err - } - - polygonMinConfirmations := uint64(512) - if *testnetMode { - polygonMinConfirmations = 64 - } - - if err := supervisor.Run(ctx, "polygonwatch", - evm.NewEthWatcher(*polygonRPC, polygonContractAddr, "polygon", common.ReadinessPolygonSyncing, vaa.ChainIDPolygon, lockC, nil, polygonMinConfirmations, chainObsvReqC[vaa.ChainIDPolygon], *unsafeDevMode).Run); err != nil { - // Special case: Polygon can fork like PoW Ethereum, and it's not clear what the safe number of blocks is - // - // Hardcode the minimum number of confirmations to 512 regardless of what the smart contract specifies to protect - // developers from accidentally specifying an unsafe number of confirmations. We can remove this restriction as soon - // as specific public guidance exists for Polygon developers. - return err - } - if err := supervisor.Run(ctx, "avalanchewatch", - evm.NewEthWatcher(*avalancheRPC, avalancheContractAddr, "avalanche", common.ReadinessAvalancheSyncing, vaa.ChainIDAvalanche, lockC, nil, 1, chainObsvReqC[vaa.ChainIDAvalanche], *unsafeDevMode).Run); err != nil { - return err - } - if err := supervisor.Run(ctx, "oasiswatch", - evm.NewEthWatcher(*oasisRPC, oasisContractAddr, "oasis", common.ReadinessOasisSyncing, vaa.ChainIDOasis, lockC, nil, 1, chainObsvReqC[vaa.ChainIDOasis], *unsafeDevMode).Run); err != nil { - return err - } - if err := supervisor.Run(ctx, "aurorawatch", - evm.NewEthWatcher(*auroraRPC, auroraContractAddr, "aurora", common.ReadinessAuroraSyncing, vaa.ChainIDAurora, lockC, nil, 1, chainObsvReqC[vaa.ChainIDAurora], *unsafeDevMode).Run); err != nil { - return err - } - if err := supervisor.Run(ctx, "fantomwatch", - evm.NewEthWatcher(*fantomRPC, fantomContractAddr, "fantom", common.ReadinessFantomSyncing, vaa.ChainIDFantom, lockC, nil, 1, chainObsvReqC[vaa.ChainIDFantom], *unsafeDevMode).Run); err != nil { - return err - } - if err := supervisor.Run(ctx, "karurawatch", - evm.NewEthWatcher(*karuraRPC, karuraContractAddr, "karura", common.ReadinessKaruraSyncing, vaa.ChainIDKarura, lockC, nil, 1, chainObsvReqC[vaa.ChainIDKarura], *unsafeDevMode).Run); err != nil { - return err - } - if err := supervisor.Run(ctx, "acalawatch", - evm.NewEthWatcher(*acalaRPC, acalaContractAddr, "acala", common.ReadinessAcalaSyncing, vaa.ChainIDAcala, lockC, nil, 1, chainObsvReqC[vaa.ChainIDAcala], *unsafeDevMode).Run); err != nil { - return err - } - if err := supervisor.Run(ctx, "klaytnwatch", - evm.NewEthWatcher(*klaytnRPC, klaytnContractAddr, "klaytn", common.ReadinessKlaytnSyncing, vaa.ChainIDKlaytn, lockC, nil, 1, chainObsvReqC[vaa.ChainIDKlaytn], *unsafeDevMode).Run); err != nil { - return err - } - if err := supervisor.Run(ctx, "celowatch", - evm.NewEthWatcher(*celoRPC, celoContractAddr, "celo", common.ReadinessCeloSyncing, vaa.ChainIDCelo, lockC, nil, 1, chainObsvReqC[vaa.ChainIDCelo], *unsafeDevMode).Run); err != nil { - return err - } - if err := supervisor.Run(ctx, "moonbeamwatch", - evm.NewEthWatcher(*moonbeamRPC, moonbeamContractAddr, "moonbeam", common.ReadinessMoonbeamSyncing, vaa.ChainIDMoonbeam, lockC, nil, 1, chainObsvReqC[vaa.ChainIDMoonbeam], *unsafeDevMode).Run); err != nil { - return err - } - - if *testnetMode { - if err := supervisor.Run(ctx, "ethropstenwatch", - evm.NewEthWatcher(*ethRopstenRPC, ethRopstenContractAddr, "ethropsten", common.ReadinessEthRopstenSyncing, vaa.ChainIDEthereumRopsten, lockC, nil, 1, chainObsvReqC[vaa.ChainIDEthereumRopsten], *unsafeDevMode).Run); err != nil { - return err - } - if err := supervisor.Run(ctx, "neonwatch", - evm.NewEthWatcher(*neonRPC, neonContractAddr, "neon", common.ReadinessNeonSyncing, vaa.ChainIDNeon, lockC, nil, 32, chainObsvReqC[vaa.ChainIDNeon], *unsafeDevMode).Run); err != nil { - return err - } - if err := supervisor.Run(ctx, "arbitrumwatch", - evm.NewEthWatcher(*arbitrumRPC, arbitrumContractAddr, "arbitrum", common.ReadinessArbitrumSyncing, vaa.ChainIDArbitrum, lockC, nil, 1, chainObsvReqC[vaa.ChainIDArbitrum], *unsafeDevMode).Run); err != nil { + if shouldStart(ethRPC) { + logger.Info("Starting Ethereum watcher") + readiness.RegisterComponent(common.ReadinessEthSyncing) + chainObsvReqC[vaa.ChainIDEthereum] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) + if err := supervisor.Run(ctx, "ethwatch", + evm.NewEthWatcher(*ethRPC, ethContractAddr, "eth", common.ReadinessEthSyncing, vaa.ChainIDEthereum, lockC, setC, 1, chainObsvReqC[vaa.ChainIDEthereum], *unsafeDevMode).Run); err != nil { return err } } - if *terraWS != "" { + if shouldStart(bscRPC) { + logger.Info("Starting BSC watcher") + readiness.RegisterComponent(common.ReadinessBSCSyncing) + chainObsvReqC[vaa.ChainIDBSC] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) + if err := supervisor.Run(ctx, "bscwatch", + evm.NewEthWatcher(*bscRPC, bscContractAddr, "bsc", common.ReadinessBSCSyncing, vaa.ChainIDBSC, lockC, nil, 1, chainObsvReqC[vaa.ChainIDBSC], *unsafeDevMode).Run); err != nil { + return err + } + } + + if shouldStart(polygonRPC) { + polygonMinConfirmations := uint64(512) + if *testnetMode { + polygonMinConfirmations = 64 + } + logger.Info("Starting Polygon watcher") + readiness.RegisterComponent(common.ReadinessPolygonSyncing) + chainObsvReqC[vaa.ChainIDPolygon] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) + if err := supervisor.Run(ctx, "polygonwatch", + evm.NewEthWatcher(*polygonRPC, polygonContractAddr, "polygon", common.ReadinessPolygonSyncing, vaa.ChainIDPolygon, lockC, nil, polygonMinConfirmations, chainObsvReqC[vaa.ChainIDPolygon], *unsafeDevMode).Run); err != nil { + // Special case: Polygon can fork like PoW Ethereum, and it's not clear what the safe number of blocks is + // + // Hardcode the minimum number of confirmations to 512 regardless of what the smart contract specifies to protect + // developers from accidentally specifying an unsafe number of confirmations. We can remove this restriction as soon + // as specific public guidance exists for Polygon developers. + return err + } + } + if shouldStart(avalancheRPC) { + logger.Info("Starting Avalanche watcher") + readiness.RegisterComponent(common.ReadinessAvalancheSyncing) + chainObsvReqC[vaa.ChainIDAvalanche] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) + if err := supervisor.Run(ctx, "avalanchewatch", + evm.NewEthWatcher(*avalancheRPC, avalancheContractAddr, "avalanche", common.ReadinessAvalancheSyncing, vaa.ChainIDAvalanche, lockC, nil, 1, chainObsvReqC[vaa.ChainIDAvalanche], *unsafeDevMode).Run); err != nil { + return err + } + } + if shouldStart(oasisRPC) { + logger.Info("Starting Oasis watcher") + chainObsvReqC[vaa.ChainIDOasis] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) + if err := supervisor.Run(ctx, "oasiswatch", + evm.NewEthWatcher(*oasisRPC, oasisContractAddr, "oasis", common.ReadinessOasisSyncing, vaa.ChainIDOasis, lockC, nil, 1, chainObsvReqC[vaa.ChainIDOasis], *unsafeDevMode).Run); err != nil { + return err + } + } + if shouldStart(auroraRPC) { + logger.Info("Starting Aurora watcher") + readiness.RegisterComponent(common.ReadinessAuroraSyncing) + chainObsvReqC[vaa.ChainIDAurora] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) + if err := supervisor.Run(ctx, "aurorawatch", + evm.NewEthWatcher(*auroraRPC, auroraContractAddr, "aurora", common.ReadinessAuroraSyncing, vaa.ChainIDAurora, lockC, nil, 1, chainObsvReqC[vaa.ChainIDAurora], *unsafeDevMode).Run); err != nil { + return err + } + } + if shouldStart(fantomRPC) { + logger.Info("Starting Fantom watcher") + readiness.RegisterComponent(common.ReadinessFantomSyncing) + chainObsvReqC[vaa.ChainIDFantom] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) + if err := supervisor.Run(ctx, "fantomwatch", + evm.NewEthWatcher(*fantomRPC, fantomContractAddr, "fantom", common.ReadinessFantomSyncing, vaa.ChainIDFantom, lockC, nil, 1, chainObsvReqC[vaa.ChainIDFantom], *unsafeDevMode).Run); err != nil { + return err + } + } + if shouldStart(karuraRPC) { + logger.Info("Starting Karura watcher") + readiness.RegisterComponent(common.ReadinessKaruraSyncing) + chainObsvReqC[vaa.ChainIDKarura] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) + if err := supervisor.Run(ctx, "karurawatch", + evm.NewEthWatcher(*karuraRPC, karuraContractAddr, "karura", common.ReadinessKaruraSyncing, vaa.ChainIDKarura, lockC, nil, 1, chainObsvReqC[vaa.ChainIDKarura], *unsafeDevMode).Run); err != nil { + return err + } + } + if shouldStart(acalaRPC) { + logger.Info("Starting Acala watcher") + readiness.RegisterComponent(common.ReadinessAcalaSyncing) + chainObsvReqC[vaa.ChainIDAcala] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) + if err := supervisor.Run(ctx, "acalawatch", + evm.NewEthWatcher(*acalaRPC, acalaContractAddr, "acala", common.ReadinessAcalaSyncing, vaa.ChainIDAcala, lockC, nil, 1, chainObsvReqC[vaa.ChainIDAcala], *unsafeDevMode).Run); err != nil { + return err + } + } + if shouldStart(klaytnRPC) { + logger.Info("Starting Klaytn watcher") + readiness.RegisterComponent(common.ReadinessKlaytnSyncing) + chainObsvReqC[vaa.ChainIDKlaytn] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) + if err := supervisor.Run(ctx, "klaytnwatch", + evm.NewEthWatcher(*klaytnRPC, klaytnContractAddr, "klaytn", common.ReadinessKlaytnSyncing, vaa.ChainIDKlaytn, lockC, nil, 1, chainObsvReqC[vaa.ChainIDKlaytn], *unsafeDevMode).Run); err != nil { + return err + } + } + if shouldStart(celoRPC) { + logger.Info("Starting Celo watcher") + readiness.RegisterComponent(common.ReadinessCeloSyncing) + chainObsvReqC[vaa.ChainIDCelo] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) + if err := supervisor.Run(ctx, "celowatch", + evm.NewEthWatcher(*celoRPC, celoContractAddr, "celo", common.ReadinessCeloSyncing, vaa.ChainIDCelo, lockC, nil, 1, chainObsvReqC[vaa.ChainIDCelo], *unsafeDevMode).Run); err != nil { + return err + } + } + if shouldStart(moonbeamRPC) { + logger.Info("Starting Moonbeam watcher") + readiness.RegisterComponent(common.ReadinessMoonbeamSyncing) + chainObsvReqC[vaa.ChainIDMoonbeam] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) + if err := supervisor.Run(ctx, "moonbeamwatch", + evm.NewEthWatcher(*moonbeamRPC, moonbeamContractAddr, "moonbeam", common.ReadinessMoonbeamSyncing, vaa.ChainIDMoonbeam, lockC, nil, 1, chainObsvReqC[vaa.ChainIDMoonbeam], *unsafeDevMode).Run); err != nil { + return err + } + } + + if shouldStart(terraWS) { logger.Info("Starting Terra watcher") + readiness.RegisterComponent(common.ReadinessTerraSyncing) + chainObsvReqC[vaa.ChainIDTerra] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "terrawatch", cosmwasm.NewWatcher(*terraWS, *terraLCD, *terraContract, lockC, chainObsvReqC[vaa.ChainIDTerra], common.ReadinessTerraSyncing, vaa.ChainIDTerra).Run); err != nil { return err } } - if *terra2WS != "" { + if shouldStart(terra2WS) { logger.Info("Starting Terra 2 watcher") + readiness.RegisterComponent(common.ReadinessTerra2Syncing) + chainObsvReqC[vaa.ChainIDTerra2] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "terra2watch", cosmwasm.NewWatcher(*terra2WS, *terra2LCD, *terra2Contract, lockC, chainObsvReqC[vaa.ChainIDTerra2], common.ReadinessTerra2Syncing, vaa.ChainIDTerra2).Run); err != nil { return err } } - if *testnetMode { - logger.Info("Starting Injective watcher") - if err := supervisor.Run(ctx, "injectivewatch", - cosmwasm.NewWatcher(*injectiveWS, *injectiveLCD, *injectiveContract, lockC, chainObsvReqC[vaa.ChainIDInjective], common.ReadinessInjectiveSyncing, vaa.ChainIDInjective).Run); err != nil { - return err - } - } - if *xplaWS != "" { + if shouldStart(xplaWS) { logger.Info("Starting XPLA watcher") + readiness.RegisterComponent(common.ReadinessXplaSyncing) + chainObsvReqC[vaa.ChainIDXpla] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "xplawatch", cosmwasm.NewWatcher(*xplaWS, *xplaLCD, *xplaContract, lockC, chainObsvReqC[vaa.ChainIDXpla], common.ReadinessXplaSyncing, vaa.ChainIDXpla).Run); err != nil { return err } } - if *algorandIndexerRPC != "" { + if shouldStart(algorandIndexerRPC) { + logger.Info("Starting Algorand watcher") + readiness.RegisterComponent(common.ReadinessAlgorandSyncing) + chainObsvReqC[vaa.ChainIDAlgorand] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "algorandwatch", algorand.NewWatcher(*algorandIndexerRPC, *algorandIndexerToken, *algorandAlgodRPC, *algorandAlgodToken, *algorandAppID, lockC, setC, chainObsvReqC[vaa.ChainIDAlgorand]).Run); err != nil { return err } } - if *nearRPC != "" { + if shouldStart(nearRPC) { + logger.Info("Starting Near watcher") + readiness.RegisterComponent(common.ReadinessNearSyncing) + chainObsvReqC[vaa.ChainIDNear] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "nearwatch", near.NewWatcher(*nearRPC, *nearContract, lockC, chainObsvReqC[vaa.ChainIDNear], !(*unsafeDevMode || *testnetMode)).Run); err != nil { return err @@ -1110,44 +1092,93 @@ func runNode(cmd *cobra.Command, args []string) { } // Start Wormchain watcher only if configured - if *wormchainWS != "" && *wormchainLCD != "" { + if shouldStart(wormchainWS) { logger.Info("Starting Wormchain watcher") + readiness.RegisterComponent(common.ReadinessWormchainSyncing) + chainObsvReqC[vaa.ChainIDWormchain] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "wormchainwatch", wormchain.NewWatcher(*wormchainWS, *wormchainLCD, lockC, setC, chainObsvReqC[vaa.ChainIDWormchain]).Run); err != nil { return err } } - if *aptosRPC != "" { + if shouldStart(aptosRPC) { + logger.Info("Starting Aptos watcher") + readiness.RegisterComponent(common.ReadinessAptosSyncing) + chainObsvReqC[vaa.ChainIDAptos] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "aptoswatch", aptos.NewWatcher(*aptosRPC, *aptosAccount, *aptosHandle, lockC, chainObsvReqC[vaa.ChainIDAptos]).Run); err != nil { return err } } - if *solanaRPC != "" { + if shouldStart(solanaRPC) { + logger.Info("Starting Solana watcher") + readiness.RegisterComponent(common.ReadinessSolanaSyncing) + chainObsvReqC[vaa.ChainIDSolana] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "solwatch-confirmed", solana.NewSolanaWatcher(*solanaRPC, solAddress, lockC, nil, rpc.CommitmentConfirmed, common.ReadinessSolanaSyncing, vaa.ChainIDSolana).Run); err != nil { return err } - if err := supervisor.Run(ctx, "solwatch-finalized", solana.NewSolanaWatcher(*solanaRPC, solAddress, lockC, chainObsvReqC[vaa.ChainIDSolana], rpc.CommitmentFinalized, common.ReadinessSolanaSyncing, vaa.ChainIDSolana).Run); err != nil { return err } } - if *pythnetRPC != "" { + if shouldStart(pythnetRPC) { + logger.Info("Starting Pythnet watcher") + readiness.RegisterComponent(common.ReadinessPythNetSyncing) + chainObsvReqC[vaa.ChainIDPythNet] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "pythwatch-confirmed", solana.NewSolanaWatcher(*pythnetRPC, pythnetAddress, lockC, nil, rpc.CommitmentConfirmed, common.ReadinessPythNetSyncing, vaa.ChainIDPythNet).Run); err != nil { return err } - if err := supervisor.Run(ctx, "pythwatch-finalized", solana.NewSolanaWatcher(*pythnetRPC, pythnetAddress, lockC, chainObsvReqC[vaa.ChainIDPythNet], rpc.CommitmentFinalized, common.ReadinessPythNetSyncing, vaa.ChainIDPythNet).Run); err != nil { return err } } + if *testnetMode { + if shouldStart(ethRopstenRPC) { + logger.Info("Starting Eth Ropsten watcher") + readiness.RegisterComponent(common.ReadinessEthRopstenSyncing) + chainObsvReqC[vaa.ChainIDEthereumRopsten] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) + if err := supervisor.Run(ctx, "ethropstenwatch", + evm.NewEthWatcher(*ethRopstenRPC, ethRopstenContractAddr, "ethropsten", common.ReadinessEthRopstenSyncing, vaa.ChainIDEthereumRopsten, lockC, nil, 1, chainObsvReqC[vaa.ChainIDEthereumRopsten], *unsafeDevMode).Run); err != nil { + return err + } + } + if shouldStart(neonRPC) { + logger.Info("Starting Neon watcher") + readiness.RegisterComponent(common.ReadinessNeonSyncing) + chainObsvReqC[vaa.ChainIDNeon] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) + if err := supervisor.Run(ctx, "neonwatch", + evm.NewEthWatcher(*neonRPC, neonContractAddr, "neon", common.ReadinessNeonSyncing, vaa.ChainIDNeon, lockC, nil, 32, chainObsvReqC[vaa.ChainIDNeon], *unsafeDevMode).Run); err != nil { + return err + } + } + if shouldStart(arbitrumRPC) { + logger.Info("Starting Arbitrum watcher") + readiness.RegisterComponent(common.ReadinessArbitrumSyncing) + chainObsvReqC[vaa.ChainIDArbitrum] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) + if err := supervisor.Run(ctx, "arbitrumwatch", + evm.NewEthWatcher(*arbitrumRPC, arbitrumContractAddr, "arbitrum", common.ReadinessArbitrumSyncing, vaa.ChainIDArbitrum, lockC, nil, 1, chainObsvReqC[vaa.ChainIDArbitrum], *unsafeDevMode).Run); err != nil { + return err + } + } + if shouldStart(injectiveWS) { + logger.Info("Starting Injective watcher") + readiness.RegisterComponent(common.ReadinessInjectiveSyncing) + chainObsvReqC[vaa.ChainIDInjective] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) + if err := supervisor.Run(ctx, "injectivewatch", + cosmwasm.NewWatcher(*injectiveWS, *injectiveLCD, *injectiveContract, lockC, chainObsvReqC[vaa.ChainIDInjective], common.ReadinessInjectiveSyncing, vaa.ChainIDInjective).Run); err != nil { + return err + } + } + } + go handleReobservationRequests(rootCtx, clock.New(), logger, obsvReqC, chainObsvReqC) + if gov != nil { err := gov.Run(ctx) if err != nil { @@ -1238,3 +1269,7 @@ func decryptTelemetryServiceAccount() ([]byte, error) { return creds, err } + +func shouldStart(rpc *string) bool { + return *rpc != "" && *rpc != "none" +}