From 25ca13000c8342759659667279e1576dece7edf1 Mon Sep 17 00:00:00 2001 From: StephenButtolph Date: Mon, 25 May 2020 16:02:03 -0400 Subject: [PATCH] Added metrics to the networking lib --- network/codec.go | 6 +- network/codec_test.go | 2 +- network/commands.go | 40 ++++++++++++- network/metrics.go | 125 ++++++++++++++++++++++++++++++++++++++++ network/msg.go | 6 +- network/network.go | 42 +++++++++++++- network/network_test.go | 15 +++++ network/peer.go | 9 ++- node/node.go | 11 ++-- 9 files changed, 239 insertions(+), 17 deletions(-) create mode 100644 network/metrics.go diff --git a/network/codec.go b/network/codec.go index bcf40ae..0d04f92 100644 --- a/network/codec.go +++ b/network/codec.go @@ -22,14 +22,14 @@ type Codec struct{} // Pack attempts to pack a map of fields into a message. // The first byte of the message is the opcode of the message. -func (Codec) Pack(op uint8, fields map[Field]interface{}) (Msg, error) { +func (Codec) Pack(op Op, fields map[Field]interface{}) (Msg, error) { message, ok := Messages[op] if !ok { return nil, errBadOp } p := wrappers.Packer{MaxSize: math.MaxInt32} - p.PackByte(op) + p.PackByte(byte(op)) for _, field := range message { data, ok := fields[field] if !ok { @@ -49,7 +49,7 @@ func (Codec) Pack(op uint8, fields map[Field]interface{}) (Msg, error) { // The first byte of the message is the opcode of the message. func (Codec) Parse(b []byte) (Msg, error) { p := wrappers.Packer{Bytes: b} - op := p.UnpackByte() + op := Op(p.UnpackByte()) message, ok := Messages[op] if !ok { return nil, errBadOp diff --git a/network/codec_test.go b/network/codec_test.go index 0e5e313..f38cb26 100644 --- a/network/codec_test.go +++ b/network/codec_test.go @@ -30,6 +30,6 @@ func TestCodecParseInvalidOp(t *testing.T) { } func TestCodecParseExtraSpace(t *testing.T) { - _, err := TestCodec.Parse([]byte{GetVersion, 0x00}) + _, err := TestCodec.Parse([]byte{byte(GetVersion), 0x00}) assert.Error(t, err) } diff --git a/network/commands.go b/network/commands.go index 10e29e2..177f58b 100644 --- a/network/commands.go +++ b/network/commands.go @@ -112,10 +112,46 @@ func (f Field) String() string { } } +// Op is an opcode +type Op byte + +func (op Op) String() string { + switch op { + case GetVersion: + return "get_version" + case Version: + return "version" + case GetPeerList: + return "get_peerlist" + case PeerList: + return "peerlist" + case GetAcceptedFrontier: + return "get_accepted_frontier" + case AcceptedFrontier: + return "accepted_frontier" + case GetAccepted: + return "get_accepted" + case Accepted: + return "accepted" + case Get: + return "get" + case Put: + return "put" + case PushQuery: + return "push_query" + case PullQuery: + return "pull_query" + case Chits: + return "chits" + default: + return "Unknown Op" + } +} + // Public commands that may be sent between stakers const ( // Handshake: - GetVersion uint8 = iota + GetVersion Op = iota Version GetPeerList PeerList @@ -134,7 +170,7 @@ const ( // Defines the messages that can be sent/received with this network var ( - Messages = map[uint8][]Field{ + Messages = map[Op][]Field{ // Handshake: GetVersion: []Field{}, Version: []Field{NetworkID, NodeID, MyTime, IP, VersionStr}, diff --git a/network/metrics.go b/network/metrics.go new file mode 100644 index 0000000..3afda5b --- /dev/null +++ b/network/metrics.go @@ -0,0 +1,125 @@ +// (c) 2019-2020, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package network + +import ( + "fmt" + + "github.com/ava-labs/gecko/utils/wrappers" + "github.com/prometheus/client_golang/prometheus" +) + +type messageMetrics struct { + numSent, numFailed, numReceived prometheus.Counter +} + +func (mm *messageMetrics) initialize(msgType Op, registerer prometheus.Registerer) error { + mm.numSent = prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: "gecko", + Name: fmt.Sprintf("%s_sent", msgType), + Help: fmt.Sprintf("Number of %s messages sent", msgType), + }) + mm.numFailed = prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: "gecko", + Name: fmt.Sprintf("%s_failed", msgType), + Help: fmt.Sprintf("Number of %s messages that failed to be sent", msgType), + }) + mm.numReceived = prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: "gecko", + Name: fmt.Sprintf("%s_received", msgType), + Help: fmt.Sprintf("Number of %s messages received", msgType), + }) + + if err := registerer.Register(mm.numSent); err != nil { + return fmt.Errorf("failed to register sent statistics of %s due to %s", + msgType, err) + } + if err := registerer.Register(mm.numFailed); err != nil { + return fmt.Errorf("failed to register failed statistics of %s due to %s", + msgType, err) + } + if err := registerer.Register(mm.numReceived); err != nil { + return fmt.Errorf("failed to register received statistics of %s due to %s", + msgType, err) + } + return nil +} + +type metrics struct { + numPeers prometheus.Gauge + + getVersion, version, + getPeerlist, peerlist, + getAcceptedFrontier, acceptedFrontier, + getAccepted, accepted, + get, put, + pushQuery, pullQuery, chits messageMetrics +} + +func (m *metrics) initialize(registerer prometheus.Registerer) error { + m.numPeers = prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: "gecko", + Name: "peers", + Help: "Number of network peers", + }) + + errs := wrappers.Errs{} + if err := registerer.Register(m.numPeers); err != nil { + errs.Add(fmt.Errorf("failed to register peers statistics due to %s", + err)) + } + + errs.Add(m.getVersion.initialize(GetVersion, registerer)) + errs.Add(m.version.initialize(Version, registerer)) + errs.Add(m.getPeerlist.initialize(GetPeerList, registerer)) + errs.Add(m.peerlist.initialize(PeerList, registerer)) + errs.Add(m.getAcceptedFrontier.initialize(GetAcceptedFrontier, registerer)) + errs.Add(m.acceptedFrontier.initialize(AcceptedFrontier, registerer)) + errs.Add(m.getAccepted.initialize(GetAccepted, registerer)) + errs.Add(m.accepted.initialize(Accepted, registerer)) + errs.Add(m.get.initialize(Get, registerer)) + errs.Add(m.put.initialize(Put, registerer)) + errs.Add(m.pushQuery.initialize(PushQuery, registerer)) + errs.Add(m.pullQuery.initialize(PullQuery, registerer)) + errs.Add(m.chits.initialize(Chits, registerer)) + + return errs.Err +} + +func (m *metrics) message(msgType Op) *messageMetrics { + switch msgType { + case GetVersion: + return &m.getVersion + case Version: + return &m.version + case GetPeerList: + return &m.getPeerlist + case PeerList: + return &m.peerlist + case GetAcceptedFrontier: + return &m.getAcceptedFrontier + case AcceptedFrontier: + return &m.acceptedFrontier + case GetAccepted: + return &m.getAccepted + case Accepted: + return &m.accepted + case Get: + return &m.get + case Put: + return &m.put + case PushQuery: + return &m.pushQuery + case PullQuery: + return &m.pullQuery + case Chits: + return &m.chits + default: + return nil + } +} diff --git a/network/msg.go b/network/msg.go index df83141..ff624b6 100644 --- a/network/msg.go +++ b/network/msg.go @@ -5,19 +5,19 @@ package network // Msg represents a set of fields that can be serialized into a byte stream type Msg interface { - Op() uint8 + Op() Op Get(Field) interface{} Bytes() []byte } type msg struct { - op uint8 + op Op fields map[Field]interface{} bytes []byte } // Field returns the value of the specified field in this message -func (msg *msg) Op() uint8 { return msg.op } +func (msg *msg) Op() Op { return msg.op } // Field returns the value of the specified field in this message func (msg *msg) Get(field Field) interface{} { return msg.fields[field] } diff --git a/network/network.go b/network/network.go index 4091d40..b5c3f21 100644 --- a/network/network.go +++ b/network/network.go @@ -12,6 +12,8 @@ import ( "sync/atomic" "time" + "github.com/prometheus/client_golang/prometheus" + "github.com/ava-labs/gecko/api/health" "github.com/ava-labs/gecko/ids" "github.com/ava-labs/gecko/snow/networking/router" @@ -79,6 +81,9 @@ type Network interface { } type network struct { + // The metrics that this network tracks + metrics + log logging.Logger id ids.ShortID ip utils.IPDesc @@ -126,6 +131,7 @@ type network struct { // NewDefaultNetwork returns a new Network implementation with the provided // parameters and some reasonable default values. func NewDefaultNetwork( + registerer prometheus.Registerer, log logging.Logger, id ids.ShortID, ip utils.IPDesc, @@ -140,6 +146,7 @@ func NewDefaultNetwork( router router.Router, ) Network { return NewNetwork( + registerer, log, id, ip, @@ -168,6 +175,7 @@ func NewDefaultNetwork( // NewNetwork returns a new Network implementation with the provided parameters. func NewNetwork( + registerer prometheus.Registerer, log logging.Logger, id ids.ShortID, ip utils.IPDesc, @@ -223,6 +231,7 @@ func NewNetwork( myIPs: map[string]struct{}{ip.String(): struct{}{}}, peers: make(map[[20]byte]*peer), } + net.initialize(registerer) net.executor.Initialize() net.heartbeat() return net @@ -244,6 +253,9 @@ func (n *network) GetAcceptedFrontier(validatorIDs ids.ShortSet, chainID ids.ID, } if !sent { n.executor.Add(func() { n.router.GetAcceptedFrontierFailed(vID, chainID, requestID) }) + n.getAcceptedFrontier.numFailed.Inc() + } else { + n.getAcceptedFrontier.numSent.Inc() } } } @@ -266,6 +278,9 @@ func (n *network) AcceptedFrontier(validatorID ids.ShortID, chainID ids.ID, requ } if !sent { n.log.Debug("failed to send an AcceptedFrontier message to: %s", validatorID) + n.acceptedFrontier.numFailed.Inc() + } else { + n.acceptedFrontier.numSent.Inc() } } @@ -291,6 +306,9 @@ func (n *network) GetAccepted(validatorIDs ids.ShortSet, chainID ids.ID, request } if !sent { n.executor.Add(func() { n.router.GetAcceptedFailed(vID, chainID, requestID) }) + n.getAccepted.numFailed.Inc() + } else { + n.getAccepted.numSent.Inc() } } } @@ -313,6 +331,9 @@ func (n *network) Accepted(validatorID ids.ShortID, chainID ids.ID, requestID ui } if !sent { n.log.Debug("failed to send an Accepted message to: %s", validatorID) + n.accepted.numFailed.Inc() + } else { + n.accepted.numSent.Inc() } } @@ -330,6 +351,9 @@ func (n *network) Get(validatorID ids.ShortID, chainID ids.ID, requestID uint32, } if !sent { n.log.Debug("failed to send a Get message to: %s", validatorID) + n.get.numFailed.Inc() + } else { + n.get.numSent.Inc() } } @@ -350,6 +374,9 @@ func (n *network) Put(validatorID ids.ShortID, chainID ids.ID, requestID uint32, } if !sent { n.log.Debug("failed to send a Put message to: %s", validatorID) + n.put.numFailed.Inc() + } else { + n.put.numSent.Inc() } } @@ -377,6 +404,9 @@ func (n *network) PushQuery(validatorIDs ids.ShortSet, chainID ids.ID, requestID if !sent { n.log.Debug("failed sending a PushQuery message to: %s", vID) n.executor.Add(func() { n.router.QueryFailed(vID, chainID, requestID) }) + n.pushQuery.numFailed.Inc() + } else { + n.pushQuery.numSent.Inc() } } } @@ -398,6 +428,9 @@ func (n *network) PullQuery(validatorIDs ids.ShortSet, chainID ids.ID, requestID if !sent { n.log.Debug("failed sending a PullQuery message to: %s", vID) n.executor.Add(func() { n.router.QueryFailed(vID, chainID, requestID) }) + n.pullQuery.numFailed.Inc() + } else { + n.pullQuery.numSent.Inc() } } } @@ -419,6 +452,9 @@ func (n *network) Chits(validatorID ids.ShortID, chainID ids.ID, requestID uint3 } if !sent { n.log.Debug("failed to send a Chits message to: %s", validatorID) + n.chits.numFailed.Inc() + } else { + n.chits.numSent.Inc() } } @@ -536,7 +572,11 @@ func (n *network) gossipContainer(chainID, containerID ids.ID, container []byte) sampler := random.Uniform{N: len(allPeers)} for i := 0; i < numToGossip; i++ { - allPeers[sampler.Sample()].send(msg) + if allPeers[sampler.Sample()].send(msg) { + n.put.numSent.Inc() + } else { + n.put.numFailed.Inc() + } } return nil } diff --git a/network/network_test.go b/network/network_test.go index e7f118c..0fd7053 100644 --- a/network/network_test.go +++ b/network/network_test.go @@ -10,6 +10,8 @@ import ( "testing" "time" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/assert" "github.com/ava-labs/gecko/ids" @@ -183,6 +185,7 @@ func TestNewDefaultNetwork(t *testing.T) { handler := router.Router(nil) net := NewDefaultNetwork( + prometheus.NewRegistry(), log, id, ip, @@ -265,6 +268,7 @@ func TestEstablishConnection(t *testing.T) { handler := router.Router(nil) net0 := NewDefaultNetwork( + prometheus.NewRegistry(), log, id0, ip0, @@ -281,6 +285,7 @@ func TestEstablishConnection(t *testing.T) { assert.NotNil(t, net0) net1 := NewDefaultNetwork( + prometheus.NewRegistry(), log, id1, ip1, @@ -402,6 +407,7 @@ func TestDoubleTrack(t *testing.T) { handler := router.Router(nil) net0 := NewDefaultNetwork( + prometheus.NewRegistry(), log, id0, ip0, @@ -418,6 +424,7 @@ func TestDoubleTrack(t *testing.T) { assert.NotNil(t, net0) net1 := NewDefaultNetwork( + prometheus.NewRegistry(), log, id1, ip1, @@ -540,6 +547,7 @@ func TestDoubleClose(t *testing.T) { handler := router.Router(nil) net0 := NewDefaultNetwork( + prometheus.NewRegistry(), log, id0, ip0, @@ -556,6 +564,7 @@ func TestDoubleClose(t *testing.T) { assert.NotNil(t, net0) net1 := NewDefaultNetwork( + prometheus.NewRegistry(), log, id1, ip1, @@ -683,6 +692,7 @@ func TestRemoveHandlers(t *testing.T) { handler := router.Router(nil) net0 := NewDefaultNetwork( + prometheus.NewRegistry(), log, id0, ip0, @@ -699,6 +709,7 @@ func TestRemoveHandlers(t *testing.T) { assert.NotNil(t, net0) net1 := NewDefaultNetwork( + prometheus.NewRegistry(), log, id1, ip1, @@ -835,6 +846,7 @@ func TestTrackConnected(t *testing.T) { handler := router.Router(nil) net0 := NewDefaultNetwork( + prometheus.NewRegistry(), log, id0, ip0, @@ -851,6 +863,7 @@ func TestTrackConnected(t *testing.T) { assert.NotNil(t, net0) net1 := NewDefaultNetwork( + prometheus.NewRegistry(), log, id1, ip1, @@ -974,6 +987,7 @@ func TestTrackConnectedRace(t *testing.T) { handler := router.Router(nil) net0 := NewDefaultNetwork( + prometheus.NewRegistry(), log, id0, ip0, @@ -990,6 +1004,7 @@ func TestTrackConnectedRace(t *testing.T) { assert.NotNil(t, net0) net1 := NewDefaultNetwork( + prometheus.NewRegistry(), log, id1, ip1, diff --git a/network/peer.go b/network/peer.go index 9f9c5c7..dfbde63 100644 --- a/network/peer.go +++ b/network/peer.go @@ -190,6 +190,13 @@ func (p *peer) handle(msg Msg) { p.net.heartbeat() op := msg.Op() + msgMetrics := p.net.message(op) + if msgMetrics == nil { + p.net.log.Debug("dropping an unknown message from %s with op %d", p.id, op) + return + } + msgMetrics.numReceived.Inc() + switch op { case Version: p.version(msg) @@ -228,8 +235,6 @@ func (p *peer) handle(msg Msg) { p.pullQuery(msg) case Chits: p.chits(msg) - default: - p.net.log.Debug("dropping an unknown message from %s with op %d", p.id, op) } } diff --git a/node/node.go b/node/node.go index 604f83e..fcb3283 100644 --- a/node/node.go +++ b/node/node.go @@ -154,6 +154,7 @@ func (n *Node) initNetworking() error { n.vdrs.PutValidatorSet(platformvm.DefaultSubnetID, defaultSubnetValidators) n.Net = network.NewDefaultNetwork( + n.Config.ConsensusParams.Metrics, n.Log, n.ID, n.Config.StakingIP, @@ -542,6 +543,11 @@ func (n *Node) Initialize(Config *Config, logger logging.Logger, logFactory logg return fmt.Errorf("problem initializing staker ID: %w", err) } + // Start HTTP APIs + n.initAPIServer() // Start the API Server + n.initKeystoreAPI() // Start the Keystore API + n.initMetricsAPI() // Start the Metrics API + // initialize shared memory n.initSharedMemory() @@ -549,11 +555,6 @@ func (n *Node) Initialize(Config *Config, logger logging.Logger, logFactory logg return fmt.Errorf("problem initializing networking: %w", err) } - // Start HTTP APIs - n.initAPIServer() // Start the API Server - n.initKeystoreAPI() // Start the Keystore API - n.initMetricsAPI() // Start the Metrics API - if err := n.initVMManager(); err != nil { // Set up the vm manager return fmt.Errorf("problem initializing the VM manager: %w", err) }