From 7d56fa836336c21f6295a27a209f2e33fad8f8e4 Mon Sep 17 00:00:00 2001
From: behzad nouri
Date: Thu, 22 Jul 2021 14:49:21 +0000
Subject: [PATCH] sends packets in batches from sigverify-stage (#18446)

sigverify-stage is breaking batches into single-item vectors before
sending them down the channel:
https://github.com/solana-labs/solana/blob/d451363dc/core/src/sigverify_stage.rs#L88-L92

Also simplifying window-service code, reducing the number of nested
branches.
---
 core/src/sigverify_stage.rs |   8 +--
 core/src/window_service.rs  | 104 ++++++++++++++----------------------
 2 files changed, 41 insertions(+), 71 deletions(-)

diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs
index 91354a843e..d3de643735 100644
--- a/core/src/sigverify_stage.rs
+++ b/core/src/sigverify_stage.rs
@@ -84,13 +84,7 @@ impl SigVerifyStage {
             len,
             id
         );
-
-        let verified_batch = verifier.verify_batch(batch);
-
-        for v in verified_batch {
-            sendr.send(vec![v])?;
-        }
-
+        sendr.send(verifier.verify_batch(batch))?;
         verify_batch_time.stop();
 
         debug!(
diff --git a/core/src/window_service.rs b/core/src/window_service.rs
index b4c0039597..a80f0d4938 100644
--- a/core/src/window_service.rs
+++ b/core/src/window_service.rs
@@ -13,9 +13,7 @@ use crate::{
 use crossbeam_channel::{
     unbounded, Receiver as CrossbeamReceiver, RecvTimeoutError, Sender as CrossbeamSender,
 };
-use rayon::iter::IntoParallelRefMutIterator;
-use rayon::iter::ParallelIterator;
-use rayon::ThreadPool;
+use rayon::{prelude::*, ThreadPool};
 use solana_gossip::cluster_info::ClusterInfo;
 use solana_ledger::{
     blockstore::{self, Blockstore, BlockstoreInsertionMetrics, MAX_DATA_SHREDS_PER_SLOT},
@@ -23,7 +21,7 @@ use solana_ledger::{
     shred::{Nonce, Shred},
 };
 use solana_metrics::{inc_new_counter_debug, inc_new_counter_error};
-use solana_perf::packet::Packets;
+use solana_perf::packet::{Packet, Packets};
 use solana_rayon_threadlimit::get_thread_count;
 use solana_runtime::{bank::Bank, bank_forks::BankForks};
 use solana_sdk::{clock::Slot, packet::PACKET_DATA_SIZE, pubkey::Pubkey, timing::duration_as_ms};
@@ -31,6 +29,7 @@ use solana_streamer::streamer::PacketSender;
 use std::collections::HashSet;
 use std::{
     net::{SocketAddr, UdpSocket},
+    ops::Deref,
     sync::atomic::{AtomicBool, Ordering},
     sync::{Arc, RwLock},
     thread::{self, Builder, JoinHandle},
@@ -227,74 +226,51 @@ where
 {
     let timer = Duration::from_millis(200);
     let mut packets = verified_receiver.recv_timeout(timer)?;
-    let mut total_packets: usize = packets.iter().map(|p| p.packets.len()).sum();
-
-    while let Ok(mut more_packets) = verified_receiver.try_recv() {
-        let count: usize = more_packets.iter().map(|p| p.packets.len()).sum();
-        total_packets += count;
-        packets.append(&mut more_packets)
-    }
-
+    packets.extend(verified_receiver.try_iter().flatten());
+    let total_packets: usize = packets.iter().map(|p| p.packets.len()).sum();
     let now = Instant::now();
     inc_new_counter_debug!("streamer-recv_window-recv", total_packets);
     let root_bank = bank_forks.read().unwrap().root_bank();
     let last_root = blockstore.last_root();
+    let handle_packet = |packet: &mut Packet| {
+        if packet.meta.discard {
+            inc_new_counter_debug!("streamer-recv_window-invalid_or_unnecessary_packet", 1);
+            return None;
+        }
+        // shred fetch stage should be sending packets
+        // with sufficiently large buffers. Needed to ensure
+        // call to `new_from_serialized_shred` is safe.
+        assert_eq!(packet.data.len(), PACKET_DATA_SIZE);
+        let serialized_shred = packet.data.to_vec();
+        let shred = match Shred::new_from_serialized_shred(serialized_shred) {
+            Ok(shred) if shred_filter(&shred, last_root) => {
+                let leader_pubkey =
+                    leader_schedule_cache.slot_leader_at(shred.slot(), Some(root_bank.deref()));
+                packet.meta.slot = shred.slot();
+                packet.meta.seed = shred.seed(leader_pubkey, root_bank.deref());
+                shred
+            }
+            Ok(_) | Err(_) => {
+                packet.meta.discard = true;
+                return None;
+            }
+        };
+        if packet.meta.repair {
+            let repair_info = RepairMeta {
+                _from_addr: packet.meta.addr(),
+                // If can't parse the nonce, dump the packet.
+                nonce: repair_response::nonce(&packet.data)?,
+            };
+            Some((shred, Some(repair_info)))
+        } else {
+            Some((shred, None))
+        }
+    };
     let (shreds, repair_infos): (Vec<_>, Vec<_>) = thread_pool.install(|| {
         packets
             .par_iter_mut()
-            .flat_map(|packets| {
-                packets
-                    .packets
-                    .iter_mut()
-                    .filter_map(|packet| {
-                        if packet.meta.discard {
-                            inc_new_counter_debug!(
-                                "streamer-recv_window-invalid_or_unnecessary_packet",
-                                1
-                            );
-                            None
-                        } else {
-                            // shred fetch stage should be sending packets
-                            // with sufficiently large buffers. Needed to ensure
-                            // call to `new_from_serialized_shred` is safe.
-                            assert_eq!(packet.data.len(), PACKET_DATA_SIZE);
-                            let serialized_shred = packet.data.to_vec();
-                            if let Ok(shred) = Shred::new_from_serialized_shred(serialized_shred) {
-                                let repair_info = {
-                                    if packet.meta.repair {
-                                        if let Some(nonce) = repair_response::nonce(&packet.data) {
-                                            let repair_info = RepairMeta {
-                                                _from_addr: packet.meta.addr(),
-                                                nonce,
-                                            };
-                                            Some(repair_info)
-                                        } else {
-                                            // If can't parse the nonce, dump the packet
-                                            return None;
-                                        }
-                                    } else {
-                                        None
-                                    }
-                                };
-                                if shred_filter(&shred, last_root) {
-                                    let leader_pubkey = leader_schedule_cache
-                                        .slot_leader_at(shred.slot(), Some(&root_bank));
-                                    packet.meta.slot = shred.slot();
-                                    packet.meta.seed = shred.seed(leader_pubkey, &root_bank);
-                                    Some((shred, repair_info))
-                                } else {
-                                    packet.meta.discard = true;
-                                    None
-                                }
-                            } else {
-                                packet.meta.discard = true;
-                                None
-                            }
-                        }
-                    })
-                    .collect::<Vec<_>>()
-            })
+            .flat_map_iter(|packet| packet.packets.iter_mut().filter_map(handle_packet))
             .unzip()
     });
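For context on the channel-batching change, below is a minimal, self-contained
sketch (not part of the patch) of the before/after send pattern, using
crossbeam-channel directly. `PacketBatch` and `verify_batch` are hypothetical
stand-ins for the solana types, not the actual APIs.

use crossbeam_channel::{unbounded, Sender};

// Hypothetical stand-in for solana_perf::packet::Packets: a batch of raw buffers.
struct PacketBatch {
    packets: Vec<Vec<u8>>,
}

// Hypothetical stand-in for the sigverify step; the real verifier marks
// failing packets for discard rather than passing everything through.
fn verify_batch(batch: Vec<PacketBatch>) -> Vec<PacketBatch> {
    batch
}

// Old pattern: wrap each verified element in a single-item Vec and send it
// as its own channel message.
fn send_per_item(sendr: &Sender<Vec<PacketBatch>>, batch: Vec<PacketBatch>) {
    for v in verify_batch(batch) {
        sendr.send(vec![v]).unwrap();
    }
}

// New pattern: send the whole verified batch as one channel message.
fn send_batched(sendr: &Sender<Vec<PacketBatch>>, batch: Vec<PacketBatch>) {
    sendr.send(verify_batch(batch)).unwrap();
}

fn main() {
    let make_batch = || {
        vec![
            PacketBatch { packets: vec![vec![0u8; 8]] },
            PacketBatch { packets: vec![vec![1u8; 8]] },
        ]
    };
    let (sendr, recvr) = unbounded();
    send_per_item(&sendr, make_batch()); // two messages on the channel
    send_batched(&sendr, make_batch()); // one message on the channel
    drop(sendr);
    // Receiver side mirrors the window-service change: drain everything
    // pending with try_iter() and flatten the nested batches in one pass.
    let received: Vec<PacketBatch> = recvr.try_iter().flatten().collect();
    assert_eq!(received.iter().map(|b| b.packets.len()).sum::<usize>(), 4);
}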