diff --git a/archiver-lib/src/archiver.rs b/archiver-lib/src/archiver.rs
index 422d891489..13fe469425 100644
--- a/archiver-lib/src/archiver.rs
+++ b/archiver-lib/src/archiver.rs
@@ -14,7 +14,7 @@ use solana_core::{
     contact_info::ContactInfo,
     gossip_service::GossipService,
     repair_service,
-    repair_service::{RepairService, RepairSlotRange, RepairStrategy},
+    repair_service::{RepairService, RepairSlotRange, RepairStats, RepairStrategy},
     serve_repair::ServeRepair,
     shred_fetch_stage::ShredFetchStage,
     sigverify_stage::{DisabledSigVerifier, SigVerifyStage},
@@ -844,13 +844,14 @@ impl Archiver {
             repair_service::MAX_REPAIR_LENGTH,
             &repair_slot_range,
         );
+        let mut repair_stats = RepairStats::default();
         //iter over the repairs and send them
         if let Ok(repairs) = repairs {
             let reqs: Vec<_> = repairs
                 .into_iter()
                 .filter_map(|repair_request| {
                     serve_repair
-                        .map_repair_request(&repair_request)
+                        .map_repair_request(&repair_request, &mut repair_stats)
                         .map(|result| ((archiver_info.gossip, result), repair_request))
                         .ok()
                 })
diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs
index e94bb63453..9350e0e88a 100644
--- a/core/src/repair_service.rs
+++ b/core/src/repair_service.rs
@@ -20,9 +20,31 @@ use std::{
     sync::{Arc, RwLock},
     thread::sleep,
     thread::{self, Builder, JoinHandle},
-    time::Duration,
+    time::{Duration, Instant},
 };

+#[derive(Default)]
+pub struct RepairStatsGroup {
+    pub count: u64,
+    pub min: u64,
+    pub max: u64,
+}
+
+impl RepairStatsGroup {
+    pub fn update(&mut self, slot: u64) {
+        self.count += 1;
+        self.min = if self.count == 1 { slot } else { std::cmp::min(self.min, slot) };
+        self.max = std::cmp::max(self.max, slot);
+    }
+}
+
+#[derive(Default)]
+pub struct RepairStats {
+    pub shred: RepairStatsGroup,
+    pub highest_shred: RepairStatsGroup,
+    pub orphan: RepairStatsGroup,
+}
+
 pub const MAX_REPAIR_LENGTH: usize = 512;
 pub const REPAIR_MS: u64 = 100;
 pub const MAX_ORPHANS: usize = 5;
@@ -93,6 +115,8 @@ impl RepairService {
         if let RepairStrategy::RepairAll { .. } = repair_strategy {
             Self::initialize_lowest_slot(id, blockstore, cluster_info);
         }
+        let mut repair_stats = RepairStats::default();
+        let mut last_stats = Instant::now();
         loop {
             if exit.load(Ordering::Relaxed) {
                 break;
@@ -137,7 +161,12 @@ impl RepairService {
                     .into_iter()
                     .filter_map(|repair_request| {
                         serve_repair
-                            .repair_request(&cluster_slots, &repair_request, &mut cache)
+                            .repair_request(
+                                &cluster_slots,
+                                &repair_request,
+                                &mut cache,
+                                &mut repair_stats,
+                            )
                             .map(|result| (result, repair_request))
                             .ok()
                     })
@@ -150,6 +179,24 @@ impl RepairService {
                     });
                 }
             }
+            if last_stats.elapsed().as_secs() > 1 {
+                let repair_total = repair_stats.shred.count
+                    + repair_stats.highest_shred.count
+                    + repair_stats.orphan.count;
+                if repair_total > 0 {
+                    datapoint_info!(
+                        "serve_repair-repair",
+                        ("repair-total", repair_total, i64),
+                        ("shred-count", repair_stats.shred.count, i64),
+                        ("highest-shred-count", repair_stats.highest_shred.count, i64),
+                        ("orphan-count", repair_stats.orphan.count, i64),
+                        ("repair-highest-slot", repair_stats.highest_shred.max, i64),
+                        ("repair-orphan", repair_stats.orphan.max, i64),
+                    );
+                }
+                repair_stats = RepairStats::default();
+                last_stats = Instant::now();
+            }
             sleep(Duration::from_millis(REPAIR_MS));
         }
     }
diff --git a/core/src/serve_repair.rs b/core/src/serve_repair.rs
index 7f7b7fb268..bc887cf724 100644
--- a/core/src/serve_repair.rs
+++ b/core/src/serve_repair.rs
@@ -2,6 +2,7 @@ use crate::{
     cluster_info::{ClusterInfo, ClusterInfoError},
     cluster_slots::ClusterSlots,
     contact_info::ContactInfo,
+    repair_service::RepairStats,
     result::{Error, Result},
     weighted_shuffle::weighted_best,
 };
@@ -46,6 +47,16 @@ impl RepairType {
     }
 }

+#[derive(Default)]
+pub struct ServeRepairStats {
+    pub total_packets: usize,
+    pub processed: usize,
+    pub self_repair: usize,
+    pub window_index: usize,
+    pub highest_window_index: usize,
+    pub orphan: usize,
+}
+
 /// Window protocol messages
 #[derive(Serialize, Deserialize, Debug)]
 enum RepairProtocol {
@@ -106,6 +117,7 @@ impl ServeRepair {
         from_addr: &SocketAddr,
         blockstore: Option<&Arc<Blockstore>>,
         request: RepairProtocol,
+        stats: &mut ServeRepairStats,
     ) -> Option<Packets> {
         let now = Instant::now();

@@ -113,18 +125,14 @@ impl ServeRepair {
         let my_id = me.read().unwrap().keypair.pubkey();
         let from = Self::get_repair_sender(&request);
         if from.id == my_id {
-            warn!(
-                "{}: Ignored received repair request from ME {}",
-                my_id, from.id,
-            );
-            inc_new_counter_debug!("serve_repair-handle-repair--eq", 1);
+            stats.self_repair += 1;
             return None;
         }

         let (res, label) = {
             match &request {
                 RepairProtocol::WindowIndex(from, slot, shred_index) => {
-                    inc_new_counter_debug!("serve_repair-request-window-index", 1);
+                    stats.window_index += 1;
                     (
                         Self::run_window_request(
                             recycler,
@@ -140,7 +148,7 @@ impl ServeRepair {
                 }

                 RepairProtocol::HighestWindowIndex(_, slot, highest_index) => {
-                    inc_new_counter_debug!("serve_repair-request-highest-window-index", 1);
+                    stats.highest_window_index += 1;
                     (
                         Self::run_highest_window_request(
                             recycler,
@@ -153,7 +161,7 @@ impl ServeRepair {
                     )
                 }
                 RepairProtocol::Orphan(_, slot) => {
-                    inc_new_counter_debug!("serve_repair-request-orphan", 1);
+                    stats.orphan += 1;
                     (
                         Self::run_orphan(
                             recycler,
@@ -187,15 +195,42 @@ impl ServeRepair {
         blockstore: Option<&Arc<Blockstore>>,
         requests_receiver: &PacketReceiver,
         response_sender: &PacketSender,
+        stats: &mut ServeRepairStats,
     ) -> Result<()> {
         //TODO cache connections
         let timeout = Duration::new(1, 0);
         let reqs = requests_receiver.recv_timeout(timeout)?;
+        stats.total_packets += reqs.packets.len();

-        Self::handle_packets(obj, &recycler, blockstore, reqs, response_sender);
+        Self::handle_packets(obj, &recycler, blockstore, reqs, response_sender, stats);
         Ok(())
     }

+    fn report_reset_stats(me: &Arc<RwLock<ServeRepair>>, stats: &mut ServeRepairStats) {
+        if stats.self_repair > 0 {
+            let my_id = me.read().unwrap().keypair.pubkey();
+            warn!(
+                "{}: Ignored received repair requests from ME: {}",
+                my_id, stats.self_repair,
+            );
+            inc_new_counter_debug!("serve_repair-handle-repair--eq", stats.self_repair);
+        }
+
+        debug!(
+            "repair_listener: total_packets: {} passed: {}",
+            stats.total_packets, stats.processed
+        );
+
+        inc_new_counter_debug!("serve_repair-request-window-index", stats.window_index);
+        inc_new_counter_debug!(
+            "serve_repair-request-highest-window-index",
+            stats.highest_window_index
+        );
+        inc_new_counter_debug!("serve_repair-request-orphan", stats.orphan);
+
+        *stats = ServeRepairStats::default();
+    }
+
     pub fn listen(
         me: Arc<RwLock<ServeRepair>>,
         blockstore: Option<Arc<Blockstore>>,
@@ -207,22 +242,31 @@ impl ServeRepair {
         let recycler = PacketsRecycler::default();
         Builder::new()
             .name("solana-repair-listen".to_string())
-            .spawn(move || loop {
-                let result = Self::run_listen(
-                    &me,
-                    &recycler,
-                    blockstore.as_ref(),
-                    &requests_receiver,
-                    &response_sender,
-                );
-                match result {
-                    Err(Error::RecvTimeoutError(_)) | Ok(_) => {}
-                    Err(err) => info!("repair listener error: {:?}", err),
-                };
-                if exit.load(Ordering::Relaxed) {
-                    return;
-                }
-                thread_mem_usage::datapoint("solana-repair-listen");
+            .spawn(move || {
+                let mut last_print = Instant::now();
+                let mut stats = ServeRepairStats::default();
+                loop {
+                    let result = Self::run_listen(
+                        &me,
+                        &recycler,
+                        blockstore.as_ref(),
+                        &requests_receiver,
+                        &response_sender,
+                        &mut stats,
+                    );
+                    match result {
+                        Err(Error::RecvTimeoutError(_)) | Ok(_) => {}
+                        Err(err) => info!("repair listener error: {:?}", err),
+                    };
+                    if exit.load(Ordering::Relaxed) {
+                        return;
+                    }
+                    if last_print.elapsed().as_secs() > 2 {
+                        Self::report_reset_stats(&me, &mut stats);
+                        last_print = Instant::now();
+                    }
+                    thread_mem_usage::datapoint("solana-repair-listen");
+                }
             })
             .unwrap()
     }
@@ -233,6 +277,7 @@ impl ServeRepair {
         blockstore: Option<&Arc<Blockstore>>,
         packets: Packets,
         response_sender: &PacketSender,
+        stats: &mut ServeRepairStats,
     ) {
         // iter over the packets, collect pulls separately and process everything else
         let allocated = thread_mem_usage::Allocatedp::default();
@@ -242,7 +287,9 @@ impl ServeRepair {
             limited_deserialize(&packet.data[..packet.meta.size])
                 .into_iter()
                 .for_each(|request| {
-                    let rsp = Self::handle_repair(me, recycler, &from_addr, blockstore, request);
+                    stats.processed += 1;
+                    let rsp =
+                        Self::handle_repair(me, recycler, &from_addr, blockstore, request, stats);
                     if let Some(rsp) = rsp {
                         let _ignore_disconnect = response_sender.send(rsp);
                     }
@@ -277,6 +324,7 @@ impl ServeRepair {
         cluster_slots: &ClusterSlots,
         repair_request: &RepairType,
         cache: &mut RepairCache,
+        repair_stats: &mut RepairStats,
     ) -> Result<(SocketAddr, Vec<u8>)> {
         // find a peer that appears to be accepting replication and has the desired slot, as indicated
         // by a valid tvu port location
@@ -295,30 +343,26 @@ impl ServeRepair {
         let (repair_peers, weights) = cache.get(&repair_request.slot()).unwrap();
         let n = weighted_best(&weights, Pubkey::new_rand().to_bytes());
         let addr = repair_peers[n].serve_repair; // send the request to the peer's serve_repair port
-        let out = self.map_repair_request(repair_request)?;
+        let out = self.map_repair_request(repair_request, repair_stats)?;
         Ok((addr, out))
     }

-    pub fn map_repair_request(&self, repair_request: &RepairType) -> Result<Vec<u8>> {
+    pub fn map_repair_request(
+        &self,
+        repair_request: &RepairType,
+        repair_stats: &mut RepairStats,
+    ) -> Result<Vec<u8>> {
         match repair_request {
             RepairType::Shred(slot, shred_index) => {
-                datapoint_debug!(
-                    "serve_repair-repair",
-                    ("repair-slot", *slot, i64),
-                    ("repair-ix", *shred_index, i64)
-                );
+                repair_stats.shred.update(*slot);
                 Ok(self.window_index_request_bytes(*slot, *shred_index)?)
             }
             RepairType::HighestShred(slot, shred_index) => {
-                datapoint_info!(
-                    "serve_repair-repair_highest",
-                    ("repair-highest-slot", *slot, i64),
-                    ("repair-highest-ix", *shred_index, i64)
-                );
+                repair_stats.highest_shred.update(*slot);
                 Ok(self.window_highest_index_request_bytes(*slot, *shred_index)?)
             }
             RepairType::Orphan(slot) => {
-                datapoint_info!("serve_repair-repair_orphan", ("repair-orphan", *slot, i64));
+                repair_stats.orphan.update(*slot);
                 Ok(self.orphan_bytes(*slot)?)
             }
         }
@@ -583,6 +627,7 @@ mod tests {
             &cluster_slots,
             &RepairType::Shred(0, 0),
             &mut HashMap::new(),
+            &mut RepairStats::default(),
         );
         assert_matches!(rv, Err(Error::ClusterInfoError(ClusterInfoError::NoPeers)));

@@ -608,6 +653,7 @@ mod tests {
                 &cluster_slots,
                 &RepairType::Shred(0, 0),
                 &mut HashMap::new(),
+                &mut RepairStats::default(),
             )
             .unwrap();
         assert_eq!(nxt.serve_repair, serve_repair_addr);
@@ -639,6 +685,7 @@ mod tests {
                 &cluster_slots,
                 &RepairType::Shred(0, 0),
                 &mut HashMap::new(),
+                &mut RepairStats::default(),
             )
             .unwrap();
         if rv.0 == serve_repair_addr {
diff --git a/metrics/scripts/grafana-provisioning/dashboards/cluster-monitor.json b/metrics/scripts/grafana-provisioning/dashboards/cluster-monitor.json
index 90517e0ac5..901bc429fa 100644
--- a/metrics/scripts/grafana-provisioning/dashboards/cluster-monitor.json
+++ b/metrics/scripts/grafana-provisioning/dashboards/cluster-monitor.json
@@ -6928,7 +6928,7 @@
           ],
           "orderByTime": "ASC",
           "policy": "default",
-          "query": "SELECT last(\"repair-highest-slot\") AS \"slot\" FROM \"$testnet\".\"autogen\".\"serve_repair-repair_highest\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)",
+          "query": "SELECT last(\"repair-highest-slot\") AS \"slot\" FROM \"$testnet\".\"autogen\".\"serve_repair-repair\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)",
           "rawQuery": true,
           "refId": "C",
           "resultFormat": "time_series",
@@ -6965,7 +6965,7 @@
           ],
           "orderByTime": "ASC",
           "policy": "default",
-          "query": "SELECT last(\"repair-highest-ix\") AS \"ix\" FROM \"$testnet\".\"autogen\".\"serve_repair-repair_highest\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)",
+          "query": "SELECT last(\"repair-highest-ix\") AS \"ix\" FROM \"$testnet\".\"autogen\".\"serve_repair-repair\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)",
           "rawQuery": true,
           "refId": "A",
           "resultFormat": "time_series",
@@ -7245,7 +7245,7 @@
           ],
           "orderByTime": "ASC",
           "policy": "default",
-          "query": "SELECT last(\"repair-orphan\") AS \"slot\" FROM \"$testnet\".\"autogen\".\"serve_repair-repair_orphan\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)",
+          "query": "SELECT last(\"repair-orphan\") AS \"slot\" FROM \"$testnet\".\"autogen\".\"serve_repair-repair\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)",
           "rawQuery": true,
           "refId": "C",
           "resultFormat": "time_series",
@@ -10270,4 +10270,4 @@
   "title": "Cluster Telemetry (edge)",
   "uid": "monitor-edge",
   "version": 2
-}
\ No newline at end of file
+}
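
The net effect of the patch: per-request `datapoint_debug!`/`datapoint_info!` submissions are replaced by counters that accumulate in `RepairStats` (repair requests sent) and `ServeRepairStats` (repair requests served), flushed on a timer (once per second in the `RepairService` loop, every two seconds in the `listen` thread). The hot path then costs a few integer updates per request, and the metrics backend receives one datapoint per interval rather than one per repair. One caveat: the dashboard query that selects `repair-highest-ix` now points at the `serve_repair-repair` measurement, but the aggregated datapoint does not emit a `repair-highest-ix` field, so that panel will stay empty until the field is restored or the query is retargeted.

Below is a minimal standalone sketch of the aggregate-then-flush pattern, assuming nothing from the crate: `StatsGroup` and `report` are hypothetical stand-ins for `RepairStatsGroup` and the `datapoint_info!` macro (which lives in solana-metrics), and the slot numbers are made up.

```rust
use std::time::{Duration, Instant};

#[derive(Default)]
struct StatsGroup {
    count: u64,
    min: u64,
    max: u64,
}

impl StatsGroup {
    fn update(&mut self, slot: u64) {
        self.count += 1;
        // Seed `min` from the first sample; `#[derive(Default)]` starts it at
        // zero, which would otherwise pin the reported minimum to 0 forever.
        self.min = if self.count == 1 {
            slot
        } else {
            self.min.min(slot)
        };
        self.max = self.max.max(slot);
    }
}

// Hypothetical stand-in for the datapoint_info! macro.
fn report(name: &str, group: &StatsGroup) {
    println!(
        "{}: count={} min={} max={}",
        name, group.count, group.min, group.max
    );
}

fn main() {
    let mut shred = StatsGroup::default();
    let mut last_stats = Instant::now();
    let flush_interval = Duration::from_secs(1);

    // Stand-in for the service loop; each iteration is one repair pass.
    for slot in [7u64, 3, 9, 3, 12].iter() {
        shred.update(*slot);
        // Flush at most once per interval instead of once per request.
        if last_stats.elapsed() > flush_interval && shred.count > 0 {
            report("serve_repair-repair.shred", &shred);
            shred = StatsGroup::default(); // reset the window after flushing
            last_stats = Instant::now();
        }
    }
    // Final flush so a short-lived window is not lost.
    if shred.count > 0 {
        report("serve_repair-repair.shred", &shred);
    }
}
```

The same seeding concern is why `RepairStatsGroup::update` above checks `self.count == 1` before taking the running minimum; resetting the struct to `Default` at each flush makes every interval an independent window.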