increases shred-fetch-stage deduper capacity and reset-cycle (#30690)

This commit is contained in:
behzad nouri 2023-03-17 00:05:29 +00:00 committed by GitHub
parent 10f49d4e26
commit 93f696dac7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 36 additions and 56 deletions

View File

@ -4,7 +4,6 @@
use {
ahash::AHasher,
rand::{thread_rng, Rng},
solana_perf::packet::Packet,
std::hash::Hasher,
};
@ -24,10 +23,6 @@ impl Default for PacketHasher {
}
impl PacketHasher {
pub(crate) fn hash_packet(&self, packet: &Packet) -> u64 {
self.hash_data(packet.data(..).unwrap_or_default())
}
pub(crate) fn hash_shred(&self, shred: &[u8]) -> u64 {
self.hash_data(shred)
}

View File

@ -1,15 +1,14 @@
//! The `shred_fetch_stage` pulls shreds from UDP sockets and sends it to a channel.
use {
crate::{
cluster_nodes::check_feature_activation, packet_hasher::PacketHasher,
serve_repair::ServeRepair,
},
crate::{cluster_nodes::check_feature_activation, serve_repair::ServeRepair},
crossbeam_channel::{unbounded, Sender},
lru::LruCache,
solana_gossip::cluster_info::ClusterInfo,
solana_ledger::shred::{should_discard_shred, ShredFetchStats},
solana_perf::packet::{Packet, PacketBatch, PacketBatchRecycler, PacketFlags},
solana_perf::{
deduper::Deduper,
packet::{Packet, PacketBatch, PacketBatchRecycler, PacketFlags},
},
solana_runtime::{bank::Bank, bank_forks::BankForks},
solana_sdk::{
clock::{Slot, DEFAULT_MS_PER_SLOT},
@ -24,7 +23,9 @@ use {
},
};
const DEFAULT_LRU_SIZE: usize = 10_000;
const DEDUPER_FALSE_POSITIVE_RATE: f64 = 0.001;
const DEDUPER_NUM_BITS: u64 = 637_534_199; // 76MB
const DEDUPER_RESET_CYCLE: Duration = Duration::from_secs(5 * 60);
pub(crate) struct ShredFetchStage {
thread_hdls: Vec<JoinHandle<()>>,
@ -42,7 +43,9 @@ impl ShredFetchStage {
repair_context: Option<(&UdpSocket, &ClusterInfo)>,
) {
const STATS_SUBMIT_CADENCE: Duration = Duration::from_secs(1);
let mut shreds_received = LruCache::new(DEFAULT_LRU_SIZE);
let mut rng = rand::thread_rng();
let mut deduper =
Deduper::<2>::new(&mut rng, DEDUPER_FALSE_POSITIVE_RATE, DEDUPER_NUM_BITS);
let mut last_updated = Instant::now();
let mut keypair = repair_context
.as_ref()
@ -55,13 +58,11 @@ impl ShredFetchStage {
let mut slots_per_epoch = 0;
let mut stats = ShredFetchStats::default();
let mut packet_hasher = PacketHasher::default();
for mut packet_batch in recvr {
deduper.maybe_reset(&mut rng, &DEDUPER_RESET_CYCLE);
if last_updated.elapsed().as_millis() as u64 > DEFAULT_MS_PER_SLOT {
last_updated = Instant::now();
packet_hasher.reset();
shreds_received.clear();
{
let bank_forks_r = bank_forks.read().unwrap();
last_root = bank_forks_r.root();
@ -99,8 +100,7 @@ impl ShredFetchStage {
last_root,
max_slot,
shred_version,
&packet_hasher,
&mut shreds_received,
&deduper,
should_drop_merkle_shreds,
&mut stats,
) {
@ -232,13 +232,12 @@ impl ShredFetchStage {
// Returns true if the packet should be marked as discard.
#[must_use]
fn should_discard_packet(
fn should_discard_packet<const K: usize>(
packet: &Packet,
root: Slot,
max_slot: Slot, // Max slot to ingest shreds for.
shred_version: u16,
packet_hasher: &PacketHasher,
shreds_received: &mut LruCache<u64, ()>,
deduper: &Deduper<K>,
should_drop_merkle_shreds: impl Fn(Slot) -> bool,
stats: &mut ShredFetchStats,
) -> bool {
@ -252,13 +251,11 @@ fn should_discard_packet(
) {
return true;
}
let hash = packet_hasher.hash_packet(packet);
match shreds_received.put(hash, ()) {
None => false,
Some(()) => {
stats.duplicate_shred += 1;
true
}
if deduper.dedup_packet(packet) {
stats.duplicate_shred += 1;
true
} else {
false
}
}
@ -288,7 +285,8 @@ mod tests {
#[test]
fn test_data_code_same_index() {
solana_logger::setup();
let mut shreds_received = LruCache::new(DEFAULT_LRU_SIZE);
let mut rng = rand::thread_rng();
let deduper = Deduper::<2>::new(&mut rng, DEDUPER_FALSE_POSITIVE_RATE, 640_007);
let mut packet = Packet::default();
let mut stats = ShredFetchStats::default();
@ -306,8 +304,6 @@ mod tests {
);
shred.copy_to_packet(&mut packet);
let hasher = PacketHasher::default();
let last_root = 0;
let last_slot = 100;
let slots_per_epoch = 10;
@ -317,8 +313,7 @@ mod tests {
last_root,
max_slot,
shred_version,
&hasher,
&mut shreds_received,
&deduper,
|_| false, // should_drop_merkle_shreds
&mut stats,
));
@ -333,8 +328,7 @@ mod tests {
last_root,
max_slot,
shred_version,
&hasher,
&mut shreds_received,
&deduper,
|_| false, // should_drop_merkle_shreds
&mut stats,
));
@ -343,7 +337,8 @@ mod tests {
#[test]
fn test_shred_filter() {
solana_logger::setup();
let mut shreds_received = LruCache::new(DEFAULT_LRU_SIZE);
let mut rng = rand::thread_rng();
let deduper = Deduper::<2>::new(&mut rng, DEDUPER_FALSE_POSITIVE_RATE, 640_007);
let mut packet = Packet::default();
let mut stats = ShredFetchStats::default();
let last_root = 0;
@ -352,16 +347,13 @@ mod tests {
let shred_version = 59445;
let max_slot = last_slot + 2 * slots_per_epoch;
let hasher = PacketHasher::default();
// packet size is 0, so cannot get index
assert!(should_discard_packet(
&packet,
last_root,
max_slot,
shred_version,
&hasher,
&mut shreds_received,
&deduper,
|_| false, // should_drop_merkle_shreds
&mut stats,
));
@ -384,8 +376,7 @@ mod tests {
3,
max_slot,
shred_version,
&hasher,
&mut shreds_received,
&deduper,
|_| false, // should_drop_merkle_shreds
&mut stats,
));
@ -396,8 +387,7 @@ mod tests {
last_root,
max_slot,
345, // shred_version
&hasher,
&mut shreds_received,
&deduper,
|_| false, // should_drop_merkle_shreds
&mut stats,
));
@ -409,20 +399,18 @@ mod tests {
last_root,
max_slot,
shred_version,
&hasher,
&mut shreds_received,
&deduper,
|_| false, // should_drop_merkle_shreds
&mut stats,
));
// shreds_received should filter duplicate
// deduper should filter duplicate
assert!(should_discard_packet(
&packet,
last_root,
max_slot,
shred_version,
&hasher,
&mut shreds_received,
&deduper,
|_| false, // should_drop_merkle_shreds
&mut stats,
));
@ -446,8 +434,7 @@ mod tests {
last_root,
max_slot,
shred_version,
&hasher,
&mut shreds_received,
&deduper,
|_| false, // should_drop_merkle_shreds
&mut stats,
));
@ -460,8 +447,7 @@ mod tests {
last_root,
max_slot,
shred_version,
&hasher,
&mut shreds_received,
&deduper,
|_| false, // should_drop_merkle_shreds
&mut stats,
));

View File

@ -2309,8 +2309,7 @@ mod tests {
sender.send(()).unwrap();
});
// timeout of 30s for shutting down the validators
let timeout = Duration::from_secs(30);
let timeout = Duration::from_secs(60);
if let Err(RecvTimeoutError::Timeout) = receiver.recv_timeout(timeout) {
panic!("timeout for shutting down validators",);
}

View File

@ -51,7 +51,7 @@ impl<const K: usize> Deduper<K> {
// Returns true if the packet is duplicate.
#[must_use]
#[allow(clippy::integer_arithmetic)]
fn dedup_packet(&self, packet: &Packet) -> bool {
pub fn dedup_packet(&self, packet: &Packet) -> bool {
// Should not dedup packet if already discarded.
debug_assert!(!packet.meta().discard());
let mut out = true;