diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index 07b58d5c81..a1a7e22312 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -18,11 +18,13 @@ use solana_ledger::{ use solana_measure::measure::Measure; use solana_metrics::inc_new_counter_error; use solana_perf::packet::Packets; +use solana_sdk::clock::Slot; use solana_sdk::epoch_schedule::EpochSchedule; use solana_sdk::timing::timestamp; use solana_streamer::streamer::PacketReceiver; use std::{ cmp, + collections::{BTreeMap, HashMap}, net::UdpSocket, sync::atomic::{AtomicBool, AtomicU64, Ordering}, sync::mpsc::channel, @@ -47,8 +49,11 @@ struct RetransmitStats { retransmit_total: AtomicU64, last_ts: AtomicU64, compute_turbine_peers_total: AtomicU64, + packets_by_slot: Mutex>, + packets_by_source: Mutex>, } +#[allow(clippy::too_many_arguments)] fn update_retransmit_stats( stats: &Arc, total_time: u64, @@ -58,6 +63,8 @@ fn update_retransmit_stats( repair_total: u64, compute_turbine_peers_total: u64, peers_len: usize, + packets_by_slot: HashMap, + packets_by_source: HashMap, ) { stats.total_time.fetch_add(total_time, Ordering::Relaxed); stats @@ -76,6 +83,18 @@ fn update_retransmit_stats( .compute_turbine_peers_total .fetch_add(compute_turbine_peers_total, Ordering::Relaxed); stats.total_batches.fetch_add(1, Ordering::Relaxed); + { + let mut stats_packets_by_slot = stats.packets_by_slot.lock().unwrap(); + for (slot, count) in packets_by_slot { + *stats_packets_by_slot.entry(slot).or_insert(0) += count; + } + } + { + let mut stats_packets_by_source = stats.packets_by_source.lock().unwrap(); + for (source, count) in packets_by_source { + *stats_packets_by_source.entry(source).or_insert(0) += count; + } + } let now = timestamp(); let last = stats.last_ts.load(Ordering::Relaxed); @@ -119,6 +138,13 @@ fn update_retransmit_stats( i64 ), ); + let mut packets_by_slot = stats.packets_by_slot.lock().unwrap(); + info!("retransmit: packets_by_slot: {:#?}", packets_by_slot); + packets_by_slot.clear(); + drop(packets_by_slot); + let mut packets_by_source = stats.packets_by_source.lock().unwrap(); + info!("retransmit: packets_by_source: {:#?}", packets_by_source); + packets_by_source.clear(); } } @@ -157,6 +183,8 @@ fn retransmit( let mut repair_total = 0; let mut retransmit_total = 0; let mut compute_turbine_peers_total = 0; + let mut packets_by_slot: HashMap = HashMap::new(); + let mut packets_by_source: HashMap = HashMap::new(); for mut packets in packet_v { for packet in packets.packets.iter_mut() { // skip discarded packets and repair packets @@ -193,6 +221,11 @@ fn retransmit( compute_turbine_peers.stop(); compute_turbine_peers_total += compute_turbine_peers.as_us(); + *packets_by_slot.entry(packet.meta.slot).or_insert(0) += 1; + *packets_by_source + .entry(packet.meta.addr().to_string()) + .or_insert(0) += 1; + let leader = leader_schedule_cache.slot_leader_at(packet.meta.slot, Some(r_bank.as_ref())); let mut retransmit_time = Measure::start("retransmit_to"); @@ -223,6 +256,8 @@ fn retransmit( repair_total, compute_turbine_peers_total, peers_len, + packets_by_slot, + packets_by_source, ); Ok(()) diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 59200168b4..a7b98eb0b8 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -946,7 +946,7 @@ impl Blockstore { // Should be safe to modify index_meta here. Two cases // 1) Recovery happens: Then all inserted erasure metas are removed - // from just_received_coding_shreds, and nothing wll be committed by + // from just_received_coding_shreds, and nothing will be committed by // `check_insert_coding_shred`, so the coding index meta will not be // committed index_meta.coding_mut().set_present(shred_index, true);