moves shreds deduper to shred-sigverify stage (#30786)

Shreds arriving at tvu/tvu_forward/repair sockets are each processed in
a separate thread, and since each thread has its own deduper, the
duplicates across these sockets are not filtered out.
Using a common deduper across these threads will require an RwLock
wrapper and may introduce lock contention.
The commit instead moves the shred-deduper to shred-sigverify-stage
where all these shreds arrive through the same channel.
This commit is contained in:
behzad nouri 2023-03-22 13:19:16 +00:00 committed by GitHub
parent 721719d776
commit 25b7811869
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 47 additions and 89 deletions

View File

@ -5,10 +5,7 @@ use {
crossbeam_channel::{unbounded, Sender},
solana_gossip::cluster_info::ClusterInfo,
solana_ledger::shred::{should_discard_shred, ShredFetchStats},
solana_perf::{
deduper::Deduper,
packet::{Packet, PacketBatch, PacketBatchRecycler, PacketFlags},
},
solana_perf::packet::{PacketBatch, PacketBatchRecycler, PacketFlags},
solana_runtime::{bank::Bank, bank_forks::BankForks},
solana_sdk::{
clock::{Slot, DEFAULT_MS_PER_SLOT},
@ -26,10 +23,6 @@ use {
},
};
const DEDUPER_FALSE_POSITIVE_RATE: f64 = 0.001;
const DEDUPER_NUM_BITS: u64 = 637_534_199; // 76MB
const DEDUPER_RESET_CYCLE: Duration = Duration::from_secs(5 * 60);
pub(crate) struct ShredFetchStage {
thread_hdls: Vec<JoinHandle<()>>,
}
@ -47,8 +40,6 @@ impl ShredFetchStage {
turbine_disabled: Arc<AtomicBool>,
) {
const STATS_SUBMIT_CADENCE: Duration = Duration::from_secs(1);
let mut rng = rand::thread_rng();
let mut deduper = Deduper::<2, [u8]>::new(&mut rng, DEDUPER_NUM_BITS);
let mut last_updated = Instant::now();
let mut keypair = repair_context
.as_ref()
@ -63,9 +54,6 @@ impl ShredFetchStage {
let mut stats = ShredFetchStats::default();
for mut packet_batch in recvr {
if deduper.maybe_reset(&mut rng, DEDUPER_FALSE_POSITIVE_RATE, DEDUPER_RESET_CYCLE) {
stats.num_deduper_saturations += 1;
}
if last_updated.elapsed().as_millis() as u64 > DEFAULT_MS_PER_SLOT {
last_updated = Instant::now();
{
@ -102,12 +90,11 @@ impl ShredFetchStage {
let turbine_disabled = turbine_disabled.load(Ordering::Relaxed);
for packet in packet_batch.iter_mut() {
if turbine_disabled
|| should_discard_packet(
|| should_discard_shred(
packet,
last_root,
max_slot,
shred_version,
&deduper,
should_drop_merkle_shreds,
&mut stats,
)
@ -245,39 +232,6 @@ impl ShredFetchStage {
}
}
// Returns true if the packet should be marked as discard.
// Combines the shred sanity checks from `should_discard_shred` with a
// dedup filter: a packet is discarded either because it fails validation
// or because an identical payload was already seen by `deduper`.
#[must_use]
fn should_discard_packet<const K: usize>(
packet: &Packet,
root: Slot,
max_slot: Slot, // Max slot to ingest shreds for.
shred_version: u16,
deduper: &Deduper<K, [u8]>,
// Predicate deciding, per slot, whether merkle shreds should be dropped.
should_drop_merkle_shreds: impl Fn(Slot) -> bool,
stats: &mut ShredFetchStats,
) -> bool {
// First run the cheap validity checks (slot range, shred version, etc.);
// an invalid packet is discarded without touching the deduper.
if should_discard_shred(
packet,
root,
max_slot,
shred_version,
should_drop_merkle_shreds,
stats,
) {
return true;
}
// `packet.data(..)` returns None if the payload is out of bounds;
// `.unwrap_or(true)` treats that case as a discard as well.
if packet
.data(..)
.map(|data| deduper.dedup(data))
.unwrap_or(true)
{
// Duplicate (or unreadable) payload: count it and discard.
stats.duplicate_shred += 1;
true
} else {
false
}
}
#[must_use]
fn should_drop_merkle_shreds(shred_slot: Slot, root_bank: &Bank) -> bool {
check_feature_activation(
@ -299,13 +253,12 @@ mod tests {
blockstore::MAX_DATA_SHREDS_PER_SLOT,
shred::{ReedSolomonCache, Shred, ShredFlags},
},
solana_sdk::packet::Packet,
};
#[test]
fn test_data_code_same_index() {
solana_logger::setup();
let mut rng = rand::thread_rng();
let deduper = Deduper::<2, [u8]>::new(&mut rng, /*num_bits:*/ 640_007);
let mut packet = Packet::default();
let mut stats = ShredFetchStats::default();
@ -327,12 +280,11 @@ mod tests {
let last_slot = 100;
let slots_per_epoch = 10;
let max_slot = last_slot + 2 * slots_per_epoch;
assert!(!should_discard_packet(
assert!(!should_discard_shred(
&packet,
last_root,
max_slot,
shred_version,
&deduper,
|_| false, // should_drop_merkle_shreds
&mut stats,
));
@ -342,12 +294,11 @@ mod tests {
&ReedSolomonCache::default(),
);
coding[0].copy_to_packet(&mut packet);
assert!(!should_discard_packet(
assert!(!should_discard_shred(
&packet,
last_root,
max_slot,
shred_version,
&deduper,
|_| false, // should_drop_merkle_shreds
&mut stats,
));
@ -356,8 +307,6 @@ mod tests {
#[test]
fn test_shred_filter() {
solana_logger::setup();
let mut rng = rand::thread_rng();
let deduper = Deduper::<2, [u8]>::new(&mut rng, /*num_bits:*/ 640_007);
let mut packet = Packet::default();
let mut stats = ShredFetchStats::default();
let last_root = 0;
@ -367,12 +316,11 @@ mod tests {
let max_slot = last_slot + 2 * slots_per_epoch;
// packet size is 0, so cannot get index
assert!(should_discard_packet(
assert!(should_discard_shred(
&packet,
last_root,
max_slot,
shred_version,
&deduper,
|_| false, // should_drop_merkle_shreds
&mut stats,
));
@ -390,51 +338,36 @@ mod tests {
shred.copy_to_packet(&mut packet);
// rejected slot is 2, root is 3
assert!(should_discard_packet(
assert!(should_discard_shred(
&packet,
3,
max_slot,
shred_version,
&deduper,
|_| false, // should_drop_merkle_shreds
&mut stats,
));
assert_eq!(stats.slot_out_of_range, 1);
assert!(should_discard_packet(
assert!(should_discard_shred(
&packet,
last_root,
max_slot,
345, // shred_version
&deduper,
345, // shred_version
|_| false, // should_drop_merkle_shreds
&mut stats,
));
assert_eq!(stats.shred_version_mismatch, 1);
// Accepted for 1,3
assert!(!should_discard_packet(
assert!(!should_discard_shred(
&packet,
last_root,
max_slot,
shred_version,
&deduper,
|_| false, // should_drop_merkle_shreds
&mut stats,
));
// deduper should filter duplicate
assert!(should_discard_packet(
&packet,
last_root,
max_slot,
shred_version,
&deduper,
|_| false, // should_drop_merkle_shreds
&mut stats,
));
assert_eq!(stats.duplicate_shred, 1);
let shred = Shred::new_from_data(
1_000_000,
3,
@ -448,12 +381,11 @@ mod tests {
shred.copy_to_packet(&mut packet);
// Slot 1 million is too high
assert!(should_discard_packet(
assert!(should_discard_shred(
&packet,
last_root,
max_slot,
shred_version,
&deduper,
|_| false, // should_drop_merkle_shreds
&mut stats,
));
@ -461,12 +393,11 @@ mod tests {
let index = MAX_DATA_SHREDS_PER_SLOT as u32;
let shred = Shred::new_from_data(5, index, 0, &[], ShredFlags::LAST_SHRED_IN_SLOT, 0, 0, 0);
shred.copy_to_packet(&mut packet);
assert!(should_discard_packet(
assert!(should_discard_shred(
&packet,
last_root,
max_slot,
shred_version,
&deduper,
|_| false, // should_drop_merkle_shreds
&mut stats,
));

View File

@ -1,11 +1,11 @@
use {
crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender},
rayon::{ThreadPool, ThreadPoolBuilder},
rayon::{prelude::*, ThreadPool, ThreadPoolBuilder},
solana_gossip::cluster_info::ClusterInfo,
solana_ledger::{
leader_schedule_cache::LeaderScheduleCache, shred, sigverify_shreds::verify_shreds_gpu,
},
solana_perf::{self, packet::PacketBatch, recycler_cache::RecyclerCache},
solana_perf::{self, deduper::Deduper, packet::PacketBatch, recycler_cache::RecyclerCache},
solana_rayon_threadlimit::get_thread_count,
solana_runtime::{bank::Bank, bank_forks::BankForks},
solana_sdk::{clock::Slot, pubkey::Pubkey},
@ -17,6 +17,10 @@ use {
},
};
const DEDUPER_FALSE_POSITIVE_RATE: f64 = 0.001;
const DEDUPER_NUM_BITS: u64 = 637_534_199; // 76MB
const DEDUPER_RESET_CYCLE: Duration = Duration::from_secs(5 * 60);
#[allow(clippy::enum_variant_names)]
enum Error {
RecvDisconnected,
@ -40,7 +44,12 @@ pub(crate) fn spawn_shred_sigverify(
.build()
.unwrap();
let run_shred_sigverify = move || {
let mut rng = rand::thread_rng();
let mut deduper = Deduper::<2, [u8]>::new(&mut rng, DEDUPER_NUM_BITS);
loop {
if deduper.maybe_reset(&mut rng, DEDUPER_FALSE_POSITIVE_RATE, DEDUPER_RESET_CYCLE) {
stats.num_deduper_saturations += 1;
}
match run_shred_sigverify(
&thread_pool,
// We can't store the pubkey outside the loop
@ -49,6 +58,7 @@ pub(crate) fn spawn_shred_sigverify(
&bank_forks,
&leader_schedule_cache,
&recycler_cache,
&deduper,
&shred_fetch_receiver,
&retransmit_sender,
&verified_sender,
@ -69,12 +79,13 @@ pub(crate) fn spawn_shred_sigverify(
}
#[allow(clippy::too_many_arguments)]
fn run_shred_sigverify(
fn run_shred_sigverify<const K: usize>(
thread_pool: &ThreadPool,
self_pubkey: &Pubkey,
bank_forks: &RwLock<BankForks>,
leader_schedule_cache: &LeaderScheduleCache,
recycler_cache: &RecyclerCache,
deduper: &Deduper<K, [u8]>,
shred_fetch_receiver: &Receiver<PacketBatch>,
retransmit_sender: &Sender<Vec</*shred:*/ Vec<u8>>>,
verified_sender: &Sender<Vec<PacketBatch>>,
@ -89,6 +100,20 @@ fn run_shred_sigverify(
stats.num_iters += 1;
stats.num_packets += packets.iter().map(PacketBatch::len).sum::<usize>();
stats.num_discards_pre += count_discards(&packets);
stats.num_duplicates += thread_pool.install(|| {
packets
.par_iter_mut()
.flatten()
.filter(|packet| {
!packet.meta().discard()
&& packet
.data(..)
.map(|data| deduper.dedup(data))
.unwrap_or(true)
})
.map(|packet| packet.meta_mut().set_discard(true))
.count()
});
verify_packets(
thread_pool,
self_pubkey,
@ -197,8 +222,10 @@ struct ShredSigVerifyStats {
since: Instant,
num_iters: usize,
num_packets: usize,
num_discards_pre: usize,
num_deduper_saturations: usize,
num_discards_post: usize,
num_discards_pre: usize,
num_duplicates: usize,
num_retransmit_shreds: usize,
elapsed_micros: u64,
}
@ -212,7 +239,9 @@ impl ShredSigVerifyStats {
num_iters: 0usize,
num_packets: 0usize,
num_discards_pre: 0usize,
num_deduper_saturations: 0usize,
num_discards_post: 0usize,
num_duplicates: 0usize,
num_retransmit_shreds: 0usize,
elapsed_micros: 0u64,
}
@ -227,7 +256,9 @@ impl ShredSigVerifyStats {
("num_iters", self.num_iters, i64),
("num_packets", self.num_packets, i64),
("num_discards_pre", self.num_discards_pre, i64),
("num_deduper_saturations", self.num_deduper_saturations, i64),
("num_discards_post", self.num_discards_post, i64),
("num_duplicates", self.num_duplicates, i64),
("num_retransmit_shreds", self.num_retransmit_shreds, i64),
("elapsed_micros", self.elapsed_micros, i64),
);

View File

@ -32,7 +32,6 @@ pub struct ProcessShredsStats {
pub struct ShredFetchStats {
pub index_overrun: usize,
pub shred_count: usize,
pub num_deduper_saturations: usize,
pub(crate) num_shreds_merkle_code: usize,
pub(crate) num_shreds_merkle_data: usize,
pub ping_count: usize,
@ -40,7 +39,6 @@ pub struct ShredFetchStats {
pub(crate) index_bad_deserialize: usize,
pub(crate) index_out_of_bounds: usize,
pub(crate) slot_bad_deserialize: usize,
pub duplicate_shred: usize,
pub slot_out_of_range: usize,
pub(crate) bad_shred_type: usize,
pub shred_version_mismatch: usize,
@ -118,7 +116,6 @@ impl ShredFetchStats {
name,
("index_overrun", self.index_overrun, i64),
("shred_count", self.shred_count, i64),
("num_deduper_saturations", self.num_deduper_saturations, i64),
("num_shreds_merkle_code", self.num_shreds_merkle_code, i64),
("num_shreds_merkle_data", self.num_shreds_merkle_data, i64),
("ping_count", self.ping_count, i64),
@ -127,7 +124,6 @@ impl ShredFetchStats {
("index_bad_deserialize", self.index_bad_deserialize, i64),
("index_out_of_bounds", self.index_out_of_bounds, i64),
("slot_out_of_range", self.slot_out_of_range, i64),
("duplicate_shred", self.duplicate_shred, i64),
("bad_shred_type", self.bad_shred_type, i64),
("shred_version_mismatch", self.shred_version_mismatch, i64),
("bad_parent_offset", self.bad_parent_offset, i64),