node/near: prevent transactionProcessingQueue from growing too large

tbjump authored 2023-05-24 01:10:14 +00:00, committed by tbjump
parent c20c3c667d
commit b827301ce6
2 changed files with 23 additions and 7 deletions
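
In short: the transaction queue channel becomes buffered with capacity queueSize, schedule now returns an error, and a new guard rejects a job when the atomic transactionProcessingQueueCounter (which the renamed metric below suggests counts jobs still waiting on their delay timer) plus the backlog already buffered in transactionProcessingQueue exceeds queueSize; callers log the error and drop the job. Below is a minimal, self-contained sketch of that pattern — the scheduler and job types, the queueSize value, and the worker are illustrative stand-ins, not the watcher's real code.

package main

import (
	"context"
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

// queueSize is a stand-in for the watcher's configured limit.
const queueSize = 4

type job struct {
	id    int
	delay time.Duration
}

type scheduler struct {
	waiting atomic.Int64 // accepted jobs still sleeping on their delay timer ("txqueueWaiting")
	ready   chan *job    // buffered queue drained by workers ("txqueueReady")
}

// schedule mirrors the bound this commit adds: refuse new work when the number of
// delayed jobs plus the channel backlog already exceeds queueSize, instead of
// letting the queue grow without limit.
func (s *scheduler) schedule(ctx context.Context, j *job, delay time.Duration) error {
	if int(s.waiting.Load())+len(s.ready) > queueSize {
		return errors.New("queue exceeds max queue size, skipping job")
	}
	s.waiting.Add(1)
	go func() {
		defer s.waiting.Add(-1)
		timer := time.NewTimer(delay)
		defer timer.Stop()
		select {
		case <-ctx.Done():
			// cancelled before the delay elapsed: the job is dropped
		case <-timer.C:
			s.ready <- j
		}
	}()
	return nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	s := &scheduler{ready: make(chan *job, queueSize)}

	// a single worker draining the ready queue, standing in for the watcher's processors
	var processed atomic.Int64
	go func() {
		for range s.ready {
			processed.Add(1)
		}
	}()

	for i := 0; i < 10; i++ {
		j := &job{id: i, delay: time.Millisecond}
		if err := s.schedule(ctx, j, j.delay); err != nil {
			// as in the callers below: log the error and move on rather than blocking
			fmt.Printf("job %d not scheduled: %v\n", i, err)
		}
	}
	time.Sleep(20 * time.Millisecond)
	fmt.Println("jobs processed:", processed.Load())
}

The select on ctx.Done() preserves the behaviour described in the final hunk: a job whose context is cancelled before its delay elapses is never handed to a worker.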

View File

@@ -74,10 +74,11 @@ func (e *Watcher) runMetrics(ctx context.Context) error {
case <-metricsIntervalTimer.C:
// compute and publish periodic metrics
l1 := e.transactionProcessingQueueCounter.Load()
- l2 := len(e.chunkProcessingQueue)
+ l2 := len(e.transactionProcessingQueue)
+ l3 := len(e.chunkProcessingQueue)
txqueueLen.Set(float64(l1))
chunkqueueLen.Set(float64(l2))
logger.Debug("metrics", zap.Int64("txqueueLen", l1), zap.Int("chunkqueueLen", l2))
logger.Debug("metrics", zap.Int64("txqueueWaiting", l1), zap.Int("txqueueReady", l2), zap.Int("chunkqueueLen", l3))
case event := <-e.eventChan:
switch event {
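
The hunk above splits what used to be a single queue-length reading: l1 is the counter value (logged as txqueueWaiting), l2 is the backlog of the now-buffered transaction channel (txqueueReady), and the chunk queue length moves to l3. A small, hypothetical stand-alone example of emitting the same style of debug line with zap; the counter and channels are stand-ins for the watcher's fields.

package main

import (
	"sync/atomic"

	"go.uber.org/zap"
)

func main() {
	logger, _ := zap.NewDevelopment() // development config logs at Debug level
	defer logger.Sync()

	// stand-ins for transactionProcessingQueueCounter, transactionProcessingQueue and chunkProcessingQueue
	var txWaiting atomic.Int64
	txReady := make(chan struct{}, 100)
	chunkQueue := make(chan struct{}, 100)

	txWaiting.Store(3)
	txReady <- struct{}{}

	// mirrors the new debug line: waiting and ready are reported as separate fields
	logger.Debug("metrics",
		zap.Int64("txqueueWaiting", txWaiting.Load()),
		zap.Int("txqueueReady", len(txReady)),
		zap.Int("chunkqueueLen", len(chunkQueue)))
}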

View File

@@ -104,7 +104,7 @@ func NewWatcher(
msgC: msgC,
obsvReqC: obsvReqC,
readinessSync: common.MustConvertChainIdToReadinessSyncing(vaa.ChainIDNear),
- transactionProcessingQueue: make(chan *transactionProcessingJob),
+ transactionProcessingQueue: make(chan *transactionProcessingJob, queueSize),
chunkProcessingQueue: make(chan nearapi.ChunkHeader, queueSize),
eventChanTxProcessedDuration: make(chan time.Duration, 10),
eventChan: make(chan eventType, 10),
@@ -174,7 +174,10 @@ func (e *Watcher) runChunkFetcher(ctx context.Context) error {
continue
}
for _, job := range newJobs {
- e.schedule(ctx, job, job.delay)
+ err := e.schedule(ctx, job, job.delay)
+ if err != nil {
+ logger.Info("error scheduling transaction processing job", zap.Error(err))
+ }
}
}
}
@@ -201,7 +204,10 @@ func (e *Watcher) runObsvReqProcessor(ctx context.Context) error {
// Guardians currently run nodes for all shards and the API seems to be returning the correct results independent of the set senderAccountId but this could change in the future.
// Fixing this would require adding the transaction sender account ID to the observation request.
job := newTransactionProcessingJob(txHash, e.wormholeAccount)
- e.schedule(ctx, job, time.Nanosecond)
+ err := e.schedule(ctx, job, time.Nanosecond)
+ if err != nil {
+ logger.Info("error scheduling transaction processing job", zap.Error(err))
+ }
}
}
}
@@ -228,7 +234,10 @@ func (e *Watcher) runTxProcessor(ctx context.Context) error {
)
job.retryCounter++
job.delay *= 2
- e.schedule(ctx, job, job.delay)
+ err := e.schedule(ctx, job, job.delay)
+ if err != nil {
+ logger.Info("error scheduling transaction processing job", zap.Error(err))
+ }
} else {
// Warn and do not retry
logger.Warn(
@@ -291,7 +300,12 @@ func (e *Watcher) Run(ctx context.Context) error {
// schedule pushes a job to workers after delay. It is context aware and will not execute the job if the context
// is cancelled before delay has passed and the job is picked up by a worker.
- func (e *Watcher) schedule(ctx context.Context, job *transactionProcessingJob, delay time.Duration) {
+ func (e *Watcher) schedule(ctx context.Context, job *transactionProcessingJob, delay time.Duration) error {
+ if int(e.transactionProcessingQueueCounter.Load())+len(e.transactionProcessingQueue) > queueSize {
+ p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDNear, 1)
+ return fmt.Errorf("NEAR transactionProcessingQueue exceeds max queue size. Skipping transaction.")
+ }
common.RunWithScissors(ctx, e.errC, "scheduledThread",
func(ctx context.Context) error {
timer := time.NewTimer(delay)
@@ -313,4 +327,5 @@ func (e *Watcher) schedule(ctx context.Context, job *transactionProcessingJob, d
}
return nil
})
+ return nil
}
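
One detail of the guard added in the schedule hunk above: the comparison is strictly greater-than, so a job is only rejected once waiting plus ready already exceeds queueSize, meaning queueSize+1 jobs can in principle be admitted before drops start. A tiny, hypothetical illustration of that boundary using the same expression (the numbers are made up):

package main

import "fmt"

const queueSize = 10 // hypothetical limit, for illustration only

// wouldReject uses the same expression as the guard added to Watcher.schedule.
func wouldReject(waiting, ready int) bool {
	return waiting+ready > queueSize
}

func main() {
	cases := []struct{ waiting, ready int }{
		{0, 0},  // empty: accepted
		{5, 5},  // exactly at the limit: still accepted
		{5, 6},  // one over the limit: rejected
		{11, 0}, // everything still waiting on timers: rejected
	}
	for _, c := range cases {
		fmt.Printf("waiting=%d ready=%d reject=%v\n", c.waiting, c.ready, wouldReject(c.waiting, c.ready))
	}
}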