periodically report sigverify_stage stats (#19674)

This commit is contained in:
Jeff Biseda 2021-09-21 10:37:58 -07:00 committed by GitHub
parent 7f3d445af5
commit 640e93187c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 126 additions and 20 deletions

1
Cargo.lock generated
View File

@ -4496,6 +4496,7 @@ dependencies = [
"dashmap", "dashmap",
"etcd-client", "etcd-client",
"fs_extra", "fs_extra",
"histogram",
"itertools 0.10.1", "itertools 0.10.1",
"jsonrpc-core", "jsonrpc-core",
"jsonrpc-core-client", "jsonrpc-core-client",

View File

@ -23,6 +23,7 @@ crossbeam-channel = "0.5"
dashmap = { version = "4.0.2", features = ["rayon", "raw-api"] } dashmap = { version = "4.0.2", features = ["rayon", "raw-api"] }
etcd-client = { version = "0.7.2", features = ["tls"]} etcd-client = { version = "0.7.2", features = ["tls"]}
fs_extra = "1.2.0" fs_extra = "1.2.0"
histogram = "0.6.9"
itertools = "0.10.1" itertools = "0.10.1"
log = "0.4.14" log = "0.4.14"
lru = "0.6.6" lru = "0.6.6"

View File

@ -8,14 +8,18 @@
use crate::sigverify; use crate::sigverify;
use crossbeam_channel::{SendError, Sender as CrossbeamSender}; use crossbeam_channel::{SendError, Sender as CrossbeamSender};
use solana_measure::measure::Measure; use solana_measure::measure::Measure;
use solana_metrics::datapoint_debug;
use solana_perf::packet::Packets; use solana_perf::packet::Packets;
use solana_perf::perf_libs; use solana_perf::perf_libs;
use solana_sdk::timing; use solana_sdk::timing;
use solana_streamer::streamer::{self, PacketReceiver, StreamerError}; use solana_streamer::streamer::{self, PacketReceiver, StreamerError};
use std::sync::mpsc::{Receiver, RecvTimeoutError}; use std::{
use std::sync::{Arc, Mutex}; sync::{
use std::thread::{self, Builder, JoinHandle}; mpsc::{Receiver, RecvTimeoutError},
Arc, Mutex,
},
thread::{self, Builder, JoinHandle},
time::Instant,
};
use thiserror::Error; use thiserror::Error;
const RECV_BATCH_MAX_CPU: usize = 1_000; const RECV_BATCH_MAX_CPU: usize = 1_000;
@ -43,6 +47,82 @@ pub trait SigVerifier {
#[derive(Default, Clone)] #[derive(Default, Clone)]
pub struct DisabledSigVerifier {} pub struct DisabledSigVerifier {}
#[derive(Default)]
struct SigVerifierStats {
recv_batches_us_hist: histogram::Histogram, // time to call recv_batch
verify_batches_pp_us_hist: histogram::Histogram, // per-packet time to call verify_batch
batches_hist: histogram::Histogram, // number of Packets structures per verify call
packets_hist: histogram::Histogram, // number of packets per verify call
total_batches: usize,
total_packets: usize,
}
impl SigVerifierStats {
fn report(&self) {
datapoint_info!(
"sigverify_stage-total_verify_time",
(
"recv_batches_us_90pct",
self.recv_batches_us_hist.percentile(90.0).unwrap_or(0),
i64
),
(
"recv_batches_us_min",
self.recv_batches_us_hist.minimum().unwrap_or(0),
i64
),
(
"recv_batches_us_max",
self.recv_batches_us_hist.maximum().unwrap_or(0),
i64
),
(
"recv_batches_us_mean",
self.recv_batches_us_hist.mean().unwrap_or(0),
i64
),
(
"verify_batches_pp_us_90pct",
self.verify_batches_pp_us_hist.percentile(90.0).unwrap_or(0),
i64
),
(
"verify_batches_pp_us_min",
self.verify_batches_pp_us_hist.minimum().unwrap_or(0),
i64
),
(
"verify_batches_pp_us_max",
self.verify_batches_pp_us_hist.maximum().unwrap_or(0),
i64
),
(
"verify_batches_pp_us_mean",
self.verify_batches_pp_us_hist.mean().unwrap_or(0),
i64
),
(
"batches_90pct",
self.batches_hist.percentile(90.0).unwrap_or(0),
i64
),
("batches_min", self.batches_hist.minimum().unwrap_or(0), i64),
("batches_max", self.batches_hist.maximum().unwrap_or(0), i64),
("batches_mean", self.batches_hist.mean().unwrap_or(0), i64),
(
"packets_90pct",
self.packets_hist.percentile(90.0).unwrap_or(0),
i64
),
("packets_min", self.packets_hist.minimum().unwrap_or(0), i64),
("packets_max", self.packets_hist.maximum().unwrap_or(0), i64),
("packets_mean", self.packets_hist.mean().unwrap_or(0), i64),
("total_batches", self.total_batches, i64),
("total_packets", self.total_packets, i64),
);
}
}
impl SigVerifier for DisabledSigVerifier { impl SigVerifier for DisabledSigVerifier {
fn verify_batch(&self, mut batch: Vec<Packets>) -> Vec<Packets> { fn verify_batch(&self, mut batch: Vec<Packets>) -> Vec<Packets> {
sigverify::ed25519_verify_disabled(&mut batch); sigverify::ed25519_verify_disabled(&mut batch);
@ -66,8 +146,9 @@ impl SigVerifyStage {
sendr: &CrossbeamSender<Vec<Packets>>, sendr: &CrossbeamSender<Vec<Packets>>,
id: usize, id: usize,
verifier: &T, verifier: &T,
stats: &mut SigVerifierStats,
) -> Result<()> { ) -> Result<()> {
let (batch, len, recv_time) = streamer::recv_batch( let (batch, num_packets, recv_duration) = streamer::recv_batch(
&recvr.lock().expect("'recvr' lock in fn verifier"), &recvr.lock().expect("'recvr' lock in fn verifier"),
if perf_libs::api().is_some() { if perf_libs::api().is_some() {
RECV_BATCH_MAX_GPU RECV_BATCH_MAX_GPU
@ -76,14 +157,15 @@ impl SigVerifyStage {
}, },
)?; )?;
let mut verify_batch_time = Measure::start("sigverify_batch_time");
let batch_len = batch.len(); let batch_len = batch.len();
debug!( debug!(
"@{:?} verifier: verifying: {} id: {}", "@{:?} verifier: verifying: {} id: {}",
timing::timestamp(), timing::timestamp(),
len, num_packets,
id id
); );
let mut verify_batch_time = Measure::start("sigverify_batch_time");
sendr.send(verifier.verify_batch(batch))?; sendr.send(verifier.verify_batch(batch))?;
verify_batch_time.stop(); verify_batch_time.stop();
@ -93,17 +175,22 @@ impl SigVerifyStage {
batch_len, batch_len,
verify_batch_time.as_ms(), verify_batch_time.as_ms(),
id, id,
len, num_packets,
(len as f32 / verify_batch_time.as_s()) (num_packets as f32 / verify_batch_time.as_s())
); );
datapoint_debug!( stats
"sigverify_stage-total_verify_time", .recv_batches_us_hist
("num_batches", batch_len, i64), .increment(recv_duration.as_micros() as u64)
("num_packets", len, i64), .unwrap();
("verify_time_ms", verify_batch_time.as_ms(), i64), stats
("recv_time", recv_time, i64), .verify_batches_pp_us_hist
); .increment(verify_batch_time.as_us() / (num_packets as u64))
.unwrap();
stats.batches_hist.increment(batch_len as u64).unwrap();
stats.packets_hist.increment(num_packets as u64).unwrap();
stats.total_batches += batch_len;
stats.total_packets += num_packets;
Ok(()) Ok(())
} }
@ -115,10 +202,18 @@ impl SigVerifyStage {
verifier: &T, verifier: &T,
) -> JoinHandle<()> { ) -> JoinHandle<()> {
let verifier = verifier.clone(); let verifier = verifier.clone();
let mut stats = SigVerifierStats::default();
let mut last_print = Instant::now();
Builder::new() Builder::new()
.name(format!("solana-verifier-{}", id)) .name(format!("solana-verifier-{}", id))
.spawn(move || loop { .spawn(move || loop {
if let Err(e) = Self::verifier(&packet_receiver, &verified_sender, id, &verifier) { if let Err(e) = Self::verifier(
&packet_receiver,
&verified_sender,
id,
&verifier,
&mut stats,
) {
match e { match e {
SigVerifyServiceError::Streamer(StreamerError::RecvTimeout( SigVerifyServiceError::Streamer(StreamerError::RecvTimeout(
RecvTimeoutError::Disconnected, RecvTimeoutError::Disconnected,
@ -132,6 +227,11 @@ impl SigVerifyStage {
_ => error!("{:?}", e), _ => error!("{:?}", e),
} }
} }
if last_print.elapsed().as_secs() > 2 {
stats.report();
stats = SigVerifierStats::default();
last_print = Instant::now();
}
}) })
.unwrap() .unwrap()
} }

View File

@ -6,7 +6,7 @@ use crate::{
recvmmsg::NUM_RCVMMSGS, recvmmsg::NUM_RCVMMSGS,
socket::SocketAddrSpace, socket::SocketAddrSpace,
}; };
use solana_sdk::timing::{duration_as_ms, timestamp}; use solana_sdk::timing::timestamp;
use std::net::UdpSocket; use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, RecvTimeoutError, SendError, Sender}; use std::sync::mpsc::{Receiver, RecvTimeoutError, SendError, Sender};
@ -126,7 +126,10 @@ fn recv_send(
Ok(()) Ok(())
} }
pub fn recv_batch(recvr: &PacketReceiver, max_batch: usize) -> Result<(Vec<Packets>, usize, u64)> { pub fn recv_batch(
recvr: &PacketReceiver,
max_batch: usize,
) -> Result<(Vec<Packets>, usize, Duration)> {
let timer = Duration::new(1, 0); let timer = Duration::new(1, 0);
let msgs = recvr.recv_timeout(timer)?; let msgs = recvr.recv_timeout(timer)?;
let recv_start = Instant::now(); let recv_start = Instant::now();
@ -141,8 +144,9 @@ pub fn recv_batch(recvr: &PacketReceiver, max_batch: usize) -> Result<(Vec<Packe
break; break;
} }
} }
let recv_duration = recv_start.elapsed();
trace!("batch len {}", batch.len()); trace!("batch len {}", batch.len());
Ok((batch, len, duration_as_ms(&recv_start.elapsed()))) Ok((batch, len, recv_duration))
} }
pub fn responder( pub fn responder(