verifies shred-version in fetch stage

Shred versions are not verified until window-service, by which point resources
have already been wasted on sig-verifying and deserializing shreds. This commit
verifies the shred version earlier in the pipeline, in the fetch stage.
behzad nouri 2022-06-19 11:30:18 -04:00
parent 1165a7f3fc
commit 1f0f5dc03e
12 changed files with 235 additions and 160 deletions
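
A note on why the earlier check is cheap: the shred version is a little-endian
u16 at a fixed offset in the payload, so a mismatched shred can be rejected with
a two-byte read before any sig-verify or deserialization. A minimal sketch, not
the commit's code, assuming the layout implied by this commit's offsets
(64-byte signature, 1-byte variant, 8-byte slot, 4-byte index, then the version):

// Sketch only; the offset is an assumption from the layout described
// above: 64 + 1 + 8 + 4 = 77.
fn shred_version_from_payload(payload: &[u8]) -> Option<u16> {
    const OFFSET_OF_SHRED_VERSION: usize = 64 + 1 + 8 + 4;
    let bytes = payload.get(OFFSET_OF_SHRED_VERSION..)?.get(..2)?;
    <[u8; 2]>::try_from(bytes).map(u16::from_le_bytes).ok()
}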

View File

@ -54,7 +54,7 @@ pub mod rewards_recorder_service;
pub mod sample_performance_service;
pub mod serve_repair;
pub mod serve_repair_service;
pub mod shred_fetch_stage;
mod shred_fetch_stage;
pub mod sigverify;
pub mod sigverify_shreds;
pub mod sigverify_stage;

View File

@ -375,7 +375,6 @@ impl RetransmitStage {
cluster_slots_update_receiver: ClusterSlotsUpdateReceiver,
epoch_schedule: EpochSchedule,
turbine_disabled: Option<Arc<AtomicBool>>,
shred_version: u16,
cluster_slots: Arc<ClusterSlots>,
duplicate_slots_reset_sender: DuplicateSlotsResetSender,
verified_vote_receiver: VerifiedVoteReceiver,
@ -436,7 +435,6 @@ impl RetransmitStage {
&leader_schedule_cache_clone,
id,
last_root,
shred_version,
);
rv && !turbine_disabled
},

View File

@ -4,13 +4,14 @@ use {
crate::packet_hasher::PacketHasher,
crossbeam_channel::{unbounded, Sender},
lru::LruCache,
solana_ledger::shred::{get_shred_slot_index_type, ShredFetchStats},
solana_ledger::shred::{self, get_shred_slot_index_type, ShredFetchStats},
solana_perf::packet::{Packet, PacketBatch, PacketBatchRecycler, PacketFlags},
solana_runtime::bank_forks::BankForks,
solana_sdk::clock::{Slot, DEFAULT_MS_PER_SLOT},
solana_streamer::streamer::{self, PacketBatchReceiver, StreamerReceiveStats},
std::{
net::UdpSocket,
ops::RangeBounds,
sync::{atomic::AtomicBool, Arc, RwLock},
thread::{self, Builder, JoinHandle},
time::{Duration, Instant},
@ -18,17 +19,18 @@ use {
};
const DEFAULT_LRU_SIZE: usize = 10_000;
pub type ShredsReceived = LruCache<u64, ()>;
type ShredsReceived = LruCache<u64, ()>;
pub struct ShredFetchStage {
pub(crate) struct ShredFetchStage {
thread_hdls: Vec<JoinHandle<()>>,
}
impl ShredFetchStage {
fn process_packet<F>(
p: &mut Packet,
packet: &mut Packet,
shreds_received: &mut ShredsReceived,
stats: &mut ShredFetchStats,
shred_version: u16,
last_root: Slot,
last_slot: Slot,
slots_per_epoch: u64,
@ -37,24 +39,19 @@ impl ShredFetchStage {
) where
F: Fn(&mut Packet),
{
p.meta.set_discard(true);
if let Some((slot, _index, _shred_type)) = get_shred_slot_index_type(p, stats) {
// Seems reasonable to limit shreds to 2 epochs away
if slot > last_root && slot < (last_slot + 2 * slots_per_epoch) {
// Shred filter
let hash = packet_hasher.hash_packet(p);
if shreds_received.get(&hash).is_none() {
shreds_received.put(hash, ());
p.meta.set_discard(false);
modify(p);
} else {
stats.duplicate_shred += 1;
}
} else {
stats.slot_out_of_range += 1;
}
// Limit shreds to 2 epochs away.
let slot_bounds = (last_root + 1)..(last_slot + 2 * slots_per_epoch);
if should_discard_packet(
packet,
slot_bounds,
shred_version,
packet_hasher,
shreds_received,
stats,
) {
packet.meta.set_discard(true);
} else {
modify(packet);
}
}
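
For concreteness, the slot bounds above form a half-open range: shreds at or
below the root, or two or more epochs ahead, are discarded. A small sketch with
invented numbers (not from the commit's tests):

#[test]
fn slot_bounds_window() {
    // Invented values: last_root = 10, last_slot = 15, slots_per_epoch = 100.
    let slot_bounds = (10u64 + 1)..(15 + 2 * 100);
    assert!(!slot_bounds.contains(&10));  // at the root: discarded
    assert!(slot_bounds.contains(&11));   // first slot past the root: kept
    assert!(slot_bounds.contains(&214));  // last slot inside the window
    assert!(!slot_bounds.contains(&215)); // two epochs out: discarded
}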
@ -62,7 +59,8 @@ impl ShredFetchStage {
fn modify_packets<F>(
recvr: PacketBatchReceiver,
sendr: Sender<Vec<PacketBatch>>,
bank_forks: Option<Arc<RwLock<BankForks>>>,
bank_forks: &RwLock<BankForks>,
shred_version: u16,
name: &'static str,
modify: F,
) where
@ -85,7 +83,7 @@ impl ShredFetchStage {
last_updated = Instant::now();
packet_hasher.reset();
shreds_received.clear();
if let Some(bank_forks) = bank_forks.as_ref() {
{
let bank_forks_r = bank_forks.read().unwrap();
last_root = bank_forks_r.root();
let working_bank = bank_forks_r.working_bank();
@ -100,6 +98,7 @@ impl ShredFetchStage {
packet,
&mut shreds_received,
&mut stats,
shred_version,
last_root,
last_slot,
slots_per_epoch,
@ -119,7 +118,8 @@ impl ShredFetchStage {
exit: &Arc<AtomicBool>,
sender: Sender<Vec<PacketBatch>>,
recycler: PacketBatchRecycler,
bank_forks: Option<Arc<RwLock<BankForks>>>,
bank_forks: Arc<RwLock<BankForks>>,
shred_version: u16,
name: &'static str,
modify: F,
) -> (Vec<JoinHandle<()>>, JoinHandle<()>)
@ -145,17 +145,27 @@ impl ShredFetchStage {
let modifier_hdl = Builder::new()
.name("solana-tvu-fetch-stage-packet-modifier".to_string())
.spawn(move || Self::modify_packets(packet_receiver, sender, bank_forks, name, modify))
.spawn(move || {
Self::modify_packets(
packet_receiver,
sender,
&bank_forks,
shred_version,
name,
modify,
)
})
.unwrap();
(streamers, modifier_hdl)
}
pub fn new(
pub(crate) fn new(
sockets: Vec<Arc<UdpSocket>>,
forward_sockets: Vec<Arc<UdpSocket>>,
repair_socket: Arc<UdpSocket>,
sender: &Sender<Vec<PacketBatch>>,
bank_forks: Option<Arc<RwLock<BankForks>>>,
sender: Sender<Vec<PacketBatch>>,
shred_version: u16,
bank_forks: Arc<RwLock<BankForks>>,
exit: &Arc<AtomicBool>,
) -> Self {
let recycler = PacketBatchRecycler::warmed(100, 1024);
@ -166,6 +176,7 @@ impl ShredFetchStage {
sender.clone(),
recycler.clone(),
bank_forks.clone(),
shred_version,
"shred_fetch",
|_| {},
);
@ -176,6 +187,7 @@ impl ShredFetchStage {
sender.clone(),
recycler.clone(),
bank_forks.clone(),
shred_version,
"shred_fetch_tvu_forwards",
|p| p.meta.flags.insert(PacketFlags::FORWARDED),
);
@ -183,9 +195,10 @@ impl ShredFetchStage {
let (repair_receiver, repair_handler) = Self::packet_modifier(
vec![repair_socket],
exit,
sender.clone(),
sender,
recycler,
bank_forks,
shred_version,
"shred_fetch_repair",
|p| p.meta.flags.insert(PacketFlags::REPAIR),
);
@ -201,7 +214,7 @@ impl ShredFetchStage {
}
}
pub fn join(self) -> thread::Result<()> {
pub(crate) fn join(self) -> thread::Result<()> {
for thread_hdl in self.thread_hdls {
thread_hdl.join()?;
}
@ -209,6 +222,40 @@ impl ShredFetchStage {
}
}
// Returns true if the packet should be marked as discard.
#[must_use]
fn should_discard_packet(
packet: &Packet,
// Range of slots to ingest shreds for.
slot_bounds: impl RangeBounds<Slot>,
shred_version: u16,
packet_hasher: &PacketHasher,
shreds_received: &mut ShredsReceived,
stats: &mut ShredFetchStats,
) -> bool {
let slot = match get_shred_slot_index_type(packet, stats) {
None => return true,
Some((slot, _index, _shred_type)) => slot,
};
if !slot_bounds.contains(&slot) {
stats.slot_out_of_range += 1;
return true;
}
let shred = shred::layout::get_shred(packet);
if shred.and_then(shred::layout::get_version) != Some(shred_version) {
stats.shred_version_mismatch += 1;
return true;
}
let hash = packet_hasher.hash_packet(packet);
match shreds_received.put(hash, ()) {
None => false,
Some(()) => {
stats.duplicate_shred += 1;
true
}
}
}
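
The duplicate check relies on lru::LruCache::put returning the previous value
when the key is already present, so a single cache call both records and tests
the packet hash. A sketch of that contract (using an unbounded cache only to
keep the example version-agnostic; the stage itself caps the cache at
DEFAULT_LRU_SIZE):

use lru::LruCache;

#[test]
fn lru_put_contract() {
    let mut shreds_received: LruCache<u64, ()> = LruCache::unbounded();
    let hash = 0xdead_beef_u64; // stand-in for packet_hasher.hash_packet(packet)
    assert_eq!(shreds_received.put(hash, ()), None);     // first sighting: keep
    assert_eq!(shreds_received.put(hash, ()), Some(())); // repeat: duplicate_shred += 1
}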
#[cfg(test)]
mod tests {
use {
@ -227,6 +274,7 @@ mod tests {
let mut stats = ShredFetchStats::default();
let slot = 1;
let shred_version = 45189;
let shred = Shred::new_from_data(
slot,
3, // shred index
@ -234,7 +282,7 @@ mod tests {
&[], // data
ShredFlags::LAST_SHRED_IN_SLOT,
0, // reference_tick
0, // version
shred_version,
3, // fec_set_index
);
shred.copy_to_packet(&mut packet);
@ -248,6 +296,7 @@ mod tests {
&mut packet,
&mut shreds_received,
&mut stats,
shred_version,
last_root,
last_slot,
slots_per_epoch,
@ -265,6 +314,7 @@ mod tests {
&mut packet,
&mut shreds_received,
&mut stats,
shred_version,
last_root,
last_slot,
slots_per_epoch,
@ -283,6 +333,7 @@ mod tests {
let last_root = 0;
let last_slot = 100;
let slots_per_epoch = 10;
let shred_version = 59445;
let hasher = PacketHasher::default();
@ -291,6 +342,7 @@ mod tests {
&mut packet,
&mut shreds_received,
&mut stats,
shred_version,
last_root,
last_slot,
slots_per_epoch,
@ -299,7 +351,17 @@ mod tests {
);
assert_eq!(stats.index_overrun, 1);
assert!(packet.meta.discard());
let shred = Shred::new_from_data(1, 3, 0, &[], ShredFlags::LAST_SHRED_IN_SLOT, 0, 0, 0);
packet.meta.set_discard(false);
let shred = Shred::new_from_data(
1,
3,
0,
&[],
ShredFlags::LAST_SHRED_IN_SLOT,
0,
shred_version,
0,
);
shred.copy_to_packet(&mut packet);
// rejected slot is 1, root is 3
@ -307,6 +369,7 @@ mod tests {
&mut packet,
&mut shreds_received,
&mut stats,
shred_version,
3,
last_slot,
slots_per_epoch,
@ -314,12 +377,29 @@ mod tests {
&hasher,
);
assert!(packet.meta.discard());
packet.meta.set_discard(false);
ShredFetchStage::process_packet(
&mut packet,
&mut shreds_received,
&mut stats,
345, // shred_version
last_root,
last_slot,
slots_per_epoch,
&|_p| {},
&hasher,
);
assert!(packet.meta.discard());
packet.meta.set_discard(false);
assert_eq!(stats.shred_version_mismatch, 1);
// Accepted for 1,3
ShredFetchStage::process_packet(
&mut packet,
&mut shreds_received,
&mut stats,
shred_version,
last_root,
last_slot,
slots_per_epoch,
@ -333,6 +413,7 @@ mod tests {
&mut packet,
&mut shreds_received,
&mut stats,
shred_version,
last_root,
last_slot,
slots_per_epoch,
@ -340,6 +421,7 @@ mod tests {
&hasher,
);
assert!(packet.meta.discard());
packet.meta.set_discard(false);
let shred = Shred::new_from_data(
1_000_000,
@ -358,6 +440,7 @@ mod tests {
&mut packet,
&mut shreds_received,
&mut stats,
shred_version,
last_root,
last_slot,
slots_per_epoch,
@ -365,6 +448,7 @@ mod tests {
&hasher,
);
assert!(packet.meta.discard());
packet.meta.set_discard(false);
let index = MAX_DATA_SHREDS_PER_SLOT as u32;
let shred = Shred::new_from_data(5, index, 0, &[], ShredFlags::LAST_SHRED_IN_SLOT, 0, 0, 0);
@ -373,6 +457,7 @@ mod tests {
&mut packet,
&mut shreds_received,
&mut stats,
shred_version,
last_root,
last_slot,
slots_per_epoch,
@ -380,5 +465,6 @@ mod tests {
&hasher,
);
assert!(packet.meta.discard());
packet.meta.set_discard(false);
}
}

View File

@ -11,6 +11,7 @@ use {
},
solana_perf::{self, packet::PacketBatch, recycler_cache::RecyclerCache},
solana_runtime::bank_forks::BankForks,
solana_sdk::clock::Slot,
std::{
collections::{HashMap, HashSet},
sync::{Arc, RwLock},
@ -39,10 +40,11 @@ impl ShredSigVerifier {
packet_sender,
}
}
fn read_slots(batches: &[PacketBatch]) -> HashSet<u64> {
fn read_slots(batches: &[PacketBatch]) -> HashSet<Slot> {
batches
.iter()
.flat_map(PacketBatch::iter)
.filter(|packet| !packet.meta.discard())
.filter_map(shred::layout::get_shred)
.filter_map(shred::layout::get_slot)
.collect()

View File

@ -154,8 +154,9 @@ impl Tvu {
fetch_sockets,
forward_sockets,
repair_socket.clone(),
&fetch_sender,
Some(bank_forks.clone()),
fetch_sender,
tvu_config.shred_version,
bank_forks.clone(),
exit,
);
@ -189,7 +190,6 @@ impl Tvu {
cluster_slots_update_receiver,
*bank_forks.read().unwrap().working_bank().epoch_schedule(),
turbine_disabled,
tvu_config.shred_version,
cluster_slots.clone(),
duplicate_slots_reset_sender,
verified_vote_receiver,

View File

@ -14,7 +14,7 @@ use {
rayon::{prelude::*, ThreadPool},
solana_gossip::cluster_info::ClusterInfo,
solana_ledger::{
blockstore::{self, Blockstore, BlockstoreInsertionMetrics, MAX_DATA_SHREDS_PER_SLOT},
blockstore::{self, Blockstore, BlockstoreInsertionMetrics},
leader_schedule_cache::LeaderScheduleCache,
shred::{Nonce, Shred, ShredType},
},
@ -177,7 +177,6 @@ pub(crate) fn should_retransmit_and_persist(
leader_schedule_cache: &LeaderScheduleCache,
my_pubkey: &Pubkey,
root: u64,
shred_version: u16,
) -> bool {
let slot_leader_pubkey = leader_schedule_cache.slot_leader_at(shred.slot(), bank.as_deref());
if let Some(leader_id) = slot_leader_pubkey {
@ -187,15 +186,6 @@ pub(crate) fn should_retransmit_and_persist(
} else if !verify_shred_slot(shred, root) {
inc_new_counter_debug!("streamer-recv_window-outdated_transmission", 1);
false
} else if shred.version() != shred_version {
inc_new_counter_debug!("streamer-recv_window-incorrect_shred_version", 1);
false
} else if shred.index() >= MAX_DATA_SHREDS_PER_SLOT as u32 {
inc_new_counter_warn!("streamer-recv_window-shred_index_overrun", 1);
false
} else if shred.sanitize().is_err() {
inc_new_counter_warn!("streamer-recv_window-invalid-shred", 1);
false
} else {
true
}
@ -783,17 +773,6 @@ mod test {
&cache,
&me_id,
0,
0
));
// with the wrong shred_version, shred gets thrown out
assert!(!should_retransmit_and_persist(
&shreds[0],
Some(bank.clone()),
&cache,
&me_id,
0,
1
));
// substitute leader_pubkey for me_id so it looks like I was the leader
@ -804,7 +783,6 @@ mod test {
&cache,
&leader_pubkey,
0,
0
));
assert!(!should_retransmit_and_persist(
&shreds[0],
@ -812,7 +790,6 @@ mod test {
&cache,
&leader_pubkey,
0,
0
));
// change the shred's slot so leader lookup fails
@ -825,19 +802,6 @@ mod test {
&cache,
&me_id,
0,
0
));
// with an invalid index, shred gets thrown out
let mut bad_index_shred = shreds[0].clone();
bad_index_shred.set_index((MAX_DATA_SHREDS_PER_SLOT + 1) as u32);
assert!(!should_retransmit_and_persist(
&bad_index_shred,
Some(bank.clone()),
&cache,
&me_id,
0,
0
));
// with a shred where shred.slot() == root, shred gets thrown out
@ -849,7 +813,6 @@ mod test {
&cache,
&me_id,
root,
0
));
// with a shred where shred.parent() < root, shred gets thrown out
@ -862,7 +825,6 @@ mod test {
&cache,
&me_id,
root,
0
));
// coding shreds don't contain parent slot information, test that slot >= root
@ -884,7 +846,6 @@ mod test {
&cache,
&me_id,
0,
0
));
// shred.slot() == root, shred continues
assert!(should_retransmit_and_persist(
@ -893,7 +854,6 @@ mod test {
&cache,
&me_id,
5,
0
));
// shred.slot() < root, shred gets thrown out
assert!(!should_retransmit_and_persist(
@ -902,7 +862,6 @@ mod test {
&cache,
&me_id,
6,
0
));
}

View File

@ -3350,10 +3350,6 @@ fn get_last_hash<'a>(iterator: impl Iterator<Item = &'a Entry> + 'a) -> Option<Hash>
iterator.last().map(|entry| entry.hash)
}
fn is_valid_write_to_slot_0(slot_to_write: u64, parent_slot: Slot, last_root: u64) -> bool {
slot_to_write == 0 && last_root == 0 && parent_slot == 0
}
fn send_signals(
new_shreds_signals: &[Sender<bool>],
completed_slots_senders: &[Sender<Vec<u64>>],
@ -3963,22 +3959,13 @@ macro_rules! create_new_tmp_ledger_fifo_auto_delete {
};
}
pub fn verify_shred_slots(slot: Slot, parent_slot: Slot, last_root: Slot) -> bool {
if !is_valid_write_to_slot_0(slot, parent_slot, last_root) {
// Check that the parent_slot < slot
if parent_slot >= slot {
return false;
}
// Ignore shreds that chain to slots before the last root
if parent_slot < last_root {
return false;
}
// Above two checks guarantee that by this point, slot > last_root
pub fn verify_shred_slots(slot: Slot, parent: Slot, root: Slot) -> bool {
if slot == 0 && parent == 0 && root == 0 {
return true; // valid write to slot zero.
}
true
// Ignore shreds that chain to slots before the root,
// or have invalid parent >= slot.
root <= parent && parent < slot
}
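
The condensed predicate keeps the same acceptance set as the deleted branching.
A few spot checks (a sketch, not from the commit's tests, assuming
verify_shred_slots as defined above is in scope):

assert!(verify_shred_slots(0, 0, 0));  // the slot-zero special case
assert!(verify_shred_slots(5, 3, 2));  // root <= parent && parent < slot
assert!(!verify_shred_slots(5, 5, 2)); // parent must be strictly below slot
assert!(!verify_shred_slots(5, 1, 2)); // parent chains to before the root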
// Same as `create_new_ledger()` but use a temporary ledger name based on the provided `name`

View File

@ -61,7 +61,7 @@ use {
num_enum::{IntoPrimitive, TryFromPrimitive},
serde::{Deserialize, Serialize},
solana_entry::entry::{create_ticks, Entry},
solana_perf::packet::{deserialize_from_with_limit, Packet},
solana_perf::packet::Packet,
solana_sdk::{
clock::Slot,
hash::{hashv, Hash},
@ -122,8 +122,6 @@ pub enum Error {
BincodeError(#[from] bincode::Error),
#[error(transparent)]
ErasureError(#[from] reed_solomon_erasure::Error),
#[error("Invalid data shred index: {0}")]
InvalidDataShredIndex(/*shred index:*/ u32),
#[error("Invalid data size: {size}, payload: {payload}")]
InvalidDataSize { size: u16, payload: usize },
#[error("Invalid erasure shard index: {0:?}")]
@ -142,6 +140,8 @@ pub enum Error {
InvalidProofSize(/*proof_size:*/ u8),
#[error("Invalid shred flags: {0}")]
InvalidShredFlags(u8),
#[error("Invalid {0:?} shred index: {1}")]
InvalidShredIndex(ShredType, /*shred index:*/ u32),
#[error("Invalid shred type")]
InvalidShredType,
#[error("Invalid shred variant")]
@ -554,11 +554,22 @@ pub mod layout {
}
pub fn get_slot(shred: &[u8]) -> Option<Slot> {
deserialize_from_with_limit(shred.get(OFFSET_OF_SHRED_SLOT..)?).ok()
<[u8; 8]>::try_from(shred.get(OFFSET_OF_SHRED_SLOT..)?.get(..8)?)
.map(Slot::from_le_bytes)
.ok()
}
pub(super) fn get_index(shred: &[u8]) -> Option<u32> {
deserialize_from_with_limit(shred.get(OFFSET_OF_SHRED_INDEX..)?).ok()
<[u8; 4]>::try_from(shred.get(OFFSET_OF_SHRED_INDEX..)?.get(..4)?)
.map(u32::from_le_bytes)
.ok()
}
pub fn get_version(shred: &[u8]) -> Option<u16> {
const OFFSET_OF_SHRED_VERSION: usize = OFFSET_OF_SHRED_INDEX + SIZE_OF_SHRED_INDEX;
<[u8; 2]>::try_from(shred.get(OFFSET_OF_SHRED_VERSION..)?.get(..2)?)
.map(u16::from_le_bytes)
.ok()
}
// Returns slice range of the shred payload which is signed.
@ -1065,6 +1076,31 @@ mod tests {
}
}
fn verify_shred_layout(shred: &Shred, packet: &Packet) {
let data = layout::get_shred(packet).unwrap();
assert_eq!(layout::get_slot(data), Some(shred.slot()));
assert_eq!(layout::get_index(data), Some(shred.index()));
assert_eq!(layout::get_version(data), Some(shred.version()));
assert_eq!(
get_shred_slot_index_type(packet, &mut ShredFetchStats::default()),
Some((shred.slot(), shred.index(), shred.shred_type()))
);
match shred.shred_type() {
ShredType::Code => {
assert_matches!(
layout::get_reference_tick(data),
Err(Error::InvalidShredType)
);
}
ShredType::Data => {
assert_eq!(
layout::get_reference_tick(data).unwrap(),
shred.reference_tick()
);
}
}
}
#[test]
fn test_serde_compat_shred_data() {
const SEED: &str = "6qG9NGWEtoTugS4Zgs46u8zTccEJuRHtrNMiUayLHCxt";
@ -1102,18 +1138,7 @@ mod tests {
packet.meta.size = payload.len();
assert_eq!(shred.bytes_to_store(), payload);
assert_eq!(shred, Shred::new_from_serialized_shred(payload).unwrap());
assert_eq!(
shred.reference_tick(),
layout::get_reference_tick(packet.data(..).unwrap()).unwrap()
);
assert_eq!(
layout::get_slot(packet.data(..).unwrap()),
Some(shred.slot())
);
assert_eq!(
get_shred_slot_index_type(&packet, &mut ShredFetchStats::default()),
Some((shred.slot(), shred.index(), shred.shred_type()))
);
verify_shred_layout(&shred, &packet);
}
#[test]
@ -1146,18 +1171,7 @@ mod tests {
packet.meta.size = payload.len();
assert_eq!(shred.bytes_to_store(), payload);
assert_eq!(shred, Shred::new_from_serialized_shred(payload).unwrap());
assert_eq!(
shred.reference_tick(),
layout::get_reference_tick(packet.data(..).unwrap()).unwrap()
);
assert_eq!(
layout::get_slot(packet.data(..).unwrap()),
Some(shred.slot())
);
assert_eq!(
get_shred_slot_index_type(&packet, &mut ShredFetchStats::default()),
Some((shred.slot(), shred.index(), shred.shred_type()))
);
verify_shred_layout(&shred, &packet);
}
#[test]
@ -1197,14 +1211,7 @@ mod tests {
packet.meta.size = payload.len();
assert_eq!(shred.bytes_to_store(), payload);
assert_eq!(shred, Shred::new_from_serialized_shred(payload).unwrap());
assert_eq!(
layout::get_slot(packet.data(..).unwrap()),
Some(shred.slot())
);
assert_eq!(
get_shred_slot_index_type(&packet, &mut ShredFetchStats::default()),
Some((shred.slot(), shred.index(), shred.shred_type()))
);
verify_shred_layout(&shred, &packet);
}
#[test]

View File

@ -322,7 +322,11 @@ impl ShredCode {
#[cfg(test)]
mod test {
use {super::*, crate::shred::MAX_DATA_SHREDS_PER_SLOT, matches::assert_matches};
use {
super::*,
crate::shred::{ShredType, MAX_DATA_SHREDS_PER_SLOT},
matches::assert_matches,
};
#[test]
fn test_sanitize_data_shred() {
@ -369,7 +373,10 @@ mod test {
{
let mut shred = shred.clone();
shred.common_header.index = MAX_DATA_SHREDS_PER_SLOT as u32;
assert_matches!(shred.sanitize(), Err(Error::InvalidDataShredIndex(32768)));
assert_matches!(
shred.sanitize(),
Err(Error::InvalidShredIndex(ShredType::Data, 32768))
);
}
{
let mut shred = shred.clone();
@ -423,6 +430,14 @@ mod test {
Err(Error::InvalidErasureShardIndex { .. })
);
}
{
let mut shred = shred.clone();
shred.common_header.index = MAX_DATA_SHREDS_PER_SLOT as u32;
assert_matches!(
shred.sanitize(),
Err(Error::InvalidShredIndex(ShredType::Code, 32768))
);
}
// pos >= num_coding is invalid.
{
let mut shred = shred.clone();
@ -437,21 +452,18 @@ mod test {
// A shred whose erasure set overshoots the per-slot shred caps should fail.
{
let mut shred = shred.clone();
shred.common_header.fec_set_index = std::u32::MAX - 1;
shred.common_header.fec_set_index = MAX_DATA_SHREDS_PER_SLOT as u32 - 2;
shred.coding_header.num_data_shreds = 2;
shred.coding_header.num_coding_shreds = 4;
shred.coding_header.position = 1;
shred.common_header.index = std::u32::MAX - 1;
shred.common_header.index = MAX_DATA_SHREDS_PER_SLOT as u32 - 2;
assert_matches!(
shred.sanitize(),
Err(Error::InvalidErasureShardIndex { .. })
);
shred.coding_header.num_coding_shreds = 2000;
assert_matches!(
shred.sanitize(),
Err(Error::InvalidErasureShardIndex { .. })
);
assert_matches!(shred.sanitize(), Err(Error::InvalidNumCodingShreds(2000)));
// Decreasing num_coding_shreds will put it within the allowed limit.

View File

@ -3,12 +3,15 @@ use {
common::dispatch,
legacy, merkle,
traits::{Shred, ShredCode as ShredCodeTrait},
CodingShredHeader, Error, ShredCommonHeader, MAX_DATA_SHREDS_PER_FEC_BLOCK, SIZE_OF_NONCE,
CodingShredHeader, Error, ShredCommonHeader, ShredType, MAX_DATA_SHREDS_PER_FEC_BLOCK,
MAX_DATA_SHREDS_PER_SLOT, SIZE_OF_NONCE,
},
solana_sdk::{clock::Slot, packet::PACKET_DATA_SIZE, signature::Signature},
static_assertions::const_assert_eq,
};
const MAX_CODE_SHREDS_PER_SLOT: usize = MAX_DATA_SHREDS_PER_SLOT;
const_assert_eq!(ShredCode::SIZE_OF_PAYLOAD, 1228);
#[derive(Clone, Debug, Eq, PartialEq)]
@ -92,15 +95,23 @@ impl From<merkle::ShredCode> for ShredCode {
#[inline]
pub(super) fn erasure_shard_index<T: ShredCodeTrait>(shred: &T) -> Option<usize> {
// Assert that the last shred index in the erasure set does not
// overshoot u32.
// overshoot MAX_{DATA,CODE}_SHREDS_PER_SLOT.
let common_header = shred.common_header();
let coding_header = shred.coding_header();
common_header
if common_header
.fec_set_index
.checked_add(u32::from(coding_header.num_data_shreds.checked_sub(1)?))?;
shred
.checked_add(u32::from(coding_header.num_data_shreds.checked_sub(1)?))? as usize
>= MAX_DATA_SHREDS_PER_SLOT
{
return None;
}
if shred
.first_coding_index()?
.checked_add(u32::from(coding_header.num_coding_shreds.checked_sub(1)?))?;
.checked_add(u32::from(coding_header.num_coding_shreds.checked_sub(1)?))? as usize
>= MAX_CODE_SHREDS_PER_SLOT
{
return None;
}
let num_data_shreds = usize::from(coding_header.num_data_shreds);
let num_coding_shreds = usize::from(coding_header.num_coding_shreds);
let position = usize::from(coding_header.position);
@ -113,15 +124,22 @@ pub(super) fn sanitize<T: ShredCodeTrait>(shred: &T) -> Result<(), Error> {
if shred.payload().len() != T::SIZE_OF_PAYLOAD {
return Err(Error::InvalidPayloadSize(shred.payload().len()));
}
let common_header = shred.common_header();
let coding_header = shred.coding_header();
let _shard_index = shred.erasure_shard_index()?;
let _erasure_shard = shred.erasure_shard_as_slice()?;
if common_header.index as usize >= MAX_CODE_SHREDS_PER_SLOT {
return Err(Error::InvalidShredIndex(
ShredType::Code,
common_header.index,
));
}
let num_coding_shreds = u32::from(coding_header.num_coding_shreds);
if num_coding_shreds > 8 * MAX_DATA_SHREDS_PER_FEC_BLOCK {
return Err(Error::InvalidNumCodingShreds(
coding_header.num_coding_shreds,
));
}
let _shard_index = shred.erasure_shard_index()?;
let _erasure_shard = shred.erasure_shard_as_slice()?;
Ok(())
}
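
The guards added to erasure_shard_index bound the last data and coding indices
of the erasure set against the per-slot caps before computing the shard
position. A sketch of the data-side arithmetic, assuming
MAX_DATA_SHREDS_PER_SLOT = 32_768 (the value implied by the 32768 in the tests
above); the helper name is invented for illustration:

const MAX_DATA_SHREDS_PER_SLOT: usize = 32_768; // assumed value

fn erasure_set_data_in_bounds(fec_set_index: u32, num_data_shreds: u16) -> bool {
    u32::from(num_data_shreds)
        .checked_sub(1) // None if the set is empty
        .and_then(|span| fec_set_index.checked_add(span)) // None on u32 overflow
        .map(|last| (last as usize) < MAX_DATA_SHREDS_PER_SLOT)
        .unwrap_or(false)
}

#[test]
fn erasure_set_bounds_spot_checks() {
    assert!(erasure_set_data_in_bounds(32_766, 2));  // last index 32_767: in bounds
    assert!(!erasure_set_data_in_bounds(32_767, 2)); // last index 32_768: rejected
}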

View File

@ -4,7 +4,7 @@ use {
common::dispatch,
legacy, merkle,
traits::{Shred as _, ShredData as ShredDataTrait},
DataShredHeader, Error, ShredCommonHeader, ShredFlags, ShredVariant,
DataShredHeader, Error, ShredCommonHeader, ShredFlags, ShredType, ShredVariant,
MAX_DATA_SHREDS_PER_SLOT,
},
solana_sdk::{clock::Slot, signature::Signature},
@ -132,18 +132,21 @@ pub(super) fn sanitize<T: ShredDataTrait>(shred: &T) -> Result<(), Error> {
}
let common_header = shred.common_header();
let data_header = shred.data_header();
let _shard_index = shred.erasure_shard_index()?;
let _erasure_shard = shred.erasure_shard_as_slice()?;
if common_header.index as usize >= MAX_DATA_SHREDS_PER_SLOT {
return Err(Error::InvalidDataShredIndex(common_header.index));
return Err(Error::InvalidShredIndex(
ShredType::Data,
common_header.index,
));
}
let _data = shred.data()?;
let _parent = shred.parent()?;
let flags = data_header.flags;
if flags.intersects(ShredFlags::LAST_SHRED_IN_SLOT)
&& !flags.contains(ShredFlags::DATA_COMPLETE_SHRED)
{
return Err(Error::InvalidShredFlags(data_header.flags.bits()));
}
let _data = shred.data()?;
let _parent = shred.parent()?;
let _shard_index = shred.erasure_shard_index()?;
let _erasure_shard = shred.erasure_shard_as_slice()?;
Ok(())
}

View File

@ -36,6 +36,7 @@ pub struct ShredFetchStats {
pub duplicate_shred: usize,
pub slot_out_of_range: usize,
pub(crate) bad_shred_type: usize,
pub shred_version_mismatch: usize,
since: Option<Instant>,
}
@ -118,6 +119,8 @@ impl ShredFetchStats {
("index_out_of_bounds", self.index_out_of_bounds, i64),
("slot_out_of_range", self.slot_out_of_range, i64),
("duplicate_shred", self.duplicate_shred, i64),
("bad_shred_type", self.bad_shred_type, i64),
("shred_version_mismatch", self.shred_version_mismatch, i64),
);
*self = Self {
since: Some(Instant::now()),