diff --git a/cli/src/cli.rs b/cli/src/cli.rs index 8840aa4914..d568063984 100644 --- a/cli/src/cli.rs +++ b/cli/src/cli.rs @@ -444,7 +444,7 @@ pub struct CliConfig<'a> { pub websocket_url: String, pub signers: Vec<&'a dyn Signer>, pub keypair_path: String, - pub rpc_client: Option, + pub rpc_client: Option>, pub rpc_timeout: Duration, pub verbose: bool, pub output_format: OutputFormat, @@ -1284,17 +1284,15 @@ pub fn process_command(config: &CliConfig) -> ProcessResult { println_name_value("Commitment:", &config.commitment.commitment.to_string()); } - let mut _rpc_client; let rpc_client = if config.rpc_client.is_none() { - _rpc_client = RpcClient::new_with_timeout_and_commitment( + Arc::new(RpcClient::new_with_timeout_and_commitment( config.json_rpc_url.to_string(), config.rpc_timeout, config.commitment, - ); - &_rpc_client + )) } else { // Primarily for testing - config.rpc_client.as_ref().unwrap() + config.rpc_client.as_ref().unwrap().clone() }; match &config.command { @@ -1502,7 +1500,7 @@ pub fn process_command(config: &CliConfig) -> ProcessResult { use_deprecated_loader, allow_excessive_balance, } => process_deploy( - &rpc_client, + rpc_client, config, program_location, *address, @@ -1510,7 +1508,7 @@ pub fn process_command(config: &CliConfig) -> ProcessResult { *allow_excessive_balance, ), CliCommand::Program(program_subcommand) => { - process_program_subcommand(&rpc_client, config, program_subcommand) + process_program_subcommand(rpc_client, config, program_subcommand) } // Stake Commands @@ -2585,7 +2583,7 @@ mod tests { fn test_cli_process_command() { // Success cases let mut config = CliConfig { - rpc_client: Some(RpcClient::new_mock("succeeds".to_string())), + rpc_client: Some(Arc::new(RpcClient::new_mock("succeeds".to_string()))), json_rpc_url: "http://127.0.0.1:8899".to_string(), ..CliConfig::default() }; @@ -2785,13 +2783,13 @@ mod tests { assert!(process_command(&config).is_ok()); // sig_not_found case - config.rpc_client = Some(RpcClient::new_mock("sig_not_found".to_string())); + config.rpc_client = Some(Arc::new(RpcClient::new_mock("sig_not_found".to_string()))); let missing_signature = Signature::new(&bs58::decode("5VERv8NMvzbJMEkV8xnrLkEaWRtSz9CosKDYjCJjBRnbJLgp8uirBgmQpjKhoR4tjF3ZpRzrFmBV6UjKdiSZkQUW").into_vec().unwrap()); config.command = CliCommand::Confirm(missing_signature); assert_eq!(process_command(&config).unwrap(), "Not found"); // Tx error case - config.rpc_client = Some(RpcClient::new_mock("account_in_use".to_string())); + config.rpc_client = Some(Arc::new(RpcClient::new_mock("account_in_use".to_string()))); let any_signature = Signature::new(&bs58::decode(SIGNATURE).into_vec().unwrap()); config.command = CliCommand::Confirm(any_signature); assert_eq!( @@ -2800,7 +2798,7 @@ mod tests { ); // Failure cases - config.rpc_client = Some(RpcClient::new_mock("fails".to_string())); + config.rpc_client = Some(Arc::new(RpcClient::new_mock("fails".to_string()))); config.command = CliCommand::Airdrop { pubkey: None, @@ -2870,7 +2868,7 @@ mod tests { mocks.insert(RpcRequest::GetAccountInfo, account_info_response); let rpc_client = RpcClient::new_mock_with_mocks("".to_string(), mocks); - config.rpc_client = Some(rpc_client); + config.rpc_client = Some(Arc::new(rpc_client)); let default_keypair = Keypair::new(); config.signers = vec![&default_keypair]; diff --git a/cli/src/lib.rs b/cli/src/lib.rs index 5a93afea3f..55d0b947f0 100644 --- a/cli/src/lib.rs +++ b/cli/src/lib.rs @@ -29,7 +29,6 @@ pub mod inflation; pub mod memo; pub mod nonce; pub mod program; -pub mod send_tpu; pub mod spend_utils; pub mod stake; pub mod test_utils; diff --git a/cli/src/program.rs b/cli/src/program.rs index fa61731b40..60778b29fd 100644 --- a/cli/src/program.rs +++ b/cli/src/program.rs @@ -1,4 +1,3 @@ -use crate::send_tpu::{get_leader_tpus, send_transaction_tpu}; use crate::{ checks::*, cli::{ @@ -6,7 +5,6 @@ use crate::{ ProcessResult, }, }; -use bincode::serialize; use bip39::{Language, Mnemonic, MnemonicType, Seed}; use clap::{App, AppSettings, Arg, ArgMatches, SubCommand}; use log::*; @@ -25,7 +23,7 @@ use solana_client::{ rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig}, rpc_filter::{Memcmp, MemcmpEncodedBytes, RpcFilterType}, rpc_request::MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS, - rpc_response::RpcLeaderSchedule, + tpu_client::{TpuClient, TpuClientConfig}, }; use solana_rbpf::vm::{Config, Executable}; use solana_remote_wallet::remote_wallet::RemoteWalletManager; @@ -51,20 +49,17 @@ use solana_sdk::{ }; use solana_transaction_status::TransactionConfirmationStatus; use std::{ - cmp::min, collections::HashMap, error, fs::File, io::{Read, Write}, - net::UdpSocket, path::PathBuf, sync::Arc, thread::sleep, - time::{Duration, Instant}, + time::Duration, }; const DATA_CHUNK_SIZE: usize = 229; // Keep program chunks under PACKET_DATA_SIZE -const NUM_TPU_LEADERS: u64 = 2; #[derive(Debug, PartialEq)] pub enum ProgramCliCommand { @@ -622,7 +617,7 @@ pub fn parse_program_subcommand( } pub fn process_program_subcommand( - rpc_client: &RpcClient, + rpc_client: Arc, config: &CliConfig, program_subcommand: &ProgramCliCommand, ) -> ProcessResult { @@ -638,7 +633,7 @@ pub fn process_program_subcommand( max_len, allow_excessive_balance, } => process_program_deploy( - &rpc_client, + rpc_client, config, program_location, *program_signer_index, @@ -657,7 +652,7 @@ pub fn process_program_subcommand( buffer_authority_signer_index, max_len, } => process_write_buffer( - &rpc_client, + rpc_client, config, program_location, *buffer_signer_index, @@ -746,7 +741,7 @@ fn get_default_program_keypair(program_location: &Option) -> Keypair { /// Deploy using upgradeable loader #[allow(clippy::too_many_arguments)] fn process_program_deploy( - rpc_client: &RpcClient, + rpc_client: Arc, config: &CliConfig, program_location: &Option, program_signer_index: Option, @@ -892,7 +887,7 @@ fn process_program_deploy( let result = if do_deploy { do_process_program_write_and_deploy( - rpc_client, + rpc_client.clone(), config, &program_data, buffer_data_len, @@ -907,7 +902,7 @@ fn process_program_deploy( ) } else { do_process_program_upgrade( - rpc_client, + rpc_client.clone(), config, &program_data, &program_pubkey, @@ -918,7 +913,7 @@ fn process_program_deploy( }; if result.is_ok() && is_final { process_set_authority( - rpc_client, + &rpc_client, config, Some(program_pubkey), None, @@ -933,7 +928,7 @@ fn process_program_deploy( } fn process_write_buffer( - rpc_client: &RpcClient, + rpc_client: Arc, config: &CliConfig, program_location: &str, buffer_signer_index: Option, @@ -1450,7 +1445,7 @@ fn process_close( /// Deploy using non-upgradeable loader pub fn process_deploy( - rpc_client: &RpcClient, + rpc_client: Arc, config: &CliConfig, program_location: &str, buffer_signer_index: Option, @@ -1495,7 +1490,7 @@ pub fn process_deploy( #[allow(clippy::too_many_arguments)] fn do_process_program_write_and_deploy( - rpc_client: &RpcClient, + rpc_client: Arc, config: &CliConfig, program_data: &[u8], buffer_data_len: usize, @@ -1633,7 +1628,7 @@ fn do_process_program_write_and_deploy( messages.push(message); } - check_payer(rpc_client, config, balance_needed, &messages)?; + check_payer(&rpc_client, config, balance_needed, &messages)?; send_deploy_messages( rpc_client, @@ -1660,7 +1655,7 @@ fn do_process_program_write_and_deploy( } fn do_process_program_upgrade( - rpc_client: &RpcClient, + rpc_client: Arc, config: &CliConfig, program_data: &[u8], program_id: &Pubkey, @@ -1756,7 +1751,7 @@ fn do_process_program_upgrade( ); messages.push(&final_message); - check_payer(rpc_client, config, balance_needed, &messages)?; + check_payer(&rpc_client, config, balance_needed, &messages)?; send_deploy_messages( rpc_client, config, @@ -1861,7 +1856,7 @@ fn check_payer( } fn send_deploy_messages( - rpc_client: &RpcClient, + rpc_client: Arc, config: &CliConfig, initial_message: &Option, write_messages: &Option>, @@ -1909,7 +1904,8 @@ fn send_deploy_messages( } send_and_confirm_transactions_with_spinner( - &rpc_client, + rpc_client.clone(), + &config.websocket_url, write_transactions, &[payer_signer, write_signer], config.commitment, @@ -1978,7 +1974,8 @@ fn report_ephemeral_mnemonic(words: usize, mnemonic: bip39::Mnemonic) { } fn send_and_confirm_transactions_with_spinner( - rpc_client: &RpcClient, + rpc_client: Arc, + websocket_url: &str, mut transactions: Vec, signer_keys: &T, commitment: CommitmentConfig, @@ -1986,39 +1983,19 @@ fn send_and_confirm_transactions_with_spinner( ) -> Result<(), Box> { let progress_bar = new_spinner_progress_bar(); let mut send_retries = 5; - let mut leader_schedule: Option = None; - let mut leader_schedule_epoch = 0; - let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let cluster_nodes = rpc_client.get_cluster_nodes().ok(); + progress_bar.set_message("Finding leader nodes..."); + let tpu_client = TpuClient::new( + rpc_client.clone(), + websocket_url, + TpuClientConfig::default(), + )?; loop { - progress_bar.set_message("Finding leader nodes..."); - let epoch_info = rpc_client.get_epoch_info()?; - let mut slot = epoch_info.absolute_slot; - let mut last_epoch_fetch = Instant::now(); - if epoch_info.epoch > leader_schedule_epoch || leader_schedule.is_none() { - leader_schedule = rpc_client.get_leader_schedule(Some(epoch_info.absolute_slot))?; - leader_schedule_epoch = epoch_info.epoch; - } - - let mut tpu_addresses = get_leader_tpus( - min(epoch_info.slot_index + 1, epoch_info.slots_in_epoch), - NUM_TPU_LEADERS, - leader_schedule.as_ref(), - cluster_nodes.as_ref(), - ); - // Send all transactions let mut pending_transactions = HashMap::new(); let num_transactions = transactions.len(); for transaction in transactions { - if !tpu_addresses.is_empty() { - let wire_transaction = - serialize(&transaction).expect("serialization should succeed"); - for tpu_address in &tpu_addresses { - send_transaction_tpu(&send_socket, &tpu_address, &wire_transaction); - } - } else { + if !tpu_client.send_transaction(&transaction) { let _result = rpc_client .send_transaction_with_config( &transaction, @@ -2038,22 +2015,11 @@ fn send_and_confirm_transactions_with_spinner( // Throttle transactions to about 100 TPS sleep(Duration::from_millis(10)); - - // Update leader periodically - if last_epoch_fetch.elapsed() > Duration::from_millis(400) { - let epoch_info = rpc_client.get_epoch_info()?; - last_epoch_fetch = Instant::now(); - tpu_addresses = get_leader_tpus( - min(epoch_info.slot_index + 1, epoch_info.slots_in_epoch), - NUM_TPU_LEADERS, - leader_schedule.as_ref(), - cluster_nodes.as_ref(), - ); - } } // Collect statuses for all the transactions, drop those that are confirmed loop { + let mut slot = 0; let pending_signatures = pending_transactions.keys().cloned().collect::>(); for pending_signatures_chunk in pending_signatures.chunks(MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS) @@ -2095,22 +2061,8 @@ fn send_and_confirm_transactions_with_spinner( break; } - let epoch_info = rpc_client.get_epoch_info()?; - tpu_addresses = get_leader_tpus( - min(epoch_info.slot_index + 1, epoch_info.slots_in_epoch), - NUM_TPU_LEADERS, - leader_schedule.as_ref(), - cluster_nodes.as_ref(), - ); - for transaction in pending_transactions.values() { - if !tpu_addresses.is_empty() { - let wire_transaction = - serialize(&transaction).expect("serialization should succeed"); - for tpu_address in &tpu_addresses { - send_transaction_tpu(&send_socket, &tpu_address, &wire_transaction); - } - } else { + if !tpu_client.send_transaction(transaction) { let _result = rpc_client .send_transaction_with_config( transaction, @@ -2933,7 +2885,7 @@ mod tests { write_keypair_file(&program_pubkey, &program_keypair_location).unwrap(); let config = CliConfig { - rpc_client: Some(RpcClient::new_mock("".to_string())), + rpc_client: Some(Arc::new(RpcClient::new_mock("".to_string()))), command: CliCommand::Program(ProgramCliCommand::Deploy { program_location: Some(program_location.to_str().unwrap().to_string()), buffer_signer_index: None, diff --git a/cli/src/send_tpu.rs b/cli/src/send_tpu.rs deleted file mode 100644 index 320e88e47a..0000000000 --- a/cli/src/send_tpu.rs +++ /dev/null @@ -1,46 +0,0 @@ -use log::*; -use solana_client::rpc_response::{RpcContactInfo, RpcLeaderSchedule}; -use solana_sdk::clock::NUM_CONSECUTIVE_LEADER_SLOTS; -use std::net::{SocketAddr, UdpSocket}; - -pub fn get_leader_tpus( - slot_index: u64, - num_leaders: u64, - leader_schedule: Option<&RpcLeaderSchedule>, - cluster_nodes: Option<&Vec>, -) -> Vec { - let leaders: Vec<_> = (0..num_leaders) - .filter_map(|i| { - leader_schedule? - .iter() - .find(|(_pubkey, slots)| { - slots.iter().any(|slot| { - *slot as u64 == (slot_index + (i * NUM_CONSECUTIVE_LEADER_SLOTS)) - }) - }) - .and_then(|(pubkey, _)| { - cluster_nodes? - .iter() - .find(|contact_info| contact_info.pubkey == *pubkey) - .and_then(|contact_info| contact_info.tpu) - }) - }) - .collect(); - let mut unique_leaders = vec![]; - for leader in leaders.into_iter() { - if !unique_leaders.contains(&leader) { - unique_leaders.push(leader); - } - } - unique_leaders -} - -pub fn send_transaction_tpu( - send_socket: &UdpSocket, - tpu_address: &SocketAddr, - wire_transaction: &[u8], -) { - if let Err(err) = send_socket.send_to(wire_transaction, tpu_address) { - warn!("Failed to send transaction to {}: {:?}", tpu_address, err); - } -} diff --git a/client/src/lib.rs b/client/src/lib.rs index 18621f3b3e..61f354a62e 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -19,3 +19,4 @@ pub mod rpc_request; pub mod rpc_response; pub mod rpc_sender; pub mod thin_client; +pub mod tpu_client; diff --git a/client/src/mock_sender.rs b/client/src/mock_sender.rs index 9b89aa81fa..e40e456745 100644 --- a/client/src/mock_sender.rs +++ b/client/src/mock_sender.rs @@ -124,6 +124,7 @@ impl RpcSender for MockSender { } RpcRequest::GetTransactionCount => Value::Number(Number::from(1234)), RpcRequest::GetSlot => Value::Number(Number::from(0)), + RpcRequest::GetMaxShredInsertSlot => Value::Number(Number::from(0)), RpcRequest::RequestAirdrop => Value::String(Signature::new(&[8; 64]).to_string()), RpcRequest::SendTransaction => { let signature = if self.url == "malicious" { diff --git a/client/src/pubsub_client.rs b/client/src/pubsub_client.rs index 66c64e2c04..d3dd63bcc3 100644 --- a/client/src/pubsub_client.rs +++ b/client/src/pubsub_client.rs @@ -3,7 +3,9 @@ use { rpc_config::{ RpcSignatureSubscribeConfig, RpcTransactionLogsConfig, RpcTransactionLogsFilter, }, - rpc_response::{Response as RpcResponse, RpcLogsResponse, RpcSignatureResult, SlotInfo}, + rpc_response::{ + Response as RpcResponse, RpcLogsResponse, RpcSignatureResult, SlotInfo, SlotUpdate, + }, }, log::*, serde::de::DeserializeOwned, @@ -340,6 +342,54 @@ impl PubsubClient { Ok((result, receiver)) } + + pub fn slot_updates_subscribe( + url: &str, + handler: impl Fn(SlotUpdate) + Send + 'static, + ) -> Result, PubsubClientError> { + let url = Url::parse(url)?; + let (socket, _response) = connect(url)?; + + let socket = Arc::new(RwLock::new(socket)); + let exit = Arc::new(AtomicBool::new(false)); + let exit_clone = exit.clone(); + let subscription_id = PubsubClientSubscription::::send_subscribe( + &socket, + json!({ + "jsonrpc":"2.0","id":1,"method":"slotsUpdatesSubscribe","params":[] + }) + .to_string(), + )?; + + let t_cleanup = { + let socket = socket.clone(); + std::thread::spawn(move || { + loop { + if exit_clone.load(Ordering::Relaxed) { + break; + } + match PubsubClientSubscription::read_message(&socket) { + Ok(message) => handler(message), + Err(err) => { + info!("receive error: {:?}", err); + break; + } + } + } + + info!("websocket - exited receive loop"); + }) + }; + + Ok(PubsubClientSubscription { + message_type: PhantomData, + operation: "slotsUpdates", + socket, + subscription_id, + t_cleanup: Some(t_cleanup), + exit, + }) + } } #[cfg(test)] diff --git a/client/src/tpu_client.rs b/client/src/tpu_client.rs new file mode 100644 index 0000000000..ae264f9875 --- /dev/null +++ b/client/src/tpu_client.rs @@ -0,0 +1,393 @@ +use crate::{ + pubsub_client::{PubsubClient, PubsubClientError, PubsubClientSubscription}, + rpc_client::RpcClient, + rpc_response::SlotUpdate, +}; +use bincode::serialize; +use log::*; +use solana_sdk::{clock::Slot, pubkey::Pubkey, transaction::Transaction}; +use std::{ + collections::{HashMap, HashSet, VecDeque}, + net::{SocketAddr, UdpSocket}, + str::FromStr, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, RwLock, + }, + thread::JoinHandle, + time::{Duration, Instant}, +}; +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum TpuSenderError { + #[error("Pubsub error: {0:?}")] + PubsubError(#[from] PubsubClientError), + #[error("RPC error: {0:?}")] + RpcError(#[from] crate::client_error::ClientError), + #[error("IO error: {0:?}")] + IoError(#[from] std::io::Error), +} + +type Result = std::result::Result; + +/// Default number of slots used to build TPU socket fanout set +pub const DEFAULT_FANOUT_SLOTS: u64 = 12; + +/// Maximum number of slots used to build TPU socket fanout set +pub const MAX_FANOUT_SLOTS: u64 = 100; + +/// Config params for `TpuClient` +#[derive(Clone, Debug)] +pub struct TpuClientConfig { + /// The range of upcoming slots to include when determining which + /// leaders to send transactions to (min: 1, max: 100) + pub fanout_slots: u64, +} + +impl Default for TpuClientConfig { + fn default() -> Self { + Self { + fanout_slots: DEFAULT_FANOUT_SLOTS, + } + } +} + +/// Client which sends transactions directly to the current leader's TPU port over UDP. +/// The client uses RPC to determine the current leader and fetch node contact info +pub struct TpuClient { + send_socket: UdpSocket, + fanout_slots: u64, + leader_tpu_service: LeaderTpuService, + exit: Arc, +} + +impl TpuClient { + /// Serializes and sends a transaction to the current leader's TPU port + pub fn send_transaction(&self, transaction: &Transaction) -> bool { + let wire_transaction = serialize(transaction).expect("serialization should succeed"); + self.send_wire_transaction(&wire_transaction) + } + + /// Sends a transaction to the current leader's TPU port + pub fn send_wire_transaction(&self, wire_transaction: &[u8]) -> bool { + let mut sent = false; + for tpu_address in self + .leader_tpu_service + .leader_tpu_sockets(self.fanout_slots) + { + if self + .send_socket + .send_to(wire_transaction, tpu_address) + .is_ok() + { + sent = true; + } + } + sent + } + + /// Create a new client that disconnects when dropped + pub fn new( + rpc_client: Arc, + websocket_url: &str, + config: TpuClientConfig, + ) -> Result { + let exit = Arc::new(AtomicBool::new(false)); + let leader_tpu_service = LeaderTpuService::new(rpc_client, websocket_url, exit.clone())?; + + Ok(Self { + send_socket: UdpSocket::bind("0.0.0.0:0").unwrap(), + fanout_slots: config.fanout_slots.min(MAX_FANOUT_SLOTS).max(1), + leader_tpu_service, + exit, + }) + } +} + +impl Drop for TpuClient { + fn drop(&mut self) { + self.exit.store(true, Ordering::Relaxed); + self.leader_tpu_service.join(); + } +} + +struct LeaderTpuCache { + first_slot: Slot, + leaders: Vec, + leader_tpu_map: HashMap, +} + +impl LeaderTpuCache { + fn new(rpc_client: &RpcClient, first_slot: Slot) -> Self { + let leaders = Self::fetch_slot_leaders(rpc_client, first_slot).unwrap_or_default(); + let leader_tpu_map = Self::fetch_cluster_tpu_sockets(&rpc_client).unwrap_or_default(); + Self { + first_slot, + leaders, + leader_tpu_map, + } + } + + // Last slot that has a cached leader pubkey + fn last_slot(&self) -> Slot { + self.first_slot + self.leaders.len().saturating_sub(1) as u64 + } + + // Get the TPU sockets for the current leader and upcoming leaders according to fanout size + fn get_leader_sockets(&self, current_slot: Slot, fanout_slots: u64) -> Vec { + let mut leader_set = HashSet::new(); + let mut leader_sockets = Vec::new(); + for leader_slot in current_slot..current_slot + fanout_slots { + if let Some(leader) = self.get_slot_leader(leader_slot) { + if let Some(tpu_socket) = self.leader_tpu_map.get(leader) { + if leader_set.insert(*leader) { + leader_sockets.push(*tpu_socket); + } + } + } + } + leader_sockets + } + + fn get_slot_leader(&self, slot: Slot) -> Option<&Pubkey> { + if slot >= self.first_slot { + let index = slot - self.first_slot; + self.leaders.get(index as usize) + } else { + None + } + } + + fn fetch_cluster_tpu_sockets(rpc_client: &RpcClient) -> Result> { + let cluster_contact_info = rpc_client.get_cluster_nodes()?; + Ok(cluster_contact_info + .into_iter() + .filter_map(|contact_info| { + Some(( + Pubkey::from_str(&contact_info.pubkey).ok()?, + contact_info.tpu?, + )) + }) + .collect()) + } + + fn fetch_slot_leaders(rpc_client: &RpcClient, start_slot: Slot) -> Result> { + Ok(rpc_client.get_slot_leaders(start_slot, 2 * MAX_FANOUT_SLOTS)?) + } +} + +// 48 chosen because it's unlikely that 12 leaders in a row will miss their slots +const MAX_SLOT_SKIP_DISTANCE: u64 = 48; + +#[derive(Clone, Debug)] +struct RecentLeaderSlots(Arc>>); +impl RecentLeaderSlots { + fn new(current_slot: Slot) -> Self { + let mut recent_slots = VecDeque::new(); + recent_slots.push_back(current_slot); + Self(Arc::new(RwLock::new(recent_slots))) + } + + fn record_slot(&self, current_slot: Slot) { + let mut recent_slots = self.0.write().unwrap(); + recent_slots.push_back(current_slot); + // 12 recent slots should be large enough to avoid a misbehaving + // validator from affecting the median recent slot + while recent_slots.len() > 12 { + recent_slots.pop_front(); + } + } + + // Estimate the current slot from recent slot notifications. + fn estimated_current_slot(&self) -> Slot { + let mut recent_slots: Vec = self.0.read().unwrap().iter().cloned().collect(); + assert!(!recent_slots.is_empty()); + recent_slots.sort_unstable(); + + // Validators can broadcast invalid blocks that are far in the future + // so check if the current slot is in line with the recent progression. + let max_index = recent_slots.len() - 1; + let median_index = max_index / 2; + let median_recent_slot = recent_slots[median_index]; + let expected_current_slot = median_recent_slot + (max_index - median_index) as u64; + let max_reasonable_current_slot = expected_current_slot + MAX_SLOT_SKIP_DISTANCE; + + // Return the highest slot that doesn't exceed what we believe is a + // reasonable slot. + recent_slots + .into_iter() + .rev() + .find(|slot| *slot <= max_reasonable_current_slot) + .unwrap() + } +} + +#[cfg(test)] +impl From> for RecentLeaderSlots { + fn from(recent_slots: Vec) -> Self { + assert!(!recent_slots.is_empty()); + Self(Arc::new(RwLock::new(recent_slots.into_iter().collect()))) + } +} + +/// Service that tracks upcoming leaders and maintains an up-to-date mapping +/// of leader id to TPU socket address. +struct LeaderTpuService { + recent_slots: RecentLeaderSlots, + leader_tpu_cache: Arc>, + subscription: Option>, + t_leader_tpu_service: Option>, +} + +impl LeaderTpuService { + fn new(rpc_client: Arc, websocket_url: &str, exit: Arc) -> Result { + let start_slot = rpc_client.get_max_shred_insert_slot()?; + + let recent_slots = RecentLeaderSlots::new(start_slot); + let leader_tpu_cache = Arc::new(RwLock::new(LeaderTpuCache::new(&rpc_client, start_slot))); + + let subscription = if !websocket_url.is_empty() { + let recent_slots = recent_slots.clone(); + Some(PubsubClient::slot_updates_subscribe( + websocket_url, + move |update| { + let current_slot = match update { + // This update indicates that a full slot was received by the connected + // node so we can stop sending transactions to the leader for that slot + SlotUpdate::Completed { slot, .. } => slot.saturating_add(1), + // This update indicates that we have just received the first shred from + // the leader for this slot and they are probably still accepting transactions. + SlotUpdate::FirstShredReceived { slot, .. } => slot, + _ => return, + }; + + recent_slots.record_slot(current_slot); + }, + )?) + } else { + None + }; + + let t_leader_tpu_service = Some({ + let recent_slots = recent_slots.clone(); + let leader_tpu_cache = leader_tpu_cache.clone(); + std::thread::Builder::new() + .name("ldr-tpu-srv".to_string()) + .spawn(move || Self::run(rpc_client, recent_slots, leader_tpu_cache, exit)) + .unwrap() + }); + + Ok(LeaderTpuService { + recent_slots, + leader_tpu_cache, + subscription, + t_leader_tpu_service, + }) + } + + fn join(&mut self) { + if let Some(mut subscription) = self.subscription.take() { + let _ = subscription.send_unsubscribe(); + let _ = subscription.shutdown(); + } + if let Some(t_handle) = self.t_leader_tpu_service.take() { + t_handle.join().unwrap(); + } + } + + fn leader_tpu_sockets(&self, fanout_slots: u64) -> Vec { + let current_slot = self.recent_slots.estimated_current_slot(); + self.leader_tpu_cache + .read() + .unwrap() + .get_leader_sockets(current_slot, fanout_slots) + } + + fn run( + rpc_client: Arc, + recent_slots: RecentLeaderSlots, + leader_tpu_cache: Arc>, + exit: Arc, + ) { + let mut last_cluster_refresh = Instant::now(); + let mut sleep_ms = 1000; + loop { + if exit.load(Ordering::Relaxed) { + break; + } + + // Refresh cluster TPU ports every 5min in case validators restart with new port configuration + // or new validators come online + if last_cluster_refresh.elapsed() > Duration::from_secs(5 * 60) { + if let Ok(leader_tpu_map) = LeaderTpuCache::fetch_cluster_tpu_sockets(&rpc_client) { + leader_tpu_cache.write().unwrap().leader_tpu_map = leader_tpu_map; + last_cluster_refresh = Instant::now(); + } else { + sleep_ms = 100; + continue; + } + } + + // Sleep a few slots before checking if leader cache needs to be refreshed again + std::thread::sleep(Duration::from_millis(sleep_ms)); + + let current_slot = recent_slots.estimated_current_slot(); + if current_slot + >= leader_tpu_cache + .read() + .unwrap() + .last_slot() + .saturating_sub(MAX_FANOUT_SLOTS) + { + if let Ok(slot_leaders) = + LeaderTpuCache::fetch_slot_leaders(&rpc_client, current_slot) + { + let mut leader_tpu_cache = leader_tpu_cache.write().unwrap(); + leader_tpu_cache.first_slot = current_slot; + leader_tpu_cache.leaders = slot_leaders; + } else { + sleep_ms = 100; + continue; + } + } + + sleep_ms = 1000; + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn assert_slot(recent_slots: RecentLeaderSlots, expected_slot: Slot) { + assert_eq!(recent_slots.estimated_current_slot(), expected_slot); + } + + #[test] + fn test_recent_leader_slots() { + assert_slot(RecentLeaderSlots::new(0), 0); + + let mut recent_slots: Vec = (1..=12).collect(); + assert_slot(RecentLeaderSlots::from(recent_slots.clone()), 12); + + recent_slots.reverse(); + assert_slot(RecentLeaderSlots::from(recent_slots), 12); + + assert_slot( + RecentLeaderSlots::from(vec![0, 1 + MAX_SLOT_SKIP_DISTANCE]), + 1 + MAX_SLOT_SKIP_DISTANCE, + ); + assert_slot( + RecentLeaderSlots::from(vec![0, 2 + MAX_SLOT_SKIP_DISTANCE]), + 0, + ); + + assert_slot(RecentLeaderSlots::from(vec![1]), 1); + assert_slot(RecentLeaderSlots::from(vec![1, 100]), 1); + assert_slot(RecentLeaderSlots::from(vec![1, 2, 100]), 2); + assert_slot(RecentLeaderSlots::from(vec![1, 2, 3, 100]), 3); + assert_slot(RecentLeaderSlots::from(vec![1, 2, 3, 99, 100]), 3); + } +} diff --git a/core/src/rpc.rs b/core/src/rpc.rs index f42b343392..0d450a1825 100644 --- a/core/src/rpc.rs +++ b/core/src/rpc.rs @@ -2421,7 +2421,7 @@ pub mod rpc_full { &self, meta: Self::Metadata, start_slot: Slot, - end_slot: Slot, + limit: u64, ) -> Result>; #[rpc(meta, name = "minimumLedgerSlot")] diff --git a/core/tests/rpc.rs b/core/tests/rpc.rs index 2c2a4d4f20..25e1ed533d 100644 --- a/core/tests/rpc.rs +++ b/core/tests/rpc.rs @@ -9,6 +9,7 @@ use solana_client::{ rpc_client::RpcClient, rpc_config::{RpcAccountInfoConfig, RpcSignatureSubscribeConfig}, rpc_response::{Response, RpcSignatureResult, SlotUpdate}, + tpu_client::{TpuClient, TpuClientConfig}, }; use solana_core::{rpc_pubsub::gen_client::Client as PubsubClient, test_validator::TestValidator}; use solana_sdk::{ @@ -378,3 +379,37 @@ fn test_rpc_subscriptions() { } } } + +#[test] +fn test_tpu_send_transaction() { + let mint_keypair = Keypair::new(); + let mint_pubkey = mint_keypair.pubkey(); + let test_validator = TestValidator::with_no_fees(mint_pubkey, None); + let rpc_client = Arc::new(RpcClient::new_with_commitment( + test_validator.rpc_url(), + CommitmentConfig::processed(), + )); + + let tpu_client = TpuClient::new( + rpc_client.clone(), + &test_validator.rpc_pubsub_url(), + TpuClientConfig::default(), + ) + .unwrap(); + + let recent_blockhash = rpc_client.get_recent_blockhash().unwrap().0; + let tx = + system_transaction::transfer(&mint_keypair, &Pubkey::new_unique(), 42, recent_blockhash); + assert!(tpu_client.send_transaction(&tx)); + + let timeout = Duration::from_secs(5); + let now = Instant::now(); + let signatures = vec![tx.signatures[0]]; + loop { + assert!(now.elapsed() < timeout); + let statuses = rpc_client.get_signature_statuses(&signatures).unwrap(); + if statuses.value.get(0).is_some() { + return; + } + } +}