From dbac50dcc89102aa2e6b7b54b10628a2f3a9ba96 Mon Sep 17 00:00:00 2001 From: galactus <96341601+godmodegalactus@users.noreply.github.com> Date: Tue, 25 Jul 2023 20:08:01 +0200 Subject: [PATCH] New method to send large number of transactions to the cluster (#32388) * Adding a method which sends and confirm a large number of transactions in parallel and using it to deploy programs * Using the new method to deploy programs * Minor changes, break when transaction map is empty * Updating cargo for all lock files * Sorting dependencies * Changes after tnelson's comment * confirm recently expired transactions and improve tracking of transaction sending * Minor changes * more changes after tnelson's comments * Adding serialized transaction in TransactionData * Update client/src/send_and_confirm_transactions_in_parallel.rs Co-authored-by: Trent Nelson * Update client/src/send_and_confirm_transactions_in_parallel.rs Co-authored-by: Trent Nelson * nit changes from trent * Fixing kirills comments, adding some unit tests * fixing ci, and minor change * addressing Kirills nit comments --------- Co-authored-by: Trent Nelson --- Cargo.lock | 2 + client-test/Cargo.toml | 1 + ...nd_and_confrim_transactions_in_parallel.rs | 139 +++++ client/Cargo.toml | 1 + client/src/lib.rs | 1 + ...nd_and_confirm_transactions_in_parallel.rs | 477 ++++++++++++++++++ programs/sbf/Cargo.lock | 1 + tpu-client/src/nonblocking/tpu_client.rs | 2 +- 8 files changed, 623 insertions(+), 1 deletion(-) create mode 100644 client-test/tests/send_and_confrim_transactions_in_parallel.rs create mode 100644 client/src/send_and_confirm_transactions_in_parallel.rs diff --git a/Cargo.lock b/Cargo.lock index 2df926ce08..e62c058efe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5535,6 +5535,7 @@ dependencies = [ "async-trait", "bincode", "crossbeam-channel", + "dashmap 4.0.2", "futures 0.3.28", "futures-util", "indexmap 2.0.0", @@ -5567,6 +5568,7 @@ version = "1.17.0" dependencies = [ "futures-util", "serde_json", + "solana-client", "solana-ledger", "solana-logger", "solana-measure", diff --git a/client-test/Cargo.toml b/client-test/Cargo.toml index d069f59711..ac6fabf0a2 100644 --- a/client-test/Cargo.toml +++ b/client-test/Cargo.toml @@ -13,6 +13,7 @@ edition = { workspace = true } [dependencies] futures-util = { workspace = true } serde_json = { workspace = true } +solana-client = { workspace = true } solana-ledger = { workspace = true } solana-measure = { workspace = true } solana-merkle-tree = { workspace = true } diff --git a/client-test/tests/send_and_confrim_transactions_in_parallel.rs b/client-test/tests/send_and_confrim_transactions_in_parallel.rs new file mode 100644 index 0000000000..a2ee3291f4 --- /dev/null +++ b/client-test/tests/send_and_confrim_transactions_in_parallel.rs @@ -0,0 +1,139 @@ +use { + solana_client::{ + nonblocking::tpu_client::TpuClient, + send_and_confirm_transactions_in_parallel::{ + send_and_confirm_transactions_in_parallel_blocking, SendAndConfrimConfig, + }, + }, + solana_rpc_client::rpc_client::RpcClient, + solana_sdk::{ + commitment_config::CommitmentConfig, message::Message, native_token::sol_to_lamports, + pubkey::Pubkey, signature::Keypair, signer::Signer, system_instruction, + }, + solana_streamer::socket::SocketAddrSpace, + solana_test_validator::TestValidator, + std::sync::Arc, +}; + +const NUM_TRANSACTIONS: usize = 1000; + +fn create_messages(from: Pubkey, to: Pubkey) -> (Vec, f64) { + let mut messages = vec![]; + let mut sum = 0.0; + for i in 1..NUM_TRANSACTIONS { + let amount_to_transfer = i as f64; + let ix = system_instruction::transfer(&from, &to, sol_to_lamports(amount_to_transfer)); + let message = Message::new(&[ix], Some(&from)); + messages.push(message); + sum += amount_to_transfer; + } + (messages, sum) +} + +#[test] +fn test_send_and_confirm_transactions_in_parallel_without_tpu_client() { + solana_logger::setup(); + + let alice = Keypair::new(); + let test_validator = + TestValidator::with_no_fees(alice.pubkey(), None, SocketAddrSpace::Unspecified); + + let bob_pubkey = solana_sdk::pubkey::new_rand(); + let alice_pubkey = alice.pubkey(); + + let rpc_client = Arc::new(RpcClient::new(test_validator.rpc_url())); + + assert_eq!( + rpc_client.get_version().unwrap().solana_core, + solana_version::semver!() + ); + + let original_alice_balance = rpc_client.get_balance(&alice.pubkey()).unwrap(); + let (messages, sum) = create_messages(alice_pubkey, bob_pubkey); + + let txs_errors = send_and_confirm_transactions_in_parallel_blocking( + rpc_client.clone(), + None, + &messages, + &[&alice], + SendAndConfrimConfig { + with_spinner: false, + resign_txs_count: Some(5), + }, + ); + assert!(txs_errors.is_ok()); + assert!(txs_errors.unwrap().iter().all(|x| x.is_none())); + + assert_eq!( + rpc_client + .get_balance_with_commitment(&bob_pubkey, CommitmentConfig::processed()) + .unwrap() + .value, + sol_to_lamports(sum) + ); + assert_eq!( + rpc_client + .get_balance_with_commitment(&alice_pubkey, CommitmentConfig::processed()) + .unwrap() + .value, + original_alice_balance - sol_to_lamports(sum) + ); +} + +#[test] +fn test_send_and_confirm_transactions_in_parallel_with_tpu_client() { + solana_logger::setup(); + + let alice = Keypair::new(); + let test_validator = + TestValidator::with_no_fees(alice.pubkey(), None, SocketAddrSpace::Unspecified); + + let bob_pubkey = solana_sdk::pubkey::new_rand(); + let alice_pubkey = alice.pubkey(); + + let rpc_client = Arc::new(RpcClient::new(test_validator.rpc_url())); + + assert_eq!( + rpc_client.get_version().unwrap().solana_core, + solana_version::semver!() + ); + + let original_alice_balance = rpc_client.get_balance(&alice.pubkey()).unwrap(); + let (messages, sum) = create_messages(alice_pubkey, bob_pubkey); + let ws_url = test_validator.rpc_pubsub_url(); + let tpu_client_fut = TpuClient::new( + "temp", + rpc_client.get_inner_client().clone(), + ws_url.as_str(), + solana_client::tpu_client::TpuClientConfig::default(), + ); + let tpu_client = rpc_client.runtime().block_on(tpu_client_fut).unwrap(); + + let txs_errors = send_and_confirm_transactions_in_parallel_blocking( + rpc_client.clone(), + Some(tpu_client), + &messages, + &[&alice], + SendAndConfrimConfig { + with_spinner: false, + resign_txs_count: Some(5), + }, + ); + assert!(txs_errors.is_ok()); + assert!(txs_errors.unwrap().iter().all(|x| x.is_none())); + + assert_eq!( + rpc_client + .get_balance_with_commitment(&bob_pubkey, CommitmentConfig::processed()) + .unwrap() + .value, + sol_to_lamports(sum) + ); + assert_eq!( + rpc_client + .get_balance_with_commitment(&alice_pubkey, CommitmentConfig::processed()) + .unwrap() + .value, + original_alice_balance - sol_to_lamports(sum) + ); +} diff --git a/client/Cargo.toml b/client/Cargo.toml index c246a9d2a6..abb0307bb3 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -12,6 +12,7 @@ edition = { workspace = true } [dependencies] async-trait = { workspace = true } bincode = { workspace = true } +dashmap = { workspace = true } futures = { workspace = true } futures-util = { workspace = true } indexmap = { workspace = true } diff --git a/client/src/lib.rs b/client/src/lib.rs index 3703d3438e..1148d9f82a 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -3,6 +3,7 @@ pub mod connection_cache; pub mod nonblocking; pub mod quic_client; +pub mod send_and_confirm_transactions_in_parallel; pub mod thin_client; pub mod tpu_client; pub mod tpu_connection; diff --git a/client/src/send_and_confirm_transactions_in_parallel.rs b/client/src/send_and_confirm_transactions_in_parallel.rs new file mode 100644 index 0000000000..50d42f3888 --- /dev/null +++ b/client/src/send_and_confirm_transactions_in_parallel.rs @@ -0,0 +1,477 @@ +use { + crate::{ + nonblocking::{rpc_client::RpcClient, tpu_client::TpuClient}, + rpc_client::RpcClient as BlockingRpcClient, + }, + bincode::serialize, + dashmap::DashMap, + solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool}, + solana_rpc_client::spinner, + solana_rpc_client_api::{ + client_error::ErrorKind, + request::{RpcError, RpcResponseErrorData, MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS}, + }, + solana_sdk::{ + hash::Hash, + message::Message, + signature::{Signature, SignerError}, + signers::Signers, + transaction::{Transaction, TransactionError}, + }, + solana_tpu_client::{ + nonblocking::tpu_client::set_message_for_confirmed_transactions, + tpu_client::{Result, TpuSenderError}, + }, + std::{ + sync::{ + atomic::{AtomicU32, AtomicU64, Ordering}, + Arc, + }, + time::Duration, + }, + tokio::{sync::RwLock, task::JoinHandle, time::Instant}, +}; + +const BLOCKHASH_REFRESH_RATE: Duration = Duration::from_secs(10); +const TPU_RESEND_REFRESH_RATE: Duration = Duration::from_secs(2); +type QuicTpuClient = TpuClient; + +#[derive(Clone, Debug)] +struct TransactionData { + last_valid_blockheight: u64, + message: Message, + index: usize, + serialized_transaction: Vec, +} + +#[derive(Clone, Debug, Copy)] +struct BlockHashData { + pub blockhash: Hash, + pub last_valid_blockheight: u64, +} + +#[derive(Clone, Debug, Copy)] +pub struct SendAndConfrimConfig { + pub with_spinner: bool, + pub resign_txs_count: Option, +} + +pub fn send_and_confirm_transactions_in_parallel_blocking( + rpc_client: Arc, + tpu_client: Option, + messages: &[Message], + signers: &T, + config: SendAndConfrimConfig, +) -> Result>> { + let fut = send_and_confirm_transactions_in_parallel( + rpc_client.get_inner_client().clone(), + tpu_client, + messages, + signers, + config, + ); + tokio::task::block_in_place(|| rpc_client.runtime().block_on(fut)) +} + +fn create_blockhash_data_updating_task( + rpc_client: Arc, + blockhash_data_rw: Arc>, + current_block_height: Arc, +) -> JoinHandle<()> { + tokio::spawn(async move { + loop { + if let Ok((blockhash, last_valid_blockheight)) = rpc_client + .get_latest_blockhash_with_commitment(rpc_client.commitment()) + .await + { + *blockhash_data_rw.write().await = BlockHashData { + blockhash, + last_valid_blockheight, + }; + } + + if let Ok(blockheight) = rpc_client.get_block_height().await { + current_block_height.store(blockheight, Ordering::Relaxed); + } + tokio::time::sleep(BLOCKHASH_REFRESH_RATE).await; + } + }) +} + +fn create_transaction_confirmation_task( + rpc_client: Arc, + current_block_height: Arc, + unconfirmed_transasction_map: Arc>, + errors_map: Arc>, + num_confirmed_transactions: Arc, +) -> JoinHandle<()> { + tokio::spawn(async move { + // check transactions that are not expired or have just expired between two checks + let mut last_block_height = current_block_height.load(Ordering::Relaxed); + + loop { + if !unconfirmed_transasction_map.is_empty() { + let current_block_height = current_block_height.load(Ordering::Relaxed); + let transactions_to_verify: Vec = unconfirmed_transasction_map + .iter() + .filter(|x| { + let is_not_expired = current_block_height <= x.last_valid_blockheight; + // transaction expired between last and current check + let is_recently_expired = last_block_height <= x.last_valid_blockheight + && current_block_height > x.last_valid_blockheight; + is_not_expired || is_recently_expired + }) + .map(|x| *x.key()) + .collect(); + for signatures in + transactions_to_verify.chunks(MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS) + { + if let Ok(result) = rpc_client.get_signature_statuses(signatures).await { + let statuses = result.value; + for (signature, status) in signatures.iter().zip(statuses.into_iter()) { + if let Some((status, data)) = status + .filter(|status| { + status.satisfies_commitment(rpc_client.commitment()) + }) + .and_then(|status| { + unconfirmed_transasction_map + .remove(signature) + .map(|(_, data)| (status, data)) + }) + { + num_confirmed_transactions.fetch_add(1, Ordering::Relaxed); + if let Some(error) = status.err { + errors_map.insert(data.index, error); + } + }; + } + } + } + + last_block_height = current_block_height; + } + tokio::time::sleep(Duration::from_secs(1)).await; + } + }) +} + +#[derive(Clone, Debug)] +struct SendingContext { + unconfirmed_transasction_map: Arc>, + error_map: Arc>, + blockhash_data_rw: Arc>, + num_confirmed_transactions: Arc, + total_transactions: usize, + current_block_height: Arc, +} + +async fn sign_all_messages_and_send( + progress_bar: &Option, + rpc_client: Arc, + tpu_client: &Option, + messages_with_index: Vec<(usize, Message)>, + signers: &T, + context: &SendingContext, +) -> Result<()> { + let current_transaction_count = messages_with_index.len(); + // send all the transaction messages + for (counter, (index, message)) in messages_with_index.iter().enumerate() { + let mut transaction = Transaction::new_unsigned(message.clone()); + let blockhashdata = *context.blockhash_data_rw.read().await; + + // we have already checked if all transactions are signable. + transaction + .try_sign(signers, blockhashdata.blockhash) + .expect("Transaction should be signable"); + let serialized_transaction = serialize(&transaction).expect("Transaction should serailize"); + let send_over_rpc = if let Some(tpu_client) = tpu_client { + !tpu_client + .send_wire_transaction(serialized_transaction.clone()) + .await + } else { + true + }; + if send_over_rpc { + if let Err(e) = rpc_client.send_transaction(&transaction).await { + match &e.kind { + ErrorKind::Io(_) | ErrorKind::Reqwest(_) => { + // fall through on io error, we will retry the transaction + } + ErrorKind::TransactionError(transaction_error) => { + context.error_map.insert(*index, transaction_error.clone()); + continue; + } + ErrorKind::RpcError(rpc_error) => { + if let RpcError::RpcResponseError { + data: + RpcResponseErrorData::SendTransactionPreflightFailure(simulation_result), + .. + } = rpc_error + { + if let Some(transaction_error) = &simulation_result.err { + context.error_map.insert(*index, transaction_error.clone()); + continue; + } + } + return Err(TpuSenderError::from(e)); + } + _ => { + return Err(TpuSenderError::from(e)); + } + } + } + } + + let signature = transaction.signatures[0]; + // send to confirm the transaction + context.unconfirmed_transasction_map.insert( + signature, + TransactionData { + index: *index, + serialized_transaction, + last_valid_blockheight: blockhashdata.last_valid_blockheight, + message: message.clone(), + }, + ); + + if let Some(progress_bar) = progress_bar { + set_message_for_confirmed_transactions( + progress_bar, + context + .num_confirmed_transactions + .load(std::sync::atomic::Ordering::Relaxed), + context.total_transactions, + None, + blockhashdata.last_valid_blockheight, + &format!( + "Sending {}/{} transactions", + counter + 1, + current_transaction_count, + ), + ); + } + } + Ok(()) +} + +async fn confirm_transactions_till_block_height_and_resend_unexpired_transaction_over_tpu( + progress_bar: &Option, + tpu_client: &Option, + context: &SendingContext, +) { + let unconfirmed_transasction_map = context.unconfirmed_transasction_map.clone(); + let num_confirmed_transactions = context.num_confirmed_transactions.clone(); + let current_block_height = context.current_block_height.clone(); + let total_transactions = context.total_transactions; + + let transactions_to_confirm = unconfirmed_transasction_map.len(); + let max_valid_block_height = unconfirmed_transasction_map + .iter() + .map(|x| x.last_valid_blockheight) + .max(); + + if let Some(mut max_valid_block_height) = max_valid_block_height { + if let Some(progress_bar) = progress_bar { + set_message_for_confirmed_transactions( + progress_bar, + num_confirmed_transactions.load(std::sync::atomic::Ordering::Relaxed), + total_transactions, + Some(current_block_height.load(Ordering::Relaxed)), + max_valid_block_height, + &format!( + "Waiting for next block, {transactions_to_confirm} transactions pending..." + ), + ); + } + + // wait till all transactions are confirmed or we have surpassed max processing age for the last sent transaction + while !unconfirmed_transasction_map.is_empty() + && current_block_height.load(Ordering::Relaxed) < max_valid_block_height + { + let blockheight = current_block_height.load(Ordering::Relaxed); + + if let Some(progress_bar) = progress_bar { + set_message_for_confirmed_transactions( + progress_bar, + num_confirmed_transactions.load(std::sync::atomic::Ordering::Relaxed), + total_transactions, + Some(blockheight), + max_valid_block_height, + "Checking transaction status...", + ); + } + + if let Some(tpu_client) = tpu_client { + let instant = Instant::now(); + // retry sending transaction only over TPU port + // any transactions sent over RPC will be automatically rebroadcast by the RPC server + let txs_to_resend_over_tpu = unconfirmed_transasction_map + .iter() + .filter(|x| blockheight < x.last_valid_blockheight) + .map(|x| x.serialized_transaction.clone()) + .collect(); + let _ = tpu_client + .try_send_wire_transaction_batch(txs_to_resend_over_tpu) + .await; + + let elapsed = instant.elapsed(); + if elapsed < TPU_RESEND_REFRESH_RATE { + tokio::time::sleep(TPU_RESEND_REFRESH_RATE - elapsed).await; + } + } else { + tokio::time::sleep(Duration::from_millis(100)).await; + } + if let Some(max_valid_block_height_in_remaining_transaction) = + unconfirmed_transasction_map + .iter() + .map(|x| x.last_valid_blockheight) + .max() + { + max_valid_block_height = max_valid_block_height_in_remaining_transaction; + } + } + + if let Some(progress_bar) = progress_bar { + set_message_for_confirmed_transactions( + progress_bar, + num_confirmed_transactions.load(std::sync::atomic::Ordering::Relaxed), + total_transactions, + Some(current_block_height.load(Ordering::Relaxed)), + max_valid_block_height, + "Checking transaction status...", + ); + } + } +} + +/// This is a new method which will be able to send and confirm a large amount of transactions +/// The sending and confirmation of transactions is done in parallel tasks +/// The signer sign the transaction just before sending so that blockhash is not expired +pub async fn send_and_confirm_transactions_in_parallel( + rpc_client: Arc, + tpu_client: Option, + messages: &[Message], + signers: &T, + config: SendAndConfrimConfig, +) -> Result>> { + // get current blockhash and corresponding last valid block height + let (blockhash, last_valid_blockheight) = rpc_client + .get_latest_blockhash_with_commitment(rpc_client.commitment()) + .await?; + let blockhash_data_rw = Arc::new(RwLock::new(BlockHashData { + blockhash, + last_valid_blockheight, + })); + + // check if all the messages are signable by the signers + messages + .iter() + .map(|x| { + let mut transaction = Transaction::new_unsigned(x.clone()); + transaction.try_sign(signers, blockhash) + }) + .collect::, SignerError>>()?; + + // get current blockheight + let block_height = rpc_client.get_block_height().await?; + let current_block_height = Arc::new(AtomicU64::new(block_height)); + + let progress_bar = config.with_spinner.then(|| { + let progress_bar = spinner::new_progress_bar(); + progress_bar.set_message("Setting up..."); + progress_bar + }); + + // blockhash and blockheight update task + let block_data_task = create_blockhash_data_updating_task( + rpc_client.clone(), + blockhash_data_rw.clone(), + current_block_height.clone(), + ); + + let unconfirmed_transasction_map = Arc::new(DashMap::::new()); + let error_map = Arc::new(DashMap::new()); + let num_confirmed_transactions = Arc::new(AtomicU32::new(0)); + // tasks which confirms the transactions that were sent + let transaction_confirming_task = create_transaction_confirmation_task( + rpc_client.clone(), + current_block_height.clone(), + unconfirmed_transasction_map.clone(), + error_map.clone(), + num_confirmed_transactions.clone(), + ); + + // transaction sender task + let total_transactions = messages.len(); + let mut initial = true; + let signing_count = config.resign_txs_count.unwrap_or(1); + let context = SendingContext { + unconfirmed_transasction_map: unconfirmed_transasction_map.clone(), + blockhash_data_rw: blockhash_data_rw.clone(), + num_confirmed_transactions: num_confirmed_transactions.clone(), + current_block_height: current_block_height.clone(), + error_map: error_map.clone(), + total_transactions, + }; + + for expired_blockhash_retries in (0..signing_count).rev() { + // only send messages which have not been confirmed + let messages_with_index: Vec<(usize, Message)> = if initial { + initial = false; + messages.iter().cloned().enumerate().collect() + } else { + // remove all the confirmed transactions + unconfirmed_transasction_map + .iter() + .map(|x| (x.index, x.message.clone())) + .collect() + }; + + if messages_with_index.is_empty() { + break; + } + + // clear the map so that we can start resending + unconfirmed_transasction_map.clear(); + + sign_all_messages_and_send( + &progress_bar, + rpc_client.clone(), + &tpu_client, + messages_with_index, + signers, + &context, + ) + .await?; + + // wait until all the transactions are confirmed or expired + confirm_transactions_till_block_height_and_resend_unexpired_transaction_over_tpu( + &progress_bar, + &tpu_client, + &context, + ) + .await; + + if unconfirmed_transasction_map.is_empty() { + break; + } + + if let Some(progress_bar) = &progress_bar { + progress_bar.println(format!( + "Blockhash expired. {expired_blockhash_retries} retries remaining" + )); + } + } + + block_data_task.abort(); + transaction_confirming_task.abort(); + if unconfirmed_transasction_map.is_empty() { + let mut transaction_errors = vec![None; messages.len()]; + for iterator in error_map.iter() { + transaction_errors[*iterator.key()] = Some(iterator.value().clone()); + } + Ok(transaction_errors) + } else { + Err(TpuSenderError::Custom("Max retries exceeded".into())) + } +} diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index f105b551a2..280f63a9a6 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -4723,6 +4723,7 @@ version = "1.17.0" dependencies = [ "async-trait", "bincode", + "dashmap", "futures 0.3.28", "futures-util", "indexmap 2.0.0", diff --git a/tpu-client/src/nonblocking/tpu_client.rs b/tpu-client/src/nonblocking/tpu_client.rs index 1c3557b2dc..7f98c1d0ed 100644 --- a/tpu-client/src/nonblocking/tpu_client.rs +++ b/tpu-client/src/nonblocking/tpu_client.rs @@ -52,7 +52,7 @@ use { }; #[cfg(feature = "spinner")] -fn set_message_for_confirmed_transactions( +pub fn set_message_for_confirmed_transactions( progress_bar: &ProgressBar, confirmed_transactions: u32, total_transactions: usize,