From 6f4838719b2127b7356b3e91ca1983c4e7e16c08 Mon Sep 17 00:00:00 2001
From: behzad nouri <behzadnouri@gmail.com>
Date: Thu, 7 Jul 2022 11:13:13 +0000
Subject: [PATCH] decouples shreds sig-verify from tpu vote and transaction packets (#26300)

Shreds have a different workload and traffic pattern from TPU vote and
transaction packets. Some of the recent changes to SigVerifyStage are not
suitable, or at least not optimal, for shreds sig-verify; e.g. random
discard, dedup with false positives, discard of excess packets by
IP-address, ...

The SigVerifier trait is meant to abstract out the distinctions between the
two pipelines, but in practice it has led to more verbose and convoluted
code.

This commit discards the SigVerifier implementation for shreds sig-verify
and instead provides a standalone stage for verifying shreds signatures.
---
 core/src/shred_fetch_stage.rs |  10 +-
 core/src/sigverify_shreds.rs  | 282 +++++++++++++++++++++-------------
 core/src/tvu.rs               |  28 ++--
 3 files changed, 193 insertions(+), 127 deletions(-)
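Note: the heart of the new stage is the batching loop in run_shred_sigverify
below -- block on the fetch channel with a one-second timeout, then drain
whatever is already queued via try_iter() so each GPU verification pass sees
as many packets as possible. A minimal standalone sketch of that pattern,
using a plain u64 channel and a recv_batch helper as illustrative stand-ins
for the validator's PacketBatch channel (not part of this patch):

    use {
        crossbeam_channel::{unbounded, Receiver, RecvTimeoutError},
        std::time::Duration,
    };

    // Block for the first item (or time out), then opportunistically drain
    // everything already buffered into one batch.
    fn recv_batch(receiver: &Receiver<u64>) -> Result<Vec<u64>, RecvTimeoutError> {
        const RECV_TIMEOUT: Duration = Duration::from_secs(1);
        let first = receiver.recv_timeout(RECV_TIMEOUT)?;
        Ok(std::iter::once(first).chain(receiver.try_iter()).collect())
    }

    fn main() {
        let (sender, receiver) = unbounded();
        for x in 0..4u64 {
            sender.send(x).unwrap();
        }
        // All four queued items come back as a single batch.
        assert_eq!(recv_batch(&receiver).unwrap(), vec![0, 1, 2, 3]);
    }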
diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs
index a0e836724..78b5e15b9 100644
--- a/core/src/shred_fetch_stage.rs
+++ b/core/src/shred_fetch_stage.rs
@@ -28,7 +28,7 @@ impl ShredFetchStage {
     // updates packets received on a channel and sends them on another channel
     fn modify_packets(
         recvr: PacketBatchReceiver,
-        sendr: Sender<Vec<PacketBatch>>,
+        sendr: Sender<PacketBatch>,
         bank_forks: &RwLock<BankForks>,
         shred_version: u16,
         name: &'static str,
@@ -46,7 +46,7 @@ impl ShredFetchStage {
         let mut stats = ShredFetchStats::default();
         let mut packet_hasher = PacketHasher::default();
 
-        while let Some(mut packet_batch) = recvr.iter().next() {
+        for mut packet_batch in recvr {
             if last_updated.elapsed().as_millis() as u64 > DEFAULT_MS_PER_SLOT {
                 last_updated = Instant::now();
                 packet_hasher.reset();
@@ -79,7 +79,7 @@ impl ShredFetchStage {
                 }
             }
             stats.maybe_submit(name, STATS_SUBMIT_CADENCE);
-            if sendr.send(vec![packet_batch]).is_err() {
+            if sendr.send(packet_batch).is_err() {
                 break;
             }
         }
@@ -88,7 +88,7 @@ impl ShredFetchStage {
     fn packet_modifier(
         sockets: Vec<Arc<UdpSocket>>,
         exit: &Arc<AtomicBool>,
-        sender: Sender<Vec<PacketBatch>>,
+        sender: Sender<PacketBatch>,
         recycler: PacketBatchRecycler,
         bank_forks: Arc<RwLock<BankForks>>,
         shred_version: u16,
@@ -132,7 +132,7 @@ impl ShredFetchStage {
         sockets: Vec<UdpSocket>,
         forward_sockets: Vec<UdpSocket>,
         repair_socket: Arc<UdpSocket>,
-        sender: Sender<Vec<PacketBatch>>,
+        sender: Sender<PacketBatch>,
         shred_version: u16,
         bank_forks: Arc<RwLock<BankForks>>,
         exit: &Arc<AtomicBool>,
diff --git a/core/src/sigverify_shreds.rs b/core/src/sigverify_shreds.rs
index b32d045bc..f9a50ab8b 100644
--- a/core/src/sigverify_shreds.rs
+++ b/core/src/sigverify_shreds.rs
@@ -1,11 +1,5 @@
-#![allow(clippy::implicit_hasher)]
-
 use {
-    crate::{
-        sigverify,
-        sigverify_stage::{SigVerifier, SigVerifyServiceError},
-    },
-    crossbeam_channel::Sender,
+    crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender},
     solana_ledger::{
         leader_schedule_cache::LeaderScheduleCache, shred, sigverify_shreds::verify_shreds_gpu,
     },
@@ -18,87 +12,115 @@ use {
             atomic::{AtomicBool, Ordering},
             Arc, RwLock,
         },
+        thread::{Builder, JoinHandle},
+        time::{Duration, Instant},
     },
 };
 
-#[derive(Clone)]
-pub struct ShredSigVerifier {
-    pubkey: Pubkey, // TODO: Hot swap will change pubkey.
+#[allow(clippy::enum_variant_names)]
+enum Error {
+    RecvDisconnected,
+    RecvTimeout,
+    SendError,
+}
+
+pub(crate) fn spawn_shred_sigverify(
+    // TODO: Hot swap will change pubkey.
+    self_pubkey: Pubkey,
     bank_forks: Arc<RwLock<BankForks>>,
     leader_schedule_cache: Arc<LeaderScheduleCache>,
-    recycler_cache: RecyclerCache,
+    shred_fetch_receiver: Receiver<PacketBatch>,
     retransmit_sender: Sender<Vec<Vec<u8>>>,
-    packet_sender: Sender<Vec<PacketBatch>>,
+    verified_sender: Sender<Vec<PacketBatch>>,
     turbine_disabled: Arc<AtomicBool>,
+) -> JoinHandle<()> {
+    let recycler_cache = RecyclerCache::warmed();
+    let mut stats = ShredSigVerifyStats::new(Instant::now());
+    Builder::new()
+        .name("shred-verifier".to_string())
+        .spawn(move || loop {
+            match run_shred_sigverify(
+                &self_pubkey,
+                &bank_forks,
+                &leader_schedule_cache,
+                &recycler_cache,
+                &shred_fetch_receiver,
+                &retransmit_sender,
+                &verified_sender,
+                &turbine_disabled,
+                &mut stats,
+            ) {
+                Ok(()) => (),
+                Err(Error::RecvTimeout) => (),
+                Err(Error::RecvDisconnected) => break,
+                Err(Error::SendError) => break,
+            }
+            stats.maybe_submit();
+        })
+        .unwrap()
 }
 
-impl ShredSigVerifier {
-    pub fn new(
-        pubkey: Pubkey,
-        bank_forks: Arc<RwLock<BankForks>>,
-        leader_schedule_cache: Arc<LeaderScheduleCache>,
-        retransmit_sender: Sender<Vec<Vec<u8>>>,
-        packet_sender: Sender<Vec<PacketBatch>>,
-        turbine_disabled: Arc<AtomicBool>,
-    ) -> Self {
-        sigverify::init();
-        Self {
-            pubkey,
-            bank_forks,
-            leader_schedule_cache,
-            recycler_cache: RecyclerCache::warmed(),
-            retransmit_sender,
-            packet_sender,
-            turbine_disabled,
-        }
-    }
-}
-
-impl SigVerifier for ShredSigVerifier {
-    type SendType = Vec<PacketBatch>;
-
-    fn send_packets(
-        &mut self,
-        packet_batches: Vec<PacketBatch>,
-    ) -> Result<(), SigVerifyServiceError<Self::SendType>> {
-        if self.turbine_disabled.load(Ordering::Relaxed) {
-            return Ok(());
-        }
-        // Exclude repair packets from retransmit.
-        // TODO: return the error here!
-        let _ = self.retransmit_sender.send(
-            packet_batches
-                .iter()
-                .flat_map(PacketBatch::iter)
-                .filter(|packet| !packet.meta.discard() && !packet.meta.repair())
-                .filter_map(shred::layout::get_shred)
-                .map(<[u8]>::to_vec)
-                .collect(),
-        );
-        self.packet_sender.send(packet_batches)?;
-        Ok(())
-    }
-
-    fn verify_batches(
-        &self,
-        mut batches: Vec<PacketBatch>,
-        _valid_packets: usize,
-    ) -> Vec<PacketBatch> {
-        let working_bank = self.bank_forks.read().unwrap().working_bank();
-        let leader_slots: HashMap<Slot, [u8; 32]> = get_slot_leaders(
-            &self.pubkey,
-            &mut batches,
-            &self.leader_schedule_cache,
-            &working_bank,
-        )
-        .into_iter()
-        .filter_map(|(slot, pubkey)| Some((slot, pubkey?.to_bytes())))
-        .chain(std::iter::once((Slot::MAX, [0u8; 32])))
+fn run_shred_sigverify(
+    self_pubkey: &Pubkey,
+    bank_forks: &RwLock<BankForks>,
+    leader_schedule_cache: &LeaderScheduleCache,
+    recycler_cache: &RecyclerCache,
+    shred_fetch_receiver: &Receiver<PacketBatch>,
+    retransmit_sender: &Sender<Vec<Vec<u8>>>,
+    verified_sender: &Sender<Vec<PacketBatch>>,
+    turbine_disabled: &AtomicBool,
+    stats: &mut ShredSigVerifyStats,
+) -> Result<(), Error> {
+    const RECV_TIMEOUT: Duration = Duration::from_secs(1);
+    let packets = shred_fetch_receiver.recv_timeout(RECV_TIMEOUT)?;
+    let mut packets: Vec<_> = std::iter::once(packets)
+        .chain(shred_fetch_receiver.try_iter())
         .collect();
-        let r = verify_shreds_gpu(&batches, &leader_slots, &self.recycler_cache);
-        solana_perf::sigverify::mark_disabled(&mut batches, &r);
-        batches
+    let now = Instant::now();
+    stats.num_iters += 1;
+    stats.num_packets += packets.iter().map(PacketBatch::len).sum::<usize>();
+    stats.num_discards_pre += count_discards(&packets);
+    verify_packets(
+        self_pubkey,
+        bank_forks,
+        leader_schedule_cache,
+        recycler_cache,
+        &mut packets,
+    );
+    stats.num_discards_post += count_discards(&packets);
+    // Exclude repair packets from retransmit.
+    let shreds: Vec<_> = packets
+        .iter()
+        .flat_map(PacketBatch::iter)
+        .filter(|packet| !packet.meta.discard() && !packet.meta.repair())
+        .filter_map(shred::layout::get_shred)
+        .map(<[u8]>::to_vec)
+        .collect();
+    stats.num_retransmit_shreds += shreds.len();
+    if !turbine_disabled.load(Ordering::Relaxed) {
+        retransmit_sender.send(shreds)?;
+        verified_sender.send(packets)?;
     }
+    stats.elapsed_micros += now.elapsed().as_micros() as u64;
+    Ok(())
+}
+
+fn verify_packets(
+    self_pubkey: &Pubkey,
+    bank_forks: &RwLock<BankForks>,
+    leader_schedule_cache: &LeaderScheduleCache,
+    recycler_cache: &RecyclerCache,
+    packets: &mut [PacketBatch],
+) {
+    let working_bank = bank_forks.read().unwrap().working_bank();
+    let leader_slots: HashMap<Slot, [u8; 32]> =
+        get_slot_leaders(self_pubkey, packets, leader_schedule_cache, &working_bank)
+            .into_iter()
+            .filter_map(|(slot, pubkey)| Some((slot, pubkey?.to_bytes())))
+            .chain(std::iter::once((Slot::MAX, [0u8; 32])))
+            .collect();
+    let out = verify_shreds_gpu(packets, &leader_slots, recycler_cache);
+    solana_perf::sigverify::mark_disabled(packets, &out);
 }
 
 // Returns pubkey of leaders for shred slots referenced in the packets.
@@ -139,11 +161,75 @@ fn get_slot_leaders(
     leaders
 }
 
+fn count_discards(packets: &[PacketBatch]) -> usize {
+    packets
+        .iter()
+        .flat_map(PacketBatch::iter)
+        .filter(|packet| packet.meta.discard())
+        .count()
+}
+
+impl From<RecvTimeoutError> for Error {
+    fn from(err: RecvTimeoutError) -> Self {
+        match err {
+            RecvTimeoutError::Timeout => Self::RecvTimeout,
+            RecvTimeoutError::Disconnected => Self::RecvDisconnected,
+        }
+    }
+}
+
+impl<T> From<SendError<T>> for Error {
+    fn from(_: SendError<T>) -> Self {
+        Self::SendError
+    }
+}
+
+struct ShredSigVerifyStats {
+    since: Instant,
+    num_iters: usize,
+    num_packets: usize,
+    num_discards_pre: usize,
+    num_discards_post: usize,
+    num_retransmit_shreds: usize,
+    elapsed_micros: u64,
+}
+
+impl ShredSigVerifyStats {
+    const METRICS_SUBMIT_CADENCE: Duration = Duration::from_secs(2);
+
+    fn new(now: Instant) -> Self {
+        Self {
+            since: now,
+            num_iters: 0usize,
+            num_packets: 0usize,
+            num_discards_pre: 0usize,
+            num_discards_post: 0usize,
+            num_retransmit_shreds: 0usize,
+            elapsed_micros: 0u64,
+        }
+    }
+
+    fn maybe_submit(&mut self) {
+        if self.since.elapsed() <= Self::METRICS_SUBMIT_CADENCE {
+            return;
+        }
+        datapoint_info!(
+            "shred_sigverify",
+            ("num_iters", self.num_iters, i64),
+            ("num_packets", self.num_packets, i64),
+            ("num_discards_pre", self.num_discards_pre, i64),
+            ("num_discards_post", self.num_discards_post, i64),
+            ("num_retransmit_shreds", self.num_retransmit_shreds, i64),
+            ("elapsed_micros", self.elapsed_micros, i64),
+        );
+        *self = Self::new(Instant::now());
+    }
+}
+
 #[cfg(test)]
-pub mod tests {
+mod tests {
     use {
         super::*,
-        crossbeam_channel::unbounded,
         solana_ledger::{
             genesis_utils::create_genesis_config_with_leader,
             shred::{Shred, ShredFlags},
@@ -160,18 +246,8 @@ pub mod tests {
         let bank = Bank::new_for_tests(
             &create_genesis_config_with_leader(100, &leader_pubkey, 10).genesis_config,
         );
-        let cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
-        let bf = Arc::new(RwLock::new(BankForks::new(bank)));
-        let (sender, receiver) = unbounded();
-        let (retransmit_sender, _retransmit_receiver) = unbounded();
-        let mut verifier = ShredSigVerifier::new(
-            Pubkey::new_unique(),
-            bf,
-            cache,
-            retransmit_sender,
-            sender,
-            Arc::<AtomicBool>::default(), // turbine_disabled
-        );
+        let leader_schedule_cache = LeaderScheduleCache::new_from_bank(&bank);
+        let bank_forks = RwLock::new(BankForks::new(bank));
         let batch_size = 2;
         let mut batch = PacketBatch::with_capacity(batch_size);
         batch.resize(batch_size, Packet::default());
@@ -206,20 +282,14 @@ pub mod tests {
         batches[0][1].buffer_mut()[..shred.payload().len()].copy_from_slice(shred.payload());
         batches[0][1].meta.size = shred.payload().len();
 
-        let num_packets = solana_perf::sigverify::count_packets_in_batches(&batches);
-        let rv = verifier.verify_batches(batches, num_packets);
-        assert!(!rv[0][0].meta.discard());
-        assert!(rv[0][1].meta.discard());
-
-        verifier.send_packets(rv.clone()).unwrap();
-        let received_packets = receiver.recv().unwrap();
-        assert_eq!(received_packets.len(), rv.len());
-        for (received_packet_batch, original_packet_batch) in received_packets.iter().zip(rv.iter())
-        {
-            assert_eq!(
-                received_packet_batch.iter().collect::<Vec<_>>(),
-                original_packet_batch.iter().collect::<Vec<_>>()
-            );
-        }
+        verify_packets(
+            &Pubkey::new_unique(), // self_pubkey
+            &bank_forks,
+            &leader_schedule_cache,
+            &RecyclerCache::warmed(),
+            &mut batches,
+        );
+        assert!(!batches[0][0].meta.discard());
+        assert!(batches[0][1].meta.discard());
     }
 }
diff --git a/core/src/tvu.rs b/core/src/tvu.rs
index fe7d1d94b..e6deed99f 100644
--- a/core/src/tvu.rs
+++ b/core/src/tvu.rs
@@ -20,8 +20,7 @@ use {
         retransmit_stage::RetransmitStage,
         rewards_recorder_service::RewardsRecorderSender,
         shred_fetch_stage::ShredFetchStage,
-        sigverify_shreds::ShredSigVerifier,
-        sigverify_stage::SigVerifyStage,
+        sigverify_shreds,
         tower_storage::TowerStorage,
         validator::ProcessBlockStore,
         voting_service::VotingService,
@@ -56,13 +55,13 @@ use {
         collections::HashSet,
         net::UdpSocket,
         sync::{atomic::AtomicBool, Arc, RwLock},
-        thread,
+        thread::{self, JoinHandle},
     },
 };
 
 pub struct Tvu {
     fetch_stage: ShredFetchStage,
-    sigverify_stage: SigVerifyStage,
+    shred_sigverify: JoinHandle<()>,
     retransmit_stage: RetransmitStage,
     window_service: WindowService,
     cluster_slots_service: ClusterSlotsService,
@@ -163,17 +162,14 @@ impl Tvu {
         let (verified_sender, verified_receiver) = unbounded();
 
         let (retransmit_sender, retransmit_receiver) = unbounded();
-        let sigverify_stage = SigVerifyStage::new(
+        let shred_sigverify = sigverify_shreds::spawn_shred_sigverify(
+            cluster_info.id(),
+            bank_forks.clone(),
+            leader_schedule_cache.clone(),
             fetch_receiver,
-            ShredSigVerifier::new(
-                cluster_info.id(),
-                bank_forks.clone(),
-                leader_schedule_cache.clone(),
-                retransmit_sender.clone(),
-                verified_sender,
-                turbine_disabled,
-            ),
-            "shred-verifier",
+            retransmit_sender.clone(),
+            verified_sender,
+            turbine_disabled,
         );
 
         let retransmit_stage = RetransmitStage::new(
@@ -319,7 +315,7 @@ impl Tvu {
 
         Tvu {
             fetch_stage,
-            sigverify_stage,
+            shred_sigverify,
             retransmit_stage,
             window_service,
             cluster_slots_service,
@@ -338,7 +334,7 @@ impl Tvu {
         self.window_service.join()?;
         self.cluster_slots_service.join()?;
         self.fetch_stage.join()?;
-        self.sigverify_stage.join()?;
+        self.shred_sigverify.join()?;
         if self.ledger_cleanup_service.is_some() {
             self.ledger_cleanup_service.unwrap().join()?;
         }
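
A note on the shutdown behavior of the loop in spawn_shred_sigverify above:
a receive timeout only means no shreds arrived within the second, so the
loop retries, while a disconnected receiver or a failed send means an
adjacent stage has shut down, so the thread exits and Tvu::join() can reap
it. A standalone sketch of that control flow, with a run() stub as an
illustrative stand-in for one run_shred_sigverify() pass (not part of this
patch):

    enum Error {
        RecvDisconnected,
        RecvTimeout,
        SendError,
    }

    // Stand-in for one run_shred_sigverify() pass.
    fn run(attempt: u32) -> Result<(), Error> {
        match attempt {
            0 => Err(Error::RecvTimeout), // nothing arrived within the timeout
            1 => Ok(()),                  // a batch was verified and forwarded
            _ => Err(Error::RecvDisconnected), // upstream hung up
        }
    }

    fn main() {
        let mut attempt = 0;
        loop {
            match run(attempt) {
                // No packets before the timeout; keep polling.
                Ok(()) | Err(Error::RecvTimeout) => (),
                // A neighboring stage hung up; shut this stage down.
                Err(Error::RecvDisconnected) | Err(Error::SendError) => break,
            }
            attempt += 1;
        }
    }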
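
Similarly, ShredSigVerifyStats accumulates counters across iterations, and
maybe_submit() flushes them as a single datapoint once METRICS_SUBMIT_CADENCE
has elapsed, then resets the window. A minimal standalone sketch of that
reset-on-submit pattern; the Stats struct and the println! reporting here are
illustrative stand-ins for the real struct and datapoint_info! (not part of
this patch):

    use std::time::{Duration, Instant};

    struct Stats {
        since: Instant,
        num_iters: usize,
    }

    impl Stats {
        const CADENCE: Duration = Duration::from_secs(2);

        fn new(now: Instant) -> Self {
            Self { since: now, num_iters: 0 }
        }

        fn maybe_submit(&mut self) {
            if self.since.elapsed() <= Self::CADENCE {
                return;
            }
            // Report the counters for this window, ...
            println!("shred_sigverify num_iters={}", self.num_iters);
            // ... then start a fresh window so each datapoint covers
            // roughly one cadence interval.
            *self = Self::new(Instant::now());
        }
    }

    fn main() {
        let mut stats = Stats::new(Instant::now());
        for _ in 0..1_000 {
            stats.num_iters += 1;
            std::thread::sleep(Duration::from_millis(5));
            stats.maybe_submit();
        }
    }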