Node: Rework min/max confs for EVM watchers (#1849)

* Node: Rework max/min confs for EVM

* Increase maxWaitConfirmations
This commit is contained in:
bruce-riley 2022-11-14 19:38:04 -06:00 committed by GitHub
parent 2fab13e2cc
commit ee1868205d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 62 additions and 46 deletions

View File

@ -917,7 +917,7 @@ func runNode(cmd *cobra.Command, args []string) {
logger.Info("Starting Ethereum watcher")
readiness.RegisterComponent(common.ReadinessEthSyncing)
chainObsvReqC[vaa.ChainIDEthereum] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
ethWatcher = evm.NewEthWatcher(*ethRPC, ethContractAddr, "eth", common.ReadinessEthSyncing, vaa.ChainIDEthereum, lockC, setC, 1, chainObsvReqC[vaa.ChainIDEthereum], *unsafeDevMode)
ethWatcher = evm.NewEthWatcher(*ethRPC, ethContractAddr, "eth", common.ReadinessEthSyncing, vaa.ChainIDEthereum, lockC, setC, chainObsvReqC[vaa.ChainIDEthereum], *unsafeDevMode)
if err := supervisor.Run(ctx, "ethwatch",
ethWatcher.Run); err != nil {
return err
@ -928,27 +928,24 @@ func runNode(cmd *cobra.Command, args []string) {
logger.Info("Starting BSC watcher")
readiness.RegisterComponent(common.ReadinessBSCSyncing)
chainObsvReqC[vaa.ChainIDBSC] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
if err := supervisor.Run(ctx, "bscwatch",
evm.NewEthWatcher(*bscRPC, bscContractAddr, "bsc", common.ReadinessBSCSyncing, vaa.ChainIDBSC, lockC, nil, 1, chainObsvReqC[vaa.ChainIDBSC], *unsafeDevMode).Run); err != nil {
bscWatcher := evm.NewEthWatcher(*bscRPC, bscContractAddr, "bsc", common.ReadinessBSCSyncing, vaa.ChainIDBSC, lockC, nil, chainObsvReqC[vaa.ChainIDBSC], *unsafeDevMode)
bscWatcher.SetWaitForConfirmations(true)
if err := supervisor.Run(ctx, "bscwatch", bscWatcher.Run); err != nil {
return err
}
}
if shouldStart(polygonRPC) {
var polygonMinConfirmations uint64 = 512
if *polygonRootChainRpc != "" {
// If we are using checkpointing, we don't need to wait for additional confirmations.
polygonMinConfirmations = 1
} else if *testnetMode {
// Testnet users don't want to have to wait too long.
polygonMinConfirmations = 64
} else if !*unsafeDevMode {
// Checkpointing is required in mainnet, so we don't need to wait for confirmations.
waitForConfirmations := *unsafeDevMode || *testnetMode
if !waitForConfirmations && *polygonRootChainRpc == "" {
log.Fatal("Polygon checkpointing is required in mainnet")
}
logger.Info("Starting Polygon watcher")
readiness.RegisterComponent(common.ReadinessPolygonSyncing)
chainObsvReqC[vaa.ChainIDPolygon] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
polygonWatcher := evm.NewEthWatcher(*polygonRPC, polygonContractAddr, "polygon", common.ReadinessPolygonSyncing, vaa.ChainIDPolygon, lockC, nil, polygonMinConfirmations, chainObsvReqC[vaa.ChainIDPolygon], *unsafeDevMode)
polygonWatcher := evm.NewEthWatcher(*polygonRPC, polygonContractAddr, "polygon", common.ReadinessPolygonSyncing, vaa.ChainIDPolygon, lockC, nil, chainObsvReqC[vaa.ChainIDPolygon], *unsafeDevMode)
polygonWatcher.SetWaitForConfirmations(waitForConfirmations)
if err := polygonWatcher.SetRootChainParams(*polygonRootChainRpc, *polygonRootChainContractAddress); err != nil {
return err
}
@ -961,7 +958,7 @@ func runNode(cmd *cobra.Command, args []string) {
readiness.RegisterComponent(common.ReadinessAvalancheSyncing)
chainObsvReqC[vaa.ChainIDAvalanche] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
if err := supervisor.Run(ctx, "avalanchewatch",
evm.NewEthWatcher(*avalancheRPC, avalancheContractAddr, "avalanche", common.ReadinessAvalancheSyncing, vaa.ChainIDAvalanche, lockC, nil, 1, chainObsvReqC[vaa.ChainIDAvalanche], *unsafeDevMode).Run); err != nil {
evm.NewEthWatcher(*avalancheRPC, avalancheContractAddr, "avalanche", common.ReadinessAvalancheSyncing, vaa.ChainIDAvalanche, lockC, nil, chainObsvReqC[vaa.ChainIDAvalanche], *unsafeDevMode).Run); err != nil {
return err
}
}
@ -969,7 +966,7 @@ func runNode(cmd *cobra.Command, args []string) {
logger.Info("Starting Oasis watcher")
chainObsvReqC[vaa.ChainIDOasis] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
if err := supervisor.Run(ctx, "oasiswatch",
evm.NewEthWatcher(*oasisRPC, oasisContractAddr, "oasis", common.ReadinessOasisSyncing, vaa.ChainIDOasis, lockC, nil, 1, chainObsvReqC[vaa.ChainIDOasis], *unsafeDevMode).Run); err != nil {
evm.NewEthWatcher(*oasisRPC, oasisContractAddr, "oasis", common.ReadinessOasisSyncing, vaa.ChainIDOasis, lockC, nil, chainObsvReqC[vaa.ChainIDOasis], *unsafeDevMode).Run); err != nil {
return err
}
}
@ -978,7 +975,7 @@ func runNode(cmd *cobra.Command, args []string) {
readiness.RegisterComponent(common.ReadinessAuroraSyncing)
chainObsvReqC[vaa.ChainIDAurora] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
if err := supervisor.Run(ctx, "aurorawatch",
evm.NewEthWatcher(*auroraRPC, auroraContractAddr, "aurora", common.ReadinessAuroraSyncing, vaa.ChainIDAurora, lockC, nil, 1, chainObsvReqC[vaa.ChainIDAurora], *unsafeDevMode).Run); err != nil {
evm.NewEthWatcher(*auroraRPC, auroraContractAddr, "aurora", common.ReadinessAuroraSyncing, vaa.ChainIDAurora, lockC, nil, chainObsvReqC[vaa.ChainIDAurora], *unsafeDevMode).Run); err != nil {
return err
}
}
@ -987,7 +984,7 @@ func runNode(cmd *cobra.Command, args []string) {
readiness.RegisterComponent(common.ReadinessFantomSyncing)
chainObsvReqC[vaa.ChainIDFantom] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
if err := supervisor.Run(ctx, "fantomwatch",
evm.NewEthWatcher(*fantomRPC, fantomContractAddr, "fantom", common.ReadinessFantomSyncing, vaa.ChainIDFantom, lockC, nil, 1, chainObsvReqC[vaa.ChainIDFantom], *unsafeDevMode).Run); err != nil {
evm.NewEthWatcher(*fantomRPC, fantomContractAddr, "fantom", common.ReadinessFantomSyncing, vaa.ChainIDFantom, lockC, nil, chainObsvReqC[vaa.ChainIDFantom], *unsafeDevMode).Run); err != nil {
return err
}
}
@ -996,7 +993,7 @@ func runNode(cmd *cobra.Command, args []string) {
readiness.RegisterComponent(common.ReadinessKaruraSyncing)
chainObsvReqC[vaa.ChainIDKarura] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
if err := supervisor.Run(ctx, "karurawatch",
evm.NewEthWatcher(*karuraRPC, karuraContractAddr, "karura", common.ReadinessKaruraSyncing, vaa.ChainIDKarura, lockC, nil, 1, chainObsvReqC[vaa.ChainIDKarura], *unsafeDevMode).Run); err != nil {
evm.NewEthWatcher(*karuraRPC, karuraContractAddr, "karura", common.ReadinessKaruraSyncing, vaa.ChainIDKarura, lockC, nil, chainObsvReqC[vaa.ChainIDKarura], *unsafeDevMode).Run); err != nil {
return err
}
}
@ -1005,7 +1002,7 @@ func runNode(cmd *cobra.Command, args []string) {
readiness.RegisterComponent(common.ReadinessAcalaSyncing)
chainObsvReqC[vaa.ChainIDAcala] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
if err := supervisor.Run(ctx, "acalawatch",
evm.NewEthWatcher(*acalaRPC, acalaContractAddr, "acala", common.ReadinessAcalaSyncing, vaa.ChainIDAcala, lockC, nil, 1, chainObsvReqC[vaa.ChainIDAcala], *unsafeDevMode).Run); err != nil {
evm.NewEthWatcher(*acalaRPC, acalaContractAddr, "acala", common.ReadinessAcalaSyncing, vaa.ChainIDAcala, lockC, nil, chainObsvReqC[vaa.ChainIDAcala], *unsafeDevMode).Run); err != nil {
return err
}
}
@ -1014,7 +1011,7 @@ func runNode(cmd *cobra.Command, args []string) {
readiness.RegisterComponent(common.ReadinessKlaytnSyncing)
chainObsvReqC[vaa.ChainIDKlaytn] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
if err := supervisor.Run(ctx, "klaytnwatch",
evm.NewEthWatcher(*klaytnRPC, klaytnContractAddr, "klaytn", common.ReadinessKlaytnSyncing, vaa.ChainIDKlaytn, lockC, nil, 1, chainObsvReqC[vaa.ChainIDKlaytn], *unsafeDevMode).Run); err != nil {
evm.NewEthWatcher(*klaytnRPC, klaytnContractAddr, "klaytn", common.ReadinessKlaytnSyncing, vaa.ChainIDKlaytn, lockC, nil, chainObsvReqC[vaa.ChainIDKlaytn], *unsafeDevMode).Run); err != nil {
return err
}
}
@ -1023,7 +1020,7 @@ func runNode(cmd *cobra.Command, args []string) {
readiness.RegisterComponent(common.ReadinessCeloSyncing)
chainObsvReqC[vaa.ChainIDCelo] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
if err := supervisor.Run(ctx, "celowatch",
evm.NewEthWatcher(*celoRPC, celoContractAddr, "celo", common.ReadinessCeloSyncing, vaa.ChainIDCelo, lockC, nil, 1, chainObsvReqC[vaa.ChainIDCelo], *unsafeDevMode).Run); err != nil {
evm.NewEthWatcher(*celoRPC, celoContractAddr, "celo", common.ReadinessCeloSyncing, vaa.ChainIDCelo, lockC, nil, chainObsvReqC[vaa.ChainIDCelo], *unsafeDevMode).Run); err != nil {
return err
}
}
@ -1032,7 +1029,7 @@ func runNode(cmd *cobra.Command, args []string) {
readiness.RegisterComponent(common.ReadinessMoonbeamSyncing)
chainObsvReqC[vaa.ChainIDMoonbeam] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
if err := supervisor.Run(ctx, "moonbeamwatch",
evm.NewEthWatcher(*moonbeamRPC, moonbeamContractAddr, "moonbeam", common.ReadinessMoonbeamSyncing, vaa.ChainIDMoonbeam, lockC, nil, 1, chainObsvReqC[vaa.ChainIDMoonbeam], *unsafeDevMode).Run); err != nil {
evm.NewEthWatcher(*moonbeamRPC, moonbeamContractAddr, "moonbeam", common.ReadinessMoonbeamSyncing, vaa.ChainIDMoonbeam, lockC, nil, chainObsvReqC[vaa.ChainIDMoonbeam], *unsafeDevMode).Run); err != nil {
return err
}
}
@ -1043,7 +1040,7 @@ func runNode(cmd *cobra.Command, args []string) {
logger.Info("Starting Arbitrum watcher")
readiness.RegisterComponent(common.ReadinessArbitrumSyncing)
chainObsvReqC[vaa.ChainIDArbitrum] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
arbitrumWatcher := evm.NewEthWatcher(*arbitrumRPC, arbitrumContractAddr, "arbitrum", common.ReadinessArbitrumSyncing, vaa.ChainIDArbitrum, lockC, nil, 1, chainObsvReqC[vaa.ChainIDArbitrum], *unsafeDevMode)
arbitrumWatcher := evm.NewEthWatcher(*arbitrumRPC, arbitrumContractAddr, "arbitrum", common.ReadinessArbitrumSyncing, vaa.ChainIDArbitrum, lockC, nil, chainObsvReqC[vaa.ChainIDArbitrum], *unsafeDevMode)
arbitrumWatcher.SetL1Finalizer(ethWatcher)
if err := supervisor.Run(ctx, "arbitrumwatch", arbitrumWatcher.Run); err != nil {
return err
@ -1056,7 +1053,7 @@ func runNode(cmd *cobra.Command, args []string) {
logger.Info("Starting Optimism watcher")
readiness.RegisterComponent(common.ReadinessOptimismSyncing)
chainObsvReqC[vaa.ChainIDOptimism] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
optimismWatcher := evm.NewEthWatcher(*optimismRPC, optimismContractAddr, "optimism", common.ReadinessOptimismSyncing, vaa.ChainIDOptimism, lockC, nil, 1, chainObsvReqC[vaa.ChainIDOptimism], *unsafeDevMode)
optimismWatcher := evm.NewEthWatcher(*optimismRPC, optimismContractAddr, "optimism", common.ReadinessOptimismSyncing, vaa.ChainIDOptimism, lockC, nil, chainObsvReqC[vaa.ChainIDOptimism], *unsafeDevMode)
optimismWatcher.SetL1Finalizer(ethWatcher)
if err := supervisor.Run(ctx, "optimismwatch", optimismWatcher.Run); err != nil {
return err
@ -1169,7 +1166,7 @@ func runNode(cmd *cobra.Command, args []string) {
logger.Info("Starting Neon watcher")
readiness.RegisterComponent(common.ReadinessNeonSyncing)
chainObsvReqC[vaa.ChainIDNeon] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
neonWatcher := evm.NewEthWatcher(*neonRPC, neonContractAddr, "neon", common.ReadinessNeonSyncing, vaa.ChainIDNeon, lockC, nil, 32, chainObsvReqC[vaa.ChainIDNeon], *unsafeDevMode)
neonWatcher := evm.NewEthWatcher(*neonRPC, neonContractAddr, "neon", common.ReadinessNeonSyncing, vaa.ChainIDNeon, lockC, nil, chainObsvReqC[vaa.ChainIDNeon], *unsafeDevMode)
neonWatcher.SetL1Finalizer(solanaFinalizedWatcher)
if err := supervisor.Run(ctx, "neonwatch", neonWatcher.Run); err != nil {
return err

View File

@ -100,8 +100,15 @@ type (
// 0 is a valid guardian set, so we need a nil value here
currentGuardianSet *uint32
// Minimum number of confirmations to accept, regardless of what the contract specifies.
minConfirmations uint64
// waitForConfirmations indicates if we should wait for the number of confirmations specified by the consistencyLevel in the message.
// On many of the chains, we already wait for finalized blocks so there is no point in waiting any additional blocks after finality.
// Therefore this parameter defaults to false. This feature can / should be enabled on chains where we don't wait for finality.
waitForConfirmations bool
// maxWaitConfirmations is the maximum number of confirmations to wait before declaring a transaction abandoned. If we are honoring
// the consistency level (waitForConfirmations is set to true), then we wait maxWaitConfirmations plus the consistency level. This
// parameter defaults to 60, which should be plenty long enough for most chains. If not, this parameter can be set.
maxWaitConfirmations uint64
// Interface to the chain specific ethereum library.
ethConn connectors.Connector
@ -136,23 +143,23 @@ func NewEthWatcher(
chainID vaa.ChainID,
messageEvents chan *common.MessagePublication,
setEvents chan *common.GuardianSet,
minConfirmations uint64,
obsvReqC chan *gossipv1.ObservationRequest,
unsafeDevMode bool,
) *Watcher {
return &Watcher{
url: url,
contract: contract,
networkName: networkName,
readiness: readiness,
minConfirmations: minConfirmations,
chainID: chainID,
msgChan: messageEvents,
setChan: setEvents,
obsvReqC: obsvReqC,
pending: map[pendingKey]*pendingMessage{},
unsafeDevMode: unsafeDevMode,
url: url,
contract: contract,
networkName: networkName,
readiness: readiness,
waitForConfirmations: false,
maxWaitConfirmations: 60,
chainID: chainID,
msgChan: messageEvents,
setChan: setEvents,
obsvReqC: obsvReqC,
pending: map[pendingKey]*pendingMessage{},
unsafeDevMode: unsafeDevMode,
}
}
@ -400,9 +407,9 @@ func (w *Watcher) Run(ctx context.Context) error {
continue
}
expectedConfirmations := uint64(msg.ConsistencyLevel)
if expectedConfirmations < w.minConfirmations {
expectedConfirmations = w.minConfirmations
var expectedConfirmations uint64
if w.waitForConfirmations {
expectedConfirmations = uint64(msg.ConsistencyLevel)
}
// SECURITY: In the recovery flow, we already know which transaction to
@ -571,13 +578,13 @@ func (w *Watcher) Run(ctx context.Context) error {
atomic.StoreUint64(&w.latestFinalizedBlockNumber, blockNumberU)
for key, pLock := range w.pending {
expectedConfirmations := uint64(pLock.message.ConsistencyLevel)
if expectedConfirmations < w.minConfirmations {
expectedConfirmations = w.minConfirmations
var expectedConfirmations uint64
if w.waitForConfirmations {
expectedConfirmations = uint64(pLock.message.ConsistencyLevel)
}
// Transaction was dropped and never picked up again
if pLock.height+4*uint64(expectedConfirmations) <= blockNumberU {
if pLock.height+expectedConfirmations+w.maxWaitConfirmations <= blockNumberU {
logger.Info("observation timed out",
zap.Stringer("tx", pLock.message.TxHash),
zap.Stringer("blockhash", key.BlockHash),
@ -586,6 +593,8 @@ func (w *Watcher) Run(ctx context.Context) error {
zap.Stringer("current_block", ev.Number),
zap.Stringer("current_blockhash", currentHash),
zap.String("eth_network", w.networkName),
zap.Uint64("expectedConfirmations", expectedConfirmations),
zap.Uint64("maxWaitConfirmations", w.maxWaitConfirmations),
)
ethMessagesOrphaned.WithLabelValues(w.networkName, "timeout").Inc()
delete(w.pending, key)
@ -593,7 +602,7 @@ func (w *Watcher) Run(ctx context.Context) error {
}
// Transaction is now ready
if pLock.height+uint64(expectedConfirmations) <= blockNumberU {
if pLock.height+expectedConfirmations <= blockNumberU {
timeout, cancel := context.WithTimeout(ctx, 5*time.Second)
tx, err := w.ethConn.TransactionReceipt(timeout, pLock.message.TxHash)
cancel()
@ -822,3 +831,13 @@ func (w *Watcher) SetRootChainParams(rootChainRpc string, rootChainContract stri
func (w *Watcher) usePolygonCheckpointing() bool {
return w.rootChainRpc != "" && w.rootChainContract != ""
}
// SetWaitForConfirmations is used to override whether we should wait for the number of confirmations specified by the consistencyLevel in the message.
func (w *Watcher) SetWaitForConfirmations(waitForConfirmations bool) {
w.waitForConfirmations = waitForConfirmations
}
// SetMaxWaitConfirmations is used to override the maximum number of confirmations to wait before declaring a transaction abandoned.
func (w *Watcher) SetMaxWaitConfirmations(maxWaitConfirmations uint64) {
w.maxWaitConfirmations = maxWaitConfirmations
}