From e1972f07fa318977543e87eeb2b43497422535fa Mon Sep 17 00:00:00 2001 From: Jon Cinque Date: Wed, 23 Aug 2023 11:46:17 +0200 Subject: [PATCH] rpc-client: Encapsulate `set_message_for_confirmed_transactions` (#32941) --- ...nd_and_confirm_transactions_in_parallel.rs | 66 +++++++++---------- rpc-client/src/spinner.rs | 23 ++++++- tpu-client/src/nonblocking/tpu_client.rs | 62 ++++------------- 3 files changed, 68 insertions(+), 83 deletions(-) diff --git a/client/src/send_and_confirm_transactions_in_parallel.rs b/client/src/send_and_confirm_transactions_in_parallel.rs index 322d9e4a83..fa1dcc7733 100644 --- a/client/src/send_and_confirm_transactions_in_parallel.rs +++ b/client/src/send_and_confirm_transactions_in_parallel.rs @@ -6,7 +6,7 @@ use { bincode::serialize, dashmap::DashMap, solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool}, - solana_rpc_client::spinner, + solana_rpc_client::spinner::{self, SendTransactionProgress}, solana_rpc_client_api::{ client_error::ErrorKind, request::{RpcError, RpcResponseErrorData, MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS}, @@ -18,13 +18,10 @@ use { signers::Signers, transaction::{Transaction, TransactionError}, }, - solana_tpu_client::{ - nonblocking::tpu_client::set_message_for_confirmed_transactions, - tpu_client::{Result, TpuSenderError}, - }, + solana_tpu_client::tpu_client::{Result, TpuSenderError}, std::{ sync::{ - atomic::{AtomicU32, AtomicU64, Ordering}, + atomic::{AtomicU64, AtomicUsize, Ordering}, Arc, }, time::Duration, @@ -103,7 +100,7 @@ fn create_transaction_confirmation_task( current_block_height: Arc, unconfirmed_transasction_map: Arc>, errors_map: Arc>, - num_confirmed_transactions: Arc, + num_confirmed_transactions: Arc, ) -> JoinHandle<()> { tokio::spawn(async move { // check transactions that are not expired or have just expired between two checks @@ -160,10 +157,25 @@ struct SendingContext { unconfirmed_transasction_map: Arc>, error_map: Arc>, blockhash_data_rw: Arc>, - num_confirmed_transactions: Arc, + num_confirmed_transactions: Arc, total_transactions: usize, current_block_height: Arc, } +fn progress_from_context_and_block_height( + context: &SendingContext, + last_valid_block_height: u64, +) -> SendTransactionProgress { + SendTransactionProgress { + confirmed_transactions: context + .num_confirmed_transactions + .load(std::sync::atomic::Ordering::Relaxed), + total_transactions: context.total_transactions, + block_height: context + .current_block_height + .load(std::sync::atomic::Ordering::Relaxed), + last_valid_block_height, + } +} async fn sign_all_messages_and_send( progress_bar: &Option, @@ -235,14 +247,12 @@ async fn sign_all_messages_and_send( ); if let Some(progress_bar) = progress_bar { - set_message_for_confirmed_transactions( - progress_bar, - context - .num_confirmed_transactions - .load(std::sync::atomic::Ordering::Relaxed), - context.total_transactions, - None, + let progress = progress_from_context_and_block_height( + context, blockhashdata.last_valid_blockheight, + ); + progress.set_message_for_confirmed_transactions( + progress_bar, &format!( "Sending {}/{} transactions", counter + 1, @@ -260,9 +270,7 @@ async fn confirm_transactions_till_block_height_and_resend_unexpired_transaction context: &SendingContext, ) { let unconfirmed_transasction_map = context.unconfirmed_transasction_map.clone(); - let num_confirmed_transactions = context.num_confirmed_transactions.clone(); let current_block_height = context.current_block_height.clone(); - let total_transactions = context.total_transactions; let transactions_to_confirm = unconfirmed_transasction_map.len(); let max_valid_block_height = unconfirmed_transasction_map @@ -272,12 +280,9 @@ async fn confirm_transactions_till_block_height_and_resend_unexpired_transaction if let Some(mut max_valid_block_height) = max_valid_block_height { if let Some(progress_bar) = progress_bar { - set_message_for_confirmed_transactions( + let progress = progress_from_context_and_block_height(context, max_valid_block_height); + progress.set_message_for_confirmed_transactions( progress_bar, - num_confirmed_transactions.load(std::sync::atomic::Ordering::Relaxed), - total_transactions, - Some(current_block_height.load(Ordering::Relaxed)), - max_valid_block_height, &format!( "Waiting for next block, {transactions_to_confirm} transactions pending..." ), @@ -291,12 +296,10 @@ async fn confirm_transactions_till_block_height_and_resend_unexpired_transaction let blockheight = current_block_height.load(Ordering::Relaxed); if let Some(progress_bar) = progress_bar { - set_message_for_confirmed_transactions( + let progress = + progress_from_context_and_block_height(context, max_valid_block_height); + progress.set_message_for_confirmed_transactions( progress_bar, - num_confirmed_transactions.load(std::sync::atomic::Ordering::Relaxed), - total_transactions, - Some(blockheight), - max_valid_block_height, "Checking transaction status...", ); } @@ -332,12 +335,9 @@ async fn confirm_transactions_till_block_height_and_resend_unexpired_transaction } if let Some(progress_bar) = progress_bar { - set_message_for_confirmed_transactions( + let progress = progress_from_context_and_block_height(context, max_valid_block_height); + progress.set_message_for_confirmed_transactions( progress_bar, - num_confirmed_transactions.load(std::sync::atomic::Ordering::Relaxed), - total_transactions, - Some(current_block_height.load(Ordering::Relaxed)), - max_valid_block_height, "Checking transaction status...", ); } @@ -391,7 +391,7 @@ pub async fn send_and_confirm_transactions_in_parallel( let unconfirmed_transasction_map = Arc::new(DashMap::::new()); let error_map = Arc::new(DashMap::new()); - let num_confirmed_transactions = Arc::new(AtomicU32::new(0)); + let num_confirmed_transactions = Arc::new(AtomicUsize::new(0)); // tasks which confirms the transactions that were sent let transaction_confirming_task = create_transaction_confirmation_task( rpc_client.clone(), diff --git a/rpc-client/src/spinner.rs b/rpc-client/src/spinner.rs index 49a135956b..ecf6c947a8 100644 --- a/rpc-client/src/spinner.rs +++ b/rpc-client/src/spinner.rs @@ -14,8 +14,29 @@ pub fn new_progress_bar() -> ProgressBar { progress_bar.set_style( ProgressStyle::default_spinner() .template("{spinner:.green} {wide_msg}") - .expect("ProgresStyle::template direct input to be correct"), + .expect("ProgressStyle::template direct input to be correct"), ); progress_bar.enable_steady_tick(Duration::from_millis(100)); progress_bar } + +#[derive(Debug, Default)] +pub struct SendTransactionProgress { + pub confirmed_transactions: usize, + pub total_transactions: usize, + pub block_height: u64, + pub last_valid_block_height: u64, +} + +impl SendTransactionProgress { + pub fn set_message_for_confirmed_transactions(&self, progress_bar: &ProgressBar, status: &str) { + progress_bar.set_message(format!( + "{:>5.1}% | {:<40} [block height {}; re-sign in {} blocks]", + self.confirmed_transactions as f64 * 100. / self.total_transactions as f64, + status, + self.block_height, + self.last_valid_block_height + .saturating_sub(self.block_height), + )); + } +} diff --git a/tpu-client/src/nonblocking/tpu_client.rs b/tpu-client/src/nonblocking/tpu_client.rs index 7f98c1d0ed..2ddc1cfce2 100644 --- a/tpu-client/src/nonblocking/tpu_client.rs +++ b/tpu-client/src/nonblocking/tpu_client.rs @@ -45,36 +45,11 @@ use { #[cfg(feature = "spinner")] use { crate::tpu_client::{SEND_TRANSACTION_INTERVAL, TRANSACTION_RESEND_INTERVAL}, - indicatif::ProgressBar, - solana_rpc_client::spinner, + solana_rpc_client::spinner::{self, SendTransactionProgress}, solana_rpc_client_api::request::MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS, solana_sdk::{message::Message, signers::Signers, transaction::TransactionError}, }; -#[cfg(feature = "spinner")] -pub fn set_message_for_confirmed_transactions( - progress_bar: &ProgressBar, - confirmed_transactions: u32, - total_transactions: usize, - block_height: Option, - last_valid_block_height: u64, - status: &str, -) { - progress_bar.set_message(format!( - "{:>5.1}% | {:<40}{}", - confirmed_transactions as f64 * 100. / total_transactions as f64, - status, - match block_height { - Some(block_height) => format!( - " [block height {}; re-sign in {} blocks]", - block_height, - last_valid_block_height.saturating_sub(block_height), - ), - None => String::new(), - }, - )); -} - #[derive(Error, Debug)] pub enum TpuSenderError { #[error("Pubsub error: {0:?}")] @@ -453,6 +428,7 @@ where messages: &[Message], signers: &T, ) -> Result>> { + let mut progress = SendTransactionProgress::default(); let progress_bar = spinner::new_progress_bar(); progress_bar.set_message("Setting up..."); @@ -461,15 +437,15 @@ where .enumerate() .map(|(i, message)| (i, Transaction::new_unsigned(message.clone()))) .collect::>(); - let total_transactions = transactions.len(); + progress.total_transactions = transactions.len(); let mut transaction_errors = vec![None; transactions.len()]; - let mut confirmed_transactions = 0; - let mut block_height = self.rpc_client.get_block_height().await?; + progress.block_height = self.rpc_client.get_block_height().await?; for expired_blockhash_retries in (0..5).rev() { let (blockhash, last_valid_block_height) = self .rpc_client .get_latest_blockhash_with_commitment(self.rpc_client.commitment()) .await?; + progress.last_valid_block_height = last_valid_block_height; let mut pending_transactions = HashMap::new(); for (i, mut transaction) in transactions { @@ -478,7 +454,7 @@ where } let mut last_resend = Instant::now() - TRANSACTION_RESEND_INTERVAL; - while block_height <= last_valid_block_height { + while progress.block_height <= progress.last_valid_block_height { let num_transactions = pending_transactions.len(); // Periodically re-send all pending transactions @@ -487,12 +463,8 @@ where if !self.send_transaction(transaction).await { let _result = self.rpc_client.send_transaction(transaction).await.ok(); } - set_message_for_confirmed_transactions( + progress.set_message_for_confirmed_transactions( &progress_bar, - confirmed_transactions, - total_transactions, - None, //block_height, - last_valid_block_height, &format!("Sending {}/{} transactions", index + 1, num_transactions,), ); sleep(SEND_TRANSACTION_INTERVAL).await; @@ -502,21 +474,17 @@ where // Wait for the next block before checking for transaction statuses let mut block_height_refreshes = 10; - set_message_for_confirmed_transactions( + progress.set_message_for_confirmed_transactions( &progress_bar, - confirmed_transactions, - total_transactions, - Some(block_height), - last_valid_block_height, &format!("Waiting for next block, {num_transactions} transactions pending..."), ); - let mut new_block_height = block_height; - while block_height == new_block_height && block_height_refreshes > 0 { + let mut new_block_height = progress.block_height; + while progress.block_height == new_block_height && block_height_refreshes > 0 { sleep(Duration::from_millis(500)).await; new_block_height = self.rpc_client.get_block_height().await?; block_height_refreshes -= 1; } - block_height = new_block_height; + progress.block_height = new_block_height; // Collect statuses for the transactions, drop those that are confirmed let pending_signatures = pending_transactions.keys().cloned().collect::>(); @@ -535,7 +503,7 @@ where if let Some(status) = status { if status.satisfies_commitment(self.rpc_client.commitment()) { if let Some((i, _)) = pending_transactions.remove(signature) { - confirmed_transactions += 1; + progress.confirmed_transactions += 1; if status.err.is_some() { progress_bar .println(format!("Failed transaction: {status:?}")); @@ -546,12 +514,8 @@ where } } } - set_message_for_confirmed_transactions( + progress.set_message_for_confirmed_transactions( &progress_bar, - confirmed_transactions, - total_transactions, - Some(block_height), - last_valid_block_height, "Checking transaction status...", ); }