Get slot info along tpu leader (#32942)
Created an overload of get_leader_tpus: get_leader_tpus_with_slots to get the tpu address along with the slot. And put that information into the debug log. This information can be compared with where the the transaction confirmed to measure leader accuracy. This help us better understand the txn latency.
This commit is contained in:
parent
d3fb54e0b2
commit
2642b8a552
|
@ -381,6 +381,19 @@ impl PohRecorder {
|
|||
.slot_leader_at(current_slot + slots, None)
|
||||
}
|
||||
|
||||
/// Return the leader and slot pair after `slots_in_the_future` slots.
|
||||
pub fn leader_and_slot_after_n_slots(
|
||||
&self,
|
||||
slots_in_the_future: u64,
|
||||
) -> Option<(Pubkey, Slot)> {
|
||||
let target_slot = self
|
||||
.slot_for_tick_height(self.tick_height)
|
||||
.checked_add(slots_in_the_future)?;
|
||||
self.leader_schedule_cache
|
||||
.slot_leader_at(target_slot, None)
|
||||
.map(|leader| (leader, target_slot))
|
||||
}
|
||||
|
||||
pub fn next_slot_leader(&self) -> Option<Pubkey> {
|
||||
self.leader_after_n_slots(1)
|
||||
}
|
||||
|
|
|
@ -1,7 +1,10 @@
|
|||
use {
|
||||
solana_gossip::{cluster_info::ClusterInfo, contact_info::Protocol},
|
||||
solana_poh::poh_recorder::PohRecorder,
|
||||
solana_sdk::{clock::NUM_CONSECUTIVE_LEADER_SLOTS, pubkey::Pubkey},
|
||||
solana_sdk::{
|
||||
clock::{Slot, NUM_CONSECUTIVE_LEADER_SLOTS},
|
||||
pubkey::Pubkey,
|
||||
},
|
||||
solana_send_transaction_service::tpu_info::TpuInfo,
|
||||
std::{
|
||||
collections::HashMap,
|
||||
|
@ -64,6 +67,39 @@ impl TpuInfo for ClusterTpuInfo {
|
|||
}
|
||||
unique_leaders
|
||||
}
|
||||
|
||||
fn get_leader_tpus_with_slots(
|
||||
&self,
|
||||
max_count: u64,
|
||||
protocol: Protocol,
|
||||
) -> Vec<(&SocketAddr, Slot)> {
|
||||
let recorder = self.poh_recorder.read().unwrap();
|
||||
let leaders: Vec<_> = (0..max_count)
|
||||
.filter_map(|future_slot| {
|
||||
let future_slot = max_count.wrapping_sub(future_slot);
|
||||
NUM_CONSECUTIVE_LEADER_SLOTS
|
||||
.checked_mul(future_slot)
|
||||
.and_then(|slots_in_the_future| {
|
||||
recorder.leader_and_slot_after_n_slots(slots_in_the_future)
|
||||
})
|
||||
})
|
||||
.collect();
|
||||
drop(recorder);
|
||||
let addrs_to_slots = leaders
|
||||
.into_iter()
|
||||
.filter_map(|(leader_id, leader_slot)| {
|
||||
self.recent_peers
|
||||
.get(&leader_id)
|
||||
.map(|(udp_tpu, quic_tpu)| match protocol {
|
||||
Protocol::UDP => (udp_tpu, leader_slot),
|
||||
Protocol::QUIC => (quic_tpu, leader_slot),
|
||||
})
|
||||
})
|
||||
.collect::<HashMap<_, _>>();
|
||||
let mut unique_leaders = Vec::from_iter(addrs_to_slots);
|
||||
unique_leaders.sort_by_key(|(_addr, slot)| *slot);
|
||||
unique_leaders
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
|
|
@ -10,8 +10,8 @@ use {
|
|||
solana_metrics::datapoint_warn,
|
||||
solana_runtime::{bank::Bank, bank_forks::BankForks},
|
||||
solana_sdk::{
|
||||
hash::Hash, nonce_account, pubkey::Pubkey, saturating_add_assign, signature::Signature,
|
||||
timing::AtomicInterval, transport::TransportError,
|
||||
clock::Slot, hash::Hash, nonce_account, pubkey::Pubkey, saturating_add_assign,
|
||||
signature::Signature, timing::AtomicInterval, transport::TransportError,
|
||||
},
|
||||
std::{
|
||||
collections::{
|
||||
|
@ -565,7 +565,7 @@ impl SendTransactionService {
|
|||
stats: &SendTransactionServiceStats,
|
||||
) {
|
||||
// Processing the transactions in batch
|
||||
let addresses = Self::get_tpu_addresses(
|
||||
let addresses = Self::get_tpu_addresses_with_slots(
|
||||
tpu_address,
|
||||
leader_info,
|
||||
config,
|
||||
|
@ -574,11 +574,17 @@ impl SendTransactionService {
|
|||
|
||||
let wire_transactions = transactions
|
||||
.iter()
|
||||
.map(|(_, transaction_info)| transaction_info.wire_transaction.as_ref())
|
||||
.map(|(_, transaction_info)| {
|
||||
debug!(
|
||||
"Sending transacation {} to (address, slot): {:?}",
|
||||
transaction_info.signature, addresses,
|
||||
);
|
||||
transaction_info.wire_transaction.as_ref()
|
||||
})
|
||||
.collect::<Vec<&[u8]>>();
|
||||
|
||||
for address in &addresses {
|
||||
Self::send_transactions(address, &wire_transactions, connection_cache, stats);
|
||||
Self::send_transactions(address.0, &wire_transactions, connection_cache, stats);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -777,6 +783,21 @@ impl SendTransactionService {
|
|||
.unwrap_or_else(|| vec![tpu_address])
|
||||
}
|
||||
|
||||
fn get_tpu_addresses_with_slots<'a, T: TpuInfo>(
|
||||
tpu_address: &'a SocketAddr,
|
||||
leader_info: Option<&'a T>,
|
||||
config: &'a Config,
|
||||
protocol: Protocol,
|
||||
) -> Vec<(&'a SocketAddr, Slot)> {
|
||||
leader_info
|
||||
.as_ref()
|
||||
.map(|leader_info| {
|
||||
leader_info.get_leader_tpus_with_slots(config.leader_forward_count, protocol)
|
||||
})
|
||||
.filter(|addresses| !addresses.is_empty())
|
||||
.unwrap_or_else(|| vec![(tpu_address, 0)])
|
||||
}
|
||||
|
||||
pub fn join(self) -> thread::Result<()> {
|
||||
self.receive_txn_thread.join()?;
|
||||
self.exit.store(true, Ordering::Relaxed);
|
||||
|
|
|
@ -1,8 +1,14 @@
|
|||
use {solana_client::connection_cache::Protocol, std::net::SocketAddr};
|
||||
use {solana_client::connection_cache::Protocol, solana_sdk::clock::Slot, std::net::SocketAddr};
|
||||
|
||||
pub trait TpuInfo {
|
||||
fn refresh_recent_peers(&mut self);
|
||||
fn get_leader_tpus(&self, max_count: u64, protocol: Protocol) -> Vec<&SocketAddr>;
|
||||
/// In addition to the the tpu address, also return the leader slot
|
||||
fn get_leader_tpus_with_slots(
|
||||
&self,
|
||||
max_count: u64,
|
||||
protocol: Protocol,
|
||||
) -> Vec<(&SocketAddr, Slot)>;
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
|
@ -13,4 +19,11 @@ impl TpuInfo for NullTpuInfo {
|
|||
fn get_leader_tpus(&self, _max_count: u64, _protocol: Protocol) -> Vec<&SocketAddr> {
|
||||
vec![]
|
||||
}
|
||||
fn get_leader_tpus_with_slots(
|
||||
&self,
|
||||
_max_count: u64,
|
||||
_protocol: Protocol,
|
||||
) -> Vec<(&SocketAddr, Slot)> {
|
||||
vec![]
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue