//! Simple client that connects to a given UDP port with the QUIC protocol and provides //! an interface for sending transactions which is restricted by the server's flow control. use { crate::{ connection_cache_stats::ConnectionCacheStats, nonblocking::{ quic_client::{ QuicClient, QuicLazyInitializedEndpoint, QuicTpuConnection as NonblockingQuicTpuConnection, }, tpu_connection::TpuConnection as NonblockingTpuConnection, }, tpu_connection::{ClientStats, TpuConnection}, }, lazy_static::lazy_static, log::*, solana_sdk::transport::{Result as TransportResult, TransportError}, std::{ net::SocketAddr, sync::{atomic::Ordering, Arc, Condvar, Mutex, MutexGuard}, time::Duration, }, tokio::{runtime::Runtime, time::timeout}, }; const MAX_OUTSTANDING_TASK: u64 = 2000; const SEND_TRANSACTION_TIMEOUT_MS: u64 = 10000; /// A semaphore used for limiting the number of asynchronous tasks spawn to the /// runtime. Before spawnning a task, use acquire. After the task is done (be it /// succsess or failure), call release. struct AsyncTaskSemaphore { /// Keep the counter info about the usage counter: Mutex, /// Conditional variable for signaling when counter is decremented cond_var: Condvar, /// The maximum usage allowed by this semaphore. permits: u64, } impl AsyncTaskSemaphore { fn new(permits: u64) -> Self { Self { counter: Mutex::new(0), cond_var: Condvar::new(), permits, } } /// When returned, the lock has been locked and usage count has been /// incremented. When the returned MutexGuard is dropped the lock is dropped /// without decrementing the usage count. fn acquire(&self) -> MutexGuard { let mut count = self.counter.lock().unwrap(); *count += 1; while *count > self.permits { count = self.cond_var.wait(count).unwrap(); } count } /// Acquire the lock and decrement the usage count fn release(&self) { let mut count = self.counter.lock().unwrap(); *count -= 1; self.cond_var.notify_one(); } } lazy_static! { static ref ASYNC_TASK_SEMAPHORE: AsyncTaskSemaphore = AsyncTaskSemaphore::new(MAX_OUTSTANDING_TASK); static ref RUNTIME: Runtime = tokio::runtime::Builder::new_multi_thread() .thread_name("quic-client") .enable_all() .build() .unwrap(); } pub struct QuicTpuConnection { inner: Arc, } impl QuicTpuConnection { pub fn new( endpoint: Arc, tpu_addr: SocketAddr, connection_stats: Arc, ) -> Self { let inner = Arc::new(NonblockingQuicTpuConnection::new( endpoint, tpu_addr, connection_stats, )); Self { inner } } pub fn new_with_client( client: Arc, connection_stats: Arc, ) -> Self { let inner = Arc::new(NonblockingQuicTpuConnection::new_with_client( client, connection_stats, )); Self { inner } } } async fn send_wire_transaction_async( connection: Arc, wire_transaction: Vec, ) -> TransportResult<()> { let result = timeout( Duration::from_millis(SEND_TRANSACTION_TIMEOUT_MS), connection.send_wire_transaction(wire_transaction), ) .await; ASYNC_TASK_SEMAPHORE.release(); handle_send_result(result, connection) } async fn send_wire_transaction_batch_async( connection: Arc, buffers: Vec>, ) -> TransportResult<()> { let time_out = SEND_TRANSACTION_TIMEOUT_MS * buffers.len() as u64; let result = timeout( Duration::from_millis(time_out), connection.send_wire_transaction_batch(&buffers), ) .await; ASYNC_TASK_SEMAPHORE.release(); handle_send_result(result, connection) } /// Check the send result and update stats if timedout. Returns the checked result. fn handle_send_result( result: Result, tokio::time::error::Elapsed>, connection: Arc, ) -> Result<(), TransportError> { match result { Ok(result) => result, Err(_err) => { let client_stats = ClientStats::default(); client_stats.send_timeout.fetch_add(1, Ordering::Relaxed); let stats = connection.connection_stats(); stats.add_client_stats(&client_stats, 0, false); info!("Timedout sending transaction {:?}", connection.tpu_addr()); Err(TransportError::Custom( "Timedout sending transaction".to_string(), )) } } } impl TpuConnection for QuicTpuConnection { fn tpu_addr(&self) -> &SocketAddr { self.inner.tpu_addr() } fn send_wire_transaction_batch(&self, buffers: &[T]) -> TransportResult<()> where T: AsRef<[u8]> + Send + Sync, { RUNTIME.block_on(self.inner.send_wire_transaction_batch(buffers))?; Ok(()) } fn send_wire_transaction_async(&self, wire_transaction: Vec) -> TransportResult<()> { let _lock = ASYNC_TASK_SEMAPHORE.acquire(); let inner = self.inner.clone(); let _ = RUNTIME .spawn(async move { send_wire_transaction_async(inner, wire_transaction).await }); Ok(()) } fn send_wire_transaction_batch_async(&self, buffers: Vec>) -> TransportResult<()> { let _lock = ASYNC_TASK_SEMAPHORE.acquire(); let inner = self.inner.clone(); let _ = RUNTIME.spawn(async move { send_wire_transaction_batch_async(inner, buffers).await }); Ok(()) } }