diff --git a/core/src/tvu.rs b/core/src/tvu.rs
index d6dac79f8d..cc78b1613d 100644
--- a/core/src/tvu.rs
+++ b/core/src/tvu.rs
@@ -316,7 +316,11 @@ impl Tvu {
         let duplicate_shred_listener = DuplicateShredListener::new(
             exit.clone(),
             cluster_info.clone(),
-            DuplicateShredHandler::new(blockstore, leader_schedule_cache.clone()),
+            DuplicateShredHandler::new(
+                blockstore,
+                leader_schedule_cache.clone(),
+                bank_forks.clone(),
+            ),
         );
 
         Ok(Tvu {
diff --git a/gossip/src/duplicate_shred_handler.rs b/gossip/src/duplicate_shred_handler.rs
index 8b557dc839..1f50b709ad 100644
--- a/gossip/src/duplicate_shred_handler.rs
+++ b/gossip/src/duplicate_shred_handler.rs
@@ -4,12 +4,15 @@ use {
         duplicate_shred_listener::DuplicateShredHandlerTrait,
     },
     log::*,
-    lru::LruCache,
     solana_ledger::{blockstore::Blockstore, leader_schedule_cache::LeaderScheduleCache},
-    solana_sdk::{clock::Slot, pubkey::Pubkey},
+    solana_runtime::bank_forks::BankForks,
+    solana_sdk::{
+        clock::{Epoch, Slot},
+        pubkey::Pubkey,
+    },
     std::{
         collections::{HashMap, HashSet},
-        sync::Arc,
+        sync::{Arc, RwLock},
     },
 };
 
@@ -21,11 +24,10 @@ const MAX_NUM_CHUNKS: u8 = 3;
 // is only 1 person sending out duplicate proofs, 1 person is leader for 4 slots,
 // so we allow 5 here to limit the chunk map size.
 const ALLOWED_SLOTS_PER_PUBKEY: usize = 5;
-// To prevent an attacker inflating this map, we discard any proof which is too
-// far away in the future compared to root.
-const MAX_SLOT_DISTANCE_TO_ROOT: Slot = 100;
-// We limit the pubkey for each slot to be 100 for now.
-const MAX_PUBKEY_PER_SLOT: usize = 100;
+// We limit the number of pubkeys for each slot to 300 for now. When this limit is
+// reached, we drop the 50% of pubkeys with the lowest stakes. The limit is 300
+// because we want the pubkeys kept after pruning to hold roughly 2/3 of total stake.
+const MAX_PUBKEY_PER_SLOT: usize = 300;
 
 struct ProofChunkMap {
     num_chunks: u8,
@@ -45,7 +47,7 @@ impl ProofChunkMap {
 
 // Group received chunks by peer pubkey, when we receive an invalid proof,
 // set the value to Frozen so we don't accept future proofs with the same key.
-type SlotChunkMap = LruCache<Pubkey, ProofChunkMap>;
+type SlotChunkMap = HashMap<Pubkey, ProofChunkMap>;
 
 enum SlotStatus {
     // When a valid proof has been inserted, we change the entry for that slot to Frozen
@@ -63,10 +65,17 @@ pub struct DuplicateShredHandler {
     // We don't want bad guys to inflate the chunk map, so we limit the number of
     // pending proofs from each pubkey to ALLOWED_SLOTS_PER_PUBKEY.
     validator_pending_proof_map: HashMap<Pubkey, HashSet<Slot>>,
-    // remember the last root slot handled, clear anything older than last_root.
+    // Cache the last root to reduce read locks on the blockstore.
     last_root: Slot,
+    // Remember the last root slot cleaned; clear anything between last_root_cleaned and last_root.
+    last_root_cleaned: Slot,
     blockstore: Arc<Blockstore>,
     leader_schedule_cache: Arc<LeaderScheduleCache>,
+    bank_forks: Arc<RwLock<BankForks>>,
+    // Cache information from the root bank so we can function correctly without re-reading it.
+    cached_on_epoch: Epoch,
+    cached_staked_nodes: Arc<HashMap<Pubkey, u64>>,
+    cached_slots_in_epoch: u64,
     // Because cleanup could potentially be very expensive, only clean up when clean up
     // count is 0
     cleanup_count: usize,
@@ -76,6 +85,7 @@ impl DuplicateShredHandlerTrait for DuplicateShredHandler {
     // Here we are sending data one by one rather than in a batch because in the future
     // we may send different type of CrdsData to different senders.
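+    // Refresh the cached root and epoch stake information before handling each
+    // proof, so the slot-distance check and stake lookups below stay current.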
     fn handle(&mut self, shred_data: DuplicateShred) {
+        self.cache_root_info();
         if let Err(error) = self.handle_shred_data(shred_data) {
             error!("handle packet: {:?}", error)
         }
@@ -90,17 +100,42 @@ impl DuplicateShredHandler {
     pub fn new(
         blockstore: Arc<Blockstore>,
         leader_schedule_cache: Arc<LeaderScheduleCache>,
+        bank_forks: Arc<RwLock<BankForks>>,
     ) -> Self {
         Self {
             chunk_map: HashMap::new(),
             validator_pending_proof_map: HashMap::new(),
             last_root: 0,
+            last_root_cleaned: 0,
+            cached_on_epoch: 0,
+            cached_staked_nodes: Arc::new(HashMap::new()),
+            cached_slots_in_epoch: 0,
             blockstore,
             leader_schedule_cache,
+            bank_forks,
             cleanup_count: CLEANUP_EVERY_N_LOOPS,
         }
     }
 
+    fn cache_root_info(&mut self) {
+        let last_root = self.blockstore.last_root();
+        if last_root == self.last_root && !self.cached_staked_nodes.is_empty() {
+            return;
+        }
+        self.last_root = last_root;
+        if let Ok(bank_fork) = self.bank_forks.try_read() {
+            let root_bank = bank_fork.root_bank();
+            let epoch_info = root_bank.get_epoch_info();
+            if self.cached_staked_nodes.is_empty() || self.cached_on_epoch < epoch_info.epoch {
+                self.cached_on_epoch = epoch_info.epoch;
+                if let Some(cached_staked_nodes) = root_bank.epoch_staked_nodes(epoch_info.epoch) {
+                    self.cached_staked_nodes = cached_staked_nodes;
+                }
+                self.cached_slots_in_epoch = epoch_info.slots_in_epoch;
+            }
+        }
+    }
+
     fn handle_shred_data(&mut self, data: DuplicateShred) -> Result<(), Error> {
         if self.should_insert_chunk(&data) {
             let slot = data.slot;
@@ -117,12 +152,10 @@ impl DuplicateShredHandler {
         Ok(())
     }
 
-    fn should_insert_chunk(&self, data: &DuplicateShred) -> bool {
+    fn should_insert_chunk(&mut self, data: &DuplicateShred) -> bool {
         let slot = data.slot;
         // Do not insert if this slot is rooted or too far away in the future or has a proof already.
-        let last_root = self.blockstore.last_root();
-        if slot <= last_root
-            || slot > last_root + MAX_SLOT_DISTANCE_TO_ROOT
+        if !(self.last_root..self.last_root + self.cached_slots_in_epoch).contains(&slot)
             || self.blockstore.has_duplicate_shreds_in_slot(slot)
         {
             return false;
@@ -139,21 +172,29 @@ impl DuplicateShredHandler {
                 return false;
             }
         }
-        // Also skip frozen slots or slots with an older proof than me.
+        // Also skip frozen slots or slots that already hold a proof different from this one.
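+        // A chunk is accepted only when no proof is pending for this (slot, sender)
+        // pair, or when its wallclock matches the proof already being assembled.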
         match self.chunk_map.get(&slot) {
-            Some(SlotStatus::Frozen) => {
-                return false;
-            }
-            Some(SlotStatus::UnfinishedProof(slot_map)) => {
-                if let Some(proof_chunkmap) = slot_map.peek(&data.from) {
-                    if proof_chunkmap.wallclock < data.wallclock {
-                        return false;
-                    }
-                }
-            }
-            None => {}
+            Some(SlotStatus::Frozen) => false,
+            Some(SlotStatus::UnfinishedProof(slot_map)) => match slot_map.get(&data.from) {
+                None => true,
+                Some(proof_chunkmap) => proof_chunkmap.wallclock == data.wallclock,
+            },
+            None => true,
+        }
+    }
+
+    fn dump_pubkeys_with_low_stakes(
+        cached_staked_nodes: &HashMap<Pubkey, u64>,
+        slot_chunk_map: &mut SlotChunkMap,
+    ) {
+        let mut stakes_and_keys: Vec<(u64, Pubkey)> = slot_chunk_map
+            .keys()
+            .map(|k| (cached_staked_nodes.get(k).copied().unwrap_or_default(), *k))
+            .collect();
+        // select_nth_unstable partitions so the lowest-stake half sits before the
+        // pivot index; remove exactly those entries from the map.
+        let (low_stake_keys, _, _) = stakes_and_keys.select_nth_unstable(MAX_PUBKEY_PER_SLOT / 2);
+        for (_, key) in low_stake_keys {
+            slot_chunk_map.remove(key);
         }
-        true
     }
 
     fn mark_slot_proof_received(&mut self, slot: u64) {
@@ -167,49 +208,41 @@ impl DuplicateShredHandler {
         if let SlotStatus::UnfinishedProof(slot_chunk_map) = self
             .chunk_map
             .entry(data.slot)
-            .or_insert_with(|| SlotStatus::UnfinishedProof(LruCache::new(MAX_PUBKEY_PER_SLOT)))
+            .or_insert_with(|| SlotStatus::UnfinishedProof(HashMap::new()))
         {
-            if !slot_chunk_map.contains(&data.from) {
-                slot_chunk_map.put(
-                    data.from,
-                    ProofChunkMap::new(data.num_chunks(), data.wallclock),
-                );
-            }
-            if let Some(mut proof_chunk_map) = slot_chunk_map.get_mut(&data.from) {
-                if proof_chunk_map.wallclock > data.wallclock {
-                    proof_chunk_map.num_chunks = data.num_chunks();
-                    proof_chunk_map.wallclock = data.wallclock;
-                    proof_chunk_map.chunks.clear();
-                }
-                let num_chunks = data.num_chunks();
-                let chunk_index = data.chunk_index();
-                let slot = data.slot;
-                let from = data.from;
-                if num_chunks == proof_chunk_map.num_chunks
-                    && chunk_index < num_chunks
-                    && !proof_chunk_map.chunks.contains_key(&chunk_index)
-                {
-                    proof_chunk_map.chunks.insert(chunk_index, data);
-                    if proof_chunk_map.chunks.len() >= proof_chunk_map.num_chunks.into() {
-                        let mut result: Vec<DuplicateShred> = Vec::new();
-                        for i in 0..num_chunks {
-                            result.push(proof_chunk_map.chunks.remove(&i).unwrap())
-                        }
-                        return Ok(Some(result));
+            let proof_chunk_map = slot_chunk_map
+                .entry(data.from)
+                .or_insert_with(|| ProofChunkMap::new(data.num_chunks(), data.wallclock));
+
+            let num_chunks = data.num_chunks();
+            let chunk_index = data.chunk_index();
+            let slot = data.slot;
+            let from = data.from;
+            if num_chunks == proof_chunk_map.num_chunks
+                && chunk_index < num_chunks
+                && !proof_chunk_map.chunks.contains_key(&chunk_index)
+            {
+                proof_chunk_map.chunks.insert(chunk_index, data);
+                if proof_chunk_map.chunks.len() >= proof_chunk_map.num_chunks.into() {
+                    let mut result: Vec<DuplicateShred> = Vec::new();
+                    for i in 0..num_chunks {
+                        result.push(proof_chunk_map.chunks.remove(&i).unwrap())
                     }
+                    return Ok(Some(result));
+                } else if slot_chunk_map.len() > MAX_PUBKEY_PER_SLOT {
+                    Self::dump_pubkeys_with_low_stakes(&self.cached_staked_nodes, slot_chunk_map);
                 }
-                self.validator_pending_proof_map
-                    .entry(from)
-                    .or_default()
-                    .insert(slot);
             }
+            self.validator_pending_proof_map
+                .entry(from)
+                .or_default()
+                .insert(slot);
         }
         Ok(None)
     }
 
     fn verify_and_apply_proof(&self, slot: Slot, chunks: Vec<DuplicateShred>) -> Result<(), Error> {
-        if slot <= self.blockstore.last_root() || self.blockstore.has_duplicate_shreds_in_slot(slot)
-        {
+        if slot <= self.last_root || self.blockstore.has_duplicate_shreds_in_slot(slot) {
             return Ok(());
         }
         let (shred1, shred2) = into_shreds(chunks, |slot| {
@@ -221,13 +254,12 @@ impl DuplicateShredHandler {
     }
 
     fn cleanup_old_slots(&mut self) {
-        let new_last_root = self.blockstore.last_root();
-        if self.last_root < new_last_root {
-            self.chunk_map.retain(|k, _| k > &new_last_root);
+        if self.last_root_cleaned != self.last_root {
+            self.chunk_map.retain(|k, _| k > &self.last_root);
             for (_, slots_sets) in self.validator_pending_proof_map.iter_mut() {
-                slots_sets.retain(|k| k > &new_last_root);
+                slots_sets.retain(|k| k > &self.last_root);
             }
-            self.last_root = new_last_root
+            self.last_root_cleaned = self.last_root;
         }
     }
 }
@@ -256,6 +288,7 @@ mod tests {
 
     fn create_duplicate_proof(
         keypair: Arc<Keypair>,
+        sender_pubkey: Option<Pubkey>,
         slot: u64,
         expected_error: Option<Error>,
         chunk_size: usize,
@@ -279,9 +312,13 @@ mod tests {
             Some(Error::InvalidDuplicateShreds) => shred1.clone(),
             _ => new_rand_shred(&mut rng, next_shred_index, &shredder, &my_keypair),
         };
+        let sender = match sender_pubkey {
+            Some(pubkey) => pubkey,
+            None => my_keypair.pubkey(),
+        };
         let chunks = from_shred(
             shred1,
-            my_keypair.pubkey(),
+            sender,
             shred2.payload().clone(),
             None::<fn(Slot) -> Option<Pubkey>>,
             timestamp(), // wallclock
@@ -304,10 +341,14 @@ mod tests {
         let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(
             &bank_forks.working_bank(),
         ));
-        let mut duplicate_shred_handler =
-            DuplicateShredHandler::new(blockstore.clone(), leader_schedule_cache);
+        let mut duplicate_shred_handler = DuplicateShredHandler::new(
+            blockstore.clone(),
+            leader_schedule_cache,
+            Arc::new(RwLock::new(bank_forks)),
+        );
         let chunks = create_duplicate_proof(
             my_keypair.clone(),
+            None,
             1,
             None,
             DUPLICATE_SHRED_MAX_PAYLOAD_SIZE,
         )
         .unwrap();
         let chunks1 = create_duplicate_proof(
             my_keypair.clone(),
+            None,
             2,
             None,
             DUPLICATE_SHRED_MAX_PAYLOAD_SIZE,
         )
@@ -339,6 +381,7 @@ mod tests {
         ] {
             match create_duplicate_proof(
                 my_keypair.clone(),
+                None,
                 3,
                 Some(error),
                 DUPLICATE_SHRED_MAX_PAYLOAD_SIZE,
@@ -368,13 +411,16 @@ mod tests {
         let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(
             &bank_forks.working_bank(),
         ));
+        let bank_forks_ptr = Arc::new(RwLock::new(bank_forks));
         let mut duplicate_shred_handler =
-            DuplicateShredHandler::new(blockstore.clone(), leader_schedule_cache);
+            DuplicateShredHandler::new(blockstore.clone(), leader_schedule_cache, bank_forks_ptr);
+        let mut start_slot: Slot = 1;
 
         // This proof will not be accepted because num_chunks is too large.
         let chunks = create_duplicate_proof(
             my_keypair.clone(),
-            1,
+            None,
+            start_slot,
             None,
             DUPLICATE_SHRED_MAX_PAYLOAD_SIZE / 10,
         )
         .unwrap();
         for chunk in chunks {
             duplicate_shred_handler.handle(chunk);
         }
-        assert!(!blockstore.has_duplicate_shreds_in_slot(1));
+        assert!(!blockstore.has_duplicate_shreds_in_slot(start_slot));
 
         // This proof will be rejected because the slot is too far away in the future.
-        let future_slot = blockstore.last_root() + MAX_SLOT_DISTANCE_TO_ROOT + 1;
+        let future_slot =
+            blockstore.last_root() + duplicate_shred_handler.cached_slots_in_epoch + start_slot;
         let chunks = create_duplicate_proof(
             my_keypair.clone(),
+            None,
             future_slot,
             None,
             DUPLICATE_SHRED_MAX_PAYLOAD_SIZE / 10,
         )
@@ -398,37 +446,45 @@ mod tests {
         }
         assert!(!blockstore.has_duplicate_shreds_in_slot(future_slot));
 
-        // Send in two proofs, only the proof with older wallclock will be accepted.
-        let chunks = create_duplicate_proof(
+        // Send in two proofs; the first proof to show up will be accepted, and any
+        // following proofs will be discarded.
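+        // (The handler keeps assembling the first proof it saw; see should_insert_chunk.)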
+        let mut chunks = create_duplicate_proof(
             my_keypair.clone(),
-            1,
+            None,
+            start_slot,
             None,
             DUPLICATE_SHRED_MAX_PAYLOAD_SIZE,
         )
         .unwrap();
+        // Handle chunk 0 of the first proof.
+        duplicate_shred_handler.handle(chunks.next().unwrap());
         let chunks1 = create_duplicate_proof(
             my_keypair.clone(),
-            1,
+            None,
+            start_slot,
             None,
             DUPLICATE_SHRED_MAX_PAYLOAD_SIZE,
         )
         .unwrap();
-        for (chunk1, chunk2) in chunks1.zip(chunks) {
-            duplicate_shred_handler.handle(chunk1);
-            // The first proof will never succeed because it's replaced in chunkmap by next one
-            // with older wallclock.
-            assert!(!blockstore.has_duplicate_shreds_in_slot(1));
-            duplicate_shred_handler.handle(chunk2);
+        for chunk in chunks1 {
+            duplicate_shred_handler.handle(chunk);
         }
-        // The second proof will succeed.
-        assert!(blockstore.has_duplicate_shreds_in_slot(1));
+        // The second proof will be discarded.
+        assert!(!blockstore.has_duplicate_shreds_in_slot(start_slot));
+        // Now send in the rest of the first proof; it will succeed.
+        for chunk in chunks {
+            duplicate_shred_handler.handle(chunk);
+        }
+        assert!(blockstore.has_duplicate_shreds_in_slot(start_slot));
 
+        start_slot = 2;
         let mut all_chunks = vec![];
         for i in 0..ALLOWED_SLOTS_PER_PUBKEY + 1 {
             all_chunks.push(
                 create_duplicate_proof(
                     my_keypair.clone(),
-                    (2 + i).try_into().unwrap(),
+                    None,
+                    start_slot + i as u64,
                     None,
                     DUPLICATE_SHRED_MAX_PAYLOAD_SIZE,
                 )
@@ -447,10 +503,64 @@ mod tests {
             }
         }
         for i in 0..ALLOWED_SLOTS_PER_PUBKEY {
-            assert!(blockstore.has_duplicate_shreds_in_slot((2 + i).try_into().unwrap()));
+            assert!(blockstore.has_duplicate_shreds_in_slot(start_slot + i as u64));
         }
         // The last proof should fail because we only allow limited entries per pubkey.
-        assert!(!blockstore
-            .has_duplicate_shreds_in_slot((2 + ALLOWED_SLOTS_PER_PUBKEY).try_into().unwrap()));
+        assert!(
+            !blockstore.has_duplicate_shreds_in_slot(start_slot + ALLOWED_SLOTS_PER_PUBKEY as u64)
+        );
+
+        start_slot += ALLOWED_SLOTS_PER_PUBKEY as u64 + 1;
+        let mut pubkeys = HashSet::new();
+        for _ in 0..MAX_PUBKEY_PER_SLOT + 1 {
+            pubkeys.insert(Keypair::new().pubkey());
+        }
+        let lowest_pubkey = *pubkeys.iter().min().unwrap();
+        pubkeys.remove(&lowest_pubkey);
+        // Now send in MAX_PUBKEY_PER_SLOT incomplete proofs, one per pubkey.
+        for pubkey in pubkeys {
+            let mut chunks = create_duplicate_proof(
+                my_keypair.clone(),
+                Some(pubkey),
+                start_slot,
+                None,
+                DUPLICATE_SHRED_MAX_PAYLOAD_SIZE,
+            )
+            .unwrap();
+            if let Some(chunk) = chunks.next() {
+                duplicate_shred_handler.handle(chunk);
+            }
+        }
+        // All proofs are incomplete, so the slot is not marked duplicate.
+        assert!(!blockstore.has_duplicate_shreds_in_slot(start_slot));
+        let chunks = create_duplicate_proof(
+            my_keypair.clone(),
+            Some(lowest_pubkey),
+            start_slot,
+            None,
+            DUPLICATE_SHRED_MAX_PAYLOAD_SIZE,
+        )
+        .unwrap();
+        // Because the slot chunk map is full and the new pubkey has no stake, this
+        // proof is pruned out of the slot chunk map, so it cannot mark the slot duplicate.
+        for chunk in chunks {
+            duplicate_shred_handler.handle(chunk);
+        }
+        assert!(!blockstore.has_duplicate_shreds_in_slot(start_slot));
+        // Now put in a proof whose sender has stake.
+        let chunks = create_duplicate_proof(
+            my_keypair,
+            None,
+            start_slot,
+            None,
+            DUPLICATE_SHRED_MAX_PAYLOAD_SIZE,
+        )
+        .unwrap();
+        for chunk in chunks {
+            duplicate_shred_handler.handle(chunk);
+        }
+        // Because the sender has stake, it survives pruning and replaces one of the
+        // pubkeys holding an incomplete proof, so we can mark the slot duplicate.
+        assert!(blockstore.has_duplicate_shreds_in_slot(start_slot));
     }
 }
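A minimal, self-contained sketch of the stake-based pruning introduced in dump_pubkeys_with_low_stakes above, for readers who want to see the select_nth_unstable partitioning (and the corrected loop over only the low partition) in isolation. The u64 ids stand in for Pubkey, Vec<u8> stands in for ProofChunkMap, and MAX_KEYS_PER_SLOT stands in for MAX_PUBKEY_PER_SLOT; these names and constants are illustrative, not part of the patch.

use std::collections::HashMap;

// Stand-in for MAX_PUBKEY_PER_SLOT; kept small so the example is easy to follow.
const MAX_KEYS_PER_SLOT: usize = 6;

// Drop the MAX_KEYS_PER_SLOT / 2 lowest-stake senders from the per-slot chunk map.
fn prune_low_stake_keys(stakes: &HashMap<u64, u64>, chunk_map: &mut HashMap<u64, Vec<u8>>) {
    let mut stakes_and_keys: Vec<(u64, u64)> = chunk_map
        .keys()
        .map(|k| (stakes.get(k).copied().unwrap_or_default(), *k))
        .collect();
    // Partition in O(n): everything before the pivot index compares <= the pivot,
    // so `low` holds the lowest-stake entries (in no particular order).
    let (low, _, _) = stakes_and_keys.select_nth_unstable(MAX_KEYS_PER_SLOT / 2);
    for (_, key) in low {
        chunk_map.remove(key);
    }
}

fn main() {
    // Seven senders with stakes 0, 100, ..., 600, each with an empty chunk list.
    let stakes: HashMap<u64, u64> = (0u64..7).map(|k| (k, k * 100)).collect();
    let mut chunk_map: HashMap<u64, Vec<u8>> = (0u64..7).map(|k| (k, Vec::new())).collect();
    prune_low_stake_keys(&stakes, &mut chunk_map);
    // The three lowest-stake senders (ids 0, 1, 2) are pruned; four remain.
    assert_eq!(chunk_map.len(), 4);
    assert!(!chunk_map.contains_key(&0) && chunk_map.contains_key(&6));
}

select_nth_unstable is preferred over a full sort here because it only partitions: identifying the lowest-stake half is enough, and the remaining entries never need to be ordered.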