connection stats
This commit is contained in:
parent
400048ccba
commit
0102058052
|
@ -217,3 +217,15 @@ impl rustls::client::ServerCertVerifier for SkipServerVerification {
|
|||
Ok(rustls::client::ServerCertVerified::assertion())
|
||||
}
|
||||
}
|
||||
|
||||
// connection for sending proxy request: FrameStats {
|
||||
// ACK: 2, CONNECTION_CLOSE: 0, CRYPTO: 3, DATA_BLOCKED: 0, DATAGRAM: 0, HANDSHAKE_DONE: 1,
|
||||
// MAX_DATA: 0, MAX_STREAM_DATA: 1, MAX_STREAMS_BIDI: 0, MAX_STREAMS_UNI: 0, NEW_CONNECTION_ID: 4,
|
||||
// NEW_TOKEN: 0, PATH_CHALLENGE: 0, PATH_RESPONSE: 0, PING: 0, RESET_STREAM: 0, RETIRE_CONNECTION_ID: 1,
|
||||
// STREAM_DATA_BLOCKED: 0, STREAMS_BLOCKED_BIDI: 0, STREAMS_BLOCKED_UNI: 0, STOP_SENDING: 0, STREAM: 0 }
|
||||
// rtt=1.08178ms
|
||||
pub fn connection_stats(connection: &Connection) -> String {
|
||||
format!("stable_id {}, rtt={:?}, stats {:?}",
|
||||
connection.stable_id(), connection.stats().path.rtt, connection.stats().frame_rx)
|
||||
}
|
||||
|
||||
|
|
|
@ -24,7 +24,7 @@ use tokio::net::ToSocketAddrs;
|
|||
use solana_streamer::tls_certificates::new_self_signed_tls_certificate;
|
||||
use tokio::sync::RwLock;
|
||||
use crate::proxy_request_format::TpuForwardingRequest;
|
||||
use crate::quic_connection_utils::QuicConnectionUtils;
|
||||
use crate::quic_connection_utils::{connection_stats, QuicConnectionUtils};
|
||||
use crate::tpu_quic_client::{SingleTPUConnectionManager, TpuQuicClient};
|
||||
use crate::tls_config_provicer::{ProxyTlsConfigProvider, SelfSignedTlsConfigProvider};
|
||||
use crate::util::AnyhowJoinHandle;
|
||||
|
@ -123,7 +123,7 @@ async fn accept_client_connection(client_connection: Connection, tpu_quic_client
|
|||
|
||||
loop {
|
||||
let maybe_stream = client_connection.accept_uni().await;
|
||||
let mut recv_stream = match maybe_stream {
|
||||
let result = match maybe_stream {
|
||||
Err(quinn::ConnectionError::ApplicationClosed(reason)) => {
|
||||
debug!("connection closed by client - reason: {:?}", reason);
|
||||
if reason.error_code != VarInt::from_u32(0) {
|
||||
|
@ -136,32 +136,39 @@ async fn accept_client_connection(client_connection: Connection, tpu_quic_client
|
|||
error!("failed to accept stream: {}", e);
|
||||
return Err(anyhow::Error::msg("error accepting stream"));
|
||||
}
|
||||
Ok(s) => s,
|
||||
};
|
||||
let exit_signal_copy = exit_signal.clone();
|
||||
let validator_identity_copy = validator_identity.clone();
|
||||
let tpu_quic_client_copy = tpu_quic_client.clone();
|
||||
Ok(recv_stream) => {
|
||||
let exit_signal_copy = exit_signal.clone();
|
||||
let validator_identity_copy = validator_identity.clone();
|
||||
let tpu_quic_client_copy = tpu_quic_client.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
tokio::spawn(async move {
|
||||
|
||||
let raw_request = recv_stream.read_to_end(10_000_000).await // TODO extract to const
|
||||
.unwrap();
|
||||
debug!("read proxy_request {} bytes", raw_request.len());
|
||||
let raw_request = recv_stream.read_to_end(10_000_000).await // TODO extract to const
|
||||
.unwrap();
|
||||
trace!("read proxy_request {} bytes", raw_request.len());
|
||||
|
||||
let proxy_request = TpuForwardingRequest::deserialize_from_raw_request(&raw_request);
|
||||
let proxy_request = TpuForwardingRequest::deserialize_from_raw_request(&raw_request);
|
||||
|
||||
debug!("proxy request details: {}", proxy_request);
|
||||
let tpu_identity = proxy_request.get_identity_tpunode();
|
||||
let tpu_address = proxy_request.get_tpu_socket_addr();
|
||||
let txs = proxy_request.get_transactions();
|
||||
trace!("proxy request details: {}", proxy_request);
|
||||
let tpu_identity = proxy_request.get_identity_tpunode();
|
||||
let tpu_address = proxy_request.get_tpu_socket_addr();
|
||||
let txs = proxy_request.get_transactions();
|
||||
|
||||
debug!("send transaction batch of size {} to address {}", txs.len(), tpu_address);
|
||||
tpu_quic_client_copy.send_txs_to_tpu(tpu_address, &txs, exit_signal_copy).await;
|
||||
|
||||
info!("send transaction batch of size {} to address {}", txs.len(), tpu_address);
|
||||
tpu_quic_client_copy.send_txs_to_tpu(tpu_address, &txs, exit_signal_copy).await;
|
||||
debug!("connection stats (proxy inbound): {}", connection_stats(&client_connection));
|
||||
|
||||
// active_tpu_connection_copy.send_txs_to_tpu(exit_signal_copy, validator_identity_copy, tpu_identity, tpu_address, &txs).await;
|
||||
});
|
||||
|
||||
});
|
||||
Ok(())
|
||||
},
|
||||
}; // -- result
|
||||
|
||||
if let Err(e) = result {
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
return Ok(());
|
||||
} // -- loop
|
||||
}
|
||||
|
|
|
@ -243,7 +243,7 @@ impl QuicConnectionUtils {
|
|||
}
|
||||
|
||||
pub async fn open_unistream(
|
||||
connection: Connection,
|
||||
connection: &Connection,
|
||||
connection_timeout: Duration,
|
||||
) -> Result<SendStream, QuicConnectionError> {
|
||||
match timeout(connection_timeout, connection.open_uni()).await {
|
||||
|
@ -329,6 +329,8 @@ impl QuicConnectionUtils {
|
|||
let all_send_fns = (0..txs.len()).map(|i| Self::send_tx_to_new_stream(&txs[i], connection.clone(), connection_timeout)).collect_vec();
|
||||
|
||||
join_all(all_send_fns).await;
|
||||
|
||||
debug!("connection stats (proxy send tx parallel): {}", connection_stats(&connection));
|
||||
}
|
||||
|
||||
|
||||
|
@ -380,3 +382,15 @@ impl rustls::client::ServerCertVerifier for SkipServerVerification {
|
|||
Ok(rustls::client::ServerCertVerified::assertion())
|
||||
}
|
||||
}
|
||||
|
||||
// stable_id 140266619216912, rtt=2.156683ms,
|
||||
// stats FrameStats { ACK: 3, CONNECTION_CLOSE: 0, CRYPTO: 3,
|
||||
// DATA_BLOCKED: 0, DATAGRAM: 0, HANDSHAKE_DONE: 1, MAX_DATA: 0,
|
||||
// MAX_STREAM_DATA: 1, MAX_STREAMS_BIDI: 0, MAX_STREAMS_UNI: 0, NEW_CONNECTION_ID: 4,
|
||||
// NEW_TOKEN: 0, PATH_CHALLENGE: 0, PATH_RESPONSE: 0, PING: 0, RESET_STREAM: 0,
|
||||
// RETIRE_CONNECTION_ID: 1, STREAM_DATA_BLOCKED: 0, STREAMS_BLOCKED_BIDI: 0,
|
||||
// STREAMS_BLOCKED_UNI: 0, STOP_SENDING: 0, STREAM: 0 }
|
||||
pub fn connection_stats(connection: &Connection) -> String {
|
||||
format!("stable_id {} stats {:?}, rtt={:?}",
|
||||
connection.stable_id(), connection.stats().frame_rx, connection.stats().path.rtt)
|
||||
}
|
|
@ -11,7 +11,7 @@ use async_trait::async_trait;
|
|||
use dashmap::DashMap;
|
||||
use itertools::{any, Itertools};
|
||||
use log::{debug, error, info, trace, warn};
|
||||
use quinn::{Connecting, Connection, Endpoint, SendStream, ServerConfig, VarInt};
|
||||
use quinn::{Connecting, Connection, ConnectionError, Endpoint, SendStream, ServerConfig, VarInt};
|
||||
use rcgen::generate_simple_self_signed;
|
||||
use rustls::{Certificate, PrivateKey};
|
||||
use rustls::server::ResolvesServerCert;
|
||||
|
@ -23,7 +23,7 @@ use solana_sdk::transaction::VersionedTransaction;
|
|||
use tokio::net::ToSocketAddrs;
|
||||
use solana_streamer::tls_certificates::new_self_signed_tls_certificate;
|
||||
use tokio::sync::RwLock;
|
||||
use crate::quic_connection_utils::{QuicConnectionError, QuicConnectionParameters, QuicConnectionUtils};
|
||||
use crate::quic_connection_utils::{connection_stats, QuicConnectionError, QuicConnectionParameters, QuicConnectionUtils};
|
||||
use crate::tls_config_provicer::{ProxyTlsConfigProvider, SelfSignedTlsConfigProvider};
|
||||
|
||||
const QUIC_CONNECTION_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
|
@ -46,6 +46,7 @@ pub struct TpuQuicClient {
|
|||
/// per TPU connection manager
|
||||
#[async_trait]
|
||||
pub trait SingleTPUConnectionManager {
|
||||
// async fn refresh_connection(&self, connection: &Connection) -> Connection;
|
||||
async fn get_or_create_connection(&self, tpu_address: SocketAddr) -> anyhow::Result<Connection>;
|
||||
fn update_last_stable_id(&self, stable_id: u64);
|
||||
}
|
||||
|
@ -55,8 +56,28 @@ pub type SingleTPUConnectionManagerWrapper = dyn SingleTPUConnectionManager + Sy
|
|||
#[async_trait]
|
||||
impl SingleTPUConnectionManager for TpuQuicClient {
|
||||
|
||||
// make sure the connection is usable for a resonable time
|
||||
// never returns the "same" instances but a clone
|
||||
// async fn refresh_connection(&self, connection: &Connection) -> Connection {
|
||||
// let reverse_lookup = self.connection_per_tpunode.into_read_only().values().find(|conn| {
|
||||
// conn.stable_id() == connection.stable_id()
|
||||
// });
|
||||
//
|
||||
// match reverse_lookup {
|
||||
// Some(existing_conn) => {
|
||||
// return existing_conn.clone();
|
||||
// }
|
||||
// None => {
|
||||
// TpuQuicClient::create_new(&self, reverse_lookup).await.unwrap())
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// // TODO implement
|
||||
// connection.clone()
|
||||
// }
|
||||
|
||||
#[tracing::instrument(skip(self), level = "debug")]
|
||||
// TODO improve error handling
|
||||
// TODO improve error handling; might need to signal if connection was reset
|
||||
async fn get_or_create_connection(&self, tpu_address: SocketAddr) -> anyhow::Result<Connection> {
|
||||
// TODO try 0rff
|
||||
// QuicConnectionUtils::make_connection(
|
||||
|
@ -72,23 +93,11 @@ impl SingleTPUConnectionManager for TpuQuicClient {
|
|||
}
|
||||
}
|
||||
|
||||
let connection =
|
||||
// TODO try 0rff
|
||||
match QuicConnectionUtils::make_connection_0rtt(
|
||||
self.endpoint.clone(), tpu_address, QUIC_CONNECTION_TIMEOUT)
|
||||
.await {
|
||||
Ok(conn) => conn,
|
||||
Err(err) => {
|
||||
warn!("Failed to open Quic connection to TPU {}: {}", tpu_address, err);
|
||||
return Err(anyhow!("Failed to create Quic connection to TPU {}: {}", tpu_address, err));
|
||||
},
|
||||
};
|
||||
let connection = match self.create_new(tpu_address).await {
|
||||
Ok(value) => value,
|
||||
Err(err) => return Err(err),
|
||||
};
|
||||
|
||||
let old_value = self.connection_per_tpunode.insert(tpu_address, connection.clone());
|
||||
assert!(old_value.is_none(), "no prev value must be overridden");
|
||||
|
||||
debug!("Created new Quic connection {} to TPU node {}, total connections is now {}",
|
||||
connection.stable_id(), tpu_address, self.connection_per_tpunode.len());
|
||||
return Ok(connection);
|
||||
}
|
||||
|
||||
|
@ -186,7 +195,6 @@ impl TpuQuicClient {
|
|||
let mut do_retry = false;
|
||||
while !queue.is_empty() {
|
||||
let tx = queue.pop_front().unwrap();
|
||||
// remove Option
|
||||
let connection = connection_manager.get_or_create_connection(tpu_address).await;
|
||||
|
||||
if exit_signal.load(Ordering::Relaxed) {
|
||||
|
@ -196,7 +204,7 @@ impl TpuQuicClient {
|
|||
if let Ok(connection) = connection {
|
||||
let current_stable_id = connection.stable_id() as u64;
|
||||
match QuicConnectionUtils::open_unistream(
|
||||
connection,
|
||||
&connection,
|
||||
connection_params.unistream_timeout,
|
||||
)
|
||||
.await
|
||||
|
@ -211,6 +219,7 @@ impl TpuQuicClient {
|
|||
{
|
||||
Ok(()) => {
|
||||
// do nothing
|
||||
debug!("connection stats (proxy send tx batch): {}", connection_stats(&connection));
|
||||
}
|
||||
Err(QuicConnectionError::ConnectionError { retry }) => {
|
||||
do_retry = retry;
|
||||
|
@ -247,6 +256,26 @@ impl TpuQuicClient {
|
|||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn create_new(&self, tpu_address: SocketAddr) -> anyhow::Result<Connection> {
|
||||
let connection =
|
||||
// TODO try 0rff
|
||||
match QuicConnectionUtils::make_connection_0rtt(
|
||||
self.endpoint.clone(), tpu_address, QUIC_CONNECTION_TIMEOUT)
|
||||
.await {
|
||||
Ok(conn) => conn,
|
||||
Err(err) => {
|
||||
warn!("Failed to open Quic connection to TPU {}: {}", tpu_address, err);
|
||||
return Err(anyhow!("Failed to create Quic connection to TPU {}: {}", tpu_address, err));
|
||||
},
|
||||
};
|
||||
|
||||
let old_value = self.connection_per_tpunode.insert(tpu_address, connection.clone());
|
||||
assert!(old_value.is_none(), "no prev value must be overridden");
|
||||
|
||||
debug!("Created new Quic connection {} to TPU node {}, total connections is now {}",
|
||||
connection.stable_id(), tpu_address, self.connection_per_tpunode.len());
|
||||
Ok(connection)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -19,8 +19,9 @@ use solana_sdk::signature::Keypair;
|
|||
use solana_sdk::transaction::VersionedTransaction;
|
||||
use tokio::sync::{broadcast::Receiver, broadcast::Sender, RwLock};
|
||||
use tokio::time::timeout;
|
||||
use tracing::field::debug;
|
||||
use solana_lite_rpc_core::proxy_request_format::TpuForwardingRequest;
|
||||
use solana_lite_rpc_core::quic_connection_utils::{QuicConnectionParameters, QuicConnectionUtils, SkipServerVerification};
|
||||
use solana_lite_rpc_core::quic_connection_utils::{connection_stats, QuicConnectionParameters, QuicConnectionUtils, SkipServerVerification};
|
||||
use solana_lite_rpc_core::structures::identity_stakes::IdentityStakes;
|
||||
use solana_lite_rpc_core::tx_store::TxStore;
|
||||
|
||||
|
@ -265,6 +266,7 @@ impl QuicProxyConnectionManager {
|
|||
|
||||
send.finish().await?;
|
||||
|
||||
debug!("connection stats (lite-rpc to proxy): {}", connection_stats(&connection));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue