Time out async sends to avoid slackers stuck in the queue for too long (#28545)
* Time out async sends to avoid slackers stuck in the queue for too long
* Fixed a clippy error
* Added stats for timeout counts
* Link with stats correctly
This commit is contained in:
parent
1ba81acf9a
commit
f10ef763dc
|
@ -56,6 +56,10 @@ impl ConnectionCacheStats {
|
||||||
client_stats.make_connection_ms.load(Ordering::Relaxed),
|
client_stats.make_connection_ms.load(Ordering::Relaxed),
|
||||||
Ordering::Relaxed,
|
Ordering::Relaxed,
|
||||||
);
|
);
|
||||||
|
self.total_client_stats.send_timeout.fetch_add(
|
||||||
|
client_stats.send_timeout.load(Ordering::Relaxed),
|
||||||
|
Ordering::Relaxed,
|
||||||
|
);
|
||||||
self.sent_packets
|
self.sent_packets
|
||||||
.fetch_add(num_packets as u64, Ordering::Relaxed);
|
.fetch_add(num_packets as u64, Ordering::Relaxed);
|
||||||
self.total_batches.fetch_add(1, Ordering::Relaxed);
|
self.total_batches.fetch_add(1, Ordering::Relaxed);
|
||||||
|
@ -188,6 +192,13 @@ impl ConnectionCacheStats {
|
||||||
self.batch_failure.swap(0, Ordering::Relaxed),
|
self.batch_failure.swap(0, Ordering::Relaxed),
|
||||||
i64
|
i64
|
||||||
),
|
),
|
||||||
|
(
|
||||||
|
"send_timeout",
|
||||||
|
self.total_client_stats
|
||||||
|
.send_timeout
|
||||||
|
.swap(0, Ordering::Relaxed),
|
||||||
|
i64
|
||||||
|
),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -524,6 +524,10 @@ impl QuicTpuConnection {
|
||||||
self.client.stats()
|
self.client.stats()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn connection_stats(&self) -> Arc<ConnectionCacheStats> {
|
||||||
|
self.connection_stats.clone()
|
||||||
|
}
|
||||||
|
|
||||||
pub fn new(
|
pub fn new(
|
||||||
endpoint: Arc<QuicLazyInitializedEndpoint>,
|
endpoint: Arc<QuicLazyInitializedEndpoint>,
|
||||||
addr: SocketAddr,
|
addr: SocketAddr,
|
||||||
|
|
|
@ -11,18 +11,21 @@ use {
|
||||||
},
|
},
|
||||||
tpu_connection::TpuConnection as NonblockingTpuConnection,
|
tpu_connection::TpuConnection as NonblockingTpuConnection,
|
||||||
},
|
},
|
||||||
tpu_connection::TpuConnection,
|
tpu_connection::{ClientStats, TpuConnection},
|
||||||
},
|
},
|
||||||
lazy_static::lazy_static,
|
lazy_static::lazy_static,
|
||||||
solana_sdk::transport::Result as TransportResult,
|
log::*,
|
||||||
|
solana_sdk::transport::{Result as TransportResult, TransportError},
|
||||||
std::{
|
std::{
|
||||||
net::SocketAddr,
|
net::SocketAddr,
|
||||||
sync::{Arc, Condvar, Mutex, MutexGuard},
|
sync::{atomic::Ordering, Arc, Condvar, Mutex, MutexGuard},
|
||||||
|
time::Duration,
|
||||||
},
|
},
|
||||||
tokio::runtime::Runtime,
|
tokio::{runtime::Runtime, time::timeout},
|
||||||
};
|
};
|
||||||
|
|
||||||
const MAX_OUTSTANDING_TASK: u64 = 2000;
|
const MAX_OUTSTANDING_TASK: u64 = 2000;
|
||||||
|
const SEND_TRANSACTION_TIMEOUT_MS: u64 = 10000;
|
||||||
|
|
||||||
/// A semaphore used for limiting the number of asynchronous tasks spawned on the
|
/// A semaphore used for limiting the number of asynchronous tasks spawned on the
|
||||||
/// runtime. Before spawning a task, use acquire. After the task is done (be it
|
/// runtime. Before spawning a task, use acquire. After the task is done (be it
|
||||||
|
@ -108,18 +111,48 @@ async fn send_wire_transaction_async(
|
||||||
connection: Arc<NonblockingQuicTpuConnection>,
|
connection: Arc<NonblockingQuicTpuConnection>,
|
||||||
wire_transaction: Vec<u8>,
|
wire_transaction: Vec<u8>,
|
||||||
) -> TransportResult<()> {
|
) -> TransportResult<()> {
|
||||||
let result = connection.send_wire_transaction(wire_transaction).await;
|
let result = timeout(
|
||||||
|
Duration::from_millis(SEND_TRANSACTION_TIMEOUT_MS),
|
||||||
|
connection.send_wire_transaction(wire_transaction),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
ASYNC_TASK_SEMAPHORE.release();
|
ASYNC_TASK_SEMAPHORE.release();
|
||||||
result
|
handle_send_result(result, connection)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn send_wire_transaction_batch_async(
|
async fn send_wire_transaction_batch_async(
|
||||||
connection: Arc<NonblockingQuicTpuConnection>,
|
connection: Arc<NonblockingQuicTpuConnection>,
|
||||||
buffers: Vec<Vec<u8>>,
|
buffers: Vec<Vec<u8>>,
|
||||||
) -> TransportResult<()> {
|
) -> TransportResult<()> {
|
||||||
let result = connection.send_wire_transaction_batch(&buffers).await;
|
let time_out = SEND_TRANSACTION_TIMEOUT_MS * buffers.len() as u64;
|
||||||
|
|
||||||
|
let result = timeout(
|
||||||
|
Duration::from_millis(time_out),
|
||||||
|
connection.send_wire_transaction_batch(&buffers),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
ASYNC_TASK_SEMAPHORE.release();
|
ASYNC_TASK_SEMAPHORE.release();
|
||||||
result
|
handle_send_result(result, connection)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Check the send result and update stats if the send timed out. Returns the checked result.
|
||||||
|
fn handle_send_result(
|
||||||
|
result: Result<Result<(), TransportError>, tokio::time::error::Elapsed>,
|
||||||
|
connection: Arc<NonblockingQuicTpuConnection>,
|
||||||
|
) -> Result<(), TransportError> {
|
||||||
|
match result {
|
||||||
|
Ok(result) => result,
|
||||||
|
Err(_err) => {
|
||||||
|
let client_stats = ClientStats::default();
|
||||||
|
client_stats.send_timeout.fetch_add(1, Ordering::Relaxed);
|
||||||
|
let stats = connection.connection_stats();
|
||||||
|
stats.add_client_stats(&client_stats, 0, false);
|
||||||
|
info!("Timedout sending transaction {:?}", connection.tpu_addr());
|
||||||
|
Err(TransportError::Custom(
|
||||||
|
"Timedout sending transaction".to_string(),
|
||||||
|
))
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TpuConnection for QuicTpuConnection {
|
impl TpuConnection for QuicTpuConnection {
|
||||||
|
|
|
@ -21,6 +21,7 @@ pub struct ClientStats {
|
||||||
pub tx_data_blocked: MovingStat,
|
pub tx_data_blocked: MovingStat,
|
||||||
pub tx_acks: MovingStat,
|
pub tx_acks: MovingStat,
|
||||||
pub make_connection_ms: AtomicU64,
|
pub make_connection_ms: AtomicU64,
|
||||||
|
pub send_timeout: AtomicU64,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[enum_dispatch]
|
#[enum_dispatch]
|
||||||
|
|
Loading…
Reference in New Issue