Partial shred deserialize cleanup and shred type differentiation (#14094)
* Partial shred deserialize cleanup and shred type differentiation in retransmit * consolidate packet hashing logic
This commit is contained in:
parent
aeda8d8b91
commit
d4a174fb7c
|
@ -41,6 +41,7 @@ pub mod ledger_cleanup_service;
|
|||
pub mod non_circulating_supply;
|
||||
pub mod optimistic_confirmation_verifier;
|
||||
pub mod optimistically_confirmed_bank_tracker;
|
||||
pub mod packet_hasher;
|
||||
pub mod ping_pong;
|
||||
pub mod poh_recorder;
|
||||
pub mod poh_service;
|
||||
|
|
|
@ -0,0 +1,34 @@
|
|||
// Get a unique hash value for a packet
|
||||
// Used in retransmit and shred fetch to prevent dos with same packet data.
|
||||
|
||||
use ahash::AHasher;
|
||||
use rand::{thread_rng, Rng};
|
||||
use solana_perf::packet::Packet;
|
||||
use std::hash::Hasher;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct PacketHasher {
|
||||
seed1: u128,
|
||||
seed2: u128,
|
||||
}
|
||||
|
||||
impl Default for PacketHasher {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
seed1: thread_rng().gen::<u128>(),
|
||||
seed2: thread_rng().gen::<u128>(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl PacketHasher {
|
||||
pub fn hash_packet(&self, packet: &Packet) -> u64 {
|
||||
let mut hasher = AHasher::new_with_keys(self.seed1, self.seed2);
|
||||
hasher.write(&packet.data[0..packet.meta.size]);
|
||||
hasher.finish()
|
||||
}
|
||||
|
||||
pub fn reset(&mut self) {
|
||||
*self = Self::default();
|
||||
}
|
||||
}
|
|
@ -1,8 +1,6 @@
|
|||
//! The `retransmit_stage` retransmits shreds between validators
|
||||
#![allow(clippy::rc_buffer)]
|
||||
|
||||
use crate::shred_fetch_stage::ShredFetchStage;
|
||||
use crate::shred_fetch_stage::ShredFetchStats;
|
||||
use crate::{
|
||||
cluster_info::{compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT},
|
||||
cluster_info_vote_listener::VerifiedVoteReceiver,
|
||||
|
@ -15,10 +13,9 @@ use crate::{
|
|||
result::{Error, Result},
|
||||
window_service::{should_retransmit_and_persist, WindowService},
|
||||
};
|
||||
use ahash::AHasher;
|
||||
use crossbeam_channel::Receiver;
|
||||
use lru::LruCache;
|
||||
use rand::{thread_rng, Rng};
|
||||
use solana_ledger::shred::{get_shred_slot_index_type, ShredFetchStats};
|
||||
use solana_ledger::{
|
||||
blockstore::{Blockstore, CompletedSlotsReceiver},
|
||||
leader_schedule_cache::LeaderScheduleCache,
|
||||
|
@ -26,14 +23,13 @@ use solana_ledger::{
|
|||
};
|
||||
use solana_measure::measure::Measure;
|
||||
use solana_metrics::inc_new_counter_error;
|
||||
use solana_perf::packet::Packets;
|
||||
use solana_perf::packet::{Packet, Packets};
|
||||
use solana_runtime::bank_forks::BankForks;
|
||||
use solana_sdk::clock::{Epoch, Slot};
|
||||
use solana_sdk::epoch_schedule::EpochSchedule;
|
||||
use solana_sdk::pubkey::Pubkey;
|
||||
use solana_sdk::timing::timestamp;
|
||||
use solana_streamer::streamer::PacketReceiver;
|
||||
use std::hash::Hasher;
|
||||
use std::{
|
||||
cmp,
|
||||
collections::hash_set::HashSet,
|
||||
|
@ -206,7 +202,42 @@ struct EpochStakesCache {
|
|||
stakes_and_index: Vec<(u64, usize)>,
|
||||
}
|
||||
|
||||
pub type ShredFilterAndSeeds = (LruCache<(Slot, u32), Vec<u64>>, u128, u128);
|
||||
use crate::packet_hasher::PacketHasher;
|
||||
// Map of shred (slot, index, is_data) => list of hash values seen for that key.
|
||||
pub type ShredFilter = LruCache<(Slot, u32, bool), Vec<u64>>;
|
||||
|
||||
pub type ShredFilterAndHasher = (ShredFilter, PacketHasher);
|
||||
|
||||
// Return true if shred is already received and should skip retransmit
|
||||
fn check_if_already_received(
|
||||
packet: &Packet,
|
||||
shreds_received: &Arc<Mutex<ShredFilterAndHasher>>,
|
||||
) -> bool {
|
||||
match get_shred_slot_index_type(packet, &mut ShredFetchStats::default()) {
|
||||
Some(slot_index) => {
|
||||
let mut received = shreds_received.lock().unwrap();
|
||||
let hasher = received.1.clone();
|
||||
if let Some(sent) = received.0.get_mut(&slot_index) {
|
||||
if sent.len() < MAX_DUPLICATE_COUNT {
|
||||
let hash = hasher.hash_packet(packet);
|
||||
if sent.contains(&hash) {
|
||||
return true;
|
||||
}
|
||||
|
||||
sent.push(hash);
|
||||
} else {
|
||||
return true;
|
||||
}
|
||||
} else {
|
||||
let hash = hasher.hash_packet(&packet);
|
||||
received.0.put(slot_index, vec![hash]);
|
||||
}
|
||||
|
||||
false
|
||||
}
|
||||
None => true,
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn retransmit(
|
||||
|
@ -219,7 +250,7 @@ fn retransmit(
|
|||
stats: &Arc<RetransmitStats>,
|
||||
epoch_stakes_cache: &Arc<RwLock<EpochStakesCache>>,
|
||||
last_peer_update: &Arc<AtomicU64>,
|
||||
shreds_received: &Arc<Mutex<ShredFilterAndSeeds>>,
|
||||
shreds_received: &Arc<Mutex<ShredFilterAndHasher>>,
|
||||
) -> Result<()> {
|
||||
let timer = Duration::new(1, 0);
|
||||
let r_lock = r.lock().unwrap();
|
||||
|
@ -271,8 +302,7 @@ fn retransmit(
|
|||
{
|
||||
let mut sr = shreds_received.lock().unwrap();
|
||||
sr.0.clear();
|
||||
sr.1 = thread_rng().gen::<u128>();
|
||||
sr.2 = thread_rng().gen::<u128>();
|
||||
sr.1.reset();
|
||||
}
|
||||
}
|
||||
let mut peers_len = 0;
|
||||
|
@ -299,33 +329,10 @@ fn retransmit(
|
|||
continue;
|
||||
}
|
||||
|
||||
match ShredFetchStage::get_slot_index(packet, &mut ShredFetchStats::default()) {
|
||||
Some(slot_index) => {
|
||||
let mut received = shreds_received.lock().unwrap();
|
||||
let seed1 = received.1;
|
||||
let seed2 = received.2;
|
||||
if let Some(sent) = received.0.get_mut(&slot_index) {
|
||||
if sent.len() < MAX_DUPLICATE_COUNT {
|
||||
let mut hasher = AHasher::new_with_keys(seed1, seed2);
|
||||
hasher.write(&packet.data[0..packet.meta.size]);
|
||||
let hash = hasher.finish();
|
||||
if sent.contains(&hash) {
|
||||
continue;
|
||||
}
|
||||
|
||||
sent.push(hash);
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
let mut hasher = AHasher::new_with_keys(seed1, seed2);
|
||||
hasher.write(&packet.data[0..packet.meta.size]);
|
||||
let hash = hasher.finish();
|
||||
received.0.put(slot_index, vec![hash]);
|
||||
}
|
||||
}
|
||||
None => continue,
|
||||
if check_if_already_received(packet, shreds_received) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let mut compute_turbine_peers = Measure::start("turbine_start");
|
||||
let (my_index, mut shuffled_stakes_and_index) = ClusterInfo::shuffle_peers_and_index(
|
||||
&my_id,
|
||||
|
@ -414,7 +421,10 @@ pub fn retransmitter(
|
|||
r: Arc<Mutex<PacketReceiver>>,
|
||||
) -> Vec<JoinHandle<()>> {
|
||||
let stats = Arc::new(RetransmitStats::default());
|
||||
let shreds_received = Arc::new(Mutex::new((LruCache::new(DEFAULT_LRU_SIZE), 0, 0)));
|
||||
let shreds_received = Arc::new(Mutex::new((
|
||||
LruCache::new(DEFAULT_LRU_SIZE),
|
||||
PacketHasher::default(),
|
||||
)));
|
||||
(0..sockets.len())
|
||||
.map(|s| {
|
||||
let sockets = sockets.clone();
|
||||
|
@ -568,6 +578,7 @@ mod tests {
|
|||
use solana_ledger::blockstore_processor::{process_blockstore, ProcessOptions};
|
||||
use solana_ledger::create_new_tmp_ledger;
|
||||
use solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo};
|
||||
use solana_ledger::shred::Shred;
|
||||
use solana_net_utils::find_available_port_in_range;
|
||||
use solana_perf::packet::{Packet, Packets};
|
||||
use std::net::{IpAddr, Ipv4Addr};
|
||||
|
@ -616,8 +627,7 @@ mod tests {
|
|||
);
|
||||
let _thread_hdls = vec![t_retransmit];
|
||||
|
||||
let mut shred =
|
||||
solana_ledger::shred::Shred::new_from_data(0, 0, 0, None, true, true, 0, 0x20, 0);
|
||||
let mut shred = Shred::new_from_data(0, 0, 0, None, true, true, 0, 0x20, 0);
|
||||
let mut packet = Packet::default();
|
||||
shred.copy_to_packet(&mut packet);
|
||||
|
||||
|
@ -642,4 +652,52 @@ mod tests {
|
|||
assert_eq!(packets.packets.len(), 1);
|
||||
assert_eq!(packets.packets[0].meta.repair, false);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_already_received() {
|
||||
let mut packet = Packet::default();
|
||||
let slot = 1;
|
||||
let index = 5;
|
||||
let version = 0x40;
|
||||
let shred = Shred::new_from_data(slot, index, 0, None, true, true, 0, version, 0);
|
||||
shred.copy_to_packet(&mut packet);
|
||||
let shreds_received = Arc::new(Mutex::new((LruCache::new(100), PacketHasher::default())));
|
||||
// unique shred for (1, 5) should pass
|
||||
assert!(!check_if_already_received(&packet, &shreds_received));
|
||||
// duplicate shred for (1, 5) blocked
|
||||
assert!(check_if_already_received(&packet, &shreds_received));
|
||||
|
||||
let shred = Shred::new_from_data(slot, index, 2, None, true, true, 0, version, 0);
|
||||
shred.copy_to_packet(&mut packet);
|
||||
// first duplicate shred for (1, 5) passed
|
||||
assert!(!check_if_already_received(&packet, &shreds_received));
|
||||
// then blocked
|
||||
assert!(check_if_already_received(&packet, &shreds_received));
|
||||
|
||||
let shred = Shred::new_from_data(slot, index, 8, None, true, true, 0, version, 0);
|
||||
shred.copy_to_packet(&mut packet);
|
||||
// 2nd duplicate shred for (1, 5) blocked
|
||||
assert!(check_if_already_received(&packet, &shreds_received));
|
||||
assert!(check_if_already_received(&packet, &shreds_received));
|
||||
|
||||
let shred = Shred::new_empty_coding(slot, index, 0, 1, 1, 0, version);
|
||||
shred.copy_to_packet(&mut packet);
|
||||
// Coding at (1, 5) passes
|
||||
assert!(!check_if_already_received(&packet, &shreds_received));
|
||||
// then blocked
|
||||
assert!(check_if_already_received(&packet, &shreds_received));
|
||||
|
||||
let shred = Shred::new_empty_coding(slot, index, 2, 1, 1, 0, version);
|
||||
shred.copy_to_packet(&mut packet);
|
||||
// 2nd unique coding at (1, 5) passes
|
||||
assert!(!check_if_already_received(&packet, &shreds_received));
|
||||
// same again is blocked
|
||||
assert!(check_if_already_received(&packet, &shreds_received));
|
||||
|
||||
let shred = Shred::new_empty_coding(slot, index, 3, 1, 1, 0, version);
|
||||
shred.copy_to_packet(&mut packet);
|
||||
// Another unique coding at (1, 5) always blocked
|
||||
assert!(check_if_already_received(&packet, &shreds_received));
|
||||
assert!(check_if_already_received(&packet, &shreds_received));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,17 +1,10 @@
|
|||
//! The `shred_fetch_stage` pulls shreds from UDP sockets and sends it to a channel.
|
||||
|
||||
use ahash::AHasher;
|
||||
use crate::packet_hasher::PacketHasher;
|
||||
use lru::LruCache;
|
||||
use rand::{thread_rng, Rng};
|
||||
use std::hash::Hasher;
|
||||
|
||||
use solana_ledger::blockstore::MAX_DATA_SHREDS_PER_SLOT;
|
||||
use solana_ledger::shred::{
|
||||
CODING_SHRED, DATA_SHRED, OFFSET_OF_SHRED_INDEX, OFFSET_OF_SHRED_SLOT, OFFSET_OF_SHRED_TYPE,
|
||||
SIZE_OF_SHRED_INDEX, SIZE_OF_SHRED_SLOT,
|
||||
};
|
||||
use solana_ledger::shred::{get_shred_slot_index_type, ShredFetchStats};
|
||||
use solana_perf::cuda_runtime::PinnedVec;
|
||||
use solana_perf::packet::{limited_deserialize, Packet, PacketsRecycler};
|
||||
use solana_perf::packet::{Packet, PacketsRecycler};
|
||||
use solana_perf::recycler::Recycler;
|
||||
use solana_runtime::bank_forks::BankForks;
|
||||
use solana_sdk::clock::{Slot, DEFAULT_MS_PER_SLOT};
|
||||
|
@ -27,48 +20,11 @@ use std::time::Instant;
|
|||
const DEFAULT_LRU_SIZE: usize = 10_000;
|
||||
pub type ShredsReceived = LruCache<u64, ()>;
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct ShredFetchStats {
|
||||
index_overrun: usize,
|
||||
shred_count: usize,
|
||||
index_bad_deserialize: usize,
|
||||
index_out_of_bounds: usize,
|
||||
slot_bad_deserialize: usize,
|
||||
duplicate_shred: usize,
|
||||
slot_out_of_range: usize,
|
||||
}
|
||||
|
||||
pub struct ShredFetchStage {
|
||||
thread_hdls: Vec<JoinHandle<()>>,
|
||||
}
|
||||
|
||||
impl ShredFetchStage {
|
||||
pub fn get_slot_index(p: &Packet, stats: &mut ShredFetchStats) -> Option<(u64, u32)> {
|
||||
let index_start = OFFSET_OF_SHRED_INDEX;
|
||||
let index_end = index_start + SIZE_OF_SHRED_INDEX;
|
||||
let slot_start = OFFSET_OF_SHRED_SLOT;
|
||||
let slot_end = slot_start + SIZE_OF_SHRED_SLOT;
|
||||
|
||||
if index_end <= p.meta.size {
|
||||
if let Ok(index) = limited_deserialize::<u32>(&p.data[index_start..index_end]) {
|
||||
if index < MAX_DATA_SHREDS_PER_SLOT as u32 && slot_end <= p.meta.size {
|
||||
if let Ok(slot) = limited_deserialize::<Slot>(&p.data[slot_start..slot_end]) {
|
||||
return Some((slot, index));
|
||||
} else {
|
||||
stats.slot_bad_deserialize += 1;
|
||||
}
|
||||
} else {
|
||||
stats.index_out_of_bounds += 1;
|
||||
}
|
||||
} else {
|
||||
stats.index_bad_deserialize += 1;
|
||||
}
|
||||
} else {
|
||||
stats.index_overrun += 1;
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
fn process_packet<F>(
|
||||
p: &mut Packet,
|
||||
shreds_received: &mut ShredsReceived,
|
||||
|
@ -77,32 +33,24 @@ impl ShredFetchStage {
|
|||
last_slot: Slot,
|
||||
slots_per_epoch: u64,
|
||||
modify: &F,
|
||||
seeds: (u128, u128),
|
||||
packet_hasher: &PacketHasher,
|
||||
) where
|
||||
F: Fn(&mut Packet),
|
||||
{
|
||||
p.meta.discard = true;
|
||||
if let Some((slot, _index)) = Self::get_slot_index(p, stats) {
|
||||
if let Some((slot, _index, _shred_type)) = get_shred_slot_index_type(p, stats) {
|
||||
// Seems reasonable to limit shreds to 2 epochs away
|
||||
if slot > last_root
|
||||
&& slot < (last_slot + 2 * slots_per_epoch)
|
||||
&& p.meta.size > OFFSET_OF_SHRED_TYPE
|
||||
{
|
||||
let shred_type = p.data[OFFSET_OF_SHRED_TYPE];
|
||||
if shred_type == DATA_SHRED || shred_type == CODING_SHRED {
|
||||
// Shred filter
|
||||
if slot > last_root && slot < (last_slot + 2 * slots_per_epoch) {
|
||||
// Shred filter
|
||||
|
||||
let mut hasher = AHasher::new_with_keys(seeds.0, seeds.1);
|
||||
hasher.write(&p.data[0..p.meta.size]);
|
||||
let hash = hasher.finish();
|
||||
let hash = packet_hasher.hash_packet(p);
|
||||
|
||||
if shreds_received.get(&hash).is_none() {
|
||||
shreds_received.put(hash, ());
|
||||
p.meta.discard = false;
|
||||
modify(p);
|
||||
} else {
|
||||
stats.duplicate_shred += 1;
|
||||
}
|
||||
if shreds_received.get(&hash).is_none() {
|
||||
shreds_received.put(hash, ());
|
||||
p.meta.discard = false;
|
||||
modify(p);
|
||||
} else {
|
||||
stats.duplicate_shred += 1;
|
||||
}
|
||||
} else {
|
||||
stats.slot_out_of_range += 1;
|
||||
|
@ -130,12 +78,12 @@ impl ShredFetchStage {
|
|||
|
||||
let mut last_stats = Instant::now();
|
||||
let mut stats = ShredFetchStats::default();
|
||||
let mut seeds = (thread_rng().gen::<u128>(), thread_rng().gen::<u128>());
|
||||
let mut packet_hasher = PacketHasher::default();
|
||||
|
||||
while let Some(mut p) = recvr.iter().next() {
|
||||
if last_updated.elapsed().as_millis() as u64 > DEFAULT_MS_PER_SLOT {
|
||||
last_updated = Instant::now();
|
||||
seeds = (thread_rng().gen::<u128>(), thread_rng().gen::<u128>());
|
||||
packet_hasher.reset();
|
||||
shreds_received.clear();
|
||||
if let Some(bank_forks) = bank_forks.as_ref() {
|
||||
let bank_forks_r = bank_forks.read().unwrap();
|
||||
|
@ -156,7 +104,7 @@ impl ShredFetchStage {
|
|||
last_slot,
|
||||
slots_per_epoch,
|
||||
&modify,
|
||||
seeds,
|
||||
&packet_hasher,
|
||||
);
|
||||
});
|
||||
if last_stats.elapsed().as_millis() > 1000 {
|
||||
|
@ -274,6 +222,7 @@ impl ShredFetchStage {
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use solana_ledger::blockstore::MAX_DATA_SHREDS_PER_SLOT;
|
||||
use solana_ledger::shred::Shred;
|
||||
|
||||
#[test]
|
||||
|
@ -287,7 +236,7 @@ mod tests {
|
|||
let shred = Shred::new_from_data(slot, 3, 0, None, true, true, 0, 0, 0);
|
||||
shred.copy_to_packet(&mut packet);
|
||||
|
||||
let seeds = (thread_rng().gen::<u128>(), thread_rng().gen::<u128>());
|
||||
let hasher = PacketHasher::default();
|
||||
|
||||
let last_root = 0;
|
||||
let last_slot = 100;
|
||||
|
@ -300,7 +249,7 @@ mod tests {
|
|||
last_slot,
|
||||
slots_per_epoch,
|
||||
&|_p| {},
|
||||
seeds,
|
||||
&hasher,
|
||||
);
|
||||
assert!(!packet.meta.discard);
|
||||
|
||||
|
@ -315,7 +264,7 @@ mod tests {
|
|||
last_slot,
|
||||
slots_per_epoch,
|
||||
&|_p| {},
|
||||
seeds,
|
||||
&hasher,
|
||||
);
|
||||
assert!(!packet.meta.discard);
|
||||
}
|
||||
|
@ -329,7 +278,9 @@ mod tests {
|
|||
let last_root = 0;
|
||||
let last_slot = 100;
|
||||
let slots_per_epoch = 10;
|
||||
let seeds = (thread_rng().gen::<u128>(), thread_rng().gen::<u128>());
|
||||
|
||||
let hasher = PacketHasher::default();
|
||||
|
||||
// packet size is 0, so cannot get index
|
||||
ShredFetchStage::process_packet(
|
||||
&mut packet,
|
||||
|
@ -339,7 +290,7 @@ mod tests {
|
|||
last_slot,
|
||||
slots_per_epoch,
|
||||
&|_p| {},
|
||||
seeds,
|
||||
&hasher,
|
||||
);
|
||||
assert_eq!(stats.index_overrun, 1);
|
||||
assert!(packet.meta.discard);
|
||||
|
@ -355,7 +306,7 @@ mod tests {
|
|||
last_slot,
|
||||
slots_per_epoch,
|
||||
&|_p| {},
|
||||
seeds,
|
||||
&hasher,
|
||||
);
|
||||
assert!(packet.meta.discard);
|
||||
|
||||
|
@ -368,7 +319,7 @@ mod tests {
|
|||
last_slot,
|
||||
slots_per_epoch,
|
||||
&|_p| {},
|
||||
seeds,
|
||||
&hasher,
|
||||
);
|
||||
assert!(!packet.meta.discard);
|
||||
|
||||
|
@ -381,7 +332,7 @@ mod tests {
|
|||
last_slot,
|
||||
slots_per_epoch,
|
||||
&|_p| {},
|
||||
seeds,
|
||||
&hasher,
|
||||
);
|
||||
assert!(packet.meta.discard);
|
||||
|
||||
|
@ -397,7 +348,7 @@ mod tests {
|
|||
last_slot,
|
||||
slots_per_epoch,
|
||||
&|_p| {},
|
||||
seeds,
|
||||
&hasher,
|
||||
);
|
||||
assert!(packet.meta.discard);
|
||||
|
||||
|
@ -412,20 +363,8 @@ mod tests {
|
|||
last_slot,
|
||||
slots_per_epoch,
|
||||
&|_p| {},
|
||||
seeds,
|
||||
&hasher,
|
||||
);
|
||||
assert!(packet.meta.discard);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_shred_offsets() {
|
||||
let shred = Shred::new_from_data(1, 3, 0, None, true, true, 0, 0, 0);
|
||||
let mut packet = Packet::default();
|
||||
shred.copy_to_packet(&mut packet);
|
||||
let mut stats = ShredFetchStats::default();
|
||||
assert_eq!(
|
||||
Some((1, 3)),
|
||||
ShredFetchStage::get_slot_index(&packet, &mut stats)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
//! The `shred` module defines data structures and methods to pull MTU sized data frames from the network.
|
||||
use crate::{
|
||||
blockstore::MAX_DATA_SHREDS_PER_SLOT,
|
||||
entry::{create_ticks, Entry},
|
||||
erasure::Session,
|
||||
};
|
||||
|
@ -12,7 +13,7 @@ use rayon::{
|
|||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use solana_measure::measure::Measure;
|
||||
use solana_perf::packet::Packet;
|
||||
use solana_perf::packet::{limited_deserialize, Packet};
|
||||
use solana_rayon_threadlimit::get_thread_count;
|
||||
use solana_sdk::{
|
||||
clock::Slot,
|
||||
|
@ -309,6 +310,27 @@ impl Shred {
|
|||
Ok(shred)
|
||||
}
|
||||
|
||||
pub fn new_empty_coding(
|
||||
slot: Slot,
|
||||
index: u32,
|
||||
fec_set_index: u32,
|
||||
num_data: usize,
|
||||
num_code: usize,
|
||||
position: usize,
|
||||
version: u16,
|
||||
) -> Self {
|
||||
let (header, coding_header) = Shredder::new_coding_shred_header(
|
||||
slot,
|
||||
index,
|
||||
fec_set_index,
|
||||
num_data,
|
||||
num_code,
|
||||
position,
|
||||
version,
|
||||
);
|
||||
Shred::new_empty_from_header(header, DataShredHeader::default(), coding_header)
|
||||
}
|
||||
|
||||
pub fn new_empty_from_header(
|
||||
common_header: ShredCommonHeader,
|
||||
data_header: DataShredHeader,
|
||||
|
@ -699,7 +721,7 @@ impl Shredder {
|
|||
// Create empty coding shreds, with correctly populated headers
|
||||
let mut coding_shreds = Vec::with_capacity(num_coding);
|
||||
(0..num_coding).for_each(|i| {
|
||||
let (header, coding_header) = Self::new_coding_shred_header(
|
||||
let shred = Shred::new_empty_coding(
|
||||
slot,
|
||||
start_index + i as u32,
|
||||
start_index,
|
||||
|
@ -708,8 +730,6 @@ impl Shredder {
|
|||
i,
|
||||
version,
|
||||
);
|
||||
let shred =
|
||||
Shred::new_empty_from_header(header, DataShredHeader::default(), coding_header);
|
||||
coding_shreds.push(shred.payload);
|
||||
});
|
||||
|
||||
|
@ -730,7 +750,7 @@ impl Shredder {
|
|||
.into_iter()
|
||||
.enumerate()
|
||||
.map(|(i, payload)| {
|
||||
let (common_header, coding_header) = Self::new_coding_shred_header(
|
||||
let mut shred = Shred::new_empty_coding(
|
||||
slot,
|
||||
start_index + i as u32,
|
||||
start_index,
|
||||
|
@ -739,12 +759,8 @@ impl Shredder {
|
|||
i,
|
||||
version,
|
||||
);
|
||||
Shred {
|
||||
common_header,
|
||||
data_header: DataShredHeader::default(),
|
||||
coding_header,
|
||||
payload,
|
||||
}
|
||||
shred.payload = payload;
|
||||
shred
|
||||
})
|
||||
.collect()
|
||||
} else {
|
||||
|
@ -963,6 +979,71 @@ impl Shredder {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Default, Debug, Eq, PartialEq)]
|
||||
pub struct ShredFetchStats {
|
||||
pub index_overrun: usize,
|
||||
pub shred_count: usize,
|
||||
pub index_bad_deserialize: usize,
|
||||
pub index_out_of_bounds: usize,
|
||||
pub slot_bad_deserialize: usize,
|
||||
pub duplicate_shred: usize,
|
||||
pub slot_out_of_range: usize,
|
||||
pub bad_shred_type: usize,
|
||||
}
|
||||
|
||||
// Get slot, index, and type from a packet with partial deserialize
|
||||
pub fn get_shred_slot_index_type(
|
||||
p: &Packet,
|
||||
stats: &mut ShredFetchStats,
|
||||
) -> Option<(Slot, u32, bool)> {
|
||||
let index_start = OFFSET_OF_SHRED_INDEX;
|
||||
let index_end = index_start + SIZE_OF_SHRED_INDEX;
|
||||
let slot_start = OFFSET_OF_SHRED_SLOT;
|
||||
let slot_end = slot_start + SIZE_OF_SHRED_SLOT;
|
||||
|
||||
debug_assert!(index_end > slot_end);
|
||||
debug_assert!(index_end > OFFSET_OF_SHRED_TYPE);
|
||||
|
||||
if index_end > p.meta.size {
|
||||
stats.index_overrun += 1;
|
||||
return None;
|
||||
}
|
||||
|
||||
let index;
|
||||
match limited_deserialize::<u32>(&p.data[index_start..index_end]) {
|
||||
Ok(x) => index = x,
|
||||
Err(_e) => {
|
||||
stats.index_bad_deserialize += 1;
|
||||
return None;
|
||||
}
|
||||
}
|
||||
|
||||
if index >= MAX_DATA_SHREDS_PER_SLOT as u32 {
|
||||
stats.index_out_of_bounds += 1;
|
||||
return None;
|
||||
}
|
||||
|
||||
let slot;
|
||||
match limited_deserialize::<Slot>(&p.data[slot_start..slot_end]) {
|
||||
Ok(x) => {
|
||||
slot = x;
|
||||
}
|
||||
Err(_e) => {
|
||||
stats.slot_bad_deserialize += 1;
|
||||
return None;
|
||||
}
|
||||
}
|
||||
|
||||
let shred_type = p.data[OFFSET_OF_SHRED_TYPE];
|
||||
if shred_type == DATA_SHRED || shred_type == CODING_SHRED {
|
||||
return Some((slot, index, shred_type == DATA_SHRED));
|
||||
} else {
|
||||
stats.bad_shred_type += 1;
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
pub fn max_ticks_per_n_shreds(num_shreds: u64, shred_data_size: Option<usize>) -> u64 {
|
||||
let ticks = create_ticks(1, 0, Hash::default());
|
||||
max_entries_per_n_shred(&ticks[0], num_shreds, shred_data_size)
|
||||
|
@ -1707,4 +1788,60 @@ pub mod tests {
|
|||
})
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_shred_offsets() {
|
||||
solana_logger::setup();
|
||||
let mut packet = Packet::default();
|
||||
let shred = Shred::new_from_data(1, 3, 0, None, true, true, 0, 0, 0);
|
||||
shred.copy_to_packet(&mut packet);
|
||||
let mut stats = ShredFetchStats::default();
|
||||
let ret = get_shred_slot_index_type(&packet, &mut stats);
|
||||
assert_eq!(Some((1, 3, true)), ret);
|
||||
assert_eq!(stats, ShredFetchStats::default());
|
||||
|
||||
packet.meta.size = OFFSET_OF_SHRED_TYPE;
|
||||
assert_eq!(None, get_shred_slot_index_type(&packet, &mut stats));
|
||||
assert_eq!(stats.index_overrun, 1);
|
||||
|
||||
packet.meta.size = OFFSET_OF_SHRED_INDEX;
|
||||
assert_eq!(None, get_shred_slot_index_type(&packet, &mut stats));
|
||||
assert_eq!(stats.index_overrun, 2);
|
||||
|
||||
packet.meta.size = OFFSET_OF_SHRED_INDEX + 1;
|
||||
assert_eq!(None, get_shred_slot_index_type(&packet, &mut stats));
|
||||
assert_eq!(stats.index_overrun, 3);
|
||||
|
||||
packet.meta.size = OFFSET_OF_SHRED_INDEX + SIZE_OF_SHRED_INDEX - 1;
|
||||
assert_eq!(None, get_shred_slot_index_type(&packet, &mut stats));
|
||||
assert_eq!(stats.index_overrun, 4);
|
||||
|
||||
packet.meta.size = OFFSET_OF_SHRED_INDEX + SIZE_OF_SHRED_INDEX;
|
||||
assert_eq!(
|
||||
Some((1, 3, true)),
|
||||
get_shred_slot_index_type(&packet, &mut stats)
|
||||
);
|
||||
assert_eq!(stats.index_overrun, 4);
|
||||
|
||||
let shred = Shred::new_empty_coding(8, 2, 10, 30, 4, 7, 200);
|
||||
shred.copy_to_packet(&mut packet);
|
||||
assert_eq!(
|
||||
Some((8, 2, false)),
|
||||
get_shred_slot_index_type(&packet, &mut stats)
|
||||
);
|
||||
|
||||
let shred = Shred::new_from_data(1, std::u32::MAX - 10, 0, None, true, true, 0, 0, 0);
|
||||
shred.copy_to_packet(&mut packet);
|
||||
assert_eq!(None, get_shred_slot_index_type(&packet, &mut stats));
|
||||
assert_eq!(1, stats.index_out_of_bounds);
|
||||
|
||||
let (mut header, coding_header) =
|
||||
Shredder::new_coding_shred_header(8, 2, 10, 30, 4, 7, 200);
|
||||
header.shred_type = ShredType(u8::MAX);
|
||||
let shred = Shred::new_empty_from_header(header, DataShredHeader::default(), coding_header);
|
||||
shred.copy_to_packet(&mut packet);
|
||||
|
||||
assert_eq!(None, get_shred_slot_index_type(&packet, &mut stats));
|
||||
assert_eq!(1, stats.bad_shred_type);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue