verifies shred slot and parent in fetch stage (#26225)

Shred slot and parent are not verified until window-service where
resources are already wasted to sig-verify and deserialize shreds.
This commit moves above verification to earlier in the pipeline in fetch
stage.
This commit is contained in:
behzad nouri 2022-06-28 12:45:50 +00:00 committed by GitHub
parent e8fed88669
commit 348fe9ebe2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 227 additions and 216 deletions

View File

@ -40,9 +40,13 @@ pub fn repair_response_packet_from_bytes(
Some(packet)
}
pub fn nonce(packet: &Packet) -> Option<Nonce> {
let nonce_start = packet.meta.size.checked_sub(SIZE_OF_NONCE)?;
packet.deserialize_slice(nonce_start..).ok()
pub(crate) fn nonce(packet: &Packet) -> Option<Nonce> {
// Nonces are attached to both repair and ancestor hashes responses.
let data = packet.data(..)?;
let offset = data.len().checked_sub(SIZE_OF_NONCE)?;
<[u8; SIZE_OF_NONCE]>::try_from(&data[offset..])
.map(Nonce::from_le_bytes)
.ok()
}
#[cfg(test)]

View File

@ -11,7 +11,7 @@ use {
completed_data_sets_service::CompletedDataSetsSender,
packet_hasher::PacketHasher,
repair_service::{DuplicateSlotsResetSender, RepairInfo},
window_service::{should_retransmit_and_persist, WindowService},
window_service::WindowService,
},
crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender},
itertools::{izip, Itertools},
@ -421,7 +421,7 @@ impl RetransmitStage {
exit: Arc<AtomicBool>,
cluster_slots_update_receiver: ClusterSlotsUpdateReceiver,
epoch_schedule: EpochSchedule,
turbine_disabled: Option<Arc<AtomicBool>>,
turbine_disabled: Arc<AtomicBool>,
cluster_slots: Arc<ClusterSlots>,
duplicate_slots_reset_sender: DuplicateSlotsResetSender,
verified_vote_receiver: VerifiedVoteReceiver,
@ -470,14 +470,7 @@ impl RetransmitStage {
exit,
repair_info,
leader_schedule_cache,
move |shred, last_root| {
let turbine_disabled = turbine_disabled
.as_ref()
.map(|x| x.load(Ordering::Relaxed))
.unwrap_or(false);
let rv = should_retransmit_and_persist(shred, last_root);
rv && !turbine_disabled
},
turbine_disabled,
verified_vote_receiver,
completed_data_sets_sender,
duplicate_slots_sender,

View File

@ -4,7 +4,7 @@ use {
crate::packet_hasher::PacketHasher,
crossbeam_channel::{unbounded, Sender},
lru::LruCache,
solana_ledger::shred::{self, get_shred_slot_index_type, ShredFetchStats},
solana_ledger::shred::{should_discard_shred, ShredFetchStats},
solana_perf::packet::{Packet, PacketBatch, PacketBatchRecycler, PacketFlags},
solana_runtime::bank_forks::BankForks,
solana_sdk::clock::{Slot, DEFAULT_MS_PER_SLOT},
@ -67,6 +67,7 @@ impl ShredFetchStage {
for packet in packet_batch.iter_mut() {
if should_discard_packet(
packet,
last_root,
slot_bounds.clone(),
shred_version,
&packet_hasher,
@ -195,6 +196,7 @@ impl ShredFetchStage {
#[must_use]
fn should_discard_packet(
packet: &Packet,
root: Slot,
// Range of slots to ingest shreds for.
slot_bounds: impl RangeBounds<Slot>,
shred_version: u16,
@ -202,17 +204,7 @@ fn should_discard_packet(
shreds_received: &mut ShredsReceived,
stats: &mut ShredFetchStats,
) -> bool {
let slot = match get_shred_slot_index_type(packet, stats) {
None => return true,
Some((slot, _index, _shred_type)) => slot,
};
if !slot_bounds.contains(&slot) {
stats.slot_out_of_range += 1;
return true;
}
let shred = shred::layout::get_shred(packet);
if shred.and_then(shred::layout::get_version) != Some(shred_version) {
stats.shred_version_mismatch += 1;
if should_discard_shred(packet, root, shred_version, slot_bounds, stats) {
return true;
}
let hash = packet_hasher.hash_packet(packet);
@ -242,12 +234,12 @@ mod tests {
let mut packet = Packet::default();
let mut stats = ShredFetchStats::default();
let slot = 1;
let slot = 2;
let shred_version = 45189;
let shred = Shred::new_from_data(
slot,
3, // shred index
0, // parent offset
1, // parent offset
&[], // data
ShredFlags::LAST_SHRED_IN_SLOT,
0, // reference_tick
@ -264,6 +256,7 @@ mod tests {
let slot_bounds = (last_root + 1)..(last_slot + 2 * slots_per_epoch);
assert!(!should_discard_packet(
&packet,
last_root,
slot_bounds.clone(),
shred_version,
&hasher,
@ -278,6 +271,7 @@ mod tests {
coding[0].copy_to_packet(&mut packet);
assert!(!should_discard_packet(
&packet,
last_root,
slot_bounds,
shred_version,
&hasher,
@ -303,6 +297,7 @@ mod tests {
// packet size is 0, so cannot get index
assert!(should_discard_packet(
&packet,
last_root,
slot_bounds.clone(),
shred_version,
&hasher,
@ -311,20 +306,21 @@ mod tests {
));
assert_eq!(stats.index_overrun, 1);
let shred = Shred::new_from_data(
1,
3,
0,
&[],
2, // slot
3, // index
1, // parent_offset
&[], // data
ShredFlags::LAST_SHRED_IN_SLOT,
0,
0, // reference_tick
shred_version,
0,
0, // fec_set_index
);
shred.copy_to_packet(&mut packet);
// rejected slot is 1, root is 3
// rejected slot is 2, root is 3
assert!(should_discard_packet(
&packet,
3,
3..slot_bounds.end,
shred_version,
&hasher,
@ -335,6 +331,7 @@ mod tests {
assert!(should_discard_packet(
&packet,
last_root,
slot_bounds.clone(),
345, // shred_version
&hasher,
@ -346,6 +343,7 @@ mod tests {
// Accepted for 1,3
assert!(!should_discard_packet(
&packet,
last_root,
slot_bounds.clone(),
shred_version,
&hasher,
@ -356,6 +354,7 @@ mod tests {
// shreds_received should filter duplicate
assert!(should_discard_packet(
&packet,
last_root,
slot_bounds.clone(),
shred_version,
&hasher,
@ -379,6 +378,7 @@ mod tests {
// Slot 1 million is too high
assert!(should_discard_packet(
&packet,
last_root,
slot_bounds.clone(),
shred_version,
&hasher,
@ -391,6 +391,7 @@ mod tests {
shred.copy_to_packet(&mut packet);
assert!(should_discard_packet(
&packet,
last_root,
slot_bounds,
shred_version,
&hasher,

View File

@ -111,7 +111,7 @@ impl Tvu {
leader_schedule_cache: &Arc<LeaderScheduleCache>,
exit: &Arc<AtomicBool>,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
turbine_disabled: Option<Arc<AtomicBool>>,
turbine_disabled: Arc<AtomicBool>,
transaction_status_sender: Option<TransactionStatusSender>,
rewards_recorder_sender: Option<RewardsRecorderSender>,
cache_block_meta_sender: Option<CacheBlockMetaSender>,
@ -415,7 +415,7 @@ pub mod tests {
&leader_schedule_cache,
&exit,
block_commitment_cache,
None,
Arc::<AtomicBool>::default(),
None,
None,
None,

View File

@ -134,7 +134,7 @@ pub struct ValidatorConfig {
pub snapshot_config: Option<SnapshotConfig>,
pub max_ledger_shreds: Option<u64>,
pub broadcast_stage_type: BroadcastStageType,
pub turbine_disabled: Option<Arc<AtomicBool>>,
pub turbine_disabled: Arc<AtomicBool>,
pub enforce_ulimit_nofile: bool,
pub fixed_leader_schedule: Option<FixedSchedule>,
pub wait_for_supermajority: Option<Slot>,
@ -196,7 +196,7 @@ impl Default for ValidatorConfig {
pubsub_config: PubSubConfig::default(),
snapshot_config: None,
broadcast_stage_type: BroadcastStageType::Standard,
turbine_disabled: None,
turbine_disabled: Arc::<AtomicBool>::default(),
enforce_ulimit_nofile: true,
fixed_leader_schedule: None,
wait_for_supermajority: None,

View File

@ -14,12 +14,12 @@ use {
rayon::{prelude::*, ThreadPool},
solana_gossip::cluster_info::ClusterInfo,
solana_ledger::{
blockstore::{self, Blockstore, BlockstoreInsertionMetrics},
blockstore::{Blockstore, BlockstoreInsertionMetrics},
leader_schedule_cache::LeaderScheduleCache,
shred::{Nonce, Shred, ShredType},
shred::{Nonce, Shred},
},
solana_measure::measure::Measure,
solana_metrics::{inc_new_counter_debug, inc_new_counter_error},
solana_metrics::inc_new_counter_error,
solana_perf::packet::{Packet, PacketBatch},
solana_rayon_threadlimit::get_thread_count,
solana_sdk::{clock::Slot, pubkey::Pubkey},
@ -106,11 +106,11 @@ impl WindowServiceMetrics {
#[derive(Default)]
struct ReceiveWindowStats {
num_iters: usize,
num_packets: usize,
num_shreds: usize, // num_discards: num_packets - num_shreds
num_repairs: usize,
elapsed: Duration, // excludes waiting time on the receiver channel.
slots: HashMap<Slot, /*num shreds:*/ usize>,
addrs: HashMap</*source:*/ SocketAddr, /*num packets:*/ usize>,
since: Option<Instant>,
}
@ -125,18 +125,12 @@ impl ReceiveWindowStats {
}
datapoint_info!(
"receive_window_stats",
("num_iters", self.num_iters, i64),
("num_packets", self.num_packets, i64),
("num_shreds", self.num_shreds, i64),
("num_repairs", self.num_repairs, i64),
("elapsed_micros", self.elapsed.as_micros(), i64),
);
for (slot, num_shreds) in &self.slots {
datapoint_debug!(
"receive_window_num_slot_shreds",
("slot", *slot, i64),
("num_shreds", *num_shreds, i64)
);
}
let mut addrs: Vec<_> = std::mem::take(&mut self.addrs).into_iter().collect();
let reverse_count = |(_addr, count): &_| Reverse(*count);
if addrs.len() > MAX_NUM_ADDRS {
@ -156,29 +150,6 @@ impl ReceiveWindowStats {
}
}
fn verify_shred_slot(shred: &Shred, root: u64) -> bool {
match shred.shred_type() {
// Only data shreds have parent information
ShredType::Data => match shred.parent() {
Ok(parent) => blockstore::verify_shred_slots(shred.slot(), parent, root),
Err(_) => false,
},
// Filter out outdated coding shreds
ShredType::Code => shred.slot() >= root,
}
}
/// drop shreds that are from myself or not from the correct leader for the
/// shred's slot
pub(crate) fn should_retransmit_and_persist(shred: &Shred, root: u64) -> bool {
if verify_shred_slot(shred, root) {
true
} else {
inc_new_counter_debug!("streamer-recv_window-outdated_transmission", 1);
false
}
}
fn run_check_duplicate(
cluster_info: &ClusterInfo,
blockstore: &Blockstore,
@ -313,33 +284,25 @@ where
Ok(())
}
fn recv_window<F>(
blockstore: &Blockstore,
fn recv_window(
insert_shred_sender: &Sender<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
verified_receiver: &Receiver<Vec<PacketBatch>>,
retransmit_sender: &Sender<Vec<Shred>>,
shred_filter: F,
turbine_disabled: &AtomicBool,
thread_pool: &ThreadPool,
stats: &mut ReceiveWindowStats,
) -> Result<()>
where
F: Fn(&Shred, /*last root:*/ Slot) -> bool + Sync,
{
let timer = Duration::from_millis(200);
let mut packet_batches = verified_receiver.recv_timeout(timer)?;
) -> Result<()> {
const RECV_TIMEOUT: Duration = Duration::from_millis(200);
let mut packet_batches = verified_receiver.recv_timeout(RECV_TIMEOUT)?;
packet_batches.extend(verified_receiver.try_iter().flatten());
let now = Instant::now();
let last_root = blockstore.last_root();
let turbine_disabled = turbine_disabled.load(Ordering::Relaxed);
let handle_packet = |packet: &Packet| {
if packet.meta.discard() {
inc_new_counter_debug!("streamer-recv_window-invalid_or_unnecessary_packet", 1);
if turbine_disabled || packet.meta.discard() {
return None;
}
let serialized_shred = packet.data(..)?.to_vec();
let shred = Shred::new_from_serialized_shred(serialized_shred).ok()?;
if !shred_filter(&shred, last_root) {
return None;
}
if packet.meta.repair() {
let repair_info = RepairMeta {
_from_addr: packet.meta.socket_addr(),
@ -369,19 +332,11 @@ where
);
stats.num_repairs += repair_infos.iter().filter(|r| r.is_some()).count();
stats.num_shreds += shreds.len();
for shred in &shreds {
*stats.slots.entry(shred.slot()).or_default() += 1;
}
insert_shred_sender.send((shreds, repair_infos))?;
stats.num_packets += packet_batches
.iter()
.map(|packet_batch| packet_batch.len())
.sum::<usize>();
for packet in packet_batches
.iter()
.flat_map(|packet_batch| packet_batch.iter())
{
stats.num_iters += 1;
stats.num_packets += packet_batches.iter().map(PacketBatch::len).sum::<usize>();
for packet in packet_batches.iter().flat_map(PacketBatch::iter) {
let addr = packet.meta.socket_addr();
*stats.addrs.entry(addr).or_default() += 1;
}
@ -421,7 +376,7 @@ pub(crate) struct WindowService {
impl WindowService {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new<F>(
pub(crate) fn new(
blockstore: Arc<Blockstore>,
verified_receiver: Receiver<Vec<PacketBatch>>,
retransmit_sender: Sender<Vec<Shred>>,
@ -430,18 +385,12 @@ impl WindowService {
exit: Arc<AtomicBool>,
repair_info: RepairInfo,
leader_schedule_cache: Arc<LeaderScheduleCache>,
shred_filter: F,
turbine_disabled: Arc<AtomicBool>,
verified_vote_receiver: VerifiedVoteReceiver,
completed_data_sets_sender: CompletedDataSetsSender,
duplicate_slots_sender: DuplicateSlotSender,
ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver,
) -> WindowService
where
F: 'static
+ Fn(&Shred, /*last root:*/ Slot) -> bool
+ std::marker::Send
+ std::marker::Sync,
{
) -> WindowService {
let outstanding_requests = Arc::<RwLock<OutstandingShredRepairs>>::default();
let cluster_info = repair_info.cluster_info.clone();
@ -471,7 +420,7 @@ impl WindowService {
let t_insert = Self::start_window_insert_thread(
exit.clone(),
blockstore.clone(),
blockstore,
leader_schedule_cache,
insert_receiver,
duplicate_sender,
@ -483,10 +432,9 @@ impl WindowService {
let t_window = Self::start_recv_window_thread(
id,
exit,
blockstore,
insert_sender,
verified_receiver,
shred_filter,
turbine_disabled,
retransmit_sender,
);
@ -589,18 +537,14 @@ impl WindowService {
}
#[allow(clippy::too_many_arguments)]
fn start_recv_window_thread<F>(
fn start_recv_window_thread(
id: Pubkey,
exit: Arc<AtomicBool>,
blockstore: Arc<Blockstore>,
insert_sender: Sender<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
verified_receiver: Receiver<Vec<PacketBatch>>,
shred_filter: F,
turbine_disabled: Arc<AtomicBool>,
retransmit_sender: Sender<Vec<Shred>>,
) -> JoinHandle<()>
where
F: 'static + Fn(&Shred, u64) -> bool + std::marker::Send + std::marker::Sync,
{
) -> JoinHandle<()> {
let mut stats = ReceiveWindowStats::default();
Builder::new()
.name("solana-window".to_string())
@ -627,11 +571,10 @@ impl WindowService {
}
};
if let Err(e) = recv_window(
&blockstore,
&insert_sender,
&verified_receiver,
&retransmit_sender,
|shred, last_root| shred_filter(shred, last_root),
&turbine_disabled,
&thread_pool,
&mut stats,
) {
@ -687,7 +630,6 @@ mod test {
shred::{ProcessShredsStats, Shredder},
},
solana_sdk::{
epoch_schedule::MINIMUM_SLOTS_PER_EPOCH,
hash::Hash,
signature::{Keypair, Signer},
timing::timestamp,
@ -731,52 +673,6 @@ mod test {
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
#[test]
fn test_should_retransmit_and_persist() {
let leader_keypair = Arc::new(Keypair::new());
let shreds = local_entries_to_shred(&[Entry::default()], 0, 0, &leader_keypair);
// with a Bank for slot 0, shred continues
assert!(should_retransmit_and_persist(&shreds[0], 0));
// change the shred's slot so leader lookup fails
// with a Bank and no idea who leader is, shred gets thrown out
let mut bad_slot_shred = shreds[0].clone();
bad_slot_shred.set_slot(MINIMUM_SLOTS_PER_EPOCH as u64 * 3);
assert!(!should_retransmit_and_persist(&bad_slot_shred, 0));
// with a shred where shred.slot() == root, shred gets thrown out
let root = MINIMUM_SLOTS_PER_EPOCH as u64 * 3;
let shreds = local_entries_to_shred(&[Entry::default()], root, root - 1, &leader_keypair);
assert!(!should_retransmit_and_persist(&shreds[0], root));
// with a shred where shred.parent() < root, shred gets thrown out
let root = MINIMUM_SLOTS_PER_EPOCH as u64 * 3;
let shreds =
local_entries_to_shred(&[Entry::default()], root + 1, root - 1, &leader_keypair);
assert!(!should_retransmit_and_persist(&shreds[0], root));
// coding shreds don't contain parent slot information, test that slot >= root
let mut coding_shred = Shred::new_from_parity_shard(
5, // slot
5, // index
&[], // parity_shard
5, // fec_set_index
6, // num_data_shreds
6, // num_coding_shreds
3, // position
0, // version
);
coding_shred.sign(&leader_keypair);
// shred.slot() > root, shred continues
assert!(should_retransmit_and_persist(&coding_shred, 0));
// shred.slot() == root, shred continues
assert!(should_retransmit_and_persist(&coding_shred, 5));
// shred.slot() < root, shred gets thrown out
assert!(!should_retransmit_and_persist(&coding_shred, 6));
}
#[test]
fn test_run_check_duplicate() {
let blockstore_path = get_tmp_ledger_path!();

View File

@ -4026,7 +4026,7 @@ macro_rules! create_new_tmp_ledger_fifo_auto_delete {
};
}
pub fn verify_shred_slots(slot: Slot, parent: Slot, root: Slot) -> bool {
pub(crate) fn verify_shred_slots(slot: Slot, parent: Slot, root: Slot) -> bool {
if slot == 0 && parent == 0 && root == 0 {
return true; // valid write to slot zero.
}

View File

@ -56,7 +56,7 @@ pub use {
};
use {
self::{shred_code::ShredCode, traits::Shred as _},
crate::blockstore::MAX_DATA_SHREDS_PER_SLOT,
crate::blockstore::{self, MAX_DATA_SHREDS_PER_SLOT},
bitflags::bitflags,
num_enum::{IntoPrimitive, TryFromPrimitive},
serde::{Deserialize, Serialize},
@ -69,7 +69,7 @@ use {
signature::{Keypair, Signature, Signer},
},
static_assertions::const_assert_eq,
std::fmt::Debug,
std::{fmt::Debug, ops::RangeBounds},
thiserror::Error,
};
@ -573,6 +573,15 @@ pub mod layout {
.ok()
}
// The caller should verify first that the shred is data and not code!
pub(super) fn get_parent_offset(shred: &[u8]) -> Option<u16> {
const OFFSET_OF_SHRED_PARENT: usize = SIZE_OF_COMMON_SHRED_HEADER;
debug_assert_eq!(get_shred_type(shred).unwrap(), ShredType::Data);
<[u8; 2]>::try_from(shred.get(OFFSET_OF_SHRED_PARENT..)?.get(..2)?)
.map(u16::from_le_bytes)
.ok()
}
// Returns slice range of the shred payload which is signed.
pub(crate) fn get_signed_message_range(shred: &[u8]) -> Option<Range<usize>> {
let range = match get_shred_variant(shred).ok()? {
@ -653,48 +662,95 @@ impl TryFrom<u8> for ShredVariant {
}
}
// Get slot, index, and type from a packet with partial deserialize
pub fn get_shred_slot_index_type(
#[must_use]
pub fn should_discard_shred(
packet: &Packet,
root: Slot,
shred_version: u16,
// Range of slots to ingest shreds for.
slot_bounds: impl RangeBounds<Slot>,
stats: &mut ShredFetchStats,
) -> Option<(Slot, u32, ShredType)> {
) -> bool {
let shred = match layout::get_shred(packet) {
None => {
stats.index_overrun += 1;
return None;
return true;
}
Some(shred) => shred,
};
if OFFSET_OF_SHRED_INDEX + SIZE_OF_SHRED_INDEX > shred.len() {
stats.index_overrun += 1;
return None;
return true;
}
let shred_type = match layout::get_shred_type(shred) {
Ok(shred_type) => shred_type,
Err(_) => {
stats.bad_shred_type += 1;
return None;
return true;
}
};
let slot = match layout::get_slot(shred) {
Some(slot) => slot,
Some(slot) => {
if slot <= root || !slot_bounds.contains(&slot) {
stats.slot_out_of_range += 1;
return true;
}
slot
}
None => {
stats.slot_bad_deserialize += 1;
return None;
return true;
}
};
let index = match layout::get_index(shred) {
Some(index) => index,
None => {
stats.index_bad_deserialize += 1;
return None;
return true;
}
};
if index >= MAX_DATA_SHREDS_PER_SLOT as u32 {
stats.index_out_of_bounds += 1;
return None;
if layout::get_version(shred) != Some(shred_version) {
stats.shred_version_mismatch += 1;
return true;
}
match shred_type {
ShredType::Code => {
if index >= shred_code::MAX_CODE_SHREDS_PER_SLOT as u32 {
stats.index_out_of_bounds += 1;
return true;
}
false
}
ShredType::Data => {
if index >= MAX_DATA_SHREDS_PER_SLOT as u32 {
stats.index_out_of_bounds += 1;
return true;
}
let parent_offset = match layout::get_parent_offset(shred) {
Some(parent_offset) => parent_offset,
None => {
stats.bad_parent_offset += 1;
return true;
}
};
if parent_offset == 0 && slot != 0 {
stats.bad_parent_offset += 1;
return true;
}
let parent = match slot.checked_sub(Slot::from(parent_offset)) {
Some(parent) => parent,
None => {
stats.bad_parent_offset += 1;
return true;
}
};
if !blockstore::verify_shred_slots(slot, parent, root) {
stats.slot_out_of_range += 1;
return true;
}
false
}
}
Some((slot, index, shred_type))
}
pub fn max_ticks_per_n_shreds(num_shreds: u64, shred_data_size: Option<usize>) -> u64 {
@ -863,38 +919,82 @@ mod tests {
}
#[test]
fn test_shred_offsets() {
fn test_should_discard_shred() {
solana_logger::setup();
let mut packet = Packet::default();
let shred = Shred::new_from_data(1, 3, 0, &[], ShredFlags::LAST_SHRED_IN_SLOT, 0, 0, 0);
let root = 1;
let shred_version = 798;
let slot_bounds = ..16;
let shred = Shred::new_from_data(
2, // slot
3, // index
1, // parent_offset
&[], // data
ShredFlags::LAST_SHRED_IN_SLOT,
0, // reference_tick
shred_version,
0, // fec_set_index
);
shred.copy_to_packet(&mut packet);
let mut stats = ShredFetchStats::default();
let ret = get_shred_slot_index_type(&packet, &mut stats);
assert_eq!(Some((1, 3, ShredType::Data)), ret);
assert!(!should_discard_shred(
&packet,
root,
shred_version,
slot_bounds,
&mut stats
));
assert_eq!(stats, ShredFetchStats::default());
packet.meta.size = OFFSET_OF_SHRED_VARIANT;
assert_eq!(None, get_shred_slot_index_type(&packet, &mut stats));
assert!(should_discard_shred(
&packet,
root,
shred_version,
slot_bounds,
&mut stats
));
assert_eq!(stats.index_overrun, 1);
packet.meta.size = OFFSET_OF_SHRED_INDEX;
assert_eq!(None, get_shred_slot_index_type(&packet, &mut stats));
assert!(should_discard_shred(
&packet,
root,
shred_version,
slot_bounds,
&mut stats
));
assert_eq!(stats.index_overrun, 2);
packet.meta.size = OFFSET_OF_SHRED_INDEX + 1;
assert_eq!(None, get_shred_slot_index_type(&packet, &mut stats));
assert!(should_discard_shred(
&packet,
root,
shred_version,
slot_bounds,
&mut stats
));
assert_eq!(stats.index_overrun, 3);
packet.meta.size = OFFSET_OF_SHRED_INDEX + SIZE_OF_SHRED_INDEX - 1;
assert_eq!(None, get_shred_slot_index_type(&packet, &mut stats));
assert!(should_discard_shred(
&packet,
root,
shred_version,
slot_bounds,
&mut stats
));
assert_eq!(stats.index_overrun, 4);
packet.meta.size = OFFSET_OF_SHRED_INDEX + SIZE_OF_SHRED_INDEX;
assert_eq!(
Some((1, 3, ShredType::Data)),
get_shred_slot_index_type(&packet, &mut stats)
);
assert_eq!(stats.index_overrun, 4);
packet.meta.size = OFFSET_OF_SHRED_INDEX + SIZE_OF_SHRED_INDEX + 2;
assert!(should_discard_shred(
&packet,
root,
shred_version,
slot_bounds,
&mut stats
));
assert_eq!(stats.bad_parent_offset, 1);
let shred = Shred::new_from_parity_shard(
8, // slot
@ -904,26 +1004,35 @@ mod tests {
30, // num_data
4, // num_code
1, // position
200, // version
shred_version,
);
shred.copy_to_packet(&mut packet);
assert_eq!(
Some((8, 2, ShredType::Code)),
get_shred_slot_index_type(&packet, &mut stats)
);
assert!(!should_discard_shred(
&packet,
root,
shred_version,
slot_bounds,
&mut stats
));
let shred = Shred::new_from_data(
1,
std::u32::MAX - 10,
0,
&[],
2, // slot
std::u32::MAX - 10, // index
1, // parent_offset
&[], // data
ShredFlags::LAST_SHRED_IN_SLOT,
0,
0,
0,
0, // reference_tick
shred_version,
0, // fec_set_index
);
shred.copy_to_packet(&mut packet);
assert_eq!(None, get_shred_slot_index_type(&packet, &mut stats));
assert!(should_discard_shred(
&packet,
root,
shred_version,
slot_bounds,
&mut stats
));
assert_eq!(1, stats.index_out_of_bounds);
let shred = Shred::new_from_parity_shard(
@ -939,7 +1048,13 @@ mod tests {
shred.copy_to_packet(&mut packet);
packet.buffer_mut()[OFFSET_OF_SHRED_VARIANT] = u8::MAX;
assert_eq!(None, get_shred_slot_index_type(&packet, &mut stats));
assert!(should_discard_shred(
&packet,
root,
shred_version,
slot_bounds,
&mut stats
));
assert_eq!(1, stats.bad_shred_type);
}
@ -1107,10 +1222,6 @@ mod tests {
assert_eq!(layout::get_slot(data), Some(shred.slot()));
assert_eq!(layout::get_index(data), Some(shred.index()));
assert_eq!(layout::get_version(data), Some(shred.version()));
assert_eq!(
get_shred_slot_index_type(packet, &mut ShredFetchStats::default()),
Some((shred.slot(), shred.index(), shred.shred_type()))
);
match shred.shred_type() {
ShredType::Code => {
assert_matches!(
@ -1123,6 +1234,10 @@ mod tests {
layout::get_reference_tick(data).unwrap(),
shred.reference_tick()
);
let parent_offset = layout::get_parent_offset(data).unwrap();
let slot = layout::get_slot(data).unwrap();
let parent = slot.checked_sub(Slot::from(parent_offset)).unwrap();
assert_eq!(parent, shred.parent().unwrap());
}
}
}

View File

@ -10,7 +10,7 @@ use {
static_assertions::const_assert_eq,
};
const MAX_CODE_SHREDS_PER_SLOT: usize = MAX_DATA_SHREDS_PER_SLOT;
pub(super) const MAX_CODE_SHREDS_PER_SLOT: usize = MAX_DATA_SHREDS_PER_SLOT;
const_assert_eq!(ShredCode::SIZE_OF_PAYLOAD, 1228);

View File

@ -37,6 +37,7 @@ pub struct ShredFetchStats {
pub slot_out_of_range: usize,
pub(crate) bad_shred_type: usize,
pub shred_version_mismatch: usize,
pub(crate) bad_parent_offset: usize,
since: Option<Instant>,
}
@ -121,6 +122,7 @@ impl ShredFetchStats {
("duplicate_shred", self.duplicate_shred, i64),
("bad_shred_type", self.bad_shred_type, i64),
("shred_version_mismatch", self.shred_version_mismatch, i64),
("bad_parent_offset", self.bad_parent_offset, i64),
);
*self = Self {
since: Some(Instant::now()),

View File

@ -277,7 +277,7 @@ pub fn run_cluster_partition<C>(
let cluster_lamports = node_stakes.iter().sum::<u64>() * 2;
let turbine_disabled = Arc::new(AtomicBool::new(false));
let mut validator_config = ValidatorConfig {
turbine_disabled: Some(turbine_disabled.clone()),
turbine_disabled: turbine_disabled.clone(),
..ValidatorConfig::default_for_test()
};