diff --git a/core/src/proxy_request_format.rs b/core/src/proxy_request_format.rs index e888ae2c..cfc7d5ef 100644 --- a/core/src/proxy_request_format.rs +++ b/core/src/proxy_request_format.rs @@ -15,7 +15,7 @@ const FORMAT_VERSION1: u16 = 2301; #[derive(Serialize, Deserialize, Debug, Clone)] pub struct TpuForwardingRequest { format_version: u16, - tpu_socket_addr: SocketAddr, // TODO is that correct + tpu_socket_addr: SocketAddr, // TODO is that correct, maybe it should be V4; maybe we also need to provide a list identity_tpunode: Pubkey, transactions: Vec, } diff --git a/quic-forward-proxy/src/proxy.rs b/quic-forward-proxy/src/proxy.rs index 52cd15de..b6e9f1d2 100644 --- a/quic-forward-proxy/src/proxy.rs +++ b/quic-forward-proxy/src/proxy.rs @@ -1,3 +1,4 @@ +use std::future::Future; use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4}; use std::path::Path; use std::sync::Arc; @@ -5,7 +6,7 @@ use std::sync::atomic::{AtomicBool, AtomicU64}; use std::thread::sleep; use std::time::Duration; use tracing::{debug_span, instrument, Instrument, span}; -use anyhow::{anyhow, bail, Context}; +use anyhow::{anyhow, bail, Context, Error}; use dashmap::DashMap; use itertools::{any, Itertools}; use log::{debug, error, info, trace, warn}; @@ -75,31 +76,7 @@ impl QuicForwardProxy { let exit_signal = Arc::new(AtomicBool::new(false)); let endpoint = self.endpoint.clone(); - let quic_proxy: AnyhowJoinHandle = tokio::spawn(async move { - info!("TPU Quic Proxy server listening on {}", endpoint.local_addr()?); - - - while let Some(connecting) = endpoint.accept().await { - - let exit_signal = exit_signal.clone(); - let validator_identity_copy = self.validator_identity.clone(); - let tpu_quic_client = self.tpu_quic_client.clone(); - tokio::spawn(async move { - - let connection = connecting.await.context("handshake").unwrap(); - match accept_client_connection(connection, tpu_quic_client, exit_signal, validator_identity_copy) - .await { - Ok(()) => {} - Err(err) => { - error!("setup connection failed: {reason}", reason = err); - } - } - - }); - } - - bail!("TPU Quic Proxy server stopped"); - }); + let quic_proxy: AnyhowJoinHandle = tokio::spawn(self.listen(exit_signal, endpoint)); tokio::select! { res = quic_proxy => { @@ -108,6 +85,27 @@ impl QuicForwardProxy { } } + async fn listen(mut self, exit_signal: Arc, endpoint: Endpoint) -> anyhow::Result<()> { + info!("TPU Quic Proxy server listening on {}", endpoint.local_addr()?); + + while let Some(connecting) = endpoint.accept().await { + let exit_signal = exit_signal.clone(); + let validator_identity_copy = self.validator_identity.clone(); + let tpu_quic_client = self.tpu_quic_client.clone(); + tokio::spawn(async move { + let connection = connecting.await.context("handshake").unwrap(); + match accept_client_connection(connection, tpu_quic_client, exit_signal, validator_identity_copy) + .await { + Ok(()) => {} + Err(err) => { + error!("setup connection failed: {reason}", reason = err); + } + } + }); + } + + bail!("TPU Quic Proxy server stopped"); + } } diff --git a/quic-forward-proxy/src/proxy_request_format.rs b/quic-forward-proxy/src/proxy_request_format.rs index 2339173e..d926b71b 100644 --- a/quic-forward-proxy/src/proxy_request_format.rs +++ b/quic-forward-proxy/src/proxy_request_format.rs @@ -10,7 +10,7 @@ use solana_sdk::transaction::VersionedTransaction; /// lite-rpc to proxy wire format /// compat info: non-public format ATM /// initial version -const FORMAT_VERSION1: u16 = 2301; +pub const FORMAT_VERSION1: u16 = 2301; #[derive(Serialize, Deserialize, Debug, Clone)] pub struct TpuForwardingRequest { @@ -50,7 +50,7 @@ impl TpuForwardingRequest { .context("deserialize proxy request") .unwrap(); - assert_eq!(request.format_version, 2301); + assert_eq!(request.format_version, FORMAT_VERSION1); request }