tpu-client: Speed up performance by awaiting all futures at once (#32945)

* tpu-client: Await all futures at once

* Add timeout when sending to not waste time on down nodes

* Update comment to make it clearer that we're not spiking
Jon Cinque 2023-08-24 13:04:00 +02:00 committed by GitHub
parent 329c6f131b
commit b42249fffb
1 changed file with 140 additions and 7 deletions
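As the commit message above describes, the resend path now builds one send future per leader per pending transaction, staggers their start times, wraps each in a timeout so a down node cannot stall the batch, and awaits them all together. Before the diff itself, here is a minimal standalone sketch of that pattern; the leader names, the 10 ms stagger, and the send_to_leader helper are illustrative assumptions, not values or functions taken from this change.

use std::time::Duration;

use futures_util::future::join_all;
use tokio::time::{sleep, timeout};

// Hypothetical stand-in for sending one wire transaction to one leader TPU.
async fn send_to_leader(index: usize, leader: &str) -> Result<(), String> {
    // Stagger the sends so transactions still leave at a steady interval.
    sleep(Duration::from_millis(10).saturating_mul(index as u32)).await;
    println!("sent transaction {index} to {leader}");
    Ok(())
}

#[tokio::main]
async fn main() {
    let leaders = ["leader-a:8003", "leader-b:8003"];
    let futures = (0..3usize).flat_map(|index| {
        leaders.iter().copied().map(move |leader| async move {
            // A per-future timeout keeps a down node from stalling the batch.
            timeout(Duration::from_secs(5), send_to_leader(index, leader))
                .await
                .unwrap_or_else(|_| Err("timed out".to_string()))
        })
    });
    // Await every send future at once instead of one transaction at a time.
    let results = join_all(futures).await;
    assert_eq!(results.len(), 6);
}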


@@ -2,7 +2,10 @@ pub use crate::tpu_client::Result;
use {
crate::tpu_client::{RecentLeaderSlots, TpuClientConfig, MAX_FANOUT_SLOTS},
bincode::serialize,
futures_util::{future::join_all, stream::StreamExt},
futures_util::{
future::{join_all, FutureExt, TryFutureExt},
stream::StreamExt,
},
log::*,
solana_connection_cache::{
connection_cache::{
@@ -29,6 +32,8 @@ use {
},
std::{
collections::{HashMap, HashSet},
future::Future,
iter,
net::SocketAddr,
str::FromStr,
sync::{
@@ -45,6 +50,7 @@ use {
#[cfg(feature = "spinner")]
use {
crate::tpu_client::{SEND_TRANSACTION_INTERVAL, TRANSACTION_RESEND_INTERVAL},
indicatif::ProgressBar,
solana_rpc_client::spinner::{self, SendTransactionProgress},
solana_rpc_client_api::request::MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS,
solana_sdk::{message::Message, signers::Signers, transaction::TransactionError},
@@ -247,6 +253,100 @@ pub struct TpuClient<
connection_cache: Arc<ConnectionCache<P, M, C>>,
}
/// Helper function which generates futures to all be awaited together for maximum
/// throughput
#[cfg(feature = "spinner")]
fn send_wire_transaction_futures<'a, P, M, C>(
progress_bar: &'a ProgressBar,
progress: &'a SendTransactionProgress,
index: usize,
num_transactions: usize,
wire_transaction: Vec<u8>,
leaders: Vec<SocketAddr>,
connection_cache: &'a ConnectionCache<P, M, C>,
) -> Vec<impl Future<Output = TransportResult<()>> + 'a>
where
P: ConnectionPool<NewConnectionConfig = C>,
M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
{
const SEND_TIMEOUT_INTERVAL: Duration = Duration::from_secs(5);
let sleep_duration = SEND_TRANSACTION_INTERVAL.saturating_mul(index as u32);
let send_timeout = SEND_TIMEOUT_INTERVAL.saturating_add(sleep_duration);
leaders
.into_iter()
.map(|addr| {
timeout_future(
send_timeout,
sleep_and_send_wire_transaction_to_addr(
sleep_duration,
connection_cache,
addr,
wire_transaction.clone(),
),
)
.boxed_local() // required to make types work simply
})
.chain(iter::once(
timeout_future(
send_timeout,
sleep_and_set_message(
sleep_duration,
progress_bar,
progress,
index,
num_transactions,
),
)
.boxed_local(), // required to make types work simply
))
.collect::<Vec<_>>()
}
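// --- Editorial sketch, not part of this commit --------------------------------
// The `.boxed_local()` calls above are needed because every `async fn` has its
// own anonymous future type, so the per-leader send futures and the
// progress-message future cannot share one `Vec` without type erasure. A
// minimal, self-contained illustration (the `send_ok`/`report_ok` futures are
// hypothetical):
mod boxed_local_sketch {
    use futures_util::future::{FutureExt, LocalBoxFuture};

    async fn send_ok() -> Result<(), String> {
        Ok(())
    }

    async fn report_ok() -> Result<(), String> {
        Ok(())
    }

    fn mixed() -> Vec<LocalBoxFuture<'static, Result<(), String>>> {
        // Without `.boxed_local()` the two calls below would have different
        // anonymous types and could not be collected into the same Vec.
        vec![send_ok().boxed_local(), report_ok().boxed_local()]
    }
}
// -------------------------------------------------------------------------------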
// Wrap an existing future with a timeout.
//
// Useful for end-users who don't need a persistent connection to each validator,
// and want to abort more quickly.
fn timeout_future<'a, Fut: Future<Output = TransportResult<()>> + 'a>(
timeout_duration: Duration,
future: Fut,
) -> impl Future<Output = TransportResult<()>> + 'a {
timeout(timeout_duration, future)
.unwrap_or_else(|_| Err(TransportError::Custom("Timed out".to_string())))
.boxed_local()
}
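// --- Editorial sketch, not part of this commit --------------------------------
// Rough illustration of what `timeout_future` buys the caller: a send that
// never completes resolves to an `Err` once the timeout elapses instead of
// hanging the whole `join_all`. Assumes a tokio test runtime; `pending()`
// stands in for a send to an unreachable node.
#[cfg(test)]
mod timeout_future_sketch {
    use super::*;

    #[tokio::test]
    async fn times_out_instead_of_hanging() {
        let stuck_send = std::future::pending::<TransportResult<()>>();
        let result = timeout_future(Duration::from_millis(50), stuck_send).await;
        // tokio's Elapsed error was mapped to the custom "Timed out" error.
        assert!(result.is_err());
    }
}
// -------------------------------------------------------------------------------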
#[cfg(feature = "spinner")]
async fn sleep_and_set_message(
sleep_duration: Duration,
progress_bar: &ProgressBar,
progress: &SendTransactionProgress,
index: usize,
num_transactions: usize,
) -> TransportResult<()> {
sleep(sleep_duration).await;
progress.set_message_for_confirmed_transactions(
progress_bar,
&format!("Sending {}/{} transactions", index + 1, num_transactions,),
);
Ok(())
}
#[cfg(feature = "spinner")]
async fn sleep_and_send_wire_transaction_to_addr<P, M, C>(
sleep_duration: Duration,
connection_cache: &ConnectionCache<P, M, C>,
addr: SocketAddr,
wire_transaction: Vec<u8>,
) -> TransportResult<()>
where
P: ConnectionPool<NewConnectionConfig = C>,
M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
{
sleep(sleep_duration).await;
let conn = connection_cache.get_nonblocking_connection(&addr);
conn.send_data(&wire_transaction).await
}
async fn send_wire_transaction_to_addr<P, M, C>(
connection_cache: &ConnectionCache<P, M, C>,
addr: &SocketAddr,
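A quick worked example of the timing arithmetic in `send_wire_transaction_futures` above: each transaction's futures first sleep `index * SEND_TRANSACTION_INTERVAL`, so the sends still leave at the original pace even though everything is awaited at once, and the timeout budget grows by the same offset so later transactions are not cut off early. The 10 ms interval below is an assumed stand-in for SEND_TRANSACTION_INTERVAL, whose value is defined elsewhere in tpu_client and not shown in this diff.

use std::time::Duration;

fn main() {
    let send_interval = Duration::from_millis(10); // assumed SEND_TRANSACTION_INTERVAL
    let send_timeout_interval = Duration::from_secs(5); // SEND_TIMEOUT_INTERVAL above

    for index in [0usize, 1, 99] {
        let sleep_duration = send_interval.saturating_mul(index as u32);
        let send_timeout = send_timeout_interval.saturating_add(sleep_duration);
        // index 0: sleeps 0 ms, times out after 5.00 s.
        // index 99: sleeps 990 ms, times out after 5.99 s, so the delayed start
        // does not eat into its 5-second send budget.
        println!("tx {index}: sleep {sleep_duration:?}, timeout {send_timeout:?}");
    }
}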
@@ -459,15 +559,48 @@ where
// Periodically re-send all pending transactions
if Instant::now().duration_since(last_resend) > TRANSACTION_RESEND_INTERVAL {
// Prepare futures for all transactions
let mut futures = vec![];
for (index, (_i, transaction)) in pending_transactions.values().enumerate() {
if !self.send_transaction(transaction).await {
let wire_transaction = serialize(transaction).unwrap();
let leaders = self
.leader_tpu_service
.leader_tpu_sockets(self.fanout_slots);
futures.extend(send_wire_transaction_futures(
&progress_bar,
&progress,
index,
num_transactions,
wire_transaction,
leaders,
&self.connection_cache,
));
}
// Start the process of sending them all
let results = join_all(futures).await;
progress.set_message_for_confirmed_transactions(
&progress_bar,
"Checking sent transactions",
);
for (index, (tx_results, (_i, transaction))) in results
.chunks(self.fanout_slots as usize)
.zip(pending_transactions.values())
.enumerate()
{
// Only report an error if every future in the chunk errored
if tx_results.iter().all(|r| r.is_err()) {
progress.set_message_for_confirmed_transactions(
&progress_bar,
&format!(
"Resending failed transaction {} of {}",
index + 1,
num_transactions,
),
);
let _result = self.rpc_client.send_transaction(transaction).await.ok();
}
progress.set_message_for_confirmed_transactions(
&progress_bar,
&format!("Sending {}/{} transactions", index + 1, num_transactions,),
);
sleep(SEND_TRANSACTION_INTERVAL).await;
}
last_resend = Instant::now();
}
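The `join_all` results come back in the order the futures were created, so chunking the flat result list into per-transaction groups and zipping it with the pending transactions pairs each transaction with its own send outcomes; the RPC fallback only fires when every send in a chunk failed. A small standalone sketch of that aggregation, with an illustrative error type, fanout, and sample data rather than anything from this diff:

fn main() {
    // Stand-in for the flattened `join_all` results: three sends (one per
    // leader) for each of two transactions, in creation order.
    let fanout: usize = 3;
    let results: Vec<Result<(), String>> = vec![
        Ok(()), Err("down".into()), Err("down".into()),             // transaction A
        Err("down".into()), Err("down".into()), Err("down".into()), // transaction B
    ];
    let transactions = ["transaction A", "transaction B"];

    for (index, (tx_results, transaction)) in
        results.chunks(fanout).zip(transactions).enumerate()
    {
        // Fall back to sending over RPC only when every send in the chunk failed.
        if tx_results.iter().all(|r| r.is_err()) {
            println!("resending {transaction} ({} of 2) over RPC", index + 1);
        }
    }
}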