Node/CCQServer: Quorum not met (#3758)

* Node/CCQServer: Quorum not met

* Better track failed queries

* Add total responses by chain and peer ID
This commit is contained in:
bruce-riley 2024-02-02 14:10:51 -06:00 committed by GitHub
parent 696cb909b7
commit b48cb6607c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 100 additions and 15 deletions

View File

@ -138,7 +138,7 @@ func (s *httpServer) handleQuery(w http.ResponseWriter, r *http.Request) {
return return
} }
pendingResponse := NewPendingResponse(signedQueryRequest) pendingResponse := NewPendingResponse(signedQueryRequest, permEntry.userName)
added := s.pendingResponses.Add(pendingResponse) added := s.pendingResponses.Add(pendingResponse)
if !added { if !added {
s.logger.Info("duplicate request", zap.String("userId", permEntry.userName), zap.String("requestId", requestId)) s.logger.Info("duplicate request", zap.String("userId", permEntry.userName), zap.String("requestId", requestId))
@ -168,6 +168,8 @@ func (s *httpServer) handleQuery(w http.ResponseWriter, r *http.Request) {
case <-time.After(query.RequestTimeout + 5*time.Second): case <-time.After(query.RequestTimeout + 5*time.Second):
s.logger.Info("publishing time out to client", zap.String("userId", permEntry.userName), zap.String("requestId", requestId)) s.logger.Info("publishing time out to client", zap.String("userId", permEntry.userName), zap.String("requestId", requestId))
http.Error(w, "Timed out waiting for response", http.StatusGatewayTimeout) http.Error(w, "Timed out waiting for response", http.StatusGatewayTimeout)
queryTimeoutsByUser.WithLabelValues(permEntry.userName).Inc()
failedQueriesByUser.WithLabelValues(permEntry.userName).Inc()
case res := <-pendingResponse.ch: case res := <-pendingResponse.ch:
s.logger.Info("publishing response to client", zap.String("userId", permEntry.userName), zap.String("requestId", requestId)) s.logger.Info("publishing response to client", zap.String("userId", permEntry.userName), zap.String("requestId", requestId))
resBytes, err := res.Response.Marshal() resBytes, err := res.Response.Marshal()
@ -175,7 +177,7 @@ func (s *httpServer) handleQuery(w http.ResponseWriter, r *http.Request) {
s.logger.Error("failed to marshal response", zap.String("userId", permEntry.userName), zap.String("requestId", requestId), zap.Error(err)) s.logger.Error("failed to marshal response", zap.String("userId", permEntry.userName), zap.String("requestId", requestId), zap.Error(err))
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
invalidQueryRequestReceived.WithLabelValues("failed_to_marshal_response").Inc() invalidQueryRequestReceived.WithLabelValues("failed_to_marshal_response").Inc()
invalidRequestsByUser.WithLabelValues(permEntry.userName).Inc() failedQueriesByUser.WithLabelValues(permEntry.userName).Inc()
break break
} }
// Signature indices must be ascending for on-chain verification // Signature indices must be ascending for on-chain verification
@ -197,9 +199,15 @@ func (s *httpServer) handleQuery(w http.ResponseWriter, r *http.Request) {
s.logger.Error("failed to encode response", zap.String("userId", permEntry.userName), zap.String("requestId", requestId), zap.Error(err)) s.logger.Error("failed to encode response", zap.String("userId", permEntry.userName), zap.String("requestId", requestId), zap.Error(err))
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
invalidQueryRequestReceived.WithLabelValues("failed_to_encode_response").Inc() invalidQueryRequestReceived.WithLabelValues("failed_to_encode_response").Inc()
invalidRequestsByUser.WithLabelValues(permEntry.userName).Inc() failedQueriesByUser.WithLabelValues(permEntry.userName).Inc()
break break
} }
successfulQueriesByUser.WithLabelValues(permEntry.userName).Inc()
case errEntry := <-pendingResponse.errCh:
s.logger.Info("publishing error response to client", zap.String("userId", permEntry.userName), zap.String("requestId", requestId), zap.Int("status", errEntry.status), zap.Error(errEntry.err))
http.Error(w, errEntry.err.Error(), errEntry.status)
// Metrics have already been pegged.
break
} }
totalQueryTime.Observe(float64(time.Since(start).Milliseconds())) totalQueryTime.Observe(float64(time.Since(start).Milliseconds()))

View File

