From 587116dae19d09b1d4fa23a1750bf88410429648 Mon Sep 17 00:00:00 2001 From: Matthew Slipper Date: Tue, 25 Sep 2018 04:14:38 -0700 Subject: [PATCH] metrics: Add additional metrics to p2p and consensus (#2425) * Add additional metrics to p2p and consensus Partially addresses https://github.com/cosmos/cosmos-sdk/issues/2169. * WIP * Updates from code review * Updates from code review * Add instrumentation namespace to configuration * Fix test failure * Updates from code review * Add quotes * Add atomic load * Use storeint64 * Use addInt64 in writePacketMsgTo --- CHANGELOG_PENDING.md | 2 + config/config.go | 4 ++ config/toml.go | 3 ++ consensus/metrics.go | 64 ++++++++++++++++++------- consensus/reactor.go | 29 +++++++++++- consensus/state.go | 12 +++-- docs/tendermint-core/configuration.md | 3 ++ mempool/metrics.go | 4 +- node/node.go | 6 +-- p2p/conn/connection.go | 8 ++-- p2p/metrics.go | 35 ++++++++++++-- p2p/peer.go | 67 +++++++++++++++++++++++---- p2p/peer_set_test.go | 1 + p2p/switch.go | 2 + p2p/test_util.go | 1 + p2p/transport.go | 4 +- 16 files changed, 202 insertions(+), 43 deletions(-) diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 450c6a5d..3893cc4c 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -14,6 +14,8 @@ BREAKING CHANGES: FEATURES: IMPROVEMENTS: +- [consensus] [\#2169](https://github.com/cosmos/cosmos-sdk/issues/2169) add additional metrics +- [p2p] [\#2169](https://github.com/cosmos/cosmos-sdk/issues/2169) add additional metrics BUG FIXES: - [node] \#2434 Make node respond to signal interrupts while sleeping for genesis time diff --git a/config/config.go b/config/config.go index d0e07547..ebb7a9ac 100644 --- a/config/config.go +++ b/config/config.go @@ -634,6 +634,9 @@ type InstrumentationConfig struct { // you increase your OS limits. // 0 - unlimited. MaxOpenConnections int `mapstructure:"max_open_connections"` + + // Tendermint instrumentation namespace. + Namespace string `mapstructure:"namespace"` } // DefaultInstrumentationConfig returns a default configuration for metrics @@ -643,6 +646,7 @@ func DefaultInstrumentationConfig() *InstrumentationConfig { Prometheus: false, PrometheusListenAddr: ":26660", MaxOpenConnections: 3, + Namespace: "tendermint", } } diff --git a/config/toml.go b/config/toml.go index 9beb9d79..bc10590c 100644 --- a/config/toml.go +++ b/config/toml.go @@ -284,6 +284,9 @@ prometheus_listen_addr = "{{ .Instrumentation.PrometheusListenAddr }}" # you increase your OS limits. # 0 - unlimited. max_open_connections = {{ .Instrumentation.MaxOpenConnections }} + +# Instrumentation namespace +namespace = "{{ .Instrumentation.Namespace }}" ` /****** these are for test settings ***********/ diff --git a/consensus/metrics.go b/consensus/metrics.go index 68d065ec..39bfd24b 100644 --- a/consensus/metrics.go +++ b/consensus/metrics.go @@ -8,6 +8,8 @@ import ( stdprometheus "github.com/prometheus/client_golang/prometheus" ) +const MetricsSubsystem = "consensus" + // Metrics contains metrics exposed by this package. type Metrics struct { // Height of the chain. @@ -38,74 +40,102 @@ type Metrics struct { BlockSizeBytes metrics.Gauge // Total number of transactions. TotalTxs metrics.Gauge + // The latest block height. + CommittedHeight metrics.Gauge + // Whether or not a node is fast syncing. 1 if yes, 0 if no. + FastSyncing metrics.Gauge } // PrometheusMetrics returns Metrics build using Prometheus client library. -func PrometheusMetrics() *Metrics { +func PrometheusMetrics(namespace string) *Metrics { return &Metrics{ Height: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Subsystem: "consensus", + Namespace: namespace, + Subsystem: MetricsSubsystem, Name: "height", Help: "Height of the chain.", }, []string{}), Rounds: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Subsystem: "consensus", + Namespace: namespace, + Subsystem: MetricsSubsystem, Name: "rounds", Help: "Number of rounds.", }, []string{}), Validators: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Subsystem: "consensus", + Namespace: namespace, + Subsystem: MetricsSubsystem, Name: "validators", Help: "Number of validators.", }, []string{}), ValidatorsPower: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Subsystem: "consensus", + Namespace: namespace, + Subsystem: MetricsSubsystem, Name: "validators_power", Help: "Total power of all validators.", }, []string{}), MissingValidators: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Subsystem: "consensus", + Namespace: namespace, + Subsystem: MetricsSubsystem, Name: "missing_validators", Help: "Number of validators who did not sign.", }, []string{}), MissingValidatorsPower: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Subsystem: "consensus", + Namespace: namespace, + Subsystem: MetricsSubsystem, Name: "missing_validators_power", Help: "Total power of the missing validators.", }, []string{}), ByzantineValidators: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Subsystem: "consensus", + Namespace: namespace, + Subsystem: MetricsSubsystem, Name: "byzantine_validators", Help: "Number of validators who tried to double sign.", }, []string{}), ByzantineValidatorsPower: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Subsystem: "consensus", + Namespace: namespace, + Subsystem: MetricsSubsystem, Name: "byzantine_validators_power", Help: "Total power of the byzantine validators.", }, []string{}), BlockIntervalSeconds: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Subsystem: "consensus", + Namespace: namespace, + Subsystem: MetricsSubsystem, Name: "block_interval_seconds", Help: "Time between this and the last block.", }, []string{}), NumTxs: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Subsystem: "consensus", + Namespace: namespace, + Subsystem: MetricsSubsystem, Name: "num_txs", Help: "Number of transactions.", }, []string{}), BlockSizeBytes: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Subsystem: "consensus", + Namespace: namespace, + Subsystem: MetricsSubsystem, Name: "block_size_bytes", Help: "Size of the block.", }, []string{}), TotalTxs: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Subsystem: "consensus", + Namespace: namespace, + Subsystem: MetricsSubsystem, Name: "total_txs", Help: "Total number of transactions.", }, []string{}), + CommittedHeight: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "latest_block_height", + Help: "The latest block height.", + }, []string{}), + FastSyncing: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "fast_syncing", + Help: "Whether or not a node is fast syncing. 1 if yes, 0 if no.", + }, []string{}), } } @@ -125,8 +155,10 @@ func NopMetrics() *Metrics { BlockIntervalSeconds: discard.NewGauge(), - NumTxs: discard.NewGauge(), - BlockSizeBytes: discard.NewGauge(), - TotalTxs: discard.NewGauge(), + NumTxs: discard.NewGauge(), + BlockSizeBytes: discard.NewGauge(), + TotalTxs: discard.NewGauge(), + CommittedHeight: discard.NewGauge(), + FastSyncing: discard.NewGauge(), } } diff --git a/consensus/reactor.go b/consensus/reactor.go index 4a915ace..2b4bab13 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -43,16 +43,27 @@ type ConsensusReactor struct { mtx sync.RWMutex fastSync bool eventBus *types.EventBus + + metrics *Metrics } +type ReactorOption func(*ConsensusReactor) + // NewConsensusReactor returns a new ConsensusReactor with the given // consensusState. -func NewConsensusReactor(consensusState *ConsensusState, fastSync bool) *ConsensusReactor { +func NewConsensusReactor(consensusState *ConsensusState, fastSync bool, options ...ReactorOption) *ConsensusReactor { conR := &ConsensusReactor{ conS: consensusState, fastSync: fastSync, + metrics: NopMetrics(), } + conR.updateFastSyncingMetric() conR.BaseReactor = *p2p.NewBaseReactor("ConsensusReactor", conR) + + for _, option := range options { + option(conR) + } + return conR } @@ -98,6 +109,7 @@ func (conR *ConsensusReactor) SwitchToConsensus(state sm.State, blocksSynced int conR.mtx.Lock() conR.fastSync = false conR.mtx.Unlock() + conR.metrics.FastSyncing.Set(0) if blocksSynced > 0 { // dont bother with the WAL if we fast synced @@ -850,6 +862,21 @@ func (conR *ConsensusReactor) StringIndented(indent string) string { return s } +func (conR *ConsensusReactor) updateFastSyncingMetric() { + var fastSyncing float64 + if conR.fastSync { + fastSyncing = 1 + } else { + fastSyncing = 0 + } + conR.metrics.FastSyncing.Set(fastSyncing) +} + +// ReactorMetrics sets the metrics +func ReactorMetrics(metrics *Metrics) ReactorOption { + return func(conR *ConsensusReactor) { conR.metrics = metrics } +} + //----------------------------------------------------------------------------- var ( diff --git a/consensus/state.go b/consensus/state.go index 3ee1cfbf..12dfa4ed 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -124,8 +124,8 @@ type ConsensusState struct { metrics *Metrics } -// CSOption sets an optional parameter on the ConsensusState. -type CSOption func(*ConsensusState) +// StateOption sets an optional parameter on the ConsensusState. +type StateOption func(*ConsensusState) // NewConsensusState returns a new ConsensusState. func NewConsensusState( @@ -135,7 +135,7 @@ func NewConsensusState( blockStore sm.BlockStore, mempool sm.Mempool, evpool sm.EvidencePool, - options ...CSOption, + options ...StateOption, ) *ConsensusState { cs := &ConsensusState{ config: config, @@ -185,8 +185,8 @@ func (cs *ConsensusState) SetEventBus(b *types.EventBus) { cs.blockExec.SetEventBus(b) } -// WithMetrics sets the metrics. -func WithMetrics(metrics *Metrics) CSOption { +// StateMetrics sets the metrics. +func StateMetrics(metrics *Metrics) StateOption { return func(cs *ConsensusState) { cs.metrics = metrics } } @@ -1397,6 +1397,8 @@ func (cs *ConsensusState) recordMetrics(height int64, block *types.Block) { cs.metrics.NumTxs.Set(float64(block.NumTxs)) cs.metrics.BlockSizeBytes.Set(float64(block.Size())) cs.metrics.TotalTxs.Set(float64(block.TotalTxs)) + cs.metrics.CommittedHeight.Set(float64(block.Height)) + } //----------------------------------------------------------------------------- diff --git a/docs/tendermint-core/configuration.md b/docs/tendermint-core/configuration.md index 29db1212..d759ab9f 100644 --- a/docs/tendermint-core/configuration.md +++ b/docs/tendermint-core/configuration.md @@ -227,4 +227,7 @@ prometheus_listen_addr = ":26660" # you increase your OS limits. # 0 - unlimited. max_open_connections = 3 + +# Instrumentation namespace +namespace = "tendermint" ``` diff --git a/mempool/metrics.go b/mempool/metrics.go index f381678c..fc4bb4fb 100644 --- a/mempool/metrics.go +++ b/mempool/metrics.go @@ -3,7 +3,6 @@ package mempool import ( "github.com/go-kit/kit/metrics" "github.com/go-kit/kit/metrics/discard" - prometheus "github.com/go-kit/kit/metrics/prometheus" stdprometheus "github.com/prometheus/client_golang/prometheus" ) @@ -16,9 +15,10 @@ type Metrics struct { } // PrometheusMetrics returns Metrics build using Prometheus client library. -func PrometheusMetrics() *Metrics { +func PrometheusMetrics(namespace string) *Metrics { return &Metrics{ Size: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: namespace, Subsystem: "mempool", Name: "size", Help: "Size of the mempool (number of uncommitted transactions).", diff --git a/node/node.go b/node/node.go index 016ed367..bba4dbda 100644 --- a/node/node.go +++ b/node/node.go @@ -105,7 +105,7 @@ type MetricsProvider func() (*cs.Metrics, *p2p.Metrics, *mempl.Metrics) func DefaultMetricsProvider(config *cfg.InstrumentationConfig) MetricsProvider { return func() (*cs.Metrics, *p2p.Metrics, *mempl.Metrics) { if config.Prometheus { - return cs.PrometheusMetrics(), p2p.PrometheusMetrics(), mempl.PrometheusMetrics() + return cs.PrometheusMetrics(config.Namespace), p2p.PrometheusMetrics(config.Namespace), mempl.PrometheusMetrics(config.Namespace) } return cs.NopMetrics(), p2p.NopMetrics(), mempl.NopMetrics() } @@ -303,13 +303,13 @@ func NewNode(config *cfg.Config, blockStore, mempool, evidencePool, - cs.WithMetrics(csMetrics), + cs.StateMetrics(csMetrics), ) consensusState.SetLogger(consensusLogger) if privValidator != nil { consensusState.SetPrivValidator(privValidator) } - consensusReactor := cs.NewConsensusReactor(consensusState, fastSync) + consensusReactor := cs.NewConsensusReactor(consensusState, fastSync, cs.ReactorMetrics(csMetrics)) consensusReactor.SetLogger(consensusLogger) eventBus := types.NewEventBus() diff --git a/p2p/conn/connection.go b/p2p/conn/connection.go index bb67eab3..2eb210e3 100644 --- a/p2p/conn/connection.go +++ b/p2p/conn/connection.go @@ -585,9 +585,9 @@ func (c *MConnection) Status() ConnectionStatus { status.Channels[i] = ChannelStatus{ ID: channel.desc.ID, SendQueueCapacity: cap(channel.sendQueue), - SendQueueSize: int(channel.sendQueueSize), // TODO use atomic + SendQueueSize: int(atomic.LoadInt32(&channel.sendQueueSize)), Priority: channel.desc.Priority, - RecentlySent: channel.recentlySent, + RecentlySent: atomic.LoadInt64(&channel.recentlySent), } } return status @@ -724,7 +724,7 @@ func (ch *Channel) nextPacketMsg() PacketMsg { func (ch *Channel) writePacketMsgTo(w io.Writer) (n int64, err error) { var packet = ch.nextPacketMsg() n, err = cdc.MarshalBinaryWriter(w, packet) - ch.recentlySent += n + atomic.AddInt64(&ch.recentlySent, n) return } @@ -756,7 +756,7 @@ func (ch *Channel) recvPacketMsg(packet PacketMsg) ([]byte, error) { func (ch *Channel) updateStats() { // Exponential decay of stats. // TODO: optimize. - ch.recentlySent = int64(float64(ch.recentlySent) * 0.8) + atomic.StoreInt64(&ch.recentlySent, int64(float64(atomic.LoadInt64(&ch.recentlySent)) * 0.8)) } //---------------------------------------- diff --git a/p2p/metrics.go b/p2p/metrics.go index ab876ee7..94794dfd 100644 --- a/p2p/metrics.go +++ b/p2p/metrics.go @@ -3,25 +3,51 @@ package p2p import ( "github.com/go-kit/kit/metrics" "github.com/go-kit/kit/metrics/discard" - prometheus "github.com/go-kit/kit/metrics/prometheus" stdprometheus "github.com/prometheus/client_golang/prometheus" ) +const MetricsSubsystem = "p2p" + // Metrics contains metrics exposed by this package. type Metrics struct { // Number of peers. Peers metrics.Gauge + // Number of bytes received from a given peer. + PeerReceiveBytesTotal metrics.Counter + // Number of bytes sent to a given peer. + PeerSendBytesTotal metrics.Counter + // Pending bytes to be sent to a given peer. + PeerPendingSendBytes metrics.Gauge } // PrometheusMetrics returns Metrics build using Prometheus client library. -func PrometheusMetrics() *Metrics { +func PrometheusMetrics(namespace string) *Metrics { return &Metrics{ Peers: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Subsystem: "p2p", + Namespace: namespace, + Subsystem: MetricsSubsystem, Name: "peers", Help: "Number of peers.", }, []string{}), + PeerReceiveBytesTotal: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "peer_receive_bytes_total", + Help: "Number of bytes received from a given peer.", + }, []string{"peer_id"}), + PeerSendBytesTotal: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "peer_send_bytes_total", + Help: "Number of bytes sent to a given peer.", + }, []string{"peer_id"}), + PeerPendingSendBytes: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "peer_pending_send_bytes", + Help: "Number of pending bytes to be sent to a given peer.", + }, []string{"peer_id"}), } } @@ -29,5 +55,8 @@ func PrometheusMetrics() *Metrics { func NopMetrics() *Metrics { return &Metrics{ Peers: discard.NewGauge(), + PeerReceiveBytesTotal: discard.NewCounter(), + PeerSendBytesTotal: discard.NewCounter(), + PeerPendingSendBytes: discard.NewGauge(), } } diff --git a/p2p/peer.go b/p2p/peer.go index 5dbc582c..064f9181 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -13,6 +13,8 @@ import ( tmconn "github.com/tendermint/tendermint/p2p/conn" ) +const metricsTickerDuration = 10 * time.Second + var testIPSuffix uint32 // Peer is an interface representing a peer connected on a reactor. @@ -99,8 +101,13 @@ type peer struct { // User data Data *cmn.CMap + + metrics *Metrics + metricsTicker *time.Ticker } +type PeerOption func(*peer) + func newPeer( pc peerConn, mConfig tmconn.MConnConfig, @@ -108,12 +115,15 @@ func newPeer( reactorsByCh map[byte]Reactor, chDescs []*tmconn.ChannelDescriptor, onPeerError func(Peer, interface{}), + options ...PeerOption, ) *peer { p := &peer{ - peerConn: pc, - nodeInfo: nodeInfo, - channels: nodeInfo.Channels, - Data: cmn.NewCMap(), + peerConn: pc, + nodeInfo: nodeInfo, + channels: nodeInfo.Channels, + Data: cmn.NewCMap(), + metricsTicker: time.NewTicker(metricsTickerDuration), + metrics: NopMetrics(), } p.mconn = createMConnection( @@ -125,6 +135,9 @@ func newPeer( mConfig, ) p.BaseService = *cmn.NewBaseService(nil, "Peer", p) + for _, option := range options { + option(p) + } return p } @@ -143,12 +156,18 @@ func (p *peer) OnStart() error { if err := p.BaseService.OnStart(); err != nil { return err } - err := p.mconn.Start() - return err + + if err := p.mconn.Start(); err != nil { + return err + } + + go p.metricsReporter() + return nil } // OnStop implements BaseService. func (p *peer) OnStop() { + p.metricsTicker.Stop() p.BaseService.OnStop() p.mconn.Stop() // stop everything and close the conn } @@ -200,7 +219,11 @@ func (p *peer) Send(chID byte, msgBytes []byte) bool { } else if !p.hasChannel(chID) { return false } - return p.mconn.Send(chID, msgBytes) + res := p.mconn.Send(chID, msgBytes) + if res { + p.metrics.PeerSendBytesTotal.With("peer-id", string(p.ID())).Add(float64(len(msgBytes))) + } + return res } // TrySend msg bytes to the channel identified by chID byte. Immediately returns @@ -211,7 +234,11 @@ func (p *peer) TrySend(chID byte, msgBytes []byte) bool { } else if !p.hasChannel(chID) { return false } - return p.mconn.TrySend(chID, msgBytes) + res := p.mconn.TrySend(chID, msgBytes) + if res { + p.metrics.PeerSendBytesTotal.With("peer-id", string(p.ID())).Add(float64(len(msgBytes))) + } + return res } // Get the data for a given key. @@ -314,6 +341,29 @@ func (p *peer) String() string { return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.ID()) } +func PeerMetrics(metrics *Metrics) PeerOption { + return func(p *peer) { + p.metrics = metrics + } +} + +func (p *peer) metricsReporter() { + for { + select { + case <-p.metricsTicker.C: + status := p.mconn.Status() + var sendQueueSize float64 + for _, chStatus := range status.Channels { + sendQueueSize += float64(chStatus.SendQueueSize) + } + + p.metrics.PeerPendingSendBytes.With("peer-id", string(p.ID())).Set(sendQueueSize) + case <-p.Quit(): + return + } + } +} + //------------------------------------------------------------------ // helper funcs @@ -333,6 +383,7 @@ func createMConnection( // which does onPeerError. panic(fmt.Sprintf("Unknown channel %X", chID)) } + p.metrics.PeerReceiveBytesTotal.With("peer_id", string(p.ID())).Add(float64(len(msgBytes))) reactor.Receive(chID, p, msgBytes) } diff --git a/p2p/peer_set_test.go b/p2p/peer_set_test.go index a352cce0..ee1c52ea 100644 --- a/p2p/peer_set_test.go +++ b/p2p/peer_set_test.go @@ -24,6 +24,7 @@ func randPeer(ip net.IP) *peer { ID: nodeKey.ID(), ListenAddr: fmt.Sprintf("%v.%v.%v.%v:26656", cmn.RandInt()%256, cmn.RandInt()%256, cmn.RandInt()%256, cmn.RandInt()%256), }, + metrics: NopMetrics(), } p.ip = ip diff --git a/p2p/switch.go b/p2p/switch.go index 57077e07..dbef56eb 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -463,6 +463,7 @@ func (sw *Switch) acceptRoutine() { chDescs: sw.chDescs, onPeerError: sw.StopPeerForError, reactorsByCh: sw.reactorsByCh, + metrics: sw.metrics, }) if err != nil { switch err.(type) { @@ -549,6 +550,7 @@ func (sw *Switch) addOutboundPeerWithConfig( onPeerError: sw.StopPeerForError, persistent: persistent, reactorsByCh: sw.reactorsByCh, + metrics: sw.metrics, }) if err != nil { switch e := err.(type) { diff --git a/p2p/test_util.go b/p2p/test_util.go index 64b8b215..3d48aaac 100644 --- a/p2p/test_util.go +++ b/p2p/test_util.go @@ -29,6 +29,7 @@ func CreateRandomPeer(outbound bool) *peer { ListenAddr: netAddr.DialString(), }, mconn: &conn.MConnection{}, + metrics: NopMetrics(), } p.SetLogger(log.TestingLogger().With("peer", addr)) return p diff --git a/p2p/transport.go b/p2p/transport.go index 903d193d..6f097b4f 100644 --- a/p2p/transport.go +++ b/p2p/transport.go @@ -7,7 +7,7 @@ import ( "time" "github.com/tendermint/tendermint/config" - crypto "github.com/tendermint/tendermint/crypto" + "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/p2p/conn" ) @@ -41,6 +41,7 @@ type peerConfig struct { onPeerError func(Peer, interface{}) outbound, persistent bool reactorsByCh map[byte]Reactor + metrics *Metrics } // Transport emits and connects to Peers. The implementation of Peer is left to @@ -411,6 +412,7 @@ func (mt *MultiplexTransport) wrapPeer( cfg.reactorsByCh, cfg.chDescs, cfg.onPeerError, + PeerMetrics(cfg.metrics), ) // Wait for Peer to Stop so we can cleanup.