From 9b8bed4dbf168027d36ae8d1206478ad6b39e153 Mon Sep 17 00:00:00 2001 From: Paul Noel Date: Wed, 29 Mar 2023 22:56:50 +0000 Subject: [PATCH] node/pkg/watchers: add RunWithScissors --- node/pkg/watchers/algorand/watcher.go | 157 +++++++++++++++++--------- 1 file changed, 101 insertions(+), 56 deletions(-) diff --git a/node/pkg/watchers/algorand/watcher.go b/node/pkg/watchers/algorand/watcher.go index b88d31e74..c664d99e5 100644 --- a/node/pkg/watchers/algorand/watcher.go +++ b/node/pkg/watchers/algorand/watcher.go @@ -154,9 +154,7 @@ func (e *Watcher) Run(ctx context.Context) error { logger.Info("Algorand watcher connecting to indexer ", zap.String("url", e.indexerRPC)) logger.Info("Algorand watcher connecting to RPC node ", zap.String("url", e.algodRPC)) - timer := time.NewTicker(time.Second * 1) - defer timer.Stop() - + // The indexerClient is used to get the transaction by hash for a re-observation indexerClient, err := indexer.MakeClient(e.indexerRPC, e.indexerToken) if err != nil { logger.Error("indexer make client", zap.Error(err)) @@ -182,79 +180,126 @@ func (e *Watcher) Run(ctx context.Context) error { logger.Info(fmt.Sprintf("first block %d", e.next_round)) - for { - select { - case <-ctx.Done(): - return nil - case r := <-e.obsvReqC: - if vaa.ChainID(r.ChainId) != vaa.ChainIDAlgorand { - panic("invalid chain ID") + // Create the timer for the getting the block height + timer := time.NewTicker(time.Second * 1) + defer timer.Stop() + + // Create an error channel + errC := make(chan error) + defer close(errC) + + // Signal that basic initialization is complete + readiness.SetReady(e.readinessSync) + + // Signal to the supervisor that this runnable has finished initialization + supervisor.Signal(ctx, supervisor.SignalHealthy) + + // Create the go routine to handle events from core contract + common.RunWithScissors(ctx, errC, "core_events_and_block_height", func(ctx context.Context) error { + logger.Info("Entering core_events_and_block_height...") + + for { + select { + case err := <-errC: + logger.Error("core_events_and_block_height died", zap.Error(err)) + return fmt.Errorf("core_events_and_block_height died: %w", err) + case <-ctx.Done(): + logger.Error("core_events_and_block_height context done") + return ctx.Err() + + case <-timer.C: + status, err := algodClient.Status().Do(context.Background()) + if err != nil { + logger.Error("algodClient.Status", zap.Error(err)) + p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAlgorand, 1) + continue + } + + if e.next_round <= status.LastRound { + for { + block, err := algodClient.Block(e.next_round).Do(context.Background()) + if err != nil { + logger.Error("algodClient.Block %d: %s", zap.Uint64("next_round", e.next_round), zap.Error(err)) + p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAlgorand, 1) + break + } + + if block.Round == 0 { + break + } + + for _, element := range block.Payset { + lookAtTxn(e, element, block, logger) + } + e.next_round = e.next_round + 1 + + if e.next_round > status.LastRound { + break + } + } + } + + currentAlgorandHeight.Set(float64(status.LastRound)) + p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDAlgorand, &gossipv1.Heartbeat_Network{ + Height: int64(status.LastRound), + ContractAddress: fmt.Sprintf("%d", e.appid), + }) + + readiness.SetReady(e.readinessSync) } + } + }) - logger.Info("Received obsv request", - zap.String("tx_hash", hex.EncodeToString(r.TxHash)), - zap.String("base32_tx_hash", base32.StdEncoding.WithPadding(base32.NoPadding).EncodeToString(r.TxHash))) + // Create the go routine to listen for re-observation requests + common.RunWithScissors(ctx, errC, "fetch_obvs_req", func(ctx context.Context) error { + logger.Info("Entering fetch_obvs_req...") - result, err := indexerClient.SearchForTransactions().TXID(base32.StdEncoding.WithPadding(base32.NoPadding).EncodeToString(r.TxHash)).Do(context.Background()) - if err != nil { - logger.Error("SearchForTransactions", zap.Error(err)) - p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAlgorand, 1) - break - } - for _, t := range result.Transactions { - r := t.ConfirmedRound + for { + select { + case err := <-errC: + logger.Error("fetch_obvs_req died", zap.Error(err)) + return fmt.Errorf("fetch_obvs_req died: %w", err) + case <-ctx.Done(): + logger.Error("fetch_obvs_req context done") + return ctx.Err() - block, err := algodClient.Block(r).Do(context.Background()) + case r := <-e.obsvReqC: + if vaa.ChainID(r.ChainId) != vaa.ChainIDAlgorand { + panic("invalid chain ID") + } + + logger.Info("Received obsv request", + zap.String("tx_hash", hex.EncodeToString(r.TxHash)), + zap.String("base32_tx_hash", base32.StdEncoding.WithPadding(base32.NoPadding).EncodeToString(r.TxHash))) + + result, err := indexerClient.SearchForTransactions().TXID(base32.StdEncoding.WithPadding(base32.NoPadding).EncodeToString(r.TxHash)).Do(context.Background()) if err != nil { logger.Error("SearchForTransactions", zap.Error(err)) p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAlgorand, 1) break } + for _, t := range result.Transactions { + r := t.ConfirmedRound - for _, element := range block.Payset { - lookAtTxn(e, element, block, logger) - } - } - - case <-timer.C: - status, err := algodClient.Status().Do(context.Background()) - if err != nil { - logger.Error(fmt.Sprintf("algodClient.Status: %s", err.Error())) - p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAlgorand, 1) - continue - } - - if e.next_round <= status.LastRound { - for { - block, err := algodClient.Block(e.next_round).Do(context.Background()) + block, err := algodClient.Block(r).Do(context.Background()) if err != nil { - logger.Error(fmt.Sprintf("algodClient.Block %d: %s", e.next_round, err.Error())) + logger.Error("SearchForTransactions", zap.Error(err)) p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAlgorand, 1) break } - if block.Round == 0 { - break - } - for _, element := range block.Payset { lookAtTxn(e, element, block, logger) } - e.next_round = e.next_round + 1 - - if e.next_round > status.LastRound { - break - } } } - - currentAlgorandHeight.Set(float64(status.LastRound)) - p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDAlgorand, &gossipv1.Heartbeat_Network{ - Height: int64(status.LastRound), - ContractAddress: fmt.Sprintf("%d", e.appid), - }) - - readiness.SetReady(e.readinessSync) } + }) + + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-errC: + return err } }