commit
e1909f9b8b
|
@ -1,8 +1,15 @@
|
||||||
use message::Command;
|
use std::time::Instant;
|
||||||
use std::collections::hash_map::Entry;
|
use std::collections::hash_map::Entry;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
use util::interval::{Interval, RealInterval};
|
||||||
|
|
||||||
#[derive(Default, Clone)]
|
use message::{Command, Payload};
|
||||||
|
use message::types::{Ping, Pong};
|
||||||
|
|
||||||
|
// delay somewhere near communication timeout
|
||||||
|
const ENORMOUS_PING_DELAY: f64 = 10f64;
|
||||||
|
|
||||||
|
#[derive(Default, Clone, Debug)]
|
||||||
pub struct RunningAverage {
|
pub struct RunningAverage {
|
||||||
count: u64,
|
count: u64,
|
||||||
bytes: u64,
|
bytes: u64,
|
||||||
|
@ -28,10 +35,16 @@ impl RunningAverage {
|
||||||
// qed
|
// qed
|
||||||
self.bytes = (self.bytes as i64 + ((bytes as i64 - self.bytes as i64) / self.count as i64)) as u64;
|
self.bytes = (self.bytes as i64 + ((bytes as i64 - self.bytes as i64) / self.count as i64)) as u64;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn val(&self) -> u64 {
|
||||||
|
self.bytes
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub enum Flow { Receive, Send }
|
||||||
|
|
||||||
#[derive(Default, Clone)]
|
#[derive(Default, Clone)]
|
||||||
pub struct PeerStats {
|
pub struct PeerStats<T: Interval = RealInterval> {
|
||||||
pub last_send: u32,
|
pub last_send: u32,
|
||||||
pub last_recv: u32,
|
pub last_recv: u32,
|
||||||
|
|
||||||
|
@ -39,19 +52,34 @@ pub struct PeerStats {
|
||||||
pub total_recv: u64,
|
pub total_recv: u64,
|
||||||
|
|
||||||
pub avg_ping: f64,
|
pub avg_ping: f64,
|
||||||
pub min_ping: f64,
|
pub min_ping: Option<f64>,
|
||||||
pub total_ping: u64,
|
|
||||||
|
|
||||||
pub synced_blocks: u32,
|
send_avg: HashMap<Command, RunningAverage>,
|
||||||
pub synced_headers: u32,
|
recv_avg: HashMap<Command, RunningAverage>,
|
||||||
pub send_avg: HashMap<Command, RunningAverage>,
|
|
||||||
pub recv_avg: HashMap<Command, RunningAverage>,
|
last_ping: Option<Instant>,
|
||||||
|
ping_count: u64,
|
||||||
|
|
||||||
|
interval: T,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PeerStats {
|
impl<I: Interval> PeerStats<I> {
|
||||||
|
|
||||||
|
pub fn with_interval(interval: I) -> PeerStats<I> {
|
||||||
|
PeerStats {
|
||||||
|
interval: interval,
|
||||||
|
.. PeerStats::default()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub fn report_send(&mut self, command: Command, bytes: usize) {
|
pub fn report_send(&mut self, command: Command, bytes: usize) {
|
||||||
self.total_send += bytes as u64;
|
self.total_send += bytes as u64;
|
||||||
self.last_send = ::time::get_time().sec as u32;
|
self.last_send = ::time::get_time().sec as u32;
|
||||||
|
|
||||||
|
if command == Ping::command() {
|
||||||
|
self.report_ping_send();
|
||||||
|
}
|
||||||
|
|
||||||
match self.send_avg.entry(command) {
|
match self.send_avg.entry(command) {
|
||||||
Entry::Occupied(mut avg) => {
|
Entry::Occupied(mut avg) => {
|
||||||
avg.get_mut().add(bytes);
|
avg.get_mut().add(bytes);
|
||||||
|
@ -62,9 +90,34 @@ impl PeerStats {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn report_ping_send(&mut self) {
|
||||||
|
self.last_ping = Some(self.interval.now());
|
||||||
|
self.ping_count += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
fn report_pong_recv(&mut self) {
|
||||||
|
if let Some(last_ping) = self.last_ping {
|
||||||
|
let dur = self.interval.elapsed(last_ping);
|
||||||
|
let update = if dur.as_secs() > 10 {
|
||||||
|
ENORMOUS_PING_DELAY
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
// max is 10, checked above, dur.as_secs() as u32 cannot overflow; qed
|
||||||
|
f64::from(dur.as_secs() as u32) + f64::from(dur.subsec_nanos()) / 1e9
|
||||||
|
};
|
||||||
|
self.min_ping = Some(self.min_ping.unwrap_or(ENORMOUS_PING_DELAY).min(update));
|
||||||
|
self.avg_ping += (update - self.avg_ping) / (self.ping_count as f64);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub fn report_recv(&mut self, command: Command, bytes: usize) {
|
pub fn report_recv(&mut self, command: Command, bytes: usize) {
|
||||||
self.total_recv += bytes as u64;
|
self.total_recv += bytes as u64;
|
||||||
self.last_recv = ::time::get_time().sec as u32;
|
self.last_recv = ::time::get_time().sec as u32;
|
||||||
|
|
||||||
|
if command == Pong::command() {
|
||||||
|
self.report_pong_recv();
|
||||||
|
}
|
||||||
|
|
||||||
match self.recv_avg.entry(command) {
|
match self.recv_avg.entry(command) {
|
||||||
Entry::Occupied(mut avg) => {
|
Entry::Occupied(mut avg) => {
|
||||||
avg.get_mut().add(bytes);
|
avg.get_mut().add(bytes);
|
||||||
|
@ -74,12 +127,22 @@ impl PeerStats {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn avg<T>(&self, dir: Flow, cmd: T) -> u64
|
||||||
|
where T: Into<Command>
|
||||||
|
{
|
||||||
|
match dir {
|
||||||
|
Flow::Receive => self.recv_avg.get(&cmd.into()).and_then(|x| Some(x.val())).unwrap_or_default(),
|
||||||
|
Flow::Send => self.send_avg.get(&cmd.into()).and_then(|x| Some(x.val())).unwrap_or_default(),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
|
|
||||||
use super::RunningAverage;
|
use super::{RunningAverage, PeerStats, Flow};
|
||||||
|
use util::interval::{FixedIntervalSpawner, RealInterval};
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn avg() {
|
fn avg() {
|
||||||
|
@ -99,4 +162,31 @@ mod tests {
|
||||||
|
|
||||||
assert_eq!(avg.bytes, 16);
|
assert_eq!(avg.bytes, 16);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn smoky() {
|
||||||
|
let mut stats = PeerStats::<FixedIntervalSpawner>::with_interval(FixedIntervalSpawner::new(50));
|
||||||
|
stats.report_send("ping".into(), 200);
|
||||||
|
|
||||||
|
assert_eq!(stats.send_avg[&"ping".into()].val(), 200);
|
||||||
|
|
||||||
|
stats.report_recv("pong".into(), 50);
|
||||||
|
assert!(stats.avg_ping > 0.03);
|
||||||
|
assert!(stats.avg_ping < 0.1);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn avg_t() {
|
||||||
|
let mut stats = PeerStats::<RealInterval>::default();
|
||||||
|
stats.report_send("inv".into(), 200);
|
||||||
|
stats.report_send("inv".into(), 300);
|
||||||
|
|
||||||
|
assert_eq!(stats.avg(Flow::Send, "inv"), 250);
|
||||||
|
|
||||||
|
let mut stats = PeerStats::<RealInterval>::default();
|
||||||
|
stats.report_recv("inv".into(), 2000);
|
||||||
|
stats.report_recv("inv".into(), 3000);
|
||||||
|
|
||||||
|
assert_eq!(stats.avg(Flow::Receive, "inv"), 2500);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,40 @@
|
||||||
|
use std::time::{Instant, Duration};
|
||||||
|
|
||||||
|
pub trait Interval : Default {
|
||||||
|
fn now(&self) -> Instant {
|
||||||
|
Instant::now()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn elapsed(&self, instant: Instant) -> Duration {
|
||||||
|
instant.elapsed()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
pub struct RealInterval;
|
||||||
|
|
||||||
|
impl Interval for RealInterval { }
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
#[cfg(test)]
|
||||||
|
pub struct FixedIntervalSpawner {
|
||||||
|
step_millis: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
impl FixedIntervalSpawner {
|
||||||
|
pub fn new(step_millis: u64) -> Self {
|
||||||
|
FixedIntervalSpawner { step_millis : step_millis }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
impl Interval for FixedIntervalSpawner {
|
||||||
|
fn now(&self) -> Instant {
|
||||||
|
Instant::now()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn elapsed(&self, instant: Instant) -> Duration {
|
||||||
|
(instant - Duration::from_millis(self.step_millis)).elapsed()
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,5 +1,6 @@
|
||||||
pub mod nonce;
|
pub mod nonce;
|
||||||
pub mod time;
|
pub mod time;
|
||||||
|
pub mod interval;
|
||||||
mod internet_protocol;
|
mod internet_protocol;
|
||||||
mod node_table;
|
mod node_table;
|
||||||
mod peer;
|
mod peer;
|
||||||
|
|
Loading…
Reference in New Issue