From e98bb644360176b29868bde6babf633851af085c Mon Sep 17 00:00:00 2001 From: bruce-riley <96066700+bruce-riley@users.noreply.github.com> Date: Mon, 29 Jan 2024 14:00:54 -0600 Subject: [PATCH] Node/CCQServer: log responses (#3752) --- node/cmd/ccq/http.go | 8 ++++- node/cmd/ccq/loggingMap.go | 69 ++++++++++++++++++++++++++++++++++++ node/cmd/ccq/p2p.go | 5 ++- node/cmd/ccq/permissions.go | 3 ++ node/cmd/ccq/query_server.go | 9 +++-- 5 files changed, 90 insertions(+), 4 deletions(-) create mode 100644 node/cmd/ccq/loggingMap.go diff --git a/node/cmd/ccq/http.go b/node/cmd/ccq/http.go index 2823f7057..8c9e940ed 100644 --- a/node/cmd/ccq/http.go +++ b/node/cmd/ccq/http.go @@ -38,6 +38,7 @@ type httpServer struct { permissions *Permissions signerKey *ecdsa.PrivateKey pendingResponses *PendingResponses + loggingMap *LoggingMap } func (s *httpServer) handleQuery(w http.ResponseWriter, r *http.Request) { @@ -147,6 +148,10 @@ func (s *httpServer) handleQuery(w http.ResponseWriter, r *http.Request) { return } + if permEntry.logResponses { + s.loggingMap.AddRequest(requestId) + } + s.logger.Info("posting request to gossip", zap.String("userId", permEntry.userName), zap.String("requestId", requestId)) err = s.topic.Publish(r.Context(), b) if err != nil { @@ -202,7 +207,7 @@ func (s *httpServer) handleQuery(w http.ResponseWriter, r *http.Request) { s.pendingResponses.Remove(pendingResponse) } -func NewHTTPServer(addr string, t *pubsub.Topic, permissions *Permissions, signerKey *ecdsa.PrivateKey, p *PendingResponses, logger *zap.Logger, env common.Environment) *http.Server { +func NewHTTPServer(addr string, t *pubsub.Topic, permissions *Permissions, signerKey *ecdsa.PrivateKey, p *PendingResponses, logger *zap.Logger, env common.Environment, loggingMap *LoggingMap) *http.Server { s := &httpServer{ topic: t, permissions: permissions, @@ -210,6 +215,7 @@ func NewHTTPServer(addr string, t *pubsub.Topic, permissions *Permissions, signe pendingResponses: p, logger: logger, env: env, + loggingMap: loggingMap, } r := mux.NewRouter() r.HandleFunc("/v1/query", s.handleQuery).Methods("PUT", "POST", "OPTIONS") diff --git a/node/cmd/ccq/loggingMap.go b/node/cmd/ccq/loggingMap.go new file mode 100644 index 000000000..b8e64bc0c --- /dev/null +++ b/node/cmd/ccq/loggingMap.go @@ -0,0 +1,69 @@ +package ccq + +import ( + "context" + "sync" + "time" + + "github.com/certusone/wormhole/node/pkg/common" + "go.uber.org/zap" +) + +// LoggingMap is used to track the requests for which we should log response. It contains a map keyed by the request signature +// where the payload is time the request was received. Requests will be removed from the map after two minutes. +type LoggingMap struct { + loggingLock sync.Mutex + loggingMap map[string]time.Time +} + +// NewLoggingMap creates the map used to track requests for which we should log responses. +func NewLoggingMap() *LoggingMap { + return &LoggingMap{ + loggingMap: make(map[string]time.Time), + } +} + +// Start starts a go routine to clean up requests that have been in the map for two minutes. +func (lm *LoggingMap) Start(ctx context.Context, logger *zap.Logger, errC chan error) { + common.RunWithScissors(ctx, errC, "logging_cleanup", func(ctx context.Context) error { + ticker := time.NewTicker(1 * time.Minute) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return nil + case <-ticker.C: + lm.CleanUp(logger) + } + } + }) +} + +// CleanUp iterates over the map and removes all entries that are more than two minutes old. +func (lm *LoggingMap) CleanUp(logger *zap.Logger) { + lm.loggingLock.Lock() + defer lm.loggingLock.Unlock() + for requestId, cleanUpTime := range lm.loggingMap { + if time.Now().After(cleanUpTime) { + delete(lm.loggingMap, requestId) + } + } +} + +// AddRequest adds a request to the map, giving it an expiration time two minutes into the future. +func (lm *LoggingMap) AddRequest(requestSignature string) { + lm.loggingLock.Lock() + defer lm.loggingLock.Unlock() + lm.loggingMap[requestSignature] = time.Now().Add(2 * time.Minute) +} + +// ShouldLogResponse returns true if the request is in the map. +func (lm *LoggingMap) ShouldLogResponse(requestSignature string) bool { + lm.loggingLock.Lock() + defer lm.loggingLock.Unlock() + if _, exists := lm.loggingMap[requestSignature]; exists { + return true + } + return false +} diff --git a/node/cmd/ccq/p2p.go b/node/cmd/ccq/p2p.go index 8e4e1d096..6bf23db19 100644 --- a/node/cmd/ccq/p2p.go +++ b/node/cmd/ccq/p2p.go @@ -37,7 +37,7 @@ type P2PSub struct { host host.Host } -func runP2P(ctx context.Context, priv crypto.PrivKey, port uint, networkID, bootstrapPeers, ethRpcUrl, ethCoreAddr string, pendingResponses *PendingResponses, logger *zap.Logger, monitorPeers bool) (*P2PSub, error) { +func runP2P(ctx context.Context, priv crypto.PrivKey, port uint, networkID, bootstrapPeers, ethRpcUrl, ethCoreAddr string, pendingResponses *PendingResponses, logger *zap.Logger, monitorPeers bool, loggingMap *LoggingMap) (*P2PSub, error) { // p2p setup components := p2p.DefaultComponents() components.Port = port @@ -168,6 +168,9 @@ func runP2P(ctx context.Context, priv crypto.PrivKey, port uint, networkID, boot } requestSignature := hex.EncodeToString(queryResponse.Request.Signature) logger.Info("query response received from gossip", zap.String("peerId", peerId), zap.Any("requestId", requestSignature)) + if loggingMap.ShouldLogResponse(requestSignature) { + logger.Info("logging response", zap.Any("requestId", requestSignature), zap.Any("response", queryResponse)) + } // Check that we're handling the request for this response pendingResponse := pendingResponses.Get(requestSignature) if pendingResponse == nil { diff --git a/node/cmd/ccq/permissions.go b/node/cmd/ccq/permissions.go index e4fb7bf46..f0eacd542 100644 --- a/node/cmd/ccq/permissions.go +++ b/node/cmd/ccq/permissions.go @@ -28,6 +28,7 @@ type ( UserName string `json:"userName"` ApiKey string `json:"apiKey"` AllowUnsigned bool `json:"allowUnsigned"` + LogResponses bool `json:"logResponses"` AllowedCalls []AllowedCall `json:"allowedCalls"` } @@ -67,6 +68,7 @@ type ( userName string apiKey string allowUnsigned bool + logResponses bool allowedCalls allowedCallsForUser // Key is something like "ethCall:2:000000000000000000000000b4fbf271143f4fbf7b91a5ded31805e42b2208d6:06fdde03" } @@ -267,6 +269,7 @@ func parseConfig(byteValue []byte) (PermissionsMap, error) { userName: user.UserName, apiKey: apiKey, allowUnsigned: user.AllowUnsigned, + logResponses: user.LogResponses, allowedCalls: allowedCalls, } diff --git a/node/cmd/ccq/query_server.go b/node/cmd/ccq/query_server.go index b717f72c9..9ef51e976 100644 --- a/node/cmd/ccq/query_server.go +++ b/node/cmd/ccq/query_server.go @@ -148,6 +148,8 @@ func runQueryServer(cmd *cobra.Command, args []string) { logger.Fatal("Failed to load permissions file", zap.String("permFile", *permFile), zap.Error(err)) } + loggingMap := NewLoggingMap() + // Load p2p private key var priv crypto.PrivKey priv, err = common.GetOrCreateNodeKey(logger, *nodeKeyPath) @@ -170,14 +172,14 @@ func runQueryServer(cmd *cobra.Command, args []string) { // Run p2p pendingResponses := NewPendingResponses() - p2p, err := runP2P(ctx, priv, *p2pPort, networkID, *p2pBootstrap, *ethRPC, *ethContract, pendingResponses, logger, *monitorPeers) + p2p, err := runP2P(ctx, priv, *p2pPort, networkID, *p2pBootstrap, *ethRPC, *ethContract, pendingResponses, logger, *monitorPeers, loggingMap) if err != nil { logger.Fatal("Failed to start p2p", zap.Error(err)) } // Start the HTTP server go func() { - s := NewHTTPServer(*listenAddr, p2p.topic_req, permissions, signerKey, pendingResponses, logger, env) + s := NewHTTPServer(*listenAddr, p2p.topic_req, permissions, signerKey, pendingResponses, logger, env, loggingMap) logger.Sugar().Infof("Server listening on %s", *listenAddr) err := s.ListenAndServe() if err != nil && err != http.ErrServerClosed { @@ -249,6 +251,9 @@ func runQueryServer(cmd *cobra.Command, args []string) { errC := make(chan error) permissions.StartWatcher(ctx, logger, errC) + // Star logging cleanup process. + loggingMap.Start(ctx, logger, errC) + // Wait for either a shutdown or a fatal error from the permissions watcher. select { case <-ctx.Done():