New method to send a large number of transactions to the cluster (#32388)

* Adding a method which sends and confirms a large number of transactions in parallel, and using it to deploy programs (see the usage sketch before the diffs below)

* Using the new method to deploy programs

* Minor changes, break when transaction map is empty

* Updating cargo for all lock files

* Sorting dependencies

* Changes after tnelson's comment

* confirm recently expired transactions and improve tracking of transaction sending

* Minor changes

* more changes after tnelson's comments

* Adding serialized transaction in TransactionData

* Update client/src/send_and_confirm_transactions_in_parallel.rs

Co-authored-by: Trent Nelson <trent.a.b.nelson@gmail.com>

* Update client/src/send_and_confirm_transactions_in_parallel.rs

Co-authored-by: Trent Nelson <trent.a.b.nelson@gmail.com>

* nit changes from trent

* Addressing Kirill's comments, adding some unit tests

* Fixing CI, and a minor change

* Addressing Kirill's nit comments

---------

Co-authored-by: Trent Nelson <trent.a.b.nelson@gmail.com>
galactus 2023-07-25 20:08:01 +02:00 committed by GitHub
parent 28d7e59cac
commit dbac50dcc8
8 changed files with 623 additions and 1 deletion
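
Usage sketch (not part of the diff): a minimal example of how the new blocking helper in client/src/send_and_confirm_transactions_in_parallel.rs might be called, modeled on the integration test added by this commit; the RPC URL, payer keypair, recipient pubkey, and the send_many_transfers wrapper are placeholder assumptions.

use {
    solana_client::send_and_confirm_transactions_in_parallel::{
        send_and_confirm_transactions_in_parallel_blocking, SendAndConfrimConfig,
    },
    solana_rpc_client::rpc_client::RpcClient,
    solana_sdk::{
        message::Message, pubkey::Pubkey, signature::Keypair, signer::Signer,
        system_instruction,
    },
    std::sync::Arc,
};

// Placeholder inputs: rpc_url, payer, and recipient are assumptions for illustration.
fn send_many_transfers(rpc_url: &str, payer: &Keypair, recipient: Pubkey) {
    let rpc_client = Arc::new(RpcClient::new(rpc_url.to_string()));
    // Build unsigned messages; the helper signs each one just before sending,
    // so the recent blockhash has not expired by the time it is submitted.
    let messages: Vec<Message> = (1..=10u64)
        .map(|lamports| {
            let ix = system_instruction::transfer(&payer.pubkey(), &recipient, lamports);
            Message::new(&[ix], Some(&payer.pubkey()))
        })
        .collect();
    // Pass None for the TPU client to send over RPC only; resign_txs_count bounds
    // how many times expired transactions are re-signed and resent.
    let results = send_and_confirm_transactions_in_parallel_blocking(
        rpc_client,
        None,
        &messages,
        &[payer],
        SendAndConfrimConfig {
            with_spinner: false,
            resign_txs_count: Some(5),
        },
    )
    .expect("send and confirm failed");
    // Each entry is None on success or Some(TransactionError) for the message at that index.
    assert!(results.iter().all(|e| e.is_none()));
}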

Cargo.lock (generated)

@@ -5535,6 +5535,7 @@ dependencies = [
"async-trait",
"bincode",
"crossbeam-channel",
"dashmap 4.0.2",
"futures 0.3.28",
"futures-util",
"indexmap 2.0.0",
@@ -5567,6 +5568,7 @@ version = "1.17.0"
dependencies = [
"futures-util",
"serde_json",
"solana-client",
"solana-ledger",
"solana-logger",
"solana-measure",


@@ -13,6 +13,7 @@ edition = { workspace = true }
[dependencies]
futures-util = { workspace = true }
serde_json = { workspace = true }
solana-client = { workspace = true }
solana-ledger = { workspace = true }
solana-measure = { workspace = true }
solana-merkle-tree = { workspace = true }


@@ -0,0 +1,139 @@
use {
solana_client::{
nonblocking::tpu_client::TpuClient,
send_and_confirm_transactions_in_parallel::{
send_and_confirm_transactions_in_parallel_blocking, SendAndConfrimConfig,
},
},
solana_rpc_client::rpc_client::RpcClient,
solana_sdk::{
commitment_config::CommitmentConfig, message::Message, native_token::sol_to_lamports,
pubkey::Pubkey, signature::Keypair, signer::Signer, system_instruction,
},
solana_streamer::socket::SocketAddrSpace,
solana_test_validator::TestValidator,
std::sync::Arc,
};
const NUM_TRANSACTIONS: usize = 1000;
fn create_messages(from: Pubkey, to: Pubkey) -> (Vec<Message>, f64) {
let mut messages = vec![];
let mut sum = 0.0;
for i in 1..NUM_TRANSACTIONS {
let amount_to_transfer = i as f64;
let ix = system_instruction::transfer(&from, &to, sol_to_lamports(amount_to_transfer));
let message = Message::new(&[ix], Some(&from));
messages.push(message);
sum += amount_to_transfer;
}
(messages, sum)
}
#[test]
fn test_send_and_confirm_transactions_in_parallel_without_tpu_client() {
solana_logger::setup();
let alice = Keypair::new();
let test_validator =
TestValidator::with_no_fees(alice.pubkey(), None, SocketAddrSpace::Unspecified);
let bob_pubkey = solana_sdk::pubkey::new_rand();
let alice_pubkey = alice.pubkey();
let rpc_client = Arc::new(RpcClient::new(test_validator.rpc_url()));
assert_eq!(
rpc_client.get_version().unwrap().solana_core,
solana_version::semver!()
);
let original_alice_balance = rpc_client.get_balance(&alice.pubkey()).unwrap();
let (messages, sum) = create_messages(alice_pubkey, bob_pubkey);
let txs_errors = send_and_confirm_transactions_in_parallel_blocking(
rpc_client.clone(),
None,
&messages,
&[&alice],
SendAndConfrimConfig {
with_spinner: false,
resign_txs_count: Some(5),
},
);
assert!(txs_errors.is_ok());
assert!(txs_errors.unwrap().iter().all(|x| x.is_none()));
assert_eq!(
rpc_client
.get_balance_with_commitment(&bob_pubkey, CommitmentConfig::processed())
.unwrap()
.value,
sol_to_lamports(sum)
);
assert_eq!(
rpc_client
.get_balance_with_commitment(&alice_pubkey, CommitmentConfig::processed())
.unwrap()
.value,
original_alice_balance - sol_to_lamports(sum)
);
}
#[test]
fn test_send_and_confirm_transactions_in_parallel_with_tpu_client() {
solana_logger::setup();
let alice = Keypair::new();
let test_validator =
TestValidator::with_no_fees(alice.pubkey(), None, SocketAddrSpace::Unspecified);
let bob_pubkey = solana_sdk::pubkey::new_rand();
let alice_pubkey = alice.pubkey();
let rpc_client = Arc::new(RpcClient::new(test_validator.rpc_url()));
assert_eq!(
rpc_client.get_version().unwrap().solana_core,
solana_version::semver!()
);
let original_alice_balance = rpc_client.get_balance(&alice.pubkey()).unwrap();
let (messages, sum) = create_messages(alice_pubkey, bob_pubkey);
let ws_url = test_validator.rpc_pubsub_url();
let tpu_client_fut = TpuClient::new(
"temp",
rpc_client.get_inner_client().clone(),
ws_url.as_str(),
solana_client::tpu_client::TpuClientConfig::default(),
);
let tpu_client = rpc_client.runtime().block_on(tpu_client_fut).unwrap();
let txs_errors = send_and_confirm_transactions_in_parallel_blocking(
rpc_client.clone(),
Some(tpu_client),
&messages,
&[&alice],
SendAndConfrimConfig {
with_spinner: false,
resign_txs_count: Some(5),
},
);
assert!(txs_errors.is_ok());
assert!(txs_errors.unwrap().iter().all(|x| x.is_none()));
assert_eq!(
rpc_client
.get_balance_with_commitment(&bob_pubkey, CommitmentConfig::processed())
.unwrap()
.value,
sol_to_lamports(sum)
);
assert_eq!(
rpc_client
.get_balance_with_commitment(&alice_pubkey, CommitmentConfig::processed())
.unwrap()
.value,
original_alice_balance - sol_to_lamports(sum)
);
}


@@ -12,6 +12,7 @@ edition = { workspace = true }
[dependencies]
async-trait = { workspace = true }
bincode = { workspace = true }
dashmap = { workspace = true }
futures = { workspace = true }
futures-util = { workspace = true }
indexmap = { workspace = true }


@@ -3,6 +3,7 @@
pub mod connection_cache;
pub mod nonblocking;
pub mod quic_client;
pub mod send_and_confirm_transactions_in_parallel;
pub mod thin_client;
pub mod tpu_client;
pub mod tpu_connection;


@@ -0,0 +1,477 @@
use {
crate::{
nonblocking::{rpc_client::RpcClient, tpu_client::TpuClient},
rpc_client::RpcClient as BlockingRpcClient,
},
bincode::serialize,
dashmap::DashMap,
solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool},
solana_rpc_client::spinner,
solana_rpc_client_api::{
client_error::ErrorKind,
request::{RpcError, RpcResponseErrorData, MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS},
},
solana_sdk::{
hash::Hash,
message::Message,
signature::{Signature, SignerError},
signers::Signers,
transaction::{Transaction, TransactionError},
},
solana_tpu_client::{
nonblocking::tpu_client::set_message_for_confirmed_transactions,
tpu_client::{Result, TpuSenderError},
},
std::{
sync::{
atomic::{AtomicU32, AtomicU64, Ordering},
Arc,
},
time::Duration,
},
tokio::{sync::RwLock, task::JoinHandle, time::Instant},
};
const BLOCKHASH_REFRESH_RATE: Duration = Duration::from_secs(10);
const TPU_RESEND_REFRESH_RATE: Duration = Duration::from_secs(2);
type QuicTpuClient = TpuClient<QuicPool, QuicConnectionManager, QuicConfig>;
#[derive(Clone, Debug)]
struct TransactionData {
last_valid_blockheight: u64,
message: Message,
index: usize,
serialized_transaction: Vec<u8>,
}
#[derive(Clone, Debug, Copy)]
struct BlockHashData {
pub blockhash: Hash,
pub last_valid_blockheight: u64,
}
#[derive(Clone, Debug, Copy)]
pub struct SendAndConfrimConfig {
pub with_spinner: bool,
pub resign_txs_count: Option<usize>,
}
pub fn send_and_confirm_transactions_in_parallel_blocking<T: Signers + ?Sized>(
rpc_client: Arc<BlockingRpcClient>,
tpu_client: Option<QuicTpuClient>,
messages: &[Message],
signers: &T,
config: SendAndConfrimConfig,
) -> Result<Vec<Option<TransactionError>>> {
let fut = send_and_confirm_transactions_in_parallel(
rpc_client.get_inner_client().clone(),
tpu_client,
messages,
signers,
config,
);
tokio::task::block_in_place(|| rpc_client.runtime().block_on(fut))
}
fn create_blockhash_data_updating_task(
rpc_client: Arc<RpcClient>,
blockhash_data_rw: Arc<RwLock<BlockHashData>>,
current_block_height: Arc<AtomicU64>,
) -> JoinHandle<()> {
tokio::spawn(async move {
loop {
if let Ok((blockhash, last_valid_blockheight)) = rpc_client
.get_latest_blockhash_with_commitment(rpc_client.commitment())
.await
{
*blockhash_data_rw.write().await = BlockHashData {
blockhash,
last_valid_blockheight,
};
}
if let Ok(blockheight) = rpc_client.get_block_height().await {
current_block_height.store(blockheight, Ordering::Relaxed);
}
tokio::time::sleep(BLOCKHASH_REFRESH_RATE).await;
}
})
}
fn create_transaction_confirmation_task(
rpc_client: Arc<RpcClient>,
current_block_height: Arc<AtomicU64>,
unconfirmed_transasction_map: Arc<DashMap<Signature, TransactionData>>,
errors_map: Arc<DashMap<usize, TransactionError>>,
num_confirmed_transactions: Arc<AtomicU32>,
) -> JoinHandle<()> {
tokio::spawn(async move {
// check transactions that are not expired or have just expired between two checks
let mut last_block_height = current_block_height.load(Ordering::Relaxed);
loop {
if !unconfirmed_transasction_map.is_empty() {
let current_block_height = current_block_height.load(Ordering::Relaxed);
let transactions_to_verify: Vec<Signature> = unconfirmed_transasction_map
.iter()
.filter(|x| {
let is_not_expired = current_block_height <= x.last_valid_blockheight;
// transaction expired between last and current check
let is_recently_expired = last_block_height <= x.last_valid_blockheight
&& current_block_height > x.last_valid_blockheight;
is_not_expired || is_recently_expired
})
.map(|x| *x.key())
.collect();
for signatures in
transactions_to_verify.chunks(MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS)
{
if let Ok(result) = rpc_client.get_signature_statuses(signatures).await {
let statuses = result.value;
for (signature, status) in signatures.iter().zip(statuses.into_iter()) {
if let Some((status, data)) = status
.filter(|status| {
status.satisfies_commitment(rpc_client.commitment())
})
.and_then(|status| {
unconfirmed_transasction_map
.remove(signature)
.map(|(_, data)| (status, data))
})
{
num_confirmed_transactions.fetch_add(1, Ordering::Relaxed);
if let Some(error) = status.err {
errors_map.insert(data.index, error);
}
};
}
}
}
last_block_height = current_block_height;
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
})
}
#[derive(Clone, Debug)]
struct SendingContext {
unconfirmed_transasction_map: Arc<DashMap<Signature, TransactionData>>,
error_map: Arc<DashMap<usize, TransactionError>>,
blockhash_data_rw: Arc<RwLock<BlockHashData>>,
num_confirmed_transactions: Arc<AtomicU32>,
total_transactions: usize,
current_block_height: Arc<AtomicU64>,
}
async fn sign_all_messages_and_send<T: Signers + ?Sized>(
progress_bar: &Option<indicatif::ProgressBar>,
rpc_client: Arc<RpcClient>,
tpu_client: &Option<QuicTpuClient>,
messages_with_index: Vec<(usize, Message)>,
signers: &T,
context: &SendingContext,
) -> Result<()> {
let current_transaction_count = messages_with_index.len();
// send all the transaction messages
for (counter, (index, message)) in messages_with_index.iter().enumerate() {
let mut transaction = Transaction::new_unsigned(message.clone());
let blockhashdata = *context.blockhash_data_rw.read().await;
// we have already checked if all transactions are signable.
transaction
.try_sign(signers, blockhashdata.blockhash)
.expect("Transaction should be signable");
let serialized_transaction = serialize(&transaction).expect("Transaction should serialize");
let send_over_rpc = if let Some(tpu_client) = tpu_client {
!tpu_client
.send_wire_transaction(serialized_transaction.clone())
.await
} else {
true
};
if send_over_rpc {
if let Err(e) = rpc_client.send_transaction(&transaction).await {
match &e.kind {
ErrorKind::Io(_) | ErrorKind::Reqwest(_) => {
// fall through on io error, we will retry the transaction
}
ErrorKind::TransactionError(transaction_error) => {
context.error_map.insert(*index, transaction_error.clone());
continue;
}
ErrorKind::RpcError(rpc_error) => {
if let RpcError::RpcResponseError {
data:
RpcResponseErrorData::SendTransactionPreflightFailure(simulation_result),
..
} = rpc_error
{
if let Some(transaction_error) = &simulation_result.err {
context.error_map.insert(*index, transaction_error.clone());
continue;
}
}
return Err(TpuSenderError::from(e));
}
_ => {
return Err(TpuSenderError::from(e));
}
}
}
}
let signature = transaction.signatures[0];
// track the transaction so the confirmation task can pick it up
context.unconfirmed_transasction_map.insert(
signature,
TransactionData {
index: *index,
serialized_transaction,
last_valid_blockheight: blockhashdata.last_valid_blockheight,
message: message.clone(),
},
);
if let Some(progress_bar) = progress_bar {
set_message_for_confirmed_transactions(
progress_bar,
context
.num_confirmed_transactions
.load(std::sync::atomic::Ordering::Relaxed),
context.total_transactions,
None,
blockhashdata.last_valid_blockheight,
&format!(
"Sending {}/{} transactions",
counter + 1,
current_transaction_count,
),
);
}
}
Ok(())
}
async fn confirm_transactions_till_block_height_and_resend_unexpired_transaction_over_tpu(
progress_bar: &Option<indicatif::ProgressBar>,
tpu_client: &Option<QuicTpuClient>,
context: &SendingContext,
) {
let unconfirmed_transasction_map = context.unconfirmed_transasction_map.clone();
let num_confirmed_transactions = context.num_confirmed_transactions.clone();
let current_block_height = context.current_block_height.clone();
let total_transactions = context.total_transactions;
let transactions_to_confirm = unconfirmed_transasction_map.len();
let max_valid_block_height = unconfirmed_transasction_map
.iter()
.map(|x| x.last_valid_blockheight)
.max();
if let Some(mut max_valid_block_height) = max_valid_block_height {
if let Some(progress_bar) = progress_bar {
set_message_for_confirmed_transactions(
progress_bar,
num_confirmed_transactions.load(std::sync::atomic::Ordering::Relaxed),
total_transactions,
Some(current_block_height.load(Ordering::Relaxed)),
max_valid_block_height,
&format!(
"Waiting for next block, {transactions_to_confirm} transactions pending..."
),
);
}
// wait till all transactions are confirmed or we have surpassed max processing age for the last sent transaction
while !unconfirmed_transasction_map.is_empty()
&& current_block_height.load(Ordering::Relaxed) < max_valid_block_height
{
let blockheight = current_block_height.load(Ordering::Relaxed);
if let Some(progress_bar) = progress_bar {
set_message_for_confirmed_transactions(
progress_bar,
num_confirmed_transactions.load(std::sync::atomic::Ordering::Relaxed),
total_transactions,
Some(blockheight),
max_valid_block_height,
"Checking transaction status...",
);
}
if let Some(tpu_client) = tpu_client {
let instant = Instant::now();
// retry sending transaction only over TPU port
// any transactions sent over RPC will be automatically rebroadcast by the RPC server
let txs_to_resend_over_tpu = unconfirmed_transasction_map
.iter()
.filter(|x| blockheight < x.last_valid_blockheight)
.map(|x| x.serialized_transaction.clone())
.collect();
let _ = tpu_client
.try_send_wire_transaction_batch(txs_to_resend_over_tpu)
.await;
let elapsed = instant.elapsed();
if elapsed < TPU_RESEND_REFRESH_RATE {
tokio::time::sleep(TPU_RESEND_REFRESH_RATE - elapsed).await;
}
} else {
tokio::time::sleep(Duration::from_millis(100)).await;
}
if let Some(max_valid_block_height_in_remaining_transaction) =
unconfirmed_transasction_map
.iter()
.map(|x| x.last_valid_blockheight)
.max()
{
max_valid_block_height = max_valid_block_height_in_remaining_transaction;
}
}
if let Some(progress_bar) = progress_bar {
set_message_for_confirmed_transactions(
progress_bar,
num_confirmed_transactions.load(std::sync::atomic::Ordering::Relaxed),
total_transactions,
Some(current_block_height.load(Ordering::Relaxed)),
max_valid_block_height,
"Checking transaction status...",
);
}
}
}
/// Sends and confirms a large number of transactions.
/// Sending and confirmation run in parallel tasks.
/// Each transaction is signed just before sending so that its blockhash has not expired.
pub async fn send_and_confirm_transactions_in_parallel<T: Signers + ?Sized>(
rpc_client: Arc<RpcClient>,
tpu_client: Option<QuicTpuClient>,
messages: &[Message],
signers: &T,
config: SendAndConfrimConfig,
) -> Result<Vec<Option<TransactionError>>> {
// get current blockhash and corresponding last valid block height
let (blockhash, last_valid_blockheight) = rpc_client
.get_latest_blockhash_with_commitment(rpc_client.commitment())
.await?;
let blockhash_data_rw = Arc::new(RwLock::new(BlockHashData {
blockhash,
last_valid_blockheight,
}));
// check if all the messages are signable by the signers
messages
.iter()
.map(|x| {
let mut transaction = Transaction::new_unsigned(x.clone());
transaction.try_sign(signers, blockhash)
})
.collect::<std::result::Result<Vec<()>, SignerError>>()?;
// get current blockheight
let block_height = rpc_client.get_block_height().await?;
let current_block_height = Arc::new(AtomicU64::new(block_height));
let progress_bar = config.with_spinner.then(|| {
let progress_bar = spinner::new_progress_bar();
progress_bar.set_message("Setting up...");
progress_bar
});
// blockhash and blockheight update task
let block_data_task = create_blockhash_data_updating_task(
rpc_client.clone(),
blockhash_data_rw.clone(),
current_block_height.clone(),
);
let unconfirmed_transasction_map = Arc::new(DashMap::<Signature, TransactionData>::new());
let error_map = Arc::new(DashMap::new());
let num_confirmed_transactions = Arc::new(AtomicU32::new(0));
// task which confirms the transactions that were sent
let transaction_confirming_task = create_transaction_confirmation_task(
rpc_client.clone(),
current_block_height.clone(),
unconfirmed_transasction_map.clone(),
error_map.clone(),
num_confirmed_transactions.clone(),
);
// transaction sender task
let total_transactions = messages.len();
let mut initial = true;
let signing_count = config.resign_txs_count.unwrap_or(1);
let context = SendingContext {
unconfirmed_transasction_map: unconfirmed_transasction_map.clone(),
blockhash_data_rw: blockhash_data_rw.clone(),
num_confirmed_transactions: num_confirmed_transactions.clone(),
current_block_height: current_block_height.clone(),
error_map: error_map.clone(),
total_transactions,
};
for expired_blockhash_retries in (0..signing_count).rev() {
// only send messages which have not been confirmed
let messages_with_index: Vec<(usize, Message)> = if initial {
initial = false;
messages.iter().cloned().enumerate().collect()
} else {
// the confirmation task has already removed confirmed transactions; resend the rest
unconfirmed_transasction_map
.iter()
.map(|x| (x.index, x.message.clone()))
.collect()
};
if messages_with_index.is_empty() {
break;
}
// clear the map so that we can start resending
unconfirmed_transasction_map.clear();
sign_all_messages_and_send(
&progress_bar,
rpc_client.clone(),
&tpu_client,
messages_with_index,
signers,
&context,
)
.await?;
// wait until all the transactions are confirmed or expired
confirm_transactions_till_block_height_and_resend_unexpired_transaction_over_tpu(
&progress_bar,
&tpu_client,
&context,
)
.await;
if unconfirmed_transasction_map.is_empty() {
break;
}
if let Some(progress_bar) = &progress_bar {
progress_bar.println(format!(
"Blockhash expired. {expired_blockhash_retries} retries remaining"
));
}
}
block_data_task.abort();
transaction_confirming_task.abort();
if unconfirmed_transasction_map.is_empty() {
let mut transaction_errors = vec![None; messages.len()];
for iterator in error_map.iter() {
transaction_errors[*iterator.key()] = Some(iterator.value().clone());
}
Ok(transaction_errors)
} else {
Err(TpuSenderError::Custom("Max retries exceeded".into()))
}
}


@@ -4723,6 +4723,7 @@ version = "1.17.0"
dependencies = [
"async-trait",
"bincode",
"dashmap",
"futures 0.3.28",
"futures-util",
"indexmap 2.0.0",


@@ -52,7 +52,7 @@ use {
};
#[cfg(feature = "spinner")]
-fn set_message_for_confirmed_transactions(
+pub fn set_message_for_confirmed_transactions(
progress_bar: &ProgressBar,
confirmed_transactions: u32,
total_transactions: usize,