From 5323453b815b47c18f512a72e7e672d562c80757 Mon Sep 17 00:00:00 2001 From: tbjump <103955289+tbjump@users.noreply.github.com> Date: Fri, 20 Jan 2023 13:15:13 -0800 Subject: [PATCH] node: add channel read/write type constraints (#1931) --- node/cmd/guardiand/adminserver.go | 19 ++- node/cmd/guardiand/node.go | 152 ++++++++++++++--------- node/pkg/governor/governor_monitoring.go | 6 +- node/pkg/p2p/p2p.go | 14 +-- node/pkg/processor/broadcast.go | 4 +- node/pkg/processor/cleanup.go | 2 +- node/pkg/processor/processor.go | 31 +++-- node/pkg/watchers/algorand/watcher.go | 12 +- node/pkg/watchers/aptos/watcher.go | 12 +- node/pkg/watchers/cosmwasm/watcher.go | 14 +-- node/pkg/watchers/evm/watcher.go | 32 ++--- node/pkg/watchers/solana/client.go | 48 +++---- node/pkg/watchers/wormchain/watcher.go | 16 ++- sdk/vaa/structs.go | 32 +++++ 14 files changed, 231 insertions(+), 163 deletions(-) diff --git a/node/cmd/guardiand/adminserver.go b/node/cmd/guardiand/adminserver.go index 13b1392dc..acf2daa37 100644 --- a/node/cmd/guardiand/adminserver.go +++ b/node/cmd/guardiand/adminserver.go @@ -41,9 +41,9 @@ type nodePrivilegedService struct { nodev1.UnimplementedNodePrivilegedServiceServer db *db.Database injectC chan<- *vaa.VAA - obsvReqSendC chan *gossipv1.ObservationRequest + obsvReqSendC chan<- *gossipv1.ObservationRequest logger *zap.Logger - signedInC chan *gossipv1.SignedVAAWithQuorum + signedInC chan<- *gossipv1.SignedVAAWithQuorum governor *governor.ChainGovernor evmConnector connectors.Connector gsCache sync.Map @@ -397,8 +397,19 @@ func (s *nodePrivilegedService) FindMissingMessages(ctx context.Context, req *no }, nil } -func adminServiceRunnable(logger *zap.Logger, socketPath string, injectC chan<- *vaa.VAA, signedInC chan *gossipv1.SignedVAAWithQuorum, obsvReqSendC chan *gossipv1.ObservationRequest, - db *db.Database, gst *common.GuardianSetState, gov *governor.ChainGovernor, gk *ecdsa.PrivateKey, ethRpc *string, ethContract *string) (supervisor.Runnable, error) { +func adminServiceRunnable( + logger *zap.Logger, + socketPath string, + injectC chan<- *vaa.VAA, + signedInC chan<- *gossipv1.SignedVAAWithQuorum, + obsvReqSendC chan<- *gossipv1.ObservationRequest, + db *db.Database, + gst *common.GuardianSetState, + gov *governor.ChainGovernor, + gk *ecdsa.PrivateKey, + ethRpc *string, + ethContract *string, +) (supervisor.Runnable, error) { // Delete existing UNIX socket, if present. fi, err := os.Stat(socketPath) if err == nil { diff --git a/node/cmd/guardiand/node.go b/node/cmd/guardiand/node.go index de72e350d..7b682be9b 100644 --- a/node/cmd/guardiand/node.go +++ b/node/cmd/guardiand/node.go @@ -826,29 +826,30 @@ func runNode(cmd *cobra.Command, args []string) { rootCtxCancel() }() - // Ethereum lock event channel - lockC := make(chan *common.MessagePublication) - - // Ethereum incoming guardian set updates - setC := make(chan *common.GuardianSet) - - // Outbound gossip message queue - sendC := make(chan []byte) + // Setup various channels... + // Outbound gossip message queue (needs to be read/write because p2p needs read/write) + gossipSendC := make(chan []byte) // Inbound observations obsvC := make(chan *gossipv1.SignedObservation, 50) + // Finalized guardian observations aggregated across all chains + msgReadC, msgWriteC := makeChannelPair[*common.MessagePublication](0) + + // Ethereum incoming guardian set updates + setReadC, setWriteC := makeChannelPair[*common.GuardianSet](0) + // Inbound signed VAAs - signedInC := make(chan *gossipv1.SignedVAAWithQuorum, 50) + signedInReadC, signedInWriteC := makeChannelPair[*gossipv1.SignedVAAWithQuorum](50) // Inbound observation requests from the p2p service (for all chains) - obsvReqC := make(chan *gossipv1.ObservationRequest, common.ObsvReqChannelSize) + obsvReqReadC, obsvReqWriteC := makeChannelPair[*gossipv1.ObservationRequest](common.ObsvReqChannelSize) // Outbound observation requests - obsvReqSendC := make(chan *gossipv1.ObservationRequest, common.ObsvReqChannelSize) + obsvReqSendReadC, obsvReqSendWriteC := makeChannelPair[*gossipv1.ObservationRequest](common.ObsvReqChannelSize) // Injected VAAs (manually generated rather than created via observation) - injectC := make(chan *vaa.VAA) + injectReadC, injectWriteC := makeChannelPair[*vaa.VAA](0) // Guardian set state managed by processor gst := common.NewGuardianSetState(nil) @@ -856,6 +857,35 @@ func runNode(cmd *cobra.Command, args []string) { // Per-chain observation requests chainObsvReqC := make(map[vaa.ChainID]chan *gossipv1.ObservationRequest) + // Per-chain msgC + chainMsgC := make(map[vaa.ChainID]chan *common.MessagePublication) + // aggregate per-chain msgC into msgC. + // SECURITY defense-in-depth: This way we enforce that a watcher must set the msg.EmitterChain to its chainId, which makes the code easier to audit + for _, chainId := range vaa.GetAllNetworkIDs() { + chainMsgC[chainId] = make(chan *common.MessagePublication) + go func(c <-chan *common.MessagePublication, chainId vaa.ChainID) { + for { + select { + case <-rootCtx.Done(): + return + case msg := <-c: + if msg.EmitterChain == chainId { + msgWriteC <- msg + } else { + // SECURITY: This should never happen. If it does, a watcher has been compromised. + logger.Fatal("SECURITY CRITICAL: Received observation from a chain that was not marked as originating from that chain", + zap.Stringer("tx", msg.TxHash), + zap.Stringer("emitter_address", msg.EmitterAddress), + zap.Uint64("sequence", msg.Sequence), + zap.Stringer("msgChainId", msg.EmitterChain), + zap.Stringer("watcherChainId", chainId), + ) + } + } + } + }(chainMsgC[chainId], chainId) + } + var notifier *discord.DiscordNotifier if *discordToken != "" { notifier, err = discord.NewDiscordNotifier(*discordToken, *discordChannel, logger) @@ -961,12 +991,7 @@ func runNode(cmd *cobra.Command, args []string) { // will be passed to it for processing. It will forward all token bridge transfers to the accountant contract. // If accountantCheckEnabled is set to true, token bridge transfers will not be signed and published until they // are approved by the accountant smart contract. - - // TODO: Use this once PR #1931 is merged. - //acctReadC, acctWriteC := makeChannelPair[*common.MessagePublication](0) - acctChan := make(chan *common.MessagePublication) - var acctReadC <-chan *common.MessagePublication = acctChan - var acctWriteC chan<- *common.MessagePublication = acctChan + acctReadC, acctWriteC := makeChannelPair[*common.MessagePublication](0) var acct *accountant.Accountant if *accountantContract != "" { @@ -1021,16 +1046,10 @@ func runNode(cmd *cobra.Command, args []string) { logger.Info("chain governor is disabled") } - // local admin service socket - adminService, err := adminServiceRunnable(logger, *adminSocketPath, injectC, signedInC, obsvReqSendC, db, gst, gov, gk, ethRPC, ethContract) - if err != nil { - logger.Fatal("failed to create admin service socket", zap.Error(err)) - } - // Run supervisor. supervisor.New(rootCtx, logger, func(ctx context.Context) error { if err := supervisor.Run(ctx, "p2p", p2p.Run( - obsvC, obsvReqC, obsvReqSendC, sendC, signedInC, priv, gk, gst, *p2pPort, *p2pNetworkID, *p2pBootstrap, *nodeName, *disableHeartbeatVerify, rootCtxCancel, acct, gov, nil, nil)); err != nil { + (chan<- *gossipv1.SignedObservation)(obsvC), obsvReqWriteC, obsvReqSendReadC, gossipSendC, signedInWriteC, priv, gk, gst, *p2pPort, *p2pNetworkID, *p2pBootstrap, *nodeName, *disableHeartbeatVerify, rootCtxCancel, acct, gov, nil, nil)); err != nil { return err } @@ -1046,7 +1065,7 @@ func runNode(cmd *cobra.Command, args []string) { logger.Info("Starting Ethereum watcher") readiness.RegisterComponent(common.ReadinessEthSyncing) chainObsvReqC[vaa.ChainIDEthereum] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - ethWatcher = evm.NewEthWatcher(*ethRPC, ethContractAddr, "eth", common.ReadinessEthSyncing, vaa.ChainIDEthereum, lockC, setC, chainObsvReqC[vaa.ChainIDEthereum], *unsafeDevMode) + ethWatcher = evm.NewEthWatcher(*ethRPC, ethContractAddr, "eth", common.ReadinessEthSyncing, vaa.ChainIDEthereum, chainMsgC[vaa.ChainIDEthereum], setWriteC, chainObsvReqC[vaa.ChainIDEthereum], *unsafeDevMode) if err := supervisor.Run(ctx, "ethwatch", common.WrapWithScissors(ethWatcher.Run, "ethwatch")); err != nil { return err @@ -1057,7 +1076,7 @@ func runNode(cmd *cobra.Command, args []string) { logger.Info("Starting BSC watcher") readiness.RegisterComponent(common.ReadinessBSCSyncing) chainObsvReqC[vaa.ChainIDBSC] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - bscWatcher := evm.NewEthWatcher(*bscRPC, bscContractAddr, "bsc", common.ReadinessBSCSyncing, vaa.ChainIDBSC, lockC, nil, chainObsvReqC[vaa.ChainIDBSC], *unsafeDevMode) + bscWatcher := evm.NewEthWatcher(*bscRPC, bscContractAddr, "bsc", common.ReadinessBSCSyncing, vaa.ChainIDBSC, chainMsgC[vaa.ChainIDBSC], nil, chainObsvReqC[vaa.ChainIDBSC], *unsafeDevMode) bscWatcher.SetWaitForConfirmations(true) if err := supervisor.Run(ctx, "bscwatch", common.WrapWithScissors(bscWatcher.Run, "bscwatch")); err != nil { return err @@ -1073,7 +1092,7 @@ func runNode(cmd *cobra.Command, args []string) { logger.Info("Starting Polygon watcher") readiness.RegisterComponent(common.ReadinessPolygonSyncing) chainObsvReqC[vaa.ChainIDPolygon] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - polygonWatcher := evm.NewEthWatcher(*polygonRPC, polygonContractAddr, "polygon", common.ReadinessPolygonSyncing, vaa.ChainIDPolygon, lockC, nil, chainObsvReqC[vaa.ChainIDPolygon], *unsafeDevMode) + polygonWatcher := evm.NewEthWatcher(*polygonRPC, polygonContractAddr, "polygon", common.ReadinessPolygonSyncing, vaa.ChainIDPolygon, chainMsgC[vaa.ChainIDPolygon], nil, chainObsvReqC[vaa.ChainIDPolygon], *unsafeDevMode) polygonWatcher.SetWaitForConfirmations(waitForConfirmations) if err := polygonWatcher.SetRootChainParams(*polygonRootChainRpc, *polygonRootChainContractAddress); err != nil { return err @@ -1087,7 +1106,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessAvalancheSyncing) chainObsvReqC[vaa.ChainIDAvalanche] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "avalanchewatch", - common.WrapWithScissors(evm.NewEthWatcher(*avalancheRPC, avalancheContractAddr, "avalanche", common.ReadinessAvalancheSyncing, vaa.ChainIDAvalanche, lockC, nil, chainObsvReqC[vaa.ChainIDAvalanche], *unsafeDevMode).Run, "avalanchewatch")); err != nil { + common.WrapWithScissors(evm.NewEthWatcher(*avalancheRPC, avalancheContractAddr, "avalanche", common.ReadinessAvalancheSyncing, vaa.ChainIDAvalanche, chainMsgC[vaa.ChainIDAvalanche], nil, chainObsvReqC[vaa.ChainIDAvalanche], *unsafeDevMode).Run, "avalanchewatch")); err != nil { return err } } @@ -1095,7 +1114,7 @@ func runNode(cmd *cobra.Command, args []string) { logger.Info("Starting Oasis watcher") chainObsvReqC[vaa.ChainIDOasis] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "oasiswatch", - common.WrapWithScissors(evm.NewEthWatcher(*oasisRPC, oasisContractAddr, "oasis", common.ReadinessOasisSyncing, vaa.ChainIDOasis, lockC, nil, chainObsvReqC[vaa.ChainIDOasis], *unsafeDevMode).Run, "oasiswatch")); err != nil { + common.WrapWithScissors(evm.NewEthWatcher(*oasisRPC, oasisContractAddr, "oasis", common.ReadinessOasisSyncing, vaa.ChainIDOasis, chainMsgC[vaa.ChainIDOasis], nil, chainObsvReqC[vaa.ChainIDOasis], *unsafeDevMode).Run, "oasiswatch")); err != nil { return err } } @@ -1104,7 +1123,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessAuroraSyncing) chainObsvReqC[vaa.ChainIDAurora] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "aurorawatch", - common.WrapWithScissors(evm.NewEthWatcher(*auroraRPC, auroraContractAddr, "aurora", common.ReadinessAuroraSyncing, vaa.ChainIDAurora, lockC, nil, chainObsvReqC[vaa.ChainIDAurora], *unsafeDevMode).Run, "aurorawatch")); err != nil { + common.WrapWithScissors(evm.NewEthWatcher(*auroraRPC, auroraContractAddr, "aurora", common.ReadinessAuroraSyncing, vaa.ChainIDAurora, chainMsgC[vaa.ChainIDAurora], nil, chainObsvReqC[vaa.ChainIDAurora], *unsafeDevMode).Run, "aurorawatch")); err != nil { return err } } @@ -1113,7 +1132,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessFantomSyncing) chainObsvReqC[vaa.ChainIDFantom] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "fantomwatch", - common.WrapWithScissors(evm.NewEthWatcher(*fantomRPC, fantomContractAddr, "fantom", common.ReadinessFantomSyncing, vaa.ChainIDFantom, lockC, nil, chainObsvReqC[vaa.ChainIDFantom], *unsafeDevMode).Run, "fantomwatch")); err != nil { + common.WrapWithScissors(evm.NewEthWatcher(*fantomRPC, fantomContractAddr, "fantom", common.ReadinessFantomSyncing, vaa.ChainIDFantom, chainMsgC[vaa.ChainIDFantom], nil, chainObsvReqC[vaa.ChainIDFantom], *unsafeDevMode).Run, "fantomwatch")); err != nil { return err } } @@ -1122,7 +1141,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessKaruraSyncing) chainObsvReqC[vaa.ChainIDKarura] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "karurawatch", - common.WrapWithScissors(evm.NewEthWatcher(*karuraRPC, karuraContractAddr, "karura", common.ReadinessKaruraSyncing, vaa.ChainIDKarura, lockC, nil, chainObsvReqC[vaa.ChainIDKarura], *unsafeDevMode).Run, "karurawatch")); err != nil { + common.WrapWithScissors(evm.NewEthWatcher(*karuraRPC, karuraContractAddr, "karura", common.ReadinessKaruraSyncing, vaa.ChainIDKarura, chainMsgC[vaa.ChainIDKarura], nil, chainObsvReqC[vaa.ChainIDKarura], *unsafeDevMode).Run, "karurawatch")); err != nil { return err } } @@ -1131,7 +1150,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessAcalaSyncing) chainObsvReqC[vaa.ChainIDAcala] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "acalawatch", - common.WrapWithScissors(evm.NewEthWatcher(*acalaRPC, acalaContractAddr, "acala", common.ReadinessAcalaSyncing, vaa.ChainIDAcala, lockC, nil, chainObsvReqC[vaa.ChainIDAcala], *unsafeDevMode).Run, "acalawatch")); err != nil { + common.WrapWithScissors(evm.NewEthWatcher(*acalaRPC, acalaContractAddr, "acala", common.ReadinessAcalaSyncing, vaa.ChainIDAcala, chainMsgC[vaa.ChainIDAcala], nil, chainObsvReqC[vaa.ChainIDAcala], *unsafeDevMode).Run, "acalawatch")); err != nil { return err } } @@ -1140,7 +1159,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessKlaytnSyncing) chainObsvReqC[vaa.ChainIDKlaytn] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "klaytnwatch", - common.WrapWithScissors(evm.NewEthWatcher(*klaytnRPC, klaytnContractAddr, "klaytn", common.ReadinessKlaytnSyncing, vaa.ChainIDKlaytn, lockC, nil, chainObsvReqC[vaa.ChainIDKlaytn], *unsafeDevMode).Run, "klaytnwatch")); err != nil { + common.WrapWithScissors(evm.NewEthWatcher(*klaytnRPC, klaytnContractAddr, "klaytn", common.ReadinessKlaytnSyncing, vaa.ChainIDKlaytn, chainMsgC[vaa.ChainIDKlaytn], nil, chainObsvReqC[vaa.ChainIDKlaytn], *unsafeDevMode).Run, "klaytnwatch")); err != nil { return err } } @@ -1149,7 +1168,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessCeloSyncing) chainObsvReqC[vaa.ChainIDCelo] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "celowatch", - common.WrapWithScissors(evm.NewEthWatcher(*celoRPC, celoContractAddr, "celo", common.ReadinessCeloSyncing, vaa.ChainIDCelo, lockC, nil, chainObsvReqC[vaa.ChainIDCelo], *unsafeDevMode).Run, "celowatch")); err != nil { + common.WrapWithScissors(evm.NewEthWatcher(*celoRPC, celoContractAddr, "celo", common.ReadinessCeloSyncing, vaa.ChainIDCelo, chainMsgC[vaa.ChainIDCelo], nil, chainObsvReqC[vaa.ChainIDCelo], *unsafeDevMode).Run, "celowatch")); err != nil { return err } } @@ -1158,7 +1177,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessMoonbeamSyncing) chainObsvReqC[vaa.ChainIDMoonbeam] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "moonbeamwatch", - common.WrapWithScissors(evm.NewEthWatcher(*moonbeamRPC, moonbeamContractAddr, "moonbeam", common.ReadinessMoonbeamSyncing, vaa.ChainIDMoonbeam, lockC, nil, chainObsvReqC[vaa.ChainIDMoonbeam], *unsafeDevMode).Run, "moonbeamwatch")); err != nil { + common.WrapWithScissors(evm.NewEthWatcher(*moonbeamRPC, moonbeamContractAddr, "moonbeam", common.ReadinessMoonbeamSyncing, vaa.ChainIDMoonbeam, chainMsgC[vaa.ChainIDMoonbeam], nil, chainObsvReqC[vaa.ChainIDMoonbeam], *unsafeDevMode).Run, "moonbeamwatch")); err != nil { return err } } @@ -1169,7 +1188,7 @@ func runNode(cmd *cobra.Command, args []string) { logger.Info("Starting Arbitrum watcher") readiness.RegisterComponent(common.ReadinessArbitrumSyncing) chainObsvReqC[vaa.ChainIDArbitrum] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - arbitrumWatcher := evm.NewEthWatcher(*arbitrumRPC, arbitrumContractAddr, "arbitrum", common.ReadinessArbitrumSyncing, vaa.ChainIDArbitrum, lockC, nil, chainObsvReqC[vaa.ChainIDArbitrum], *unsafeDevMode) + arbitrumWatcher := evm.NewEthWatcher(*arbitrumRPC, arbitrumContractAddr, "arbitrum", common.ReadinessArbitrumSyncing, vaa.ChainIDArbitrum, chainMsgC[vaa.ChainIDArbitrum], nil, chainObsvReqC[vaa.ChainIDArbitrum], *unsafeDevMode) arbitrumWatcher.SetL1Finalizer(ethWatcher) if err := supervisor.Run(ctx, "arbitrumwatch", common.WrapWithScissors(arbitrumWatcher.Run, "arbitrumwatch")); err != nil { return err @@ -1187,7 +1206,7 @@ func runNode(cmd *cobra.Command, args []string) { logger.Info("Starting Optimism watcher") readiness.RegisterComponent(common.ReadinessOptimismSyncing) chainObsvReqC[vaa.ChainIDOptimism] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - optimismWatcher := evm.NewEthWatcher(*optimismRPC, optimismContractAddr, "optimism", common.ReadinessOptimismSyncing, vaa.ChainIDOptimism, lockC, nil, chainObsvReqC[vaa.ChainIDOptimism], *unsafeDevMode) + optimismWatcher := evm.NewEthWatcher(*optimismRPC, optimismContractAddr, "optimism", common.ReadinessOptimismSyncing, vaa.ChainIDOptimism, chainMsgC[vaa.ChainIDOptimism], nil, chainObsvReqC[vaa.ChainIDOptimism], *unsafeDevMode) optimismWatcher.SetL1Finalizer(ethWatcher) if err := optimismWatcher.SetRootChainParams(*optimismCtcRpc, *optimismCtcContractAddress); err != nil { return err @@ -1202,7 +1221,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessTerraSyncing) chainObsvReqC[vaa.ChainIDTerra] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "terrawatch", - common.WrapWithScissors(cosmwasm.NewWatcher(*terraWS, *terraLCD, *terraContract, lockC, chainObsvReqC[vaa.ChainIDTerra], common.ReadinessTerraSyncing, vaa.ChainIDTerra).Run, "terrawatch")); err != nil { + common.WrapWithScissors(cosmwasm.NewWatcher(*terraWS, *terraLCD, *terraContract, chainMsgC[vaa.ChainIDTerra], chainObsvReqC[vaa.ChainIDTerra], common.ReadinessTerraSyncing, vaa.ChainIDTerra).Run, "terrawatch")); err != nil { return err } } @@ -1212,7 +1231,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessTerra2Syncing) chainObsvReqC[vaa.ChainIDTerra2] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "terra2watch", - common.WrapWithScissors(cosmwasm.NewWatcher(*terra2WS, *terra2LCD, *terra2Contract, lockC, chainObsvReqC[vaa.ChainIDTerra2], common.ReadinessTerra2Syncing, vaa.ChainIDTerra2).Run, "terra2watch")); err != nil { + common.WrapWithScissors(cosmwasm.NewWatcher(*terra2WS, *terra2LCD, *terra2Contract, chainMsgC[vaa.ChainIDTerra2], chainObsvReqC[vaa.ChainIDTerra2], common.ReadinessTerra2Syncing, vaa.ChainIDTerra2).Run, "terra2watch")); err != nil { return err } } @@ -1222,7 +1241,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessXplaSyncing) chainObsvReqC[vaa.ChainIDXpla] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "xplawatch", - common.WrapWithScissors(cosmwasm.NewWatcher(*xplaWS, *xplaLCD, *xplaContract, lockC, chainObsvReqC[vaa.ChainIDXpla], common.ReadinessXplaSyncing, vaa.ChainIDXpla).Run, "xplawatch")); err != nil { + common.WrapWithScissors(cosmwasm.NewWatcher(*xplaWS, *xplaLCD, *xplaContract, chainMsgC[vaa.ChainIDXpla], chainObsvReqC[vaa.ChainIDXpla], common.ReadinessXplaSyncing, vaa.ChainIDXpla).Run, "xplawatch")); err != nil { return err } } @@ -1232,7 +1251,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessAlgorandSyncing) chainObsvReqC[vaa.ChainIDAlgorand] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "algorandwatch", - common.WrapWithScissors(algorand.NewWatcher(*algorandIndexerRPC, *algorandIndexerToken, *algorandAlgodRPC, *algorandAlgodToken, *algorandAppID, lockC, chainObsvReqC[vaa.ChainIDAlgorand]).Run, "algorandwatch")); err != nil { + common.WrapWithScissors(algorand.NewWatcher(*algorandIndexerRPC, *algorandIndexerToken, *algorandAlgodRPC, *algorandAlgodToken, *algorandAppID, chainMsgC[vaa.ChainIDAlgorand], chainObsvReqC[vaa.ChainIDAlgorand]).Run, "algorandwatch")); err != nil { return err } } @@ -1241,7 +1260,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessNearSyncing) chainObsvReqC[vaa.ChainIDNear] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "nearwatch", - common.WrapWithScissors(near.NewWatcher(*nearRPC, *nearContract, lockC, chainObsvReqC[vaa.ChainIDNear], !(*unsafeDevMode || *testnetMode)).Run, "nearwatch")); err != nil { + common.WrapWithScissors(near.NewWatcher(*nearRPC, *nearContract, chainMsgC[vaa.ChainIDNear], chainObsvReqC[vaa.ChainIDNear], !(*unsafeDevMode || *testnetMode)).Run, "nearwatch")); err != nil { return err } } @@ -1252,7 +1271,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessWormchainSyncing) chainObsvReqC[vaa.ChainIDWormchain] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "wormchainwatch", - wormchain.NewWatcher(*wormchainWS, *wormchainLCD, lockC, setC, chainObsvReqC[vaa.ChainIDWormchain]).Run); err != nil { + wormchain.NewWatcher(*wormchainWS, *wormchainLCD, chainMsgC[vaa.ChainIDWormchain], chainObsvReqC[vaa.ChainIDWormchain]).Run); err != nil { return err } } @@ -1261,7 +1280,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessAptosSyncing) chainObsvReqC[vaa.ChainIDAptos] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "aptoswatch", - aptos.NewWatcher(*aptosRPC, *aptosAccount, *aptosHandle, lockC, chainObsvReqC[vaa.ChainIDAptos]).Run); err != nil { + aptos.NewWatcher(*aptosRPC, *aptosAccount, *aptosHandle, chainMsgC[vaa.ChainIDAptos], chainObsvReqC[vaa.ChainIDAptos]).Run); err != nil { return err } } @@ -1271,7 +1290,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessSuiSyncing) chainObsvReqC[vaa.ChainIDSui] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "suiwatch", - sui.NewWatcher(*suiRPC, *suiWS, *suiAccount, *suiPackage, *unsafeDevMode, lockC, chainObsvReqC[vaa.ChainIDSui]).Run); err != nil { + sui.NewWatcher(*suiRPC, *suiWS, *suiAccount, *suiPackage, *unsafeDevMode, chainMsgC[vaa.ChainIDSui], chainObsvReqC[vaa.ChainIDSui]).Run); err != nil { return err } } @@ -1282,10 +1301,10 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessSolanaSyncing) chainObsvReqC[vaa.ChainIDSolana] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "solwatch-confirmed", - common.WrapWithScissors(solana.NewSolanaWatcher(*solanaRPC, nil, solAddress, *solanaContract, lockC, nil, rpc.CommitmentConfirmed, common.ReadinessSolanaSyncing, vaa.ChainIDSolana).Run, "solwatch-confirmed")); err != nil { + common.WrapWithScissors(solana.NewSolanaWatcher(*solanaRPC, nil, solAddress, *solanaContract, chainMsgC[vaa.ChainIDSolana], nil, rpc.CommitmentConfirmed, common.ReadinessSolanaSyncing, vaa.ChainIDSolana).Run, "solwatch-confirmed")); err != nil { return err } - solanaFinalizedWatcher = solana.NewSolanaWatcher(*solanaRPC, nil, solAddress, *solanaContract, lockC, chainObsvReqC[vaa.ChainIDSolana], rpc.CommitmentFinalized, common.ReadinessSolanaSyncing, vaa.ChainIDSolana) + solanaFinalizedWatcher = solana.NewSolanaWatcher(*solanaRPC, nil, solAddress, *solanaContract, chainMsgC[vaa.ChainIDSolana], chainObsvReqC[vaa.ChainIDSolana], rpc.CommitmentFinalized, common.ReadinessSolanaSyncing, vaa.ChainIDSolana) if err := supervisor.Run(ctx, "solwatch-finalized", common.WrapWithScissors(solanaFinalizedWatcher.Run, "solwatch-finalized")); err != nil { return err } @@ -1296,7 +1315,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessPythNetSyncing) chainObsvReqC[vaa.ChainIDPythNet] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "pythwatch-confirmed", - common.WrapWithScissors(solana.NewSolanaWatcher(*pythnetRPC, pythnetWS, pythnetAddress, *pythnetContract, lockC, nil, rpc.CommitmentConfirmed, common.ReadinessPythNetSyncing, vaa.ChainIDPythNet).Run, "pythwatch-confirmed")); err != nil { + common.WrapWithScissors(solana.NewSolanaWatcher(*pythnetRPC, pythnetWS, pythnetAddress, *pythnetContract, chainMsgC[vaa.ChainIDPythNet], nil, rpc.CommitmentConfirmed, common.ReadinessPythNetSyncing, vaa.ChainIDPythNet).Run, "pythwatch-confirmed")); err != nil { return err } } @@ -1306,7 +1325,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessInjectiveSyncing) chainObsvReqC[vaa.ChainIDInjective] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "injectivewatch", - common.WrapWithScissors(cosmwasm.NewWatcher(*injectiveWS, *injectiveLCD, *injectiveContract, lockC, chainObsvReqC[vaa.ChainIDInjective], common.ReadinessInjectiveSyncing, vaa.ChainIDInjective).Run, "injectivewatch")); err != nil { + common.WrapWithScissors(cosmwasm.NewWatcher(*injectiveWS, *injectiveLCD, *injectiveContract, chainMsgC[vaa.ChainIDInjective], chainObsvReqC[vaa.ChainIDInjective], common.ReadinessInjectiveSyncing, vaa.ChainIDInjective).Run, "injectivewatch")); err != nil { return err } } @@ -1319,14 +1338,14 @@ func runNode(cmd *cobra.Command, args []string) { logger.Info("Starting Neon watcher") readiness.RegisterComponent(common.ReadinessNeonSyncing) chainObsvReqC[vaa.ChainIDNeon] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - neonWatcher := evm.NewEthWatcher(*neonRPC, neonContractAddr, "neon", common.ReadinessNeonSyncing, vaa.ChainIDNeon, lockC, nil, chainObsvReqC[vaa.ChainIDNeon], *unsafeDevMode) + neonWatcher := evm.NewEthWatcher(*neonRPC, neonContractAddr, "neon", common.ReadinessNeonSyncing, vaa.ChainIDNeon, chainMsgC[vaa.ChainIDNeon], nil, chainObsvReqC[vaa.ChainIDNeon], *unsafeDevMode) neonWatcher.SetL1Finalizer(solanaFinalizedWatcher) if err := supervisor.Run(ctx, "neonwatch", common.WrapWithScissors(neonWatcher.Run, "neonwatch")); err != nil { return err } } } - go handleReobservationRequests(rootCtx, clock.New(), logger, obsvReqC, chainObsvReqC) + go handleReobservationRequests(rootCtx, clock.New(), logger, obsvReqReadC, chainObsvReqC) if acct != nil { if err := acct.Start(ctx); err != nil { @@ -1341,15 +1360,15 @@ func runNode(cmd *cobra.Command, args []string) { } } - p := processor.NewProcessor(ctx, + if err := supervisor.Run(ctx, "processor", processor.NewProcessor(ctx, db, - lockC, - setC, - sendC, + msgReadC, + setReadC, + gossipSendC, obsvC, - obsvReqSendC, - injectC, - signedInC, + obsvReqSendWriteC, + injectReadC, + signedInReadC, gk, gst, *unsafeDevMode, @@ -1361,11 +1380,15 @@ func runNode(cmd *cobra.Command, args []string) { gov, acct, acctReadC, - ) - if err := supervisor.Run(ctx, "processor", p.Run); err != nil { + ).Run); err != nil { return err } + adminService, err := adminServiceRunnable(logger, *adminSocketPath, injectWriteC, signedInWriteC, obsvReqSendWriteC, db, gst, gov, gk, ethRPC, ethContract) + if err != nil { + logger.Fatal("failed to create admin service socket", zap.Error(err)) + } + if err := supervisor.Run(ctx, "admin", adminService); err != nil { return err } @@ -1463,3 +1486,8 @@ func unsafeDevModeEvmContractAddress(contractAddr string) string { return devnet.GanacheWormholeContractAddress.Hex() } + +func makeChannelPair[T any](cap int) (<-chan T, chan<- T) { + out := make(chan T, cap) + return out, out +} diff --git a/node/pkg/governor/governor_monitoring.go b/node/pkg/governor/governor_monitoring.go index acf774abb..cfa90290d 100644 --- a/node/pkg/governor/governor_monitoring.go +++ b/node/pkg/governor/governor_monitoring.go @@ -396,7 +396,7 @@ var ( }) ) -func (gov *ChainGovernor) CollectMetrics(hb *gossipv1.Heartbeat, sendC chan []byte, gk *ecdsa.PrivateKey, ourAddr ethCommon.Address) { +func (gov *ChainGovernor) CollectMetrics(hb *gossipv1.Heartbeat, sendC chan<- []byte, gk *ecdsa.PrivateKey, ourAddr ethCommon.Address) { gov.mutex.Lock() defer gov.mutex.Unlock() @@ -464,7 +464,7 @@ func (gov *ChainGovernor) CollectMetrics(hb *gossipv1.Heartbeat, sendC chan []by var governorMessagePrefixConfig = []byte("governor_config_000000000000000000|") var governorMessagePrefixStatus = []byte("governor_status_000000000000000000|") -func (gov *ChainGovernor) publishConfig(hb *gossipv1.Heartbeat, sendC chan []byte, gk *ecdsa.PrivateKey, ourAddr ethCommon.Address) { +func (gov *ChainGovernor) publishConfig(hb *gossipv1.Heartbeat, sendC chan<- []byte, gk *ecdsa.PrivateKey, ourAddr ethCommon.Address) { chains := make([]*gossipv1.ChainGovernorConfig_Chain, 0) for _, ce := range gov.chains { chains = append(chains, &gossipv1.ChainGovernorConfig_Chain{ @@ -521,7 +521,7 @@ func (gov *ChainGovernor) publishConfig(hb *gossipv1.Heartbeat, sendC chan []byt sendC <- b } -func (gov *ChainGovernor) publishStatus(hb *gossipv1.Heartbeat, sendC chan []byte, startTime time.Time, gk *ecdsa.PrivateKey, ourAddr ethCommon.Address) { +func (gov *ChainGovernor) publishStatus(hb *gossipv1.Heartbeat, sendC chan<- []byte, startTime time.Time, gk *ecdsa.PrivateKey, ourAddr ethCommon.Address) { chains := make([]*gossipv1.ChainGovernorStatus_Chain, 0) numEnqueued := 0 for _, ce := range gov.chains { diff --git a/node/pkg/p2p/p2p.go b/node/pkg/p2p/p2p.go index bd7b3e689..9e316cdd5 100644 --- a/node/pkg/p2p/p2p.go +++ b/node/pkg/p2p/p2p.go @@ -69,11 +69,11 @@ func signedObservationRequestDigest(b []byte) common.Hash { } func Run( - obsvC chan *gossipv1.SignedObservation, - obsvReqC chan *gossipv1.ObservationRequest, - obsvReqSendC chan *gossipv1.ObservationRequest, - sendC chan []byte, - signedInC chan *gossipv1.SignedVAAWithQuorum, + obsvC chan<- *gossipv1.SignedObservation, + obsvReqC chan<- *gossipv1.ObservationRequest, + obsvReqSendC <-chan *gossipv1.ObservationRequest, + gossipSendC chan []byte, + signedInC chan<- *gossipv1.SignedVAAWithQuorum, priv crypto.PrivKey, gk *ecdsa.PrivateKey, gst *node_common.GuardianSetState, @@ -252,7 +252,7 @@ func Run( collectNodeMetrics(ourAddr, h.ID(), heartbeat) if gov != nil { - gov.CollectMetrics(heartbeat, sendC, gk, ourAddr) + gov.CollectMetrics(heartbeat, gossipSendC, gk, ourAddr) } b, err := proto.Marshal(heartbeat) @@ -297,7 +297,7 @@ func Run( select { case <-ctx.Done(): return - case msg := <-sendC: + case msg := <-gossipSendC: err := th.Publish(ctx, msg) p2pMessagesSent.Inc() if err != nil { diff --git a/node/pkg/processor/broadcast.go b/node/pkg/processor/broadcast.go index 058ee18b8..c71387c0e 100644 --- a/node/pkg/processor/broadcast.go +++ b/node/pkg/processor/broadcast.go @@ -44,7 +44,7 @@ func (p *Processor) broadcastSignature( panic(err) } - p.sendC <- msg + p.gossipSendC <- msg // Store our VAA in case we're going to submit it to Solana hash := hex.EncodeToString(digest.Bytes()) @@ -84,5 +84,5 @@ func (p *Processor) broadcastSignedVAA(v *vaa.VAA) { panic(err) } - p.sendC <- msg + p.gossipSendC <- msg } diff --git a/node/pkg/processor/cleanup.go b/node/pkg/processor/cleanup.go index e9de1ae53..3b0a211ec 100644 --- a/node/pkg/processor/cleanup.go +++ b/node/pkg/processor/cleanup.go @@ -202,7 +202,7 @@ func (p *Processor) handleCleanup(ctx context.Context) { if err := common.PostObservationRequest(p.obsvReqSendC, req); err != nil { p.logger.Warn("failed to broadcast re-observation request", zap.Error(err)) } - p.sendC <- s.ourMsg + p.gossipSendC <- s.ourMsg s.retryCount += 1 s.lastRetry = time.Now() aggregationStateRetries.Inc() diff --git a/node/pkg/processor/processor.go b/node/pkg/processor/processor.go index b2e00a4f1..01b192861 100644 --- a/node/pkg/processor/processor.go +++ b/node/pkg/processor/processor.go @@ -81,13 +81,12 @@ type PythNetVaaEntry struct { } type Processor struct { - // lockC is a channel of observed emitted messages - lockC chan *common.MessagePublication + // msgC is a channel of observed emitted messages + msgC <-chan *common.MessagePublication // setC is a channel of guardian set updates - setC chan *common.GuardianSet - - // sendC is a channel of outbound messages to broadcast on p2p - sendC chan []byte + setC <-chan *common.GuardianSet + // gossipSendC is a channel of outbound messages to broadcast on p2p + gossipSendC chan<- []byte // obsvC is a channel of inbound decoded observations from p2p obsvC chan *gossipv1.SignedObservation @@ -95,10 +94,10 @@ type Processor struct { obsvReqSendC chan<- *gossipv1.ObservationRequest // signedInC is a channel of inbound signed VAA observations from p2p - signedInC chan *gossipv1.SignedVAAWithQuorum + signedInC <-chan *gossipv1.SignedVAAWithQuorum // injectC is a channel of VAAs injected locally. - injectC chan *vaa.VAA + injectC <-chan *vaa.VAA // gk is the node's guardian private key gk *ecdsa.PrivateKey @@ -141,13 +140,13 @@ type Processor struct { func NewProcessor( ctx context.Context, db *db.Database, - lockC chan *common.MessagePublication, - setC chan *common.GuardianSet, - sendC chan []byte, + msgC <-chan *common.MessagePublication, + setC <-chan *common.GuardianSet, + gossipSendC chan<- []byte, obsvC chan *gossipv1.SignedObservation, obsvReqSendC chan<- *gossipv1.ObservationRequest, - injectC chan *vaa.VAA, - signedInC chan *gossipv1.SignedVAAWithQuorum, + injectC <-chan *vaa.VAA, + signedInC <-chan *gossipv1.SignedVAAWithQuorum, gk *ecdsa.PrivateKey, gst *common.GuardianSetState, devnetMode bool, @@ -162,9 +161,9 @@ func NewProcessor( ) *Processor { return &Processor{ - lockC: lockC, + msgC: msgC, setC: setC, - sendC: sendC, + gossipSendC: gossipSendC, obsvC: obsvC, obsvReqSendC: obsvReqSendC, signedInC: signedInC, @@ -210,7 +209,7 @@ func (p *Processor) Run(ctx context.Context) error { zap.Strings("set", p.gs.KeysAsHexStrings()), zap.Uint32("index", p.gs.Index)) p.gst.Set(p.gs) - case k := <-p.lockC: + case k := <-p.msgC: if p.governor != nil { if !p.governor.ProcessMsg(k) { continue diff --git a/node/pkg/watchers/algorand/watcher.go b/node/pkg/watchers/algorand/watcher.go index ff4d11f92..31fdcfe00 100644 --- a/node/pkg/watchers/algorand/watcher.go +++ b/node/pkg/watchers/algorand/watcher.go @@ -33,8 +33,8 @@ type ( algodToken string appid uint64 - msgChan chan *common.MessagePublication - obsvReqC chan *gossipv1.ObservationRequest + msgC chan<- *common.MessagePublication + obsvReqC <-chan *gossipv1.ObservationRequest next_round uint64 } @@ -60,8 +60,8 @@ func NewWatcher( algodRPC string, algodToken string, appid uint64, - lockEvents chan *common.MessagePublication, - obsvReqC chan *gossipv1.ObservationRequest, + msgC chan<- *common.MessagePublication, + obsvReqC <-chan *gossipv1.ObservationRequest, ) *Watcher { return &Watcher{ indexerRPC: indexerRPC, @@ -69,7 +69,7 @@ func NewWatcher( algodRPC: algodRPC, algodToken: algodToken, appid: appid, - msgChan: lockEvents, + msgC: msgC, obsvReqC: obsvReqC, next_round: 0, } @@ -137,7 +137,7 @@ func lookAtTxn(e *Watcher, t types.SignedTxnInBlock, b types.Block, logger *zap. zap.Uint8("consistency_level", observation.ConsistencyLevel), ) - e.msgChan <- observation + e.msgC <- observation } } diff --git a/node/pkg/watchers/aptos/watcher.go b/node/pkg/watchers/aptos/watcher.go index 779c5a1ff..79c7e17e3 100644 --- a/node/pkg/watchers/aptos/watcher.go +++ b/node/pkg/watchers/aptos/watcher.go @@ -29,8 +29,8 @@ type ( aptosAccount string aptosHandle string - msgChan chan *common.MessagePublication - obsvReqC chan *gossipv1.ObservationRequest + msgC chan<- *common.MessagePublication + obsvReqC <-chan *gossipv1.ObservationRequest } ) @@ -52,14 +52,14 @@ func NewWatcher( aptosRPC string, aptosAccount string, aptosHandle string, - messageEvents chan *common.MessagePublication, - obsvReqC chan *gossipv1.ObservationRequest, + msgC chan<- *common.MessagePublication, + obsvReqC <-chan *gossipv1.ObservationRequest, ) *Watcher { return &Watcher{ aptosRPC: aptosRPC, aptosAccount: aptosAccount, aptosHandle: aptosHandle, - msgChan: messageEvents, + msgC: msgC, obsvReqC: obsvReqC, } } @@ -317,5 +317,5 @@ func (e *Watcher) observeData(logger *zap.Logger, data gjson.Result, nativeSeq u zap.Uint8("consistencyLevel", observation.ConsistencyLevel), ) - e.msgChan <- observation + e.msgC <- observation } diff --git a/node/pkg/watchers/cosmwasm/watcher.go b/node/pkg/watchers/cosmwasm/watcher.go index 18faae25d..49fd1f6a8 100644 --- a/node/pkg/watchers/cosmwasm/watcher.go +++ b/node/pkg/watchers/cosmwasm/watcher.go @@ -36,11 +36,11 @@ type ( urlLCD string contract string - msgChan chan *common.MessagePublication + msgC chan<- *common.MessagePublication // Incoming re-observation requests from the network. Pre-filtered to only // include requests for our chainID. - obsvReqC chan *gossipv1.ObservationRequest + obsvReqC <-chan *gossipv1.ObservationRequest // Readiness component readiness readiness.Component @@ -95,8 +95,8 @@ func NewWatcher( urlWS string, urlLCD string, contract string, - lockEvents chan *common.MessagePublication, - obsvReqC chan *gossipv1.ObservationRequest, + msgC chan<- *common.MessagePublication, + obsvReqC <-chan *gossipv1.ObservationRequest, readiness readiness.Component, chainID vaa.ChainID) *Watcher { @@ -121,7 +121,7 @@ func NewWatcher( urlWS: urlWS, urlLCD: urlLCD, contract: contract, - msgChan: lockEvents, + msgC: msgC, obsvReqC: obsvReqC, readiness: readiness, chainID: chainID, @@ -273,7 +273,7 @@ func (e *Watcher) Run(ctx context.Context) error { msgs := EventsToMessagePublications(e.contract, txHash, events.Array(), logger, e.chainID, e.contractAddressLogKey) for _, msg := range msgs { - e.msgChan <- msg + e.msgC <- msg messagesConfirmed.WithLabelValues(networkName).Inc() } } @@ -313,7 +313,7 @@ func (e *Watcher) Run(ctx context.Context) error { msgs := EventsToMessagePublications(e.contract, txHash, events.Array(), logger, e.chainID, e.contractAddressLogKey) for _, msg := range msgs { - e.msgChan <- msg + e.msgC <- msg messagesConfirmed.WithLabelValues(networkName).Inc() } diff --git a/node/pkg/watchers/evm/watcher.go b/node/pkg/watchers/evm/watcher.go index 9873af26b..77d8179cc 100644 --- a/node/pkg/watchers/evm/watcher.go +++ b/node/pkg/watchers/evm/watcher.go @@ -77,10 +77,10 @@ type ( chainID vaa.ChainID // Channel to send new messages to. - msgChan chan *common.MessagePublication + msgC chan<- *common.MessagePublication // Channel to send guardian set changes to. - // setChan can be set to nil if no guardian set changes are needed. + // setC can be set to nil if no guardian set changes are needed. // // We currently only fetch the guardian set from one primary chain, which should // have this flag set to true, and false on all others. @@ -88,11 +88,11 @@ type ( // The current primary chain is Ethereum (a mostly arbitrary decision because it // has the best API - we might want to switch the primary chain to Solana once // the governance mechanism lives there), - setChan chan *common.GuardianSet + setC chan<- *common.GuardianSet // Incoming re-observation requests from the network. Pre-filtered to only // include requests for our chainID. - obsvReqC chan *gossipv1.ObservationRequest + obsvReqC <-chan *gossipv1.ObservationRequest pending map[pendingKey]*pendingMessage pendingMu sync.Mutex @@ -141,9 +141,9 @@ func NewEthWatcher( networkName string, readiness readiness.Component, chainID vaa.ChainID, - messageEvents chan *common.MessagePublication, - setEvents chan *common.GuardianSet, - obsvReqC chan *gossipv1.ObservationRequest, + msgC chan<- *common.MessagePublication, + setC chan<- *common.GuardianSet, + obsvReqC <-chan *gossipv1.ObservationRequest, unsafeDevMode bool, ) *Watcher { @@ -155,8 +155,8 @@ func NewEthWatcher( waitForConfirmations: false, maxWaitConfirmations: 60, chainID: chainID, - msgChan: messageEvents, - setChan: setEvents, + msgC: msgC, + setC: setC, obsvReqC: obsvReqC, pending: map[pendingKey]*pendingMessage{}, unsafeDevMode: unsafeDevMode, @@ -412,7 +412,7 @@ func (w *Watcher) Run(ctx context.Context) error { zap.Uint64("observed_block", blockNumber), zap.String("eth_network", w.networkName), ) - w.msgChan <- msg + w.msgC <- msg continue } @@ -432,7 +432,7 @@ func (w *Watcher) Run(ctx context.Context) error { zap.Uint64("observed_block", blockNumber), zap.String("eth_network", w.networkName), ) - w.msgChan <- msg + w.msgC <- msg } else { logger.Info("ignoring re-observed message publication transaction", zap.Stringer("tx", msg.TxHash), @@ -477,7 +477,7 @@ func (w *Watcher) Run(ctx context.Context) error { zap.Uint64("observed_block", blockNumber), zap.String("eth_network", w.networkName), ) - w.msgChan <- msg + w.msgC <- msg } else { logger.Info("ignoring re-observed message publication transaction", zap.Stringer("tx", msg.TxHash), @@ -543,7 +543,7 @@ func (w *Watcher) Run(ctx context.Context) error { zap.Uint8("ConsistencyLevel", ev.ConsistencyLevel), zap.String("eth_network", w.networkName)) - w.msgChan <- message + w.msgC <- message ethMessagesConfirmed.WithLabelValues(w.networkName).Inc() continue } @@ -753,7 +753,7 @@ func (w *Watcher) Run(ctx context.Context) error { zap.Stringer("current_blockhash", currentHash), zap.String("eth_network", w.networkName)) delete(w.pending, key) - w.msgChan <- pLock.message + w.msgC <- pLock.message ethMessagesConfirmed.WithLabelValues(w.networkName).Inc() } } @@ -809,8 +809,8 @@ func (w *Watcher) fetchAndUpdateGuardianSet( w.currentGuardianSet = &idx - if w.setChan != nil { - w.setChan <- &common.GuardianSet{ + if w.setC != nil { + w.setC <- &common.GuardianSet{ Keys: gs.Keys, Index: idx, } diff --git a/node/pkg/watchers/solana/client.go b/node/pkg/watchers/solana/client.go index bbf71341c..32305cf81 100644 --- a/node/pkg/watchers/solana/client.go +++ b/node/pkg/watchers/solana/client.go @@ -32,16 +32,16 @@ import ( type ( SolanaWatcher struct { - contract solana.PublicKey - rawContract string - rpcUrl string - wsUrl *string - commitment rpc.CommitmentType - messageEvent chan *common.MessagePublication - obsvReqC chan *gossipv1.ObservationRequest - errC chan error - pumpData chan []byte - rpcClient *rpc.Client + contract solana.PublicKey + rawContract string + rpcUrl string + wsUrl *string + commitment rpc.CommitmentType + msgC chan<- *common.MessagePublication + obsvReqC <-chan *gossipv1.ObservationRequest + errC chan error + pumpData chan []byte + rpcClient *rpc.Client // Readiness component readiness readiness.Component // VAA ChainID of the network we're connecting to. @@ -179,23 +179,23 @@ func NewSolanaWatcher( wsUrl *string, contractAddress solana.PublicKey, rawContract string, - messageEvents chan *common.MessagePublication, - obsvReqC chan *gossipv1.ObservationRequest, + msgC chan<- *common.MessagePublication, + obsvReqC <-chan *gossipv1.ObservationRequest, commitment rpc.CommitmentType, readiness readiness.Component, chainID vaa.ChainID) *SolanaWatcher { return &SolanaWatcher{ - rpcUrl: rpcUrl, - wsUrl: wsUrl, - contract: contractAddress, - rawContract: rawContract, - messageEvent: messageEvents, - obsvReqC: obsvReqC, - commitment: commitment, - rpcClient: rpc.New(rpcUrl), - readiness: readiness, - chainID: chainID, - networkName: vaa.ChainID(chainID).String(), + rpcUrl: rpcUrl, + wsUrl: wsUrl, + contract: contractAddress, + rawContract: rawContract, + msgC: msgC, + obsvReqC: obsvReqC, + commitment: commitment, + rpcClient: rpc.New(rpcUrl), + readiness: readiness, + chainID: chainID, + networkName: vaa.ChainID(chainID).String(), } } @@ -830,7 +830,7 @@ func (s *SolanaWatcher) processMessageAccount(logger *zap.Logger, data []byte, a zap.Uint8("consistency_level", observation.ConsistencyLevel), ) - s.messageEvent <- observation + s.msgC <- observation } // updateLatestBlock() updates the latest block number if the slot passed in is greater than the previous value. diff --git a/node/pkg/watchers/wormchain/watcher.go b/node/pkg/watchers/wormchain/watcher.go index 5b992ebf3..697cf2ebb 100644 --- a/node/pkg/watchers/wormchain/watcher.go +++ b/node/pkg/watchers/wormchain/watcher.go @@ -34,12 +34,11 @@ type ( urlWS string urlLCD string - msgChan chan *common.MessagePublication - setChan chan *common.GuardianSet + msgC chan<- *common.MessagePublication // Incoming re-observation requests from the network. Pre-filtered to only // include requests for our chainID. - obsvReqC chan *gossipv1.ObservationRequest + obsvReqC <-chan *gossipv1.ObservationRequest } ) @@ -76,10 +75,9 @@ type clientRequest struct { func NewWatcher( urlWS string, urlLCD string, - lockEvents chan *common.MessagePublication, - setEvents chan *common.GuardianSet, - obsvReqC chan *gossipv1.ObservationRequest) *Watcher { - return &Watcher{urlWS: urlWS, urlLCD: urlLCD, msgChan: lockEvents, setChan: setEvents, obsvReqC: obsvReqC} + msgC chan<- *common.MessagePublication, + obsvReqC <-chan *gossipv1.ObservationRequest) *Watcher { + return &Watcher{urlWS: urlWS, urlLCD: urlLCD, msgC: msgC, obsvReqC: obsvReqC} } func (e *Watcher) Run(ctx context.Context) error { @@ -210,7 +208,7 @@ func (e *Watcher) Run(ctx context.Context) error { msgs := EventsToMessagePublications(txHash, events.Array(), logger) for _, msg := range msgs { - e.msgChan <- msg + e.msgC <- msg wormchainMessagesConfirmed.Inc() } } @@ -248,7 +246,7 @@ func (e *Watcher) Run(ctx context.Context) error { msgs := EventsToMessagePublications(txHash, events.Array(), logger) for _, msg := range msgs { - e.msgChan <- msg + e.msgC <- msg wormchainMessagesConfirmed.Inc() } } diff --git a/sdk/vaa/structs.go b/sdk/vaa/structs.go index 012e6c8d4..b755ca615 100644 --- a/sdk/vaa/structs.go +++ b/sdk/vaa/structs.go @@ -276,6 +276,38 @@ func ChainIDFromString(s string) (ChainID, error) { } } +func GetAllNetworkIDs() []ChainID { + return []ChainID{ + ChainIDSolana, + ChainIDEthereum, + ChainIDTerra, + ChainIDBSC, + ChainIDPolygon, + ChainIDAvalanche, + ChainIDOasis, + ChainIDAlgorand, + ChainIDAurora, + ChainIDFantom, + ChainIDKarura, + ChainIDAcala, + ChainIDKlaytn, + ChainIDCelo, + ChainIDNear, + ChainIDMoonbeam, + ChainIDNeon, + ChainIDTerra2, + ChainIDInjective, + ChainIDSui, + ChainIDAptos, + ChainIDArbitrum, + ChainIDOptimism, + ChainIDPythNet, + ChainIDXpla, + ChainIDBtc, + ChainIDWormchain, + } +} + const ( ChainIDUnset ChainID = 0 // ChainIDSolana is the ChainID of Solana