dev.v2: Modernize algorand watcher and improve logging (#1766)

This commit is contained in:
jumpsiegel 2022-10-24 09:08:05 -05:00 committed by GitHub
parent b58c75de8f
commit 6f38e42119
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 78 additions and 123 deletions

View File

@ -1077,7 +1077,7 @@ func runNode(cmd *cobra.Command, args []string) {
readiness.RegisterComponent(common.ReadinessAlgorandSyncing) readiness.RegisterComponent(common.ReadinessAlgorandSyncing)
chainObsvReqC[vaa.ChainIDAlgorand] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) chainObsvReqC[vaa.ChainIDAlgorand] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
if err := supervisor.Run(ctx, "algorandwatch", if err := supervisor.Run(ctx, "algorandwatch",
algorand.NewWatcher(*algorandIndexerRPC, *algorandIndexerToken, *algorandAlgodRPC, *algorandAlgodToken, *algorandAppID, lockC, setC, chainObsvReqC[vaa.ChainIDAlgorand]).Run); err != nil { algorand.NewWatcher(*algorandIndexerRPC, *algorandIndexerToken, *algorandAlgodRPC, *algorandAlgodToken, *algorandAppID, lockC, chainObsvReqC[vaa.ChainIDAlgorand]).Run); err != nil {
return err return err
} }
} }

View File

