solana/client/src/nonblocking/tpu_client.rs

344 lines
13 KiB
Rust

pub use solana_tpu_client::nonblocking::tpu_client::{LeaderTpuService, TpuSenderError};
use {
crate::{
connection_cache::ConnectionCache,
nonblocking::tpu_connection::TpuConnection,
tpu_client::{TpuClientConfig, MAX_FANOUT_SLOTS},
},
bincode::serialize,
futures_util::future::join_all,
solana_rpc_client::{nonblocking::rpc_client::RpcClient, spinner},
solana_rpc_client_api::request::MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS,
solana_sdk::{
message::Message,
signers::Signers,
transaction::{Transaction, TransactionError},
transport::{Result as TransportResult, TransportError},
},
solana_tpu_client::{
nonblocking::tpu_client::temporary_pub::*,
tpu_client::temporary_pub::{SEND_TRANSACTION_INTERVAL, TRANSACTION_RESEND_INTERVAL},
},
std::{
collections::HashMap,
net::SocketAddr,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
},
tokio::time::{sleep, Duration, Instant},
};
/// Client which sends transactions directly to the current leader's TPU port over UDP.
/// The client uses RPC to determine the current leader and fetch node contact info
///
/// NOTE(review): every send goes through a connection obtained from `connection_cache`, so
/// the actual transport is whatever that cache hands out; "over UDP" may be outdated —
/// confirm against `ConnectionCache`.
pub struct TpuClient {
/// Fanout size passed to `leader_tpu_service.leader_tpu_sockets` on every send; the
/// constructor clamps it to `1..=MAX_FANOUT_SLOTS`.
fanout_slots: u64,
/// Service queried for the TPU socket addresses to send to; joined by `shutdown`.
leader_tpu_service: LeaderTpuService,
/// Flag shared with `leader_tpu_service` (a clone is passed to it at construction); set to
/// `true` by `shutdown` and when the client is dropped.
exit: Arc<AtomicBool>,
/// RPC client handed to the leader service at construction and used for block height,
/// blockhash and signature-status queries, and as a fallback sender while confirming.
rpc_client: Arc<RpcClient>,
/// Source of the nonblocking connection used for each per-address send.
connection_cache: Arc<ConnectionCache>,
}
/// Sends one serialized transaction to a single TPU address.
///
/// Looks up a nonblocking connection for `addr` in `connection_cache`, awaits the send on
/// it and returns that send's result unchanged.
async fn send_wire_transaction_to_addr(
    connection_cache: &ConnectionCache,
    addr: &SocketAddr,
    wire_transaction: Vec<u8>,
) -> TransportResult<()> {
    let conn = connection_cache.get_nonblocking_connection(addr);
    // The buffer is already owned by this function, so it is handed over as-is; cloning it
    // here would only add an allocation and a copy per leader.
    conn.send_wire_transaction(wire_transaction).await
}
/// Sends a batch of serialized transactions to a single TPU address.
///
/// The connection for `addr` is taken from `connection_cache`; the result of the batch send
/// on that connection is returned as-is.
async fn send_wire_transaction_batch_to_addr(
    connection_cache: &ConnectionCache,
    addr: &SocketAddr,
    wire_transactions: &[Vec<u8>],
) -> TransportResult<()> {
    connection_cache
        .get_nonblocking_connection(addr)
        .send_wire_transaction_batch(wire_transactions)
        .await
}
impl TpuClient {
/// Serializes `transaction` and sends it to the TPUs of the current and upcoming leaders,
/// as selected by the configured fanout.
///
/// Returns `true` when `try_send_wire_transaction` reports success and `false` otherwise;
/// the error itself is discarded.
///
/// # Panics
///
/// Panics if `transaction` cannot be serialized.
pub async fn send_transaction(&self, transaction: &Transaction) -> bool {
    let payload = serialize(transaction).expect("serialization should succeed");
    self.try_send_wire_transaction(payload).await.is_ok()
}
/// Sends already-serialized transaction bytes to the TPUs of the current and upcoming
/// leaders, as selected by the configured fanout.
///
/// Returns `true` when `try_send_wire_transaction` returns `Ok`, `false` when it returns an
/// error; the error itself is discarded.
pub async fn send_wire_transaction(&self, wire_transaction: Vec<u8>) -> bool {
    let outcome = self.try_send_wire_transaction(wire_transaction).await;
    matches!(outcome, Ok(()))
}
/// Serializes `transaction` and forwards the bytes to `try_send_wire_transaction`, which
/// sends them to the TPUs of the current and upcoming leaders according to the fanout size.
///
/// The result of `try_send_wire_transaction` is returned unchanged.
///
/// # Panics
///
/// Panics if `transaction` cannot be serialized.
pub async fn try_send_transaction(&self, transaction: &Transaction) -> TransportResult<()> {
    self.try_send_wire_transaction(
        serialize(transaction).expect("serialization should succeed"),
    )
    .await
}
/// Send a wire transaction to the current and upcoming leader TPUs according to fanout size.
///
/// A send is started for every address returned by
/// `leader_tpu_service.leader_tpu_sockets(fanout_slots)` and all sends are awaited together.
///
/// # Errors
///
/// Returns `Ok(())` if at least one send succeeded. If every send failed, the error of the
/// first failed send (in the order of the awaited results) is returned — not the last one.
/// If there was no address to send to, an I/O error with the message "No sends attempted"
/// is returned.
pub async fn try_send_wire_transaction(
    &self,
    wire_transaction: Vec<u8>,
) -> TransportResult<()> {
    let leaders = self
        .leader_tpu_service
        .leader_tpu_sockets(self.fanout_slots);
    let futures = leaders
        .iter()
        .map(|addr| {
            send_wire_transaction_to_addr(
                &self.connection_cache,
                addr,
                // Each per-address send takes ownership of its own copy of the bytes.
                wire_transaction.clone(),
            )
        })
        .collect::<Vec<_>>();
    let results: Vec<TransportResult<()>> = join_all(futures).await;

    // Every send has already completed at this point, so returning on the first success
    // does not skip any work.
    let mut first_error: Option<TransportError> = None;
    for result in results {
        match result {
            Ok(()) => return Ok(()),
            Err(err) => {
                if first_error.is_none() {
                    first_error = Some(err);
                }
            }
        }
    }
    Err(first_error.unwrap_or_else(|| {
        std::io::Error::new(std::io::ErrorKind::Other, "No sends attempted").into()
    }))
}
/// Send a batch of wire transactions to the current and upcoming leader TPUs according to
/// fanout size.
///
/// A batch send is started for every address returned by
/// `leader_tpu_service.leader_tpu_sockets(fanout_slots)` and all sends are awaited together.
///
/// # Errors
///
/// Returns `Ok(())` if at least one batch send succeeded. If every send failed, the error of
/// the first failed send (in the order of the awaited results) is returned — not the last
/// one. If there was no address to send to, an I/O error with the message
/// "No sends attempted" is returned.
pub async fn try_send_wire_transaction_batch(
    &self,
    wire_transactions: Vec<Vec<u8>>,
) -> TransportResult<()> {
    let leaders = self
        .leader_tpu_service
        .leader_tpu_sockets(self.fanout_slots);
    let futures = leaders
        .iter()
        .map(|addr| {
            send_wire_transaction_batch_to_addr(
                &self.connection_cache,
                addr,
                &wire_transactions,
            )
        })
        .collect::<Vec<_>>();
    let results: Vec<TransportResult<()>> = join_all(futures).await;

    // Every send has already completed at this point, so returning on the first success
    // does not skip any work.
    let mut first_error: Option<TransportError> = None;
    for result in results {
        match result {
            Ok(()) => return Ok(()),
            Err(err) => {
                if first_error.is_none() {
                    first_error = Some(err);
                }
            }
        }
    }
    Err(first_error.unwrap_or_else(|| {
        std::io::Error::new(std::io::ErrorKind::Other, "No sends attempted").into()
    }))
}
/// Builds a client backed by a default-constructed `ConnectionCache`.
///
/// All remaining work is delegated to `new_with_connection_cache`, whose result is returned
/// unchanged. The client sets its exit flag when dropped.
pub async fn new(
    rpc_client: Arc<RpcClient>,
    websocket_url: &str,
    config: TpuClientConfig,
) -> Result<Self> {
    let default_cache = ConnectionCache::default();
    Self::new_with_connection_cache(rpc_client, websocket_url, config, Arc::new(default_cache))
        .await
}
/// Builds a client that sends through the supplied `connection_cache`.
///
/// A `LeaderTpuService` is created from `rpc_client` and `websocket_url` and is given a
/// clone of the client's exit flag, which the client sets when it is shut down or dropped.
/// The configured fanout is clamped to `1..=MAX_FANOUT_SLOTS`.
///
/// # Errors
///
/// Propagates the error returned by `LeaderTpuService::new`.
pub async fn new_with_connection_cache(
    rpc_client: Arc<RpcClient>,
    websocket_url: &str,
    config: TpuClientConfig,
    connection_cache: Arc<ConnectionCache>,
) -> Result<Self> {
    let exit = Arc::new(AtomicBool::new(false));
    let service_rpc_client = Arc::clone(&rpc_client);
    let service_exit = Arc::clone(&exit);
    let leader_tpu_service =
        LeaderTpuService::new(service_rpc_client, websocket_url, service_exit).await?;
    let fanout_slots = config.fanout_slots.clamp(1, MAX_FANOUT_SLOTS);
    Ok(Self {
        fanout_slots,
        leader_tpu_service,
        exit,
        rpc_client,
        connection_cache,
    })
}
pub async fn send_and_confirm_messages_with_spinner<T: Signers>(
&self,
messages: &[Message],
signers: &T,
) -> Result<Vec<Option<TransactionError>>> {
let mut expired_blockhash_retries = 5;
let progress_bar = spinner::new_progress_bar();
progress_bar.set_message("Setting up...");
let mut transactions = messages
.iter()
.enumerate()
.map(|(i, message)| (i, Transaction::new_unsigned(message.clone())))
.collect::<Vec<_>>();
let total_transactions = transactions.len();
let mut transaction_errors = vec![None; transactions.len()];
let mut confirmed_transactions = 0;
let mut block_height = self.rpc_client.get_block_height().await?;
while expired_blockhash_retries > 0 {
let (blockhash, last_valid_block_height) = self
.rpc_client
.get_latest_blockhash_with_commitment(self.rpc_client.commitment())
.await?;
let mut pending_transactions = HashMap::new();
for (i, mut transaction) in transactions {
transaction.try_sign(signers, blockhash)?;
pending_transactions.insert(transaction.signatures[0], (i, transaction));
}
let mut last_resend = Instant::now() - TRANSACTION_RESEND_INTERVAL;
while block_height <= last_valid_block_height {
let num_transactions = pending_transactions.len();
// Periodically re-send all pending transactions
if Instant::now().duration_since(last_resend) > TRANSACTION_RESEND_INTERVAL {
for (index, (_i, transaction)) in pending_transactions.values().enumerate() {
if !self.send_transaction(transaction).await {
let _result = self.rpc_client.send_transaction(transaction).await.ok();
}
set_message_for_confirmed_transactions(
&progress_bar,
confirmed_transactions,
total_transactions,
None, //block_height,
last_valid_block_height,
&format!("Sending {}/{} transactions", index + 1, num_transactions,),
);
sleep(SEND_TRANSACTION_INTERVAL).await;
}
last_resend = Instant::now();
}
// Wait for the next block before checking for transaction statuses
let mut block_height_refreshes = 10;
set_message_for_confirmed_transactions(
&progress_bar,
confirmed_transactions,
total_transactions,
Some(block_height),
last_valid_block_height,
&format!(
"Waiting for next block, {} transactions pending...",
num_transactions
),
);
let mut new_block_height = block_height;
while block_height == new_block_height && block_height_refreshes > 0 {
sleep(Duration::from_millis(500)).await;
new_block_height = self.rpc_client.get_block_height().await?;
block_height_refreshes -= 1;
}
block_height = new_block_height;
// Collect statuses for the transactions, drop those that are confirmed
let pending_signatures = pending_transactions.keys().cloned().collect::<Vec<_>>();
for pending_signatures_chunk in
pending_signatures.chunks(MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS)
{
if let Ok(result) = self
.rpc_client
.get_signature_statuses(pending_signatures_chunk)
.await
{
let statuses = result.value;
for (signature, status) in
pending_signatures_chunk.iter().zip(statuses.into_iter())
{
if let Some(status) = status {
if status.satisfies_commitment(self.rpc_client.commitment()) {
if let Some((i, _)) = pending_transactions.remove(signature) {
confirmed_transactions += 1;
if status.err.is_some() {
progress_bar.println(format!(
"Failed transaction: {:?}",
status
));
}
transaction_errors[i] = status.err;
}
}
}
}
}
set_message_for_confirmed_transactions(
&progress_bar,
confirmed_transactions,
total_transactions,
Some(block_height),
last_valid_block_height,
"Checking transaction status...",
);
}
if pending_transactions.is_empty() {
return Ok(transaction_errors);
}
}
transactions = pending_transactions.into_values().collect();
progress_bar.println(format!(
"Blockhash expired. {} retries remaining",
expired_blockhash_retries
));
expired_blockhash_retries -= 1;
}
Err(TpuSenderError::Custom("Max retries exceeded".into()))
}
/// Borrows the RPC client this TPU client was constructed with.
pub fn rpc_client(&self) -> &RpcClient {
    self.rpc_client.as_ref()
}
pub async fn shutdown(&mut self) {
self.exit.store(true, Ordering::Relaxed);
self.leader_tpu_service.join().await;
}
}
impl Drop for TpuClient {
fn drop(&mut self) {
self.exit.store(true, Ordering::Relaxed);
}
}