diff --git a/p2p/src/net/stats.rs b/p2p/src/net/stats.rs index 3e04f643..61cb6fd0 100644 --- a/p2p/src/net/stats.rs +++ b/p2p/src/net/stats.rs @@ -1,8 +1,15 @@ -use message::Command; +use std::time::Instant; use std::collections::hash_map::Entry; use std::collections::HashMap; +use util::interval::{Interval, RealInterval}; -#[derive(Default, Clone)] +use message::{Command, Payload}; +use message::types::{Ping, Pong}; + +// delay somewhere near communication timeout +const ENORMOUS_PING_DELAY: f64 = 10f64; + +#[derive(Default, Clone, Debug)] pub struct RunningAverage { count: u64, bytes: u64, @@ -28,10 +35,16 @@ impl RunningAverage { // qed self.bytes = (self.bytes as i64 + ((bytes as i64 - self.bytes as i64) / self.count as i64)) as u64; } + + pub fn val(&self) -> u64 { + self.bytes + } } +pub enum Flow { Receive, Send } + #[derive(Default, Clone)] -pub struct PeerStats { +pub struct PeerStats { pub last_send: u32, pub last_recv: u32, @@ -39,19 +52,34 @@ pub struct PeerStats { pub total_recv: u64, pub avg_ping: f64, - pub min_ping: f64, - pub total_ping: u64, + pub min_ping: Option, - pub synced_blocks: u32, - pub synced_headers: u32, - pub send_avg: HashMap, - pub recv_avg: HashMap, + send_avg: HashMap, + recv_avg: HashMap, + + last_ping: Option, + ping_count: u64, + + interval: T, } -impl PeerStats { +impl PeerStats { + + pub fn with_interval(interval: I) -> PeerStats { + PeerStats { + interval: interval, + .. PeerStats::default() + } + } + pub fn report_send(&mut self, command: Command, bytes: usize) { self.total_send += bytes as u64; self.last_send = ::time::get_time().sec as u32; + + if command == Ping::command() { + self.report_ping_send(); + } + match self.send_avg.entry(command) { Entry::Occupied(mut avg) => { avg.get_mut().add(bytes); @@ -62,9 +90,34 @@ impl PeerStats { } } + fn report_ping_send(&mut self) { + self.last_ping = Some(self.interval.now()); + self.ping_count += 1; + } + + fn report_pong_recv(&mut self) { + if let Some(last_ping) = self.last_ping { + let dur = self.interval.elapsed(last_ping); + let update = if dur.as_secs() > 10 { + ENORMOUS_PING_DELAY + } + else { + // max is 10, checked above, dur.as_secs() as u32 cannot overflow; qed + f64::from(dur.as_secs() as u32) + f64::from(dur.subsec_nanos()) / 1e9 + }; + self.min_ping = Some(self.min_ping.unwrap_or(ENORMOUS_PING_DELAY).min(update)); + self.avg_ping += (update - self.avg_ping) / (self.ping_count as f64); + } + } + pub fn report_recv(&mut self, command: Command, bytes: usize) { self.total_recv += bytes as u64; self.last_recv = ::time::get_time().sec as u32; + + if command == Pong::command() { + self.report_pong_recv(); + } + match self.recv_avg.entry(command) { Entry::Occupied(mut avg) => { avg.get_mut().add(bytes); @@ -74,12 +127,22 @@ impl PeerStats { }, } } + + pub fn avg(&self, dir: Flow, cmd: T) -> u64 + where T: Into + { + match dir { + Flow::Receive => self.recv_avg.get(&cmd.into()).and_then(|x| Some(x.val())).unwrap_or_default(), + Flow::Send => self.send_avg.get(&cmd.into()).and_then(|x| Some(x.val())).unwrap_or_default(), + } + } } #[cfg(test)] mod tests { - use super::RunningAverage; + use super::{RunningAverage, PeerStats, Flow}; + use util::interval::{FixedIntervalSpawner, RealInterval}; #[test] fn avg() { @@ -99,4 +162,31 @@ mod tests { assert_eq!(avg.bytes, 16); } + + #[test] + fn smoky() { + let mut stats = PeerStats::::with_interval(FixedIntervalSpawner::new(50)); + stats.report_send("ping".into(), 200); + + assert_eq!(stats.send_avg[&"ping".into()].val(), 200); + + stats.report_recv("pong".into(), 50); + assert!(stats.avg_ping > 0.03); + assert!(stats.avg_ping < 0.1); + } + + #[test] + fn avg_t() { + let mut stats = PeerStats::::default(); + stats.report_send("inv".into(), 200); + stats.report_send("inv".into(), 300); + + assert_eq!(stats.avg(Flow::Send, "inv"), 250); + + let mut stats = PeerStats::::default(); + stats.report_recv("inv".into(), 2000); + stats.report_recv("inv".into(), 3000); + + assert_eq!(stats.avg(Flow::Receive, "inv"), 2500); + } } diff --git a/p2p/src/util/interval.rs b/p2p/src/util/interval.rs new file mode 100644 index 00000000..e2c2d583 --- /dev/null +++ b/p2p/src/util/interval.rs @@ -0,0 +1,40 @@ +use std::time::{Instant, Duration}; + +pub trait Interval : Default { + fn now(&self) -> Instant { + Instant::now() + } + + fn elapsed(&self, instant: Instant) -> Duration { + instant.elapsed() + } +} + +#[derive(Default)] +pub struct RealInterval; + +impl Interval for RealInterval { } + +#[derive(Default)] +#[cfg(test)] +pub struct FixedIntervalSpawner { + step_millis: u64, +} + +#[cfg(test)] +impl FixedIntervalSpawner { + pub fn new(step_millis: u64) -> Self { + FixedIntervalSpawner { step_millis : step_millis } + } +} + +#[cfg(test)] +impl Interval for FixedIntervalSpawner { + fn now(&self) -> Instant { + Instant::now() + } + + fn elapsed(&self, instant: Instant) -> Duration { + (instant - Duration::from_millis(self.step_millis)).elapsed() + } +} diff --git a/p2p/src/util/mod.rs b/p2p/src/util/mod.rs index 5a8a6203..7933557a 100644 --- a/p2p/src/util/mod.rs +++ b/p2p/src/util/mod.rs @@ -1,5 +1,6 @@ pub mod nonce; pub mod time; +pub mod interval; mod internet_protocol; mod node_table; mod peer;