diff --git a/node/cmd/guardiand/node.go b/node/cmd/guardiand/node.go index fb23be1f1..eb0db3233 100644 --- a/node/cmd/guardiand/node.go +++ b/node/cmd/guardiand/node.go @@ -1077,7 +1077,7 @@ func runNode(cmd *cobra.Command, args []string) { readiness.RegisterComponent(common.ReadinessAlgorandSyncing) chainObsvReqC[vaa.ChainIDAlgorand] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) if err := supervisor.Run(ctx, "algorandwatch", - algorand.NewWatcher(*algorandIndexerRPC, *algorandIndexerToken, *algorandAlgodRPC, *algorandAlgodToken, *algorandAppID, lockC, setC, chainObsvReqC[vaa.ChainIDAlgorand]).Run); err != nil { + algorand.NewWatcher(*algorandIndexerRPC, *algorandIndexerToken, *algorandAlgodRPC, *algorandAlgodToken, *algorandAppID, lockC, chainObsvReqC[vaa.ChainIDAlgorand]).Run); err != nil { return err } } diff --git a/node/pkg/watchers/algorand/watcher.go b/node/pkg/watchers/algorand/watcher.go index 6411915f3..456adce6f 100644 --- a/node/pkg/watchers/algorand/watcher.go +++ b/node/pkg/watchers/algorand/watcher.go @@ -5,7 +5,6 @@ import ( "encoding/base32" "encoding/binary" "encoding/hex" - "encoding/json" "fmt" "time" @@ -35,11 +34,9 @@ type ( appid uint64 msgChan chan *common.MessagePublication - setChan chan *common.GuardianSet obsvReqC chan *gossipv1.ObservationRequest next_round uint64 - debug bool } ) @@ -64,7 +61,6 @@ func NewWatcher( algodToken string, appid uint64, lockEvents chan *common.MessagePublication, - setEvents chan *common.GuardianSet, obsvReqC chan *gossipv1.ObservationRequest, ) *Watcher { return &Watcher{ @@ -74,10 +70,8 @@ func NewWatcher( algodToken: algodToken, appid: appid, msgChan: lockEvents, - setChan: setEvents, obsvReqC: obsvReqC, next_round: 0, - debug: true, } } @@ -99,23 +93,12 @@ func lookAtTxn(e *Watcher, t types.SignedTxnInBlock, b types.Block, logger *zap. continue } - if e.debug { - JSON, err := json.Marshal(it) - if err != nil { - logger.Error("Cannot JSON marshal transaction", zap.Error(err)) - } else { - logger.Info(string(JSON)) - } - } - emitter := at.Sender var a vaa.Address copy(a[:], emitter[:]) // 32 bytes = 8edf5b0e108c3a1a0a4b704cc89591f2ad8d50df24e991567e640ed720a94be2 - if e.debug { - logger.Error("emitter: " + hex.EncodeToString(emitter[:])) - } + logger.Info("emitter: " + hex.EncodeToString(emitter[:])) t.Txn.GenesisID = b.GenesisID t.Txn.GenesisHash = b.GenesisHash @@ -127,9 +110,7 @@ func lookAtTxn(e *Watcher, t types.SignedTxnInBlock, b types.Block, logger *zap. continue } - if e.debug { - logger.Info("id: " + hex.EncodeToString(id) + " " + Id) - } + logger.Info("id: " + hex.EncodeToString(id) + " " + Id) var txHash = eth_common.BytesToHash(id) // 32 bytes = d3b136a6a182a40554b2fafbc8d12a7a22737c10c81e33b33d1dcb74c532708b @@ -167,135 +148,109 @@ func (e *Watcher) Run(ctx context.Context) error { }) logger := supervisor.Logger(ctx) - errC := make(chan error) logger.Info("Algorand watcher connecting to indexer ", zap.String("url", e.indexerRPC)) logger.Info("Algorand watcher connecting to RPC node ", zap.String("url", e.algodRPC)) - go func() { - timer := time.NewTicker(time.Second * 1) - defer timer.Stop() + timer := time.NewTicker(time.Second * 1) + defer timer.Stop() - indexerClient, err := indexer.MakeClient(e.indexerRPC, e.indexerToken) - if err != nil { - logger.Error("indexer make client", zap.Error(err)) - p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAlgorand, 1) - errC <- err - return - } + indexerClient, err := indexer.MakeClient(e.indexerRPC, e.indexerToken) + if err != nil { + logger.Error("indexer make client", zap.Error(err)) + p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAlgorand, 1) + return err + } - algodClient, err := algod.MakeClient(e.algodRPC, e.algodToken) - if err != nil { - logger.Error("algod client", zap.Error(err)) - p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAlgorand, 1) - errC <- err - return - } + algodClient, err := algod.MakeClient(e.algodRPC, e.algodToken) + if err != nil { + logger.Error("algod client", zap.Error(err)) + p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAlgorand, 1) + return err + } - if e.next_round == 0 { - status, err := algodClient.StatusAfterBlock(0).Do(context.Background()) - if err != nil { - logger.Error("StatusAfterBlock", zap.Error(err)) - p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAlgorand, 1) - errC <- err - return + status, err := algodClient.StatusAfterBlock(0).Do(context.Background()) + if err != nil { + logger.Error("StatusAfterBlock", zap.Error(err)) + p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAlgorand, 1) + return err + } + + e.next_round = status.NextVersionRound + + for { + select { + case <-ctx.Done(): + return nil + case r := <-e.obsvReqC: + if vaa.ChainID(r.ChainId) != vaa.ChainIDAlgorand { + panic("invalid chain ID") } - e.next_round = status.NextVersionRound - } + logger.Info("Received obsv request", + zap.String("tx_hash", hex.EncodeToString(r.TxHash)), + zap.String("base32_tx_hash", base32.StdEncoding.WithPadding(base32.NoPadding).EncodeToString(r.TxHash))) - for { - select { - case <-ctx.Done(): - return - case r := <-e.obsvReqC: - if vaa.ChainID(r.ChainId) != vaa.ChainIDAlgorand { - panic("invalid chain ID") - } + result, err := indexerClient.SearchForTransactions().TXID(base32.StdEncoding.WithPadding(base32.NoPadding).EncodeToString(r.TxHash)).Do(context.Background()) + if err != nil { + logger.Error("SearchForTransactions", zap.Error(err)) + p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAlgorand, 1) + break + } + for _, t := range result.Transactions { + r := t.ConfirmedRound - logger.Info("Received obsv request", - zap.String("tx_hash", hex.EncodeToString(r.TxHash)), - zap.String("base32_tx_hash", base32.StdEncoding.WithPadding(base32.NoPadding).EncodeToString(r.TxHash))) - - result, err := indexerClient.SearchForTransactions().TXID(base32.StdEncoding.WithPadding(base32.NoPadding).EncodeToString(r.TxHash)).Do(context.Background()) + block, err := algodClient.Block(r).Do(context.Background()) if err != nil { logger.Error("SearchForTransactions", zap.Error(err)) p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAlgorand, 1) - errC <- err - return + break } - for i := 0; i < len(result.Transactions); i++ { - var t = result.Transactions[i] - r := t.ConfirmedRound - block, err := algodClient.Block(r).Do(context.Background()) + for _, element := range block.Payset { + lookAtTxn(e, element, block, logger) + } + } + + case <-timer.C: + status, err := algodClient.Status().Do(context.Background()) + if err != nil { + logger.Error(fmt.Sprintf("algodClient.Status: %s", err.Error())) + p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAlgorand, 1) + continue + } + + if e.next_round < status.NextVersionRound { + for { + logger.Info(fmt.Sprintf("inspecting block %d", e.next_round)) + block, err := algodClient.Block(e.next_round).Do(context.Background()) if err != nil { - logger.Error("SearchForTransactions", zap.Error(err)) + logger.Error(fmt.Sprintf("algodClient.Block %d: %s", e.next_round, err.Error())) p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAlgorand, 1) - errC <- err - return + break + } + + if block.Round == 0 { + break } for _, element := range block.Payset { lookAtTxn(e, element, block, logger) } - } + e.next_round = e.next_round + 1 - case <-timer.C: - status, err := algodClient.Status().Do(context.Background()) - if err != nil { - logger.Error(fmt.Sprintf("algodClient.Status: %s", err.Error())) + readiness.SetReady(common.ReadinessAlgorandSyncing) + currentAlgorandHeight.Set(float64(e.next_round)) + p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDAlgorand, &gossipv1.Heartbeat_Network{ + Height: int64(e.next_round), + ContractAddress: fmt.Sprintf("%d", e.appid), + }) - p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAlgorand, 1) - errC <- err - return - } - - if e.next_round > status.NextVersionRound { - e.next_round = status.NextVersionRound - logger.Info(fmt.Sprintf("Algorand rollback to %d", e.next_round)) - return - } - - if e.next_round < status.NextVersionRound { - for { - logger.Info(fmt.Sprintf("inspecting block %d", e.next_round)) - block, err := algodClient.Block(e.next_round).Do(context.Background()) - if err != nil { - logger.Error(fmt.Sprintf("algodClient.Block %d: %s", e.next_round, err.Error())) - - p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAlgorand, 1) - errC <- err - return - } - - if block.Round == 0 { - break - } - - for _, element := range block.Payset { - lookAtTxn(e, element, block, logger) - } - e.next_round = e.next_round + 1 - if e.next_round == status.NextVersionRound { - break - } + if e.next_round == status.NextVersionRound { + break } } - readiness.SetReady(common.ReadinessAlgorandSyncing) - currentAlgorandHeight.Set(float64(e.next_round)) - p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDAlgorand, &gossipv1.Heartbeat_Network{ - Height: int64(e.next_round), - ContractAddress: fmt.Sprintf("%d", e.appid), - }) } } - }() - - select { - case <-ctx.Done(): - return ctx.Err() - case err := <-errC: - return err } }