Quic client stats (#24195)

* Add metrics to connection-cache to measure cache hits and misses

* Add congestion stats

* Add more client stats

* Review comments

Co-authored-by: Ryan Leung <ryan.leung@solana.com>
This commit is contained in:
sakridge 2022-04-13 05:04:40 +02:00 committed by GitHub
parent d8c45a69c3
commit e7fcda1424
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 345 additions and 63 deletions

1
Cargo.lock generated
View File

@ -4658,6 +4658,7 @@ dependencies = [
"log",
"lru",
"quinn",
"quinn-proto",
"rand 0.7.3",
"rand_chacha 0.2.2",
"rayon",

View File

@ -27,6 +27,7 @@ lazy_static = "1.4.0"
log = "0.4.14"
lru = "0.7.5"
quinn = "0.8.0"
quinn-proto = "0.8.0"
rand = "0.7.0"
rand_chacha = "0.2.2"
rayon = "1.5.1"

View File

@ -1,14 +1,21 @@
use {
crate::{
quic_client::QuicTpuConnection, tpu_connection::TpuConnection, udp_client::UdpTpuConnection,
quic_client::QuicTpuConnection,
tpu_connection::{ClientStats, TpuConnection},
udp_client::UdpTpuConnection,
},
lazy_static::lazy_static,
lru::LruCache,
solana_net_utils::VALIDATOR_PORT_RANGE,
solana_sdk::{transaction::VersionedTransaction, transport::TransportError},
solana_sdk::{
timing::AtomicInterval, transaction::VersionedTransaction, transport::TransportError,
},
std::{
net::{IpAddr, Ipv4Addr, SocketAddr},
sync::{Arc, Mutex},
sync::{
atomic::{AtomicU64, Ordering},
Arc, Mutex,
},
},
};
@ -21,8 +28,99 @@ enum Connection {
Quic(Arc<QuicTpuConnection>),
}
#[derive(Default)]
struct ConnectionCacheStats {
cache_hits: AtomicU64,
cache_misses: AtomicU64,
sent_packets: AtomicU64,
total_batches: AtomicU64,
batch_success: AtomicU64,
batch_failure: AtomicU64,
// Need to track these separately per-connection
// because we need to track the base stat value from quinn
total_client_stats: ClientStats,
}
const CONNECTION_STAT_SUBMISSION_INTERVAL: u64 = 2000;
impl ConnectionCacheStats {
fn add_client_stats(&self, client_stats: &ClientStats, num_packets: usize, is_success: bool) {
self.total_client_stats.total_connections.fetch_add(
client_stats.total_connections.load(Ordering::Relaxed),
Ordering::Relaxed,
);
self.total_client_stats.connection_reuse.fetch_add(
client_stats.connection_reuse.load(Ordering::Relaxed),
Ordering::Relaxed,
);
self.sent_packets
.fetch_add(num_packets as u64, Ordering::Relaxed);
self.total_batches.fetch_add(1, Ordering::Relaxed);
if is_success {
self.batch_success.fetch_add(1, Ordering::Relaxed);
} else {
self.batch_failure.fetch_add(1, Ordering::Relaxed);
}
}
fn report(&self) {
datapoint_info!(
"quic-client-connection-stats",
(
"cache_hits",
self.cache_hits.swap(0, Ordering::Relaxed),
i64
),
(
"cache_misses",
self.cache_misses.swap(0, Ordering::Relaxed),
i64
),
(
"total_connections",
self.total_client_stats
.total_connections
.swap(0, Ordering::Relaxed),
i64
),
(
"connection_reuse",
self.total_client_stats
.connection_reuse
.swap(0, Ordering::Relaxed),
i64
),
(
"congestion_events",
self.total_client_stats.congestion_events.load_and_reset(),
i64
),
(
"tx_streams_blocked_uni",
self.total_client_stats
.tx_streams_blocked_uni
.load_and_reset(),
i64
),
(
"tx_data_blocked",
self.total_client_stats.tx_data_blocked.load_and_reset(),
i64
),
(
"tx_acks",
self.total_client_stats.tx_acks.load_and_reset(),
i64
),
);
}
}
struct ConnMap {
map: LruCache<SocketAddr, Connection>,
stats: Arc<ConnectionCacheStats>,
last_stats: AtomicInterval,
use_quic: bool,
}
@ -30,6 +128,8 @@ impl ConnMap {
pub fn new() -> Self {
Self {
map: LruCache::new(MAX_CONNECTIONS),
stats: Arc::new(ConnectionCacheStats::default()),
last_stats: AtomicInterval::default(),
use_quic: false,
}
}
@ -50,11 +150,25 @@ pub fn set_use_quic(use_quic: bool) {
// TODO: see https://github.com/solana-labs/solana/issues/23661
// remove lazy_static and optimize and refactor this
fn get_connection(addr: &SocketAddr) -> Connection {
fn get_connection(addr: &SocketAddr) -> (Connection, Arc<ConnectionCacheStats>) {
let mut map = (*CONNECTION_MAP).lock().unwrap();
match map.map.get(addr) {
Some(connection) => connection.clone(),
if map
.last_stats
.should_update(CONNECTION_STAT_SUBMISSION_INTERVAL)
{
map.stats.report();
}
let (connection, hit, maybe_stats) = match map.map.get(addr) {
Some(connection) => {
let mut stats = None;
// update connection stats
if let Connection::Quic(conn) = connection {
stats = conn.stats().map(|s| (conn.base_stats(), s));
}
(connection.clone(), true, stats)
}
None => {
let (_, send_socket) = solana_net_utils::bind_in_range(
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
@ -68,9 +182,41 @@ fn get_connection(addr: &SocketAddr) -> Connection {
};
map.map.put(*addr, connection.clone());
connection
(connection, false, None)
}
};
if let Some((connection_stats, new_stats)) = maybe_stats {
map.stats.total_client_stats.congestion_events.update_stat(
&connection_stats.congestion_events,
new_stats.path.congestion_events,
);
map.stats
.total_client_stats
.tx_streams_blocked_uni
.update_stat(
&connection_stats.tx_streams_blocked_uni,
new_stats.frame_tx.streams_blocked_uni,
);
map.stats.total_client_stats.tx_data_blocked.update_stat(
&connection_stats.tx_data_blocked,
new_stats.frame_tx.data_blocked,
);
map.stats
.total_client_stats
.tx_acks
.update_stat(&connection_stats.tx_acks, new_stats.frame_tx.acks);
}
if hit {
map.stats.cache_hits.fetch_add(1, Ordering::Relaxed);
} else {
map.stats.cache_misses.fetch_add(1, Ordering::Relaxed);
}
(connection, map.stats.clone())
}
// TODO: see https://github.com/solana-labs/solana/issues/23851
@ -86,55 +232,67 @@ pub fn send_wire_transaction_batch(
packets: &[&[u8]],
addr: &SocketAddr,
) -> Result<(), TransportError> {
let conn = get_connection(addr);
match conn {
Connection::Udp(conn) => conn.send_wire_transaction_batch(packets),
Connection::Quic(conn) => conn.send_wire_transaction_batch(packets),
}
let (conn, stats) = get_connection(addr);
let client_stats = ClientStats::default();
let r = match conn {
Connection::Udp(conn) => conn.send_wire_transaction_batch(packets, &client_stats),
Connection::Quic(conn) => conn.send_wire_transaction_batch(packets, &client_stats),
};
stats.add_client_stats(&client_stats, packets.len(), r.is_ok());
r
}
pub fn send_wire_transaction_async(
packets: Vec<u8>,
addr: &SocketAddr,
) -> Result<(), TransportError> {
let conn = get_connection(addr);
match conn {
Connection::Udp(conn) => conn.send_wire_transaction_async(packets),
Connection::Quic(conn) => conn.send_wire_transaction_async(packets),
}
let (conn, stats) = get_connection(addr);
let client_stats = Arc::new(ClientStats::default());
let r = match conn {
Connection::Udp(conn) => conn.send_wire_transaction_async(packets, client_stats.clone()),
Connection::Quic(conn) => conn.send_wire_transaction_async(packets, client_stats.clone()),
};
stats.add_client_stats(&client_stats, 1, r.is_ok());
r
}
pub fn send_wire_transaction(
wire_transaction: &[u8],
addr: &SocketAddr,
) -> Result<(), TransportError> {
let conn = get_connection(addr);
match conn {
Connection::Udp(conn) => conn.send_wire_transaction(wire_transaction),
Connection::Quic(conn) => conn.send_wire_transaction(wire_transaction),
}
send_wire_transaction_batch(&[wire_transaction], addr)
}
pub fn serialize_and_send_transaction(
transaction: &VersionedTransaction,
addr: &SocketAddr,
) -> Result<(), TransportError> {
let conn = get_connection(addr);
match conn {
Connection::Udp(conn) => conn.serialize_and_send_transaction(transaction),
Connection::Quic(conn) => conn.serialize_and_send_transaction(transaction),
}
let (conn, stats) = get_connection(addr);
let client_stats = ClientStats::default();
let r = match conn {
Connection::Udp(conn) => conn.serialize_and_send_transaction(transaction, &client_stats),
Connection::Quic(conn) => conn.serialize_and_send_transaction(transaction, &client_stats),
};
stats.add_client_stats(&client_stats, 1, r.is_ok());
r
}
pub fn par_serialize_and_send_transaction_batch(
transactions: &[VersionedTransaction],
addr: &SocketAddr,
) -> Result<(), TransportError> {
let conn = get_connection(addr);
match conn {
Connection::Udp(conn) => conn.par_serialize_and_send_transaction_batch(transactions),
Connection::Quic(conn) => conn.par_serialize_and_send_transaction_batch(transactions),
}
let (conn, stats) = get_connection(addr);
let client_stats = ClientStats::default();
let r = match conn {
Connection::Udp(conn) => {
conn.par_serialize_and_send_transaction_batch(transactions, &client_stats)
}
Connection::Quic(conn) => {
conn.par_serialize_and_send_transaction_batch(transactions, &client_stats)
}
};
stats.add_client_stats(&client_stats, transactions.len(), r.is_ok());
r
}
#[cfg(test)]
@ -182,7 +340,7 @@ mod tests {
// be lazy and not connect until first use or handle connection errors somehow
// (without crashing, as would be required in a real practical validator)
let first_addr = get_addr(&mut rng);
assert!(ip(get_connection(&first_addr)) == first_addr.ip());
assert!(ip(get_connection(&first_addr).0) == first_addr.ip());
let addrs = (0..MAX_CONNECTIONS)
.into_iter()
.map(|_| {

View File

@ -2,20 +2,24 @@
//! an interface for sending transactions which is restricted by the server's flow control.
use {
crate::{client_error::ClientErrorKind, tpu_connection::TpuConnection},
crate::{
client_error::ClientErrorKind,
tpu_connection::{ClientStats, TpuConnection},
},
async_mutex::Mutex,
futures::future::join_all,
itertools::Itertools,
lazy_static::lazy_static,
log::*,
quinn::{ClientConfig, Endpoint, EndpointConfig, NewConnection, WriteError},
quinn_proto::ConnectionStats,
solana_sdk::{
quic::{QUIC_MAX_CONCURRENT_STREAMS, QUIC_PORT_OFFSET},
transport::Result as TransportResult,
},
std::{
net::{SocketAddr, UdpSocket},
sync::Arc,
sync::{atomic::Ordering, Arc},
},
tokio::runtime::Runtime,
};
@ -52,12 +56,23 @@ struct QuicClient {
endpoint: Endpoint,
connection: Arc<Mutex<Option<Arc<NewConnection>>>>,
addr: SocketAddr,
stats: Arc<ClientStats>,
}
pub struct QuicTpuConnection {
client: Arc<QuicClient>,
}
impl QuicTpuConnection {
pub fn stats(&self) -> Option<ConnectionStats> {
self.client.stats()
}
pub fn base_stats(&self) -> Arc<ClientStats> {
self.client.stats.clone()
}
}
impl TpuConnection for QuicTpuConnection {
fn new(client_socket: UdpSocket, tpu_addr: SocketAddr) -> Self {
let tpu_addr = SocketAddr::new(tpu_addr.ip(), tpu_addr.port() + QUIC_PORT_OFFSET);
@ -70,33 +85,45 @@ impl TpuConnection for QuicTpuConnection {
&self.client.addr
}
fn send_wire_transaction<T>(&self, wire_transaction: T) -> TransportResult<()>
fn send_wire_transaction<T>(
&self,
wire_transaction: T,
stats: &ClientStats,
) -> TransportResult<()>
where
T: AsRef<[u8]>,
{
let _guard = RUNTIME.enter();
let send_buffer = self.client.send_buffer(wire_transaction);
let send_buffer = self.client.send_buffer(wire_transaction, stats);
RUNTIME.block_on(send_buffer)?;
Ok(())
}
fn send_wire_transaction_batch<T>(&self, buffers: &[T]) -> TransportResult<()>
fn send_wire_transaction_batch<T>(
&self,
buffers: &[T],
stats: &ClientStats,
) -> TransportResult<()>
where
T: AsRef<[u8]>,
{
let _guard = RUNTIME.enter();
let send_batch = self.client.send_batch(buffers);
let send_batch = self.client.send_batch(buffers, stats);
RUNTIME.block_on(send_batch)?;
Ok(())
}
fn send_wire_transaction_async(&self, wire_transaction: Vec<u8>) -> TransportResult<()> {
fn send_wire_transaction_async(
&self,
wire_transaction: Vec<u8>,
stats: Arc<ClientStats>,
) -> TransportResult<()> {
let _guard = RUNTIME.enter();
//drop and detach the task
let client = self.client.clone();
inc_new_counter_info!("send_wire_transaction_async", 1);
let _ = RUNTIME.spawn(async move {
let send_buffer = client.send_buffer(wire_transaction);
let send_buffer = client.send_buffer(wire_transaction, &stats);
if let Err(e) = send_buffer.await {
inc_new_counter_warn!("send_wire_transaction_async_fail", 1);
warn!("Failed to send transaction async to {:?}", e);
@ -127,9 +154,16 @@ impl QuicClient {
endpoint,
connection: Arc::new(Mutex::new(None)),
addr,
stats: Arc::new(ClientStats::default()),
}
}
pub fn stats(&self) -> Option<ConnectionStats> {
let conn_guard = self.connection.lock();
let x = RUNTIME.block_on(conn_guard);
x.as_ref().map(|c| c.connection.stats())
}
// If this function becomes public, it should be changed to
// not expose details of the specific Quic implementation we're using
async fn create_endpoint(config: EndpointConfig, client_socket: UdpSocket) -> Endpoint {
@ -146,18 +180,35 @@ impl QuicClient {
Ok(())
}
async fn make_connection(&self, stats: &ClientStats) -> Result<Arc<NewConnection>, WriteError> {
let connecting = self.endpoint.connect(self.addr, "connect").unwrap();
stats.total_connections.fetch_add(1, Ordering::Relaxed);
let connecting_result = connecting.await;
if connecting_result.is_err() {
stats.connection_errors.fetch_add(1, Ordering::Relaxed);
}
let connection = connecting_result?;
Ok(Arc::new(connection))
}
// Attempts to send data, connecting/reconnecting as necessary
// On success, returns the connection used to successfully send the data
async fn _send_buffer(&self, data: &[u8]) -> Result<Arc<NewConnection>, WriteError> {
async fn _send_buffer(
&self,
data: &[u8],
stats: &ClientStats,
) -> Result<Arc<NewConnection>, WriteError> {
let connection = {
let mut conn_guard = self.connection.lock().await;
let maybe_conn = (*conn_guard).clone();
match maybe_conn {
Some(conn) => conn.clone(),
Some(conn) => {
stats.connection_reuse.fetch_add(1, Ordering::Relaxed);
conn.clone()
}
None => {
let connecting = self.endpoint.connect(self.addr, "connect").unwrap();
let connection = Arc::new(connecting.await?);
let connection = self.make_connection(stats).await?;
*conn_guard = Some(connection.clone());
connection
}
@ -167,8 +218,7 @@ impl QuicClient {
Ok(()) => Ok(connection),
_ => {
let connection = {
let connecting = self.endpoint.connect(self.addr, "connect").unwrap();
let connection = Arc::new(connecting.await?);
let connection = self.make_connection(stats).await?;
let mut conn_guard = self.connection.lock().await;
*conn_guard = Some(connection.clone());
connection
@ -179,15 +229,19 @@ impl QuicClient {
}
}
pub async fn send_buffer<T>(&self, data: T) -> Result<(), ClientErrorKind>
pub async fn send_buffer<T>(&self, data: T, stats: &ClientStats) -> Result<(), ClientErrorKind>
where
T: AsRef<[u8]>,
{
self._send_buffer(data.as_ref()).await?;
self._send_buffer(data.as_ref(), stats).await?;
Ok(())
}
pub async fn send_batch<T>(&self, buffers: &[T]) -> Result<(), ClientErrorKind>
pub async fn send_batch<T>(
&self,
buffers: &[T],
stats: &ClientStats,
) -> Result<(), ClientErrorKind>
where
T: AsRef<[u8]>,
{
@ -205,7 +259,7 @@ impl QuicClient {
if buffers.is_empty() {
return Ok(());
}
let connection = self._send_buffer(buffers[0].as_ref()).await?;
let connection = self._send_buffer(buffers[0].as_ref(), stats).await?;
// Used to avoid dereferencing the Arc multiple times below
// by just getting a reference to the NewConnection once

View File

@ -1,9 +1,26 @@
use {
rayon::iter::{IntoParallelIterator, ParallelIterator},
solana_metrics::MovingStat,
solana_sdk::{transaction::VersionedTransaction, transport::Result as TransportResult},
std::net::{SocketAddr, UdpSocket},
std::{
net::{SocketAddr, UdpSocket},
sync::{atomic::AtomicU64, Arc},
},
};
#[derive(Default)]
pub struct ClientStats {
pub total_connections: AtomicU64,
pub connection_reuse: AtomicU64,
pub connection_errors: AtomicU64,
// these will be the last values of these stats
pub congestion_events: MovingStat,
pub tx_streams_blocked_uni: MovingStat,
pub tx_data_blocked: MovingStat,
pub tx_acks: MovingStat,
}
pub trait TpuConnection {
fn new(client_socket: UdpSocket, tpu_addr: SocketAddr) -> Self;
@ -12,31 +29,45 @@ pub trait TpuConnection {
fn serialize_and_send_transaction(
&self,
transaction: &VersionedTransaction,
stats: &ClientStats,
) -> TransportResult<()> {
let wire_transaction =
bincode::serialize(transaction).expect("serialize Transaction in send_batch");
self.send_wire_transaction(&wire_transaction)
self.send_wire_transaction(&wire_transaction, stats)
}
fn send_wire_transaction<T>(&self, wire_transaction: T) -> TransportResult<()>
fn send_wire_transaction<T>(
&self,
wire_transaction: T,
stats: &ClientStats,
) -> TransportResult<()>
where
T: AsRef<[u8]>;
fn send_wire_transaction_async(&self, wire_transaction: Vec<u8>) -> TransportResult<()>;
fn send_wire_transaction_async(
&self,
wire_transaction: Vec<u8>,
stats: Arc<ClientStats>,
) -> TransportResult<()>;
fn par_serialize_and_send_transaction_batch(
&self,
transactions: &[VersionedTransaction],
stats: &ClientStats,
) -> TransportResult<()> {
let buffers = transactions
.into_par_iter()
.map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch"))
.collect::<Vec<_>>();
self.send_wire_transaction_batch(&buffers)
self.send_wire_transaction_batch(&buffers, stats)
}
fn send_wire_transaction_batch<T>(&self, buffers: &[T]) -> TransportResult<()>
fn send_wire_transaction_batch<T>(
&self,
buffers: &[T],
stats: &ClientStats,
) -> TransportResult<()>
where
T: AsRef<[u8]>;
}

View File

@ -2,11 +2,14 @@
//! an interface for sending transactions
use {
crate::tpu_connection::TpuConnection,
crate::tpu_connection::{ClientStats, TpuConnection},
core::iter::repeat,
solana_sdk::transport::Result as TransportResult,
solana_streamer::sendmmsg::batch_send,
std::net::{SocketAddr, UdpSocket},
std::{
net::{SocketAddr, UdpSocket},
sync::Arc,
},
};
pub struct UdpTpuConnection {
@ -26,7 +29,11 @@ impl TpuConnection for UdpTpuConnection {
&self.addr
}
fn send_wire_transaction<T>(&self, wire_transaction: T) -> TransportResult<()>
fn send_wire_transaction<T>(
&self,
wire_transaction: T,
_stats: &ClientStats,
) -> TransportResult<()>
where
T: AsRef<[u8]>,
{
@ -34,12 +41,20 @@ impl TpuConnection for UdpTpuConnection {
Ok(())
}
fn send_wire_transaction_async(&self, wire_transaction: Vec<u8>) -> TransportResult<()> {
fn send_wire_transaction_async(
&self,
wire_transaction: Vec<u8>,
_stats: Arc<ClientStats>,
) -> TransportResult<()> {
self.socket.send_to(wire_transaction.as_ref(), self.addr)?;
Ok(())
}
fn send_wire_transaction_batch<T>(&self, buffers: &[T]) -> TransportResult<()>
fn send_wire_transaction_batch<T>(
&self,
buffers: &[T],
_stats: &ClientStats,
) -> TransportResult<()>
where
T: AsRef<[u8]>,
{

View File

@ -4,7 +4,28 @@ pub mod datapoint;
mod metrics;
pub mod poh_timing_point;
pub use crate::metrics::{flush, query, set_host_id, set_panic_hook, submit};
use std::sync::Arc;
use std::sync::{
atomic::{AtomicU64, Ordering},
Arc,
};
// To track an external counter which cannot be reset and is always increasing
#[derive(Default)]
pub struct MovingStat {
value: AtomicU64,
}
impl MovingStat {
pub fn update_stat(&self, old_value: &MovingStat, new_value: u64) {
let old = old_value.value.swap(new_value, Ordering::Acquire);
self.value
.fetch_add(new_value.saturating_sub(old), Ordering::Release);
}
pub fn load_and_reset(&self) -> u64 {
self.value.swap(0, Ordering::Acquire)
}
}
/// A helper that sends the count of created tokens as a datapoint.
#[allow(clippy::redundant_allocation)]

View File

@ -3363,6 +3363,7 @@ dependencies = [
"log",
"lru",
"quinn",
"quinn-proto",
"rand 0.7.3",
"rand_chacha 0.2.2",
"rayon",