flattens the internal buffer in the duplicate-shreds handler (#30196)

The duplicate-shreds handler uses a nested hash-map to buffer incomplete
chunks, which results in convoluted logic just to limit the number of
entries:
https://github.com/solana-labs/solana/blob/427bd6264/gossip/src/duplicate_shred_handler.rs#L62

This commit instead uses a flat buffer mapping (Slot, Pubkey) pairs to
the respective duplicate-shred chunks. The buffer is allowed to grow to
twice the intended capacity, at which point the extraneous entries are
removed in linear time; because that linear prune can run at most once
per BUFFER_CAPACITY insertions, its cost amortizes to O(1) per insertion.
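
A minimal sketch of this flat-buffer pattern, under illustrative assumptions
(Chunk, the stake map, and the constant values here are stand-ins, not the
actual crate types; the real handler also filters out rooted and
already-consumed slots before falling back to the stake-based prune):

use std::{cmp::Reverse, collections::HashMap};

type Slot = u64;
type Pubkey = [u8; 32]; // stand-in for solana_sdk::pubkey::Pubkey
type Chunk = Vec<u8>; // stand-in for one DuplicateShred chunk

const MAX_NUM_CHUNKS: usize = 3;
const BUFFER_CAPACITY: usize = 512;

struct FlatBuffer {
    // Flat map keyed by (slot, sender) pairs, replacing the old
    // slot -> sender -> chunks nesting.
    buffer: HashMap<(Slot, Pubkey), [Option<Chunk>; MAX_NUM_CHUNKS]>,
    stakes: HashMap<Pubkey, u64>,
}

impl FlatBuffer {
    fn maybe_prune(&mut self) {
        // Let the buffer grow to 2x capacity, then prune back down to
        // capacity in one linear pass. The pass runs at most once per
        // BUFFER_CAPACITY insertions, so each insertion pays O(1) amortized.
        if self.buffer.len() < BUFFER_CAPACITY * 2 {
            return;
        }
        let stakes = &self.stakes;
        let mut entries: Vec<_> = self
            .buffer
            .drain()
            .map(|entry @ ((_, pubkey), _)| {
                (stakes.get(&pubkey).copied().unwrap_or_default(), entry)
            })
            .collect();
        // Partition so the BUFFER_CAPACITY highest-stake entries come
        // first, then rebuild the buffer from only those entries.
        entries.select_nth_unstable_by_key(BUFFER_CAPACITY, |&(stake, _)| Reverse(stake));
        self.buffer = entries
            .into_iter()
            .take(BUFFER_CAPACITY)
            .map(|(_, entry)| entry)
            .collect();
    }
}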
behzad nouri 2023-02-09 16:33:20 +00:00 committed by GitHub
parent a4fb1834d1
commit b3887af7c6
2 changed files with 116 additions and 286 deletions

gossip/src/duplicate_shred.rs

@@ -60,8 +60,8 @@ pub enum Error {
BlockstoreInsertFailed(#[from] BlockstoreError),
#[error("data chunk mismatch")]
DataChunkMismatch,
#[error("invalid chunk index")]
InvalidChunkIndex,
#[error("invalid chunk_index: {chunk_index}, num_chunks: {num_chunks}")]
InvalidChunkIndex { chunk_index: u8, num_chunks: u8 },
#[error("invalid duplicate shreds")]
InvalidDuplicateShreds,
#[error("invalid duplicate slot proof")]
@@ -86,8 +86,8 @@ pub enum Error {
SlotMismatch,
#[error("type conversion error")]
TryFromIntError(#[from] TryFromIntError),
#[error("unknown slot leader")]
UnknownSlotLeader,
#[error("unknown slot leader: {0}")]
UnknownSlotLeader(Slot),
}
// Asserts that the two shreds can indicate duplicate proof for
@@ -110,7 +110,8 @@ fn check_shreds(
Err(Error::InvalidDuplicateShreds)
} else {
if let Some(leader_schedule) = leader_schedule {
let slot_leader = leader_schedule(shred1.slot()).ok_or(Error::UnknownSlotLeader)?;
let slot_leader =
leader_schedule(shred1.slot()).ok_or(Error::UnknownSlotLeader(shred1.slot()))?;
if !shred1.verify(&slot_leader) || !shred2.verify(&slot_leader) {
return Err(Error::InvalidSignature);
}
@@ -219,7 +220,10 @@ fn check_chunk(
} else if dup.num_chunks != num_chunks {
Err(Error::NumChunksMismatch)
} else if dup.chunk_index >= num_chunks {
Err(Error::InvalidChunkIndex)
Err(Error::InvalidChunkIndex {
chunk_index: dup.chunk_index,
num_chunks,
})
} else {
Ok(())
}
@@ -227,9 +231,9 @@ fn check_chunk(
}
/// Reconstructs the duplicate shreds from chunks of DuplicateShred.
pub fn into_shreds(
pub(crate) fn into_shreds(
slot_leader: &Pubkey,
chunks: impl IntoIterator<Item = DuplicateShred>,
leader: impl LeaderScheduleFn,
) -> Result<(Shred, Shred), Error> {
let mut chunks = chunks.into_iter();
let DuplicateShred {
@@ -241,7 +245,6 @@ pub fn into_shreds(
chunk,
..
} = chunks.next().ok_or(Error::InvalidDuplicateShreds)?;
let slot_leader = leader(slot).ok_or(Error::UnknownSlotLeader)?;
let check_chunk = check_chunk(slot, shred_index, shred_type, num_chunks);
let mut data = HashMap::new();
data.insert(chunk_index, chunk);
@@ -276,7 +279,7 @@ pub fn into_shreds(
Err(Error::ShredTypeMismatch)
} else if shred1.payload() == shred2.payload() {
Err(Error::InvalidDuplicateShreds)
} else if !shred1.verify(&slot_leader) || !shred2.verify(&slot_leader) {
} else if !shred1.verify(slot_leader) || !shred2.verify(slot_leader) {
Err(Error::InvalidSignature)
} else {
Ok((shred1, shred2))
@@ -391,7 +394,7 @@ pub(crate) mod tests {
.unwrap()
.collect();
assert!(chunks.len() > 4);
let (shred3, shred4) = into_shreds(chunks, leader_schedule).unwrap();
let (shred3, shred4) = into_shreds(&leader.pubkey(), chunks).unwrap();
assert_eq!(shred1, shred3);
assert_eq!(shred2, shred4);
}
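
The net effect on this file's API: into_shreds no longer takes a
leader-schedule callback and instead expects the slot leader's pubkey from
the caller. A minimal sketch of the new call shape, mirroring the handler
code in the next file (assumes slot, chunks, a LeaderScheduleCache, and ?
error propagation are in scope):

let slot_leader = leader_schedule_cache
    .slot_leader_at(slot, /*bank:*/ None)
    .ok_or(Error::UnknownSlotLeader(slot))?;
let (shred1, shred2) = duplicate_shred::into_shreds(&slot_leader, chunks)?;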

gossip/src/duplicate_shred_handler.rs

@@ -1,9 +1,9 @@
use {
crate::{
duplicate_shred::{into_shreds, DuplicateShred, Error},
duplicate_shred::{self, DuplicateShred, Error},
duplicate_shred_listener::DuplicateShredHandlerTrait,
},
log::*,
log::error,
solana_ledger::{blockstore::Blockstore, leader_schedule_cache::LeaderScheduleCache},
solana_runtime::bank_forks::BankForks,
solana_sdk::{
@@ -11,62 +11,32 @@ use {
pubkey::Pubkey,
},
std::{
cmp::Reverse,
collections::{HashMap, HashSet},
sync::{Arc, RwLock},
},
};
const CLEANUP_EVERY_N_LOOPS: usize = 10;
// Normally num_chunks is 3, because there are two shreds (each is one packet)
// and meta data. So we discard anything larger than 3 chunks.
const MAX_NUM_CHUNKS: usize = 3;
// We only allow each pubkey to send proofs for 5 slots, because normally there
// is only 1 person sending out duplicate proofs, 1 person is leader for 4 slots,
// so we allow 5 here to limit the chunk map size.
const ALLOWED_SLOTS_PER_PUBKEY: usize = 5;
// We limit the pubkey for each slot to be 300 for now, when this limit is reached,
// we drop 50% of pubkeys with lowest stakes. It is kept at 300 because we want
// number of pubkeys after pruning to have roughly 2/3 of stake.
const MAX_PUBKEY_PER_SLOT: usize = 300;
// Limit number of entries per node.
const MAX_NUM_ENTRIES_PER_PUBKEY: usize = 128;
const BUFFER_CAPACITY: usize = 512 * MAX_NUM_ENTRIES_PER_PUBKEY;
struct ProofChunkMap {
num_chunks: u8,
chunks: [Option<DuplicateShred>; MAX_NUM_CHUNKS],
}
type BufferEntry = [Option<DuplicateShred>; MAX_NUM_CHUNKS];
impl ProofChunkMap {
fn new(num_chunks: u8) -> Self {
Self {
num_chunks,
chunks: <[Option<DuplicateShred>; MAX_NUM_CHUNKS]>::default(),
}
}
}
// Group received chunks by peer pubkey, when we receive an invalid proof,
// set the value to Frozen so we don't accept future proofs with the same key.
type SlotChunkMap = HashMap<Pubkey, ProofChunkMap>;
enum SlotStatus {
// When a valid proof has been inserted, we change the entry for that slot to Frozen
// to indicate we no longer accept proofs for this slot.
Frozen,
UnfinishedProof(SlotChunkMap),
}
pub struct DuplicateShredHandler {
// Because we use UDP for packet transfer, we can normally only send ~1500 bytes
// in each packet. We send both shreds and meta data in duplicate shred proof, and
// each shred is normally 1 packet(1500 bytes), so the whole proof is larger than
// 1 packet and it needs to be cut down as chunks for transfer. So we need to piece
// together the chunks into the original proof before anything useful is done.
chunk_map: HashMap<Slot, SlotStatus>,
// We don't want bad guys to inflate the chunk map, so we limit the number of
// pending proofs from each pubkey to ALLOWED_SLOTS_PER_PUBKEY.
validator_pending_proof_map: HashMap<Pubkey, HashSet<Slot>>,
buffer: HashMap<(Slot, Pubkey), BufferEntry>,
// Slots for which a duplicate proof is already ingested.
consumed: HashSet<Slot>,
// Cache last root to reduce read lock.
last_root: Slot,
// remember the last root slot cleaned, clear anything between last_root and last_root_cleaned.
last_root_cleaned: Slot,
blockstore: Arc<Blockstore>,
leader_schedule_cache: Arc<LeaderScheduleCache>,
bank_forks: Arc<RwLock<BankForks>>,
@@ -74,9 +44,6 @@ pub struct DuplicateShredHandler {
cached_on_epoch: Epoch,
cached_staked_nodes: Arc<HashMap<Pubkey, u64>>,
cached_slots_in_epoch: u64,
// Because cleanup could potentially be very expensive, only clean up when clean up
// count is 0
cleanup_count: usize,
}
impl DuplicateShredHandlerTrait for DuplicateShredHandler {
@@ -84,12 +51,9 @@ impl DuplicateShredHandlerTrait for DuplicateShredHandler {
// we may send different type of CrdsData to different senders.
fn handle(&mut self, shred_data: DuplicateShred) {
self.cache_root_info();
self.maybe_prune_buffer();
if let Err(error) = self.handle_shred_data(shred_data) {
error!("handle packet: {:?}", error)
}
if self.cleanup_count.saturating_sub(1) == 0 {
self.cleanup_old_slots();
self.cleanup_count = CLEANUP_EVERY_N_LOOPS;
error!("handle packet: {error:?}")
}
}
}
@@ -101,17 +65,15 @@ impl DuplicateShredHandler {
bank_forks: Arc<RwLock<BankForks>>,
) -> Self {
Self {
chunk_map: HashMap::new(),
validator_pending_proof_map: HashMap::new(),
buffer: HashMap::<(Slot, Pubkey), BufferEntry>::default(),
consumed: HashSet::<Slot>::default(),
last_root: 0,
last_root_cleaned: 0,
cached_on_epoch: 0,
cached_staked_nodes: Arc::new(HashMap::new()),
cached_slots_in_epoch: 0,
blockstore,
leader_schedule_cache,
bank_forks,
cleanup_count: CLEANUP_EVERY_N_LOOPS,
}
}
@@ -134,133 +96,96 @@ impl DuplicateShredHandler {
}
}
fn handle_shred_data(&mut self, data: DuplicateShred) -> Result<(), Error> {
if self.should_insert_chunk(&data) {
let slot = data.slot;
if let Some(chunks) = self.insert_chunk(data) {
self.verify_and_apply_proof(slot, chunks)?;
// We stored the duplicate proof in this slot, no need to accept any future proof.
self.mark_slot_proof_received(slot);
}
}
Ok(())
}
fn should_insert_chunk(&mut self, data: &DuplicateShred) -> bool {
let slot = data.slot;
// Do not insert if this slot is rooted or too far away in the future or has a proof already.
if !(self.last_root..self.last_root + self.cached_slots_in_epoch).contains(&slot)
|| self.blockstore.has_duplicate_shreds_in_slot(slot)
{
return false;
}
// Discard all proofs with abnormal num_chunks.
if data.num_chunks() == 0 || usize::from(data.num_chunks()) > MAX_NUM_CHUNKS {
return false;
}
// Only allow limited unfinished proofs per pubkey to reject attackers.
if let Some(current_slots_set) = self.validator_pending_proof_map.get(&data.from) {
if !current_slots_set.contains(&slot)
&& current_slots_set.len() >= ALLOWED_SLOTS_PER_PUBKEY
{
return false;
}
}
// Also skip frozen slots or slots with a different proof than me.
match self.chunk_map.get(&slot) {
Some(SlotStatus::Frozen) => false,
Some(SlotStatus::UnfinishedProof(_)) => true,
None => true,
}
}
fn dump_pubkeys_with_low_stakes(
cached_staked_nodes: &HashMap<Pubkey, u64>,
slot_chunk_map: &mut SlotChunkMap,
) {
let mut stakes_and_keys: Vec<(u64, Pubkey)> = slot_chunk_map
.keys()
.map(|k| (cached_staked_nodes.get(k).copied().unwrap_or_default(), *k))
.collect();
stakes_and_keys.select_nth_unstable(MAX_PUBKEY_PER_SLOT / 2);
for (_, key) in stakes_and_keys {
slot_chunk_map.remove(&key);
}
}
fn mark_slot_proof_received(&mut self, slot: u64) {
self.chunk_map.insert(slot, SlotStatus::Frozen);
for (_, current_slots_set) in self.validator_pending_proof_map.iter_mut() {
current_slots_set.remove(&slot);
}
}
fn insert_chunk(&mut self, data: DuplicateShred) -> Option<Vec<DuplicateShred>> {
let chunk_map_entry = self
.chunk_map
.entry(data.slot)
.or_insert_with(|| SlotStatus::UnfinishedProof(HashMap::new()));
let slot_chunk_map = match chunk_map_entry {
SlotStatus::Frozen => return None,
SlotStatus::UnfinishedProof(slot_chunk_map) => slot_chunk_map,
};
let proof_chunk_map = slot_chunk_map
.entry(data.from)
.or_insert_with(|| ProofChunkMap::new(data.num_chunks()));
if data.num_chunks() != proof_chunk_map.num_chunks
|| data.chunk_index() >= proof_chunk_map.num_chunks
{
return None;
}
let slot = data.slot;
let from = data.from;
match proof_chunk_map
.chunks
.get_mut(usize::from(data.chunk_index()))
{
None => return None,
Some(entry) if entry.is_some() => return None,
Some(entry) => *entry = Some(data),
};
let num_chunks = proof_chunk_map.chunks.iter().flatten().count();
if num_chunks >= usize::from(proof_chunk_map.num_chunks) {
return Some(
std::mem::take(&mut proof_chunk_map.chunks)
.into_iter()
.flatten()
.collect(),
);
}
if slot_chunk_map.len() > MAX_PUBKEY_PER_SLOT {
Self::dump_pubkeys_with_low_stakes(&self.cached_staked_nodes, slot_chunk_map);
}
self.validator_pending_proof_map
.entry(from)
.or_default()
.insert(slot);
None
}
fn verify_and_apply_proof(&self, slot: Slot, chunks: Vec<DuplicateShred>) -> Result<(), Error> {
if slot <= self.last_root || self.blockstore.has_duplicate_shreds_in_slot(slot) {
fn handle_shred_data(&mut self, chunk: DuplicateShred) -> Result<(), Error> {
if !self.should_consume_slot(chunk.slot) {
return Ok(());
}
let (shred1, shred2) = into_shreds(chunks, |slot| {
self.leader_schedule_cache.slot_leader_at(slot, None)
})?;
self.blockstore
.store_duplicate_slot(slot, shred1.into_payload(), shred2.into_payload())?;
let slot = chunk.slot;
let num_chunks = chunk.num_chunks();
let chunk_index = chunk.chunk_index();
if usize::from(num_chunks) > MAX_NUM_CHUNKS || chunk_index >= num_chunks {
return Err(Error::InvalidChunkIndex {
chunk_index,
num_chunks,
});
}
let entry = self.buffer.entry((chunk.slot, chunk.from)).or_default();
*entry
.get_mut(usize::from(chunk_index))
.ok_or(Error::InvalidChunkIndex {
chunk_index,
num_chunks,
})? = Some(chunk);
// If all chunks are already received, reconstruct and store
// the duplicate slot proof in blockstore
if entry.iter().flatten().count() == usize::from(num_chunks) {
let chunks = std::mem::take(entry).into_iter().flatten();
let pubkey = self
.leader_schedule_cache
.slot_leader_at(slot, /*bank:*/ None)
.ok_or(Error::UnknownSlotLeader(slot))?;
let (shred1, shred2) = duplicate_shred::into_shreds(&pubkey, chunks)?;
if !self.blockstore.has_duplicate_shreds_in_slot(slot) {
self.blockstore.store_duplicate_slot(
slot,
shred1.into_payload(),
shred2.into_payload(),
)?;
}
self.consumed.insert(slot);
}
Ok(())
}
fn cleanup_old_slots(&mut self) {
if self.last_root_cleaned != self.last_root {
self.chunk_map.retain(|k, _| k > &self.last_root);
for (_, slots_sets) in self.validator_pending_proof_map.iter_mut() {
slots_sets.retain(|k| k > &self.last_root);
}
self.last_root_cleaned = self.last_root;
fn should_consume_slot(&self, slot: Slot) -> bool {
slot > self.last_root
&& slot < self.last_root.saturating_add(self.cached_slots_in_epoch)
&& !self.consumed.contains(&slot)
&& !self.blockstore.has_duplicate_shreds_in_slot(slot)
}
fn maybe_prune_buffer(&mut self) {
// The buffer is allowed to grow to twice the intended capacity, at
// which point the extraneous entries are removed in linear time,
// resulting an amortized O(1) performance.
if self.buffer.len() < BUFFER_CAPACITY.saturating_mul(2) {
return;
}
self.consumed.retain(|&slot| slot > self.last_root);
// Filter out obsolete slots and limit number of entries per pubkey.
{
let mut counts = HashMap::<Pubkey, usize>::new();
self.buffer.retain(|(slot, pubkey), _| {
*slot > self.last_root && !self.consumed.contains(slot) && {
let count = counts.entry(*pubkey).or_default();
*count = count.saturating_add(1);
*count <= MAX_NUM_ENTRIES_PER_PUBKEY
}
});
}
if self.buffer.len() < BUFFER_CAPACITY {
return;
}
// Lookup stake for each entry.
let mut buffer: Vec<_> = self
.buffer
.drain()
.map(|entry @ ((_, pubkey), _)| {
let stake = self
.cached_staked_nodes
.get(&pubkey)
.copied()
.unwrap_or_default();
(stake, entry)
})
.collect();
// Drop entries with lowest stake and rebuffer remaining ones.
buffer.select_nth_unstable_by_key(BUFFER_CAPACITY, |&(stake, _)| Reverse(stake));
self.buffer = buffer
.into_iter()
.take(BUFFER_CAPACITY)
.map(|(_, entry)| entry)
.collect();
}
}
@@ -414,7 +339,7 @@ mod tests {
let bank_forks_ptr = Arc::new(RwLock::new(bank_forks));
let mut duplicate_shred_handler =
DuplicateShredHandler::new(blockstore.clone(), leader_schedule_cache, bank_forks_ptr);
let mut start_slot: Slot = 1;
let start_slot: Slot = 1;
// This proof will not be accepted because num_chunks is too large.
let chunks = create_duplicate_proof(
@@ -422,7 +347,7 @@
None,
start_slot,
None,
DUPLICATE_SHRED_MAX_PAYLOAD_SIZE / 10,
DUPLICATE_SHRED_MAX_PAYLOAD_SIZE / 2,
)
.unwrap();
for chunk in chunks {
@@ -438,7 +363,7 @@
None,
future_slot,
None,
DUPLICATE_SHRED_MAX_PAYLOAD_SIZE / 10,
DUPLICATE_SHRED_MAX_PAYLOAD_SIZE,
)
.unwrap();
for chunk in chunks {
@@ -449,7 +374,7 @@
// Send in two proofs, the first proof showing up will be accepted, the following
// proofs will be discarded.
let mut chunks = create_duplicate_proof(
my_keypair.clone(),
my_keypair,
None,
start_slot,
None,
@@ -458,109 +383,11 @@ mod tests {
.unwrap();
// handle chunk 0 of the first proof.
duplicate_shred_handler.handle(chunks.next().unwrap());
let chunks1 = create_duplicate_proof(
my_keypair.clone(),
None,
start_slot,
None,
DUPLICATE_SHRED_MAX_PAYLOAD_SIZE * 2,
)
.unwrap();
for chunk in chunks1 {
duplicate_shred_handler.handle(chunk);
}
// The second proof will be discarded.
assert!(!blockstore.has_duplicate_shreds_in_slot(start_slot));
// Now send in the rest of the first proof, it will succeed.
for chunk in chunks {
duplicate_shred_handler.handle(chunk);
}
assert!(blockstore.has_duplicate_shreds_in_slot(start_slot));
start_slot = 2;
let mut all_chunks = vec![];
for i in 0..ALLOWED_SLOTS_PER_PUBKEY + 1 {
all_chunks.push(
create_duplicate_proof(
my_keypair.clone(),
None,
start_slot + i as u64,
None,
DUPLICATE_SHRED_MAX_PAYLOAD_SIZE,
)
.unwrap(),
)
}
let mut done_count = 0;
let len = all_chunks.len();
while done_count < len {
done_count = 0;
for chunk_iterator in &mut all_chunks {
match chunk_iterator.next() {
Some(new_chunk) => duplicate_shred_handler.handle(new_chunk),
_ => done_count += 1,
}
}
}
for i in 0..ALLOWED_SLOTS_PER_PUBKEY {
assert!(blockstore.has_duplicate_shreds_in_slot(start_slot + i as u64));
}
// The last proof should fail because we only allow limited entries per pubkey.
assert!(
!blockstore.has_duplicate_shreds_in_slot(start_slot + ALLOWED_SLOTS_PER_PUBKEY as u64)
);
start_slot += ALLOWED_SLOTS_PER_PUBKEY as u64 + 1;
let mut pubkeys = HashSet::new();
for _ in 0..MAX_PUBKEY_PER_SLOT + 1 {
pubkeys.insert(Keypair::new().pubkey());
}
let lowest_pubkey = *pubkeys.iter().min().unwrap();
pubkeys.remove(&lowest_pubkey);
// Now send in MAX_PUBKEY_PER_SLOT number of incomplete proofs.
for pubkey in pubkeys {
let mut chunks = create_duplicate_proof(
my_keypair.clone(),
Some(pubkey),
start_slot,
None,
DUPLICATE_SHRED_MAX_PAYLOAD_SIZE,
)
.unwrap();
if let Some(chunk) = chunks.next() {
duplicate_shred_handler.handle(chunk);
}
}
// All proofs are incomplete, slot is not marked duplicate
assert!(!blockstore.has_duplicate_shreds_in_slot(start_slot));
let chunks = create_duplicate_proof(
my_keypair.clone(),
Some(lowest_pubkey),
start_slot,
None,
DUPLICATE_SHRED_MAX_PAYLOAD_SIZE,
)
.unwrap();
// Because the slot chunk map is full and the new pubkey has no stake, this proof
// is not inserted into slot chunk map, so it cannot mark the slot duplicate.
for chunk in chunks {
duplicate_shred_handler.handle(chunk);
}
assert!(!blockstore.has_duplicate_shreds_in_slot(start_slot));
// Now put in a proof where sender has stake.
let chunks = create_duplicate_proof(
my_keypair,
None,
start_slot,
None,
DUPLICATE_SHRED_MAX_PAYLOAD_SIZE,
)
.unwrap();
for chunk in chunks {
duplicate_shred_handler.handle(chunk);
}
// Because the sender has stake, it will replace one of the pubkeys with incomplete
// proof, so we can mark the slot duplicate.
assert!(blockstore.has_duplicate_shreds_in_slot(start_slot));
}
}