diff --git a/node/cmd/guardiand/node.go b/node/cmd/guardiand/node.go index 37c9310b2..2c2559346 100644 --- a/node/cmd/guardiand/node.go +++ b/node/cmd/guardiand/node.go @@ -1097,9 +1097,9 @@ func runNode(cmd *cobra.Command, args []string) { var ethWatcher *evm.Watcher if shouldStart(ethRPC) { logger.Info("Starting Ethereum watcher") - readiness.RegisterComponent(common.ReadinessEthSyncing) + common.MustRegisterReadinessSyncing(vaa.ChainIDEthereum) chainObsvReqC[vaa.ChainIDEthereum] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - ethWatcher = evm.NewEthWatcher(*ethRPC, ethContractAddr, "eth", common.ReadinessEthSyncing, vaa.ChainIDEthereum, chainMsgC[vaa.ChainIDEthereum], setWriteC, chainObsvReqC[vaa.ChainIDEthereum], *unsafeDevMode) + ethWatcher = evm.NewEthWatcher(*ethRPC, ethContractAddr, "eth", vaa.ChainIDEthereum, chainMsgC[vaa.ChainIDEthereum], setWriteC, chainObsvReqC[vaa.ChainIDEthereum], *unsafeDevMode) if err := supervisor.Run(ctx, "ethwatch", common.WrapWithScissors(ethWatcher.Run, "ethwatch")); err != nil { return err @@ -1108,9 +1108,9 @@ func runNode(cmd *cobra.Command, args []string) { if shouldStart(bscRPC) { logger.Info("Starting BSC watcher") - readiness.RegisterComponent(common.ReadinessBSCSyncing) + common.MustRegisterReadinessSyncing(vaa.ChainIDBSC) chainObsvReqC[vaa.ChainIDBSC] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - bscWatcher := evm.NewEthWatcher(*bscRPC, bscContractAddr, "bsc", common.ReadinessBSCSyncing, vaa.ChainIDBSC, chainMsgC[vaa.ChainIDBSC], nil, chainObsvReqC[vaa.ChainIDBSC], *unsafeDevMode) + bscWatcher := evm.NewEthWatcher(*bscRPC, bscContractAddr, "bsc", vaa.ChainIDBSC, chainMsgC[vaa.ChainIDBSC], nil, chainObsvReqC[vaa.ChainIDBSC], *unsafeDevMode) bscWatcher.SetWaitForConfirmations(true) if err := supervisor.Run(ctx, "bscwatch", common.WrapWithScissors(bscWatcher.Run, "bscwatch")); err != nil { return err @@ -1124,9 +1124,9 @@ func runNode(cmd *cobra.Command, args []string) { log.Fatal("Polygon checkpointing is required in mainnet") } logger.Info("Starting Polygon watcher") - readiness.RegisterComponent(common.ReadinessPolygonSyncing) + common.MustRegisterReadinessSyncing(vaa.ChainIDPolygon) chainObsvReqC[vaa.ChainIDPolygon] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - polygonWatcher := evm.NewEthWatcher(*polygonRPC, polygonContractAddr, "polygon", common.ReadinessPolygonSyncing, vaa.ChainIDPolygon, chainMsgC[vaa.ChainIDPolygon], nil, chainObsvReqC[vaa.ChainIDPolygon], *unsafeDevMode) + polygonWatcher := evm.NewEthWatcher(*polygonRPC, polygonContractAddr, "polygon", vaa.ChainIDPolygon, chainMsgC[vaa.ChainIDPolygon], nil, chainObsvReqC[vaa.ChainIDPolygon], *unsafeDevMode) polygonWatcher.SetWaitForConfirmations(waitForConfirmations) if err := polygonWatcher.SetRootChainParams(*polygonRootChainRpc, *polygonRootChainContractAddress); err != nil { return err @@ -1137,81 +1137,82 @@ func runNode(cmd *cobra.Command, args []string) { } if shouldStart(avalancheRPC) { logger.Info("Starting Avalanche watcher") - readiness.RegisterComponent(common.ReadinessAvalancheSyncing) + common.MustRegisterReadinessSyncing(vaa.ChainIDAvalanche) chainObsvReqC[vaa.ChainIDAvalanche] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "avalanchewatch", - common.WrapWithScissors(evm.NewEthWatcher(*avalancheRPC, avalancheContractAddr, "avalanche", common.ReadinessAvalancheSyncing, vaa.ChainIDAvalanche, chainMsgC[vaa.ChainIDAvalanche], nil, chainObsvReqC[vaa.ChainIDAvalanche], *unsafeDevMode).Run, "avalanchewatch")); err != nil { + common.WrapWithScissors(evm.NewEthWatcher(*avalancheRPC, avalancheContractAddr, "avalanche", vaa.ChainIDAvalanche, chainMsgC[vaa.ChainIDAvalanche], nil, chainObsvReqC[vaa.ChainIDAvalanche], *unsafeDevMode).Run, "avalanchewatch")); err != nil { return err } } if shouldStart(oasisRPC) { logger.Info("Starting Oasis watcher") + common.MustRegisterReadinessSyncing(vaa.ChainIDOasis) chainObsvReqC[vaa.ChainIDOasis] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "oasiswatch", - common.WrapWithScissors(evm.NewEthWatcher(*oasisRPC, oasisContractAddr, "oasis", common.ReadinessOasisSyncing, vaa.ChainIDOasis, chainMsgC[vaa.ChainIDOasis], nil, chainObsvReqC[vaa.ChainIDOasis], *unsafeDevMode).Run, "oasiswatch")); err != nil { + common.WrapWithScissors(evm.NewEthWatcher(*oasisRPC, oasisContractAddr, "oasis", vaa.ChainIDOasis, chainMsgC[vaa.ChainIDOasis], nil, chainObsvReqC[vaa.ChainIDOasis], *unsafeDevMode).Run, "oasiswatch")); err != nil { return err } } if shouldStart(auroraRPC) { logger.Info("Starting Aurora watcher") - readiness.RegisterComponent(common.ReadinessAuroraSyncing) + common.MustRegisterReadinessSyncing(vaa.ChainIDAurora) chainObsvReqC[vaa.ChainIDAurora] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "aurorawatch", - common.WrapWithScissors(evm.NewEthWatcher(*auroraRPC, auroraContractAddr, "aurora", common.ReadinessAuroraSyncing, vaa.ChainIDAurora, chainMsgC[vaa.ChainIDAurora], nil, chainObsvReqC[vaa.ChainIDAurora], *unsafeDevMode).Run, "aurorawatch")); err != nil { + common.WrapWithScissors(evm.NewEthWatcher(*auroraRPC, auroraContractAddr, "aurora", vaa.ChainIDAurora, chainMsgC[vaa.ChainIDAurora], nil, chainObsvReqC[vaa.ChainIDAurora], *unsafeDevMode).Run, "aurorawatch")); err != nil { return err } } if shouldStart(fantomRPC) { logger.Info("Starting Fantom watcher") - readiness.RegisterComponent(common.ReadinessFantomSyncing) + common.MustRegisterReadinessSyncing(vaa.ChainIDFantom) chainObsvReqC[vaa.ChainIDFantom] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "fantomwatch", - common.WrapWithScissors(evm.NewEthWatcher(*fantomRPC, fantomContractAddr, "fantom", common.ReadinessFantomSyncing, vaa.ChainIDFantom, chainMsgC[vaa.ChainIDFantom], nil, chainObsvReqC[vaa.ChainIDFantom], *unsafeDevMode).Run, "fantomwatch")); err != nil { + common.WrapWithScissors(evm.NewEthWatcher(*fantomRPC, fantomContractAddr, "fantom", vaa.ChainIDFantom, chainMsgC[vaa.ChainIDFantom], nil, chainObsvReqC[vaa.ChainIDFantom], *unsafeDevMode).Run, "fantomwatch")); err != nil { return err } } if shouldStart(karuraRPC) { logger.Info("Starting Karura watcher") - readiness.RegisterComponent(common.ReadinessKaruraSyncing) + common.MustRegisterReadinessSyncing(vaa.ChainIDKarura) chainObsvReqC[vaa.ChainIDKarura] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "karurawatch", - common.WrapWithScissors(evm.NewEthWatcher(*karuraRPC, karuraContractAddr, "karura", common.ReadinessKaruraSyncing, vaa.ChainIDKarura, chainMsgC[vaa.ChainIDKarura], nil, chainObsvReqC[vaa.ChainIDKarura], *unsafeDevMode).Run, "karurawatch")); err != nil { + common.WrapWithScissors(evm.NewEthWatcher(*karuraRPC, karuraContractAddr, "karura", vaa.ChainIDKarura, chainMsgC[vaa.ChainIDKarura], nil, chainObsvReqC[vaa.ChainIDKarura], *unsafeDevMode).Run, "karurawatch")); err != nil { return err } } if shouldStart(acalaRPC) { logger.Info("Starting Acala watcher") - readiness.RegisterComponent(common.ReadinessAcalaSyncing) + common.MustRegisterReadinessSyncing(vaa.ChainIDAcala) chainObsvReqC[vaa.ChainIDAcala] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "acalawatch", - common.WrapWithScissors(evm.NewEthWatcher(*acalaRPC, acalaContractAddr, "acala", common.ReadinessAcalaSyncing, vaa.ChainIDAcala, chainMsgC[vaa.ChainIDAcala], nil, chainObsvReqC[vaa.ChainIDAcala], *unsafeDevMode).Run, "acalawatch")); err != nil { + common.WrapWithScissors(evm.NewEthWatcher(*acalaRPC, acalaContractAddr, "acala", vaa.ChainIDAcala, chainMsgC[vaa.ChainIDAcala], nil, chainObsvReqC[vaa.ChainIDAcala], *unsafeDevMode).Run, "acalawatch")); err != nil { return err } } if shouldStart(klaytnRPC) { logger.Info("Starting Klaytn watcher") - readiness.RegisterComponent(common.ReadinessKlaytnSyncing) + common.MustRegisterReadinessSyncing(vaa.ChainIDKlaytn) chainObsvReqC[vaa.ChainIDKlaytn] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "klaytnwatch", - common.WrapWithScissors(evm.NewEthWatcher(*klaytnRPC, klaytnContractAddr, "klaytn", common.ReadinessKlaytnSyncing, vaa.ChainIDKlaytn, chainMsgC[vaa.ChainIDKlaytn], nil, chainObsvReqC[vaa.ChainIDKlaytn], *unsafeDevMode).Run, "klaytnwatch")); err != nil { + common.WrapWithScissors(evm.NewEthWatcher(*klaytnRPC, klaytnContractAddr, "klaytn", vaa.ChainIDKlaytn, chainMsgC[vaa.ChainIDKlaytn], nil, chainObsvReqC[vaa.ChainIDKlaytn], *unsafeDevMode).Run, "klaytnwatch")); err != nil { return err } } if shouldStart(celoRPC) { logger.Info("Starting Celo watcher") - readiness.RegisterComponent(common.ReadinessCeloSyncing) + common.MustRegisterReadinessSyncing(vaa.ChainIDCelo) chainObsvReqC[vaa.ChainIDCelo] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "celowatch", - common.WrapWithScissors(evm.NewEthWatcher(*celoRPC, celoContractAddr, "celo", common.ReadinessCeloSyncing, vaa.ChainIDCelo, chainMsgC[vaa.ChainIDCelo], nil, chainObsvReqC[vaa.ChainIDCelo], *unsafeDevMode).Run, "celowatch")); err != nil { + common.WrapWithScissors(evm.NewEthWatcher(*celoRPC, celoContractAddr, "celo", vaa.ChainIDCelo, chainMsgC[vaa.ChainIDCelo], nil, chainObsvReqC[vaa.ChainIDCelo], *unsafeDevMode).Run, "celowatch")); err != nil { return err } } if shouldStart(moonbeamRPC) { logger.Info("Starting Moonbeam watcher") - readiness.RegisterComponent(common.ReadinessMoonbeamSyncing) + common.MustRegisterReadinessSyncing(vaa.ChainIDMoonbeam) chainObsvReqC[vaa.ChainIDMoonbeam] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "moonbeamwatch", - common.WrapWithScissors(evm.NewEthWatcher(*moonbeamRPC, moonbeamContractAddr, "moonbeam", common.ReadinessMoonbeamSyncing, vaa.ChainIDMoonbeam, chainMsgC[vaa.ChainIDMoonbeam], nil, chainObsvReqC[vaa.ChainIDMoonbeam], *unsafeDevMode).Run, "moonbeamwatch")); err != nil { + common.WrapWithScissors(evm.NewEthWatcher(*moonbeamRPC, moonbeamContractAddr, "moonbeam", vaa.ChainIDMoonbeam, chainMsgC[vaa.ChainIDMoonbeam], nil, chainObsvReqC[vaa.ChainIDMoonbeam], *unsafeDevMode).Run, "moonbeamwatch")); err != nil { return err } } @@ -1220,9 +1221,9 @@ func runNode(cmd *cobra.Command, args []string) { log.Fatalf("if arbitrum is enabled then ethereum must also be enabled.") } logger.Info("Starting Arbitrum watcher") - readiness.RegisterComponent(common.ReadinessArbitrumSyncing) + common.MustRegisterReadinessSyncing(vaa.ChainIDArbitrum) chainObsvReqC[vaa.ChainIDArbitrum] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - arbitrumWatcher := evm.NewEthWatcher(*arbitrumRPC, arbitrumContractAddr, "arbitrum", common.ReadinessArbitrumSyncing, vaa.ChainIDArbitrum, chainMsgC[vaa.ChainIDArbitrum], nil, chainObsvReqC[vaa.ChainIDArbitrum], *unsafeDevMode) + arbitrumWatcher := evm.NewEthWatcher(*arbitrumRPC, arbitrumContractAddr, "arbitrum", vaa.ChainIDArbitrum, chainMsgC[vaa.ChainIDArbitrum], nil, chainObsvReqC[vaa.ChainIDArbitrum], *unsafeDevMode) arbitrumWatcher.SetL1Finalizer(ethWatcher) if err := supervisor.Run(ctx, "arbitrumwatch", common.WrapWithScissors(arbitrumWatcher.Run, "arbitrumwatch")); err != nil { return err @@ -1230,9 +1231,9 @@ func runNode(cmd *cobra.Command, args []string) { } if shouldStart(optimismRPC) { logger.Info("Starting Optimism watcher") - readiness.RegisterComponent(common.ReadinessOptimismSyncing) + common.MustRegisterReadinessSyncing(vaa.ChainIDOptimism) chainObsvReqC[vaa.ChainIDOptimism] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - optimismWatcher := evm.NewEthWatcher(*optimismRPC, optimismContractAddr, "optimism", common.ReadinessOptimismSyncing, vaa.ChainIDOptimism, chainMsgC[vaa.ChainIDOptimism], nil, chainObsvReqC[vaa.ChainIDOptimism], *unsafeDevMode) + optimismWatcher := evm.NewEthWatcher(*optimismRPC, optimismContractAddr, "optimism", vaa.ChainIDOptimism, chainMsgC[vaa.ChainIDOptimism], nil, chainObsvReqC[vaa.ChainIDOptimism], *unsafeDevMode) // If rootChainParams are set, pass them in for pre-Bedrock mode if *optimismCtcRpc != "" || *optimismCtcContractAddress != "" { @@ -1251,37 +1252,37 @@ func runNode(cmd *cobra.Command, args []string) { if shouldStart(terraWS) { logger.Info("Starting Terra watcher") - readiness.RegisterComponent(common.ReadinessTerraSyncing) + common.MustRegisterReadinessSyncing(vaa.ChainIDTerra) chainObsvReqC[vaa.ChainIDTerra] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "terrawatch", - common.WrapWithScissors(cosmwasm.NewWatcher(*terraWS, *terraLCD, *terraContract, chainMsgC[vaa.ChainIDTerra], chainObsvReqC[vaa.ChainIDTerra], common.ReadinessTerraSyncing, vaa.ChainIDTerra).Run, "terrawatch")); err != nil { + common.WrapWithScissors(cosmwasm.NewWatcher(*terraWS, *terraLCD, *terraContract, chainMsgC[vaa.ChainIDTerra], chainObsvReqC[vaa.ChainIDTerra], vaa.ChainIDTerra).Run, "terrawatch")); err != nil { return err } } if shouldStart(terra2WS) { logger.Info("Starting Terra 2 watcher") - readiness.RegisterComponent(common.ReadinessTerra2Syncing) + common.MustRegisterReadinessSyncing(vaa.ChainIDTerra2) chainObsvReqC[vaa.ChainIDTerra2] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "terra2watch", - common.WrapWithScissors(cosmwasm.NewWatcher(*terra2WS, *terra2LCD, *terra2Contract, chainMsgC[vaa.ChainIDTerra2], chainObsvReqC[vaa.ChainIDTerra2], common.ReadinessTerra2Syncing, vaa.ChainIDTerra2).Run, "terra2watch")); err != nil { + common.WrapWithScissors(cosmwasm.NewWatcher(*terra2WS, *terra2LCD, *terra2Contract, chainMsgC[vaa.ChainIDTerra2], chainObsvReqC[vaa.ChainIDTerra2], vaa.ChainIDTerra2).Run, "terra2watch")); err != nil { return err } } if shouldStart(xplaWS) { logger.Info("Starting XPLA watcher") - readiness.RegisterComponent(common.ReadinessXplaSyncing) + common.MustRegisterReadinessSyncing(vaa.ChainIDXpla) chainObsvReqC[vaa.ChainIDXpla] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "xplawatch", - common.WrapWithScissors(cosmwasm.NewWatcher(*xplaWS, *xplaLCD, *xplaContract, chainMsgC[vaa.ChainIDXpla], chainObsvReqC[vaa.ChainIDXpla], common.ReadinessXplaSyncing, vaa.ChainIDXpla).Run, "xplawatch")); err != nil { + common.WrapWithScissors(cosmwasm.NewWatcher(*xplaWS, *xplaLCD, *xplaContract, chainMsgC[vaa.ChainIDXpla], chainObsvReqC[vaa.ChainIDXpla], vaa.ChainIDXpla).Run, "xplawatch")); err != nil { return err } } if shouldStart(algorandIndexerRPC) { logger.Info("Starting Algorand watcher") - readiness.RegisterComponent(common.ReadinessAlgorandSyncing) + common.MustRegisterReadinessSyncing(vaa.ChainIDAlgorand) chainObsvReqC[vaa.ChainIDAlgorand] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "algorandwatch", common.WrapWithScissors(algorand.NewWatcher(*algorandIndexerRPC, *algorandIndexerToken, *algorandAlgodRPC, *algorandAlgodToken, *algorandAppID, chainMsgC[vaa.ChainIDAlgorand], chainObsvReqC[vaa.ChainIDAlgorand]).Run, "algorandwatch")); err != nil { @@ -1290,7 +1291,7 @@ func runNode(cmd *cobra.Command, args []string) { } if shouldStart(nearRPC) { logger.Info("Starting Near watcher") - readiness.RegisterComponent(common.ReadinessNearSyncing) + common.MustRegisterReadinessSyncing(vaa.ChainIDNear) chainObsvReqC[vaa.ChainIDNear] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "nearwatch", common.WrapWithScissors(near.NewWatcher(*nearRPC, *nearContract, chainMsgC[vaa.ChainIDNear], chainObsvReqC[vaa.ChainIDNear], !(*unsafeDevMode || *testnetMode)).Run, "nearwatch")); err != nil { @@ -1301,7 +1302,7 @@ func runNode(cmd *cobra.Command, args []string) { // Start Wormchain watcher only if configured if shouldStart(wormchainWS) { logger.Info("Starting Wormchain watcher") - readiness.RegisterComponent(common.ReadinessWormchainSyncing) + common.MustRegisterReadinessSyncing(vaa.ChainIDWormchain) chainObsvReqC[vaa.ChainIDWormchain] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "wormchainwatch", wormchain.NewWatcher(*wormchainWS, *wormchainLCD, chainMsgC[vaa.ChainIDWormchain], chainObsvReqC[vaa.ChainIDWormchain]).Run); err != nil { @@ -1310,7 +1311,7 @@ func runNode(cmd *cobra.Command, args []string) { } if shouldStart(aptosRPC) { logger.Info("Starting Aptos watcher") - readiness.RegisterComponent(common.ReadinessAptosSyncing) + common.MustRegisterReadinessSyncing(vaa.ChainIDAptos) chainObsvReqC[vaa.ChainIDAptos] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "aptoswatch", aptos.NewWatcher(*aptosRPC, *aptosAccount, *aptosHandle, chainMsgC[vaa.ChainIDAptos], chainObsvReqC[vaa.ChainIDAptos]).Run); err != nil { @@ -1320,7 +1321,7 @@ func runNode(cmd *cobra.Command, args []string) { if shouldStart(suiRPC) { logger.Info("Starting Sui watcher") - readiness.RegisterComponent(common.ReadinessSuiSyncing) + common.MustRegisterReadinessSyncing(vaa.ChainIDSui) chainObsvReqC[vaa.ChainIDSui] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "suiwatch", sui.NewWatcher(*suiRPC, *suiWS, *suiAccount, *suiPackage, *unsafeDevMode, chainMsgC[vaa.ChainIDSui], chainObsvReqC[vaa.ChainIDSui]).Run); err != nil { @@ -1331,13 +1332,13 @@ func runNode(cmd *cobra.Command, args []string) { var solanaFinalizedWatcher *solana.SolanaWatcher if shouldStart(solanaRPC) { logger.Info("Starting Solana watcher") - readiness.RegisterComponent(common.ReadinessSolanaSyncing) + common.MustRegisterReadinessSyncing(vaa.ChainIDSolana) chainObsvReqC[vaa.ChainIDSolana] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "solwatch-confirmed", - common.WrapWithScissors(solana.NewSolanaWatcher(*solanaRPC, nil, solAddress, *solanaContract, chainMsgC[vaa.ChainIDSolana], nil, rpc.CommitmentConfirmed, common.ReadinessSolanaSyncing, vaa.ChainIDSolana).Run, "solwatch-confirmed")); err != nil { + common.WrapWithScissors(solana.NewSolanaWatcher(*solanaRPC, nil, solAddress, *solanaContract, chainMsgC[vaa.ChainIDSolana], nil, rpc.CommitmentConfirmed, vaa.ChainIDSolana).Run, "solwatch-confirmed")); err != nil { return err } - solanaFinalizedWatcher = solana.NewSolanaWatcher(*solanaRPC, nil, solAddress, *solanaContract, chainMsgC[vaa.ChainIDSolana], chainObsvReqC[vaa.ChainIDSolana], rpc.CommitmentFinalized, common.ReadinessSolanaSyncing, vaa.ChainIDSolana) + solanaFinalizedWatcher = solana.NewSolanaWatcher(*solanaRPC, nil, solAddress, *solanaContract, chainMsgC[vaa.ChainIDSolana], chainObsvReqC[vaa.ChainIDSolana], rpc.CommitmentFinalized, vaa.ChainIDSolana) if err := supervisor.Run(ctx, "solwatch-finalized", common.WrapWithScissors(solanaFinalizedWatcher.Run, "solwatch-finalized")); err != nil { return err } @@ -1345,20 +1346,20 @@ func runNode(cmd *cobra.Command, args []string) { if shouldStart(pythnetRPC) { logger.Info("Starting Pythnet watcher") - readiness.RegisterComponent(common.ReadinessPythNetSyncing) + common.MustRegisterReadinessSyncing(vaa.ChainIDPythNet) chainObsvReqC[vaa.ChainIDPythNet] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "pythwatch-confirmed", - common.WrapWithScissors(solana.NewSolanaWatcher(*pythnetRPC, pythnetWS, pythnetAddress, *pythnetContract, chainMsgC[vaa.ChainIDPythNet], nil, rpc.CommitmentConfirmed, common.ReadinessPythNetSyncing, vaa.ChainIDPythNet).Run, "pythwatch-confirmed")); err != nil { + common.WrapWithScissors(solana.NewSolanaWatcher(*pythnetRPC, pythnetWS, pythnetAddress, *pythnetContract, chainMsgC[vaa.ChainIDPythNet], nil, rpc.CommitmentConfirmed, vaa.ChainIDPythNet).Run, "pythwatch-confirmed")); err != nil { return err } } if shouldStart(injectiveWS) { logger.Info("Starting Injective watcher") - readiness.RegisterComponent(common.ReadinessInjectiveSyncing) + common.MustRegisterReadinessSyncing(vaa.ChainIDInjective) chainObsvReqC[vaa.ChainIDInjective] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "injectivewatch", - common.WrapWithScissors(cosmwasm.NewWatcher(*injectiveWS, *injectiveLCD, *injectiveContract, chainMsgC[vaa.ChainIDInjective], chainObsvReqC[vaa.ChainIDInjective], common.ReadinessInjectiveSyncing, vaa.ChainIDInjective).Run, "injectivewatch")); err != nil { + common.WrapWithScissors(cosmwasm.NewWatcher(*injectiveWS, *injectiveLCD, *injectiveContract, chainMsgC[vaa.ChainIDInjective], chainObsvReqC[vaa.ChainIDInjective], vaa.ChainIDInjective).Run, "injectivewatch")); err != nil { return err } } @@ -1369,9 +1370,9 @@ func runNode(cmd *cobra.Command, args []string) { log.Fatalf("if neon is enabled then solana must also be enabled.") } logger.Info("Starting Neon watcher") - readiness.RegisterComponent(common.ReadinessNeonSyncing) + common.MustRegisterReadinessSyncing(vaa.ChainIDNeon) chainObsvReqC[vaa.ChainIDNeon] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - neonWatcher := evm.NewEthWatcher(*neonRPC, neonContractAddr, "neon", common.ReadinessNeonSyncing, vaa.ChainIDNeon, chainMsgC[vaa.ChainIDNeon], nil, chainObsvReqC[vaa.ChainIDNeon], *unsafeDevMode) + neonWatcher := evm.NewEthWatcher(*neonRPC, neonContractAddr, "neon", vaa.ChainIDNeon, chainMsgC[vaa.ChainIDNeon], nil, chainObsvReqC[vaa.ChainIDNeon], *unsafeDevMode) neonWatcher.SetL1Finalizer(solanaFinalizedWatcher) if err := supervisor.Run(ctx, "neonwatch", common.WrapWithScissors(neonWatcher.Run, "neonwatch")); err != nil { return err @@ -1379,9 +1380,9 @@ func runNode(cmd *cobra.Command, args []string) { } if shouldStart(baseRPC) { logger.Info("Starting Base watcher") - readiness.RegisterComponent(common.ReadinessBaseSyncing) + common.MustRegisterReadinessSyncing(vaa.ChainIDBase) chainObsvReqC[vaa.ChainIDBase] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - baseWatcher := evm.NewEthWatcher(*baseRPC, baseContractAddr, "base", common.ReadinessBaseSyncing, vaa.ChainIDBase, chainMsgC[vaa.ChainIDBase], nil, chainObsvReqC[vaa.ChainIDBase], *unsafeDevMode) + baseWatcher := evm.NewEthWatcher(*baseRPC, baseContractAddr, "base", vaa.ChainIDBase, chainMsgC[vaa.ChainIDBase], nil, chainObsvReqC[vaa.ChainIDBase], *unsafeDevMode) if err := supervisor.Run(ctx, "basewatch", common.WrapWithScissors(baseWatcher.Run, "basewatch")); err != nil { return err } diff --git a/node/pkg/common/readiness.go b/node/pkg/common/readiness.go index 229aa69cc..01d886e7f 100644 --- a/node/pkg/common/readiness.go +++ b/node/pkg/common/readiness.go @@ -1,33 +1,40 @@ package common -import "github.com/certusone/wormhole/node/pkg/readiness" +import ( + "fmt" + + "github.com/certusone/wormhole/node/pkg/readiness" + "github.com/wormhole-foundation/wormhole/sdk/vaa" +) const ( - ReadinessEthSyncing readiness.Component = "ethSyncing" - ReadinessSolanaSyncing readiness.Component = "solanaSyncing" - ReadinessTerraSyncing readiness.Component = "terraSyncing" - ReadinessAlgorandSyncing readiness.Component = "algorandSyncing" - ReadinessNearSyncing readiness.Component = "nearSyncing" - ReadinessAptosSyncing readiness.Component = "aptosSyncing" - ReadinessSuiSyncing readiness.Component = "suiSyncing" - ReadinessBSCSyncing readiness.Component = "bscSyncing" - ReadinessPolygonSyncing readiness.Component = "polygonSyncing" - ReadinessAvalancheSyncing readiness.Component = "avalancheSyncing" - ReadinessOasisSyncing readiness.Component = "oasisSyncing" - ReadinessAuroraSyncing readiness.Component = "auroraSyncing" - ReadinessFantomSyncing readiness.Component = "fantomSyncing" - ReadinessKaruraSyncing readiness.Component = "karuraSyncing" - ReadinessAcalaSyncing readiness.Component = "acalaSyncing" - ReadinessKlaytnSyncing readiness.Component = "klaytnSyncing" - ReadinessCeloSyncing readiness.Component = "celoSyncing" - ReadinessMoonbeamSyncing readiness.Component = "moonbeamSyncing" - ReadinessNeonSyncing readiness.Component = "neonSyncing" - ReadinessTerra2Syncing readiness.Component = "terra2Syncing" - ReadinessInjectiveSyncing readiness.Component = "injectiveSyncing" - ReadinessXplaSyncing readiness.Component = "xplaSyncing" - ReadinessPythNetSyncing readiness.Component = "pythnetSyncing" - ReadinessArbitrumSyncing readiness.Component = "arbitrumSyncing" - ReadinessOptimismSyncing readiness.Component = "optimismSyncing" - ReadinessBaseSyncing readiness.Component = "baseSyncing" - ReadinessWormchainSyncing readiness.Component = "wormchainSyncing" + ReadinessEthSyncing readiness.Component = "ethSyncing" ) + +// MustRegisterReadinessSyncing registers the specified chain for readiness syncing. It panics if the chain ID is invalid so it should only be used during initialization. +// This function will +func MustRegisterReadinessSyncing(chainID vaa.ChainID) { + readiness.RegisterComponent(MustConvertChainIdToReadinessSyncing(chainID)) +} + +// MustConvertChainIdToReadinessSyncing maps a chain ID to a readiness syncing value. It panics if the chain ID is invalid so it should only be used during initialization. +func MustConvertChainIdToReadinessSyncing(chainID vaa.ChainID) readiness.Component { + readinessSync, err := ConvertChainIdToReadinessSyncing(chainID) + if err != nil { + panic(err) + } + return readinessSync +} + +// ConvertChainIdToReadinessSyncing maps a chain ID to a readiness syncing value. It returns an error if the chain ID is invalid. +func ConvertChainIdToReadinessSyncing(chainID vaa.ChainID) (readiness.Component, error) { + if chainID == vaa.ChainIDEthereum { + // The readiness for Ethereum is "ethSyncing", not "ethereumSyncing". Changing it would most likely break monitoring. . . + return ReadinessEthSyncing, nil + } + str := chainID.String() + if _, err := vaa.ChainIDFromString(str); err != nil { + return readiness.Component(""), fmt.Errorf("invalid chainID: %d", uint16(chainID)) + } + return readiness.Component(str + "Syncing"), nil +} diff --git a/node/pkg/common/readiness_test.go b/node/pkg/common/readiness_test.go new file mode 100644 index 000000000..af841c5df --- /dev/null +++ b/node/pkg/common/readiness_test.go @@ -0,0 +1,118 @@ +package common + +import ( + "testing" + + "github.com/certusone/wormhole/node/pkg/readiness" + "github.com/wormhole-foundation/wormhole/sdk/vaa" + + "github.com/stretchr/testify/assert" +) + +const ( + // Ethereum is defined in readiness.go. + ReadinessSolanaSyncing readiness.Component = "solanaSyncing" + ReadinessTerraSyncing readiness.Component = "terraSyncing" + ReadinessAlgorandSyncing readiness.Component = "algorandSyncing" + ReadinessNearSyncing readiness.Component = "nearSyncing" + ReadinessAptosSyncing readiness.Component = "aptosSyncing" + ReadinessSuiSyncing readiness.Component = "suiSyncing" + ReadinessBSCSyncing readiness.Component = "bscSyncing" + ReadinessPolygonSyncing readiness.Component = "polygonSyncing" + ReadinessAvalancheSyncing readiness.Component = "avalancheSyncing" + ReadinessOasisSyncing readiness.Component = "oasisSyncing" + ReadinessAuroraSyncing readiness.Component = "auroraSyncing" + ReadinessFantomSyncing readiness.Component = "fantomSyncing" + ReadinessKaruraSyncing readiness.Component = "karuraSyncing" + ReadinessAcalaSyncing readiness.Component = "acalaSyncing" + ReadinessKlaytnSyncing readiness.Component = "klaytnSyncing" + ReadinessCeloSyncing readiness.Component = "celoSyncing" + ReadinessMoonbeamSyncing readiness.Component = "moonbeamSyncing" + ReadinessNeonSyncing readiness.Component = "neonSyncing" + ReadinessTerra2Syncing readiness.Component = "terra2Syncing" + ReadinessInjectiveSyncing readiness.Component = "injectiveSyncing" + ReadinessXplaSyncing readiness.Component = "xplaSyncing" + ReadinessPythNetSyncing readiness.Component = "pythnetSyncing" + ReadinessArbitrumSyncing readiness.Component = "arbitrumSyncing" + ReadinessOptimismSyncing readiness.Component = "optimismSyncing" + ReadinessBaseSyncing readiness.Component = "baseSyncing" + ReadinessWormchainSyncing readiness.Component = "wormchainSyncing" +) + +// This test is just to make sure that nothing got broken when we switched from manually specifying the readiness syncing labels. +// Once this functionality is merged, this test can probably be deleted (so that we don't need to keep adding new chains going forward). +func TestConvertChainIdToReadinessSyncing(t *testing.T) { + type test struct { + input vaa.ChainID + output readiness.Component + } + + // Positive Test Cases + p_tests := []test{ + {input: vaa.ChainIDSolana, output: ReadinessSolanaSyncing}, + {input: vaa.ChainIDEthereum, output: ReadinessEthSyncing}, + {input: vaa.ChainIDTerra, output: ReadinessTerraSyncing}, + {input: vaa.ChainIDBSC, output: ReadinessBSCSyncing}, + {input: vaa.ChainIDPolygon, output: ReadinessPolygonSyncing}, + {input: vaa.ChainIDAvalanche, output: ReadinessAvalancheSyncing}, + {input: vaa.ChainIDOasis, output: ReadinessOasisSyncing}, + {input: vaa.ChainIDAlgorand, output: ReadinessAlgorandSyncing}, + {input: vaa.ChainIDAptos, output: ReadinessAptosSyncing}, + {input: vaa.ChainIDSui, output: ReadinessSuiSyncing}, + {input: vaa.ChainIDNear, output: ReadinessNearSyncing}, + {input: vaa.ChainIDAurora, output: ReadinessAuroraSyncing}, + {input: vaa.ChainIDFantom, output: ReadinessFantomSyncing}, + {input: vaa.ChainIDKarura, output: ReadinessKaruraSyncing}, + {input: vaa.ChainIDAcala, output: ReadinessAcalaSyncing}, + {input: vaa.ChainIDKlaytn, output: ReadinessKlaytnSyncing}, + {input: vaa.ChainIDCelo, output: ReadinessCeloSyncing}, + {input: vaa.ChainIDMoonbeam, output: ReadinessMoonbeamSyncing}, + {input: vaa.ChainIDNeon, output: ReadinessNeonSyncing}, + {input: vaa.ChainIDTerra2, output: ReadinessTerra2Syncing}, + {input: vaa.ChainIDInjective, output: ReadinessInjectiveSyncing}, + {input: vaa.ChainIDArbitrum, output: ReadinessArbitrumSyncing}, + {input: vaa.ChainIDPythNet, output: ReadinessPythNetSyncing}, + {input: vaa.ChainIDOptimism, output: ReadinessOptimismSyncing}, + {input: vaa.ChainIDXpla, output: ReadinessXplaSyncing}, + // BTC readiness not defined yet {input: vaa.ChainIDBtc, output: ReadinessBtcSyncing}, + {input: vaa.ChainIDBase, output: ReadinessBaseSyncing}, + } + + // Negative Test Cases + n_tests := []test{ + {input: vaa.ChainIDUnset, output: ""}, + } + + for _, tc := range p_tests { + t.Run(tc.input.String(), func(t *testing.T) { + chainId, err := ConvertChainIdToReadinessSyncing(tc.input) + assert.Equal(t, tc.output, chainId) + assert.NoError(t, err) + }) + } + + for _, tc := range n_tests { + t.Run(tc.input.String(), func(t *testing.T) { + chainId, err := ConvertChainIdToReadinessSyncing(tc.input) + assert.Equal(t, tc.output, chainId) + assert.Error(t, err) + }) + } +} + +func TestMustRegisterReadinessSyncing(t *testing.T) { + // The first time should work. + assert.NotPanics(t, func() { + MustRegisterReadinessSyncing(vaa.ChainIDEthereum) + }) + + // A second time should panic. + assert.Panics(t, func() { + MustRegisterReadinessSyncing(vaa.ChainIDEthereum) + }) + + // An invalid chainID should panic. + assert.Panics(t, func() { + MustRegisterReadinessSyncing(vaa.ChainIDUnset) + }) +} diff --git a/node/pkg/readiness/health.go b/node/pkg/readiness/health.go index 21800ab9b..40a84283b 100644 --- a/node/pkg/readiness/health.go +++ b/node/pkg/readiness/health.go @@ -21,20 +21,20 @@ type Component string // RegisterComponent registers the given component name such that it is required to be ready for the global check to succeed. func RegisterComponent(component Component) { mu.Lock() + defer mu.Unlock() if _, ok := registry[string(component)]; ok { panic("component already registered") } registry[string(component)] = false - mu.Unlock() } // SetReady sets the given global component state. func SetReady(component Component) { mu.Lock() + defer mu.Unlock() if !registry[string(component)] { registry[string(component)] = true } - mu.Unlock() } // Handler returns a net/http handler for the readiness check. It returns 200 OK if all components are ready, @@ -54,6 +54,7 @@ func Handler(w http.ResponseWriter, r *http.Request) { } mu.Lock() + defer mu.Unlock() for k, v := range registry { _, err = fmt.Fprintf(resp, "%s\t%v\n", k, v) if err != nil { @@ -64,7 +65,6 @@ func Handler(w http.ResponseWriter, r *http.Request) { ready = false } } - mu.Unlock() if !ready { w.WriteHeader(http.StatusPreconditionFailed) diff --git a/node/pkg/watchers/algorand/watcher.go b/node/pkg/watchers/algorand/watcher.go index 31fdcfe00..b88d31e74 100644 --- a/node/pkg/watchers/algorand/watcher.go +++ b/node/pkg/watchers/algorand/watcher.go @@ -33,8 +33,9 @@ type ( algodToken string appid uint64 - msgC chan<- *common.MessagePublication - obsvReqC <-chan *gossipv1.ObservationRequest + msgC chan<- *common.MessagePublication + obsvReqC <-chan *gossipv1.ObservationRequest + readinessSync readiness.Component next_round uint64 } @@ -64,14 +65,15 @@ func NewWatcher( obsvReqC <-chan *gossipv1.ObservationRequest, ) *Watcher { return &Watcher{ - indexerRPC: indexerRPC, - indexerToken: indexerToken, - algodRPC: algodRPC, - algodToken: algodToken, - appid: appid, - msgC: msgC, - obsvReqC: obsvReqC, - next_round: 0, + indexerRPC: indexerRPC, + indexerToken: indexerToken, + algodRPC: algodRPC, + algodToken: algodToken, + appid: appid, + msgC: msgC, + obsvReqC: obsvReqC, + readinessSync: common.MustConvertChainIdToReadinessSyncing(vaa.ChainIDAlgorand), + next_round: 0, } } @@ -252,7 +254,7 @@ func (e *Watcher) Run(ctx context.Context) error { ContractAddress: fmt.Sprintf("%d", e.appid), }) - readiness.SetReady(common.ReadinessAlgorandSyncing) + readiness.SetReady(e.readinessSync) } } } diff --git a/node/pkg/watchers/aptos/watcher.go b/node/pkg/watchers/aptos/watcher.go index f07f3867f..689b619c7 100644 --- a/node/pkg/watchers/aptos/watcher.go +++ b/node/pkg/watchers/aptos/watcher.go @@ -29,8 +29,9 @@ type ( aptosAccount string aptosHandle string - msgC chan<- *common.MessagePublication - obsvReqC <-chan *gossipv1.ObservationRequest + msgC chan<- *common.MessagePublication + obsvReqC <-chan *gossipv1.ObservationRequest + readinessSync readiness.Component } ) @@ -56,11 +57,12 @@ func NewWatcher( obsvReqC <-chan *gossipv1.ObservationRequest, ) *Watcher { return &Watcher{ - aptosRPC: aptosRPC, - aptosAccount: aptosAccount, - aptosHandle: aptosHandle, - msgC: msgC, - obsvReqC: obsvReqC, + aptosRPC: aptosRPC, + aptosAccount: aptosAccount, + aptosHandle: aptosHandle, + msgC: msgC, + obsvReqC: obsvReqC, + readinessSync: common.MustConvertChainIdToReadinessSyncing(vaa.ChainIDAptos), } } @@ -223,7 +225,7 @@ func (e *Watcher) Run(ctx context.Context) error { ContractAddress: e.aptosAccount, }) - readiness.SetReady(common.ReadinessAptosSyncing) + readiness.SetReady(e.readinessSync) } } } diff --git a/node/pkg/watchers/cosmwasm/watcher.go b/node/pkg/watchers/cosmwasm/watcher.go index b64023d75..fa8862b02 100644 --- a/node/pkg/watchers/cosmwasm/watcher.go +++ b/node/pkg/watchers/cosmwasm/watcher.go @@ -43,7 +43,7 @@ type ( obsvReqC <-chan *gossipv1.ObservationRequest // Readiness component - readiness readiness.Component + readinessSync readiness.Component // VAA ChainID of the network we're connecting to. chainID vaa.ChainID // Key for contract address in the wasm logs @@ -97,7 +97,6 @@ func NewWatcher( contract string, msgC chan<- *common.MessagePublication, obsvReqC <-chan *gossipv1.ObservationRequest, - readiness readiness.Component, chainID vaa.ChainID) *Watcher { // CosmWasm 1.0.0 @@ -123,7 +122,7 @@ func NewWatcher( contract: contract, msgC: msgC, obsvReqC: obsvReqC, - readiness: readiness, + readinessSync: common.MustConvertChainIdToReadinessSyncing(chainID), chainID: chainID, contractAddressFilterKey: contractAddressFilterKey, contractAddressLogKey: contractAddressLogKey, @@ -180,7 +179,7 @@ func (e *Watcher) Run(ctx context.Context) error { } logger.Info("subscribed to new transaction events", zap.String("network", networkName)) - readiness.SetReady(e.readiness) + readiness.SetReady(e.readinessSync) common.RunWithScissors(ctx, errC, "cosmwasm_block_height", func(ctx context.Context) error { t := time.NewTicker(5 * time.Second) @@ -220,6 +219,8 @@ func (e *Watcher) Run(ctx context.Context) error { Height: latestBlock.Int(), ContractAddress: e.contract, }) + + readiness.SetReady(e.readinessSync) } } }) diff --git a/node/pkg/watchers/evm/watcher.go b/node/pkg/watchers/evm/watcher.go index 8c4a7a78d..a659725ae 100644 --- a/node/pkg/watchers/evm/watcher.go +++ b/node/pkg/watchers/evm/watcher.go @@ -72,7 +72,7 @@ type ( // Human-readable name of the Eth network, for logging and monitoring. networkName string // Readiness component - readiness readiness.Component + readinessSync readiness.Component // VAA ChainID of the network we're connecting to. chainID vaa.ChainID @@ -139,7 +139,6 @@ func NewEthWatcher( url string, contract eth_common.Address, networkName string, - readiness readiness.Component, chainID vaa.ChainID, msgC chan<- *common.MessagePublication, setC chan<- *common.GuardianSet, @@ -151,7 +150,7 @@ func NewEthWatcher( url: url, contract: contract, networkName: networkName, - readiness: readiness, + readinessSync: common.MustConvertChainIdToReadinessSyncing(chainID), waitForConfirmations: false, maxWaitConfirmations: 60, chainID: chainID, @@ -642,7 +641,7 @@ func (w *Watcher) Run(ctx context.Context) error { zap.Bool("is_safe_block", ev.Safe), zap.String("eth_network", w.networkName)) currentEthHeight.WithLabelValues(w.networkName).Set(float64(ev.Number.Int64())) - readiness.SetReady(w.readiness) + readiness.SetReady(w.readinessSync) p2p.DefaultRegistry.SetNetworkStats(w.chainID, &gossipv1.Heartbeat_Network{ Height: ev.Number.Int64(), ContractAddress: w.contract.Hex(), @@ -801,7 +800,7 @@ func (w *Watcher) Run(ctx context.Context) error { // Now that the init is complete, peg readiness. That will also happen when we process a new head, but chains // that wait for finality may take a while to receive the first block and we don't want to hold up the init. - readiness.SetReady(w.readiness) + readiness.SetReady(w.readinessSync) select { case <-ctx.Done(): diff --git a/node/pkg/watchers/near/watcher.go b/node/pkg/watchers/near/watcher.go index 8a267a788..866442c9e 100644 --- a/node/pkg/watchers/near/watcher.go +++ b/node/pkg/watchers/near/watcher.go @@ -67,8 +67,9 @@ type ( nearRPC string // external channels - msgC chan<- *common.MessagePublication // validated (SECURITY: and only validated!) observations go into this channel - obsvReqC <-chan *gossipv1.ObservationRequest // observation requests are coming from this channel + msgC chan<- *common.MessagePublication // validated (SECURITY: and only validated!) observations go into this channel + obsvReqC <-chan *gossipv1.ObservationRequest // observation requests are coming from this channel + readinessSync readiness.Component // internal queues transactionProcessingQueueCounter atomic.Int64 @@ -102,6 +103,7 @@ func NewWatcher( nearRPC: nearRPC, msgC: msgC, obsvReqC: obsvReqC, + readinessSync: common.MustConvertChainIdToReadinessSyncing(vaa.ChainIDNear), transactionProcessingQueue: make(chan *transactionProcessingJob), chunkProcessingQueue: make(chan nearapi.ChunkHeader, queueSize), eventChanTxProcessedDuration: make(chan time.Duration, 10), @@ -149,7 +151,7 @@ func (e *Watcher) runBlockPoll(ctx context.Context) error { Height: int64(highestFinalBlockHeightObserved), ContractAddress: e.wormholeAccount, }) - readiness.SetReady(common.ReadinessNearSyncing) + readiness.SetReady(e.readinessSync) timer.Reset(blockPollInterval) } diff --git a/node/pkg/watchers/solana/client.go b/node/pkg/watchers/solana/client.go index 45b9882ac..543c2ef3a 100644 --- a/node/pkg/watchers/solana/client.go +++ b/node/pkg/watchers/solana/client.go @@ -42,7 +42,7 @@ type ( pumpData chan []byte rpcClient *rpc.Client // Readiness component - readiness readiness.Component + readinessSync readiness.Component // VAA ChainID of the network we're connecting to. chainID vaa.ChainID // Human readable name of network @@ -181,20 +181,19 @@ func NewSolanaWatcher( msgC chan<- *common.MessagePublication, obsvReqC <-chan *gossipv1.ObservationRequest, commitment rpc.CommitmentType, - readiness readiness.Component, chainID vaa.ChainID) *SolanaWatcher { return &SolanaWatcher{ - rpcUrl: rpcUrl, - wsUrl: wsUrl, - contract: contractAddress, - rawContract: rawContract, - msgC: msgC, - obsvReqC: obsvReqC, - commitment: commitment, - rpcClient: rpc.New(rpcUrl), - readiness: readiness, - chainID: chainID, - networkName: vaa.ChainID(chainID).String(), + rpcUrl: rpcUrl, + wsUrl: wsUrl, + contract: contractAddress, + rawContract: rawContract, + msgC: msgC, + obsvReqC: obsvReqC, + commitment: commitment, + rpcClient: rpc.New(rpcUrl), + readinessSync: common.MustConvertChainIdToReadinessSyncing(chainID), + chainID: chainID, + networkName: vaa.ChainID(chainID).String(), } } @@ -334,7 +333,7 @@ func (s *SolanaWatcher) Run(ctx context.Context) error { lastSlot = slot - 1 } currentSolanaHeight.WithLabelValues(s.networkName, string(s.commitment)).Set(float64(slot)) - readiness.SetReady(s.readiness) + readiness.SetReady(s.readinessSync) p2p.DefaultRegistry.SetNetworkStats(s.chainID, &gossipv1.Heartbeat_Network{ Height: int64(slot), ContractAddress: contractAddr, diff --git a/node/pkg/watchers/sui/watcher.go b/node/pkg/watchers/sui/watcher.go index 580e0c52f..2a4c2e88b 100644 --- a/node/pkg/watchers/sui/watcher.go +++ b/node/pkg/watchers/sui/watcher.go @@ -42,8 +42,9 @@ type ( unsafeDevMode bool - msgChan chan *common.MessagePublication - obsvReqC chan *gossipv1.ObservationRequest + msgChan chan *common.MessagePublication + obsvReqC chan *gossipv1.ObservationRequest + readinessSync readiness.Component subId int64 subscribed bool @@ -136,6 +137,7 @@ func NewWatcher( unsafeDevMode: unsafeDevMode, msgChan: messageEvents, obsvReqC: obsvReqC, + readinessSync: common.MustConvertChainIdToReadinessSyncing(vaa.ChainIDSui), subId: 0, subscribed: false, } @@ -410,7 +412,7 @@ func (e *Watcher) Run(ctx context.Context) error { }) if e.subscribed { - readiness.SetReady(common.ReadinessSuiSyncing) + readiness.SetReady(e.readinessSync) } } } diff --git a/node/pkg/watchers/wormchain/watcher.go b/node/pkg/watchers/wormchain/watcher.go index 7dc0249d6..7d6e545e8 100644 --- a/node/pkg/watchers/wormchain/watcher.go +++ b/node/pkg/watchers/wormchain/watcher.go @@ -39,6 +39,8 @@ type ( // Incoming re-observation requests from the network. Pre-filtered to only // include requests for our chainID. obsvReqC <-chan *gossipv1.ObservationRequest + + readinessSync readiness.Component } ) @@ -77,7 +79,13 @@ func NewWatcher( urlLCD string, msgC chan<- *common.MessagePublication, obsvReqC <-chan *gossipv1.ObservationRequest) *Watcher { - return &Watcher{urlWS: urlWS, urlLCD: urlLCD, msgC: msgC, obsvReqC: obsvReqC} + return &Watcher{ + urlWS: urlWS, + urlLCD: urlLCD, + msgC: msgC, + obsvReqC: obsvReqC, + readinessSync: common.MustConvertChainIdToReadinessSyncing(vaa.ChainIDWormchain), + } } func (e *Watcher) Run(ctx context.Context) error { @@ -121,7 +129,7 @@ func (e *Watcher) Run(ctx context.Context) error { } logger.Info("subscribed to new transaction events") - readiness.SetReady(common.ReadinessWormchainSyncing) + readiness.SetReady(e.readinessSync) go func() { t := time.NewTicker(5 * time.Second) @@ -154,6 +162,8 @@ func (e *Watcher) Run(ctx context.Context) error { p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDWormchain, &gossipv1.Heartbeat_Network{ Height: latestBlock.Int(), }) + + readiness.SetReady(e.readinessSync) } }()