From b48cb6607c5d65486ffbd1b53038cb5e586ed642 Mon Sep 17 00:00:00 2001 From: bruce-riley <96066700+bruce-riley@users.noreply.github.com> Date: Fri, 2 Feb 2024 14:10:51 -0600 Subject: [PATCH] Node/CCQServer: Quorum not met (#3758) * Node/CCQServer: Quorum not met * Better track failed queries * Add total responses by chain and peer ID --- node/cmd/ccq/http.go | 14 +++++++-- node/cmd/ccq/metrics.go | 30 +++++++++++++++++++ node/cmd/ccq/p2p.go | 52 ++++++++++++++++++++++++++++----- node/cmd/ccq/pending_request.go | 19 ++++++++---- 4 files changed, 100 insertions(+), 15 deletions(-) diff --git a/node/cmd/ccq/http.go b/node/cmd/ccq/http.go index a2e544d23..28e660e60 100644 --- a/node/cmd/ccq/http.go +++ b/node/cmd/ccq/http.go @@ -138,7 +138,7 @@ func (s *httpServer) handleQuery(w http.ResponseWriter, r *http.Request) { return } - pendingResponse := NewPendingResponse(signedQueryRequest) + pendingResponse := NewPendingResponse(signedQueryRequest, permEntry.userName) added := s.pendingResponses.Add(pendingResponse) if !added { s.logger.Info("duplicate request", zap.String("userId", permEntry.userName), zap.String("requestId", requestId)) @@ -168,6 +168,8 @@ func (s *httpServer) handleQuery(w http.ResponseWriter, r *http.Request) { case <-time.After(query.RequestTimeout + 5*time.Second): s.logger.Info("publishing time out to client", zap.String("userId", permEntry.userName), zap.String("requestId", requestId)) http.Error(w, "Timed out waiting for response", http.StatusGatewayTimeout) + queryTimeoutsByUser.WithLabelValues(permEntry.userName).Inc() + failedQueriesByUser.WithLabelValues(permEntry.userName).Inc() case res := <-pendingResponse.ch: s.logger.Info("publishing response to client", zap.String("userId", permEntry.userName), zap.String("requestId", requestId)) resBytes, err := res.Response.Marshal() @@ -175,7 +177,7 @@ func (s *httpServer) handleQuery(w http.ResponseWriter, r *http.Request) { s.logger.Error("failed to marshal response", zap.String("userId", 
permEntry.userName), zap.String("requestId", requestId), zap.Error(err)) http.Error(w, err.Error(), http.StatusInternalServerError) invalidQueryRequestReceived.WithLabelValues("failed_to_marshal_response").Inc() - invalidRequestsByUser.WithLabelValues(permEntry.userName).Inc() + failedQueriesByUser.WithLabelValues(permEntry.userName).Inc() break } // Signature indices must be ascending for on-chain verification @@ -197,9 +199,15 @@ func (s *httpServer) handleQuery(w http.ResponseWriter, r *http.Request) { s.logger.Error("failed to encode response", zap.String("userId", permEntry.userName), zap.String("requestId", requestId), zap.Error(err)) http.Error(w, err.Error(), http.StatusInternalServerError) invalidQueryRequestReceived.WithLabelValues("failed_to_encode_response").Inc() - invalidRequestsByUser.WithLabelValues(permEntry.userName).Inc() + failedQueriesByUser.WithLabelValues(permEntry.userName).Inc() break } + successfulQueriesByUser.WithLabelValues(permEntry.userName).Inc() + case errEntry := <-pendingResponse.errCh: + s.logger.Info("publishing error response to client", zap.String("userId", permEntry.userName), zap.String("requestId", requestId), zap.Int("status", errEntry.status), zap.Error(errEntry.err)) + http.Error(w, errEntry.err.Error(), errEntry.status) + // No metrics are pegged here: the quorum-not-met path in p2p.go pegs them before it writes the error entry to errCh. 
+ break } totalQueryTime.Observe(float64(time.Since(start).Milliseconds())) diff --git a/node/cmd/ccq/metrics.go b/node/cmd/ccq/metrics.go index 303ec8e51..b71ee09d6 100644 --- a/node/cmd/ccq/metrics.go +++ b/node/cmd/ccq/metrics.go @@ -36,6 +36,30 @@ var ( Help: "Total number of requests by user name", }, []string{"user_name"}) + successfulQueriesByUser = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "ccq_server_successful_queries_by_user", + Help: "Total number of successful queries by user name", + }, []string{"user_name"}) + + failedQueriesByUser = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "ccq_server_failed_queries_by_user", + Help: "Total number of failed queries by user name", + }, []string{"user_name"}) + + queryTimeoutsByUser = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "ccq_server_query_timeouts_by_user", + Help: "Total number of query timeouts by user name", + }, []string{"user_name"}) + + quorumNotMetByUser = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "ccq_server_quorum_not_met_by_user", + Help: "Total number of query failures due to quorum not met by user name", + }, []string{"user_name"}) + invalidRequestsByUser = promauto.NewCounterVec( prometheus.CounterOpts{ Name: "ccq_server_invalid_requests_by_user", @@ -48,6 +72,12 @@ var ( Help: "Total number of query responses received by peer ID", }, []string{"peer_id"}) + queryResponsesReceivedByChainAndPeerID = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "ccq_server_total_query_responses_received_by_chain_and_peer_id", + Help: "Total number of query responses received by chain and peer ID", + }, []string{"chain_name", "peer_id"}) + inboundP2pError = promauto.NewCounterVec( prometheus.CounterOpts{ Name: "ccq_server_inbound_p2p_errors", diff --git a/node/cmd/ccq/p2p.go b/node/cmd/ccq/p2p.go index 3e3b7ee14..f0e2e512f 100644 --- a/node/cmd/ccq/p2p.go +++ b/node/cmd/ccq/p2p.go @@ -5,6 +5,7 @@ import ( "context" "encoding/hex" "fmt" + 
"net/http" "time" "github.com/certusone/wormhole/node/pkg/p2p" @@ -166,6 +167,9 @@ func runP2P(ctx context.Context, priv crypto.PrivKey, port uint, networkID, boot inboundP2pError.WithLabelValues("failed_to_unmarshal_response").Inc() continue } + for _, pcr := range queryResponse.PerChainResponses { + queryResponsesReceivedByChainAndPeerID.WithLabelValues(pcr.ChainId.String(), peerId).Inc() + } requestSignature := hex.EncodeToString(queryResponse.Request.Signature) logger.Info("query response received from gossip", zap.String("peerId", peerId), zap.Any("requestId", requestSignature)) if loggingMap.ShouldLogResponse(requestSignature) { @@ -230,8 +234,9 @@ func runP2P(ctx context.Context, priv crypto.PrivKey, port uint, networkID, boot delete(responses, requestSignature) select { case pendingResponse.ch <- s: - logger.Info("forwarded query response", + logger.Info("quorum reached, forwarded query response", zap.String("peerId", peerId), + zap.String("userId", pendingResponse.userName), zap.Any("requestId", requestSignature), zap.Int("numSigners", numSigners), zap.Int("quorum", quorum), @@ -241,12 +246,45 @@ func runP2P(ctx context.Context, priv crypto.PrivKey, port uint, networkID, boot // Leave the request in the pending map. It will get cleaned up if it times out. 
} } else { - logger.Info("waiting for more query responses", - zap.String("peerId", peerId), - zap.Any("requestId", requestSignature), - zap.Int("numSigners", numSigners), - zap.Int("quorum", quorum), - ) + // Proxy should return early if quorum is no longer possible, i.e. when maxMatchingResponses + outstandingResponses < quorum, where maxMatchingResponses is the largest signer group recorded for this request and outstandingResponses is the guardian set size minus all signers counted so far. + var totalSigners, maxMatchingResponses int + for _, signers := range responses[requestSignature] { + totalSigners += len(signers) + if len(signers) > maxMatchingResponses { + maxMatchingResponses = len(signers) + } + } + outstandingResponses := len(guardianSet.Keys) - totalSigners + if maxMatchingResponses+outstandingResponses < quorum { + quorumNotMetByUser.WithLabelValues(pendingResponse.userName).Inc() + failedQueriesByUser.WithLabelValues(pendingResponse.userName).Inc() + delete(responses, requestSignature) + select { + case pendingResponse.errCh <- &ErrorEntry{err: fmt.Errorf("quorum not met"), status: http.StatusBadRequest}: + logger.Info("query failed, quorum not met", + zap.String("peerId", peerId), + zap.String("userId", pendingResponse.userName), + zap.Any("requestId", requestSignature), + zap.Int("numSigners", numSigners), + zap.Int("maxMatchingResponses", maxMatchingResponses), + zap.Int("outstandingResponses", outstandingResponses), + zap.Int("quorum", quorum), + ) + default: + logger.Error("failed to write query error response to channel, dropping it", zap.String("peerId", peerId), zap.Any("requestId", requestSignature)) + // The non-blocking send on errCh could not proceed, so the error entry is dropped. Leave the request in the pending map. It will get cleaned up if it times out. 
+ } + } else { + logger.Info("waiting for more query responses", + zap.String("peerId", peerId), + zap.String("userId", pendingResponse.userName), + zap.Any("requestId", requestSignature), + zap.Int("numSigners", numSigners), + zap.Int("maxMatchingResponses", maxMatchingResponses), + zap.Int("outstandingResponses", outstandingResponses), + zap.Int("quorum", quorum), + ) + } } } else { logger.Warn("received observation by unknown guardian - is our guardian set outdated?", diff --git a/node/cmd/ccq/pending_request.go b/node/cmd/ccq/pending_request.go index 8ca74417d..78942ec93 100644 --- a/node/cmd/ccq/pending_request.go +++ b/node/cmd/ccq/pending_request.go @@ -8,14 +8,23 @@ import ( ) type PendingResponse struct { - req *gossipv1.SignedQueryRequest - ch chan *SignedResponse + req *gossipv1.SignedQueryRequest + userName string + ch chan *SignedResponse + errCh chan *ErrorEntry } -func NewPendingResponse(req *gossipv1.SignedQueryRequest) *PendingResponse { +type ErrorEntry struct { + err error + status int +} + +func NewPendingResponse(req *gossipv1.SignedQueryRequest, userName string) *PendingResponse { return &PendingResponse{ - req: req, - ch: make(chan *SignedResponse), + req: req, + userName: userName, + ch: make(chan *SignedResponse), + errCh: make(chan *ErrorEntry), } }