diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs
index b3041d6bb8..06024ed3bf 100644
--- a/core/src/repair_service.rs
+++ b/core/src/repair_service.rs
@@ -22,6 +22,7 @@ use solana_runtime::{bank_forks::BankForks, contains::Contains};
 use solana_sdk::{
     clock::Slot, epoch_schedule::EpochSchedule, hash::Hash, pubkey::Pubkey, timing::timestamp,
 };
+use solana_streamer::sendmmsg::{batch_send, SendPktsError};
 use std::{
     collections::{HashMap, HashSet},
     iter::Iterator,
@@ -95,6 +96,8 @@ pub struct RepairTiming {
     pub get_best_orphans_elapsed: u64,
     pub get_best_shreds_elapsed: u64,
     pub send_repairs_elapsed: u64,
+    pub build_repairs_batch_elapsed: u64,
+    pub batch_send_repairs_elapsed: u64,
 }
 
 impl RepairTiming {
@@ -103,12 +106,15 @@ impl RepairTiming {
         set_root_elapsed: u64,
         get_votes_elapsed: u64,
         add_votes_elapsed: u64,
-        send_repairs_elapsed: u64,
+        build_repairs_batch_elapsed: u64,
+        batch_send_repairs_elapsed: u64,
     ) {
         self.set_root_elapsed += set_root_elapsed;
         self.get_votes_elapsed += get_votes_elapsed;
         self.add_votes_elapsed += add_votes_elapsed;
-        self.send_repairs_elapsed += send_repairs_elapsed;
+        self.build_repairs_batch_elapsed += build_repairs_batch_elapsed;
+        self.batch_send_repairs_elapsed += batch_send_repairs_elapsed;
+        self.send_repairs_elapsed += build_repairs_batch_elapsed + batch_send_repairs_elapsed;
     }
 }
 
@@ -262,29 +268,51 @@ impl RepairService {
                 )
             };
 
-            let mut send_repairs_elapsed = Measure::start("send_repairs_elapsed");
-            let mut outstanding_requests = outstanding_requests.write().unwrap();
-            repairs.into_iter().for_each(|repair_request| {
-                if let Ok((to, req)) = serve_repair.repair_request(
-                    &repair_info.cluster_slots,
-                    repair_request,
-                    &mut peers_cache,
-                    &mut repair_stats,
-                    &repair_info.repair_validators,
-                    &mut outstanding_requests,
-                ) {
-                    repair_socket.send_to(&req, to).unwrap_or_else(|e| {
-                        info!("{} repair req send_to({}) error {:?}", id, to, e);
-                        0
-                    });
+            let mut build_repairs_batch_elapsed = Measure::start("build_repairs_batch_elapsed");
+            let batch: Vec<(Vec<u8>, SocketAddr)> = {
+                let mut outstanding_requests = outstanding_requests.write().unwrap();
+                repairs
+                    .iter()
+                    .filter_map(|repair_request| {
+                        let (to, req) = serve_repair
+                            .repair_request(
+                                &repair_info.cluster_slots,
+                                *repair_request,
+                                &mut peers_cache,
+                                &mut repair_stats,
+                                &repair_info.repair_validators,
+                                &mut outstanding_requests,
+                            )
+                            .ok()?;
+                        Some((req, to))
+                    })
+                    .collect()
+            };
+            let batch: Vec<(&[u8], &SocketAddr)> = batch.iter().map(|(v, s)| (&v[..], s)).collect();
+            build_repairs_batch_elapsed.stop();
+
+            let mut batch_send_repairs_elapsed = Measure::start("batch_send_repairs_elapsed");
+            if !batch.is_empty() {
+                if let Err(SendPktsError::IoError(err, num_failed)) =
+                    batch_send(repair_socket, &batch)
+                {
+                    error!(
+                        "{} batch_send failed to send {}/{} packets first error {:?}",
+                        id,
+                        num_failed,
+                        batch.len(),
+                        err
+                    );
                 }
-            });
-            send_repairs_elapsed.stop();
+            }
+            batch_send_repairs_elapsed.stop();
+
             repair_timing.update(
                 set_root_elapsed.as_us(),
                 get_votes_elapsed.as_us(),
                 add_votes_elapsed.as_us(),
-                send_repairs_elapsed.as_us(),
+                build_repairs_batch_elapsed.as_us(),
+                batch_send_repairs_elapsed.as_us(),
             );
 
             if last_stats.elapsed().as_secs() > 2 {
@@ -340,6 +368,16 @@ impl RepairService {
                         repair_timing.send_repairs_elapsed,
                         i64
                     ),
+                    (
+                        "build-repairs-batch-elapsed",
+                        repair_timing.build_repairs_batch_elapsed,
+                        i64
+                    ),
+                    (
+                        "batch-send-repairs-elapsed",
+                        repair_timing.batch_send_repairs_elapsed,
+                        i64
+                    ),
                 );
                 repair_stats = RepairStats::default();
                 repair_timing = RepairTiming::default();
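Note on the repair_service.rs change above: the per-request `repair_socket.send_to` loop becomes a two-phase batch. Phase one builds the owned packets while the `outstanding_requests` write lock is held; phase two sends the whole batch after the lock is released, with `batch_send` (backed by `sendmmsg(2)` on Linux) replacing one syscall per request. Below is a minimal std-only sketch of that shape; `batch_send_stub`, the port, and the payloads are illustrative stand-ins, not the real `solana_streamer` API:

```rust
use std::net::{SocketAddr, UdpSocket};

// Hypothetical stand-in for solana_streamer::sendmmsg::batch_send. On Linux
// the real function submits the whole slice with a single sendmmsg(2) call;
// this sketch loops over send_to so it runs on any platform.
fn batch_send_stub(sock: &UdpSocket, packets: &[(&[u8], &SocketAddr)]) -> std::io::Result<()> {
    for (bytes, addr) in packets {
        sock.send_to(bytes, addr)?;
    }
    Ok(())
}

fn main() -> std::io::Result<()> {
    let sock = UdpSocket::bind("127.0.0.1:0")?;
    let peer: SocketAddr = "127.0.0.1:9001".parse().unwrap();

    // Phase 1: build the owned batch. In the patch this is the only region
    // that holds the outstanding_requests write lock, so the lock is dropped
    // before any network I/O happens.
    let owned: Vec<(Vec<u8>, SocketAddr)> = (0u8..3).map(|i| (vec![i; 8], peer)).collect();

    // Phase 2: borrow (&[u8], &SocketAddr) views, the shape batch_send
    // expects, and hand the whole batch to the sender at once.
    let batch: Vec<(&[u8], &SocketAddr)> = owned.iter().map(|(v, s)| (&v[..], s)).collect();
    if !batch.is_empty() {
        batch_send_stub(&sock, &batch)?;
    }
    Ok(())
}
```

One effect of this split, beyond the syscall savings, is that a slow network path no longer runs inside the `outstanding_requests` critical section, and the two new timers can attribute elapsed time to each phase separately.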
diff --git a/core/src/repair_weight.rs b/core/src/repair_weight.rs
index dad4e18eaa..5073fbac87 100644
--- a/core/src/repair_weight.rs
+++ b/core/src/repair_weight.rs
@@ -160,7 +160,7 @@ impl RepairWeight {
         );
         get_best_orphans_elapsed.stop();
 
-        let mut get_best_shreds_elapsed = Measure::start("get_best_orphans");
+        let mut get_best_shreds_elapsed = Measure::start("get_best_shreds");
         // Find the best incomplete slots in rooted subtree
         self.get_best_shreds(blockstore, &mut repairs, max_new_shreds, ignore_slots);
         get_best_shreds_elapsed.stop();
diff --git a/core/src/window_service.rs b/core/src/window_service.rs
index 8ed795a3e2..cde76015d7 100644
--- a/core/src/window_service.rs
+++ b/core/src/window_service.rs
@@ -20,6 +20,7 @@ use solana_ledger::{
     leader_schedule_cache::LeaderScheduleCache,
     shred::{Nonce, Shred},
 };
+use solana_measure::measure::Measure;
 use solana_metrics::{inc_new_counter_debug, inc_new_counter_error};
 use solana_perf::packet::{Packet, Packets};
 use solana_rayon_threadlimit::get_thread_count;
@@ -39,6 +40,34 @@ use std::{
 
 pub type DuplicateSlotSender = CrossbeamSender<Slot>;
 pub type DuplicateSlotReceiver = CrossbeamReceiver<Slot>;
 
+#[derive(Default)]
+struct WindowServiceMetrics {
+    run_insert_count: u64,
+    num_shreds_received: u64,
+    shred_receiver_elapsed_us: u64,
+    prune_shreds_elapsed_us: u64,
+}
+
+impl WindowServiceMetrics {
+    pub fn report_metrics(&self, metric_name: &'static str) {
+        datapoint_info!(
+            metric_name,
+            ("run_insert_count", self.run_insert_count as i64, i64),
+            ("num_shreds_received", self.num_shreds_received as i64, i64),
+            (
+                "shred_receiver_elapsed_us",
+                self.shred_receiver_elapsed_us as i64,
+                i64
+            ),
+            (
+                "prune_shreds_elapsed_us",
+                self.prune_shreds_elapsed_us as i64,
+                i64
+            ),
+        );
+    }
+}
+
 fn verify_shred_slot(shred: &Shred, root: u64) -> bool {
     if shred.is_data() {
         // Only data shreds have parent information
@@ -176,24 +205,30 @@ fn run_insert<F>(
     leader_schedule_cache: &Arc<LeaderScheduleCache>,
     handle_duplicate: F,
     metrics: &mut BlockstoreInsertionMetrics,
+    ws_metrics: &mut WindowServiceMetrics,
     completed_data_sets_sender: &CompletedDataSetsSender,
     outstanding_requests: &Arc<RwLock<OutstandingRepairs>>,
 ) -> Result<()>
 where
     F: Fn(Shred),
 {
+    let mut shred_receiver_elapsed = Measure::start("shred_receiver_elapsed");
     let timer = Duration::from_millis(200);
     let (mut shreds, mut repair_infos) = shred_receiver.recv_timeout(timer)?;
     while let Ok((more_shreds, more_repair_infos)) = shred_receiver.try_recv() {
         shreds.extend(more_shreds);
         repair_infos.extend(more_repair_infos);
     }
 
+    shred_receiver_elapsed.stop();
+    ws_metrics.num_shreds_received += shreds.len() as u64;
+    let mut prune_shreds_elapsed = Measure::start("prune_shreds_elapsed");
     prune_shreds_invalid_repair(&mut shreds, &mut repair_infos, outstanding_requests);
     let repairs: Vec<_> = repair_infos
         .iter()
         .map(|repair_info| repair_info.is_some())
         .collect();
+    prune_shreds_elapsed.stop();
 
     let (completed_data_sets, inserted_indices) = blockstore.insert_shreds_handle_duplicate(
         shreds,
@@ -210,6 +245,11 @@ where
     }
 
     completed_data_sets_sender.try_send(completed_data_sets)?;
+
+    ws_metrics.run_insert_count += 1;
+    ws_metrics.shred_receiver_elapsed_us += shred_receiver_elapsed.as_us();
+    ws_metrics.prune_shreds_elapsed_us += prune_shreds_elapsed.as_us();
+
     Ok(())
 }
 
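For readers unfamiliar with `solana_measure`, the pattern added to `run_insert` above is: start a `Measure` before a region, stop it after, and fold `as_us()` into a long-lived counter on the metrics struct. A rough sketch under the assumption that `Measure` wraps `std::time::Instant` (the real type also carries a `&'static str` label, omitted here):

```rust
use std::time::Instant;

// Simplified model of solana_measure::measure::Measure; the real
// Measure::start takes a name used when the timing is logged.
struct Measure {
    start: Instant,
    elapsed_us: u64,
}

impl Measure {
    fn start() -> Self {
        Self {
            start: Instant::now(),
            elapsed_us: 0,
        }
    }
    fn stop(&mut self) {
        self.elapsed_us = self.start.elapsed().as_micros() as u64;
    }
    fn as_us(&self) -> u64 {
        self.elapsed_us
    }
}

fn main() {
    // Long-lived counter, like ws_metrics.shred_receiver_elapsed_us above.
    let mut shred_receiver_elapsed_us = 0u64;

    // Per-call timer: start before the region, stop after it, then fold the
    // reading into the counter. run_insert repeats this on every invocation.
    let mut shred_receiver_elapsed = Measure::start();
    std::thread::sleep(std::time::Duration::from_millis(5)); // stand-in for recv work
    shred_receiver_elapsed.stop();
    shred_receiver_elapsed_us += shred_receiver_elapsed.as_us();

    println!("accumulated {} us", shred_receiver_elapsed_us);
}
```

Accumulating microseconds into plain `u64` fields keeps the per-call cost to an addition; the aggregated values are only serialized when the struct is reported, as the next hunks show.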
@@ -464,6 +504,7 @@ impl WindowService {
                     let _ = check_duplicate_sender.send(shred);
                 };
                 let mut metrics = BlockstoreInsertionMetrics::default();
+                let mut ws_metrics = WindowServiceMetrics::default();
                 let mut last_print = Instant::now();
                 loop {
                     if exit.load(Ordering::Relaxed) {
@@ -476,6 +517,7 @@ impl WindowService {
                         &leader_schedule_cache,
                         &handle_duplicate,
                         &mut metrics,
+                        &mut ws_metrics,
                         &completed_data_sets_sender,
                         &outstanding_requests,
                     ) {
@@ -487,6 +529,8 @@ impl WindowService {
                     if last_print.elapsed().as_secs() > 2 {
                         metrics.report_metrics("recv-window-insert-shreds");
                         metrics = BlockstoreInsertionMetrics::default();
+                        ws_metrics.report_metrics("recv-window-insert-shreds");
+                        ws_metrics = WindowServiceMetrics::default();
                         last_print = Instant::now();
                     }
                 }
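The final hunks thread `ws_metrics` through the insert loop and reuse the existing 2-second report-and-reset cadence, so metric emission stays off the per-shred hot path. A self-contained sketch of that cadence; `WsMetrics` and the `println!` sink are hypothetical stand-ins for `WindowServiceMetrics` and `datapoint_info!`:

```rust
use std::time::{Duration, Instant};

// Illustrative reimplementation of the report-and-reset cadence used above.
#[derive(Default)]
struct WsMetrics {
    run_insert_count: u64,
}

impl WsMetrics {
    // The real code reports through datapoint_info!; println! is a stand-in.
    fn report_metrics(&self, metric_name: &str) {
        println!("{metric_name}: run_insert_count={}", self.run_insert_count);
    }
}

fn main() {
    let mut ws_metrics = WsMetrics::default();
    let mut last_print = Instant::now();
    let shutdown = Instant::now() + Duration::from_secs(5);

    while Instant::now() < shutdown {
        // One loop iteration == one run_insert call in the service.
        ws_metrics.run_insert_count += 1;
        std::thread::sleep(Duration::from_millis(100));

        // Flush at most every ~2 seconds, then reset, so each datapoint
        // covers exactly one reporting window and the hot loop stays cheap.
        if last_print.elapsed().as_secs() > 2 {
            ws_metrics.report_metrics("recv-window-insert-shreds");
            ws_metrics = WsMetrics::default();
            last_print = Instant::now();
        }
    }
}
```

Replacing the struct with `Default::default()` after each report mirrors how `metrics` and `ws_metrics` are reset in the loop above, so successive datapoints are per-window deltas rather than monotonically growing totals.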