diff --git a/node/cmd/guardiand/node.go b/node/cmd/guardiand/node.go index 4592d4417..6010e3bcf 100644 --- a/node/cmd/guardiand/node.go +++ b/node/cmd/guardiand/node.go @@ -917,7 +917,7 @@ func runNode(cmd *cobra.Command, args []string) { logger.Info("Starting Ethereum watcher") readiness.RegisterComponent(common.ReadinessEthSyncing) chainObsvReqC[vaa.ChainIDEthereum] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - ethWatcher = evm.NewEthWatcher(*ethRPC, ethContractAddr, "eth", common.ReadinessEthSyncing, vaa.ChainIDEthereum, lockC, setC, 1, chainObsvReqC[vaa.ChainIDEthereum], *unsafeDevMode, nil) + ethWatcher = evm.NewEthWatcher(*ethRPC, ethContractAddr, "eth", common.ReadinessEthSyncing, vaa.ChainIDEthereum, lockC, setC, 1, chainObsvReqC[vaa.ChainIDEthereum], *unsafeDevMode) if err := supervisor.Run(ctx, "ethwatch", ethWatcher.Run); err != nil { return err @@ -929,7 +929,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessBSCSyncing) chainObsvReqC[vaa.ChainIDBSC] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "bscwatch", - evm.NewEthWatcher(*bscRPC, bscContractAddr, "bsc", common.ReadinessBSCSyncing, vaa.ChainIDBSC, lockC, nil, 1, chainObsvReqC[vaa.ChainIDBSC], *unsafeDevMode, nil).Run); err != nil { + evm.NewEthWatcher(*bscRPC, bscContractAddr, "bsc", common.ReadinessBSCSyncing, vaa.ChainIDBSC, lockC, nil, 1, chainObsvReqC[vaa.ChainIDBSC], *unsafeDevMode).Run); err != nil { return err } } @@ -948,7 +948,7 @@ func runNode(cmd *cobra.Command, args []string) { logger.Info("Starting Polygon watcher") readiness.RegisterComponent(common.ReadinessPolygonSyncing) chainObsvReqC[vaa.ChainIDPolygon] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - polygonWatcher := evm.NewEthWatcher(*polygonRPC, polygonContractAddr, "polygon", common.ReadinessPolygonSyncing, vaa.ChainIDPolygon, lockC, nil, polygonMinConfirmations, chainObsvReqC[vaa.ChainIDPolygon], *unsafeDevMode, nil) + polygonWatcher := evm.NewEthWatcher(*polygonRPC, polygonContractAddr, "polygon", common.ReadinessPolygonSyncing, vaa.ChainIDPolygon, lockC, nil, polygonMinConfirmations, chainObsvReqC[vaa.ChainIDPolygon], *unsafeDevMode) if err := polygonWatcher.SetRootChainParams(*polygonRootChainRpc, *polygonRootChainContractAddress); err != nil { return err } @@ -961,7 +961,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessAvalancheSyncing) chainObsvReqC[vaa.ChainIDAvalanche] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "avalanchewatch", - evm.NewEthWatcher(*avalancheRPC, avalancheContractAddr, "avalanche", common.ReadinessAvalancheSyncing, vaa.ChainIDAvalanche, lockC, nil, 1, chainObsvReqC[vaa.ChainIDAvalanche], *unsafeDevMode, nil).Run); err != nil { + evm.NewEthWatcher(*avalancheRPC, avalancheContractAddr, "avalanche", common.ReadinessAvalancheSyncing, vaa.ChainIDAvalanche, lockC, nil, 1, chainObsvReqC[vaa.ChainIDAvalanche], *unsafeDevMode).Run); err != nil { return err } } @@ -969,7 +969,7 @@ func runNode(cmd *cobra.Command, args []string) { logger.Info("Starting Oasis watcher") chainObsvReqC[vaa.ChainIDOasis] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "oasiswatch", - evm.NewEthWatcher(*oasisRPC, oasisContractAddr, "oasis", common.ReadinessOasisSyncing, vaa.ChainIDOasis, lockC, nil, 1, chainObsvReqC[vaa.ChainIDOasis], *unsafeDevMode, nil).Run); err != nil { + evm.NewEthWatcher(*oasisRPC, oasisContractAddr, "oasis", common.ReadinessOasisSyncing, vaa.ChainIDOasis, lockC, nil, 1, chainObsvReqC[vaa.ChainIDOasis], *unsafeDevMode).Run); err != nil { return err } } @@ -978,7 +978,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessAuroraSyncing) chainObsvReqC[vaa.ChainIDAurora] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "aurorawatch", - evm.NewEthWatcher(*auroraRPC, auroraContractAddr, "aurora", common.ReadinessAuroraSyncing, vaa.ChainIDAurora, lockC, nil, 1, chainObsvReqC[vaa.ChainIDAurora], *unsafeDevMode, nil).Run); err != nil { + evm.NewEthWatcher(*auroraRPC, auroraContractAddr, "aurora", common.ReadinessAuroraSyncing, vaa.ChainIDAurora, lockC, nil, 1, chainObsvReqC[vaa.ChainIDAurora], *unsafeDevMode).Run); err != nil { return err } } @@ -987,7 +987,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessFantomSyncing) chainObsvReqC[vaa.ChainIDFantom] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "fantomwatch", - evm.NewEthWatcher(*fantomRPC, fantomContractAddr, "fantom", common.ReadinessFantomSyncing, vaa.ChainIDFantom, lockC, nil, 1, chainObsvReqC[vaa.ChainIDFantom], *unsafeDevMode, nil).Run); err != nil { + evm.NewEthWatcher(*fantomRPC, fantomContractAddr, "fantom", common.ReadinessFantomSyncing, vaa.ChainIDFantom, lockC, nil, 1, chainObsvReqC[vaa.ChainIDFantom], *unsafeDevMode).Run); err != nil { return err } } @@ -996,7 +996,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessKaruraSyncing) chainObsvReqC[vaa.ChainIDKarura] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "karurawatch", - evm.NewEthWatcher(*karuraRPC, karuraContractAddr, "karura", common.ReadinessKaruraSyncing, vaa.ChainIDKarura, lockC, nil, 1, chainObsvReqC[vaa.ChainIDKarura], *unsafeDevMode, nil).Run); err != nil { + evm.NewEthWatcher(*karuraRPC, karuraContractAddr, "karura", common.ReadinessKaruraSyncing, vaa.ChainIDKarura, lockC, nil, 1, chainObsvReqC[vaa.ChainIDKarura], *unsafeDevMode).Run); err != nil { return err } } @@ -1005,7 +1005,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessAcalaSyncing) chainObsvReqC[vaa.ChainIDAcala] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "acalawatch", - evm.NewEthWatcher(*acalaRPC, acalaContractAddr, "acala", common.ReadinessAcalaSyncing, vaa.ChainIDAcala, lockC, nil, 1, chainObsvReqC[vaa.ChainIDAcala], *unsafeDevMode, nil).Run); err != nil { + evm.NewEthWatcher(*acalaRPC, acalaContractAddr, "acala", common.ReadinessAcalaSyncing, vaa.ChainIDAcala, lockC, nil, 1, chainObsvReqC[vaa.ChainIDAcala], *unsafeDevMode).Run); err != nil { return err } } @@ -1014,7 +1014,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessKlaytnSyncing) chainObsvReqC[vaa.ChainIDKlaytn] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "klaytnwatch", - evm.NewEthWatcher(*klaytnRPC, klaytnContractAddr, "klaytn", common.ReadinessKlaytnSyncing, vaa.ChainIDKlaytn, lockC, nil, 1, chainObsvReqC[vaa.ChainIDKlaytn], *unsafeDevMode, nil).Run); err != nil { + evm.NewEthWatcher(*klaytnRPC, klaytnContractAddr, "klaytn", common.ReadinessKlaytnSyncing, vaa.ChainIDKlaytn, lockC, nil, 1, chainObsvReqC[vaa.ChainIDKlaytn], *unsafeDevMode).Run); err != nil { return err } } @@ -1023,7 +1023,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessCeloSyncing) chainObsvReqC[vaa.ChainIDCelo] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "celowatch", - evm.NewEthWatcher(*celoRPC, celoContractAddr, "celo", common.ReadinessCeloSyncing, vaa.ChainIDCelo, lockC, nil, 1, chainObsvReqC[vaa.ChainIDCelo], *unsafeDevMode, nil).Run); err != nil { + evm.NewEthWatcher(*celoRPC, celoContractAddr, "celo", common.ReadinessCeloSyncing, vaa.ChainIDCelo, lockC, nil, 1, chainObsvReqC[vaa.ChainIDCelo], *unsafeDevMode).Run); err != nil { return err } } @@ -1032,7 +1032,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessMoonbeamSyncing) chainObsvReqC[vaa.ChainIDMoonbeam] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "moonbeamwatch", - evm.NewEthWatcher(*moonbeamRPC, moonbeamContractAddr, "moonbeam", common.ReadinessMoonbeamSyncing, vaa.ChainIDMoonbeam, lockC, nil, 1, chainObsvReqC[vaa.ChainIDMoonbeam], *unsafeDevMode, nil).Run); err != nil { + evm.NewEthWatcher(*moonbeamRPC, moonbeamContractAddr, "moonbeam", common.ReadinessMoonbeamSyncing, vaa.ChainIDMoonbeam, lockC, nil, 1, chainObsvReqC[vaa.ChainIDMoonbeam], *unsafeDevMode).Run); err != nil { return err } } @@ -1043,8 +1043,9 @@ func runNode(cmd *cobra.Command, args []string) { logger.Info("Starting Arbitrum watcher") readiness.RegisterComponent(common.ReadinessArbitrumSyncing) chainObsvReqC[vaa.ChainIDArbitrum] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - if err := supervisor.Run(ctx, "arbitrumwatch", - evm.NewEthWatcher(*arbitrumRPC, arbitrumContractAddr, "arbitrum", common.ReadinessArbitrumSyncing, vaa.ChainIDArbitrum, lockC, nil, 1, chainObsvReqC[vaa.ChainIDArbitrum], *unsafeDevMode, ethWatcher).Run); err != nil { + arbitrumWatcher := evm.NewEthWatcher(*arbitrumRPC, arbitrumContractAddr, "arbitrum", common.ReadinessArbitrumSyncing, vaa.ChainIDArbitrum, lockC, nil, 1, chainObsvReqC[vaa.ChainIDArbitrum], *unsafeDevMode) + arbitrumWatcher.SetL1Finalizer(ethWatcher) + if err := supervisor.Run(ctx, "arbitrumwatch", arbitrumWatcher.Run); err != nil { return err } } @@ -1055,8 +1056,9 @@ func runNode(cmd *cobra.Command, args []string) { logger.Info("Starting Optimism watcher") readiness.RegisterComponent(common.ReadinessOptimismSyncing) chainObsvReqC[vaa.ChainIDOptimism] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - if err := supervisor.Run(ctx, "optimismwatch", - evm.NewEthWatcher(*optimismRPC, optimismContractAddr, "optimism", common.ReadinessOptimismSyncing, vaa.ChainIDOptimism, lockC, nil, 1, chainObsvReqC[vaa.ChainIDOptimism], *unsafeDevMode, ethWatcher).Run); err != nil { + optimismWatcher := evm.NewEthWatcher(*optimismRPC, optimismContractAddr, "optimism", common.ReadinessOptimismSyncing, vaa.ChainIDOptimism, lockC, nil, 1, chainObsvReqC[vaa.ChainIDOptimism], *unsafeDevMode) + optimismWatcher.SetL1Finalizer(ethWatcher) + if err := supervisor.Run(ctx, "optimismwatch", optimismWatcher.Run); err != nil { return err } } @@ -1130,6 +1132,7 @@ func runNode(cmd *cobra.Command, args []string) { } } + var solanaFinalizedWatcher *solana.SolanaWatcher if shouldStart(solanaRPC) { logger.Info("Starting Solana watcher") readiness.RegisterComponent(common.ReadinessSolanaSyncing) @@ -1138,8 +1141,8 @@ func runNode(cmd *cobra.Command, args []string) { solana.NewSolanaWatcher(*solanaRPC, solAddress, lockC, nil, rpc.CommitmentConfirmed, common.ReadinessSolanaSyncing, vaa.ChainIDSolana).Run); err != nil { return err } - if err := supervisor.Run(ctx, "solwatch-finalized", - solana.NewSolanaWatcher(*solanaRPC, solAddress, lockC, chainObsvReqC[vaa.ChainIDSolana], rpc.CommitmentFinalized, common.ReadinessSolanaSyncing, vaa.ChainIDSolana).Run); err != nil { + solanaFinalizedWatcher = solana.NewSolanaWatcher(*solanaRPC, solAddress, lockC, chainObsvReqC[vaa.ChainIDSolana], rpc.CommitmentFinalized, common.ReadinessSolanaSyncing, vaa.ChainIDSolana) + if err := supervisor.Run(ctx, "solwatch-finalized", solanaFinalizedWatcher.Run); err != nil { return err } } @@ -1160,11 +1163,15 @@ func runNode(cmd *cobra.Command, args []string) { if *testnetMode { if shouldStart(neonRPC) { + if solanaFinalizedWatcher == nil { + log.Fatalf("if neon is enabled then solana must also be enabled.") + } logger.Info("Starting Neon watcher") readiness.RegisterComponent(common.ReadinessNeonSyncing) chainObsvReqC[vaa.ChainIDNeon] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - if err := supervisor.Run(ctx, "neonwatch", - evm.NewEthWatcher(*neonRPC, neonContractAddr, "neon", common.ReadinessNeonSyncing, vaa.ChainIDNeon, lockC, nil, 32, chainObsvReqC[vaa.ChainIDNeon], *unsafeDevMode, nil).Run); err != nil { + neonWatcher := evm.NewEthWatcher(*neonRPC, neonContractAddr, "neon", common.ReadinessNeonSyncing, vaa.ChainIDNeon, lockC, nil, 32, chainObsvReqC[vaa.ChainIDNeon], *unsafeDevMode) + neonWatcher.SetL1Finalizer(solanaFinalizedWatcher) + if err := supervisor.Run(ctx, "neonwatch", neonWatcher.Run); err != nil { return err } } diff --git a/node/pkg/watchers/evm/finalizers/arbitrum.go b/node/pkg/watchers/evm/finalizers/arbitrum.go index 830048c0d..9cfa37cb0 100644 --- a/node/pkg/watchers/evm/finalizers/arbitrum.go +++ b/node/pkg/watchers/evm/finalizers/arbitrum.go @@ -5,7 +5,7 @@ import ( "fmt" "github.com/certusone/wormhole/node/pkg/watchers/evm/connectors" - "github.com/certusone/wormhole/node/pkg/watchers/evm/interfaces" + "github.com/certusone/wormhole/node/pkg/watchers/interfaces" ethClient "github.com/ethereum/go-ethereum/ethclient" diff --git a/node/pkg/watchers/evm/finalizers/neon.go b/node/pkg/watchers/evm/finalizers/neon.go new file mode 100644 index 000000000..3ffa54343 --- /dev/null +++ b/node/pkg/watchers/evm/finalizers/neon.go @@ -0,0 +1,41 @@ +package finalizers + +import ( + "context" + + "github.com/certusone/wormhole/node/pkg/watchers/evm/connectors" + "github.com/certusone/wormhole/node/pkg/watchers/interfaces" + + ethClient "github.com/ethereum/go-ethereum/ethclient" + + "go.uber.org/zap" +) + +// NeonFinalizer implements the finality check for Neon. The Neon block number is actually the Solana slot number. +// Blocks on Neon should not be considered finalized until that slot is finalized on Solana. Confirmed this with the +// Neon team on 11/12/2022. Also confirmed that they do not have a websocket interface so we need to poll for log events. +type NeonFinalizer struct { + logger *zap.Logger + connector connectors.Connector + l1Finalizer interfaces.L1Finalizer +} + +func NewNeonFinalizer(logger *zap.Logger, connector connectors.Connector, client *ethClient.Client, l1Finalizer interfaces.L1Finalizer) *NeonFinalizer { + return &NeonFinalizer{ + logger: logger, + connector: connector, + l1Finalizer: l1Finalizer, + } +} + +// IsBlockFinalized compares the number of the Neon block with the latest finalized block on Solana. +func (f *NeonFinalizer) IsBlockFinalized(ctx context.Context, block *connectors.NewBlock) (bool, error) { + latestL1Block := f.l1Finalizer.GetLatestFinalizedBlockNumber() + if latestL1Block == 0 { + // This happens on start up. + return false, nil + } + + isFinalized := block.Number.Uint64() <= latestL1Block + return isFinalized, nil +} diff --git a/node/pkg/watchers/evm/finalizers/optimism.go b/node/pkg/watchers/evm/finalizers/optimism.go index f5c286700..6c2d6fcfd 100644 --- a/node/pkg/watchers/evm/finalizers/optimism.go +++ b/node/pkg/watchers/evm/finalizers/optimism.go @@ -5,7 +5,7 @@ import ( "fmt" "github.com/certusone/wormhole/node/pkg/watchers/evm/connectors" - "github.com/certusone/wormhole/node/pkg/watchers/evm/interfaces" + "github.com/certusone/wormhole/node/pkg/watchers/interfaces" "go.uber.org/zap" ) diff --git a/node/pkg/watchers/evm/watcher.go b/node/pkg/watchers/evm/watcher.go index 706720eef..7ad1558d9 100644 --- a/node/pkg/watchers/evm/watcher.go +++ b/node/pkg/watchers/evm/watcher.go @@ -10,7 +10,7 @@ import ( "github.com/certusone/wormhole/node/pkg/watchers/evm/connectors" "github.com/certusone/wormhole/node/pkg/watchers/evm/connectors/ethabi" "github.com/certusone/wormhole/node/pkg/watchers/evm/finalizers" - "github.com/certusone/wormhole/node/pkg/watchers/evm/interfaces" + "github.com/certusone/wormhole/node/pkg/watchers/interfaces" "github.com/certusone/wormhole/node/pkg/p2p" gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1" @@ -139,7 +139,6 @@ func NewEthWatcher( minConfirmations uint64, obsvReqC chan *gossipv1.ObservationRequest, unsafeDevMode bool, - l1Finalizer interfaces.L1Finalizer, ) *Watcher { return &Watcher{ @@ -154,7 +153,6 @@ func NewEthWatcher( obsvReqC: obsvReqC, pending: map[pendingKey]*pendingMessage{}, unsafeDevMode: unsafeDevMode, - l1Finalizer: l1Finalizer, } } @@ -219,14 +217,18 @@ func (w *Watcher) Run(ctx context.Context) error { p2p.DefaultRegistry.AddErrorCount(w.chainID, 1) return fmt.Errorf("creating block poll connector failed: %w", err) } - } else if w.chainID == vaa.ChainIDNeon { + } else if w.chainID == vaa.ChainIDNeon && !w.unsafeDevMode { + if w.l1Finalizer == nil { + return fmt.Errorf("unable to create neon watcher because the l1 finalizer is not set") + } baseConnector, err := connectors.NewEthereumConnector(timeout, w.networkName, w.url, w.contract, logger) if err != nil { ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc() p2p.DefaultRegistry.AddErrorCount(w.chainID, 1) return fmt.Errorf("dialing eth client failed: %w", err) } - pollConnector, err := connectors.NewBlockPollConnector(ctx, baseConnector, finalizers.NewDefaultFinalizer(), 250*time.Millisecond, false) + finalizer := finalizers.NewNeonFinalizer(logger, baseConnector, baseConnector.Client(), w.l1Finalizer) + pollConnector, err := connectors.NewBlockPollConnector(ctx, baseConnector, finalizer, 250*time.Millisecond, false) if err != nil { ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc() p2p.DefaultRegistry.AddErrorCount(w.chainID, 1) @@ -794,6 +796,13 @@ func (w *Watcher) getAcalaMode(ctx context.Context) (useFinalizedBlocks bool, er return } +// SetL1Finalizer is used to set the layer one finalizer. +func (w *Watcher) SetL1Finalizer(l1Finalizer interfaces.L1Finalizer) { + w.l1Finalizer = l1Finalizer +} + +// GetLatestFinalizedBlockNumber() implements the L1Finalizer interface and allows other watchers to +// get the latest finalized block number from this watcher. func (w *Watcher) GetLatestFinalizedBlockNumber() uint64 { return atomic.LoadUint64(&w.latestFinalizedBlockNumber) } diff --git a/node/pkg/watchers/evm/interfaces/l1Finalizer.go b/node/pkg/watchers/interfaces/l1Finalizer.go similarity index 100% rename from node/pkg/watchers/evm/interfaces/l1Finalizer.go rename to node/pkg/watchers/interfaces/l1Finalizer.go diff --git a/node/pkg/watchers/solana/client.go b/node/pkg/watchers/solana/client.go index a07a93e4f..cfcd0ceaa 100644 --- a/node/pkg/watchers/solana/client.go +++ b/node/pkg/watchers/solana/client.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "sync" "time" "github.com/certusone/wormhole/node/pkg/common" @@ -38,6 +39,10 @@ type SolanaWatcher struct { networkName string // The last slot processed by the watcher. lastSlot uint64 + + // latestFinalizedBlockNumber is the latest block processed by this watcher. + latestBlockNumber uint64 + latestBlockNumberMu sync.Mutex } var ( @@ -301,6 +306,8 @@ func (s *SolanaWatcher) fetchBlock(ctx context.Context, logger *zap.Logger, slot zap.Duration("took", time.Since(start)), zap.String("commitment", string(s.commitment))) + s.updateLatestBlock(slot) + OUTER: for txNum, txRpc := range out.Transactions { if txRpc.Meta.Err != nil { @@ -594,6 +601,23 @@ func (s *SolanaWatcher) processMessageAccount(logger *zap.Logger, data []byte, a s.messageEvent <- observation } +// updateLatestBlock() updates the latest block number if the slot passed in is greater than the previous value. +// This check is necessary because blocks can be posted out of order, due to multi threading in this watcher. +func (s *SolanaWatcher) updateLatestBlock(slot uint64) { + s.latestBlockNumberMu.Lock() + defer s.latestBlockNumberMu.Unlock() + if slot > s.latestBlockNumber { + s.latestBlockNumber = slot + } +} + +// GetLatestFinalizedBlockNumber() returns the latest published block. +func (s *SolanaWatcher) GetLatestFinalizedBlockNumber() uint64 { + s.latestBlockNumberMu.Lock() + defer s.latestBlockNumberMu.Unlock() + return s.latestBlockNumber +} + type ( MessagePublicationAccount struct { VaaVersion uint8