Zecwallet latency logging

This commit is contained in:
adityapk00 2021-04-20 10:40:36 -07:00
parent cac5873ecd
commit 7ff8513186
1 changed files with 104 additions and 1 deletions

View File

@ -11,29 +11,43 @@ import (
"encoding/json"
"errors"
"io"
"math"
"net"
"regexp"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/adityapk00/lightwalletd/common"
"github.com/adityapk00/lightwalletd/parser"
"github.com/adityapk00/lightwalletd/walletrpc"
"github.com/sirupsen/logrus"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
)
type latencyCacheEntry struct {
timeNanos int64
lastBlock uint64
totalBlocks uint64
}
type lwdStreamer struct {
cache *common.BlockCache
chainName string
pingEnable bool
metrics *common.PrometheusMetrics
walletrpc.UnimplementedCompactTxStreamerServer
latencyCache map[string]*latencyCacheEntry
latencyMutex sync.RWMutex
}
// NewLwdStreamer constructs a gRPC context.
func NewLwdStreamer(cache *common.BlockCache, chainName string, enablePing bool, metrics *common.PrometheusMetrics) (walletrpc.CompactTxStreamerServer, error) {
return &lwdStreamer{cache: cache, chainName: chainName, pingEnable: enablePing, metrics: metrics}, nil
return &lwdStreamer{cache: cache, chainName: chainName, pingEnable: enablePing, metrics: metrics, latencyCache: make(map[string]*latencyCacheEntry), latencyMutex: sync.RWMutex{}}, nil
}
// DarksideStreamer holds the gRPC state for darksidewalletd.
@ -56,6 +70,34 @@ func checkTaddress(taddr string) error {
return nil
}
func (s *lwdStreamer) peerIPFromContext(ctx context.Context) string {
if xRealIP, ok := metadata.FromIncomingContext(ctx); ok {
realIP := xRealIP.Get("x-real-ip")
if len(realIP) > 0 {
return realIP[0]
}
}
if peerInfo, ok := peer.FromContext(ctx); ok {
ip, _, err := net.SplitHostPort(peerInfo.Addr.String())
if err == nil {
return ip
}
}
return "unknown"
}
func (s *lwdStreamer) dailyActiveBlock(height uint64, peerip string) {
if height%1152 == 0 {
common.Log.WithFields(logrus.Fields{
"method": "DailyActiveBlock",
"peer_addr": peerip,
"block_height": height,
}).Info("Service")
}
}
// GetLatestBlock returns the height of the best chain, according to zcashd.
func (s *lwdStreamer) GetLatestBlock(ctx context.Context, placeholder *walletrpc.ChainSpec) (*walletrpc.BlockID, error) {
result, rpcErr := common.RawRequest("getblockchaininfo", []json.RawMessage{})
@ -164,6 +206,67 @@ func (s *lwdStreamer) GetBlockRange(span *walletrpc.BlockRange, resp walletrpc.C
return errors.New("Must specify start and end heights")
}
peerip := s.peerIPFromContext(resp.Context())
// Latency logging
go func() {
// If there is no ip, ignore
if peerip == "unknown" {
return
}
// Log only if bulk requesting blocks
if span.End.Height-span.Start.Height < 100 {
return
}
now := time.Now().UnixNano()
s.latencyMutex.Lock()
defer s.latencyMutex.Unlock()
// remove all old entries
for ip, entry := range s.latencyCache {
if entry.timeNanos+int64(30*math.Pow10(9)) < now { // delete after 30 seconds
delete(s.latencyCache, ip)
}
}
// Look up if this ip address has a previous getblock range
if entry, ok := s.latencyCache[peerip]; ok {
// Log only continous blocks
if entry.lastBlock+1 == span.Start.Height {
common.Log.WithFields(logrus.Fields{
"method": "GetBlockRangeLatency",
"peer_addr": peerip,
"num_blocks": entry.totalBlocks,
"end_height": entry.lastBlock,
"latency_millis": (now - entry.timeNanos) / int64(math.Pow10(6)),
}).Info("Service")
}
}
// Add or update the ip entry
s.latencyCache[peerip] = &latencyCacheEntry{
lastBlock: span.End.Height,
totalBlocks: span.End.Height - span.Start.Height + 1,
timeNanos: now,
}
}()
// Log a daily active user if the user requests the day's "key block"
go func() {
for height := span.Start.Height; height <= span.End.Height; height++ {
s.dailyActiveBlock(height, peerip)
}
}()
common.Log.WithFields(logrus.Fields{
"method": "GetBlockRange",
"start": span.Start.Height,
"end": span.End.Height,
"peer_addr": peerip,
}).Info("Service")
go common.GetBlockRange(s.cache, blockChan, errChan, int(span.Start.Height), int(span.End.Height))
for {