adds metrics to turbine QUIC endpoint (#33819)

This commit is contained in:
behzad nouri 2023-10-24 14:59:15 +00:00 committed by GitHub
parent c3d588b3f1
commit 6470544ea2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 218 additions and 13 deletions

View File

@ -21,9 +21,10 @@ use {
io::Error as IoError,
net::{IpAddr, SocketAddr, UdpSocket},
sync::{
atomic::{AtomicBool, Ordering},
atomic::{AtomicBool, AtomicU64, Ordering},
Arc, RwLock,
},
time::Duration,
},
thiserror::Error,
tokio::{
@ -60,12 +61,12 @@ pub type AsyncTryJoinHandle = TryJoin<JoinHandle<()>, JoinHandle<()>>;
pub enum Error {
#[error(transparent)]
CertificateError(#[from] RcgenError),
#[error("Channel Send Error")]
ChannelSendError,
#[error(transparent)]
ConnectError(#[from] ConnectError),
#[error(transparent)]
ConnectionError(#[from] ConnectionError),
#[error("Channel Send Error")]
ChannelSendError,
#[error("Invalid Identity: {0:?}")]
InvalidIdentity(SocketAddr),
#[error(transparent)]
@ -76,6 +77,12 @@ pub enum Error {
TlsError(#[from] rustls::Error),
}
// Bumps an `AtomicU64` event counter by one.
// `Ordering::Relaxed` is sufficient here: the counters are only
// aggregated for periodic metrics reporting (see `report_metrics`),
// never used to synchronize other memory accesses.
macro_rules! add_metric {
($metric: expr) => {{
$metric.fetch_add(1, Ordering::Relaxed);
}};
}
#[allow(clippy::type_complexity)]
pub fn new_quic_endpoint(
runtime: &tokio::runtime::Handle,
@ -182,8 +189,11 @@ async fn run_server(
router: Arc<AsyncRwLock<HashMap<SocketAddr, AsyncSender<Bytes>>>>,
cache: Arc<Mutex<HashMap<Pubkey, Connection>>>,
) {
let stats = Arc::<TurbineQuicStats>::default();
let report_metrics_task =
tokio::task::spawn(report_metrics_task("repair_quic_server", stats.clone()));
while let Some(connecting) = endpoint.accept().await {
tokio::task::spawn(handle_connecting_error(
tokio::task::spawn(handle_connecting_task(
endpoint.clone(),
connecting,
sender.clone(),
@ -191,8 +201,10 @@ async fn run_server(
prune_cache_pending.clone(),
router.clone(),
cache.clone(),
stats.clone(),
));
}
report_metrics_task.abort();
}
async fn run_client(
@ -204,13 +216,17 @@ async fn run_client(
router: Arc<AsyncRwLock<HashMap<SocketAddr, AsyncSender<Bytes>>>>,
cache: Arc<Mutex<HashMap<Pubkey, Connection>>>,
) {
let stats = Arc::<TurbineQuicStats>::default();
let report_metrics_task =
tokio::task::spawn(report_metrics_task("repair_quic_client", stats.clone()));
while let Some((remote_address, bytes)) = receiver.recv().await {
let Some(bytes) = try_route_bytes(&remote_address, bytes, &*router.read().await) else {
let Some(bytes) = try_route_bytes(&remote_address, bytes, &*router.read().await, &stats)
else {
continue;
};
let receiver = {
let mut router = router.write().await;
let Some(bytes) = try_route_bytes(&remote_address, bytes, &router) else {
let Some(bytes) = try_route_bytes(&remote_address, bytes, &router, &stats) else {
continue;
};
let (sender, receiver) = tokio::sync::mpsc::channel(ROUTER_CHANNEL_BUFFER);
@ -227,24 +243,28 @@ async fn run_client(
prune_cache_pending.clone(),
router.clone(),
cache.clone(),
stats.clone(),
));
}
close_quic_endpoint(&endpoint);
// Drop sender channels to unblock threads waiting on the receiving end.
router.write().await.clear();
report_metrics_task.abort();
}
fn try_route_bytes(
remote_address: &SocketAddr,
bytes: Bytes,
router: &HashMap<SocketAddr, AsyncSender<Bytes>>,
stats: &TurbineQuicStats,
) -> Option<Bytes> {
match router.get(remote_address) {
None => Some(bytes),
Some(sender) => match sender.try_send(bytes) {
Ok(()) => None,
Err(TrySendError::Full(_)) => {
error!("TrySendError::Full {remote_address}");
debug!("TrySendError::Full {remote_address}");
add_metric!(stats.router_try_send_error_full);
None
}
Err(TrySendError::Closed(bytes)) => Some(bytes),
@ -252,7 +272,7 @@ fn try_route_bytes(
}
}
async fn handle_connecting_error(
async fn handle_connecting_task(
endpoint: Endpoint,
connecting: Connecting,
sender: Sender<(Pubkey, SocketAddr, Bytes)>,
@ -260,6 +280,7 @@ async fn handle_connecting_error(
prune_cache_pending: Arc<AtomicBool>,
router: Arc<AsyncRwLock<HashMap<SocketAddr, AsyncSender<Bytes>>>>,
cache: Arc<Mutex<HashMap<Pubkey, Connection>>>,
stats: Arc<TurbineQuicStats>,
) {
if let Err(err) = handle_connecting(
endpoint,
@ -269,10 +290,12 @@ async fn handle_connecting_error(
prune_cache_pending,
router,
cache,
stats.clone(),
)
.await
{
error!("handle_connecting: {err:?}");
debug!("handle_connecting: {err:?}");
record_error(&err, &stats);
}
}
@ -284,6 +307,7 @@ async fn handle_connecting(
prune_cache_pending: Arc<AtomicBool>,
router: Arc<AsyncRwLock<HashMap<SocketAddr, AsyncSender<Bytes>>>>,
cache: Arc<Mutex<HashMap<Pubkey, Connection>>>,
stats: Arc<TurbineQuicStats>,
) -> Result<(), Error> {
let connection = connecting.await?;
let remote_address = connection.remote_address();
@ -304,6 +328,7 @@ async fn handle_connecting(
prune_cache_pending,
router,
cache,
stats,
)
.await;
Ok(())
@ -321,6 +346,7 @@ async fn handle_connection(
prune_cache_pending: Arc<AtomicBool>,
router: Arc<AsyncRwLock<HashMap<SocketAddr, AsyncSender<Bytes>>>>,
cache: Arc<Mutex<HashMap<Pubkey, Connection>>>,
stats: Arc<TurbineQuicStats>,
) {
cache_connection(
remote_pubkey,
@ -338,15 +364,18 @@ async fn handle_connection(
remote_pubkey,
connection.clone(),
sender,
stats.clone(),
));
match futures::future::try_join(send_datagram_task, read_datagram_task).await {
Err(err) => error!("handle_connection: {remote_pubkey}, {remote_address}, {err:?}"),
Ok(out) => {
if let (Err(ref err), _) = out {
error!("send_datagram_task: {remote_pubkey}, {remote_address}, {err:?}");
debug!("send_datagram_task: {remote_pubkey}, {remote_address}, {err:?}");
record_error(err, &stats);
}
if let (_, Err(ref err)) = out {
error!("read_datagram_task: {remote_pubkey}, {remote_address}, {err:?}");
debug!("read_datagram_task: {remote_pubkey}, {remote_address}, {err:?}");
record_error(err, &stats);
}
}
}
@ -364,6 +393,7 @@ async fn read_datagram_task(
remote_pubkey: Pubkey,
connection: Connection,
sender: Sender<(Pubkey, SocketAddr, Bytes)>,
stats: Arc<TurbineQuicStats>,
) -> Result<(), Error> {
// Assert that send won't block.
debug_assert_eq!(sender.capacity(), None);
@ -379,7 +409,8 @@ async fn read_datagram_task(
if let Some(err) = connection.close_reason() {
return Err(Error::from(err));
}
error!("connection.read_datagram: {remote_pubkey}, {remote_address}, {err:?}");
debug!("connection.read_datagram: {remote_pubkey}, {remote_address}, {err:?}");
record_error(&Error::from(err), &stats);
}
};
}
@ -404,6 +435,7 @@ async fn make_connection_task(
prune_cache_pending: Arc<AtomicBool>,
router: Arc<AsyncRwLock<HashMap<SocketAddr, AsyncSender<Bytes>>>>,
cache: Arc<Mutex<HashMap<Pubkey, Connection>>>,
stats: Arc<TurbineQuicStats>,
) {
if let Err(err) = make_connection(
endpoint,
@ -414,10 +446,12 @@ async fn make_connection_task(
prune_cache_pending,
router,
cache,
stats.clone(),
)
.await
{
error!("make_connection: {remote_address}, {err:?}");
debug!("make_connection: {remote_address}, {err:?}");
record_error(&err, &stats);
}
}
@ -430,6 +464,7 @@ async fn make_connection(
prune_cache_pending: Arc<AtomicBool>,
router: Arc<AsyncRwLock<HashMap<SocketAddr, AsyncSender<Bytes>>>>,
cache: Arc<Mutex<HashMap<Pubkey, Connection>>>,
stats: Arc<TurbineQuicStats>,
) -> Result<(), Error> {
let connection = endpoint
.connect(remote_address, CONNECT_SERVER_NAME)?
@ -445,6 +480,7 @@ async fn make_connection(
prune_cache_pending,
router,
cache,
stats,
)
.await;
Ok(())
@ -560,6 +596,175 @@ impl<T> From<crossbeam_channel::SendError<T>> for Error {
}
}
// Per-endpoint event counters for the turbine QUIC server/client.
// Each field is incremented via `add_metric!` when the matching error
// or event occurs (see `record_error` and `try_route_bytes`), and is
// swapped back to zero on every reporting interval by `report_metrics`,
// so each datapoint carries counts since the previous report.
#[derive(Default)]
struct TurbineQuicStats {
connect_error_invalid_remote_address: AtomicU64,
// Catch-all for ConnectError variants without a dedicated counter
// (endpoint stopping, invalid DNS name, no default client config,
// unsupported version).
connect_error_other: AtomicU64,
connect_error_too_many_connections: AtomicU64,
connection_error_application_closed: AtomicU64,
connection_error_connection_closed: AtomicU64,
connection_error_locally_closed: AtomicU64,
connection_error_reset: AtomicU64,
connection_error_timed_out: AtomicU64,
connection_error_transport_error: AtomicU64,
connection_error_version_mismatch: AtomicU64,
invalid_identity: AtomicU64,
// Incremented when a routed channel to a peer is full and the
// payload is dropped (TrySendError::Full in try_route_bytes).
router_try_send_error_full: AtomicU64,
send_datagram_error_connection_lost: AtomicU64,
send_datagram_error_too_large: AtomicU64,
send_datagram_error_unsupported_by_peer: AtomicU64,
}
// Background task: every 2 seconds, flushes the accumulated counters to
// the metrics sink under the datapoint `name`. Loops forever; the owner
// stops it by aborting the spawned task handle (see the `.abort()` calls
// in run_server / run_client).
async fn report_metrics_task(name: &'static str, stats: Arc<TurbineQuicStats>) {
loop {
tokio::time::sleep(Duration::from_secs(2)).await;
report_metrics(name, &stats);
}
}
// Maps an `Error` to its per-variant counter in `stats`.
// Variants with no dedicated counter are deliberately ignored (unit
// arms): CertificateError, ChannelSendError, IoError, TlsError and
// SendDatagramError::Disabled. Several rarely-interesting ConnectError
// variants share the catch-all `connect_error_other` counter.
// The match is exhaustive on purpose so that adding a new Error variant
// forces a decision about how (or whether) to count it.
fn record_error(err: &Error, stats: &TurbineQuicStats) {
match err {
Error::CertificateError(_) => (),
Error::ChannelSendError => (),
Error::ConnectError(ConnectError::EndpointStopping) => {
add_metric!(stats.connect_error_other)
}
Error::ConnectError(ConnectError::TooManyConnections) => {
add_metric!(stats.connect_error_too_many_connections)
}
Error::ConnectError(ConnectError::InvalidDnsName(_)) => {
add_metric!(stats.connect_error_other)
}
Error::ConnectError(ConnectError::InvalidRemoteAddress(_)) => {
add_metric!(stats.connect_error_invalid_remote_address)
}
Error::ConnectError(ConnectError::NoDefaultClientConfig) => {
add_metric!(stats.connect_error_other)
}
Error::ConnectError(ConnectError::UnsupportedVersion) => {
add_metric!(stats.connect_error_other)
}
Error::ConnectionError(ConnectionError::VersionMismatch) => {
add_metric!(stats.connection_error_version_mismatch)
}
Error::ConnectionError(ConnectionError::TransportError(_)) => {
add_metric!(stats.connection_error_transport_error)
}
Error::ConnectionError(ConnectionError::ConnectionClosed(_)) => {
add_metric!(stats.connection_error_connection_closed)
}
Error::ConnectionError(ConnectionError::ApplicationClosed(_)) => {
add_metric!(stats.connection_error_application_closed)
}
Error::ConnectionError(ConnectionError::Reset) => add_metric!(stats.connection_error_reset),
Error::ConnectionError(ConnectionError::TimedOut) => {
add_metric!(stats.connection_error_timed_out)
}
Error::ConnectionError(ConnectionError::LocallyClosed) => {
add_metric!(stats.connection_error_locally_closed)
}
Error::InvalidIdentity(_) => add_metric!(stats.invalid_identity),
Error::IoError(_) => (),
Error::SendDatagramError(SendDatagramError::UnsupportedByPeer) => {
add_metric!(stats.send_datagram_error_unsupported_by_peer)
}
Error::SendDatagramError(SendDatagramError::Disabled) => (),
Error::SendDatagramError(SendDatagramError::TooLarge) => {
add_metric!(stats.send_datagram_error_too_large)
}
Error::SendDatagramError(SendDatagramError::ConnectionLost(_)) => {
add_metric!(stats.send_datagram_error_connection_lost)
}
Error::TlsError(_) => (),
}
}
// Emits one datapoint named `name` containing every counter in `stats`.
// Each counter is drained with `swap(0, ...)`, so values are deltas
// since the previous report rather than running totals; a counter
// incremented concurrently between two swaps is attributed to the next
// report, never lost.
fn report_metrics(name: &'static str, stats: &TurbineQuicStats) {
// Reads the current value and atomically resets the counter to zero.
macro_rules! reset_metric {
($metric: expr) => {
$metric.swap(0, Ordering::Relaxed)
};
}
datapoint_info!(
name,
(
"connect_error_invalid_remote_address",
reset_metric!(stats.connect_error_invalid_remote_address),
i64
),
(
"connect_error_other",
reset_metric!(stats.connect_error_other),
i64
),
(
"connect_error_too_many_connections",
reset_metric!(stats.connect_error_too_many_connections),
i64
),
(
"connection_error_application_closed",
reset_metric!(stats.connection_error_application_closed),
i64
),
(
"connection_error_connection_closed",
reset_metric!(stats.connection_error_connection_closed),
i64
),
(
"connection_error_locally_closed",
reset_metric!(stats.connection_error_locally_closed),
i64
),
(
"connection_error_reset",
reset_metric!(stats.connection_error_reset),
i64
),
(
"connection_error_timed_out",
reset_metric!(stats.connection_error_timed_out),
i64
),
(
"connection_error_transport_error",
reset_metric!(stats.connection_error_transport_error),
i64
),
(
"connection_error_version_mismatch",
reset_metric!(stats.connection_error_version_mismatch),
i64
),
(
"invalid_identity",
reset_metric!(stats.invalid_identity),
i64
),
(
"router_try_send_error_full",
reset_metric!(stats.router_try_send_error_full),
i64
),
(
"send_datagram_error_connection_lost",
reset_metric!(stats.send_datagram_error_connection_lost),
i64
),
(
"send_datagram_error_too_large",
reset_metric!(stats.send_datagram_error_too_large),
i64
),
(
"send_datagram_error_unsupported_by_peer",
reset_metric!(stats.send_datagram_error_unsupported_by_peer),
i64
),
);
}
#[cfg(test)]
mod tests {
use {