diff --git a/quic-forward-proxy/Cargo.toml b/quic-forward-proxy/Cargo.toml index 18e5a264..9dc99be8 100644 --- a/quic-forward-proxy/Cargo.toml +++ b/quic-forward-proxy/Cargo.toml @@ -41,4 +41,6 @@ chrono = { workspace = true } tokio = { version = "1.28.2", features = ["full", "fs"]} rcgen = "0.9.3" spl-memo = "3.0.1" +# tokio channel fanout +fan = "0.1.3" diff --git a/quic-forward-proxy/src/proxy.rs b/quic-forward-proxy/src/proxy.rs index 761f9401..5d2e5798 100644 --- a/quic-forward-proxy/src/proxy.rs +++ b/quic-forward-proxy/src/proxy.rs @@ -10,6 +10,8 @@ use std::time::Duration; use tracing::{debug_span, instrument, Instrument, span}; use anyhow::{anyhow, bail, Context, Error}; use dashmap::DashMap; +use fan::tokio::mpsc::FanOut; +use futures::sink::Fanout; use itertools::{any, Itertools}; use log::{debug, error, info, trace, warn}; use quinn::{Connecting, Connection, ConnectionError, Endpoint, SendStream, ServerConfig, TransportConfig, VarInt}; @@ -201,7 +203,7 @@ async fn accept_client_connection(client_connection: Connection, forwarder_chann async fn tx_forwarder(tpu_quic_client: TpuQuicClient, mut transaction_channel: Receiver, exit_signal: Arc) -> anyhow::Result<()> { info!("TPU Quic forwarder started"); - let mut agents: HashMap> = HashMap::new(); + let mut agents: HashMap> = HashMap::new(); let tpu_quic_client_copy = tpu_quic_client.clone(); loop { @@ -211,53 +213,65 @@ async fn tx_forwarder(tpu_quic_client: TpuQuicClient, mut transaction_channel: R let tpu_address = forward_packet.tpu_address; if !agents.contains_key(&tpu_address) { - let (sender, mut receiver) = channel::(10000); // TODO cleanup agent after a while of iactivity - agents.insert(tpu_address, sender); - let tpu_quic_client_copy = tpu_quic_client.clone(); - let exit_signal = exit_signal.clone(); - tokio::spawn(async move { - debug!("Start Quic forwarder agent for TPU {}", tpu_address); - // TODO pass+check the tpu_address - // TODO connect - // TODO consume queue - // TODO exit signal + let mut senders = Vec::new(); + for i in 0..4 { + let (sender, mut receiver) = channel::(100000); + senders.push(sender); + let endpoint = tpu_quic_client.get_endpoint().clone(); + let exit_signal = exit_signal.clone(); + tokio::spawn(async move { + debug!("Start Quic forwarder agent for TPU {}", tpu_address); + // TODO pass+check the tpu_address + // TODO connect + // TODO consume queue + // TODO exit signal - let auto_connection = AutoReconnect::new(tpu_quic_client_copy.get_endpoint(), tpu_address); - // let mut connection = tpu_quic_client_copy.create_connection(tpu_address).await.expect("handshake"); - loop { - - let exit_signal = exit_signal.clone(); + let auto_connection = AutoReconnect::new(endpoint, tpu_address); + // let mut connection = tpu_quic_client_copy.create_connection(tpu_address).await.expect("handshake"); loop { - let packet = receiver.recv().await.unwrap(); - assert_eq!(packet.tpu_address, tpu_address, "routing error"); - let mut transactions_batch = packet.transactions; + let exit_signal = exit_signal.clone(); + loop { + let packet = receiver.recv().await.unwrap(); + assert_eq!(packet.tpu_address, tpu_address, "routing error"); - let mut batch_size = 1; - while let Ok(more) = receiver.try_recv() { - transactions_batch.extend(more.transactions); - batch_size += 1; - } - if batch_size > 1 { - debug!("encountered batch of size {}", batch_size); - } + let mut transactions_batch = packet.transactions; - debug!("forwarding transaction batch of size {} to address {}", transactions_batch.len(), packet.tpu_address); + let mut batch_size = 1; + while let Ok(more) = receiver.try_recv() { + transactions_batch.extend(more.transactions); + batch_size += 1; + } + if batch_size > 1 { + debug!("encountered batch of size {}", batch_size); + } - // TODo move send_txs_to_tpu_static to tpu_quic_client - let result = timeout(Duration::from_millis(500), - send_txs_to_tpu_static(&auto_connection, &transactions_batch)).await; + debug!("forwarding transaction batch of size {} to address {}", transactions_batch.len(), packet.tpu_address); + + // TODo move send_txs_to_tpu_static to tpu_quic_client + let result = timeout(Duration::from_millis(500), + send_txs_to_tpu_static(&auto_connection, &transactions_batch)).await; // .expect("timeout sending data to TPU node"); - debug!("send_txs_to_tpu_static result {:?} - loop over errors", result); + if result.is_err() { + warn!("send_txs_to_tpu_static result {:?} - loop over errors", result); + } else { + debug!("send_txs_to_tpu_static sent {}", transactions_batch.len()); + } + + } } - } + }); - }); + } + + let fanout = FanOut::new(senders); + + agents.insert(tpu_address, fanout); } // -- new agent diff --git a/quic-forward-proxy/src/tpu_quic_client.rs b/quic-forward-proxy/src/tpu_quic_client.rs index e51acf76..a7032f63 100644 --- a/quic-forward-proxy/src/tpu_quic_client.rs +++ b/quic-forward-proxy/src/tpu_quic_client.rs @@ -151,7 +151,7 @@ impl TpuQuicClient { /// takes a validator identity and creates a new QUIC client; appears as staked peer to TPU // note: ATM the provided identity might or might not be a valid validator keypair pub async fn new_with_validator_identity(validator_identity: &Keypair) -> TpuQuicClient { - info!("Setup TPU Quic stable connection ..."); + info!("Setup TPU Quic stable connection with validator identity {} ...", bs58::encode(validator_identity.pubkey()).into_string()); let (certificate, key) = new_self_signed_tls_certificate( validator_identity, IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), diff --git a/services/src/tpu_utils/quic_proxy_connection_manager.rs b/services/src/tpu_utils/quic_proxy_connection_manager.rs index ae8b9194..676eb1e0 100644 --- a/services/src/tpu_utils/quic_proxy_connection_manager.rs +++ b/services/src/tpu_utils/quic_proxy_connection_manager.rs @@ -40,7 +40,8 @@ pub struct QuicProxyConnectionManager { current_tpu_nodes: Arc>> } -const PARALLEL_STREAMS_TO_PROXY: usize = 4; +// TODO consolidate with number_of_transactions_per_unistream +const CHUNK_SIZE_PER_STREAM: usize = 20; impl QuicProxyConnectionManager { pub async fn new( @@ -242,7 +243,7 @@ impl QuicProxyConnectionManager { } - for chunk in txs.chunks(PARALLEL_STREAMS_TO_PROXY) { + for chunk in txs.chunks(CHUNK_SIZE_PER_STREAM) { let forwarding_request = TpuForwardingRequest::new(tpu_target_address, target_tpu_identity, chunk.into()); debug!("forwarding_request: {}", forwarding_request); diff --git a/services/src/tpu_utils/quinn_auto_reconnect.rs b/services/src/tpu_utils/quinn_auto_reconnect.rs index 56ab85c9..bce87d1f 100644 --- a/services/src/tpu_utils/quinn_auto_reconnect.rs +++ b/services/src/tpu_utils/quinn_auto_reconnect.rs @@ -2,6 +2,7 @@ use std::cell::RefCell; use std::fmt; use std::net::SocketAddr; use std::sync::atomic::{AtomicU32, Ordering}; +use log::{trace, warn}; use tracing::{debug, info}; use quinn::{Connection, Endpoint}; use tokio::sync::{RwLock, RwLockWriteGuard}; @@ -53,46 +54,40 @@ impl AutoReconnect { if maybe_conn.as_ref().filter(|conn| conn.close_reason().is_none()).is_some() { // let reuse = lock.unwrap().clone(); let reuse = maybe_conn.as_ref().unwrap(); - debug!("Reuse connection {}", reuse.stable_id()); + trace!("Reuse connection {}", reuse.stable_id()); return reuse.clone(); } } let mut lock = self.current.write().await; + match &*lock { Some(current) => { if current.close_reason().is_some() { - info!("Connection is closed for reason: {:?}", current.close_reason()); - // TODO log - + warn!("Connection i s closed for reason: {:?}", current.close_reason()); let new_connection = self.create_connection().await; + let prev_stable_id = current.stable_id(); *lock = Some(new_connection.clone()); // let old_conn = lock.replace(new_connection.clone()); self.reconnect_count.fetch_add(1, Ordering::SeqCst); - - // debug!("Replace closed connection {} with {} (retry {})", - // old_conn.map(|c| c.stable_id().to_string()).unwrap_or("none".to_string()), - // new_connection.stable_id(), - // self.reconnect_count.load(Ordering::SeqCst)); + debug!("Replace closed connection {} with {} (retry {})", + prev_stable_id, + new_connection.stable_id(), + self.reconnect_count.load(Ordering::SeqCst)); // TODO log old vs new stable_id - return new_connection.clone(); } else { - debug!("Reuse connection {} with write-lock", current.stable_id()); + // TODO check log if that ever happens + warn!("Reuse connection {} with write-lock", current.stable_id()); return current.clone(); } } None => { let new_connection = self.create_connection().await; - - // let old_conn = lock.replace(new_connection.clone()); - // assert!(old_conn.is_none(), "old connection should be None"); *lock = Some(new_connection.clone()); - // let old_conn = foo.replace(Some(new_connection.clone())); - // TODO log old vs new stable_id - debug!("Create initial connection {}", new_connection.stable_id()); + trace!("Create initial connection {}", new_connection.stable_id()); return new_connection.clone(); }