Add tx weighting stage

This commit is contained in:
Stephen Akridge 2021-12-16 14:47:55 +00:00 committed by Tao Zhu
parent 664deb2157
commit 976b138e76
8 changed files with 150 additions and 16 deletions

View File

@ -159,7 +159,7 @@ fn bench_sigverify_stage(bencher: &mut Bencher) {
for _ in 0..batches.len() { for _ in 0..batches.len() {
if let Some(batch) = batches.pop() { if let Some(batch) = batches.pop() {
sent_len += batch.packets.len(); sent_len += batch.packets.len();
packet_s.send(batch).unwrap(); packet_s.send(vec![batch]).unwrap();
} }
} }
let mut received = 0; let mut received = 0;

View File

@ -61,6 +61,7 @@ pub mod system_monitor_service;
mod tower1_7_14; mod tower1_7_14;
pub mod tower_storage; pub mod tower_storage;
pub mod tpu; pub mod tpu;
pub mod transaction_weighting_stage;
pub mod tree_diff; pub mod tree_diff;
pub mod tvu; pub mod tvu;
pub mod unfrozen_gossip_verified_vote_hashes; pub mod unfrozen_gossip_verified_vote_hashes;

View File

@ -2,17 +2,17 @@
use { use {
crate::packet_hasher::PacketHasher, crate::packet_hasher::PacketHasher,
crossbeam_channel::unbounded, crossbeam_channel::{unbounded, Sender},
lru::LruCache, lru::LruCache,
solana_ledger::shred::{get_shred_slot_index_type, ShredFetchStats}, solana_ledger::shred::{get_shred_slot_index_type, ShredFetchStats},
solana_perf::{ solana_perf::{
cuda_runtime::PinnedVec, cuda_runtime::PinnedVec,
packet::{Packet, PacketBatchRecycler, PacketFlags}, packet::{Packet, PacketBatch, PacketBatchRecycler, PacketFlags},
recycler::Recycler, recycler::Recycler,
}, },
solana_runtime::bank_forks::BankForks, solana_runtime::bank_forks::BankForks,
solana_sdk::clock::{Slot, DEFAULT_MS_PER_SLOT}, solana_sdk::clock::{Slot, DEFAULT_MS_PER_SLOT},
solana_streamer::streamer::{self, PacketBatchReceiver, PacketBatchSender}, solana_streamer::streamer::{self, PacketBatchReceiver},
std::{ std::{
net::UdpSocket, net::UdpSocket,
sync::{atomic::AtomicBool, Arc, RwLock}, sync::{atomic::AtomicBool, Arc, RwLock},
@ -65,7 +65,7 @@ impl ShredFetchStage {
// updates packets received on a channel and sends them on another channel // updates packets received on a channel and sends them on another channel
fn modify_packets<F>( fn modify_packets<F>(
recvr: PacketBatchReceiver, recvr: PacketBatchReceiver,
sendr: PacketBatchSender, sendr: Sender<Vec<PacketBatch>>,
bank_forks: Option<Arc<RwLock<BankForks>>>, bank_forks: Option<Arc<RwLock<BankForks>>>,
name: &'static str, name: &'static str,
modify: F, modify: F,
@ -125,7 +125,7 @@ impl ShredFetchStage {
stats = ShredFetchStats::default(); stats = ShredFetchStats::default();
last_stats = Instant::now(); last_stats = Instant::now();
} }
if sendr.send(packet_batch).is_err() { if sendr.send(vec![packet_batch]).is_err() {
break; break;
} }
} }
@ -134,7 +134,7 @@ impl ShredFetchStage {
fn packet_modifier<F>( fn packet_modifier<F>(
sockets: Vec<Arc<UdpSocket>>, sockets: Vec<Arc<UdpSocket>>,
exit: &Arc<AtomicBool>, exit: &Arc<AtomicBool>,
sender: PacketBatchSender, sender: Sender<Vec<PacketBatch>>,
recycler: Recycler<PinnedVec<Packet>>, recycler: Recycler<PinnedVec<Packet>>,
bank_forks: Option<Arc<RwLock<BankForks>>>, bank_forks: Option<Arc<RwLock<BankForks>>>,
name: &'static str, name: &'static str,
@ -170,7 +170,7 @@ impl ShredFetchStage {
sockets: Vec<Arc<UdpSocket>>, sockets: Vec<Arc<UdpSocket>>,
forward_sockets: Vec<Arc<UdpSocket>>, forward_sockets: Vec<Arc<UdpSocket>>,
repair_socket: Arc<UdpSocket>, repair_socket: Arc<UdpSocket>,
sender: &PacketBatchSender, sender: &Sender<Vec<PacketBatch>>,
bank_forks: Option<Arc<RwLock<BankForks>>>, bank_forks: Option<Arc<RwLock<BankForks>>>,
exit: &Arc<AtomicBool>, exit: &Arc<AtomicBool>,
) -> Self { ) -> Self {

View File

@ -16,7 +16,7 @@ use {
sigverify::{count_valid_packets, shrink_batches, Deduper}, sigverify::{count_valid_packets, shrink_batches, Deduper},
}, },
solana_sdk::timing, solana_sdk::timing,
solana_streamer::streamer::{self, PacketBatchReceiver, StreamerError}, solana_streamer::streamer::{self, StreamerError},
std::{ std::{
thread::{self, Builder, JoinHandle}, thread::{self, Builder, JoinHandle},
time::Instant, time::Instant,
@ -192,7 +192,7 @@ impl SigVerifier for DisabledSigVerifier {
impl SigVerifyStage { impl SigVerifyStage {
#[allow(clippy::new_ret_no_self)] #[allow(clippy::new_ret_no_self)]
pub fn new<T: SigVerifier + 'static + Send + Clone>( pub fn new<T: SigVerifier + 'static + Send + Clone>(
packet_receiver: Receiver<PacketBatch>, packet_receiver: Receiver<Vec<PacketBatch>>,
verified_sender: Sender<Vec<PacketBatch>>, verified_sender: Sender<Vec<PacketBatch>>,
verifier: T, verifier: T,
) -> Self { ) -> Self {
@ -227,12 +227,12 @@ impl SigVerifyStage {
fn verifier<T: SigVerifier>( fn verifier<T: SigVerifier>(
deduper: &Deduper, deduper: &Deduper,
recvr: &PacketBatchReceiver, recvr: &Receiver<Vec<PacketBatch>>,
sendr: &Sender<Vec<PacketBatch>>, sendr: &Sender<Vec<PacketBatch>>,
verifier: &T, verifier: &T,
stats: &mut SigVerifierStats, stats: &mut SigVerifierStats,
) -> Result<()> { ) -> Result<()> {
let (mut batches, num_packets, recv_duration) = streamer::recv_packet_batches(recvr)?; let (mut batches, num_packets, recv_duration) = streamer::recv_vec_packet_batches(recvr)?;
let batches_len = batches.len(); let batches_len = batches.len();
debug!( debug!(
@ -312,7 +312,7 @@ impl SigVerifyStage {
} }
fn verifier_service<T: SigVerifier + 'static + Send + Clone>( fn verifier_service<T: SigVerifier + 'static + Send + Clone>(
packet_receiver: PacketBatchReceiver, packet_receiver: Receiver<Vec<PacketBatch>>,
verified_sender: Sender<Vec<PacketBatch>>, verified_sender: Sender<Vec<PacketBatch>>,
verifier: &T, verifier: &T,
) -> JoinHandle<()> { ) -> JoinHandle<()> {
@ -358,7 +358,7 @@ impl SigVerifyStage {
} }
fn verifier_services<T: SigVerifier + 'static + Send + Clone>( fn verifier_services<T: SigVerifier + 'static + Send + Clone>(
packet_receiver: PacketBatchReceiver, packet_receiver: Receiver<Vec<PacketBatch>>,
verified_sender: Sender<Vec<PacketBatch>>, verified_sender: Sender<Vec<PacketBatch>>,
verifier: T, verifier: T,
) -> JoinHandle<()> { ) -> JoinHandle<()> {

View File

@ -12,6 +12,7 @@ use {
fetch_stage::FetchStage, fetch_stage::FetchStage,
sigverify::TransactionSigVerifier, sigverify::TransactionSigVerifier,
sigverify_stage::SigVerifyStage, sigverify_stage::SigVerifyStage,
transaction_weighting_stage::TransactionWeightStage,
}, },
crossbeam_channel::{unbounded, Receiver}, crossbeam_channel::{unbounded, Receiver},
solana_gossip::cluster_info::ClusterInfo, solana_gossip::cluster_info::ClusterInfo,
@ -55,6 +56,8 @@ pub struct Tpu {
cluster_info_vote_listener: ClusterInfoVoteListener, cluster_info_vote_listener: ClusterInfoVoteListener,
broadcast_stage: BroadcastStage, broadcast_stage: BroadcastStage,
tpu_quic_t: thread::JoinHandle<()>, tpu_quic_t: thread::JoinHandle<()>,
transaction_weight_stage: TransactionWeightStage,
vote_transaction_weight_stage: TransactionWeightStage,
} }
impl Tpu { impl Tpu {
@ -103,6 +106,25 @@ impl Tpu {
poh_recorder, poh_recorder,
tpu_coalesce_ms, tpu_coalesce_ms,
); );
let (weighted_sender, weighted_receiver) = unbounded();
let transaction_weight_stage = TransactionWeightStage::new(
packet_receiver,
weighted_sender,
bank_forks.clone(),
cluster_info.clone(),
);
let (vote_weighted_sender, vote_weighted_receiver) = unbounded();
let vote_transaction_weight_stage = TransactionWeightStage::new(
vote_packet_receiver,
vote_weighted_sender,
bank_forks.clone(),
cluster_info.clone(),
);
let (verified_sender, verified_receiver) = unbounded(); let (verified_sender, verified_receiver) = unbounded();
let tpu_quic_t = solana_streamer::quic::spawn_server( let tpu_quic_t = solana_streamer::quic::spawn_server(
@ -117,7 +139,7 @@ impl Tpu {
let sigverify_stage = { let sigverify_stage = {
let verifier = TransactionSigVerifier::default(); let verifier = TransactionSigVerifier::default();
SigVerifyStage::new(packet_receiver, verified_sender, verifier) SigVerifyStage::new(weighted_receiver, verified_sender, verifier)
}; };
let (verified_tpu_vote_packets_sender, verified_tpu_vote_packets_receiver) = unbounded(); let (verified_tpu_vote_packets_sender, verified_tpu_vote_packets_receiver) = unbounded();
@ -125,7 +147,7 @@ impl Tpu {
let vote_sigverify_stage = { let vote_sigverify_stage = {
let verifier = TransactionSigVerifier::new_reject_non_vote(); let verifier = TransactionSigVerifier::new_reject_non_vote();
SigVerifyStage::new( SigVerifyStage::new(
vote_packet_receiver, vote_weighted_receiver,
verified_tpu_vote_packets_sender, verified_tpu_vote_packets_sender,
verifier, verifier,
) )
@ -179,6 +201,8 @@ impl Tpu {
cluster_info_vote_listener, cluster_info_vote_listener,
broadcast_stage, broadcast_stage,
tpu_quic_t, tpu_quic_t,
transaction_weight_stage,
vote_transaction_weight_stage,
} }
} }
@ -189,6 +213,8 @@ impl Tpu {
self.vote_sigverify_stage.join(), self.vote_sigverify_stage.join(),
self.cluster_info_vote_listener.join(), self.cluster_info_vote_listener.join(),
self.banking_stage.join(), self.banking_stage.join(),
self.transaction_weight_stage.join(),
self.vote_transaction_weight_stage.join(),
]; ];
self.tpu_quic_t.join()?; self.tpu_quic_t.join()?;
let broadcast_result = self.broadcast_stage.join(); let broadcast_result = self.broadcast_stage.join();

View File

@ -0,0 +1,77 @@
use {
crossbeam_channel::{Receiver, RecvTimeoutError, Sender},
rayon::prelude::*,
solana_gossip::cluster_info::ClusterInfo,
solana_perf::packet::PacketBatch,
solana_runtime::bank_forks::BankForks,
solana_streamer::streamer::{self, StreamerError},
std::{
collections::HashMap,
net::IpAddr,
sync::{Arc, RwLock},
thread::{self, Builder, JoinHandle},
time::Instant,
},
};
pub struct TransactionWeightStage {
thread_hdl: JoinHandle<()>,
}
impl TransactionWeightStage {
pub fn new(
packet_receiver: Receiver<PacketBatch>,
sender: Sender<Vec<PacketBatch>>,
bank_forks: Arc<RwLock<BankForks>>,
cluster_info: Arc<ClusterInfo>,
) -> Self {
let thread_hdl = Builder::new()
.name("sol-tx-weight".to_string())
.spawn(move || {
let mut last_stakes = Instant::now();
let mut ip_to_stake: HashMap<IpAddr, u64> = HashMap::new();
loop {
if last_stakes.elapsed().as_millis() > 1000 {
let root_bank = bank_forks.read().unwrap().root_bank();
let staked_nodes = root_bank.staked_nodes();
ip_to_stake = cluster_info
.tvu_peers()
.into_iter()
.filter_map(|node| {
let stake = staked_nodes.get(&node.id)?;
Some((node.tvu.ip(), *stake))
})
.collect();
last_stakes = Instant::now();
}
match streamer::recv_packet_batches(&packet_receiver) {
Ok((mut batches, _num_packets, _recv_duration)) => {
Self::apply_weights(&mut batches, &ip_to_stake);
if let Err(e) = sender.send(batches) {
info!("Sender error: {:?}", e);
}
}
Err(e) => match e {
StreamerError::RecvTimeout(RecvTimeoutError::Disconnected) => break,
StreamerError::RecvTimeout(RecvTimeoutError::Timeout) => (),
_ => error!("error: {:?}", e),
},
}
}
})
.unwrap();
Self { thread_hdl }
}
fn apply_weights(batches: &mut [PacketBatch], ip_to_stake: &HashMap<IpAddr, u64>) {
batches.into_par_iter().for_each(|batch| {
batch.packets.par_iter_mut().for_each(|packet| {
packet.meta.weight = *ip_to_stake.get(&packet.meta.addr().ip()).unwrap_or(&0);
});
});
}
pub fn join(self) -> thread::Result<()> {
self.thread_hdl.join()
}
}

View File

@ -32,6 +32,7 @@ pub struct Meta {
pub addr: IpAddr, pub addr: IpAddr,
pub port: u16, pub port: u16,
pub flags: PacketFlags, pub flags: PacketFlags,
pub weight: u64,
} }
#[derive(Clone)] #[derive(Clone)]
@ -145,6 +146,7 @@ impl Default for Meta {
addr: IpAddr::V4(Ipv4Addr::UNSPECIFIED), addr: IpAddr::V4(Ipv4Addr::UNSPECIFIED),
port: 0, port: 0,
flags: PacketFlags::empty(), flags: PacketFlags::empty(),
weight: 0,
} }
} }
} }

View File

@ -256,6 +256,34 @@ fn recv_send(
Ok(()) Ok(())
} }
pub fn recv_vec_packet_batches(
recvr: &Receiver<Vec<PacketBatch>>,
) -> Result<(Vec<PacketBatch>, usize, Duration)> {
let timer = Duration::new(1, 0);
let mut packet_batches = recvr.recv_timeout(timer)?;
let recv_start = Instant::now();
trace!("got packets");
let mut num_packets = packet_batches
.iter()
.map(|packets| packets.packets.len())
.sum::<usize>();
while let Ok(packet_batch) = recvr.try_recv() {
trace!("got more packets");
num_packets += packet_batch
.iter()
.map(|packets| packets.packets.len())
.sum::<usize>();
packet_batches.extend(packet_batch);
}
let recv_duration = recv_start.elapsed();
trace!(
"packet batches len: {}, num packets: {}",
packet_batches.len(),
num_packets
);
Ok((packet_batches, num_packets, recv_duration))
}
pub fn recv_packet_batches( pub fn recv_packet_batches(
recvr: &PacketBatchReceiver, recvr: &PacketBatchReceiver,
) -> Result<(Vec<PacketBatch>, usize, Duration)> { ) -> Result<(Vec<PacketBatch>, usize, Duration)> {