perf counters

This commit is contained in:
Anatoly Yakovenko 2018-05-30 21:24:21 -07:00 committed by Grimes
parent 586279bcfc
commit c2a9395a4b
5 changed files with 98 additions and 4 deletions

View File

@ -4,13 +4,14 @@
use bank::Bank; use bank::Bank;
use bincode::deserialize; use bincode::deserialize;
use counter::Counter;
use packet; use packet;
use packet::SharedPackets; use packet::SharedPackets;
use rayon::prelude::*; use rayon::prelude::*;
use record_stage::Signal; use record_stage::Signal;
use result::Result; use result::Result;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc; use std::sync::Arc;
use std::thread::{Builder, JoinHandle}; use std::thread::{Builder, JoinHandle};
@ -94,6 +95,8 @@ impl BankingStage {
timing::duration_as_ms(&recv_start.elapsed()), timing::duration_as_ms(&recv_start.elapsed()),
mms.len(), mms.len(),
); );
let count = mms.iter().map(|x| x.1.len()).sum();
static mut COUNTER: Counter = create_counter!("banking_stage_process_packets", 1);
let proc_start = Instant::now(); let proc_start = Instant::now();
for (msgs, vers) in mms { for (msgs, vers) in mms {
let transactions = Self::deserialize_transactions(&msgs.read().unwrap()); let transactions = Self::deserialize_transactions(&msgs.read().unwrap());
@ -129,6 +132,7 @@ impl BankingStage {
reqs_len, reqs_len,
(reqs_len as f32) / (total_time_s) (reqs_len as f32) / (total_time_s)
); );
inc_counter!(COUNTER, count, proc_start);
Ok(()) Ok(())
} }
} }

70
src/counter.rs Normal file
View File

@ -0,0 +1,70 @@
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
pub struct Counter {
pub name: &'static str,
pub counts: AtomicUsize,
pub nanos: AtomicUsize,
pub times: AtomicUsize,
pub lograte: usize,
}
macro_rules! create_counter {
($name:expr, $lograte:expr) => {
Counter {
name: $name,
counts: AtomicUsize::new(0),
nanos: AtomicUsize::new(0),
times: AtomicUsize::new(0),
lograte: $lograte,
}
};
}
macro_rules! inc_counter {
($name:expr, $count:expr, $start:expr) => {
unsafe { $name.inc($count, $start.elapsed()) };
};
}
impl Counter {
pub fn inc(&mut self, events: usize, dur: Duration) {
let total = dur.as_secs() * 1_000_000_000 + dur.subsec_nanos() as u64;
let counts = self.counts.fetch_add(events, Ordering::Relaxed);
let nanos = self.nanos.fetch_add(total as usize, Ordering::Relaxed);
let times = self.times.fetch_add(1, Ordering::Relaxed);
if times % self.lograte == 0 && times > 0 {
let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
let now_ms = now.as_secs() * 1_000 + now.subsec_nanos() as u64 / 1_000_000;
info!(
"COUNTER:{{\"name:\":\"{}\", \"counts\": {}, \"nanos\": {}, \"samples\": {} \"rate\": {}, \"now\": {}}}",
self.name,
counts,
nanos,
times,
counts as f64 * 1e9 / nanos as f64,
now_ms,
);
}
}
}
#[cfg(test)]
mod tests {
use counter::Counter;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Instant;
#[test]
fn test_counter() {
static mut COUNTER: Counter = create_counter!("test", 100);
let start = Instant::now();
let count = 1;
inc_counter!(COUNTER, count, start);
unsafe {
assert_eq!(COUNTER.counts.load(Ordering::Relaxed), 1);
assert_ne!(COUNTER.nanos.load(Ordering::Relaxed), 0);
assert_eq!(COUNTER.times.load(Ordering::Relaxed), 1);
assert_eq!(COUNTER.lograte, 100);
assert_eq!(COUNTER.name, "test");
}
}
}

View File

@ -7,6 +7,8 @@
//! //!
#![cfg_attr(feature = "unstable", feature(test))] #![cfg_attr(feature = "unstable", feature(test))]
#[macro_use]
pub mod counter;
pub mod bank; pub mod bank;
pub mod banking_stage; pub mod banking_stage;
pub mod budget; pub mod budget;

View File

