Node: Generate readiness labels programmatically (#2540)

* Node: Generate readiness labels programmatically

Change-Id: Ica03a7cc314b92a0521afad053ffa20f03ff6529

* Move obsolete  labels into test code

Change-Id: I682ec4ca2ea36be1f53b3f6d39f2da4191e2805f

* Code review rework

Change-Id: Idc8f17265a0b6ef357ac98707d20cc2486001520

* Redesign based on feedback

* Near tests failing

* Fix panic in solana
This commit is contained in:
bruce-riley 2023-03-27 12:50:21 -05:00 committed by GitHub
parent 708d02f128
commit a52acb52a3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 273 additions and 130 deletions

View File

@ -1097,9 +1097,9 @@ func runNode(cmd *cobra.Command, args []string) {
var ethWatcher *evm.Watcher
if shouldStart(ethRPC) {
logger.Info("Starting Ethereum watcher")
readiness.RegisterComponent(common.ReadinessEthSyncing)
common.MustRegisterReadinessSyncing(vaa.ChainIDEthereum)
chainObsvReqC[vaa.ChainIDEthereum] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
ethWatcher = evm.NewEthWatcher(*ethRPC, ethContractAddr, "eth", common.ReadinessEthSyncing, vaa.ChainIDEthereum, chainMsgC[vaa.ChainIDEthereum], setWriteC, chainObsvReqC[vaa.ChainIDEthereum], *unsafeDevMode)
ethWatcher = evm.NewEthWatcher(*ethRPC, ethContractAddr, "eth", vaa.ChainIDEthereum, chainMsgC[vaa.ChainIDEthereum], setWriteC, chainObsvReqC[vaa.ChainIDEthereum], *unsafeDevMode)
if err := supervisor.Run(ctx, "ethwatch",
common.WrapWithScissors(ethWatcher.Run, "ethwatch")); err != nil {
return err
@ -1108,9 +1108,9 @@ func runNode(cmd *cobra.Command, args []string) {
if shouldStart(bscRPC) {
logger.Info("Starting BSC watcher")
readiness.RegisterComponent(common.ReadinessBSCSyncing)
common.MustRegisterReadinessSyncing(vaa.ChainIDBSC)
chainObsvReqC[vaa.ChainIDBSC] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
bscWatcher := evm.NewEthWatcher(*bscRPC, bscContractAddr, "bsc", common.ReadinessBSCSyncing, vaa.ChainIDBSC, chainMsgC[vaa.ChainIDBSC], nil, chainObsvReqC[vaa.ChainIDBSC], *unsafeDevMode)
bscWatcher := evm.NewEthWatcher(*bscRPC, bscContractAddr, "bsc", vaa.ChainIDBSC, chainMsgC[vaa.ChainIDBSC], nil, chainObsvReqC[vaa.ChainIDBSC], *unsafeDevMode)
bscWatcher.SetWaitForConfirmations(true)
if err := supervisor.Run(ctx, "bscwatch", common.WrapWithScissors(bscWatcher.Run, "bscwatch")); err != nil {
return err
@ -1124,9 +1124,9 @@ func runNode(cmd *cobra.Command, args []string) {
log.Fatal("Polygon checkpointing is required in mainnet")
}
logger.Info("Starting Polygon watcher")
readiness.RegisterComponent(common.ReadinessPolygonSyncing)
common.MustRegisterReadinessSyncing(vaa.ChainIDPolygon)
chainObsvReqC[vaa.ChainIDPolygon] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
polygonWatcher := evm.NewEthWatcher(*polygonRPC, polygonContractAddr, "polygon", common.ReadinessPolygonSyncing, vaa.ChainIDPolygon, chainMsgC[vaa.ChainIDPolygon], nil, chainObsvReqC[vaa.ChainIDPolygon], *unsafeDevMode)
polygonWatcher := evm.NewEthWatcher(*polygonRPC, polygonContractAddr, "polygon", vaa.ChainIDPolygon, chainMsgC[vaa.ChainIDPolygon], nil, chainObsvReqC[vaa.ChainIDPolygon], *unsafeDevMode)
polygonWatcher.SetWaitForConfirmations(waitForConfirmations)
if err := polygonWatcher.SetRootChainParams(*polygonRootChainRpc, *polygonRootChainContractAddress); err != nil {
return err
@ -1137,81 +1137,82 @@ func runNode(cmd *cobra.Command, args []string) {
}
if shouldStart(avalancheRPC) {
logger.Info("Starting Avalanche watcher")
readiness.RegisterComponent(common.ReadinessAvalancheSyncing)
common.MustRegisterReadinessSyncing(vaa.ChainIDAvalanche)
chainObsvReqC[vaa.ChainIDAvalanche] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
if err := supervisor.Run(ctx, "avalanchewatch",
common.WrapWithScissors(evm.NewEthWatcher(*avalancheRPC, avalancheContractAddr, "avalanche", common.ReadinessAvalancheSyncing, vaa.ChainIDAvalanche, chainMsgC[vaa.ChainIDAvalanche], nil, chainObsvReqC[vaa.ChainIDAvalanche], *unsafeDevMode).Run, "avalanchewatch")); err != nil {
common.WrapWithScissors(evm.NewEthWatcher(*avalancheRPC, avalancheContractAddr, "avalanche", vaa.ChainIDAvalanche, chainMsgC[vaa.ChainIDAvalanche], nil, chainObsvReqC[vaa.ChainIDAvalanche], *unsafeDevMode).Run, "avalanchewatch")); err != nil {
return err
}
}
if shouldStart(oasisRPC) {
logger.Info("Starting Oasis watcher")
common.MustRegisterReadinessSyncing(vaa.ChainIDOasis)
chainObsvReqC[vaa.ChainIDOasis] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
if err := supervisor.Run(ctx, "oasiswatch",
common.WrapWithScissors(evm.NewEthWatcher(*oasisRPC, oasisContractAddr, "oasis", common.ReadinessOasisSyncing, vaa.ChainIDOasis, chainMsgC[vaa.ChainIDOasis], nil, chainObsvReqC[vaa.ChainIDOasis], *unsafeDevMode).Run, "oasiswatch")); err != nil {
common.WrapWithScissors(evm.NewEthWatcher(*oasisRPC, oasisContractAddr, "oasis", vaa.ChainIDOasis, chainMsgC[vaa.ChainIDOasis], nil, chainObsvReqC[vaa.ChainIDOasis], *unsafeDevMode).Run, "oasiswatch")); err != nil {
return err
}
}
if shouldStart(auroraRPC) {
logger.Info("Starting Aurora watcher")
readiness.RegisterComponent(common.ReadinessAuroraSyncing)
common.MustRegisterReadinessSyncing(vaa.ChainIDAurora)
chainObsvReqC[vaa.ChainIDAurora] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
if err := supervisor.Run(ctx, "aurorawatch",
common.WrapWithScissors(evm.NewEthWatcher(*auroraRPC, auroraContractAddr, "aurora", common.ReadinessAuroraSyncing, vaa.ChainIDAurora, chainMsgC[vaa.ChainIDAurora], nil, chainObsvReqC[vaa.ChainIDAurora], *unsafeDevMode).Run, "aurorawatch")); err != nil {
common.WrapWithScissors(evm.NewEthWatcher(*auroraRPC, auroraContractAddr, "aurora", vaa.ChainIDAurora, chainMsgC[vaa.ChainIDAurora], nil, chainObsvReqC[vaa.ChainIDAurora], *unsafeDevMode).Run, "aurorawatch")); err != nil {
return err
}
}
if shouldStart(fantomRPC) {
logger.Info("Starting Fantom watcher")
readiness.RegisterComponent(common.ReadinessFantomSyncing)
common.MustRegisterReadinessSyncing(vaa.ChainIDFantom)
chainObsvReqC[vaa.ChainIDFantom] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
if err := supervisor.Run(ctx, "fantomwatch",
common.WrapWithScissors(evm.NewEthWatcher(*fantomRPC, fantomContractAddr, "fantom", common.ReadinessFantomSyncing, vaa.ChainIDFantom, chainMsgC[vaa.ChainIDFantom], nil, chainObsvReqC[vaa.ChainIDFantom], *unsafeDevMode).Run, "fantomwatch")); err != nil {
common.WrapWithScissors(evm.NewEthWatcher(*fantomRPC, fantomContractAddr, "fantom", vaa.ChainIDFantom, chainMsgC[vaa.ChainIDFantom], nil, chainObsvReqC[vaa.ChainIDFantom], *unsafeDevMode).Run, "fantomwatch")); err != nil {
return err
}
}
if shouldStart(karuraRPC) {
logger.Info("Starting Karura watcher")
readiness.RegisterComponent(common.ReadinessKaruraSyncing)
common.MustRegisterReadinessSyncing(vaa.ChainIDKarura)
chainObsvReqC[vaa.ChainIDKarura] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
if err := supervisor.Run(ctx, "karurawatch",
common.WrapWithScissors(evm.NewEthWatcher(*karuraRPC, karuraContractAddr, "karura", common.ReadinessKaruraSyncing, vaa.ChainIDKarura, chainMsgC[vaa.ChainIDKarura], nil, chainObsvReqC[vaa.ChainIDKarura], *unsafeDevMode).Run, "karurawatch")); err != nil {
common.WrapWithScissors(evm.NewEthWatcher(*karuraRPC, karuraContractAddr, "karura", vaa.ChainIDKarura, chainMsgC[vaa.ChainIDKarura], nil, chainObsvReqC[vaa.ChainIDKarura], *unsafeDevMode).Run, "karurawatch")); err != nil {
return err
}
}
if shouldStart(acalaRPC) {
logger.Info("Starting Acala watcher")
readiness.RegisterComponent(common.ReadinessAcalaSyncing)
common.MustRegisterReadinessSyncing(vaa.ChainIDAcala)
chainObsvReqC[vaa.ChainIDAcala] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
if err := supervisor.Run(ctx, "acalawatch",
common.WrapWithScissors(evm.NewEthWatcher(*acalaRPC, acalaContractAddr, "acala", common.ReadinessAcalaSyncing, vaa.ChainIDAcala, chainMsgC[vaa.ChainIDAcala], nil, chainObsvReqC[vaa.ChainIDAcala], *unsafeDevMode).Run, "acalawatch")); err != nil {
common.WrapWithScissors(evm.NewEthWatcher(*acalaRPC, acalaContractAddr, "acala", vaa.ChainIDAcala, chainMsgC[vaa.ChainIDAcala], nil, chainObsvReqC[vaa.ChainIDAcala], *unsafeDevMode).Run, "acalawatch")); err != nil {
return err
}
}
if shouldStart(klaytnRPC) {
logger.Info("Starting Klaytn watcher")
readiness.RegisterComponent(common.ReadinessKlaytnSyncing)
common.MustRegisterReadinessSyncing(vaa.ChainIDKlaytn)
chainObsvReqC[vaa.ChainIDKlaytn] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
if err := supervisor.Run(ctx, "klaytnwatch",
common.WrapWithScissors(evm.NewEthWatcher(*klaytnRPC, klaytnContractAddr, "klaytn", common.ReadinessKlaytnSyncing, vaa.ChainIDKlaytn, chainMsgC[vaa.ChainIDKlaytn], nil, chainObsvReqC[vaa.ChainIDKlaytn], *unsafeDevMode).Run, "klaytnwatch")); err != nil {
common.WrapWithScissors(evm.NewEthWatcher(*klaytnRPC, klaytnContractAddr, "klaytn", vaa.ChainIDKlaytn, chainMsgC[vaa.ChainIDKlaytn], nil, chainObsvReqC[vaa.ChainIDKlaytn], *unsafeDevMode).Run, "klaytnwatch")); err != nil {
return err
}
}
if shouldStart(celoRPC) {
logger.Info("Starting Celo watcher")
readiness.RegisterComponent(common.ReadinessCeloSyncing)
common.MustRegisterReadinessSyncing(vaa.ChainIDCelo)
chainObsvReqC[vaa.ChainIDCelo] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
if err := supervisor.Run(ctx, "celowatch",
common.WrapWithScissors(evm.NewEthWatcher(*celoRPC, celoContractAddr, "celo", common.ReadinessCeloSyncing, vaa.ChainIDCelo, chainMsgC[vaa.ChainIDCelo], nil, chainObsvReqC[vaa.ChainIDCelo], *unsafeDevMode).Run, "celowatch")); err != nil {
common.WrapWithScissors(evm.NewEthWatcher(*celoRPC, celoContractAddr, "celo", vaa.ChainIDCelo, chainMsgC[vaa.ChainIDCelo], nil, chainObsvReqC[vaa.ChainIDCelo], *unsafeDevMode).Run, "celowatch")); err != nil {
return err
}
}
if shouldStart(moonbeamRPC) {
logger.Info("Starting Moonbeam watcher")
readiness.RegisterComponent(common.ReadinessMoonbeamSyncing)
common.MustRegisterReadinessSyncing(vaa.ChainIDMoonbeam)
chainObsvReqC[vaa.ChainIDMoonbeam] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
if err := supervisor.Run(ctx, "moonbeamwatch",
common.WrapWithScissors(evm.NewEthWatcher(*moonbeamRPC, moonbeamContractAddr, "moonbeam", common.ReadinessMoonbeamSyncing, vaa.ChainIDMoonbeam, chainMsgC[vaa.ChainIDMoonbeam], nil, chainObsvReqC[vaa.ChainIDMoonbeam], *unsafeDevMode).Run, "moonbeamwatch")); err != nil {
common.WrapWithScissors(evm.NewEthWatcher(*moonbeamRPC, moonbeamContractAddr, "moonbeam", vaa.ChainIDMoonbeam, chainMsgC[vaa.ChainIDMoonbeam], nil, chainObsvReqC[vaa.ChainIDMoonbeam], *unsafeDevMode).Run, "moonbeamwatch")); err != nil {
return err
}
}
@ -1220,9 +1221,9 @@ func runNode(cmd *cobra.Command, args []string) {
log.Fatalf("if arbitrum is enabled then ethereum must also be enabled.")
}
logger.Info("Starting Arbitrum watcher")
readiness.RegisterComponent(common.ReadinessArbitrumSyncing)
common.MustRegisterReadinessSyncing(vaa.ChainIDArbitrum)
chainObsvReqC[vaa.ChainIDArbitrum] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
arbitrumWatcher := evm.NewEthWatcher(*arbitrumRPC, arbitrumContractAddr, "arbitrum", common.ReadinessArbitrumSyncing, vaa.ChainIDArbitrum, chainMsgC[vaa.ChainIDArbitrum], nil, chainObsvReqC[vaa.ChainIDArbitrum], *unsafeDevMode)
arbitrumWatcher := evm.NewEthWatcher(*arbitrumRPC, arbitrumContractAddr, "arbitrum", vaa.ChainIDArbitrum, chainMsgC[vaa.ChainIDArbitrum], nil, chainObsvReqC[vaa.ChainIDArbitrum], *unsafeDevMode)
arbitrumWatcher.SetL1Finalizer(ethWatcher)
if err := supervisor.Run(ctx, "arbitrumwatch", common.WrapWithScissors(arbitrumWatcher.Run, "arbitrumwatch")); err != nil {
return err
@ -1230,9 +1231,9 @@ func runNode(cmd *cobra.Command, args []string) {
}
if shouldStart(optimismRPC) {
logger.Info("Starting Optimism watcher")
readiness.RegisterComponent(common.ReadinessOptimismSyncing)
common.MustRegisterReadinessSyncing(vaa.ChainIDOptimism)
chainObsvReqC[vaa.ChainIDOptimism] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
optimismWatcher := evm.NewEthWatcher(*optimismRPC, optimismContractAddr, "optimism", common.ReadinessOptimismSyncing, vaa.ChainIDOptimism, chainMsgC[vaa.ChainIDOptimism], nil, chainObsvReqC[vaa.ChainIDOptimism], *unsafeDevMode)
optimismWatcher := evm.NewEthWatcher(*optimismRPC, optimismContractAddr, "optimism", vaa.ChainIDOptimism, chainMsgC[vaa.ChainIDOptimism], nil, chainObsvReqC[vaa.ChainIDOptimism], *unsafeDevMode)
// If rootChainParams are set, pass them in for pre-Bedrock mode
if *optimismCtcRpc != "" || *optimismCtcContractAddress != "" {
@ -1251,37 +1252,37 @@ func runNode(cmd *cobra.Command, args []string) {
if shouldStart(terraWS) {
logger.Info("Starting Terra watcher")
readiness.RegisterComponent(common.ReadinessTerraSyncing)
common.MustRegisterReadinessSyncing(vaa.ChainIDTerra)
chainObsvReqC[vaa.ChainIDTerra] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
if err := supervisor.Run(ctx, "terrawatch",
common.WrapWithScissors(cosmwasm.NewWatcher(*terraWS, *terraLCD, *terraContract, chainMsgC[vaa.ChainIDTerra], chainObsvReqC[vaa.ChainIDTerra], common.ReadinessTerraSyncing, vaa.ChainIDTerra).Run, "terrawatch")); err != nil {
common.WrapWithScissors(cosmwasm.NewWatcher(*terraWS, *terraLCD, *terraContract, chainMsgC[vaa.ChainIDTerra], chainObsvReqC[vaa.ChainIDTerra], vaa.ChainIDTerra).Run, "terrawatch")); err != nil {
return err
}
}
if shouldStart(terra2WS) {
logger.Info("Starting Terra 2 watcher")
readiness.RegisterComponent(common.ReadinessTerra2Syncing)
common.MustRegisterReadinessSyncing(vaa.ChainIDTerra2)
chainObsvReqC[vaa.ChainIDTerra2] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
if err := supervisor.Run(ctx, "terra2watch",
common.WrapWithScissors(cosmwasm.NewWatcher(*terra2WS, *terra2LCD, *terra2Contract, chainMsgC[vaa.ChainIDTerra2], chainObsvReqC[vaa.ChainIDTerra2], common.ReadinessTerra2Syncing, vaa.ChainIDTerra2).Run, "terra2watch")); err != nil {
common.WrapWithScissors(cosmwasm.NewWatcher(*terra2WS, *terra2LCD, *terra2Contract, chainMsgC[vaa.ChainIDTerra2], chainObsvReqC[vaa.ChainIDTerra2], vaa.ChainIDTerra2).Run, "terra2watch")); err != nil {
return err
}
}
if shouldStart(xplaWS) {
logger.Info("Starting XPLA watcher")
readiness.RegisterComponent(common.ReadinessXplaSyncing)
common.MustRegisterReadinessSyncing(vaa.ChainIDXpla)
chainObsvReqC[vaa.ChainIDXpla] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
if err := supervisor.Run(ctx, "xplawatch",
common.WrapWithScissors(cosmwasm.NewWatcher(*xplaWS, *xplaLCD, *xplaContract, chainMsgC[vaa.ChainIDXpla], chainObsvReqC[vaa.ChainIDXpla], common.ReadinessXplaSyncing, vaa.ChainIDXpla).Run, "xplawatch")); err != nil {
common.WrapWithScissors(cosmwasm.NewWatcher(*xplaWS, *xplaLCD, *xplaContract, chainMsgC[vaa.ChainIDXpla], chainObsvReqC[vaa.ChainIDXpla], vaa.ChainIDXpla).Run, "xplawatch")); err != nil {
return err
}
}
if shouldStart(algorandIndexerRPC) {
logger.Info("Starting Algorand watcher")
readiness.RegisterComponent(common.ReadinessAlgorandSyncing)
common.MustRegisterReadinessSyncing(vaa.ChainIDAlgorand)
chainObsvReqC[vaa.ChainIDAlgorand] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
if err := supervisor.Run(ctx, "algorandwatch",
common.WrapWithScissors(algorand.NewWatcher(*algorandIndexerRPC, *algorandIndexerToken, *algorandAlgodRPC, *algorandAlgodToken, *algorandAppID, chainMsgC[vaa.ChainIDAlgorand], chainObsvReqC[vaa.ChainIDAlgorand]).Run, "algorandwatch")); err != nil {
@ -1290,7 +1291,7 @@ func runNode(cmd *cobra.Command, args []string) {
}
if shouldStart(nearRPC) {
logger.Info("Starting Near watcher")
readiness.RegisterComponent(common.ReadinessNearSyncing)
common.MustRegisterReadinessSyncing(vaa.ChainIDNear)
chainObsvReqC[vaa.ChainIDNear] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
if err := supervisor.Run(ctx, "nearwatch",
common.WrapWithScissors(near.NewWatcher(*nearRPC, *nearContract, chainMsgC[vaa.ChainIDNear], chainObsvReqC[vaa.ChainIDNear], !(*unsafeDevMode || *testnetMode)).Run, "nearwatch")); err != nil {
@ -1301,7 +1302,7 @@ func runNode(cmd *cobra.Command, args []string) {
// Start Wormchain watcher only if configured
if shouldStart(wormchainWS) {
logger.Info("Starting Wormchain watcher")
readiness.RegisterComponent(common.ReadinessWormchainSyncing)
common.MustRegisterReadinessSyncing(vaa.ChainIDWormchain)
chainObsvReqC[vaa.ChainIDWormchain] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
if err := supervisor.Run(ctx, "wormchainwatch",
wormchain.NewWatcher(*wormchainWS, *wormchainLCD, chainMsgC[vaa.ChainIDWormchain], chainObsvReqC[vaa.ChainIDWormchain]).Run); err != nil {
@ -1310,7 +1311,7 @@ func runNode(cmd *cobra.Command, args []string) {
}
if shouldStart(aptosRPC) {
logger.Info("Starting Aptos watcher")
readiness.RegisterComponent(common.ReadinessAptosSyncing)
common.MustRegisterReadinessSyncing(vaa.ChainIDAptos)
chainObsvReqC[vaa.ChainIDAptos] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
if err := supervisor.Run(ctx, "aptoswatch",
aptos.NewWatcher(*aptosRPC, *aptosAccount, *aptosHandle, chainMsgC[vaa.ChainIDAptos], chainObsvReqC[vaa.ChainIDAptos]).Run); err != nil {
@ -1320,7 +1321,7 @@ func runNode(cmd *cobra.Command, args []string) {
if shouldStart(suiRPC) {
logger.Info("Starting Sui watcher")
readiness.RegisterComponent(common.ReadinessSuiSyncing)
common.MustRegisterReadinessSyncing(vaa.ChainIDSui)
chainObsvReqC[vaa.ChainIDSui] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
if err := supervisor.Run(ctx, "suiwatch",
sui.NewWatcher(*suiRPC, *suiWS, *suiAccount, *suiPackage, *unsafeDevMode, chainMsgC[vaa.ChainIDSui], chainObsvReqC[vaa.ChainIDSui]).Run); err != nil {
@ -1331,13 +1332,13 @@ func runNode(cmd *cobra.Command, args []string) {
var solanaFinalizedWatcher *solana.SolanaWatcher
if shouldStart(solanaRPC) {
logger.Info("Starting Solana watcher")
readiness.RegisterComponent(common.ReadinessSolanaSyncing)
common.MustRegisterReadinessSyncing(vaa.ChainIDSolana)
chainObsvReqC[vaa.ChainIDSolana] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
if err := supervisor.Run(ctx, "solwatch-confirmed",
common.WrapWithScissors(solana.NewSolanaWatcher(*solanaRPC, nil, solAddress, *solanaContract, chainMsgC[vaa.ChainIDSolana], nil, rpc.CommitmentConfirmed, common.ReadinessSolanaSyncing, vaa.ChainIDSolana).Run, "solwatch-confirmed")); err != nil {
common.WrapWithScissors(solana.NewSolanaWatcher(*solanaRPC, nil, solAddress, *solanaContract, chainMsgC[vaa.ChainIDSolana], nil, rpc.CommitmentConfirmed, vaa.ChainIDSolana).Run, "solwatch-confirmed")); err != nil {
return err
}
solanaFinalizedWatcher = solana.NewSolanaWatcher(*solanaRPC, nil, solAddress, *solanaContract, chainMsgC[vaa.ChainIDSolana], chainObsvReqC[vaa.ChainIDSolana], rpc.CommitmentFinalized, common.ReadinessSolanaSyncing, vaa.ChainIDSolana)
solanaFinalizedWatcher = solana.NewSolanaWatcher(*solanaRPC, nil, solAddress, *solanaContract, chainMsgC[vaa.ChainIDSolana], chainObsvReqC[vaa.ChainIDSolana], rpc.CommitmentFinalized, vaa.ChainIDSolana)
if err := supervisor.Run(ctx, "solwatch-finalized", common.WrapWithScissors(solanaFinalizedWatcher.Run, "solwatch-finalized")); err != nil {
return err
}
@ -1345,20 +1346,20 @@ func runNode(cmd *cobra.Command, args []string) {
if shouldStart(pythnetRPC) {
logger.Info("Starting Pythnet watcher")
readiness.RegisterComponent(common.ReadinessPythNetSyncing)
common.MustRegisterReadinessSyncing(vaa.ChainIDPythNet)
chainObsvReqC[vaa.ChainIDPythNet] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
if err := supervisor.Run(ctx, "pythwatch-confirmed",
common.WrapWithScissors(solana.NewSolanaWatcher(*pythnetRPC, pythnetWS, pythnetAddress, *pythnetContract, chainMsgC[vaa.ChainIDPythNet], nil, rpc.CommitmentConfirmed, common.ReadinessPythNetSyncing, vaa.ChainIDPythNet).Run, "pythwatch-confirmed")); err != nil {
common.WrapWithScissors(solana.NewSolanaWatcher(*pythnetRPC, pythnetWS, pythnetAddress, *pythnetContract, chainMsgC[vaa.ChainIDPythNet], nil, rpc.CommitmentConfirmed, vaa.ChainIDPythNet).Run, "pythwatch-confirmed")); err != nil {
return err
}
}
if shouldStart(injectiveWS) {
logger.Info("Starting Injective watcher")
readiness.RegisterComponent(common.ReadinessInjectiveSyncing)
common.MustRegisterReadinessSyncing(vaa.ChainIDInjective)
chainObsvReqC[vaa.ChainIDInjective] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
if err := supervisor.Run(ctx, "injectivewatch",
common.WrapWithScissors(cosmwasm.NewWatcher(*injectiveWS, *injectiveLCD, *injectiveContract, chainMsgC[vaa.ChainIDInjective], chainObsvReqC[vaa.ChainIDInjective], common.ReadinessInjectiveSyncing, vaa.ChainIDInjective).Run, "injectivewatch")); err != nil {
common.WrapWithScissors(cosmwasm.NewWatcher(*injectiveWS, *injectiveLCD, *injectiveContract, chainMsgC[vaa.ChainIDInjective], chainObsvReqC[vaa.ChainIDInjective], vaa.ChainIDInjective).Run, "injectivewatch")); err != nil {
return err
}
}
@ -1369,9 +1370,9 @@ func runNode(cmd *cobra.Command, args []string) {
log.Fatalf("if neon is enabled then solana must also be enabled.")
}
logger.Info("Starting Neon watcher")
readiness.RegisterComponent(common.ReadinessNeonSyncing)
common.MustRegisterReadinessSyncing(vaa.ChainIDNeon)
chainObsvReqC[vaa.ChainIDNeon] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
neonWatcher := evm.NewEthWatcher(*neonRPC, neonContractAddr, "neon", common.ReadinessNeonSyncing, vaa.ChainIDNeon, chainMsgC[vaa.ChainIDNeon], nil, chainObsvReqC[vaa.ChainIDNeon], *unsafeDevMode)
neonWatcher := evm.NewEthWatcher(*neonRPC, neonContractAddr, "neon", vaa.ChainIDNeon, chainMsgC[vaa.ChainIDNeon], nil, chainObsvReqC[vaa.ChainIDNeon], *unsafeDevMode)
neonWatcher.SetL1Finalizer(solanaFinalizedWatcher)
if err := supervisor.Run(ctx, "neonwatch", common.WrapWithScissors(neonWatcher.Run, "neonwatch")); err != nil {
return err
@ -1379,9 +1380,9 @@ func runNode(cmd *cobra.Command, args []string) {
}
if shouldStart(baseRPC) {
logger.Info("Starting Base watcher")
readiness.RegisterComponent(common.ReadinessBaseSyncing)
common.MustRegisterReadinessSyncing(vaa.ChainIDBase)
chainObsvReqC[vaa.ChainIDBase] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
baseWatcher := evm.NewEthWatcher(*baseRPC, baseContractAddr, "base", common.ReadinessBaseSyncing, vaa.ChainIDBase, chainMsgC[vaa.ChainIDBase], nil, chainObsvReqC[vaa.ChainIDBase], *unsafeDevMode)
baseWatcher := evm.NewEthWatcher(*baseRPC, baseContractAddr, "base", vaa.ChainIDBase, chainMsgC[vaa.ChainIDBase], nil, chainObsvReqC[vaa.ChainIDBase], *unsafeDevMode)
if err := supervisor.Run(ctx, "basewatch", common.WrapWithScissors(baseWatcher.Run, "basewatch")); err != nil {
return err
}

View File

@ -1,33 +1,40 @@
package common
import "github.com/certusone/wormhole/node/pkg/readiness"
import (
"fmt"
"github.com/certusone/wormhole/node/pkg/readiness"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
)
const (
ReadinessEthSyncing readiness.Component = "ethSyncing"
ReadinessSolanaSyncing readiness.Component = "solanaSyncing"
ReadinessTerraSyncing readiness.Component = "terraSyncing"
ReadinessAlgorandSyncing readiness.Component = "algorandSyncing"
ReadinessNearSyncing readiness.Component = "nearSyncing"
ReadinessAptosSyncing readiness.Component = "aptosSyncing"
ReadinessSuiSyncing readiness.Component = "suiSyncing"
ReadinessBSCSyncing readiness.Component = "bscSyncing"
ReadinessPolygonSyncing readiness.Component = "polygonSyncing"
ReadinessAvalancheSyncing readiness.Component = "avalancheSyncing"
ReadinessOasisSyncing readiness.Component = "oasisSyncing"
ReadinessAuroraSyncing readiness.Component = "auroraSyncing"
ReadinessFantomSyncing readiness.Component = "fantomSyncing"
ReadinessKaruraSyncing readiness.Component = "karuraSyncing"
ReadinessAcalaSyncing readiness.Component = "acalaSyncing"
ReadinessKlaytnSyncing readiness.Component = "klaytnSyncing"
ReadinessCeloSyncing readiness.Component = "celoSyncing"
ReadinessMoonbeamSyncing readiness.Component = "moonbeamSyncing"
ReadinessNeonSyncing readiness.Component = "neonSyncing"
ReadinessTerra2Syncing readiness.Component = "terra2Syncing"
ReadinessInjectiveSyncing readiness.Component = "injectiveSyncing"
ReadinessXplaSyncing readiness.Component = "xplaSyncing"
ReadinessPythNetSyncing readiness.Component = "pythnetSyncing"
ReadinessArbitrumSyncing readiness.Component = "arbitrumSyncing"
ReadinessOptimismSyncing readiness.Component = "optimismSyncing"
ReadinessBaseSyncing readiness.Component = "baseSyncing"
ReadinessWormchainSyncing readiness.Component = "wormchainSyncing"
ReadinessEthSyncing readiness.Component = "ethSyncing"
)
// MustRegisterReadinessSyncing registers the specified chain for readiness syncing. It panics if the chain ID is invalid so it should only be used during initialization.
// This function will
func MustRegisterReadinessSyncing(chainID vaa.ChainID) {
readiness.RegisterComponent(MustConvertChainIdToReadinessSyncing(chainID))
}
// MustConvertChainIdToReadinessSyncing maps a chain ID to a readiness syncing value. It panics if the chain ID is invalid so it should only be used during initialization.
func MustConvertChainIdToReadinessSyncing(chainID vaa.ChainID) readiness.Component {
readinessSync, err := ConvertChainIdToReadinessSyncing(chainID)
if err != nil {
panic(err)
}
return readinessSync
}
// ConvertChainIdToReadinessSyncing maps a chain ID to a readiness syncing value. It returns an error if the chain ID is invalid.
func ConvertChainIdToReadinessSyncing(chainID vaa.ChainID) (readiness.Component, error) {
if chainID == vaa.ChainIDEthereum {
// The readiness for Ethereum is "ethSyncing", not "ethereumSyncing". Changing it would most likely break monitoring. . .
return ReadinessEthSyncing, nil
}
str := chainID.String()
if _, err := vaa.ChainIDFromString(str); err != nil {
return readiness.Component(""), fmt.Errorf("invalid chainID: %d", uint16(chainID))
}
return readiness.Component(str + "Syncing"), nil
}

View File

@ -0,0 +1,118 @@
package common
import (
"testing"
"github.com/certusone/wormhole/node/pkg/readiness"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"github.com/stretchr/testify/assert"
)
const (
// Ethereum is defined in readiness.go.
ReadinessSolanaSyncing readiness.Component = "solanaSyncing"
ReadinessTerraSyncing readiness.Component = "terraSyncing"
ReadinessAlgorandSyncing readiness.Component = "algorandSyncing"
ReadinessNearSyncing readiness.Component = "nearSyncing"
ReadinessAptosSyncing readiness.Component = "aptosSyncing"
ReadinessSuiSyncing readiness.Component = "suiSyncing"
ReadinessBSCSyncing readiness.Component = "bscSyncing"
ReadinessPolygonSyncing readiness.Component = "polygonSyncing"
ReadinessAvalancheSyncing readiness.Component = "avalancheSyncing"
ReadinessOasisSyncing readiness.Component = "oasisSyncing"
ReadinessAuroraSyncing readiness.Component = "auroraSyncing"
ReadinessFantomSyncing readiness.Component = "fantomSyncing"
ReadinessKaruraSyncing readiness.Component = "karuraSyncing"
ReadinessAcalaSyncing readiness.Component = "acalaSyncing"
ReadinessKlaytnSyncing readiness.Component = "klaytnSyncing"
ReadinessCeloSyncing readiness.Component = "celoSyncing"
ReadinessMoonbeamSyncing readiness.Component = "moonbeamSyncing"
ReadinessNeonSyncing readiness.Component = "neonSyncing"
ReadinessTerra2Syncing readiness.Component = "terra2Syncing"
ReadinessInjectiveSyncing readiness.Component = "injectiveSyncing"
ReadinessXplaSyncing readiness.Component = "xplaSyncing"
ReadinessPythNetSyncing readiness.Component = "pythnetSyncing"
ReadinessArbitrumSyncing readiness.Component = "arbitrumSyncing"
ReadinessOptimismSyncing readiness.Component = "optimismSyncing"
ReadinessBaseSyncing readiness.Component = "baseSyncing"
ReadinessWormchainSyncing readiness.Component = "wormchainSyncing"
)
// This test is just to make sure that nothing got broken when we switched from manually specifying the readiness syncing labels.
// Once this functionality is merged, this test can probably be deleted (so that we don't need to keep adding new chains going forward).
func TestConvertChainIdToReadinessSyncing(t *testing.T) {
type test struct {
input vaa.ChainID
output readiness.Component
}
// Positive Test Cases
p_tests := []test{
{input: vaa.ChainIDSolana, output: ReadinessSolanaSyncing},
{input: vaa.ChainIDEthereum, output: ReadinessEthSyncing},
{input: vaa.ChainIDTerra, output: ReadinessTerraSyncing},
{input: vaa.ChainIDBSC, output: ReadinessBSCSyncing},
{input: vaa.ChainIDPolygon, output: ReadinessPolygonSyncing},
{input: vaa.ChainIDAvalanche, output: ReadinessAvalancheSyncing},
{input: vaa.ChainIDOasis, output: ReadinessOasisSyncing},
{input: vaa.ChainIDAlgorand, output: ReadinessAlgorandSyncing},
{input: vaa.ChainIDAptos, output: ReadinessAptosSyncing},
{input: vaa.ChainIDSui, output: ReadinessSuiSyncing},
{input: vaa.ChainIDNear, output: ReadinessNearSyncing},
{input: vaa.ChainIDAurora, output: ReadinessAuroraSyncing},
{input: vaa.ChainIDFantom, output: ReadinessFantomSyncing},
{input: vaa.ChainIDKarura, output: ReadinessKaruraSyncing},
{input: vaa.ChainIDAcala, output: ReadinessAcalaSyncing},
{input: vaa.ChainIDKlaytn, output: ReadinessKlaytnSyncing},
{input: vaa.ChainIDCelo, output: ReadinessCeloSyncing},
{input: vaa.ChainIDMoonbeam, output: ReadinessMoonbeamSyncing},
{input: vaa.ChainIDNeon, output: ReadinessNeonSyncing},
{input: vaa.ChainIDTerra2, output: ReadinessTerra2Syncing},
{input: vaa.ChainIDInjective, output: ReadinessInjectiveSyncing},
{input: vaa.ChainIDArbitrum, output: ReadinessArbitrumSyncing},
{input: vaa.ChainIDPythNet, output: ReadinessPythNetSyncing},
{input: vaa.ChainIDOptimism, output: ReadinessOptimismSyncing},
{input: vaa.ChainIDXpla, output: ReadinessXplaSyncing},
// BTC readiness not defined yet {input: vaa.ChainIDBtc, output: ReadinessBtcSyncing},
{input: vaa.ChainIDBase, output: ReadinessBaseSyncing},
}
// Negative Test Cases
n_tests := []test{
{input: vaa.ChainIDUnset, output: ""},
}
for _, tc := range p_tests {
t.Run(tc.input.String(), func(t *testing.T) {
chainId, err := ConvertChainIdToReadinessSyncing(tc.input)
assert.Equal(t, tc.output, chainId)
assert.NoError(t, err)
})
}
for _, tc := range n_tests {
t.Run(tc.input.String(), func(t *testing.T) {
chainId, err := ConvertChainIdToReadinessSyncing(tc.input)
assert.Equal(t, tc.output, chainId)
assert.Error(t, err)
})
}
}
func TestMustRegisterReadinessSyncing(t *testing.T) {
// The first time should work.
assert.NotPanics(t, func() {
MustRegisterReadinessSyncing(vaa.ChainIDEthereum)
})
// A second time should panic.
assert.Panics(t, func() {
MustRegisterReadinessSyncing(vaa.ChainIDEthereum)
})
// An invalid chainID should panic.
assert.Panics(t, func() {
MustRegisterReadinessSyncing(vaa.ChainIDUnset)
})
}

View File

@ -21,20 +21,20 @@ type Component string
// RegisterComponent registers the given component name such that it is required to be ready for the global check to succeed.
func RegisterComponent(component Component) {
mu.Lock()
defer mu.Unlock()
if _, ok := registry[string(component)]; ok {
panic("component already registered")
}
registry[string(component)] = false
mu.Unlock()
}
// SetReady sets the given global component state.
func SetReady(component Component) {
mu.Lock()
defer mu.Unlock()
if !registry[string(component)] {
registry[string(component)] = true
}
mu.Unlock()
}
// Handler returns a net/http handler for the readiness check. It returns 200 OK if all components are ready,
@ -54,6 +54,7 @@ func Handler(w http.ResponseWriter, r *http.Request) {
}
mu.Lock()
defer mu.Unlock()
for k, v := range registry {
_, err = fmt.Fprintf(resp, "%s\t%v\n", k, v)
if err != nil {
@ -64,7 +65,6 @@ func Handler(w http.ResponseWriter, r *http.Request) {
ready = false
}
}
mu.Unlock()
if !ready {
w.WriteHeader(http.StatusPreconditionFailed)

View File

@ -33,8 +33,9 @@ type (
algodToken string
appid uint64
msgC chan<- *common.MessagePublication
obsvReqC <-chan *gossipv1.ObservationRequest
msgC chan<- *common.MessagePublication
obsvReqC <-chan *gossipv1.ObservationRequest
readinessSync readiness.Component
next_round uint64
}
@ -64,14 +65,15 @@ func NewWatcher(
obsvReqC <-chan *gossipv1.ObservationRequest,
) *Watcher {
return &Watcher{
indexerRPC: indexerRPC,
indexerToken: indexerToken,
algodRPC: algodRPC,
algodToken: algodToken,
appid: appid,
msgC: msgC,
obsvReqC: obsvReqC,
next_round: 0,
indexerRPC: indexerRPC,
indexerToken: indexerToken,
algodRPC: algodRPC,
algodToken: algodToken,
appid: appid,
msgC: msgC,
obsvReqC: obsvReqC,
readinessSync: common.MustConvertChainIdToReadinessSyncing(vaa.ChainIDAlgorand),
next_round: 0,
}
}
@ -252,7 +254,7 @@ func (e *Watcher) Run(ctx context.Context) error {
ContractAddress: fmt.Sprintf("%d", e.appid),
})
readiness.SetReady(common.ReadinessAlgorandSyncing)
readiness.SetReady(e.readinessSync)
}
}
}

View File

@ -29,8 +29,9 @@ type (
aptosAccount string
aptosHandle string
msgC chan<- *common.MessagePublication
obsvReqC <-chan *gossipv1.ObservationRequest
msgC chan<- *common.MessagePublication
obsvReqC <-chan *gossipv1.ObservationRequest
readinessSync readiness.Component
}
)
@ -56,11 +57,12 @@ func NewWatcher(
obsvReqC <-chan *gossipv1.ObservationRequest,
) *Watcher {
return &Watcher{
aptosRPC: aptosRPC,
aptosAccount: aptosAccount,
aptosHandle: aptosHandle,
msgC: msgC,
obsvReqC: obsvReqC,
aptosRPC: aptosRPC,
aptosAccount: aptosAccount,
aptosHandle: aptosHandle,
msgC: msgC,
obsvReqC: obsvReqC,
readinessSync: common.MustConvertChainIdToReadinessSyncing(vaa.ChainIDAptos),
}
}
@ -223,7 +225,7 @@ func (e *Watcher) Run(ctx context.Context) error {
ContractAddress: e.aptosAccount,
})
readiness.SetReady(common.ReadinessAptosSyncing)
readiness.SetReady(e.readinessSync)
}
}
}

View File

@ -43,7 +43,7 @@ type (
obsvReqC <-chan *gossipv1.ObservationRequest
// Readiness component
readiness readiness.Component
readinessSync readiness.Component
// VAA ChainID of the network we're connecting to.
chainID vaa.ChainID
// Key for contract address in the wasm logs
@ -97,7 +97,6 @@ func NewWatcher(
contract string,
msgC chan<- *common.MessagePublication,
obsvReqC <-chan *gossipv1.ObservationRequest,
readiness readiness.Component,
chainID vaa.ChainID) *Watcher {
// CosmWasm 1.0.0
@ -123,7 +122,7 @@ func NewWatcher(
contract: contract,
msgC: msgC,
obsvReqC: obsvReqC,
readiness: readiness,
readinessSync: common.MustConvertChainIdToReadinessSyncing(chainID),
chainID: chainID,
contractAddressFilterKey: contractAddressFilterKey,
contractAddressLogKey: contractAddressLogKey,
@ -180,7 +179,7 @@ func (e *Watcher) Run(ctx context.Context) error {
}
logger.Info("subscribed to new transaction events", zap.String("network", networkName))
readiness.SetReady(e.readiness)
readiness.SetReady(e.readinessSync)
common.RunWithScissors(ctx, errC, "cosmwasm_block_height", func(ctx context.Context) error {
t := time.NewTicker(5 * time.Second)
@ -220,6 +219,8 @@ func (e *Watcher) Run(ctx context.Context) error {
Height: latestBlock.Int(),
ContractAddress: e.contract,
})
readiness.SetReady(e.readinessSync)
}
}
})

View File

@ -72,7 +72,7 @@ type (
// Human-readable name of the Eth network, for logging and monitoring.
networkName string
// Readiness component
readiness readiness.Component
readinessSync readiness.Component
// VAA ChainID of the network we're connecting to.
chainID vaa.ChainID
@ -139,7 +139,6 @@ func NewEthWatcher(
url string,
contract eth_common.Address,
networkName string,
readiness readiness.Component,
chainID vaa.ChainID,
msgC chan<- *common.MessagePublication,
setC chan<- *common.GuardianSet,
@ -151,7 +150,7 @@ func NewEthWatcher(
url: url,
contract: contract,
networkName: networkName,
readiness: readiness,
readinessSync: common.MustConvertChainIdToReadinessSyncing(chainID),
waitForConfirmations: false,
maxWaitConfirmations: 60,
chainID: chainID,
@ -642,7 +641,7 @@ func (w *Watcher) Run(ctx context.Context) error {
zap.Bool("is_safe_block", ev.Safe),
zap.String("eth_network", w.networkName))
currentEthHeight.WithLabelValues(w.networkName).Set(float64(ev.Number.Int64()))
readiness.SetReady(w.readiness)
readiness.SetReady(w.readinessSync)
p2p.DefaultRegistry.SetNetworkStats(w.chainID, &gossipv1.Heartbeat_Network{
Height: ev.Number.Int64(),
ContractAddress: w.contract.Hex(),
@ -801,7 +800,7 @@ func (w *Watcher) Run(ctx context.Context) error {
// Now that the init is complete, peg readiness. That will also happen when we process a new head, but chains
// that wait for finality may take a while to receive the first block and we don't want to hold up the init.
readiness.SetReady(w.readiness)
readiness.SetReady(w.readinessSync)
select {
case <-ctx.Done():

View File

@ -67,8 +67,9 @@ type (
nearRPC string
// external channels
msgC chan<- *common.MessagePublication // validated (SECURITY: and only validated!) observations go into this channel
obsvReqC <-chan *gossipv1.ObservationRequest // observation requests are coming from this channel
msgC chan<- *common.MessagePublication // validated (SECURITY: and only validated!) observations go into this channel
obsvReqC <-chan *gossipv1.ObservationRequest // observation requests are coming from this channel
readinessSync readiness.Component
// internal queues
transactionProcessingQueueCounter atomic.Int64
@ -102,6 +103,7 @@ func NewWatcher(
nearRPC: nearRPC,
msgC: msgC,
obsvReqC: obsvReqC,
readinessSync: common.MustConvertChainIdToReadinessSyncing(vaa.ChainIDNear),
transactionProcessingQueue: make(chan *transactionProcessingJob),
chunkProcessingQueue: make(chan nearapi.ChunkHeader, queueSize),
eventChanTxProcessedDuration: make(chan time.Duration, 10),
@ -149,7 +151,7 @@ func (e *Watcher) runBlockPoll(ctx context.Context) error {
Height: int64(highestFinalBlockHeightObserved),
ContractAddress: e.wormholeAccount,
})
readiness.SetReady(common.ReadinessNearSyncing)
readiness.SetReady(e.readinessSync)
timer.Reset(blockPollInterval)
}

View File

@ -42,7 +42,7 @@ type (
pumpData chan []byte
rpcClient *rpc.Client
// Readiness component
readiness readiness.Component
readinessSync readiness.Component
// VAA ChainID of the network we're connecting to.
chainID vaa.ChainID
// Human readable name of network
@ -181,20 +181,19 @@ func NewSolanaWatcher(
msgC chan<- *common.MessagePublication,
obsvReqC <-chan *gossipv1.ObservationRequest,
commitment rpc.CommitmentType,
readiness readiness.Component,
chainID vaa.ChainID) *SolanaWatcher {
return &SolanaWatcher{
rpcUrl: rpcUrl,
wsUrl: wsUrl,
contract: contractAddress,
rawContract: rawContract,
msgC: msgC,
obsvReqC: obsvReqC,
commitment: commitment,
rpcClient: rpc.New(rpcUrl),
readiness: readiness,
chainID: chainID,
networkName: vaa.ChainID(chainID).String(),
rpcUrl: rpcUrl,
wsUrl: wsUrl,
contract: contractAddress,
rawContract: rawContract,
msgC: msgC,
obsvReqC: obsvReqC,
commitment: commitment,
rpcClient: rpc.New(rpcUrl),
readinessSync: common.MustConvertChainIdToReadinessSyncing(chainID),
chainID: chainID,
networkName: vaa.ChainID(chainID).String(),
}
}
@ -334,7 +333,7 @@ func (s *SolanaWatcher) Run(ctx context.Context) error {
lastSlot = slot - 1
}
currentSolanaHeight.WithLabelValues(s.networkName, string(s.commitment)).Set(float64(slot))
readiness.SetReady(s.readiness)
readiness.SetReady(s.readinessSync)
p2p.DefaultRegistry.SetNetworkStats(s.chainID, &gossipv1.Heartbeat_Network{
Height: int64(slot),
ContractAddress: contractAddr,

View File

@ -42,8 +42,9 @@ type (
unsafeDevMode bool
msgChan chan *common.MessagePublication
obsvReqC chan *gossipv1.ObservationRequest
msgChan chan *common.MessagePublication
obsvReqC chan *gossipv1.ObservationRequest
readinessSync readiness.Component
subId int64
subscribed bool
@ -136,6 +137,7 @@ func NewWatcher(
unsafeDevMode: unsafeDevMode,
msgChan: messageEvents,
obsvReqC: obsvReqC,
readinessSync: common.MustConvertChainIdToReadinessSyncing(vaa.ChainIDSui),
subId: 0,
subscribed: false,
}
@ -410,7 +412,7 @@ func (e *Watcher) Run(ctx context.Context) error {
})
if e.subscribed {
readiness.SetReady(common.ReadinessSuiSyncing)
readiness.SetReady(e.readinessSync)
}
}
}

View File

@ -39,6 +39,8 @@ type (
// Incoming re-observation requests from the network. Pre-filtered to only
// include requests for our chainID.
obsvReqC <-chan *gossipv1.ObservationRequest
readinessSync readiness.Component
}
)
@ -77,7 +79,13 @@ func NewWatcher(
urlLCD string,
msgC chan<- *common.MessagePublication,
obsvReqC <-chan *gossipv1.ObservationRequest) *Watcher {
return &Watcher{urlWS: urlWS, urlLCD: urlLCD, msgC: msgC, obsvReqC: obsvReqC}
return &Watcher{
urlWS: urlWS,
urlLCD: urlLCD,
msgC: msgC,
obsvReqC: obsvReqC,
readinessSync: common.MustConvertChainIdToReadinessSyncing(vaa.ChainIDWormchain),
}
}
func (e *Watcher) Run(ctx context.Context) error {
@ -121,7 +129,7 @@ func (e *Watcher) Run(ctx context.Context) error {
}
logger.Info("subscribed to new transaction events")
readiness.SetReady(common.ReadinessWormchainSyncing)
readiness.SetReady(e.readinessSync)
go func() {
t := time.NewTicker(5 * time.Second)
@ -154,6 +162,8 @@ func (e *Watcher) Run(ctx context.Context) error {
p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDWormchain, &gossipv1.Heartbeat_Network{
Height: latestBlock.Int(),
})
readiness.SetReady(e.readinessSync)
}
}()