metrics: Add additional metrics to p2p and consensus (#2425)

* Add additional metrics to p2p and consensus
Partially addresses https://github.com/cosmos/cosmos-sdk/issues/2169.
* WIP
* Updates from code review
* Updates from code review
* Add instrumentation namespace to configuration
* Fix test failure
* Updates from code review
* Add quotes
* Add atomic load
* Use storeint64
* Use addInt64 in writePacketMsgTo
This commit is contained in:
Matthew Slipper 2018-09-25 04:14:38 -07:00 committed by Alexander Simmerl
parent eb0da7f9cb
commit 587116dae1
16 changed files with 202 additions and 43 deletions

View File

@ -14,6 +14,8 @@ BREAKING CHANGES:
FEATURES:
IMPROVEMENTS:
- [consensus] [\#2169](https://github.com/cosmos/cosmos-sdk/issues/2169) add additional metrics
- [p2p] [\#2169](https://github.com/cosmos/cosmos-sdk/issues/2169) add additional metrics
BUG FIXES:
- [node] \#2434 Make node respond to signal interrupts while sleeping for genesis time

View File

@ -634,6 +634,9 @@ type InstrumentationConfig struct {
// you increase your OS limits.
// 0 - unlimited.
MaxOpenConnections int `mapstructure:"max_open_connections"`
// Tendermint instrumentation namespace.
Namespace string `mapstructure:"namespace"`
}
// DefaultInstrumentationConfig returns a default configuration for metrics
@ -643,6 +646,7 @@ func DefaultInstrumentationConfig() *InstrumentationConfig {
Prometheus: false,
PrometheusListenAddr: ":26660",
MaxOpenConnections: 3,
Namespace: "tendermint",
}
}

View File

@ -284,6 +284,9 @@ prometheus_listen_addr = "{{ .Instrumentation.PrometheusListenAddr }}"
# you increase your OS limits.
# 0 - unlimited.
max_open_connections = {{ .Instrumentation.MaxOpenConnections }}
# Instrumentation namespace
namespace = "{{ .Instrumentation.Namespace }}"
`
/****** these are for test settings ***********/

View File

@ -8,6 +8,8 @@ import (
stdprometheus "github.com/prometheus/client_golang/prometheus"
)
const MetricsSubsystem = "consensus"
// Metrics contains metrics exposed by this package.
type Metrics struct {
// Height of the chain.
@ -38,74 +40,102 @@ type Metrics struct {
BlockSizeBytes metrics.Gauge
// Total number of transactions.
TotalTxs metrics.Gauge
// The latest block height.
CommittedHeight metrics.Gauge
// Whether or not a node is fast syncing. 1 if yes, 0 if no.
FastSyncing metrics.Gauge
}
// PrometheusMetrics returns Metrics build using Prometheus client library.
func PrometheusMetrics() *Metrics {
func PrometheusMetrics(namespace string) *Metrics {
return &Metrics{
Height: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Subsystem: "consensus",
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "height",
Help: "Height of the chain.",
}, []string{}),
Rounds: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Subsystem: "consensus",
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "rounds",
Help: "Number of rounds.",
}, []string{}),
Validators: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Subsystem: "consensus",
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "validators",
Help: "Number of validators.",
}, []string{}),
ValidatorsPower: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Subsystem: "consensus",
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "validators_power",
Help: "Total power of all validators.",
}, []string{}),
MissingValidators: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Subsystem: "consensus",
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "missing_validators",
Help: "Number of validators who did not sign.",
}, []string{}),
MissingValidatorsPower: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Subsystem: "consensus",
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "missing_validators_power",
Help: "Total power of the missing validators.",
}, []string{}),
ByzantineValidators: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Subsystem: "consensus",
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "byzantine_validators",
Help: "Number of validators who tried to double sign.",
}, []string{}),
ByzantineValidatorsPower: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Subsystem: "consensus",
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "byzantine_validators_power",
Help: "Total power of the byzantine validators.",
}, []string{}),
BlockIntervalSeconds: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Subsystem: "consensus",
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "block_interval_seconds",
Help: "Time between this and the last block.",
}, []string{}),
NumTxs: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Subsystem: "consensus",
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "num_txs",
Help: "Number of transactions.",
}, []string{}),
BlockSizeBytes: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Subsystem: "consensus",
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "block_size_bytes",
Help: "Size of the block.",
}, []string{}),
TotalTxs: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Subsystem: "consensus",
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "total_txs",
Help: "Total number of transactions.",
}, []string{}),
CommittedHeight: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "latest_block_height",
Help: "The latest block height.",
}, []string{}),
FastSyncing: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "fast_syncing",
Help: "Whether or not a node is fast syncing. 1 if yes, 0 if no.",
}, []string{}),
}
}
@ -125,8 +155,10 @@ func NopMetrics() *Metrics {
BlockIntervalSeconds: discard.NewGauge(),
NumTxs: discard.NewGauge(),
BlockSizeBytes: discard.NewGauge(),
TotalTxs: discard.NewGauge(),
NumTxs: discard.NewGauge(),
BlockSizeBytes: discard.NewGauge(),
TotalTxs: discard.NewGauge(),
CommittedHeight: discard.NewGauge(),
FastSyncing: discard.NewGauge(),
}
}

View File

@ -43,16 +43,27 @@ type ConsensusReactor struct {
mtx sync.RWMutex
fastSync bool
eventBus *types.EventBus
metrics *Metrics
}
type ReactorOption func(*ConsensusReactor)
// NewConsensusReactor returns a new ConsensusReactor with the given
// consensusState.
func NewConsensusReactor(consensusState *ConsensusState, fastSync bool) *ConsensusReactor {
func NewConsensusReactor(consensusState *ConsensusState, fastSync bool, options ...ReactorOption) *ConsensusReactor {
conR := &ConsensusReactor{
conS: consensusState,
fastSync: fastSync,
metrics: NopMetrics(),
}
conR.updateFastSyncingMetric()
conR.BaseReactor = *p2p.NewBaseReactor("ConsensusReactor", conR)
for _, option := range options {
option(conR)
}
return conR
}
@ -98,6 +109,7 @@ func (conR *ConsensusReactor) SwitchToConsensus(state sm.State, blocksSynced int
conR.mtx.Lock()
conR.fastSync = false
conR.mtx.Unlock()
conR.metrics.FastSyncing.Set(0)
if blocksSynced > 0 {
// dont bother with the WAL if we fast synced
@ -850,6 +862,21 @@ func (conR *ConsensusReactor) StringIndented(indent string) string {
return s
}
func (conR *ConsensusReactor) updateFastSyncingMetric() {
var fastSyncing float64
if conR.fastSync {
fastSyncing = 1
} else {
fastSyncing = 0
}
conR.metrics.FastSyncing.Set(fastSyncing)
}
// ReactorMetrics sets the metrics
func ReactorMetrics(metrics *Metrics) ReactorOption {
return func(conR *ConsensusReactor) { conR.metrics = metrics }
}
//-----------------------------------------------------------------------------
var (

View File

@ -124,8 +124,8 @@ type ConsensusState struct {
metrics *Metrics
}
// CSOption sets an optional parameter on the ConsensusState.
type CSOption func(*ConsensusState)
// StateOption sets an optional parameter on the ConsensusState.
type StateOption func(*ConsensusState)
// NewConsensusState returns a new ConsensusState.
func NewConsensusState(
@ -135,7 +135,7 @@ func NewConsensusState(
blockStore sm.BlockStore,
mempool sm.Mempool,
evpool sm.EvidencePool,
options ...CSOption,
options ...StateOption,
) *ConsensusState {
cs := &ConsensusState{
config: config,
@ -185,8 +185,8 @@ func (cs *ConsensusState) SetEventBus(b *types.EventBus) {
cs.blockExec.SetEventBus(b)
}
// WithMetrics sets the metrics.
func WithMetrics(metrics *Metrics) CSOption {
// StateMetrics sets the metrics.
func StateMetrics(metrics *Metrics) StateOption {
return func(cs *ConsensusState) { cs.metrics = metrics }
}
@ -1397,6 +1397,8 @@ func (cs *ConsensusState) recordMetrics(height int64, block *types.Block) {
cs.metrics.NumTxs.Set(float64(block.NumTxs))
cs.metrics.BlockSizeBytes.Set(float64(block.Size()))
cs.metrics.TotalTxs.Set(float64(block.TotalTxs))
cs.metrics.CommittedHeight.Set(float64(block.Height))
}
//-----------------------------------------------------------------------------

View File

@ -227,4 +227,7 @@ prometheus_listen_addr = ":26660"
# you increase your OS limits.
# 0 - unlimited.
max_open_connections = 3
# Instrumentation namespace
namespace = "tendermint"
```

View File

@ -3,7 +3,6 @@ package mempool
import (
"github.com/go-kit/kit/metrics"
"github.com/go-kit/kit/metrics/discard"
prometheus "github.com/go-kit/kit/metrics/prometheus"
stdprometheus "github.com/prometheus/client_golang/prometheus"
)
@ -16,9 +15,10 @@ type Metrics struct {
}
// PrometheusMetrics returns Metrics build using Prometheus client library.
func PrometheusMetrics() *Metrics {
func PrometheusMetrics(namespace string) *Metrics {
return &Metrics{
Size: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: "mempool",
Name: "size",
Help: "Size of the mempool (number of uncommitted transactions).",

View File

@ -105,7 +105,7 @@ type MetricsProvider func() (*cs.Metrics, *p2p.Metrics, *mempl.Metrics)
func DefaultMetricsProvider(config *cfg.InstrumentationConfig) MetricsProvider {
return func() (*cs.Metrics, *p2p.Metrics, *mempl.Metrics) {
if config.Prometheus {
return cs.PrometheusMetrics(), p2p.PrometheusMetrics(), mempl.PrometheusMetrics()
return cs.PrometheusMetrics(config.Namespace), p2p.PrometheusMetrics(config.Namespace), mempl.PrometheusMetrics(config.Namespace)
}
return cs.NopMetrics(), p2p.NopMetrics(), mempl.NopMetrics()
}
@ -303,13 +303,13 @@ func NewNode(config *cfg.Config,
blockStore,
mempool,
evidencePool,
cs.WithMetrics(csMetrics),
cs.StateMetrics(csMetrics),
)
consensusState.SetLogger(consensusLogger)
if privValidator != nil {
consensusState.SetPrivValidator(privValidator)
}
consensusReactor := cs.NewConsensusReactor(consensusState, fastSync)
consensusReactor := cs.NewConsensusReactor(consensusState, fastSync, cs.ReactorMetrics(csMetrics))
consensusReactor.SetLogger(consensusLogger)
eventBus := types.NewEventBus()

View File

@ -585,9 +585,9 @@ func (c *MConnection) Status() ConnectionStatus {
status.Channels[i] = ChannelStatus{
ID: channel.desc.ID,
SendQueueCapacity: cap(channel.sendQueue),
SendQueueSize: int(channel.sendQueueSize), // TODO use atomic
SendQueueSize: int(atomic.LoadInt32(&channel.sendQueueSize)),
Priority: channel.desc.Priority,
RecentlySent: channel.recentlySent,
RecentlySent: atomic.LoadInt64(&channel.recentlySent),
}
}
return status
@ -724,7 +724,7 @@ func (ch *Channel) nextPacketMsg() PacketMsg {
func (ch *Channel) writePacketMsgTo(w io.Writer) (n int64, err error) {
var packet = ch.nextPacketMsg()
n, err = cdc.MarshalBinaryWriter(w, packet)
ch.recentlySent += n
atomic.AddInt64(&ch.recentlySent, n)
return
}
@ -756,7 +756,7 @@ func (ch *Channel) recvPacketMsg(packet PacketMsg) ([]byte, error) {
func (ch *Channel) updateStats() {
// Exponential decay of stats.
// TODO: optimize.
ch.recentlySent = int64(float64(ch.recentlySent) * 0.8)
atomic.StoreInt64(&ch.recentlySent, int64(float64(atomic.LoadInt64(&ch.recentlySent)) * 0.8))
}
//----------------------------------------

View File

@ -3,25 +3,51 @@ package p2p
import (
"github.com/go-kit/kit/metrics"
"github.com/go-kit/kit/metrics/discard"
prometheus "github.com/go-kit/kit/metrics/prometheus"
stdprometheus "github.com/prometheus/client_golang/prometheus"
)
const MetricsSubsystem = "p2p"
// Metrics contains metrics exposed by this package.
type Metrics struct {
// Number of peers.
Peers metrics.Gauge
// Number of bytes received from a given peer.
PeerReceiveBytesTotal metrics.Counter
// Number of bytes sent to a given peer.
PeerSendBytesTotal metrics.Counter
// Pending bytes to be sent to a given peer.
PeerPendingSendBytes metrics.Gauge
}
// PrometheusMetrics returns Metrics build using Prometheus client library.
func PrometheusMetrics() *Metrics {
func PrometheusMetrics(namespace string) *Metrics {
return &Metrics{
Peers: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Subsystem: "p2p",
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "peers",
Help: "Number of peers.",
}, []string{}),
PeerReceiveBytesTotal: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "peer_receive_bytes_total",
Help: "Number of bytes received from a given peer.",
}, []string{"peer_id"}),
PeerSendBytesTotal: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "peer_send_bytes_total",
Help: "Number of bytes sent to a given peer.",
}, []string{"peer_id"}),
PeerPendingSendBytes: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "peer_pending_send_bytes",
Help: "Number of pending bytes to be sent to a given peer.",
}, []string{"peer_id"}),
}
}
@ -29,5 +55,8 @@ func PrometheusMetrics() *Metrics {
func NopMetrics() *Metrics {
return &Metrics{
Peers: discard.NewGauge(),
PeerReceiveBytesTotal: discard.NewCounter(),
PeerSendBytesTotal: discard.NewCounter(),
PeerPendingSendBytes: discard.NewGauge(),
}
}

View File

@ -13,6 +13,8 @@ import (
tmconn "github.com/tendermint/tendermint/p2p/conn"
)
const metricsTickerDuration = 10 * time.Second
var testIPSuffix uint32
// Peer is an interface representing a peer connected on a reactor.
@ -99,8 +101,13 @@ type peer struct {
// User data
Data *cmn.CMap
metrics *Metrics
metricsTicker *time.Ticker
}
type PeerOption func(*peer)
func newPeer(
pc peerConn,
mConfig tmconn.MConnConfig,
@ -108,12 +115,15 @@ func newPeer(
reactorsByCh map[byte]Reactor,
chDescs []*tmconn.ChannelDescriptor,
onPeerError func(Peer, interface{}),
options ...PeerOption,
) *peer {
p := &peer{
peerConn: pc,
nodeInfo: nodeInfo,
channels: nodeInfo.Channels,
Data: cmn.NewCMap(),
peerConn: pc,
nodeInfo: nodeInfo,
channels: nodeInfo.Channels,
Data: cmn.NewCMap(),
metricsTicker: time.NewTicker(metricsTickerDuration),
metrics: NopMetrics(),
}
p.mconn = createMConnection(
@ -125,6 +135,9 @@ func newPeer(
mConfig,
)
p.BaseService = *cmn.NewBaseService(nil, "Peer", p)
for _, option := range options {
option(p)
}
return p
}
@ -143,12 +156,18 @@ func (p *peer) OnStart() error {
if err := p.BaseService.OnStart(); err != nil {
return err
}
err := p.mconn.Start()
return err
if err := p.mconn.Start(); err != nil {
return err
}
go p.metricsReporter()
return nil
}
// OnStop implements BaseService.
func (p *peer) OnStop() {
p.metricsTicker.Stop()
p.BaseService.OnStop()
p.mconn.Stop() // stop everything and close the conn
}
@ -200,7 +219,11 @@ func (p *peer) Send(chID byte, msgBytes []byte) bool {
} else if !p.hasChannel(chID) {
return false
}
return p.mconn.Send(chID, msgBytes)
res := p.mconn.Send(chID, msgBytes)
if res {
p.metrics.PeerSendBytesTotal.With("peer-id", string(p.ID())).Add(float64(len(msgBytes)))
}
return res
}
// TrySend msg bytes to the channel identified by chID byte. Immediately returns
@ -211,7 +234,11 @@ func (p *peer) TrySend(chID byte, msgBytes []byte) bool {
} else if !p.hasChannel(chID) {
return false
}
return p.mconn.TrySend(chID, msgBytes)
res := p.mconn.TrySend(chID, msgBytes)
if res {
p.metrics.PeerSendBytesTotal.With("peer-id", string(p.ID())).Add(float64(len(msgBytes)))
}
return res
}
// Get the data for a given key.
@ -314,6 +341,29 @@ func (p *peer) String() string {
return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.ID())
}
func PeerMetrics(metrics *Metrics) PeerOption {
return func(p *peer) {
p.metrics = metrics
}
}
func (p *peer) metricsReporter() {
for {
select {
case <-p.metricsTicker.C:
status := p.mconn.Status()
var sendQueueSize float64
for _, chStatus := range status.Channels {
sendQueueSize += float64(chStatus.SendQueueSize)
}
p.metrics.PeerPendingSendBytes.With("peer-id", string(p.ID())).Set(sendQueueSize)
case <-p.Quit():
return
}
}
}
//------------------------------------------------------------------
// helper funcs
@ -333,6 +383,7 @@ func createMConnection(
// which does onPeerError.
panic(fmt.Sprintf("Unknown channel %X", chID))
}
p.metrics.PeerReceiveBytesTotal.With("peer_id", string(p.ID())).Add(float64(len(msgBytes)))
reactor.Receive(chID, p, msgBytes)
}

