node/pkg/watcher: add timeouts

This commit is contained in:
Paul Noel 2023-02-06 20:48:36 +00:00 committed by Evan Gray
parent f2be500799
commit 1658dbffec
1 changed files with 15 additions and 6 deletions

View File

@ -117,15 +117,12 @@ func (b *BlockPollConnector) pollBlocks(ctx context.Context, logger *zap.Logger,
// Use a timeout so that the call will fail and the runable will get restarted. This should not happen in mainnet, but if it does, we will need to
// investigate why the runable is dying and fix the underlying problem.
timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
lastPublishedBlock = lastBlock
// Fetch the latest block on the chain
// We could do this on every iteration such that if a new block is created while this function is being executed,
// it would automatically fetch new blocks but in order to reduce API load this will be done on the next iteration.
latestBlock, err := b.getBlock(timeout, logger, nil, safe)
latestBlock, err := b.getBlockWithTimeout(ctx, logger, nil, safe)
if err != nil {
logger.Error("failed to look up latest block",
zap.Uint64("lastSeenBlock", lastBlock.Number.Uint64()), zap.Error(err))
@ -139,7 +136,7 @@ func (b *BlockPollConnector) pollBlocks(ctx context.Context, logger *zap.Logger,
// Try to fetch the next block between lastBlock and latestBlock
nextBlockNumber := new(big.Int).Add(lastPublishedBlock.Number, big.NewInt(1))
block, err := b.getBlock(timeout, logger, nextBlockNumber, safe)
block, err := b.getBlockWithTimeout(ctx, logger, nextBlockNumber, safe)
if err != nil {
logger.Error("failed to fetch next block",
zap.Uint64("block", nextBlockNumber.Uint64()), zap.Error(err))
@ -147,7 +144,7 @@ func (b *BlockPollConnector) pollBlocks(ctx context.Context, logger *zap.Logger,
}
if b.finalizer != nil {
finalized, err := b.finalizer.IsBlockFinalized(timeout, block)
finalized, err := b.isBlockFinalizedWithTimeout(ctx, block)
if err != nil {
logger.Error("failed to check block finalization",
zap.Uint64("block", block.Number.Uint64()), zap.Error(err))
@ -166,6 +163,18 @@ func (b *BlockPollConnector) pollBlocks(ctx context.Context, logger *zap.Logger,
return
}
func (b *BlockPollConnector) getBlockWithTimeout(ctx context.Context, logger *zap.Logger, blockNumber *big.Int, safe bool) (*NewBlock, error) {
timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
return b.getBlock(timeout, logger, blockNumber, safe)
}
func (b *BlockPollConnector) isBlockFinalizedWithTimeout(ctx context.Context, block *NewBlock) (bool, error) {
timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
return b.finalizer.IsBlockFinalized(timeout, block)
}
func (b *BlockPollConnector) SubscribeForBlocks(ctx context.Context, errC chan error, sink chan<- *NewBlock) (ethereum.Subscription, error) {
sub := NewPollSubscription()
blockSub := b.blockFeed.Subscribe(sink)