FindPacketSenderStake: Remove parallelism to improve performance (#25562)

* FindPacketSenderStake: Remove parallelism to improve performance

The work unit sizes were so small that using the thread pool
slowed down this stage significantly.

* fix checks

Co-authored-by: Justin Starry <justin@solana.com>
This commit is contained in:
Christian Kamm 2022-05-26 13:17:52 +02:00 committed by GitHub
parent 178eba4912
commit 0efb7478cd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 12 additions and 25 deletions

View File

@ -1,10 +1,7 @@
use { use {
crossbeam_channel::{Receiver, RecvTimeoutError, Sender}, crossbeam_channel::{Receiver, RecvTimeoutError, Sender},
lazy_static::lazy_static,
rayon::{prelude::*, ThreadPool},
solana_measure::measure::Measure, solana_measure::measure::Measure,
solana_perf::packet::PacketBatch, solana_perf::packet::PacketBatch,
solana_rayon_threadlimit::get_thread_count,
solana_sdk::timing::timestamp, solana_sdk::timing::timestamp,
solana_streamer::streamer::{self, StreamerError}, solana_streamer::streamer::{self, StreamerError},
std::{ std::{
@ -15,18 +12,10 @@ use {
}, },
}; };
lazy_static! { // Try to target 50ms, rough timings from a testnet validator
static ref PAR_THREAD_POOL: ThreadPool = rayon::ThreadPoolBuilder::new()
.num_threads(get_thread_count())
.thread_name(|ix| format!("transaction_sender_stake_stage_{}", ix))
.build()
.unwrap();
}
// Try to target 50ms, rough timings from mainnet machines
// //
// 50ms/(1us/packet) = 50k packets // 50ms/(200ns/packet) = 250k packets
const MAX_FINDPACKETSENDERSTAKE_BATCH: usize = 50_000; const MAX_FINDPACKETSENDERSTAKE_BATCH: usize = 250_000;
pub type FindPacketSenderStakeSender = Sender<Vec<PacketBatch>>; pub type FindPacketSenderStakeSender = Sender<Vec<PacketBatch>>;
pub type FindPacketSenderStakeReceiver = Receiver<Vec<PacketBatch>>; pub type FindPacketSenderStakeReceiver = Receiver<Vec<PacketBatch>>;
@ -157,17 +146,15 @@ impl FindPacketSenderStakeStage {
} }
fn apply_sender_stakes(batches: &mut [PacketBatch], ip_to_stake: &HashMap<IpAddr, u64>) { fn apply_sender_stakes(batches: &mut [PacketBatch], ip_to_stake: &HashMap<IpAddr, u64>) {
PAR_THREAD_POOL.install(|| {
batches batches
.into_par_iter() .iter_mut()
.flat_map(|batch| batch.par_iter_mut()) .flat_map(|batch| batch.iter_mut())
.for_each(|packet| { .for_each(|packet| {
packet.meta.sender_stake = ip_to_stake packet.meta.sender_stake = ip_to_stake
.get(&packet.meta.addr) .get(&packet.meta.addr)
.copied() .copied()
.unwrap_or_default(); .unwrap_or_default();
}); });
});
} }
pub fn join(self) -> thread::Result<()> { pub fn join(self) -> thread::Result<()> {