From aa22a2b950fbbd10221c25a7e19e82e7fd688ed8 Mon Sep 17 00:00:00 2001 From: bruce-riley <96066700+bruce-riley@users.noreply.github.com> Date: Tue, 26 Mar 2024 13:02:41 -0500 Subject: [PATCH] Node/CCQ: Load testing tweaks (#3857) --- node/cmd/ccq/pending_request.go | 11 +++++++++-- node/pkg/node/node.go | 4 ++-- node/pkg/node/options.go | 2 +- node/pkg/query/query.go | 10 ++++++++-- node/pkg/watchers/evm/ccq_backfill.go | 2 +- node/pkg/watchers/solana/ccq.go | 14 +++++++++----- 6 files changed, 30 insertions(+), 13 deletions(-) diff --git a/node/cmd/ccq/pending_request.go b/node/cmd/ccq/pending_request.go index 7a0eed967..f5aef0ae5 100644 --- a/node/cmd/ccq/pending_request.go +++ b/node/cmd/ccq/pending_request.go @@ -57,7 +57,7 @@ func (p *PendingResponses) Add(r *PendingResponse) bool { return false } p.pendingResponses[signature] = r - p.updateMetricsAlreadyLocked() + p.updateMetricsAlreadyLocked(nil) return true } @@ -75,6 +75,7 @@ func (p *PendingResponses) Remove(r *PendingResponse) { p.mu.Lock() defer p.mu.Unlock() delete(p.pendingResponses, signature) + p.updateMetricsAlreadyLocked(r) } func (p *PendingResponses) NumPending() int { @@ -83,8 +84,14 @@ func (p *PendingResponses) NumPending() int { return len(p.pendingResponses) } -func (p *PendingResponses) updateMetricsAlreadyLocked() { +func (p *PendingResponses) updateMetricsAlreadyLocked(reqRemoved *PendingResponse) { counts := make(map[vaa.ChainID]float64) + if reqRemoved != nil { + // We may have removed the last request for a chain. Make sure we always update that chain. 
+ for _, pcr := range reqRemoved.queryRequest.PerChainQueries { + counts[pcr.ChainId] = 0 + } + } for _, pr := range p.pendingResponses { for _, pcr := range pr.queryRequest.PerChainQueries { counts[pcr.ChainId] = counts[pcr.ChainId] + 1 diff --git a/node/pkg/node/node.go b/node/pkg/node/node.go index abc9619c9..d957db321 100644 --- a/node/pkg/node/node.go +++ b/node/pkg/node/node.go @@ -120,8 +120,8 @@ func (g *G) initializeBasic(rootCtxCancel context.CancelFunc) { // Cross Chain Query Handler channels g.chainQueryReqC = make(map[vaa.ChainID]chan *query.PerChainQueryInternal) g.signedQueryReqC = makeChannelPair[*gossipv1.SignedQueryRequest](query.SignedQueryRequestChannelSize) - g.queryResponseC = makeChannelPair[*query.PerChainQueryResponseInternal](0) - g.queryResponsePublicationC = makeChannelPair[*query.QueryResponsePublication](0) + g.queryResponseC = makeChannelPair[*query.PerChainQueryResponseInternal](query.QueryResponseBufferSize) + g.queryResponsePublicationC = makeChannelPair[*query.QueryResponsePublication](query.QueryResponsePublicationChannelSize) // Guardian set state managed by processor g.gst = common.NewGuardianSetState(nil) diff --git a/node/pkg/node/options.go b/node/pkg/node/options.go index bb03b5540..e25ec6904 100644 --- a/node/pkg/node/options.go +++ b/node/pkg/node/options.go @@ -355,7 +355,7 @@ func GuardianOptionWatchers(watcherConfigs []watchers.WatcherConfig, ibcWatcherC // aggregate per-chain msgC into msgC. 
// SECURITY defense-in-depth: This way we enforce that a watcher must set the msg.EmitterChain to its chainId, which makes the code easier to audit for _, chainId := range vaa.GetAllNetworkIDs() { - chainQueryResponseC[chainId] = make(chan *query.PerChainQueryResponseInternal) + chainQueryResponseC[chainId] = make(chan *query.PerChainQueryResponseInternal, query.QueryResponseBufferSize) go func(c <-chan *query.PerChainQueryResponseInternal, chainId vaa.ChainID) { for { select { diff --git a/node/pkg/query/query.go b/node/pkg/query/query.go index dced530c4..6af11c3d9 100644 --- a/node/pkg/query/query.go +++ b/node/pkg/query/query.go @@ -29,10 +29,16 @@ const ( AuditInterval = time.Second // SignedQueryRequestChannelSize is the buffer size of the incoming query request channel. - SignedQueryRequestChannelSize = 50 + SignedQueryRequestChannelSize = 500 // QueryRequestBufferSize is the buffer size of the per-network query request channel. - QueryRequestBufferSize = 25 + QueryRequestBufferSize = 250 + + // QueryResponseBufferSize is the buffer size of the single query response channel from the watchers. + QueryResponseBufferSize = 500 + + // QueryResponsePublicationChannelSize is the buffer size of the single query response channel back to the P2P publisher. 
+ QueryResponsePublicationChannelSize = 500 ) func NewQueryHandler( diff --git a/node/pkg/watchers/evm/ccq_backfill.go b/node/pkg/watchers/evm/ccq_backfill.go index bc20cf398..a0c9c811f 100644 --- a/node/pkg/watchers/evm/ccq_backfill.go +++ b/node/pkg/watchers/evm/ccq_backfill.go @@ -115,7 +115,7 @@ func (w *Watcher) ccqBackfillInit(ctx context.Context) error { } if len(newBlocks) == 0 { - w.ccqLogger.Error("failed to read any more blocks, giving up on the backfill") + w.ccqLogger.Warn("failed to read any more blocks, giving up on the backfill") break } diff --git a/node/pkg/watchers/solana/ccq.go b/node/pkg/watchers/solana/ccq.go index 24e173273..137c5f6c4 100644 --- a/node/pkg/watchers/solana/ccq.go +++ b/node/pkg/watchers/solana/ccq.go @@ -93,6 +93,7 @@ func (w *SolanaWatcher) ccqBaseHandleSolanaAccountQueryRequest( requestId string, isRetry bool, publisher ccqCustomPublisher, + numFastRetries int, ) { rCtx, cancel := context.WithTimeout(ctx, rpcTimeout) defer cancel() @@ -123,7 +124,7 @@ func (w *SolanaWatcher) ccqBaseHandleSolanaAccountQueryRequest( // Read the accounts. info, err := w.getMultipleAccountsWithOpts(rCtx, accounts, &params) if err != nil { - if w.ccqCheckForMinSlotContext(ctx, queryRequest, req, requestId, err, giveUpTime, !isRetry, tag, publisher) { + if w.ccqCheckForMinSlotContext(ctx, queryRequest, req, requestId, err, giveUpTime, !isRetry, tag, publisher, numFastRetries) { // Return without posting a response because a go routine was created to handle it. return } @@ -216,6 +217,7 @@ func (w *SolanaWatcher) ccqBaseHandleSolanaAccountQueryRequest( zap.Uint64("blockTime", uint64(*block.BlockTime)), zap.String("blockHash", hex.EncodeToString(block.Blockhash[:])), zap.Uint64("blockHeight", *block.BlockHeight), + zap.Int("numFastRetries", numFastRetries), ) // Publish the response using the custom publisher. 
@@ -236,6 +238,7 @@ func (w *SolanaWatcher) ccqCheckForMinSlotContext( log bool, tag string, publisher ccqCustomPublisher, + numFastRetries int, ) bool { if req.MinContextSlot == 0 { return false @@ -274,7 +277,7 @@ func (w *SolanaWatcher) ccqCheckForMinSlotContext( } // Kick off the retry after a short delay. - go w.ccqSleepAndRetryAccountQuery(ctx, queryRequest, req, requestId, currentSlot, currentSlotFromError, giveUpTime, log, tag, publisher) + go w.ccqSleepAndRetryAccountQuery(ctx, queryRequest, req, requestId, currentSlot, currentSlotFromError, giveUpTime, log, tag, publisher, numFastRetries) return true } @@ -290,6 +293,7 @@ func (w *SolanaWatcher) ccqSleepAndRetryAccountQuery( log bool, tag string, publisher ccqCustomPublisher, + numFastRetries int, ) { if log { w.ccqLogger.Info("minimum context slot has not been reached, will retry shortly", @@ -307,7 +311,7 @@ func (w *SolanaWatcher) ccqSleepAndRetryAccountQuery( w.ccqLogger.Info("initiating fast retry", zap.String("requestId", requestId)) } - w.ccqBaseHandleSolanaAccountQueryRequest(ctx, queryRequest, req, giveUpTime, tag, requestId, true, publisher) + w.ccqBaseHandleSolanaAccountQueryRequest(ctx, queryRequest, req, giveUpTime, tag, requestId, true, publisher, numFastRetries+1) } // ccqIsMinContextSlotError parses an error to see if it is "Minimum context slot has not been reached". If it is, it returns the slot number @@ -365,7 +369,7 @@ func (w *SolanaWatcher) ccqHandleSolanaAccountQueryRequest(ctx context.Context, ) publisher := ccqSolanaAccountPublisher{w} - w.ccqBaseHandleSolanaAccountQueryRequest(ctx, queryRequest, req, giveUpTime, "sol_account", requestId, false, publisher) + w.ccqBaseHandleSolanaAccountQueryRequest(ctx, queryRequest, req, giveUpTime, "sol_account", requestId, false, publisher, 0) } // ccqSolanaAccountPublisher is the publisher for the sol_account query. All it has to do is forward the response passed in to the watcher, as is. 
@@ -427,7 +431,7 @@ func (w *SolanaWatcher) ccqHandleSolanaPdaQueryRequest(ctx context.Context, quer } // Execute the standard sol_account query passing in the publisher to publish a sol_pda response. - w.ccqBaseHandleSolanaAccountQueryRequest(ctx, queryRequest, acctReq, giveUpTime, "sol_pda", requestId, false, publisher) + w.ccqBaseHandleSolanaAccountQueryRequest(ctx, queryRequest, acctReq, giveUpTime, "sol_pda", requestId, false, publisher, 0) } // ccqPdaPublisher is a custom publisher that publishes a sol_pda response.