diff --git a/node/cmd/ccq/http.go b/node/cmd/ccq/http.go index 304ca1331..330def27c 100644 --- a/node/cmd/ccq/http.go +++ b/node/cmd/ccq/http.go @@ -62,7 +62,7 @@ func (s *httpServer) handleQuery(w http.ResponseWriter, r *http.Request) { var q queryRequest err := json.NewDecoder(http.MaxBytesReader(w, r.Body, MAX_BODY_SIZE)).Decode(&q) if err != nil { - s.logger.Debug("failed to decode body", zap.Error(err)) + s.logger.Error("failed to decode body", zap.Error(err)) http.Error(w, err.Error(), http.StatusBadRequest) invalidQueryRequestReceived.WithLabelValues("failed_to_decode_body").Inc() return @@ -71,7 +71,7 @@ func (s *httpServer) handleQuery(w http.ResponseWriter, r *http.Request) { // There should be one and only one API key in the header. apiKeys, exists := r.Header["X-Api-Key"] if !exists || len(apiKeys) != 1 { - s.logger.Debug("received a request with the wrong number of api keys", zap.Stringer("url", r.URL), zap.Int("numApiKeys", len(apiKeys))) + s.logger.Error("received a request with the wrong number of api keys", zap.Stringer("url", r.URL), zap.Int("numApiKeys", len(apiKeys))) http.Error(w, "api key is missing", http.StatusUnauthorized) invalidQueryRequestReceived.WithLabelValues("missing_api_key").Inc() return @@ -79,9 +79,9 @@ func (s *httpServer) handleQuery(w http.ResponseWriter, r *http.Request) { apiKey := strings.ToLower(apiKeys[0]) // Make sure the user is authorized before we go any farther. - _, exists = s.permissions[apiKey] + permEntry, exists := s.permissions[apiKey] if !exists { - s.logger.Debug("invalid api key", zap.String("apiKey", apiKey)) + s.logger.Error("invalid api key", zap.String("apiKey", apiKey)) http.Error(w, "invalid api key", http.StatusForbidden) invalidQueryRequestReceived.WithLabelValues("invalid_api_key").Inc() return @@ -89,7 +89,7 @@ func (s *httpServer) handleQuery(w http.ResponseWriter, r *http.Request) { queryRequestBytes, err := hex.DecodeString(q.Bytes) if err != nil { - s.logger.Debug("failed to decode request bytes", zap.Error(err)) + s.logger.Error("failed to decode request bytes", zap.String("userId", permEntry.userName), zap.Error(err)) http.Error(w, err.Error(), http.StatusBadRequest) invalidQueryRequestReceived.WithLabelValues("failed_to_decode_request").Inc() return @@ -97,7 +97,7 @@ func (s *httpServer) handleQuery(w http.ResponseWriter, r *http.Request) { signature, err := hex.DecodeString(q.Signature) if err != nil { - s.logger.Debug("failed to decode signature bytes", zap.Error(err)) + s.logger.Error("failed to decode signature bytes", zap.String("userId", permEntry.userName), zap.Error(err)) http.Error(w, err.Error(), http.StatusBadRequest) invalidQueryRequestReceived.WithLabelValues("failed_to_decode_signature").Inc() return @@ -108,8 +108,11 @@ func (s *httpServer) handleQuery(w http.ResponseWriter, r *http.Request) { Signature: signature, } + requestId := hex.EncodeToString(signedQueryRequest.Signature) + s.logger.Info("received request from client", zap.String("userId", permEntry.userName), zap.String("requestId", requestId)) + if status, err := validateRequest(s.logger, s.env, s.permissions, s.signerKey, apiKey, signedQueryRequest); err != nil { - // Don't need to log here because the details were logged in the function. + s.logger.Error("failed to validate request", zap.String("userId", permEntry.userName), zap.String("requestId", requestId), zap.Int("status", status), zap.Error(err)) http.Error(w, err.Error(), status) // Metric has already been pegged. return @@ -123,7 +126,7 @@ func (s *httpServer) handleQuery(w http.ResponseWriter, r *http.Request) { b, err := proto.Marshal(&m) if err != nil { - s.logger.Error("failed to marshal gossip message", zap.Error(err)) + s.logger.Error("failed to marshal gossip message", zap.String("userId", permEntry.userName), zap.String("requestId", requestId), zap.Error(err)) http.Error(w, err.Error(), http.StatusInternalServerError) invalidQueryRequestReceived.WithLabelValues("failed_to_marshal_gossip_msg").Inc() return @@ -132,14 +135,16 @@ func (s *httpServer) handleQuery(w http.ResponseWriter, r *http.Request) { pendingResponse := NewPendingResponse(signedQueryRequest) added := s.pendingResponses.Add(pendingResponse) if !added { + s.logger.Info("duplicate request", zap.String("userId", permEntry.userName), zap.String("requestId", requestId)) http.Error(w, "Duplicate request", http.StatusBadRequest) invalidQueryRequestReceived.WithLabelValues("duplicate_request").Inc() return } + s.logger.Info("posting request to gossip", zap.String("userId", permEntry.userName), zap.String("requestId", requestId)) err = s.topic.Publish(r.Context(), b) if err != nil { - s.logger.Error("failed to publish gossip message", zap.Error(err)) + s.logger.Error("failed to publish gossip message", zap.String("userId", permEntry.userName), zap.String("requestId", requestId), zap.Error(err)) http.Error(w, err.Error(), http.StatusInternalServerError) invalidQueryRequestReceived.WithLabelValues("failed_to_publish_gossip_msg").Inc() s.pendingResponses.Remove(pendingResponse) @@ -149,11 +154,13 @@ func (s *httpServer) handleQuery(w http.ResponseWriter, r *http.Request) { // Wait for the response or timeout select { case <-time.After(query.RequestTimeout + 5*time.Second): + s.logger.Info("publishing time out to client", zap.String("userId", permEntry.userName), zap.String("requestId", requestId)) http.Error(w, "Timed out waiting for response", http.StatusGatewayTimeout) case res := <-pendingResponse.ch: + s.logger.Info("publishing response to client", zap.String("userId", permEntry.userName), zap.String("requestId", requestId)) resBytes, err := res.Response.Marshal() if err != nil { - s.logger.Error("failed to marshal response", zap.Error(err)) + s.logger.Error("failed to marshal response", zap.String("userId", permEntry.userName), zap.String("requestId", requestId), zap.Error(err)) http.Error(w, err.Error(), http.StatusInternalServerError) invalidQueryRequestReceived.WithLabelValues("failed_to_marshal_response").Inc() break @@ -174,9 +181,10 @@ func (s *httpServer) handleQuery(w http.ResponseWriter, r *http.Request) { Bytes: hex.EncodeToString(resBytes), }) if err != nil { - s.logger.Error("failed to encode response", zap.Error(err)) + s.logger.Error("failed to encode response", zap.String("userId", permEntry.userName), zap.String("requestId", requestId), zap.Error(err)) http.Error(w, err.Error(), http.StatusInternalServerError) invalidQueryRequestReceived.WithLabelValues("failed_to_encode_response").Inc() + break } } diff --git a/node/cmd/ccq/p2p.go b/node/cmd/ccq/p2p.go index 354fba38b..e0382e7ea 100644 --- a/node/cmd/ccq/p2p.go +++ b/node/cmd/ccq/p2p.go @@ -119,7 +119,8 @@ func runP2P(ctx context.Context, priv crypto.PrivKey, port uint, networkID, boot switch m := msg.Message.(type) { case *gossipv1.GossipMessage_SignedQueryResponse: logger.Debug("query response received", zap.Any("response", m.SignedQueryResponse)) - queryResponsesReceived.WithLabelValues(envelope.GetFrom().String()).Inc() + peerId := envelope.GetFrom().String() + queryResponsesReceived.WithLabelValues(peerId).Inc() var queryResponse query.QueryResponsePublication err := queryResponse.Unmarshal(m.SignedQueryResponse.QueryResponse) if err != nil { @@ -128,6 +129,7 @@ func runP2P(ctx context.Context, priv crypto.PrivKey, port uint, networkID, boot continue } requestSignature := hex.EncodeToString(queryResponse.Request.Signature) + logger.Info("query response received from gossip", zap.String("peerId", peerId), zap.Any("requestId", requestSignature)) // Check that we're handling the request for this response pendingResponse := pendingResponses.Get(requestSignature) if pendingResponse == nil { @@ -181,10 +183,22 @@ func runP2P(ctx context.Context, priv crypto.PrivKey, port uint, networkID, boot delete(responses, requestSignature) select { case pendingResponse.ch <- s: - logger.Debug("forwarded query response") + logger.Info("forwarded query response", + zap.String("peerId", peerId), + zap.Any("requestId", requestSignature), + zap.Int("numSigners", len(responses[requestSignature][digest])), + zap.Int("quorum", quorum), + ) default: - logger.Error("failed to write query response to channel, dropping it") + logger.Error("failed to write query response to channel, dropping it", zap.String("peerId", peerId), zap.Any("requestId", requestSignature)) } + } else { + logger.Info("waiting for more query responses", + zap.String("peerId", peerId), + zap.Any("requestId", requestSignature), + zap.Int("numSigners", len(responses[requestSignature][digest])), + zap.Int("quorum", quorum), + ) } } else { logger.Warn("received observation by unknown guardian - is our guardian set outdated?",