diff --git a/node/pkg/watchers/near/metrics.go b/node/pkg/watchers/near/metrics.go index 8803a33f2..74b1d7c46 100644 --- a/node/pkg/watchers/near/metrics.go +++ b/node/pkg/watchers/near/metrics.go @@ -84,11 +84,11 @@ func (e *Watcher) runMetrics(ctx context.Context) error { return ctx.Err() case <-metricsIntervalTimer.C: // compute and publish periodic metrics - l1 := e.transactionProcessingQueue.Len() + l1 := e.transactionProcessingQueueCounter.Load() l2 := len(e.chunkProcessingQueue) txQuequeLen.Set(float64(l1)) chunkQuequeLen.Set(float64(l2)) - logger.Info("metrics", zap.Int("txQuequeLen", l1), zap.Int("chunkQuequeLen", l2)) + logger.Info("metrics", zap.Int64("txQuequeLen", l1), zap.Int("chunkQuequeLen", l2)) case height := <-e.eventChanBlockProcessedHeight: if highestBlockHeightProcessed < height { diff --git a/node/pkg/watchers/near/poll.go b/node/pkg/watchers/near/poll.go index c026b8840..d83d2dc42 100644 --- a/node/pkg/watchers/near/poll.go +++ b/node/pkg/watchers/near/poll.go @@ -9,10 +9,10 @@ import ( ) // fetchAndParseChunk goes through all transactions in a chunk and returns a list of transactionProcessingJob -func (e *Watcher) fetchAndParseChunk(logger *zap.Logger, ctx context.Context, chunkHeader nearapi.ChunkHeader) ([]transactionProcessingJob, error) { +func (e *Watcher) fetchAndParseChunk(logger *zap.Logger, ctx context.Context, chunkHeader nearapi.ChunkHeader) ([]*transactionProcessingJob, error) { logger.Debug("near.fetchAndParseChunk", zap.String("chunk_hash", chunkHeader.Hash)) - result := []transactionProcessingJob{} + var result []*transactionProcessingJob chunk, err := e.nearAPI.GetChunk(ctx, chunkHeader) if err != nil { diff --git a/node/pkg/watchers/near/timerqueue/timerqueue.go b/node/pkg/watchers/near/timerqueue/timerqueue.go deleted file mode 100644 index 6db2571dc..000000000 --- a/node/pkg/watchers/near/timerqueue/timerqueue.go +++ /dev/null @@ -1,164 +0,0 @@ -// Package timerqueue implements a priority queue for objects scheduled at a -// particular time. -package timerqueue - -import ( - "container/heap" - "errors" - "sync" - "time" -) - -type Timer interface{} - -// Timerqueue is a time-sorted collection of Timer objects. -type Timerqueue struct { - heap timerHeap - table map[Timer]*timerData - mu sync.Mutex -} - -type timerData struct { - timer Timer - time time.Time - index int -} - -// New creates a new timer priority queue. -func New() *Timerqueue { - return &Timerqueue{ - table: make(map[Timer]*timerData), - } -} - -// Len returns the current number of timer objects in the queue. -func (q *Timerqueue) Len() int { - q.mu.Lock() - defer q.mu.Unlock() - return len(q.heap) -} - -// Schedule schedules a timer for exectuion at time tm. If the -// timer was already scheduled, it is rescheduled. -func (q *Timerqueue) Schedule(t Timer, tm time.Time) { - q.mu.Lock() - defer q.mu.Unlock() - if data, ok := q.table[t]; !ok { - data = &timerData{t, tm, 0} - heap.Push(&q.heap, data) - q.table[t] = data - } else { - data.time = tm - heap.Fix(&q.heap, data.index) - } -} - -// Unschedule unschedules a timer's execution. -func (q *Timerqueue) Unschedule(t Timer) { - q.mu.Lock() - defer q.mu.Unlock() - if data, ok := q.table[t]; ok { - heap.Remove(&q.heap, data.index) - delete(q.table, t) - } -} - -// GetTime returns the time at which the timer is scheduled. -// If the timer isn't currently scheduled, an error is returned. -func (q *Timerqueue) GetTime(t Timer) (tm time.Time, err error) { - q.mu.Lock() - defer q.mu.Unlock() - if data, ok := q.table[t]; ok { - return data.time, nil - } - return time.Time{}, errors.New("timerqueue: timer not scheduled") -} - -// IsScheduled returns true if the timer is currently scheduled. -func (q *Timerqueue) IsScheduled(t Timer) bool { - q.mu.Lock() - defer q.mu.Unlock() - _, ok := q.table[t] - return ok -} - -// Clear unschedules all currently scheduled timers. -func (q *Timerqueue) Clear() { - q.mu.Lock() - defer q.mu.Unlock() - q.heap, q.table = nil, make(map[Timer]*timerData) -} - -// PopFirst removes and returns the next timer to be scheduled and -// the time at which it is scheduled to run. -func (q *Timerqueue) PopFirst() (t Timer, tm time.Time) { - q.mu.Lock() - defer q.mu.Unlock() - if len(q.heap) > 0 { - data := heap.Pop(&q.heap).(*timerData) - delete(q.table, data.timer) - return data.timer, data.time - } - return nil, time.Time{} -} - -// PopFirstIfReady removes and returns the next timer *if* it is ready -// the time at which it is scheduled to run. -func (q *Timerqueue) PopFirstIfReady() (Timer, time.Time, error) { - q.mu.Lock() - defer q.mu.Unlock() - if len(q.heap) > 0 { - if q.heap[0].time.Before(time.Now()) { - // first job is ready. Pop it. - data := heap.Pop(&q.heap).(*timerData) - delete(q.table, data.timer) - return data.timer, data.time, nil - } - } - return nil, time.Time{}, errors.New("no job ready") -} - -// PeekFirst returns the next timer to be scheduled and the time -// at which it is scheduled to run. It does not modify the contents -// of the timer queue. -func (q *Timerqueue) PeekFirst() (t Timer, tm time.Time) { - q.mu.Lock() - defer q.mu.Unlock() - if len(q.heap) > 0 { - return q.heap[0].timer, q.heap[0].time - } - return nil, time.Time{} -} - -/* - * timerHeap - */ - -type timerHeap []*timerData - -func (h timerHeap) Len() int { - return len(h) -} - -func (h timerHeap) Less(i, j int) bool { - return h[i].time.Before(h[j].time) -} - -func (h timerHeap) Swap(i, j int) { - h[i], h[j] = h[j], h[i] - h[i].index, h[j].index = i, j -} - -func (h *timerHeap) Push(x interface{}) { - data := x.(*timerData) - *h = append(*h, data) - data.index = len(*h) - 1 -} - -func (h *timerHeap) Pop() interface{} { - n := len(*h) - data := (*h)[n-1] - *h = (*h)[:n-1] - data.index = -1 - return data -} diff --git a/node/pkg/watchers/near/watcher.go b/node/pkg/watchers/near/watcher.go index 8e10217ec..61db0f1a8 100644 --- a/node/pkg/watchers/near/watcher.go +++ b/node/pkg/watchers/near/watcher.go @@ -3,6 +3,7 @@ package near import ( "context" "fmt" + "sync/atomic" "time" "github.com/certusone/wormhole/node/pkg/common" @@ -11,7 +12,6 @@ import ( "github.com/certusone/wormhole/node/pkg/readiness" "github.com/certusone/wormhole/node/pkg/supervisor" "github.com/certusone/wormhole/node/pkg/watchers/near/nearapi" - "github.com/certusone/wormhole/node/pkg/watchers/near/timerqueue" "github.com/mr-tron/base58" "github.com/wormhole-foundation/wormhole/sdk/vaa" "go.uber.org/zap" @@ -72,8 +72,9 @@ type ( obsvReqC <-chan *gossipv1.ObservationRequest // observation requests are coming from this channel // internal queques - transactionProcessingQueue timerqueue.Timerqueue - chunkProcessingQueue chan nearapi.ChunkHeader + transactionProcessingQueueCounter atomic.Int64 + transactionProcessingQueue chan *transactionProcessingJob + chunkProcessingQueue chan nearapi.ChunkHeader // events channels eventChanBlockProcessedHeight chan uint64 // whenever a block is processed, post the height here @@ -100,7 +101,7 @@ func NewWatcher( nearRPC: nearRPC, msgC: msgC, obsvReqC: obsvReqC, - transactionProcessingQueue: *timerqueue.New(), + transactionProcessingQueue: make(chan *transactionProcessingJob), chunkProcessingQueue: make(chan nearapi.ChunkHeader, quequeSize), eventChanBlockProcessedHeight: make(chan uint64, 10), eventChanTxProcessedDuration: make(chan time.Duration, 10), @@ -108,8 +109,8 @@ func NewWatcher( } } -func newTransactionProcessingJob(txHash string, senderAccountId string) transactionProcessingJob { - return transactionProcessingJob{ +func newTransactionProcessingJob(txHash string, senderAccountId string) *transactionProcessingJob { + return &transactionProcessingJob{ txHash, senderAccountId, time.Now(), @@ -166,17 +167,8 @@ func (e *Watcher) runChunkFetcher(ctx context.Context) error { p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDNear, 1) continue } - for i := 0; i < len(newJobs); i++ { - if e.transactionProcessingQueue.Len() > quequeSize { - logger.Warn( - "NEAR transactionProcessingQueue exceeds max queue size. Skipping transaction.", - zap.String("log_msg_type", "tx_proc_queue_full"), - zap.String("chunk_id", chunkHeader.Hash), - ) - p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDNear, 1) - break - } - e.transactionProcessingQueue.Schedule(newJobs[i], time.Now().Add(newJobs[i].delay)) + for _, job := range newJobs { + e.schedule(ctx, job, job.delay) } } } @@ -205,7 +197,7 @@ func (e *Watcher) runObsvReqProcessor(ctx context.Context) error { // Guardians currently run nodes for all shards and the API seems to be returning the correct results independent of the set senderAccountId but this could change in the future. // Fixing this would require adding the transaction sender account ID to the observation request. job := newTransactionProcessingJob(txHash, e.wormholeAccount) - e.transactionProcessingQueue.Schedule(job, time.Now().Add(-time.Nanosecond)) + e.schedule(ctx, job, time.Nanosecond) } } } @@ -213,66 +205,51 @@ func (e *Watcher) runObsvReqProcessor(ctx context.Context) error { func (e *Watcher) runTxProcessor(ctx context.Context) error { logger := supervisor.Logger(ctx) supervisor.Signal(ctx, supervisor.SignalHealthy) - - timer := time.NewTimer(time.Millisecond) - for { select { case <-ctx.Done(): return ctx.Err() - case <-timer.C: - for { - j, _, err := e.transactionProcessingQueue.PopFirstIfReady() - if err != nil { - timer.Reset(time.Millisecond) - break + case job := <-e.transactionProcessingQueue: + err := e.processTx(logger, ctx, job) + if err != nil { + // transaction processing unsuccessful. Retry if retry_counter not exceeded. + if job.retryCounter < txProcRetry { + // Log and retry with exponential backoff + logger.Info( + "near.processTx", + zap.String("log_msg_type", "tx_processing_retry"), + zap.String("tx_hash", job.txHash), + zap.String("error", err.Error()), + ) + job.retryCounter++ + job.delay *= 2 + e.schedule(ctx, job, job.delay) + } else { + // Warn and do not retry + logger.Warn( + "near.processTx", + zap.String("log_msg_type", "tx_processing_retries_exceeded"), + zap.String("tx_hash", job.txHash), + zap.String("error", err.Error()), + ) + p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDNear, 1) } - - job := j.(transactionProcessingJob) - - err = e.processTx(logger, ctx, &job) - if err != nil { - // transaction processing unsuccessful. Retry if retry_counter not exceeded. - - if job.retryCounter < txProcRetry { - // Log and retry with exponential backoff - logger.Info( - "near.processTx", - zap.String("log_msg_type", "tx_processing_retry"), - zap.String("tx_hash", job.txHash), - zap.String("error", err.Error()), - ) - job.retryCounter++ - job.delay *= 2 - e.transactionProcessingQueue.Schedule(job, time.Now().Add(job.delay)) - } else { - // Warn and do not retry - logger.Warn( - "near.processTx", - zap.String("log_msg_type", "tx_processing_retries_exceeded"), - zap.String("tx_hash", job.txHash), - zap.String("error", err.Error()), - ) - p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDNear, 1) - } - } - - if job.hasWormholeMsg { - // report how long it took to process this transaction - e.eventChanTxProcessedDuration <- time.Since(job.creationTime) - } - - // tell everyone about successful processing - e.eventChanBlockProcessedHeight <- job.wormholeMsgBlockHeight } + if job.hasWormholeMsg { + // report how long it took to process this transaction + e.eventChanTxProcessedDuration <- time.Since(job.creationTime) + } + + // tell everyone about successful processing + e.eventChanBlockProcessedHeight <- job.wormholeMsgBlockHeight } + } } func (e *Watcher) Run(ctx context.Context) error { - logger := supervisor.Logger(ctx) e.nearAPI = nearapi.NewNearApiImpl(nearapi.NewHttpNearRpc(e.nearRPC)) @@ -320,3 +297,25 @@ func (e *Watcher) Run(ctx context.Context) error { <-ctx.Done() return ctx.Err() } + +func (e *Watcher) schedule(ctx context.Context, job *transactionProcessingJob, delay time.Duration) { + go func() { + timer := time.NewTimer(delay) + defer timer.Stop() + + e.transactionProcessingQueueCounter.Add(1) + defer e.transactionProcessingQueueCounter.Add(-1) + + select { + case <-ctx.Done(): + return + case <-timer.C: + // Don't block on processing if the context is cancelled + select { + case <-ctx.Done(): + return + case e.transactionProcessingQueue <- job: + } + } + }() +}