diff --git a/core/benches/retransmit_stage.rs b/core/benches/retransmit_stage.rs
index cc94843185..59afa0ada5 100644
--- a/core/benches/retransmit_stage.rs
+++ b/core/benches/retransmit_stage.rs
@@ -31,7 +31,7 @@ use {
         sync::{
             atomic::{AtomicUsize, Ordering},
             mpsc::channel,
-            Arc, Mutex, RwLock,
+            Arc, RwLock,
         },
         thread::{sleep, Builder},
         time::Duration,
@@ -78,7 +78,6 @@ fn bench_retransmitter(bencher: &mut Bencher) {
     let bank = bank_forks.working_bank();
     let bank_forks = Arc::new(RwLock::new(bank_forks));
     let (shreds_sender, shreds_receiver) = channel();
-    let shreds_receiver = Arc::new(Mutex::new(shreds_receiver));
     const NUM_THREADS: usize = 2;
     let sockets = (0..NUM_THREADS)
         .map(|_| UdpSocket::bind("0.0.0.0:0").unwrap())
@@ -165,7 +164,5 @@ fn bench_retransmitter(bencher: &mut Bencher) {
         total.store(0, Ordering::Relaxed);
     });
 
-    for t in retransmitter_handles {
-        t.join().unwrap();
-    }
+    retransmitter_handles.join().unwrap();
 }
diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs
index 9b2f2b0a47..f2ed475351 100644
--- a/core/src/retransmit_stage.rs
+++ b/core/src/retransmit_stage.rs
@@ -11,11 +11,11 @@ use {
         completed_data_sets_service::CompletedDataSetsSender,
         packet_hasher::PacketHasher,
         repair_service::{DuplicateSlotsResetSender, RepairInfo},
-        result::{Error, Result},
         window_service::{should_retransmit_and_persist, WindowService},
     },
     crossbeam_channel::{Receiver, Sender},
     lru::LruCache,
+    rayon::{prelude::*, ThreadPool, ThreadPoolBuilder},
     solana_client::rpc_response::SlotUpdate,
     solana_gossip::cluster_info::{ClusterInfo, DATA_PLANE_FANOUT},
     solana_ledger::{
@@ -23,125 +23,90 @@ use {
         {blockstore::Blockstore, leader_schedule_cache::LeaderScheduleCache},
     },
     solana_measure::measure::Measure,
-    solana_metrics::inc_new_counter_error,
     solana_perf::packet::Packets,
+    solana_rayon_threadlimit::get_thread_count,
     solana_rpc::{max_slots::MaxSlots, rpc_subscriptions::RpcSubscriptions},
     solana_runtime::{bank::Bank, bank_forks::BankForks},
-    solana_sdk::{
-        clock::Slot,
-        epoch_schedule::EpochSchedule,
-        pubkey::Pubkey,
-        timing::{timestamp, AtomicInterval},
-    },
+    solana_sdk::{clock::Slot, epoch_schedule::EpochSchedule, pubkey::Pubkey, timing::timestamp},
     std::{
         collections::{BTreeSet, HashSet},
         net::UdpSocket,
         ops::DerefMut,
         sync::{
-            atomic::{AtomicBool, AtomicU64, Ordering},
+            atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
             mpsc::{self, channel, RecvTimeoutError},
             Arc, Mutex, RwLock,
         },
         thread::{self, Builder, JoinHandle},
-        time::Duration,
+        time::{Duration, Instant},
     },
 };
 
 const MAX_DUPLICATE_COUNT: usize = 2;
 const DEFAULT_LRU_SIZE: usize = 10_000;
-// Limit a given thread to consume about this many shreds so that
-// it doesn't pull up too much work.
-const MAX_SHREDS_BATCH_SIZE: usize = 100;
-
 const CLUSTER_NODES_CACHE_NUM_EPOCH_CAP: usize = 8;
 const CLUSTER_NODES_CACHE_TTL: Duration = Duration::from_secs(5);
 
 #[derive(Default)]
 struct RetransmitStats {
-    num_shreds: AtomicU64,
-    num_shreds_skipped: AtomicU64,
-    total_batches: AtomicU64,
-    total_time: AtomicU64,
-    epoch_fetch: AtomicU64,
-    epoch_cache_update: AtomicU64,
+    since: Option<Instant>,
+    num_shreds: usize,
+    num_shreds_skipped: AtomicUsize,
+    total_batches: usize,
+    total_time: u64,
+    epoch_fetch: u64,
+    epoch_cache_update: u64,
     retransmit_total: AtomicU64,
-    last_ts: AtomicInterval,
     compute_turbine_peers_total: AtomicU64,
+    unknown_shred_slot_leader: AtomicUsize,
 }
 
-#[allow(clippy::too_many_arguments)]
-fn update_retransmit_stats(
-    stats: &RetransmitStats,
-    total_time: u64,
-    num_shreds: usize,
-    num_shreds_skipped: usize,
-    retransmit_total: u64,
-    compute_turbine_peers_total: u64,
-    peers_len: usize,
-    epoch_fetch: u64,
-    epoch_cach_update: u64,
-) {
-    stats.total_time.fetch_add(total_time, Ordering::Relaxed);
-    stats
-        .num_shreds
-        .fetch_add(num_shreds as u64, Ordering::Relaxed);
-    stats
-        .num_shreds_skipped
-        .fetch_add(num_shreds_skipped as u64, Ordering::Relaxed);
-    stats
-        .retransmit_total
-        .fetch_add(retransmit_total, Ordering::Relaxed);
-    stats
-        .compute_turbine_peers_total
-        .fetch_add(compute_turbine_peers_total, Ordering::Relaxed);
-    stats.total_batches.fetch_add(1, Ordering::Relaxed);
-    stats.epoch_fetch.fetch_add(epoch_fetch, Ordering::Relaxed);
-    stats
-        .epoch_cache_update
-        .fetch_add(epoch_cach_update, Ordering::Relaxed);
-    if stats.last_ts.should_update(2000) {
-        datapoint_info!("retransmit-num_nodes", ("count", peers_len, i64));
+impl RetransmitStats {
+    fn maybe_submit(
+        &mut self,
+        root_bank: &Bank,
+        working_bank: &Bank,
+        cluster_info: &ClusterInfo,
+        cluster_nodes_cache: &ClusterNodesCache<RetransmitStage>,
+    ) {
+        const SUBMIT_CADENCE: Duration = Duration::from_secs(2);
+        let elapsed = self.since.as_ref().map(Instant::elapsed);
+        if elapsed.unwrap_or(Duration::MAX) < SUBMIT_CADENCE {
+            return;
+        }
+        let num_peers = cluster_nodes_cache
+            .get(root_bank.slot(), root_bank, working_bank, cluster_info)
+            .num_peers();
+        let stats = std::mem::replace(
+            self,
+            Self {
+                since: Some(Instant::now()),
+                ..Self::default()
+            },
+        );
+        datapoint_info!("retransmit-num_nodes", ("count", num_peers, i64));
         datapoint_info!(
             "retransmit-stage",
-            (
-                "total_time",
-                stats.total_time.swap(0, Ordering::Relaxed) as i64,
-                i64
-            ),
-            (
-                "epoch_fetch",
-                stats.epoch_fetch.swap(0, Ordering::Relaxed) as i64,
-                i64
-            ),
-            (
-                "epoch_cache_update",
-                stats.epoch_cache_update.swap(0, Ordering::Relaxed) as i64,
-                i64
-            ),
-            (
-                "total_batches",
-                stats.total_batches.swap(0, Ordering::Relaxed) as i64,
-                i64
-            ),
-            (
-                "num_shreds",
-                stats.num_shreds.swap(0, Ordering::Relaxed) as i64,
-                i64
-            ),
+            ("total_time", stats.total_time, i64),
+            ("epoch_fetch", stats.epoch_fetch, i64),
+            ("epoch_cache_update", stats.epoch_cache_update, i64),
+            ("total_batches", stats.total_batches, i64),
+            ("num_shreds", stats.num_shreds, i64),
             (
                 "num_shreds_skipped",
-                stats.num_shreds_skipped.swap(0, Ordering::Relaxed) as i64,
-                i64
-            ),
-            (
-                "retransmit_total",
-                stats.retransmit_total.swap(0, Ordering::Relaxed) as i64,
+                stats.num_shreds_skipped.into_inner(),
                 i64
             ),
+            ("retransmit_total", stats.retransmit_total.into_inner(), i64),
             (
                 "compute_turbine",
-                stats.compute_turbine_peers_total.swap(0, Ordering::Relaxed) as i64,
+                stats.compute_turbine_peers_total.into_inner(),
+                i64
+            ),
+            (
"unknown_shred_slot_leader", + stats.unknown_shred_slot_leader.into_inner(), i64 ), ); @@ -188,14 +153,11 @@ fn check_if_first_shred_received( } let mut first_shreds_received_locked = first_shreds_received.lock().unwrap(); - if !first_shreds_received_locked.contains(&shred_slot) { + if first_shreds_received_locked.insert(shred_slot) { datapoint_info!("retransmit-first-shred", ("slot", shred_slot, i64)); - first_shreds_received_locked.insert(shred_slot); if first_shreds_received_locked.len() > 100 { - let mut slots_before_root = + *first_shreds_received_locked = first_shreds_received_locked.split_off(&(root_bank.slot() + 1)); - // `slots_before_root` now contains all slots <= root - std::mem::swap(&mut slots_before_root, &mut first_shreds_received_locked); } true } else { @@ -205,16 +167,11 @@ fn check_if_first_shred_received( fn maybe_reset_shreds_received_cache( shreds_received: &Mutex, - hasher_reset_ts: &AtomicU64, + hasher_reset_ts: &mut Instant, ) { - const UPDATE_INTERVAL_MS: u64 = 1000; - let now = timestamp(); - let prev = hasher_reset_ts.load(Ordering::Acquire); - if now.saturating_sub(prev) > UPDATE_INTERVAL_MS - && hasher_reset_ts - .compare_exchange(prev, now, Ordering::AcqRel, Ordering::Acquire) - .is_ok() - { + const UPDATE_INTERVAL: Duration = Duration::from_secs(1); + if hasher_reset_ts.elapsed() >= UPDATE_INTERVAL { + *hasher_reset_ts = Instant::now(); let mut shreds_received = shreds_received.lock().unwrap(); let (cache, hasher) = shreds_received.deref_mut(); cache.clear(); @@ -224,31 +181,26 @@ fn maybe_reset_shreds_received_cache( #[allow(clippy::too_many_arguments)] fn retransmit( + thread_pool: &ThreadPool, bank_forks: &RwLock, leader_schedule_cache: &LeaderScheduleCache, cluster_info: &ClusterInfo, - shreds_receiver: &Mutex>>, - sock: &UdpSocket, - id: u32, - stats: &RetransmitStats, + shreds_receiver: &mpsc::Receiver>, + sockets: &[UdpSocket], + stats: &mut RetransmitStats, cluster_nodes_cache: &ClusterNodesCache, - hasher_reset_ts: &AtomicU64, + hasher_reset_ts: &mut Instant, shreds_received: &Mutex, max_slots: &MaxSlots, first_shreds_received: &Mutex>, rpc_subscriptions: Option<&RpcSubscriptions>, -) -> Result<()> { +) -> Result<(), RecvTimeoutError> { const RECV_TIMEOUT: Duration = Duration::from_secs(1); - let shreds_receiver = shreds_receiver.lock().unwrap(); let mut shreds = shreds_receiver.recv_timeout(RECV_TIMEOUT)?; let mut timer_start = Measure::start("retransmit"); - while let Ok(more_shreds) = shreds_receiver.try_recv() { - shreds.extend(more_shreds); - if shreds.len() >= MAX_SHREDS_BATCH_SIZE { - break; - } - } - drop(shreds_receiver); + shreds.extend(shreds_receiver.try_iter().flatten()); + stats.num_shreds += shreds.len(); + stats.total_batches += 1; let mut epoch_fetch = Measure::start("retransmit_epoch_fetch"); let (working_bank, root_bank) = { @@ -256,25 +208,24 @@ fn retransmit( (bank_forks.working_bank(), bank_forks.root_bank()) }; epoch_fetch.stop(); + stats.epoch_fetch += epoch_fetch.as_us(); let mut epoch_cache_update = Measure::start("retransmit_epoch_cach_update"); maybe_reset_shreds_received_cache(shreds_received, hasher_reset_ts); epoch_cache_update.stop(); + stats.epoch_cache_update += epoch_cache_update.as_us(); - let num_shreds = shreds.len(); let my_id = cluster_info.id(); let socket_addr_space = cluster_info.socket_addr_space(); - let mut retransmit_total = 0; - let mut num_shreds_skipped = 0; - let mut compute_turbine_peers_total = 0; - let mut max_slot = 0; - for shred in shreds { + let retransmit_shred = |shred: Shred, socket: 
         if should_skip_retransmit(&shred, shreds_received) {
-            num_shreds_skipped += 1;
-            continue;
+            stats.num_shreds_skipped.fetch_add(1, Ordering::Relaxed);
+            return;
         }
         let shred_slot = shred.slot();
-        max_slot = max_slot.max(shred_slot);
+        max_slots
+            .retransmit
+            .fetch_max(shred_slot, Ordering::Relaxed);
 
         if let Some(rpc_subscriptions) = rpc_subscriptions {
             if check_if_first_shred_received(shred_slot, first_shreds_received, &root_bank) {
@@ -293,7 +244,12 @@
         let slot_leader =
             match leader_schedule_cache.slot_leader_at(shred_slot, Some(&working_bank)) {
                 Some(pubkey) => pubkey,
-                None => continue,
+                None => {
+                    stats
+                        .unknown_shred_slot_leader
+                        .fetch_add(1, Ordering::Relaxed);
+                    return;
+                }
             };
         let cluster_nodes =
             cluster_nodes_cache.get(shred_slot, &root_bank, &working_bank, cluster_info);
@@ -302,7 +258,9 @@
             cluster_nodes.get_retransmit_peers(shred_seed, DATA_PLANE_FANOUT, slot_leader);
         let anchor_node = neighbors[0].id == my_id;
         compute_turbine_peers.stop();
-        compute_turbine_peers_total += compute_turbine_peers.as_us();
+        stats
+            .compute_turbine_peers_total
+            .fetch_add(compute_turbine_peers.as_us(), Ordering::Relaxed);
 
         let mut retransmit_time = Measure::start("retransmit_to");
         // If the node is on the critical path (i.e. the first node in each
@@ -314,7 +272,7 @@
             ClusterInfo::retransmit_to(
                 &neighbors[1..],
                 &shred.payload,
-                sock,
+                socket,
                 true, // forward socket
                 socket_addr_space,
             );
@@ -322,36 +280,25 @@
         ClusterInfo::retransmit_to(
             &children,
             &shred.payload,
-            sock,
+            socket,
             !anchor_node, // send to forward socket!
             socket_addr_space,
         );
         retransmit_time.stop();
-        retransmit_total += retransmit_time.as_us();
-    }
-    max_slots.retransmit.fetch_max(max_slot, Ordering::Relaxed);
+        stats
+            .retransmit_total
+            .fetch_add(retransmit_time.as_us(), Ordering::Relaxed);
+    };
+    thread_pool.install(|| {
+        shreds.into_par_iter().with_min_len(4).for_each(|shred| {
+            let index = thread_pool.current_thread_index().unwrap();
+            let socket = &sockets[index % sockets.len()];
+            retransmit_shred(shred, socket);
+        });
+    });
     timer_start.stop();
-    debug!(
-        "retransmitted {} shreds in {}ms retransmit_time: {}ms id: {}",
-        num_shreds,
-        timer_start.as_ms(),
-        retransmit_total,
-        id,
-    );
-    let cluster_nodes =
-        cluster_nodes_cache.get(root_bank.slot(), &root_bank, &working_bank, cluster_info);
-    update_retransmit_stats(
-        stats,
-        timer_start.as_us(),
-        num_shreds,
-        num_shreds_skipped,
-        retransmit_total,
-        compute_turbine_peers_total,
-        cluster_nodes.num_peers(),
-        epoch_fetch.as_us(),
-        epoch_cache_update.as_us(),
-    );
-
+    stats.total_time += timer_start.as_us();
+    stats.maybe_submit(&root_bank, &working_bank, cluster_info, cluster_nodes_cache);
     Ok(())
 }
 
@@ -368,74 +315,56 @@ pub fn retransmitter(
     bank_forks: Arc<RwLock<BankForks>>,
     leader_schedule_cache: Arc<LeaderScheduleCache>,
     cluster_info: Arc<ClusterInfo>,
-    shreds_receiver: Arc<Mutex<mpsc::Receiver<Vec<Shred>>>>,
+    shreds_receiver: mpsc::Receiver<Vec<Shred>>,
     max_slots: Arc<MaxSlots>,
     rpc_subscriptions: Option<Arc<RpcSubscriptions>>,
-) -> Vec<JoinHandle<()>> {
-    let cluster_nodes_cache = Arc::new(ClusterNodesCache::<RetransmitStage>::new(
+) -> JoinHandle<()> {
+    let cluster_nodes_cache = ClusterNodesCache::<RetransmitStage>::new(
         CLUSTER_NODES_CACHE_NUM_EPOCH_CAP,
         CLUSTER_NODES_CACHE_TTL,
-    ));
-    let hasher_reset_ts = Arc::default();
-    let stats = Arc::new(RetransmitStats::default());
-    let shreds_received = Arc::new(Mutex::new((
-        LruCache::new(DEFAULT_LRU_SIZE),
-        PacketHasher::default(),
-    )));
-    let first_shreds_received = Arc::new(Mutex::new(BTreeSet::new()));
-    (0..sockets.len())
-        .map(|s| {
-            let sockets = sockets.clone();
-            let bank_forks = bank_forks.clone();
-            let leader_schedule_cache = leader_schedule_cache.clone();
-            let shreds_receiver = shreds_receiver.clone();
-            let cluster_info = cluster_info.clone();
-            let stats = stats.clone();
-            let cluster_nodes_cache = Arc::clone(&cluster_nodes_cache);
-            let hasher_reset_ts = Arc::clone(&hasher_reset_ts);
-            let shreds_received = shreds_received.clone();
-            let max_slots = max_slots.clone();
-            let first_shreds_received = first_shreds_received.clone();
-            let rpc_subscriptions = rpc_subscriptions.clone();
-
-            Builder::new()
-                .name("solana-retransmitter".to_string())
-                .spawn(move || {
-                    trace!("retransmitter started");
-                    loop {
-                        if let Err(e) = retransmit(
-                            &bank_forks,
-                            &leader_schedule_cache,
-                            &cluster_info,
-                            &shreds_receiver,
-                            &sockets[s],
-                            s as u32,
-                            &stats,
-                            &cluster_nodes_cache,
-                            &hasher_reset_ts,
-                            &shreds_received,
-                            &max_slots,
-                            &first_shreds_received,
-                            rpc_subscriptions.as_deref(),
-                        ) {
-                            match e {
-                                Error::RecvTimeout(RecvTimeoutError::Disconnected) => break,
-                                Error::RecvTimeout(RecvTimeoutError::Timeout) => (),
-                                _ => {
-                                    inc_new_counter_error!("streamer-retransmit-error", 1, 1);
-                                }
-                            }
-                        }
-                    }
-                    trace!("exiting retransmitter");
-                })
-                .unwrap()
+    );
+    let mut hasher_reset_ts = Instant::now();
+    let mut stats = RetransmitStats::default();
+    let shreds_received = Mutex::new((LruCache::new(DEFAULT_LRU_SIZE), PacketHasher::default()));
+    let first_shreds_received = Mutex::<BTreeSet<Slot>>::default();
+    let num_threads = get_thread_count().min(8).max(sockets.len());
+    let thread_pool = ThreadPoolBuilder::new()
+        .num_threads(num_threads)
+        .thread_name(|i| format!("retransmit-{}", i))
+        .build()
+        .unwrap();
+    Builder::new()
+        .name("solana-retransmitter".to_string())
+        .spawn(move || {
+            trace!("retransmitter started");
+            loop {
+                match retransmit(
+                    &thread_pool,
+                    &bank_forks,
+                    &leader_schedule_cache,
+                    &cluster_info,
+                    &shreds_receiver,
+                    &sockets,
+                    &mut stats,
+                    &cluster_nodes_cache,
+                    &mut hasher_reset_ts,
+                    &shreds_received,
+                    &max_slots,
+                    &first_shreds_received,
+                    rpc_subscriptions.as_deref(),
+                ) {
+                    Ok(()) => (),
+                    Err(RecvTimeoutError::Timeout) => (),
+                    Err(RecvTimeoutError::Disconnected) => break,
+                }
+            }
+            trace!("exiting retransmitter");
         })
-        .collect()
+        .unwrap()
 }
 
 pub(crate) struct RetransmitStage {
-    thread_hdls: Vec<JoinHandle<()>>,
+    retransmit_thread_handle: JoinHandle<()>,
     window_service: WindowService,
     cluster_slots_service: ClusterSlotsService,
 }
@@ -470,8 +399,7 @@ impl RetransmitStage {
         // https://github.com/rust-lang/rust/issues/39364#issuecomment-634545136
         let _retransmit_sender = retransmit_sender.clone();
 
-        let retransmit_receiver = Arc::new(Mutex::new(retransmit_receiver));
-        let thread_hdls = retransmitter(
+        let retransmit_thread_handle = retransmitter(
            retransmit_sockets,
            bank_forks.clone(),
            leader_schedule_cache.clone(),
@@ -529,19 +457,16 @@ impl RetransmitStage {
         );
 
         Self {
-            thread_hdls,
+            retransmit_thread_handle,
             window_service,
             cluster_slots_service,
         }
     }
 
     pub(crate) fn join(self) -> thread::Result<()> {
-        for thread_hdl in self.thread_hdls {
-            thread_hdl.join()?;
-        }
+        self.retransmit_thread_handle.join()?;
         self.window_service.join()?;
-        self.cluster_slots_service.join()?;
-        Ok(())
+        self.cluster_slots_service.join()
     }
 }
 
@@ -613,7 +538,7 @@ mod tests {
             bank_forks,
             leader_schedule_cache,
             cluster_info,
-            Arc::new(Mutex::new(retransmit_receiver)),
+            retransmit_receiver,
             Arc::default(), // MaxSlots
             None,
         );
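Note (illustrative sketch, not part of the patch): the shape of the change above is replacing one thread per retransmit socket, each contending on an Arc-wrapped Mutex around the channel receiver, with a single receiver loop that drains the channel and fans each batch out over a rayon thread pool, where each worker picks a socket by its pool thread index. The standalone sketch below shows only that pattern; the names fan_out and NUM_SOCKETS and the loopback destination address are invented for the example, and unlike the real loop it does not distinguish RecvTimeoutError::Timeout from Disconnected.

use {
    rayon::{prelude::*, ThreadPoolBuilder},
    std::{net::UdpSocket, sync::mpsc, time::Duration},
};

const NUM_SOCKETS: usize = 2;

fn fan_out(receiver: mpsc::Receiver<Vec<Vec<u8>>>, sockets: Vec<UdpSocket>) {
    let thread_pool = ThreadPoolBuilder::new()
        .num_threads(NUM_SOCKETS)
        .thread_name(|i| format!("fanout-{}", i))
        .build()
        .unwrap();
    // A single loop owns the receiver, so no Arc<Mutex<Receiver<_>>> is needed.
    while let Ok(mut batch) = receiver.recv_timeout(Duration::from_secs(1)) {
        // Coalesce whatever is already queued, like try_iter().flatten() above.
        batch.extend(receiver.try_iter().flatten());
        thread_pool.install(|| {
            batch.into_par_iter().with_min_len(4).for_each(|packet| {
                // Map each rayon worker to one socket via its thread index.
                let index = thread_pool.current_thread_index().unwrap();
                let socket = &sockets[index % sockets.len()];
                // Destination is a placeholder for the example.
                let _ = socket.send_to(&packet, "127.0.0.1:8001");
            });
        });
    }
}

fn main() {
    let sockets: Vec<UdpSocket> = (0..NUM_SOCKETS)
        .map(|_| UdpSocket::bind("0.0.0.0:0").unwrap())
        .collect();
    let (sender, receiver) = mpsc::channel();
    // One batch of 16 dummy 8-byte packets.
    sender.send(vec![vec![0u8; 8]; 16]).unwrap();
    drop(sender); // disconnects the channel, so fan_out returns after draining
    fan_out(receiver, sockets);
}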