diff --git a/core/benches/cluster_nodes.rs b/core/benches/cluster_nodes.rs
index f22da675e..10900c0be 100644
--- a/core/benches/cluster_nodes.rs
+++ b/core/benches/cluster_nodes.rs
@@ -51,7 +51,7 @@ fn get_retransmit_peers_deterministic(
     );
     let (_root_distance, _neighbors, _children) = cluster_nodes.get_retransmit_peers(
         slot_leader,
-        &shred,
+        &shred.id(),
         root_bank,
         solana_gossip::cluster_info::DATA_PLANE_FANOUT,
     );
diff --git a/core/benches/retransmit_stage.rs b/core/benches/retransmit_stage.rs
index 05f0f4672..1b460ea03 100644
--- a/core/benches/retransmit_stage.rs
+++ b/core/benches/retransmit_stage.rs
@@ -155,7 +155,8 @@ fn bench_retransmitter(bencher: &mut Bencher) {
                 shred.set_index(index);
                 index += 1;
                 index %= 200;
-                let _ = shreds_sender.send(vec![shred.clone()]);
+                let shred = shred.payload().clone();
+                let _ = shreds_sender.send(vec![shred]);
             }
             slot += 1;
 
diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs
index 82b95d9a9..18ab25a0b 100644
--- a/core/src/broadcast_stage.rs
+++ b/core/src/broadcast_stage.rs
@@ -419,7 +419,7 @@ pub fn broadcast_shreds(
             let root_bank = root_bank.clone();
             shreds.flat_map(move |shred| {
                 repeat(shred.payload()).zip(cluster_nodes.get_broadcast_addrs(
-                    shred,
+                    &shred.id(),
                     &root_bank,
                     DATA_PLANE_FANOUT,
                     socket_addr_space,
diff --git a/core/src/broadcast_stage/broadcast_duplicates_run.rs b/core/src/broadcast_stage/broadcast_duplicates_run.rs
index 2567e5997..741be826c 100644
--- a/core/src/broadcast_stage/broadcast_duplicates_run.rs
+++ b/core/src/broadcast_stage/broadcast_duplicates_run.rs
@@ -303,7 +303,12 @@ impl BroadcastRun for BroadcastDuplicatesRun {
                 .iter()
                 .filter_map(|shred| {
                     let addr = cluster_nodes
-                        .get_broadcast_addrs(shred, &root_bank, DATA_PLANE_FANOUT, socket_addr_space)
+                        .get_broadcast_addrs(
+                            &shred.id(),
+                            &root_bank,
+                            DATA_PLANE_FANOUT,
+                            socket_addr_space,
+                        )
                         .first()
                         .copied()?;
                     let node = nodes.iter().find(|node| node.tvu == addr)?;
diff --git a/core/src/cluster_nodes.rs b/core/src/cluster_nodes.rs
index 795f7897a..f83175a99 100644
--- a/core/src/cluster_nodes.rs
+++ b/core/src/cluster_nodes.rs
@@ -12,7 +12,7 @@ use {
         crds_value::{CrdsData, CrdsValue},
         weighted_shuffle::WeightedShuffle,
     },
-    solana_ledger::shred::Shred,
+    solana_ledger::shred::ShredId,
     solana_runtime::bank::Bank,
     solana_sdk::{
         clock::{Epoch, Slot},
@@ -116,13 +116,13 @@ impl ClusterNodes<BroadcastStage> {
 
     pub(crate) fn get_broadcast_addrs(
         &self,
-        shred: &Shred,
+        shred: &ShredId,
         root_bank: &Bank,
         fanout: usize,
         socket_addr_space: &SocketAddrSpace,
     ) -> Vec<SocketAddr> {
         const MAX_CONTACT_INFO_AGE: Duration = Duration::from_secs(2 * 60);
-        let shred_seed = shred.id().seed(&self.pubkey);
+        let shred_seed = shred.seed(&self.pubkey);
         let mut rng = ChaChaRng::from_seed(shred_seed);
         let index = match self.weighted_shuffle.first(&mut rng) {
             None => return Vec::default(),
@@ -177,7 +177,7 @@ impl ClusterNodes<RetransmitStage> {
     pub(crate) fn get_retransmit_addrs(
         &self,
         slot_leader: &Pubkey,
-        shred: &Shred,
+        shred: &ShredId,
         root_bank: &Bank,
         fanout: usize,
     ) -> (/*root_distance:*/ usize, Vec<SocketAddr>) {
@@ -213,7 +213,7 @@ impl ClusterNodes<RetransmitStage> {
     pub fn get_retransmit_peers(
         &self,
         slot_leader: &Pubkey,
-        shred: &Shred,
+        shred: &ShredId,
         root_bank: &Bank,
         fanout: usize,
     ) -> (
@@ -221,7 +221,7 @@ pub fn get_retransmit_peers(
         Vec<&Node>, // neighbors
         Vec<&Node>, // children
     ) {
-        let shred_seed = shred.id().seed(slot_leader);
+        let shred_seed = shred.seed(slot_leader);
         let mut weighted_shuffle = self.weighted_shuffle.clone();
         // Exclude slot leader from list of nodes.
         if slot_leader == &self.pubkey {
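The cluster_nodes.rs hunks above are the crux of the change: Turbine peer selection never needed the shred payload, only a deterministic seed derived from the shred's identity plus a pubkey. A runnable sketch of that property, not part of the patch: `DefaultHasher` stands in for the real `ShredId::seed`, which produces a 32-byte SHA-256-based seed for `ChaChaRng`.

```rust
use std::{
    collections::hash_map::DefaultHasher,
    hash::{Hash, Hasher},
};

// Stand-in for ShredId::seed: any deterministic digest of the
// (slot, index, shred type) triple plus a pubkey demonstrates the point.
fn demo_seed(slot: u64, index: u32, shred_type: u8, leader: &[u8; 32]) -> u64 {
    let mut hasher = DefaultHasher::new();
    (slot, index, shred_type).hash(&mut hasher);
    leader.hash(&mut hasher);
    hasher.finish()
}

fn main() {
    let leader = [7u8; 32];
    // Every validator derives the same seed from the same ShredId, so all
    // nodes agree on the weighted shuffle of retransmit peers without ever
    // deserializing the shred payload.
    assert_eq!(demo_seed(42, 5, 0, &leader), demo_seed(42, 5, 0, &leader));
    assert_ne!(demo_seed(42, 5, 0, &leader), demo_seed(42, 6, 0, &leader));
}
```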
diff --git a/core/src/packet_hasher.rs b/core/src/packet_hasher.rs
index 871062dcb..58d40ec4b 100644
--- a/core/src/packet_hasher.rs
+++ b/core/src/packet_hasher.rs
@@ -4,7 +4,6 @@
 use {
     ahash::AHasher,
     rand::{thread_rng, Rng},
-    solana_ledger::shred::Shred,
     solana_perf::packet::Packet,
     std::hash::Hasher,
 };
@@ -29,8 +28,8 @@ impl PacketHasher {
         self.hash_data(packet.data(..).unwrap_or_default())
     }
 
-    pub(crate) fn hash_shred(&self, shred: &Shred) -> u64 {
-        self.hash_data(shred.payload())
+    pub(crate) fn hash_shred(&self, shred: &[u8]) -> u64 {
+        self.hash_data(shred)
     }
 
     fn hash_data(&self, data: &[u8]) -> u64 {
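With `hash_shred` taking `&[u8]`, the duplicate filter hashes the wire bytes directly. A compilable sketch of the same shape, assuming ahash 0.7's `AHasher::new_with_keys` (which is what packet_hasher.rs uses); the `thread_rng`-based seeding of `seed1`/`seed2` is elided here.

```rust
use std::hash::Hasher;

// Minimal stand-in mirroring core/src/packet_hasher.rs after this patch.
struct PacketHasher {
    seed1: u128,
    seed2: u128,
}

impl PacketHasher {
    // Callers now pass the serialized shred straight off the channel; no
    // Shred deserialization is needed just to test for duplicates.
    fn hash_shred(&self, shred: &[u8]) -> u64 {
        self.hash_data(shred)
    }

    fn hash_data(&self, data: &[u8]) -> u64 {
        let mut hasher = ahash::AHasher::new_with_keys(self.seed1, self.seed2);
        hasher.write(data);
        hasher.finish()
    }
}

fn main() {
    let hasher = PacketHasher { seed1: 1, seed2: 2 };
    let payload = vec![0xAAu8; 1228]; // roughly shred-sized buffer
    assert_eq!(hasher.hash_shred(&payload), hasher.hash_shred(&payload));
}
```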
diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs
index b8f78c6a6..94fab21bc 100644
--- a/core/src/retransmit_stage.rs
+++ b/core/src/retransmit_stage.rs
@@ -25,7 +25,7 @@ use {
     solana_ledger::{
         blockstore::Blockstore,
         leader_schedule_cache::LeaderScheduleCache,
-        shred::{Shred, ShredId},
+        shred::{self, ShredId},
     },
     solana_measure::measure::Measure,
     solana_perf::packet::PacketBatch,
@@ -136,11 +136,11 @@
 type ShredFilter = LruCache<ShredId, Vec<u64>>;
 
 // Returns true if shred is already received and should skip retransmit.
 fn should_skip_retransmit(
-    shred: &Shred,
+    key: ShredId,
+    shred: &[u8],
     shreds_received: &mut ShredFilter,
     packet_hasher: &PacketHasher,
 ) -> bool {
-    let key = shred.id();
     match shreds_received.get_mut(&key) {
         Some(sent) if sent.len() >= MAX_DUPLICATE_COUNT => true,
         Some(sent) => {
@@ -179,7 +179,7 @@ fn retransmit(
     bank_forks: &RwLock<BankForks>,
     leader_schedule_cache: &LeaderScheduleCache,
     cluster_info: &ClusterInfo,
-    shreds_receiver: &Receiver<Vec<Shred>>,
+    shreds_receiver: &Receiver<Vec<Vec<u8>>>,
     sockets: &[UdpSocket],
     stats: &mut RetransmitStats,
     cluster_nodes_cache: &ClusterNodesCache<RetransmitStage>,
@@ -211,15 +211,16 @@ fn retransmit(
     // Lookup slot leader and cluster nodes for each slot.
     let shreds: Vec<_> = shreds
         .into_iter()
-        .filter(|shred| {
-            if should_skip_retransmit(shred, shreds_received, packet_hasher) {
+        .filter_map(|shred| {
+            let key = shred::layout::get_shred_id(&shred)?;
+            if should_skip_retransmit(key, &shred, shreds_received, packet_hasher) {
                 stats.num_shreds_skipped += 1;
-                false
+                None
             } else {
-                true
+                Some((key, shred))
             }
         })
-        .into_group_map_by(Shred::slot)
+        .into_group_map_by(|(key, _shred)| key.slot())
         .into_iter()
         .filter_map(|(slot, shreds)| {
             max_slots.retransmit.fetch_max(slot, Ordering::Relaxed);
@@ -255,8 +256,9 @@ fn retransmit(
         shreds
             .into_iter()
             .enumerate()
-            .map(|(index, (shred, slot_leader, cluster_nodes))| {
+            .map(|(index, ((key, shred), slot_leader, cluster_nodes))| {
                 let (root_distance, num_nodes) = retransmit_shred(
+                    &key,
                     &shred,
                     &slot_leader,
                     &root_bank,
@@ -265,16 +267,17 @@ fn retransmit(
                     &sockets[index % sockets.len()],
                     stats,
                 );
-                (shred.slot(), root_distance, num_nodes)
+                (key.slot(), root_distance, num_nodes)
             })
             .fold(HashMap::new(), record)
     } else {
         thread_pool.install(|| {
             shreds
                 .into_par_iter()
-                .map(|(shred, slot_leader, cluster_nodes)| {
+                .map(|((key, shred), slot_leader, cluster_nodes)| {
                     let index = thread_pool.current_thread_index().unwrap();
                     let (root_distance, num_nodes) = retransmit_shred(
+                        &key,
                         &shred,
                         &slot_leader,
                         &root_bank,
@@ -283,7 +286,7 @@ fn retransmit(
                         &sockets[index % sockets.len()],
                         stats,
                     );
-                    (shred.slot(), root_distance, num_nodes)
+                    (key.slot(), root_distance, num_nodes)
                 })
                 .fold(HashMap::new, record)
                 .reduce(HashMap::new, RetransmitSlotStats::merge)
@@ -297,7 +300,8 @@ fn retransmit(
 }
 
 fn retransmit_shred(
-    shred: &Shred,
+    key: &ShredId,
+    shred: &[u8],
     slot_leader: &Pubkey,
     root_bank: &Bank,
     cluster_nodes: &ClusterNodes<RetransmitStage>,
@@ -307,7 +311,7 @@ fn retransmit_shred(
 ) -> (/*root_distance:*/ usize, /*num_nodes:*/ usize) {
     let mut compute_turbine_peers = Measure::start("turbine_start");
     let (root_distance, addrs) =
-        cluster_nodes.get_retransmit_addrs(slot_leader, shred, root_bank, DATA_PLANE_FANOUT);
+        cluster_nodes.get_retransmit_addrs(slot_leader, key, root_bank, DATA_PLANE_FANOUT);
     let addrs: Vec<_> = addrs
         .into_iter()
         .filter(|addr| ContactInfo::is_valid_address(addr, socket_addr_space))
@@ -318,7 +322,7 @@ fn retransmit_shred(
         .fetch_add(compute_turbine_peers.as_us(), Ordering::Relaxed);
 
     let mut retransmit_time = Measure::start("retransmit_to");
-    let num_nodes = match multi_target_send(socket, shred.payload(), &addrs) {
+    let num_nodes = match multi_target_send(socket, shred, &addrs) {
         Ok(()) => addrs.len(),
         Err(SendPktsError::IoError(ioerr, num_failed)) => {
             stats
@@ -354,7 +358,7 @@ pub fn retransmitter(
     bank_forks: Arc<RwLock<BankForks>>,
     leader_schedule_cache: Arc<LeaderScheduleCache>,
     cluster_info: Arc<ClusterInfo>,
-    shreds_receiver: Receiver<Vec<Shred>>,
+    shreds_receiver: Receiver<Vec<Vec<u8>>>,
     max_slots: Arc<MaxSlots>,
     rpc_subscriptions: Option<Arc<RpcSubscriptions>>,
 ) -> JoinHandle<()> {
@@ -630,7 +634,10 @@ impl RetransmitSlotStats {
 
 #[cfg(test)]
 mod tests {
-    use {super::*, solana_ledger::shred::ShredFlags};
+    use {
+        super::*,
+        solana_ledger::shred::{Shred, ShredFlags},
+    };
 
     #[test]
     fn test_already_received() {
@@ -651,13 +658,15 @@ fn test_already_received() {
         let packet_hasher = PacketHasher::default();
         // unique shred for (1, 5) should pass
         assert!(!should_skip_retransmit(
-            &shred,
+            shred.id(),
+            shred.payload(),
             &mut shreds_received,
             &packet_hasher
         ));
         // duplicate shred for (1, 5) blocked
         assert!(should_skip_retransmit(
-            &shred,
+            shred.id(),
+            shred.payload(),
             &mut shreds_received,
             &packet_hasher
         ));
@@ -674,13 +683,15 @@ fn test_already_received() {
         );
         // first duplicate shred for (1, 5) passed
         assert!(!should_skip_retransmit(
-            &shred,
+            shred.id(),
+            shred.payload(),
             &mut shreds_received,
             &packet_hasher
         ));
         // then blocked
         assert!(should_skip_retransmit(
-            &shred,
+            shred.id(),
+            shred.payload(),
             &mut shreds_received,
             &packet_hasher
         ));
@@ -697,12 +708,14 @@ fn test_already_received() {
         );
         // 2nd duplicate shred for (1, 5) blocked
         assert!(should_skip_retransmit(
-            &shred,
+            shred.id(),
+            shred.payload(),
             &mut shreds_received,
             &packet_hasher
         ));
         assert!(should_skip_retransmit(
-            &shred,
+            shred.id(),
+            shred.payload(),
             &mut shreds_received,
             &packet_hasher
         ));
@@ -710,13 +723,15 @@ fn test_already_received() {
         let shred = Shred::new_from_parity_shard(slot, index, &[], 0, 1, 1, 0, version);
         // Coding at (1, 5) passes
         assert!(!should_skip_retransmit(
-            &shred,
+            shred.id(),
+            shred.payload(),
             &mut shreds_received,
             &packet_hasher
         ));
         // then blocked
         assert!(should_skip_retransmit(
-            &shred,
+            shred.id(),
+            shred.payload(),
             &mut shreds_received,
             &packet_hasher
         ));
@@ -724,13 +739,15 @@ fn test_already_received() {
         let shred = Shred::new_from_parity_shard(slot, index, &[], 2, 1, 1, 0, version);
         // 2nd unique coding at (1, 5) passes
         assert!(!should_skip_retransmit(
-            &shred,
+            shred.id(),
+            shred.payload(),
             &mut shreds_received,
             &packet_hasher
         ));
         // same again is blocked
         assert!(should_skip_retransmit(
-            &shred,
+            shred.id(),
+            shred.payload(),
             &mut shreds_received,
             &packet_hasher
         ));
@@ -738,12 +755,14 @@ fn test_already_received() {
         let shred = Shred::new_from_parity_shard(slot, index, &[], 3, 1, 1, 0, version);
         // Another unique coding at (1, 5) always blocked
         assert!(should_skip_retransmit(
-            &shred,
+            shred.id(),
+            shred.payload(),
             &mut shreds_received,
             &packet_hasher
        ));
        assert!(should_skip_retransmit(
-            &shred,
+            shred.id(),
+            shred.payload(),
             &mut shreds_received,
             &packet_hasher
         ));
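The `filter_map`/`into_group_map_by` rewrite above is the core of the lazy path: only the few header fields behind a `ShredId` are parsed, unparseable payloads are dropped, and the surviving `(key, payload)` pairs are grouped by slot for the per-slot leader lookup. A runnable simplification, assuming the `itertools` crate; the byte offsets below are invented for the demo and are not the real shred wire layout.

```rust
use itertools::Itertools;

type Slot = u64;

#[derive(Clone, Copy)]
struct ShredId(Slot, u32);

impl ShredId {
    fn slot(&self) -> Slot {
        self.0
    }
}

// Stand-in for shred::layout::get_shred_id: read fixed little-endian header
// fields; return None for buffers too short to contain them.
fn get_shred_id(shred: &[u8]) -> Option<ShredId> {
    let slot = u64::from_le_bytes(shred.get(..8)?.try_into().ok()?);
    let index = u32::from_le_bytes(shred.get(8..12)?.try_into().ok()?);
    Some(ShredId(slot, index))
}

fn main() {
    let shreds: Vec<Vec<u8>> = vec![vec![0u8; 12], vec![1u8; 12], vec![0u8; 4]];
    // filter_map + into_group_map_by, as in the patched `retransmit`: the
    // 4-byte runt fails to parse and silently drops out.
    let by_slot = shreds
        .into_iter()
        .filter_map(|shred| Some((get_shred_id(&shred)?, shred)))
        .into_group_map_by(|(key, _shred)| key.slot());
    assert_eq!(by_slot.len(), 2); // slot 0 and slot 0x0101_0101_0101_0101
}
```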
diff --git a/core/src/window_service.rs b/core/src/window_service.rs
index 9b7fdff58..55e448df4 100644
--- a/core/src/window_service.rs
+++ b/core/src/window_service.rs
@@ -16,7 +16,7 @@ use {
     solana_ledger::{
         blockstore::{Blockstore, BlockstoreInsertionMetrics},
         leader_schedule_cache::LeaderScheduleCache,
-        shred::{Nonce, Shred},
+        shred::{self, Nonce, Shred},
     },
     solana_measure::measure::Measure,
     solana_metrics::inc_new_counter_error,
@@ -36,6 +36,7 @@ use {
     },
 };
 
+type ShredPayload = Vec<u8>;
 type DuplicateSlotSender = Sender<Slot>;
 pub(crate) type DuplicateSlotReceiver = Receiver<Slot>;
 
@@ -108,8 +109,8 @@
 struct ReceiveWindowStats {
     num_iters: usize,
     num_packets: usize,
-    num_shreds: usize, // num_discards: num_packets - num_shreds
     num_repairs: usize,
+    num_shreds: usize, // num_discards: num_packets - num_shreds
     elapsed: Duration, // excludes waiting time on the receiver channel.
     addrs: HashMap<IpAddr, /*num packets:*/ usize>,
     since: Option<Instant>,
@@ -229,14 +230,14 @@ fn prune_shreds_invalid_repair(
 }
 
 fn run_insert<F>(
-    shred_receiver: &Receiver<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
+    shred_receiver: &Receiver<(Vec<ShredPayload>, Vec<Option<RepairMeta>>)>,
     blockstore: &Blockstore,
     leader_schedule_cache: &LeaderScheduleCache,
     handle_duplicate: F,
     metrics: &mut BlockstoreInsertionMetrics,
     ws_metrics: &mut WindowServiceMetrics,
     completed_data_sets_sender: &CompletedDataSetsSender,
-    retransmit_sender: &Sender<Vec<Shred>>,
+    retransmit_sender: &Sender<Vec<ShredPayload>>,
     outstanding_requests: &RwLock<OutstandingShredRepairs>,
 ) -> Result<()>
 where
@@ -253,7 +254,15 @@ fn run_insert<F>(
     shred_receiver_elapsed.stop();
     ws_metrics.shred_receiver_elapsed_us += shred_receiver_elapsed.as_us();
     ws_metrics.num_shreds_received += shreds.len() as u64;
-
+    // TODO: Consider using thread-pool here instead of recv_window.
+    let (mut shreds, mut repair_infos): (Vec<_>, Vec<_>) = shreds
+        .into_iter()
+        .zip(repair_infos)
+        .filter_map(|(shred, repair_info)| {
+            let shred = Shred::new_from_serialized_shred(shred).ok()?;
+            Some((shred, repair_info))
+        })
+        .unzip();
     let mut prune_shreds_elapsed = Measure::start("prune_shreds_elapsed");
     let num_shreds = shreds.len();
     prune_shreds_invalid_repair(&mut shreds, &mut repair_infos, outstanding_requests);
@@ -285,9 +294,9 @@ where
 }
 
 fn recv_window(
-    insert_shred_sender: &Sender<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
+    insert_shred_sender: &Sender<(Vec<ShredPayload>, Vec<Option<RepairMeta>>)>,
     verified_receiver: &Receiver<Vec<PacketBatch>>,
-    retransmit_sender: &Sender<Vec<Shred>>,
+    retransmit_sender: &Sender<Vec<ShredPayload>>,
     turbine_disabled: &AtomicBool,
     thread_pool: &ThreadPool,
     stats: &mut ReceiveWindowStats,
@@ -301,17 +310,16 @@ fn recv_window(
         if turbine_disabled || packet.meta.discard() {
             return None;
         }
-        let serialized_shred = packet.data(..)?.to_vec();
-        let shred = Shred::new_from_serialized_shred(serialized_shred).ok()?;
+        let shred = shred::layout::get_shred(packet)?;
         if packet.meta.repair() {
            let repair_info = RepairMeta {
                 _from_addr: packet.meta.socket_addr(),
                 // If can't parse the nonce, dump the packet.
                 nonce: repair_response::nonce(packet)?,
             };
-            Some((shred, Some(repair_info)))
+            Some((shred.to_vec(), Some(repair_info)))
         } else {
-            Some((shred, None))
+            Some((shred.to_vec(), None))
         }
     };
     let (shreds, repair_infos): (Vec<_>, Vec<_>) = thread_pool.install(|| {
@@ -379,7 +387,7 @@ impl WindowService {
     pub(crate) fn new(
         blockstore: Arc<Blockstore>,
         verified_receiver: Receiver<Vec<PacketBatch>>,
-        retransmit_sender: Sender<Vec<Shred>>,
+        retransmit_sender: Sender<Vec<ShredPayload>>,
         repair_socket: Arc<UdpSocket>,
         ancestor_hashes_socket: Arc<UdpSocket>,
         exit: Arc<AtomicBool>,
@@ -482,10 +490,10 @@ impl WindowService {
         exit: Arc<AtomicBool>,
         blockstore: Arc<Blockstore>,
         leader_schedule_cache: Arc<LeaderScheduleCache>,
-        insert_receiver: Receiver<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
+        insert_receiver: Receiver<(Vec<ShredPayload>, Vec<Option<RepairMeta>>)>,
         check_duplicate_sender: Sender<Shred>,
         completed_data_sets_sender: CompletedDataSetsSender,
-        retransmit_sender: Sender<Vec<Shred>>,
+        retransmit_sender: Sender<Vec<ShredPayload>>,
         outstanding_requests: Arc<RwLock<OutstandingShredRepairs>>,
     ) -> JoinHandle<()> {
         let mut handle_timeout = || {};
@@ -540,10 +548,10 @@ impl WindowService {
     fn start_recv_window_thread(
         id: Pubkey,
         exit: Arc<AtomicBool>,
-        insert_sender: Sender<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
+        insert_sender: Sender<(Vec<ShredPayload>, Vec<Option<RepairMeta>>)>,
         verified_receiver: Receiver<Vec<PacketBatch>>,
         turbine_disabled: Arc<AtomicBool>,
-        retransmit_sender: Sender<Vec<Shred>>,
+        retransmit_sender: Sender<Vec<ShredPayload>>,
     ) -> JoinHandle<()> {
         let mut stats = ReceiveWindowStats::default();
         Builder::new()
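After this change `recv_window` forwards raw payload bytes and `run_insert` deserializes them up front, keeping each payload paired with its repair metadata so both drop together when parsing fails. A self-contained sketch of that zip/filter_map/unzip step with stand-in types; the real `Shred::new_from_serialized_shred` validates far more than length.

```rust
struct Shred {
    payload: Vec<u8>,
}

impl Shred {
    // Stand-in for Shred::new_from_serialized_shred.
    fn new_from_serialized_shred(payload: Vec<u8>) -> Result<Self, ()> {
        if payload.len() >= 8 {
            Ok(Shred { payload })
        } else {
            Err(())
        }
    }
}

fn main() {
    let shreds: Vec<Vec<u8>> = vec![vec![0u8; 16], vec![1u8; 2], vec![2u8; 16]];
    let repair_infos: Vec<Option<u32>> = vec![None, Some(7), None];
    // Payloads and repair metadata travel the channel as parallel vectors;
    // a payload that fails to deserialize takes its repair info with it.
    let (shreds, repair_infos): (Vec<_>, Vec<_>) = shreds
        .into_iter()
        .zip(repair_infos)
        .filter_map(|(shred, repair_info)| {
            let shred = Shred::new_from_serialized_shred(shred).ok()?;
            Some((shred, repair_info))
        })
        .unzip();
    assert_eq!(shreds.len(), 2);
    assert_eq!(repair_infos, vec![None, None]); // Some(7) dropped with its shred
}
```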
diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs
index 0ecdaeead..065bb792f 100644
--- a/ledger/src/blockstore.rs
+++ b/ledger/src/blockstore.rs
@@ -798,7 +798,7 @@ impl Blockstore {
         is_repaired: Vec<bool>,
         leader_schedule: Option<&LeaderScheduleCache>,
         is_trusted: bool,
-        retransmit_sender: Option<&Sender<Vec<Shred>>>,
+        retransmit_sender: Option<&Sender<Vec<Vec<u8>>>>,
         handle_duplicate: &F,
         metrics: &mut BlockstoreInsertionMetrics,
     ) -> Result<(Vec<CompletedDataSetInfo>, Vec<usize>)>
@@ -937,7 +937,12 @@ impl Blockstore {
             .collect();
         if !recovered_data_shreds.is_empty() {
             if let Some(retransmit_sender) = retransmit_sender {
-                let _ = retransmit_sender.send(recovered_data_shreds);
+                let _ = retransmit_sender.send(
+                    recovered_data_shreds
+                        .into_iter()
+                        .map(Shred::into_payload)
+                        .collect(),
+                );
             }
         }
     }
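On the blockstore side, shreds recovered from erasure coding are converted with `Shred::into_payload` so they enter the retransmit channel in the same `Vec<Vec<u8>>` form as shreds arriving off the wire. A sketch with stand-in types; std's mpsc channel stands in for the crossbeam channel the real code uses.

```rust
struct Shred {
    payload: Vec<u8>,
}

impl Shred {
    // Stand-in for Shred::into_payload: hand back the owned wire bytes.
    fn into_payload(self) -> Vec<u8> {
        self.payload
    }
}

fn main() {
    let recovered_data_shreds = vec![Shred { payload: vec![1, 2] }, Shred { payload: vec![3] }];
    let (sender, receiver) = std::sync::mpsc::channel::<Vec<Vec<u8>>>();
    // Same shape as the patched blockstore hunk above.
    let _ = sender.send(
        recovered_data_shreds
            .into_iter()
            .map(Shred::into_payload)
            .collect(),
    );
    assert_eq!(receiver.recv().unwrap().len(), 2);
}
```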
diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs
index 6e870abeb..446f8d5d4 100644
--- a/ledger/src/shred.rs
+++ b/ledger/src/shred.rs
@@ -215,7 +215,7 @@ pub enum Shred {
 }
 
 /// Tuple which uniquely identifies a shred should it exists.
-#[derive(Clone, Copy, Eq, Hash, PartialEq)]
+#[derive(Clone, Copy, Eq, Debug, Hash, PartialEq)]
 pub struct ShredId(Slot, /*shred index:*/ u32, ShredType);
 
 impl ShredId {
@@ -223,6 +223,10 @@ impl ShredId {
         ShredId(slot, index, shred_type)
     }
 
+    pub fn slot(&self) -> Slot {
+        self.0
+    }
+
     pub(crate) fn unwrap(&self) -> (Slot, /*shred index:*/ u32, ShredType) {
         (self.0, self.1, self.2)
     }
@@ -549,17 +553,20 @@ pub mod layout {
         ShredVariant::try_from(shred_variant).map_err(|_| Error::InvalidShredVariant)
     }
 
+    #[inline]
     pub(super) fn get_shred_type(shred: &[u8]) -> Result<ShredType, Error> {
         let shred_variant = get_shred_variant(shred)?;
         Ok(ShredType::from(shred_variant))
     }
 
+    #[inline]
     pub fn get_slot(shred: &[u8]) -> Option<Slot> {
         <[u8; 8]>::try_from(shred.get(OFFSET_OF_SHRED_SLOT..)?.get(..8)?)
             .map(Slot::from_le_bytes)
             .ok()
     }
 
+    #[inline]
     pub(super) fn get_index(shred: &[u8]) -> Option<u32> {
         <[u8; 4]>::try_from(shred.get(OFFSET_OF_SHRED_INDEX..)?.get(..4)?)
             .map(u32::from_le_bytes)
@@ -582,6 +589,15 @@ pub mod layout {
             .ok()
     }
 
+    #[inline]
+    pub fn get_shred_id(shred: &[u8]) -> Option<ShredId> {
+        Some(ShredId(
+            get_slot(shred)?,
+            get_index(shred)?,
+            get_shred_type(shred).ok()?,
+        ))
+    }
+
     // Returns slice range of the shred payload which is signed.
     pub(crate) fn get_signed_message_range(shred: &[u8]) -> Option<Range<usize>> {
         let range = match get_shred_variant(shred).ok()? {
@@ -1219,9 +1235,13 @@ mod tests {
     fn verify_shred_layout(shred: &Shred, packet: &Packet) {
         let data = layout::get_shred(packet).unwrap();
+        assert_eq!(data, packet.data(..).unwrap());
         assert_eq!(layout::get_slot(data), Some(shred.slot()));
         assert_eq!(layout::get_index(data), Some(shred.index()));
         assert_eq!(layout::get_version(data), Some(shred.version()));
+        assert_eq!(layout::get_shred_id(data), Some(shred.id()));
+        assert_eq!(layout::get_signature(data), Some(shred.signature()));
+        assert_eq!(layout::get_shred_type(data).unwrap(), shred.shred_type());
 
         match shred.shred_type() {
             ShredType::Code => {
                 assert_matches!(
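Taken together, `layout::get_shred_id` plus the `#[inline]` accessors let every stage of the retransmit path read identity fields straight out of the wire bytes. A self-contained sketch of the zero-copy accessor pattern; the offsets below are hypothetical, not the real shred layout (which begins with a 64-byte signature before the header fields).

```rust
// Hypothetical header offsets for the demo only.
const OFFSET_OF_SLOT: usize = 0;
const OFFSET_OF_INDEX: usize = 8;

fn get_slot(shred: &[u8]) -> Option<u64> {
    <[u8; 8]>::try_from(shred.get(OFFSET_OF_SLOT..)?.get(..8)?)
        .map(u64::from_le_bytes)
        .ok()
}

fn get_index(shred: &[u8]) -> Option<u32> {
    <[u8; 4]>::try_from(shred.get(OFFSET_OF_INDEX..)?.get(..4)?)
        .map(u32::from_le_bytes)
        .ok()
}

// Same composition as the patched layout::get_shred_id: all fields must
// parse, or the packet is treated as malformed and yields None.
fn get_shred_id(shred: &[u8]) -> Option<(u64, u32)> {
    Some((get_slot(shred)?, get_index(shred)?))
}

fn main() {
    let mut buf = vec![0u8; 12];
    buf[..8].copy_from_slice(&42u64.to_le_bytes());
    buf[8..12].copy_from_slice(&5u32.to_le_bytes());
    assert_eq!(get_shred_id(&buf), Some((42, 5)));
    assert_eq!(get_shred_id(&buf[..10]), None); // truncated packets parse to None
}
```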