From 7ff8513186682d4d88187582114534f2f66cb3cd Mon Sep 17 00:00:00 2001 From: adityapk00 Date: Tue, 20 Apr 2021 10:40:36 -0700 Subject: [PATCH] Zecwallet latency logging --- frontend/service.go | 105 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 104 insertions(+), 1 deletion(-) diff --git a/frontend/service.go b/frontend/service.go index 60e7b2a..c70a2ca 100644 --- a/frontend/service.go +++ b/frontend/service.go @@ -11,29 +11,43 @@ import ( "encoding/json" "errors" "io" + "math" + "net" "regexp" "sort" "strconv" "strings" + "sync" "sync/atomic" "time" "github.com/adityapk00/lightwalletd/common" "github.com/adityapk00/lightwalletd/parser" "github.com/adityapk00/lightwalletd/walletrpc" + "github.com/sirupsen/logrus" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/peer" ) +type latencyCacheEntry struct { + timeNanos int64 + lastBlock uint64 + totalBlocks uint64 +} + type lwdStreamer struct { cache *common.BlockCache chainName string pingEnable bool metrics *common.PrometheusMetrics walletrpc.UnimplementedCompactTxStreamerServer + latencyCache map[string]*latencyCacheEntry + latencyMutex sync.RWMutex } // NewLwdStreamer constructs a gRPC context. func NewLwdStreamer(cache *common.BlockCache, chainName string, enablePing bool, metrics *common.PrometheusMetrics) (walletrpc.CompactTxStreamerServer, error) { - return &lwdStreamer{cache: cache, chainName: chainName, pingEnable: enablePing, metrics: metrics}, nil + return &lwdStreamer{cache: cache, chainName: chainName, pingEnable: enablePing, metrics: metrics, latencyCache: make(map[string]*latencyCacheEntry), latencyMutex: sync.RWMutex{}}, nil } // DarksideStreamer holds the gRPC state for darksidewalletd. @@ -56,6 +70,34 @@ func checkTaddress(taddr string) error { return nil } +func (s *lwdStreamer) peerIPFromContext(ctx context.Context) string { + if xRealIP, ok := metadata.FromIncomingContext(ctx); ok { + realIP := xRealIP.Get("x-real-ip") + if len(realIP) > 0 { + return realIP[0] + } + } + + if peerInfo, ok := peer.FromContext(ctx); ok { + ip, _, err := net.SplitHostPort(peerInfo.Addr.String()) + if err == nil { + return ip + } + } + + return "unknown" +} + +func (s *lwdStreamer) dailyActiveBlock(height uint64, peerip string) { + if height%1152 == 0 { + common.Log.WithFields(logrus.Fields{ + "method": "DailyActiveBlock", + "peer_addr": peerip, + "block_height": height, + }).Info("Service") + } +} + // GetLatestBlock returns the height of the best chain, according to zcashd. func (s *lwdStreamer) GetLatestBlock(ctx context.Context, placeholder *walletrpc.ChainSpec) (*walletrpc.BlockID, error) { result, rpcErr := common.RawRequest("getblockchaininfo", []json.RawMessage{}) @@ -164,6 +206,67 @@ func (s *lwdStreamer) GetBlockRange(span *walletrpc.BlockRange, resp walletrpc.C return errors.New("Must specify start and end heights") } + peerip := s.peerIPFromContext(resp.Context()) + + // Latency logging + go func() { + // If there is no ip, ignore + if peerip == "unknown" { + return + } + + // Log only if bulk requesting blocks + if span.End.Height-span.Start.Height < 100 { + return + } + + now := time.Now().UnixNano() + s.latencyMutex.Lock() + defer s.latencyMutex.Unlock() + + // remove all old entries + for ip, entry := range s.latencyCache { + if entry.timeNanos+int64(30*math.Pow10(9)) < now { // delete after 30 seconds + delete(s.latencyCache, ip) + } + } + + // Look up if this ip address has a previous getblock range + if entry, ok := s.latencyCache[peerip]; ok { + // Log only continous blocks + if entry.lastBlock+1 == span.Start.Height { + common.Log.WithFields(logrus.Fields{ + "method": "GetBlockRangeLatency", + "peer_addr": peerip, + "num_blocks": entry.totalBlocks, + "end_height": entry.lastBlock, + "latency_millis": (now - entry.timeNanos) / int64(math.Pow10(6)), + }).Info("Service") + } + } + + // Add or update the ip entry + s.latencyCache[peerip] = &latencyCacheEntry{ + lastBlock: span.End.Height, + totalBlocks: span.End.Height - span.Start.Height + 1, + timeNanos: now, + } + }() + + // Log a daily active user if the user requests the day's "key block" + go func() { + for height := span.Start.Height; height <= span.End.Height; height++ { + s.dailyActiveBlock(height, peerip) + } + }() + + common.Log.WithFields(logrus.Fields{ + "method": "GetBlockRange", + "start": span.Start.Height, + "end": span.End.Height, + "peer_addr": peerip, + }).Info("Service") + go common.GetBlockRange(s.cache, blockChan, errChan, int(span.Start.Height), int(span.End.Height)) for {