From 1f0f5dc03e198e20d2fb4355d1b72649a8516064 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Sun, 19 Jun 2022 11:30:18 -0400 Subject: [PATCH] verifies shred-version in fetch stage Shred versions are not verified until window-service where resources are already wasted to sig-verify and deserialize shreds. The commit verifies shred-version earlier in the pipeline in fetch stage. --- core/src/lib.rs | 2 +- core/src/retransmit_stage.rs | 2 - core/src/shred_fetch_stage.rs | 152 ++++++++++++++++++++++++++------- core/src/sigverify_shreds.rs | 4 +- core/src/tvu.rs | 6 +- core/src/window_service.rs | 43 +--------- ledger/src/blockstore.rs | 25 ++---- ledger/src/shred.rs | 81 ++++++++++-------- ledger/src/shred/legacy.rs | 28 ++++-- ledger/src/shred/shred_code.rs | 34 ++++++-- ledger/src/shred/shred_data.rs | 15 ++-- ledger/src/shred/stats.rs | 3 + 12 files changed, 235 insertions(+), 160 deletions(-) diff --git a/core/src/lib.rs b/core/src/lib.rs index 88c6f5537e..5062c4a8f0 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -54,7 +54,7 @@ pub mod rewards_recorder_service; pub mod sample_performance_service; pub mod serve_repair; pub mod serve_repair_service; -pub mod shred_fetch_stage; +mod shred_fetch_stage; pub mod sigverify; pub mod sigverify_shreds; pub mod sigverify_stage; diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index afb465ecb8..8d0a47d50a 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -375,7 +375,6 @@ impl RetransmitStage { cluster_slots_update_receiver: ClusterSlotsUpdateReceiver, epoch_schedule: EpochSchedule, turbine_disabled: Option>, - shred_version: u16, cluster_slots: Arc, duplicate_slots_reset_sender: DuplicateSlotsResetSender, verified_vote_receiver: VerifiedVoteReceiver, @@ -436,7 +435,6 @@ impl RetransmitStage { &leader_schedule_cache_clone, id, last_root, - shred_version, ); rv && !turbine_disabled }, diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs index f098fa553b..dc984f90e9 100644 --- a/core/src/shred_fetch_stage.rs +++ b/core/src/shred_fetch_stage.rs @@ -4,13 +4,14 @@ use { crate::packet_hasher::PacketHasher, crossbeam_channel::{unbounded, Sender}, lru::LruCache, - solana_ledger::shred::{get_shred_slot_index_type, ShredFetchStats}, + solana_ledger::shred::{self, get_shred_slot_index_type, ShredFetchStats}, solana_perf::packet::{Packet, PacketBatch, PacketBatchRecycler, PacketFlags}, solana_runtime::bank_forks::BankForks, solana_sdk::clock::{Slot, DEFAULT_MS_PER_SLOT}, solana_streamer::streamer::{self, PacketBatchReceiver, StreamerReceiveStats}, std::{ net::UdpSocket, + ops::RangeBounds, sync::{atomic::AtomicBool, Arc, RwLock}, thread::{self, Builder, JoinHandle}, time::{Duration, Instant}, @@ -18,17 +19,18 @@ use { }; const DEFAULT_LRU_SIZE: usize = 10_000; -pub type ShredsReceived = LruCache; +type ShredsReceived = LruCache; -pub struct ShredFetchStage { +pub(crate) struct ShredFetchStage { thread_hdls: Vec>, } impl ShredFetchStage { fn process_packet( - p: &mut Packet, + packet: &mut Packet, shreds_received: &mut ShredsReceived, stats: &mut ShredFetchStats, + shred_version: u16, last_root: Slot, last_slot: Slot, slots_per_epoch: u64, @@ -37,24 +39,19 @@ impl ShredFetchStage { ) where F: Fn(&mut Packet), { - p.meta.set_discard(true); - if let Some((slot, _index, _shred_type)) = get_shred_slot_index_type(p, stats) { - // Seems reasonable to limit shreds to 2 epochs away - if slot > last_root && slot < (last_slot + 2 * slots_per_epoch) { - // Shred filter - - let hash = packet_hasher.hash_packet(p); - - if shreds_received.get(&hash).is_none() { - shreds_received.put(hash, ()); - p.meta.set_discard(false); - modify(p); - } else { - stats.duplicate_shred += 1; - } - } else { - stats.slot_out_of_range += 1; - } + // Limit shreds to 2 epochs away. + let slot_bounds = (last_root + 1)..(last_slot + 2 * slots_per_epoch); + if should_discard_packet( + packet, + slot_bounds, + shred_version, + packet_hasher, + shreds_received, + stats, + ) { + packet.meta.set_discard(true); + } else { + modify(packet); } } @@ -62,7 +59,8 @@ impl ShredFetchStage { fn modify_packets( recvr: PacketBatchReceiver, sendr: Sender>, - bank_forks: Option>>, + bank_forks: &RwLock, + shred_version: u16, name: &'static str, modify: F, ) where @@ -85,7 +83,7 @@ impl ShredFetchStage { last_updated = Instant::now(); packet_hasher.reset(); shreds_received.clear(); - if let Some(bank_forks) = bank_forks.as_ref() { + { let bank_forks_r = bank_forks.read().unwrap(); last_root = bank_forks_r.root(); let working_bank = bank_forks_r.working_bank(); @@ -100,6 +98,7 @@ impl ShredFetchStage { packet, &mut shreds_received, &mut stats, + shred_version, last_root, last_slot, slots_per_epoch, @@ -119,7 +118,8 @@ impl ShredFetchStage { exit: &Arc, sender: Sender>, recycler: PacketBatchRecycler, - bank_forks: Option>>, + bank_forks: Arc>, + shred_version: u16, name: &'static str, modify: F, ) -> (Vec>, JoinHandle<()>) @@ -145,17 +145,27 @@ impl ShredFetchStage { let modifier_hdl = Builder::new() .name("solana-tvu-fetch-stage-packet-modifier".to_string()) - .spawn(move || Self::modify_packets(packet_receiver, sender, bank_forks, name, modify)) + .spawn(move || { + Self::modify_packets( + packet_receiver, + sender, + &bank_forks, + shred_version, + name, + modify, + ) + }) .unwrap(); (streamers, modifier_hdl) } - pub fn new( + pub(crate) fn new( sockets: Vec>, forward_sockets: Vec>, repair_socket: Arc, - sender: &Sender>, - bank_forks: Option>>, + sender: Sender>, + shred_version: u16, + bank_forks: Arc>, exit: &Arc, ) -> Self { let recycler = PacketBatchRecycler::warmed(100, 1024); @@ -166,6 +176,7 @@ impl ShredFetchStage { sender.clone(), recycler.clone(), bank_forks.clone(), + shred_version, "shred_fetch", |_| {}, ); @@ -176,6 +187,7 @@ impl ShredFetchStage { sender.clone(), recycler.clone(), bank_forks.clone(), + shred_version, "shred_fetch_tvu_forwards", |p| p.meta.flags.insert(PacketFlags::FORWARDED), ); @@ -183,9 +195,10 @@ impl ShredFetchStage { let (repair_receiver, repair_handler) = Self::packet_modifier( vec![repair_socket], exit, - sender.clone(), + sender, recycler, bank_forks, + shred_version, "shred_fetch_repair", |p| p.meta.flags.insert(PacketFlags::REPAIR), ); @@ -201,7 +214,7 @@ impl ShredFetchStage { } } - pub fn join(self) -> thread::Result<()> { + pub(crate) fn join(self) -> thread::Result<()> { for thread_hdl in self.thread_hdls { thread_hdl.join()?; } @@ -209,6 +222,40 @@ impl ShredFetchStage { } } +// Returns true if the packet should be marked as discard. +#[must_use] +fn should_discard_packet( + packet: &Packet, + // Range of slots to ingest shreds for. + slot_bounds: impl RangeBounds, + shred_version: u16, + packet_hasher: &PacketHasher, + shreds_received: &mut ShredsReceived, + stats: &mut ShredFetchStats, +) -> bool { + let slot = match get_shred_slot_index_type(packet, stats) { + None => return true, + Some((slot, _index, _shred_type)) => slot, + }; + if !slot_bounds.contains(&slot) { + stats.slot_out_of_range += 1; + return true; + } + let shred = shred::layout::get_shred(packet); + if shred.and_then(shred::layout::get_version) != Some(shred_version) { + stats.shred_version_mismatch += 1; + return true; + } + let hash = packet_hasher.hash_packet(packet); + match shreds_received.put(hash, ()) { + None => false, + Some(()) => { + stats.duplicate_shred += 1; + true + } + } +} + #[cfg(test)] mod tests { use { @@ -227,6 +274,7 @@ mod tests { let mut stats = ShredFetchStats::default(); let slot = 1; + let shred_version = 45189; let shred = Shred::new_from_data( slot, 3, // shred index @@ -234,7 +282,7 @@ mod tests { &[], // data ShredFlags::LAST_SHRED_IN_SLOT, 0, // reference_tick - 0, // version + shred_version, 3, // fec_set_index ); shred.copy_to_packet(&mut packet); @@ -248,6 +296,7 @@ mod tests { &mut packet, &mut shreds_received, &mut stats, + shred_version, last_root, last_slot, slots_per_epoch, @@ -265,6 +314,7 @@ mod tests { &mut packet, &mut shreds_received, &mut stats, + shred_version, last_root, last_slot, slots_per_epoch, @@ -283,6 +333,7 @@ mod tests { let last_root = 0; let last_slot = 100; let slots_per_epoch = 10; + let shred_version = 59445; let hasher = PacketHasher::default(); @@ -291,6 +342,7 @@ mod tests { &mut packet, &mut shreds_received, &mut stats, + shred_version, last_root, last_slot, slots_per_epoch, @@ -299,7 +351,17 @@ mod tests { ); assert_eq!(stats.index_overrun, 1); assert!(packet.meta.discard()); - let shred = Shred::new_from_data(1, 3, 0, &[], ShredFlags::LAST_SHRED_IN_SLOT, 0, 0, 0); + packet.meta.set_discard(false); + let shred = Shred::new_from_data( + 1, + 3, + 0, + &[], + ShredFlags::LAST_SHRED_IN_SLOT, + 0, + shred_version, + 0, + ); shred.copy_to_packet(&mut packet); // rejected slot is 1, root is 3 @@ -307,6 +369,7 @@ mod tests { &mut packet, &mut shreds_received, &mut stats, + shred_version, 3, last_slot, slots_per_epoch, @@ -314,12 +377,29 @@ mod tests { &hasher, ); assert!(packet.meta.discard()); + packet.meta.set_discard(false); + + ShredFetchStage::process_packet( + &mut packet, + &mut shreds_received, + &mut stats, + 345, // shred_version + last_root, + last_slot, + slots_per_epoch, + &|_p| {}, + &hasher, + ); + assert!(packet.meta.discard()); + packet.meta.set_discard(false); + assert_eq!(stats.shred_version_mismatch, 1); // Accepted for 1,3 ShredFetchStage::process_packet( &mut packet, &mut shreds_received, &mut stats, + shred_version, last_root, last_slot, slots_per_epoch, @@ -333,6 +413,7 @@ mod tests { &mut packet, &mut shreds_received, &mut stats, + shred_version, last_root, last_slot, slots_per_epoch, @@ -340,6 +421,7 @@ mod tests { &hasher, ); assert!(packet.meta.discard()); + packet.meta.set_discard(false); let shred = Shred::new_from_data( 1_000_000, @@ -358,6 +440,7 @@ mod tests { &mut packet, &mut shreds_received, &mut stats, + shred_version, last_root, last_slot, slots_per_epoch, @@ -365,6 +448,7 @@ mod tests { &hasher, ); assert!(packet.meta.discard()); + packet.meta.set_discard(false); let index = MAX_DATA_SHREDS_PER_SLOT as u32; let shred = Shred::new_from_data(5, index, 0, &[], ShredFlags::LAST_SHRED_IN_SLOT, 0, 0, 0); @@ -373,6 +457,7 @@ mod tests { &mut packet, &mut shreds_received, &mut stats, + shred_version, last_root, last_slot, slots_per_epoch, @@ -380,5 +465,6 @@ mod tests { &hasher, ); assert!(packet.meta.discard()); + packet.meta.set_discard(false); } } diff --git a/core/src/sigverify_shreds.rs b/core/src/sigverify_shreds.rs index f18edfe776..54dbba72cc 100644 --- a/core/src/sigverify_shreds.rs +++ b/core/src/sigverify_shreds.rs @@ -11,6 +11,7 @@ use { }, solana_perf::{self, packet::PacketBatch, recycler_cache::RecyclerCache}, solana_runtime::bank_forks::BankForks, + solana_sdk::clock::Slot, std::{ collections::{HashMap, HashSet}, sync::{Arc, RwLock}, @@ -39,10 +40,11 @@ impl ShredSigVerifier { packet_sender, } } - fn read_slots(batches: &[PacketBatch]) -> HashSet { + fn read_slots(batches: &[PacketBatch]) -> HashSet { batches .iter() .flat_map(PacketBatch::iter) + .filter(|packet| !packet.meta.discard()) .filter_map(shred::layout::get_shred) .filter_map(shred::layout::get_slot) .collect() diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 528bf0b609..6647132a62 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -154,8 +154,9 @@ impl Tvu { fetch_sockets, forward_sockets, repair_socket.clone(), - &fetch_sender, - Some(bank_forks.clone()), + fetch_sender, + tvu_config.shred_version, + bank_forks.clone(), exit, ); @@ -189,7 +190,6 @@ impl Tvu { cluster_slots_update_receiver, *bank_forks.read().unwrap().working_bank().epoch_schedule(), turbine_disabled, - tvu_config.shred_version, cluster_slots.clone(), duplicate_slots_reset_sender, verified_vote_receiver, diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 7e582fa4ea..709698ce36 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -14,7 +14,7 @@ use { rayon::{prelude::*, ThreadPool}, solana_gossip::cluster_info::ClusterInfo, solana_ledger::{ - blockstore::{self, Blockstore, BlockstoreInsertionMetrics, MAX_DATA_SHREDS_PER_SLOT}, + blockstore::{self, Blockstore, BlockstoreInsertionMetrics}, leader_schedule_cache::LeaderScheduleCache, shred::{Nonce, Shred, ShredType}, }, @@ -177,7 +177,6 @@ pub(crate) fn should_retransmit_and_persist( leader_schedule_cache: &LeaderScheduleCache, my_pubkey: &Pubkey, root: u64, - shred_version: u16, ) -> bool { let slot_leader_pubkey = leader_schedule_cache.slot_leader_at(shred.slot(), bank.as_deref()); if let Some(leader_id) = slot_leader_pubkey { @@ -187,15 +186,6 @@ pub(crate) fn should_retransmit_and_persist( } else if !verify_shred_slot(shred, root) { inc_new_counter_debug!("streamer-recv_window-outdated_transmission", 1); false - } else if shred.version() != shred_version { - inc_new_counter_debug!("streamer-recv_window-incorrect_shred_version", 1); - false - } else if shred.index() >= MAX_DATA_SHREDS_PER_SLOT as u32 { - inc_new_counter_warn!("streamer-recv_window-shred_index_overrun", 1); - false - } else if shred.sanitize().is_err() { - inc_new_counter_warn!("streamer-recv_window-invalid-shred", 1); - false } else { true } @@ -783,17 +773,6 @@ mod test { &cache, &me_id, 0, - 0 - )); - - // with the wrong shred_version, shred gets thrown out - assert!(!should_retransmit_and_persist( - &shreds[0], - Some(bank.clone()), - &cache, - &me_id, - 0, - 1 )); // substitute leader_pubkey for me_id so it looks I was the leader @@ -804,7 +783,6 @@ mod test { &cache, &leader_pubkey, 0, - 0 )); assert!(!should_retransmit_and_persist( &shreds[0], @@ -812,7 +790,6 @@ mod test { &cache, &leader_pubkey, 0, - 0 )); // change the shred's slot so leader lookup fails @@ -825,19 +802,6 @@ mod test { &cache, &me_id, 0, - 0 - )); - - // with an invalid index, shred gets thrown out - let mut bad_index_shred = shreds[0].clone(); - bad_index_shred.set_index((MAX_DATA_SHREDS_PER_SLOT + 1) as u32); - assert!(!should_retransmit_and_persist( - &bad_index_shred, - Some(bank.clone()), - &cache, - &me_id, - 0, - 0 )); // with a shred where shred.slot() == root, shred gets thrown out @@ -849,7 +813,6 @@ mod test { &cache, &me_id, root, - 0 )); // with a shred where shred.parent() < root, shred gets thrown out @@ -862,7 +825,6 @@ mod test { &cache, &me_id, root, - 0 )); // coding shreds don't contain parent slot information, test that slot >= root @@ -884,7 +846,6 @@ mod test { &cache, &me_id, 0, - 0 )); // shred.slot() == root, shred continues assert!(should_retransmit_and_persist( @@ -893,7 +854,6 @@ mod test { &cache, &me_id, 5, - 0 )); // shred.slot() < root, shred gets thrown out assert!(!should_retransmit_and_persist( @@ -902,7 +862,6 @@ mod test { &cache, &me_id, 6, - 0 )); } diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 62ef2db81c..c44c67f51d 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -3350,10 +3350,6 @@ fn get_last_hash<'a>(iterator: impl Iterator + 'a) -> Option bool { - slot_to_write == 0 && last_root == 0 && parent_slot == 0 -} - fn send_signals( new_shreds_signals: &[Sender], completed_slots_senders: &[Sender>], @@ -3963,22 +3959,13 @@ macro_rules! create_new_tmp_ledger_fifo_auto_delete { }; } -pub fn verify_shred_slots(slot: Slot, parent_slot: Slot, last_root: Slot) -> bool { - if !is_valid_write_to_slot_0(slot, parent_slot, last_root) { - // Check that the parent_slot < slot - if parent_slot >= slot { - return false; - } - - // Ignore shreds that chain to slots before the last root - if parent_slot < last_root { - return false; - } - - // Above two checks guarantee that by this point, slot > last_root +pub fn verify_shred_slots(slot: Slot, parent: Slot, root: Slot) -> bool { + if slot == 0 && parent == 0 && root == 0 { + return true; // valid write to slot zero. } - - true + // Ignore shreds that chain to slots before the root, + // or have invalid parent >= slot. + root <= parent && parent < slot } // Same as `create_new_ledger()` but use a temporary ledger name based on the provided `name` diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs index 63ab11a564..048dde7d52 100644 --- a/ledger/src/shred.rs +++ b/ledger/src/shred.rs @@ -61,7 +61,7 @@ use { num_enum::{IntoPrimitive, TryFromPrimitive}, serde::{Deserialize, Serialize}, solana_entry::entry::{create_ticks, Entry}, - solana_perf::packet::{deserialize_from_with_limit, Packet}, + solana_perf::packet::Packet, solana_sdk::{ clock::Slot, hash::{hashv, Hash}, @@ -122,8 +122,6 @@ pub enum Error { BincodeError(#[from] bincode::Error), #[error(transparent)] ErasureError(#[from] reed_solomon_erasure::Error), - #[error("Invalid data shred index: {0}")] - InvalidDataShredIndex(/*shred index:*/ u32), #[error("Invalid data size: {size}, payload: {payload}")] InvalidDataSize { size: u16, payload: usize }, #[error("Invalid erasure shard index: {0:?}")] @@ -142,6 +140,8 @@ pub enum Error { InvalidProofSize(/*proof_size:*/ u8), #[error("Invalid shred flags: {0}")] InvalidShredFlags(u8), + #[error("Invalid {0:?} shred index: {1}")] + InvalidShredIndex(ShredType, /*shred index:*/ u32), #[error("Invalid shred type")] InvalidShredType, #[error("Invalid shred variant")] @@ -554,11 +554,22 @@ pub mod layout { } pub fn get_slot(shred: &[u8]) -> Option { - deserialize_from_with_limit(shred.get(OFFSET_OF_SHRED_SLOT..)?).ok() + <[u8; 8]>::try_from(shred.get(OFFSET_OF_SHRED_SLOT..)?.get(..8)?) + .map(Slot::from_le_bytes) + .ok() } pub(super) fn get_index(shred: &[u8]) -> Option { - deserialize_from_with_limit(shred.get(OFFSET_OF_SHRED_INDEX..)?).ok() + <[u8; 4]>::try_from(shred.get(OFFSET_OF_SHRED_INDEX..)?.get(..4)?) + .map(u32::from_le_bytes) + .ok() + } + + pub fn get_version(shred: &[u8]) -> Option { + const OFFSET_OF_SHRED_VERSION: usize = OFFSET_OF_SHRED_INDEX + SIZE_OF_SHRED_INDEX; + <[u8; 2]>::try_from(shred.get(OFFSET_OF_SHRED_VERSION..)?.get(..2)?) + .map(u16::from_le_bytes) + .ok() } // Returns slice range of the shred payload which is signed. @@ -1065,6 +1076,31 @@ mod tests { } } + fn verify_shred_layout(shred: &Shred, packet: &Packet) { + let data = layout::get_shred(packet).unwrap(); + assert_eq!(layout::get_slot(data), Some(shred.slot())); + assert_eq!(layout::get_index(data), Some(shred.index())); + assert_eq!(layout::get_version(data), Some(shred.version())); + assert_eq!( + get_shred_slot_index_type(packet, &mut ShredFetchStats::default()), + Some((shred.slot(), shred.index(), shred.shred_type())) + ); + match shred.shred_type() { + ShredType::Code => { + assert_matches!( + layout::get_reference_tick(data), + Err(Error::InvalidShredType) + ); + } + ShredType::Data => { + assert_eq!( + layout::get_reference_tick(data).unwrap(), + shred.reference_tick() + ); + } + } + } + #[test] fn test_serde_compat_shred_data() { const SEED: &str = "6qG9NGWEtoTugS4Zgs46u8zTccEJuRHtrNMiUayLHCxt"; @@ -1102,18 +1138,7 @@ mod tests { packet.meta.size = payload.len(); assert_eq!(shred.bytes_to_store(), payload); assert_eq!(shred, Shred::new_from_serialized_shred(payload).unwrap()); - assert_eq!( - shred.reference_tick(), - layout::get_reference_tick(packet.data(..).unwrap()).unwrap() - ); - assert_eq!( - layout::get_slot(packet.data(..).unwrap()), - Some(shred.slot()) - ); - assert_eq!( - get_shred_slot_index_type(&packet, &mut ShredFetchStats::default()), - Some((shred.slot(), shred.index(), shred.shred_type())) - ); + verify_shred_layout(&shred, &packet); } #[test] @@ -1146,18 +1171,7 @@ mod tests { packet.meta.size = payload.len(); assert_eq!(shred.bytes_to_store(), payload); assert_eq!(shred, Shred::new_from_serialized_shred(payload).unwrap()); - assert_eq!( - shred.reference_tick(), - layout::get_reference_tick(packet.data(..).unwrap()).unwrap() - ); - assert_eq!( - layout::get_slot(packet.data(..).unwrap()), - Some(shred.slot()) - ); - assert_eq!( - get_shred_slot_index_type(&packet, &mut ShredFetchStats::default()), - Some((shred.slot(), shred.index(), shred.shred_type())) - ); + verify_shred_layout(&shred, &packet); } #[test] @@ -1197,14 +1211,7 @@ mod tests { packet.meta.size = payload.len(); assert_eq!(shred.bytes_to_store(), payload); assert_eq!(shred, Shred::new_from_serialized_shred(payload).unwrap()); - assert_eq!( - layout::get_slot(packet.data(..).unwrap()), - Some(shred.slot()) - ); - assert_eq!( - get_shred_slot_index_type(&packet, &mut ShredFetchStats::default()), - Some((shred.slot(), shred.index(), shred.shred_type())) - ); + verify_shred_layout(&shred, &packet); } #[test] diff --git a/ledger/src/shred/legacy.rs b/ledger/src/shred/legacy.rs index 4ec27ca953..fc29eac39d 100644 --- a/ledger/src/shred/legacy.rs +++ b/ledger/src/shred/legacy.rs @@ -322,7 +322,11 @@ impl ShredCode { #[cfg(test)] mod test { - use {super::*, crate::shred::MAX_DATA_SHREDS_PER_SLOT, matches::assert_matches}; + use { + super::*, + crate::shred::{ShredType, MAX_DATA_SHREDS_PER_SLOT}, + matches::assert_matches, + }; #[test] fn test_sanitize_data_shred() { @@ -369,7 +373,10 @@ mod test { { let mut shred = shred.clone(); shred.common_header.index = MAX_DATA_SHREDS_PER_SLOT as u32; - assert_matches!(shred.sanitize(), Err(Error::InvalidDataShredIndex(32768))); + assert_matches!( + shred.sanitize(), + Err(Error::InvalidShredIndex(ShredType::Data, 32768)) + ); } { let mut shred = shred.clone(); @@ -423,6 +430,14 @@ mod test { Err(Error::InvalidErasureShardIndex { .. }) ); } + { + let mut shred = shred.clone(); + shred.common_header.index = MAX_DATA_SHREDS_PER_SLOT as u32; + assert_matches!( + shred.sanitize(), + Err(Error::InvalidShredIndex(ShredType::Code, 32768)) + ); + } // pos >= num_coding is invalid. { let mut shred = shred.clone(); @@ -437,21 +452,18 @@ mod test { // shred has index > u32::MAX should fail. { let mut shred = shred.clone(); - shred.common_header.fec_set_index = std::u32::MAX - 1; + shred.common_header.fec_set_index = MAX_DATA_SHREDS_PER_SLOT as u32 - 2; shred.coding_header.num_data_shreds = 2; shred.coding_header.num_coding_shreds = 4; shred.coding_header.position = 1; - shred.common_header.index = std::u32::MAX - 1; + shred.common_header.index = MAX_DATA_SHREDS_PER_SLOT as u32 - 2; assert_matches!( shred.sanitize(), Err(Error::InvalidErasureShardIndex { .. }) ); shred.coding_header.num_coding_shreds = 2000; - assert_matches!( - shred.sanitize(), - Err(Error::InvalidErasureShardIndex { .. }) - ); + assert_matches!(shred.sanitize(), Err(Error::InvalidNumCodingShreds(2000))); // Decreasing the number of num_coding_shreds will put it within // the allowed limit. diff --git a/ledger/src/shred/shred_code.rs b/ledger/src/shred/shred_code.rs index 884c058482..860db8080e 100644 --- a/ledger/src/shred/shred_code.rs +++ b/ledger/src/shred/shred_code.rs @@ -3,12 +3,15 @@ use { common::dispatch, legacy, merkle, traits::{Shred, ShredCode as ShredCodeTrait}, - CodingShredHeader, Error, ShredCommonHeader, MAX_DATA_SHREDS_PER_FEC_BLOCK, SIZE_OF_NONCE, + CodingShredHeader, Error, ShredCommonHeader, ShredType, MAX_DATA_SHREDS_PER_FEC_BLOCK, + MAX_DATA_SHREDS_PER_SLOT, SIZE_OF_NONCE, }, solana_sdk::{clock::Slot, packet::PACKET_DATA_SIZE, signature::Signature}, static_assertions::const_assert_eq, }; +const MAX_CODE_SHREDS_PER_SLOT: usize = MAX_DATA_SHREDS_PER_SLOT; + const_assert_eq!(ShredCode::SIZE_OF_PAYLOAD, 1228); #[derive(Clone, Debug, Eq, PartialEq)] @@ -92,15 +95,23 @@ impl From for ShredCode { #[inline] pub(super) fn erasure_shard_index(shred: &T) -> Option { // Assert that the last shred index in the erasure set does not - // overshoot u32. + // overshoot MAX_{DATA,CODE}_SHREDS_PER_SLOT. let common_header = shred.common_header(); let coding_header = shred.coding_header(); - common_header + if common_header .fec_set_index - .checked_add(u32::from(coding_header.num_data_shreds.checked_sub(1)?))?; - shred + .checked_add(u32::from(coding_header.num_data_shreds.checked_sub(1)?))? as usize + >= MAX_DATA_SHREDS_PER_SLOT + { + return None; + } + if shred .first_coding_index()? - .checked_add(u32::from(coding_header.num_coding_shreds.checked_sub(1)?))?; + .checked_add(u32::from(coding_header.num_coding_shreds.checked_sub(1)?))? as usize + >= MAX_CODE_SHREDS_PER_SLOT + { + return None; + } let num_data_shreds = usize::from(coding_header.num_data_shreds); let num_coding_shreds = usize::from(coding_header.num_coding_shreds); let position = usize::from(coding_header.position); @@ -113,15 +124,22 @@ pub(super) fn sanitize(shred: &T) -> Result<(), Error> { if shred.payload().len() != T::SIZE_OF_PAYLOAD { return Err(Error::InvalidPayloadSize(shred.payload().len())); } + let common_header = shred.common_header(); let coding_header = shred.coding_header(); - let _shard_index = shred.erasure_shard_index()?; - let _erasure_shard = shred.erasure_shard_as_slice()?; + if common_header.index as usize >= MAX_CODE_SHREDS_PER_SLOT { + return Err(Error::InvalidShredIndex( + ShredType::Code, + common_header.index, + )); + } let num_coding_shreds = u32::from(coding_header.num_coding_shreds); if num_coding_shreds > 8 * MAX_DATA_SHREDS_PER_FEC_BLOCK { return Err(Error::InvalidNumCodingShreds( coding_header.num_coding_shreds, )); } + let _shard_index = shred.erasure_shard_index()?; + let _erasure_shard = shred.erasure_shard_as_slice()?; Ok(()) } diff --git a/ledger/src/shred/shred_data.rs b/ledger/src/shred/shred_data.rs index 47728fa9af..777ef6feff 100644 --- a/ledger/src/shred/shred_data.rs +++ b/ledger/src/shred/shred_data.rs @@ -4,7 +4,7 @@ use { common::dispatch, legacy, merkle, traits::{Shred as _, ShredData as ShredDataTrait}, - DataShredHeader, Error, ShredCommonHeader, ShredFlags, ShredVariant, + DataShredHeader, Error, ShredCommonHeader, ShredFlags, ShredType, ShredVariant, MAX_DATA_SHREDS_PER_SLOT, }, solana_sdk::{clock::Slot, signature::Signature}, @@ -132,18 +132,21 @@ pub(super) fn sanitize(shred: &T) -> Result<(), Error> { } let common_header = shred.common_header(); let data_header = shred.data_header(); - let _shard_index = shred.erasure_shard_index()?; - let _erasure_shard = shred.erasure_shard_as_slice()?; if common_header.index as usize >= MAX_DATA_SHREDS_PER_SLOT { - return Err(Error::InvalidDataShredIndex(common_header.index)); + return Err(Error::InvalidShredIndex( + ShredType::Data, + common_header.index, + )); } - let _data = shred.data()?; - let _parent = shred.parent()?; let flags = data_header.flags; if flags.intersects(ShredFlags::LAST_SHRED_IN_SLOT) && !flags.contains(ShredFlags::DATA_COMPLETE_SHRED) { return Err(Error::InvalidShredFlags(data_header.flags.bits())); } + let _data = shred.data()?; + let _parent = shred.parent()?; + let _shard_index = shred.erasure_shard_index()?; + let _erasure_shard = shred.erasure_shard_as_slice()?; Ok(()) } diff --git a/ledger/src/shred/stats.rs b/ledger/src/shred/stats.rs index 26c23c14fb..178e1b0e2a 100644 --- a/ledger/src/shred/stats.rs +++ b/ledger/src/shred/stats.rs @@ -36,6 +36,7 @@ pub struct ShredFetchStats { pub duplicate_shred: usize, pub slot_out_of_range: usize, pub(crate) bad_shred_type: usize, + pub shred_version_mismatch: usize, since: Option, } @@ -118,6 +119,8 @@ impl ShredFetchStats { ("index_out_of_bounds", self.index_out_of_bounds, i64), ("slot_out_of_range", self.slot_out_of_range, i64), ("duplicate_shred", self.duplicate_shred, i64), + ("bad_shred_type", self.bad_shred_type, i64), + ("shred_version_mismatch", self.shred_version_mismatch, i64), ); *self = Self { since: Some(Instant::now()),