From 56d95e4cb81770c41f0f44868f1c5bfdad1d12b9 Mon Sep 17 00:00:00 2001 From: NikVolf Date: Mon, 26 Dec 2016 13:50:29 +0300 Subject: [PATCH 1/5] track ping-pong --- p2p/src/net/stats.rs | 44 +++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 41 insertions(+), 3 deletions(-) diff --git a/p2p/src/net/stats.rs b/p2p/src/net/stats.rs index 3e04f643..52407f51 100644 --- a/p2p/src/net/stats.rs +++ b/p2p/src/net/stats.rs @@ -1,7 +1,13 @@ -use message::Command; +use std::time::Instant; use std::collections::hash_map::Entry; use std::collections::HashMap; +use message::{Command, Payload}; +use message::types::{Ping, Pong}; + +// delay somewhere near communication timeout +const ENORMOUS_PING_DELAY: f64 = 10f64; + #[derive(Default, Clone)] pub struct RunningAverage { count: u64, @@ -39,19 +45,26 @@ pub struct PeerStats { pub total_recv: u64, pub avg_ping: f64, - pub min_ping: f64, - pub total_ping: u64, + pub min_ping: Option, pub synced_blocks: u32, pub synced_headers: u32, pub send_avg: HashMap, pub recv_avg: HashMap, + + last_ping: Option, + ping_count: u64, } impl PeerStats { pub fn report_send(&mut self, command: Command, bytes: usize) { self.total_send += bytes as u64; self.last_send = ::time::get_time().sec as u32; + + if command == Ping::command() { + self.report_ping_send(); + } + match self.send_avg.entry(command) { Entry::Occupied(mut avg) => { avg.get_mut().add(bytes); @@ -62,9 +75,34 @@ impl PeerStats { } } + fn report_ping_send(&mut self) { + self.last_ping = Some(Instant::now()); + self.ping_count += 1; + } + + fn report_pong_recv(&mut self) { + if let Some(last_ping) = self.last_ping { + let dur = last_ping.elapsed(); + let update = if dur.as_secs() > 10 { + ENORMOUS_PING_DELAY + } + else { + // max is 10, checked above, dur.as_secs() as u32 cannot overflow; qed + f64::from(dur.as_secs() as u32) + f64::from(dur.subsec_nanos()) / 1e9 + }; + self.min_ping = Some(self.min_ping.unwrap_or(ENORMOUS_PING_DELAY).min(update)); + self.avg_ping += (update - self.avg_ping) / (self.ping_count as f64); + } + } + pub fn report_recv(&mut self, command: Command, bytes: usize) { self.total_recv += bytes as u64; self.last_recv = ::time::get_time().sec as u32; + + if command == Pong::command() { + self.report_pong_recv(); + } + match self.recv_avg.entry(command) { Entry::Occupied(mut avg) => { avg.get_mut().add(bytes); From 4caf69dff731beb4ebf71e1497c01c8deb7c0b09 Mon Sep 17 00:00:00 2001 From: NikVolf Date: Mon, 26 Dec 2016 13:58:30 +0300 Subject: [PATCH 2/5] smoky test --- p2p/src/net/stats.rs | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/p2p/src/net/stats.rs b/p2p/src/net/stats.rs index 52407f51..19b43dae 100644 --- a/p2p/src/net/stats.rs +++ b/p2p/src/net/stats.rs @@ -8,7 +8,7 @@ use message::types::{Ping, Pong}; // delay somewhere near communication timeout const ENORMOUS_PING_DELAY: f64 = 10f64; -#[derive(Default, Clone)] +#[derive(Default, Clone, Debug)] pub struct RunningAverage { count: u64, bytes: u64, @@ -34,6 +34,10 @@ impl RunningAverage { // qed self.bytes = (self.bytes as i64 + ((bytes as i64 - self.bytes as i64) / self.count as i64)) as u64; } + + pub fn val(&self) -> u64 { + self.bytes + } } #[derive(Default, Clone)] @@ -117,7 +121,7 @@ impl PeerStats { #[cfg(test)] mod tests { - use super::RunningAverage; + use super::{RunningAverage, PeerStats}; #[test] fn avg() { @@ -137,4 +141,18 @@ mod tests { assert_eq!(avg.bytes, 16); } + + #[test] + fn smoky() { + let mut stats = PeerStats::default(); + stats.report_send("ping".into(), 200); + + assert_eq!(stats.send_avg[&"ping".into()].val(), 200); + + ::std::thread::sleep(::std::time::Duration::from_millis(50)); + + stats.report_recv("pong".into(), 50); + assert!(stats.avg_ping > 0.03); + assert!(stats.avg_ping < 0.1); + } } From b3dff10cf942fd860210f91a7397b9076d3020b9 Mon Sep 17 00:00:00 2001 From: NikVolf Date: Mon, 26 Dec 2016 14:23:15 +0300 Subject: [PATCH 3/5] remove misundestood fields --- p2p/src/net/stats.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/p2p/src/net/stats.rs b/p2p/src/net/stats.rs index 19b43dae..a0bdb493 100644 --- a/p2p/src/net/stats.rs +++ b/p2p/src/net/stats.rs @@ -51,8 +51,6 @@ pub struct PeerStats { pub avg_ping: f64, pub min_ping: Option, - pub synced_blocks: u32, - pub synced_headers: u32, pub send_avg: HashMap, pub recv_avg: HashMap, From 37e815b4356c5fd30b713c12853b26e0c53471c3 Mon Sep 17 00:00:00 2001 From: NikVolf Date: Mon, 26 Dec 2016 14:36:13 +0300 Subject: [PATCH 4/5] handy method & tests --- p2p/src/net/stats.rs | 33 ++++++++++++++++++++++++++++++--- 1 file changed, 30 insertions(+), 3 deletions(-) diff --git a/p2p/src/net/stats.rs b/p2p/src/net/stats.rs index a0bdb493..ba09f2e0 100644 --- a/p2p/src/net/stats.rs +++ b/p2p/src/net/stats.rs @@ -40,6 +40,8 @@ impl RunningAverage { } } +pub enum Flow { Receive, Send } + #[derive(Default, Clone)] pub struct PeerStats { pub last_send: u32, @@ -51,13 +53,14 @@ pub struct PeerStats { pub avg_ping: f64, pub min_ping: Option, - pub send_avg: HashMap, - pub recv_avg: HashMap, + send_avg: HashMap, + recv_avg: HashMap, last_ping: Option, ping_count: u64, } + impl PeerStats { pub fn report_send(&mut self, command: Command, bytes: usize) { self.total_send += bytes as u64; @@ -114,12 +117,21 @@ impl PeerStats { }, } } + + pub fn avg(&self, dir: Flow, cmd: T) -> u64 + where T: Into + { + match dir { + Flow::Receive => self.recv_avg.get(&cmd.into()).and_then(|x| Some(x.val())).unwrap_or_default(), + Flow::Send => self.send_avg.get(&cmd.into()).and_then(|x| Some(x.val())).unwrap_or_default(), + } + } } #[cfg(test)] mod tests { - use super::{RunningAverage, PeerStats}; + use super::{RunningAverage, PeerStats, Flow}; #[test] fn avg() { @@ -153,4 +165,19 @@ mod tests { assert!(stats.avg_ping > 0.03); assert!(stats.avg_ping < 0.1); } + + #[test] + fn avg_t() { + let mut stats = PeerStats::default(); + stats.report_send("inv".into(), 200); + stats.report_send("inv".into(), 300); + + assert_eq!(stats.avg(Flow::Send, "inv"), 250); + + let mut stats = PeerStats::default(); + stats.report_recv("inv".into(), 2000); + stats.report_recv("inv".into(), 3000); + + assert_eq!(stats.avg(Flow::Receive, "inv"), 2500); + } } From 00e5c3c68567429b168bbf3ff93fba72b928438e Mon Sep 17 00:00:00 2001 From: NikVolf Date: Tue, 27 Dec 2016 18:30:14 +0300 Subject: [PATCH 5/5] mock intervals --- p2p/src/net/stats.rs | 27 ++++++++++++++++++--------- p2p/src/util/interval.rs | 40 ++++++++++++++++++++++++++++++++++++++++ p2p/src/util/mod.rs | 1 + 3 files changed, 59 insertions(+), 9 deletions(-) create mode 100644 p2p/src/util/interval.rs diff --git a/p2p/src/net/stats.rs b/p2p/src/net/stats.rs index ba09f2e0..61cb6fd0 100644 --- a/p2p/src/net/stats.rs +++ b/p2p/src/net/stats.rs @@ -1,6 +1,7 @@ use std::time::Instant; use std::collections::hash_map::Entry; use std::collections::HashMap; +use util::interval::{Interval, RealInterval}; use message::{Command, Payload}; use message::types::{Ping, Pong}; @@ -43,7 +44,7 @@ impl RunningAverage { pub enum Flow { Receive, Send } #[derive(Default, Clone)] -pub struct PeerStats { +pub struct PeerStats { pub last_send: u32, pub last_recv: u32, @@ -58,10 +59,19 @@ pub struct PeerStats { last_ping: Option, ping_count: u64, + + interval: T, } +impl PeerStats { + + pub fn with_interval(interval: I) -> PeerStats { + PeerStats { + interval: interval, + .. PeerStats::default() + } + } -impl PeerStats { pub fn report_send(&mut self, command: Command, bytes: usize) { self.total_send += bytes as u64; self.last_send = ::time::get_time().sec as u32; @@ -81,13 +91,13 @@ impl PeerStats { } fn report_ping_send(&mut self) { - self.last_ping = Some(Instant::now()); + self.last_ping = Some(self.interval.now()); self.ping_count += 1; } fn report_pong_recv(&mut self) { if let Some(last_ping) = self.last_ping { - let dur = last_ping.elapsed(); + let dur = self.interval.elapsed(last_ping); let update = if dur.as_secs() > 10 { ENORMOUS_PING_DELAY } @@ -132,6 +142,7 @@ impl PeerStats { mod tests { use super::{RunningAverage, PeerStats, Flow}; + use util::interval::{FixedIntervalSpawner, RealInterval}; #[test] fn avg() { @@ -154,13 +165,11 @@ mod tests { #[test] fn smoky() { - let mut stats = PeerStats::default(); + let mut stats = PeerStats::::with_interval(FixedIntervalSpawner::new(50)); stats.report_send("ping".into(), 200); assert_eq!(stats.send_avg[&"ping".into()].val(), 200); - ::std::thread::sleep(::std::time::Duration::from_millis(50)); - stats.report_recv("pong".into(), 50); assert!(stats.avg_ping > 0.03); assert!(stats.avg_ping < 0.1); @@ -168,13 +177,13 @@ mod tests { #[test] fn avg_t() { - let mut stats = PeerStats::default(); + let mut stats = PeerStats::::default(); stats.report_send("inv".into(), 200); stats.report_send("inv".into(), 300); assert_eq!(stats.avg(Flow::Send, "inv"), 250); - let mut stats = PeerStats::default(); + let mut stats = PeerStats::::default(); stats.report_recv("inv".into(), 2000); stats.report_recv("inv".into(), 3000); diff --git a/p2p/src/util/interval.rs b/p2p/src/util/interval.rs new file mode 100644 index 00000000..e2c2d583 --- /dev/null +++ b/p2p/src/util/interval.rs @@ -0,0 +1,40 @@ +use std::time::{Instant, Duration}; + +pub trait Interval : Default { + fn now(&self) -> Instant { + Instant::now() + } + + fn elapsed(&self, instant: Instant) -> Duration { + instant.elapsed() + } +} + +#[derive(Default)] +pub struct RealInterval; + +impl Interval for RealInterval { } + +#[derive(Default)] +#[cfg(test)] +pub struct FixedIntervalSpawner { + step_millis: u64, +} + +#[cfg(test)] +impl FixedIntervalSpawner { + pub fn new(step_millis: u64) -> Self { + FixedIntervalSpawner { step_millis : step_millis } + } +} + +#[cfg(test)] +impl Interval for FixedIntervalSpawner { + fn now(&self) -> Instant { + Instant::now() + } + + fn elapsed(&self, instant: Instant) -> Duration { + (instant - Duration::from_millis(self.step_millis)).elapsed() + } +} diff --git a/p2p/src/util/mod.rs b/p2p/src/util/mod.rs index 5a8a6203..7933557a 100644 --- a/p2p/src/util/mod.rs +++ b/p2p/src/util/mod.rs @@ -1,5 +1,6 @@ pub mod nonce; pub mod time; +pub mod interval; mod internet_protocol; mod node_table; mod peer;