Merge branch 'improved-diagnostics' into network-upgrade

This commit is contained in:
StephenButtolph 2020-06-06 11:47:58 -04:00
commit e7e446f962
10 changed files with 170 additions and 18 deletions

View File

@ -429,7 +429,13 @@ func (m *manager) createAvalancheChain(
// Asynchronously passes messages from the network to the consensus engine
handler := &router.Handler{}
handler.Initialize(&engine, msgChan, defaultChannelSize)
handler.Initialize(
&engine,
msgChan,
defaultChannelSize,
fmt.Sprintf("%s_handler", consensusParams.Namespace),
consensusParams.Metrics,
)
// Allows messages to be routed to the new chain
m.chainRouter.AddChain(handler)
@ -515,7 +521,13 @@ func (m *manager) createSnowmanChain(
// Asynchronously passes messages from the network to the consensus engine
handler := &router.Handler{}
handler.Initialize(&engine, msgChan, defaultChannelSize)
handler.Initialize(
&engine,
msgChan,
defaultChannelSize,
fmt.Sprintf("%s_handler", consensusParams.Namespace),
consensusParams.Metrics,
)
// Allow incoming messages to be routed to the new chain
m.chainRouter.AddChain(handler)

2
go.mod
View File

@ -7,7 +7,7 @@ require (
github.com/allegro/bigcache v1.2.1 // indirect
github.com/aristanetworks/goarista v0.0.0-20200520141224-0f14e646773f // indirect
github.com/ava-labs/coreth v0.2.0 // Added manually; don't delete
github.com/ava-labs/go-ethereum v1.9.3 // indirect
github.com/ava-labs/go-ethereum v1.9.3
github.com/deckarep/golang-set v1.7.1 // indirect
github.com/decred/dcrd/dcrec/secp256k1 v1.0.3
github.com/decred/dcrd/dcrec/secp256k1/v3 v3.0.0-20200526030155-0c6c7ca85d3b

View File

@ -37,14 +37,14 @@ func (m *metrics) Initialize(log logging.Logger, namespace string, registerer pr
Namespace: namespace,
Name: "vtx_accepted",
Help: "Latency of accepting from the time the vertex was issued in milliseconds",
Buckets: timer.Buckets,
Buckets: timer.MillisecondsBuckets,
})
m.latRejected = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: namespace,
Name: "vtx_rejected",
Help: "Latency of rejecting from the time the vertex was issued in milliseconds",
Buckets: timer.Buckets,
Buckets: timer.MillisecondsBuckets,
})
if err := registerer.Register(m.numProcessing); err != nil {

View File

@ -37,14 +37,14 @@ func (m *metrics) Initialize(log logging.Logger, namespace string, registerer pr
Namespace: namespace,
Name: "accepted",
Help: "Latency of accepting from the time the block was issued in milliseconds",
Buckets: timer.Buckets,
Buckets: timer.MillisecondsBuckets,
})
m.latRejected = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: namespace,
Name: "rejected",
Help: "Latency of rejecting from the time the block was issued in milliseconds",
Buckets: timer.Buckets,
Buckets: timer.MillisecondsBuckets,
})
if err := registerer.Register(m.numProcessing); err != nil {

View File

@ -37,14 +37,14 @@ func (m *metrics) Initialize(log logging.Logger, namespace string, registerer pr
Namespace: namespace,
Name: "tx_accepted",
Help: "Latency of accepting from the time the transaction was issued in milliseconds",
Buckets: timer.Buckets,
Buckets: timer.MillisecondsBuckets,
})
m.latRejected = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: namespace,
Name: "tx_rejected",
Help: "Latency of rejecting from the time the transaction was issued in milliseconds",
Buckets: timer.Buckets,
Buckets: timer.MillisecondsBuckets,
})
if err := registerer.Register(m.numProcessing); err != nil {

View File

@ -59,11 +59,11 @@ func (v *voter) Update() {
}
if v.t.Consensus.Quiesce() {
v.t.Config.Context.Log.Verbo("Avalanche engine can quiesce")
v.t.Config.Context.Log.Debug("Avalanche engine can quiesce")
return
}
v.t.Config.Context.Log.Verbo("Avalanche engine can't quiesce")
v.t.Config.Context.Log.Debug("Avalanche engine can't quiesce")
v.t.errs.Add(v.t.repoll())
}

