Retransmit and shred fetch metrics (#9965)

* Retransmit stats

* Shred fetch stats
sakridge 2020-05-10 21:37:05 -07:00 committed by GitHub
parent 9c0b80ea1b
commit 1a47b1cd86
2 changed files with 162 additions and 33 deletions
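The first file moves the retransmit stage from a per-batch datapoint_debug! to counters accumulated in shared AtomicU64 fields and flushed to a single datapoint_info! at most once every two seconds; the flush is gated by a compare-and-swap on last_ts so only one worker thread reports per interval. Below is a minimal sketch of that reporting pattern, not the commit's code: Stats, record_batch, and the println! sink are illustrative stand-ins, and the sketch uses compare_exchange where the commit uses the then-current compare_and_swap.

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

#[derive(Default)]
struct Stats {
    packets: AtomicU64,
    last_report_ms: AtomicU64,
}

fn now_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_millis() as u64
}

fn record_batch(stats: &Arc<Stats>, packets: u64) {
    // Every worker thread folds its batch totals into the shared counters.
    stats.packets.fetch_add(packets, Ordering::Relaxed);
    let now = now_ms();
    let last = stats.last_report_ms.load(Ordering::Relaxed);
    // Only the thread that wins the timestamp swap drains the counters, so
    // concurrent workers never emit duplicate datapoints for one interval.
    if now.saturating_sub(last) > 2000
        && stats
            .last_report_ms
            .compare_exchange(last, now, Ordering::Relaxed, Ordering::Relaxed)
            .is_ok()
    {
        let drained = stats.packets.swap(0, Ordering::Relaxed);
        println!("retransmit-stage total_packets={}", drained); // stand-in for datapoint_info!
    }
}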

core/src/retransmit_stage.rs

@@ -19,11 +19,12 @@ use solana_measure::measure::Measure;
use solana_metrics::inc_new_counter_error;
use solana_perf::packet::Packets;
use solana_sdk::epoch_schedule::EpochSchedule;
use solana_sdk::timing::timestamp;
use solana_streamer::streamer::PacketReceiver;
use std::{
cmp,
net::UdpSocket,
sync::atomic::{AtomicBool, Ordering},
sync::atomic::{AtomicBool, AtomicU64, Ordering},
sync::mpsc::channel,
sync::mpsc::RecvTimeoutError,
sync::Mutex,
@@ -36,6 +37,91 @@ use std::{
// it doesn't pull up too much work.
const MAX_PACKET_BATCH_SIZE: usize = 100;
#[derive(Default)]
struct RetransmitStats {
total_packets: AtomicU64,
total_batches: AtomicU64,
total_time: AtomicU64,
repair_total: AtomicU64,
discard_total: AtomicU64,
retransmit_total: AtomicU64,
last_ts: AtomicU64,
compute_turbine_peers_total: AtomicU64,
}
fn update_retransmit_stats(
stats: &Arc<RetransmitStats>,
total_time: u64,
total_packets: usize,
retransmit_total: u64,
discard_total: u64,
repair_total: u64,
compute_turbine_peers_total: u64,
peers_len: usize,
) {
stats.total_time.fetch_add(total_time, Ordering::Relaxed);
stats
.total_packets
.fetch_add(total_packets as u64, Ordering::Relaxed);
stats
.retransmit_total
.fetch_add(retransmit_total, Ordering::Relaxed);
stats
.repair_total
.fetch_add(repair_total, Ordering::Relaxed);
stats
.discard_total
.fetch_add(discard_total, Ordering::Relaxed);
stats
.compute_turbine_peers_total
.fetch_add(compute_turbine_peers_total, Ordering::Relaxed);
stats.total_batches.fetch_add(1, Ordering::Relaxed);
let now = timestamp();
let last = stats.last_ts.load(Ordering::Relaxed);
if now - last > 2000 && stats.last_ts.compare_and_swap(last, now, Ordering::Relaxed) == last {
datapoint_info!("retransmit-num_nodes", ("count", peers_len, i64));
datapoint_info!(
"retransmit-stage",
(
"total_time",
stats.total_time.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"total_batches",
stats.total_batches.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"total_packets",
stats.total_packets.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"retransmit_total",
stats.retransmit_total.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"compute_turbine",
stats.compute_turbine_peers_total.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"repair_total",
stats.repair_total.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"discard_total",
stats.discard_total.swap(0, Ordering::Relaxed) as i64,
i64
),
);
}
}
fn retransmit(
bank_forks: &Arc<RwLock<BankForks>>,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
@@ -43,6 +129,7 @@ fn retransmit(
r: &Arc<Mutex<PacketReceiver>>,
sock: &UdpSocket,
id: u32,
stats: &Arc<RetransmitStats>,
) -> Result<()> {
let timer = Duration::new(1, 0);
let r_lock = r.lock().unwrap();
@@ -104,7 +191,7 @@ fn retransmit(
let neighbors: Vec<_> = neighbors.into_iter().map(|index| &peers[index]).collect();
let children: Vec<_> = children.into_iter().map(|index| &peers[index]).collect();
compute_turbine_peers.stop();
compute_turbine_peers_total += compute_turbine_peers.as_ms();
compute_turbine_peers_total += compute_turbine_peers.as_us();
let leader =
leader_schedule_cache.slot_leader_at(packet.meta.slot, Some(r_bank.as_ref()));
@@ -116,7 +203,7 @@ fn retransmit(
ClusterInfo::retransmit_to(&children, packet, leader, sock, true)?;
}
retransmit_time.stop();
retransmit_total += retransmit_time.as_ms();
retransmit_total += retransmit_time.as_us();
}
}
timer_start.stop();
@@ -127,16 +214,17 @@ fn retransmit(
retransmit_total,
id,
);
datapoint_debug!("cluster_info-num_nodes", ("count", peers_len, i64));
datapoint_debug!(
"retransmit-stage",
("total_time", timer_start.as_ms() as i64, i64),
("total_packets", total_packets as i64, i64),
("retransmit_total", retransmit_total as i64, i64),
("compute_turbine", compute_turbine_peers_total as i64, i64),
("repair_total", i64::from(repair_total), i64),
("discard_total", i64::from(discard_total), i64),
update_retransmit_stats(
stats,
timer_start.as_us(),
total_packets,
retransmit_total,
discard_total,
repair_total,
compute_turbine_peers_total,
peers_len,
);
Ok(())
}
@@ -155,6 +243,7 @@ pub fn retransmitter(
cluster_info: Arc<ClusterInfo>,
r: Arc<Mutex<PacketReceiver>>,
) -> Vec<JoinHandle<()>> {
let stats = Arc::new(RetransmitStats::default());
(0..sockets.len())
.map(|s| {
let sockets = sockets.clone();
@@ -162,6 +251,7 @@ pub fn retransmitter(
let leader_schedule_cache = leader_schedule_cache.clone();
let r = r.clone();
let cluster_info = cluster_info.clone();
let stats = stats.clone();
Builder::new()
.name("solana-retransmitter".to_string())
@@ -175,6 +265,7 @@ pub fn retransmitter(
&r,
&sockets[s],
s as u32,
&stats,
) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,

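The second file applies the same idea to shred fetch, but without atomics: each packet-modifier loop runs on its own thread, so the new ShredFetchStats is a plain struct of counters that is reported and reset once roughly a second has elapsed, replacing the old inc_new_counter_* calls. A minimal sketch of that flush-and-reset loop follows; FetchStats, run, and the println! sink are illustrative stand-ins rather than the commit's code.

use std::time::Instant;

#[derive(Default)]
struct FetchStats {
    shred_count: usize,
    index_overrun: usize,
}

fn run(batch_sizes: impl Iterator<Item = usize>) {
    let mut stats = FetchStats::default();
    let mut last_stats = Instant::now();
    for packets_in_batch in batch_sizes {
        // Per-packet checks would bump the other counters here.
        stats.shred_count += packets_in_batch;
        // Report about once a second, then start a fresh counting window.
        if last_stats.elapsed().as_millis() > 1000 {
            println!(
                "shred_fetch shred_count={} index_overrun={}",
                stats.shred_count, stats.index_overrun
            );
            stats = FetchStats::default();
            last_stats = Instant::now();
        }
    }
}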
core/src/shred_fetch_stage.rs

@@ -22,12 +22,23 @@ use std::time::Instant;
pub type ShredsReceived = HashMap<Slot, BitVec<u64>>;
#[derive(Default)]
struct ShredFetchStats {
index_overrun: usize,
shred_count: usize,
index_bad_deserialize: usize,
index_out_of_bounds: usize,
slot_bad_deserialize: usize,
duplicate_shred: usize,
slot_out_of_range: usize,
}
pub struct ShredFetchStage {
thread_hdls: Vec<JoinHandle<()>>,
}
impl ShredFetchStage {
fn get_slot_index(p: &Packet, index_overrun: &mut usize) -> Option<(u64, u32)> {
fn get_slot_index(p: &Packet, stats: &mut ShredFetchStats) -> Option<(u64, u32)> {
let index_start = OFFSET_OF_SHRED_INDEX;
let index_end = index_start + SIZE_OF_SHRED_INDEX;
let slot_start = OFFSET_OF_SHRED_SLOT;
@@ -38,11 +49,17 @@ impl ShredFetchStage {
if index < MAX_DATA_SHREDS_PER_SLOT as u32 && slot_end <= p.meta.size {
if let Ok(slot) = limited_deserialize::<Slot>(&p.data[slot_start..slot_end]) {
return Some((slot, index));
} else {
stats.slot_bad_deserialize += 1;
}
} else {
stats.index_out_of_bounds += 1;
}
} else {
stats.index_bad_deserialize += 1;
}
} else {
*index_overrun += 1;
stats.index_overrun += 1;
}
None
}
@@ -50,7 +67,7 @@ impl ShredFetchStage {
fn process_packet<F>(
p: &mut Packet,
shreds_received: &mut ShredsReceived,
index_overrun: &mut usize,
stats: &mut ShredFetchStats,
last_root: Slot,
last_slot: Slot,
slots_per_epoch: u64,
@@ -59,7 +76,7 @@ impl ShredFetchStage {
F: Fn(&mut Packet),
{
p.meta.discard = true;
if let Some((slot, index)) = Self::get_slot_index(p, index_overrun) {
if let Some((slot, index)) = Self::get_slot_index(p, stats) {
// Seems reasonable to limit shreds to 2 epochs away
if slot > last_root && slot < (last_slot + 2 * slots_per_epoch) {
// Shred filter
@@ -70,7 +87,11 @@ impl ShredFetchStage {
p.meta.discard = false;
modify(p);
slot_received.set(index.into(), true);
} else {
stats.duplicate_shred += 1;
}
} else {
stats.slot_out_of_range += 1;
}
}
}
@@ -80,6 +101,7 @@ impl ShredFetchStage {
recvr: PacketReceiver,
sendr: PacketSender,
bank_forks: Option<Arc<RwLock<BankForks>>>,
name: &'static str,
modify: F,
) where
F: Fn(&mut Packet),
@@ -92,6 +114,9 @@ impl ShredFetchStage {
let mut last_slot = std::u64::MAX;
let mut slots_per_epoch = 0;
let mut last_stats = Instant::now();
let mut stats = ShredFetchStats::default();
while let Some(mut p) = recvr.iter().next() {
if last_cleared.elapsed().as_millis() > 200 {
shreds_received.clear();
@@ -105,22 +130,32 @@ impl ShredFetchStage {
slots_per_epoch = root_bank.get_slots_in_epoch(root_bank.epoch());
}
}
let mut index_overrun = 0;
let mut shred_count = 0;
stats.shred_count += p.packets.len();
p.packets.iter_mut().for_each(|mut packet| {
shred_count += 1;
Self::process_packet(
&mut packet,
&mut shreds_received,
&mut index_overrun,
&mut stats,
last_root,
last_slot,
slots_per_epoch,
&modify,
);
});
inc_new_counter_warn!("shred_fetch_stage-shred_index_overrun", index_overrun);
inc_new_counter_info!("shred_fetch_stage-shred_count", shred_count);
if last_stats.elapsed().as_millis() > 1000 {
datapoint_info!(
name,
("index_overrun", stats.index_overrun, i64),
("shred_count", stats.shred_count, i64),
("slot_bad_deserialize", stats.slot_bad_deserialize, i64),
("index_bad_deserialize", stats.index_bad_deserialize, i64),
("index_out_of_bounds", stats.index_out_of_bounds, i64),
("slot_out_of_range", stats.slot_out_of_range, i64),
("duplicate_shred", stats.duplicate_shred, i64),
);
stats = ShredFetchStats::default();
last_stats = Instant::now();
}
if sendr.send(p).is_err() {
break;
}
@@ -133,6 +168,7 @@ impl ShredFetchStage {
sender: PacketSender,
recycler: Recycler<PinnedVec<Packet>>,
bank_forks: Option<Arc<RwLock<BankForks>>>,
name: &'static str,
modify: F,
) -> (Vec<JoinHandle<()>>, JoinHandle<()>)
where
@@ -154,7 +190,7 @@ impl ShredFetchStage {
let modifier_hdl = Builder::new()
.name("solana-tvu-fetch-stage-packet-modifier".to_string())
.spawn(move || Self::modify_packets(packet_receiver, sender, bank_forks, modify))
.spawn(move || Self::modify_packets(packet_receiver, sender, bank_forks, name, modify))
.unwrap();
(streamers, modifier_hdl)
}
@@ -185,6 +221,7 @@ impl ShredFetchStage {
sender.clone(),
recycler.clone(),
bank_forks.clone(),
"shred_fetch_tvu_forwards",
|p| p.meta.forward = true,
);
@@ -194,6 +231,7 @@ impl ShredFetchStage {
sender.clone(),
recycler.clone(),
bank_forks,
"shred_fetch_repair",
|p| p.meta.repair = true,
);
@@ -225,7 +263,7 @@ mod tests {
solana_logger::setup();
let mut shreds_received = ShredsReceived::default();
let mut packet = Packet::default();
let mut index_overrun = 0;
let mut stats = ShredFetchStats::default();
let last_root = 0;
let last_slot = 100;
let slots_per_epoch = 10;
@@ -233,13 +271,13 @@ mod tests {
ShredFetchStage::process_packet(
&mut packet,
&mut shreds_received,
&mut index_overrun,
&mut stats,
last_root,
last_slot,
slots_per_epoch,
&|_p| {},
);
assert_eq!(index_overrun, 1);
assert_eq!(stats.index_overrun, 1);
assert!(packet.meta.discard);
let shred = Shred::new_from_data(1, 3, 0, None, true, true, 0, 0, 0);
shred.copy_to_packet(&mut packet);
@@ -248,7 +286,7 @@ mod tests {
ShredFetchStage::process_packet(
&mut packet,
&mut shreds_received,
&mut index_overrun,
&mut stats,
3,
last_slot,
slots_per_epoch,
@@ -260,7 +298,7 @@ mod tests {
ShredFetchStage::process_packet(
&mut packet,
&mut shreds_received,
&mut index_overrun,
&mut stats,
last_root,
last_slot,
slots_per_epoch,
@@ -272,7 +310,7 @@ mod tests {
ShredFetchStage::process_packet(
&mut packet,
&mut shreds_received,
&mut index_overrun,
&mut stats,
last_root,
last_slot,
slots_per_epoch,
@@ -287,7 +325,7 @@ mod tests {
ShredFetchStage::process_packet(
&mut packet,
&mut shreds_received,
&mut index_overrun,
&mut stats,
last_root,
last_slot,
slots_per_epoch,
@@ -301,7 +339,7 @@ mod tests {
ShredFetchStage::process_packet(
&mut packet,
&mut shreds_received,
&mut index_overrun,
&mut stats,
last_root,
last_slot,
slots_per_epoch,
@@ -315,10 +353,10 @@ mod tests {
let shred = Shred::new_from_data(1, 3, 0, None, true, true, 0, 0, 0);
let mut packet = Packet::default();
shred.copy_to_packet(&mut packet);
let mut index_overrun = 0;
let mut stats = ShredFetchStats::default();
assert_eq!(
Some((1, 3)),
ShredFetchStage::get_slot_index(&packet, &mut index_overrun)
ShredFetchStage::get_slot_index(&packet, &mut stats)
);
}
}