From 364583ea5c214551421fef709ad29d8744d3794b Mon Sep 17 00:00:00 2001 From: Sagar Dhawan Date: Mon, 21 Oct 2019 12:04:52 -0700 Subject: [PATCH] Fix copying packets in Window Service (#6429) * Fix copying packets in Window Service * Parallelize over batches instead of within batches --- core/src/window_service.rs | 59 +++++++++++++++++++++----------------- 1 file changed, 32 insertions(+), 27 deletions(-) diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 6246c478a6..9fc69348bc 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -78,48 +78,53 @@ where F: Fn(&Shred, u64) -> bool + Sync, { let timer = Duration::from_millis(200); - let mut packets = r.recv_timeout(timer)?; + let mut packets = vec![r.recv_timeout(timer)?]; + let mut total_packets = packets[0].packets.len(); - while let Ok(mut more_packets) = r.try_recv() { - packets.packets.append(&mut more_packets.packets) + while let Ok(more_packets) = r.try_recv() { + total_packets += more_packets.packets.len(); + packets.push(more_packets) } let now = Instant::now(); - inc_new_counter_debug!("streamer-recv_window-recv", packets.packets.len()); + inc_new_counter_debug!("streamer-recv_window-recv", total_packets); let last_root = blocktree.last_root(); let shreds: Vec<_> = thread_pool.install(|| { packets - .packets .par_iter_mut() - .filter_map(|packet| { - if let Ok(shred) = Shred::new_from_serialized_shred(packet.data.to_vec()) { - if shred_filter(&shred, last_root) { - packet.meta.slot = shred.slot(); - packet.meta.seed = shred.seed(); - Some(shred) - } else { - packet.meta.discard = true; - None - } - } else { - packet.meta.discard = true; - None - } + .flat_map(|packets| { + packets + .packets + .iter_mut() + .filter_map(|packet| { + if let Ok(shred) = Shred::new_from_serialized_shred(packet.data.to_vec()) { + if shred_filter(&shred, last_root) { + packet.meta.slot = shred.slot(); + packet.meta.seed = shred.seed(); + Some(shred) + } else { + packet.meta.discard = true; + None + } + } else { + packet.meta.discard = true; + None + } + }) + .collect::>() }) .collect() }); trace!("{:?} shreds from packets", shreds.len()); - trace!( - "{} num shreds received: {}", - my_pubkey, - packets.packets.len() - ); + trace!("{} num total shreds received: {}", my_pubkey, total_packets); - if !packets.packets.is_empty() { - // Ignore the send error, as the retransmit is optional (e.g. archivers don't retransmit) - let _ = retransmit.send(packets); + for packets in packets.into_iter() { + if !packets.packets.is_empty() { + // Ignore the send error, as the retransmit is optional (e.g. archivers don't retransmit) + let _ = retransmit.send(packets); + } } blocktree.insert_shreds(shreds, Some(leader_schedule_cache))?;