From 93c776ce19bf92aa318d4ae78a9168035f75b48d Mon Sep 17 00:00:00 2001 From: Justin Starry Date: Wed, 22 Dec 2021 23:05:10 -0600 Subject: [PATCH] Refactor packet deduplication and harden bench test (#22080) --- banking-bench/src/main.rs | 3 +- core/benches/banking_stage.rs | 4 ++ core/src/banking_stage.rs | 80 ++++++++++++----------------------- core/src/lib.rs | 1 + core/src/packet_deduper.rs | 63 +++++++++++++++++++++++++++ core/src/tpu.rs | 2 + 6 files changed, 98 insertions(+), 55 deletions(-) create mode 100644 core/src/packet_deduper.rs diff --git a/banking-bench/src/main.rs b/banking-bench/src/main.rs index 3d2f57f4bc..fa5672e189 100644 --- a/banking-bench/src/main.rs +++ b/banking-bench/src/main.rs @@ -5,7 +5,7 @@ use { log::*, rand::{thread_rng, Rng}, rayon::prelude::*, - solana_core::banking_stage::BankingStage, + solana_core::{banking_stage::BankingStage, packet_deduper::PacketDeduper}, solana_gossip::cluster_info::{ClusterInfo, Node}, solana_ledger::{ blockstore::Blockstore, @@ -235,6 +235,7 @@ fn main() { None, replay_vote_sender, Arc::new(RwLock::new(CostModel::default())), + PacketDeduper::default(), ); poh_recorder.lock().unwrap().set_bank(&bank); diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index ef87086318..25c5bcf540 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -10,6 +10,7 @@ use { rayon::prelude::*, solana_core::{ banking_stage::{BankingStage, BankingStageStats}, + packet_deduper::PacketDeduper, qos_service::QosService, }, solana_entry::entry::{next_hash, Entry}, @@ -221,6 +222,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { ); let cluster_info = Arc::new(cluster_info); let (s, _r) = unbounded(); + let packet_deduper = PacketDeduper::default(); let _banking_stage = BankingStage::new( &cluster_info, &poh_recorder, @@ -230,6 +232,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { None, s, Arc::new(RwLock::new(CostModel::default())), + packet_deduper.clone(), ); poh_recorder.lock().unwrap().set_bank(&bank); @@ -264,6 +267,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { // in this chunk, but since we rotate between CHUNKS then // we should clear them by the time we come around again to re-use that chunk. bank.clear_signatures(); + packet_deduper.reset(); trace!( "time: {} checked: {} sent: {}", duration_as_us(&now.elapsed()), diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 1383710775..c9965a3026 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -2,10 +2,9 @@ //! to contruct a software pipeline. The stage uses all available CPU cores and //! can do its processing in parallel with signature verification on the GPU. use { - crate::{packet_hasher::PacketHasher, qos_service::QosService}, + crate::{packet_deduper::PacketDeduper, qos_service::QosService}, crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError}, itertools::Itertools, - lru::LruCache, retain_mut::RetainMut, solana_entry::entry::hash_transactions, solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo}, @@ -53,7 +52,6 @@ use { env, mem::size_of, net::{SocketAddr, UdpSocket}, - ops::DerefMut, sync::{ atomic::{AtomicU64, AtomicUsize, Ordering}, Arc, Mutex, RwLock, @@ -80,8 +78,6 @@ const TOTAL_BUFFERED_PACKETS: usize = 500_000; const MAX_NUM_TRANSACTIONS_PER_BATCH: usize = 128; -const DEFAULT_LRU_SIZE: usize = 200_000; - const NUM_VOTE_PROCESSING_THREADS: u32 = 2; const MIN_THREADS_BANKING: u32 = 1; @@ -93,7 +89,7 @@ pub struct BankingStageStats { new_tx_count: AtomicUsize, dropped_packet_batches_count: AtomicUsize, dropped_packets_count: AtomicUsize, - dropped_duplicated_packets_count: AtomicUsize, + pub(crate) dropped_duplicated_packets_count: AtomicUsize, newly_buffered_packets_count: AtomicUsize, current_buffered_packets_count: AtomicUsize, current_buffered_packet_batches_count: AtomicUsize, @@ -105,7 +101,7 @@ pub struct BankingStageStats { process_packets_elapsed: AtomicU64, handle_retryable_packets_elapsed: AtomicU64, filter_pending_packets_elapsed: AtomicU64, - packet_duplicate_check_elapsed: AtomicU64, + pub(crate) packet_duplicate_check_elapsed: AtomicU64, packet_conversion_elapsed: AtomicU64, unprocessed_packet_conversion_elapsed: AtomicU64, transaction_processing_elapsed: AtomicU64, @@ -296,6 +292,7 @@ impl BankingStage { transaction_status_sender: Option, gossip_vote_sender: ReplayVoteSender, cost_model: Arc>, + packet_deduper: PacketDeduper, ) -> Self { Self::new_num_threads( cluster_info, @@ -307,9 +304,11 @@ impl BankingStage { transaction_status_sender, gossip_vote_sender, cost_model, + packet_deduper, ) } + #[allow(clippy::too_many_arguments)] fn new_num_threads( cluster_info: &Arc, poh_recorder: &Arc>, @@ -320,15 +319,12 @@ impl BankingStage { transaction_status_sender: Option, gossip_vote_sender: ReplayVoteSender, cost_model: Arc>, + packet_deduper: PacketDeduper, ) -> Self { let batch_limit = TOTAL_BUFFERED_PACKETS / ((num_threads - 1) as usize * PACKETS_PER_BATCH); // Single thread to generate entries from many banks. // This thread talks to poh_service and broadcasts the entries once they have been recorded. // Once an entry has been recorded, its blockhash is registered with the bank. - let duplicates = Arc::new(Mutex::new(( - LruCache::new(DEFAULT_LRU_SIZE), - PacketHasher::default(), - ))); let data_budget = Arc::new(DataBudget::default()); // Many banks that process transactions in parallel. assert!(num_threads >= NUM_VOTE_PROCESSING_THREADS + MIN_THREADS_BANKING); @@ -352,7 +348,7 @@ impl BankingStage { let mut recv_start = Instant::now(); let transaction_status_sender = transaction_status_sender.clone(); let gossip_vote_sender = gossip_vote_sender.clone(); - let duplicates = duplicates.clone(); + let packet_deduper = packet_deduper.clone(); let data_budget = data_budget.clone(); let cost_model = cost_model.clone(); Builder::new() @@ -368,7 +364,7 @@ impl BankingStage { batch_limit, transaction_status_sender, gossip_vote_sender, - &duplicates, + &packet_deduper, &data_budget, cost_model, ); @@ -712,7 +708,7 @@ impl BankingStage { batch_limit: usize, transaction_status_sender: Option, gossip_vote_sender: ReplayVoteSender, - duplicates: &Arc, PacketHasher)>>, + packet_deduper: &PacketDeduper, data_budget: &DataBudget, cost_model: Arc>, ) { @@ -769,7 +765,7 @@ impl BankingStage { &gossip_vote_sender, &mut buffered_packet_batches, &banking_stage_stats, - duplicates, + packet_deduper, &recorder, &qos_service, ) { @@ -1332,7 +1328,7 @@ impl BankingStage { gossip_vote_sender: &ReplayVoteSender, buffered_packet_batches: &mut UnprocessedPacketBatches, banking_stage_stats: &BankingStageStats, - duplicates: &Arc, PacketHasher)>>, + packet_deduper: &PacketDeduper, recorder: &TransactionRecorder, qos_service: &QosService, ) -> Result<(), RecvTimeoutError> { @@ -1371,7 +1367,7 @@ impl BankingStage { &mut dropped_packets_count, &mut newly_buffered_packets_count, batch_limit, - duplicates, + packet_deduper, banking_stage_stats, ); continue; @@ -1411,7 +1407,7 @@ impl BankingStage { &mut dropped_packets_count, &mut newly_buffered_packets_count, batch_limit, - duplicates, + packet_deduper, banking_stage_stats, ); @@ -1439,7 +1435,7 @@ impl BankingStage { &mut dropped_packets_count, &mut newly_buffered_packets_count, batch_limit, - duplicates, + packet_deduper, banking_stage_stats, ); } @@ -1501,35 +1497,10 @@ impl BankingStage { dropped_packets_count: &mut usize, newly_buffered_packets_count: &mut usize, batch_limit: usize, - duplicates: &Arc, PacketHasher)>>, + packet_deduper: &PacketDeduper, banking_stage_stats: &BankingStageStats, ) { - { - let original_packets_count = packet_indexes.len(); - let mut packet_duplicate_check_time = Measure::start("packet_duplicate_check"); - let mut duplicates = duplicates.lock().unwrap(); - let (cache, hasher) = duplicates.deref_mut(); - packet_indexes.retain(|i| { - let packet_hash = hasher.hash_packet(&packet_batch.packets[*i]); - match cache.get_mut(&packet_hash) { - Some(_hash) => false, - None => { - cache.put(packet_hash, ()); - true - } - } - }); - packet_duplicate_check_time.stop(); - banking_stage_stats - .packet_duplicate_check_elapsed - .fetch_add(packet_duplicate_check_time.as_us(), Ordering::Relaxed); - banking_stage_stats - .dropped_duplicated_packets_count - .fetch_add( - original_packets_count.saturating_sub(packet_indexes.len()), - Ordering::Relaxed, - ); - } + packet_deduper.dedupe_packets(&packet_batch, &mut packet_indexes, banking_stage_stats); if Self::packet_has_more_unprocessed_transactions(&packet_indexes) { if unprocessed_packet_batches.len() >= batch_limit { *dropped_packet_batches_count += 1; @@ -1673,6 +1644,7 @@ mod tests { None, gossip_vote_sender, Arc::new(RwLock::new(CostModel::default())), + PacketDeduper::default(), ); drop(verified_sender); drop(gossip_verified_vote_sender); @@ -1722,6 +1694,7 @@ mod tests { None, gossip_vote_sender, Arc::new(RwLock::new(CostModel::default())), + PacketDeduper::default(), ); trace!("sending bank"); drop(verified_sender); @@ -1797,6 +1770,7 @@ mod tests { None, gossip_vote_sender, Arc::new(RwLock::new(CostModel::default())), + PacketDeduper::default(), ); // fund another account so we can send 2 good transactions in a single batch. @@ -1948,6 +1922,7 @@ mod tests { None, gossip_vote_sender, Arc::new(RwLock::new(CostModel::default())), + PacketDeduper::default(), ); // wait for banking_stage to eat the packets @@ -2940,10 +2915,7 @@ mod tests { let new_packet_batch = PacketBatch::new(vec![Packet::default()]); let packet_indexes = vec![]; - let duplicates = Arc::new(Mutex::new(( - LruCache::new(DEFAULT_LRU_SIZE), - PacketHasher::default(), - ))); + let packet_deduper = PacketDeduper::default(); let mut dropped_packet_batches_count = 0; let mut dropped_packets_count = 0; let mut newly_buffered_packets_count = 0; @@ -2958,7 +2930,7 @@ mod tests { &mut dropped_packets_count, &mut newly_buffered_packets_count, batch_limit, - &duplicates, + &packet_deduper, &banking_stage_stats, ); assert_eq!(unprocessed_packets.len(), 1); @@ -2977,7 +2949,7 @@ mod tests { &mut dropped_packets_count, &mut newly_buffered_packets_count, batch_limit, - &duplicates, + &packet_deduper, &banking_stage_stats, ); assert_eq!(unprocessed_packets.len(), 2); @@ -3001,7 +2973,7 @@ mod tests { &mut dropped_packets_count, &mut newly_buffered_packets_count, batch_limit, - &duplicates, + &packet_deduper, &banking_stage_stats, ); assert_eq!(unprocessed_packets.len(), 2); @@ -3022,7 +2994,7 @@ mod tests { &mut dropped_packets_count, &mut newly_buffered_packets_count, 3, - &duplicates, + &packet_deduper, &banking_stage_stats, ); assert_eq!(unprocessed_packets.len(), 2); diff --git a/core/src/lib.rs b/core/src/lib.rs index ec1be4e61e..7db3c2591e 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -31,6 +31,7 @@ pub mod latest_validator_votes_for_frozen_banks; pub mod ledger_cleanup_service; pub mod optimistic_confirmation_verifier; pub mod outstanding_requests; +pub mod packet_deduper; pub mod packet_hasher; pub mod progress_map; pub mod qos_service; diff --git a/core/src/packet_deduper.rs b/core/src/packet_deduper.rs new file mode 100644 index 0000000000..24c3aea10e --- /dev/null +++ b/core/src/packet_deduper.rs @@ -0,0 +1,63 @@ +use { + crate::{banking_stage::BankingStageStats, packet_hasher::PacketHasher}, + lru::LruCache, + solana_measure::measure::Measure, + solana_perf::packet::PacketBatch, + std::{ + ops::DerefMut, + sync::{atomic::Ordering, Arc, Mutex}, + }, +}; + +const DEFAULT_LRU_SIZE: usize = 200_000; + +#[derive(Clone)] +pub struct PacketDeduper(Arc, PacketHasher)>>); + +impl Default for PacketDeduper { + fn default() -> Self { + Self(Arc::new(Mutex::new(( + LruCache::new(DEFAULT_LRU_SIZE), + PacketHasher::default(), + )))) + } +} + +impl PacketDeduper { + pub fn dedupe_packets( + &self, + packet_batch: &PacketBatch, + packet_indexes: &mut Vec, + banking_stage_stats: &BankingStageStats, + ) { + let original_packets_count = packet_indexes.len(); + let mut packet_duplicate_check_time = Measure::start("packet_duplicate_check"); + let mut duplicates = self.0.lock().unwrap(); + let (cache, hasher) = duplicates.deref_mut(); + packet_indexes.retain(|i| { + let packet_hash = hasher.hash_packet(&packet_batch.packets[*i]); + match cache.get_mut(&packet_hash) { + Some(_hash) => false, + None => { + cache.put(packet_hash, ()); + true + } + } + }); + packet_duplicate_check_time.stop(); + banking_stage_stats + .packet_duplicate_check_elapsed + .fetch_add(packet_duplicate_check_time.as_us(), Ordering::Relaxed); + banking_stage_stats + .dropped_duplicated_packets_count + .fetch_add( + original_packets_count.saturating_sub(packet_indexes.len()), + Ordering::Relaxed, + ); + } + + pub fn reset(&self) { + let mut duplicates = self.0.lock().unwrap(); + duplicates.0.clear(); + } +} diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 4b4eadf92c..d2afd1829a 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -10,6 +10,7 @@ use { GossipVerifiedVoteHashSender, VerifiedVoteSender, VoteTracker, }, fetch_stage::FetchStage, + packet_deduper::PacketDeduper, sigverify::TransactionSigVerifier, sigverify_stage::SigVerifyStage, }, @@ -133,6 +134,7 @@ impl Tpu { transaction_status_sender, replay_vote_sender, cost_model.clone(), + PacketDeduper::default(), ); let broadcast_stage = broadcast_type.new_broadcast_stage(