watcher: fix near typos / docs

Change-Id: I7e271945e8d636b08e384f09359ad8aebf708103
Hendrik Hofstadt 2022-11-29 12:26:46 +01:00 committed by Evan Gray
parent 97f302e7d8
commit 56645faaad
6 changed files with 23 additions and 21 deletions


@@ -7,16 +7,16 @@ Responsibility: Observe finalized `publishMessage` event emissions from the Worm
There are multiple supervised runners:
* *BlockPoll*: Polls the NEAR RPC API for finalized blocks and generates transactionProcessingJobs for every single transaction
* *ChunkFetcher*: Fetches chunks for each block in parallel
* *TxProcessor*: Processes `transactionProcessingJob`, going through all receipt outcomes, searching for Wormhole messages, and checking that they have been finalized. If there are Wormhole messages in any receipts from this transaction but those receipts are not in finalized blocks, the `transactionProcessingJob` will be put in the back of the queque.
* *TxProcessor*: Processes `transactionProcessingJob`, going through all receipt outcomes, searching for Wormhole messages, and checking that they have been finalized. If there are Wormhole messages in any receipts from this transaction but those receipts are not in finalized blocks, the `transactionProcessingJob` will be put in the back of the queue.
* *ObsvReqProcessor*: Processes observation requests. An observation request is a way of kindly asking the Guardian to go back in time and look at a particular transaction and try to identify Wormhole events in it. Observation requests are received from other Guardians or injected through the admin API. Even though they are signed, they should not be trusted.
* chunkProcessingQueue gets new chunk hashes.
These are processed once we think that the transactions in them are likely to be completed.
* transactionProcessingQueue is a priority queue and gets all the transactions from the chunks.
These are constantly processed and put back to the end of the queque if processing fails.
These are constantly processed and put back to the end of the queue if processing fails.
If processing a transaction fails, most likely because not all receipts have been finalized yet, it is retried with exponential backoff and eventually dropped (a sketch of this retry loop follows below).
* multiple workers process both types of jobs from these two queques
* multiple workers process both types of jobs from these two queues
Determining finality of blocks:
* There is an LRU cache, finalizedBlocksCache, that keeps track of the hashes of blocks that are known to be finalized
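To make the queueing behaviour described above concrete, here is a minimal, hypothetical sketch of a worker loop that retries failed jobs with a doubling delay and eventually drops them. The job fields (`retryCounter`, `delay`), `maxRetries`, and `processTx` are illustrative assumptions, not the watcher's actual code.

```go
package near

import (
	"context"
	"fmt"
	"time"
)

// transactionProcessingJob is a simplified stand-in for the watcher's job type;
// retryCounter and delay are assumed fields for illustration only.
type transactionProcessingJob struct {
	txHash       string
	retryCounter uint
	delay        time.Duration // starts at some small initial delay, e.g. a few seconds
}

// worker drains the queue. A job that fails (typically because its receipts are
// not yet in finalized blocks) is re-queued after a doubled delay and dropped
// once it exceeds maxRetries.
func worker(ctx context.Context, queue chan *transactionProcessingJob, maxRetries uint) {
	for {
		select {
		case <-ctx.Done():
			return
		case job := <-queue:
			if err := processTx(ctx, job); err == nil {
				continue
			}
			if job.retryCounter >= maxRetries {
				fmt.Printf("dropping tx %s after %d retries\n", job.txHash, job.retryCounter)
				continue
			}
			job.retryCounter++
			job.delay *= 2 // exponential backoff (assumed doubling)
			go func(j *transactionProcessingJob) {
				select {
				case <-ctx.Done():
				case <-time.After(j.delay):
					queue <- j // put the job at the back of the queue
				}
			}(job)
		}
	}
}

// processTx stands in for the real receipt/outcome processing.
func processTx(ctx context.Context, job *transactionProcessingJob) error {
	_ = ctx
	_ = job
	return nil
}
```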


@@ -20,7 +20,7 @@ type Finalizer struct {
}
func newFinalizer(eventChan chan eventType, nearAPI nearapi.NearApi, mainnet bool) Finalizer {
finalizedBlocksCache, _ := lru.New(workerCountTxProcessing * quequeSize)
finalizedBlocksCache, _ := lru.New(workerCountTxProcessing * queueSize)
return Finalizer{
finalizedBlocksCache,


@@ -44,16 +44,16 @@ func (e *Watcher) runMetrics(ctx context.Context) error {
Help: "Average duration it takes for a wormhole message to be processed in milliseconds",
})
txQuequeLen := promauto.With(reg).NewGauge(
txqueueLen := promauto.With(reg).NewGauge(
prometheus.GaugeOpts{
Name: "wormhole_near_tx_queque",
Help: "Current Near transaction processing queque length",
Name: "wormhole_near_tx_queue",
Help: "Current Near transaction processing queue length",
})
chunkQuequeLen := promauto.With(reg).NewGauge(
chunkqueueLen := promauto.With(reg).NewGauge(
prometheus.GaugeOpts{
Name: "wormhole_near_chunk_queque",
Help: "Current Near chunk processing queque length",
Name: "wormhole_near_chunk_queue",
Help: "Current Near chunk processing queue length",
})
nearMessagesConfirmed := promauto.With(reg).NewCounter(
@@ -86,9 +86,9 @@ func (e *Watcher) runMetrics(ctx context.Context) error {
// compute and publish periodic metrics
l1 := e.transactionProcessingQueueCounter.Load()
l2 := len(e.chunkProcessingQueue)
txQuequeLen.Set(float64(l1))
chunkQuequeLen.Set(float64(l2))
logger.Info("metrics", zap.Int64("txQuequeLen", l1), zap.Int("chunkQuequeLen", l2))
txqueueLen.Set(float64(l1))
chunkqueueLen.Set(float64(l2))
logger.Info("metrics", zap.Int64("txqueueLen", l1), zap.Int("chunkqueueLen", l2))
case height := <-e.eventChanBlockProcessedHeight:
if highestBlockHeightProcessed < height {


@@ -29,7 +29,7 @@ func (e *Watcher) fetchAndParseChunk(logger *zap.Logger, ctx context.Context, ch
// recursivelyReadFinalizedBlocks walks back the blockchain from the startBlock (inclusive)
// until it reaches a block of height stopHeight or less (exclusive). Chunks from all these blocks are put
// into e.chunkProcessingQueque with the chunks from the oldest block first
// into e.chunkProcessingQueue with the chunks from the oldest block first
// if there is an error while walking back the chain, no chunks will be returned
func (e *Watcher) recursivelyReadFinalizedBlocks(logger *zap.Logger, ctx context.Context, startBlock nearapi.Block, stopHeight uint64, chunkSink chan<- nearapi.ChunkHeader, recursionDepth uint) error {
@@ -75,7 +75,7 @@ func (e *Watcher) recursivelyReadFinalizedBlocks(logger *zap.Logger, ctx context
return nil
}
// readFinalChunksSince polls the NEAR blockchain for new blocks with height > startHeight, parses out the chunks and places
// ReadFinalChunksSince polls the NEAR blockchain for new blocks with height > startHeight, parses out the chunks and places
// them into `chunkSink` in the order they were recorded on the blockchain
// returns the height of the latest final block
func (e *Watcher) ReadFinalChunksSince(logger *zap.Logger, ctx context.Context, startHeight uint64, chunkSink chan<- nearapi.ChunkHeader) (newestFinalHeight uint64, err error) {
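The comments above describe a recursive walk from a start block back towards `stopHeight`, emitting chunks oldest-first and emitting nothing if any step fails. A self-contained sketch of that shape, with simplified stand-ins for the nearapi types and an assumed recursion limit, could look like this; the real function also takes a logger and performs finality checks along the way.

```go
package near

import (
	"context"
	"errors"
)

// Simplified stand-ins for the nearapi types used by the watcher.
type ChunkHeader struct{ Hash string }

type Block struct {
	Height   uint64
	PrevHash string
	Chunks   []ChunkHeader
}

type blockFetcher interface {
	BlockByHash(ctx context.Context, hash string) (Block, error)
}

// readFinalizedBlocks walks back from start (inclusive) until it reaches a block
// of height stopHeight or less (exclusive), sending chunks oldest-block-first.
// Because chunks are only sent after all recursive calls succeed, any error
// while walking back means no chunks are emitted at all.
func readFinalizedBlocks(ctx context.Context, api blockFetcher, start Block, stopHeight uint64, chunkSink chan<- ChunkHeader, depth uint) error {
	const maxDepth = 50 // illustrative recursion limit
	if depth > maxDepth {
		return errors.New("recursion depth exceeded")
	}
	if start.Height <= stopHeight {
		return nil // exclusive stop condition reached
	}
	// Visit the parent first so that the oldest block's chunks are sent first.
	parent, err := api.BlockByHash(ctx, start.PrevHash)
	if err != nil {
		return err
	}
	if err := readFinalizedBlocks(ctx, api, parent, stopHeight, chunkSink, depth+1); err != nil {
		return err
	}
	for _, c := range start.Chunks {
		chunkSink <- c
	}
	return nil
}
```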


@@ -96,7 +96,7 @@ func (e *Watcher) processOutcome(logger *zap.Logger, ctx context.Context, job *t
// SECURITY CRITICAL: Check that block has been finalized.
outcomeBlockHeader, isFinalized := e.finalizer.isFinalized(logger, ctx, outcomeBlockHash.String())
if !isFinalized {
// If it has not, we return an error such that this transaction can be put back into the queque.
// If it has not, we return an error such that this transaction can be put back into the queue.
return errors.New("block not finalized yet")
}
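For orientation, here is a condensed, hypothetical sketch of how the finality gate and the LRU cache from finalizer.go can fit together: the cache is consulted first, a block confirmed as final is cached, and a not-yet-final block surfaces as the error seen above so the caller re-queues the job. It assumes the hashicorp golang-lru package that the watcher appears to use; `queryBlockFinalized` is a made-up stand-in, and the real `isFinalized` also takes a logger and returns the block header.

```go
package near

import (
	"context"
	"errors"

	lru "github.com/hashicorp/golang-lru"
)

// finalizer caches hashes of blocks already known to be finalized so repeated
// receipt checks do not have to hit the RPC node again.
type finalizer struct {
	finalizedBlocksCache *lru.Cache
	// queryBlockFinalized is a hypothetical stand-in for the nearapi lookup.
	queryBlockFinalized func(ctx context.Context, blockHash string) (bool, error)
}

func (f *finalizer) isFinalized(ctx context.Context, blockHash string) bool {
	if _, ok := f.finalizedBlocksCache.Get(blockHash); ok {
		return true
	}
	final, err := f.queryBlockFinalized(ctx, blockHash)
	if err != nil || !final {
		return false
	}
	f.finalizedBlocksCache.Add(blockHash, struct{}{})
	return true
}

// checkOutcome mirrors the pattern above: a non-finalized block is not fatal,
// it just returns an error so the caller puts the job back into the queue.
func checkOutcome(ctx context.Context, f *finalizer, outcomeBlockHash string) error {
	if !f.isFinalized(ctx, outcomeBlockHash) {
		return errors.New("block not finalized yet")
	}
	return nil
}
```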


@@ -21,7 +21,7 @@ var (
// how long to initially wait between observing a transaction and attempting to process the transaction.
// To successfully process the transaction, all receipts need to be finalized, which typically only occurs two blocks later or so.
// transaction processing will be retried with exponential backoff, i.e. transaction may stay in the queque for ca. initialTxProcDelay^(txProcRetry+2) time.
// transaction processing will be retried with exponential backoff, i.e. transaction may stay in the queue for ca. initialTxProcDelay^(txProcRetry+2) time.
initialTxProcDelay = time.Second * 3
blockPollInterval = time.Millisecond * 200
@@ -33,9 +33,9 @@ var (
// such that they can all be fetched in parallel.
// We're currently seeing ~10 chunks/block, so setting this to 20 conservatively.
workerChunkFetching int = 20
quequeSize int = 10_000 // size of the queques for chunk processing as well as transaction processing
queueSize int = 10_000 // size of the queues for chunk processing as well as transaction processing
// if watcher falls behind this many blocks, start over. This should be set proportional to `quequeSize`
// if watcher falls behind this many blocks, start over. This should be set proportional to `queueSize`
// such that all transactions from `maxFallBehindBlocks` can easily fit into the queue
maxFallBehindBlocks uint = 200
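A small, hypothetical helper makes the backoff timing described in the comments above concrete. The doubling factor is an assumption (the comment only says "exponential backoff"), and `maxTxRetries` is illustrative; only `initialTxProcDelay` mirrors the constant in this file.

```go
package near

import "time"

const (
	initialTxProcDelay = 3 * time.Second // mirrors the constant above
	maxTxRetries       = 5               // illustrative; the real retry limit may differ
)

// retryDelay returns the assumed wait before the n-th retry of a transaction,
// doubling each time: 3s, 6s, 12s, 24s, ... so a job stays in the queue for at
// most a few minutes before it is dropped.
func retryDelay(retry uint) time.Duration {
	if retry > maxTxRetries {
		retry = maxTxRetries
	}
	return initialTxProcDelay * time.Duration(1<<retry)
}
```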
@@ -71,7 +71,7 @@ type (
msgC chan<- *common.MessagePublication // validated (SECURITY: and only validated!) observations go into this channel
obsvReqC <-chan *gossipv1.ObservationRequest // observation requests are coming from this channel
// internal queques
// internal queues
transactionProcessingQueueCounter atomic.Int64
transactionProcessingQueue chan *transactionProcessingJob
chunkProcessingQueue chan nearapi.ChunkHeader
@@ -102,7 +102,7 @@ func NewWatcher(
msgC: msgC,
obsvReqC: obsvReqC,
transactionProcessingQueue: make(chan *transactionProcessingJob),
chunkProcessingQueue: make(chan nearapi.ChunkHeader, quequeSize),
chunkProcessingQueue: make(chan nearapi.ChunkHeader, queueSize),
eventChanBlockProcessedHeight: make(chan uint64, 10),
eventChanTxProcessedDuration: make(chan time.Duration, 10),
eventChan: make(chan eventType, 10),
@@ -298,6 +298,8 @@ func (e *Watcher) Run(ctx context.Context) error {
return ctx.Err()
}
// schedule pushes a job to workers after delay. It is context-aware and will not execute the job if the context
// is cancelled before the delay has passed and the job is picked up by a worker.
func (e *Watcher) schedule(ctx context.Context, job *transactionProcessingJob, delay time.Duration) {
go func() {
timer := time.NewTimer(delay)
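The diff cuts `schedule` off right after the timer is created. Under the comment above, a context-aware hand-off plausibly looks like the following self-contained sketch; the field names mirror the Watcher struct shown earlier, but the actual body (counter handling, logging) may differ.

```go
package near

import (
	"context"
	"sync/atomic"
	"time"
)

// Minimal stand-ins so the sketch compiles on its own; the real types live in watcher.go.
type transactionProcessingJob struct{}

type Watcher struct {
	transactionProcessingQueue        chan *transactionProcessingJob
	transactionProcessingQueueCounter atomic.Int64
}

// schedule hands job to a worker after delay, unless the context is cancelled
// first, in which case the job is silently dropped.
func (e *Watcher) schedule(ctx context.Context, job *transactionProcessingJob, delay time.Duration) {
	go func() {
		timer := time.NewTimer(delay)
		defer timer.Stop()
		select {
		case <-ctx.Done():
			return // cancelled before the delay elapsed: do not execute the job
		case <-timer.C:
			e.transactionProcessingQueueCounter.Add(1)
			e.transactionProcessingQueue <- job
		}
	}()
}
```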