From 56645faaad103065751c418ac3e1927adb352011 Mon Sep 17 00:00:00 2001
From: Hendrik Hofstadt
Date: Tue, 29 Nov 2022 12:26:46 +0100
Subject: [PATCH] watcher: fix near typos / docs

Change-Id: I7e271945e8d636b08e384f09359ad8aebf708103
---
 node/pkg/watchers/near/README.md        |  6 +++---
 node/pkg/watchers/near/finalizer.go     |  2 +-
 node/pkg/watchers/near/metrics.go       | 18 +++++++++---------
 node/pkg/watchers/near/poll.go          |  4 ++--
 node/pkg/watchers/near/tx_processing.go |  2 +-
 node/pkg/watchers/near/watcher.go       | 12 +++++++-----
 6 files changed, 23 insertions(+), 21 deletions(-)

diff --git a/node/pkg/watchers/near/README.md b/node/pkg/watchers/near/README.md
index 5ed809a9c..ec214fd6d 100644
--- a/node/pkg/watchers/near/README.md
+++ b/node/pkg/watchers/near/README.md
@@ -7,16 +7,16 @@ Responsibility: Observe finalized `publishMessage` event emissions from the Worm
 
 There are multiple supervised runners:
 * *BlockPoll*: Polls the NEAR RPC API for finalized blocks and generates transactionProcessingJobs for every single transaction
 * *ChunkFetcher*: Fetches chunks for each block in parallel
-* *TxProcessor*: Processes `transactionProcessingJob`, going through all receipt outcomes, searching for Wormhole messages, and checking that they have been finalized. If there are Wormhole messages in any receipts from this transaction but those receipts are not in finalized blocks, the `transactionProcessingJob` will be put in the back of the queque.
+* *TxProcessor*: Processes `transactionProcessingJob`, going through all receipt outcomes, searching for Wormhole messages, and checking that they have been finalized. If there are Wormhole messages in any receipts from this transaction but those receipts are not in finalized blocks, the `transactionProcessingJob` will be put in the back of the queue.
 * *ObsvReqProcessor*: Process observation requests. An observation request is a way of kindly asking the Guardian to go back in time and look at a particular transaction and try to identify wormhole events in it. Observation requests are received from other Guardians or injected through the admin API. Eventhough they are signed, they should not be trusted.
 
 * chunkProcessingQueue gets new chunk hashes.
 	These are processed once we think that the transactions in it are likely to be completed.
 * transactionProcessingQueue is a priority queue and gets all the transactions from the chunks.
-	These are constantly processed and put back to the end of the queque if processing fails.
+	These are constantly processed and put back to the end of the queue if processing fails.
 	If processing a transaction fails, most likely because not all receipts have been finalized, it is retried with an exponential backoff and eventually it is dropped.
-* multiple workers process both types of jobs from these two queques
+* multiple workers process both types of jobs from these two queues
 
 Determining finality of blocks:
 * There is a lru cache, finalizedBlocksCache, that keeps track of blocks hashes of blocks that are known to be finalized
diff --git a/node/pkg/watchers/near/finalizer.go b/node/pkg/watchers/near/finalizer.go
index 4bb61d2a2..c44f91dd3 100644
--- a/node/pkg/watchers/near/finalizer.go
+++ b/node/pkg/watchers/near/finalizer.go
@@ -20,7 +20,7 @@ type Finalizer struct {
 }
 
 func newFinalizer(eventChan chan eventType, nearAPI nearapi.NearApi, mainnet bool) Finalizer {
-	finalizedBlocksCache, _ := lru.New(workerCountTxProcessing * quequeSize)
+	finalizedBlocksCache, _ := lru.New(workerCountTxProcessing * queueSize)
 
 	return Finalizer{
 		finalizedBlocksCache,
diff --git a/node/pkg/watchers/near/metrics.go b/node/pkg/watchers/near/metrics.go
index 74b1d7c46..19073570f 100644
--- a/node/pkg/watchers/near/metrics.go
+++ b/node/pkg/watchers/near/metrics.go
@@ -44,16 +44,16 @@ func (e *Watcher) runMetrics(ctx context.Context) error {
 			Help: "Average duration it takes for a wormhole message to be processed in milliseconds",
 		})
 
-	txQuequeLen := promauto.With(reg).NewGauge(
+	txqueueLen := promauto.With(reg).NewGauge(
 		prometheus.GaugeOpts{
-			Name: "wormhole_near_tx_queque",
-			Help: "Current Near transaction processing queque length",
+			Name: "wormhole_near_tx_queue",
+			Help: "Current Near transaction processing queue length",
 		})
 
-	chunkQuequeLen := promauto.With(reg).NewGauge(
+	chunkqueueLen := promauto.With(reg).NewGauge(
 		prometheus.GaugeOpts{
-			Name: "wormhole_near_chunk_queque",
-			Help: "Current Near chunk processing queque length",
+			Name: "wormhole_near_chunk_queue",
+			Help: "Current Near chunk processing queue length",
 		})
 
 	nearMessagesConfirmed := promauto.With(reg).NewCounter(
@@ -86,9 +86,9 @@ func (e *Watcher) runMetrics(ctx context.Context) error {
 			// compute and publish periodic metrics
 			l1 := e.transactionProcessingQueueCounter.Load()
 			l2 := len(e.chunkProcessingQueue)
-			txQuequeLen.Set(float64(l1))
-			chunkQuequeLen.Set(float64(l2))
-			logger.Info("metrics", zap.Int64("txQuequeLen", l1), zap.Int("chunkQuequeLen", l2))
+			txqueueLen.Set(float64(l1))
+			chunkqueueLen.Set(float64(l2))
+			logger.Info("metrics", zap.Int64("txqueueLen", l1), zap.Int("chunkqueueLen", l2))
 
 		case height := <-e.eventChanBlockProcessedHeight:
 			if highestBlockHeightProcessed < height {
diff --git a/node/pkg/watchers/near/poll.go b/node/pkg/watchers/near/poll.go
index d83d2dc42..3993566ac 100644
--- a/node/pkg/watchers/near/poll.go
+++ b/node/pkg/watchers/near/poll.go
@@ -29,7 +29,7 @@ func (e *Watcher) fetchAndParseChunk(logger *zap.Logger, ctx context.Context, ch
 
 // recursivelyReadFinalizedBlocks walks back the blockchain from the startBlock (inclusive)
 // until it reaches a block of height stopHeight or less (exclusive). Chunks from all these blocks are put
-// into e.chunkProcessingQueque with the chunks from the oldest block first
+// into e.chunkProcessingqueue with the chunks from the oldest block first
 // if there is an error while walking back the chain, no chunks will be returned
 func (e *Watcher) recursivelyReadFinalizedBlocks(logger *zap.Logger, ctx context.Context, startBlock nearapi.Block, stopHeight uint64, chunkSink chan<- nearapi.ChunkHeader, recursionDepth uint) error {
@@ -75,7 +75,7 @@ func (e *Watcher) recursivelyReadFinalizedBlocks(logger *zap.Logger, ctx context
 	return nil
 }
 
-// readFinalChunksSince polls the NEAR blockchain for new blocks with height > startHeight, parses out the chunks and places
+// ReadFinalChunksSince polls the NEAR blockchain for new blocks with height > startHeight, parses out the chunks and places
 // them into `chunkSink` in the order they were recorded on the blockchain
 // returns the height of the latest final block
 func (e *Watcher) ReadFinalChunksSince(logger *zap.Logger, ctx context.Context, startHeight uint64, chunkSink chan<- nearapi.ChunkHeader) (newestFinalHeight uint64, err error) {
diff --git a/node/pkg/watchers/near/tx_processing.go b/node/pkg/watchers/near/tx_processing.go
index edff104a4..36787a085 100644
--- a/node/pkg/watchers/near/tx_processing.go
+++ b/node/pkg/watchers/near/tx_processing.go
@@ -96,7 +96,7 @@ func (e *Watcher) processOutcome(logger *zap.Logger, ctx context.Context, job *t
 	// SECURITY CRITICAL: Check that block has been finalized.
 	outcomeBlockHeader, isFinalized := e.finalizer.isFinalized(logger, ctx, outcomeBlockHash.String())
 	if !isFinalized {
-		// If it has not, we return an error such that this transaction can be put back into the queque.
+		// If it has not, we return an error such that this transaction can be put back into the queue.
 		return errors.New("block not finalized yet")
 	}
 
diff --git a/node/pkg/watchers/near/watcher.go b/node/pkg/watchers/near/watcher.go
index 61db0f1a8..fd4e1b463 100644
--- a/node/pkg/watchers/near/watcher.go
+++ b/node/pkg/watchers/near/watcher.go
@@ -21,7 +21,7 @@ var (
 
 	// how long to initially wait between observing a transaction and attempting to process the transaction.
 	// To successfully process the transaction, all receipts need to be finalized, which typically only occurs two blocks later or so.
-	// transaction processing will be retried with exponential backoff, i.e. transaction may stay in the queque for ca. initialTxProcDelay^(txProcRetry+2) time.
+	// transaction processing will be retried with exponential backoff, i.e. transaction may stay in the queue for ca. initialTxProcDelay^(txProcRetry+2) time.
 	initialTxProcDelay = time.Second * 3
 
 	blockPollInterval = time.Millisecond * 200
@@ -33,9 +33,9 @@ var (
 	// such that they can all be fetched in parallel.
 	// We're currently seeing ~10 chunks/block, so setting this to 20 conservatively.
 	workerChunkFetching int = 20
-	quequeSize          int = 10_000 // size of the queques for chunk processing as well as transaction processing
+	queueSize           int = 10_000 // size of the queues for chunk processing as well as transaction processing
 
-	// if watcher falls behind this many blocks, start over. This should be set proportional to `quequeSize`
+	// if watcher falls behind this many blocks, start over. This should be set proportional to `queueSize`
 	// such that all transactions from `maxFallBehindBlocks` can easily fit into the queue
 	maxFallBehindBlocks uint = 200
 
@@ -71,7 +71,7 @@ type (
 		msgC     chan<- *common.MessagePublication   // validated (SECURITY: and only validated!) observations go into this channel
 		obsvReqC <-chan *gossipv1.ObservationRequest // observation requests are coming from this channel
 
-		// internal queques
+		// internal queues
 		transactionProcessingQueueCounter atomic.Int64
 		transactionProcessingQueue        chan *transactionProcessingJob
 		chunkProcessingQueue              chan nearapi.ChunkHeader
@@ -102,7 +102,7 @@ func NewWatcher(
 		msgC:                          msgC,
 		obsvReqC:                      obsvReqC,
 		transactionProcessingQueue:    make(chan *transactionProcessingJob),
-		chunkProcessingQueue:          make(chan nearapi.ChunkHeader, quequeSize),
+		chunkProcessingQueue:          make(chan nearapi.ChunkHeader, queueSize),
 		eventChanBlockProcessedHeight: make(chan uint64, 10),
 		eventChanTxProcessedDuration:  make(chan time.Duration, 10),
 		eventChan:                     make(chan eventType, 10),
@@ -298,6 +298,8 @@ func (e *Watcher) Run(ctx context.Context) error {
 	return ctx.Err()
 }
 
+// schedule pushes a job to workers after delay. It is context aware and will not execute the job if the context
+// is cancelled before delay has passed and the job is picked up by a worker.
 func (e *Watcher) schedule(ctx context.Context, job *transactionProcessingJob, delay time.Duration) {
 	go func() {
 		timer := time.NewTimer(delay)
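
The finalizer hunk only renames the size constant, but the sizing it relies on is easy to miss: finalizedBlocksCache must hold roughly one block hash per job that can be in flight across all transaction-processing workers, otherwise entries may be evicted while workers still need them. Below is a minimal sketch of that construction, assuming the `lru` import is hashicorp/golang-lru (consistent with the `lru.New` call in the hunk); the workerCountTxProcessing value and the lookup around it are illustrative, not taken from the patch.

```go
package main

import (
	"fmt"

	lru "github.com/hashicorp/golang-lru"
)

const (
	workerCountTxProcessing = 5      // assumed worker count, not part of this patch
	queueSize               = 10_000 // matches the renamed queueSize in watcher.go
)

func main() {
	// Size the cache so it can hold one entry per job that may be in flight across
	// all transaction-processing workers, mirroring newFinalizer in the hunk above.
	finalizedBlocksCache, err := lru.New(workerCountTxProcessing * queueSize)
	if err != nil {
		panic(err)
	}

	// Record a block hash that is known to be finalized ...
	finalizedBlocksCache.Add("blockHashA", true)

	// ... and look another one up; a cache miss means the caller falls back to the RPC node.
	_, cached := finalizedBlocksCache.Get("blockHashB")
	fmt.Println("finalized according to cache:", cached)
}
```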
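The metrics.go hunks rename the two queue-length gauges. For context, this is the promauto registration pattern they use; the metric names and help strings below are the ones introduced by the patch, while the registry, Go variable names, and queue stand-ins are illustrative.

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

func main() {
	reg := prometheus.NewRegistry()

	// Gauge names and help strings as introduced by the patch.
	txQueueLen := promauto.With(reg).NewGauge(prometheus.GaugeOpts{
		Name: "wormhole_near_tx_queue",
		Help: "Current Near transaction processing queue length",
	})
	chunkQueueLen := promauto.With(reg).NewGauge(prometheus.GaugeOpts{
		Name: "wormhole_near_chunk_queue",
		Help: "Current Near chunk processing queue length",
	})

	// Stand-in queue state; the watcher itself reads an atomic counter and len() of a channel.
	chunkQueue := make(chan struct{}, 10_000)
	chunkQueue <- struct{}{}

	txQueueLen.Set(42)
	chunkQueueLen.Set(float64(len(chunkQueue)))

	families, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	fmt.Println("registered metric families:", len(families)) // 2
}
```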
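Finally, the new doc comment on `schedule` describes a context-aware delayed hand-off: a job is pushed to the workers only after `delay` has elapsed, and not at all if the context is cancelled first. A self-contained sketch of that pattern follows, using an illustrative job type and queue channel rather than the watcher's own types.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// job stands in for the watcher's transactionProcessingJob.
type job struct{ txHash string }

// schedule hands j to the queue after delay, unless ctx is cancelled first.
func schedule(ctx context.Context, queue chan<- *job, j *job, delay time.Duration) {
	go func() {
		timer := time.NewTimer(delay)
		defer timer.Stop()
		select {
		case <-ctx.Done():
			// cancelled before the delay elapsed: the job is dropped
			return
		case <-timer.C:
			select {
			case queue <- j:
				// a worker will pick the job up from the queue
			case <-ctx.Done():
				// cancelled before any worker accepted the job
			}
		}
	}()
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	queue := make(chan *job) // unbuffered, like the watcher's transactionProcessingQueue
	schedule(ctx, queue, &job{txHash: "example"}, 500*time.Millisecond)

	fmt.Println("processing", (<-queue).txHash)
}
```

Calling this with a delay that grows on each failed attempt gives the exponential-backoff retry behaviour that the README and the watcher.go comment describe.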