Added metrics to the networking lib

This commit is contained in:
StephenButtolph 2020-05-25 16:02:03 -04:00
parent ced58c7d2e
commit 25ca13000c
9 changed files with 239 additions and 17 deletions

View File

@ -22,14 +22,14 @@ type Codec struct{}
// Pack attempts to pack a map of fields into a message.
// The first byte of the message is the opcode of the message.
func (Codec) Pack(op uint8, fields map[Field]interface{}) (Msg, error) {
func (Codec) Pack(op Op, fields map[Field]interface{}) (Msg, error) {
message, ok := Messages[op]
if !ok {
return nil, errBadOp
}
p := wrappers.Packer{MaxSize: math.MaxInt32}
p.PackByte(op)
p.PackByte(byte(op))
for _, field := range message {
data, ok := fields[field]
if !ok {
@ -49,7 +49,7 @@ func (Codec) Pack(op uint8, fields map[Field]interface{}) (Msg, error) {
// The first byte of the message is the opcode of the message.
func (Codec) Parse(b []byte) (Msg, error) {
p := wrappers.Packer{Bytes: b}
op := p.UnpackByte()
op := Op(p.UnpackByte())
message, ok := Messages[op]
if !ok {
return nil, errBadOp

View File

@ -30,6 +30,6 @@ func TestCodecParseInvalidOp(t *testing.T) {
}
func TestCodecParseExtraSpace(t *testing.T) {
_, err := TestCodec.Parse([]byte{GetVersion, 0x00})
_, err := TestCodec.Parse([]byte{byte(GetVersion), 0x00})
assert.Error(t, err)
}

View File

@ -112,10 +112,46 @@ func (f Field) String() string {
}
}
// Op is the opcode that identifies the type of a network message. It is
// serialized as a single byte at the start of a packed message.
type Op byte
// String returns the lowercase, underscore-separated name of the opcode.
// Any value that is not one of the known opcodes yields "Unknown Op".
func (op Op) String() string {
	// Start from the fallback and overwrite it when the opcode is recognized.
	name := "Unknown Op"
	switch op {
	case GetVersion:
		name = "get_version"
	case Version:
		name = "version"
	case GetPeerList:
		name = "get_peerlist"
	case PeerList:
		name = "peerlist"
	case GetAcceptedFrontier:
		name = "get_accepted_frontier"
	case AcceptedFrontier:
		name = "accepted_frontier"
	case GetAccepted:
		name = "get_accepted"
	case Accepted:
		name = "accepted"
	case Get:
		name = "get"
	case Put:
		name = "put"
	case PushQuery:
		name = "push_query"
	case PullQuery:
		name = "pull_query"
	case Chits:
		name = "chits"
	}
	return name
}
// Public commands that may be sent between stakers
const (
// Handshake:
GetVersion uint8 = iota
GetVersion Op = iota
Version
GetPeerList
PeerList
@ -134,7 +170,7 @@ const (
// Defines the messages that can be sent/received with this network
var (
Messages = map[uint8][]Field{
Messages = map[Op][]Field{
// Handshake:
GetVersion: []Field{},
Version: []Field{NetworkID, NodeID, MyTime, IP, VersionStr},

125
network/metrics.go Normal file
View File

@ -0,0 +1,125 @@
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package network
import (
"fmt"
"github.com/ava-labs/gecko/utils/wrappers"
"github.com/prometheus/client_golang/prometheus"
)
// messageMetrics groups the counters kept for a single message type.
type messageMetrics struct {
	// numSent counts messages of this type that were sent.
	numSent prometheus.Counter
	// numFailed counts messages of this type that failed to be sent.
	numFailed prometheus.Counter
	// numReceived counts messages of this type that were received.
	numReceived prometheus.Counter
}
// initialize creates the sent, failed, and received counters for messages of
// type msgType and registers them with registerer.
//
// The counters live in the "gecko" namespace and are named "<op>_sent",
// "<op>_failed", and "<op>_received", where <op> is msgType.String().
//
// Registration is attempted in that order and stops at the first failure. The
// returned error wraps the registerer's error, so callers can inspect the
// underlying cause with errors.Is / errors.As.
func (mm *messageMetrics) initialize(msgType Op, registerer prometheus.Registerer) error {
	mm.numSent = prometheus.NewCounter(
		prometheus.CounterOpts{
			Namespace: "gecko",
			Name:      fmt.Sprintf("%s_sent", msgType),
			Help:      fmt.Sprintf("Number of %s messages sent", msgType),
		})
	mm.numFailed = prometheus.NewCounter(
		prometheus.CounterOpts{
			Namespace: "gecko",
			Name:      fmt.Sprintf("%s_failed", msgType),
			Help:      fmt.Sprintf("Number of %s messages that failed to be sent", msgType),
		})
	mm.numReceived = prometheus.NewCounter(
		prometheus.CounterOpts{
			Namespace: "gecko",
			Name:      fmt.Sprintf("%s_received", msgType),
			Help:      fmt.Sprintf("Number of %s messages received", msgType),
		})

	// All three counters are constructed before any registration is attempted,
	// so the fields are non-nil even when this method returns an error.
	if err := registerer.Register(mm.numSent); err != nil {
		return fmt.Errorf("failed to register sent statistics of %s due to %w",
			msgType, err)
	}
	if err := registerer.Register(mm.numFailed); err != nil {
		return fmt.Errorf("failed to register failed statistics of %s due to %w",
			msgType, err)
	}
	if err := registerer.Register(mm.numReceived); err != nil {
		return fmt.Errorf("failed to register received statistics of %s due to %w",
			msgType, err)
	}
	return nil
}
// metrics holds every Prometheus collector tracked by the network: a gauge
// for the number of peers plus one messageMetrics per message opcode.
type metrics struct {
	// numPeers is the gauge described as "Number of network peers".
	numPeers prometheus.Gauge

	// Per-opcode message counters.
	getVersion          messageMetrics
	version             messageMetrics
	getPeerlist         messageMetrics
	peerlist            messageMetrics
	getAcceptedFrontier messageMetrics
	acceptedFrontier    messageMetrics
	getAccepted         messageMetrics
	accepted            messageMetrics
	get                 messageMetrics
	put                 messageMetrics
	pushQuery           messageMetrics
	pullQuery           messageMetrics
	chits               messageMetrics
}
// initialize creates the peers gauge and the counters for every message
// opcode, and registers all of them with registerer.
//
// Unlike messageMetrics.initialize, this method does not stop at the first
// failure: every collector is initialized regardless, and the errors are
// collected in a wrappers.Errs whose Err field is returned (nil when every
// registration succeeded). The peers registration error wraps the
// registerer's error so it can be inspected with errors.Is / errors.As.
func (m *metrics) initialize(registerer prometheus.Registerer) error {
	m.numPeers = prometheus.NewGauge(
		prometheus.GaugeOpts{
			Namespace: "gecko",
			Name:      "peers",
			Help:      "Number of network peers",
		})

	errs := wrappers.Errs{}
	if err := registerer.Register(m.numPeers); err != nil {
		errs.Add(fmt.Errorf("failed to register peers statistics due to %w",
			err))
	}

	// Initialize the per-opcode counters; a nil result from initialize is
	// passed to errs.Add as-is.
	errs.Add(m.getVersion.initialize(GetVersion, registerer))
	errs.Add(m.version.initialize(Version, registerer))
	errs.Add(m.getPeerlist.initialize(GetPeerList, registerer))
	errs.Add(m.peerlist.initialize(PeerList, registerer))
	errs.Add(m.getAcceptedFrontier.initialize(GetAcceptedFrontier, registerer))
	errs.Add(m.acceptedFrontier.initialize(AcceptedFrontier, registerer))
	errs.Add(m.getAccepted.initialize(GetAccepted, registerer))
	errs.Add(m.accepted.initialize(Accepted, registerer))
	errs.Add(m.get.initialize(Get, registerer))
	errs.Add(m.put.initialize(Put, registerer))
	errs.Add(m.pushQuery.initialize(PushQuery, registerer))
	errs.Add(m.pullQuery.initialize(PullQuery, registerer))
	errs.Add(m.chits.initialize(Chits, registerer))

	return errs.Err
}
// message returns a pointer to the counters tracked for msgType, or nil when
// msgType is not one of the known opcodes.
func (m *metrics) message(msgType Op) *messageMetrics {
	// Stays nil unless one of the cases below matches.
	var counters *messageMetrics
	switch msgType {
	case GetVersion:
		counters = &m.getVersion
	case Version:
		counters = &m.version
	case GetPeerList:
		counters = &m.getPeerlist
	case PeerList:
		counters = &m.peerlist
	case GetAcceptedFrontier:
		counters = &m.getAcceptedFrontier
	case AcceptedFrontier:
		counters = &m.acceptedFrontier
	case GetAccepted:
		counters = &m.getAccepted
	case Accepted:
		counters = &m.accepted
	case Get:
		counters = &m.get
	case Put:
		counters = &m.put
	case PushQuery:
		counters = &m.pushQuery
	case PullQuery:
		counters = &m.pullQuery
	case Chits:
		counters = &m.chits
	}
	return counters
}

View File

@ -5,19 +5,19 @@ package network
// Msg represents a set of fields that can be serialized into a byte stream
type Msg interface {
Op() uint8
Op() Op
Get(Field) interface{}
Bytes() []byte
}
type msg struct {
op uint8
op Op
fields map[Field]interface{}
bytes []byte
}
// Op returns the opcode of this message
func (msg *msg) Op() uint8 { return msg.op }
func (msg *msg) Op() Op { return msg.op }
// Get returns the value of the specified field in this message
func (msg *msg) Get(field Field) interface{} { return msg.fields[field] }

View File

@ -12,6 +12,8 @@ import (
"sync/atomic"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/ava-labs/gecko/api/health"
"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/snow/networking/router"
@ -79,6 +81,9 @@ type Network interface {
}
type network struct {
// The metrics that this network tracks
metrics
log logging.Logger
id ids.ShortID
ip utils.IPDesc
@ -126,6 +131,7 @@ type network struct {
// NewDefaultNetwork returns a new Network implementation with the provided
// parameters and some reasonable default values.
func NewDefaultNetwork(
registerer prometheus.Registerer,
log logging.Logger,
id ids.ShortID,
ip utils.IPDesc,
@ -140,6 +146,7 @@ func NewDefaultNetwork(
router router.Router,
) Network {
return NewNetwork(
registerer,
log,
id,
ip,
@ -168,6 +175,7 @@ func NewDefaultNetwork(
// NewNetwork returns a new Network implementation with the provided parameters.
func NewNetwork(
registerer prometheus.Registerer,
log logging.Logger,
id ids.ShortID,
ip utils.IPDesc,
@ -223,6 +231,7 @@ func NewNetwork(
myIPs: map[string]struct{}{ip.String(): struct{}{}},
peers: make(map[[20]byte]*peer),
}
net.initialize(registerer)
net.executor.Initialize()
net.heartbeat()
return net
@ -244,6 +253,9 @@ func (n *network) GetAcceptedFrontier(validatorIDs ids.ShortSet, chainID ids.ID,
}
if !sent {
n.executor.Add(func() { n.router.GetAcceptedFrontierFailed(vID, chainID, requestID) })
n.getAcceptedFrontier.numFailed.Inc()
} else {
n.getAcceptedFrontier.numSent.Inc()
}
}
}
@ -266,6 +278,9 @@ func (n *network) AcceptedFrontier(validatorID ids.ShortID, chainID ids.ID, requ
}
if !sent {
n.log.Debug("failed to send an AcceptedFrontier message to: %s", validatorID)
n.acceptedFrontier.numFailed.Inc()
} else {
n.acceptedFrontier.numSent.Inc()
}
}
@ -291,6 +306,9 @@ func (n *network) GetAccepted(validatorIDs ids.ShortSet, chainID ids.ID, request
}
if !sent {
n.executor.Add(func() { n.router.GetAcceptedFailed(vID, chainID, requestID) })
n.getAccepted.numFailed.Inc()
} else {
n.getAccepted.numSent.Inc()
}
}
}
@ -313,6 +331,9 @@ func (n *network) Accepted(validatorID ids.ShortID, chainID ids.ID, requestID ui
}
if !sent {
n.log.Debug("failed to send an Accepted message to: %s", validatorID)
n.accepted.numFailed.Inc()
} else {
n.accepted.numSent.Inc()
}
}
@ -330,6 +351,9 @@ func (n *network) Get(validatorID ids.ShortID, chainID ids.ID, requestID uint32,
}
if !sent {
n.log.Debug("failed to send a Get message to: %s", validatorID)
n.get.numFailed.Inc()
} else {
n.get.numSent.Inc()
}
}
@ -350,6 +374,9 @@ func (n *network) Put(validatorID ids.ShortID, chainID ids.ID, requestID uint32,
}
if !sent {
n.log.Debug("failed to send a Put message to: %s", validatorID)
n.put.numFailed.Inc()
} else {
n.put.numSent.Inc()
}
}
@ -377,6 +404,9 @@ func (n *network) PushQuery(validatorIDs ids.ShortSet, chainID ids.ID, requestID
if !sent {
n.log.Debug("failed sending a PushQuery message to: %s", vID)
n.executor.Add(func() { n.router.QueryFailed(vID, chainID, requestID) })
n.pushQuery.numFailed.Inc()
} else {
n.pushQuery.numSent.Inc()
}
}
}
@ -398,6 +428,9 @@ func (n *network) PullQuery(validatorIDs ids.ShortSet, chainID ids.ID, requestID
if !sent {
n.log.Debug("failed sending a PullQuery message to: %s", vID)
n.executor.Add(func() { n.router.QueryFailed(vID, chainID, requestID) })
n.pullQuery.numFailed.Inc()
} else {
n.pullQuery.numSent.Inc()
}
}
}
@ -419,6 +452,9 @@ func (n *network) Chits(validatorID ids.ShortID, chainID ids.ID, requestID uint3
}
if !sent {
n.log.Debug("failed to send a Chits message to: %s", validatorID)
n.chits.numFailed.Inc()
} else {
n.chits.numSent.Inc()
}
}
@ -536,7 +572,11 @@ func (n *network) gossipContainer(chainID, containerID ids.ID, container []byte)
sampler := random.Uniform{N: len(allPeers)}
for i := 0; i < numToGossip; i++ {
allPeers[sampler.Sample()].send(msg)
if allPeers[sampler.Sample()].send(msg) {
n.put.numSent.Inc()
} else {
n.put.numFailed.Inc()
}
}
return nil
}

View File

@ -10,6 +10,8 @@ import (
"testing"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/ava-labs/gecko/ids"
@ -183,6 +185,7 @@ func TestNewDefaultNetwork(t *testing.T) {
handler := router.Router(nil)
net := NewDefaultNetwork(
prometheus.NewRegistry(),
log,
id,
ip,
@ -265,6 +268,7 @@ func TestEstablishConnection(t *testing.T) {
handler := router.Router(nil)
net0 := NewDefaultNetwork(
prometheus.NewRegistry(),
log,
id0,
ip0,
@ -281,6 +285,7 @@ func TestEstablishConnection(t *testing.T) {
assert.NotNil(t, net0)
net1 := NewDefaultNetwork(
prometheus.NewRegistry(),
log,
id1,
ip1,
@ -402,6 +407,7 @@ func TestDoubleTrack(t *testing.T) {
handler := router.Router(nil)
net0 := NewDefaultNetwork(
prometheus.NewRegistry(),
log,
id0,
ip0,
@ -418,6 +424,7 @@ func TestDoubleTrack(t *testing.T) {
assert.NotNil(t, net0)
net1 := NewDefaultNetwork(
prometheus.NewRegistry(),
log,
id1,
ip1,
@ -540,6 +547,7 @@ func TestDoubleClose(t *testing.T) {
handler := router.Router(nil)
net0 := NewDefaultNetwork(
prometheus.NewRegistry(),
log,
id0,
ip0,
@ -556,6 +564,7 @@ func TestDoubleClose(t *testing.T) {
assert.NotNil(t, net0)
net1 := NewDefaultNetwork(
prometheus.NewRegistry(),
log,
id1,
ip1,
@ -683,6 +692,7 @@ func TestRemoveHandlers(t *testing.T) {
handler := router.Router(nil)
net0 := NewDefaultNetwork(
prometheus.NewRegistry(),
log,
id0,
ip0,
@ -699,6 +709,7 @@ func TestRemoveHandlers(t *testing.T) {
assert.NotNil(t, net0)
net1 := NewDefaultNetwork(
prometheus.NewRegistry(),
log,
id1,
ip1,
@ -835,6 +846,7 @@ func TestTrackConnected(t *testing.T) {
handler := router.Router(nil)
net0 := NewDefaultNetwork(
prometheus.NewRegistry(),
log,
id0,
ip0,
@ -851,6 +863,7 @@ func TestTrackConnected(t *testing.T) {
assert.NotNil(t, net0)
net1 := NewDefaultNetwork(
prometheus.NewRegistry(),
log,
id1,
ip1,
@ -974,6 +987,7 @@ func TestTrackConnectedRace(t *testing.T) {
handler := router.Router(nil)
net0 := NewDefaultNetwork(
prometheus.NewRegistry(),
log,
id0,
ip0,
@ -990,6 +1004,7 @@ func TestTrackConnectedRace(t *testing.T) {
assert.NotNil(t, net0)
net1 := NewDefaultNetwork(
prometheus.NewRegistry(),
log,
id1,
ip1,

View File

@ -190,6 +190,13 @@ func (p *peer) handle(msg Msg) {
p.net.heartbeat()
op := msg.Op()
msgMetrics := p.net.message(op)
if msgMetrics == nil {
p.net.log.Debug("dropping an unknown message from %s with op %d", p.id, op)
return
}
msgMetrics.numReceived.Inc()
switch op {
case Version:
p.version(msg)
@ -228,8 +235,6 @@ func (p *peer) handle(msg Msg) {
p.pullQuery(msg)
case Chits:
p.chits(msg)
default:
p.net.log.Debug("dropping an unknown message from %s with op %d", p.id, op)
}
}

View File

@ -154,6 +154,7 @@ func (n *Node) initNetworking() error {
n.vdrs.PutValidatorSet(platformvm.DefaultSubnetID, defaultSubnetValidators)
n.Net = network.NewDefaultNetwork(
n.Config.ConsensusParams.Metrics,
n.Log,
n.ID,
n.Config.StakingIP,
@ -542,6 +543,11 @@ func (n *Node) Initialize(Config *Config, logger logging.Logger, logFactory logg
return fmt.Errorf("problem initializing staker ID: %w", err)
}
// Start HTTP APIs
n.initAPIServer() // Start the API Server
n.initKeystoreAPI() // Start the Keystore API
n.initMetricsAPI() // Start the Metrics API
// initialize shared memory
n.initSharedMemory()
@ -549,11 +555,6 @@ func (n *Node) Initialize(Config *Config, logger logging.Logger, logFactory logg
return fmt.Errorf("problem initializing networking: %w", err)
}
// Start HTTP APIs
n.initAPIServer() // Start the API Server
n.initKeystoreAPI() // Start the Keystore API
n.initMetricsAPI() // Start the Metrics API
if err := n.initVMManager(); err != nil { // Set up the vm manager
return fmt.Errorf("problem initializing the VM manager: %w", err)
}