inline proxy outbound quinn handling

This commit is contained in:
GroovieGermanikus 2023-07-28 14:39:59 +02:00
parent 7abf1351e8
commit 8ea7428afd
4 changed files with 36 additions and 25 deletions

View File

@ -9,3 +9,4 @@ mod util;
mod tx_store;
mod identity_stakes;
mod quic_connection_utils;
mod quinn_auto_reconnect;

View File

@ -21,6 +21,7 @@ mod util;
mod tx_store;
mod identity_stakes;
mod quic_connection_utils;
mod quinn_auto_reconnect;
#[tokio::main(flavor = "multi_thread", worker_threads = 16)]

View File

@ -4,6 +4,7 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4};
use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64};
use std::thread;
use std::thread::sleep;
use std::time::Duration;
use tracing::{debug_span, instrument, Instrument, span};
@ -11,7 +12,7 @@ use anyhow::{anyhow, bail, Context, Error};
use dashmap::DashMap;
use itertools::{any, Itertools};
use log::{debug, error, info, trace, warn};
use quinn::{Connecting, Connection, Endpoint, SendStream, ServerConfig, TransportConfig, VarInt};
use quinn::{Connecting, Connection, ConnectionError, Endpoint, SendStream, ServerConfig, TransportConfig, VarInt};
use rcgen::generate_simple_self_signed;
use rustls::{Certificate, PrivateKey};
use rustls::server::ResolvesServerCert;
@ -25,13 +26,14 @@ use solana_sdk::transaction::VersionedTransaction;
use tokio::net::ToSocketAddrs;
use solana_streamer::tls_certificates::new_self_signed_tls_certificate;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::RwLock;
use tokio::sync::{RwLock, RwLockReadGuard};
use tokio::task::JoinHandle;
use tokio::time::error::Elapsed;
use tokio::time::timeout;
use tracing::field::debug;
use crate::proxy_request_format::TpuForwardingRequest;
use crate::quic_connection_utils::{connection_stats, QuicConnectionUtils};
use crate::quinn_auto_reconnect::AutoReconnect;
use crate::tpu_quic_client::{send_txs_to_tpu_static, SingleTPUConnectionManager, TpuQuicClient};
use crate::tls_config_provicer::{ProxyTlsConfigProvider, SelfSignedTlsConfigProvider};
use crate::util::AnyhowJoinHandle;
@ -222,26 +224,23 @@ async fn tx_forwarder(tpu_quic_client: TpuQuicClient, mut transaction_channel: R
// TODO consume queue
// TODO exit signal
let auto_connection = AutoReconnect::new(tpu_quic_client_copy.get_endpoint(), tpu_address);
// let mut connection = tpu_quic_client_copy.create_connection(tpu_address).await.expect("handshake");
loop {
let maybe_connection = tpu_quic_client_copy.create_connection(tpu_address).await;
if maybe_connection.is_err() {
// TODO implement retries etc
error!("failed to connect to TPU {} - giving up, dropping unprocessed elements in channel", tpu_address);
return;
}
let connection = maybe_connection.unwrap();
let exit_signal = exit_signal.clone();
while let Some(packet) = receiver.recv().await {
loop {
let packet = receiver.recv().await.unwrap();
assert_eq!(packet.tpu_address, tpu_address, "routing error");
debug!("forwarding transaction batch of size {} to address {}", packet.transactions.len(), packet.tpu_address);
// TODo move send_txs_to_tpu_static to tpu_quic_client
timeout(Duration::from_millis(500),
send_txs_to_tpu_static(connection.clone(), tpu_address, &packet.transactions, exit_signal.clone())).await
.expect("timeout sending data to TPU node")
let result = timeout(Duration::from_millis(500),
send_txs_to_tpu_static(&auto_connection, &packet.transactions)).await;
// .expect("timeout sending data to TPU node");
debug!("send_txs_to_tpu_static result {:?} - loop over errors", result);
}
@ -271,3 +270,4 @@ async fn tx_forwarder(tpu_quic_client: TpuQuicClient, mut transaction_channel: R
bail!("TPU Quic forward service stopped");
}

View File

@ -9,6 +9,7 @@ use std::time::Duration;
use anyhow::{anyhow, bail, Error};
use async_trait::async_trait;
use dashmap::DashMap;
use futures::future::join_all;
use itertools::{any, Itertools};
use log::{debug, error, info, trace, warn};
use quinn::{Connecting, Connection, ConnectionError, Endpoint, SendStream, ServerConfig, VarInt};
@ -24,6 +25,7 @@ use tokio::net::ToSocketAddrs;
use solana_streamer::tls_certificates::new_self_signed_tls_certificate;
use tokio::sync::RwLock;
use crate::quic_connection_utils::{connection_stats, QuicConnectionError, QuicConnectionParameters, QuicConnectionUtils};
use crate::quinn_auto_reconnect::AutoReconnect;
use crate::tls_config_provicer::{ProxyTlsConfigProvider, SelfSignedTlsConfigProvider};
const QUIC_CONNECTION_TIMEOUT: Duration = Duration::from_secs(5);
@ -44,6 +46,12 @@ pub struct TpuQuicClient {
}
impl TpuQuicClient {
// note: this is a dirty workaround to expose enpoint to autoconnect class
pub fn get_endpoint(&self) -> Endpoint {
self.endpoint.clone()
}
pub async fn create_connection(&self, tpu_address: SocketAddr) -> anyhow::Result<Connection> {
let connection =
// TODO try 0rff
@ -302,11 +310,10 @@ fn serialize_to_vecvec(transactions: &Vec<VersionedTransaction>) -> Vec<Vec<u8>>
// send potentially large amount of transactions to a single TPU
#[tracing::instrument(skip_all, level = "debug")]
pub async fn send_txs_to_tpu_static(
tpu_connection: Connection,
tpu_address: SocketAddr,
auto_connection: &AutoReconnect,
txs: &Vec<VersionedTransaction>,
exit_signal: Arc<AtomicBool>,
) {
// note: this impl does not deal with connection errors
@ -316,16 +323,18 @@ pub async fn send_txs_to_tpu_static(
// TODO add error handling
for chunk in txs.chunks(MAX_PARALLEL_STREAMS) {
let vecvec = chunk.iter().map(|tx| {
let all_send_fns = chunk.iter().map(|tx| {
let tx_raw = bincode::serialize(tx).unwrap();
tx_raw
}).collect_vec();
QuicConnectionUtils::send_transaction_batch_parallel(
tpu_connection.clone(),
vecvec,
exit_signal.clone(),
QUIC_CONNECTION_TIMEOUT,
).await;
})
.map(|tx_raw| {
auto_connection.send(tx_raw) // ignores error
});
// let all_send_fns = (0..txs.len()).map(|i| auto_connection.roundtrip(vecvec.get(i))).collect_vec();
join_all(all_send_fns).await;
}
}