From 348fe9ebe2656da5ce76998d29edcd2a2bf2d7bc Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Tue, 28 Jun 2022 12:45:50 +0000 Subject: [PATCH] verifies shred slot and parent in fetch stage (#26225) Shred slot and parent are not verified until window-service where resources are already wasted to sig-verify and deserialize shreds. This commit moves above verification to earlier in the pipeline in fetch stage. --- core/src/repair_response.rs | 10 +- core/src/retransmit_stage.rs | 13 +- core/src/shred_fetch_stage.rs | 43 +++---- core/src/tvu.rs | 4 +- core/src/validator.rs | 4 +- core/src/window_service.rs | 152 ++++-------------------- ledger/src/blockstore.rs | 2 +- ledger/src/shred.rs | 209 +++++++++++++++++++++++++-------- ledger/src/shred/shred_code.rs | 2 +- ledger/src/shred/stats.rs | 2 + local-cluster/tests/common.rs | 2 +- 11 files changed, 227 insertions(+), 216 deletions(-) diff --git a/core/src/repair_response.rs b/core/src/repair_response.rs index 600a58208..52623677a 100644 --- a/core/src/repair_response.rs +++ b/core/src/repair_response.rs @@ -40,9 +40,13 @@ pub fn repair_response_packet_from_bytes( Some(packet) } -pub fn nonce(packet: &Packet) -> Option { - let nonce_start = packet.meta.size.checked_sub(SIZE_OF_NONCE)?; - packet.deserialize_slice(nonce_start..).ok() +pub(crate) fn nonce(packet: &Packet) -> Option { + // Nonces are attached to both repair and ancestor hashes responses. + let data = packet.data(..)?; + let offset = data.len().checked_sub(SIZE_OF_NONCE)?; + <[u8; SIZE_OF_NONCE]>::try_from(&data[offset..]) + .map(Nonce::from_le_bytes) + .ok() } #[cfg(test)] diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index ab2f51ef7..2f6714f5b 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -11,7 +11,7 @@ use { completed_data_sets_service::CompletedDataSetsSender, packet_hasher::PacketHasher, repair_service::{DuplicateSlotsResetSender, RepairInfo}, - window_service::{should_retransmit_and_persist, WindowService}, + window_service::WindowService, }, crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender}, itertools::{izip, Itertools}, @@ -421,7 +421,7 @@ impl RetransmitStage { exit: Arc, cluster_slots_update_receiver: ClusterSlotsUpdateReceiver, epoch_schedule: EpochSchedule, - turbine_disabled: Option>, + turbine_disabled: Arc, cluster_slots: Arc, duplicate_slots_reset_sender: DuplicateSlotsResetSender, verified_vote_receiver: VerifiedVoteReceiver, @@ -470,14 +470,7 @@ impl RetransmitStage { exit, repair_info, leader_schedule_cache, - move |shred, last_root| { - let turbine_disabled = turbine_disabled - .as_ref() - .map(|x| x.load(Ordering::Relaxed)) - .unwrap_or(false); - let rv = should_retransmit_and_persist(shred, last_root); - rv && !turbine_disabled - }, + turbine_disabled, verified_vote_receiver, completed_data_sets_sender, duplicate_slots_sender, diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs index f92436506..97fed9e92 100644 --- a/core/src/shred_fetch_stage.rs +++ b/core/src/shred_fetch_stage.rs @@ -4,7 +4,7 @@ use { crate::packet_hasher::PacketHasher, crossbeam_channel::{unbounded, Sender}, lru::LruCache, - solana_ledger::shred::{self, get_shred_slot_index_type, ShredFetchStats}, + solana_ledger::shred::{should_discard_shred, ShredFetchStats}, solana_perf::packet::{Packet, PacketBatch, PacketBatchRecycler, PacketFlags}, solana_runtime::bank_forks::BankForks, solana_sdk::clock::{Slot, DEFAULT_MS_PER_SLOT}, @@ -67,6 +67,7 @@ impl ShredFetchStage { for packet in packet_batch.iter_mut() { if should_discard_packet( packet, + last_root, slot_bounds.clone(), shred_version, &packet_hasher, @@ -195,6 +196,7 @@ impl ShredFetchStage { #[must_use] fn should_discard_packet( packet: &Packet, + root: Slot, // Range of slots to ingest shreds for. slot_bounds: impl RangeBounds, shred_version: u16, @@ -202,17 +204,7 @@ fn should_discard_packet( shreds_received: &mut ShredsReceived, stats: &mut ShredFetchStats, ) -> bool { - let slot = match get_shred_slot_index_type(packet, stats) { - None => return true, - Some((slot, _index, _shred_type)) => slot, - }; - if !slot_bounds.contains(&slot) { - stats.slot_out_of_range += 1; - return true; - } - let shred = shred::layout::get_shred(packet); - if shred.and_then(shred::layout::get_version) != Some(shred_version) { - stats.shred_version_mismatch += 1; + if should_discard_shred(packet, root, shred_version, slot_bounds, stats) { return true; } let hash = packet_hasher.hash_packet(packet); @@ -242,12 +234,12 @@ mod tests { let mut packet = Packet::default(); let mut stats = ShredFetchStats::default(); - let slot = 1; + let slot = 2; let shred_version = 45189; let shred = Shred::new_from_data( slot, 3, // shred index - 0, // parent offset + 1, // parent offset &[], // data ShredFlags::LAST_SHRED_IN_SLOT, 0, // reference_tick @@ -264,6 +256,7 @@ mod tests { let slot_bounds = (last_root + 1)..(last_slot + 2 * slots_per_epoch); assert!(!should_discard_packet( &packet, + last_root, slot_bounds.clone(), shred_version, &hasher, @@ -278,6 +271,7 @@ mod tests { coding[0].copy_to_packet(&mut packet); assert!(!should_discard_packet( &packet, + last_root, slot_bounds, shred_version, &hasher, @@ -303,6 +297,7 @@ mod tests { // packet size is 0, so cannot get index assert!(should_discard_packet( &packet, + last_root, slot_bounds.clone(), shred_version, &hasher, @@ -311,20 +306,21 @@ mod tests { )); assert_eq!(stats.index_overrun, 1); let shred = Shred::new_from_data( - 1, - 3, - 0, - &[], + 2, // slot + 3, // index + 1, // parent_offset + &[], // data ShredFlags::LAST_SHRED_IN_SLOT, - 0, + 0, // reference_tick shred_version, - 0, + 0, // fec_set_index ); shred.copy_to_packet(&mut packet); - // rejected slot is 1, root is 3 + // rejected slot is 2, root is 3 assert!(should_discard_packet( &packet, + 3, 3..slot_bounds.end, shred_version, &hasher, @@ -335,6 +331,7 @@ mod tests { assert!(should_discard_packet( &packet, + last_root, slot_bounds.clone(), 345, // shred_version &hasher, @@ -346,6 +343,7 @@ mod tests { // Accepted for 1,3 assert!(!should_discard_packet( &packet, + last_root, slot_bounds.clone(), shred_version, &hasher, @@ -356,6 +354,7 @@ mod tests { // shreds_received should filter duplicate assert!(should_discard_packet( &packet, + last_root, slot_bounds.clone(), shred_version, &hasher, @@ -379,6 +378,7 @@ mod tests { // Slot 1 million is too high assert!(should_discard_packet( &packet, + last_root, slot_bounds.clone(), shred_version, &hasher, @@ -391,6 +391,7 @@ mod tests { shred.copy_to_packet(&mut packet); assert!(should_discard_packet( &packet, + last_root, slot_bounds, shred_version, &hasher, diff --git a/core/src/tvu.rs b/core/src/tvu.rs index feee075bd..7b4de2cdd 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -111,7 +111,7 @@ impl Tvu { leader_schedule_cache: &Arc, exit: &Arc, block_commitment_cache: Arc>, - turbine_disabled: Option>, + turbine_disabled: Arc, transaction_status_sender: Option, rewards_recorder_sender: Option, cache_block_meta_sender: Option, @@ -415,7 +415,7 @@ pub mod tests { &leader_schedule_cache, &exit, block_commitment_cache, - None, + Arc::::default(), None, None, None, diff --git a/core/src/validator.rs b/core/src/validator.rs index be397755e..bf269304c 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -134,7 +134,7 @@ pub struct ValidatorConfig { pub snapshot_config: Option, pub max_ledger_shreds: Option, pub broadcast_stage_type: BroadcastStageType, - pub turbine_disabled: Option>, + pub turbine_disabled: Arc, pub enforce_ulimit_nofile: bool, pub fixed_leader_schedule: Option, pub wait_for_supermajority: Option, @@ -196,7 +196,7 @@ impl Default for ValidatorConfig { pubsub_config: PubSubConfig::default(), snapshot_config: None, broadcast_stage_type: BroadcastStageType::Standard, - turbine_disabled: None, + turbine_disabled: Arc::::default(), enforce_ulimit_nofile: true, fixed_leader_schedule: None, wait_for_supermajority: None, diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 8101b4e29..9b7fdff58 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -14,12 +14,12 @@ use { rayon::{prelude::*, ThreadPool}, solana_gossip::cluster_info::ClusterInfo, solana_ledger::{ - blockstore::{self, Blockstore, BlockstoreInsertionMetrics}, + blockstore::{Blockstore, BlockstoreInsertionMetrics}, leader_schedule_cache::LeaderScheduleCache, - shred::{Nonce, Shred, ShredType}, + shred::{Nonce, Shred}, }, solana_measure::measure::Measure, - solana_metrics::{inc_new_counter_debug, inc_new_counter_error}, + solana_metrics::inc_new_counter_error, solana_perf::packet::{Packet, PacketBatch}, solana_rayon_threadlimit::get_thread_count, solana_sdk::{clock::Slot, pubkey::Pubkey}, @@ -106,11 +106,11 @@ impl WindowServiceMetrics { #[derive(Default)] struct ReceiveWindowStats { + num_iters: usize, num_packets: usize, num_shreds: usize, // num_discards: num_packets - num_shreds num_repairs: usize, elapsed: Duration, // excludes waiting time on the receiver channel. - slots: HashMap, addrs: HashMap, since: Option, } @@ -125,18 +125,12 @@ impl ReceiveWindowStats { } datapoint_info!( "receive_window_stats", + ("num_iters", self.num_iters, i64), ("num_packets", self.num_packets, i64), ("num_shreds", self.num_shreds, i64), ("num_repairs", self.num_repairs, i64), ("elapsed_micros", self.elapsed.as_micros(), i64), ); - for (slot, num_shreds) in &self.slots { - datapoint_debug!( - "receive_window_num_slot_shreds", - ("slot", *slot, i64), - ("num_shreds", *num_shreds, i64) - ); - } let mut addrs: Vec<_> = std::mem::take(&mut self.addrs).into_iter().collect(); let reverse_count = |(_addr, count): &_| Reverse(*count); if addrs.len() > MAX_NUM_ADDRS { @@ -156,29 +150,6 @@ impl ReceiveWindowStats { } } -fn verify_shred_slot(shred: &Shred, root: u64) -> bool { - match shred.shred_type() { - // Only data shreds have parent information - ShredType::Data => match shred.parent() { - Ok(parent) => blockstore::verify_shred_slots(shred.slot(), parent, root), - Err(_) => false, - }, - // Filter out outdated coding shreds - ShredType::Code => shred.slot() >= root, - } -} - -/// drop shreds that are from myself or not from the correct leader for the -/// shred's slot -pub(crate) fn should_retransmit_and_persist(shred: &Shred, root: u64) -> bool { - if verify_shred_slot(shred, root) { - true - } else { - inc_new_counter_debug!("streamer-recv_window-outdated_transmission", 1); - false - } -} - fn run_check_duplicate( cluster_info: &ClusterInfo, blockstore: &Blockstore, @@ -313,33 +284,25 @@ where Ok(()) } -fn recv_window( - blockstore: &Blockstore, +fn recv_window( insert_shred_sender: &Sender<(Vec, Vec>)>, verified_receiver: &Receiver>, retransmit_sender: &Sender>, - shred_filter: F, + turbine_disabled: &AtomicBool, thread_pool: &ThreadPool, stats: &mut ReceiveWindowStats, -) -> Result<()> -where - F: Fn(&Shred, /*last root:*/ Slot) -> bool + Sync, -{ - let timer = Duration::from_millis(200); - let mut packet_batches = verified_receiver.recv_timeout(timer)?; +) -> Result<()> { + const RECV_TIMEOUT: Duration = Duration::from_millis(200); + let mut packet_batches = verified_receiver.recv_timeout(RECV_TIMEOUT)?; packet_batches.extend(verified_receiver.try_iter().flatten()); let now = Instant::now(); - let last_root = blockstore.last_root(); + let turbine_disabled = turbine_disabled.load(Ordering::Relaxed); let handle_packet = |packet: &Packet| { - if packet.meta.discard() { - inc_new_counter_debug!("streamer-recv_window-invalid_or_unnecessary_packet", 1); + if turbine_disabled || packet.meta.discard() { return None; } let serialized_shred = packet.data(..)?.to_vec(); let shred = Shred::new_from_serialized_shred(serialized_shred).ok()?; - if !shred_filter(&shred, last_root) { - return None; - } if packet.meta.repair() { let repair_info = RepairMeta { _from_addr: packet.meta.socket_addr(), @@ -369,19 +332,11 @@ where ); stats.num_repairs += repair_infos.iter().filter(|r| r.is_some()).count(); stats.num_shreds += shreds.len(); - for shred in &shreds { - *stats.slots.entry(shred.slot()).or_default() += 1; - } insert_shred_sender.send((shreds, repair_infos))?; - stats.num_packets += packet_batches - .iter() - .map(|packet_batch| packet_batch.len()) - .sum::(); - for packet in packet_batches - .iter() - .flat_map(|packet_batch| packet_batch.iter()) - { + stats.num_iters += 1; + stats.num_packets += packet_batches.iter().map(PacketBatch::len).sum::(); + for packet in packet_batches.iter().flat_map(PacketBatch::iter) { let addr = packet.meta.socket_addr(); *stats.addrs.entry(addr).or_default() += 1; } @@ -421,7 +376,7 @@ pub(crate) struct WindowService { impl WindowService { #[allow(clippy::too_many_arguments)] - pub(crate) fn new( + pub(crate) fn new( blockstore: Arc, verified_receiver: Receiver>, retransmit_sender: Sender>, @@ -430,18 +385,12 @@ impl WindowService { exit: Arc, repair_info: RepairInfo, leader_schedule_cache: Arc, - shred_filter: F, + turbine_disabled: Arc, verified_vote_receiver: VerifiedVoteReceiver, completed_data_sets_sender: CompletedDataSetsSender, duplicate_slots_sender: DuplicateSlotSender, ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver, - ) -> WindowService - where - F: 'static - + Fn(&Shred, /*last root:*/ Slot) -> bool - + std::marker::Send - + std::marker::Sync, - { + ) -> WindowService { let outstanding_requests = Arc::>::default(); let cluster_info = repair_info.cluster_info.clone(); @@ -471,7 +420,7 @@ impl WindowService { let t_insert = Self::start_window_insert_thread( exit.clone(), - blockstore.clone(), + blockstore, leader_schedule_cache, insert_receiver, duplicate_sender, @@ -483,10 +432,9 @@ impl WindowService { let t_window = Self::start_recv_window_thread( id, exit, - blockstore, insert_sender, verified_receiver, - shred_filter, + turbine_disabled, retransmit_sender, ); @@ -589,18 +537,14 @@ impl WindowService { } #[allow(clippy::too_many_arguments)] - fn start_recv_window_thread( + fn start_recv_window_thread( id: Pubkey, exit: Arc, - blockstore: Arc, insert_sender: Sender<(Vec, Vec>)>, verified_receiver: Receiver>, - shred_filter: F, + turbine_disabled: Arc, retransmit_sender: Sender>, - ) -> JoinHandle<()> - where - F: 'static + Fn(&Shred, u64) -> bool + std::marker::Send + std::marker::Sync, - { + ) -> JoinHandle<()> { let mut stats = ReceiveWindowStats::default(); Builder::new() .name("solana-window".to_string()) @@ -627,11 +571,10 @@ impl WindowService { } }; if let Err(e) = recv_window( - &blockstore, &insert_sender, &verified_receiver, &retransmit_sender, - |shred, last_root| shred_filter(shred, last_root), + &turbine_disabled, &thread_pool, &mut stats, ) { @@ -687,7 +630,6 @@ mod test { shred::{ProcessShredsStats, Shredder}, }, solana_sdk::{ - epoch_schedule::MINIMUM_SLOTS_PER_EPOCH, hash::Hash, signature::{Keypair, Signer}, timing::timestamp, @@ -731,52 +673,6 @@ mod test { Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); } - #[test] - fn test_should_retransmit_and_persist() { - let leader_keypair = Arc::new(Keypair::new()); - - let shreds = local_entries_to_shred(&[Entry::default()], 0, 0, &leader_keypair); - - // with a Bank for slot 0, shred continues - assert!(should_retransmit_and_persist(&shreds[0], 0)); - - // change the shred's slot so leader lookup fails - // with a Bank and no idea who leader is, shred gets thrown out - let mut bad_slot_shred = shreds[0].clone(); - bad_slot_shred.set_slot(MINIMUM_SLOTS_PER_EPOCH as u64 * 3); - assert!(!should_retransmit_and_persist(&bad_slot_shred, 0)); - - // with a shred where shred.slot() == root, shred gets thrown out - let root = MINIMUM_SLOTS_PER_EPOCH as u64 * 3; - let shreds = local_entries_to_shred(&[Entry::default()], root, root - 1, &leader_keypair); - assert!(!should_retransmit_and_persist(&shreds[0], root)); - - // with a shred where shred.parent() < root, shred gets thrown out - let root = MINIMUM_SLOTS_PER_EPOCH as u64 * 3; - let shreds = - local_entries_to_shred(&[Entry::default()], root + 1, root - 1, &leader_keypair); - assert!(!should_retransmit_and_persist(&shreds[0], root)); - - // coding shreds don't contain parent slot information, test that slot >= root - let mut coding_shred = Shred::new_from_parity_shard( - 5, // slot - 5, // index - &[], // parity_shard - 5, // fec_set_index - 6, // num_data_shreds - 6, // num_coding_shreds - 3, // position - 0, // version - ); - coding_shred.sign(&leader_keypair); - // shred.slot() > root, shred continues - assert!(should_retransmit_and_persist(&coding_shred, 0)); - // shred.slot() == root, shred continues - assert!(should_retransmit_and_persist(&coding_shred, 5)); - // shred.slot() < root, shred gets thrown out - assert!(!should_retransmit_and_persist(&coding_shred, 6)); - } - #[test] fn test_run_check_duplicate() { let blockstore_path = get_tmp_ledger_path!(); diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index c8801eae8..0ecdaeead 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -4026,7 +4026,7 @@ macro_rules! create_new_tmp_ledger_fifo_auto_delete { }; } -pub fn verify_shred_slots(slot: Slot, parent: Slot, root: Slot) -> bool { +pub(crate) fn verify_shred_slots(slot: Slot, parent: Slot, root: Slot) -> bool { if slot == 0 && parent == 0 && root == 0 { return true; // valid write to slot zero. } diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs index 22177c350..6e870abeb 100644 --- a/ledger/src/shred.rs +++ b/ledger/src/shred.rs @@ -56,7 +56,7 @@ pub use { }; use { self::{shred_code::ShredCode, traits::Shred as _}, - crate::blockstore::MAX_DATA_SHREDS_PER_SLOT, + crate::blockstore::{self, MAX_DATA_SHREDS_PER_SLOT}, bitflags::bitflags, num_enum::{IntoPrimitive, TryFromPrimitive}, serde::{Deserialize, Serialize}, @@ -69,7 +69,7 @@ use { signature::{Keypair, Signature, Signer}, }, static_assertions::const_assert_eq, - std::fmt::Debug, + std::{fmt::Debug, ops::RangeBounds}, thiserror::Error, }; @@ -573,6 +573,15 @@ pub mod layout { .ok() } + // The caller should verify first that the shred is data and not code! + pub(super) fn get_parent_offset(shred: &[u8]) -> Option { + const OFFSET_OF_SHRED_PARENT: usize = SIZE_OF_COMMON_SHRED_HEADER; + debug_assert_eq!(get_shred_type(shred).unwrap(), ShredType::Data); + <[u8; 2]>::try_from(shred.get(OFFSET_OF_SHRED_PARENT..)?.get(..2)?) + .map(u16::from_le_bytes) + .ok() + } + // Returns slice range of the shred payload which is signed. pub(crate) fn get_signed_message_range(shred: &[u8]) -> Option> { let range = match get_shred_variant(shred).ok()? { @@ -653,48 +662,95 @@ impl TryFrom for ShredVariant { } } -// Get slot, index, and type from a packet with partial deserialize -pub fn get_shred_slot_index_type( +#[must_use] +pub fn should_discard_shred( packet: &Packet, + root: Slot, + shred_version: u16, + // Range of slots to ingest shreds for. + slot_bounds: impl RangeBounds, stats: &mut ShredFetchStats, -) -> Option<(Slot, u32, ShredType)> { +) -> bool { let shred = match layout::get_shred(packet) { None => { stats.index_overrun += 1; - return None; + return true; } Some(shred) => shred, }; if OFFSET_OF_SHRED_INDEX + SIZE_OF_SHRED_INDEX > shred.len() { stats.index_overrun += 1; - return None; + return true; } let shred_type = match layout::get_shred_type(shred) { Ok(shred_type) => shred_type, Err(_) => { stats.bad_shred_type += 1; - return None; + return true; } }; let slot = match layout::get_slot(shred) { - Some(slot) => slot, + Some(slot) => { + if slot <= root || !slot_bounds.contains(&slot) { + stats.slot_out_of_range += 1; + return true; + } + slot + } None => { stats.slot_bad_deserialize += 1; - return None; + return true; } }; let index = match layout::get_index(shred) { Some(index) => index, None => { stats.index_bad_deserialize += 1; - return None; + return true; } }; - if index >= MAX_DATA_SHREDS_PER_SLOT as u32 { - stats.index_out_of_bounds += 1; - return None; + if layout::get_version(shred) != Some(shred_version) { + stats.shred_version_mismatch += 1; + return true; + } + match shred_type { + ShredType::Code => { + if index >= shred_code::MAX_CODE_SHREDS_PER_SLOT as u32 { + stats.index_out_of_bounds += 1; + return true; + } + false + } + ShredType::Data => { + if index >= MAX_DATA_SHREDS_PER_SLOT as u32 { + stats.index_out_of_bounds += 1; + return true; + } + let parent_offset = match layout::get_parent_offset(shred) { + Some(parent_offset) => parent_offset, + None => { + stats.bad_parent_offset += 1; + return true; + } + }; + if parent_offset == 0 && slot != 0 { + stats.bad_parent_offset += 1; + return true; + } + let parent = match slot.checked_sub(Slot::from(parent_offset)) { + Some(parent) => parent, + None => { + stats.bad_parent_offset += 1; + return true; + } + }; + if !blockstore::verify_shred_slots(slot, parent, root) { + stats.slot_out_of_range += 1; + return true; + } + false + } } - Some((slot, index, shred_type)) } pub fn max_ticks_per_n_shreds(num_shreds: u64, shred_data_size: Option) -> u64 { @@ -863,38 +919,82 @@ mod tests { } #[test] - fn test_shred_offsets() { + fn test_should_discard_shred() { solana_logger::setup(); let mut packet = Packet::default(); - let shred = Shred::new_from_data(1, 3, 0, &[], ShredFlags::LAST_SHRED_IN_SLOT, 0, 0, 0); + let root = 1; + let shred_version = 798; + let slot_bounds = ..16; + let shred = Shred::new_from_data( + 2, // slot + 3, // index + 1, // parent_offset + &[], // data + ShredFlags::LAST_SHRED_IN_SLOT, + 0, // reference_tick + shred_version, + 0, // fec_set_index + ); shred.copy_to_packet(&mut packet); let mut stats = ShredFetchStats::default(); - let ret = get_shred_slot_index_type(&packet, &mut stats); - assert_eq!(Some((1, 3, ShredType::Data)), ret); + assert!(!should_discard_shred( + &packet, + root, + shred_version, + slot_bounds, + &mut stats + )); assert_eq!(stats, ShredFetchStats::default()); packet.meta.size = OFFSET_OF_SHRED_VARIANT; - assert_eq!(None, get_shred_slot_index_type(&packet, &mut stats)); + assert!(should_discard_shred( + &packet, + root, + shred_version, + slot_bounds, + &mut stats + )); assert_eq!(stats.index_overrun, 1); packet.meta.size = OFFSET_OF_SHRED_INDEX; - assert_eq!(None, get_shred_slot_index_type(&packet, &mut stats)); + assert!(should_discard_shred( + &packet, + root, + shred_version, + slot_bounds, + &mut stats + )); assert_eq!(stats.index_overrun, 2); packet.meta.size = OFFSET_OF_SHRED_INDEX + 1; - assert_eq!(None, get_shred_slot_index_type(&packet, &mut stats)); + assert!(should_discard_shred( + &packet, + root, + shred_version, + slot_bounds, + &mut stats + )); assert_eq!(stats.index_overrun, 3); packet.meta.size = OFFSET_OF_SHRED_INDEX + SIZE_OF_SHRED_INDEX - 1; - assert_eq!(None, get_shred_slot_index_type(&packet, &mut stats)); + assert!(should_discard_shred( + &packet, + root, + shred_version, + slot_bounds, + &mut stats + )); assert_eq!(stats.index_overrun, 4); - packet.meta.size = OFFSET_OF_SHRED_INDEX + SIZE_OF_SHRED_INDEX; - assert_eq!( - Some((1, 3, ShredType::Data)), - get_shred_slot_index_type(&packet, &mut stats) - ); - assert_eq!(stats.index_overrun, 4); + packet.meta.size = OFFSET_OF_SHRED_INDEX + SIZE_OF_SHRED_INDEX + 2; + assert!(should_discard_shred( + &packet, + root, + shred_version, + slot_bounds, + &mut stats + )); + assert_eq!(stats.bad_parent_offset, 1); let shred = Shred::new_from_parity_shard( 8, // slot @@ -904,26 +1004,35 @@ mod tests { 30, // num_data 4, // num_code 1, // position - 200, // version + shred_version, ); shred.copy_to_packet(&mut packet); - assert_eq!( - Some((8, 2, ShredType::Code)), - get_shred_slot_index_type(&packet, &mut stats) - ); + assert!(!should_discard_shred( + &packet, + root, + shred_version, + slot_bounds, + &mut stats + )); let shred = Shred::new_from_data( - 1, - std::u32::MAX - 10, - 0, - &[], + 2, // slot + std::u32::MAX - 10, // index + 1, // parent_offset + &[], // data ShredFlags::LAST_SHRED_IN_SLOT, - 0, - 0, - 0, + 0, // reference_tick + shred_version, + 0, // fec_set_index ); shred.copy_to_packet(&mut packet); - assert_eq!(None, get_shred_slot_index_type(&packet, &mut stats)); + assert!(should_discard_shred( + &packet, + root, + shred_version, + slot_bounds, + &mut stats + )); assert_eq!(1, stats.index_out_of_bounds); let shred = Shred::new_from_parity_shard( @@ -939,7 +1048,13 @@ mod tests { shred.copy_to_packet(&mut packet); packet.buffer_mut()[OFFSET_OF_SHRED_VARIANT] = u8::MAX; - assert_eq!(None, get_shred_slot_index_type(&packet, &mut stats)); + assert!(should_discard_shred( + &packet, + root, + shred_version, + slot_bounds, + &mut stats + )); assert_eq!(1, stats.bad_shred_type); } @@ -1107,10 +1222,6 @@ mod tests { assert_eq!(layout::get_slot(data), Some(shred.slot())); assert_eq!(layout::get_index(data), Some(shred.index())); assert_eq!(layout::get_version(data), Some(shred.version())); - assert_eq!( - get_shred_slot_index_type(packet, &mut ShredFetchStats::default()), - Some((shred.slot(), shred.index(), shred.shred_type())) - ); match shred.shred_type() { ShredType::Code => { assert_matches!( @@ -1123,6 +1234,10 @@ mod tests { layout::get_reference_tick(data).unwrap(), shred.reference_tick() ); + let parent_offset = layout::get_parent_offset(data).unwrap(); + let slot = layout::get_slot(data).unwrap(); + let parent = slot.checked_sub(Slot::from(parent_offset)).unwrap(); + assert_eq!(parent, shred.parent().unwrap()); } } } diff --git a/ledger/src/shred/shred_code.rs b/ledger/src/shred/shred_code.rs index 860db8080..25ce8a238 100644 --- a/ledger/src/shred/shred_code.rs +++ b/ledger/src/shred/shred_code.rs @@ -10,7 +10,7 @@ use { static_assertions::const_assert_eq, }; -const MAX_CODE_SHREDS_PER_SLOT: usize = MAX_DATA_SHREDS_PER_SLOT; +pub(super) const MAX_CODE_SHREDS_PER_SLOT: usize = MAX_DATA_SHREDS_PER_SLOT; const_assert_eq!(ShredCode::SIZE_OF_PAYLOAD, 1228); diff --git a/ledger/src/shred/stats.rs b/ledger/src/shred/stats.rs index 178e1b0e2..49d4bea5b 100644 --- a/ledger/src/shred/stats.rs +++ b/ledger/src/shred/stats.rs @@ -37,6 +37,7 @@ pub struct ShredFetchStats { pub slot_out_of_range: usize, pub(crate) bad_shred_type: usize, pub shred_version_mismatch: usize, + pub(crate) bad_parent_offset: usize, since: Option, } @@ -121,6 +122,7 @@ impl ShredFetchStats { ("duplicate_shred", self.duplicate_shred, i64), ("bad_shred_type", self.bad_shred_type, i64), ("shred_version_mismatch", self.shred_version_mismatch, i64), + ("bad_parent_offset", self.bad_parent_offset, i64), ); *self = Self { since: Some(Instant::now()), diff --git a/local-cluster/tests/common.rs b/local-cluster/tests/common.rs index cf65067b1..17eb16c91 100644 --- a/local-cluster/tests/common.rs +++ b/local-cluster/tests/common.rs @@ -277,7 +277,7 @@ pub fn run_cluster_partition( let cluster_lamports = node_stakes.iter().sum::() * 2; let turbine_disabled = Arc::new(AtomicBool::new(false)); let mut validator_config = ValidatorConfig { - turbine_disabled: Some(turbine_disabled.clone()), + turbine_disabled: turbine_disabled.clone(), ..ValidatorConfig::default_for_test() };