From ee1868205d062dd47d56f351e54747e20ea12d70 Mon Sep 17 00:00:00 2001 From: bruce-riley <96066700+bruce-riley@users.noreply.github.com> Date: Mon, 14 Nov 2022 19:38:04 -0600 Subject: [PATCH] Node: Rework min/max confs for EVM watchers (#1849) * Node: Rework max/min confs for EVM * Increase maxWaitConfirmations --- node/cmd/guardiand/node.go | 45 +++++++++++------------ node/pkg/watchers/evm/watcher.go | 63 +++++++++++++++++++++----------- 2 files changed, 62 insertions(+), 46 deletions(-) diff --git a/node/cmd/guardiand/node.go b/node/cmd/guardiand/node.go index 6010e3bcf..2c2e80b67 100644 --- a/node/cmd/guardiand/node.go +++ b/node/cmd/guardiand/node.go @@ -917,7 +917,7 @@ func runNode(cmd *cobra.Command, args []string) { logger.Info("Starting Ethereum watcher") readiness.RegisterComponent(common.ReadinessEthSyncing) chainObsvReqC[vaa.ChainIDEthereum] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - ethWatcher = evm.NewEthWatcher(*ethRPC, ethContractAddr, "eth", common.ReadinessEthSyncing, vaa.ChainIDEthereum, lockC, setC, 1, chainObsvReqC[vaa.ChainIDEthereum], *unsafeDevMode) + ethWatcher = evm.NewEthWatcher(*ethRPC, ethContractAddr, "eth", common.ReadinessEthSyncing, vaa.ChainIDEthereum, lockC, setC, chainObsvReqC[vaa.ChainIDEthereum], *unsafeDevMode) if err := supervisor.Run(ctx, "ethwatch", ethWatcher.Run); err != nil { return err @@ -928,27 +928,24 @@ func runNode(cmd *cobra.Command, args []string) { logger.Info("Starting BSC watcher") readiness.RegisterComponent(common.ReadinessBSCSyncing) chainObsvReqC[vaa.ChainIDBSC] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - if err := supervisor.Run(ctx, "bscwatch", - evm.NewEthWatcher(*bscRPC, bscContractAddr, "bsc", common.ReadinessBSCSyncing, vaa.ChainIDBSC, lockC, nil, 1, chainObsvReqC[vaa.ChainIDBSC], *unsafeDevMode).Run); err != nil { + bscWatcher := evm.NewEthWatcher(*bscRPC, bscContractAddr, "bsc", common.ReadinessBSCSyncing, vaa.ChainIDBSC, lockC, nil, chainObsvReqC[vaa.ChainIDBSC], *unsafeDevMode) + bscWatcher.SetWaitForConfirmations(true) + if err := supervisor.Run(ctx, "bscwatch", bscWatcher.Run); err != nil { return err } } if shouldStart(polygonRPC) { - var polygonMinConfirmations uint64 = 512 - if *polygonRootChainRpc != "" { - // If we are using checkpointing, we don't need to wait for additional confirmations. - polygonMinConfirmations = 1 - } else if *testnetMode { - // Testnet users don't want to have to wait too long. - polygonMinConfirmations = 64 - } else if !*unsafeDevMode { + // Checkpointing is required in mainnet, so we don't need to wait for confirmations. + waitForConfirmations := *unsafeDevMode || *testnetMode + if !waitForConfirmations && *polygonRootChainRpc == "" { log.Fatal("Polygon checkpointing is required in mainnet") } logger.Info("Starting Polygon watcher") readiness.RegisterComponent(common.ReadinessPolygonSyncing) chainObsvReqC[vaa.ChainIDPolygon] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - polygonWatcher := evm.NewEthWatcher(*polygonRPC, polygonContractAddr, "polygon", common.ReadinessPolygonSyncing, vaa.ChainIDPolygon, lockC, nil, polygonMinConfirmations, chainObsvReqC[vaa.ChainIDPolygon], *unsafeDevMode) + polygonWatcher := evm.NewEthWatcher(*polygonRPC, polygonContractAddr, "polygon", common.ReadinessPolygonSyncing, vaa.ChainIDPolygon, lockC, nil, chainObsvReqC[vaa.ChainIDPolygon], *unsafeDevMode) + polygonWatcher.SetWaitForConfirmations(waitForConfirmations) if err := polygonWatcher.SetRootChainParams(*polygonRootChainRpc, *polygonRootChainContractAddress); err != nil { return err } @@ -961,7 +958,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessAvalancheSyncing) chainObsvReqC[vaa.ChainIDAvalanche] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "avalanchewatch", - evm.NewEthWatcher(*avalancheRPC, avalancheContractAddr, "avalanche", common.ReadinessAvalancheSyncing, vaa.ChainIDAvalanche, lockC, nil, 1, chainObsvReqC[vaa.ChainIDAvalanche], *unsafeDevMode).Run); err != nil { + evm.NewEthWatcher(*avalancheRPC, avalancheContractAddr, "avalanche", common.ReadinessAvalancheSyncing, vaa.ChainIDAvalanche, lockC, nil, chainObsvReqC[vaa.ChainIDAvalanche], *unsafeDevMode).Run); err != nil { return err } } @@ -969,7 +966,7 @@ func runNode(cmd *cobra.Command, args []string) { logger.Info("Starting Oasis watcher") chainObsvReqC[vaa.ChainIDOasis] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "oasiswatch", - evm.NewEthWatcher(*oasisRPC, oasisContractAddr, "oasis", common.ReadinessOasisSyncing, vaa.ChainIDOasis, lockC, nil, 1, chainObsvReqC[vaa.ChainIDOasis], *unsafeDevMode).Run); err != nil { + evm.NewEthWatcher(*oasisRPC, oasisContractAddr, "oasis", common.ReadinessOasisSyncing, vaa.ChainIDOasis, lockC, nil, chainObsvReqC[vaa.ChainIDOasis], *unsafeDevMode).Run); err != nil { return err } } @@ -978,7 +975,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessAuroraSyncing) chainObsvReqC[vaa.ChainIDAurora] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "aurorawatch", - evm.NewEthWatcher(*auroraRPC, auroraContractAddr, "aurora", common.ReadinessAuroraSyncing, vaa.ChainIDAurora, lockC, nil, 1, chainObsvReqC[vaa.ChainIDAurora], *unsafeDevMode).Run); err != nil { + evm.NewEthWatcher(*auroraRPC, auroraContractAddr, "aurora", common.ReadinessAuroraSyncing, vaa.ChainIDAurora, lockC, nil, chainObsvReqC[vaa.ChainIDAurora], *unsafeDevMode).Run); err != nil { return err } } @@ -987,7 +984,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessFantomSyncing) chainObsvReqC[vaa.ChainIDFantom] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "fantomwatch", - evm.NewEthWatcher(*fantomRPC, fantomContractAddr, "fantom", common.ReadinessFantomSyncing, vaa.ChainIDFantom, lockC, nil, 1, chainObsvReqC[vaa.ChainIDFantom], *unsafeDevMode).Run); err != nil { + evm.NewEthWatcher(*fantomRPC, fantomContractAddr, "fantom", common.ReadinessFantomSyncing, vaa.ChainIDFantom, lockC, nil, chainObsvReqC[vaa.ChainIDFantom], *unsafeDevMode).Run); err != nil { return err } } @@ -996,7 +993,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessKaruraSyncing) chainObsvReqC[vaa.ChainIDKarura] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "karurawatch", - evm.NewEthWatcher(*karuraRPC, karuraContractAddr, "karura", common.ReadinessKaruraSyncing, vaa.ChainIDKarura, lockC, nil, 1, chainObsvReqC[vaa.ChainIDKarura], *unsafeDevMode).Run); err != nil { + evm.NewEthWatcher(*karuraRPC, karuraContractAddr, "karura", common.ReadinessKaruraSyncing, vaa.ChainIDKarura, lockC, nil, chainObsvReqC[vaa.ChainIDKarura], *unsafeDevMode).Run); err != nil { return err } } @@ -1005,7 +1002,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessAcalaSyncing) chainObsvReqC[vaa.ChainIDAcala] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "acalawatch", - evm.NewEthWatcher(*acalaRPC, acalaContractAddr, "acala", common.ReadinessAcalaSyncing, vaa.ChainIDAcala, lockC, nil, 1, chainObsvReqC[vaa.ChainIDAcala], *unsafeDevMode).Run); err != nil { + evm.NewEthWatcher(*acalaRPC, acalaContractAddr, "acala", common.ReadinessAcalaSyncing, vaa.ChainIDAcala, lockC, nil, chainObsvReqC[vaa.ChainIDAcala], *unsafeDevMode).Run); err != nil { return err } } @@ -1014,7 +1011,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessKlaytnSyncing) chainObsvReqC[vaa.ChainIDKlaytn] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "klaytnwatch", - evm.NewEthWatcher(*klaytnRPC, klaytnContractAddr, "klaytn", common.ReadinessKlaytnSyncing, vaa.ChainIDKlaytn, lockC, nil, 1, chainObsvReqC[vaa.ChainIDKlaytn], *unsafeDevMode).Run); err != nil { + evm.NewEthWatcher(*klaytnRPC, klaytnContractAddr, "klaytn", common.ReadinessKlaytnSyncing, vaa.ChainIDKlaytn, lockC, nil, chainObsvReqC[vaa.ChainIDKlaytn], *unsafeDevMode).Run); err != nil { return err } } @@ -1023,7 +1020,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessCeloSyncing) chainObsvReqC[vaa.ChainIDCelo] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "celowatch", - evm.NewEthWatcher(*celoRPC, celoContractAddr, "celo", common.ReadinessCeloSyncing, vaa.ChainIDCelo, lockC, nil, 1, chainObsvReqC[vaa.ChainIDCelo], *unsafeDevMode).Run); err != nil { + evm.NewEthWatcher(*celoRPC, celoContractAddr, "celo", common.ReadinessCeloSyncing, vaa.ChainIDCelo, lockC, nil, chainObsvReqC[vaa.ChainIDCelo], *unsafeDevMode).Run); err != nil { return err } } @@ -1032,7 +1029,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessMoonbeamSyncing) chainObsvReqC[vaa.ChainIDMoonbeam] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "moonbeamwatch", - evm.NewEthWatcher(*moonbeamRPC, moonbeamContractAddr, "moonbeam", common.ReadinessMoonbeamSyncing, vaa.ChainIDMoonbeam, lockC, nil, 1, chainObsvReqC[vaa.ChainIDMoonbeam], *unsafeDevMode).Run); err != nil { + evm.NewEthWatcher(*moonbeamRPC, moonbeamContractAddr, "moonbeam", common.ReadinessMoonbeamSyncing, vaa.ChainIDMoonbeam, lockC, nil, chainObsvReqC[vaa.ChainIDMoonbeam], *unsafeDevMode).Run); err != nil { return err } } @@ -1043,7 +1040,7 @@ func runNode(cmd *cobra.Command, args []string) { logger.Info("Starting Arbitrum watcher") readiness.RegisterComponent(common.ReadinessArbitrumSyncing) chainObsvReqC[vaa.ChainIDArbitrum] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - arbitrumWatcher := evm.NewEthWatcher(*arbitrumRPC, arbitrumContractAddr, "arbitrum", common.ReadinessArbitrumSyncing, vaa.ChainIDArbitrum, lockC, nil, 1, chainObsvReqC[vaa.ChainIDArbitrum], *unsafeDevMode) + arbitrumWatcher := evm.NewEthWatcher(*arbitrumRPC, arbitrumContractAddr, "arbitrum", common.ReadinessArbitrumSyncing, vaa.ChainIDArbitrum, lockC, nil, chainObsvReqC[vaa.ChainIDArbitrum], *unsafeDevMode) arbitrumWatcher.SetL1Finalizer(ethWatcher) if err := supervisor.Run(ctx, "arbitrumwatch", arbitrumWatcher.Run); err != nil { return err @@ -1056,7 +1053,7 @@ func runNode(cmd *cobra.Command, args []string) { logger.Info("Starting Optimism watcher") readiness.RegisterComponent(common.ReadinessOptimismSyncing) chainObsvReqC[vaa.ChainIDOptimism] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - optimismWatcher := evm.NewEthWatcher(*optimismRPC, optimismContractAddr, "optimism", common.ReadinessOptimismSyncing, vaa.ChainIDOptimism, lockC, nil, 1, chainObsvReqC[vaa.ChainIDOptimism], *unsafeDevMode) + optimismWatcher := evm.NewEthWatcher(*optimismRPC, optimismContractAddr, "optimism", common.ReadinessOptimismSyncing, vaa.ChainIDOptimism, lockC, nil, chainObsvReqC[vaa.ChainIDOptimism], *unsafeDevMode) optimismWatcher.SetL1Finalizer(ethWatcher) if err := supervisor.Run(ctx, "optimismwatch", optimismWatcher.Run); err != nil { return err @@ -1169,7 +1166,7 @@ func runNode(cmd *cobra.Command, args []string) { logger.Info("Starting Neon watcher") readiness.RegisterComponent(common.ReadinessNeonSyncing) chainObsvReqC[vaa.ChainIDNeon] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - neonWatcher := evm.NewEthWatcher(*neonRPC, neonContractAddr, "neon", common.ReadinessNeonSyncing, vaa.ChainIDNeon, lockC, nil, 32, chainObsvReqC[vaa.ChainIDNeon], *unsafeDevMode) + neonWatcher := evm.NewEthWatcher(*neonRPC, neonContractAddr, "neon", common.ReadinessNeonSyncing, vaa.ChainIDNeon, lockC, nil, chainObsvReqC[vaa.ChainIDNeon], *unsafeDevMode) neonWatcher.SetL1Finalizer(solanaFinalizedWatcher) if err := supervisor.Run(ctx, "neonwatch", neonWatcher.Run); err != nil { return err diff --git a/node/pkg/watchers/evm/watcher.go b/node/pkg/watchers/evm/watcher.go index 7ad1558d9..4dfb1bd6c 100644 --- a/node/pkg/watchers/evm/watcher.go +++ b/node/pkg/watchers/evm/watcher.go @@ -100,8 +100,15 @@ type ( // 0 is a valid guardian set, so we need a nil value here currentGuardianSet *uint32 - // Minimum number of confirmations to accept, regardless of what the contract specifies. - minConfirmations uint64 + // waitForConfirmations indicates if we should wait for the number of confirmations specified by the consistencyLevel in the message. + // On many of the chains, we already wait for finalized blocks so there is no point in waiting any additional blocks after finality. + // Therefore this parameter defaults to false. This feature can / should be enabled on chains where we don't wait for finality. + waitForConfirmations bool + + // maxWaitConfirmations is the maximum number of confirmations to wait before declaring a transaction abandoned. If we are honoring + // the consistency level (waitForConfirmations is set to true), then we wait maxWaitConfirmations plus the consistency level. This + // parameter defaults to 60, which should be plenty long enough for most chains. If not, this parameter can be set. + maxWaitConfirmations uint64 // Interface to the chain specific ethereum library. ethConn connectors.Connector @@ -136,23 +143,23 @@ func NewEthWatcher( chainID vaa.ChainID, messageEvents chan *common.MessagePublication, setEvents chan *common.GuardianSet, - minConfirmations uint64, obsvReqC chan *gossipv1.ObservationRequest, unsafeDevMode bool, ) *Watcher { return &Watcher{ - url: url, - contract: contract, - networkName: networkName, - readiness: readiness, - minConfirmations: minConfirmations, - chainID: chainID, - msgChan: messageEvents, - setChan: setEvents, - obsvReqC: obsvReqC, - pending: map[pendingKey]*pendingMessage{}, - unsafeDevMode: unsafeDevMode, + url: url, + contract: contract, + networkName: networkName, + readiness: readiness, + waitForConfirmations: false, + maxWaitConfirmations: 60, + chainID: chainID, + msgChan: messageEvents, + setChan: setEvents, + obsvReqC: obsvReqC, + pending: map[pendingKey]*pendingMessage{}, + unsafeDevMode: unsafeDevMode, } } @@ -400,9 +407,9 @@ func (w *Watcher) Run(ctx context.Context) error { continue } - expectedConfirmations := uint64(msg.ConsistencyLevel) - if expectedConfirmations < w.minConfirmations { - expectedConfirmations = w.minConfirmations + var expectedConfirmations uint64 + if w.waitForConfirmations { + expectedConfirmations = uint64(msg.ConsistencyLevel) } // SECURITY: In the recovery flow, we already know which transaction to @@ -571,13 +578,13 @@ func (w *Watcher) Run(ctx context.Context) error { atomic.StoreUint64(&w.latestFinalizedBlockNumber, blockNumberU) for key, pLock := range w.pending { - expectedConfirmations := uint64(pLock.message.ConsistencyLevel) - if expectedConfirmations < w.minConfirmations { - expectedConfirmations = w.minConfirmations + var expectedConfirmations uint64 + if w.waitForConfirmations { + expectedConfirmations = uint64(pLock.message.ConsistencyLevel) } // Transaction was dropped and never picked up again - if pLock.height+4*uint64(expectedConfirmations) <= blockNumberU { + if pLock.height+expectedConfirmations+w.maxWaitConfirmations <= blockNumberU { logger.Info("observation timed out", zap.Stringer("tx", pLock.message.TxHash), zap.Stringer("blockhash", key.BlockHash), @@ -586,6 +593,8 @@ func (w *Watcher) Run(ctx context.Context) error { zap.Stringer("current_block", ev.Number), zap.Stringer("current_blockhash", currentHash), zap.String("eth_network", w.networkName), + zap.Uint64("expectedConfirmations", expectedConfirmations), + zap.Uint64("maxWaitConfirmations", w.maxWaitConfirmations), ) ethMessagesOrphaned.WithLabelValues(w.networkName, "timeout").Inc() delete(w.pending, key) @@ -593,7 +602,7 @@ func (w *Watcher) Run(ctx context.Context) error { } // Transaction is now ready - if pLock.height+uint64(expectedConfirmations) <= blockNumberU { + if pLock.height+expectedConfirmations <= blockNumberU { timeout, cancel := context.WithTimeout(ctx, 5*time.Second) tx, err := w.ethConn.TransactionReceipt(timeout, pLock.message.TxHash) cancel() @@ -822,3 +831,13 @@ func (w *Watcher) SetRootChainParams(rootChainRpc string, rootChainContract stri func (w *Watcher) usePolygonCheckpointing() bool { return w.rootChainRpc != "" && w.rootChainContract != "" } + +// SetWaitForConfirmations is used to override whether we should wait for the number of confirmations specified by the consistencyLevel in the message. +func (w *Watcher) SetWaitForConfirmations(waitForConfirmations bool) { + w.waitForConfirmations = waitForConfirmations +} + +// SetMaxWaitConfirmations is used to override the maximum number of confirmations to wait before declaring a transaction abandoned. +func (w *Watcher) SetMaxWaitConfirmations(maxWaitConfirmations uint64) { + w.maxWaitConfirmations = maxWaitConfirmations +}