From b827301ce6ed51b4a6a4059e74bec7a2771d49c8 Mon Sep 17 00:00:00 2001 From: tbjump Date: Wed, 24 May 2023 01:10:14 +0000 Subject: [PATCH] node/near: prevent transactionProcessingQueue from growing too large --- node/pkg/watchers/near/metrics.go | 5 +++-- node/pkg/watchers/near/watcher.go | 25 ++++++++++++++++++++----- 2 files changed, 23 insertions(+), 7 deletions(-) diff --git a/node/pkg/watchers/near/metrics.go b/node/pkg/watchers/near/metrics.go index 5cc3194a2..5e4d14226 100644 --- a/node/pkg/watchers/near/metrics.go +++ b/node/pkg/watchers/near/metrics.go @@ -74,10 +74,11 @@ func (e *Watcher) runMetrics(ctx context.Context) error { case <-metricsIntervalTimer.C: // compute and publish periodic metrics l1 := e.transactionProcessingQueueCounter.Load() - l2 := len(e.chunkProcessingQueue) + l2 := len(e.transactionProcessingQueue) + l3 := len(e.chunkProcessingQueue) txqueueLen.Set(float64(l1)) chunkqueueLen.Set(float64(l2)) - logger.Debug("metrics", zap.Int64("txqueueLen", l1), zap.Int("chunkqueueLen", l2)) + logger.Debug("metrics", zap.Int64("txqueueWaiting", l1), zap.Int("txqueueReady", l2), zap.Int("chunkqueueLen", l3)) case event := <-e.eventChan: switch event { diff --git a/node/pkg/watchers/near/watcher.go b/node/pkg/watchers/near/watcher.go index 866442c9e..342adcc5e 100644 --- a/node/pkg/watchers/near/watcher.go +++ b/node/pkg/watchers/near/watcher.go @@ -104,7 +104,7 @@ func NewWatcher( msgC: msgC, obsvReqC: obsvReqC, readinessSync: common.MustConvertChainIdToReadinessSyncing(vaa.ChainIDNear), - transactionProcessingQueue: make(chan *transactionProcessingJob), + transactionProcessingQueue: make(chan *transactionProcessingJob, queueSize), chunkProcessingQueue: make(chan nearapi.ChunkHeader, queueSize), eventChanTxProcessedDuration: make(chan time.Duration, 10), eventChan: make(chan eventType, 10), @@ -174,7 +174,10 @@ func (e *Watcher) runChunkFetcher(ctx context.Context) error { continue } for _, job := range newJobs { - e.schedule(ctx, job, job.delay) + err := e.schedule(ctx, job, job.delay) + if err != nil { + logger.Info("error scheduling transaction processing job", zap.Error(err)) + } } } } @@ -201,7 +204,10 @@ func (e *Watcher) runObsvReqProcessor(ctx context.Context) error { // Guardians currently run nodes for all shards and the API seems to be returning the correct results independent of the set senderAccountId but this could change in the future. // Fixing this would require adding the transaction sender account ID to the observation request. job := newTransactionProcessingJob(txHash, e.wormholeAccount) - e.schedule(ctx, job, time.Nanosecond) + err := e.schedule(ctx, job, time.Nanosecond) + if err != nil { + logger.Info("error scheduling transaction processing job", zap.Error(err)) + } } } } @@ -228,7 +234,10 @@ func (e *Watcher) runTxProcessor(ctx context.Context) error { ) job.retryCounter++ job.delay *= 2 - e.schedule(ctx, job, job.delay) + err := e.schedule(ctx, job, job.delay) + if err != nil { + logger.Info("error scheduling transaction processing job", zap.Error(err)) + } } else { // Warn and do not retry logger.Warn( @@ -291,7 +300,12 @@ func (e *Watcher) Run(ctx context.Context) error { // schedule pushes a job to workers after delay. It is context aware and will not execute the job if the context // is cancelled before delay has passed and the job is picked up by a worker. -func (e *Watcher) schedule(ctx context.Context, job *transactionProcessingJob, delay time.Duration) { +func (e *Watcher) schedule(ctx context.Context, job *transactionProcessingJob, delay time.Duration) error { + if int(e.transactionProcessingQueueCounter.Load())+len(e.transactionProcessingQueue) > queueSize { + p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDNear, 1) + return fmt.Errorf("NEAR transactionProcessingQueue exceeds max queue size. Skipping transaction.") + } + common.RunWithScissors(ctx, e.errC, "scheduledThread", func(ctx context.Context) error { timer := time.NewTimer(delay) @@ -313,4 +327,5 @@ func (e *Watcher) schedule(ctx context.Context, job *transactionProcessingJob, d } return nil }) + return nil }