node: revert "node/pkg/watchers: add RunWithScissors" (#2620)

This reverts commit 9b8bed4dbf.
This commit is contained in:
Paul Noel 2023-04-11 21:55:43 +00:00 committed by GitHub
parent 58f55288f3
commit 2016da7713
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 56 additions and 101 deletions

View File

@ -154,7 +154,9 @@ func (e *Watcher) Run(ctx context.Context) error {
logger.Info("Algorand watcher connecting to indexer ", zap.String("url", e.indexerRPC)) logger.Info("Algorand watcher connecting to indexer ", zap.String("url", e.indexerRPC))
logger.Info("Algorand watcher connecting to RPC node ", zap.String("url", e.algodRPC)) logger.Info("Algorand watcher connecting to RPC node ", zap.String("url", e.algodRPC))
// The indexerClient is used to get the transaction by hash for a re-observation timer := time.NewTicker(time.Second * 1)
defer timer.Stop()
indexerClient, err := indexer.MakeClient(e.indexerRPC, e.indexerToken) indexerClient, err := indexer.MakeClient(e.indexerRPC, e.indexerToken)
if err != nil { if err != nil {
logger.Error("indexer make client", zap.Error(err)) logger.Error("indexer make client", zap.Error(err))
@ -180,126 +182,79 @@ func (e *Watcher) Run(ctx context.Context) error {
logger.Info(fmt.Sprintf("first block %d", e.next_round)) logger.Info(fmt.Sprintf("first block %d", e.next_round))
// Create the timer for the getting the block height for {
timer := time.NewTicker(time.Second * 1) select {
defer timer.Stop() case <-ctx.Done():
return nil
// Create an error channel case r := <-e.obsvReqC:
errC := make(chan error) if vaa.ChainID(r.ChainId) != vaa.ChainIDAlgorand {
defer close(errC) panic("invalid chain ID")
// Signal that basic initialization is complete
readiness.SetReady(e.readinessSync)
// Signal to the supervisor that this runnable has finished initialization
supervisor.Signal(ctx, supervisor.SignalHealthy)
// Create the go routine to handle events from core contract
common.RunWithScissors(ctx, errC, "core_events_and_block_height", func(ctx context.Context) error {
logger.Info("Entering core_events_and_block_height...")
for {
select {
case err := <-errC:
logger.Error("core_events_and_block_height died", zap.Error(err))
return fmt.Errorf("core_events_and_block_height died: %w", err)
case <-ctx.Done():
logger.Error("core_events_and_block_height context done")
return ctx.Err()
case <-timer.C:
status, err := algodClient.Status().Do(context.Background())
if err != nil {
logger.Error("algodClient.Status", zap.Error(err))
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAlgorand, 1)
continue
}
if e.next_round <= status.LastRound {
for {
block, err := algodClient.Block(e.next_round).Do(context.Background())
if err != nil {
logger.Error("algodClient.Block %d: %s", zap.Uint64("next_round", e.next_round), zap.Error(err))
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAlgorand, 1)
break
}
if block.Round == 0 {
break
}
for _, element := range block.Payset {
lookAtTxn(e, element, block, logger)
}
e.next_round = e.next_round + 1
if e.next_round > status.LastRound {
break
}
}
}
currentAlgorandHeight.Set(float64(status.LastRound))
p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDAlgorand, &gossipv1.Heartbeat_Network{
Height: int64(status.LastRound),
ContractAddress: fmt.Sprintf("%d", e.appid),
})
readiness.SetReady(e.readinessSync)
} }
}
})
// Create the go routine to listen for re-observation requests logger.Info("Received obsv request",
common.RunWithScissors(ctx, errC, "fetch_obvs_req", func(ctx context.Context) error { zap.String("tx_hash", hex.EncodeToString(r.TxHash)),
logger.Info("Entering fetch_obvs_req...") zap.String("base32_tx_hash", base32.StdEncoding.WithPadding(base32.NoPadding).EncodeToString(r.TxHash)))
for { result, err := indexerClient.SearchForTransactions().TXID(base32.StdEncoding.WithPadding(base32.NoPadding).EncodeToString(r.TxHash)).Do(context.Background())
select { if err != nil {
case err := <-errC: logger.Error("SearchForTransactions", zap.Error(err))
logger.Error("fetch_obvs_req died", zap.Error(err)) p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAlgorand, 1)
return fmt.Errorf("fetch_obvs_req died: %w", err) break
case <-ctx.Done(): }
logger.Error("fetch_obvs_req context done") for _, t := range result.Transactions {
return ctx.Err() r := t.ConfirmedRound
case r := <-e.obsvReqC: block, err := algodClient.Block(r).Do(context.Background())
if vaa.ChainID(r.ChainId) != vaa.ChainIDAlgorand {
panic("invalid chain ID")
}
logger.Info("Received obsv request",
zap.String("tx_hash", hex.EncodeToString(r.TxHash)),
zap.String("base32_tx_hash", base32.StdEncoding.WithPadding(base32.NoPadding).EncodeToString(r.TxHash)))
result, err := indexerClient.SearchForTransactions().TXID(base32.StdEncoding.WithPadding(base32.NoPadding).EncodeToString(r.TxHash)).Do(context.Background())
if err != nil { if err != nil {
logger.Error("SearchForTransactions", zap.Error(err)) logger.Error("SearchForTransactions", zap.Error(err))
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAlgorand, 1) p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAlgorand, 1)
break break
} }
for _, t := range result.Transactions {
r := t.ConfirmedRound
block, err := algodClient.Block(r).Do(context.Background()) for _, element := range block.Payset {
lookAtTxn(e, element, block, logger)
}
}
case <-timer.C:
status, err := algodClient.Status().Do(context.Background())
if err != nil {
logger.Error(fmt.Sprintf("algodClient.Status: %s", err.Error()))
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAlgorand, 1)
continue
}
if e.next_round <= status.LastRound {
for {
block, err := algodClient.Block(e.next_round).Do(context.Background())
if err != nil { if err != nil {
logger.Error("SearchForTransactions", zap.Error(err)) logger.Error(fmt.Sprintf("algodClient.Block %d: %s", e.next_round, err.Error()))
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAlgorand, 1) p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAlgorand, 1)
break break
} }
if block.Round == 0 {
break
}
for _, element := range block.Payset { for _, element := range block.Payset {
lookAtTxn(e, element, block, logger) lookAtTxn(e, element, block, logger)
} }
e.next_round = e.next_round + 1
if e.next_round > status.LastRound {
break
}
} }
} }
}
})
select { currentAlgorandHeight.Set(float64(status.LastRound))
case <-ctx.Done(): p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDAlgorand, &gossipv1.Heartbeat_Network{
return ctx.Err() Height: int64(status.LastRound),
case err := <-errC: ContractAddress: fmt.Sprintf("%d", e.appid),
return err })
readiness.SetReady(e.readinessSync)
}
} }
} }