@ -1,6 +1,7 @@
//! The `packet` module defines data structures and methods to pull data from the network. //! The `packet` module defines data structures and methods to pull data from the network.
use bincode::{deserialize, serialize}; use bincode::{deserialize, serialize};
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use counter::Counter;
use result::{Error, Result}; use result::{Error, Result};
use serde::Serialize; use serde::Serialize;
use signature::PublicKey; use signature::PublicKey;
@ -9,7 +10,9 @@ use std::fmt;
use std::io; use std::io;
use std::mem::size_of; use std::mem::size_of;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket}; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket};
use std::sync::atomic::AtomicUsize;
use std::sync::{Arc, Mutex, RwLock}; use std::sync::{Arc, Mutex, RwLock};
use std::time::Instant;
pub type SharedPackets = Arc<RwLock<Packets>>; pub type SharedPackets = Arc<RwLock<Packets>>;
pub type SharedBlob = Arc<RwLock<Blob>>; pub type SharedBlob = Arc<RwLock<Blob>>;
@ -169,6 +172,7 @@ impl<T: Default> Recycler<T> {
impl Packets { impl Packets {
fn run_read_from(&mut self, socket: &UdpSocket) -> Result<usize> { fn run_read_from(&mut self, socket: &UdpSocket) -> Result<usize> {
static mut COUNTER: Counter = create_counter!("packets", 10);
self.packets.resize(NUM_PACKETS, Packet::default()); self.packets.resize(NUM_PACKETS, Packet::default());
let mut i = 0; let mut i = 0;
//DOCUMENTED SIDE-EFFECT //DOCUMENTED SIDE-EFFECT
@ -178,11 +182,13 @@ impl Packets {
// * read until it fails // * read until it fails
// * set it back to blocking before returning // * set it back to blocking before returning
socket.set_nonblocking(false)?; socket.set_nonblocking(false)?;
let mut start = Instant::now();
for p in &mut self.packets { for p in &mut self.packets {
p.meta.size = 0; p.meta.size = 0;
trace!("receiving on {}", socket.local_addr().unwrap()); trace!("receiving on {}", socket.local_addr().unwrap());
match socket.recv_from(&mut p.data) { match socket.recv_from(&mut p.data) {
Err(_) if i > 0 => { Err(_) if i > 0 => {
inc_counter!(COUNTER, i, start);
debug!("got {:?} messages on {}", i, socket.local_addr().unwrap()); debug!("got {:?} messages on {}", i, socket.local_addr().unwrap());
break; break;
} }
@ -194,6 +200,7 @@ impl Packets {
p.meta.size = nrecv; p.meta.size = nrecv;
p.meta.set_addr(&from); p.meta.set_addr(&from);
if i == 0 { if i == 0 {
start = Instant::now();
socket.set_nonblocking(true)?; socket.set_nonblocking(true)?;
} }
} }

View File

@ -4,8 +4,11 @@
//! offloaded to the GPU. //! offloaded to the GPU.
//! //!
use counter::Counter;
use packet::{Packet, SharedPackets}; use packet::{Packet, SharedPackets};
use std::mem::size_of; use std::mem::size_of;
use std::sync::atomic::AtomicUsize;
use std::time::Instant;
use transaction::{PUB_KEY_OFFSET, SIGNED_DATA_OFFSET, SIG_OFFSET}; use transaction::{PUB_KEY_OFFSET, SIGNED_DATA_OFFSET, SIG_OFFSET};
pub const TX_OFFSET: usize = 0; pub const TX_OFFSET: usize = 0;
@ -67,8 +70,11 @@ fn batch_size(batches: &Vec<SharedPackets>) -> usize {
#[cfg(not(feature = "cuda"))] #[cfg(not(feature = "cuda"))]
pub fn ed25519_verify(batches: &Vec<SharedPackets>) -> Vec<Vec<u8>> { pub fn ed25519_verify(batches: &Vec<SharedPackets>) -> Vec<Vec<u8>> {
use rayon::prelude::*; use rayon::prelude::*;
static mut COUNTER: Counter = create_counter!("ed25519_verify", 1);
let start = Instant::now();
let count = batch_size(batches);
info!("CPU ECDSA for {}", batch_size(batches)); info!("CPU ECDSA for {}", batch_size(batches));
batches let rv = batches
.into_par_iter() .into_par_iter()
.map(|p| { .map(|p| {
p.read() p.read()
@ -78,13 +84,17 @@ pub fn ed25519_verify(batches: &Vec<SharedPackets>) -> Vec<Vec<u8>> {
.map(verify_packet) .map(verify_packet)
.collect() .collect()
}) })
.collect() .collect();
inc_counter!(COUNTER, count, start);
rv
} }
#[cfg(feature = "cuda")] #[cfg(feature = "cuda")]
pub fn ed25519_verify(batches: &Vec<SharedPackets>) -> Vec<Vec<u8>> { pub fn ed25519_verify(batches: &Vec<SharedPackets>) -> Vec<Vec<u8>> {
use packet::PACKET_DATA_SIZE; use packet::PACKET_DATA_SIZE;
static mut COUNTER: Counter = create_counter!("ed25519_verify_cuda", 1);
let start = Instant::now();
let count = batch_size(batches);
info!("CUDA ECDSA for {}", batch_size(batches)); info!("CUDA ECDSA for {}", batch_size(batches));
let mut out = Vec::new(); let mut out = Vec::new();
let mut elems = Vec::new(); let mut elems = Vec::new();
@ -143,6 +153,7 @@ pub fn ed25519_verify(batches: &Vec<SharedPackets>) -> Vec<Vec<u8>> {
num += 1; num += 1;
} }
} }
inc_counter!(COUNTER, count, start);
rvs rvs
} }