From 976b138e7698407a4eb02d13f5ff60abf192647e Mon Sep 17 00:00:00 2001 From: Stephen Akridge Date: Thu, 16 Dec 2021 14:47:55 +0000 Subject: [PATCH] Add tx weighting stage --- core/benches/sigverify_stage.rs | 2 +- core/src/lib.rs | 1 + core/src/shred_fetch_stage.rs | 14 ++--- core/src/sigverify_stage.rs | 12 ++-- core/src/tpu.rs | 30 +++++++++- core/src/transaction_weighting_stage.rs | 77 +++++++++++++++++++++++++ sdk/src/packet.rs | 2 + streamer/src/streamer.rs | 28 +++++++++ 8 files changed, 150 insertions(+), 16 deletions(-) create mode 100644 core/src/transaction_weighting_stage.rs diff --git a/core/benches/sigverify_stage.rs b/core/benches/sigverify_stage.rs index f5defc814..501ed1386 100644 --- a/core/benches/sigverify_stage.rs +++ b/core/benches/sigverify_stage.rs @@ -159,7 +159,7 @@ fn bench_sigverify_stage(bencher: &mut Bencher) { for _ in 0..batches.len() { if let Some(batch) = batches.pop() { sent_len += batch.packets.len(); - packet_s.send(batch).unwrap(); + packet_s.send(vec![batch]).unwrap(); } } let mut received = 0; diff --git a/core/src/lib.rs b/core/src/lib.rs index 8f8e9adc3..ae7029cfd 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -61,6 +61,7 @@ pub mod system_monitor_service; mod tower1_7_14; pub mod tower_storage; pub mod tpu; +pub mod transaction_weighting_stage; pub mod tree_diff; pub mod tvu; pub mod unfrozen_gossip_verified_vote_hashes; diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs index ff3db68b1..b3e4c2d6b 100644 --- a/core/src/shred_fetch_stage.rs +++ b/core/src/shred_fetch_stage.rs @@ -2,17 +2,17 @@ use { crate::packet_hasher::PacketHasher, - crossbeam_channel::unbounded, + crossbeam_channel::{unbounded, Sender}, lru::LruCache, solana_ledger::shred::{get_shred_slot_index_type, ShredFetchStats}, solana_perf::{ cuda_runtime::PinnedVec, - packet::{Packet, PacketBatchRecycler, PacketFlags}, + packet::{Packet, PacketBatch, PacketBatchRecycler, PacketFlags}, recycler::Recycler, }, solana_runtime::bank_forks::BankForks, solana_sdk::clock::{Slot, DEFAULT_MS_PER_SLOT}, - solana_streamer::streamer::{self, PacketBatchReceiver, PacketBatchSender}, + solana_streamer::streamer::{self, PacketBatchReceiver}, std::{ net::UdpSocket, sync::{atomic::AtomicBool, Arc, RwLock}, @@ -65,7 +65,7 @@ impl ShredFetchStage { // updates packets received on a channel and sends them on another channel fn modify_packets( recvr: PacketBatchReceiver, - sendr: PacketBatchSender, + sendr: Sender>, bank_forks: Option>>, name: &'static str, modify: F, @@ -125,7 +125,7 @@ impl ShredFetchStage { stats = ShredFetchStats::default(); last_stats = Instant::now(); } - if sendr.send(packet_batch).is_err() { + if sendr.send(vec![packet_batch]).is_err() { break; } } @@ -134,7 +134,7 @@ impl ShredFetchStage { fn packet_modifier( sockets: Vec>, exit: &Arc, - sender: PacketBatchSender, + sender: Sender>, recycler: Recycler>, bank_forks: Option>>, name: &'static str, @@ -170,7 +170,7 @@ impl ShredFetchStage { sockets: Vec>, forward_sockets: Vec>, repair_socket: Arc, - sender: &PacketBatchSender, + sender: &Sender>, bank_forks: Option>>, exit: &Arc, ) -> Self { diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs index 03bf6005b..cdbe0a96b 100644 --- a/core/src/sigverify_stage.rs +++ b/core/src/sigverify_stage.rs @@ -16,7 +16,7 @@ use { sigverify::{count_valid_packets, shrink_batches, Deduper}, }, solana_sdk::timing, - solana_streamer::streamer::{self, PacketBatchReceiver, StreamerError}, + solana_streamer::streamer::{self, StreamerError}, std::{ thread::{self, Builder, JoinHandle}, time::Instant, @@ -192,7 +192,7 @@ impl SigVerifier for DisabledSigVerifier { impl SigVerifyStage { #[allow(clippy::new_ret_no_self)] pub fn new( - packet_receiver: Receiver, + packet_receiver: Receiver>, verified_sender: Sender>, verifier: T, ) -> Self { @@ -227,12 +227,12 @@ impl SigVerifyStage { fn verifier( deduper: &Deduper, - recvr: &PacketBatchReceiver, + recvr: &Receiver>, sendr: &Sender>, verifier: &T, stats: &mut SigVerifierStats, ) -> Result<()> { - let (mut batches, num_packets, recv_duration) = streamer::recv_packet_batches(recvr)?; + let (mut batches, num_packets, recv_duration) = streamer::recv_vec_packet_batches(recvr)?; let batches_len = batches.len(); debug!( @@ -312,7 +312,7 @@ impl SigVerifyStage { } fn verifier_service( - packet_receiver: PacketBatchReceiver, + packet_receiver: Receiver>, verified_sender: Sender>, verifier: &T, ) -> JoinHandle<()> { @@ -358,7 +358,7 @@ impl SigVerifyStage { } fn verifier_services( - packet_receiver: PacketBatchReceiver, + packet_receiver: Receiver>, verified_sender: Sender>, verifier: T, ) -> JoinHandle<()> { diff --git a/core/src/tpu.rs b/core/src/tpu.rs index a1deaf571..4c6489e3d 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -12,6 +12,7 @@ use { fetch_stage::FetchStage, sigverify::TransactionSigVerifier, sigverify_stage::SigVerifyStage, + transaction_weighting_stage::TransactionWeightStage, }, crossbeam_channel::{unbounded, Receiver}, solana_gossip::cluster_info::ClusterInfo, @@ -55,6 +56,8 @@ pub struct Tpu { cluster_info_vote_listener: ClusterInfoVoteListener, broadcast_stage: BroadcastStage, tpu_quic_t: thread::JoinHandle<()>, + transaction_weight_stage: TransactionWeightStage, + vote_transaction_weight_stage: TransactionWeightStage, } impl Tpu { @@ -103,6 +106,25 @@ impl Tpu { poh_recorder, tpu_coalesce_ms, ); + + let (weighted_sender, weighted_receiver) = unbounded(); + + let transaction_weight_stage = TransactionWeightStage::new( + packet_receiver, + weighted_sender, + bank_forks.clone(), + cluster_info.clone(), + ); + + let (vote_weighted_sender, vote_weighted_receiver) = unbounded(); + + let vote_transaction_weight_stage = TransactionWeightStage::new( + vote_packet_receiver, + vote_weighted_sender, + bank_forks.clone(), + cluster_info.clone(), + ); + let (verified_sender, verified_receiver) = unbounded(); let tpu_quic_t = solana_streamer::quic::spawn_server( @@ -117,7 +139,7 @@ impl Tpu { let sigverify_stage = { let verifier = TransactionSigVerifier::default(); - SigVerifyStage::new(packet_receiver, verified_sender, verifier) + SigVerifyStage::new(weighted_receiver, verified_sender, verifier) }; let (verified_tpu_vote_packets_sender, verified_tpu_vote_packets_receiver) = unbounded(); @@ -125,7 +147,7 @@ impl Tpu { let vote_sigverify_stage = { let verifier = TransactionSigVerifier::new_reject_non_vote(); SigVerifyStage::new( - vote_packet_receiver, + vote_weighted_receiver, verified_tpu_vote_packets_sender, verifier, ) @@ -179,6 +201,8 @@ impl Tpu { cluster_info_vote_listener, broadcast_stage, tpu_quic_t, + transaction_weight_stage, + vote_transaction_weight_stage, } } @@ -189,6 +213,8 @@ impl Tpu { self.vote_sigverify_stage.join(), self.cluster_info_vote_listener.join(), self.banking_stage.join(), + self.transaction_weight_stage.join(), + self.vote_transaction_weight_stage.join(), ]; self.tpu_quic_t.join()?; let broadcast_result = self.broadcast_stage.join(); diff --git a/core/src/transaction_weighting_stage.rs b/core/src/transaction_weighting_stage.rs new file mode 100644 index 000000000..6dba1fc0d --- /dev/null +++ b/core/src/transaction_weighting_stage.rs @@ -0,0 +1,77 @@ +use { + crossbeam_channel::{Receiver, RecvTimeoutError, Sender}, + rayon::prelude::*, + solana_gossip::cluster_info::ClusterInfo, + solana_perf::packet::PacketBatch, + solana_runtime::bank_forks::BankForks, + solana_streamer::streamer::{self, StreamerError}, + std::{ + collections::HashMap, + net::IpAddr, + sync::{Arc, RwLock}, + thread::{self, Builder, JoinHandle}, + time::Instant, + }, +}; + +pub struct TransactionWeightStage { + thread_hdl: JoinHandle<()>, +} + +impl TransactionWeightStage { + pub fn new( + packet_receiver: Receiver, + sender: Sender>, + bank_forks: Arc>, + cluster_info: Arc, + ) -> Self { + let thread_hdl = Builder::new() + .name("sol-tx-weight".to_string()) + .spawn(move || { + let mut last_stakes = Instant::now(); + let mut ip_to_stake: HashMap = HashMap::new(); + loop { + if last_stakes.elapsed().as_millis() > 1000 { + let root_bank = bank_forks.read().unwrap().root_bank(); + let staked_nodes = root_bank.staked_nodes(); + ip_to_stake = cluster_info + .tvu_peers() + .into_iter() + .filter_map(|node| { + let stake = staked_nodes.get(&node.id)?; + Some((node.tvu.ip(), *stake)) + }) + .collect(); + last_stakes = Instant::now(); + } + match streamer::recv_packet_batches(&packet_receiver) { + Ok((mut batches, _num_packets, _recv_duration)) => { + Self::apply_weights(&mut batches, &ip_to_stake); + if let Err(e) = sender.send(batches) { + info!("Sender error: {:?}", e); + } + } + Err(e) => match e { + StreamerError::RecvTimeout(RecvTimeoutError::Disconnected) => break, + StreamerError::RecvTimeout(RecvTimeoutError::Timeout) => (), + _ => error!("error: {:?}", e), + }, + } + } + }) + .unwrap(); + Self { thread_hdl } + } + + fn apply_weights(batches: &mut [PacketBatch], ip_to_stake: &HashMap) { + batches.into_par_iter().for_each(|batch| { + batch.packets.par_iter_mut().for_each(|packet| { + packet.meta.weight = *ip_to_stake.get(&packet.meta.addr().ip()).unwrap_or(&0); + }); + }); + } + + pub fn join(self) -> thread::Result<()> { + self.thread_hdl.join() + } +} diff --git a/sdk/src/packet.rs b/sdk/src/packet.rs index efea21904..e79620dbe 100644 --- a/sdk/src/packet.rs +++ b/sdk/src/packet.rs @@ -32,6 +32,7 @@ pub struct Meta { pub addr: IpAddr, pub port: u16, pub flags: PacketFlags, + pub weight: u64, } #[derive(Clone)] @@ -145,6 +146,7 @@ impl Default for Meta { addr: IpAddr::V4(Ipv4Addr::UNSPECIFIED), port: 0, flags: PacketFlags::empty(), + weight: 0, } } } diff --git a/streamer/src/streamer.rs b/streamer/src/streamer.rs index 81535c8d2..b16a9eff6 100644 --- a/streamer/src/streamer.rs +++ b/streamer/src/streamer.rs @@ -256,6 +256,34 @@ fn recv_send( Ok(()) } +pub fn recv_vec_packet_batches( + recvr: &Receiver>, +) -> Result<(Vec, usize, Duration)> { + let timer = Duration::new(1, 0); + let mut packet_batches = recvr.recv_timeout(timer)?; + let recv_start = Instant::now(); + trace!("got packets"); + let mut num_packets = packet_batches + .iter() + .map(|packets| packets.packets.len()) + .sum::(); + while let Ok(packet_batch) = recvr.try_recv() { + trace!("got more packets"); + num_packets += packet_batch + .iter() + .map(|packets| packets.packets.len()) + .sum::(); + packet_batches.extend(packet_batch); + } + let recv_duration = recv_start.elapsed(); + trace!( + "packet batches len: {}, num packets: {}", + packet_batches.len(), + num_packets + ); + Ok((packet_batches, num_packets, recv_duration)) +} + pub fn recv_packet_batches( recvr: &PacketBatchReceiver, ) -> Result<(Vec, usize, Duration)> {