From 87b2525e03cb1f23a179f4e6da016b5d840ef6e3 Mon Sep 17 00:00:00 2001
From: Pankaj Garg
Date: Mon, 30 Dec 2019 07:42:09 -0800
Subject: [PATCH] Limit maximum number of shreds in a slot to 32K (#7584)

* Limit maximum number of shreds in a slot to 32K

* mark dead slot replay as fatal error
---
 core/src/replay_stage.rs       |  1 +
 core/src/shred_fetch_stage.rs  | 19 ++++++++++++++++++-
 core/src/sigverify_shreds.rs   |  8 +++-----
 core/src/window_service.rs     | 12 +++++++++++-
 ledger/src/blocktree.rs        |  8 ++++++++
 ledger/src/blocktree_db.rs     |  1 +
 ledger/src/shred.rs            | 18 ++++++++++++++++++
 ledger/src/sigverify_shreds.rs |  5 ++++-
 8 files changed, 64 insertions(+), 8 deletions(-)

diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs
index 84a1657b8c..094cfd5d5a 100644
--- a/core/src/replay_stage.rs
+++ b/core/src/replay_stage.rs
@@ -537,6 +537,7 @@ impl ReplayStage {
             }
             Err(Error::BlockError(_)) => true,
             Err(Error::BlocktreeError(BlocktreeError::InvalidShredData(_))) => true,
+            Err(Error::BlocktreeError(BlocktreeError::DeadSlot)) => true,
             _ => false,
         }
     }
diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs
index 3b43fbab38..bb9949a4c2 100644
--- a/core/src/shred_fetch_stage.rs
+++ b/core/src/shred_fetch_stage.rs
@@ -2,7 +2,10 @@

 use crate::packet::{Packet, PacketsRecycler};
 use crate::streamer::{self, PacketReceiver, PacketSender};
+use solana_ledger::blocktree::MAX_DATA_SHREDS_PER_SLOT;
+use solana_ledger::shred::{OFFSET_OF_SHRED_INDEX, SIZE_OF_SHRED_INDEX};
 use solana_perf::cuda_runtime::PinnedVec;
+use solana_perf::packet::limited_deserialize;
 use solana_perf::recycler::Recycler;
 use std::net::UdpSocket;
 use std::sync::atomic::AtomicBool;
@@ -21,7 +24,21 @@ impl ShredFetchStage {
         F: Fn(&mut Packet),
     {
         while let Some(mut p) = recvr.iter().next() {
-            p.packets.iter_mut().for_each(|p| modify(p));
+            let index_start = OFFSET_OF_SHRED_INDEX;
+            let index_end = index_start + SIZE_OF_SHRED_INDEX;
+            p.packets.iter_mut().for_each(|p| {
+                p.meta.discard = true;
+                if index_end <= p.meta.size {
+                    if let Ok(index) = limited_deserialize::<u32>(&p.data[index_start..index_end]) {
+                        if index < MAX_DATA_SHREDS_PER_SLOT as u32 {
+                            p.meta.discard = false;
+                            modify(p);
+                        } else {
+                            inc_new_counter_warn!("shred_fetch_stage-shred_index_overrun", 1);
+                        }
+                    }
+                }
+            });
             if sendr.send(p).is_err() {
                 break;
             }
diff --git a/core/src/sigverify_shreds.rs b/core/src/sigverify_shreds.rs
index 55d70b1fbc..a26f09d1bb 100644
--- a/core/src/sigverify_shreds.rs
+++ b/core/src/sigverify_shreds.rs
@@ -4,12 +4,10 @@ use crate::sigverify;
 use crate::sigverify_stage::SigVerifier;
 use solana_ledger::bank_forks::BankForks;
 use solana_ledger::leader_schedule_cache::LeaderScheduleCache;
-use solana_ledger::shred::ShredType;
+use solana_ledger::shred::{OFFSET_OF_SHRED_SLOT, SIZE_OF_SHRED_SLOT};
 use solana_ledger::sigverify_shreds::verify_shreds_gpu;
 use solana_perf::recycler_cache::RecyclerCache;
-use solana_sdk::signature::Signature;
 use std::collections::{HashMap, HashSet};
-use std::mem::size_of;
 use std::sync::{Arc, RwLock};

 #[derive(Clone)]
@@ -36,8 +34,8 @@ impl ShredSigVerifier {
             .iter()
             .flat_map(|batch| {
                 batch.packets.iter().filter_map(|packet| {
-                    let slot_start = size_of::<Signature>() + size_of::<ShredType>();
-                    let slot_end = slot_start + size_of::<u64>();
+                    let slot_start = OFFSET_OF_SHRED_SLOT;
+                    let slot_end = slot_start + SIZE_OF_SHRED_SLOT;
                     trace!("slot {} {}", slot_start, slot_end,);
                     if slot_end <= packet.meta.size {
                         limited_deserialize(&packet.data[slot_start..slot_end]).ok()
diff --git a/core/src/window_service.rs b/core/src/window_service.rs
index 1eb0b200e4..756882fa88 100644
--- a/core/src/window_service.rs
+++ b/core/src/window_service.rs
@@ -13,7 +13,7 @@ use rayon::iter::IntoParallelRefMutIterator;
 use rayon::iter::ParallelIterator;
 use rayon::ThreadPool;
 use solana_ledger::bank_forks::BankForks;
-use solana_ledger::blocktree::{self, Blocktree};
+use solana_ledger::blocktree::{self, Blocktree, MAX_DATA_SHREDS_PER_SLOT};
 use solana_ledger::leader_schedule_cache::LeaderScheduleCache;
 use solana_ledger::shred::Shred;
 use solana_metrics::{inc_new_counter_debug, inc_new_counter_error};
@@ -61,6 +61,9 @@ pub fn should_retransmit_and_persist(
     } else if shred.version() != shred_version {
         inc_new_counter_debug!("streamer-recv_window-incorrect_shred_version", 1);
         false
+    } else if shred.index() >= MAX_DATA_SHREDS_PER_SLOT as u32 {
+        inc_new_counter_warn!("streamer-recv_window-shred_index_overrun", 1);
+        false
     } else {
         true
     }
@@ -130,6 +133,13 @@ where
                 Shred::new_from_serialized_shred(packet.data.to_vec())
             {
                 if shred_filter(&shred, last_root) {
+                    // Mark slot as dead if the current shred is on the boundary
+                    // of max shreds per slot. However, let the current shred
+                    // get retransmitted. It'll allow peer nodes to see this shred
+                    // and trigger them to mark the slot as dead.
+                    if shred.index() >= (MAX_DATA_SHREDS_PER_SLOT - 1) as u32 {
+                        let _ = blocktree.set_dead_slot(shred.slot());
+                    }
                     packet.meta.slot = shred.slot();
                     packet.meta.seed = shred.seed();
                     Some(shred)
diff --git a/ledger/src/blocktree.rs b/ledger/src/blocktree.rs
index 28bab1e173..325eb77c10 100644
--- a/ledger/src/blocktree.rs
+++ b/ledger/src/blocktree.rs
@@ -63,6 +63,11 @@ pub const MAX_TURBINE_PROPAGATION_IN_MS: u64 = 100;
 pub const MAX_TURBINE_DELAY_IN_TICKS: u64 = MAX_TURBINE_PROPAGATION_IN_MS / MS_PER_TICK;
 const TIMESTAMP_SLOT_RANGE: usize = 5;

+// An upper bound on maximum number of data shreds we can handle in a slot
+// 32K shreds would allow ~320K peak TPS
+// (32K shreds per slot * 4 TX per shred * 2.5 slots per sec)
+pub const MAX_DATA_SHREDS_PER_SLOT: usize = 32_768;
+
 pub type CompletedSlotsReceiver = Receiver<Vec<Slot>>;

 // ledger window
@@ -1409,6 +1414,9 @@ impl Blocktree {
         slot: Slot,
         start_index: u64,
     ) -> Result<(Vec<Entry>, usize, bool)> {
+        if self.is_dead(slot) {
+            return Err(BlocktreeError::DeadSlot);
+        }
         let slot_meta_cf = self.db.column::<cf::SlotMeta>();
         let slot_meta = slot_meta_cf.get(slot)?;
         if slot_meta.is_none() {
diff --git a/ledger/src/blocktree_db.rs b/ledger/src/blocktree_db.rs
index 8c7f543bd4..c4dbde38db 100644
--- a/ledger/src/blocktree_db.rs
+++ b/ledger/src/blocktree_db.rs
@@ -41,6 +41,7 @@ pub enum BlocktreeError {
     InvalidShredData(Box<bincode::ErrorKind>),
     RocksDb(#[from] rocksdb::Error),
     SlotNotRooted,
+    DeadSlot,
     IO(#[from] std::io::Error),
     Serialize(#[from] Box<bincode::ErrorKind>),
     FsExtraError(#[from] fs_extra::error::Error),
diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs
index b9c6ec416e..59170d2e04 100644
--- a/ledger/src/shred.rs
+++ b/ledger/src/shred.rs
@@ -31,6 +31,9 @@ pub const SIZE_OF_COMMON_SHRED_HEADER: usize = 83;
 pub const SIZE_OF_DATA_SHRED_HEADER: usize = 3;
 pub const SIZE_OF_CODING_SHRED_HEADER: usize = 6;
 pub const SIZE_OF_SIGNATURE: usize = 64;
+pub const SIZE_OF_SHRED_TYPE: usize = 1;
+pub const SIZE_OF_SHRED_SLOT: usize = 8;
+pub const SIZE_OF_SHRED_INDEX: usize = 4;
 pub const SIZE_OF_DATA_SHRED_IGNORED_TAIL: usize =
     SIZE_OF_COMMON_SHRED_HEADER + SIZE_OF_CODING_SHRED_HEADER;
 pub const SIZE_OF_DATA_SHRED_PAYLOAD: usize = PACKET_DATA_SIZE
@@ -38,6 +41,9 @@ pub const SIZE_OF_DATA_SHRED_PAYLOAD: usize = PACKET_DATA_SIZE
     - SIZE_OF_DATA_SHRED_HEADER
     - SIZE_OF_DATA_SHRED_IGNORED_TAIL;

+pub const OFFSET_OF_SHRED_SLOT: usize = SIZE_OF_SIGNATURE + SIZE_OF_SHRED_TYPE;
+pub const OFFSET_OF_SHRED_INDEX: usize = OFFSET_OF_SHRED_SLOT + SIZE_OF_SHRED_SLOT;
+
 thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
     .num_threads(get_thread_count())
     .build()
@@ -924,6 +930,18 @@ pub mod tests {
             SIZE_OF_SIGNATURE,
             bincode::serialized_size(&Signature::default()).unwrap() as usize
         );
+        assert_eq!(
+            SIZE_OF_SHRED_TYPE,
+            bincode::serialized_size(&ShredType::default()).unwrap() as usize
+        );
+        assert_eq!(
+            SIZE_OF_SHRED_SLOT,
+            bincode::serialized_size(&Slot::default()).unwrap() as usize
+        );
+        assert_eq!(
+            SIZE_OF_SHRED_INDEX,
+            bincode::serialized_size(&ShredCommonHeader::default().index).unwrap() as usize
+        );
     }

     fn verify_test_code_shred(shred: &Shred, index: u32, slot: Slot, pk: &Pubkey, verify: bool) {
diff --git a/ledger/src/sigverify_shreds.rs b/ledger/src/sigverify_shreds.rs
index a75fcb65c1..73d2599294 100644
--- a/ledger/src/sigverify_shreds.rs
+++ b/ledger/src/sigverify_shreds.rs
@@ -45,6 +45,9 @@ fn verify_shred_cpu(packet: &Packet, slot_leaders: &HashMap<u64, [u8; 32]>) -> O
     let slot_end = slot_start + size_of::<u64>();
     let msg_start = sig_end;
     let msg_end = packet.meta.size;
+    if packet.meta.discard {
+        return Some(0);
+    }
     trace!("slot start and end {} {}", slot_start, slot_end);
     if packet.meta.size < slot_end {
         return Some(0);
@@ -104,7 +107,7 @@ fn slot_key_data_for_gpu<
         .map(|packet| {
             let slot_start = size_of::<Signature>() + size_of::<ShredType>();
             let slot_end = slot_start + size_of::<u64>();
-            if packet.meta.size < slot_end {
+            if packet.meta.size < slot_end || packet.meta.discard {
                 return std::u64::MAX;
             }
             let slot: Option<u64> =
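A minimal, standalone sketch of the new fetch-stage bound check (illustration only, not code from this patch). It reuses the sizes and offsets the patch introduces (signature 64 bytes, shred type 1 byte, slot 8 bytes, index 4 bytes), and `acceptable_shred_index` is a made-up helper name. `u32::from_le_bytes` stands in for `limited_deserialize::<u32>`, on the assumption that limited_deserialize is bincode-based and uses bincode's default fixed-width little-endian integer encoding. For reference, the blocktree comment's arithmetic is 32_768 shreds x 4 transactions x 2.5 slots/s = 327,680 TX/s, the ~320K peak TPS it cites.

// Standalone illustration of the packet-level index filter added in
// shred_fetch_stage. Constants mirror the ones this patch introduces in
// ledger/src/shred.rs and ledger/src/blocktree.rs; `acceptable_shred_index`
// is an invented name, not an API in the tree.
const SIZE_OF_SIGNATURE: usize = 64;
const SIZE_OF_SHRED_TYPE: usize = 1;
const SIZE_OF_SHRED_SLOT: usize = 8;
const SIZE_OF_SHRED_INDEX: usize = 4;
const OFFSET_OF_SHRED_SLOT: usize = SIZE_OF_SIGNATURE + SIZE_OF_SHRED_TYPE;
const OFFSET_OF_SHRED_INDEX: usize = OFFSET_OF_SHRED_SLOT + SIZE_OF_SHRED_SLOT;
const MAX_DATA_SHREDS_PER_SLOT: usize = 32_768;

/// Returns Some(index) if the packet is long enough to carry a shred index
/// and that index is below the per-slot cap; None means "discard".
fn acceptable_shred_index(packet_data: &[u8], packet_size: usize) -> Option<u32> {
    let start = OFFSET_OF_SHRED_INDEX;
    let end = start + SIZE_OF_SHRED_INDEX;
    if end > packet_size || end > packet_data.len() {
        return None; // too short to even contain an index field
    }
    let mut le = [0u8; SIZE_OF_SHRED_INDEX];
    le.copy_from_slice(&packet_data[start..end]);
    let index = u32::from_le_bytes(le);
    if (index as usize) < MAX_DATA_SHREDS_PER_SLOT {
        Some(index)
    } else {
        None // index overrun: the fetch stage bumps a counter and discards
    }
}

fn main() {
    let mut data = vec![0u8; 1024]; // zeroed stand-in for a shred packet
    data[OFFSET_OF_SHRED_INDEX..OFFSET_OF_SHRED_INDEX + SIZE_OF_SHRED_INDEX]
        .copy_from_slice(&40_000u32.to_le_bytes()); // 40_000 >= 32_768: rejected
    assert_eq!(acceptable_shred_index(&data, data.len()), None);

    data[OFFSET_OF_SHRED_INDEX..OFFSET_OF_SHRED_INDEX + SIZE_OF_SHRED_INDEX]
        .copy_from_slice(&12u32.to_le_bytes()); // in range: accepted
    assert_eq!(acceptable_shred_index(&data, data.len()), Some(12));
}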
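The other half of the change is how an overrun is surfaced: window_service marks the slot dead as soon as it sees a shred at the boundary index, while still letting that shred be retransmitted so peer nodes reach the same conclusion; the blocktree entry-fetch path then fails with BlocktreeError::DeadSlot, and ReplayStage treats that error as a fatal replay result for the slot. The sketch below is a toy model of that flow, not the Blocktree API; `DeadSlots`, `observe_shred`, and `get_slot_entries` are invented names for illustration.

// Toy model of the dead-slot flow this patch wires up: an out-of-bounds
// shred index marks the slot dead at the window stage, and a later attempt
// to read that slot for replay surfaces BlocktreeError::DeadSlot.
use std::collections::HashSet;

const MAX_DATA_SHREDS_PER_SLOT: usize = 32_768;

#[derive(Debug, PartialEq)]
enum BlocktreeError {
    DeadSlot,
}

#[derive(Default)]
struct DeadSlots {
    dead: HashSet<u64>,
}

impl DeadSlots {
    // Window-service side: a shred at (or past) the boundary index marks the
    // slot dead. The real code still retransmits that shred so peers see it
    // and mark the slot dead themselves.
    fn observe_shred(&mut self, slot: u64, index: u32) {
        if index as usize >= MAX_DATA_SHREDS_PER_SLOT - 1 {
            self.dead.insert(slot);
        }
    }

    // Replay side: fetching entries from a dead slot fails, and ReplayStage
    // now treats BlocktreeError::DeadSlot as a fatal replay result.
    fn get_slot_entries(&self, slot: u64) -> Result<Vec<()>, BlocktreeError> {
        if self.dead.contains(&slot) {
            return Err(BlocktreeError::DeadSlot);
        }
        Ok(vec![])
    }
}

fn main() {
    let mut store = DeadSlots::default();

    store.observe_shred(7, 100); // well under the cap: slot 7 stays live
    assert_eq!(store.get_slot_entries(7), Ok(vec![]));

    store.observe_shred(7, (MAX_DATA_SHREDS_PER_SLOT - 1) as u32); // boundary shred
    assert_eq!(store.get_slot_entries(7), Err(BlocktreeError::DeadSlot));
}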