diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs
index a0e836724..78b5e15b9 100644
--- a/core/src/shred_fetch_stage.rs
+++ b/core/src/shred_fetch_stage.rs
@@ -28,7 +28,7 @@ impl ShredFetchStage {
     // updates packets received on a channel and sends them on another channel
     fn modify_packets(
         recvr: PacketBatchReceiver,
-        sendr: Sender<Vec<PacketBatch>>,
+        sendr: Sender<PacketBatch>,
         bank_forks: &RwLock<BankForks>,
         shred_version: u16,
         name: &'static str,
@@ -46,7 +46,7 @@ impl ShredFetchStage {
         let mut stats = ShredFetchStats::default();
         let mut packet_hasher = PacketHasher::default();
 
-        while let Some(mut packet_batch) = recvr.iter().next() {
+        for mut packet_batch in recvr {
             if last_updated.elapsed().as_millis() as u64 > DEFAULT_MS_PER_SLOT {
                 last_updated = Instant::now();
                 packet_hasher.reset();
@@ -79,7 +79,7 @@ impl ShredFetchStage {
                 }
             }
             stats.maybe_submit(name, STATS_SUBMIT_CADENCE);
-            if sendr.send(vec![packet_batch]).is_err() {
+            if sendr.send(packet_batch).is_err() {
                 break;
             }
         }
@@ -88,7 +88,7 @@ impl ShredFetchStage {
     fn packet_modifier(
         sockets: Vec<Arc<UdpSocket>>,
         exit: &Arc<AtomicBool>,
-        sender: Sender<Vec<PacketBatch>>,
+        sender: Sender<PacketBatch>,
         recycler: PacketBatchRecycler,
         bank_forks: Arc<RwLock<BankForks>>,
         shred_version: u16,
@@ -132,7 +132,7 @@ impl ShredFetchStage {
         sockets: Vec<UdpSocket>,
         forward_sockets: Vec<UdpSocket>,
         repair_socket: Arc<UdpSocket>,
-        sender: Sender<Vec<PacketBatch>>,
+        sender: Sender<PacketBatch>,
         shred_version: u16,
         bank_forks: Arc<RwLock<BankForks>>,
         exit: &Arc<AtomicBool>,
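A reviewer note on the loop change above: `for mut packet_batch in recvr` relies on crossbeam's `IntoIterator` impl for `Receiver`, which blocks per element and terminates once every sender is dropped, the same behavior as the old `while let Some(..) = recvr.iter().next()`, minus the `vec![..]` wrapper now that batches flow downstream one at a time. A minimal sketch of those semantics (illustrative only; a plain `Vec<u8>` stands in for `PacketBatch`):

```rust
// Illustrative sketch: Vec<u8> stands in for solana_perf's PacketBatch.
use crossbeam_channel::unbounded;

fn main() {
    let (sendr, recvr) = unbounded::<Vec<u8>>();
    let producer = std::thread::spawn(move || {
        for i in 0u8..3 {
            // Batches are now sent individually, not wrapped in a Vec.
            sendr.send(vec![i]).unwrap();
        }
        // Dropping `sendr` here disconnects the channel...
    });
    // ...which makes this loop drain remaining batches and exit cleanly.
    for batch in recvr {
        println!("got batch {batch:?}");
    }
    producer.join().unwrap();
}
```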
diff --git a/core/src/sigverify_shreds.rs b/core/src/sigverify_shreds.rs
index b32d045bc..f9a50ab8b 100644
--- a/core/src/sigverify_shreds.rs
+++ b/core/src/sigverify_shreds.rs
@@ -1,11 +1,5 @@
-#![allow(clippy::implicit_hasher)]
-
 use {
-    crate::{
-        sigverify,
-        sigverify_stage::{SigVerifier, SigVerifyServiceError},
-    },
-    crossbeam_channel::Sender,
+    crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender},
     solana_ledger::{
         leader_schedule_cache::LeaderScheduleCache, shred, sigverify_shreds::verify_shreds_gpu,
     },
@@ -18,87 +12,115 @@ use {
             atomic::{AtomicBool, Ordering},
             Arc, RwLock,
         },
+        thread::{Builder, JoinHandle},
+        time::{Duration, Instant},
     },
 };
 
-#[derive(Clone)]
-pub struct ShredSigVerifier {
-    pubkey: Pubkey, // TODO: Hot swap will change pubkey.
+#[allow(clippy::enum_variant_names)]
+enum Error {
+    RecvDisconnected,
+    RecvTimeout,
+    SendError,
+}
+
+pub(crate) fn spawn_shred_sigverify(
+    // TODO: Hot swap will change pubkey.
+    self_pubkey: Pubkey,
     bank_forks: Arc<RwLock<BankForks>>,
     leader_schedule_cache: Arc<LeaderScheduleCache>,
-    recycler_cache: RecyclerCache,
+    shred_fetch_receiver: Receiver<PacketBatch>,
     retransmit_sender: Sender<Vec<Vec<u8>>>,
-    packet_sender: Sender<Vec<PacketBatch>>,
+    verified_sender: Sender<Vec<PacketBatch>>,
     turbine_disabled: Arc<AtomicBool>,
+) -> JoinHandle<()> {
+    let recycler_cache = RecyclerCache::warmed();
+    let mut stats = ShredSigVerifyStats::new(Instant::now());
+    Builder::new()
+        .name("shred-verifier".to_string())
+        .spawn(move || loop {
+            match run_shred_sigverify(
+                &self_pubkey,
+                &bank_forks,
+                &leader_schedule_cache,
+                &recycler_cache,
+                &shred_fetch_receiver,
+                &retransmit_sender,
+                &verified_sender,
+                &turbine_disabled,
+                &mut stats,
+            ) {
+                Ok(()) => (),
+                Err(Error::RecvTimeout) => (),
+                Err(Error::RecvDisconnected) => break,
+                Err(Error::SendError) => break,
+            }
+            stats.maybe_submit();
+        })
+        .unwrap()
 }
 
-impl ShredSigVerifier {
-    pub fn new(
-        pubkey: Pubkey,
-        bank_forks: Arc<RwLock<BankForks>>,
-        leader_schedule_cache: Arc<LeaderScheduleCache>,
-        retransmit_sender: Sender<Vec<Vec<u8>>>,
-        packet_sender: Sender<Vec<PacketBatch>>,
-        turbine_disabled: Arc<AtomicBool>,
-    ) -> Self {
-        sigverify::init();
-        Self {
-            pubkey,
-            bank_forks,
-            leader_schedule_cache,
-            recycler_cache: RecyclerCache::warmed(),
-            retransmit_sender,
-            packet_sender,
-            turbine_disabled,
-        }
-    }
-}
-
-impl SigVerifier for ShredSigVerifier {
-    type SendType = Vec<PacketBatch>;
-
-    fn send_packets(
-        &mut self,
-        packet_batches: Vec<PacketBatch>,
-    ) -> Result<(), SigVerifyServiceError<Self::SendType>> {
-        if self.turbine_disabled.load(Ordering::Relaxed) {
-            return Ok(());
-        }
-        // Exclude repair packets from retransmit.
-        // TODO: return the error here!
-        let _ = self.retransmit_sender.send(
-            packet_batches
-                .iter()
-                .flat_map(PacketBatch::iter)
-                .filter(|packet| !packet.meta.discard() && !packet.meta.repair())
-                .filter_map(shred::layout::get_shred)
-                .map(<[u8]>::to_vec)
-                .collect(),
-        );
-        self.packet_sender.send(packet_batches)?;
-        Ok(())
-    }
-
-    fn verify_batches(
-        &self,
-        mut batches: Vec<PacketBatch>,
-        _valid_packets: usize,
-    ) -> Vec<PacketBatch> {
-        let working_bank = self.bank_forks.read().unwrap().working_bank();
-        let leader_slots: HashMap<Slot, [u8; 32]> = get_slot_leaders(
-            &self.pubkey,
-            &mut batches,
-            &self.leader_schedule_cache,
-            &working_bank,
-        )
-        .into_iter()
-        .filter_map(|(slot, pubkey)| Some((slot, pubkey?.to_bytes())))
-        .chain(std::iter::once((Slot::MAX, [0u8; 32])))
+fn run_shred_sigverify(
+    self_pubkey: &Pubkey,
+    bank_forks: &RwLock<BankForks>,
+    leader_schedule_cache: &LeaderScheduleCache,
+    recycler_cache: &RecyclerCache,
+    shred_fetch_receiver: &Receiver<PacketBatch>,
+    retransmit_sender: &Sender<Vec<Vec<u8>>>,
+    verified_sender: &Sender<Vec<PacketBatch>>,
+    turbine_disabled: &AtomicBool,
+    stats: &mut ShredSigVerifyStats,
+) -> Result<(), Error> {
+    const RECV_TIMEOUT: Duration = Duration::from_secs(1);
+    let packets = shred_fetch_receiver.recv_timeout(RECV_TIMEOUT)?;
+    let mut packets: Vec<_> = std::iter::once(packets)
+        .chain(shred_fetch_receiver.try_iter())
         .collect();
-        let r = verify_shreds_gpu(&batches, &leader_slots, &self.recycler_cache);
-        solana_perf::sigverify::mark_disabled(&mut batches, &r);
-        batches
+    let now = Instant::now();
+    stats.num_iters += 1;
+    stats.num_packets += packets.iter().map(PacketBatch::len).sum::<usize>();
+    stats.num_discards_pre += count_discards(&packets);
+    verify_packets(
+        self_pubkey,
+        bank_forks,
+        leader_schedule_cache,
+        recycler_cache,
+        &mut packets,
+    );
+    stats.num_discards_post += count_discards(&packets);
+    // Exclude repair packets from retransmit.
+    let shreds: Vec<_> = packets
+        .iter()
+        .flat_map(PacketBatch::iter)
+        .filter(|packet| !packet.meta.discard() && !packet.meta.repair())
+        .filter_map(shred::layout::get_shred)
+        .map(<[u8]>::to_vec)
+        .collect();
+    stats.num_retransmit_shreds += shreds.len();
+    if !turbine_disabled.load(Ordering::Relaxed) {
+        retransmit_sender.send(shreds)?;
+        verified_sender.send(packets)?;
     }
+    stats.elapsed_micros += now.elapsed().as_micros() as u64;
+    Ok(())
+}
+
+fn verify_packets(
+    self_pubkey: &Pubkey,
+    bank_forks: &RwLock<BankForks>,
+    leader_schedule_cache: &LeaderScheduleCache,
+    recycler_cache: &RecyclerCache,
+    packets: &mut [PacketBatch],
+) {
+    let working_bank = bank_forks.read().unwrap().working_bank();
+    let leader_slots: HashMap<Slot, [u8; 32]> =
+        get_slot_leaders(self_pubkey, packets, leader_schedule_cache, &working_bank)
+            .into_iter()
+            .filter_map(|(slot, pubkey)| Some((slot, pubkey?.to_bytes())))
+            .chain(std::iter::once((Slot::MAX, [0u8; 32])))
+            .collect();
+    let out = verify_shreds_gpu(packets, &leader_slots, recycler_cache);
+    solana_perf::sigverify::mark_disabled(packets, &out);
 }
 
 // Returns pubkey of leaders for shred slots referenced in the packets.
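Note on `run_shred_sigverify` above: one blocking `recv_timeout` followed by a non-blocking `try_iter()` drain is what coalesces everything already queued behind the fetch stage into a single verify pass. A standalone sketch of that pattern (`String` stands in for `PacketBatch`):

```rust
// Sketch of the recv-then-drain pattern; String stands in for PacketBatch.
use {
    crossbeam_channel::{unbounded, Receiver, RecvTimeoutError},
    std::time::Duration,
};

fn recv_batch(receiver: &Receiver<String>) -> Result<Vec<String>, RecvTimeoutError> {
    const RECV_TIMEOUT: Duration = Duration::from_secs(1);
    // Block until at least one item arrives (or the timeout fires)...
    let first = receiver.recv_timeout(RECV_TIMEOUT)?;
    // ...then greedily drain whatever else is queued, without blocking again.
    Ok(std::iter::once(first).chain(receiver.try_iter()).collect())
}

fn main() {
    let (sender, receiver) = unbounded();
    sender.send("a".to_string()).unwrap();
    sender.send("b".to_string()).unwrap();
    // Both queued items come back from a single call.
    assert_eq!(recv_batch(&receiver).unwrap(), ["a", "b"]);
}
```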
@@ -139,11 +161,75 @@ fn get_slot_leaders(
     leaders
 }
 
+fn count_discards(packets: &[PacketBatch]) -> usize {
+    packets
+        .iter()
+        .flat_map(PacketBatch::iter)
+        .filter(|packet| packet.meta.discard())
+        .count()
+}
+
+impl From<RecvTimeoutError> for Error {
+    fn from(err: RecvTimeoutError) -> Self {
+        match err {
+            RecvTimeoutError::Timeout => Self::RecvTimeout,
+            RecvTimeoutError::Disconnected => Self::RecvDisconnected,
+        }
+    }
+}
+
+impl<T> From<SendError<T>> for Error {
+    fn from(_: SendError<T>) -> Self {
+        Self::SendError
+    }
+}
+
+struct ShredSigVerifyStats {
+    since: Instant,
+    num_iters: usize,
+    num_packets: usize,
+    num_discards_pre: usize,
+    num_discards_post: usize,
+    num_retransmit_shreds: usize,
+    elapsed_micros: u64,
+}
+
+impl ShredSigVerifyStats {
+    const METRICS_SUBMIT_CADENCE: Duration = Duration::from_secs(2);
+
+    fn new(now: Instant) -> Self {
+        Self {
+            since: now,
+            num_iters: 0usize,
+            num_packets: 0usize,
+            num_discards_pre: 0usize,
+            num_discards_post: 0usize,
+            num_retransmit_shreds: 0usize,
+            elapsed_micros: 0u64,
+        }
+    }
+
+    fn maybe_submit(&mut self) {
+        if self.since.elapsed() <= Self::METRICS_SUBMIT_CADENCE {
+            return;
+        }
+        datapoint_info!(
+            "shred_sigverify",
+            ("num_iters", self.num_iters, i64),
+            ("num_packets", self.num_packets, i64),
+            ("num_discards_pre", self.num_discards_pre, i64),
+            ("num_discards_post", self.num_discards_post, i64),
+            ("num_retransmit_shreds", self.num_retransmit_shreds, i64),
+            ("elapsed_micros", self.elapsed_micros, i64),
+        );
+        *self = Self::new(Instant::now());
+    }
+}
+
 #[cfg(test)]
-pub mod tests {
+mod tests {
     use {
         super::*,
-        crossbeam_channel::unbounded,
         solana_ledger::{
             genesis_utils::create_genesis_config_with_leader,
             shred::{Shred, ShredFlags},
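The `From` conversions above are what let `run_shred_sigverify` use `?` on both the receive and send sides while the spawn loop stays in control of shutdown: timeouts retry, disconnects and send failures break. A self-contained sketch of that error funnel (the enum mirrors the diff; everything else is simplified):

```rust
// Sketch of the error funnel: `?` converts both channel error types into
// one local enum via the From impls, and the caller decides retry vs. exit.
use {
    crossbeam_channel::{bounded, Receiver, RecvTimeoutError, SendError},
    std::time::Duration,
};

#[derive(Debug)]
enum Error {
    RecvDisconnected,
    RecvTimeout,
    SendError,
}

impl From<RecvTimeoutError> for Error {
    fn from(err: RecvTimeoutError) -> Self {
        match err {
            RecvTimeoutError::Timeout => Self::RecvTimeout,
            RecvTimeoutError::Disconnected => Self::RecvDisconnected,
        }
    }
}

impl<T> From<SendError<T>> for Error {
    fn from(_: SendError<T>) -> Self {
        Self::SendError
    }
}

fn tick(receiver: &Receiver<u64>) -> Result<u64, Error> {
    // `?` compiles here only because of From<RecvTimeoutError> above.
    Ok(receiver.recv_timeout(Duration::from_millis(10))?)
}

fn main() {
    let (sender, receiver) = bounded(1);
    sender.send(42u64).unwrap();
    assert_eq!(tick(&receiver).unwrap(), 42);
    // Nothing queued and the sender still alive: a timeout, which a caller
    // treats as "try again" rather than "shut down".
    assert!(matches!(tick(&receiver), Err(Error::RecvTimeout)));
}
```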
@@ -160,18 +246,8 @@ pub mod tests {
         let bank = Bank::new_for_tests(
             &create_genesis_config_with_leader(100, &leader_pubkey, 10).genesis_config,
         );
-        let cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
-        let bf = Arc::new(RwLock::new(BankForks::new(bank)));
-        let (sender, receiver) = unbounded();
-        let (retransmit_sender, _retransmit_receiver) = unbounded();
-        let mut verifier = ShredSigVerifier::new(
-            Pubkey::new_unique(),
-            bf,
-            cache,
-            retransmit_sender,
-            sender,
-            Arc::<AtomicBool>::default(), // turbine_disabled
-        );
+        let leader_schedule_cache = LeaderScheduleCache::new_from_bank(&bank);
+        let bank_forks = RwLock::new(BankForks::new(bank));
         let batch_size = 2;
         let mut batch = PacketBatch::with_capacity(batch_size);
         batch.resize(batch_size, Packet::default());
@@ -206,20 +282,14 @@ pub mod tests {
         batches[0][1].buffer_mut()[..shred.payload().len()].copy_from_slice(shred.payload());
         batches[0][1].meta.size = shred.payload().len();
 
-        let num_packets = solana_perf::sigverify::count_packets_in_batches(&batches);
-        let rv = verifier.verify_batches(batches, num_packets);
-        assert!(!rv[0][0].meta.discard());
-        assert!(rv[0][1].meta.discard());
-
-        verifier.send_packets(rv.clone()).unwrap();
-        let received_packets = receiver.recv().unwrap();
-        assert_eq!(received_packets.len(), rv.len());
-        for (received_packet_batch, original_packet_batch) in received_packets.iter().zip(rv.iter())
-        {
-            assert_eq!(
-                received_packet_batch.iter().collect::<Vec<_>>(),
-                original_packet_batch.iter().collect::<Vec<_>>()
-            );
-        }
+        verify_packets(
+            &Pubkey::new_unique(), // self_pubkey
+            &bank_forks,
+            &leader_schedule_cache,
+            &RecyclerCache::warmed(),
+            &mut batches,
+        );
+        assert!(!batches[0][0].meta.discard());
+        assert!(batches[0][1].meta.discard());
     }
 }
diff --git a/core/src/tvu.rs b/core/src/tvu.rs
index fe7d1d94b..e6deed99f 100644
--- a/core/src/tvu.rs
+++ b/core/src/tvu.rs
@@ -20,8 +20,7 @@ use {
         retransmit_stage::RetransmitStage,
         rewards_recorder_service::RewardsRecorderSender,
         shred_fetch_stage::ShredFetchStage,
-        sigverify_shreds::ShredSigVerifier,
-        sigverify_stage::SigVerifyStage,
+        sigverify_shreds,
         tower_storage::TowerStorage,
         validator::ProcessBlockStore,
         voting_service::VotingService,
@@ -56,13 +55,13 @@ use {
         collections::HashSet,
         net::UdpSocket,
         sync::{atomic::AtomicBool, Arc, RwLock},
-        thread,
+        thread::{self, JoinHandle},
     },
 };
 
 pub struct Tvu {
     fetch_stage: ShredFetchStage,
-    sigverify_stage: SigVerifyStage,
+    shred_sigverify: JoinHandle<()>,
     retransmit_stage: RetransmitStage,
     window_service: WindowService,
     cluster_slots_service: ClusterSlotsService,
@@ -163,17 +162,14 @@ impl Tvu {
         let (verified_sender, verified_receiver) = unbounded();
 
         let (retransmit_sender, retransmit_receiver) = unbounded();
-        let sigverify_stage = SigVerifyStage::new(
+        let shred_sigverify = sigverify_shreds::spawn_shred_sigverify(
+            cluster_info.id(),
+            bank_forks.clone(),
+            leader_schedule_cache.clone(),
             fetch_receiver,
-            ShredSigVerifier::new(
-                cluster_info.id(),
-                bank_forks.clone(),
-                leader_schedule_cache.clone(),
-                retransmit_sender.clone(),
-                verified_sender,
-                turbine_disabled,
-            ),
-            "shred-verifier",
+            retransmit_sender.clone(),
+            verified_sender,
+            turbine_disabled,
         );
 
         let retransmit_stage = RetransmitStage::new(
@@ -319,7 +315,7 @@ impl Tvu {
 
         Tvu {
             fetch_stage,
-            sigverify_stage,
+            shred_sigverify,
             retransmit_stage,
             window_service,
             cluster_slots_service,
@@ -338,7 +334,7 @@ impl Tvu {
         self.window_service.join()?;
         self.cluster_slots_service.join()?;
         self.fetch_stage.join()?;
-        self.sigverify_stage.join()?;
+        self.shred_sigverify.join()?;
         if self.ledger_cleanup_service.is_some() {
             self.ledger_cleanup_service.unwrap().join()?;
         }
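Closing note on the Tvu wiring: with the `SigVerifyStage` wrapper gone, `Tvu` owns a bare `JoinHandle<()>` and shutdown reduces to a plain `join()`. A minimal sketch of that ownership shape (the hypothetical `spawn_worker` below is only for illustration; the real spawn is `sigverify_shreds::spawn_shred_sigverify` above):

```rust
// Minimal sketch: a named worker thread returned as a bare JoinHandle,
// joined directly at shutdown, with no stage struct in between.
use std::thread::{Builder, JoinHandle};

fn spawn_worker() -> JoinHandle<()> {
    Builder::new()
        .name("shred-verifier".to_string())
        .spawn(|| {
            // The real thread loops on the fetch channel here and exits
            // once the channel disconnects.
        })
        .unwrap()
}

fn main() {
    let shred_sigverify = spawn_worker();
    // Equivalent to `self.shred_sigverify.join()?` in Tvu::join.
    shred_sigverify.join().unwrap();
}
```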