Refactor packet deduplication and harden bench test (#22080)

Justin Starry 2021-12-22 23:05:10 -06:00 committed by GitHub
parent f10407dbc3
commit 93c776ce19
6 changed files with 98 additions and 55 deletions

banking-bench/src/main.rs

@@ -5,7 +5,7 @@ use {
log::*,
rand::{thread_rng, Rng},
rayon::prelude::*,
solana_core::banking_stage::BankingStage,
solana_core::{banking_stage::BankingStage, packet_deduper::PacketDeduper},
solana_gossip::cluster_info::{ClusterInfo, Node},
solana_ledger::{
blockstore::Blockstore,
@@ -235,6 +235,7 @@ fn main() {
None,
replay_vote_sender,
Arc::new(RwLock::new(CostModel::default())),
PacketDeduper::default(),
);
poh_recorder.lock().unwrap().set_bank(&bank);

core/benches/banking_stage.rs

@@ -10,6 +10,7 @@ use {
rayon::prelude::*,
solana_core::{
banking_stage::{BankingStage, BankingStageStats},
packet_deduper::PacketDeduper,
qos_service::QosService,
},
solana_entry::entry::{next_hash, Entry},
@@ -221,6 +222,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
);
let cluster_info = Arc::new(cluster_info);
let (s, _r) = unbounded();
let packet_deduper = PacketDeduper::default();
let _banking_stage = BankingStage::new(
&cluster_info,
&poh_recorder,
@@ -230,6 +232,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
None,
s,
Arc::new(RwLock::new(CostModel::default())),
packet_deduper.clone(),
);
poh_recorder.lock().unwrap().set_bank(&bank);
@@ -264,6 +267,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
// in this chunk, but since we rotate between CHUNKS then
// we should clear them by the time we come around again to re-use that chunk.
bank.clear_signatures();
packet_deduper.reset();
trace!(
"time: {} checked: {} sent: {}",
duration_as_us(&now.elapsed()),

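The reset added above is the bench-hardening part of this change: the bench rotates through pre-built packet chunks, so both the bank's signature cache and the deduper's cache must be cleared before a chunk is re-sent, or the replayed packets would be silently dropped. A minimal standalone sketch of that behavior follows; it is illustrative only, not part of this commit, and assumes BankingStageStats implements Default and that Packet/PacketBatch come from solana_sdk/solana_perf as elsewhere in this diff.

use {
    solana_core::{banking_stage::BankingStageStats, packet_deduper::PacketDeduper},
    solana_perf::packet::PacketBatch,
    solana_sdk::packet::Packet,
};

fn main() {
    let packet_deduper = PacketDeduper::default();
    let stats = BankingStageStats::default();
    let batch = PacketBatch::new(vec![Packet::default()]);

    // First sighting of the packet passes through untouched.
    let mut indexes = vec![0];
    packet_deduper.dedupe_packets(&batch, &mut indexes, &stats);
    assert_eq!(indexes, vec![0]);

    // Re-sending the same packet (as the bench does when it wraps around to a
    // previously used chunk) gets filtered out as a duplicate...
    let mut indexes = vec![0];
    packet_deduper.dedupe_packets(&batch, &mut indexes, &stats);
    assert!(indexes.is_empty());

    // ...unless the cache is cleared first, which is why the bench now calls
    // reset() alongside bank.clear_signatures().
    packet_deduper.reset();
    let mut indexes = vec![0];
    packet_deduper.dedupe_packets(&batch, &mut indexes, &stats);
    assert_eq!(indexes, vec![0]);
}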
core/src/banking_stage.rs

@@ -2,10 +2,9 @@
//! to construct a software pipeline. The stage uses all available CPU cores and
//! can do its processing in parallel with signature verification on the GPU.
use {
crate::{packet_hasher::PacketHasher, qos_service::QosService},
crate::{packet_deduper::PacketDeduper, qos_service::QosService},
crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError},
itertools::Itertools,
lru::LruCache,
retain_mut::RetainMut,
solana_entry::entry::hash_transactions,
solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo},
@@ -53,7 +52,6 @@ use {
env,
mem::size_of,
net::{SocketAddr, UdpSocket},
ops::DerefMut,
sync::{
atomic::{AtomicU64, AtomicUsize, Ordering},
Arc, Mutex, RwLock,
@@ -80,8 +78,6 @@ const TOTAL_BUFFERED_PACKETS: usize = 500_000;
const MAX_NUM_TRANSACTIONS_PER_BATCH: usize = 128;
const DEFAULT_LRU_SIZE: usize = 200_000;
const NUM_VOTE_PROCESSING_THREADS: u32 = 2;
const MIN_THREADS_BANKING: u32 = 1;
@@ -93,7 +89,7 @@ pub struct BankingStageStats {
new_tx_count: AtomicUsize,
dropped_packet_batches_count: AtomicUsize,
dropped_packets_count: AtomicUsize,
dropped_duplicated_packets_count: AtomicUsize,
pub(crate) dropped_duplicated_packets_count: AtomicUsize,
newly_buffered_packets_count: AtomicUsize,
current_buffered_packets_count: AtomicUsize,
current_buffered_packet_batches_count: AtomicUsize,
@@ -105,7 +101,7 @@ pub struct BankingStageStats {
process_packets_elapsed: AtomicU64,
handle_retryable_packets_elapsed: AtomicU64,
filter_pending_packets_elapsed: AtomicU64,
packet_duplicate_check_elapsed: AtomicU64,
pub(crate) packet_duplicate_check_elapsed: AtomicU64,
packet_conversion_elapsed: AtomicU64,
unprocessed_packet_conversion_elapsed: AtomicU64,
transaction_processing_elapsed: AtomicU64,
@@ -296,6 +292,7 @@ impl BankingStage {
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: ReplayVoteSender,
cost_model: Arc<RwLock<CostModel>>,
packet_deduper: PacketDeduper,
) -> Self {
Self::new_num_threads(
cluster_info,
@@ -307,9 +304,11 @@
transaction_status_sender,
gossip_vote_sender,
cost_model,
packet_deduper,
)
}
#[allow(clippy::too_many_arguments)]
fn new_num_threads(
cluster_info: &Arc<ClusterInfo>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
@@ -320,15 +319,12 @@
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: ReplayVoteSender,
cost_model: Arc<RwLock<CostModel>>,
packet_deduper: PacketDeduper,
) -> Self {
let batch_limit = TOTAL_BUFFERED_PACKETS / ((num_threads - 1) as usize * PACKETS_PER_BATCH);
// Single thread to generate entries from many banks.
// This thread talks to poh_service and broadcasts the entries once they have been recorded.
// Once an entry has been recorded, its blockhash is registered with the bank.
let duplicates = Arc::new(Mutex::new((
LruCache::new(DEFAULT_LRU_SIZE),
PacketHasher::default(),
)));
let data_budget = Arc::new(DataBudget::default());
// Many banks that process transactions in parallel.
assert!(num_threads >= NUM_VOTE_PROCESSING_THREADS + MIN_THREADS_BANKING);
@@ -352,7 +348,7 @@
let mut recv_start = Instant::now();
let transaction_status_sender = transaction_status_sender.clone();
let gossip_vote_sender = gossip_vote_sender.clone();
let duplicates = duplicates.clone();
let packet_deduper = packet_deduper.clone();
let data_budget = data_budget.clone();
let cost_model = cost_model.clone();
Builder::new()
@@ -368,7 +364,7 @@
batch_limit,
transaction_status_sender,
gossip_vote_sender,
&duplicates,
&packet_deduper,
&data_budget,
cost_model,
);
@@ -712,7 +708,7 @@
batch_limit: usize,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: ReplayVoteSender,
duplicates: &Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>,
packet_deduper: &PacketDeduper,
data_budget: &DataBudget,
cost_model: Arc<RwLock<CostModel>>,
) {
@@ -769,7 +765,7 @@
&gossip_vote_sender,
&mut buffered_packet_batches,
&banking_stage_stats,
duplicates,
packet_deduper,
&recorder,
&qos_service,
) {
@@ -1332,7 +1328,7 @@
gossip_vote_sender: &ReplayVoteSender,
buffered_packet_batches: &mut UnprocessedPacketBatches,
banking_stage_stats: &BankingStageStats,
duplicates: &Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>,
packet_deduper: &PacketDeduper,
recorder: &TransactionRecorder,
qos_service: &QosService,
) -> Result<(), RecvTimeoutError> {
@@ -1371,7 +1367,7 @@
&mut dropped_packets_count,
&mut newly_buffered_packets_count,
batch_limit,
duplicates,
packet_deduper,
banking_stage_stats,
);
continue;
@@ -1411,7 +1407,7 @@
&mut dropped_packets_count,
&mut newly_buffered_packets_count,
batch_limit,
duplicates,
packet_deduper,
banking_stage_stats,
);
@@ -1439,7 +1435,7 @@
&mut dropped_packets_count,
&mut newly_buffered_packets_count,
batch_limit,
duplicates,
packet_deduper,
banking_stage_stats,
);
}
@@ -1501,35 +1497,10 @@
dropped_packets_count: &mut usize,
newly_buffered_packets_count: &mut usize,
batch_limit: usize,
duplicates: &Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>,
packet_deduper: &PacketDeduper,
banking_stage_stats: &BankingStageStats,
) {
{
let original_packets_count = packet_indexes.len();
let mut packet_duplicate_check_time = Measure::start("packet_duplicate_check");
let mut duplicates = duplicates.lock().unwrap();
let (cache, hasher) = duplicates.deref_mut();
packet_indexes.retain(|i| {
let packet_hash = hasher.hash_packet(&packet_batch.packets[*i]);
match cache.get_mut(&packet_hash) {
Some(_hash) => false,
None => {
cache.put(packet_hash, ());
true
}
}
});
packet_duplicate_check_time.stop();
banking_stage_stats
.packet_duplicate_check_elapsed
.fetch_add(packet_duplicate_check_time.as_us(), Ordering::Relaxed);
banking_stage_stats
.dropped_duplicated_packets_count
.fetch_add(
original_packets_count.saturating_sub(packet_indexes.len()),
Ordering::Relaxed,
);
}
packet_deduper.dedupe_packets(&packet_batch, &mut packet_indexes, banking_stage_stats);
if Self::packet_has_more_unprocessed_transactions(&packet_indexes) {
if unprocessed_packet_batches.len() >= batch_limit {
*dropped_packet_batches_count += 1;
@@ -1673,6 +1644,7 @@ mod tests {
None,
gossip_vote_sender,
Arc::new(RwLock::new(CostModel::default())),
PacketDeduper::default(),
);
drop(verified_sender);
drop(gossip_verified_vote_sender);
@@ -1722,6 +1694,7 @@
None,
gossip_vote_sender,
Arc::new(RwLock::new(CostModel::default())),
PacketDeduper::default(),
);
trace!("sending bank");
drop(verified_sender);
@@ -1797,6 +1770,7 @@
None,
gossip_vote_sender,
Arc::new(RwLock::new(CostModel::default())),
PacketDeduper::default(),
);
// fund another account so we can send 2 good transactions in a single batch.
@@ -1948,6 +1922,7 @@
None,
gossip_vote_sender,
Arc::new(RwLock::new(CostModel::default())),
PacketDeduper::default(),
);
// wait for banking_stage to eat the packets
@@ -2940,10 +2915,7 @@
let new_packet_batch = PacketBatch::new(vec![Packet::default()]);
let packet_indexes = vec![];
let duplicates = Arc::new(Mutex::new((
LruCache::new(DEFAULT_LRU_SIZE),
PacketHasher::default(),
)));
let packet_deduper = PacketDeduper::default();
let mut dropped_packet_batches_count = 0;
let mut dropped_packets_count = 0;
let mut newly_buffered_packets_count = 0;
@@ -2958,7 +2930,7 @@
&mut dropped_packets_count,
&mut newly_buffered_packets_count,
batch_limit,
&duplicates,
&packet_deduper,
&banking_stage_stats,
);
assert_eq!(unprocessed_packets.len(), 1);
@@ -2977,7 +2949,7 @@
&mut dropped_packets_count,
&mut newly_buffered_packets_count,
batch_limit,
&duplicates,
&packet_deduper,
&banking_stage_stats,
);
assert_eq!(unprocessed_packets.len(), 2);
@@ -3001,7 +2973,7 @@
&mut dropped_packets_count,
&mut newly_buffered_packets_count,
batch_limit,
&duplicates,
&packet_deduper,
&banking_stage_stats,
);
assert_eq!(unprocessed_packets.len(), 2);
@@ -3022,7 +2994,7 @@
&mut dropped_packets_count,
&mut newly_buffered_packets_count,
3,
&duplicates,
&packet_deduper,
&banking_stage_stats,
);
assert_eq!(unprocessed_packets.len(), 2);

core/src/lib.rs

@@ -31,6 +31,7 @@ pub mod latest_validator_votes_for_frozen_banks;
pub mod ledger_cleanup_service;
pub mod optimistic_confirmation_verifier;
pub mod outstanding_requests;
pub mod packet_deduper;
pub mod packet_hasher;
pub mod progress_map;
pub mod qos_service;

core/src/packet_deduper.rs (new file)

@@ -0,0 +1,63 @@
use {
crate::{banking_stage::BankingStageStats, packet_hasher::PacketHasher},
lru::LruCache,
solana_measure::measure::Measure,
solana_perf::packet::PacketBatch,
std::{
ops::DerefMut,
sync::{atomic::Ordering, Arc, Mutex},
},
};
const DEFAULT_LRU_SIZE: usize = 200_000;
#[derive(Clone)]
pub struct PacketDeduper(Arc<Mutex<(LruCache<u64, ()>, PacketHasher)>>);
impl Default for PacketDeduper {
fn default() -> Self {
Self(Arc::new(Mutex::new((
LruCache::new(DEFAULT_LRU_SIZE),
PacketHasher::default(),
))))
}
}
impl PacketDeduper {
pub fn dedupe_packets(
&self,
packet_batch: &PacketBatch,
packet_indexes: &mut Vec<usize>,
banking_stage_stats: &BankingStageStats,
) {
let original_packets_count = packet_indexes.len();
let mut packet_duplicate_check_time = Measure::start("packet_duplicate_check");
let mut duplicates = self.0.lock().unwrap();
let (cache, hasher) = duplicates.deref_mut();
packet_indexes.retain(|i| {
let packet_hash = hasher.hash_packet(&packet_batch.packets[*i]);
match cache.get_mut(&packet_hash) {
Some(_hash) => false,
None => {
cache.put(packet_hash, ());
true
}
}
});
packet_duplicate_check_time.stop();
banking_stage_stats
.packet_duplicate_check_elapsed
.fetch_add(packet_duplicate_check_time.as_us(), Ordering::Relaxed);
banking_stage_stats
.dropped_duplicated_packets_count
.fetch_add(
original_packets_count.saturating_sub(packet_indexes.len()),
Ordering::Relaxed,
);
}
pub fn reset(&self) {
let mut duplicates = self.0.lock().unwrap();
duplicates.0.clear();
}
}
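A note on the design: PacketDeduper is a thin Clone wrapper around Arc<Mutex<(LruCache, PacketHasher)>>, so the clone handed to each banking thread (and the one passed into BankingStage::new) shares a single LRU cache and hasher seed rather than keeping per-thread state. A small illustrative sketch of that sharing, not part of this commit and again assuming BankingStageStats implements Default:

use {
    solana_core::{banking_stage::BankingStageStats, packet_deduper::PacketDeduper},
    solana_perf::packet::PacketBatch,
    solana_sdk::packet::Packet,
};

fn main() {
    let packet_deduper = PacketDeduper::default();
    // Cloning shares the same Arc'd cache; it does not create a new one.
    let per_thread_handle = packet_deduper.clone();

    let stats = BankingStageStats::default();
    let batch = PacketBatch::new(vec![Packet::default()]);

    // Record the packet through the original handle...
    let mut indexes = vec![0];
    packet_deduper.dedupe_packets(&batch, &mut indexes, &stats);
    assert_eq!(indexes, vec![0]);

    // ...and the clone sees it as a duplicate, since both point at one cache.
    let mut indexes = vec![0];
    per_thread_handle.dedupe_packets(&batch, &mut indexes, &stats);
    assert!(indexes.is_empty());
}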

core/src/tpu.rs

@@ -10,6 +10,7 @@ use {
GossipVerifiedVoteHashSender, VerifiedVoteSender, VoteTracker,
},
fetch_stage::FetchStage,
packet_deduper::PacketDeduper,
sigverify::TransactionSigVerifier,
sigverify_stage::SigVerifyStage,
},
@@ -133,6 +134,7 @@ impl Tpu {
transaction_status_sender,
replay_vote_sender,
cost_model.clone(),
PacketDeduper::default(),
);
let broadcast_stage = broadcast_type.new_broadcast_stage(