diff --git a/poh/src/poh_recorder.rs b/poh/src/poh_recorder.rs index fa4719dfb..8fb10807a 100644 --- a/poh/src/poh_recorder.rs +++ b/poh/src/poh_recorder.rs @@ -381,6 +381,19 @@ impl PohRecorder { .slot_leader_at(current_slot + slots, None) } + /// Return the leader and slot pair after `slots_in_the_future` slots. + pub fn leader_and_slot_after_n_slots( + &self, + slots_in_the_future: u64, + ) -> Option<(Pubkey, Slot)> { + let target_slot = self + .slot_for_tick_height(self.tick_height) + .checked_add(slots_in_the_future)?; + self.leader_schedule_cache + .slot_leader_at(target_slot, None) + .map(|leader| (leader, target_slot)) + } + pub fn next_slot_leader(&self) -> Option { self.leader_after_n_slots(1) } diff --git a/rpc/src/cluster_tpu_info.rs b/rpc/src/cluster_tpu_info.rs index c7b6436c8..5a692944f 100644 --- a/rpc/src/cluster_tpu_info.rs +++ b/rpc/src/cluster_tpu_info.rs @@ -1,7 +1,10 @@ use { solana_gossip::{cluster_info::ClusterInfo, contact_info::Protocol}, solana_poh::poh_recorder::PohRecorder, - solana_sdk::{clock::NUM_CONSECUTIVE_LEADER_SLOTS, pubkey::Pubkey}, + solana_sdk::{ + clock::{Slot, NUM_CONSECUTIVE_LEADER_SLOTS}, + pubkey::Pubkey, + }, solana_send_transaction_service::tpu_info::TpuInfo, std::{ collections::HashMap, @@ -64,6 +67,39 @@ impl TpuInfo for ClusterTpuInfo { } unique_leaders } + + fn get_leader_tpus_with_slots( + &self, + max_count: u64, + protocol: Protocol, + ) -> Vec<(&SocketAddr, Slot)> { + let recorder = self.poh_recorder.read().unwrap(); + let leaders: Vec<_> = (0..max_count) + .filter_map(|future_slot| { + let future_slot = max_count.wrapping_sub(future_slot); + NUM_CONSECUTIVE_LEADER_SLOTS + .checked_mul(future_slot) + .and_then(|slots_in_the_future| { + recorder.leader_and_slot_after_n_slots(slots_in_the_future) + }) + }) + .collect(); + drop(recorder); + let addrs_to_slots = leaders + .into_iter() + .filter_map(|(leader_id, leader_slot)| { + self.recent_peers + .get(&leader_id) + .map(|(udp_tpu, quic_tpu)| match protocol { + Protocol::UDP => (udp_tpu, leader_slot), + Protocol::QUIC => (quic_tpu, leader_slot), + }) + }) + .collect::>(); + let mut unique_leaders = Vec::from_iter(addrs_to_slots); + unique_leaders.sort_by_key(|(_addr, slot)| *slot); + unique_leaders + } } #[cfg(test)] diff --git a/send-transaction-service/src/send_transaction_service.rs b/send-transaction-service/src/send_transaction_service.rs index ad7d5da97..4ee55f79b 100644 --- a/send-transaction-service/src/send_transaction_service.rs +++ b/send-transaction-service/src/send_transaction_service.rs @@ -10,8 +10,8 @@ use { solana_metrics::datapoint_warn, solana_runtime::{bank::Bank, bank_forks::BankForks}, solana_sdk::{ - hash::Hash, nonce_account, pubkey::Pubkey, saturating_add_assign, signature::Signature, - timing::AtomicInterval, transport::TransportError, + clock::Slot, hash::Hash, nonce_account, pubkey::Pubkey, saturating_add_assign, + signature::Signature, timing::AtomicInterval, transport::TransportError, }, std::{ collections::{ @@ -565,7 +565,7 @@ impl SendTransactionService { stats: &SendTransactionServiceStats, ) { // Processing the transactions in batch - let addresses = Self::get_tpu_addresses( + let addresses = Self::get_tpu_addresses_with_slots( tpu_address, leader_info, config, @@ -574,11 +574,17 @@ impl SendTransactionService { let wire_transactions = transactions .iter() - .map(|(_, transaction_info)| transaction_info.wire_transaction.as_ref()) + .map(|(_, transaction_info)| { + debug!( + "Sending transacation {} to (address, slot): {:?}", + transaction_info.signature, addresses, + ); + transaction_info.wire_transaction.as_ref() + }) .collect::>(); for address in &addresses { - Self::send_transactions(address, &wire_transactions, connection_cache, stats); + Self::send_transactions(address.0, &wire_transactions, connection_cache, stats); } } @@ -777,6 +783,21 @@ impl SendTransactionService { .unwrap_or_else(|| vec![tpu_address]) } + fn get_tpu_addresses_with_slots<'a, T: TpuInfo>( + tpu_address: &'a SocketAddr, + leader_info: Option<&'a T>, + config: &'a Config, + protocol: Protocol, + ) -> Vec<(&'a SocketAddr, Slot)> { + leader_info + .as_ref() + .map(|leader_info| { + leader_info.get_leader_tpus_with_slots(config.leader_forward_count, protocol) + }) + .filter(|addresses| !addresses.is_empty()) + .unwrap_or_else(|| vec![(tpu_address, 0)]) + } + pub fn join(self) -> thread::Result<()> { self.receive_txn_thread.join()?; self.exit.store(true, Ordering::Relaxed); diff --git a/send-transaction-service/src/tpu_info.rs b/send-transaction-service/src/tpu_info.rs index 6ecd56ae4..456a0ced0 100644 --- a/send-transaction-service/src/tpu_info.rs +++ b/send-transaction-service/src/tpu_info.rs @@ -1,8 +1,14 @@ -use {solana_client::connection_cache::Protocol, std::net::SocketAddr}; +use {solana_client::connection_cache::Protocol, solana_sdk::clock::Slot, std::net::SocketAddr}; pub trait TpuInfo { fn refresh_recent_peers(&mut self); fn get_leader_tpus(&self, max_count: u64, protocol: Protocol) -> Vec<&SocketAddr>; + /// In addition to the the tpu address, also return the leader slot + fn get_leader_tpus_with_slots( + &self, + max_count: u64, + protocol: Protocol, + ) -> Vec<(&SocketAddr, Slot)>; } #[derive(Clone)] @@ -13,4 +19,11 @@ impl TpuInfo for NullTpuInfo { fn get_leader_tpus(&self, _max_count: u64, _protocol: Protocol) -> Vec<&SocketAddr> { vec![] } + fn get_leader_tpus_with_slots( + &self, + _max_count: u64, + _protocol: Protocol, + ) -> Vec<(&SocketAddr, Slot)> { + vec![] + } }