View File

@ -45,7 +45,7 @@ func (v *voter) Update() {
// must be bubbled to the nearest valid block
results = v.bubbleVotes(results)
v.t.Config.Context.Log.Verbo("Finishing poll [%d] with:\n%s", v.requestID, &results)
v.t.Config.Context.Log.Debug("Finishing poll [%d] with:\n%s", v.requestID, &results)
if err := v.t.Consensus.RecordPoll(results); err != nil {
v.t.errs.Add(err)
return
@ -54,11 +54,11 @@ func (v *voter) Update() {
v.t.Config.VM.SetPreference(v.t.Consensus.Preference())
if v.t.Consensus.Finalized() {
v.t.Config.Context.Log.Verbo("Snowman engine can quiesce")
v.t.Config.Context.Log.Debug("Snowman engine can quiesce")
return
}
v.t.Config.Context.Log.Verbo("Snowman engine can't quiesce")
v.t.Config.Context.Log.Debug("Snowman engine can't quiesce")
v.t.repoll()
}

View File

@ -4,14 +4,19 @@
package router
import (
"time"
"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/snow"
"github.com/ava-labs/gecko/snow/engine/common"
"github.com/prometheus/client_golang/prometheus"
)
// Handler passes incoming messages from the network to the consensus engine
// (Actually, it receives the incoming messages from a ChainRouter, but same difference)
type Handler struct {
metrics
msgs chan message
closed chan struct{}
engine common.Engine
@ -21,7 +26,14 @@ type Handler struct {
}
// Initialize this consensus handler
func (h *Handler) Initialize(engine common.Engine, msgChan <-chan common.Message, bufferSize int) {
func (h *Handler) Initialize(
engine common.Engine,
msgChan <-chan common.Message,
bufferSize int,
namespace string,
metrics prometheus.Registerer,
) {
h.metrics.Initialize(namespace, metrics)
h.msgs = make(chan message, bufferSize)
h.closed = make(chan struct{})
h.engine = engine
@ -47,6 +59,7 @@ func (h *Handler) Dispatch() {
if !ok {
return
}
h.metrics.pending.Dec()
if closing {
log.Debug("dropping message due to closing:\n%s", msg)
continue
@ -73,6 +86,7 @@ func (h *Handler) Dispatch() {
// Returns true iff this consensus handler (and its associated engine) should shutdown
// (due to receipt of a shutdown message)
func (h *Handler) dispatchMsg(msg message) bool {
startTime := time.Now()
ctx := h.engine.Context()
ctx.Lock.Lock()
@ -86,36 +100,52 @@ func (h *Handler) dispatchMsg(msg message) bool {
switch msg.messageType {
case getAcceptedFrontierMsg:
err = h.engine.GetAcceptedFrontier(msg.validatorID, msg.requestID)
h.getAcceptedFrontier.Observe(float64(time.Now().Sub(startTime)))
case acceptedFrontierMsg:
err = h.engine.AcceptedFrontier(msg.validatorID, msg.requestID, msg.containerIDs)
h.acceptedFrontier.Observe(float64(time.Now().Sub(startTime)))
case getAcceptedFrontierFailedMsg:
err = h.engine.GetAcceptedFrontierFailed(msg.validatorID, msg.requestID)
h.getAcceptedFrontierFailed.Observe(float64(time.Now().Sub(startTime)))
case getAcceptedMsg:
err = h.engine.GetAccepted(msg.validatorID, msg.requestID, msg.containerIDs)
h.getAccepted.Observe(float64(time.Now().Sub(startTime)))
case acceptedMsg:
err = h.engine.Accepted(msg.validatorID, msg.requestID, msg.containerIDs)
h.accepted.Observe(float64(time.Now().Sub(startTime)))
case getAcceptedFailedMsg:
err = h.engine.GetAcceptedFailed(msg.validatorID, msg.requestID)
h.getAcceptedFailed.Observe(float64(time.Now().Sub(startTime)))
case getMsg:
err = h.engine.Get(msg.validatorID, msg.requestID, msg.containerID)
h.get.Observe(float64(time.Now().Sub(startTime)))
case getFailedMsg:
err = h.engine.GetFailed(msg.validatorID, msg.requestID)
h.getFailed.Observe(float64(time.Now().Sub(startTime)))
case putMsg:
err = h.engine.Put(msg.validatorID, msg.requestID, msg.containerID, msg.container)
h.put.Observe(float64(time.Now().Sub(startTime)))
case pushQueryMsg:
err = h.engine.PushQuery(msg.validatorID, msg.requestID, msg.containerID, msg.container)
h.pushQuery.Observe(float64(time.Now().Sub(startTime)))
case pullQueryMsg:
err = h.engine.PullQuery(msg.validatorID, msg.requestID, msg.containerID)
h.pullQuery.Observe(float64(time.Now().Sub(startTime)))
case queryFailedMsg:
err = h.engine.QueryFailed(msg.validatorID, msg.requestID)
h.queryFailed.Observe(float64(time.Now().Sub(startTime)))
case chitsMsg:
err = h.engine.Chits(msg.validatorID, msg.requestID, msg.containerIDs)
h.chits.Observe(float64(time.Now().Sub(startTime)))
case notifyMsg:
err = h.engine.Notify(msg.notification)
h.notify.Observe(float64(time.Now().Sub(startTime)))
case gossipMsg:
err = h.engine.Gossip()
h.gossip.Observe(float64(time.Now().Sub(startTime)))
case shutdownMsg:
err = h.engine.Shutdown()
h.shutdown.Observe(float64(time.Now().Sub(startTime)))
done = true
}
@ -128,6 +158,7 @@ func (h *Handler) dispatchMsg(msg message) bool {
// GetAcceptedFrontier passes a GetAcceptedFrontier message received from the
// network to the consensus engine.
func (h *Handler) GetAcceptedFrontier(validatorID ids.ShortID, requestID uint32) {
h.metrics.pending.Inc()
h.msgs <- message{
messageType: getAcceptedFrontierMsg,
validatorID: validatorID,
@ -138,6 +169,7 @@ func (h *Handler) GetAcceptedFrontier(validatorID ids.ShortID, requestID uint32)
// AcceptedFrontier passes a AcceptedFrontier message received from the network
// to the consensus engine.
func (h *Handler) AcceptedFrontier(validatorID ids.ShortID, requestID uint32, containerIDs ids.Set) {
h.metrics.pending.Inc()
h.msgs <- message{
messageType: acceptedFrontierMsg,
validatorID: validatorID,
@ -149,6 +181,7 @@ func (h *Handler) AcceptedFrontier(validatorID ids.ShortID, requestID uint32, co
// GetAcceptedFrontierFailed passes a GetAcceptedFrontierFailed message received
// from the network to the consensus engine.
func (h *Handler) GetAcceptedFrontierFailed(validatorID ids.ShortID, requestID uint32) {
h.metrics.pending.Inc()
h.msgs <- message{
messageType: getAcceptedFrontierFailedMsg,
validatorID: validatorID,
@ -159,6 +192,7 @@ func (h *Handler) GetAcceptedFrontierFailed(validatorID ids.ShortID, requestID u
// GetAccepted passes a GetAccepted message received from the
// network to the consensus engine.
func (h *Handler) GetAccepted(validatorID ids.ShortID, requestID uint32, containerIDs ids.Set) {
h.metrics.pending.Inc()
h.msgs <- message{
messageType: getAcceptedMsg,
validatorID: validatorID,
@ -170,6 +204,7 @@ func (h *Handler) GetAccepted(validatorID ids.ShortID, requestID uint32, contain
// Accepted passes a Accepted message received from the network to the consensus
// engine.
func (h *Handler) Accepted(validatorID ids.ShortID, requestID uint32, containerIDs ids.Set) {
h.metrics.pending.Inc()
h.msgs <- message{
messageType: acceptedMsg,
validatorID: validatorID,
@ -181,6 +216,7 @@ func (h *Handler) Accepted(validatorID ids.ShortID, requestID uint32, containerI
// GetAcceptedFailed passes a GetAcceptedFailed message received from the
// network to the consensus engine.
func (h *Handler) GetAcceptedFailed(validatorID ids.ShortID, requestID uint32) {
h.metrics.pending.Inc()
h.msgs <- message{
messageType: getAcceptedFailedMsg,
validatorID: validatorID,
@ -190,6 +226,7 @@ func (h *Handler) GetAcceptedFailed(validatorID ids.ShortID, requestID uint32) {
// Get passes a Get message received from the network to the consensus engine.
func (h *Handler) Get(validatorID ids.ShortID, requestID uint32, containerID ids.ID) {
h.metrics.pending.Inc()
h.msgs <- message{
messageType: getMsg,
validatorID: validatorID,
@ -200,6 +237,7 @@ func (h *Handler) Get(validatorID ids.ShortID, requestID uint32, containerID ids
// Put passes a Put message received from the network to the consensus engine.
func (h *Handler) Put(validatorID ids.ShortID, requestID uint32, containerID ids.ID, container []byte) {
h.metrics.pending.Inc()
h.msgs <- message{
messageType: putMsg,
validatorID: validatorID,
@ -211,6 +249,7 @@ func (h *Handler) Put(validatorID ids.ShortID, requestID uint32, containerID ids
// GetFailed passes a GetFailed message to the consensus engine.
func (h *Handler) GetFailed(validatorID ids.ShortID, requestID uint32) {
h.metrics.pending.Inc()
h.msgs <- message{
messageType: getFailedMsg,
validatorID: validatorID,
@ -220,6 +259,7 @@ func (h *Handler) GetFailed(validatorID ids.ShortID, requestID uint32) {
// PushQuery passes a PushQuery message received from the network to the consensus engine.
func (h *Handler) PushQuery(validatorID ids.ShortID, requestID uint32, blockID ids.ID, block []byte) {
h.metrics.pending.Inc()
h.msgs <- message{
messageType: pushQueryMsg,
validatorID: validatorID,
@ -231,6 +271,7 @@ func (h *Handler) PushQuery(validatorID ids.ShortID, requestID uint32, blockID i
// PullQuery passes a PullQuery message received from the network to the consensus engine.
func (h *Handler) PullQuery(validatorID ids.ShortID, requestID uint32, blockID ids.ID) {
h.metrics.pending.Inc()
h.msgs <- message{
messageType: pullQueryMsg,
validatorID: validatorID,
@ -241,6 +282,7 @@ func (h *Handler) PullQuery(validatorID ids.ShortID, requestID uint32, blockID i
// Chits passes a Chits message received from the network to the consensus engine.
func (h *Handler) Chits(validatorID ids.ShortID, requestID uint32, votes ids.Set) {
h.metrics.pending.Inc()
h.msgs <- message{
messageType: chitsMsg,
validatorID: validatorID,
@ -251,6 +293,7 @@ func (h *Handler) Chits(validatorID ids.ShortID, requestID uint32, votes ids.Set
// QueryFailed passes a QueryFailed message received from the network to the consensus engine.
func (h *Handler) QueryFailed(validatorID ids.ShortID, requestID uint32) {
h.metrics.pending.Inc()
h.msgs <- message{
messageType: queryFailedMsg,
validatorID: validatorID,
@ -259,13 +302,20 @@ func (h *Handler) QueryFailed(validatorID ids.ShortID, requestID uint32) {
}
// Gossip passes a gossip request to the consensus engine
func (h *Handler) Gossip() { h.msgs <- message{messageType: gossipMsg} }
func (h *Handler) Gossip() {
h.metrics.pending.Inc()
h.msgs <- message{messageType: gossipMsg}
}
// Shutdown shuts down the dispatcher
func (h *Handler) Shutdown() { h.msgs <- message{messageType: shutdownMsg} }
func (h *Handler) Shutdown() {
h.metrics.pending.Inc()
h.msgs <- message{messageType: shutdownMsg}
}
// Notify ...
func (h *Handler) Notify(msg common.Message) {
h.metrics.pending.Inc()
h.msgs <- message{
messageType: notifyMsg,
notification: msg,

View File

@ -0,0 +1,75 @@
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package router
import (
"fmt"
"github.com/prometheus/client_golang/prometheus"
"github.com/ava-labs/gecko/utils/timer"
"github.com/ava-labs/gecko/utils/wrappers"
)
func initHistogram(namespace, name string, registerer prometheus.Registerer, errs *wrappers.Errs) prometheus.Histogram {
histogram := prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: namespace,
Name: name,
Help: "Time spent processing this request in nanoseconds",
Buckets: timer.NanosecondsBuckets,
})
if err := registerer.Register(histogram); err != nil {
errs.Add(fmt.Errorf("failed to register %s statistics due to %s", name, err))
}
return histogram
}
type metrics struct {
pending prometheus.Gauge
getAcceptedFrontier, acceptedFrontier, getAcceptedFrontierFailed,
getAccepted, accepted, getAcceptedFailed,
get, put, getFailed,
pushQuery, pullQuery, chits, queryFailed,
notify,
gossip,
shutdown prometheus.Histogram
}
// Initialize implements the Engine interface
func (m *metrics) Initialize(namespace string, registerer prometheus.Registerer) error {
errs := wrappers.Errs{}
m.pending = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "pending",
Help: "Number of pending events",
})
if err := registerer.Register(m.pending); err != nil {
errs.Add(fmt.Errorf("failed to register pending statistics due to %s", err))
}
m.getAcceptedFrontier = initHistogram(namespace, "get_accepted_frontier", registerer, &errs)
m.acceptedFrontier = initHistogram(namespace, "accepted_frontier", registerer, &errs)
m.getAcceptedFrontierFailed = initHistogram(namespace, "get_accepted_frontier_failed", registerer, &errs)
m.getAccepted = initHistogram(namespace, "get_accepted", registerer, &errs)
m.accepted = initHistogram(namespace, "accepted", registerer, &errs)
m.getAcceptedFailed = initHistogram(namespace, "get_accepted_failed", registerer, &errs)
m.get = initHistogram(namespace, "get", registerer, &errs)
m.put = initHistogram(namespace, "put", registerer, &errs)
m.getFailed = initHistogram(namespace, "get_failed", registerer, &errs)
m.pushQuery = initHistogram(namespace, "push_query", registerer, &errs)
m.pullQuery = initHistogram(namespace, "pull_query", registerer, &errs)
m.chits = initHistogram(namespace, "chits", registerer, &errs)
m.queryFailed = initHistogram(namespace, "query_failed", registerer, &errs)
m.notify = initHistogram(namespace, "notify", registerer, &errs)
m.gossip = initHistogram(namespace, "gossip", registerer, &errs)
m.shutdown = initHistogram(namespace, "shutdown", registerer, &errs)
return errs.Err
}

View File

@ -3,9 +3,13 @@
package timer
import (
"time"
)
// Useful latency buckets
var (
Buckets = []float64{
MillisecondsBuckets = []float64{
10, // 10 ms is ~ instant
100, // 100 ms
250, // 250 ms
@ -18,4 +22,15 @@ var (
10000, // 10 seconds
// anything larger than 10 seconds will be bucketed together
}
NanosecondsBuckets = []float64{
float64(100 * time.Nanosecond),
float64(time.Microsecond),
float64(10 * time.Microsecond),
float64(100 * time.Microsecond),
float64(time.Millisecond),
float64(10 * time.Millisecond),
float64(100 * time.Millisecond),
float64(time.Second),
// anything larger than a second will be bucketed together
}
)