add timing metrics, some renaming
This commit is contained in:
parent
fd515097d8
commit
c478fe2047
|
@ -1,9 +1,5 @@
|
|||
use {
|
||||
crate::{
|
||||
broadcast_stage::BroadcastStage,
|
||||
find_packet_sender_stake_stage::FindPacketSenderStakeStage,
|
||||
retransmit_stage::RetransmitStage,
|
||||
},
|
||||
crate::{broadcast_stage::BroadcastStage, retransmit_stage::RetransmitStage},
|
||||
itertools::Itertools,
|
||||
lru::LruCache,
|
||||
rand::{seq::SliceRandom, Rng, SeedableRng},
|
||||
|
@ -32,7 +28,7 @@ use {
|
|||
collections::HashMap,
|
||||
iter::repeat_with,
|
||||
marker::PhantomData,
|
||||
net::{IpAddr, Ipv4Addr, SocketAddr},
|
||||
net::SocketAddr,
|
||||
ops::Deref,
|
||||
sync::{Arc, Mutex},
|
||||
time::{Duration, Instant},
|
||||
|
@ -317,19 +313,6 @@ impl ClusterNodes<RetransmitStage> {
|
|||
}
|
||||
}
|
||||
|
||||
impl ClusterNodes<FindPacketSenderStakeStage> {
|
||||
pub(crate) fn get_ip_to_stakes(&self) -> HashMap<IpAddr, u64> {
|
||||
self.compat_index
|
||||
.iter()
|
||||
.filter_map(|(_, i)| {
|
||||
let node = &self.nodes[*i];
|
||||
let contact_info = node.contact_info()?;
|
||||
Some((contact_info.tvu.ip(), node.stake))
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_cluster_nodes<T: 'static>(
|
||||
cluster_info: &ClusterInfo,
|
||||
stakes: &HashMap<Pubkey, u64>,
|
||||
|
@ -505,20 +488,9 @@ pub fn make_test_cluster<R: Rng>(
|
|||
ClusterInfo,
|
||||
) {
|
||||
let (unstaked_numerator, unstaked_denominator) = unstaked_ratio.unwrap_or((1, 7));
|
||||
let mut ip_addr_octet: usize = 0;
|
||||
let mut nodes: Vec<_> = repeat_with(|| {
|
||||
let mut contact_info = ContactInfo::new_rand(rng, None);
|
||||
contact_info.tvu.set_ip(IpAddr::V4(Ipv4Addr::new(
|
||||
127,
|
||||
0,
|
||||
0,
|
||||
(ip_addr_octet % 256) as u8,
|
||||
)));
|
||||
ip_addr_octet += 1;
|
||||
contact_info
|
||||
})
|
||||
.take(num_nodes)
|
||||
.collect();
|
||||
let mut nodes: Vec<_> = repeat_with(|| ContactInfo::new_rand(rng, None))
|
||||
.take(num_nodes)
|
||||
.collect();
|
||||
nodes.shuffle(rng);
|
||||
let this_node = nodes[0].clone();
|
||||
let mut stakes: HashMap<Pubkey, u64> = nodes
|
||||
|
@ -713,35 +685,4 @@ mod tests {
|
|||
assert_eq!(*peer, peers[index]);
|
||||
}
|
||||
}
|
||||
|
||||
/// Checks that `get_ip_to_stakes` reports, for every staked IP it returns,
/// the same stake that the test cluster assigned to the node with that TVU IP.
#[test]
fn test_cluster_nodes_transaction_weight() {
    solana_logger::setup();
    let mut rng = rand::thread_rng();
    let (nodes, stakes, cluster_info) = make_test_cluster(&mut rng, 14, None);
    let cluster_nodes = new_cluster_nodes::<FindPacketSenderStakeStage>(&cluster_info, &stakes);

    // Every node that has contact-info must appear in the index.
    assert_eq!(cluster_nodes.compat_index.len(), nodes.len());
    // Staked nodes without contact-info are still part of the node list.
    assert!(cluster_nodes.nodes.len() > nodes.len());

    let ip_to_stake = cluster_nodes.get_ip_to_stakes();

    // Expected values: only staked nodes that also have contact-info.
    let mut staked_nodes_with_contact_info: HashMap<_, _> = HashMap::new();
    for (pubkey, stake) in stakes.iter() {
        if let Some(node) = nodes.iter().find(|node| node.id == *pubkey) {
            staked_nodes_with_contact_info.insert(node.tvu.ip(), stake);
        }
    }

    for (ip, stake) in ip_to_stake.iter() {
        // Zero-stake entries are ignored: unstaked nodes default to a stake of 0.
        if *stake == 0 {
            continue;
        }
        let expected_stake = staked_nodes_with_contact_info.get(ip).unwrap();
        assert_eq!(stake, *expected_stake);
    }
}
|
||||
}
|
||||
|
|
|
@ -1,11 +1,12 @@
|
|||
use {
|
||||
crate::cluster_nodes::ClusterNodesCache,
|
||||
crossbeam_channel::{Receiver, RecvTimeoutError, Sender},
|
||||
rayon::{prelude::*, ThreadPool},
|
||||
solana_gossip::cluster_info::ClusterInfo,
|
||||
solana_measure::measure::Measure,
|
||||
solana_perf::packet::PacketBatch,
|
||||
solana_rayon_threadlimit::get_thread_count,
|
||||
solana_runtime::bank_forks::BankForks,
|
||||
solana_sdk::timing::timestamp,
|
||||
solana_streamer::streamer::{self, StreamerError},
|
||||
std::{
|
||||
cell::RefCell,
|
||||
|
@ -17,9 +18,7 @@ use {
|
|||
},
|
||||
};
|
||||
|
||||
const CLUSTER_NODES_CACHE_NUM_EPOCH_CAP: usize = 8;
|
||||
const CLUSTER_NODES_CACHE_TTL: Duration = Duration::from_secs(5);
|
||||
const STAKES_REFRESH_PERIOD_IN_MS: u128 = 1000;
|
||||
const IP_TO_STAKE_REFRESH_DURATION: Duration = Duration::from_secs(5);
|
||||
|
||||
thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
|
||||
.num_threads(get_thread_count())
|
||||
|
@ -27,43 +26,109 @@ thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::
|
|||
.build()
|
||||
.unwrap()));
|
||||
|
||||
pub type FindPacketSenderStakeSender = Sender<Vec<PacketBatch>>;
|
||||
pub type FindPacketSenderStakeReceiver = Receiver<Vec<PacketBatch>>;
|
||||
|
||||
/// Running counters for the find-packet-sender-stake stage.
///
/// The values accumulate between reports; `report` submits them as a
/// datapoint and then resets the whole struct to its defaults.
#[derive(Debug, Default)]
struct FindPacketSenderStakeStats {
    /// Value of `timestamp()` captured when the stats were last reported.
    last_print: u64,
    /// Accumulated time spent in `try_refresh_ip_to_stake` (added via `Measure::as_us()`).
    refresh_ip_to_stake_time: u64,
    /// Accumulated time spent in `apply_sender_stakes` (added via `Measure::as_us()`).
    apply_sender_stakes_time: u64,
    /// Accumulated time spent sending batches downstream (added via `Measure::as_us()`).
    send_batches_time: u64,
    /// Accumulated receive duration reported by `streamer::recv_packet_batches`.
    // NOTE(review): the stage loop adds `recv_duration.as_nanos()` here while the
    // other timers add `as_us()` values, so the units differ; confirm this is intended.
    receive_batches_time: u64,
    /// Number of packet batches received since the last report.
    total_batches: u64,
    /// Number of packets received since the last report.
    total_packets: u64,
}
|
||||
|
||||
impl FindPacketSenderStakeStats {
|
||||
fn report(&mut self) {
|
||||
let now = timestamp();
|
||||
let elapsed_ms = now - self.last_print;
|
||||
if elapsed_ms > 2000 {
|
||||
datapoint_info!(
|
||||
"find_packet_sender_stake-services_stats",
|
||||
(
|
||||
"refresh_ip_to_stake_time",
|
||||
self.refresh_ip_to_stake_time as i64,
|
||||
i64
|
||||
),
|
||||
(
|
||||
"apply_sender_stakes_time",
|
||||
self.apply_sender_stakes_time as i64,
|
||||
i64
|
||||
),
|
||||
("send_batches_time", self.send_batches_time as i64, i64),
|
||||
(
|
||||
"receive_batches_time",
|
||||
self.receive_batches_time as i64,
|
||||
i64
|
||||
),
|
||||
("total_batches", self.total_batches as i64, i64),
|
||||
("total_packets", self.total_packets as i64, i64),
|
||||
);
|
||||
*self = FindPacketSenderStakeStats::default();
|
||||
self.last_print = now;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Pipeline stage that owns the worker thread spawned by `new`.
///
/// That thread periodically refreshes an IP-to-stake map, receives packet
/// batches, applies sender stakes to them, and forwards them on its sender.
pub struct FindPacketSenderStakeStage {
    /// Join handle of the worker thread.
    thread_hdl: JoinHandle<()>,
}
|
||||
|
||||
impl FindPacketSenderStakeStage {
|
||||
pub fn new(
|
||||
packet_receiver: Receiver<PacketBatch>,
|
||||
sender: Sender<Vec<PacketBatch>>,
|
||||
packet_receiver: streamer::PacketBatchReceiver,
|
||||
sender: FindPacketSenderStakeSender,
|
||||
bank_forks: Arc<RwLock<BankForks>>,
|
||||
cluster_info: Arc<ClusterInfo>,
|
||||
) -> Self {
|
||||
let cluster_nodes_cache = Arc::new(ClusterNodesCache::<FindPacketSenderStakeStage>::new(
|
||||
CLUSTER_NODES_CACHE_NUM_EPOCH_CAP,
|
||||
CLUSTER_NODES_CACHE_TTL,
|
||||
));
|
||||
let mut stats = FindPacketSenderStakeStats::default();
|
||||
let thread_hdl = Builder::new()
|
||||
.name("sol-tx-sender_stake".to_string())
|
||||
.name("find-packet-sender-stake".to_string())
|
||||
.spawn(move || {
|
||||
let mut last_stakes = Instant::now();
|
||||
let mut ip_to_stake: HashMap<IpAddr, u64> = HashMap::new();
|
||||
loop {
|
||||
if last_stakes.elapsed().as_millis() > STAKES_REFRESH_PERIOD_IN_MS {
|
||||
let (root_bank, working_bank) = {
|
||||
let bank_forks = bank_forks.read().unwrap();
|
||||
(bank_forks.root_bank(), bank_forks.working_bank())
|
||||
};
|
||||
ip_to_stake = cluster_nodes_cache
|
||||
.get(root_bank.slot(), &root_bank, &working_bank, &cluster_info)
|
||||
.get_ip_to_stakes();
|
||||
last_stakes = Instant::now();
|
||||
}
|
||||
let mut refresh_ip_to_stake_time = Measure::start("refresh_ip_to_stake_time");
|
||||
Self::try_refresh_ip_to_stake(
|
||||
&mut last_stakes,
|
||||
&mut ip_to_stake,
|
||||
bank_forks.clone(),
|
||||
cluster_info.clone(),
|
||||
);
|
||||
refresh_ip_to_stake_time.stop();
|
||||
stats.refresh_ip_to_stake_time = stats
|
||||
.refresh_ip_to_stake_time
|
||||
.saturating_add(refresh_ip_to_stake_time.as_us());
|
||||
|
||||
match streamer::recv_packet_batches(&packet_receiver) {
|
||||
Ok((mut batches, _num_packets, _recv_duration)) => {
|
||||
Ok((mut batches, num_packets, recv_duration)) => {
|
||||
let num_batches = batches.len();
|
||||
let mut apply_sender_stakes_time =
|
||||
Measure::start("apply_sender_stakes_time");
|
||||
Self::apply_sender_stakes(&mut batches, &ip_to_stake);
|
||||
apply_sender_stakes_time.stop();
|
||||
|
||||
let mut send_batches_time = Measure::start("send_batches_time");
|
||||
if let Err(e) = sender.send(batches) {
|
||||
info!("Sender error: {:?}", e);
|
||||
}
|
||||
send_batches_time.stop();
|
||||
|
||||
stats.apply_sender_stakes_time = stats
|
||||
.apply_sender_stakes_time
|
||||
.saturating_add(apply_sender_stakes_time.as_us());
|
||||
stats.send_batches_time = stats
|
||||
.send_batches_time
|
||||
.saturating_add(send_batches_time.as_us());
|
||||
stats.receive_batches_time = stats
|
||||
.receive_batches_time
|
||||
.saturating_add(recv_duration.as_nanos() as u64);
|
||||
stats.total_batches =
|
||||
stats.total_batches.saturating_add(num_batches as u64);
|
||||
stats.total_packets =
|
||||
stats.total_packets.saturating_add(num_packets as u64);
|
||||
}
|
||||
Err(e) => match e {
|
||||
StreamerError::RecvTimeout(RecvTimeoutError::Disconnected) => break,
|
||||
|
@ -71,12 +136,35 @@ impl FindPacketSenderStakeStage {
|
|||
_ => error!("error: {:?}", e),
|
||||
},
|
||||
}
|
||||
|
||||
stats.report();
|
||||
}
|
||||
})
|
||||
.unwrap();
|
||||
Self { thread_hdl }
|
||||
}
|
||||
|
||||
fn try_refresh_ip_to_stake(
|
||||
last_stakes: &mut Instant,
|
||||
ip_to_stake: &mut HashMap<IpAddr, u64>,
|
||||
bank_forks: Arc<RwLock<BankForks>>,
|
||||
cluster_info: Arc<ClusterInfo>,
|
||||
) {
|
||||
if last_stakes.elapsed() > IP_TO_STAKE_REFRESH_DURATION {
|
||||
let root_bank = bank_forks.read().unwrap().root_bank();
|
||||
let staked_nodes = root_bank.staked_nodes();
|
||||
*ip_to_stake = cluster_info
|
||||
.tvu_peers()
|
||||
.into_iter()
|
||||
.filter_map(|node| {
|
||||
let stake = staked_nodes.get(&node.id)?;
|
||||
Some((node.tvu.ip(), *stake))
|
||||
})
|
||||
.collect();
|
||||
*last_stakes = Instant::now();
|
||||
}
|
||||
}
|
||||
|
||||
fn apply_sender_stakes(batches: &mut [PacketBatch], ip_to_stake: &HashMap<IpAddr, u64>) {
|
||||
PAR_THREAD_POOL.with(|thread_pool| {
|
||||
thread_pool.borrow().install(|| {
|
||||
|
|
|
@ -6,9 +6,9 @@
|
|||
//! if perf-libs are available
|
||||
|
||||
use {
|
||||
crate::sigverify,
|
||||
crate::{find_packet_sender_stake_stage, sigverify},
|
||||
core::time::Duration,
|
||||
crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender},
|
||||
crossbeam_channel::{RecvTimeoutError, SendError, Sender},
|
||||
itertools::Itertools,
|
||||
solana_measure::measure::Measure,
|
||||
solana_perf::{
|
||||
|
@ -192,7 +192,7 @@ impl SigVerifier for DisabledSigVerifier {
|
|||
impl SigVerifyStage {
|
||||
#[allow(clippy::new_ret_no_self)]
|
||||
pub fn new<T: SigVerifier + 'static + Send + Clone>(
|
||||
packet_receiver: Receiver<Vec<PacketBatch>>,
|
||||
packet_receiver: find_packet_sender_stake_stage::FindPacketSenderStakeReceiver,
|
||||
verified_sender: Sender<Vec<PacketBatch>>,
|
||||
verifier: T,
|
||||
) -> Self {
|
||||
|
@ -227,7 +227,7 @@ impl SigVerifyStage {
|
|||
|
||||
fn verifier<T: SigVerifier>(
|
||||
deduper: &Deduper,
|
||||
recvr: &Receiver<Vec<PacketBatch>>,
|
||||
recvr: &find_packet_sender_stake_stage::FindPacketSenderStakeReceiver,
|
||||
sendr: &Sender<Vec<PacketBatch>>,
|
||||
verifier: &T,
|
||||
stats: &mut SigVerifierStats,
|
||||
|
@ -312,7 +312,7 @@ impl SigVerifyStage {
|
|||
}
|
||||
|
||||
fn verifier_service<T: SigVerifier + 'static + Send + Clone>(
|
||||
packet_receiver: Receiver<Vec<PacketBatch>>,
|
||||
packet_receiver: find_packet_sender_stake_stage::FindPacketSenderStakeReceiver,
|
||||
verified_sender: Sender<Vec<PacketBatch>>,
|
||||
verifier: &T,
|
||||
) -> JoinHandle<()> {
|
||||
|
@ -358,7 +358,7 @@ impl SigVerifyStage {
|
|||
}
|
||||
|
||||
fn verifier_services<T: SigVerifier + 'static + Send + Clone>(
|
||||
packet_receiver: Receiver<Vec<PacketBatch>>,
|
||||
packet_receiver: find_packet_sender_stake_stage::FindPacketSenderStakeReceiver,
|
||||
verified_sender: Sender<Vec<PacketBatch>>,
|
||||
verifier: T,
|
||||
) -> JoinHandle<()> {
|
||||
|
|
|
@ -56,8 +56,8 @@ pub struct Tpu {
|
|||
cluster_info_vote_listener: ClusterInfoVoteListener,
|
||||
broadcast_stage: BroadcastStage,
|
||||
tpu_quic_t: thread::JoinHandle<()>,
|
||||
transaction_weight_stage: FindPacketSenderStakeStage,
|
||||
vote_transaction_weight_stage: FindPacketSenderStakeStage,
|
||||
find_packet_sender_stake_stage: FindPacketSenderStakeStage,
|
||||
vote_find_packet_sender_stake_stage: FindPacketSenderStakeStage,
|
||||
}
|
||||
|
||||
impl Tpu {
|
||||
|
@ -107,20 +107,21 @@ impl Tpu {
|
|||
tpu_coalesce_ms,
|
||||
);
|
||||
|
||||
let (weighted_packet_sender, weighted_packet_receiver) = unbounded();
|
||||
let (find_packet_sender_stake_sender, find_packet_sender_stake_receiver) = unbounded();
|
||||
|
||||
let transaction_weight_stage = FindPacketSenderStakeStage::new(
|
||||
let find_packet_sender_stake_stage = FindPacketSenderStakeStage::new(
|
||||
packet_receiver,
|
||||
weighted_packet_sender,
|
||||
find_packet_sender_stake_sender,
|
||||
bank_forks.clone(),
|
||||
cluster_info.clone(),
|
||||
);
|
||||
|
||||
let (vote_weighted_packet_sender, vote_weighted_packet_receiver) = unbounded();
|
||||
let (vote_find_packet_sender_stake_sender, vote_find_packet_sender_stake_receiver) =
|
||||
unbounded();
|
||||
|
||||
let vote_transaction_weight_stage = FindPacketSenderStakeStage::new(
|
||||
let vote_find_packet_sender_stake_stage = FindPacketSenderStakeStage::new(
|
||||
vote_packet_receiver,
|
||||
vote_weighted_packet_sender,
|
||||
vote_find_packet_sender_stake_sender,
|
||||
bank_forks.clone(),
|
||||
cluster_info.clone(),
|
||||
);
|
||||
|
@ -139,7 +140,7 @@ impl Tpu {
|
|||
|
||||
let sigverify_stage = {
|
||||
let verifier = TransactionSigVerifier::default();
|
||||
SigVerifyStage::new(weighted_packet_receiver, verified_sender, verifier)
|
||||
SigVerifyStage::new(find_packet_sender_stake_receiver, verified_sender, verifier)
|
||||
};
|
||||
|
||||
let (verified_tpu_vote_packets_sender, verified_tpu_vote_packets_receiver) = unbounded();
|
||||
|
@ -147,7 +148,7 @@ impl Tpu {
|
|||
let vote_sigverify_stage = {
|
||||
let verifier = TransactionSigVerifier::new_reject_non_vote();
|
||||
SigVerifyStage::new(
|
||||
vote_weighted_packet_receiver,
|
||||
vote_find_packet_sender_stake_receiver,
|
||||
verified_tpu_vote_packets_sender,
|
||||
verifier,
|
||||
)
|
||||
|
@ -201,8 +202,8 @@ impl Tpu {
|
|||
cluster_info_vote_listener,
|
||||
broadcast_stage,
|
||||
tpu_quic_t,
|
||||
transaction_weight_stage,
|
||||
vote_transaction_weight_stage,
|
||||
find_packet_sender_stake_stage,
|
||||
vote_find_packet_sender_stake_stage,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -213,8 +214,8 @@ impl Tpu {
|
|||
self.vote_sigverify_stage.join(),
|
||||
self.cluster_info_vote_listener.join(),
|
||||
self.banking_stage.join(),
|
||||
self.transaction_weight_stage.join(),
|
||||
self.vote_transaction_weight_stage.join(),
|
||||
self.find_packet_sender_stake_stage.join(),
|
||||
self.vote_find_packet_sender_stake_stage.join(),
|
||||
];
|
||||
self.tpu_quic_t.join()?;
|
||||
let broadcast_result = self.broadcast_stage.join();
|
||||
|
|
Loading…
Reference in New Issue