client: Start resending sooner during `send_and_confirm_transactions_in_parallel` (#348)

client: Confirm sooner during send_and_confirm_in_parallel
This commit is contained in:
Jon C 2024-03-21 14:35:09 +01:00 committed by GitHub
parent 1e08e90498
commit b2f4fb306e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 57 additions and 51 deletions

View File

@ -5,7 +5,7 @@ use {
}, },
bincode::serialize, bincode::serialize,
dashmap::DashMap, dashmap::DashMap,
futures_util::future::{join_all, TryFutureExt}, futures_util::future::{join_all, FutureExt},
solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool}, solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool},
solana_rpc_client::spinner::{self, SendTransactionProgress}, solana_rpc_client::spinner::{self, SendTransactionProgress},
solana_rpc_client_api::{ solana_rpc_client_api::{
@ -188,9 +188,7 @@ async fn send_transaction_with_rpc_fallback(
serialized_transaction: Vec<u8>, serialized_transaction: Vec<u8>,
context: &SendingContext, context: &SendingContext,
index: usize, index: usize,
counter: usize,
) -> Result<()> { ) -> Result<()> {
tokio::time::sleep(SEND_INTERVAL.saturating_mul(counter as u32)).await;
let send_over_rpc = if let Some(tpu_client) = tpu_client { let send_over_rpc = if let Some(tpu_client) = tpu_client {
!tpu_client !tpu_client
.send_wire_transaction(serialized_transaction.clone()) .send_wire_transaction(serialized_transaction.clone())
@ -261,44 +259,42 @@ async fn sign_all_messages_and_send<T: Signers + ?Sized>(
.expect("Transaction should be signable"); .expect("Transaction should be signable");
let serialized_transaction = serialize(&transaction).expect("Transaction should serialize"); let serialized_transaction = serialize(&transaction).expect("Transaction should serialize");
let signature = transaction.signatures[0]; let signature = transaction.signatures[0];
futures.push( futures.push(async move {
tokio::time::sleep(SEND_INTERVAL.saturating_mul(counter as u32)).await;
// send to confirm the transaction
context.unconfirmed_transaction_map.insert(
signature,
TransactionData {
index: *index,
serialized_transaction: serialized_transaction.clone(),
last_valid_block_height: blockhashdata.last_valid_block_height,
message: message.clone(),
},
);
if let Some(progress_bar) = progress_bar {
let progress = progress_from_context_and_block_height(
context,
blockhashdata.last_valid_block_height,
);
progress.set_message_for_confirmed_transactions(
progress_bar,
&format!(
"Sending {}/{} transactions",
counter + 1,
current_transaction_count,
),
);
}
send_transaction_with_rpc_fallback( send_transaction_with_rpc_fallback(
rpc_client, rpc_client,
tpu_client, tpu_client,
transaction, transaction,
serialized_transaction.clone(), serialized_transaction,
context, context,
*index, *index,
counter,
) )
.and_then(move |_| async move { .await
// send to confirm the transaction });
context.unconfirmed_transaction_map.insert(
signature,
TransactionData {
index: *index,
serialized_transaction,
last_valid_block_height: blockhashdata.last_valid_block_height,
message: message.clone(),
},
);
if let Some(progress_bar) = progress_bar {
let progress = progress_from_context_and_block_height(
context,
blockhashdata.last_valid_block_height,
);
progress.set_message_for_confirmed_transactions(
progress_bar,
&format!(
"Sending {}/{} transactions",
counter + 1,
current_transaction_count,
),
);
}
Ok(())
}),
);
} }
// collect to convert Vec<Result<_>> to Result<Vec<_>> // collect to convert Vec<Result<_>> to Result<Vec<_>>
join_all(futures).await.into_iter().collect::<Result<_>>()?; join_all(futures).await.into_iter().collect::<Result<_>>()?;
@ -477,23 +473,33 @@ pub async fn send_and_confirm_transactions_in_parallel<T: Signers + ?Sized>(
// clear the map so that we can start resending // clear the map so that we can start resending
unconfirmed_transasction_map.clear(); unconfirmed_transasction_map.clear();
sign_all_messages_and_send( let futures = [
&progress_bar, sign_all_messages_and_send(
&rpc_client, &progress_bar,
&tpu_client, &rpc_client,
messages_with_index, &tpu_client,
signers, messages_with_index,
&context, signers,
) &context,
.await?; )
.boxed_local(),
// wait until all the transactions are confirmed or expired async {
confirm_transactions_till_block_height_and_resend_unexpired_transaction_over_tpu( // Give the signing and sending a head start before trying to
&progress_bar, // confirm and resend
&tpu_client, tokio::time::sleep(TPU_RESEND_REFRESH_RATE).await;
&context, confirm_transactions_till_block_height_and_resend_unexpired_transaction_over_tpu(
) &progress_bar,
.await; &tpu_client,
&context,
)
.await;
// Infallible, but required to have the same return type as
// `sign_all_messages_and_send`
Ok(())
}
.boxed_local(),
];
join_all(futures).await.into_iter().collect::<Result<_>>()?;
if unconfirmed_transasction_map.is_empty() { if unconfirmed_transasction_map.is_empty() {
break; break;