From 8ea7428afda813cba3de729e8642a20096add5c7 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Fri, 28 Jul 2023 14:39:59 +0200 Subject: [PATCH] inline proxy outbound quinn handling --- quic-forward-proxy/src/lib.rs | 1 + quic-forward-proxy/src/main.rs | 1 + quic-forward-proxy/src/proxy.rs | 28 ++++++++++---------- quic-forward-proxy/src/tpu_quic_client.rs | 31 +++++++++++++++-------- 4 files changed, 36 insertions(+), 25 deletions(-) diff --git a/quic-forward-proxy/src/lib.rs b/quic-forward-proxy/src/lib.rs index 2aa1d6b6..7bda23d9 100644 --- a/quic-forward-proxy/src/lib.rs +++ b/quic-forward-proxy/src/lib.rs @@ -9,3 +9,4 @@ mod util; mod tx_store; mod identity_stakes; mod quic_connection_utils; +mod quinn_auto_reconnect; diff --git a/quic-forward-proxy/src/main.rs b/quic-forward-proxy/src/main.rs index 7d4a5947..04ed9930 100644 --- a/quic-forward-proxy/src/main.rs +++ b/quic-forward-proxy/src/main.rs @@ -21,6 +21,7 @@ mod util; mod tx_store; mod identity_stakes; mod quic_connection_utils; +mod quinn_auto_reconnect; #[tokio::main(flavor = "multi_thread", worker_threads = 16)] diff --git a/quic-forward-proxy/src/proxy.rs b/quic-forward-proxy/src/proxy.rs index 7062d6dd..4651c5d9 100644 --- a/quic-forward-proxy/src/proxy.rs +++ b/quic-forward-proxy/src/proxy.rs @@ -4,6 +4,7 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4}; use std::path::Path; use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicU64}; +use std::thread; use std::thread::sleep; use std::time::Duration; use tracing::{debug_span, instrument, Instrument, span}; @@ -11,7 +12,7 @@ use anyhow::{anyhow, bail, Context, Error}; use dashmap::DashMap; use itertools::{any, Itertools}; use log::{debug, error, info, trace, warn}; -use quinn::{Connecting, Connection, Endpoint, SendStream, ServerConfig, TransportConfig, VarInt}; +use quinn::{Connecting, Connection, ConnectionError, Endpoint, SendStream, ServerConfig, TransportConfig, VarInt}; use rcgen::generate_simple_self_signed; use rustls::{Certificate, PrivateKey}; use rustls::server::ResolvesServerCert; @@ -25,13 +26,14 @@ use solana_sdk::transaction::VersionedTransaction; use tokio::net::ToSocketAddrs; use solana_streamer::tls_certificates::new_self_signed_tls_certificate; use tokio::sync::mpsc::{channel, Receiver, Sender}; -use tokio::sync::RwLock; +use tokio::sync::{RwLock, RwLockReadGuard}; use tokio::task::JoinHandle; use tokio::time::error::Elapsed; use tokio::time::timeout; use tracing::field::debug; use crate::proxy_request_format::TpuForwardingRequest; use crate::quic_connection_utils::{connection_stats, QuicConnectionUtils}; +use crate::quinn_auto_reconnect::AutoReconnect; use crate::tpu_quic_client::{send_txs_to_tpu_static, SingleTPUConnectionManager, TpuQuicClient}; use crate::tls_config_provicer::{ProxyTlsConfigProvider, SelfSignedTlsConfigProvider}; use crate::util::AnyhowJoinHandle; @@ -222,26 +224,23 @@ async fn tx_forwarder(tpu_quic_client: TpuQuicClient, mut transaction_channel: R // TODO consume queue // TODO exit signal + let auto_connection = AutoReconnect::new(tpu_quic_client_copy.get_endpoint(), tpu_address); + // let mut connection = tpu_quic_client_copy.create_connection(tpu_address).await.expect("handshake"); loop { - let maybe_connection = tpu_quic_client_copy.create_connection(tpu_address).await; - if maybe_connection.is_err() { - // TODO implement retries etc - error!("failed to connect to TPU {} - giving up, dropping unprocessed elements in channel", tpu_address); - return; - } - - let connection = maybe_connection.unwrap(); let exit_signal = exit_signal.clone(); - while let Some(packet) = receiver.recv().await { + loop { + let packet = receiver.recv().await.unwrap(); assert_eq!(packet.tpu_address, tpu_address, "routing error"); debug!("forwarding transaction batch of size {} to address {}", packet.transactions.len(), packet.tpu_address); // TODo move send_txs_to_tpu_static to tpu_quic_client - timeout(Duration::from_millis(500), - send_txs_to_tpu_static(connection.clone(), tpu_address, &packet.transactions, exit_signal.clone())).await - .expect("timeout sending data to TPU node") + let result = timeout(Duration::from_millis(500), + send_txs_to_tpu_static(&auto_connection, &packet.transactions)).await; + // .expect("timeout sending data to TPU node"); + + debug!("send_txs_to_tpu_static result {:?} - loop over errors", result); } @@ -271,3 +270,4 @@ async fn tx_forwarder(tpu_quic_client: TpuQuicClient, mut transaction_channel: R bail!("TPU Quic forward service stopped"); } + diff --git a/quic-forward-proxy/src/tpu_quic_client.rs b/quic-forward-proxy/src/tpu_quic_client.rs index 9c85d715..e51acf76 100644 --- a/quic-forward-proxy/src/tpu_quic_client.rs +++ b/quic-forward-proxy/src/tpu_quic_client.rs @@ -9,6 +9,7 @@ use std::time::Duration; use anyhow::{anyhow, bail, Error}; use async_trait::async_trait; use dashmap::DashMap; +use futures::future::join_all; use itertools::{any, Itertools}; use log::{debug, error, info, trace, warn}; use quinn::{Connecting, Connection, ConnectionError, Endpoint, SendStream, ServerConfig, VarInt}; @@ -24,6 +25,7 @@ use tokio::net::ToSocketAddrs; use solana_streamer::tls_certificates::new_self_signed_tls_certificate; use tokio::sync::RwLock; use crate::quic_connection_utils::{connection_stats, QuicConnectionError, QuicConnectionParameters, QuicConnectionUtils}; +use crate::quinn_auto_reconnect::AutoReconnect; use crate::tls_config_provicer::{ProxyTlsConfigProvider, SelfSignedTlsConfigProvider}; const QUIC_CONNECTION_TIMEOUT: Duration = Duration::from_secs(5); @@ -44,6 +46,12 @@ pub struct TpuQuicClient { } impl TpuQuicClient { + + // note: this is a dirty workaround to expose enpoint to autoconnect class + pub fn get_endpoint(&self) -> Endpoint { + self.endpoint.clone() + } + pub async fn create_connection(&self, tpu_address: SocketAddr) -> anyhow::Result { let connection = // TODO try 0rff @@ -302,11 +310,10 @@ fn serialize_to_vecvec(transactions: &Vec) -> Vec> // send potentially large amount of transactions to a single TPU +#[tracing::instrument(skip_all, level = "debug")] pub async fn send_txs_to_tpu_static( - tpu_connection: Connection, - tpu_address: SocketAddr, + auto_connection: &AutoReconnect, txs: &Vec, - exit_signal: Arc, ) { // note: this impl does not deal with connection errors @@ -316,16 +323,18 @@ pub async fn send_txs_to_tpu_static( // TODO add error handling for chunk in txs.chunks(MAX_PARALLEL_STREAMS) { - let vecvec = chunk.iter().map(|tx| { + let all_send_fns = chunk.iter().map(|tx| { let tx_raw = bincode::serialize(tx).unwrap(); tx_raw - }).collect_vec(); - QuicConnectionUtils::send_transaction_batch_parallel( - tpu_connection.clone(), - vecvec, - exit_signal.clone(), - QUIC_CONNECTION_TIMEOUT, - ).await; + }) + .map(|tx_raw| { + auto_connection.send(tx_raw) // ignores error + }); + + // let all_send_fns = (0..txs.len()).map(|i| auto_connection.roundtrip(vecvec.get(i))).collect_vec(); + + join_all(all_send_fns).await; + } } \ No newline at end of file