drop outstanding_requests lock before sending repair requests (#18893)
commit 9255ae334d
parent 84e78316b1
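
In short: RepairService previously held the outstanding_requests write lock while calling repair_socket.send_to for every repair request, so other users of the table (such as the window service verifying repair responses) were blocked for the duration of the network I/O. This change confines the write lock to the batch-building phase: requests are serialized into an owned batch under the lock, the guard is dropped, and the packets then go out in one batch_send call. The commit also splits the send_repairs timing into build and batch-send phases, fixes a mislabeled Measure in RepairWeight, and adds a WindowServiceMetrics struct to instrument run_insert in the window service.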
diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs
@@ -22,6 +22,7 @@ use solana_runtime::{bank_forks::BankForks, contains::Contains};
 use solana_sdk::{
     clock::Slot, epoch_schedule::EpochSchedule, hash::Hash, pubkey::Pubkey, timing::timestamp,
 };
+use solana_streamer::sendmmsg::{batch_send, SendPktsError};
 use std::{
     collections::{HashMap, HashSet},
     iter::Iterator,
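
The new solana_streamer import is put to use at the end of this file's changes: batch_send pushes many UDP packets through one socket in a single call (the sendmmsg module name points at the batched syscall it wraps), and on partial failure reports the first error together with a failed-packet count. The call shape as it appears in the hunks below, with the batch built as (payload, destination) pairs:

    // Shape taken from this diff; `batch` borrows the serialized requests.
    let batch: Vec<(&[u8], &SocketAddr)> = batch.iter().map(|(v, s)| (&v[..], s)).collect();
    if let Err(SendPktsError::IoError(err, num_failed)) = batch_send(repair_socket, &batch) {
        // `num_failed` of `batch.len()` packets were not sent; `err` is the first error.
    }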

@@ -95,6 +96,8 @@ pub struct RepairTiming {
     pub get_best_orphans_elapsed: u64,
     pub get_best_shreds_elapsed: u64,
     pub send_repairs_elapsed: u64,
+    pub build_repairs_batch_elapsed: u64,
+    pub batch_send_repairs_elapsed: u64,
 }

 impl RepairTiming {
@@ -103,12 +106,15 @@ impl RepairTiming {
         set_root_elapsed: u64,
         get_votes_elapsed: u64,
         add_votes_elapsed: u64,
-        send_repairs_elapsed: u64,
+        build_repairs_batch_elapsed: u64,
+        batch_send_repairs_elapsed: u64,
     ) {
         self.set_root_elapsed += set_root_elapsed;
         self.get_votes_elapsed += get_votes_elapsed;
         self.add_votes_elapsed += add_votes_elapsed;
-        self.send_repairs_elapsed += send_repairs_elapsed;
+        self.build_repairs_batch_elapsed += build_repairs_batch_elapsed;
+        self.batch_send_repairs_elapsed += batch_send_repairs_elapsed;
+        self.send_repairs_elapsed += build_repairs_batch_elapsed + batch_send_repairs_elapsed;
     }
 }
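
Note that send_repairs_elapsed is retained and now accumulates the sum of the two new phases, so the existing send-repairs-elapsed datapoint keeps reporting the same total while the new fields break it down into batch building (lock held) and batch sending (lock released).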
@@ -262,29 +268,51 @@ impl RepairService {
                 )
             };

-            let mut send_repairs_elapsed = Measure::start("send_repairs_elapsed");
-            let mut outstanding_requests = outstanding_requests.write().unwrap();
-            repairs.into_iter().for_each(|repair_request| {
-                if let Ok((to, req)) = serve_repair.repair_request(
-                    &repair_info.cluster_slots,
-                    repair_request,
-                    &mut peers_cache,
-                    &mut repair_stats,
-                    &repair_info.repair_validators,
-                    &mut outstanding_requests,
-                ) {
-                    repair_socket.send_to(&req, to).unwrap_or_else(|e| {
-                        info!("{} repair req send_to({}) error {:?}", id, to, e);
-                        0
-                    });
-                }
-            });
-            send_repairs_elapsed.stop();
+            let mut build_repairs_batch_elapsed = Measure::start("build_repairs_batch_elapsed");
+            let batch: Vec<(Vec<u8>, SocketAddr)> = {
+                let mut outstanding_requests = outstanding_requests.write().unwrap();
+                repairs
+                    .iter()
+                    .filter_map(|repair_request| {
+                        let (to, req) = serve_repair
+                            .repair_request(
+                                &repair_info.cluster_slots,
+                                *repair_request,
+                                &mut peers_cache,
+                                &mut repair_stats,
+                                &repair_info.repair_validators,
+                                &mut outstanding_requests,
+                            )
+                            .ok()?;
+                        Some((req, to))
+                    })
+                    .collect()
+            };
+            let batch: Vec<(&[u8], &SocketAddr)> = batch.iter().map(|(v, s)| (&v[..], s)).collect();
+            build_repairs_batch_elapsed.stop();
+
+            let mut batch_send_repairs_elapsed = Measure::start("batch_send_repairs_elapsed");
+            if !batch.is_empty() {
+                if let Err(SendPktsError::IoError(err, num_failed)) =
+                    batch_send(repair_socket, &batch)
+                {
+                    error!(
+                        "{} batch_send failed to send {}/{} packets first error {:?}",
+                        id,
+                        num_failed,
+                        batch.len(),
+                        err
+                    );
+                }
+            }
+            batch_send_repairs_elapsed.stop();

             repair_timing.update(
                 set_root_elapsed.as_us(),
                 get_votes_elapsed.as_us(),
                 add_votes_elapsed.as_us(),
-                send_repairs_elapsed.as_us(),
+                build_repairs_batch_elapsed.as_us(),
+                batch_send_repairs_elapsed.as_us(),
             );

             if last_stats.elapsed().as_secs() > 2 {
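
The pattern above, reduced to a self-contained sketch; Outstanding, add_request, and send_batch are hypothetical stand-ins for the real Solana types. Binding the batch to an owned Vec inside a block confines the RwLock write guard to that block, so the guard is dropped before any socket I/O begins:

    use std::net::{SocketAddr, UdpSocket};
    use std::sync::RwLock;

    // Hypothetical stand-in for the OutstandingShredRepairs table.
    #[derive(Default)]
    struct Outstanding {
        nonce: u64,
    }

    impl Outstanding {
        // Registers a request and returns the serialized packet plus destination.
        fn add_request(&mut self, dest: SocketAddr) -> (Vec<u8>, SocketAddr) {
            self.nonce += 1;
            (self.nonce.to_le_bytes().to_vec(), dest)
        }
    }

    fn send_batch(
        socket: &UdpSocket,
        dests: &[SocketAddr],
        outstanding: &RwLock<Outstanding>,
    ) -> std::io::Result<()> {
        // Build the owned batch under the write lock; the guard lives
        // only inside this block.
        let batch: Vec<(Vec<u8>, SocketAddr)> = {
            let mut guard = outstanding.write().unwrap();
            dests.iter().map(|&d| guard.add_request(d)).collect()
        }; // <- write guard dropped here, before any I/O

        // Sends happen lock-free: readers of `outstanding` (e.g. a thread
        // verifying repair responses) are not blocked by the socket calls.
        for (payload, dest) in &batch {
            socket.send_to(payload, dest)?;
        }
        Ok(())
    }

    fn main() -> std::io::Result<()> {
        let socket = UdpSocket::bind("127.0.0.1:0")?;
        let outstanding = RwLock::new(Outstanding::default());
        let dest = socket.local_addr()?; // loop packets back to ourselves
        send_batch(&socket, &[dest, dest], &outstanding)
    }

The production code goes one step further and hands the finished batch to batch_send instead of looping over send_to, but the lock-scoping idea is identical.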
@@ -340,6 +368,16 @@ impl RepairService {
                     repair_timing.send_repairs_elapsed,
                     i64
                 ),
+                (
+                    "build-repairs-batch-elapsed",
+                    repair_timing.build_repairs_batch_elapsed,
+                    i64
+                ),
+                (
+                    "batch-send-repairs-elapsed",
+                    repair_timing.batch_send_repairs_elapsed,
+                    i64
+                ),
             );
             repair_stats = RepairStats::default();
             repair_timing = RepairTiming::default();
diff --git a/core/src/repair_weight.rs b/core/src/repair_weight.rs
@@ -160,7 +160,7 @@ impl RepairWeight {
         );
         get_best_orphans_elapsed.stop();

-        let mut get_best_shreds_elapsed = Measure::start("get_best_orphans");
+        let mut get_best_shreds_elapsed = Measure::start("get_best_shreds");
         // Find the best incomplete slots in rooted subtree
         self.get_best_shreds(blockstore, &mut repairs, max_new_shreds, ignore_slots);
         get_best_shreds_elapsed.stop();
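
This one-line fix in RepairWeight corrects a copy-paste error: the Measure timing the get_best_shreds phase was labeled "get_best_orphans", which made the two phases indistinguishable in timing output.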
diff --git a/core/src/window_service.rs b/core/src/window_service.rs
@@ -20,6 +20,7 @@ use solana_ledger::{
     leader_schedule_cache::LeaderScheduleCache,
     shred::{Nonce, Shred},
 };
+use solana_measure::measure::Measure;
 use solana_metrics::{inc_new_counter_debug, inc_new_counter_error};
 use solana_perf::packet::{Packet, Packets};
 use solana_rayon_threadlimit::get_thread_count;
@@ -39,6 +40,34 @@ use std::{
 pub type DuplicateSlotSender = CrossbeamSender<Slot>;
 pub type DuplicateSlotReceiver = CrossbeamReceiver<Slot>;

+#[derive(Default)]
+struct WindowServiceMetrics {
+    run_insert_count: u64,
+    num_shreds_received: u64,
+    shred_receiver_elapsed_us: u64,
+    prune_shreds_elapsed_us: u64,
+}
+
+impl WindowServiceMetrics {
+    pub fn report_metrics(&self, metric_name: &'static str) {
+        datapoint_info!(
+            metric_name,
+            ("run_insert_count", self.run_insert_count as i64, i64),
+            ("num_shreds_received", self.num_shreds_received as i64, i64),
+            (
+                "shred_receiver_elapsed_us",
+                self.shred_receiver_elapsed_us as i64,
+                i64
+            ),
+            (
+                "prune_shreds_elapsed_us",
+                self.prune_shreds_elapsed_us as i64,
+                i64
+            ),
+        );
+    }
+}
+
 fn verify_shred_slot(shred: &Shred, root: u64) -> bool {
     if shred.is_data() {
         // Only data shreds have parent information
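
WindowServiceMetrics follows the same accumulate-then-report pattern as the existing BlockstoreInsertionMetrics: run_insert bumps the counters on every call, and the reporting loop (last hunk of this diff) flushes them as a single recv-window-insert-shreds datapoint and resets the struct roughly every two seconds.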
@@ -176,24 +205,30 @@ fn run_insert<F>(
     leader_schedule_cache: &Arc<LeaderScheduleCache>,
     handle_duplicate: F,
     metrics: &mut BlockstoreInsertionMetrics,
+    ws_metrics: &mut WindowServiceMetrics,
     completed_data_sets_sender: &CompletedDataSetsSender,
     outstanding_requests: &Arc<RwLock<OutstandingShredRepairs>>,
 ) -> Result<()>
 where
     F: Fn(Shred),
 {
+    let mut shred_receiver_elapsed = Measure::start("shred_receiver_elapsed");
     let timer = Duration::from_millis(200);
     let (mut shreds, mut repair_infos) = shred_receiver.recv_timeout(timer)?;
     while let Ok((more_shreds, more_repair_infos)) = shred_receiver.try_recv() {
         shreds.extend(more_shreds);
         repair_infos.extend(more_repair_infos);
     }
+    shred_receiver_elapsed.stop();
+    ws_metrics.num_shreds_received += shreds.len() as u64;
+
+    let mut prune_shreds_elapsed = Measure::start("prune_shreds_elapsed");
     prune_shreds_invalid_repair(&mut shreds, &mut repair_infos, outstanding_requests);
     let repairs: Vec<_> = repair_infos
         .iter()
         .map(|repair_info| repair_info.is_some())
         .collect();
+    prune_shreds_elapsed.stop();

     let (completed_data_sets, inserted_indices) = blockstore.insert_shreds_handle_duplicate(
         shreds,
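
A caveat when reading the new numbers: shred_receiver_elapsed wraps the blocking recv_timeout, so on a quiet link shred_receiver_elapsed_us largely measures idle waiting (up to 200 ms per iteration) rather than processing time.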
@@ -210,6 +245,11 @@ where
     }

     completed_data_sets_sender.try_send(completed_data_sets)?;
+
+    ws_metrics.run_insert_count += 1;
+    ws_metrics.shred_receiver_elapsed_us += shred_receiver_elapsed.as_us();
+    ws_metrics.prune_shreds_elapsed_us += prune_shreds_elapsed.as_us();
+
     Ok(())
 }

@@ -464,6 +504,7 @@ impl WindowService {
                     let _ = check_duplicate_sender.send(shred);
                 };
                 let mut metrics = BlockstoreInsertionMetrics::default();
+                let mut ws_metrics = WindowServiceMetrics::default();
                 let mut last_print = Instant::now();
                 loop {
                     if exit.load(Ordering::Relaxed) {
@@ -476,6 +517,7 @@ impl WindowService {
                         &leader_schedule_cache,
                         &handle_duplicate,
                         &mut metrics,
+                        &mut ws_metrics,
                         &completed_data_sets_sender,
                         &outstanding_requests,
                     ) {
@@ -487,6 +529,8 @@ impl WindowService {
                     if last_print.elapsed().as_secs() > 2 {
                         metrics.report_metrics("recv-window-insert-shreds");
                         metrics = BlockstoreInsertionMetrics::default();
+                        ws_metrics.report_metrics("recv-window-insert-shreds");
+                        ws_metrics = WindowServiceMetrics::default();
                         last_print = Instant::now();
                     }
                 }