View File

@ -24,6 +24,7 @@ func randPeer(ip net.IP) *peer {
ID: nodeKey.ID(),
ListenAddr: fmt.Sprintf("%v.%v.%v.%v:26656", cmn.RandInt()%256, cmn.RandInt()%256, cmn.RandInt()%256, cmn.RandInt()%256),
},
metrics: NopMetrics(),
}
p.ip = ip

View File

@ -463,6 +463,7 @@ func (sw *Switch) acceptRoutine() {
chDescs: sw.chDescs,
onPeerError: sw.StopPeerForError,
reactorsByCh: sw.reactorsByCh,
metrics: sw.metrics,
})
if err != nil {
switch err.(type) {
@ -549,6 +550,7 @@ func (sw *Switch) addOutboundPeerWithConfig(
onPeerError: sw.StopPeerForError,
persistent: persistent,
reactorsByCh: sw.reactorsByCh,
metrics: sw.metrics,
})
if err != nil {
switch e := err.(type) {

View File

@ -29,6 +29,7 @@ func CreateRandomPeer(outbound bool) *peer {
ListenAddr: netAddr.DialString(),
},
mconn: &conn.MConnection{},
metrics: NopMetrics(),
}
p.SetLogger(log.TestingLogger().With("peer", addr))
return p

View File

@ -7,7 +7,7 @@ import (
"time"
"github.com/tendermint/tendermint/config"
crypto "github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/p2p/conn"
)
@ -41,6 +41,7 @@ type peerConfig struct {
onPeerError func(Peer, interface{})
outbound, persistent bool
reactorsByCh map[byte]Reactor
metrics *Metrics
}
// Transport emits and connects to Peers. The implementation of Peer is left to
@ -411,6 +412,7 @@ func (mt *MultiplexTransport) wrapPeer(
cfg.reactorsByCh,
cfg.chDescs,
cfg.onPeerError,
PeerMetrics(cfg.metrics),
)
// Wait for Peer to Stop so we can cleanup.