node/pkg/watchers: add RunWithScissors

This commit is contained in:
Paul Noel 2023-03-29 22:56:50 +00:00 committed by Evan Gray
parent a846036b6e
commit 9b8bed4dbf
1 changed files with 101 additions and 56 deletions

View File

@ -154,9 +154,7 @@ func (e *Watcher) Run(ctx context.Context) error {
logger.Info("Algorand watcher connecting to indexer ", zap.String("url", e.indexerRPC))
logger.Info("Algorand watcher connecting to RPC node ", zap.String("url", e.algodRPC))
timer := time.NewTicker(time.Second * 1)
defer timer.Stop()
// The indexerClient is used to get the transaction by hash for a re-observation
indexerClient, err := indexer.MakeClient(e.indexerRPC, e.indexerToken)
if err != nil {
logger.Error("indexer make client", zap.Error(err))
@ -182,79 +180,126 @@ func (e *Watcher) Run(ctx context.Context) error {
logger.Info(fmt.Sprintf("first block %d", e.next_round))
for {
select {
case <-ctx.Done():
return nil
case r := <-e.obsvReqC:
if vaa.ChainID(r.ChainId) != vaa.ChainIDAlgorand {
panic("invalid chain ID")
// Create the timer for the getting the block height
timer := time.NewTicker(time.Second * 1)
defer timer.Stop()
// Create an error channel
errC := make(chan error)
defer close(errC)
// Signal that basic initialization is complete
readiness.SetReady(e.readinessSync)
// Signal to the supervisor that this runnable has finished initialization
supervisor.Signal(ctx, supervisor.SignalHealthy)
// Create the go routine to handle events from core contract
common.RunWithScissors(ctx, errC, "core_events_and_block_height", func(ctx context.Context) error {
logger.Info("Entering core_events_and_block_height...")
for {
select {
case err := <-errC:
logger.Error("core_events_and_block_height died", zap.Error(err))
return fmt.Errorf("core_events_and_block_height died: %w", err)
case <-ctx.Done():
logger.Error("core_events_and_block_height context done")
return ctx.Err()
case <-timer.C:
status, err := algodClient.Status().Do(context.Background())
if err != nil {
logger.Error("algodClient.Status", zap.Error(err))
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAlgorand, 1)
continue
}
if e.next_round <= status.LastRound {
for {
block, err := algodClient.Block(e.next_round).Do(context.Background())
if err != nil {
logger.Error("algodClient.Block %d: %s", zap.Uint64("next_round", e.next_round), zap.Error(err))
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAlgorand, 1)
break
}
if block.Round == 0 {
break
}
for _, element := range block.Payset {
lookAtTxn(e, element, block, logger)
}
e.next_round = e.next_round + 1
if e.next_round > status.LastRound {
break
}
}
}
currentAlgorandHeight.Set(float64(status.LastRound))
p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDAlgorand, &gossipv1.Heartbeat_Network{
Height: int64(status.LastRound),
ContractAddress: fmt.Sprintf("%d", e.appid),
})
readiness.SetReady(e.readinessSync)
}
}
})
logger.Info("Received obsv request",
zap.String("tx_hash", hex.EncodeToString(r.TxHash)),
zap.String("base32_tx_hash", base32.StdEncoding.WithPadding(base32.NoPadding).EncodeToString(r.TxHash)))
// Create the go routine to listen for re-observation requests
common.RunWithScissors(ctx, errC, "fetch_obvs_req", func(ctx context.Context) error {
logger.Info("Entering fetch_obvs_req...")
result, err := indexerClient.SearchForTransactions().TXID(base32.StdEncoding.WithPadding(base32.NoPadding).EncodeToString(r.TxHash)).Do(context.Background())
if err != nil {
logger.Error("SearchForTransactions", zap.Error(err))
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAlgorand, 1)
break
}
for _, t := range result.Transactions {
r := t.ConfirmedRound
for {
select {
case err := <-errC:
logger.Error("fetch_obvs_req died", zap.Error(err))
return fmt.Errorf("fetch_obvs_req died: %w", err)
case <-ctx.Done():
logger.Error("fetch_obvs_req context done")
return ctx.Err()
block, err := algodClient.Block(r).Do(context.Background())
case r := <-e.obsvReqC:
if vaa.ChainID(r.ChainId) != vaa.ChainIDAlgorand {
panic("invalid chain ID")
}
logger.Info("Received obsv request",
zap.String("tx_hash", hex.EncodeToString(r.TxHash)),
zap.String("base32_tx_hash", base32.StdEncoding.WithPadding(base32.NoPadding).EncodeToString(r.TxHash)))
result, err := indexerClient.SearchForTransactions().TXID(base32.StdEncoding.WithPadding(base32.NoPadding).EncodeToString(r.TxHash)).Do(context.Background())
if err != nil {
logger.Error("SearchForTransactions", zap.Error(err))
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAlgorand, 1)
break
}
for _, t := range result.Transactions {
r := t.ConfirmedRound
for _, element := range block.Payset {
lookAtTxn(e, element, block, logger)
}
}
case <-timer.C:
status, err := algodClient.Status().Do(context.Background())
if err != nil {
logger.Error(fmt.Sprintf("algodClient.Status: %s", err.Error()))
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAlgorand, 1)
continue
}
if e.next_round <= status.LastRound {
for {
block, err := algodClient.Block(e.next_round).Do(context.Background())
block, err := algodClient.Block(r).Do(context.Background())
if err != nil {
logger.Error(fmt.Sprintf("algodClient.Block %d: %s", e.next_round, err.Error()))
logger.Error("SearchForTransactions", zap.Error(err))
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAlgorand, 1)
break
}
if block.Round == 0 {
break
}
for _, element := range block.Payset {
lookAtTxn(e, element, block, logger)
}
e.next_round = e.next_round + 1
if e.next_round > status.LastRound {
break
}
}
}
currentAlgorandHeight.Set(float64(status.LastRound))
p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDAlgorand, &gossipv1.Heartbeat_Network{
Height: int64(status.LastRound),
ContractAddress: fmt.Sprintf("%d", e.appid),
})
readiness.SetReady(e.readinessSync)
}
})
select {
case <-ctx.Done():
return ctx.Err()
case err := <-errC:
return err
}
}