diff --git a/node/cmd/guardiand/adminserver_test.go b/node/cmd/guardiand/adminserver_test.go index c83c17a7f..2f3bfdd44 100644 --- a/node/cmd/guardiand/adminserver_test.go +++ b/node/cmd/guardiand/adminserver_test.go @@ -43,7 +43,7 @@ func (m mockEVMConnector) ContractAddress() common.Address { panic("unimplemented") } -func (m mockEVMConnector) WatchLogMessagePublished(ctx context.Context, sink chan<- *ethabi.AbiLogMessagePublished) (event.Subscription, error) { +func (m mockEVMConnector) WatchLogMessagePublished(ctx context.Context, errC chan error, sink chan<- *ethabi.AbiLogMessagePublished) (event.Subscription, error) { panic("unimplemented") } @@ -59,7 +59,7 @@ func (m mockEVMConnector) ParseLogMessagePublished(log types.Log) (*ethabi.AbiLo panic("unimplemented") } -func (m mockEVMConnector) SubscribeForBlocks(ctx context.Context, sink chan<- *connectors.NewBlock) (ethereum.Subscription, error) { +func (m mockEVMConnector) SubscribeForBlocks(ctx context.Context, errC chan error, sink chan<- *connectors.NewBlock) (ethereum.Subscription, error) { panic("unimplemented") } diff --git a/node/cmd/guardiand/node.go b/node/cmd/guardiand/node.go index ff1f2bd3d..3197c81f9 100644 --- a/node/cmd/guardiand/node.go +++ b/node/cmd/guardiand/node.go @@ -943,7 +943,7 @@ func runNode(cmd *cobra.Command, args []string) { chainObsvReqC[vaa.ChainIDEthereum] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) ethWatcher = evm.NewEthWatcher(*ethRPC, ethContractAddr, "eth", common.ReadinessEthSyncing, vaa.ChainIDEthereum, lockC, setC, chainObsvReqC[vaa.ChainIDEthereum], *unsafeDevMode) if err := supervisor.Run(ctx, "ethwatch", - ethWatcher.Run); err != nil { + common.WrapWithScissors(ethWatcher.Run, "ethwatch")); err != nil { return err } } @@ -954,7 +954,7 @@ func runNode(cmd *cobra.Command, args []string) { chainObsvReqC[vaa.ChainIDBSC] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) bscWatcher := evm.NewEthWatcher(*bscRPC, bscContractAddr, "bsc", common.ReadinessBSCSyncing, vaa.ChainIDBSC, lockC, nil, chainObsvReqC[vaa.ChainIDBSC], *unsafeDevMode) bscWatcher.SetWaitForConfirmations(true) - if err := supervisor.Run(ctx, "bscwatch", bscWatcher.Run); err != nil { + if err := supervisor.Run(ctx, "bscwatch", common.WrapWithScissors(bscWatcher.Run, "bscwatch")); err != nil { return err } } @@ -973,7 +973,7 @@ func runNode(cmd *cobra.Command, args []string) { if err := polygonWatcher.SetRootChainParams(*polygonRootChainRpc, *polygonRootChainContractAddress); err != nil { return err } - if err := supervisor.Run(ctx, "polygonwatch", polygonWatcher.Run); err != nil { + if err := supervisor.Run(ctx, "polygonwatch", common.WrapWithScissors(polygonWatcher.Run, "polygonwatch")); err != nil { return err } } @@ -982,7 +982,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessAvalancheSyncing) chainObsvReqC[vaa.ChainIDAvalanche] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "avalanchewatch", - evm.NewEthWatcher(*avalancheRPC, avalancheContractAddr, "avalanche", common.ReadinessAvalancheSyncing, vaa.ChainIDAvalanche, lockC, nil, chainObsvReqC[vaa.ChainIDAvalanche], *unsafeDevMode).Run); err != nil { + common.WrapWithScissors(evm.NewEthWatcher(*avalancheRPC, avalancheContractAddr, "avalanche", common.ReadinessAvalancheSyncing, vaa.ChainIDAvalanche, lockC, nil, chainObsvReqC[vaa.ChainIDAvalanche], *unsafeDevMode).Run, "avalanchewatch")); err != nil { return err } } @@ -990,7 +990,7 @@ func runNode(cmd *cobra.Command, args []string) { logger.Info("Starting Oasis watcher") chainObsvReqC[vaa.ChainIDOasis] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "oasiswatch", - evm.NewEthWatcher(*oasisRPC, oasisContractAddr, "oasis", common.ReadinessOasisSyncing, vaa.ChainIDOasis, lockC, nil, chainObsvReqC[vaa.ChainIDOasis], *unsafeDevMode).Run); err != nil { + common.WrapWithScissors(evm.NewEthWatcher(*oasisRPC, oasisContractAddr, "oasis", common.ReadinessOasisSyncing, vaa.ChainIDOasis, lockC, nil, chainObsvReqC[vaa.ChainIDOasis], *unsafeDevMode).Run, "oasiswatch")); err != nil { return err } } @@ -999,7 +999,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessAuroraSyncing) chainObsvReqC[vaa.ChainIDAurora] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "aurorawatch", - evm.NewEthWatcher(*auroraRPC, auroraContractAddr, "aurora", common.ReadinessAuroraSyncing, vaa.ChainIDAurora, lockC, nil, chainObsvReqC[vaa.ChainIDAurora], *unsafeDevMode).Run); err != nil { + common.WrapWithScissors(evm.NewEthWatcher(*auroraRPC, auroraContractAddr, "aurora", common.ReadinessAuroraSyncing, vaa.ChainIDAurora, lockC, nil, chainObsvReqC[vaa.ChainIDAurora], *unsafeDevMode).Run, "aurorawatch")); err != nil { return err } } @@ -1008,7 +1008,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessFantomSyncing) chainObsvReqC[vaa.ChainIDFantom] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "fantomwatch", - evm.NewEthWatcher(*fantomRPC, fantomContractAddr, "fantom", common.ReadinessFantomSyncing, vaa.ChainIDFantom, lockC, nil, chainObsvReqC[vaa.ChainIDFantom], *unsafeDevMode).Run); err != nil { + common.WrapWithScissors(evm.NewEthWatcher(*fantomRPC, fantomContractAddr, "fantom", common.ReadinessFantomSyncing, vaa.ChainIDFantom, lockC, nil, chainObsvReqC[vaa.ChainIDFantom], *unsafeDevMode).Run, "fantomwatch")); err != nil { return err } } @@ -1017,7 +1017,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessKaruraSyncing) chainObsvReqC[vaa.ChainIDKarura] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "karurawatch", - evm.NewEthWatcher(*karuraRPC, karuraContractAddr, "karura", common.ReadinessKaruraSyncing, vaa.ChainIDKarura, lockC, nil, chainObsvReqC[vaa.ChainIDKarura], *unsafeDevMode).Run); err != nil { + common.WrapWithScissors(evm.NewEthWatcher(*karuraRPC, karuraContractAddr, "karura", common.ReadinessKaruraSyncing, vaa.ChainIDKarura, lockC, nil, chainObsvReqC[vaa.ChainIDKarura], *unsafeDevMode).Run, "karurawatch")); err != nil { return err } } @@ -1026,7 +1026,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessAcalaSyncing) chainObsvReqC[vaa.ChainIDAcala] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "acalawatch", - evm.NewEthWatcher(*acalaRPC, acalaContractAddr, "acala", common.ReadinessAcalaSyncing, vaa.ChainIDAcala, lockC, nil, chainObsvReqC[vaa.ChainIDAcala], *unsafeDevMode).Run); err != nil { + common.WrapWithScissors(evm.NewEthWatcher(*acalaRPC, acalaContractAddr, "acala", common.ReadinessAcalaSyncing, vaa.ChainIDAcala, lockC, nil, chainObsvReqC[vaa.ChainIDAcala], *unsafeDevMode).Run, "acalawatch")); err != nil { return err } } @@ -1035,7 +1035,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessKlaytnSyncing) chainObsvReqC[vaa.ChainIDKlaytn] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "klaytnwatch", - evm.NewEthWatcher(*klaytnRPC, klaytnContractAddr, "klaytn", common.ReadinessKlaytnSyncing, vaa.ChainIDKlaytn, lockC, nil, chainObsvReqC[vaa.ChainIDKlaytn], *unsafeDevMode).Run); err != nil { + common.WrapWithScissors(evm.NewEthWatcher(*klaytnRPC, klaytnContractAddr, "klaytn", common.ReadinessKlaytnSyncing, vaa.ChainIDKlaytn, lockC, nil, chainObsvReqC[vaa.ChainIDKlaytn], *unsafeDevMode).Run, "klaytnwatch")); err != nil { return err } } @@ -1044,7 +1044,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessCeloSyncing) chainObsvReqC[vaa.ChainIDCelo] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "celowatch", - evm.NewEthWatcher(*celoRPC, celoContractAddr, "celo", common.ReadinessCeloSyncing, vaa.ChainIDCelo, lockC, nil, chainObsvReqC[vaa.ChainIDCelo], *unsafeDevMode).Run); err != nil { + common.WrapWithScissors(evm.NewEthWatcher(*celoRPC, celoContractAddr, "celo", common.ReadinessCeloSyncing, vaa.ChainIDCelo, lockC, nil, chainObsvReqC[vaa.ChainIDCelo], *unsafeDevMode).Run, "celowatch")); err != nil { return err } } @@ -1053,7 +1053,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessMoonbeamSyncing) chainObsvReqC[vaa.ChainIDMoonbeam] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "moonbeamwatch", - evm.NewEthWatcher(*moonbeamRPC, moonbeamContractAddr, "moonbeam", common.ReadinessMoonbeamSyncing, vaa.ChainIDMoonbeam, lockC, nil, chainObsvReqC[vaa.ChainIDMoonbeam], *unsafeDevMode).Run); err != nil { + common.WrapWithScissors(evm.NewEthWatcher(*moonbeamRPC, moonbeamContractAddr, "moonbeam", common.ReadinessMoonbeamSyncing, vaa.ChainIDMoonbeam, lockC, nil, chainObsvReqC[vaa.ChainIDMoonbeam], *unsafeDevMode).Run, "moonbeamwatch")); err != nil { return err } } @@ -1066,7 +1066,7 @@ func runNode(cmd *cobra.Command, args []string) { chainObsvReqC[vaa.ChainIDArbitrum] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) arbitrumWatcher := evm.NewEthWatcher(*arbitrumRPC, arbitrumContractAddr, "arbitrum", common.ReadinessArbitrumSyncing, vaa.ChainIDArbitrum, lockC, nil, chainObsvReqC[vaa.ChainIDArbitrum], *unsafeDevMode) arbitrumWatcher.SetL1Finalizer(ethWatcher) - if err := supervisor.Run(ctx, "arbitrumwatch", arbitrumWatcher.Run); err != nil { + if err := supervisor.Run(ctx, "arbitrumwatch", common.WrapWithScissors(arbitrumWatcher.Run, "arbitrumwatch")); err != nil { return err } } @@ -1087,7 +1087,7 @@ func runNode(cmd *cobra.Command, args []string) { if err := optimismWatcher.SetRootChainParams(*optimismCtcRpc, *optimismCtcContractAddress); err != nil { return err } - if err := supervisor.Run(ctx, "optimismwatch", optimismWatcher.Run); err != nil { + if err := supervisor.Run(ctx, "optimismwatch", common.WrapWithScissors(optimismWatcher.Run, "optimismwatch")); err != nil { return err } } @@ -1220,7 +1220,7 @@ func runNode(cmd *cobra.Command, args []string) { chainObsvReqC[vaa.ChainIDNeon] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) neonWatcher := evm.NewEthWatcher(*neonRPC, neonContractAddr, "neon", common.ReadinessNeonSyncing, vaa.ChainIDNeon, lockC, nil, chainObsvReqC[vaa.ChainIDNeon], *unsafeDevMode) neonWatcher.SetL1Finalizer(solanaFinalizedWatcher) - if err := supervisor.Run(ctx, "neonwatch", neonWatcher.Run); err != nil { + if err := supervisor.Run(ctx, "neonwatch", common.WrapWithScissors(neonWatcher.Run, "neonwatch")); err != nil { return err } } diff --git a/node/pkg/watchers/evm/connectors/celo.go b/node/pkg/watchers/evm/connectors/celo.go index 84de81d43..9e46881c1 100644 --- a/node/pkg/watchers/evm/connectors/celo.go +++ b/node/pkg/watchers/evm/connectors/celo.go @@ -18,6 +18,7 @@ import ( ethTypes "github.com/ethereum/go-ethereum/core/types" ethEvent "github.com/ethereum/go-ethereum/event" + "github.com/certusone/wormhole/node/pkg/common" "go.uber.org/zap" ) @@ -91,7 +92,7 @@ func (c *CeloConnector) GetGuardianSet(ctx context.Context, index uint32) (ethAb }, err } -func (c *CeloConnector) WatchLogMessagePublished(ctx context.Context, sink chan<- *ethAbi.AbiLogMessagePublished) (ethEvent.Subscription, error) { +func (c *CeloConnector) WatchLogMessagePublished(ctx context.Context, errC chan error, sink chan<- *ethAbi.AbiLogMessagePublished) (ethEvent.Subscription, error) { timeout, cancel := context.WithTimeout(ctx, 15*time.Second) defer cancel() messageC := make(chan *celoAbi.AbiLogMessagePublished, 2) @@ -101,17 +102,17 @@ func (c *CeloConnector) WatchLogMessagePublished(ctx context.Context, sink chan< } // The purpose of this is to map events from the Celo log message channel to the Eth log message channel. - go func() { + common.RunWithScissors(ctx, errC, "celo_connector_watch_log", func(ctx context.Context) error { for { select { // This will return when the subscription is unsubscribed as the error channel gets closed case <-messageSub.Err(): - return + return nil case celoEvent := <-messageC: sink <- convertCeloEventToEth(celoEvent) } } - }() + }) return messageSub, err } @@ -143,7 +144,7 @@ func (c *CeloConnector) ParseLogMessagePublished(ethLog ethTypes.Log) (*ethAbi.A return convertCeloEventToEth(celoEvent), err } -func (c *CeloConnector) SubscribeForBlocks(ctx context.Context, sink chan<- *NewBlock) (ethereum.Subscription, error) { +func (c *CeloConnector) SubscribeForBlocks(ctx context.Context, errC chan error, sink chan<- *NewBlock) (ethereum.Subscription, error) { headSink := make(chan *celoTypes.Header, 2) headerSubscription, err := c.client.SubscribeNewHead(ctx, headSink) if err != nil { @@ -151,11 +152,11 @@ func (c *CeloConnector) SubscribeForBlocks(ctx context.Context, sink chan<- *New } // The purpose of this is to map events from the Celo event channel to the new block event channel. - go func() { + common.RunWithScissors(ctx, errC, "celo_connector_subscribe_for_block", func(ctx context.Context) error { for { select { case <-ctx.Done(): - return + return nil case ev := <-headSink: if ev == nil { c.logger.Error("new header event is nil") @@ -171,7 +172,7 @@ func (c *CeloConnector) SubscribeForBlocks(ctx context.Context, sink chan<- *New } } } - }() + }) return headerSubscription, err } diff --git a/node/pkg/watchers/evm/connectors/common.go b/node/pkg/watchers/evm/connectors/common.go index 7b3c81396..73d744a4b 100644 --- a/node/pkg/watchers/evm/connectors/common.go +++ b/node/pkg/watchers/evm/connectors/common.go @@ -27,11 +27,11 @@ type Connector interface { ContractAddress() common.Address GetCurrentGuardianSetIndex(ctx context.Context) (uint32, error) GetGuardianSet(ctx context.Context, index uint32) (ethabi.StructsGuardianSet, error) - WatchLogMessagePublished(ctx context.Context, sink chan<- *ethabi.AbiLogMessagePublished) (event.Subscription, error) + WatchLogMessagePublished(ctx context.Context, errC chan error, sink chan<- *ethabi.AbiLogMessagePublished) (event.Subscription, error) TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) TimeOfBlockByHash(ctx context.Context, hash common.Hash) (uint64, error) ParseLogMessagePublished(log types.Log) (*ethabi.AbiLogMessagePublished, error) - SubscribeForBlocks(ctx context.Context, sink chan<- *NewBlock) (ethereum.Subscription, error) + SubscribeForBlocks(ctx context.Context, errC chan error, sink chan<- *NewBlock) (ethereum.Subscription, error) RawCallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error } diff --git a/node/pkg/watchers/evm/connectors/ethereum.go b/node/pkg/watchers/evm/connectors/ethereum.go index f802cb2cd..8ef28a4a4 100644 --- a/node/pkg/watchers/evm/connectors/ethereum.go +++ b/node/pkg/watchers/evm/connectors/ethereum.go @@ -15,6 +15,7 @@ import ( ethClient "github.com/ethereum/go-ethereum/ethclient" ethEvent "github.com/ethereum/go-ethereum/event" + "github.com/certusone/wormhole/node/pkg/common" "go.uber.org/zap" ) @@ -74,7 +75,7 @@ func (e *EthereumConnector) GetGuardianSet(ctx context.Context, index uint32) (e return e.caller.GetGuardianSet(ðBind.CallOpts{Context: ctx}, index) } -func (e *EthereumConnector) WatchLogMessagePublished(ctx context.Context, sink chan<- *ethAbi.AbiLogMessagePublished) (ethEvent.Subscription, error) { +func (e *EthereumConnector) WatchLogMessagePublished(ctx context.Context, errC chan error, sink chan<- *ethAbi.AbiLogMessagePublished) (ethEvent.Subscription, error) { timeout, cancel := context.WithTimeout(ctx, 15*time.Second) defer cancel() return e.filterer.WatchLogMessagePublished(ðBind.WatchOpts{Context: timeout}, sink, nil) @@ -97,7 +98,7 @@ func (e *EthereumConnector) ParseLogMessagePublished(log ethTypes.Log) (*ethAbi. return e.filterer.ParseLogMessagePublished(log) } -func (e *EthereumConnector) SubscribeForBlocks(ctx context.Context, sink chan<- *NewBlock) (ethereum.Subscription, error) { +func (e *EthereumConnector) SubscribeForBlocks(ctx context.Context, errC chan error, sink chan<- *NewBlock) (ethereum.Subscription, error) { headSink := make(chan *ethTypes.Header, 2) headerSubscription, err := e.client.SubscribeNewHead(ctx, headSink) if err != nil { @@ -105,11 +106,11 @@ func (e *EthereumConnector) SubscribeForBlocks(ctx context.Context, sink chan<- } // The purpose of this is to map events from the geth event channel to the new block event channel. - go func() { + common.RunWithScissors(ctx, errC, "eth_connector_subscribe_for_block", func(ctx context.Context) error { for { select { case <-ctx.Done(): - return + return nil case ev := <-headSink: if ev == nil { e.logger.Error("new header event is nil") @@ -125,7 +126,7 @@ func (e *EthereumConnector) SubscribeForBlocks(ctx context.Context, sink chan<- } } } - }() + }) return headerSubscription, err } diff --git a/node/pkg/watchers/evm/connectors/logpoller.go b/node/pkg/watchers/evm/connectors/logpoller.go index 9b7c6ea7c..e3cbf0464 100644 --- a/node/pkg/watchers/evm/connectors/logpoller.go +++ b/node/pkg/watchers/evm/connectors/logpoller.go @@ -14,6 +14,7 @@ import ( ethClient "github.com/ethereum/go-ethereum/ethclient" ethEvent "github.com/ethereum/go-ethereum/event" + "github.com/certusone/wormhole/node/pkg/common" "go.uber.org/zap" ) @@ -43,7 +44,9 @@ func (l *LogPollConnector) run(ctx context.Context) error { logger := supervisor.Logger(ctx).With(zap.String("eth_network", l.Connector.NetworkName())) blockChan := make(chan *NewBlock) - sub, err := l.SubscribeForBlocks(ctx, blockChan) + errC := make(chan error) + + sub, err := l.SubscribeForBlocks(ctx, errC, blockChan) if err != nil { return err } @@ -56,6 +59,8 @@ func (l *LogPollConnector) run(ctx context.Context) error { return ctx.Err() case err := <-sub.Err(): return err + case err := <-errC: + return err case block := <-blockChan: if err := l.processBlock(ctx, logger, block); err != nil { l.errFeed.Send(err.Error()) @@ -64,7 +69,7 @@ func (l *LogPollConnector) run(ctx context.Context) error { } } -func (l *LogPollConnector) WatchLogMessagePublished(ctx context.Context, sink chan<- *ethAbi.AbiLogMessagePublished) (ethEvent.Subscription, error) { +func (l *LogPollConnector) WatchLogMessagePublished(ctx context.Context, errC chan error, sink chan<- *ethAbi.AbiLogMessagePublished) (ethEvent.Subscription, error) { sub := NewPollSubscription() messageSub := l.messageFeed.Subscribe(sink) @@ -73,23 +78,23 @@ func (l *LogPollConnector) WatchLogMessagePublished(ctx context.Context, sink ch innerErrSink := make(chan string, 10) innerErrSub := l.errFeed.Subscribe(innerErrSink) - go func() { + common.RunWithScissors(ctx, errC, "log_poll_watch_log", func(ctx context.Context) error { for { select { case <-ctx.Done(): messageSub.Unsubscribe() innerErrSub.Unsubscribe() - return + return nil case <-sub.quit: messageSub.Unsubscribe() innerErrSub.Unsubscribe() sub.unsubDone <- struct{}{} - return + return nil case v := <-innerErrSink: sub.err <- fmt.Errorf(v) } } - }() + }) return sub, nil } diff --git a/node/pkg/watchers/evm/connectors/poller.go b/node/pkg/watchers/evm/connectors/poller.go index 4f6ae4c5e..e71323433 100644 --- a/node/pkg/watchers/evm/connectors/poller.go +++ b/node/pkg/watchers/evm/connectors/poller.go @@ -6,12 +6,14 @@ import ( "math/big" "time" + "github.com/certusone/wormhole/node/pkg/common" "github.com/certusone/wormhole/node/pkg/supervisor" ethEvent "github.com/ethereum/go-ethereum/event" ethereum "github.com/ethereum/go-ethereum" ethCommon "github.com/ethereum/go-ethereum/common" ethHexUtils "github.com/ethereum/go-ethereum/common/hexutil" + "go.uber.org/zap" ) @@ -164,7 +166,7 @@ func (b *BlockPollConnector) pollBlocks(ctx context.Context, logger *zap.Logger, return } -func (b *BlockPollConnector) SubscribeForBlocks(ctx context.Context, sink chan<- *NewBlock) (ethereum.Subscription, error) { +func (b *BlockPollConnector) SubscribeForBlocks(ctx context.Context, errC chan error, sink chan<- *NewBlock) (ethereum.Subscription, error) { sub := NewPollSubscription() blockSub := b.blockFeed.Subscribe(sink) @@ -173,23 +175,23 @@ func (b *BlockPollConnector) SubscribeForBlocks(ctx context.Context, sink chan<- innerErrSink := make(chan string, 10) innerErrSub := b.errFeed.Subscribe(innerErrSink) - go func() { + common.RunWithScissors(ctx, errC, "block_poll_subscribe_for_blocks", func(ctx context.Context) error { for { select { case <-ctx.Done(): blockSub.Unsubscribe() innerErrSub.Unsubscribe() - return + return nil case <-sub.quit: blockSub.Unsubscribe() innerErrSub.Unsubscribe() sub.unsubDone <- struct{}{} - return + return nil case v := <-innerErrSink: sub.err <- fmt.Errorf(v) } } - }() + }) return sub, nil } diff --git a/node/pkg/watchers/evm/connectors/poller_test.go b/node/pkg/watchers/evm/connectors/poller_test.go index 5e24433af..0e550ea89 100644 --- a/node/pkg/watchers/evm/connectors/poller_test.go +++ b/node/pkg/watchers/evm/connectors/poller_test.go @@ -64,7 +64,7 @@ func (e *mockConnectorForPoller) GetGuardianSet(ctx context.Context, index uint3 return ethAbi.StructsGuardianSet{}, fmt.Errorf("not implemented") } -func (e *mockConnectorForPoller) WatchLogMessagePublished(ctx context.Context, sink chan<- *ethAbi.AbiLogMessagePublished) (ethEvent.Subscription, error) { +func (e *mockConnectorForPoller) WatchLogMessagePublished(ctx context.Context, errC chan error, sink chan<- *ethAbi.AbiLogMessagePublished) (ethEvent.Subscription, error) { var s ethEvent.Subscription return s, fmt.Errorf("not implemented") } @@ -81,7 +81,7 @@ func (e *mockConnectorForPoller) ParseLogMessagePublished(log ethTypes.Log) (*et return nil, fmt.Errorf("not implemented") } -func (e *mockConnectorForPoller) SubscribeForBlocks(ctx context.Context, sink chan<- *NewBlock) (ethereum.Subscription, error) { +func (e *mockConnectorForPoller) SubscribeForBlocks(ctx context.Context, errC chan error, sink chan<- *NewBlock) (ethereum.Subscription, error) { var s ethEvent.Subscription return s, fmt.Errorf("not implemented") } @@ -181,7 +181,9 @@ func TestBlockPoller(t *testing.T) { // Subscribe for events to be processed by our go routine. headSink := make(chan *NewBlock, 2) - headerSubscription, suberr := poller.SubscribeForBlocks(ctx, headSink) + errC := make(chan error) + + headerSubscription, suberr := poller.SubscribeForBlocks(ctx, errC, headSink) require.NoError(t, suberr) go func() { @@ -189,6 +191,10 @@ func TestBlockPoller(t *testing.T) { select { case <-ctx.Done(): return + case thisErr := <-errC: + mutex.Lock() + err = thisErr + mutex.Unlock() case thisErr := <-headerSubscription.Err(): mutex.Lock() err = thisErr diff --git a/node/pkg/watchers/evm/connectors/polygon.go b/node/pkg/watchers/evm/connectors/polygon.go index 40971b2b5..6a4ae9cb2 100644 --- a/node/pkg/watchers/evm/connectors/polygon.go +++ b/node/pkg/watchers/evm/connectors/polygon.go @@ -24,6 +24,7 @@ import ( "math/big" "time" + "github.com/certusone/wormhole/node/pkg/common" "github.com/certusone/wormhole/node/pkg/supervisor" rootAbi "github.com/certusone/wormhole/node/pkg/watchers/evm/connectors/polygonabi" @@ -89,7 +90,7 @@ func NewPolygonConnector( return connector, nil } -func (c *PolygonConnector) SubscribeForBlocks(ctx context.Context, sink chan<- *NewBlock) (ethereum.Subscription, error) { +func (c *PolygonConnector) SubscribeForBlocks(ctx context.Context, errC chan error, sink chan<- *NewBlock) (ethereum.Subscription, error) { sub := NewPollSubscription() timeout, cancel := context.WithTimeout(ctx, 15*time.Second) defer cancel() @@ -114,11 +115,11 @@ func (c *PolygonConnector) SubscribeForBlocks(ctx context.Context, sink chan<- * return nil, fmt.Errorf("failed to post initial block: %w", err) } - go func() { + common.RunWithScissors(ctx, errC, "polygon_subscribe_for_block", func(ctx context.Context) error { for { select { case <-ctx.Done(): - return + return nil case err := <-messageSub.Err(): sub.err <- err case checkpoint := <-messageC: @@ -127,7 +128,7 @@ func (c *PolygonConnector) SubscribeForBlocks(ctx context.Context, sink chan<- * } } } - }() + }) return sub, nil } diff --git a/node/pkg/watchers/evm/finalizers/moonbeam_test.go b/node/pkg/watchers/evm/finalizers/moonbeam_test.go index 91814a675..2f68b3f50 100644 --- a/node/pkg/watchers/evm/finalizers/moonbeam_test.go +++ b/node/pkg/watchers/evm/finalizers/moonbeam_test.go @@ -52,7 +52,7 @@ func (e *moonbeamMockConnector) GetGuardianSet(ctx context.Context, index uint32 panic("not implemented by moonbeamMockConnector") } -func (e *moonbeamMockConnector) WatchLogMessagePublished(ctx context.Context, sink chan<- *ethAbi.AbiLogMessagePublished) (ethEvent.Subscription, error) { +func (e *moonbeamMockConnector) WatchLogMessagePublished(ctx context.Context, errC chan error, sink chan<- *ethAbi.AbiLogMessagePublished) (ethEvent.Subscription, error) { panic("not implemented by moonbeamMockConnector") } @@ -68,7 +68,7 @@ func (e *moonbeamMockConnector) ParseLogMessagePublished(log ethTypes.Log) (*eth panic("not implemented by moonbeamMockConnector") } -func (e *moonbeamMockConnector) SubscribeForBlocks(ctx context.Context, sink chan<- *connectors.NewBlock) (ethereum.Subscription, error) { +func (e *moonbeamMockConnector) SubscribeForBlocks(ctx context.Context, errC chan error, sink chan<- *connectors.NewBlock) (ethereum.Subscription, error) { panic("not implemented by moonbeamMockConnector") } diff --git a/node/pkg/watchers/evm/watcher.go b/node/pkg/watchers/evm/watcher.go index 1402f4241..9873af26b 100644 --- a/node/pkg/watchers/evm/watcher.go +++ b/node/pkg/watchers/evm/watcher.go @@ -325,10 +325,12 @@ func (w *Watcher) Run(ctx context.Context) error { } } + errC := make(chan error) + // Subscribe to new message publications. We don't use a timeout here because the LogPollConnector // will keep running. Other connectors will use a timeout internally if appropriate. messageC := make(chan *ethabi.AbiLogMessagePublished, 2) - messageSub, err := w.ethConn.WatchLogMessagePublished(ctx, messageC) + messageSub, err := w.ethConn.WatchLogMessagePublished(ctx, errC, messageC) if err != nil { ethConnectionErrors.WithLabelValues(w.networkName, "subscribe_error").Inc() p2p.DefaultRegistry.AddErrorCount(w.chainID, 1) @@ -341,35 +343,33 @@ func (w *Watcher) Run(ctx context.Context) error { return fmt.Errorf("failed to request guardian set: %v", err) } - errC := make(chan error) - // Poll for guardian set. - go func() { + common.RunWithScissors(ctx, errC, "evm_fetch_guardian_set", func(ctx context.Context) error { t := time.NewTicker(15 * time.Second) defer t.Stop() for { select { case <-ctx.Done(): - return + return nil case <-t.C: if err := w.fetchAndUpdateGuardianSet(logger, ctx, w.ethConn); err != nil { errC <- fmt.Errorf("failed to request guardian set: %v", err) - return + return nil } } } - }() + }) // Track the current block numbers so we can compare it to the block number of // the message publication for observation requests. var currentBlockNumber uint64 var currentSafeBlockNumber uint64 - go func() { + common.RunWithScissors(ctx, errC, "evm_fetch_objs_req", func(ctx context.Context) error { for { select { case <-ctx.Done(): - return + return nil case r := <-w.obsvReqC: // This can't happen unless there is a programming error - the caller // is expected to send us only requests for our chainID. @@ -492,18 +492,18 @@ func (w *Watcher) Run(ctx context.Context) error { } } } - }() + }) - go func() { + common.RunWithScissors(ctx, errC, "evm_fetch_messages", func(ctx context.Context) error { for { select { case <-ctx.Done(): - return + return nil case err := <-messageSub.Err(): ethConnectionErrors.WithLabelValues(w.networkName, "subscription_error").Inc() errC <- fmt.Errorf("error while processing message publication subscription: %w", err) p2p.DefaultRegistry.AddErrorCount(w.chainID, 1) - return + return nil case ev := <-messageC: // Request timestamp for block msm := time.Now() @@ -517,7 +517,7 @@ func (w *Watcher) Run(ctx context.Context) error { p2p.DefaultRegistry.AddErrorCount(w.chainID, 1) errC <- fmt.Errorf("failed to request timestamp for block %d, hash %s: %w", ev.Raw.BlockNumber, ev.Raw.BlockHash.String(), err) - return + return nil } message := &common.MessagePublication{ @@ -572,27 +572,27 @@ func (w *Watcher) Run(ctx context.Context) error { w.pendingMu.Unlock() } } - }() + }) // Watch headers headSink := make(chan *connectors.NewBlock, 2) - headerSubscription, err := w.ethConn.SubscribeForBlocks(ctx, headSink) + headerSubscription, err := w.ethConn.SubscribeForBlocks(ctx, errC, headSink) if err != nil { ethConnectionErrors.WithLabelValues(w.networkName, "header_subscribe_error").Inc() p2p.DefaultRegistry.AddErrorCount(w.chainID, 1) return fmt.Errorf("failed to subscribe to header events: %w", err) } - go func() { + common.RunWithScissors(ctx, errC, "evm_fetch_headers", func(ctx context.Context) error { for { select { case <-ctx.Done(): - return + return nil case err := <-headerSubscription.Err(): ethConnectionErrors.WithLabelValues(w.networkName, "header_subscription_error").Inc() errC <- fmt.Errorf("error while processing header subscription: %w", err) p2p.DefaultRegistry.AddErrorCount(w.chainID, 1) - return + return nil case ev := <-headSink: // These two pointers should have been checked before the event was placed on the channel, but just being safe. if ev == nil { @@ -767,7 +767,7 @@ func (w *Watcher) Run(ctx context.Context) error { zap.String("eth_network", w.networkName)) } } - }() + }) // Now that the init is complete, peg readiness. That will also happen when we process a new head, but chains // that wait for finality may take a while to receive the first block and we don't want to hold up the init.