diff --git a/client/src/send_and_confirm_transactions_in_parallel.rs b/client/src/send_and_confirm_transactions_in_parallel.rs index fa1dcc7733..70f79eba19 100644 --- a/client/src/send_and_confirm_transactions_in_parallel.rs +++ b/client/src/send_and_confirm_transactions_in_parallel.rs @@ -5,11 +5,13 @@ use { }, bincode::serialize, dashmap::DashMap, + futures_util::future::{join_all, TryFutureExt}, solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool}, solana_rpc_client::spinner::{self, SendTransactionProgress}, solana_rpc_client_api::{ client_error::ErrorKind, request::{RpcError, RpcResponseErrorData, MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS}, + response::RpcSimulateTransactionResult, }, solana_sdk::{ hash::Hash, @@ -31,11 +33,12 @@ use { const BLOCKHASH_REFRESH_RATE: Duration = Duration::from_secs(10); const TPU_RESEND_REFRESH_RATE: Duration = Duration::from_secs(2); +const SEND_INTERVAL: Duration = Duration::from_millis(10); type QuicTpuClient = TpuClient; #[derive(Clone, Debug)] struct TransactionData { - last_valid_blockheight: u64, + last_valid_block_height: u64, message: Message, index: usize, serialized_transaction: Vec, @@ -44,7 +47,7 @@ struct TransactionData { #[derive(Clone, Debug, Copy)] struct BlockHashData { pub blockhash: Hash, - pub last_valid_blockheight: u64, + pub last_valid_block_height: u64, } #[derive(Clone, Debug, Copy)] @@ -53,6 +56,7 @@ pub struct SendAndConfirmConfig { pub resign_txs_count: Option, } +/// Sends and confirms transactions concurrently in a sync context pub fn send_and_confirm_transactions_in_parallel_blocking( rpc_client: Arc, tpu_client: Option, @@ -77,18 +81,18 @@ fn create_blockhash_data_updating_task( ) -> JoinHandle<()> { tokio::spawn(async move { loop { - if let Ok((blockhash, last_valid_blockheight)) = rpc_client + if let Ok((blockhash, last_valid_block_height)) = rpc_client .get_latest_blockhash_with_commitment(rpc_client.commitment()) .await { *blockhash_data_rw.write().await = BlockHashData { blockhash, - last_valid_blockheight, + last_valid_block_height, }; } - if let Ok(blockheight) = rpc_client.get_block_height().await { - current_block_height.store(blockheight, Ordering::Relaxed); + if let Ok(block_height) = rpc_client.get_block_height().await { + current_block_height.store(block_height, Ordering::Relaxed); } tokio::time::sleep(BLOCKHASH_REFRESH_RATE).await; } @@ -112,10 +116,10 @@ fn create_transaction_confirmation_task( let transactions_to_verify: Vec = unconfirmed_transasction_map .iter() .filter(|x| { - let is_not_expired = current_block_height <= x.last_valid_blockheight; + let is_not_expired = current_block_height <= x.last_valid_block_height; // transaction expired between last and current check - let is_recently_expired = last_block_height <= x.last_valid_blockheight - && current_block_height > x.last_valid_blockheight; + let is_recently_expired = last_block_height <= x.last_valid_block_height + && current_block_height > x.last_valid_block_height; is_not_expired || is_recently_expired }) .map(|x| *x.key()) @@ -177,15 +181,65 @@ fn progress_from_context_and_block_height( } } +async fn send_transaction_with_rpc_fallback( + rpc_client: &RpcClient, + tpu_client: &Option, + transaction: Transaction, + serialized_transaction: Vec, + context: &SendingContext, + index: usize, + counter: usize, +) -> Result<()> { + tokio::time::sleep(SEND_INTERVAL.saturating_mul(counter as u32)).await; + let send_over_rpc = if let Some(tpu_client) = tpu_client { + !tpu_client + .send_wire_transaction(serialized_transaction.clone()) + .await + } else { + true + }; + if send_over_rpc { + if let Err(e) = rpc_client.send_transaction(&transaction).await { + match &e.kind { + ErrorKind::Io(_) | ErrorKind::Reqwest(_) => { + // fall through on io error, we will retry the transaction + } + ErrorKind::TransactionError(transaction_error) => { + context.error_map.insert(index, transaction_error.clone()); + return Ok(()); + } + ErrorKind::RpcError(RpcError::RpcResponseError { + data: + RpcResponseErrorData::SendTransactionPreflightFailure( + RpcSimulateTransactionResult { + err: Some(transaction_error), + .. + }, + ), + .. + }) => { + context.error_map.insert(index, transaction_error.clone()); + return Ok(()); + } + _ => { + return Err(TpuSenderError::from(e)); + } + } + } + } + Ok(()) +} + async fn sign_all_messages_and_send( progress_bar: &Option, - rpc_client: Arc, + rpc_client: &RpcClient, tpu_client: &Option, messages_with_index: Vec<(usize, Message)>, signers: &T, context: &SendingContext, ) -> Result<()> { let current_transaction_count = messages_with_index.len(); + let mut futures = vec![]; // send all the transaction messages for (counter, (index, message)) in messages_with_index.iter().enumerate() { let mut transaction = Transaction::new_unsigned(message.clone()); @@ -196,71 +250,48 @@ async fn sign_all_messages_and_send( .try_sign(signers, blockhashdata.blockhash) .expect("Transaction should be signable"); let serialized_transaction = serialize(&transaction).expect("Transaction should serailize"); - let send_over_rpc = if let Some(tpu_client) = tpu_client { - !tpu_client - .send_wire_transaction(serialized_transaction.clone()) - .await - } else { - true - }; - if send_over_rpc { - if let Err(e) = rpc_client.send_transaction(&transaction).await { - match &e.kind { - ErrorKind::Io(_) | ErrorKind::Reqwest(_) => { - // fall through on io error, we will retry the transaction - } - ErrorKind::TransactionError(transaction_error) => { - context.error_map.insert(*index, transaction_error.clone()); - continue; - } - ErrorKind::RpcError(rpc_error) => { - if let RpcError::RpcResponseError { - data: - RpcResponseErrorData::SendTransactionPreflightFailure(simulation_result), - .. - } = rpc_error - { - if let Some(transaction_error) = &simulation_result.err { - context.error_map.insert(*index, transaction_error.clone()); - continue; - } - } - return Err(TpuSenderError::from(e)); - } - _ => { - return Err(TpuSenderError::from(e)); - } - } - } - } - let signature = transaction.signatures[0]; - // send to confirm the transaction - context.unconfirmed_transasction_map.insert( - signature, - TransactionData { - index: *index, - serialized_transaction, - last_valid_blockheight: blockhashdata.last_valid_blockheight, - message: message.clone(), - }, - ); - - if let Some(progress_bar) = progress_bar { - let progress = progress_from_context_and_block_height( + futures.push( + send_transaction_with_rpc_fallback( + rpc_client, + tpu_client, + transaction, + serialized_transaction.clone(), context, - blockhashdata.last_valid_blockheight, - ); - progress.set_message_for_confirmed_transactions( - progress_bar, - &format!( - "Sending {}/{} transactions", - counter + 1, - current_transaction_count, - ), - ); - } + *index, + counter, + ) + .and_then(move |_| async move { + // send to confirm the transaction + context.unconfirmed_transasction_map.insert( + signature, + TransactionData { + index: *index, + serialized_transaction, + last_valid_block_height: blockhashdata.last_valid_block_height, + message: message.clone(), + }, + ); + if let Some(progress_bar) = progress_bar { + let progress = progress_from_context_and_block_height( + context, + blockhashdata.last_valid_block_height, + ); + progress.set_message_for_confirmed_transactions( + progress_bar, + &format!( + "Sending {}/{} transactions", + counter + 1, + current_transaction_count, + ), + ); + } + Ok(()) + }), + ); } + // collect to convert Vec> to Result> + join_all(futures).await.into_iter().collect::>()?; Ok(()) } @@ -275,7 +306,7 @@ async fn confirm_transactions_till_block_height_and_resend_unexpired_transaction let transactions_to_confirm = unconfirmed_transasction_map.len(); let max_valid_block_height = unconfirmed_transasction_map .iter() - .map(|x| x.last_valid_blockheight) + .map(|x| x.last_valid_block_height) .max(); if let Some(mut max_valid_block_height) = max_valid_block_height { @@ -293,7 +324,7 @@ async fn confirm_transactions_till_block_height_and_resend_unexpired_transaction while !unconfirmed_transasction_map.is_empty() && current_block_height.load(Ordering::Relaxed) <= max_valid_block_height { - let blockheight = current_block_height.load(Ordering::Relaxed); + let block_height = current_block_height.load(Ordering::Relaxed); if let Some(progress_bar) = progress_bar { let progress = @@ -310,7 +341,7 @@ async fn confirm_transactions_till_block_height_and_resend_unexpired_transaction // any transactions sent over RPC will be automatically rebroadcast by the RPC server let txs_to_resend_over_tpu = unconfirmed_transasction_map .iter() - .filter(|x| blockheight < x.last_valid_blockheight) + .filter(|x| block_height < x.last_valid_block_height) .map(|x| x.serialized_transaction.clone()) .collect(); let _ = tpu_client @@ -327,7 +358,7 @@ async fn confirm_transactions_till_block_height_and_resend_unexpired_transaction if let Some(max_valid_block_height_in_remaining_transaction) = unconfirmed_transasction_map .iter() - .map(|x| x.last_valid_blockheight) + .map(|x| x.last_valid_block_height) .max() { max_valid_block_height = max_valid_block_height_in_remaining_transaction; @@ -344,9 +375,11 @@ async fn confirm_transactions_till_block_height_and_resend_unexpired_transaction } } -/// This is a new method which will be able to send and confirm a large amount of transactions +/// Sends and confirms transactions concurrently +/// /// The sending and confirmation of transactions is done in parallel tasks -/// The signer sign the transaction just before sending so that blockhash is not expired +/// The method signs transactions just before sending so that blockhash does not +/// expire. pub async fn send_and_confirm_transactions_in_parallel( rpc_client: Arc, tpu_client: Option, @@ -355,12 +388,12 @@ pub async fn send_and_confirm_transactions_in_parallel( config: SendAndConfirmConfig, ) -> Result>> { // get current blockhash and corresponding last valid block height - let (blockhash, last_valid_blockheight) = rpc_client + let (blockhash, last_valid_block_height) = rpc_client .get_latest_blockhash_with_commitment(rpc_client.commitment()) .await?; let blockhash_data_rw = Arc::new(RwLock::new(BlockHashData { blockhash, - last_valid_blockheight, + last_valid_block_height, })); // check if all the messages are signable by the signers @@ -372,7 +405,7 @@ pub async fn send_and_confirm_transactions_in_parallel( }) .collect::, SignerError>>()?; - // get current blockheight + // get current block height let block_height = rpc_client.get_block_height().await?; let current_block_height = Arc::new(AtomicU64::new(block_height)); @@ -382,7 +415,7 @@ pub async fn send_and_confirm_transactions_in_parallel( progress_bar }); - // blockhash and blockheight update task + // blockhash and block height update task let block_data_task = create_blockhash_data_updating_task( rpc_client.clone(), blockhash_data_rw.clone(), @@ -436,7 +469,7 @@ pub async fn send_and_confirm_transactions_in_parallel( sign_all_messages_and_send( &progress_bar, - rpc_client.clone(), + &rpc_client, &tpu_client, messages_with_index, signers, diff --git a/tpu-client/src/nonblocking/tpu_client.rs b/tpu-client/src/nonblocking/tpu_client.rs index 79c3652000..ea1bb98a56 100644 --- a/tpu-client/src/nonblocking/tpu_client.rs +++ b/tpu-client/src/nonblocking/tpu_client.rs @@ -312,7 +312,6 @@ fn timeout_future<'a, Fut: Future> + 'a>( ) -> impl Future> + 'a { timeout(timeout_duration, future) .unwrap_or_else(|_| Err(TransportError::Custom("Timed out".to_string()))) - .boxed_local() } #[cfg(feature = "spinner")] @@ -331,7 +330,6 @@ async fn sleep_and_set_message( Ok(()) } -#[cfg(feature = "spinner")] async fn sleep_and_send_wire_transaction_to_addr( sleep_duration: Duration, connection_cache: &ConnectionCache, @@ -343,8 +341,7 @@ where M: ConnectionManager, { sleep(sleep_duration).await; - let conn = connection_cache.get_nonblocking_connection(&addr); - conn.send_data(&wire_transaction).await + send_wire_transaction_to_addr(connection_cache, &addr, wire_transaction).await } async fn send_wire_transaction_to_addr(