From f10ef763dc60c94c5065bf509b3524da44158bcf Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Mon, 31 Oct 2022 13:46:45 -0700 Subject: [PATCH] Time out async sends to avoid slackers stuck in the queue for too long (#28545) * Time out async sends to avoid slackers stuck in the queue for too long * Fixed a clippy error * Added stats for timeout counts * Link with stats correctly --- tpu-client/src/connection_cache_stats.rs | 11 +++++ tpu-client/src/nonblocking/quic_client.rs | 4 ++ tpu-client/src/quic_client.rs | 49 +++++++++++++++++++---- tpu-client/src/tpu_connection.rs | 1 + 4 files changed, 57 insertions(+), 8 deletions(-) diff --git a/tpu-client/src/connection_cache_stats.rs b/tpu-client/src/connection_cache_stats.rs index 31d88ddfa..235827cb6 100644 --- a/tpu-client/src/connection_cache_stats.rs +++ b/tpu-client/src/connection_cache_stats.rs @@ -56,6 +56,10 @@ impl ConnectionCacheStats { client_stats.make_connection_ms.load(Ordering::Relaxed), Ordering::Relaxed, ); + self.total_client_stats.send_timeout.fetch_add( + client_stats.send_timeout.load(Ordering::Relaxed), + Ordering::Relaxed, + ); self.sent_packets .fetch_add(num_packets as u64, Ordering::Relaxed); self.total_batches.fetch_add(1, Ordering::Relaxed); @@ -188,6 +192,13 @@ impl ConnectionCacheStats { self.batch_failure.swap(0, Ordering::Relaxed), i64 ), + ( + "send_timeout", + self.total_client_stats + .send_timeout + .swap(0, Ordering::Relaxed), + i64 + ), ); } } diff --git a/tpu-client/src/nonblocking/quic_client.rs b/tpu-client/src/nonblocking/quic_client.rs index e9339f5ef..cfd8159cb 100644 --- a/tpu-client/src/nonblocking/quic_client.rs +++ b/tpu-client/src/nonblocking/quic_client.rs @@ -524,6 +524,10 @@ impl QuicTpuConnection { self.client.stats() } + pub fn connection_stats(&self) -> Arc { + self.connection_stats.clone() + } + pub fn new( endpoint: Arc, addr: SocketAddr, diff --git a/tpu-client/src/quic_client.rs b/tpu-client/src/quic_client.rs index 
f11b7f32b..c34dddfb7 100644 --- a/tpu-client/src/quic_client.rs +++ b/tpu-client/src/quic_client.rs @@ -11,18 +11,21 @@ use { }, tpu_connection::TpuConnection as NonblockingTpuConnection, }, - tpu_connection::TpuConnection, + tpu_connection::{ClientStats, TpuConnection}, }, lazy_static::lazy_static, - solana_sdk::transport::Result as TransportResult, + log::*, + solana_sdk::transport::{Result as TransportResult, TransportError}, std::{ net::SocketAddr, - sync::{Arc, Condvar, Mutex, MutexGuard}, + sync::{atomic::Ordering, Arc, Condvar, Mutex, MutexGuard}, + time::Duration, }, - tokio::runtime::Runtime, + tokio::{runtime::Runtime, time::timeout}, }; const MAX_OUTSTANDING_TASK: u64 = 2000; +const SEND_TRANSACTION_TIMEOUT_MS: u64 = 10000; /// A semaphore used for limiting the number of asynchronous tasks spawn to the /// runtime. Before spawnning a task, use acquire. After the task is done (be it @@ -108,18 +111,48 @@ async fn send_wire_transaction_async( connection: Arc, wire_transaction: Vec, ) -> TransportResult<()> { - let result = connection.send_wire_transaction(wire_transaction).await; + let result = timeout( + Duration::from_millis(SEND_TRANSACTION_TIMEOUT_MS), + connection.send_wire_transaction(wire_transaction), + ) + .await; ASYNC_TASK_SEMAPHORE.release(); - result + handle_send_result(result, connection) } async fn send_wire_transaction_batch_async( connection: Arc, buffers: Vec>, ) -> TransportResult<()> { - let result = connection.send_wire_transaction_batch(&buffers).await; + let time_out = SEND_TRANSACTION_TIMEOUT_MS * buffers.len() as u64; + + let result = timeout( + Duration::from_millis(time_out), + connection.send_wire_transaction_batch(&buffers), + ) + .await; ASYNC_TASK_SEMAPHORE.release(); - result + handle_send_result(result, connection) +} + +/// Check the send result and update stats if timed out. Returns the checked result. 
+fn handle_send_result( + result: Result, tokio::time::error::Elapsed>, + connection: Arc, +) -> Result<(), TransportError> { + match result { + Ok(result) => result, + Err(_err) => { + let client_stats = ClientStats::default(); + client_stats.send_timeout.fetch_add(1, Ordering::Relaxed); + let stats = connection.connection_stats(); + stats.add_client_stats(&client_stats, 0, false); + info!("Timedout sending transaction {:?}", connection.tpu_addr()); + Err(TransportError::Custom( + "Timedout sending transaction".to_string(), + )) + } + } } impl TpuConnection for QuicTpuConnection { diff --git a/tpu-client/src/tpu_connection.rs b/tpu-client/src/tpu_connection.rs index 9f0231937..0825c0698 100644 --- a/tpu-client/src/tpu_connection.rs +++ b/tpu-client/src/tpu_connection.rs @@ -21,6 +21,7 @@ pub struct ClientStats { pub tx_data_blocked: MovingStat, pub tx_acks: MovingStat, pub make_connection_ms: AtomicU64, + pub send_timeout: AtomicU64, } #[enum_dispatch]