Add metrics for inbound and outbound messages.
This commit is contained in:
parent
8c938af579
commit
80e7ee6dae
|
@ -195,21 +195,29 @@ where
|
||||||
|
|
||||||
let (peer_tx, peer_rx) = stream.split();
|
let (peer_tx, peer_rx) = stream.split();
|
||||||
|
|
||||||
use super::connection;
|
// Instrument the peer's rx and tx streams.
|
||||||
let server = Connection {
|
|
||||||
state: connection::State::AwaitingRequest,
|
|
||||||
svc: internal_service,
|
|
||||||
client_rx: server_rx,
|
|
||||||
error_slot: slot,
|
|
||||||
peer_tx,
|
|
||||||
request_timer: None,
|
|
||||||
};
|
|
||||||
|
|
||||||
let hooked_peer_rx = peer_rx
|
let peer_tx = peer_tx.with(move |msg: Message| {
|
||||||
|
// Add a metric for outbound messages.
|
||||||
|
metrics::counter!("peer.outbound_messages", 1, "addr" => addr.to_string());
|
||||||
|
// We need to use future::ready rather than an async block here,
|
||||||
|
// because we need the sink to be Unpin, and the With<Fut, ...>
|
||||||
|
// returned by .with is Unpin only if Fut is Unpin, and the
|
||||||
|
// futures generated by async blocks are not Unpin.
|
||||||
|
future::ready(Ok(msg))
|
||||||
|
});
|
||||||
|
|
||||||
|
let peer_rx = peer_rx
|
||||||
.then(move |msg| {
|
.then(move |msg| {
|
||||||
|
// Add a metric for inbound messages and fire a timestamp event.
|
||||||
let mut timestamp_collector = timestamp_collector.clone();
|
let mut timestamp_collector = timestamp_collector.clone();
|
||||||
async move {
|
async move {
|
||||||
if msg.is_ok() {
|
if msg.is_ok() {
|
||||||
|
metrics::counter!(
|
||||||
|
"inbound_messages",
|
||||||
|
1,
|
||||||
|
"addr" => addr.to_string(),
|
||||||
|
);
|
||||||
use futures::sink::SinkExt;
|
use futures::sink::SinkExt;
|
||||||
let _ = timestamp_collector
|
let _ = timestamp_collector
|
||||||
.send(MetaAddr {
|
.send(MetaAddr {
|
||||||
|
@ -224,12 +232,17 @@ where
|
||||||
})
|
})
|
||||||
.boxed();
|
.boxed();
|
||||||
|
|
||||||
tokio::spawn(
|
use super::connection;
|
||||||
server
|
let server = Connection {
|
||||||
.run(hooked_peer_rx)
|
state: connection::State::AwaitingRequest,
|
||||||
.instrument(connection_span)
|
svc: internal_service,
|
||||||
.boxed(),
|
client_rx: server_rx,
|
||||||
);
|
error_slot: slot,
|
||||||
|
peer_tx,
|
||||||
|
request_timer: None,
|
||||||
|
};
|
||||||
|
|
||||||
|
tokio::spawn(server.run(peer_rx).instrument(connection_span).boxed());
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
use futures::channel::oneshot;
|
use futures::channel::oneshot;
|
||||||
|
|
Loading…
Reference in New Issue