2023-07-20 04:45:54 -07:00
|
|
|
use log::{trace, warn};
|
2023-06-08 02:27:02 -07:00
|
|
|
use quinn::{
|
|
|
|
ClientConfig, Connection, ConnectionError, Endpoint, EndpointConfig, IdleTimeout, SendStream,
|
|
|
|
TokioRuntime, TransportConfig,
|
|
|
|
};
|
2023-06-07 11:00:59 -07:00
|
|
|
use solana_sdk::pubkey::Pubkey;
|
|
|
|
use std::{
|
2023-06-08 02:27:02 -07:00
|
|
|
net::{IpAddr, Ipv4Addr, SocketAddr},
|
2023-06-07 11:00:59 -07:00
|
|
|
sync::{
|
2023-07-20 04:45:54 -07:00
|
|
|
atomic::{AtomicBool, Ordering},
|
2023-06-07 11:00:59 -07:00
|
|
|
Arc,
|
|
|
|
},
|
|
|
|
time::Duration,
|
|
|
|
};
|
2023-07-20 04:45:54 -07:00
|
|
|
use tokio::time::timeout;
|
2023-06-07 11:00:59 -07:00
|
|
|
|
|
|
|
const ALPN_TPU_PROTOCOL_ID: &[u8] = b"solana-tpu";
|
|
|
|
|
2023-07-20 04:45:54 -07:00
|
|
|
pub enum QuicConnectionError {
|
|
|
|
TimeOut,
|
|
|
|
ConnectionError { retry: bool },
|
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Clone, Copy)]
|
|
|
|
pub struct QuicConnectionParameters {
|
|
|
|
pub connection_timeout: Duration,
|
|
|
|
pub unistream_timeout: Duration,
|
|
|
|
pub write_timeout: Duration,
|
|
|
|
pub finalize_timeout: Duration,
|
|
|
|
pub connection_retry_count: usize,
|
|
|
|
pub max_number_of_connections: usize,
|
|
|
|
pub number_of_transactions_per_unistream: usize,
|
|
|
|
}
|
|
|
|
|
2023-06-07 11:00:59 -07:00
|
|
|
pub struct QuicConnectionUtils {}
|
|
|
|
|
|
|
|
impl QuicConnectionUtils {
|
|
|
|
pub fn create_endpoint(certificate: rustls::Certificate, key: rustls::PrivateKey) -> Endpoint {
|
|
|
|
let mut endpoint = {
|
|
|
|
let client_socket =
|
|
|
|
solana_net_utils::bind_in_range(IpAddr::V4(Ipv4Addr::UNSPECIFIED), (8000, 10000))
|
|
|
|
.expect("create_endpoint bind_in_range")
|
|
|
|
.1;
|
|
|
|
let config = EndpointConfig::default();
|
|
|
|
quinn::Endpoint::new(config, None, client_socket, TokioRuntime)
|
|
|
|
.expect("create_endpoint quinn::Endpoint::new")
|
|
|
|
};
|
|
|
|
|
|
|
|
let mut crypto = rustls::ClientConfig::builder()
|
|
|
|
.with_safe_defaults()
|
|
|
|
.with_custom_certificate_verifier(SkipServerVerification::new())
|
|
|
|
.with_single_cert(vec![certificate], key)
|
|
|
|
.expect("Failed to set QUIC client certificates");
|
|
|
|
|
|
|
|
crypto.enable_early_data = true;
|
2023-07-20 04:45:54 -07:00
|
|
|
crypto.alpn_protocols = vec![ALPN_TPU_PROTOCOL_ID.to_vec()];
|
2023-06-07 11:00:59 -07:00
|
|
|
|
|
|
|
let mut config = ClientConfig::new(Arc::new(crypto));
|
|
|
|
let mut transport_config = TransportConfig::default();
|
|
|
|
|
2023-07-20 04:45:54 -07:00
|
|
|
let timeout = IdleTimeout::try_from(Duration::from_secs(1)).unwrap();
|
2023-06-07 11:00:59 -07:00
|
|
|
transport_config.max_idle_timeout(Some(timeout));
|
|
|
|
transport_config.keep_alive_interval(Some(Duration::from_millis(500)));
|
|
|
|
config.transport_config(Arc::new(transport_config));
|
|
|
|
|
|
|
|
endpoint.set_default_client_config(config);
|
|
|
|
|
|
|
|
endpoint
|
|
|
|
}
|
2023-06-08 02:27:02 -07:00
|
|
|
|
2023-06-07 11:00:59 -07:00
|
|
|
pub async fn make_connection(
|
|
|
|
endpoint: Endpoint,
|
|
|
|
addr: SocketAddr,
|
|
|
|
connection_timeout: Duration,
|
|
|
|
) -> anyhow::Result<Connection> {
|
|
|
|
let connecting = endpoint.connect(addr, "connect")?;
|
|
|
|
let res = timeout(connection_timeout, connecting).await??;
|
|
|
|
Ok(res)
|
|
|
|
}
|
|
|
|
|
|
|
|
pub async fn make_connection_0rtt(
|
|
|
|
endpoint: Endpoint,
|
|
|
|
addr: SocketAddr,
|
|
|
|
connection_timeout: Duration,
|
|
|
|
) -> anyhow::Result<Connection> {
|
|
|
|
let connecting = endpoint.connect(addr, "connect")?;
|
|
|
|
let connection = match connecting.into_0rtt() {
|
|
|
|
Ok((connection, zero_rtt)) => {
|
|
|
|
if (timeout(connection_timeout, zero_rtt).await).is_ok() {
|
|
|
|
connection
|
|
|
|
} else {
|
|
|
|
return Err(ConnectionError::TimedOut.into());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
Err(connecting) => {
|
|
|
|
if let Ok(connecting_result) = timeout(connection_timeout, connecting).await {
|
|
|
|
connecting_result?
|
|
|
|
} else {
|
|
|
|
return Err(ConnectionError::TimedOut.into());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
};
|
|
|
|
Ok(connection)
|
|
|
|
}
|
|
|
|
|
2023-06-08 02:27:02 -07:00
|
|
|
#[allow(clippy::too_many_arguments)]
|
2023-06-07 11:00:59 -07:00
|
|
|
pub async fn connect(
|
|
|
|
identity: Pubkey,
|
|
|
|
already_connected: bool,
|
|
|
|
endpoint: Endpoint,
|
2023-07-20 04:45:54 -07:00
|
|
|
addr: SocketAddr,
|
2023-06-07 11:00:59 -07:00
|
|
|
connection_timeout: Duration,
|
|
|
|
connection_retry_count: usize,
|
|
|
|
exit_signal: Arc<AtomicBool>,
|
|
|
|
) -> Option<Connection> {
|
|
|
|
for _ in 0..connection_retry_count {
|
|
|
|
let conn = if already_connected {
|
2023-07-20 04:45:54 -07:00
|
|
|
Self::make_connection_0rtt(endpoint.clone(), addr, connection_timeout).await
|
2023-06-07 11:00:59 -07:00
|
|
|
} else {
|
2023-07-20 04:45:54 -07:00
|
|
|
Self::make_connection(endpoint.clone(), addr, connection_timeout).await
|
2023-06-07 11:00:59 -07:00
|
|
|
};
|
|
|
|
match conn {
|
|
|
|
Ok(conn) => {
|
|
|
|
return Some(conn);
|
|
|
|
}
|
|
|
|
Err(e) => {
|
2023-07-20 04:45:54 -07:00
|
|
|
trace!("Could not connect to {} because of error {}", identity, e);
|
2023-06-07 11:00:59 -07:00
|
|
|
if exit_signal.load(Ordering::Relaxed) {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
None
|
|
|
|
}
|
|
|
|
|
|
|
|
pub async fn write_all(
|
|
|
|
mut send_stream: SendStream,
|
|
|
|
tx: &Vec<u8>,
|
|
|
|
identity: Pubkey,
|
2023-07-20 04:45:54 -07:00
|
|
|
connection_params: QuicConnectionParameters,
|
|
|
|
) -> Result<(), QuicConnectionError> {
|
|
|
|
let write_timeout_res = timeout(
|
|
|
|
connection_params.write_timeout,
|
|
|
|
send_stream.write_all(tx.as_slice()),
|
|
|
|
)
|
|
|
|
.await;
|
2023-06-07 11:00:59 -07:00
|
|
|
match write_timeout_res {
|
|
|
|
Ok(write_res) => {
|
|
|
|
if let Err(e) = write_res {
|
|
|
|
trace!(
|
|
|
|
"Error while writing transaction for {}, error {}",
|
|
|
|
identity,
|
|
|
|
e
|
|
|
|
);
|
2023-07-20 04:45:54 -07:00
|
|
|
return Err(QuicConnectionError::ConnectionError { retry: true });
|
2023-06-07 11:00:59 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
Err(_) => {
|
|
|
|
warn!("timeout while writing transaction for {}", identity);
|
2023-07-20 04:45:54 -07:00
|
|
|
return Err(QuicConnectionError::TimeOut);
|
2023-06-07 11:00:59 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-07-20 04:45:54 -07:00
|
|
|
let finish_timeout_res =
|
|
|
|
timeout(connection_params.finalize_timeout, send_stream.finish()).await;
|
2023-06-07 11:00:59 -07:00
|
|
|
match finish_timeout_res {
|
|
|
|
Ok(finish_res) => {
|
|
|
|
if let Err(e) = finish_res {
|
|
|
|
trace!(
|
2023-07-20 04:45:54 -07:00
|
|
|
"Error while finishing transaction for {}, error {}",
|
2023-06-07 11:00:59 -07:00
|
|
|
identity,
|
|
|
|
e
|
|
|
|
);
|
2023-07-20 04:45:54 -07:00
|
|
|
return Err(QuicConnectionError::ConnectionError { retry: false });
|
2023-06-07 11:00:59 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
Err(_) => {
|
|
|
|
warn!("timeout while finishing transaction for {}", identity);
|
2023-07-20 04:45:54 -07:00
|
|
|
return Err(QuicConnectionError::TimeOut);
|
2023-06-07 11:00:59 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-07-20 04:45:54 -07:00
|
|
|
Ok(())
|
2023-06-07 11:00:59 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
pub async fn open_unistream(
|
|
|
|
connection: Connection,
|
|
|
|
connection_timeout: Duration,
|
2023-07-20 04:45:54 -07:00
|
|
|
) -> Result<SendStream, QuicConnectionError> {
|
2023-06-07 11:00:59 -07:00
|
|
|
match timeout(connection_timeout, connection.open_uni()).await {
|
2023-07-20 04:45:54 -07:00
|
|
|
Ok(Ok(unistream)) => Ok(unistream),
|
|
|
|
Ok(Err(_)) => Err(QuicConnectionError::ConnectionError { retry: true }),
|
|
|
|
Err(_) => Err(QuicConnectionError::TimeOut),
|
2023-06-07 11:00:59 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-07-20 04:45:54 -07:00
|
|
|
struct SkipServerVerification;
|
2023-06-07 11:00:59 -07:00
|
|
|
|
|
|
|
impl SkipServerVerification {
|
|
|
|
pub fn new() -> Arc<Self> {
|
|
|
|
Arc::new(Self)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl rustls::client::ServerCertVerifier for SkipServerVerification {
|
|
|
|
fn verify_server_cert(
|
|
|
|
&self,
|
|
|
|
_end_entity: &rustls::Certificate,
|
|
|
|
_intermediates: &[rustls::Certificate],
|
|
|
|
_server_name: &rustls::ServerName,
|
|
|
|
_scts: &mut dyn Iterator<Item = &[u8]>,
|
|
|
|
_ocsp_response: &[u8],
|
|
|
|
_now: std::time::SystemTime,
|
|
|
|
) -> Result<rustls::client::ServerCertVerified, rustls::Error> {
|
|
|
|
Ok(rustls::client::ServerCertVerified::assertion())
|
|
|
|
}
|
|
|
|
}
|