From 4d0abebe0e142a7f87b0c03f55aede7fb1ee14f9 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Thu, 6 Apr 2023 21:33:43 +0000 Subject: [PATCH] removes Packet Meta.sender_stake and find_packet_sender_stake_stage (#31077) Packet Meta.sender_stake is unused since https://github.com/solana-labs/solana/pull/26512 removed sender_stake from banking-stage buffer prioritization. --- core/benches/sigverify_stage.rs | 2 +- core/benches/unprocessed_packet_batches.rs | 21 +-- core/src/find_packet_sender_stake_stage.rs | 163 --------------------- core/src/lib.rs | 1 - core/src/sigverify_stage.rs | 16 +- core/src/tpu.rs | 34 +---- sdk/src/packet.rs | 2 - streamer/src/nonblocking/quic.rs | 5 - streamer/src/streamer.rs | 28 ---- 9 files changed, 15 insertions(+), 257 deletions(-) delete mode 100644 core/src/find_packet_sender_stake_stage.rs diff --git a/core/benches/sigverify_stage.rs b/core/benches/sigverify_stage.rs index c5b6541488..0477813891 100644 --- a/core/benches/sigverify_stage.rs +++ b/core/benches/sigverify_stage.rs @@ -177,7 +177,7 @@ fn bench_sigverify_stage(bencher: &mut Bencher, use_same_tx: bool) { batch .iter_mut() .for_each(|packet| packet.meta_mut().flags |= PacketFlags::TRACER_PACKET); - packet_s.send(vec![batch]).unwrap(); + packet_s.send(batch).unwrap(); } let mut received = 0; let mut total_tracer_packets_received_in_sigverify_stage = 0; diff --git a/core/benches/unprocessed_packet_batches.rs b/core/benches/unprocessed_packet_batches.rs index 4732b5cbfc..554e60a097 100644 --- a/core/benches/unprocessed_packet_batches.rs +++ b/core/benches/unprocessed_packet_batches.rs @@ -4,7 +4,6 @@ extern crate test; use { - rand::distributions::{Distribution, Uniform}, solana_core::{ forward_packet_batches_by_accounts::ForwardPacketBatchesByAccounts, unprocessed_packet_batches::*, @@ -30,16 +29,14 @@ fn build_packet_batch( ) -> (PacketBatch, Vec) { let packet_batch = PacketBatch::new( (0..packet_per_batch_count) - .map(|sender_stake| { + .map(|_| { let tx = system_transaction::transfer( &Keypair::new(), &solana_sdk::pubkey::new_rand(), 1, recent_blockhash.unwrap_or_else(Hash::new_unique), ); - let mut packet = Packet::from_data(None, tx).unwrap(); - packet.meta_mut().sender_stake = sender_stake as u64; - packet + Packet::from_data(None, tx).unwrap() }) .collect(), ); @@ -52,9 +49,6 @@ fn build_randomized_packet_batch( packet_per_batch_count: usize, recent_blockhash: Option, ) -> (PacketBatch, Vec) { - let mut rng = rand::thread_rng(); - let distribution = Uniform::from(0..200_000); - let packet_batch = PacketBatch::new( (0..packet_per_batch_count) .map(|_| { @@ -64,10 +58,7 @@ fn build_randomized_packet_batch( 1, recent_blockhash.unwrap_or_else(Hash::new_unique), ); - let mut packet = Packet::from_data(None, tx).unwrap(); - let sender_stake = distribution.sample(&mut rng); - packet.meta_mut().sender_stake = sender_stake as u64; - packet + Packet::from_data(None, tx).unwrap() }) .collect(), ); @@ -119,11 +110,7 @@ fn bench_packet_clone(bencher: &mut Bencher) { let mut timer = Measure::start("insert_batch"); packet_batch.iter().for_each(|packet| { - let mut packet = packet.clone(); - packet.meta_mut().sender_stake *= 2; - if packet.meta().sender_stake > 2 { - outer_packet = packet; - } + outer_packet = packet.clone(); }); timer.stop(); diff --git a/core/src/find_packet_sender_stake_stage.rs b/core/src/find_packet_sender_stake_stage.rs deleted file mode 100644 index bc23652b5e..0000000000 --- a/core/src/find_packet_sender_stake_stage.rs +++ /dev/null @@ -1,163 +0,0 @@ -use { - crossbeam_channel::{Receiver, RecvTimeoutError, Sender}, - solana_measure::measure::Measure, - solana_perf::packet::PacketBatch, - solana_sdk::timing::timestamp, - solana_streamer::streamer::{self, StakedNodes, StreamerError}, - std::{ - collections::HashMap, - net::IpAddr, - sync::{Arc, RwLock}, - thread::{self, Builder, JoinHandle}, - }, -}; - -// Try to target 50ms, rough timings from a testnet validator -// -// 50ms/(200ns/packet) = 250k packets -const MAX_FINDPACKETSENDERSTAKE_BATCH: usize = 250_000; - -pub type FindPacketSenderStakeSender = Sender>; -pub type FindPacketSenderStakeReceiver = Receiver>; - -#[derive(Debug, Default)] -struct FindPacketSenderStakeStats { - last_print: u64, - refresh_ip_to_stake_time: u64, - apply_sender_stakes_time: u64, - send_batches_time: u64, - receive_batches_time: u64, - total_batches: u64, - total_packets: u64, - total_discard_random: usize, - total_discard_random_time_us: usize, -} - -impl FindPacketSenderStakeStats { - fn report(&mut self, name: &'static str) { - let now = timestamp(); - let elapsed_ms = now - self.last_print; - if elapsed_ms > 2000 { - datapoint_info!( - name, - ( - "refresh_ip_to_stake_time_us", - self.refresh_ip_to_stake_time as i64, - i64 - ), - ( - "apply_sender_stakes_time_us", - self.apply_sender_stakes_time as i64, - i64 - ), - ("send_batches_time_us", self.send_batches_time as i64, i64), - ( - "receive_batches_time_ns", - self.receive_batches_time as i64, - i64 - ), - ("total_batches", self.total_batches as i64, i64), - ("total_packets", self.total_packets as i64, i64), - ("total_discard_random", self.total_discard_random, i64), - ( - "total_discard_random_time_us", - self.total_discard_random_time_us, - i64 - ), - ); - *self = FindPacketSenderStakeStats::default(); - self.last_print = now; - } - } -} - -pub struct FindPacketSenderStakeStage { - thread_hdl: JoinHandle<()>, -} - -impl FindPacketSenderStakeStage { - pub fn new( - packet_receiver: streamer::PacketBatchReceiver, - sender: FindPacketSenderStakeSender, - staked_nodes: Arc>, - name: &'static str, - ) -> Self { - let mut stats = FindPacketSenderStakeStats::default(); - let thread_hdl = Builder::new() - .name("solPktStake".to_string()) - .spawn(move || loop { - match streamer::recv_packet_batches(&packet_receiver) { - Ok((mut batches, num_packets, recv_duration)) => { - let num_batches = batches.len(); - - let mut discard_random_time = - Measure::start("findpacketsenderstake_discard_random_time"); - let non_discarded_packets = solana_perf::discard::discard_batches_randomly( - &mut batches, - MAX_FINDPACKETSENDERSTAKE_BATCH, - num_packets, - ); - let num_discarded_randomly = - num_packets.saturating_sub(non_discarded_packets); - discard_random_time.stop(); - - let mut apply_sender_stakes_time = - Measure::start("apply_sender_stakes_time"); - let mut apply_stake = || { - let ip_to_stake = staked_nodes.read().unwrap(); - Self::apply_sender_stakes(&mut batches, &ip_to_stake.ip_stake_map); - }; - apply_stake(); - apply_sender_stakes_time.stop(); - - let mut send_batches_time = Measure::start("send_batches_time"); - if let Err(e) = sender.send(batches) { - info!("Sender error: {:?}", e); - } - send_batches_time.stop(); - - stats.apply_sender_stakes_time = stats - .apply_sender_stakes_time - .saturating_add(apply_sender_stakes_time.as_us()); - stats.send_batches_time = stats - .send_batches_time - .saturating_add(send_batches_time.as_us()); - stats.receive_batches_time = stats - .receive_batches_time - .saturating_add(recv_duration.as_nanos() as u64); - stats.total_batches = - stats.total_batches.saturating_add(num_batches as u64); - stats.total_packets = - stats.total_packets.saturating_add(num_packets as u64); - stats.total_discard_random_time_us += discard_random_time.as_us() as usize; - stats.total_discard_random += num_discarded_randomly; - } - Err(e) => match e { - StreamerError::RecvTimeout(RecvTimeoutError::Disconnected) => break, - StreamerError::RecvTimeout(RecvTimeoutError::Timeout) => (), - _ => error!("error: {:?}", e), - }, - } - - stats.report(name); - }) - .unwrap(); - Self { thread_hdl } - } - - fn apply_sender_stakes(batches: &mut [PacketBatch], ip_to_stake: &HashMap) { - batches - .iter_mut() - .flat_map(|batch| batch.iter_mut()) - .for_each(|packet| { - packet.meta_mut().sender_stake = ip_to_stake - .get(&packet.meta().addr) - .copied() - .unwrap_or_default(); - }); - } - - pub fn join(self) -> thread::Result<()> { - self.thread_hdl.join() - } -} diff --git a/core/src/lib.rs b/core/src/lib.rs index fd94082279..6f530313ab 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -27,7 +27,6 @@ pub mod cost_update_service; pub mod drop_bank_service; pub mod duplicate_repair_status; pub mod fetch_stage; -pub mod find_packet_sender_stake_stage; pub mod fork_choice; pub mod forward_packet_batches_by_accounts; pub mod gen_keys; diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs index d9c69f862d..aad92f5b98 100644 --- a/core/src/sigverify_stage.rs +++ b/core/src/sigverify_stage.rs @@ -6,9 +6,9 @@ //! if perf-libs are available use { - crate::{find_packet_sender_stake_stage::FindPacketSenderStakeReceiver, sigverify}, + crate::sigverify, core::time::Duration, - crossbeam_channel::{RecvTimeoutError, SendError}, + crossbeam_channel::{Receiver, RecvTimeoutError, SendError}, itertools::Itertools, solana_measure::measure::Measure, solana_perf::{ @@ -236,7 +236,7 @@ impl SigVerifier for DisabledSigVerifier { impl SigVerifyStage { pub fn new( - packet_receiver: FindPacketSenderStakeReceiver, + packet_receiver: Receiver, verifier: T, name: &'static str, ) -> Self { @@ -292,11 +292,11 @@ impl SigVerifyStage { fn verifier( deduper: &Deduper, - recvr: &FindPacketSenderStakeReceiver, + recvr: &Receiver, verifier: &mut T, stats: &mut SigVerifierStats, ) -> Result<(), T::SendType> { - let (mut batches, num_packets, recv_duration) = streamer::recv_vec_packet_batches(recvr)?; + let (mut batches, num_packets, recv_duration) = streamer::recv_packet_batches(recvr)?; let batches_len = batches.len(); debug!( @@ -405,7 +405,7 @@ impl SigVerifyStage { } fn verifier_service( - packet_receiver: FindPacketSenderStakeReceiver, + packet_receiver: Receiver, mut verifier: T, name: &'static str, ) -> JoinHandle<()> { @@ -450,7 +450,7 @@ impl SigVerifyStage { } fn verifier_services( - packet_receiver: FindPacketSenderStakeReceiver, + packet_receiver: Receiver, verifier: T, name: &'static str, ) -> JoinHandle<()> { @@ -574,7 +574,7 @@ mod tests { .iter_mut() .for_each(|packet| packet.meta_mut().flags |= PacketFlags::TRACER_PACKET); assert_eq!(batch.len(), packets_per_batch); - packet_s.send(vec![batch]).unwrap(); + packet_s.send(batch).unwrap(); } let mut received = 0; let mut total_tracer_packets_received_in_sigverify_stage = 0; diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 02c4705efb..b36c4d77ec 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -12,7 +12,6 @@ use { GossipVerifiedVoteHashSender, VerifiedVoteSender, VoteTracker, }, fetch_stage::FetchStage, - find_packet_sender_stake_stage::FindPacketSenderStakeStage, sigverify::TransactionSigVerifier, sigverify_stage::SigVerifyStage, staked_nodes_updater_service::StakedNodesUpdaterService, @@ -67,8 +66,6 @@ pub struct Tpu { broadcast_stage: BroadcastStage, tpu_quic_t: thread::JoinHandle<()>, tpu_forwards_quic_t: thread::JoinHandle<()>, - find_packet_sender_stake_stage: FindPacketSenderStakeStage, - vote_find_packet_sender_stake_stage: FindPacketSenderStakeStage, staked_nodes_updater_service: StakedNodesUpdaterService, tracer_thread_hdl: TracerThread, } @@ -141,25 +138,6 @@ impl Tpu { shared_staked_nodes_overrides, ); - let (find_packet_sender_stake_sender, find_packet_sender_stake_receiver) = unbounded(); - - let find_packet_sender_stake_stage = FindPacketSenderStakeStage::new( - packet_receiver, - find_packet_sender_stake_sender, - staked_nodes.clone(), - "Tpu", - ); - - let (vote_find_packet_sender_stake_sender, vote_find_packet_sender_stake_receiver) = - unbounded(); - - let vote_find_packet_sender_stake_stage = FindPacketSenderStakeStage::new( - vote_packet_receiver, - vote_find_packet_sender_stake_sender, - staked_nodes.clone(), - "Vote", - ); - let (non_vote_sender, non_vote_receiver) = banking_tracer.create_channel_non_vote(); let stats = Arc::new(StreamStats::default()); @@ -205,18 +183,14 @@ impl Tpu { let sigverify_stage = { let verifier = TransactionSigVerifier::new(non_vote_sender); - SigVerifyStage::new(find_packet_sender_stake_receiver, verifier, "tpu-verifier") + SigVerifyStage::new(packet_receiver, verifier, "tpu-verifier") }; let (tpu_vote_sender, tpu_vote_receiver) = banking_tracer.create_channel_tpu_vote(); let vote_sigverify_stage = { let verifier = TransactionSigVerifier::new_reject_non_vote(tpu_vote_sender); - SigVerifyStage::new( - vote_find_packet_sender_stake_receiver, - verifier, - "tpu-vote-verifier", - ) + SigVerifyStage::new(vote_packet_receiver, verifier, "tpu-vote-verifier") }; let (gossip_vote_sender, gossip_vote_receiver) = @@ -271,8 +245,6 @@ impl Tpu { broadcast_stage, tpu_quic_t, tpu_forwards_quic_t, - find_packet_sender_stake_stage, - vote_find_packet_sender_stake_stage, staked_nodes_updater_service, tracer_thread_hdl, } @@ -285,8 +257,6 @@ impl Tpu { self.vote_sigverify_stage.join(), self.cluster_info_vote_listener.join(), self.banking_stage.join(), - self.find_packet_sender_stake_stage.join(), - self.vote_find_packet_sender_stake_stage.join(), self.staked_nodes_updater_service.join(), self.tpu_quic_t.join(), self.tpu_forwards_quic_t.join(), diff --git a/sdk/src/packet.rs b/sdk/src/packet.rs index efe4271291..0d0a2d0605 100644 --- a/sdk/src/packet.rs +++ b/sdk/src/packet.rs @@ -39,7 +39,6 @@ pub struct Meta { pub addr: IpAddr, pub port: u16, pub flags: PacketFlags, - pub sender_stake: u64, } // serde_as is used as a work around because array isn't supported by serde @@ -243,7 +242,6 @@ impl Default for Meta { addr: IpAddr::V4(Ipv4Addr::UNSPECIFIED), port: 0, flags: PacketFlags::empty(), - sender_stake: 0, } } } diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index 892ff7205a..0c495a0037 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -351,7 +351,6 @@ fn handle_and_cache_new_connection( connection_table, stream_exit, params.stats.clone(), - params.stake, peer_type, wait_for_chunk_timeout, )); @@ -689,7 +688,6 @@ async fn handle_connection( connection_table: Arc>, stream_exit: Arc, stats: Arc, - stake: u64, peer_type: ConnectionPeerType, wait_for_chunk_timeout: Duration, ) { @@ -736,7 +734,6 @@ async fn handle_connection( &remote_addr, &packet_sender, stats.clone(), - stake, peer_type, ) .await @@ -788,7 +785,6 @@ async fn handle_chunk( remote_addr: &SocketAddr, packet_sender: &AsyncSender, stats: Arc, - stake: u64, peer_type: ConnectionPeerType, ) -> bool { match chunk { @@ -817,7 +813,6 @@ async fn handle_chunk( if packet_accum.is_none() { let mut meta = Meta::default(); meta.set_socket_addr(remote_addr); - meta.sender_stake = stake; *packet_accum = Some(PacketAccumulator { meta, chunks: Vec::new(), diff --git a/streamer/src/streamer.rs b/streamer/src/streamer.rs index 77f31f5907..0a4006079f 100644 --- a/streamer/src/streamer.rs +++ b/streamer/src/streamer.rs @@ -313,34 +313,6 @@ fn recv_send( Ok(()) } -pub fn recv_vec_packet_batches( - recvr: &Receiver>, -) -> Result<(Vec, usize, Duration)> { - let timer = Duration::new(1, 0); - let mut packet_batches = recvr.recv_timeout(timer)?; - let recv_start = Instant::now(); - trace!("got packets"); - let mut num_packets = packet_batches - .iter() - .map(|packets| packets.len()) - .sum::(); - while let Ok(packet_batch) = recvr.try_recv() { - trace!("got more packets"); - num_packets += packet_batch - .iter() - .map(|packets| packets.len()) - .sum::(); - packet_batches.extend(packet_batch); - } - let recv_duration = recv_start.elapsed(); - trace!( - "packet batches len: {}, num packets: {}", - packet_batches.len(), - num_packets - ); - Ok((packet_batches, num_packets, recv_duration)) -} - pub fn recv_packet_batches( recvr: &PacketBatchReceiver, ) -> Result<(Vec, usize, Duration)> {