watcher: simplify near watcher

Change-Id: If1f646de29c04ab58f5b5ae90b8cbb1f2803fcc0
Hendrik Hofstadt 2022-11-29 12:22:56 +01:00 committed by Evan Gray
parent c5925f1467
commit 97f302e7d8
4 changed files with 68 additions and 233 deletions


@@ -84,11 +84,11 @@ func (e *Watcher) runMetrics(ctx context.Context) error {
return ctx.Err()
case <-metricsIntervalTimer.C:
// compute and publish periodic metrics
l1 := e.transactionProcessingQueue.Len()
l1 := e.transactionProcessingQueueCounter.Load()
l2 := len(e.chunkProcessingQueue)
txQuequeLen.Set(float64(l1))
chunkQuequeLen.Set(float64(l2))
logger.Info("metrics", zap.Int("txQuequeLen", l1), zap.Int("chunkQuequeLen", l2))
logger.Info("metrics", zap.Int64("txQuequeLen", l1), zap.Int("chunkQuequeLen", l2))
case height := <-e.eventChanBlockProcessedHeight:
if highestBlockHeightProcessed < height {
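Because the transaction queue is now a plain channel fed by per-job timer goroutines, its depth can no longer be read with Len(); instead an atomic.Int64 is incremented when a job is scheduled and decremented when the scheduling goroutine exits, and the metrics loop reads it with Load(). A minimal standalone sketch of that counter pattern (not the watcher's actual metrics code):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

func main() {
	var pending atomic.Int64 // jobs scheduled but not yet handed off, like transactionProcessingQueueCounter
	var wg sync.WaitGroup

	for i := 0; i < 5; i++ {
		wg.Add(1)
		pending.Add(1) // increment when the job is scheduled
		go func() {
			defer wg.Done()
			defer pending.Add(-1) // decrement when the scheduling goroutine exits
			time.Sleep(10 * time.Millisecond)
		}()
	}

	// a metrics loop would periodically read the gauge, e.g. gauge.Set(float64(pending.Load()))
	fmt.Println("pending while running:", pending.Load())
	wg.Wait()
	fmt.Println("pending after drain:", pending.Load()) // 0
}
```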


@@ -9,10 +9,10 @@ import (
)
// fetchAndParseChunk goes through all transactions in a chunk and returns a list of transactionProcessingJob
func (e *Watcher) fetchAndParseChunk(logger *zap.Logger, ctx context.Context, chunkHeader nearapi.ChunkHeader) ([]transactionProcessingJob, error) {
func (e *Watcher) fetchAndParseChunk(logger *zap.Logger, ctx context.Context, chunkHeader nearapi.ChunkHeader) ([]*transactionProcessingJob, error) {
logger.Debug("near.fetchAndParseChunk", zap.String("chunk_hash", chunkHeader.Hash))
result := []transactionProcessingJob{}
var result []*transactionProcessingJob
chunk, err := e.nearAPI.GetChunk(ctx, chunkHeader)
if err != nil {

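fetchAndParseChunk now builds a `[]*transactionProcessingJob` instead of a slice of values: the jobs are mutated later (retryCounter++, delay *= 2) and sent over a channel, so passing pointers keeps those updates on the shared object rather than on copies. A small standalone illustration (hypothetical `job` type, not the watcher's):

```go
package main

import "fmt"

type job struct{ retryCounter int }

func main() {
	byValue := []job{{}}
	byPointer := []*job{{}}

	v := byValue[0] // copies the element; the update below is lost
	v.retryCounter++

	p := byPointer[0] // shares the element; the update below sticks
	p.retryCounter++

	fmt.Println(byValue[0].retryCounter, byPointer[0].retryCounter) // prints: 0 1
}
```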

@@ -1,164 +0,0 @@
// Package timerqueue implements a priority queue for objects scheduled at a
// particular time.
package timerqueue
import (
"container/heap"
"errors"
"sync"
"time"
)
type Timer interface{}
// Timerqueue is a time-sorted collection of Timer objects.
type Timerqueue struct {
heap timerHeap
table map[Timer]*timerData
mu sync.Mutex
}
type timerData struct {
timer Timer
time time.Time
index int
}
// New creates a new timer priority queue.
func New() *Timerqueue {
return &Timerqueue{
table: make(map[Timer]*timerData),
}
}
// Len returns the current number of timer objects in the queue.
func (q *Timerqueue) Len() int {
q.mu.Lock()
defer q.mu.Unlock()
return len(q.heap)
}
// Schedule schedules a timer for execution at time tm. If the
// timer was already scheduled, it is rescheduled.
func (q *Timerqueue) Schedule(t Timer, tm time.Time) {
q.mu.Lock()
defer q.mu.Unlock()
if data, ok := q.table[t]; !ok {
data = &timerData{t, tm, 0}
heap.Push(&q.heap, data)
q.table[t] = data
} else {
data.time = tm
heap.Fix(&q.heap, data.index)
}
}
// Unschedule unschedules a timer's execution.
func (q *Timerqueue) Unschedule(t Timer) {
q.mu.Lock()
defer q.mu.Unlock()
if data, ok := q.table[t]; ok {
heap.Remove(&q.heap, data.index)
delete(q.table, t)
}
}
// GetTime returns the time at which the timer is scheduled.
// If the timer isn't currently scheduled, an error is returned.
func (q *Timerqueue) GetTime(t Timer) (tm time.Time, err error) {
q.mu.Lock()
defer q.mu.Unlock()
if data, ok := q.table[t]; ok {
return data.time, nil
}
return time.Time{}, errors.New("timerqueue: timer not scheduled")
}
// IsScheduled returns true if the timer is currently scheduled.
func (q *Timerqueue) IsScheduled(t Timer) bool {
q.mu.Lock()
defer q.mu.Unlock()
_, ok := q.table[t]
return ok
}
// Clear unschedules all currently scheduled timers.
func (q *Timerqueue) Clear() {
q.mu.Lock()
defer q.mu.Unlock()
q.heap, q.table = nil, make(map[Timer]*timerData)
}
// PopFirst removes and returns the next timer to be scheduled and
// the time at which it is scheduled to run.
func (q *Timerqueue) PopFirst() (t Timer, tm time.Time) {
q.mu.Lock()
defer q.mu.Unlock()
if len(q.heap) > 0 {
data := heap.Pop(&q.heap).(*timerData)
delete(q.table, data.timer)
return data.timer, data.time
}
return nil, time.Time{}
}
// PopFirstIfReady removes and returns the next timer and the time at
// which it is scheduled to run, but only if that time has already
// passed. Otherwise it returns an error.
func (q *Timerqueue) PopFirstIfReady() (Timer, time.Time, error) {
q.mu.Lock()
defer q.mu.Unlock()
if len(q.heap) > 0 {
if q.heap[0].time.Before(time.Now()) {
// first job is ready. Pop it.
data := heap.Pop(&q.heap).(*timerData)
delete(q.table, data.timer)
return data.timer, data.time, nil
}
}
return nil, time.Time{}, errors.New("no job ready")
}
// PeekFirst returns the next timer to be scheduled and the time
// at which it is scheduled to run. It does not modify the contents
// of the timer queue.
func (q *Timerqueue) PeekFirst() (t Timer, tm time.Time) {
q.mu.Lock()
defer q.mu.Unlock()
if len(q.heap) > 0 {
return q.heap[0].timer, q.heap[0].time
}
return nil, time.Time{}
}
/*
* timerHeap
*/
type timerHeap []*timerData
func (h timerHeap) Len() int {
return len(h)
}
func (h timerHeap) Less(i, j int) bool {
return h[i].time.Before(h[j].time)
}
func (h timerHeap) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
h[i].index, h[j].index = i, j
}
func (h *timerHeap) Push(x interface{}) {
data := x.(*timerData)
*h = append(*h, data)
data.index = len(*h) - 1
}
func (h *timerHeap) Pop() interface{} {
n := len(*h)
data := (*h)[n-1]
*h = (*h)[:n-1]
data.index = -1
return data
}
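This commit deletes the heap-based timerqueue package entirely. Before the change the watcher used it roughly as sketched below: jobs were pushed with Schedule and a polling loop drained everything whose deadline had passed with PopFirstIfReady. This is a reconstruction from the old call sites in watcher.go, and it only builds against the tree before this commit, since the import path is removed here:

```go
package main

import (
	"fmt"
	"time"

	// removed by this commit; exists only in the pre-commit tree
	"github.com/certusone/wormhole/node/pkg/watchers/near/timerqueue"
)

func main() {
	q := timerqueue.New()
	q.Schedule("job-1", time.Now().Add(250*time.Millisecond))
	q.Schedule("job-2", time.Now().Add(-time.Nanosecond)) // already due

	// drain everything whose deadline has passed, like the old runTxProcessor loop
	for {
		j, _, err := q.PopFirstIfReady()
		if err != nil {
			break // nothing ready; the old code reset a 1ms timer and polled again
		}
		fmt.Println("processing", j)
	}
}
```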


@@ -3,6 +3,7 @@ package near
import (
"context"
"fmt"
"sync/atomic"
"time"
"github.com/certusone/wormhole/node/pkg/common"
@@ -11,7 +12,6 @@ import (
"github.com/certusone/wormhole/node/pkg/readiness"
"github.com/certusone/wormhole/node/pkg/supervisor"
"github.com/certusone/wormhole/node/pkg/watchers/near/nearapi"
"github.com/certusone/wormhole/node/pkg/watchers/near/timerqueue"
"github.com/mr-tron/base58"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
@@ -72,8 +72,9 @@ type (
obsvReqC <-chan *gossipv1.ObservationRequest // observation requests are coming from this channel
// internal queques
transactionProcessingQueue timerqueue.Timerqueue
chunkProcessingQueue chan nearapi.ChunkHeader
transactionProcessingQueueCounter atomic.Int64
transactionProcessingQueue chan *transactionProcessingJob
chunkProcessingQueue chan nearapi.ChunkHeader
// events channels
eventChanBlockProcessedHeight chan uint64 // whenever a block is processed, post the height here
@@ -100,7 +101,7 @@ func NewWatcher(
nearRPC: nearRPC,
msgC: msgC,
obsvReqC: obsvReqC,
transactionProcessingQueue: *timerqueue.New(),
transactionProcessingQueue: make(chan *transactionProcessingJob),
chunkProcessingQueue: make(chan nearapi.ChunkHeader, quequeSize),
eventChanBlockProcessedHeight: make(chan uint64, 10),
eventChanTxProcessedDuration: make(chan time.Duration, 10),
@@ -108,8 +109,8 @@
}
}
func newTransactionProcessingJob(txHash string, senderAccountId string) transactionProcessingJob {
return transactionProcessingJob{
func newTransactionProcessingJob(txHash string, senderAccountId string) *transactionProcessingJob {
return &transactionProcessingJob{
txHash,
senderAccountId,
time.Now(),
@@ -166,17 +167,8 @@ func (e *Watcher) runChunkFetcher(ctx context.Context) error {
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDNear, 1)
continue
}
for i := 0; i < len(newJobs); i++ {
if e.transactionProcessingQueue.Len() > quequeSize {
logger.Warn(
"NEAR transactionProcessingQueue exceeds max queue size. Skipping transaction.",
zap.String("log_msg_type", "tx_proc_queue_full"),
zap.String("chunk_id", chunkHeader.Hash),
)
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDNear, 1)
break
}
e.transactionProcessingQueue.Schedule(newJobs[i], time.Now().Add(newJobs[i].delay))
for _, job := range newJobs {
e.schedule(ctx, job, job.delay)
}
}
}
@@ -205,7 +197,7 @@ func (e *Watcher) runObsvReqProcessor(ctx context.Context) error {
// Guardians currently run nodes for all shards and the API seems to be returning the correct results independent of the set senderAccountId but this could change in the future.
// Fixing this would require adding the transaction sender account ID to the observation request.
job := newTransactionProcessingJob(txHash, e.wormholeAccount)
e.transactionProcessingQueue.Schedule(job, time.Now().Add(-time.Nanosecond))
e.schedule(ctx, job, time.Nanosecond)
}
}
}
@@ -213,66 +205,51 @@ func (e *Watcher) runObsvReqProcessor(ctx context.Context) error {
func (e *Watcher) runTxProcessor(ctx context.Context) error {
logger := supervisor.Logger(ctx)
supervisor.Signal(ctx, supervisor.SignalHealthy)
timer := time.NewTimer(time.Millisecond)
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-timer.C:
for {
j, _, err := e.transactionProcessingQueue.PopFirstIfReady()
if err != nil {
timer.Reset(time.Millisecond)
break
case job := <-e.transactionProcessingQueue:
err := e.processTx(logger, ctx, job)
if err != nil {
// transaction processing unsuccessful. Retry if retry_counter not exceeded.
if job.retryCounter < txProcRetry {
// Log and retry with exponential backoff
logger.Info(
"near.processTx",
zap.String("log_msg_type", "tx_processing_retry"),
zap.String("tx_hash", job.txHash),
zap.String("error", err.Error()),
)
job.retryCounter++
job.delay *= 2
e.schedule(ctx, job, job.delay)
} else {
// Warn and do not retry
logger.Warn(
"near.processTx",
zap.String("log_msg_type", "tx_processing_retries_exceeded"),
zap.String("tx_hash", job.txHash),
zap.String("error", err.Error()),
)
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDNear, 1)
}
job := j.(transactionProcessingJob)
err = e.processTx(logger, ctx, &job)
if err != nil {
// transaction processing unsuccessful. Retry if retry_counter not exceeded.
if job.retryCounter < txProcRetry {
// Log and retry with exponential backoff
logger.Info(
"near.processTx",
zap.String("log_msg_type", "tx_processing_retry"),
zap.String("tx_hash", job.txHash),
zap.String("error", err.Error()),
)
job.retryCounter++
job.delay *= 2
e.transactionProcessingQueue.Schedule(job, time.Now().Add(job.delay))
} else {
// Warn and do not retry
logger.Warn(
"near.processTx",
zap.String("log_msg_type", "tx_processing_retries_exceeded"),
zap.String("tx_hash", job.txHash),
zap.String("error", err.Error()),
)
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDNear, 1)
}
}
if job.hasWormholeMsg {
// report how long it took to process this transaction
e.eventChanTxProcessedDuration <- time.Since(job.creationTime)
}
// tell everyone about successful processing
e.eventChanBlockProcessedHeight <- job.wormholeMsgBlockHeight
}
if job.hasWormholeMsg {
// report how long it took to process this transaction
e.eventChanTxProcessedDuration <- time.Since(job.creationTime)
}
// tell everyone about successful processing
e.eventChanBlockProcessedHeight <- job.wormholeMsgBlockHeight
}
}
}
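Retries keep the old exponential backoff: on each failure the job's delay doubles before it is rescheduled, up to txProcRetry attempts. A standalone sketch of that progression (the initial delay and retry cap here are made-up values for illustration, not the watcher's constants):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	const txProcRetry = 4           // assumed retry cap, for illustration only
	delay := 100 * time.Millisecond // assumed initial delay, for illustration only

	for retryCounter := 0; retryCounter < txProcRetry; retryCounter++ {
		delay *= 2 // same doubling as job.delay *= 2 above
		fmt.Printf("retry %d scheduled after %v\n", retryCounter+1, delay)
	}
	// prints 200ms, 400ms, 800ms, 1.6s
}
```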
func (e *Watcher) Run(ctx context.Context) error {
logger := supervisor.Logger(ctx)
e.nearAPI = nearapi.NewNearApiImpl(nearapi.NewHttpNearRpc(e.nearRPC))
@@ -320,3 +297,25 @@ func (e *Watcher) Run(ctx context.Context) error {
<-ctx.Done()
return ctx.Err()
}
func (e *Watcher) schedule(ctx context.Context, job *transactionProcessingJob, delay time.Duration) {
go func() {
timer := time.NewTimer(delay)
defer timer.Stop()
e.transactionProcessingQueueCounter.Add(1)
defer e.transactionProcessingQueueCounter.Add(-1)
select {
case <-ctx.Done():
return
case <-timer.C:
// Don't block on processing if the context is cancelled
select {
case <-ctx.Done():
return
case e.transactionProcessingQueue <- job:
}
}
}()
}
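The new schedule helper replaces the heap: each job gets its own goroutine that sleeps on a time.Timer and then tries to push the job into the processing channel, bailing out if the context is cancelled at either step. A self-contained sketch of the same pattern with generic names (not the watcher's types):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// schedule delivers job into queue after delay, unless ctx is cancelled first.
func schedule(ctx context.Context, queue chan<- string, job string, delay time.Duration) {
	go func() {
		timer := time.NewTimer(delay)
		defer timer.Stop()
		select {
		case <-ctx.Done():
			return
		case <-timer.C:
			select { // don't block on delivery if the context is cancelled
			case <-ctx.Done():
			case queue <- job:
			}
		}
	}()
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	queue := make(chan string) // unbuffered, like transactionProcessingQueue
	schedule(ctx, queue, "tx-abc", 50*time.Millisecond)

	select {
	case job := <-queue:
		fmt.Println("processing", job)
	case <-ctx.Done():
		fmt.Println("cancelled before the job became due")
	}
}
```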