align services/
This commit is contained in:
parent
a76c02d541
commit
d03f73df95
|
@ -5,3 +5,4 @@ pub mod tpu_connection_manager;
|
|||
pub mod quic_proxy_connection_manager;
|
||||
pub mod quinn_auto_reconnect;
|
||||
|
||||
|
||||
|
|
|
@ -251,7 +251,7 @@ impl QuicProxyConnectionManager {
|
|||
|
||||
let proxy_request_raw = bincode::serialize(&forwarding_request).expect("Expect to serialize transactions");
|
||||
|
||||
let send_result = auto_connection.send(proxy_request_raw).await;
|
||||
let send_result = auto_connection.send_uni(proxy_request_raw).await;
|
||||
|
||||
// let send_result =
|
||||
// timeout(Duration::from_millis(3500), Self::send_proxy_request(endpoint, proxy_address, &proxy_request_raw))
|
||||
|
|
|
@ -1,17 +1,19 @@
|
|||
|
||||
use anyhow::Context;
|
||||
use log::warn;
|
||||
use quinn::{Connection, Endpoint};
|
||||
use std::fmt;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::atomic::{AtomicU32, Ordering};
|
||||
use log::{trace, warn};
|
||||
use tracing::{debug};
|
||||
use quinn::{Connection, Endpoint};
|
||||
use tokio::sync::{RwLock};
|
||||
use std::time::Duration;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio::time::timeout;
|
||||
use tracing::debug;
|
||||
|
||||
pub struct AutoReconnect {
|
||||
// endoint should be configures with keep-alive and idle timeout
|
||||
endpoint: Endpoint,
|
||||
// note: no read lock is used ATM
|
||||
current: RwLock<Option<Connection>>,
|
||||
target_address: SocketAddr,
|
||||
pub target_address: SocketAddr,
|
||||
reconnect_count: AtomicU32,
|
||||
}
|
||||
|
||||
|
@ -25,24 +27,15 @@ impl AutoReconnect {
|
|||
}
|
||||
}
|
||||
|
||||
pub async fn roundtrip(&self, payload: Vec<u8>) -> anyhow::Result<Vec<u8>> {
|
||||
pub async fn send_uni(&self, payload: Vec<u8>) -> anyhow::Result<()> {
|
||||
// TOOD do smart error handling + reconnect
|
||||
// self.refresh().await.open_bi().await.unwrap()
|
||||
let (mut send_stream, recv_stream) = self.refresh().await.open_bi().await?;
|
||||
let mut send_stream = timeout(
|
||||
Duration::from_secs(4), self.refresh()
|
||||
.await.open_uni())
|
||||
.await
|
||||
.context("open uni stream for sending")??;
|
||||
send_stream.write_all(payload.as_slice()).await?;
|
||||
send_stream.finish().await?;
|
||||
|
||||
let answer = recv_stream.read_to_end(64 * 1024).await?;
|
||||
|
||||
Ok(answer)
|
||||
}
|
||||
|
||||
pub async fn send(&self, payload: Vec<u8>) -> anyhow::Result<()> {
|
||||
// TOOD do smart error handling + reconnect
|
||||
let mut send_stream = self.refresh().await.open_uni().await?;
|
||||
send_stream.write_all(payload.as_slice()).await?;
|
||||
let _ = send_stream.finish().await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -50,53 +43,65 @@ impl AutoReconnect {
|
|||
pub async fn refresh(&self) -> Connection {
|
||||
{
|
||||
let lock = self.current.read().await;
|
||||
let maybe_conn: &Option<Connection> = &*lock;
|
||||
if maybe_conn.as_ref().filter(|conn| conn.close_reason().is_none()).is_some() {
|
||||
// let reuse = lock.unwrap().clone();
|
||||
let reuse = maybe_conn.as_ref().unwrap();
|
||||
trace!("Reuse connection {}", reuse.stable_id());
|
||||
let maybe_conn = lock.as_ref();
|
||||
if maybe_conn
|
||||
.filter(|conn| conn.close_reason().is_none())
|
||||
.is_some()
|
||||
{
|
||||
let reuse = maybe_conn.unwrap();
|
||||
debug!("Reuse connection {}", reuse.stable_id());
|
||||
return reuse.clone();
|
||||
}
|
||||
}
|
||||
let mut lock = self.current.write().await;
|
||||
|
||||
match &*lock {
|
||||
let maybe_conn = lock.as_ref();
|
||||
return match maybe_conn {
|
||||
Some(current) => {
|
||||
|
||||
if current.close_reason().is_some() {
|
||||
warn!("Connection is closed for reason: {:?}", current.close_reason());
|
||||
let old_stable_id = current.stable_id();
|
||||
warn!(
|
||||
"Connection {} is closed for reason: {:?}",
|
||||
old_stable_id,
|
||||
current.close_reason()
|
||||
);
|
||||
|
||||
let new_connection = self.create_connection().await;
|
||||
let prev_stable_id = current.stable_id();
|
||||
*lock = Some(new_connection.clone());
|
||||
// let old_conn = lock.replace(new_connection.clone());
|
||||
self.reconnect_count.fetch_add(1, Ordering::SeqCst);
|
||||
debug!("Replace closed connection {} with {} (retry {})",
|
||||
prev_stable_id,
|
||||
|
||||
debug!(
|
||||
"Replace closed connection {} with {} (retry {})",
|
||||
old_stable_id,
|
||||
new_connection.stable_id(),
|
||||
self.reconnect_count.load(Ordering::SeqCst));
|
||||
// TODO log old vs new stable_id
|
||||
self.reconnect_count.load(Ordering::SeqCst)
|
||||
);
|
||||
|
||||
return new_connection.clone();
|
||||
new_connection.clone()
|
||||
} else {
|
||||
// TODO check log if that ever happens
|
||||
warn!("Reuse connection {} with write-lock", current.stable_id());
|
||||
return current.clone();
|
||||
debug!("Reuse connection {} with write-lock", current.stable_id());
|
||||
current.clone()
|
||||
}
|
||||
|
||||
}
|
||||
None => {
|
||||
let new_connection = self.create_connection().await;
|
||||
*lock = Some(new_connection.clone());
|
||||
trace!("Create initial connection {}", new_connection.stable_id());
|
||||
|
||||
return new_connection.clone();
|
||||
assert!(lock.is_none(), "old connection must be None");
|
||||
*lock = Some(new_connection.clone());
|
||||
// let old_conn = foo.replace(Some(new_connection.clone()));
|
||||
debug!("Create initial connection {}", new_connection.stable_id());
|
||||
|
||||
new_connection.clone()
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
async fn create_connection(&self) -> Connection {
|
||||
let connection =
|
||||
self.endpoint.connect(self.target_address, "localhost").expect("handshake");
|
||||
let connection = self
|
||||
.endpoint
|
||||
.connect(self.target_address, "localhost")
|
||||
.expect("handshake");
|
||||
|
||||
connection.await.expect("connection")
|
||||
}
|
||||
|
@ -104,9 +109,7 @@ impl AutoReconnect {
|
|||
|
||||
impl fmt::Display for AutoReconnect {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "Connection to {}",
|
||||
self.target_address,
|
||||
)
|
||||
write!(f, "Connection to {}", self.target_address,)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue