Fix copying packets in Window Service (#6429)
* Fix copying packets in Window Service * Parallelize over batches instead of within batches
This commit is contained in:
parent
951e1f8b48
commit
364583ea5c
|
@ -78,19 +78,24 @@ where
|
||||||
F: Fn(&Shred, u64) -> bool + Sync,
|
F: Fn(&Shred, u64) -> bool + Sync,
|
||||||
{
|
{
|
||||||
let timer = Duration::from_millis(200);
|
let timer = Duration::from_millis(200);
|
||||||
let mut packets = r.recv_timeout(timer)?;
|
let mut packets = vec![r.recv_timeout(timer)?];
|
||||||
|
let mut total_packets = packets[0].packets.len();
|
||||||
|
|
||||||
while let Ok(mut more_packets) = r.try_recv() {
|
while let Ok(more_packets) = r.try_recv() {
|
||||||
packets.packets.append(&mut more_packets.packets)
|
total_packets += more_packets.packets.len();
|
||||||
|
packets.push(more_packets)
|
||||||
}
|
}
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
inc_new_counter_debug!("streamer-recv_window-recv", packets.packets.len());
|
inc_new_counter_debug!("streamer-recv_window-recv", total_packets);
|
||||||
|
|
||||||
let last_root = blocktree.last_root();
|
let last_root = blocktree.last_root();
|
||||||
let shreds: Vec<_> = thread_pool.install(|| {
|
let shreds: Vec<_> = thread_pool.install(|| {
|
||||||
packets
|
packets
|
||||||
.packets
|
|
||||||
.par_iter_mut()
|
.par_iter_mut()
|
||||||
|
.flat_map(|packets| {
|
||||||
|
packets
|
||||||
|
.packets
|
||||||
|
.iter_mut()
|
||||||
.filter_map(|packet| {
|
.filter_map(|packet| {
|
||||||
if let Ok(shred) = Shred::new_from_serialized_shred(packet.data.to_vec()) {
|
if let Ok(shred) = Shred::new_from_serialized_shred(packet.data.to_vec()) {
|
||||||
if shred_filter(&shred, last_root) {
|
if shred_filter(&shred, last_root) {
|
||||||
|
@ -106,21 +111,21 @@ where
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
})
|
||||||
.collect()
|
.collect()
|
||||||
});
|
});
|
||||||
|
|
||||||
trace!("{:?} shreds from packets", shreds.len());
|
trace!("{:?} shreds from packets", shreds.len());
|
||||||
|
|
||||||
trace!(
|
trace!("{} num total shreds received: {}", my_pubkey, total_packets);
|
||||||
"{} num shreds received: {}",
|
|
||||||
my_pubkey,
|
|
||||||
packets.packets.len()
|
|
||||||
);
|
|
||||||
|
|
||||||
|
for packets in packets.into_iter() {
|
||||||
if !packets.packets.is_empty() {
|
if !packets.packets.is_empty() {
|
||||||
// Ignore the send error, as the retransmit is optional (e.g. archivers don't retransmit)
|
// Ignore the send error, as the retransmit is optional (e.g. archivers don't retransmit)
|
||||||
let _ = retransmit.send(packets);
|
let _ = retransmit.send(packets);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
blocktree.insert_shreds(shreds, Some(leader_schedule_cache))?;
|
blocktree.insert_shreds(shreds, Some(leader_schedule_cache))?;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue