removes packet-count metrics from retransmit stage

Working towards sending shreds (instead of packets) to the retransmit stage
so that shreds recovered from erasure codes are retransmitted as well.

A following commit will add these metrics back to window-service, earlier
in the pipeline.
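
A minimal sketch of that direction, not the actual patch: the channel shape, the `Shred` fields, and the function names below are illustrative assumptions. The idea is that the window service forwards both freshly deserialized shreds and shreds recovered from erasure codes down a single channel, so the retransmit stage never sees raw packets:

use crossbeam_channel::{unbounded, Receiver, Sender};

// Simplified stand-in for solana_ledger::shred::Shred.
struct Shred {
    slot: u64,
    payload: Vec<u8>,
}

// Window-service side: deserialized shreds and shreds recovered from
// erasure codes go down the same channel.
fn forward_shreds(
    sender: &Sender<Vec<Shred>>,
    mut deserialized: Vec<Shred>,
    recovered: Vec<Shred>,
) {
    deserialized.extend(recovered);
    let _ = sender.send(deserialized);
}

// Retransmit side: operates on shreds directly, so packet-level
// bookkeeping (discard/repair counts) no longer belongs here.
fn retransmit_shreds(receiver: &Receiver<Vec<Shred>>) {
    for shreds in receiver.try_iter() {
        for shred in shreds {
            // compute turbine peers for shred.slot, then send shred.payload
            let _ = (shred.slot, &shred.payload);
        }
    }
}

fn main() {
    let (sender, receiver) = unbounded();
    forward_shreds(&sender, Vec::new(), Vec::new());
    retransmit_shreds(&receiver);
}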
behzad nouri 2021-08-12 20:14:23 -04:00
parent 563aec0b4d
commit bf437b0336
1 changed file with 1 addition and 86 deletions


@@ -35,10 +35,7 @@ use {
     },
     solana_streamer::streamer::PacketReceiver,
     std::{
-        collections::{
-            hash_set::HashSet,
-            {BTreeMap, BTreeSet, HashMap},
-        },
+        collections::{BTreeSet, HashSet},
         net::UdpSocket,
         ops::DerefMut,
         sync::{
@@ -68,14 +65,10 @@ struct RetransmitStats {
     total_time: AtomicU64,
     epoch_fetch: AtomicU64,
     epoch_cache_update: AtomicU64,
-    repair_total: AtomicU64,
-    discard_total: AtomicU64,
     retransmit_total: AtomicU64,
     last_ts: AtomicInterval,
     compute_turbine_peers_total: AtomicU64,
     retransmit_tree_mismatch: AtomicU64,
-    packets_by_slot: Mutex<BTreeMap<Slot, usize>>,
-    packets_by_source: Mutex<BTreeMap<String, usize>>,
 }
 
 #[allow(clippy::too_many_arguments)]
@@ -84,12 +77,8 @@ fn update_retransmit_stats(
    total_time: u64,
    total_packets: usize,
    retransmit_total: u64,
-    discard_total: u64,
-    repair_total: u64,
    compute_turbine_peers_total: u64,
    peers_len: usize,
-    packets_by_slot: HashMap<Slot, usize>,
-    packets_by_source: HashMap<String, usize>,
    epoch_fetch: u64,
    epoch_cach_update: u64,
    retransmit_tree_mismatch: u64,
@@ -101,12 +90,6 @@ fn update_retransmit_stats(
     stats
         .retransmit_total
         .fetch_add(retransmit_total, Ordering::Relaxed);
-    stats
-        .repair_total
-        .fetch_add(repair_total, Ordering::Relaxed);
-    stats
-        .discard_total
-        .fetch_add(discard_total, Ordering::Relaxed);
     stats
         .compute_turbine_peers_total
         .fetch_add(compute_turbine_peers_total, Ordering::Relaxed);
@@ -118,19 +101,6 @@ fn update_retransmit_stats(
     stats
         .retransmit_tree_mismatch
         .fetch_add(retransmit_tree_mismatch, Ordering::Relaxed);
-    {
-        let mut stats_packets_by_slot = stats.packets_by_slot.lock().unwrap();
-        for (slot, count) in packets_by_slot {
-            *stats_packets_by_slot.entry(slot).or_insert(0) += count;
-        }
-    }
-    {
-        let mut stats_packets_by_source = stats.packets_by_source.lock().unwrap();
-        for (source, count) in packets_by_source {
-            *stats_packets_by_source.entry(source).or_insert(0) += count;
-        }
-    }
     if stats.last_ts.should_update(2000) {
         datapoint_info!("retransmit-num_nodes", ("count", peers_len, i64));
         datapoint_info!(
@@ -175,47 +145,7 @@ fn update_retransmit_stats(
                 stats.compute_turbine_peers_total.swap(0, Ordering::Relaxed) as i64,
                 i64
             ),
-            (
-                "repair_total",
-                stats.repair_total.swap(0, Ordering::Relaxed) as i64,
-                i64
-            ),
-            (
-                "discard_total",
-                stats.discard_total.swap(0, Ordering::Relaxed) as i64,
-                i64
-            ),
         );
-        let mut packets_by_slot = stats.packets_by_slot.lock().unwrap();
-        let old_packets_by_slot = std::mem::take(&mut *packets_by_slot);
-        drop(packets_by_slot);
-        for (slot, num_shreds) in old_packets_by_slot {
-            datapoint_info!(
-                "retransmit-slot-num-packets",
-                ("slot", slot, i64),
-                ("num_shreds", num_shreds, i64)
-            );
-        }
-        let mut packets_by_source = stats.packets_by_source.lock().unwrap();
-        let mut top = BTreeMap::new();
-        let mut max = 0;
-        for (source, num) in packets_by_source.iter() {
-            if *num > max {
-                top.insert(*num, source.clone());
-                if top.len() > 5 {
-                    let last = *top.iter().next().unwrap().0;
-                    top.remove(&last);
-                }
-                max = *top.iter().next().unwrap().0;
-            }
-        }
-        info!(
-            "retransmit: top packets_by_source: {:?} len: {}",
-            top,
-            packets_by_source.len()
-        );
-        packets_by_source.clear();
     }
 }
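
For reference, the deleted `packets_by_source` reporting above selects the top five senders with a BTreeMap keyed by count: BTreeMap iterates in ascending key order, so the first entry is always the current minimum and gets evicted once more than five entries are held. A simplified, self-contained sketch of that pattern follows; it omits the `max` fast-path filter from the original, and as in the original, sources tied on count overwrite each other, which is tolerable for a log line:

use std::collections::BTreeMap;

// Returns up to `k` (count, source) pairs with the largest counts.
fn top_k_by_count(counts: &BTreeMap<String, usize>, k: usize) -> BTreeMap<usize, String> {
    let mut top = BTreeMap::new();
    for (source, &num) in counts {
        top.insert(num, source.clone());
        if top.len() > k {
            // Ascending iteration order: the first key is the smallest count.
            let min = *top.iter().next().unwrap().0;
            top.remove(&min);
        }
    }
    top
}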
@@ -341,24 +271,18 @@ fn retransmit(
     let my_id = cluster_info.id();
     let socket_addr_space = cluster_info.socket_addr_space();
-    let mut discard_total = 0;
-    let mut repair_total = 0;
     let mut retransmit_total = 0;
     let mut compute_turbine_peers_total = 0;
     let mut retransmit_tree_mismatch = 0;
-    let mut packets_by_slot: HashMap<Slot, usize> = HashMap::new();
-    let mut packets_by_source: HashMap<String, usize> = HashMap::new();
     let mut max_slot = 0;
     for packet in packets.iter().flat_map(|p| p.packets.iter()) {
         // skip discarded packets and repair packets
         if packet.meta.discard {
             total_packets -= 1;
-            discard_total += 1;
             continue;
         }
         if packet.meta.repair {
             total_packets -= 1;
-            repair_total += 1;
             continue;
         }
         let shred_slot = match check_if_already_received(packet, shreds_received) {
@@ -394,11 +318,6 @@ fn retransmit(
         compute_turbine_peers.stop();
         compute_turbine_peers_total += compute_turbine_peers.as_us();
-        *packets_by_slot.entry(packet.meta.slot).or_default() += 1;
-        *packets_by_source
-            .entry(packet.meta.addr().to_string())
-            .or_default() += 1;
         let mut retransmit_time = Measure::start("retransmit_to");
         // If the node is on the critical path (i.e. the first node in each
         // neighborhood), it should send the packet to tvu socket of its
@@ -440,12 +359,8 @@ fn retransmit(
        timer_start.as_us(),
        total_packets,
        retransmit_total,
-        discard_total,
-        repair_total,
        compute_turbine_peers_total,
        cluster_nodes.num_peers(),
-        packets_by_slot,
-        packets_by_source,
        epoch_fetch.as_us(),
        epoch_cache_update.as_us(),
        retransmit_tree_mismatch,
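
The counters that survive this commit all follow the pattern visible in the context lines above: accumulate with relaxed atomics, then flush-and-reset every 2000 ms via `swap(0, ...)`. A self-contained sketch of that pattern, where `AtomicInterval` is a simplified stand-in for the solana_sdk type of the same name:

use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};

// Simplified stand-in for solana_sdk's AtomicInterval.
#[derive(Default)]
struct AtomicInterval {
    last_ms: AtomicU64,
}

impl AtomicInterval {
    // True at most once per `interval_ms`, even with concurrent callers.
    fn should_update(&self, interval_ms: u64) -> bool {
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64;
        let last = self.last_ms.load(Ordering::Relaxed);
        now.saturating_sub(last) >= interval_ms
            && self
                .last_ms
                .compare_exchange(last, now, Ordering::Relaxed, Ordering::Relaxed)
                .is_ok()
    }
}

#[derive(Default)]
struct Stats {
    retransmit_total: AtomicU64,
    last_ts: AtomicInterval,
}

fn update(stats: &Stats, retransmit_total: u64) {
    stats
        .retransmit_total
        .fetch_add(retransmit_total, Ordering::Relaxed);
    if stats.last_ts.should_update(2000) {
        // swap(0) reads the accumulated value and resets it in one step.
        let total = stats.retransmit_total.swap(0, Ordering::Relaxed);
        println!("retransmit_total={}", total);
    }
}

fn main() {
    let stats = Stats::default();
    update(&stats, 42);
}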