diff --git a/gossip/src/duplicate_shred_handler.rs b/gossip/src/duplicate_shred_handler.rs index a0cf0f0f0d..7789404b97 100644 --- a/gossip/src/duplicate_shred_handler.rs +++ b/gossip/src/duplicate_shred_handler.rs @@ -12,7 +12,7 @@ use { }, std::{ cmp::Reverse, - collections::{HashMap, HashSet}, + collections::HashMap, sync::{Arc, RwLock}, }, }; @@ -34,7 +34,7 @@ pub struct DuplicateShredHandler { // together the chunks into the original proof before anything useful is done. buffer: HashMap<(Slot, Pubkey), BufferEntry>, // Slots for which a duplicate proof is already ingested. - consumed: HashSet, + consumed: HashMap, // Cache last root to reduce read lock. last_root: Slot, blockstore: Arc, @@ -66,7 +66,7 @@ impl DuplicateShredHandler { ) -> Self { Self { buffer: HashMap::<(Slot, Pubkey), BufferEntry>::default(), - consumed: HashSet::::default(), + consumed: HashMap::::default(), last_root: 0, cached_on_epoch: 0, cached_staked_nodes: Arc::new(HashMap::new()), @@ -132,16 +132,15 @@ impl DuplicateShredHandler { shred2.into_payload(), )?; } - self.consumed.insert(slot); + self.consumed.insert(slot, true); } Ok(()) } - fn should_consume_slot(&self, slot: Slot) -> bool { + fn should_consume_slot(&mut self, slot: Slot) -> bool { slot > self.last_root && slot < self.last_root.saturating_add(self.cached_slots_in_epoch) - && !self.consumed.contains(&slot) - && !self.blockstore.has_duplicate_shreds_in_slot(slot) + && should_consume_slot(slot, &self.blockstore, &mut self.consumed) } fn maybe_prune_buffer(&mut self) { @@ -151,16 +150,18 @@ impl DuplicateShredHandler { if self.buffer.len() < BUFFER_CAPACITY.saturating_mul(2) { return; } - self.consumed.retain(|&slot| slot > self.last_root); + self.consumed.retain(|&slot, _| slot > self.last_root); // Filter out obsolete slots and limit number of entries per pubkey. { let mut counts = HashMap::::new(); self.buffer.retain(|(slot, pubkey), _| { - *slot > self.last_root && !self.consumed.contains(slot) && { - let count = counts.entry(*pubkey).or_default(); - *count = count.saturating_add(1); - *count <= MAX_NUM_ENTRIES_PER_PUBKEY - } + *slot > self.last_root + && should_consume_slot(*slot, &self.blockstore, &mut self.consumed) + && { + let count = counts.entry(*pubkey).or_default(); + *count = count.saturating_add(1); + *count <= MAX_NUM_ENTRIES_PER_PUBKEY + } }); } if self.buffer.len() < BUFFER_CAPACITY { @@ -181,34 +182,45 @@ impl DuplicateShredHandler { .collect(); // Drop entries with lowest stake and rebuffer remaining ones. buffer.select_nth_unstable_by_key(BUFFER_CAPACITY, |&(stake, _)| Reverse(stake)); - self.buffer = buffer - .into_iter() - .take(BUFFER_CAPACITY) - .map(|(_, entry)| entry) - .collect(); + self.buffer.extend( + buffer + .into_iter() + .take(BUFFER_CAPACITY) + .map(|(_, entry)| entry), + ); } } +// Returns false if a duplicate proof is already ingested for the slot, +// and updates local `consumed` cache with blockstore. +fn should_consume_slot( + slot: Slot, + blockstore: &Blockstore, + consumed: &mut HashMap, +) -> bool { + !*consumed + .entry(slot) + .or_insert_with(|| blockstore.has_duplicate_shreds_in_slot(slot)) +} + #[cfg(test)] mod tests { use { super::*, crate::{ cluster_info::DUPLICATE_SHRED_MAX_PAYLOAD_SIZE, - duplicate_shred::{from_shred, tests::new_rand_shred, DuplicateShred, Error}, - duplicate_shred_listener::DuplicateShredHandlerTrait, + duplicate_shred::{from_shred, tests::new_rand_shred}, }, solana_ledger::{ genesis_utils::{create_genesis_config_with_leader, GenesisConfigInfo}, get_tmp_ledger_path_auto_delete, shred::Shredder, }, - solana_runtime::{bank::Bank, bank_forks::BankForks}, + solana_runtime::bank::Bank, solana_sdk::{ signature::{Keypair, Signer}, timing::timestamp, }, - std::sync::Arc, }; fn create_duplicate_proof(