@ -5,7 +5,6 @@ import (
"encoding/base32" "encoding/base32"
"encoding/binary" "encoding/binary"
"encoding/hex" "encoding/hex"
"encoding/json"
"fmt" "fmt"
"time" "time"
@ -35,11 +34,9 @@ type (
appid uint64 appid uint64
msgChan chan *common.MessagePublication msgChan chan *common.MessagePublication
setChan chan *common.GuardianSet
obsvReqC chan *gossipv1.ObservationRequest obsvReqC chan *gossipv1.ObservationRequest
next_round uint64 next_round uint64
debug bool
} }
) )
@ -64,7 +61,6 @@ func NewWatcher(
algodToken string, algodToken string,
appid uint64, appid uint64,
lockEvents chan *common.MessagePublication, lockEvents chan *common.MessagePublication,
setEvents chan *common.GuardianSet,
obsvReqC chan *gossipv1.ObservationRequest, obsvReqC chan *gossipv1.ObservationRequest,
) *Watcher { ) *Watcher {
return &Watcher{ return &Watcher{
@ -74,10 +70,8 @@ func NewWatcher(
algodToken: algodToken, algodToken: algodToken,
appid: appid, appid: appid,
msgChan: lockEvents, msgChan: lockEvents,
setChan: setEvents,
obsvReqC: obsvReqC, obsvReqC: obsvReqC,
next_round: 0, next_round: 0,
debug: true,
} }
} }
@ -99,23 +93,12 @@ func lookAtTxn(e *Watcher, t types.SignedTxnInBlock, b types.Block, logger *zap.
continue continue
} }
if e.debug {
JSON, err := json.Marshal(it)
if err != nil {
logger.Error("Cannot JSON marshal transaction", zap.Error(err))
} else {
logger.Info(string(JSON))
}
}
emitter := at.Sender emitter := at.Sender
var a vaa.Address var a vaa.Address
copy(a[:], emitter[:]) // 32 bytes = 8edf5b0e108c3a1a0a4b704cc89591f2ad8d50df24e991567e640ed720a94be2 copy(a[:], emitter[:]) // 32 bytes = 8edf5b0e108c3a1a0a4b704cc89591f2ad8d50df24e991567e640ed720a94be2
if e.debug { logger.Info("emitter: " + hex.EncodeToString(emitter[:]))
logger.Error("emitter: " + hex.EncodeToString(emitter[:]))
}
t.Txn.GenesisID = b.GenesisID t.Txn.GenesisID = b.GenesisID
t.Txn.GenesisHash = b.GenesisHash t.Txn.GenesisHash = b.GenesisHash
@ -127,9 +110,7 @@ func lookAtTxn(e *Watcher, t types.SignedTxnInBlock, b types.Block, logger *zap.
continue continue
} }
if e.debug { logger.Info("id: " + hex.EncodeToString(id) + " " + Id)
logger.Info("id: " + hex.EncodeToString(id) + " " + Id)
}
var txHash = eth_common.BytesToHash(id) // 32 bytes = d3b136a6a182a40554b2fafbc8d12a7a22737c10c81e33b33d1dcb74c532708b var txHash = eth_common.BytesToHash(id) // 32 bytes = d3b136a6a182a40554b2fafbc8d12a7a22737c10c81e33b33d1dcb74c532708b
@ -167,135 +148,109 @@ func (e *Watcher) Run(ctx context.Context) error {
}) })
logger := supervisor.Logger(ctx) logger := supervisor.Logger(ctx)
errC := make(chan error)
logger.Info("Algorand watcher connecting to indexer ", zap.String("url", e.indexerRPC)) logger.Info("Algorand watcher connecting to indexer ", zap.String("url", e.indexerRPC))
logger.Info("Algorand watcher connecting to RPC node ", zap.String("url", e.algodRPC)) logger.Info("Algorand watcher connecting to RPC node ", zap.String("url", e.algodRPC))
go func() { timer := time.NewTicker(time.Second * 1)
timer := time.NewTicker(time.Second * 1) defer timer.Stop()
defer timer.Stop()
indexerClient, err := indexer.MakeClient(e.indexerRPC, e.indexerToken) indexerClient, err := indexer.MakeClient(e.indexerRPC, e.indexerToken)
if err != nil { if err != nil {
logger.Error("indexer make client", zap.Error(err)) logger.Error("indexer make client", zap.Error(err))
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAlgorand, 1) p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAlgorand, 1)
errC <- err return err
return }
}
algodClient, err := algod.MakeClient(e.algodRPC, e.algodToken) algodClient, err := algod.MakeClient(e.algodRPC, e.algodToken)
if err != nil { if err != nil {
logger.Error("algod client", zap.Error(err)) logger.Error("algod client", zap.Error(err))
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAlgorand, 1) p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAlgorand, 1)
errC <- err return err
return }
}
if e.next_round == 0 { status, err := algodClient.StatusAfterBlock(0).Do(context.Background())
status, err := algodClient.StatusAfterBlock(0).Do(context.Background()) if err != nil {
if err != nil { logger.Error("StatusAfterBlock", zap.Error(err))
logger.Error("StatusAfterBlock", zap.Error(err)) p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAlgorand, 1)
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAlgorand, 1) return err
errC <- err }
return
e.next_round = status.NextVersionRound
for {
select {
case <-ctx.Done():
return nil
case r := <-e.obsvReqC:
if vaa.ChainID(r.ChainId) != vaa.ChainIDAlgorand {
panic("invalid chain ID")
} }
e.next_round = status.NextVersionRound logger.Info("Received obsv request",
} zap.String("tx_hash", hex.EncodeToString(r.TxHash)),
zap.String("base32_tx_hash", base32.StdEncoding.WithPadding(base32.NoPadding).EncodeToString(r.TxHash)))
for { result, err := indexerClient.SearchForTransactions().TXID(base32.StdEncoding.WithPadding(base32.NoPadding).EncodeToString(r.TxHash)).Do(context.Background())
select { if err != nil {
case <-ctx.Done(): logger.Error("SearchForTransactions", zap.Error(err))
return p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAlgorand, 1)
case r := <-e.obsvReqC: break
if vaa.ChainID(r.ChainId) != vaa.ChainIDAlgorand { }
panic("invalid chain ID") for _, t := range result.Transactions {
} r := t.ConfirmedRound
logger.Info("Received obsv request", block, err := algodClient.Block(r).Do(context.Background())
zap.String("tx_hash", hex.EncodeToString(r.TxHash)),
zap.String("base32_tx_hash", base32.StdEncoding.WithPadding(base32.NoPadding).EncodeToString(r.TxHash)))
result, err := indexerClient.SearchForTransactions().TXID(base32.StdEncoding.WithPadding(base32.NoPadding).EncodeToString(r.TxHash)).Do(context.Background())
if err != nil { if err != nil {
logger.Error("SearchForTransactions", zap.Error(err)) logger.Error("SearchForTransactions", zap.Error(err))
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAlgorand, 1) p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAlgorand, 1)
errC <- err break
return
} }
for i := 0; i < len(result.Transactions); i++ {
var t = result.Transactions[i]
r := t.ConfirmedRound
block, err := algodClient.Block(r).Do(context.Background()) for _, element := range block.Payset {
lookAtTxn(e, element, block, logger)
}
}
case <-timer.C:
status, err := algodClient.Status().Do(context.Background())
if err != nil {
logger.Error(fmt.Sprintf("algodClient.Status: %s", err.Error()))
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAlgorand, 1)
continue
}
if e.next_round < status.NextVersionRound {
for {
logger.Info(fmt.Sprintf("inspecting block %d", e.next_round))
block, err := algodClient.Block(e.next_round).Do(context.Background())
if err != nil { if err != nil {
logger.Error("SearchForTransactions", zap.Error(err)) logger.Error(fmt.Sprintf("algodClient.Block %d: %s", e.next_round, err.Error()))
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAlgorand, 1) p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAlgorand, 1)
errC <- err break
return }
if block.Round == 0 {
break
} }
for _, element := range block.Payset { for _, element := range block.Payset {
lookAtTxn(e, element, block, logger) lookAtTxn(e, element, block, logger)
} }
} e.next_round = e.next_round + 1
case <-timer.C: readiness.SetReady(common.ReadinessAlgorandSyncing)
status, err := algodClient.Status().Do(context.Background()) currentAlgorandHeight.Set(float64(e.next_round))
if err != nil { p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDAlgorand, &gossipv1.Heartbeat_Network{
logger.Error(fmt.Sprintf("algodClient.Status: %s", err.Error())) Height: int64(e.next_round),
ContractAddress: fmt.Sprintf("%d", e.appid),
})
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAlgorand, 1) if e.next_round == status.NextVersionRound {
errC <- err break
return
}
if e.next_round > status.NextVersionRound {
e.next_round = status.NextVersionRound
logger.Info(fmt.Sprintf("Algorand rollback to %d", e.next_round))
return
}
if e.next_round < status.NextVersionRound {
for {
logger.Info(fmt.Sprintf("inspecting block %d", e.next_round))
block, err := algodClient.Block(e.next_round).Do(context.Background())
if err != nil {
logger.Error(fmt.Sprintf("algodClient.Block %d: %s", e.next_round, err.Error()))
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAlgorand, 1)
errC <- err
return
}
if block.Round == 0 {
break
}
for _, element := range block.Payset {
lookAtTxn(e, element, block, logger)
}
e.next_round = e.next_round + 1
if e.next_round == status.NextVersionRound {
break
}
} }
} }
readiness.SetReady(common.ReadinessAlgorandSyncing)
currentAlgorandHeight.Set(float64(e.next_round))
p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDAlgorand, &gossipv1.Heartbeat_Network{
Height: int64(e.next_round),
ContractAddress: fmt.Sprintf("%d", e.appid),
})
} }
} }
}()
select {
case <-ctx.Done():
return ctx.Err()
case err := <-errC:
return err
} }
} }