client: Speed up send-and-confirm-parallel (#33032)
* Rename blockheight -> block height * Clean up comments * Simplify tpu-client implementation * client: Destructure more in match arm * client: Avoid passing Arc where it isn't needed * Refactor sending into a new function * Chain work after send to make last part cleaner * Do all sending at once in the same future * Sleep before sending to avoid overwhelming
This commit is contained in:
parent
555741e4d6
commit
2124f0a13c
|
@ -5,11 +5,13 @@ use {
|
|||
},
|
||||
bincode::serialize,
|
||||
dashmap::DashMap,
|
||||
futures_util::future::{join_all, TryFutureExt},
|
||||
solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool},
|
||||
solana_rpc_client::spinner::{self, SendTransactionProgress},
|
||||
solana_rpc_client_api::{
|
||||
client_error::ErrorKind,
|
||||
request::{RpcError, RpcResponseErrorData, MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS},
|
||||
response::RpcSimulateTransactionResult,
|
||||
},
|
||||
solana_sdk::{
|
||||
hash::Hash,
|
||||
|
@ -31,11 +33,12 @@ use {
|
|||
|
||||
const BLOCKHASH_REFRESH_RATE: Duration = Duration::from_secs(10);
|
||||
const TPU_RESEND_REFRESH_RATE: Duration = Duration::from_secs(2);
|
||||
const SEND_INTERVAL: Duration = Duration::from_millis(10);
|
||||
type QuicTpuClient = TpuClient<QuicPool, QuicConnectionManager, QuicConfig>;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
struct TransactionData {
|
||||
last_valid_blockheight: u64,
|
||||
last_valid_block_height: u64,
|
||||
message: Message,
|
||||
index: usize,
|
||||
serialized_transaction: Vec<u8>,
|
||||
|
@ -44,7 +47,7 @@ struct TransactionData {
|
|||
#[derive(Clone, Debug, Copy)]
|
||||
struct BlockHashData {
|
||||
pub blockhash: Hash,
|
||||
pub last_valid_blockheight: u64,
|
||||
pub last_valid_block_height: u64,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Copy)]
|
||||
|
@ -53,6 +56,7 @@ pub struct SendAndConfirmConfig {
|
|||
pub resign_txs_count: Option<usize>,
|
||||
}
|
||||
|
||||
/// Sends and confirms transactions concurrently in a sync context
|
||||
pub fn send_and_confirm_transactions_in_parallel_blocking<T: Signers + ?Sized>(
|
||||
rpc_client: Arc<BlockingRpcClient>,
|
||||
tpu_client: Option<QuicTpuClient>,
|
||||
|
@ -77,18 +81,18 @@ fn create_blockhash_data_updating_task(
|
|||
) -> JoinHandle<()> {
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
if let Ok((blockhash, last_valid_blockheight)) = rpc_client
|
||||
if let Ok((blockhash, last_valid_block_height)) = rpc_client
|
||||
.get_latest_blockhash_with_commitment(rpc_client.commitment())
|
||||
.await
|
||||
{
|
||||
*blockhash_data_rw.write().await = BlockHashData {
|
||||
blockhash,
|
||||
last_valid_blockheight,
|
||||
last_valid_block_height,
|
||||
};
|
||||
}
|
||||
|
||||
if let Ok(blockheight) = rpc_client.get_block_height().await {
|
||||
current_block_height.store(blockheight, Ordering::Relaxed);
|
||||
if let Ok(block_height) = rpc_client.get_block_height().await {
|
||||
current_block_height.store(block_height, Ordering::Relaxed);
|
||||
}
|
||||
tokio::time::sleep(BLOCKHASH_REFRESH_RATE).await;
|
||||
}
|
||||
|
@ -112,10 +116,10 @@ fn create_transaction_confirmation_task(
|
|||
let transactions_to_verify: Vec<Signature> = unconfirmed_transasction_map
|
||||
.iter()
|
||||
.filter(|x| {
|
||||
let is_not_expired = current_block_height <= x.last_valid_blockheight;
|
||||
let is_not_expired = current_block_height <= x.last_valid_block_height;
|
||||
// transaction expired between last and current check
|
||||
let is_recently_expired = last_block_height <= x.last_valid_blockheight
|
||||
&& current_block_height > x.last_valid_blockheight;
|
||||
let is_recently_expired = last_block_height <= x.last_valid_block_height
|
||||
&& current_block_height > x.last_valid_block_height;
|
||||
is_not_expired || is_recently_expired
|
||||
})
|
||||
.map(|x| *x.key())
|
||||
|
@ -177,15 +181,65 @@ fn progress_from_context_and_block_height(
|
|||
}
|
||||
}
|
||||
|
||||
async fn send_transaction_with_rpc_fallback(
|
||||
rpc_client: &RpcClient,
|
||||
tpu_client: &Option<QuicTpuClient>,
|
||||
transaction: Transaction,
|
||||
serialized_transaction: Vec<u8>,
|
||||
context: &SendingContext,
|
||||
index: usize,
|
||||
counter: usize,
|
||||
) -> Result<()> {
|
||||
tokio::time::sleep(SEND_INTERVAL.saturating_mul(counter as u32)).await;
|
||||
let send_over_rpc = if let Some(tpu_client) = tpu_client {
|
||||
!tpu_client
|
||||
.send_wire_transaction(serialized_transaction.clone())
|
||||
.await
|
||||
} else {
|
||||
true
|
||||
};
|
||||
if send_over_rpc {
|
||||
if let Err(e) = rpc_client.send_transaction(&transaction).await {
|
||||
match &e.kind {
|
||||
ErrorKind::Io(_) | ErrorKind::Reqwest(_) => {
|
||||
// fall through on io error, we will retry the transaction
|
||||
}
|
||||
ErrorKind::TransactionError(transaction_error) => {
|
||||
context.error_map.insert(index, transaction_error.clone());
|
||||
return Ok(());
|
||||
}
|
||||
ErrorKind::RpcError(RpcError::RpcResponseError {
|
||||
data:
|
||||
RpcResponseErrorData::SendTransactionPreflightFailure(
|
||||
RpcSimulateTransactionResult {
|
||||
err: Some(transaction_error),
|
||||
..
|
||||
},
|
||||
),
|
||||
..
|
||||
}) => {
|
||||
context.error_map.insert(index, transaction_error.clone());
|
||||
return Ok(());
|
||||
}
|
||||
_ => {
|
||||
return Err(TpuSenderError::from(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn sign_all_messages_and_send<T: Signers + ?Sized>(
|
||||
progress_bar: &Option<indicatif::ProgressBar>,
|
||||
rpc_client: Arc<RpcClient>,
|
||||
rpc_client: &RpcClient,
|
||||
tpu_client: &Option<QuicTpuClient>,
|
||||
messages_with_index: Vec<(usize, Message)>,
|
||||
signers: &T,
|
||||
context: &SendingContext,
|
||||
) -> Result<()> {
|
||||
let current_transaction_count = messages_with_index.len();
|
||||
let mut futures = vec![];
|
||||
// send all the transaction messages
|
||||
for (counter, (index, message)) in messages_with_index.iter().enumerate() {
|
||||
let mut transaction = Transaction::new_unsigned(message.clone());
|
||||
|
@ -196,71 +250,48 @@ async fn sign_all_messages_and_send<T: Signers + ?Sized>(
|
|||
.try_sign(signers, blockhashdata.blockhash)
|
||||
.expect("Transaction should be signable");
|
||||
let serialized_transaction = serialize(&transaction).expect("Transaction should serailize");
|
||||
let send_over_rpc = if let Some(tpu_client) = tpu_client {
|
||||
!tpu_client
|
||||
.send_wire_transaction(serialized_transaction.clone())
|
||||
.await
|
||||
} else {
|
||||
true
|
||||
};
|
||||
if send_over_rpc {
|
||||
if let Err(e) = rpc_client.send_transaction(&transaction).await {
|
||||
match &e.kind {
|
||||
ErrorKind::Io(_) | ErrorKind::Reqwest(_) => {
|
||||
// fall through on io error, we will retry the transaction
|
||||
}
|
||||
ErrorKind::TransactionError(transaction_error) => {
|
||||
context.error_map.insert(*index, transaction_error.clone());
|
||||
continue;
|
||||
}
|
||||
ErrorKind::RpcError(rpc_error) => {
|
||||
if let RpcError::RpcResponseError {
|
||||
data:
|
||||
RpcResponseErrorData::SendTransactionPreflightFailure(simulation_result),
|
||||
..
|
||||
} = rpc_error
|
||||
{
|
||||
if let Some(transaction_error) = &simulation_result.err {
|
||||
context.error_map.insert(*index, transaction_error.clone());
|
||||
continue;
|
||||
}
|
||||
}
|
||||
return Err(TpuSenderError::from(e));
|
||||
}
|
||||
_ => {
|
||||
return Err(TpuSenderError::from(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let signature = transaction.signatures[0];
|
||||
// send to confirm the transaction
|
||||
context.unconfirmed_transasction_map.insert(
|
||||
signature,
|
||||
TransactionData {
|
||||
index: *index,
|
||||
serialized_transaction,
|
||||
last_valid_blockheight: blockhashdata.last_valid_blockheight,
|
||||
message: message.clone(),
|
||||
},
|
||||
);
|
||||
|
||||
if let Some(progress_bar) = progress_bar {
|
||||
let progress = progress_from_context_and_block_height(
|
||||
futures.push(
|
||||
send_transaction_with_rpc_fallback(
|
||||
rpc_client,
|
||||
tpu_client,
|
||||
transaction,
|
||||
serialized_transaction.clone(),
|
||||
context,
|
||||
blockhashdata.last_valid_blockheight,
|
||||
);
|
||||
progress.set_message_for_confirmed_transactions(
|
||||
progress_bar,
|
||||
&format!(
|
||||
"Sending {}/{} transactions",
|
||||
counter + 1,
|
||||
current_transaction_count,
|
||||
),
|
||||
);
|
||||
}
|
||||
*index,
|
||||
counter,
|
||||
)
|
||||
.and_then(move |_| async move {
|
||||
// send to confirm the transaction
|
||||
context.unconfirmed_transasction_map.insert(
|
||||
signature,
|
||||
TransactionData {
|
||||
index: *index,
|
||||
serialized_transaction,
|
||||
last_valid_block_height: blockhashdata.last_valid_block_height,
|
||||
message: message.clone(),
|
||||
},
|
||||
);
|
||||
if let Some(progress_bar) = progress_bar {
|
||||
let progress = progress_from_context_and_block_height(
|
||||
context,
|
||||
blockhashdata.last_valid_block_height,
|
||||
);
|
||||
progress.set_message_for_confirmed_transactions(
|
||||
progress_bar,
|
||||
&format!(
|
||||
"Sending {}/{} transactions",
|
||||
counter + 1,
|
||||
current_transaction_count,
|
||||
),
|
||||
);
|
||||
}
|
||||
Ok(())
|
||||
}),
|
||||
);
|
||||
}
|
||||
// collect to convert Vec<Result<_>> to Result<Vec<_>>
|
||||
join_all(futures).await.into_iter().collect::<Result<_>>()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -275,7 +306,7 @@ async fn confirm_transactions_till_block_height_and_resend_unexpired_transaction
|
|||
let transactions_to_confirm = unconfirmed_transasction_map.len();
|
||||
let max_valid_block_height = unconfirmed_transasction_map
|
||||
.iter()
|
||||
.map(|x| x.last_valid_blockheight)
|
||||
.map(|x| x.last_valid_block_height)
|
||||
.max();
|
||||
|
||||
if let Some(mut max_valid_block_height) = max_valid_block_height {
|
||||
|
@ -293,7 +324,7 @@ async fn confirm_transactions_till_block_height_and_resend_unexpired_transaction
|
|||
while !unconfirmed_transasction_map.is_empty()
|
||||
&& current_block_height.load(Ordering::Relaxed) <= max_valid_block_height
|
||||
{
|
||||
let blockheight = current_block_height.load(Ordering::Relaxed);
|
||||
let block_height = current_block_height.load(Ordering::Relaxed);
|
||||
|
||||
if let Some(progress_bar) = progress_bar {
|
||||
let progress =
|
||||
|
@ -310,7 +341,7 @@ async fn confirm_transactions_till_block_height_and_resend_unexpired_transaction
|
|||
// any transactions sent over RPC will be automatically rebroadcast by the RPC server
|
||||
let txs_to_resend_over_tpu = unconfirmed_transasction_map
|
||||
.iter()
|
||||
.filter(|x| blockheight < x.last_valid_blockheight)
|
||||
.filter(|x| block_height < x.last_valid_block_height)
|
||||
.map(|x| x.serialized_transaction.clone())
|
||||
.collect();
|
||||
let _ = tpu_client
|
||||
|
@ -327,7 +358,7 @@ async fn confirm_transactions_till_block_height_and_resend_unexpired_transaction
|
|||
if let Some(max_valid_block_height_in_remaining_transaction) =
|
||||
unconfirmed_transasction_map
|
||||
.iter()
|
||||
.map(|x| x.last_valid_blockheight)
|
||||
.map(|x| x.last_valid_block_height)
|
||||
.max()
|
||||
{
|
||||
max_valid_block_height = max_valid_block_height_in_remaining_transaction;
|
||||
|
@ -344,9 +375,11 @@ async fn confirm_transactions_till_block_height_and_resend_unexpired_transaction
|
|||
}
|
||||
}
|
||||
|
||||
/// This is a new method which will be able to send and confirm a large amount of transactions
|
||||
/// Sends and confirms transactions concurrently
|
||||
///
|
||||
/// The sending and confirmation of transactions is done in parallel tasks
|
||||
/// The signer sign the transaction just before sending so that blockhash is not expired
|
||||
/// The method signs transactions just before sending so that blockhash does not
|
||||
/// expire.
|
||||
pub async fn send_and_confirm_transactions_in_parallel<T: Signers + ?Sized>(
|
||||
rpc_client: Arc<RpcClient>,
|
||||
tpu_client: Option<QuicTpuClient>,
|
||||
|
@ -355,12 +388,12 @@ pub async fn send_and_confirm_transactions_in_parallel<T: Signers + ?Sized>(
|
|||
config: SendAndConfirmConfig,
|
||||
) -> Result<Vec<Option<TransactionError>>> {
|
||||
// get current blockhash and corresponding last valid block height
|
||||
let (blockhash, last_valid_blockheight) = rpc_client
|
||||
let (blockhash, last_valid_block_height) = rpc_client
|
||||
.get_latest_blockhash_with_commitment(rpc_client.commitment())
|
||||
.await?;
|
||||
let blockhash_data_rw = Arc::new(RwLock::new(BlockHashData {
|
||||
blockhash,
|
||||
last_valid_blockheight,
|
||||
last_valid_block_height,
|
||||
}));
|
||||
|
||||
// check if all the messages are signable by the signers
|
||||
|
@ -372,7 +405,7 @@ pub async fn send_and_confirm_transactions_in_parallel<T: Signers + ?Sized>(
|
|||
})
|
||||
.collect::<std::result::Result<Vec<()>, SignerError>>()?;
|
||||
|
||||
// get current blockheight
|
||||
// get current block height
|
||||
let block_height = rpc_client.get_block_height().await?;
|
||||
let current_block_height = Arc::new(AtomicU64::new(block_height));
|
||||
|
||||
|
@ -382,7 +415,7 @@ pub async fn send_and_confirm_transactions_in_parallel<T: Signers + ?Sized>(
|
|||
progress_bar
|
||||
});
|
||||
|
||||
// blockhash and blockheight update task
|
||||
// blockhash and block height update task
|
||||
let block_data_task = create_blockhash_data_updating_task(
|
||||
rpc_client.clone(),
|
||||
blockhash_data_rw.clone(),
|
||||
|
@ -436,7 +469,7 @@ pub async fn send_and_confirm_transactions_in_parallel<T: Signers + ?Sized>(
|
|||
|
||||
sign_all_messages_and_send(
|
||||
&progress_bar,
|
||||
rpc_client.clone(),
|
||||
&rpc_client,
|
||||
&tpu_client,
|
||||
messages_with_index,
|
||||
signers,
|
||||
|
|
|
@ -312,7 +312,6 @@ fn timeout_future<'a, Fut: Future<Output = TransportResult<()>> + 'a>(
|
|||
) -> impl Future<Output = TransportResult<()>> + 'a {
|
||||
timeout(timeout_duration, future)
|
||||
.unwrap_or_else(|_| Err(TransportError::Custom("Timed out".to_string())))
|
||||
.boxed_local()
|
||||
}
|
||||
|
||||
#[cfg(feature = "spinner")]
|
||||
|
@ -331,7 +330,6 @@ async fn sleep_and_set_message(
|
|||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(feature = "spinner")]
|
||||
async fn sleep_and_send_wire_transaction_to_addr<P, M, C>(
|
||||
sleep_duration: Duration,
|
||||
connection_cache: &ConnectionCache<P, M, C>,
|
||||
|
@ -343,8 +341,7 @@ where
|
|||
M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
|
||||
{
|
||||
sleep(sleep_duration).await;
|
||||
let conn = connection_cache.get_nonblocking_connection(&addr);
|
||||
conn.send_data(&wire_transaction).await
|
||||
send_wire_transaction_to_addr(connection_cache, &addr, wire_transaction).await
|
||||
}
|
||||
|
||||
async fn send_wire_transaction_to_addr<P, M, C>(
|
||||
|
|
Loading…
Reference in New Issue