@ -36,6 +36,30 @@ var (
Help: "Total number of requests by user name", Help: "Total number of requests by user name",
}, []string{"user_name"}) }, []string{"user_name"})
successfulQueriesByUser = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "ccq_server_successful_queries_by_user",
Help: "Total number of successful queries by user name",
}, []string{"user_name"})
failedQueriesByUser = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "ccq_server_failed_queries_by_user",
Help: "Total number of failed queries by user name",
}, []string{"user_name"})
queryTimeoutsByUser = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "ccq_server_query_timeouts_by_user",
Help: "Total number of query timeouts by user name",
}, []string{"user_name"})
quorumNotMetByUser = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "ccq_server_quorum_not_met_by_user",
Help: "Total number of query failures due to quorum not met by user name",
}, []string{"user_name"})
invalidRequestsByUser = promauto.NewCounterVec( invalidRequestsByUser = promauto.NewCounterVec(
prometheus.CounterOpts{ prometheus.CounterOpts{
Name: "ccq_server_invalid_requests_by_user", Name: "ccq_server_invalid_requests_by_user",
@ -48,6 +72,12 @@ var (
Help: "Total number of query responses received by peer ID", Help: "Total number of query responses received by peer ID",
}, []string{"peer_id"}) }, []string{"peer_id"})
queryResponsesReceivedByChainAndPeerID = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "ccq_server_total_query_responses_received_by_chain_and_peer_id",
Help: "Total number of query responses received by chain and peer ID",
}, []string{"chain_name", "peer_id"})
inboundP2pError = promauto.NewCounterVec( inboundP2pError = promauto.NewCounterVec(
prometheus.CounterOpts{ prometheus.CounterOpts{
Name: "ccq_server_inbound_p2p_errors", Name: "ccq_server_inbound_p2p_errors",

View File

@ -5,6 +5,7 @@ import (
"context" "context"
"encoding/hex" "encoding/hex"
"fmt" "fmt"
"net/http"
"time" "time"
"github.com/certusone/wormhole/node/pkg/p2p" "github.com/certusone/wormhole/node/pkg/p2p"
@ -166,6 +167,9 @@ func runP2P(ctx context.Context, priv crypto.PrivKey, port uint, networkID, boot
inboundP2pError.WithLabelValues("failed_to_unmarshal_response").Inc() inboundP2pError.WithLabelValues("failed_to_unmarshal_response").Inc()
continue continue
} }
for _, pcr := range queryResponse.PerChainResponses {
queryResponsesReceivedByChainAndPeerID.WithLabelValues(pcr.ChainId.String(), peerId).Inc()
}
requestSignature := hex.EncodeToString(queryResponse.Request.Signature) requestSignature := hex.EncodeToString(queryResponse.Request.Signature)
logger.Info("query response received from gossip", zap.String("peerId", peerId), zap.Any("requestId", requestSignature)) logger.Info("query response received from gossip", zap.String("peerId", peerId), zap.Any("requestId", requestSignature))
if loggingMap.ShouldLogResponse(requestSignature) { if loggingMap.ShouldLogResponse(requestSignature) {
@ -230,8 +234,9 @@ func runP2P(ctx context.Context, priv crypto.PrivKey, port uint, networkID, boot
delete(responses, requestSignature) delete(responses, requestSignature)
select { select {
case pendingResponse.ch <- s: case pendingResponse.ch <- s:
logger.Info("forwarded query response", logger.Info("quorum reached, forwarded query response",
zap.String("peerId", peerId), zap.String("peerId", peerId),
zap.String("userId", pendingResponse.userName),
zap.Any("requestId", requestSignature), zap.Any("requestId", requestSignature),
zap.Int("numSigners", numSigners), zap.Int("numSigners", numSigners),
zap.Int("quorum", quorum), zap.Int("quorum", quorum),
@ -241,12 +246,45 @@ func runP2P(ctx context.Context, priv crypto.PrivKey, port uint, networkID, boot
// Leave the request in the pending map. It will get cleaned up if it times out. // Leave the request in the pending map. It will get cleaned up if it times out.
} }
} else { } else {
logger.Info("waiting for more query responses", // Proxy should return early if quorum is no longer possible - i.e maxMatchingResponses + outstandingResponses < quorum
zap.String("peerId", peerId), var totalSigners, maxMatchingResponses int
zap.Any("requestId", requestSignature), for _, signers := range responses[requestSignature] {
zap.Int("numSigners", numSigners), totalSigners += len(signers)
zap.Int("quorum", quorum), if len(signers) > maxMatchingResponses {
) maxMatchingResponses = len(signers)
}
}
outstandingResponses := len(guardianSet.Keys) - totalSigners
if maxMatchingResponses+outstandingResponses < quorum {
quorumNotMetByUser.WithLabelValues(pendingResponse.userName).Inc()
failedQueriesByUser.WithLabelValues(pendingResponse.userName).Inc()
delete(responses, requestSignature)
select {
case pendingResponse.errCh <- &ErrorEntry{err: fmt.Errorf("quorum not met"), status: http.StatusBadRequest}:
logger.Info("query failed, quorum not met",
zap.String("peerId", peerId),
zap.String("userId", pendingResponse.userName),
zap.Any("requestId", requestSignature),
zap.Int("numSigners", numSigners),
zap.Int("maxMatchingResponses", maxMatchingResponses),
zap.Int("outstandingResponses", outstandingResponses),
zap.Int("quorum", quorum),
)
default:
logger.Error("failed to write query error response to channel, dropping it", zap.String("peerId", peerId), zap.Any("requestId", requestSignature))
// Leave the request in the pending map. It will get cleaned up if it times out.
}
} else {
logger.Info("waiting for more query responses",
zap.String("peerId", peerId),
zap.String("userId", pendingResponse.userName),
zap.Any("requestId", requestSignature),
zap.Int("numSigners", numSigners),
zap.Int("maxMatchingResponses", maxMatchingResponses),
zap.Int("outstandingResponses", outstandingResponses),
zap.Int("quorum", quorum),
)
}
} }
} else { } else {
logger.Warn("received observation by unknown guardian - is our guardian set outdated?", logger.Warn("received observation by unknown guardian - is our guardian set outdated?",

View File

@ -8,14 +8,23 @@ import (
) )
type PendingResponse struct { type PendingResponse struct {
req *gossipv1.SignedQueryRequest req *gossipv1.SignedQueryRequest
ch chan *SignedResponse userName string
ch chan *SignedResponse
errCh chan *ErrorEntry
} }
func NewPendingResponse(req *gossipv1.SignedQueryRequest) *PendingResponse { type ErrorEntry struct {
err error
status int
}
func NewPendingResponse(req *gossipv1.SignedQueryRequest, userName string) *PendingResponse {
return &PendingResponse{ return &PendingResponse{
req: req, req: req,
ch: make(chan *SignedResponse), userName: userName,
ch: make(chan *SignedResponse),
errCh: make(chan *ErrorEntry),
} }
} }