Add retransmit packets_by_slot metrics (#9975)

Authored by sakridge on 2020-05-11 13:49:10 -07:00; committed by GitHub
parent 7e364d01c2
commit 903a8a3196
2 changed files with 36 additions and 1 deletion


@@ -18,11 +18,13 @@ use solana_ledger::{
 use solana_measure::measure::Measure;
 use solana_metrics::inc_new_counter_error;
 use solana_perf::packet::Packets;
+use solana_sdk::clock::Slot;
 use solana_sdk::epoch_schedule::EpochSchedule;
 use solana_sdk::timing::timestamp;
 use solana_streamer::streamer::PacketReceiver;
 use std::{
     cmp,
+    collections::{BTreeMap, HashMap},
     net::UdpSocket,
     sync::atomic::{AtomicBool, AtomicU64, Ordering},
     sync::mpsc::channel,
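The new imports serve two different roles: `HashMap` backs cheap per-batch tallies inside `retransmit()`, while the long-lived totals in `RetransmitStats` use `BTreeMap`, which iterates in key order, so the periodic debug dump lists slots in ascending order. A minimal standalone sketch of that distinction (illustrative, not code from this commit):

```rust
use std::collections::{BTreeMap, HashMap};

fn main() {
    // Per-batch tally: fast O(1) updates, no ordering guarantees.
    let mut batch: HashMap<u64, usize> = HashMap::new();
    batch.insert(42, 3);
    batch.insert(7, 1);

    // Long-lived totals: BTreeMap keeps keys sorted, so a {:#?}
    // dump lists slots in ascending order.
    let mut totals: BTreeMap<u64, usize> = BTreeMap::new();
    for (slot, count) in batch {
        *totals.entry(slot).or_insert(0) += count;
    }
    println!("{:#?}", totals); // slot 7 prints before slot 42
}
```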
@@ -47,8 +49,11 @@ struct RetransmitStats {
     retransmit_total: AtomicU64,
     last_ts: AtomicU64,
     compute_turbine_peers_total: AtomicU64,
+    packets_by_slot: Mutex<BTreeMap<Slot, usize>>,
+    packets_by_source: Mutex<BTreeMap<String, usize>>,
 }
 
+#[allow(clippy::too_many_arguments)]
 fn update_retransmit_stats(
     stats: &Arc<RetransmitStats>,
     total_time: u64,
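The rest of the struct holds only atomics; the two new fields are maps, which cannot be updated lock-free, so each gets its own `Mutex` (the struct is shared across threads via `Arc`). The added `#[allow(clippy::too_many_arguments)]` silences the lint now that `update_retransmit_stats` takes two more parameters. A reduced sketch of the mixed atomic-plus-lock layout (assumed shape, not the full struct):

```rust
use std::collections::BTreeMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};

#[derive(Default)]
struct RetransmitStats {
    retransmit_total: AtomicU64,                  // lock-free scalar counter
    packets_by_slot: Mutex<BTreeMap<u64, usize>>, // map updates need a lock
}

fn main() {
    let stats = Arc::new(RetransmitStats::default());
    stats.retransmit_total.fetch_add(1, Ordering::Relaxed);
    *stats.packets_by_slot.lock().unwrap().entry(42).or_insert(0) += 1;
}
```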
@@ -58,6 +63,8 @@ fn update_retransmit_stats(
     repair_total: u64,
     compute_turbine_peers_total: u64,
     peers_len: usize,
+    packets_by_slot: HashMap<Slot, usize>,
+    packets_by_source: HashMap<String, usize>,
 ) {
     stats.total_time.fetch_add(total_time, Ordering::Relaxed);
     stats
@@ -76,6 +83,18 @@ fn update_retransmit_stats(
         .compute_turbine_peers_total
         .fetch_add(compute_turbine_peers_total, Ordering::Relaxed);
     stats.total_batches.fetch_add(1, Ordering::Relaxed);
+    {
+        let mut stats_packets_by_slot = stats.packets_by_slot.lock().unwrap();
+        for (slot, count) in packets_by_slot {
+            *stats_packets_by_slot.entry(slot).or_insert(0) += count;
+        }
+    }
+    {
+        let mut stats_packets_by_source = stats.packets_by_source.lock().unwrap();
+        for (source, count) in packets_by_source {
+            *stats_packets_by_source.entry(source).or_insert(0) += count;
+        }
+    }
     let now = timestamp();
     let last = stats.last_ts.load(Ordering::Relaxed);
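The per-batch maps are passed in by value and merged into the shared totals inside two small `{ ... }` blocks, so each `MutexGuard` is dropped as soon as its map is updated and neither lock is held during the rest of the stats update. A standalone sketch of the same scoped-lock merge (names are illustrative):

```rust
use std::collections::{BTreeMap, HashMap};
use std::sync::Mutex;

struct Stats {
    packets_by_slot: Mutex<BTreeMap<u64, usize>>,
}

fn merge(stats: &Stats, batch: HashMap<u64, usize>) {
    {
        // The guard lives only for this block.
        let mut totals = stats.packets_by_slot.lock().unwrap();
        for (slot, count) in batch {
            *totals.entry(slot).or_insert(0) += count;
        }
    } // lock released here, before any further work
}

fn main() {
    let stats = Stats { packets_by_slot: Mutex::new(BTreeMap::new()) };
    merge(&stats, HashMap::from([(42, 3)]));
    assert_eq!(stats.packets_by_slot.lock().unwrap()[&42], 3);
}
```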
@@ -119,6 +138,13 @@ fn update_retransmit_stats(
                 i64
             ),
         );
+        let mut packets_by_slot = stats.packets_by_slot.lock().unwrap();
+        info!("retransmit: packets_by_slot: {:#?}", packets_by_slot);
+        packets_by_slot.clear();
+        drop(packets_by_slot);
+        let mut packets_by_source = stats.packets_by_source.lock().unwrap();
+        info!("retransmit: packets_by_source: {:#?}", packets_by_source);
+        packets_by_source.clear();
     }
 }
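When the report interval elapses, each map is locked, logged with `{:#?}` (pretty Debug output, one `key: count` per line), and cleared; the explicit `drop(packets_by_slot)` releases the first guard before the second lock is taken, so at most one stats lock is held at a time. A minimal sketch of that report-and-clear sequence (standalone, illustrative):

```rust
use std::collections::BTreeMap;
use std::sync::Mutex;

fn main() {
    let by_slot = Mutex::new(BTreeMap::from([(7u64, 1usize), (42, 3)]));
    let by_source = Mutex::new(BTreeMap::from([("10.0.0.1:8000".to_string(), 4usize)]));

    let mut slots = by_slot.lock().unwrap();
    println!("packets_by_slot: {:#?}", *slots);
    slots.clear();
    drop(slots); // release the first lock before taking the second

    let mut sources = by_source.lock().unwrap();
    println!("packets_by_source: {:#?}", *sources);
    sources.clear();
}
```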
@@ -157,6 +183,8 @@ fn retransmit(
     let mut repair_total = 0;
     let mut retransmit_total = 0;
     let mut compute_turbine_peers_total = 0;
+    let mut packets_by_slot: HashMap<Slot, usize> = HashMap::new();
+    let mut packets_by_source: HashMap<String, usize> = HashMap::new();
     for mut packets in packet_v {
         for packet in packets.packets.iter_mut() {
             // skip discarded packets and repair packets
@@ -193,6 +221,11 @@ fn retransmit(
             compute_turbine_peers.stop();
             compute_turbine_peers_total += compute_turbine_peers.as_us();
 
+            *packets_by_slot.entry(packet.meta.slot).or_insert(0) += 1;
+            *packets_by_source
+                .entry(packet.meta.addr().to_string())
+                .or_insert(0) += 1;
+
             let leader =
                 leader_schedule_cache.slot_leader_at(packet.meta.slot, Some(r_bank.as_ref()));
             let mut retransmit_time = Measure::start("retransmit_to");
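Each packet that survives the discard/repair filter bumps two local counters: one keyed by the shred's slot, one by the sender's socket address rendered as a string; `entry(...).or_insert(0)` initializes a key the first time it is seen. A reduced sketch with a stand-in for `packet.meta` (hypothetical type, for illustration only):

```rust
use std::collections::HashMap;
use std::net::SocketAddr;

// Stand-in for the fields of packet.meta that the tally uses.
struct Meta {
    slot: u64,
    addr: SocketAddr,
}

fn main() {
    let metas = vec![
        Meta { slot: 42, addr: "10.0.0.1:8000".parse().unwrap() },
        Meta { slot: 42, addr: "10.0.0.2:8000".parse().unwrap() },
    ];

    let mut packets_by_slot: HashMap<u64, usize> = HashMap::new();
    let mut packets_by_source: HashMap<String, usize> = HashMap::new();
    for meta in &metas {
        *packets_by_slot.entry(meta.slot).or_insert(0) += 1;
        *packets_by_source.entry(meta.addr.to_string()).or_insert(0) += 1;
    }
    assert_eq!(packets_by_slot[&42], 2);
    assert_eq!(packets_by_source["10.0.0.1:8000"], 1);
}
```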
@@ -223,6 +256,8 @@ fn retransmit(
         repair_total,
         compute_turbine_peers_total,
         peers_len,
+        packets_by_slot,
+        packets_by_source,
     );
     Ok(())


@@ -946,7 +946,7 @@ impl Blockstore {
         // Should be safe to modify index_meta here. Two cases
         // 1) Recovery happens: Then all inserted erasure metas are removed
-        // from just_received_coding_shreds, and nothing wll be committed by
+        // from just_received_coding_shreds, and nothing will be committed by
         // `check_insert_coding_shred`, so the coding index meta will not be
         // committed
         index_meta.coding_mut().set_present(shred_index, true);