diff --git a/Cargo.lock b/Cargo.lock index 29a3e0de08..0d0053c084 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4496,6 +4496,7 @@ dependencies = [ "dashmap", "etcd-client", "fs_extra", + "histogram", "itertools 0.10.1", "jsonrpc-core", "jsonrpc-core-client", diff --git a/core/Cargo.toml b/core/Cargo.toml index 1a7a452b04..7f73356114 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -23,6 +23,7 @@ crossbeam-channel = "0.5" dashmap = { version = "4.0.2", features = ["rayon", "raw-api"] } etcd-client = { version = "0.7.2", features = ["tls"]} fs_extra = "1.2.0" +histogram = "0.6.9" itertools = "0.10.1" log = "0.4.14" lru = "0.6.6" diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs index d3de643735..8dec1d367b 100644 --- a/core/src/sigverify_stage.rs +++ b/core/src/sigverify_stage.rs @@ -8,14 +8,18 @@ use crate::sigverify; use crossbeam_channel::{SendError, Sender as CrossbeamSender}; use solana_measure::measure::Measure; -use solana_metrics::datapoint_debug; use solana_perf::packet::Packets; use solana_perf::perf_libs; use solana_sdk::timing; use solana_streamer::streamer::{self, PacketReceiver, StreamerError}; -use std::sync::mpsc::{Receiver, RecvTimeoutError}; -use std::sync::{Arc, Mutex}; -use std::thread::{self, Builder, JoinHandle}; +use std::{ + sync::{ + mpsc::{Receiver, RecvTimeoutError}, + Arc, Mutex, + }, + thread::{self, Builder, JoinHandle}, + time::Instant, +}; use thiserror::Error; const RECV_BATCH_MAX_CPU: usize = 1_000; @@ -43,6 +47,82 @@ pub trait SigVerifier { #[derive(Default, Clone)] pub struct DisabledSigVerifier {} +#[derive(Default)] +struct SigVerifierStats { + recv_batches_us_hist: histogram::Histogram, // time to call recv_batch + verify_batches_pp_us_hist: histogram::Histogram, // per-packet time to call verify_batch + batches_hist: histogram::Histogram, // number of Packets structures per verify call + packets_hist: histogram::Histogram, // number of packets per verify call + total_batches: usize, + total_packets: usize, +} + +impl SigVerifierStats { + fn report(&self) { + datapoint_info!( + "sigverify_stage-total_verify_time", + ( + "recv_batches_us_90pct", + self.recv_batches_us_hist.percentile(90.0).unwrap_or(0), + i64 + ), + ( + "recv_batches_us_min", + self.recv_batches_us_hist.minimum().unwrap_or(0), + i64 + ), + ( + "recv_batches_us_max", + self.recv_batches_us_hist.maximum().unwrap_or(0), + i64 + ), + ( + "recv_batches_us_mean", + self.recv_batches_us_hist.mean().unwrap_or(0), + i64 + ), + ( + "verify_batches_pp_us_90pct", + self.verify_batches_pp_us_hist.percentile(90.0).unwrap_or(0), + i64 + ), + ( + "verify_batches_pp_us_min", + self.verify_batches_pp_us_hist.minimum().unwrap_or(0), + i64 + ), + ( + "verify_batches_pp_us_max", + self.verify_batches_pp_us_hist.maximum().unwrap_or(0), + i64 + ), + ( + "verify_batches_pp_us_mean", + self.verify_batches_pp_us_hist.mean().unwrap_or(0), + i64 + ), + ( + "batches_90pct", + self.batches_hist.percentile(90.0).unwrap_or(0), + i64 + ), + ("batches_min", self.batches_hist.minimum().unwrap_or(0), i64), + ("batches_max", self.batches_hist.maximum().unwrap_or(0), i64), + ("batches_mean", self.batches_hist.mean().unwrap_or(0), i64), + ( + "packets_90pct", + self.packets_hist.percentile(90.0).unwrap_or(0), + i64 + ), + ("packets_min", self.packets_hist.minimum().unwrap_or(0), i64), + ("packets_max", self.packets_hist.maximum().unwrap_or(0), i64), + ("packets_mean", self.packets_hist.mean().unwrap_or(0), i64), + ("total_batches", self.total_batches, i64), + ("total_packets", self.total_packets, i64), + ); + } +} + impl SigVerifier for DisabledSigVerifier { fn verify_batch(&self, mut batch: Vec) -> Vec { sigverify::ed25519_verify_disabled(&mut batch); @@ -66,8 +146,9 @@ impl SigVerifyStage { sendr: &CrossbeamSender>, id: usize, verifier: &T, + stats: &mut SigVerifierStats, ) -> Result<()> { - let (batch, len, recv_time) = streamer::recv_batch( + let (batch, num_packets, recv_duration) = streamer::recv_batch( &recvr.lock().expect("'recvr' lock in fn verifier"), if perf_libs::api().is_some() { RECV_BATCH_MAX_GPU @@ -76,14 +157,15 @@ impl SigVerifyStage { }, )?; - let mut verify_batch_time = Measure::start("sigverify_batch_time"); let batch_len = batch.len(); debug!( "@{:?} verifier: verifying: {} id: {}", timing::timestamp(), - len, + num_packets, id ); + + let mut verify_batch_time = Measure::start("sigverify_batch_time"); sendr.send(verifier.verify_batch(batch))?; verify_batch_time.stop(); @@ -93,17 +175,22 @@ impl SigVerifyStage { batch_len, verify_batch_time.as_ms(), id, - len, - (len as f32 / verify_batch_time.as_s()) + num_packets, + (num_packets as f32 / verify_batch_time.as_s()) ); - datapoint_debug!( - "sigverify_stage-total_verify_time", - ("num_batches", batch_len, i64), - ("num_packets", len, i64), - ("verify_time_ms", verify_batch_time.as_ms(), i64), - ("recv_time", recv_time, i64), - ); + stats + .recv_batches_us_hist + .increment(recv_duration.as_micros() as u64) + .unwrap(); + stats + .verify_batches_pp_us_hist + .increment(verify_batch_time.as_us() / (num_packets as u64)) + .unwrap(); + stats.batches_hist.increment(batch_len as u64).unwrap(); + stats.packets_hist.increment(num_packets as u64).unwrap(); + stats.total_batches += batch_len; + stats.total_packets += num_packets; Ok(()) } @@ -115,10 +202,18 @@ impl SigVerifyStage { verifier: &T, ) -> JoinHandle<()> { let verifier = verifier.clone(); + let mut stats = SigVerifierStats::default(); + let mut last_print = Instant::now(); Builder::new() .name(format!("solana-verifier-{}", id)) .spawn(move || loop { - if let Err(e) = Self::verifier(&packet_receiver, &verified_sender, id, &verifier) { + if let Err(e) = Self::verifier( + &packet_receiver, + &verified_sender, + id, + &verifier, + &mut stats, + ) { match e { SigVerifyServiceError::Streamer(StreamerError::RecvTimeout( RecvTimeoutError::Disconnected, @@ -132,6 +227,11 @@ impl SigVerifyStage { _ => error!("{:?}", e), } } + if last_print.elapsed().as_secs() > 2 { + stats.report(); + stats = SigVerifierStats::default(); + last_print = Instant::now(); + } }) .unwrap() } diff --git a/streamer/src/streamer.rs b/streamer/src/streamer.rs index 10d0472275..698827542f 100644 --- a/streamer/src/streamer.rs +++ b/streamer/src/streamer.rs @@ -6,7 +6,7 @@ use crate::{ recvmmsg::NUM_RCVMMSGS, socket::SocketAddrSpace, }; -use solana_sdk::timing::{duration_as_ms, timestamp}; +use solana_sdk::timing::timestamp; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{Receiver, RecvTimeoutError, SendError, Sender}; @@ -126,7 +126,10 @@ fn recv_send( Ok(()) } -pub fn recv_batch(recvr: &PacketReceiver, max_batch: usize) -> Result<(Vec, usize, u64)> { +pub fn recv_batch( + recvr: &PacketReceiver, + max_batch: usize, +) -> Result<(Vec, usize, Duration)> { let timer = Duration::new(1, 0); let msgs = recvr.recv_timeout(timer)?; let recv_start = Instant::now(); @@ -141,8 +144,9 @@ pub fn recv_batch(recvr: &PacketReceiver, max_batch: usize) -> Result<(Vec