From 62f6a78ccd0f8038e3f0896b7d033b01ee0b6406 Mon Sep 17 00:00:00 2001 From: Sagar Dhawan Date: Tue, 3 Sep 2019 21:50:57 +0000 Subject: [PATCH] Make data plane shred filter parallel again (#5740) --- core/src/window_service.rs | 51 +++++++++++++++++++++++--------------- 1 file changed, 31 insertions(+), 20 deletions(-) diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 6a9c68ae64..c32f94749d 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -4,11 +4,14 @@ use crate::blocktree::Blocktree; use crate::cluster_info::ClusterInfo; use crate::leader_schedule_cache::LeaderScheduleCache; +use crate::packet::Packets; use crate::repair_service::{RepairService, RepairStrategy}; use crate::result::{Error, Result}; use crate::service::Service; use crate::shred::Shred; use crate::streamer::{PacketReceiver, PacketSender}; +use rayon::iter::{ParallelBridge, ParallelIterator}; +use rayon::ThreadPool; use solana_metrics::{inc_new_counter_debug, inc_new_counter_error}; use solana_runtime::bank::Bank; use solana_sdk::pubkey::Pubkey; @@ -63,6 +66,7 @@ fn recv_window( r: &PacketReceiver, retransmit: &PacketSender, shred_filter: F, + thread_pool: &ThreadPool, ) -> Result<()> where F: Fn(&Shred, &[u8]) -> bool, @@ -77,26 +81,28 @@ where let now = Instant::now(); inc_new_counter_debug!("streamer-recv_window-recv", packets.packets.len()); - let mut shreds = vec![]; - let mut discards = vec![]; - for (i, packet) in packets.packets.iter_mut().enumerate() { - if let Ok(s) = bincode::deserialize(&packet.data) { - let shred: Shred = s; - if shred_filter(&shred, &packet.data) { - packet.meta.slot = shred.slot(); - packet.meta.seed = shred.seed(); - shreds.push(shred); - } else { - discards.push(i); - } - } else { - discards.push(i); - } - } - - for i in discards.into_iter().rev() { - packets.packets.remove(i); - } + let (shreds, packets): (Vec<_>, Vec<_>) = thread_pool.install(|| { + packets + .packets + .drain(..) + .par_bridge() + .filter_map(|mut packet| { + if let Ok(s) = bincode::deserialize(&packet.data) { + let shred: Shred = s; + if shred_filter(&shred, &packet.data) { + packet.meta.slot = shred.slot(); + packet.meta.seed = shred.seed(); + Some((shred, packet)) + } else { + None + } + } else { + None + } + }) + .unzip() + }); + let packets = Packets::new(packets); trace!("{:?} shreds from packets", shreds.len()); @@ -187,6 +193,10 @@ impl WindowService { let id = cluster_info.read().unwrap().id(); trace!("{}: RECV_WINDOW started", id); let mut now = Instant::now(); + let thread_pool = rayon::ThreadPoolBuilder::new() + .num_threads(sys_info::cpu_num().unwrap_or(NUM_THREADS) as usize) + .build() + .unwrap(); loop { if exit.load(Ordering::Relaxed) { break; @@ -207,6 +217,7 @@ impl WindowService { .map(|bank_forks| bank_forks.read().unwrap().working_bank()), ) }, + &thread_pool, ) { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,