client: Timeout resends during `send_and_confirm_in_parallel` (#358)

* client: Timeout resends during `send_and_confirm_in_parallel`

* Clarify constant
This commit is contained in:
Jon C 2024-03-22 12:10:00 +01:00 committed by GitHub
parent 8f830c418c
commit 36c66f5111
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 32 additions and 22 deletions

View File

@ -31,7 +31,7 @@ use {
tokio::{sync::RwLock, task::JoinHandle, time::Instant},
};
const BLOCKHASH_REFRESH_RATE: Duration = Duration::from_secs(10);
const BLOCKHASH_REFRESH_RATE: Duration = Duration::from_secs(5);
const TPU_RESEND_REFRESH_RATE: Duration = Duration::from_secs(2);
const SEND_INTERVAL: Duration = Duration::from_millis(10);
type QuicTpuClient = TpuClient<QuicPool, QuicConnectionManager, QuicConfig>;
@ -326,21 +326,20 @@ async fn confirm_transactions_till_block_height_and_resend_unexpired_transaction
);
}
if let Some(progress_bar) = progress_bar {
let progress = progress_from_context_and_block_height(context, max_valid_block_height);
progress.set_message_for_confirmed_transactions(
progress_bar,
"Checking transaction status...",
);
}
// wait till all transactions are confirmed or we have surpassed max processing age for the last sent transaction
while !unconfirmed_transaction_map.is_empty()
&& current_block_height.load(Ordering::Relaxed) <= max_valid_block_height
{
let block_height = current_block_height.load(Ordering::Relaxed);
if let Some(progress_bar) = progress_bar {
let progress =
progress_from_context_and_block_height(context, max_valid_block_height);
progress.set_message_for_confirmed_transactions(
progress_bar,
"Checking transaction status...",
);
}
if let Some(tpu_client) = tpu_client {
let instant = Instant::now();
// retry sending transaction only over TPU port
@ -349,10 +348,29 @@ async fn confirm_transactions_till_block_height_and_resend_unexpired_transaction
.iter()
.filter(|x| block_height < x.last_valid_block_height)
.map(|x| x.serialized_transaction.clone())
.collect();
let _ = tpu_client
.try_send_wire_transaction_batch(txs_to_resend_over_tpu)
.await;
.collect::<Vec<_>>();
let num_txs_to_resend = txs_to_resend_over_tpu.len();
// This is a "reasonable" constant for how long it should
// take to fan the transactions out, taken from
// `solana_tpu_client::nonblocking::tpu_client::send_wire_transaction_futures`
const SEND_TIMEOUT_INTERVAL: Duration = Duration::from_secs(5);
let message = if tokio::time::timeout(
SEND_TIMEOUT_INTERVAL,
tpu_client.try_send_wire_transaction_batch(txs_to_resend_over_tpu),
)
.await
.is_err()
{
format!("Timed out resending {num_txs_to_resend} transactions...")
} else {
format!("Resent {num_txs_to_resend} transactions...")
};
if let Some(progress_bar) = progress_bar {
let progress =
progress_from_context_and_block_height(context, max_valid_block_height);
progress.set_message_for_confirmed_transactions(progress_bar, &message);
}
let elapsed = instant.elapsed();
if elapsed < TPU_RESEND_REFRESH_RATE {
@ -370,14 +388,6 @@ async fn confirm_transactions_till_block_height_and_resend_unexpired_transaction
max_valid_block_height = max_valid_block_height_in_remaining_transaction;
}
}
if let Some(progress_bar) = progress_bar {
let progress = progress_from_context_and_block_height(context, max_valid_block_height);
progress.set_message_for_confirmed_transactions(
progress_bar,
"Checking transaction status...",
);
}
}
}