Make data plane shred filter parallel again (#5740)

This commit is contained in:
Sagar Dhawan 2019-09-03 21:50:57 +00:00 committed by GitHub
parent f7e039e7ac
commit 62f6a78ccd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 31 additions and 20 deletions

View File

@ -4,11 +4,14 @@
use crate::blocktree::Blocktree;
use crate::cluster_info::ClusterInfo;
use crate::leader_schedule_cache::LeaderScheduleCache;
use crate::packet::Packets;
use crate::repair_service::{RepairService, RepairStrategy};
use crate::result::{Error, Result};
use crate::service::Service;
use crate::shred::Shred;
use crate::streamer::{PacketReceiver, PacketSender};
use rayon::iter::{ParallelBridge, ParallelIterator};
use rayon::ThreadPool;
use solana_metrics::{inc_new_counter_debug, inc_new_counter_error};
use solana_runtime::bank::Bank;
use solana_sdk::pubkey::Pubkey;
@ -63,6 +66,7 @@ fn recv_window<F>(
r: &PacketReceiver,
retransmit: &PacketSender,
shred_filter: F,
thread_pool: &ThreadPool,
) -> Result<()>
where
F: Fn(&Shred, &[u8]) -> bool,
@ -77,26 +81,28 @@ where
let now = Instant::now();
inc_new_counter_debug!("streamer-recv_window-recv", packets.packets.len());
let mut shreds = vec![];
let mut discards = vec![];
for (i, packet) in packets.packets.iter_mut().enumerate() {
if let Ok(s) = bincode::deserialize(&packet.data) {
let shred: Shred = s;
if shred_filter(&shred, &packet.data) {
packet.meta.slot = shred.slot();
packet.meta.seed = shred.seed();
shreds.push(shred);
} else {
discards.push(i);
}
} else {
discards.push(i);
}
}
for i in discards.into_iter().rev() {
packets.packets.remove(i);
}
let (shreds, packets): (Vec<_>, Vec<_>) = thread_pool.install(|| {
packets
.packets
.drain(..)
.par_bridge()
.filter_map(|mut packet| {
if let Ok(s) = bincode::deserialize(&packet.data) {
let shred: Shred = s;
if shred_filter(&shred, &packet.data) {
packet.meta.slot = shred.slot();
packet.meta.seed = shred.seed();
Some((shred, packet))
} else {
None
}
} else {
None
}
})
.unzip()
});
let packets = Packets::new(packets);
trace!("{:?} shreds from packets", shreds.len());
@ -187,6 +193,10 @@ impl WindowService {
let id = cluster_info.read().unwrap().id();
trace!("{}: RECV_WINDOW started", id);
let mut now = Instant::now();
let thread_pool = rayon::ThreadPoolBuilder::new()
.num_threads(sys_info::cpu_num().unwrap_or(NUM_THREADS) as usize)
.build()
.unwrap();
loop {
if exit.load(Ordering::Relaxed) {
break;
@ -207,6 +217,7 @@ impl WindowService {
.map(|bank_forks| bank_forks.read().unwrap().working_bank()),
)
},
&thread_pool,
) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,