diff --git a/core/src/cluster_slots_service.rs b/core/src/cluster_slots_service.rs
new file mode 100644
index 0000000000..27daa9d3bf
--- /dev/null
+++ b/core/src/cluster_slots_service.rs
@@ -0,0 +1,191 @@
+use crate::{cluster_info::ClusterInfo, cluster_slots::ClusterSlots};
+use solana_ledger::blockstore::{Blockstore, CompletedSlotsReceiver};
+use solana_measure::measure::Measure;
+use solana_runtime::bank_forks::BankForks;
+use solana_sdk::{clock::Slot, pubkey::Pubkey};
+use std::{
+    sync::{
+        atomic::{AtomicBool, Ordering},
+        {Arc, RwLock},
+    },
+    thread::sleep,
+    thread::{self, Builder, JoinHandle},
+    time::{Duration, Instant},
+};
+
+#[derive(Default, Debug)]
+struct ClusterSlotsServiceTiming {
+    pub lowest_slot_elapsed: u64,
+    pub update_completed_slots_elapsed: u64,
+}
+
+impl ClusterSlotsServiceTiming {
+    fn update(&mut self, lowest_slot_elapsed: u64, update_completed_slots_elapsed: u64) {
+        self.lowest_slot_elapsed += lowest_slot_elapsed;
+        self.update_completed_slots_elapsed += update_completed_slots_elapsed;
+    }
+}
+
+pub struct ClusterSlotsService {
+    t_cluster_slots_service: JoinHandle<()>,
+}
+
+impl ClusterSlotsService {
+    pub fn new(
+        blockstore: Arc<Blockstore>,
+        cluster_slots: Arc<ClusterSlots>,
+        bank_forks: Arc<RwLock<BankForks>>,
+        cluster_info: Arc<ClusterInfo>,
+        completed_slots_receiver: CompletedSlotsReceiver,
+        exit: Arc<AtomicBool>,
+    ) -> Self {
+        let id = cluster_info.id();
+        Self::initialize_lowest_slot(id, &blockstore, &cluster_info);
+        Self::initialize_epoch_slots(&blockstore, &cluster_info, &completed_slots_receiver);
+        let t_cluster_slots_service = Builder::new()
+            .name("solana-cluster-slots-service".to_string())
+            .spawn(move || {
+                Self::run(
+                    blockstore,
+                    cluster_slots,
+                    bank_forks,
+                    cluster_info,
+                    completed_slots_receiver,
+                    exit,
+                )
+            })
+            .unwrap();
+
+        ClusterSlotsService {
+            t_cluster_slots_service,
+        }
+    }
+
+    pub fn join(self) -> thread::Result<()> {
+        self.t_cluster_slots_service.join()
+    }
+
+    fn run(
+        blockstore: Arc<Blockstore>,
+        cluster_slots: Arc<ClusterSlots>,
+        bank_forks: Arc<RwLock<BankForks>>,
+        cluster_info: Arc<ClusterInfo>,
+        completed_slots_receiver: CompletedSlotsReceiver,
+        exit: Arc<AtomicBool>,
+    ) {
+        let mut cluster_slots_service_timing = ClusterSlotsServiceTiming::default();
+        let mut last_stats = Instant::now();
+        loop {
+            if exit.load(Ordering::Relaxed) {
+                break;
+            }
+            let new_root = bank_forks.read().unwrap().root();
+            let id = cluster_info.id();
+            let mut lowest_slot_elapsed = Measure::start("lowest_slot_elapsed");
+            let lowest_slot = blockstore.lowest_slot();
+            Self::update_lowest_slot(&id, lowest_slot, &cluster_info);
+            lowest_slot_elapsed.stop();
+            let mut update_completed_slots_elapsed =
+                Measure::start("update_completed_slots_elapsed");
+            Self::update_completed_slots(&completed_slots_receiver, &cluster_info);
+            cluster_slots.update(new_root, &cluster_info, &bank_forks);
+            update_completed_slots_elapsed.stop();
+
+            cluster_slots_service_timing.update(
+                lowest_slot_elapsed.as_us(),
+                update_completed_slots_elapsed.as_us(),
+            );
+
+            if last_stats.elapsed().as_secs() > 2 {
+                datapoint_info!(
+                    "cluster_slots_service-timing",
+                    (
+                        "lowest_slot_elapsed",
+                        cluster_slots_service_timing.lowest_slot_elapsed,
+                        i64
+                    ),
+                    (
+                        "update_completed_slots_elapsed",
+                        cluster_slots_service_timing.update_completed_slots_elapsed,
+                        i64
+                    ),
+                );
+                cluster_slots_service_timing = ClusterSlotsServiceTiming::default();
+                last_stats = Instant::now();
+            }
+            sleep(Duration::from_millis(200));
+        }
+    }
+
+    fn update_completed_slots(
+        completed_slots_receiver: &CompletedSlotsReceiver,
+        cluster_info: &ClusterInfo,
+    ) {
+        let mut slots: Vec<Slot> = vec![];
+        while let Ok(mut more) = completed_slots_receiver.try_recv() {
+            slots.append(&mut more);
+        }
+        slots.sort();
+        if !slots.is_empty() {
+            cluster_info.push_epoch_slots(&slots);
+        }
+    }
+
+    fn initialize_lowest_slot(id: Pubkey, blockstore: &Blockstore, cluster_info: &ClusterInfo) {
+        // Safe to set into gossip because by this time, the leader schedule cache should
+        // also be updated with the latest root (done in blockstore_processor) and thus
+        // will provide a schedule to window_service for any incoming shreds up to the
+        // last_confirmed_epoch.
+        cluster_info.push_lowest_slot(id, blockstore.lowest_slot());
+    }
+
+    fn update_lowest_slot(id: &Pubkey, lowest_slot: Slot, cluster_info: &ClusterInfo) {
+        cluster_info.push_lowest_slot(*id, lowest_slot);
+    }
+
+    fn initialize_epoch_slots(
+        blockstore: &Blockstore,
+        cluster_info: &ClusterInfo,
+        completed_slots_receiver: &CompletedSlotsReceiver,
+    ) {
+        let root = blockstore.last_root();
+        let mut slots: Vec<_> = blockstore
+            .live_slots_iterator(root)
+            .filter_map(|(slot, slot_meta)| {
+                if slot_meta.is_full() {
+                    Some(slot)
+                } else {
+                    None
+                }
+            })
+            .collect();
+
+        while let Ok(mut more) = completed_slots_receiver.try_recv() {
+            slots.append(&mut more);
+        }
+        slots.sort();
+        slots.dedup();
+        if !slots.is_empty() {
+            cluster_info.push_epoch_slots(&slots);
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use crate::cluster_info::Node;
+
+    #[test]
+    pub fn test_update_lowest_slot() {
+        let node_info = Node::new_localhost_with_pubkey(&Pubkey::default());
+        let cluster_info = ClusterInfo::new_with_invalid_keypair(node_info.info);
+        ClusterSlotsService::update_lowest_slot(&Pubkey::default(), 5, &cluster_info);
+        let lowest = cluster_info
+            .get_lowest_slot_for_node(&Pubkey::default(), None, |lowest_slot, _| {
+                lowest_slot.clone()
+            })
+            .unwrap();
+        assert_eq!(lowest.lowest, 5);
+    }
+}
diff --git a/core/src/lib.rs b/core/src/lib.rs
index 0cc959420f..ddaaaae070 100644
--- a/core/src/lib.rs
+++ b/core/src/lib.rs
@@ -19,6 +19,7 @@ pub mod contact_info;
 pub mod bank_weight_fork_choice;
 pub mod cluster_info;
 pub mod cluster_slots;
+pub mod cluster_slots_service;
 pub mod consensus;
 pub mod crds;
 pub mod crds_gossip;
diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs
index 8c6aa0e61d..7bc73d61e6 100644
--- a/core/src/repair_service.rs
+++ b/core/src/repair_service.rs
@@ -11,7 +11,7 @@ use crate::{
 };
 use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
 use solana_ledger::{
-    blockstore::{Blockstore, CompletedSlotsReceiver, SlotMeta},
+    blockstore::{Blockstore, SlotMeta},
     shred::Nonce,
 };
 use solana_measure::measure::Measure;
@@ -78,8 +78,6 @@ pub struct RepairTiming {
     pub set_root_elapsed: u64,
     pub get_votes_elapsed: u64,
     pub add_votes_elapsed: u64,
-    pub lowest_slot_elapsed: u64,
-    pub update_completed_slots_elapsed: u64,
     pub get_best_orphans_elapsed: u64,
     pub get_best_shreds_elapsed: u64,
     pub send_repairs_elapsed: u64,
@@ -91,15 +89,11 @@ impl RepairTiming {
         set_root_elapsed: u64,
         get_votes_elapsed: u64,
         add_votes_elapsed: u64,
-        lowest_slot_elapsed: u64,
-        update_completed_slots_elapsed: u64,
         send_repairs_elapsed: u64,
     ) {
         self.set_root_elapsed += set_root_elapsed;
         self.get_votes_elapsed += get_votes_elapsed;
         self.add_votes_elapsed += add_votes_elapsed;
-        self.lowest_slot_elapsed += lowest_slot_elapsed;
-        self.update_completed_slots_elapsed += update_completed_slots_elapsed;
         self.send_repairs_elapsed += send_repairs_elapsed;
     }
 }
@@ -112,7 +106,6 @@ pub const MAX_ORPHANS: usize = 5;
 pub struct RepairInfo {
     pub bank_forks: Arc<RwLock<BankForks>>,
-    pub completed_slots_receiver: CompletedSlotsReceiver,
     pub epoch_schedule: EpochSchedule,
     pub duplicate_slots_reset_sender: DuplicateSlotsResetSender,
 }
 
@@ -181,18 +174,12 @@ impl RepairService {
         let mut repair_weight = RepairWeight::new(repair_info.bank_forks.read().unwrap().root());
         let serve_repair = ServeRepair::new(cluster_info.clone());
         let id = cluster_info.id();
-        Self::initialize_lowest_slot(id, blockstore, &cluster_info);
         let mut repair_stats = RepairStats::default();
         let mut repair_timing = RepairTiming::default();
         let mut last_stats = Instant::now();
         let duplicate_slot_repair_statuses: HashMap<Slot, DuplicateSlotRepairStatus> =
             HashMap::new();
-        Self::initialize_epoch_slots(
-            blockstore,
-            &cluster_info,
-            &repair_info.completed_slots_receiver,
-        );
         loop {
             if exit.load(Ordering::Relaxed) {
                 break;
             }
@@ -201,8 +188,6 @@ impl RepairService {
             let mut set_root_elapsed;
             let mut get_votes_elapsed;
             let mut add_votes_elapsed;
-            let mut lowest_slot_elapsed;
-            let mut update_completed_slots_elapsed;
             let repairs = {
                 let root_bank = repair_info.bank_forks.read().unwrap().root_bank().clone();
                 let new_root = root_bank.slot();
@@ -235,15 +220,6 @@ impl RepairService {
                     root_bank.epoch_schedule(),
                 );
                 add_votes_elapsed.stop();
-
-                lowest_slot_elapsed = Measure::start("lowest_slot_elapsed");
-                let lowest_slot = blockstore.lowest_slot();
-                Self::update_lowest_slot(&id, lowest_slot, &cluster_info);
-                lowest_slot_elapsed.stop();
-                update_completed_slots_elapsed = Measure::start("update_completed_slots_elapsed");
-                Self::update_completed_slots(&repair_info.completed_slots_receiver, &cluster_info);
-                cluster_slots.update(new_root, &cluster_info, &repair_info.bank_forks);
-                update_completed_slots_elapsed.stop();
                 /*let new_duplicate_slots = Self::find_new_duplicate_slots(
                     &duplicate_slot_repair_statuses,
                     blockstore,
@@ -299,8 +275,6 @@ impl RepairService {
                 set_root_elapsed.as_us(),
                 get_votes_elapsed.as_us(),
                 add_votes_elapsed.as_us(),
-                lowest_slot_elapsed.as_us(),
-                update_completed_slots_elapsed.as_us(),
                 send_repairs_elapsed.as_us(),
             );
 
@@ -335,16 +309,6 @@ impl RepairService {
                     repair_timing.get_best_shreds_elapsed,
                     i64
                 ),
-                (
-                    "lowest-slot-elapsed",
-                    repair_timing.lowest_slot_elapsed,
-                    i64
-                ),
-                (
-                    "update-completed-slots-elapsed",
-                    repair_timing.update_completed_slots_elapsed,
-                    i64
-                ),
                 (
                     "send-repairs-elapsed",
                     repair_timing.send_repairs_elapsed,
@@ -650,59 +614,6 @@ impl RepairService {
             .collect()
     }
 
-    fn initialize_lowest_slot(id: Pubkey, blockstore: &Blockstore, cluster_info: &ClusterInfo) {
-        // Safe to set into gossip because by this time, the leader schedule cache should
-        // also be updated with the latest root (done in blockstore_processor) and thus
-        // will provide a schedule to window_service for any incoming shreds up to the
-        // last_confirmed_epoch.
-        cluster_info.push_lowest_slot(id, blockstore.lowest_slot());
-    }
-
-    fn update_completed_slots(
-        completed_slots_receiver: &CompletedSlotsReceiver,
-        cluster_info: &ClusterInfo,
-    ) {
-        let mut slots: Vec<Slot> = vec![];
-        while let Ok(mut more) = completed_slots_receiver.try_recv() {
-            slots.append(&mut more);
-        }
-        slots.sort();
-        if !slots.is_empty() {
-            cluster_info.push_epoch_slots(&slots);
-        }
-    }
-
-    fn update_lowest_slot(id: &Pubkey, lowest_slot: Slot, cluster_info: &ClusterInfo) {
-        cluster_info.push_lowest_slot(*id, lowest_slot);
-    }
-
-    fn initialize_epoch_slots(
-        blockstore: &Blockstore,
-        cluster_info: &ClusterInfo,
-        completed_slots_receiver: &CompletedSlotsReceiver,
-    ) {
-        let root = blockstore.last_root();
-        let mut slots: Vec<_> = blockstore
-            .live_slots_iterator(root)
-            .filter_map(|(slot, slot_meta)| {
-                if slot_meta.is_full() {
-                    Some(slot)
-                } else {
-                    None
-                }
-            })
-            .collect();
-
-        while let Ok(mut more) = completed_slots_receiver.try_recv() {
-            slots.append(&mut more);
-        }
-        slots.sort();
-        slots.dedup();
-        if !slots.is_empty() {
-            cluster_info.push_epoch_slots(&slots);
-        }
-    }
-
     pub fn join(self) -> thread::Result<()> {
         self.t_repair.join()
     }
@@ -980,19 +891,6 @@ mod test {
         Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
     }
 
-    #[test]
-    pub fn test_update_lowest_slot() {
-        let node_info = Node::new_localhost_with_pubkey(&Pubkey::default());
-        let cluster_info = ClusterInfo::new_with_invalid_keypair(node_info.info);
-        RepairService::update_lowest_slot(&Pubkey::default(), 5, &cluster_info);
-        let lowest = cluster_info
-            .get_lowest_slot_for_node(&Pubkey::default(), None, |lowest_slot, _| {
-                lowest_slot.clone()
-            })
-            .unwrap();
-        assert_eq!(lowest.lowest, 5);
-    }
-
     #[test]
     pub fn test_generate_duplicate_repairs_for_slot() {
         let blockstore_path = get_tmp_ledger_path!();
diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs
index d74ec50c71..0c0c77bd36 100644
--- a/core/src/retransmit_stage.rs
+++ b/core/src/retransmit_stage.rs
@@ -4,6 +4,7 @@ use crate::{
     cluster_info::{compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT},
     cluster_info_vote_listener::VerifiedVoteReceiver,
     cluster_slots::ClusterSlots,
+    cluster_slots_service::ClusterSlotsService,
     contact_info::ContactInfo,
     repair_service::DuplicateSlotsResetSender,
     repair_service::RepairInfo,
@@ -394,6 +395,7 @@ pub fn retransmitter(
 pub struct RetransmitStage {
     thread_hdls: Vec<JoinHandle<()>>,
     window_service: WindowService,
+    cluster_slots_service: ClusterSlotsService,
 }
 
 impl RetransmitStage {
@@ -427,13 +429,20 @@ impl RetransmitStage {
             retransmit_receiver,
         );
 
+        let leader_schedule_cache_clone = leader_schedule_cache.clone();
+        let cluster_slots_service = ClusterSlotsService::new(
+            blockstore.clone(),
+            cluster_slots.clone(),
+            bank_forks.clone(),
+            cluster_info.clone(),
+            completed_slots_receiver,
+            exit.clone(),
+        );
         let repair_info = RepairInfo {
             bank_forks,
-            completed_slots_receiver,
            epoch_schedule,
            duplicate_slots_reset_sender,
         };
-        let leader_schedule_cache_clone = leader_schedule_cache.clone();
         let window_service = WindowService::new(
             blockstore,
             cluster_info.clone(),
@@ -466,6 +475,7 @@ impl RetransmitStage {
         Self {
             thread_hdls,
             window_service,
+            cluster_slots_service,
         }
     }
 
@@ -474,6 +484,7 @@ impl RetransmitStage {
             thread_hdl.join()?;
         }
         self.window_service.join()?;
+        self.cluster_slots_service.join()?;
         Ok(())
     }
 }
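
For readers skimming the patch, the new service boils down to a common validator pattern: a named thread that wakes on a fixed interval, drains a channel with try_recv, batches what it finds, publishes the batch, and stops when a shared exit flag flips. Below is a minimal std-only sketch of that skeleton; SlotBatcher, publish, and the std mpsc channel are illustrative stand-ins, not part of the patch (the real code uses crossbeam's CompletedSlotsReceiver and pushes into ClusterInfo).

use std::{
    sync::{
        atomic::{AtomicBool, Ordering},
        mpsc::{channel, Receiver, Sender},
        Arc,
    },
    thread::{sleep, Builder, JoinHandle},
    time::Duration,
};

// Illustrative stand-in for the gossip sink; the real service calls
// cluster_info.push_epoch_slots().
fn publish(slots: &[u64]) {
    println!("publishing {:?}", slots);
}

struct SlotBatcher {
    handle: JoinHandle<()>,
}

impl SlotBatcher {
    fn new(receiver: Receiver<Vec<u64>>, exit: Arc<AtomicBool>) -> Self {
        let handle = Builder::new()
            .name("slot-batcher".to_string())
            .spawn(move || loop {
                if exit.load(Ordering::Relaxed) {
                    break;
                }
                // Drain everything queued since the last tick, the same way
                // update_completed_slots() drains the CompletedSlotsReceiver.
                let mut slots: Vec<u64> = vec![];
                while let Ok(mut more) = receiver.try_recv() {
                    slots.append(&mut more);
                }
                slots.sort();
                slots.dedup();
                if !slots.is_empty() {
                    publish(&slots);
                }
                sleep(Duration::from_millis(200));
            })
            .unwrap();
        Self { handle }
    }

    fn join(self) -> std::thread::Result<()> {
        self.handle.join()
    }
}

fn main() {
    let (sender, receiver): (Sender<Vec<u64>>, Receiver<Vec<u64>>) = channel();
    let exit = Arc::new(AtomicBool::new(false));
    let batcher = SlotBatcher::new(receiver, exit.clone());
    sender.send(vec![3, 1, 2]).unwrap();
    sleep(Duration::from_millis(300));
    exit.store(true, Ordering::Relaxed);
    batcher.join().unwrap();
}

Batching behind the 200ms tick is the point of the design: many completed-slot notifications collapse into one sorted, deduplicated push per interval, and moving that work off the repair loop keeps gossip pushes from being charged to repair's timing budget.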