Implement updating the connection cache stats in the client (#25308)

This commit is contained in:
ryleung-solana 2022-05-19 15:29:27 -04:00 committed by GitHub
parent 8cbe93962d
commit b857528a03
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 79 additions and 65 deletions

View File

@ -30,7 +30,7 @@ pub enum Connection {
}
#[derive(Default)]
struct ConnectionCacheStats {
pub struct ConnectionCacheStats {
cache_hits: AtomicU64,
cache_misses: AtomicU64,
cache_evictions: AtomicU64,
@ -46,7 +46,7 @@ struct ConnectionCacheStats {
// Need to track these separately per-connection
// because we need to track the base stat value from quinn
total_client_stats: ClientStats,
pub total_client_stats: ClientStats,
}
const CONNECTION_STAT_SUBMISSION_INTERVAL: u64 = 2000;
@ -287,9 +287,15 @@ fn get_or_add_connection(addr: &SocketAddr) -> GetConnectionResult {
Some(connection) => (connection.clone(), true, map.stats.clone(), 0, 0),
None => {
let connection = if map.use_quic {
Connection::Quic(Arc::new(QuicTpuConnection::new(*addr)))
Connection::Quic(Arc::new(QuicTpuConnection::new(
*addr,
map.stats.clone(),
)))
} else {
Connection::Udp(Arc::new(UdpTpuConnection::new(*addr)))
Connection::Udp(Arc::new(UdpTpuConnection::new(
*addr,
map.stats.clone(),
)))
};
// evict a connection if the cache is reaching upper bounds
@ -345,47 +351,10 @@ fn get_connection(addr: &SocketAddr) -> (Connection, Arc<ConnectionCacheStats>)
eviction_timing_ms,
} = get_or_add_connection(addr);
let other_stats = if let Connection::Quic(conn) = &connection {
conn.stats().map(|s| (conn.base_stats(), s))
} else {
None
};
if report_stats {
connection_cache_stats.report();
}
if let Some((connection_stats, new_stats)) = other_stats {
connection_cache_stats
.total_client_stats
.congestion_events
.update_stat(
&connection_stats.congestion_events,
new_stats.path.congestion_events,
);
connection_cache_stats
.total_client_stats
.tx_streams_blocked_uni
.update_stat(
&connection_stats.tx_streams_blocked_uni,
new_stats.frame_tx.streams_blocked_uni,
);
connection_cache_stats
.total_client_stats
.tx_data_blocked
.update_stat(
&connection_stats.tx_data_blocked,
new_stats.frame_tx.data_blocked,
);
connection_cache_stats
.total_client_stats
.tx_acks
.update_stat(&connection_stats.tx_acks, new_stats.frame_tx.acks);
}
if cache_hit {
connection_cache_stats
.cache_hits

View File

@ -4,6 +4,7 @@
use {
crate::{
client_error::ClientErrorKind,
connection_cache::ConnectionCacheStats,
tpu_connection::{ClientStats, TpuConnection},
},
async_mutex::Mutex,
@ -14,7 +15,6 @@ use {
quinn::{
ClientConfig, Endpoint, EndpointConfig, IdleTimeout, NewConnection, VarInt, WriteError,
},
quinn_proto::ConnectionStats,
solana_measure::measure::Measure,
solana_net_utils::VALIDATOR_PORT_RANGE,
solana_sdk::{
@ -154,24 +154,24 @@ struct QuicClient {
pub struct QuicTpuConnection {
client: Arc<QuicClient>,
connection_stats: Arc<ConnectionCacheStats>,
}
impl QuicTpuConnection {
pub fn stats(&self) -> Option<ConnectionStats> {
self.client.stats()
}
pub fn base_stats(&self) -> Arc<ClientStats> {
self.client.stats.clone()
}
}
impl TpuConnection for QuicTpuConnection {
fn new(tpu_addr: SocketAddr) -> Self {
fn new(tpu_addr: SocketAddr, connection_stats: Arc<ConnectionCacheStats>) -> Self {
let tpu_addr = SocketAddr::new(tpu_addr.ip(), tpu_addr.port() + QUIC_PORT_OFFSET);
let client = Arc::new(QuicClient::new(tpu_addr));
Self { client }
Self {
client,
connection_stats,
}
}
fn tpu_addr(&self) -> &SocketAddr {
@ -187,7 +187,9 @@ impl TpuConnection for QuicTpuConnection {
T: AsRef<[u8]>,
{
let _guard = RUNTIME.enter();
let send_buffer = self.client.send_buffer(wire_transaction, stats);
let send_buffer =
self.client
.send_buffer(wire_transaction, stats, self.connection_stats.clone());
RUNTIME.block_on(send_buffer)?;
Ok(())
}
@ -201,7 +203,9 @@ impl TpuConnection for QuicTpuConnection {
T: AsRef<[u8]>,
{
let _guard = RUNTIME.enter();
let send_batch = self.client.send_batch(buffers, stats);
let send_batch = self
.client
.send_batch(buffers, stats, self.connection_stats.clone());
RUNTIME.block_on(send_batch)?;
Ok(())
}
@ -213,9 +217,10 @@ impl TpuConnection for QuicTpuConnection {
) -> TransportResult<()> {
let _guard = RUNTIME.enter();
let client = self.client.clone();
let connection_stats = self.connection_stats.clone();
//drop and detach the task
let _ = RUNTIME.spawn(async move {
let send_buffer = client.send_buffer(wire_transaction, &stats);
let send_buffer = client.send_buffer(wire_transaction, &stats, connection_stats);
if let Err(e) = send_buffer.await {
warn!("Failed to send transaction async to {:?}", e);
datapoint_warn!("send-wire-async", ("failure", 1, i64),);
@ -231,9 +236,10 @@ impl TpuConnection for QuicTpuConnection {
) -> TransportResult<()> {
let _guard = RUNTIME.enter();
let client = self.client.clone();
let connection_stats = self.connection_stats.clone();
//drop and detach the task
let _ = RUNTIME.spawn(async move {
let send_batch = client.send_batch(&buffers, &stats);
let send_batch = client.send_batch(&buffers, &stats, connection_stats);
if let Err(e) = send_batch.await {
warn!("Failed to send transaction batch async to {:?}", e);
datapoint_warn!("send-wire-batch-async", ("failure", 1, i64),);
@ -252,12 +258,6 @@ impl QuicClient {
}
}
pub fn stats(&self) -> Option<ConnectionStats> {
let conn_guard = self.connection.lock();
let x = RUNTIME.block_on(conn_guard);
x.as_ref().map(|c| c.connection.connection.stats())
}
async fn _send_buffer_using_conn(
data: &[u8],
connection: &NewConnection,
@ -282,6 +282,7 @@ impl QuicClient {
&self,
data: &[u8],
stats: &ClientStats,
connection_stats: Arc<ConnectionCacheStats>,
) -> Result<Arc<NewConnection>, WriteError> {
let connection = {
let mut conn_guard = self.connection.lock().await;
@ -299,6 +300,35 @@ impl QuicClient {
}
}
};
let new_stats = connection.connection.stats();
connection_stats
.total_client_stats
.congestion_events
.update_stat(
&self.stats.congestion_events,
new_stats.path.congestion_events,
);
connection_stats
.total_client_stats
.tx_streams_blocked_uni
.update_stat(
&self.stats.tx_streams_blocked_uni,
new_stats.frame_tx.streams_blocked_uni,
);
connection_stats
.total_client_stats
.tx_data_blocked
.update_stat(&self.stats.tx_data_blocked, new_stats.frame_tx.data_blocked);
connection_stats
.total_client_stats
.tx_acks
.update_stat(&self.stats.tx_acks, new_stats.frame_tx.acks);
match Self::_send_buffer_using_conn(data, &connection).await {
Ok(()) => Ok(connection),
_ => {
@ -313,11 +343,17 @@ impl QuicClient {
}
}
pub async fn send_buffer<T>(&self, data: T, stats: &ClientStats) -> Result<(), ClientErrorKind>
pub async fn send_buffer<T>(
&self,
data: T,
stats: &ClientStats,
connection_stats: Arc<ConnectionCacheStats>,
) -> Result<(), ClientErrorKind>
where
T: AsRef<[u8]>,
{
self._send_buffer(data.as_ref(), stats).await?;
self._send_buffer(data.as_ref(), stats, connection_stats)
.await?;
Ok(())
}
@ -325,6 +361,7 @@ impl QuicClient {
&self,
buffers: &[T],
stats: &ClientStats,
connection_stats: Arc<ConnectionCacheStats>,
) -> Result<(), ClientErrorKind>
where
T: AsRef<[u8]>,
@ -343,7 +380,9 @@ impl QuicClient {
if buffers.is_empty() {
return Ok(());
}
let connection = self._send_buffer(buffers[0].as_ref(), stats).await?;
let connection = self
._send_buffer(buffers[0].as_ref(), stats, connection_stats)
.await?;
// Used to avoid dereferencing the Arc multiple times below
// by just getting a reference to the NewConnection once

View File

@ -1,4 +1,5 @@
use {
crate::connection_cache::ConnectionCacheStats,
rayon::iter::{IntoParallelIterator, ParallelIterator},
solana_metrics::MovingStat,
solana_sdk::{transaction::VersionedTransaction, transport::Result as TransportResult},
@ -25,7 +26,7 @@ pub struct ClientStats {
}
pub trait TpuConnection {
fn new(tpu_addr: SocketAddr) -> Self;
fn new(tpu_addr: SocketAddr, connection_stats: Arc<ConnectionCacheStats>) -> Self;
fn tpu_addr(&self) -> &SocketAddr;

View File

@ -2,7 +2,10 @@
//! an interface for sending transactions
use {
crate::tpu_connection::{ClientStats, TpuConnection},
crate::{
connection_cache::ConnectionCacheStats,
tpu_connection::{ClientStats, TpuConnection},
},
core::iter::repeat,
solana_net_utils::VALIDATOR_PORT_RANGE,
solana_sdk::transport::Result as TransportResult,
@ -19,7 +22,7 @@ pub struct UdpTpuConnection {
}
impl TpuConnection for UdpTpuConnection {
fn new(tpu_addr: SocketAddr) -> Self {
fn new(tpu_addr: SocketAddr, _connection_stats: Arc<ConnectionCacheStats>) -> Self {
let (_, client_socket) = solana_net_utils::bind_in_range(
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
VALIDATOR_PORT_RANGE,

View File

@ -3,6 +3,7 @@ mod tests {
use {
crossbeam_channel::unbounded,
solana_client::{
connection_cache::ConnectionCacheStats,
quic_client::QuicTpuConnection,
tpu_connection::{ClientStats, TpuConnection},
},
@ -44,7 +45,8 @@ mod tests {
let addr = s.local_addr().unwrap().ip();
let port = s.local_addr().unwrap().port() - QUIC_PORT_OFFSET;
let tpu_addr = SocketAddr::new(addr, port);
let client = QuicTpuConnection::new(tpu_addr);
let connection_cache_stats = Arc::new(ConnectionCacheStats::default());
let client = QuicTpuConnection::new(tpu_addr, connection_cache_stats);
// Send a full size packet with single byte writes.
let num_bytes = PACKET_DATA_SIZE;