From 041f60c91ddfea6b34988061f4b71e12f780be3f Mon Sep 17 00:00:00 2001 From: StephenButtolph Date: Fri, 5 Jun 2020 16:18:30 -0400 Subject: [PATCH 1/2] wip --- chains/manager.go | 4 +- go.mod | 2 +- snow/consensus/avalanche/metrics.go | 4 +- snow/consensus/snowman/metrics.go | 4 +- snow/consensus/snowstorm/metrics.go | 4 +- snow/engine/avalanche/voter.go | 4 +- snow/engine/snowman/voter.go | 6 +- snow/networking/router/handler.go | 8 +- snow/networking/router/metrics.go | 120 ++++++++++++++++++++++++++++ utils/timer/latency.go | 17 +++- 10 files changed, 157 insertions(+), 16 deletions(-) create mode 100644 snow/networking/router/metrics.go diff --git a/chains/manager.go b/chains/manager.go index 044cba5..77f21f3 100644 --- a/chains/manager.go +++ b/chains/manager.go @@ -429,7 +429,7 @@ func (m *manager) createAvalancheChain( // Asynchronously passes messages from the network to the consensus engine handler := &router.Handler{} - handler.Initialize(&engine, msgChan, defaultChannelSize) + handler.Initialize(&engine, msgChan, defaultChannelSize, consensusParams.Metrics) // Allows messages to be routed to the new chain m.chainRouter.AddChain(handler) @@ -515,7 +515,7 @@ func (m *manager) createSnowmanChain( // Asynchronously passes messages from the network to the consensus engine handler := &router.Handler{} - handler.Initialize(&engine, msgChan, defaultChannelSize) + handler.Initialize(&engine, msgChan, defaultChannelSize, consensusParams.Metrics) // Allow incoming messages to be routed to the new chain m.chainRouter.AddChain(handler) diff --git a/go.mod b/go.mod index d9caa7b..744eb89 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/allegro/bigcache v1.2.1 // indirect github.com/aristanetworks/goarista v0.0.0-20200520141224-0f14e646773f // indirect github.com/ava-labs/coreth v0.2.0 // Added manually; don't delete - github.com/ava-labs/go-ethereum v1.9.3 // indirect + github.com/ava-labs/go-ethereum v1.9.3 github.com/deckarep/golang-set v1.7.1 // indirect github.com/decred/dcrd/dcrec/secp256k1 v1.0.3 github.com/decred/dcrd/dcrec/secp256k1/v3 v3.0.0-20200526030155-0c6c7ca85d3b diff --git a/snow/consensus/avalanche/metrics.go b/snow/consensus/avalanche/metrics.go index 4553361..5caaaf7 100644 --- a/snow/consensus/avalanche/metrics.go +++ b/snow/consensus/avalanche/metrics.go @@ -37,14 +37,14 @@ func (m *metrics) Initialize(log logging.Logger, namespace string, registerer pr Namespace: namespace, Name: "vtx_accepted", Help: "Latency of accepting from the time the vertex was issued in milliseconds", - Buckets: timer.Buckets, + Buckets: timer.MillisecondsBuckets, }) m.latRejected = prometheus.NewHistogram( prometheus.HistogramOpts{ Namespace: namespace, Name: "vtx_rejected", Help: "Latency of rejecting from the time the vertex was issued in milliseconds", - Buckets: timer.Buckets, + Buckets: timer.MillisecondsBuckets, }) if err := registerer.Register(m.numProcessing); err != nil { diff --git a/snow/consensus/snowman/metrics.go b/snow/consensus/snowman/metrics.go index f415909..81f7989 100644 --- a/snow/consensus/snowman/metrics.go +++ b/snow/consensus/snowman/metrics.go @@ -37,14 +37,14 @@ func (m *metrics) Initialize(log logging.Logger, namespace string, registerer pr Namespace: namespace, Name: "accepted", Help: "Latency of accepting from the time the block was issued in milliseconds", - Buckets: timer.Buckets, + Buckets: timer.MillisecondsBuckets, }) m.latRejected = prometheus.NewHistogram( prometheus.HistogramOpts{ Namespace: namespace, Name: "rejected", Help: "Latency of rejecting from the time the block was issued in milliseconds", - Buckets: timer.Buckets, + Buckets: timer.MillisecondsBuckets, }) if err := registerer.Register(m.numProcessing); err != nil { diff --git a/snow/consensus/snowstorm/metrics.go b/snow/consensus/snowstorm/metrics.go index 7591637..c34a6d3 100644 --- a/snow/consensus/snowstorm/metrics.go +++ b/snow/consensus/snowstorm/metrics.go @@ -37,14 +37,14 @@ func (m *metrics) Initialize(log logging.Logger, namespace string, registerer pr Namespace: namespace, Name: "tx_accepted", Help: "Latency of accepting from the time the transaction was issued in milliseconds", - Buckets: timer.Buckets, + Buckets: timer.MillisecondsBuckets, }) m.latRejected = prometheus.NewHistogram( prometheus.HistogramOpts{ Namespace: namespace, Name: "tx_rejected", Help: "Latency of rejecting from the time the transaction was issued in milliseconds", - Buckets: timer.Buckets, + Buckets: timer.MillisecondsBuckets, }) if err := registerer.Register(m.numProcessing); err != nil { diff --git a/snow/engine/avalanche/voter.go b/snow/engine/avalanche/voter.go index 581fe27..01cbe50 100644 --- a/snow/engine/avalanche/voter.go +++ b/snow/engine/avalanche/voter.go @@ -60,11 +60,11 @@ func (v *voter) Update() { } if v.t.Consensus.Quiesce() { - v.t.Config.Context.Log.Verbo("Avalanche engine can quiesce") + v.t.Config.Context.Log.Debug("Avalanche engine can quiesce") return } - v.t.Config.Context.Log.Verbo("Avalanche engine can't quiesce") + v.t.Config.Context.Log.Debug("Avalanche engine can't quiesce") v.t.errs.Add(v.t.repoll()) } diff --git a/snow/engine/snowman/voter.go b/snow/engine/snowman/voter.go index 25f9ab0..bd15831 100644 --- a/snow/engine/snowman/voter.go +++ b/snow/engine/snowman/voter.go @@ -45,7 +45,7 @@ func (v *voter) Update() { // must be bubbled to the nearest valid block results = v.bubbleVotes(results) - v.t.Config.Context.Log.Verbo("Finishing poll [%d] with:\n%s", v.requestID, &results) + v.t.Config.Context.Log.Debug("Finishing poll [%d] with:\n%s", v.requestID, &results) if err := v.t.Consensus.RecordPoll(results); err != nil { v.t.errs.Add(err) return @@ -54,11 +54,11 @@ func (v *voter) Update() { v.t.Config.VM.SetPreference(v.t.Consensus.Preference()) if v.t.Consensus.Finalized() { - v.t.Config.Context.Log.Verbo("Snowman engine can quiesce") + v.t.Config.Context.Log.Debug("Snowman engine can quiesce") return } - v.t.Config.Context.Log.Verbo("Snowman engine can't quiesce") + v.t.Config.Context.Log.Debug("Snowman engine can't quiesce") v.t.repoll() } diff --git a/snow/networking/router/handler.go b/snow/networking/router/handler.go index b6bb1cf..163bf13 100644 --- a/snow/networking/router/handler.go +++ b/snow/networking/router/handler.go @@ -7,6 +7,7 @@ import ( "github.com/ava-labs/gecko/ids" "github.com/ava-labs/gecko/snow" "github.com/ava-labs/gecko/snow/engine/common" + "github.com/prometheus/client_golang/prometheus" ) // Handler passes incoming messages from the network to the consensus engine @@ -21,7 +22,12 @@ type Handler struct { } // Initialize this consensus handler -func (h *Handler) Initialize(engine common.Engine, msgChan <-chan common.Message, bufferSize int) { +func (h *Handler) Initialize( + engine common.Engine, + msgChan <-chan common.Message, + bufferSize int, + metrics prometheus.Registerer, +) { h.msgs = make(chan message, bufferSize) h.closed = make(chan struct{}) h.engine = engine diff --git a/snow/networking/router/metrics.go b/snow/networking/router/metrics.go new file mode 100644 index 0000000..86d5e75 --- /dev/null +++ b/snow/networking/router/metrics.go @@ -0,0 +1,120 @@ +// (c) 2019-2020, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package router + +import ( + "fmt" + + "github.com/prometheus/client_golang/prometheus" + + "github.com/ava-labs/gecko/utils/timer" + "github.com/ava-labs/gecko/utils/wrappers" +) + +func initHistogram(namespace, name string, registerer prometheus.Registerer, errs *wrappers.Errs) prometheus.Histogram { + histogram := prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: namespace, + Name: name, + Help: "Time spent processing this request in nanoseconds", + Buckets: timer.NanosecondsBuckets, + }) + + if err := registerer.Register(histogram); err != nil { + errs.Add(fmt.Errorf("failed to register %s statistics due to %s", name, err)) + } + return histogram +} + +type metrics struct { + getAcceptedFrontier, acceptedFrontier, getAcceptedFrontierFailed, + getAccepted, accepted, getAcceptedFailed, + get, put, getFailed, + pushQuery, pullQuery, chits, queryFailed, + notify, + gossip, + shutdown prometheus.Histogram +} + +// Initialize implements the Engine interface +func (m *metrics) Initialize(namespace string, registerer prometheus.Registerer) error { + errs := wrappers.Errs{} + m.getAcceptedFrontier = initHistogram() + return errs.Err + m.getAcceptedFrontier = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: namespace, + Name: "get_accepted_frontier", + Help: "Time spent processing this request in nanoseconds", + Buckets: timer.NanosecondsBuckets, + }) + m.acceptedFrontier = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: namespace, + Name: "accepted_frontier", + Help: "Time spent processing this request in nanoseconds", + Buckets: timer.NanosecondsBuckets, + }) + m.getAcceptedFrontierFailed = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: namespace, + Name: "get_accepted_frontier_failed", + Help: "Time spent processing this request in nanoseconds", + Buckets: timer.NanosecondsBuckets, + }) + m.getAccepted = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: namespace, + Name: "get_accepted", + Help: "Time spent processing this request in nanoseconds", + Buckets: timer.NanosecondsBuckets, + }) + m.accepted = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: namespace, + Name: "accepted", + Help: "Time spent processing this request in nanoseconds", + Buckets: timer.NanosecondsBuckets, + }) + m.getAcceptedFailed = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: namespace, + Name: "get_accepted_failed", + Help: "Time spent processing this request in nanoseconds", + Buckets: timer.NanosecondsBuckets, + }) + m.get = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: namespace, + Name: "get", + Help: "Time spent processing this request in nanoseconds", + Buckets: timer.NanosecondsBuckets, + }) + m.put = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: namespace, + Name: "put", + Help: "Time spent processing this request in nanoseconds", + Buckets: timer.NanosecondsBuckets, + }) + m.getFailed = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: namespace, + Name: "getFailed", + Help: "Time spent processing this request in nanoseconds", + Buckets: timer.NanosecondsBuckets, + }) + m.getFailed = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: namespace, + Name: "getFailed", + Help: "Time spent processing this request in nanoseconds", + Buckets: timer.NanosecondsBuckets, + }) + + if err := registerer.Register(m.getAcceptedFrontier); err != nil { + return fmt.Errorf("failed to register get_accepted_frontier statistics due to %s", err) + } + return nil +} diff --git a/utils/timer/latency.go b/utils/timer/latency.go index 2d2371f..1917033 100644 --- a/utils/timer/latency.go +++ b/utils/timer/latency.go @@ -3,9 +3,13 @@ package timer +import ( + "time" +) + // Useful latency buckets var ( - Buckets = []float64{ + MillisecondsBuckets = []float64{ 10, // 10 ms is ~ instant 100, // 100 ms 250, // 250 ms @@ -18,4 +22,15 @@ var ( 10000, // 10 seconds // anything larger than 10 seconds will be bucketed together } + NanosecondsBuckets = []float64{ + float64(100 * time.Nanosecond), + float64(time.Microsecond), + float64(10 * time.Microsecond), + float64(100 * time.Microsecond), + float64(time.Millisecond), + float64(10 * time.Millisecond), + float64(100 * time.Millisecond), + float64(time.Second), + // anything larger than a second will be bucketed together + } ) From df05169d209ef38a55fff795fd7e48b6bb8a138a Mon Sep 17 00:00:00 2001 From: StephenButtolph Date: Sat, 6 Jun 2020 11:41:17 -0400 Subject: [PATCH 2/2] Added per consensus engine handling metrics --- chains/manager.go | 16 ++++- snow/networking/router/handler.go | 48 +++++++++++++- snow/networking/router/metrics.go | 101 +++++++++--------------------- 3 files changed, 88 insertions(+), 77 deletions(-) diff --git a/chains/manager.go b/chains/manager.go index 77f21f3..06d8358 100644 --- a/chains/manager.go +++ b/chains/manager.go @@ -429,7 +429,13 @@ func (m *manager) createAvalancheChain( // Asynchronously passes messages from the network to the consensus engine handler := &router.Handler{} - handler.Initialize(&engine, msgChan, defaultChannelSize, consensusParams.Metrics) + handler.Initialize( + &engine, + msgChan, + defaultChannelSize, + fmt.Sprintf("%s_handler", consensusParams.Namespace), + consensusParams.Metrics, + ) // Allows messages to be routed to the new chain m.chainRouter.AddChain(handler) @@ -515,7 +521,13 @@ func (m *manager) createSnowmanChain( // Asynchronously passes messages from the network to the consensus engine handler := &router.Handler{} - handler.Initialize(&engine, msgChan, defaultChannelSize, consensusParams.Metrics) + handler.Initialize( + &engine, + msgChan, + defaultChannelSize, + fmt.Sprintf("%s_handler", consensusParams.Namespace), + consensusParams.Metrics, + ) // Allow incoming messages to be routed to the new chain m.chainRouter.AddChain(handler) diff --git a/snow/networking/router/handler.go b/snow/networking/router/handler.go index 163bf13..c6f05fd 100644 --- a/snow/networking/router/handler.go +++ b/snow/networking/router/handler.go @@ -4,6 +4,8 @@ package router import ( + "time" + "github.com/ava-labs/gecko/ids" "github.com/ava-labs/gecko/snow" "github.com/ava-labs/gecko/snow/engine/common" @@ -13,6 +15,8 @@ import ( // Handler passes incoming messages from the network to the consensus engine // (Actually, it receives the incoming messages from a ChainRouter, but same difference) type Handler struct { + metrics + msgs chan message closed chan struct{} engine common.Engine @@ -26,8 +30,10 @@ func (h *Handler) Initialize( engine common.Engine, msgChan <-chan common.Message, bufferSize int, + namespace string, metrics prometheus.Registerer, ) { + h.metrics.Initialize(namespace, metrics) h.msgs = make(chan message, bufferSize) h.closed = make(chan struct{}) h.engine = engine @@ -53,6 +59,7 @@ func (h *Handler) Dispatch() { if !ok { return } + h.metrics.pending.Dec() if closing { log.Debug("dropping message due to closing:\n%s", msg) continue @@ -79,6 +86,7 @@ func (h *Handler) Dispatch() { // Returns true iff this consensus handler (and its associated engine) should shutdown // (due to receipt of a shutdown message) func (h *Handler) dispatchMsg(msg message) bool { + startTime := time.Now() ctx := h.engine.Context() ctx.Lock.Lock() @@ -92,36 +100,52 @@ func (h *Handler) dispatchMsg(msg message) bool { switch msg.messageType { case getAcceptedFrontierMsg: err = h.engine.GetAcceptedFrontier(msg.validatorID, msg.requestID) + h.getAcceptedFrontier.Observe(float64(time.Now().Sub(startTime))) case acceptedFrontierMsg: err = h.engine.AcceptedFrontier(msg.validatorID, msg.requestID, msg.containerIDs) + h.acceptedFrontier.Observe(float64(time.Now().Sub(startTime))) case getAcceptedFrontierFailedMsg: err = h.engine.GetAcceptedFrontierFailed(msg.validatorID, msg.requestID) + h.getAcceptedFrontierFailed.Observe(float64(time.Now().Sub(startTime))) case getAcceptedMsg: err = h.engine.GetAccepted(msg.validatorID, msg.requestID, msg.containerIDs) + h.getAccepted.Observe(float64(time.Now().Sub(startTime))) case acceptedMsg: err = h.engine.Accepted(msg.validatorID, msg.requestID, msg.containerIDs) + h.accepted.Observe(float64(time.Now().Sub(startTime))) case getAcceptedFailedMsg: err = h.engine.GetAcceptedFailed(msg.validatorID, msg.requestID) + h.getAcceptedFailed.Observe(float64(time.Now().Sub(startTime))) case getMsg: err = h.engine.Get(msg.validatorID, msg.requestID, msg.containerID) + h.get.Observe(float64(time.Now().Sub(startTime))) case getFailedMsg: err = h.engine.GetFailed(msg.validatorID, msg.requestID) + h.getFailed.Observe(float64(time.Now().Sub(startTime))) case putMsg: err = h.engine.Put(msg.validatorID, msg.requestID, msg.containerID, msg.container) + h.put.Observe(float64(time.Now().Sub(startTime))) case pushQueryMsg: err = h.engine.PushQuery(msg.validatorID, msg.requestID, msg.containerID, msg.container) + h.pushQuery.Observe(float64(time.Now().Sub(startTime))) case pullQueryMsg: err = h.engine.PullQuery(msg.validatorID, msg.requestID, msg.containerID) + h.pullQuery.Observe(float64(time.Now().Sub(startTime))) case queryFailedMsg: err = h.engine.QueryFailed(msg.validatorID, msg.requestID) + h.queryFailed.Observe(float64(time.Now().Sub(startTime))) case chitsMsg: err = h.engine.Chits(msg.validatorID, msg.requestID, msg.containerIDs) + h.chits.Observe(float64(time.Now().Sub(startTime))) case notifyMsg: err = h.engine.Notify(msg.notification) + h.notify.Observe(float64(time.Now().Sub(startTime))) case gossipMsg: err = h.engine.Gossip() + h.gossip.Observe(float64(time.Now().Sub(startTime))) case shutdownMsg: err = h.engine.Shutdown() + h.shutdown.Observe(float64(time.Now().Sub(startTime))) done = true } @@ -134,6 +158,7 @@ func (h *Handler) dispatchMsg(msg message) bool { // GetAcceptedFrontier passes a GetAcceptedFrontier message received from the // network to the consensus engine. func (h *Handler) GetAcceptedFrontier(validatorID ids.ShortID, requestID uint32) { + h.metrics.pending.Inc() h.msgs <- message{ messageType: getAcceptedFrontierMsg, validatorID: validatorID, @@ -144,6 +169,7 @@ func (h *Handler) GetAcceptedFrontier(validatorID ids.ShortID, requestID uint32) // AcceptedFrontier passes a AcceptedFrontier message received from the network // to the consensus engine. func (h *Handler) AcceptedFrontier(validatorID ids.ShortID, requestID uint32, containerIDs ids.Set) { + h.metrics.pending.Inc() h.msgs <- message{ messageType: acceptedFrontierMsg, validatorID: validatorID, @@ -155,6 +181,7 @@ func (h *Handler) AcceptedFrontier(validatorID ids.ShortID, requestID uint32, co // GetAcceptedFrontierFailed passes a GetAcceptedFrontierFailed message received // from the network to the consensus engine. func (h *Handler) GetAcceptedFrontierFailed(validatorID ids.ShortID, requestID uint32) { + h.metrics.pending.Inc() h.msgs <- message{ messageType: getAcceptedFrontierFailedMsg, validatorID: validatorID, @@ -165,6 +192,7 @@ func (h *Handler) GetAcceptedFrontierFailed(validatorID ids.ShortID, requestID u // GetAccepted passes a GetAccepted message received from the // network to the consensus engine. func (h *Handler) GetAccepted(validatorID ids.ShortID, requestID uint32, containerIDs ids.Set) { + h.metrics.pending.Inc() h.msgs <- message{ messageType: getAcceptedMsg, validatorID: validatorID, @@ -176,6 +204,7 @@ func (h *Handler) GetAccepted(validatorID ids.ShortID, requestID uint32, contain // Accepted passes a Accepted message received from the network to the consensus // engine. func (h *Handler) Accepted(validatorID ids.ShortID, requestID uint32, containerIDs ids.Set) { + h.metrics.pending.Inc() h.msgs <- message{ messageType: acceptedMsg, validatorID: validatorID, @@ -187,6 +216,7 @@ func (h *Handler) Accepted(validatorID ids.ShortID, requestID uint32, containerI // GetAcceptedFailed passes a GetAcceptedFailed message received from the // network to the consensus engine. func (h *Handler) GetAcceptedFailed(validatorID ids.ShortID, requestID uint32) { + h.metrics.pending.Inc() h.msgs <- message{ messageType: getAcceptedFailedMsg, validatorID: validatorID, @@ -196,6 +226,7 @@ func (h *Handler) GetAcceptedFailed(validatorID ids.ShortID, requestID uint32) { // Get passes a Get message received from the network to the consensus engine. func (h *Handler) Get(validatorID ids.ShortID, requestID uint32, containerID ids.ID) { + h.metrics.pending.Inc() h.msgs <- message{ messageType: getMsg, validatorID: validatorID, @@ -206,6 +237,7 @@ func (h *Handler) Get(validatorID ids.ShortID, requestID uint32, containerID ids // Put passes a Put message received from the network to the consensus engine. func (h *Handler) Put(validatorID ids.ShortID, requestID uint32, containerID ids.ID, container []byte) { + h.metrics.pending.Inc() h.msgs <- message{ messageType: putMsg, validatorID: validatorID, @@ -217,6 +249,7 @@ func (h *Handler) Put(validatorID ids.ShortID, requestID uint32, containerID ids // GetFailed passes a GetFailed message to the consensus engine. func (h *Handler) GetFailed(validatorID ids.ShortID, requestID uint32) { + h.metrics.pending.Inc() h.msgs <- message{ messageType: getFailedMsg, validatorID: validatorID, @@ -226,6 +259,7 @@ func (h *Handler) GetFailed(validatorID ids.ShortID, requestID uint32) { // PushQuery passes a PushQuery message received from the network to the consensus engine. func (h *Handler) PushQuery(validatorID ids.ShortID, requestID uint32, blockID ids.ID, block []byte) { + h.metrics.pending.Inc() h.msgs <- message{ messageType: pushQueryMsg, validatorID: validatorID, @@ -237,6 +271,7 @@ func (h *Handler) PushQuery(validatorID ids.ShortID, requestID uint32, blockID i // PullQuery passes a PullQuery message received from the network to the consensus engine. func (h *Handler) PullQuery(validatorID ids.ShortID, requestID uint32, blockID ids.ID) { + h.metrics.pending.Inc() h.msgs <- message{ messageType: pullQueryMsg, validatorID: validatorID, @@ -247,6 +282,7 @@ func (h *Handler) PullQuery(validatorID ids.ShortID, requestID uint32, blockID i // Chits passes a Chits message received from the network to the consensus engine. func (h *Handler) Chits(validatorID ids.ShortID, requestID uint32, votes ids.Set) { + h.metrics.pending.Inc() h.msgs <- message{ messageType: chitsMsg, validatorID: validatorID, @@ -257,6 +293,7 @@ func (h *Handler) Chits(validatorID ids.ShortID, requestID uint32, votes ids.Set // QueryFailed passes a QueryFailed message received from the network to the consensus engine. func (h *Handler) QueryFailed(validatorID ids.ShortID, requestID uint32) { + h.metrics.pending.Inc() h.msgs <- message{ messageType: queryFailedMsg, validatorID: validatorID, @@ -265,13 +302,20 @@ func (h *Handler) QueryFailed(validatorID ids.ShortID, requestID uint32) { } // Gossip passes a gossip request to the consensus engine -func (h *Handler) Gossip() { h.msgs <- message{messageType: gossipMsg} } +func (h *Handler) Gossip() { + h.metrics.pending.Inc() + h.msgs <- message{messageType: gossipMsg} +} // Shutdown shuts down the dispatcher -func (h *Handler) Shutdown() { h.msgs <- message{messageType: shutdownMsg} } +func (h *Handler) Shutdown() { + h.metrics.pending.Inc() + h.msgs <- message{messageType: shutdownMsg} +} // Notify ... func (h *Handler) Notify(msg common.Message) { + h.metrics.pending.Inc() h.msgs <- message{ messageType: notifyMsg, notification: msg, diff --git a/snow/networking/router/metrics.go b/snow/networking/router/metrics.go index 86d5e75..b7f7c87 100644 --- a/snow/networking/router/metrics.go +++ b/snow/networking/router/metrics.go @@ -28,6 +28,8 @@ func initHistogram(namespace, name string, registerer prometheus.Registerer, err } type metrics struct { + pending prometheus.Gauge + getAcceptedFrontier, acceptedFrontier, getAcceptedFrontierFailed, getAccepted, accepted, getAcceptedFailed, get, put, getFailed, @@ -40,81 +42,34 @@ type metrics struct { // Initialize implements the Engine interface func (m *metrics) Initialize(namespace string, registerer prometheus.Registerer) error { errs := wrappers.Errs{} - m.getAcceptedFrontier = initHistogram() - return errs.Err - m.getAcceptedFrontier = prometheus.NewHistogram( - prometheus.HistogramOpts{ + + m.pending = prometheus.NewGauge( + prometheus.GaugeOpts{ Namespace: namespace, - Name: "get_accepted_frontier", - Help: "Time spent processing this request in nanoseconds", - Buckets: timer.NanosecondsBuckets, - }) - m.acceptedFrontier = prometheus.NewHistogram( - prometheus.HistogramOpts{ - Namespace: namespace, - Name: "accepted_frontier", - Help: "Time spent processing this request in nanoseconds", - Buckets: timer.NanosecondsBuckets, - }) - m.getAcceptedFrontierFailed = prometheus.NewHistogram( - prometheus.HistogramOpts{ - Namespace: namespace, - Name: "get_accepted_frontier_failed", - Help: "Time spent processing this request in nanoseconds", - Buckets: timer.NanosecondsBuckets, - }) - m.getAccepted = prometheus.NewHistogram( - prometheus.HistogramOpts{ - Namespace: namespace, - Name: "get_accepted", - Help: "Time spent processing this request in nanoseconds", - Buckets: timer.NanosecondsBuckets, - }) - m.accepted = prometheus.NewHistogram( - prometheus.HistogramOpts{ - Namespace: namespace, - Name: "accepted", - Help: "Time spent processing this request in nanoseconds", - Buckets: timer.NanosecondsBuckets, - }) - m.getAcceptedFailed = prometheus.NewHistogram( - prometheus.HistogramOpts{ - Namespace: namespace, - Name: "get_accepted_failed", - Help: "Time spent processing this request in nanoseconds", - Buckets: timer.NanosecondsBuckets, - }) - m.get = prometheus.NewHistogram( - prometheus.HistogramOpts{ - Namespace: namespace, - Name: "get", - Help: "Time spent processing this request in nanoseconds", - Buckets: timer.NanosecondsBuckets, - }) - m.put = prometheus.NewHistogram( - prometheus.HistogramOpts{ - Namespace: namespace, - Name: "put", - Help: "Time spent processing this request in nanoseconds", - Buckets: timer.NanosecondsBuckets, - }) - m.getFailed = prometheus.NewHistogram( - prometheus.HistogramOpts{ - Namespace: namespace, - Name: "getFailed", - Help: "Time spent processing this request in nanoseconds", - Buckets: timer.NanosecondsBuckets, - }) - m.getFailed = prometheus.NewHistogram( - prometheus.HistogramOpts{ - Namespace: namespace, - Name: "getFailed", - Help: "Time spent processing this request in nanoseconds", - Buckets: timer.NanosecondsBuckets, + Name: "pending", + Help: "Number of pending events", }) - if err := registerer.Register(m.getAcceptedFrontier); err != nil { - return fmt.Errorf("failed to register get_accepted_frontier statistics due to %s", err) + if err := registerer.Register(m.pending); err != nil { + errs.Add(fmt.Errorf("failed to register pending statistics due to %s", err)) } - return nil + + m.getAcceptedFrontier = initHistogram(namespace, "get_accepted_frontier", registerer, &errs) + m.acceptedFrontier = initHistogram(namespace, "accepted_frontier", registerer, &errs) + m.getAcceptedFrontierFailed = initHistogram(namespace, "get_accepted_frontier_failed", registerer, &errs) + m.getAccepted = initHistogram(namespace, "get_accepted", registerer, &errs) + m.accepted = initHistogram(namespace, "accepted", registerer, &errs) + m.getAcceptedFailed = initHistogram(namespace, "get_accepted_failed", registerer, &errs) + m.get = initHistogram(namespace, "get", registerer, &errs) + m.put = initHistogram(namespace, "put", registerer, &errs) + m.getFailed = initHistogram(namespace, "get_failed", registerer, &errs) + m.pushQuery = initHistogram(namespace, "push_query", registerer, &errs) + m.pullQuery = initHistogram(namespace, "pull_query", registerer, &errs) + m.chits = initHistogram(namespace, "chits", registerer, &errs) + m.queryFailed = initHistogram(namespace, "query_failed", registerer, &errs) + m.notify = initHistogram(namespace, "notify", registerer, &errs) + m.gossip = initHistogram(namespace, "gossip", registerer, &errs) + m.shutdown = initHistogram(namespace, "shutdown", registerer, &errs) + + return errs.Err }