make connection failures more explicit

This commit is contained in:
GroovieGermanikus 2023-08-08 11:19:01 +02:00
parent 485bb399a8
commit 3bbe99fecd
2 changed files with 105 additions and 49 deletions

View File

@ -1,17 +1,23 @@
use crate::util::timeout_fallback; use crate::util::timeout_fallback;
use anyhow::Context; use anyhow::{bail, Context};
use log::warn; use log::{info, warn};
use quinn::{Connection, Endpoint}; use quinn::{Connection, ConnectionError, Endpoint};
use std::fmt; use std::fmt;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::atomic::{AtomicU32, Ordering};
use tokio::sync::RwLock; use tokio::sync::RwLock;
use tracing::debug; use tracing::debug;
enum ConnectionState {
NotConnected,
Connection(Connection),
PermanentError,
}
pub struct AutoReconnect { pub struct AutoReconnect {
// endoint should be configures with keep-alive and idle timeout // endoint should be configures with keep-alive and idle timeout
endpoint: Endpoint, endpoint: Endpoint,
current: RwLock<Option<Connection>>, current: RwLock<ConnectionState>,
pub target_address: SocketAddr, pub target_address: SocketAddr,
reconnect_count: AtomicU32, reconnect_count: AtomicU32,
} }
@ -20,15 +26,14 @@ impl AutoReconnect {
pub fn new(endpoint: Endpoint, target_address: SocketAddr) -> Self { pub fn new(endpoint: Endpoint, target_address: SocketAddr) -> Self {
Self { Self {
endpoint, endpoint,
current: RwLock::new(None), current: RwLock::new(ConnectionState::NotConnected),
target_address, target_address,
reconnect_count: AtomicU32::new(0), reconnect_count: AtomicU32::new(0),
} }
} }
pub async fn send_uni(&self, payload: Vec<u8>) -> anyhow::Result<()> { pub async fn send_uni(&self, payload: Vec<u8>) -> anyhow::Result<()> {
// TOOD do smart error handling + reconnect let mut send_stream = timeout_fallback(self.refresh_and_get().await?.open_uni())
let mut send_stream = timeout_fallback(self.refresh().await.open_uni())
.await .await
.context("open uni stream for sending")??; .context("open uni stream for sending")??;
send_stream.write_all(payload.as_slice()).await?; send_stream.write_all(payload.as_slice()).await?;
@ -36,23 +41,31 @@ impl AutoReconnect {
Ok(()) Ok(())
} }
pub async fn refresh(&self) -> Connection { pub async fn refresh_and_get(&self) -> anyhow::Result<Connection> {
{ self.refresh().await;
let lock = self.current.read().await; let lock = self.current.read().await;
let maybe_conn = lock.as_ref(); match &*lock {
if maybe_conn ConnectionState::NotConnected => bail!("not connected"),
.filter(|conn| conn.close_reason().is_none()) ConnectionState::Connection(conn) => Ok(conn.clone()),
.is_some() ConnectionState::PermanentError => bail!("permanent error"),
}
}
pub async fn refresh(&self) {
{ {
let reuse = maybe_conn.unwrap(); // first check for existing connection using a cheap read-lock
debug!("Reuse connection {}", reuse.stable_id()); let lock = self.current.read().await;
return reuse.clone(); if let ConnectionState::Connection(conn) = &*lock {
if conn.close_reason().is_none() {
debug!("Reuse connection {}", conn.stable_id());
return;
}
} }
} }
let mut lock = self.current.write().await; let mut lock = self.current.write().await;
let maybe_conn = lock.as_ref(); match &*lock {
match maybe_conn { ConnectionState::Connection(current) => {
Some(current) => {
if current.close_reason().is_some() { if current.close_reason().is_some() {
let old_stable_id = current.stable_id(); let old_stable_id = current.stable_id();
warn!( warn!(
@ -61,44 +74,86 @@ impl AutoReconnect {
current.close_reason() current.close_reason()
); );
let new_connection = self.create_connection().await; match self.create_connection().await {
*lock = Some(new_connection.clone()); Some(new_connection) => {
// let old_conn = lock.replace(new_connection.clone()); *lock = ConnectionState::Connection(new_connection.clone());
let reconnect_count =
self.reconnect_count.fetch_add(1, Ordering::SeqCst); self.reconnect_count.fetch_add(1, Ordering::SeqCst);
debug!( if reconnect_count < 10 {
info!(
"Replace closed connection {} with {} (retry {})", "Replace closed connection {} with {} (retry {})",
old_stable_id, old_stable_id,
new_connection.stable_id(), new_connection.stable_id(),
self.reconnect_count.load(Ordering::SeqCst) reconnect_count
); );
new_connection
} else { } else {
debug!("Reuse connection {} with write-lock", current.stable_id()); *lock = ConnectionState::PermanentError;
current.clone() warn!(
"Too many reconnect attempts {} with {} (retry {})",
old_stable_id,
new_connection.stable_id(),
reconnect_count
);
} }
} }
None => { None => {
let new_connection = self.create_connection().await; warn!(
"Reconnect to {} failed for connection {}",
self.target_address, old_stable_id
);
*lock = ConnectionState::PermanentError;
}
};
} else {
debug!("Reuse connection {} with write-lock", current.stable_id());
}
}
ConnectionState::NotConnected => {
match self.create_connection().await {
Some(new_connection) => {
*lock = ConnectionState::Connection(new_connection.clone());
self.reconnect_count.fetch_add(1, Ordering::SeqCst);
assert!(lock.is_none(), "old connection must be None"); info!(
*lock = Some(new_connection.clone()); "Create initial connection {} to {}",
// let old_conn = foo.replace(Some(new_connection.clone())); new_connection.stable_id(),
debug!("Create initial connection {}", new_connection.stable_id()); self.target_address
);
new_connection }
None => {
warn!(
"Initial connection to {} failed permanently",
self.target_address
);
*lock = ConnectionState::PermanentError;
}
};
}
ConnectionState::PermanentError => {
// no nothing
debug!("Not using connection with permanent error");
} }
} }
} }
async fn create_connection(&self) -> Connection { async fn create_connection(&self) -> Option<Connection> {
let connection = self let connection = self
.endpoint .endpoint
.connect(self.target_address, "localhost") .connect(self.target_address, "localhost")
.expect("handshake"); .expect("handshake");
connection.await.expect("connection") match connection.await {
Ok(conn) => Some(conn),
Err(ConnectionError::TimedOut) => None,
// maybe we should also treat TransportError explicitly
Err(unexpected_error) => {
panic!(
"Connection to {} failed with unexpected error: {}",
self.target_address, unexpected_error
);
}
}
} }
// stable_id 140266619216912, rtt=2.156683ms, // stable_id 140266619216912, rtt=2.156683ms,
@ -110,15 +165,15 @@ impl AutoReconnect {
// STREAMS_BLOCKED_UNI: 0, STOP_SENDING: 0, STREAM: 0 } // STREAMS_BLOCKED_UNI: 0, STOP_SENDING: 0, STREAM: 0 }
pub async fn connection_stats(&self) -> String { pub async fn connection_stats(&self) -> String {
let lock = self.current.read().await; let lock = self.current.read().await;
let maybe_conn = lock.as_ref(); match &*lock {
match maybe_conn { ConnectionState::Connection(conn) => format!(
Some(connection) => format!(
"stable_id {} stats {:?}, rtt={:?}", "stable_id {} stats {:?}, rtt={:?}",
connection.stable_id(), conn.stable_id(),
connection.stats().frame_rx, conn.stats().frame_rx,
connection.stats().path.rtt conn.stats().path.rtt
), ),
None => "n/a".to_string(), ConnectionState::NotConnected => "n/c".to_string(),
ConnectionState::PermanentError => "n/a (permanent)".to_string(),
} }
} }
} }

View File

@ -9,6 +9,7 @@ use tokio::sync::RwLock;
use tokio::time::timeout; use tokio::time::timeout;
use tracing::debug; use tracing::debug;
/// copy of quic-proxy AutoReconnect - used that for reference
pub struct AutoReconnect { pub struct AutoReconnect {
// endoint should be configures with keep-alive and idle timeout // endoint should be configures with keep-alive and idle timeout
endpoint: Endpoint, endpoint: Endpoint,