Refactor client::nonblocking::tpu_client to use tpu_client::nonblocki… (#30136)
Refactor client::nonblocking::tpu_client to use tpu_client::nonblocking::tpu_client use a thin wrapper only for backward compatibility
This commit is contained in:
parent
70d6614b50
commit
f3457a8485
|
@ -1,83 +1,43 @@
|
|||
pub use solana_tpu_client::nonblocking::tpu_client::{LeaderTpuService, TpuSenderError};
|
||||
use {
|
||||
crate::{
|
||||
connection_cache::ConnectionCache,
|
||||
tpu_client::{TpuClientConfig, MAX_FANOUT_SLOTS},
|
||||
},
|
||||
bincode::serialize,
|
||||
futures_util::future::join_all,
|
||||
solana_rpc_client::{nonblocking::rpc_client::RpcClient, spinner},
|
||||
solana_rpc_client_api::request::MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS,
|
||||
crate::{connection_cache::ConnectionCache, tpu_client::TpuClientConfig},
|
||||
solana_connection_cache::connection_cache::ConnectionCache as BackendConnectionCache,
|
||||
solana_rpc_client::nonblocking::rpc_client::RpcClient,
|
||||
solana_sdk::{
|
||||
message::Message,
|
||||
signers::Signers,
|
||||
transaction::{Transaction, TransactionError},
|
||||
transport::{Result as TransportResult, TransportError},
|
||||
transport::Result as TransportResult,
|
||||
},
|
||||
solana_tpu_client::{
|
||||
nonblocking::tpu_client::temporary_pub::*,
|
||||
tpu_client::temporary_pub::{SEND_TRANSACTION_INTERVAL, TRANSACTION_RESEND_INTERVAL},
|
||||
},
|
||||
std::{
|
||||
collections::HashMap,
|
||||
net::SocketAddr,
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Arc,
|
||||
},
|
||||
},
|
||||
tokio::time::{sleep, Duration, Instant},
|
||||
solana_tpu_client::nonblocking::tpu_client::{temporary_pub::*, TpuClient as BackendTpuClient},
|
||||
std::sync::Arc,
|
||||
};
|
||||
|
||||
/// Client which sends transactions directly to the current leader's TPU port over UDP.
|
||||
/// The client uses RPC to determine the current leader and fetch node contact info
|
||||
pub struct TpuClient {
|
||||
fanout_slots: u64,
|
||||
leader_tpu_service: LeaderTpuService,
|
||||
exit: Arc<AtomicBool>,
|
||||
rpc_client: Arc<RpcClient>,
|
||||
connection_cache: Arc<ConnectionCache>,
|
||||
}
|
||||
|
||||
async fn send_wire_transaction_to_addr(
|
||||
connection_cache: &ConnectionCache,
|
||||
addr: &SocketAddr,
|
||||
wire_transaction: Vec<u8>,
|
||||
) -> TransportResult<()> {
|
||||
let conn = connection_cache.get_nonblocking_connection(addr);
|
||||
conn.send_data(&wire_transaction).await
|
||||
}
|
||||
|
||||
async fn send_wire_transaction_batch_to_addr(
|
||||
connection_cache: &ConnectionCache,
|
||||
addr: &SocketAddr,
|
||||
wire_transactions: &[Vec<u8>],
|
||||
) -> TransportResult<()> {
|
||||
let conn = connection_cache.get_nonblocking_connection(addr);
|
||||
conn.send_data_batch(wire_transactions).await
|
||||
tpu_client: BackendTpuClient,
|
||||
}
|
||||
|
||||
impl TpuClient {
|
||||
/// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
|
||||
/// size
|
||||
pub async fn send_transaction(&self, transaction: &Transaction) -> bool {
|
||||
let wire_transaction = serialize(transaction).expect("serialization should succeed");
|
||||
self.send_wire_transaction(wire_transaction).await
|
||||
self.tpu_client.send_transaction(transaction).await
|
||||
}
|
||||
|
||||
/// Send a wire transaction to the current and upcoming leader TPUs according to fanout size
|
||||
pub async fn send_wire_transaction(&self, wire_transaction: Vec<u8>) -> bool {
|
||||
self.try_send_wire_transaction(wire_transaction)
|
||||
self.tpu_client
|
||||
.send_wire_transaction(wire_transaction)
|
||||
.await
|
||||
.is_ok()
|
||||
}
|
||||
|
||||
/// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
|
||||
/// size
|
||||
/// Returns the last error if all sends fail
|
||||
pub async fn try_send_transaction(&self, transaction: &Transaction) -> TransportResult<()> {
|
||||
let wire_transaction = serialize(transaction).expect("serialization should succeed");
|
||||
self.try_send_wire_transaction(wire_transaction).await
|
||||
self.tpu_client.try_send_transaction(transaction).await
|
||||
}
|
||||
|
||||
/// Send a wire transaction to the current and upcoming leader TPUs according to fanout size
|
||||
|
@ -86,41 +46,9 @@ impl TpuClient {
|
|||
&self,
|
||||
wire_transaction: Vec<u8>,
|
||||
) -> TransportResult<()> {
|
||||
let leaders = self
|
||||
.leader_tpu_service
|
||||
.leader_tpu_sockets(self.fanout_slots);
|
||||
let futures = leaders
|
||||
.iter()
|
||||
.map(|addr| {
|
||||
send_wire_transaction_to_addr(
|
||||
&self.connection_cache,
|
||||
addr,
|
||||
wire_transaction.clone(),
|
||||
)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
let results: Vec<TransportResult<()>> = join_all(futures).await;
|
||||
|
||||
let mut last_error: Option<TransportError> = None;
|
||||
let mut some_success = false;
|
||||
for result in results {
|
||||
if let Err(e) = result {
|
||||
if last_error.is_none() {
|
||||
last_error = Some(e);
|
||||
}
|
||||
} else {
|
||||
some_success = true;
|
||||
}
|
||||
}
|
||||
if !some_success {
|
||||
Err(if let Some(err) = last_error {
|
||||
err
|
||||
} else {
|
||||
std::io::Error::new(std::io::ErrorKind::Other, "No sends attempted").into()
|
||||
})
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
self.tpu_client
|
||||
.try_send_wire_transaction(wire_transaction)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Send a batch of wire transactions to the current and upcoming leader TPUs according to
|
||||
|
@ -130,41 +58,9 @@ impl TpuClient {
|
|||
&self,
|
||||
wire_transactions: Vec<Vec<u8>>,
|
||||
) -> TransportResult<()> {
|
||||
let leaders = self
|
||||
.leader_tpu_service
|
||||
.leader_tpu_sockets(self.fanout_slots);
|
||||
let futures = leaders
|
||||
.iter()
|
||||
.map(|addr| {
|
||||
send_wire_transaction_batch_to_addr(
|
||||
&self.connection_cache,
|
||||
addr,
|
||||
&wire_transactions,
|
||||
)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
let results: Vec<TransportResult<()>> = join_all(futures).await;
|
||||
|
||||
let mut last_error: Option<TransportError> = None;
|
||||
let mut some_success = false;
|
||||
for result in results {
|
||||
if let Err(e) = result {
|
||||
if last_error.is_none() {
|
||||
last_error = Some(e);
|
||||
}
|
||||
} else {
|
||||
some_success = true;
|
||||
}
|
||||
}
|
||||
if !some_success {
|
||||
Err(if let Some(err) = last_error {
|
||||
err
|
||||
} else {
|
||||
std::io::Error::new(std::io::ErrorKind::Other, "No sends attempted").into()
|
||||
})
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
self.tpu_client
|
||||
.try_send_wire_transaction_batch(wire_transactions)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Create a new client that disconnects when dropped
|
||||
|
@ -173,7 +69,7 @@ impl TpuClient {
|
|||
websocket_url: &str,
|
||||
config: TpuClientConfig,
|
||||
) -> Result<Self> {
|
||||
let connection_cache = Arc::new(ConnectionCache::default());
|
||||
let connection_cache = Arc::new(ConnectionCache::default().into());
|
||||
Self::new_with_connection_cache(rpc_client, websocket_url, config, connection_cache).await
|
||||
}
|
||||
|
||||
|
@ -182,18 +78,16 @@ impl TpuClient {
|
|||
rpc_client: Arc<RpcClient>,
|
||||
websocket_url: &str,
|
||||
config: TpuClientConfig,
|
||||
connection_cache: Arc<ConnectionCache>,
|
||||
connection_cache: Arc<BackendConnectionCache>,
|
||||
) -> Result<Self> {
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
let leader_tpu_service =
|
||||
LeaderTpuService::new(rpc_client.clone(), websocket_url, exit.clone()).await?;
|
||||
|
||||
Ok(Self {
|
||||
fanout_slots: config.fanout_slots.clamp(1, MAX_FANOUT_SLOTS),
|
||||
leader_tpu_service,
|
||||
exit,
|
||||
tpu_client: BackendTpuClient::new_with_connection_cache(
|
||||
rpc_client,
|
||||
websocket_url,
|
||||
config,
|
||||
connection_cache,
|
||||
)
|
||||
.await?,
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -202,135 +96,16 @@ impl TpuClient {
|
|||
messages: &[Message],
|
||||
signers: &T,
|
||||
) -> Result<Vec<Option<TransactionError>>> {
|
||||
let mut expired_blockhash_retries = 5;
|
||||
let progress_bar = spinner::new_progress_bar();
|
||||
progress_bar.set_message("Setting up...");
|
||||
|
||||
let mut transactions = messages
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(i, message)| (i, Transaction::new_unsigned(message.clone())))
|
||||
.collect::<Vec<_>>();
|
||||
let total_transactions = transactions.len();
|
||||
let mut transaction_errors = vec![None; transactions.len()];
|
||||
let mut confirmed_transactions = 0;
|
||||
let mut block_height = self.rpc_client.get_block_height().await?;
|
||||
while expired_blockhash_retries > 0 {
|
||||
let (blockhash, last_valid_block_height) = self
|
||||
.rpc_client
|
||||
.get_latest_blockhash_with_commitment(self.rpc_client.commitment())
|
||||
.await?;
|
||||
|
||||
let mut pending_transactions = HashMap::new();
|
||||
for (i, mut transaction) in transactions {
|
||||
transaction.try_sign(signers, blockhash)?;
|
||||
pending_transactions.insert(transaction.signatures[0], (i, transaction));
|
||||
}
|
||||
|
||||
let mut last_resend = Instant::now() - TRANSACTION_RESEND_INTERVAL;
|
||||
while block_height <= last_valid_block_height {
|
||||
let num_transactions = pending_transactions.len();
|
||||
|
||||
// Periodically re-send all pending transactions
|
||||
if Instant::now().duration_since(last_resend) > TRANSACTION_RESEND_INTERVAL {
|
||||
for (index, (_i, transaction)) in pending_transactions.values().enumerate() {
|
||||
if !self.send_transaction(transaction).await {
|
||||
let _result = self.rpc_client.send_transaction(transaction).await.ok();
|
||||
}
|
||||
set_message_for_confirmed_transactions(
|
||||
&progress_bar,
|
||||
confirmed_transactions,
|
||||
total_transactions,
|
||||
None, //block_height,
|
||||
last_valid_block_height,
|
||||
&format!("Sending {}/{} transactions", index + 1, num_transactions,),
|
||||
);
|
||||
sleep(SEND_TRANSACTION_INTERVAL).await;
|
||||
}
|
||||
last_resend = Instant::now();
|
||||
}
|
||||
|
||||
// Wait for the next block before checking for transaction statuses
|
||||
let mut block_height_refreshes = 10;
|
||||
set_message_for_confirmed_transactions(
|
||||
&progress_bar,
|
||||
confirmed_transactions,
|
||||
total_transactions,
|
||||
Some(block_height),
|
||||
last_valid_block_height,
|
||||
&format!("Waiting for next block, {num_transactions} transactions pending..."),
|
||||
);
|
||||
let mut new_block_height = block_height;
|
||||
while block_height == new_block_height && block_height_refreshes > 0 {
|
||||
sleep(Duration::from_millis(500)).await;
|
||||
new_block_height = self.rpc_client.get_block_height().await?;
|
||||
block_height_refreshes -= 1;
|
||||
}
|
||||
block_height = new_block_height;
|
||||
|
||||
// Collect statuses for the transactions, drop those that are confirmed
|
||||
let pending_signatures = pending_transactions.keys().cloned().collect::<Vec<_>>();
|
||||
for pending_signatures_chunk in
|
||||
pending_signatures.chunks(MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS)
|
||||
{
|
||||
if let Ok(result) = self
|
||||
.rpc_client
|
||||
.get_signature_statuses(pending_signatures_chunk)
|
||||
self.tpu_client
|
||||
.send_and_confirm_messages_with_spinner(messages, signers)
|
||||
.await
|
||||
{
|
||||
let statuses = result.value;
|
||||
for (signature, status) in
|
||||
pending_signatures_chunk.iter().zip(statuses.into_iter())
|
||||
{
|
||||
if let Some(status) = status {
|
||||
if status.satisfies_commitment(self.rpc_client.commitment()) {
|
||||
if let Some((i, _)) = pending_transactions.remove(signature) {
|
||||
confirmed_transactions += 1;
|
||||
if status.err.is_some() {
|
||||
progress_bar
|
||||
.println(format!("Failed transaction: {status:?}"));
|
||||
}
|
||||
transaction_errors[i] = status.err;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
set_message_for_confirmed_transactions(
|
||||
&progress_bar,
|
||||
confirmed_transactions,
|
||||
total_transactions,
|
||||
Some(block_height),
|
||||
last_valid_block_height,
|
||||
"Checking transaction status...",
|
||||
);
|
||||
}
|
||||
|
||||
if pending_transactions.is_empty() {
|
||||
return Ok(transaction_errors);
|
||||
}
|
||||
}
|
||||
|
||||
transactions = pending_transactions.into_values().collect();
|
||||
progress_bar.println(format!(
|
||||
"Blockhash expired. {expired_blockhash_retries} retries remaining"
|
||||
));
|
||||
expired_blockhash_retries -= 1;
|
||||
}
|
||||
Err(TpuSenderError::Custom("Max retries exceeded".into()))
|
||||
}
|
||||
|
||||
pub fn rpc_client(&self) -> &RpcClient {
|
||||
&self.rpc_client
|
||||
self.tpu_client.rpc_client()
|
||||
}
|
||||
|
||||
pub async fn shutdown(&mut self) {
|
||||
self.exit.store(true, Ordering::Relaxed);
|
||||
self.leader_tpu_service.join().await;
|
||||
}
|
||||
}
|
||||
impl Drop for TpuClient {
|
||||
fn drop(&mut self) {
|
||||
self.exit.store(true, Ordering::Relaxed);
|
||||
self.tpu_client.shutdown().await
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue