Tweak peer set metrics.
- Add a total peers metric to prevent races between measurements of ready/unready peers (which can cause the sum to be wrong). - Add an outbound request counter.
This commit is contained in:
parent
9c357eaf1e
commit
3ed75cb626
|
@ -199,6 +199,7 @@ where
|
||||||
|
|
||||||
let peer_tx = peer_tx.with(move |msg: Message| {
|
let peer_tx = peer_tx.with(move |msg: Message| {
|
||||||
// Add a metric for outbound messages.
|
// Add a metric for outbound messages.
|
||||||
|
// XXX add a dimension tagging message metrics by type
|
||||||
metrics::counter!("peer.outbound_messages", 1, "addr" => addr.to_string());
|
metrics::counter!("peer.outbound_messages", 1, "addr" => addr.to_string());
|
||||||
// We need to use future::ready rather than an async block here,
|
// We need to use future::ready rather than an async block here,
|
||||||
// because we need the sink to be Unpin, and the With<Fut, ...>
|
// because we need the sink to be Unpin, and the With<Fut, ...>
|
||||||
|
@ -213,6 +214,7 @@ where
|
||||||
let mut timestamp_collector = timestamp_collector.clone();
|
let mut timestamp_collector = timestamp_collector.clone();
|
||||||
async move {
|
async move {
|
||||||
if msg.is_ok() {
|
if msg.is_ok() {
|
||||||
|
// XXX add a dimension tagging message metrics by type
|
||||||
metrics::counter!(
|
metrics::counter!(
|
||||||
"inbound_messages",
|
"inbound_messages",
|
||||||
1,
|
1,
|
||||||
|
|
|
@ -209,7 +209,7 @@ where
|
||||||
impl<D> Service<Request> for PeerSet<D>
|
impl<D> Service<Request> for PeerSet<D>
|
||||||
where
|
where
|
||||||
D: Discover + Unpin,
|
D: Discover + Unpin,
|
||||||
D::Key: Clone + Debug,
|
D::Key: Clone + Debug + ToString,
|
||||||
D::Service: Service<Request, Response = Response> + Load,
|
D::Service: Service<Request, Response = Response> + Load,
|
||||||
D::Error: Into<BoxedStdError>,
|
D::Error: Into<BoxedStdError>,
|
||||||
<D::Service as Service<Request>>::Error: Into<BoxedStdError> + 'static,
|
<D::Service as Service<Request>>::Error: Into<BoxedStdError> + 'static,
|
||||||
|
@ -227,17 +227,13 @@ where
|
||||||
|
|
||||||
// Poll unready services to drive them to readiness.
|
// Poll unready services to drive them to readiness.
|
||||||
self.poll_unready(cx);
|
self.poll_unready(cx);
|
||||||
trace!(
|
let num_ready = self.ready_services.len();
|
||||||
num_ready = self.ready_services.len(),
|
let num_unready = self.unready_services.len();
|
||||||
num_unready = self.unready_services.len(),
|
metrics::gauge!("pool.num_ready", num_ready.try_into().unwrap(),);
|
||||||
);
|
metrics::gauge!("pool.num_unready", num_unready.try_into().unwrap(),);
|
||||||
metrics::gauge!(
|
metrics::gauge!(
|
||||||
"pool.num_ready",
|
"pool.num_peers",
|
||||||
self.ready_services.len().try_into().unwrap()
|
(num_ready + num_unready).try_into().unwrap(),
|
||||||
);
|
|
||||||
metrics::gauge!(
|
|
||||||
"pool.num_unready",
|
|
||||||
self.unready_services.len().try_into().unwrap()
|
|
||||||
);
|
);
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
|
@ -291,6 +287,13 @@ where
|
||||||
.swap_remove_index(index)
|
.swap_remove_index(index)
|
||||||
.expect("preselected index must be valid");
|
.expect("preselected index must be valid");
|
||||||
|
|
||||||
|
// XXX add a dimension tagging request metrics by type
|
||||||
|
metrics::counter!(
|
||||||
|
"outbound_requests",
|
||||||
|
1,
|
||||||
|
"key" => key.to_string(),
|
||||||
|
);
|
||||||
|
|
||||||
let fut = svc.call(req);
|
let fut = svc.call(req);
|
||||||
self.push_unready(key, svc);
|
self.push_unready(key, svc);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue