From 50774265ea30634b3c9e2f69ed1635a425e73983 Mon Sep 17 00:00:00 2001 From: ftocal <46001274+ftocal@users.noreply.github.com> Date: Thu, 13 Jul 2023 12:06:46 -0300 Subject: [PATCH] fix contract-watcher processing (#531) Co-authored-by: walker-16 --- contract-watcher/cmd/service/run.go | 2 +- contract-watcher/watcher/aptos_watcher.go | 12 +++++++----- contract-watcher/watcher/evm_standard_watcher.go | 13 ++++++++----- contract-watcher/watcher/evm_watcher.go | 10 ++++++---- contract-watcher/watcher/solana_watcher.go | 12 +++++++----- contract-watcher/watcher/terra_watcher.go | 10 ++++++---- deploy/parser/parser-backfiller.yaml | 4 +++- 7 files changed, 38 insertions(+), 25 deletions(-) diff --git a/contract-watcher/cmd/service/run.go b/contract-watcher/cmd/service/run.go index 31b9f8c0..7ac68180 100644 --- a/contract-watcher/cmd/service/run.go +++ b/contract-watcher/cmd/service/run.go @@ -211,7 +211,7 @@ func newWatchersForMainnet() *watchersConfig { evm: 1000, solana: 3, terra: 10, - aptos: 3, + aptos: 20, oasis: 3, moonbeam: 5, celo: 3, diff --git a/contract-watcher/watcher/aptos_watcher.go b/contract-watcher/watcher/aptos_watcher.go index 30d7861c..de0f26bf 100644 --- a/contract-watcher/watcher/aptos_watcher.go +++ b/contract-watcher/watcher/aptos_watcher.go @@ -88,7 +88,7 @@ func (w *AptosWatcher) Start(ctx context.Context) error { w.logger.Error("cannot get latest block", zap.Error(err)) } maxBlocks := uint64(w.sizeBlocks) - w.logger.Info("current block", zap.Uint64("current", currentBlock), zap.Uint64("last", lastBlock)) + w.logger.Debug("current block", zap.Uint64("current", currentBlock), zap.Uint64("last", lastBlock)) w.metrics.SetLastBlock(w.chainID, lastBlock) if currentBlock < lastBlock { totalBlocks := (lastBlock-currentBlock)/maxBlocks + 1 @@ -98,13 +98,13 @@ func (w *AptosWatcher) Start(ctx context.Context) error { if toBlock > lastBlock { toBlock = lastBlock } - w.logger.Info("processing blocks", zap.Uint64("from", fromBlock), zap.Uint64("to", toBlock)) + w.logger.Debug("processing blocks", zap.Uint64("from", fromBlock), zap.Uint64("to", toBlock)) w.processBlock(ctx, fromBlock, toBlock, true) - w.logger.Info("blocks processed", zap.Uint64("from", fromBlock), zap.Uint64("to", toBlock)) + w.logger.Debug("blocks processed", zap.Uint64("from", fromBlock), zap.Uint64("to", toBlock)) } // process all the blocks between current and last block. } else { - w.logger.Info("waiting for new blocks") + w.logger.Debug("waiting for new blocks") select { case <-ctx.Done(): w.wg.Done() @@ -112,7 +112,9 @@ func (w *AptosWatcher) Start(ctx context.Context) error { case <-time.After(time.Duration(w.waitSeconds) * time.Second): } } - currentBlock = lastBlock + if lastBlock > currentBlock { + currentBlock = lastBlock + } } } } diff --git a/contract-watcher/watcher/evm_standard_watcher.go b/contract-watcher/watcher/evm_standard_watcher.go index d5171f82..a1bb17bd 100644 --- a/contract-watcher/watcher/evm_standard_watcher.go +++ b/contract-watcher/watcher/evm_standard_watcher.go @@ -81,19 +81,20 @@ func (w *EvmStandarWatcher) Start(ctx context.Context) error { if err != nil { w.logger.Error("cannot get latest block", zap.Error(err)) } - w.logger.Info("current block", zap.Uint64("current", currentBlock), zap.Uint64("last", lastBlock)) + w.logger.Debug("current block", zap.Uint64("current", currentBlock), zap.Uint64("last", lastBlock)) w.metrics.SetLastBlock(w.chainID, lastBlock) + if currentBlock < lastBlock { totalBlocks := getTotalBlocks(lastBlock, currentBlock, w.maxBlocks) for i := uint64(0); i < totalBlocks; i++ { fromBlock, toBlock := getPage(currentBlock, i, w.maxBlocks, lastBlock) - w.logger.Info("processing blocks", zap.Uint64("from", fromBlock), zap.Uint64("to", toBlock)) + w.logger.Debug("processing blocks", zap.Uint64("from", fromBlock), zap.Uint64("to", toBlock)) w.processBlock(ctx, fromBlock, toBlock, true) - w.logger.Info("blocks processed", zap.Uint64("from", fromBlock), zap.Uint64("to", toBlock)) + w.logger.Debug("blocks processed", zap.Uint64("from", fromBlock), zap.Uint64("to", toBlock)) } // process all the blocks between current and last block. } else { - w.logger.Info("waiting for new blocks") + w.logger.Debug("waiting for new blocks") select { case <-ctx.Done(): w.wg.Done() @@ -101,7 +102,9 @@ func (w *EvmStandarWatcher) Start(ctx context.Context) error { case <-time.After(time.Duration(w.waitSeconds) * time.Second): } } - currentBlock = lastBlock + if lastBlock > currentBlock { + currentBlock = lastBlock + } } } diff --git a/contract-watcher/watcher/evm_watcher.go b/contract-watcher/watcher/evm_watcher.go index 6c6013ed..40a1671a 100644 --- a/contract-watcher/watcher/evm_watcher.go +++ b/contract-watcher/watcher/evm_watcher.go @@ -91,13 +91,13 @@ func (w *EVMWatcher) Start(ctx context.Context) error { if toBlock > lastBlock { toBlock = lastBlock } - w.logger.Info("processing blocks", zap.Int64("from", fromBlock), zap.Int64("to", toBlock)) + w.logger.Debug("processing blocks", zap.Int64("from", fromBlock), zap.Int64("to", toBlock)) w.processBlock(ctx, fromBlock, toBlock, true) - w.logger.Info("blocks processed", zap.Int64("from", fromBlock), zap.Int64("to", toBlock)) + w.logger.Debug("blocks processed", zap.Int64("from", fromBlock), zap.Int64("to", toBlock)) } // process all the blocks between current and last block. } else { - w.logger.Info("waiting for new blocks") + w.logger.Debug("waiting for new blocks") select { case <-ctx.Done(): w.wg.Done() @@ -105,7 +105,9 @@ func (w *EVMWatcher) Start(ctx context.Context) error { case <-time.After(time.Duration(w.waitSeconds) * time.Second): } } - currentBlock = lastBlock + if lastBlock > currentBlock { + currentBlock = lastBlock + } } } diff --git a/contract-watcher/watcher/solana_watcher.go b/contract-watcher/watcher/solana_watcher.go index c1e7417b..24a8b32a 100644 --- a/contract-watcher/watcher/solana_watcher.go +++ b/contract-watcher/watcher/solana_watcher.go @@ -142,7 +142,7 @@ func (w *SolanaWatcher) Start(ctx context.Context) error { maxBlocks := uint64(w.sizeBlocks) w.metrics.SetLastBlock(w.chainID, lastBlock) if currentBlock < lastBlock { - w.logger.Info("current block", zap.Uint64("current", currentBlock), zap.Uint64("last", lastBlock)) + w.logger.Debug("current block", zap.Uint64("current", currentBlock), zap.Uint64("last", lastBlock)) totalBlocks := (lastBlock-currentBlock)/maxBlocks + 1 for i := 0; i < int(totalBlocks); i++ { fromBlock := currentBlock + uint64(i)*maxBlocks @@ -150,13 +150,13 @@ func (w *SolanaWatcher) Start(ctx context.Context) error { if toBlock > lastBlock { toBlock = lastBlock } - w.logger.Info("processing blocks", zap.Uint64("from", fromBlock), zap.Uint64("to", toBlock)) + w.logger.Debug("processing blocks", zap.Uint64("from", fromBlock), zap.Uint64("to", toBlock)) w.processBlock(ctx, fromBlock, toBlock, true) - w.logger.Info("blocks processed", zap.Uint64("from", fromBlock), zap.Uint64("to", toBlock)) + w.logger.Debug("blocks processed", zap.Uint64("from", fromBlock), zap.Uint64("to", toBlock)) } // process all the blocks between current and last block. } else { - w.logger.Info("waiting for new blocks") + w.logger.Debug("waiting for new blocks") select { case <-ctx.Done(): w.wg.Done() @@ -164,7 +164,9 @@ func (w *SolanaWatcher) Start(ctx context.Context) error { case <-time.After(time.Duration(w.waitSeconds) * time.Second): } } - currentBlock = lastBlock + if lastBlock > currentBlock { + currentBlock = lastBlock + } } } } diff --git a/contract-watcher/watcher/terra_watcher.go b/contract-watcher/watcher/terra_watcher.go index 6bb62f44..18958a76 100644 --- a/contract-watcher/watcher/terra_watcher.go +++ b/contract-watcher/watcher/terra_watcher.go @@ -99,7 +99,7 @@ func (w *TerraWatcher) Start(ctx context.Context) error { w.metrics.SetLastBlock(w.chainID, uint64(lastBlock)) // check if there are new blocks to process. if currentBlock < lastBlock { - w.logger.Info("processing blocks", zap.Int64("from", currentBlock), zap.Int64("to", lastBlock)) + w.logger.Debug("processing blocks", zap.Int64("from", currentBlock), zap.Int64("to", lastBlock)) for block := currentBlock; block <= lastBlock; block++ { w.processBlock(ctx, block) // update block watcher @@ -119,7 +119,9 @@ func (w *TerraWatcher) Start(ctx context.Context) error { case <-time.After(time.Duration(w.waitSeconds) * time.Second): } } - currentBlock = lastBlock + if lastBlock > currentBlock { + currentBlock = lastBlock + } } } } @@ -128,7 +130,7 @@ func (w *TerraWatcher) Backfill(ctx context.Context, fromBlock uint64, toBlock u totalBlocks := getTotalBlocks(toBlock, fromBlock, pageSize) for i := uint64(0); i < totalBlocks; i++ { fromBlock, toBlock := getPage(fromBlock, i, pageSize, toBlock) - w.logger.Info("processing blocks", zap.Uint64("from", fromBlock), zap.Uint64("to", toBlock)) + w.logger.Debug("processing blocks", zap.Uint64("from", fromBlock), zap.Uint64("to", toBlock)) for block := fromBlock; block <= toBlock; block++ { w.processBlock(ctx, int64(block)) if persistBlock { @@ -141,7 +143,7 @@ func (w *TerraWatcher) Backfill(ctx context.Context, fromBlock uint64, toBlock u w.repository.UpdateWatcherBlock(ctx, w.chainID, watcherBlock) } } - w.logger.Info("blocks processed", zap.Uint64("from", fromBlock), zap.Uint64("to", toBlock)) + w.logger.Debug("blocks processed", zap.Uint64("from", fromBlock), zap.Uint64("to", toBlock)) } } diff --git a/deploy/parser/parser-backfiller.yaml b/deploy/parser/parser-backfiller.yaml index fa7a4737..c7ef148c 100644 --- a/deploy/parser/parser-backfiller.yaml +++ b/deploy/parser/parser-backfiller.yaml @@ -39,4 +39,6 @@ spec: - --vaa-payload-parser-timeout - "{{ .VAA_PAYLOAD_PARSER_TIMEOUT }}" - --page-size - - "50" \ No newline at end of file + - "50" + - --start-time + - "2018-01-01T00:00:00Z" \ No newline at end of file