Limit maximum number of shreds in a slot to 32K (#7584)

* Limit maximum number of shreds in a slot to 32K

* Mark dead slot replay as a fatal error
This commit is contained in:
Pankaj Garg 2019-12-30 07:42:09 -08:00 committed by GitHub
parent faa77aca2e
commit 87b2525e03
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 64 additions and 8 deletions

View File

@ -537,6 +537,7 @@ impl ReplayStage {
}
Err(Error::BlockError(_)) => true,
Err(Error::BlocktreeError(BlocktreeError::InvalidShredData(_))) => true,
Err(Error::BlocktreeError(BlocktreeError::DeadSlot)) => true,
_ => false,
}
}

View File

@ -2,7 +2,10 @@
use crate::packet::{Packet, PacketsRecycler};
use crate::streamer::{self, PacketReceiver, PacketSender};
use solana_ledger::blocktree::MAX_DATA_SHREDS_PER_SLOT;
use solana_ledger::shred::{OFFSET_OF_SHRED_INDEX, SIZE_OF_SHRED_INDEX};
use solana_perf::cuda_runtime::PinnedVec;
use solana_perf::packet::limited_deserialize;
use solana_perf::recycler::Recycler;
use std::net::UdpSocket;
use std::sync::atomic::AtomicBool;
@ -21,7 +24,21 @@ impl ShredFetchStage {
F: Fn(&mut Packet),
{
while let Some(mut p) = recvr.iter().next() {
p.packets.iter_mut().for_each(|p| modify(p));
let index_start = OFFSET_OF_SHRED_INDEX;
let index_end = index_start + SIZE_OF_SHRED_INDEX;
p.packets.iter_mut().for_each(|p| {
p.meta.discard = true;
if index_end <= p.meta.size {
if let Ok(index) = limited_deserialize::<u32>(&p.data[index_start..index_end]) {
if index < MAX_DATA_SHREDS_PER_SLOT as u32 {
p.meta.discard = false;
modify(p);
} else {
inc_new_counter_warn!("shred_fetch_stage-shred_index_overrun", 1);
}
}
}
});
if sendr.send(p).is_err() {
break;
}

View File

@ -4,12 +4,10 @@ use crate::sigverify;
use crate::sigverify_stage::SigVerifier;
use solana_ledger::bank_forks::BankForks;
use solana_ledger::leader_schedule_cache::LeaderScheduleCache;
use solana_ledger::shred::ShredType;
use solana_ledger::shred::{OFFSET_OF_SHRED_SLOT, SIZE_OF_SHRED_SLOT};
use solana_ledger::sigverify_shreds::verify_shreds_gpu;
use solana_perf::recycler_cache::RecyclerCache;
use solana_sdk::signature::Signature;
use std::collections::{HashMap, HashSet};
use std::mem::size_of;
use std::sync::{Arc, RwLock};
#[derive(Clone)]
@ -36,8 +34,8 @@ impl ShredSigVerifier {
.iter()
.flat_map(|batch| {
batch.packets.iter().filter_map(|packet| {
let slot_start = size_of::<Signature>() + size_of::<ShredType>();
let slot_end = slot_start + size_of::<u64>();
let slot_start = OFFSET_OF_SHRED_SLOT;
let slot_end = slot_start + SIZE_OF_SHRED_SLOT;
trace!("slot {} {}", slot_start, slot_end,);
if slot_end <= packet.meta.size {
limited_deserialize(&packet.data[slot_start..slot_end]).ok()

View File

@ -13,7 +13,7 @@ use rayon::iter::IntoParallelRefMutIterator;
use rayon::iter::ParallelIterator;
use rayon::ThreadPool;
use solana_ledger::bank_forks::BankForks;
use solana_ledger::blocktree::{self, Blocktree};
use solana_ledger::blocktree::{self, Blocktree, MAX_DATA_SHREDS_PER_SLOT};
use solana_ledger::leader_schedule_cache::LeaderScheduleCache;
use solana_ledger::shred::Shred;
use solana_metrics::{inc_new_counter_debug, inc_new_counter_error};
@ -61,6 +61,9 @@ pub fn should_retransmit_and_persist(
} else if shred.version() != shred_version {
inc_new_counter_debug!("streamer-recv_window-incorrect_shred_version", 1);
false
} else if shred.index() >= MAX_DATA_SHREDS_PER_SLOT as u32 {
inc_new_counter_warn!("streamer-recv_window-shred_index_overrun", 1);
false
} else {
true
}
@ -130,6 +133,13 @@ where
Shred::new_from_serialized_shred(packet.data.to_vec())
{
if shred_filter(&shred, last_root) {
// Mark slot as dead if the current shred is on the boundary
// of max shreds per slot. However, let the current shred
// get retransmitted. It'll allow peer nodes to see this shred
// and trigger them to mark the slot as dead.
if shred.index() >= (MAX_DATA_SHREDS_PER_SLOT - 1) as u32 {
let _ = blocktree.set_dead_slot(shred.slot());
}
packet.meta.slot = shred.slot();
packet.meta.seed = shred.seed();
Some(shred)

View File

@ -63,6 +63,11 @@ pub const MAX_TURBINE_PROPAGATION_IN_MS: u64 = 100;
pub const MAX_TURBINE_DELAY_IN_TICKS: u64 = MAX_TURBINE_PROPAGATION_IN_MS / MS_PER_TICK;
const TIMESTAMP_SLOT_RANGE: usize = 5;
// An upper bound on the maximum number of data shreds we can handle in a slot.
// Shreds whose index is at or above this bound are dropped by the shred fetch
// stage and rejected by the receive-window filter.
// 32K shreds would allow ~320K peak TPS
// (32K shreds per slot * 4 TX per shred * 2.5 slots per sec)
pub const MAX_DATA_SHREDS_PER_SLOT: usize = 32_768;
pub type CompletedSlotsReceiver = Receiver<Vec<u64>>;
// ledger window
@ -1409,6 +1414,9 @@ impl Blocktree {
slot: Slot,
start_index: u64,
) -> Result<(Vec<Entry>, usize, bool)> {
if self.is_dead(slot) {
return Err(BlocktreeError::DeadSlot);
}
let slot_meta_cf = self.db.column::<cf::SlotMeta>();
let slot_meta = slot_meta_cf.get(slot)?;
if slot_meta.is_none() {

View File

@ -41,6 +41,7 @@ pub enum BlocktreeError {
InvalidShredData(Box<bincode::ErrorKind>),
RocksDb(#[from] rocksdb::Error),
SlotNotRooted,
DeadSlot,
IO(#[from] std::io::Error),
Serialize(#[from] Box<bincode::ErrorKind>),
FsExtraError(#[from] fs_extra::error::Error),

View File

@ -31,6 +31,9 @@ pub const SIZE_OF_COMMON_SHRED_HEADER: usize = 83;
pub const SIZE_OF_DATA_SHRED_HEADER: usize = 3;
pub const SIZE_OF_CODING_SHRED_HEADER: usize = 6;
pub const SIZE_OF_SIGNATURE: usize = 64;
// Serialized (bincode) sizes, in bytes, of individual shred header fields.
// The unit tests in this module assert that each value matches
// `bincode::serialized_size` of the corresponding field type.
// Size of the `ShredType` field.
pub const SIZE_OF_SHRED_TYPE: usize = 1;
// Size of the `Slot` field.
pub const SIZE_OF_SHRED_SLOT: usize = 8;
// Size of the shred `index` field of the common header.
pub const SIZE_OF_SHRED_INDEX: usize = 4;
pub const SIZE_OF_DATA_SHRED_IGNORED_TAIL: usize =
SIZE_OF_COMMON_SHRED_HEADER + SIZE_OF_CODING_SHRED_HEADER;
pub const SIZE_OF_DATA_SHRED_PAYLOAD: usize = PACKET_DATA_SIZE
@ -38,6 +41,9 @@ pub const SIZE_OF_DATA_SHRED_PAYLOAD: usize = PACKET_DATA_SIZE
- SIZE_OF_DATA_SHRED_HEADER
- SIZE_OF_DATA_SHRED_IGNORED_TAIL;
// Byte offset of the slot within serialized shred data: it follows the
// signature and the shred type. Lets callers read the slot straight out of
// packet bytes without deserializing the whole shred.
pub const OFFSET_OF_SHRED_SLOT: usize = SIZE_OF_SIGNATURE + SIZE_OF_SHRED_TYPE;
// Byte offset of the shred index, which immediately follows the slot.
pub const OFFSET_OF_SHRED_INDEX: usize = OFFSET_OF_SHRED_SLOT + SIZE_OF_SHRED_SLOT;
thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
.num_threads(get_thread_count())
.build()
@ -924,6 +930,18 @@ pub mod tests {
SIZE_OF_SIGNATURE,
bincode::serialized_size(&Signature::default()).unwrap() as usize
);
assert_eq!(
SIZE_OF_SHRED_TYPE,
bincode::serialized_size(&ShredType::default()).unwrap() as usize
);
assert_eq!(
SIZE_OF_SHRED_SLOT,
bincode::serialized_size(&Slot::default()).unwrap() as usize
);
assert_eq!(
SIZE_OF_SHRED_INDEX,
bincode::serialized_size(&ShredCommonHeader::default().index).unwrap() as usize
);
}
fn verify_test_code_shred(shred: &Shred, index: u32, slot: Slot, pk: &Pubkey, verify: bool) {

View File

@ -45,6 +45,9 @@ fn verify_shred_cpu(packet: &Packet, slot_leaders: &HashMap<u64, [u8; 32]>) -> O
let slot_end = slot_start + size_of::<u64>();
let msg_start = sig_end;
let msg_end = packet.meta.size;
if packet.meta.discard {
return Some(0);
}
trace!("slot start and end {} {}", slot_start, slot_end);
if packet.meta.size < slot_end {
return Some(0);
@ -104,7 +107,7 @@ fn slot_key_data_for_gpu<
.map(|packet| {
let slot_start = size_of::<Signature>() + size_of::<ShredType>();
let slot_end = slot_start + size_of::<u64>();
if packet.meta.size < slot_end {
if packet.meta.size < slot_end || packet.meta.discard {
return std::u64::MAX;
}
let slot: Option<u64> =