caches blockstore duplicate shreds slots (#30241)

In order to reduce the number of calls to the blockstore, this commit locally caches
the result of blockstore.has_duplicate_shreds_in_slot in DuplicateShredHandler.consumed.
behzad nouri 2023-02-10 18:19:57 +00:00 committed by GitHub
parent 0b5ae8abe1
commit ceb225f36e
1 changed file with 34 additions and 22 deletions
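The cache is a plain HashMap<Slot, bool> keyed by slot, filled lazily the first time a slot is queried. Below is a minimal, self-contained sketch of that memoization pattern, not the code from the diff itself: the blockstore is replaced by a hypothetical stand-in lookup and the handler's other state (last root, epoch bounds, buffer) is omitted.

    use std::collections::HashMap;

    type Slot = u64;

    // Hypothetical stand-in for Blockstore::has_duplicate_shreds_in_slot; in the
    // real handler this is a ledger read that is worth avoiding on every call.
    fn has_duplicate_shreds_in_slot(slot: Slot) -> bool {
        slot % 7 == 0 // placeholder answer for the sketch
    }

    // Memoized check: the first query for a slot consults the stand-in lookup and
    // records the answer; repeat queries for the same slot only read the map.
    fn should_consume_slot(slot: Slot, consumed: &mut HashMap<Slot, bool>) -> bool {
        !*consumed
            .entry(slot)
            .or_insert_with(|| has_duplicate_shreds_in_slot(slot))
    }

    fn main() {
        let mut consumed = HashMap::new();
        assert!(should_consume_slot(3, &mut consumed)); // consults the stand-in lookup
        assert!(should_consume_slot(3, &mut consumed)); // answered from the local cache
        assert!(!should_consume_slot(7, &mut consumed)); // duplicate proof already recorded
    }

The sketch only illustrates the entry().or_insert_with() caching step; in the handler below the same helper is also used while pruning the buffer, and the cache itself is pruned against the last root.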


@@ -12,7 +12,7 @@ use {
     },
     std::{
         cmp::Reverse,
-        collections::{HashMap, HashSet},
+        collections::HashMap,
         sync::{Arc, RwLock},
     },
 };
@@ -34,7 +34,7 @@ pub struct DuplicateShredHandler {
     // together the chunks into the original proof before anything useful is done.
     buffer: HashMap<(Slot, Pubkey), BufferEntry>,
     // Slots for which a duplicate proof is already ingested.
-    consumed: HashSet<Slot>,
+    consumed: HashMap<Slot, bool>,
     // Cache last root to reduce read lock.
     last_root: Slot,
     blockstore: Arc<Blockstore>,
@@ -66,7 +66,7 @@ impl DuplicateShredHandler {
     ) -> Self {
         Self {
             buffer: HashMap::<(Slot, Pubkey), BufferEntry>::default(),
-            consumed: HashSet::<Slot>::default(),
+            consumed: HashMap::<Slot, bool>::default(),
             last_root: 0,
             cached_on_epoch: 0,
             cached_staked_nodes: Arc::new(HashMap::new()),
@@ -132,16 +132,15 @@ impl DuplicateShredHandler {
                     shred2.into_payload(),
                 )?;
             }
-            self.consumed.insert(slot);
+            self.consumed.insert(slot, true);
         }
         Ok(())
     }
 
-    fn should_consume_slot(&self, slot: Slot) -> bool {
+    fn should_consume_slot(&mut self, slot: Slot) -> bool {
         slot > self.last_root
             && slot < self.last_root.saturating_add(self.cached_slots_in_epoch)
-            && !self.consumed.contains(&slot)
-            && !self.blockstore.has_duplicate_shreds_in_slot(slot)
+            && should_consume_slot(slot, &self.blockstore, &mut self.consumed)
     }
 
     fn maybe_prune_buffer(&mut self) {
@@ -151,12 +150,14 @@ impl DuplicateShredHandler {
         if self.buffer.len() < BUFFER_CAPACITY.saturating_mul(2) {
            return;
        }
-        self.consumed.retain(|&slot| slot > self.last_root);
+        self.consumed.retain(|&slot, _| slot > self.last_root);
         // Filter out obsolete slots and limit number of entries per pubkey.
         {
             let mut counts = HashMap::<Pubkey, usize>::new();
             self.buffer.retain(|(slot, pubkey), _| {
-                *slot > self.last_root && !self.consumed.contains(slot) && {
+                *slot > self.last_root
+                    && should_consume_slot(*slot, &self.blockstore, &mut self.consumed)
+                    && {
                     let count = counts.entry(*pubkey).or_default();
                     *count = count.saturating_add(1);
                     *count <= MAX_NUM_ENTRIES_PER_PUBKEY
@@ -181,34 +182,45 @@ impl DuplicateShredHandler {
             .collect();
         // Drop entries with lowest stake and rebuffer remaining ones.
         buffer.select_nth_unstable_by_key(BUFFER_CAPACITY, |&(stake, _)| Reverse(stake));
-        self.buffer = buffer
-            .into_iter()
-            .take(BUFFER_CAPACITY)
-            .map(|(_, entry)| entry)
-            .collect();
+        self.buffer.extend(
+            buffer
+                .into_iter()
+                .take(BUFFER_CAPACITY)
+                .map(|(_, entry)| entry),
+        );
     }
 }
 
+// Returns false if a duplicate proof is already ingested for the slot,
+// and updates local `consumed` cache with blockstore.
+fn should_consume_slot(
+    slot: Slot,
+    blockstore: &Blockstore,
+    consumed: &mut HashMap<Slot, bool>,
+) -> bool {
+    !*consumed
+        .entry(slot)
+        .or_insert_with(|| blockstore.has_duplicate_shreds_in_slot(slot))
+}
+
 #[cfg(test)]
 mod tests {
     use {
         super::*,
         crate::{
             cluster_info::DUPLICATE_SHRED_MAX_PAYLOAD_SIZE,
-            duplicate_shred::{from_shred, tests::new_rand_shred, DuplicateShred, Error},
-            duplicate_shred_listener::DuplicateShredHandlerTrait,
+            duplicate_shred::{from_shred, tests::new_rand_shred},
         },
         solana_ledger::{
             genesis_utils::{create_genesis_config_with_leader, GenesisConfigInfo},
             get_tmp_ledger_path_auto_delete,
             shred::Shredder,
         },
-        solana_runtime::{bank::Bank, bank_forks::BankForks},
+        solana_runtime::bank::Bank,
         solana_sdk::{
             signature::{Keypair, Signer},
             timing::timestamp,
         },
-        std::sync::Arc,
     };
 
     fn create_duplicate_proof(