diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index 64414db11d..f4ad970e36 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -69,8 +69,8 @@ fn retransmit( let mut compute_turbine_peers_total = 0; for mut packets in packet_v { for packet in packets.packets.iter_mut() { - // skip repair packets - if packet.meta.repair { + // skip discarded packets and repair packets + if packet.meta.discard || packet.meta.repair { total_packets -= 1; continue; } diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 88aa793499..ad7aa1d238 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -9,7 +9,7 @@ use crate::result::{Error, Result}; use crate::service::Service; use crate::shred::Shred; use crate::streamer::{PacketReceiver, PacketSender}; -use rayon::iter::{IndexedParallelIterator, IntoParallelRefMutIterator, ParallelIterator}; +use rayon::iter::{IntoParallelRefMutIterator, ParallelIterator}; use rayon::ThreadPool; use solana_metrics::{inc_new_counter_debug, inc_new_counter_error}; use solana_rayon_threadlimit::get_thread_count; @@ -23,8 +23,6 @@ use std::sync::{Arc, RwLock}; use std::thread::{self, Builder, JoinHandle}; use std::time::{Duration, Instant}; -pub const NUM_THREADS: u32 = 10; - fn verify_shred_slot(shred: &Shred, root: u64) -> bool { if shred.is_data() { // Only data shreds have parent information @@ -89,40 +87,26 @@ where inc_new_counter_debug!("streamer-recv_window-recv", packets.packets.len()); let last_root = blocktree.last_root(); - let (shreds, packets_ix): (Vec<_>, Vec<_>) = thread_pool.install(|| { + let shreds: Vec<_> = thread_pool.install(|| { packets .packets .par_iter_mut() - .enumerate() - .filter_map(|(i, packet)| { + .filter_map(|packet| { if let Ok(shred) = Shred::new_from_serialized_shred(packet.data.to_vec()) { if shred_filter(&shred, last_root) { packet.meta.slot = shred.slot(); packet.meta.seed = shred.seed(); - Some((shred, i)) + Some(shred) } else { + packet.meta.discard = true; None } } else { + packet.meta.discard = true; None } }) - .unzip() - }); - // to avoid lookups into the `packets_ix` vec, this block manually tracks where we are in that vec - // and since `packets.packets.retain` and the `packets_ix` vec are both in order, - // we should be able to automatically drop any packets in the index gaps. - let mut retain_ix = 0; - let mut i = 0; - packets.packets.retain(|_| { - let retain = if !packets_ix.is_empty() && i == packets_ix[retain_ix] { - retain_ix = (packets_ix.len() - 1).min(retain_ix + 1); - true - } else { - false - }; - i += 1; - retain + .collect() }); trace!("{:?} shreds from packets", shreds.len()); diff --git a/sdk/src/packet.rs b/sdk/src/packet.rs index 2a3ddf2fdf..7b1c8a7235 100644 --- a/sdk/src/packet.rs +++ b/sdk/src/packet.rs @@ -13,6 +13,7 @@ pub struct Meta { pub size: usize, pub forward: bool, pub repair: bool, + pub discard: bool, pub addr: [u16; 8], pub port: u16, pub v6: bool,