adds metrics to repair QUIC endpoint (#33818)

This commit is contained in:
behzad nouri 2023-10-25 18:59:14 +00:00 committed by GitHub
parent a851670d54
commit e555a61c78
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 311 additions and 21 deletions

View File

@ -6,8 +6,8 @@ use {
log::error,
quinn::{
ClientConfig, ConnectError, Connecting, Connection, ConnectionError, Endpoint,
EndpointConfig, ReadToEndError, RecvStream, SendStream, ServerConfig, TokioRuntime,
TransportConfig, VarInt, WriteError,
EndpointConfig, ReadError, ReadToEndError, RecvStream, SendStream, ServerConfig,
TokioRuntime, TransportConfig, VarInt, WriteError,
},
rcgen::RcgenError,
rustls::{Certificate, PrivateKey},
@ -24,7 +24,7 @@ use {
io::{Cursor, Error as IoError},
net::{IpAddr, SocketAddr, UdpSocket},
sync::{
atomic::{AtomicBool, Ordering},
atomic::{AtomicBool, AtomicU64, Ordering},
Arc, RwLock,
},
time::Duration,
@ -82,16 +82,14 @@ pub struct RemoteRequest {
#[derive(Error, Debug)]
#[allow(clippy::enum_variant_names)]
pub(crate) enum Error {
#[error(transparent)]
BincodeError(#[from] bincode::Error),
#[error(transparent)]
CertificateError(#[from] RcgenError),
#[error("Channel Send Error")]
ChannelSendError,
#[error(transparent)]
ConnectError(#[from] ConnectError),
#[error(transparent)]
ConnectionError(#[from] ConnectionError),
#[error("Channel Send Error")]
ChannelSendError,
#[error("Invalid Identity: {0:?}")]
InvalidIdentity(SocketAddr),
#[error(transparent)]
@ -103,9 +101,15 @@ pub(crate) enum Error {
#[error("read_to_end Timeout")]
ReadToEndTimeout,
#[error(transparent)]
WriteError(#[from] WriteError),
#[error(transparent)]
TlsError(#[from] rustls::Error),
#[error(transparent)]
WriteError(#[from] WriteError),
}
macro_rules! add_metric {
($metric: expr) => {{
$metric.fetch_add(1, Ordering::Relaxed);
}};
}
#[allow(clippy::type_complexity)]
@ -207,8 +211,11 @@ async fn run_server(
router: Arc<AsyncRwLock<HashMap<SocketAddr, AsyncSender<LocalRequest>>>>,
cache: Arc<Mutex<HashMap<Pubkey, Connection>>>,
) {
let stats = Arc::<RepairQuicStats>::default();
let report_metrics_task =
tokio::task::spawn(report_metrics_task("repair_quic_server", stats.clone()));
while let Some(connecting) = endpoint.accept().await {
tokio::task::spawn(handle_connecting_error(
tokio::task::spawn(handle_connecting_task(
endpoint.clone(),
connecting,
remote_request_sender.clone(),
@ -216,8 +223,10 @@ async fn run_server(
prune_cache_pending.clone(),
router.clone(),
cache.clone(),
stats.clone(),
));
}
report_metrics_task.abort();
}
async fn run_client(
@ -229,14 +238,17 @@ async fn run_client(
router: Arc<AsyncRwLock<HashMap<SocketAddr, AsyncSender<LocalRequest>>>>,
cache: Arc<Mutex<HashMap<Pubkey, Connection>>>,
) {
let stats = Arc::<RepairQuicStats>::default();
let report_metrics_task =
tokio::task::spawn(report_metrics_task("repair_quic_client", stats.clone()));
while let Some(request) = receiver.recv().await {
let Some(request) = try_route_request(request, &*router.read().await) else {
let Some(request) = try_route_request(request, &*router.read().await, &stats) else {
continue;
};
let remote_address = request.remote_address;
let receiver = {
let mut router = router.write().await;
let Some(request) = try_route_request(request, &router) else {
let Some(request) = try_route_request(request, &router, &stats) else {
continue;
};
let (sender, receiver) = tokio::sync::mpsc::channel(ROUTER_CHANNEL_BUFFER);
@ -253,11 +265,13 @@ async fn run_client(
prune_cache_pending.clone(),
router.clone(),
cache.clone(),
stats.clone(),
));
}
close_quic_endpoint(&endpoint);
// Drop sender channels to unblock threads waiting on the receiving end.
router.write().await.clear();
report_metrics_task.abort();
}
// Routes the local request to respective channel. Drops the request if the
@ -266,13 +280,15 @@ async fn run_client(
fn try_route_request(
request: LocalRequest,
router: &HashMap<SocketAddr, AsyncSender<LocalRequest>>,
stats: &RepairQuicStats,
) -> Option<LocalRequest> {
match router.get(&request.remote_address) {
None => Some(request),
Some(sender) => match sender.try_send(request) {
Ok(()) => None,
Err(TrySendError::Full(request)) => {
error!("TrySendError::Full {}", request.remote_address);
debug!("TrySendError::Full {}", request.remote_address);
add_metric!(stats.router_try_send_error_full);
None
}
Err(TrySendError::Closed(request)) => Some(request),
@ -280,7 +296,7 @@ fn try_route_request(
}
}
async fn handle_connecting_error(
async fn handle_connecting_task(
endpoint: Endpoint,
connecting: Connecting,
remote_request_sender: Sender<RemoteRequest>,
@ -288,6 +304,7 @@ async fn handle_connecting_error(
prune_cache_pending: Arc<AtomicBool>,
router: Arc<AsyncRwLock<HashMap<SocketAddr, AsyncSender<LocalRequest>>>>,
cache: Arc<Mutex<HashMap<Pubkey, Connection>>>,
stats: Arc<RepairQuicStats>,
) {
if let Err(err) = handle_connecting(
endpoint,
@ -297,10 +314,12 @@ async fn handle_connecting_error(
prune_cache_pending,
router,
cache,
stats.clone(),
)
.await
{
error!("handle_connecting: {err:?}");
debug!("handle_connecting: {err:?}");
record_error(&err, &stats);
}
}
@ -312,6 +331,7 @@ async fn handle_connecting(
prune_cache_pending: Arc<AtomicBool>,
router: Arc<AsyncRwLock<HashMap<SocketAddr, AsyncSender<LocalRequest>>>>,
cache: Arc<Mutex<HashMap<Pubkey, Connection>>>,
stats: Arc<RepairQuicStats>,
) -> Result<(), Error> {
let connection = connecting.await?;
let remote_address = connection.remote_address();
@ -332,6 +352,7 @@ async fn handle_connecting(
prune_cache_pending,
router,
cache,
stats,
)
.await;
Ok(())
@ -349,6 +370,7 @@ async fn handle_connection(
prune_cache_pending: Arc<AtomicBool>,
router: Arc<AsyncRwLock<HashMap<SocketAddr, AsyncSender<LocalRequest>>>>,
cache: Arc<Mutex<HashMap<Pubkey, Connection>>>,
stats: Arc<RepairQuicStats>,
) {
cache_connection(
remote_pubkey,
@ -361,8 +383,10 @@ async fn handle_connection(
.await;
let send_requests_task = tokio::task::spawn(send_requests_task(
endpoint.clone(),
remote_address,
connection.clone(),
receiver,
stats.clone(),
));
let recv_requests_task = tokio::task::spawn(recv_requests_task(
endpoint,
@ -370,11 +394,13 @@ async fn handle_connection(
remote_pubkey,
connection.clone(),
remote_request_sender,
stats.clone(),
));
match futures::future::try_join(send_requests_task, recv_requests_task).await {
Err(err) => error!("handle_connection: {remote_pubkey}, {remote_address}, {err:?}"),
Ok(((), Err(ref err))) => {
error!("recv_requests_task: {remote_pubkey}, {remote_address}, {err:?}");
Ok(((), Err(err))) => {
debug!("recv_requests_task: {remote_pubkey}, {remote_address}, {err:?}");
record_error(&err, &stats);
}
Ok(((), Ok(()))) => (),
}
@ -392,6 +418,7 @@ async fn recv_requests_task(
remote_pubkey: Pubkey,
connection: Connection,
remote_request_sender: Sender<RemoteRequest>,
stats: Arc<RepairQuicStats>,
) -> Result<(), Error> {
loop {
let (send_stream, recv_stream) = connection.accept_bi().await?;
@ -402,6 +429,7 @@ async fn recv_requests_task(
send_stream,
recv_stream,
remote_request_sender.clone(),
stats.clone(),
));
}
}
@ -413,6 +441,7 @@ async fn handle_streams_task(
send_stream: SendStream,
recv_stream: RecvStream,
remote_request_sender: Sender<RemoteRequest>,
stats: Arc<RepairQuicStats>,
) {
if let Err(err) = handle_streams(
&endpoint,
@ -424,7 +453,8 @@ async fn handle_streams_task(
)
.await
{
error!("handle_stream: {remote_address}, {remote_pubkey}, {err:?}");
debug!("handle_stream: {remote_address}, {remote_pubkey}, {err:?}");
record_error(&err, &stats);
}
}
@ -469,21 +499,32 @@ async fn handle_streams(
async fn send_requests_task(
endpoint: Endpoint,
remote_address: SocketAddr,
connection: Connection,
mut receiver: AsyncReceiver<LocalRequest>,
stats: Arc<RepairQuicStats>,
) {
while let Some(request) = receiver.recv().await {
tokio::task::spawn(send_request_task(
endpoint.clone(),
remote_address,
connection.clone(),
request,
stats.clone(),
));
}
}
async fn send_request_task(endpoint: Endpoint, connection: Connection, request: LocalRequest) {
async fn send_request_task(
endpoint: Endpoint,
remote_address: SocketAddr,
connection: Connection,
request: LocalRequest,
stats: Arc<RepairQuicStats>,
) {
if let Err(err) = send_request(endpoint, connection, request).await {
error!("send_request: {err:?}")
debug!("send_request: {remote_address}, {err:?}");
record_error(&err, &stats);
}
}
@ -542,6 +583,7 @@ async fn make_connection_task(
prune_cache_pending: Arc<AtomicBool>,
router: Arc<AsyncRwLock<HashMap<SocketAddr, AsyncSender<LocalRequest>>>>,
cache: Arc<Mutex<HashMap<Pubkey, Connection>>>,
stats: Arc<RepairQuicStats>,
) {
if let Err(err) = make_connection(
endpoint,
@ -552,10 +594,12 @@ async fn make_connection_task(
prune_cache_pending,
router,
cache,
stats.clone(),
)
.await
{
error!("make_connection: {remote_address}, {err:?}");
debug!("make_connection: {remote_address}, {err:?}");
record_error(&err, &stats);
}
}
@ -568,6 +612,7 @@ async fn make_connection(
prune_cache_pending: Arc<AtomicBool>,
router: Arc<AsyncRwLock<HashMap<SocketAddr, AsyncSender<LocalRequest>>>>,
cache: Arc<Mutex<HashMap<Pubkey, Connection>>>,
stats: Arc<RepairQuicStats>,
) -> Result<(), Error> {
let connection = endpoint
.connect(remote_address, CONNECT_SERVER_NAME)?
@ -583,6 +628,7 @@ async fn make_connection(
prune_cache_pending,
router,
cache,
stats,
)
.await;
Ok(())
@ -698,6 +744,250 @@ impl<T> From<crossbeam_channel::SendError<T>> for Error {
}
}
#[derive(Default)]
struct RepairQuicStats {
connect_error_invalid_remote_address: AtomicU64,
connect_error_other: AtomicU64,
connect_error_too_many_connections: AtomicU64,
connection_error_application_closed: AtomicU64,
connection_error_connection_closed: AtomicU64,
connection_error_locally_closed: AtomicU64,
connection_error_reset: AtomicU64,
connection_error_timed_out: AtomicU64,
connection_error_transport_error: AtomicU64,
connection_error_version_mismatch: AtomicU64,
invalid_identity: AtomicU64,
no_response_received: AtomicU64,
read_to_end_error_connection_lost: AtomicU64,
read_to_end_error_illegal_ordered_read: AtomicU64,
read_to_end_error_reset: AtomicU64,
read_to_end_error_too_long: AtomicU64,
read_to_end_error_unknown_stream: AtomicU64,
read_to_end_error_zero_rtt_rejected: AtomicU64,
read_to_end_timeout: AtomicU64,
router_try_send_error_full: AtomicU64,
write_error_connection_lost: AtomicU64,
write_error_stopped: AtomicU64,
write_error_unknown_stream: AtomicU64,
write_error_zero_rtt_rejected: AtomicU64,
}
async fn report_metrics_task(name: &'static str, stats: Arc<RepairQuicStats>) {
const METRICS_SUBMIT_CADENCE: Duration = Duration::from_secs(2);
loop {
tokio::time::sleep(METRICS_SUBMIT_CADENCE).await;
report_metrics(name, &stats);
}
}
fn record_error(err: &Error, stats: &RepairQuicStats) {
match err {
Error::CertificateError(_) => (),
Error::ChannelSendError => (),
Error::ConnectError(ConnectError::EndpointStopping) => {
add_metric!(stats.connect_error_other)
}
Error::ConnectError(ConnectError::TooManyConnections) => {
add_metric!(stats.connect_error_too_many_connections)
}
Error::ConnectError(ConnectError::InvalidDnsName(_)) => {
add_metric!(stats.connect_error_other)
}
Error::ConnectError(ConnectError::InvalidRemoteAddress(_)) => {
add_metric!(stats.connect_error_invalid_remote_address)
}
Error::ConnectError(ConnectError::NoDefaultClientConfig) => {
add_metric!(stats.connect_error_other)
}
Error::ConnectError(ConnectError::UnsupportedVersion) => {
add_metric!(stats.connect_error_other)
}
Error::ConnectionError(ConnectionError::VersionMismatch) => {
add_metric!(stats.connection_error_version_mismatch)
}
Error::ConnectionError(ConnectionError::TransportError(_)) => {
add_metric!(stats.connection_error_transport_error)
}
Error::ConnectionError(ConnectionError::ConnectionClosed(_)) => {
add_metric!(stats.connection_error_connection_closed)
}
Error::ConnectionError(ConnectionError::ApplicationClosed(_)) => {
add_metric!(stats.connection_error_application_closed)
}
Error::ConnectionError(ConnectionError::Reset) => add_metric!(stats.connection_error_reset),
Error::ConnectionError(ConnectionError::TimedOut) => {
add_metric!(stats.connection_error_timed_out)
}
Error::ConnectionError(ConnectionError::LocallyClosed) => {
add_metric!(stats.connection_error_locally_closed)
}
Error::InvalidIdentity(_) => add_metric!(stats.invalid_identity),
Error::IoError(_) => (),
Error::NoResponseReceived => add_metric!(stats.no_response_received),
Error::ReadToEndError(ReadToEndError::Read(ReadError::Reset(_))) => {
add_metric!(stats.read_to_end_error_reset)
}
Error::ReadToEndError(ReadToEndError::Read(ReadError::ConnectionLost(_))) => {
add_metric!(stats.read_to_end_error_connection_lost)
}
Error::ReadToEndError(ReadToEndError::Read(ReadError::UnknownStream)) => {
add_metric!(stats.read_to_end_error_unknown_stream)
}
Error::ReadToEndError(ReadToEndError::Read(ReadError::IllegalOrderedRead)) => {
add_metric!(stats.read_to_end_error_illegal_ordered_read)
}
Error::ReadToEndError(ReadToEndError::Read(ReadError::ZeroRttRejected)) => {
add_metric!(stats.read_to_end_error_zero_rtt_rejected)
}
Error::ReadToEndError(ReadToEndError::TooLong) => {
add_metric!(stats.read_to_end_error_too_long)
}
Error::ReadToEndTimeout => add_metric!(stats.read_to_end_timeout),
Error::TlsError(_) => (),
Error::WriteError(WriteError::Stopped(_)) => add_metric!(stats.write_error_stopped),
Error::WriteError(WriteError::ConnectionLost(_)) => {
add_metric!(stats.write_error_connection_lost)
}
Error::WriteError(WriteError::UnknownStream) => {
add_metric!(stats.write_error_unknown_stream)
}
Error::WriteError(WriteError::ZeroRttRejected) => {
add_metric!(stats.write_error_zero_rtt_rejected)
}
}
}
fn report_metrics(name: &'static str, stats: &RepairQuicStats) {
macro_rules! reset_metric {
($metric: expr) => {
$metric.swap(0, Ordering::Relaxed)
};
}
datapoint_info!(
name,
(
"connect_error_invalid_remote_address",
reset_metric!(stats.connect_error_invalid_remote_address),
i64
),
(
"connect_error_other",
reset_metric!(stats.connect_error_other),
i64
),
(
"connect_error_too_many_connections",
reset_metric!(stats.connect_error_too_many_connections),
i64
),
(
"connection_error_application_closed",
reset_metric!(stats.connection_error_application_closed),
i64
),
(
"connection_error_connection_closed",
reset_metric!(stats.connection_error_connection_closed),
i64
),
(
"connection_error_locally_closed",
reset_metric!(stats.connection_error_locally_closed),
i64
),
(
"connection_error_reset",
reset_metric!(stats.connection_error_reset),
i64
),
(
"connection_error_timed_out",
reset_metric!(stats.connection_error_timed_out),
i64
),
(
"connection_error_transport_error",
reset_metric!(stats.connection_error_transport_error),
i64
),
(
"connection_error_version_mismatch",
reset_metric!(stats.connection_error_version_mismatch),
i64
),
(
"invalid_identity",
reset_metric!(stats.invalid_identity),
i64
),
(
"no_response_received",
reset_metric!(stats.no_response_received),
i64
),
(
"read_to_end_error_connection_lost",
reset_metric!(stats.read_to_end_error_connection_lost),
i64
),
(
"read_to_end_error_illegal_ordered_read",
reset_metric!(stats.read_to_end_error_illegal_ordered_read),
i64
),
(
"read_to_end_error_reset",
reset_metric!(stats.read_to_end_error_reset),
i64
),
(
"read_to_end_error_too_long",
reset_metric!(stats.read_to_end_error_too_long),
i64
),
(
"read_to_end_error_unknown_stream",
reset_metric!(stats.read_to_end_error_unknown_stream),
i64
),
(
"read_to_end_error_zero_rtt_rejected",
reset_metric!(stats.read_to_end_error_zero_rtt_rejected),
i64
),
(
"read_to_end_timeout",
reset_metric!(stats.read_to_end_timeout),
i64
),
(
"router_try_send_error_full",
reset_metric!(stats.router_try_send_error_full),
i64
),
(
"write_error_connection_lost",
reset_metric!(stats.write_error_connection_lost),
i64
),
(
"write_error_stopped",
reset_metric!(stats.write_error_stopped),
i64
),
(
"write_error_unknown_stream",
reset_metric!(stats.write_error_unknown_stream),
i64
),
(
"write_error_zero_rtt_rejected",
reset_metric!(stats.write_error_zero_rtt_rejected),
i64
),
);
}
#[cfg(test)]
mod tests {
use {