Add TPU client for sending txs to the current leader tpu port (#16736)

* Add TPU client for sending txs to the current leader tpu port

* Update tpu_client.rs
Author: Justin Starry, 2021-04-23 09:35:12 +08:00 (committed by GitHub)
parent fc12841d95
commit 75b8434b76
10 changed files with 523 additions and 140 deletions
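For orientation, here is a minimal usage sketch (not part of the diff) of the client this commit adds: build a TpuClient from a shared RpcClient, a pubsub websocket URL, and a TpuClientConfig, try the TPU path first, and fall back to RPC when the UDP send fails. This is the same pattern the CLI deploy path and the new integration test below follow. The URLs and the helper name are placeholders; per LeaderTpuService::new further down, passing an empty websocket URL skips the slot-update subscription.

use solana_client::{
    rpc_client::RpcClient,
    tpu_client::{TpuClient, TpuClientConfig},
};
use solana_sdk::transaction::Transaction;
use std::sync::Arc;

// Placeholder URLs; errors boxed for brevity.
fn send_via_tpu(transaction: &Transaction) -> Result<(), Box<dyn std::error::Error>> {
    let rpc_client = Arc::new(RpcClient::new("http://127.0.0.1:8899".to_string()));
    let tpu_client = TpuClient::new(
        rpc_client.clone(),
        "ws://127.0.0.1:8900",      // pubsub endpoint used for slot update subscriptions
        TpuClientConfig::default(), // fanout_slots defaults to DEFAULT_FANOUT_SLOTS (12)
    )?;
    // send_transaction() returns false if the UDP send failed for every leader
    // in the fanout set; the CLI code below falls back to RPC in that case.
    if !tpu_client.send_transaction(transaction) {
        rpc_client.send_transaction(transaction)?;
    }
    Ok(())
}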


@ -444,7 +444,7 @@ pub struct CliConfig<'a> {
pub websocket_url: String,
pub signers: Vec<&'a dyn Signer>,
pub keypair_path: String,
pub rpc_client: Option<RpcClient>,
pub rpc_client: Option<Arc<RpcClient>>,
pub rpc_timeout: Duration,
pub verbose: bool,
pub output_format: OutputFormat,
@ -1284,17 +1284,15 @@ pub fn process_command(config: &CliConfig) -> ProcessResult {
println_name_value("Commitment:", &config.commitment.commitment.to_string());
}
let mut _rpc_client;
let rpc_client = if config.rpc_client.is_none() {
_rpc_client = RpcClient::new_with_timeout_and_commitment(
Arc::new(RpcClient::new_with_timeout_and_commitment(
config.json_rpc_url.to_string(),
config.rpc_timeout,
config.commitment,
);
&_rpc_client
))
} else {
// Primarily for testing
config.rpc_client.as_ref().unwrap()
config.rpc_client.as_ref().unwrap().clone()
};
match &config.command {
@ -1502,7 +1500,7 @@ pub fn process_command(config: &CliConfig) -> ProcessResult {
use_deprecated_loader,
allow_excessive_balance,
} => process_deploy(
&rpc_client,
rpc_client,
config,
program_location,
*address,
@ -1510,7 +1508,7 @@ pub fn process_command(config: &CliConfig) -> ProcessResult {
*allow_excessive_balance,
),
CliCommand::Program(program_subcommand) => {
process_program_subcommand(&rpc_client, config, program_subcommand)
process_program_subcommand(rpc_client, config, program_subcommand)
}
// Stake Commands
@ -2585,7 +2583,7 @@ mod tests {
fn test_cli_process_command() {
// Success cases
let mut config = CliConfig {
rpc_client: Some(RpcClient::new_mock("succeeds".to_string())),
rpc_client: Some(Arc::new(RpcClient::new_mock("succeeds".to_string()))),
json_rpc_url: "http://127.0.0.1:8899".to_string(),
..CliConfig::default()
};
@ -2785,13 +2783,13 @@ mod tests {
assert!(process_command(&config).is_ok());
// sig_not_found case
config.rpc_client = Some(RpcClient::new_mock("sig_not_found".to_string()));
config.rpc_client = Some(Arc::new(RpcClient::new_mock("sig_not_found".to_string())));
let missing_signature = Signature::new(&bs58::decode("5VERv8NMvzbJMEkV8xnrLkEaWRtSz9CosKDYjCJjBRnbJLgp8uirBgmQpjKhoR4tjF3ZpRzrFmBV6UjKdiSZkQUW").into_vec().unwrap());
config.command = CliCommand::Confirm(missing_signature);
assert_eq!(process_command(&config).unwrap(), "Not found");
// Tx error case
config.rpc_client = Some(RpcClient::new_mock("account_in_use".to_string()));
config.rpc_client = Some(Arc::new(RpcClient::new_mock("account_in_use".to_string())));
let any_signature = Signature::new(&bs58::decode(SIGNATURE).into_vec().unwrap());
config.command = CliCommand::Confirm(any_signature);
assert_eq!(
@ -2800,7 +2798,7 @@ mod tests {
);
// Failure cases
config.rpc_client = Some(RpcClient::new_mock("fails".to_string()));
config.rpc_client = Some(Arc::new(RpcClient::new_mock("fails".to_string())));
config.command = CliCommand::Airdrop {
pubkey: None,
@ -2870,7 +2868,7 @@ mod tests {
mocks.insert(RpcRequest::GetAccountInfo, account_info_response);
let rpc_client = RpcClient::new_mock_with_mocks("".to_string(), mocks);
config.rpc_client = Some(rpc_client);
config.rpc_client = Some(Arc::new(rpc_client));
let default_keypair = Keypair::new();
config.signers = vec![&default_keypair];


@ -29,7 +29,6 @@ pub mod inflation;
pub mod memo;
pub mod nonce;
pub mod program;
pub mod send_tpu;
pub mod spend_utils;
pub mod stake;
pub mod test_utils;


@ -1,4 +1,3 @@
use crate::send_tpu::{get_leader_tpus, send_transaction_tpu};
use crate::{
checks::*,
cli::{
@ -6,7 +5,6 @@ use crate::{
ProcessResult,
},
};
use bincode::serialize;
use bip39::{Language, Mnemonic, MnemonicType, Seed};
use clap::{App, AppSettings, Arg, ArgMatches, SubCommand};
use log::*;
@ -25,7 +23,7 @@ use solana_client::{
rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig},
rpc_filter::{Memcmp, MemcmpEncodedBytes, RpcFilterType},
rpc_request::MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS,
rpc_response::RpcLeaderSchedule,
tpu_client::{TpuClient, TpuClientConfig},
};
use solana_rbpf::vm::{Config, Executable};
use solana_remote_wallet::remote_wallet::RemoteWalletManager;
@ -51,20 +49,17 @@ use solana_sdk::{
};
use solana_transaction_status::TransactionConfirmationStatus;
use std::{
cmp::min,
collections::HashMap,
error,
fs::File,
io::{Read, Write},
net::UdpSocket,
path::PathBuf,
sync::Arc,
thread::sleep,
time::{Duration, Instant},
time::Duration,
};
const DATA_CHUNK_SIZE: usize = 229; // Keep program chunks under PACKET_DATA_SIZE
const NUM_TPU_LEADERS: u64 = 2;
#[derive(Debug, PartialEq)]
pub enum ProgramCliCommand {
@ -622,7 +617,7 @@ pub fn parse_program_subcommand(
}
pub fn process_program_subcommand(
rpc_client: &RpcClient,
rpc_client: Arc<RpcClient>,
config: &CliConfig,
program_subcommand: &ProgramCliCommand,
) -> ProcessResult {
@ -638,7 +633,7 @@ pub fn process_program_subcommand(
max_len,
allow_excessive_balance,
} => process_program_deploy(
&rpc_client,
rpc_client,
config,
program_location,
*program_signer_index,
@ -657,7 +652,7 @@ pub fn process_program_subcommand(
buffer_authority_signer_index,
max_len,
} => process_write_buffer(
&rpc_client,
rpc_client,
config,
program_location,
*buffer_signer_index,
@ -746,7 +741,7 @@ fn get_default_program_keypair(program_location: &Option<String>) -> Keypair {
/// Deploy using upgradeable loader
#[allow(clippy::too_many_arguments)]
fn process_program_deploy(
rpc_client: &RpcClient,
rpc_client: Arc<RpcClient>,
config: &CliConfig,
program_location: &Option<String>,
program_signer_index: Option<SignerIndex>,
@ -892,7 +887,7 @@ fn process_program_deploy(
let result = if do_deploy {
do_process_program_write_and_deploy(
rpc_client,
rpc_client.clone(),
config,
&program_data,
buffer_data_len,
@ -907,7 +902,7 @@ fn process_program_deploy(
)
} else {
do_process_program_upgrade(
rpc_client,
rpc_client.clone(),
config,
&program_data,
&program_pubkey,
@ -918,7 +913,7 @@ fn process_program_deploy(
};
if result.is_ok() && is_final {
process_set_authority(
rpc_client,
&rpc_client,
config,
Some(program_pubkey),
None,
@ -933,7 +928,7 @@ fn process_program_deploy(
}
fn process_write_buffer(
rpc_client: &RpcClient,
rpc_client: Arc<RpcClient>,
config: &CliConfig,
program_location: &str,
buffer_signer_index: Option<SignerIndex>,
@ -1450,7 +1445,7 @@ fn process_close(
/// Deploy using non-upgradeable loader
pub fn process_deploy(
rpc_client: &RpcClient,
rpc_client: Arc<RpcClient>,
config: &CliConfig,
program_location: &str,
buffer_signer_index: Option<SignerIndex>,
@ -1495,7 +1490,7 @@ pub fn process_deploy(
#[allow(clippy::too_many_arguments)]
fn do_process_program_write_and_deploy(
rpc_client: &RpcClient,
rpc_client: Arc<RpcClient>,
config: &CliConfig,
program_data: &[u8],
buffer_data_len: usize,
@ -1633,7 +1628,7 @@ fn do_process_program_write_and_deploy(
messages.push(message);
}
check_payer(rpc_client, config, balance_needed, &messages)?;
check_payer(&rpc_client, config, balance_needed, &messages)?;
send_deploy_messages(
rpc_client,
@ -1660,7 +1655,7 @@ fn do_process_program_write_and_deploy(
}
fn do_process_program_upgrade(
rpc_client: &RpcClient,
rpc_client: Arc<RpcClient>,
config: &CliConfig,
program_data: &[u8],
program_id: &Pubkey,
@ -1756,7 +1751,7 @@ fn do_process_program_upgrade(
);
messages.push(&final_message);
check_payer(rpc_client, config, balance_needed, &messages)?;
check_payer(&rpc_client, config, balance_needed, &messages)?;
send_deploy_messages(
rpc_client,
config,
@ -1861,7 +1856,7 @@ fn check_payer(
}
fn send_deploy_messages(
rpc_client: &RpcClient,
rpc_client: Arc<RpcClient>,
config: &CliConfig,
initial_message: &Option<Message>,
write_messages: &Option<Vec<Message>>,
@ -1909,7 +1904,8 @@ fn send_deploy_messages(
}
send_and_confirm_transactions_with_spinner(
&rpc_client,
rpc_client.clone(),
&config.websocket_url,
write_transactions,
&[payer_signer, write_signer],
config.commitment,
@ -1978,7 +1974,8 @@ fn report_ephemeral_mnemonic(words: usize, mnemonic: bip39::Mnemonic) {
}
fn send_and_confirm_transactions_with_spinner<T: Signers>(
rpc_client: &RpcClient,
rpc_client: Arc<RpcClient>,
websocket_url: &str,
mut transactions: Vec<Transaction>,
signer_keys: &T,
commitment: CommitmentConfig,
@ -1986,39 +1983,19 @@ fn send_and_confirm_transactions_with_spinner<T: Signers>(
) -> Result<(), Box<dyn error::Error>> {
let progress_bar = new_spinner_progress_bar();
let mut send_retries = 5;
let mut leader_schedule: Option<RpcLeaderSchedule> = None;
let mut leader_schedule_epoch = 0;
let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let cluster_nodes = rpc_client.get_cluster_nodes().ok();
progress_bar.set_message("Finding leader nodes...");
let tpu_client = TpuClient::new(
rpc_client.clone(),
websocket_url,
TpuClientConfig::default(),
)?;
loop {
progress_bar.set_message("Finding leader nodes...");
let epoch_info = rpc_client.get_epoch_info()?;
let mut slot = epoch_info.absolute_slot;
let mut last_epoch_fetch = Instant::now();
if epoch_info.epoch > leader_schedule_epoch || leader_schedule.is_none() {
leader_schedule = rpc_client.get_leader_schedule(Some(epoch_info.absolute_slot))?;
leader_schedule_epoch = epoch_info.epoch;
}
let mut tpu_addresses = get_leader_tpus(
min(epoch_info.slot_index + 1, epoch_info.slots_in_epoch),
NUM_TPU_LEADERS,
leader_schedule.as_ref(),
cluster_nodes.as_ref(),
);
// Send all transactions
let mut pending_transactions = HashMap::new();
let num_transactions = transactions.len();
for transaction in transactions {
if !tpu_addresses.is_empty() {
let wire_transaction =
serialize(&transaction).expect("serialization should succeed");
for tpu_address in &tpu_addresses {
send_transaction_tpu(&send_socket, &tpu_address, &wire_transaction);
}
} else {
if !tpu_client.send_transaction(&transaction) {
let _result = rpc_client
.send_transaction_with_config(
&transaction,
@ -2038,22 +2015,11 @@ fn send_and_confirm_transactions_with_spinner<T: Signers>(
// Throttle transactions to about 100 TPS
sleep(Duration::from_millis(10));
// Update leader periodically
if last_epoch_fetch.elapsed() > Duration::from_millis(400) {
let epoch_info = rpc_client.get_epoch_info()?;
last_epoch_fetch = Instant::now();
tpu_addresses = get_leader_tpus(
min(epoch_info.slot_index + 1, epoch_info.slots_in_epoch),
NUM_TPU_LEADERS,
leader_schedule.as_ref(),
cluster_nodes.as_ref(),
);
}
}
// Collect statuses for all the transactions, drop those that are confirmed
loop {
let mut slot = 0;
let pending_signatures = pending_transactions.keys().cloned().collect::<Vec<_>>();
for pending_signatures_chunk in
pending_signatures.chunks(MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS)
@ -2095,22 +2061,8 @@ fn send_and_confirm_transactions_with_spinner<T: Signers>(
break;
}
let epoch_info = rpc_client.get_epoch_info()?;
tpu_addresses = get_leader_tpus(
min(epoch_info.slot_index + 1, epoch_info.slots_in_epoch),
NUM_TPU_LEADERS,
leader_schedule.as_ref(),
cluster_nodes.as_ref(),
);
for transaction in pending_transactions.values() {
if !tpu_addresses.is_empty() {
let wire_transaction =
serialize(&transaction).expect("serialization should succeed");
for tpu_address in &tpu_addresses {
send_transaction_tpu(&send_socket, &tpu_address, &wire_transaction);
}
} else {
if !tpu_client.send_transaction(transaction) {
let _result = rpc_client
.send_transaction_with_config(
transaction,
@ -2933,7 +2885,7 @@ mod tests {
write_keypair_file(&program_pubkey, &program_keypair_location).unwrap();
let config = CliConfig {
rpc_client: Some(RpcClient::new_mock("".to_string())),
rpc_client: Some(Arc::new(RpcClient::new_mock("".to_string()))),
command: CliCommand::Program(ProgramCliCommand::Deploy {
program_location: Some(program_location.to_str().unwrap().to_string()),
buffer_signer_index: None,


@ -1,46 +0,0 @@
use log::*;
use solana_client::rpc_response::{RpcContactInfo, RpcLeaderSchedule};
use solana_sdk::clock::NUM_CONSECUTIVE_LEADER_SLOTS;
use std::net::{SocketAddr, UdpSocket};
pub fn get_leader_tpus(
slot_index: u64,
num_leaders: u64,
leader_schedule: Option<&RpcLeaderSchedule>,
cluster_nodes: Option<&Vec<RpcContactInfo>>,
) -> Vec<SocketAddr> {
let leaders: Vec<_> = (0..num_leaders)
.filter_map(|i| {
leader_schedule?
.iter()
.find(|(_pubkey, slots)| {
slots.iter().any(|slot| {
*slot as u64 == (slot_index + (i * NUM_CONSECUTIVE_LEADER_SLOTS))
})
})
.and_then(|(pubkey, _)| {
cluster_nodes?
.iter()
.find(|contact_info| contact_info.pubkey == *pubkey)
.and_then(|contact_info| contact_info.tpu)
})
})
.collect();
let mut unique_leaders = vec![];
for leader in leaders.into_iter() {
if !unique_leaders.contains(&leader) {
unique_leaders.push(leader);
}
}
unique_leaders
}
pub fn send_transaction_tpu(
send_socket: &UdpSocket,
tpu_address: &SocketAddr,
wire_transaction: &[u8],
) {
if let Err(err) = send_socket.send_to(wire_transaction, tpu_address) {
warn!("Failed to send transaction to {}: {:?}", tpu_address, err);
}
}


@ -19,3 +19,4 @@ pub mod rpc_request;
pub mod rpc_response;
pub mod rpc_sender;
pub mod thin_client;
pub mod tpu_client;


@ -124,6 +124,7 @@ impl RpcSender for MockSender {
}
RpcRequest::GetTransactionCount => Value::Number(Number::from(1234)),
RpcRequest::GetSlot => Value::Number(Number::from(0)),
RpcRequest::GetMaxShredInsertSlot => Value::Number(Number::from(0)), // needed now that TpuClient::new() calls get_max_shred_insert_slot()
RpcRequest::RequestAirdrop => Value::String(Signature::new(&[8; 64]).to_string()),
RpcRequest::SendTransaction => {
let signature = if self.url == "malicious" {


@ -3,7 +3,9 @@ use {
rpc_config::{
RpcSignatureSubscribeConfig, RpcTransactionLogsConfig, RpcTransactionLogsFilter,
},
rpc_response::{Response as RpcResponse, RpcLogsResponse, RpcSignatureResult, SlotInfo},
rpc_response::{
Response as RpcResponse, RpcLogsResponse, RpcSignatureResult, SlotInfo, SlotUpdate,
},
},
log::*,
serde::de::DeserializeOwned,
@ -340,6 +342,54 @@ impl PubsubClient {
Ok((result, receiver))
}
pub fn slot_updates_subscribe(
url: &str,
handler: impl Fn(SlotUpdate) + Send + 'static,
) -> Result<PubsubClientSubscription<SlotUpdate>, PubsubClientError> {
let url = Url::parse(url)?;
let (socket, _response) = connect(url)?;
let socket = Arc::new(RwLock::new(socket));
let exit = Arc::new(AtomicBool::new(false));
let exit_clone = exit.clone();
let subscription_id = PubsubClientSubscription::<SlotUpdate>::send_subscribe(
&socket,
json!({
"jsonrpc":"2.0","id":1,"method":"slotsUpdatesSubscribe","params":[]
})
.to_string(),
)?;
let t_cleanup = {
let socket = socket.clone();
std::thread::spawn(move || {
loop {
if exit_clone.load(Ordering::Relaxed) {
break;
}
match PubsubClientSubscription::read_message(&socket) {
Ok(message) => handler(message),
Err(err) => {
info!("receive error: {:?}", err);
break;
}
}
}
info!("websocket - exited receive loop");
})
};
Ok(PubsubClientSubscription {
message_type: PhantomData,
operation: "slotsUpdates",
socket,
subscription_id,
t_cleanup: Some(t_cleanup),
exit,
})
}
}
#[cfg(test)]
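The slot_updates_subscribe function added above is the pubsub primitive the new TPU client uses to track leader progress. A hedged sketch of calling it directly, with a placeholder URL and an illustrative helper name; the teardown mirrors LeaderTpuService::join further down.

use log::*;
use solana_client::{pubsub_client::PubsubClient, rpc_response::SlotUpdate};

fn watch_slots(websocket_url: &str) -> Result<(), Box<dyn std::error::Error>> {
    // The handler is invoked on the subscription's receive thread for every update.
    let mut subscription = PubsubClient::slot_updates_subscribe(websocket_url, |update| {
        if let SlotUpdate::FirstShredReceived { slot, .. } = update {
            info!("leader has started broadcasting slot {}", slot);
        }
    })?;
    // Tear down the same way LeaderTpuService::join does below.
    let _ = subscription.send_unsubscribe();
    let _ = subscription.shutdown();
    Ok(())
}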

client/src/tpu_client.rs (new file, 393 lines)

@ -0,0 +1,393 @@
use crate::{
pubsub_client::{PubsubClient, PubsubClientError, PubsubClientSubscription},
rpc_client::RpcClient,
rpc_response::SlotUpdate,
};
use bincode::serialize;
use log::*;
use solana_sdk::{clock::Slot, pubkey::Pubkey, transaction::Transaction};
use std::{
collections::{HashMap, HashSet, VecDeque},
net::{SocketAddr, UdpSocket},
str::FromStr,
sync::{
atomic::{AtomicBool, Ordering},
Arc, RwLock,
},
thread::JoinHandle,
time::{Duration, Instant},
};
use thiserror::Error;
#[derive(Error, Debug)]
pub enum TpuSenderError {
#[error("Pubsub error: {0:?}")]
PubsubError(#[from] PubsubClientError),
#[error("RPC error: {0:?}")]
RpcError(#[from] crate::client_error::ClientError),
#[error("IO error: {0:?}")]
IoError(#[from] std::io::Error),
}
type Result<T> = std::result::Result<T, TpuSenderError>;
/// Default number of slots used to build TPU socket fanout set
pub const DEFAULT_FANOUT_SLOTS: u64 = 12;
/// Maximum number of slots used to build TPU socket fanout set
pub const MAX_FANOUT_SLOTS: u64 = 100;
/// Config params for `TpuClient`
#[derive(Clone, Debug)]
pub struct TpuClientConfig {
/// The range of upcoming slots to include when determining which
/// leaders to send transactions to (min: 1, max: 100)
pub fanout_slots: u64,
}
impl Default for TpuClientConfig {
fn default() -> Self {
Self {
fanout_slots: DEFAULT_FANOUT_SLOTS,
}
}
}
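// Illustration only (not part of this file): overriding the default fanout.
// TpuClient::new() clamps fanout_slots to the range [1, MAX_FANOUT_SLOTS], so the
// config below fans each transaction out to the leaders of the next 16 slots:
//     let config = TpuClientConfig { fanout_slots: 16 };
//     let tpu_client = TpuClient::new(rpc_client, websocket_url, config)?;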
/// Client which sends transactions directly to the current leader's TPU port over UDP.
/// The client uses RPC to determine the current leader and fetch node contact info
pub struct TpuClient {
send_socket: UdpSocket,
fanout_slots: u64,
leader_tpu_service: LeaderTpuService,
exit: Arc<AtomicBool>,
}
impl TpuClient {
/// Serializes and sends a transaction to the current leader's TPU port
pub fn send_transaction(&self, transaction: &Transaction) -> bool {
let wire_transaction = serialize(transaction).expect("serialization should succeed");
self.send_wire_transaction(&wire_transaction)
}
/// Sends a transaction to the current leader's TPU port
pub fn send_wire_transaction(&self, wire_transaction: &[u8]) -> bool {
let mut sent = false;
for tpu_address in self
.leader_tpu_service
.leader_tpu_sockets(self.fanout_slots)
{
if self
.send_socket
.send_to(wire_transaction, tpu_address)
.is_ok()
{
sent = true;
}
}
sent
}
/// Create a new client that disconnects when dropped
pub fn new(
rpc_client: Arc<RpcClient>,
websocket_url: &str,
config: TpuClientConfig,
) -> Result<Self> {
let exit = Arc::new(AtomicBool::new(false));
let leader_tpu_service = LeaderTpuService::new(rpc_client, websocket_url, exit.clone())?;
Ok(Self {
send_socket: UdpSocket::bind("0.0.0.0:0").unwrap(),
fanout_slots: config.fanout_slots.min(MAX_FANOUT_SLOTS).max(1),
leader_tpu_service,
exit,
})
}
}
impl Drop for TpuClient {
fn drop(&mut self) {
self.exit.store(true, Ordering::Relaxed);
self.leader_tpu_service.join();
}
}
struct LeaderTpuCache {
first_slot: Slot,
leaders: Vec<Pubkey>,
leader_tpu_map: HashMap<Pubkey, SocketAddr>,
}
impl LeaderTpuCache {
fn new(rpc_client: &RpcClient, first_slot: Slot) -> Self {
let leaders = Self::fetch_slot_leaders(rpc_client, first_slot).unwrap_or_default();
let leader_tpu_map = Self::fetch_cluster_tpu_sockets(&rpc_client).unwrap_or_default();
Self {
first_slot,
leaders,
leader_tpu_map,
}
}
// Last slot that has a cached leader pubkey
fn last_slot(&self) -> Slot {
self.first_slot + self.leaders.len().saturating_sub(1) as u64
}
// Get the TPU sockets for the current leader and upcoming leaders according to fanout size
fn get_leader_sockets(&self, current_slot: Slot, fanout_slots: u64) -> Vec<SocketAddr> {
let mut leader_set = HashSet::new();
let mut leader_sockets = Vec::new();
for leader_slot in current_slot..current_slot + fanout_slots {
if let Some(leader) = self.get_slot_leader(leader_slot) {
if let Some(tpu_socket) = self.leader_tpu_map.get(leader) {
if leader_set.insert(*leader) {
leader_sockets.push(*tpu_socket);
}
}
}
}
leader_sockets
}
fn get_slot_leader(&self, slot: Slot) -> Option<&Pubkey> {
if slot >= self.first_slot {
let index = slot - self.first_slot;
self.leaders.get(index as usize)
} else {
None
}
}
fn fetch_cluster_tpu_sockets(rpc_client: &RpcClient) -> Result<HashMap<Pubkey, SocketAddr>> {
let cluster_contact_info = rpc_client.get_cluster_nodes()?;
Ok(cluster_contact_info
.into_iter()
.filter_map(|contact_info| {
Some((
Pubkey::from_str(&contact_info.pubkey).ok()?,
contact_info.tpu?,
))
})
.collect())
}
fn fetch_slot_leaders(rpc_client: &RpcClient, start_slot: Slot) -> Result<Vec<Pubkey>> {
Ok(rpc_client.get_slot_leaders(start_slot, 2 * MAX_FANOUT_SLOTS)?)
}
}
// 48 chosen because it's unlikely that 12 leaders in a row (4 consecutive slots each) will miss their slots
const MAX_SLOT_SKIP_DISTANCE: u64 = 48;
#[derive(Clone, Debug)]
struct RecentLeaderSlots(Arc<RwLock<VecDeque<Slot>>>);
impl RecentLeaderSlots {
fn new(current_slot: Slot) -> Self {
let mut recent_slots = VecDeque::new();
recent_slots.push_back(current_slot);
Self(Arc::new(RwLock::new(recent_slots)))
}
fn record_slot(&self, current_slot: Slot) {
let mut recent_slots = self.0.write().unwrap();
recent_slots.push_back(current_slot);
// 12 recent slots should be large enough to keep a misbehaving
// validator from affecting the median recent slot
while recent_slots.len() > 12 {
recent_slots.pop_front();
}
}
// Estimate the current slot from recent slot notifications.
fn estimated_current_slot(&self) -> Slot {
let mut recent_slots: Vec<Slot> = self.0.read().unwrap().iter().cloned().collect();
assert!(!recent_slots.is_empty());
recent_slots.sort_unstable();
// Validators can broadcast invalid blocks that are far in the future
// so check if the current slot is in line with the recent progression.
let max_index = recent_slots.len() - 1;
let median_index = max_index / 2;
let median_recent_slot = recent_slots[median_index];
let expected_current_slot = median_recent_slot + (max_index - median_index) as u64;
let max_reasonable_current_slot = expected_current_slot + MAX_SLOT_SKIP_DISTANCE;
// Return the highest slot that doesn't exceed what we believe is a
// reasonable slot.
recent_slots
.into_iter()
.rev()
.find(|slot| *slot <= max_reasonable_current_slot)
.unwrap()
}
}
#[cfg(test)]
impl From<Vec<Slot>> for RecentLeaderSlots {
fn from(recent_slots: Vec<Slot>) -> Self {
assert!(!recent_slots.is_empty());
Self(Arc::new(RwLock::new(recent_slots.into_iter().collect())))
}
}
/// Service that tracks upcoming leaders and maintains an up-to-date mapping
/// of leader id to TPU socket address.
struct LeaderTpuService {
recent_slots: RecentLeaderSlots,
leader_tpu_cache: Arc<RwLock<LeaderTpuCache>>,
subscription: Option<PubsubClientSubscription<SlotUpdate>>,
t_leader_tpu_service: Option<JoinHandle<()>>,
}
impl LeaderTpuService {
fn new(rpc_client: Arc<RpcClient>, websocket_url: &str, exit: Arc<AtomicBool>) -> Result<Self> {
let start_slot = rpc_client.get_max_shred_insert_slot()?;
let recent_slots = RecentLeaderSlots::new(start_slot);
let leader_tpu_cache = Arc::new(RwLock::new(LeaderTpuCache::new(&rpc_client, start_slot)));
let subscription = if !websocket_url.is_empty() {
let recent_slots = recent_slots.clone();
Some(PubsubClient::slot_updates_subscribe(
websocket_url,
move |update| {
let current_slot = match update {
// This update indicates that a full slot was received by the connected
// node so we can stop sending transactions to the leader for that slot
SlotUpdate::Completed { slot, .. } => slot.saturating_add(1),
// This update indicates that we have just received the first shred from
// the leader for this slot and they are probably still accepting transactions.
SlotUpdate::FirstShredReceived { slot, .. } => slot,
_ => return,
};
recent_slots.record_slot(current_slot);
},
)?)
} else {
None
};
let t_leader_tpu_service = Some({
let recent_slots = recent_slots.clone();
let leader_tpu_cache = leader_tpu_cache.clone();
std::thread::Builder::new()
.name("ldr-tpu-srv".to_string())
.spawn(move || Self::run(rpc_client, recent_slots, leader_tpu_cache, exit))
.unwrap()
});
Ok(LeaderTpuService {
recent_slots,
leader_tpu_cache,
subscription,
t_leader_tpu_service,
})
}
fn join(&mut self) {
if let Some(mut subscription) = self.subscription.take() {
let _ = subscription.send_unsubscribe();
let _ = subscription.shutdown();
}
if let Some(t_handle) = self.t_leader_tpu_service.take() {
t_handle.join().unwrap();
}
}
fn leader_tpu_sockets(&self, fanout_slots: u64) -> Vec<SocketAddr> {
let current_slot = self.recent_slots.estimated_current_slot();
self.leader_tpu_cache
.read()
.unwrap()
.get_leader_sockets(current_slot, fanout_slots)
}
fn run(
rpc_client: Arc<RpcClient>,
recent_slots: RecentLeaderSlots,
leader_tpu_cache: Arc<RwLock<LeaderTpuCache>>,
exit: Arc<AtomicBool>,
) {
let mut last_cluster_refresh = Instant::now();
let mut sleep_ms = 1000;
loop {
if exit.load(Ordering::Relaxed) {
break;
}
// Refresh cluster TPU ports every 5min in case validators restart with new port configuration
// or new validators come online
if last_cluster_refresh.elapsed() > Duration::from_secs(5 * 60) {
if let Ok(leader_tpu_map) = LeaderTpuCache::fetch_cluster_tpu_sockets(&rpc_client) {
leader_tpu_cache.write().unwrap().leader_tpu_map = leader_tpu_map;
last_cluster_refresh = Instant::now();
} else {
sleep_ms = 100;
continue;
}
}
// Sleep a few slots before checking if leader cache needs to be refreshed again
std::thread::sleep(Duration::from_millis(sleep_ms));
let current_slot = recent_slots.estimated_current_slot();
if current_slot
>= leader_tpu_cache
.read()
.unwrap()
.last_slot()
.saturating_sub(MAX_FANOUT_SLOTS)
{
if let Ok(slot_leaders) =
LeaderTpuCache::fetch_slot_leaders(&rpc_client, current_slot)
{
let mut leader_tpu_cache = leader_tpu_cache.write().unwrap();
leader_tpu_cache.first_slot = current_slot;
leader_tpu_cache.leaders = slot_leaders;
} else {
sleep_ms = 100;
continue;
}
}
sleep_ms = 1000;
}
}
}
#[cfg(test)]
mod tests {
use super::*;
fn assert_slot(recent_slots: RecentLeaderSlots, expected_slot: Slot) {
assert_eq!(recent_slots.estimated_current_slot(), expected_slot);
}
#[test]
fn test_recent_leader_slots() {
assert_slot(RecentLeaderSlots::new(0), 0);
let mut recent_slots: Vec<Slot> = (1..=12).collect();
assert_slot(RecentLeaderSlots::from(recent_slots.clone()), 12);
recent_slots.reverse();
assert_slot(RecentLeaderSlots::from(recent_slots), 12);
assert_slot(
RecentLeaderSlots::from(vec![0, 1 + MAX_SLOT_SKIP_DISTANCE]),
1 + MAX_SLOT_SKIP_DISTANCE,
);
assert_slot(
RecentLeaderSlots::from(vec![0, 2 + MAX_SLOT_SKIP_DISTANCE]),
0,
);
assert_slot(RecentLeaderSlots::from(vec![1]), 1);
assert_slot(RecentLeaderSlots::from(vec![1, 100]), 1);
assert_slot(RecentLeaderSlots::from(vec![1, 2, 100]), 2);
assert_slot(RecentLeaderSlots::from(vec![1, 2, 3, 100]), 3);
assert_slot(RecentLeaderSlots::from(vec![1, 2, 3, 99, 100]), 3);
}
}
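To make the estimated_current_slot arithmetic above concrete: for recent slots [1, 2, 3, 99, 100], max_index is 4, median_index is 2, and the median recent slot is 3, so the expected current slot is 3 + (4 - 2) = 5 and the maximum reasonable slot is 5 + 48 = 53. Both 100 and 99 exceed that bound, so the highest acceptable slot, 3, is returned, which is exactly what the last test case asserts.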


@ -2421,7 +2421,7 @@ pub mod rpc_full {
&self,
meta: Self::Metadata,
start_slot: Slot,
end_slot: Slot,
limit: u64,
) -> Result<Vec<String>>;
#[rpc(meta, name = "minimumLedgerSlot")]


@ -9,6 +9,7 @@ use solana_client::{
rpc_client::RpcClient,
rpc_config::{RpcAccountInfoConfig, RpcSignatureSubscribeConfig},
rpc_response::{Response, RpcSignatureResult, SlotUpdate},
tpu_client::{TpuClient, TpuClientConfig},
};
use solana_core::{rpc_pubsub::gen_client::Client as PubsubClient, test_validator::TestValidator};
use solana_sdk::{
@ -378,3 +379,37 @@ fn test_rpc_subscriptions() {
}
}
}
#[test]
fn test_tpu_send_transaction() {
let mint_keypair = Keypair::new();
let mint_pubkey = mint_keypair.pubkey();
let test_validator = TestValidator::with_no_fees(mint_pubkey, None);
let rpc_client = Arc::new(RpcClient::new_with_commitment(
test_validator.rpc_url(),
CommitmentConfig::processed(),
));
let tpu_client = TpuClient::new(
rpc_client.clone(),
&test_validator.rpc_pubsub_url(),
TpuClientConfig::default(),
)
.unwrap();
let recent_blockhash = rpc_client.get_recent_blockhash().unwrap().0;
let tx =
system_transaction::transfer(&mint_keypair, &Pubkey::new_unique(), 42, recent_blockhash);
assert!(tpu_client.send_transaction(&tx));
let timeout = Duration::from_secs(5);
let now = Instant::now();
let signatures = vec![tx.signatures[0]];
loop {
assert!(now.elapsed() < timeout);
let statuses = rpc_client.get_signature_statuses(&signatures).unwrap();
// Return once the status entry is actually Some, not merely present as a None placeholder
if statuses.value.get(0).map(|status| status.is_some()).unwrap_or(false) {
return;
}
}
}