diff --git a/quic-forward-proxy/src/proxy.rs b/quic-forward-proxy/src/proxy.rs index 4429e8e8..95bad738 100644 --- a/quic-forward-proxy/src/proxy.rs +++ b/quic-forward-proxy/src/proxy.rs @@ -60,7 +60,20 @@ impl QuicForwardProxy { while let Some(conn) = endpoint.accept().await { trace!("connection incoming"); - let fut = handle_connection(conn, exit_signal.clone(), self.validator_identity.clone()); + + let (certificate, key) = new_self_signed_tls_certificate( + self.validator_identity.as_ref(), + IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), + ) + .expect("Failed to initialize QUIC client certificates"); + + let endpoint_outbound = QuicConnectionUtils::create_endpoint(certificate.clone(), key.clone()); + + let active_tpu_connection = ActiveTpuConnection { + endpoint: endpoint_outbound.clone(), + }; + + let fut = handle_connection(conn, active_tpu_connection, exit_signal.clone(), self.validator_identity.clone()); tokio::spawn(async move { if let Err(e) = fut.await { error!("connection failed: {reason}", reason = e.to_string()) @@ -80,8 +93,61 @@ impl QuicForwardProxy { } +#[derive(Clone)] +struct ActiveTpuConnection { + endpoint: Endpoint, +} -async fn handle_connection(connecting: Connecting, exit_signal: Arc, validator_identity: Arc) -> anyhow::Result<()> { +impl ActiveTpuConnection { + + pub async fn send_txs_to_tpu(&self, exit_signal: Arc, validator_identity: Arc, tpu_identity: Pubkey, tpu_addr: SocketAddr, txs: &Vec) { + + let last_stable_id: Arc = Arc::new(AtomicU64::new(0)); + + let txs_raw = serialize_to_vecvec(&txs); + + info!("received vecvec: {}", txs_raw.iter().map(|tx| tx.len().to_string()).into_iter().join(",")); + + let connection = + Arc::new(RwLock::new( + QuicConnectionUtils::connect( + tpu_identity, + false, + self.endpoint.clone(), + tpu_addr, + QUIC_CONNECTION_TIMEOUT, + CONNECTION_RETRY_COUNT, + exit_signal.clone(), + || { + // do nothing + }, + ).await.unwrap())); + + QuicConnectionUtils::send_transaction_batch( + connection.clone(), + txs_raw, + tpu_identity, + self.endpoint.clone(), + tpu_addr, + exit_signal.clone(), + last_stable_id, + QUIC_CONNECTION_TIMEOUT, + CONNECTION_RETRY_COUNT, + || { + // do nothing + } + ).await; + + { + let conn = connection.clone(); + conn.write().await.close(0u32.into(), b"done"); + } + } + +} + + +async fn handle_connection(connecting: Connecting, active_tpu_connection: ActiveTpuConnection, exit_signal: Arc, validator_identity: Arc) -> anyhow::Result<()> { let connection = connecting.await?; debug!("inbound connection established, remote {connection}", connection = connection.remote_address()); loop { @@ -101,6 +167,7 @@ async fn handle_connection(connecting: Connecting, exit_signal: Arc, } Ok(s) => s, }; + let active_tpu_connection_copy = active_tpu_connection.clone(); let exit_signal_copy = exit_signal.clone(); let validator_identity_copy = validator_identity.clone(); tokio::spawn(async move { @@ -115,83 +182,13 @@ async fn handle_connection(connecting: Connecting, exit_signal: Arc, let tpu_addr = proxy_request.get_tpu_socket_addr(); let txs = proxy_request.get_transactions(); - send_txs_to_tpu(exit_signal_copy, validator_identity_copy, tpu_identity, tpu_addr, &txs).await; + active_tpu_connection_copy.send_txs_to_tpu(exit_signal_copy, validator_identity_copy, tpu_identity, tpu_addr, &txs).await; }); } // -- loop } -mod test { - use std::str::FromStr; - use std::sync::Arc; - use std::sync::atomic::AtomicBool; - use solana_sdk::pubkey::Pubkey; - use crate::cli::get_identity_keypair; - use crate::proxy::send_txs_to_tpu; - - #[test] - fn call() { - let exit_signal = Arc::new(AtomicBool::new(false)); - - let validator_identity = get_identity_keypair(&"/Users/stefan/mango/projects/quic-forward-proxy/local-testvalidator-stake-account.json".to_string()); - let tpu_identity = Pubkey::from_str("asdfsdf").unwrap(); - let tpu_address = "127.0.0.1:1027".parse().unwrap(); - send_txs_to_tpu(exit_signal, validator_identity, tpu_identity, tpu_address, &vec![]) - - } -} - -async fn send_txs_to_tpu(exit_signal: Arc, validator_identity: Arc, tpu_identity: Pubkey, tpu_addr: SocketAddr, txs: &Vec) { - let (certificate, key) = new_self_signed_tls_certificate( - validator_identity.as_ref(), - IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), - ) - .expect("Failed to initialize QUIC client certificates"); - - let endpoint = QuicConnectionUtils::create_endpoint(certificate.clone(), key.clone()); - let last_stable_id: Arc = Arc::new(AtomicU64::new(0)); - - let txs_raw = serialize_to_vecvec(&txs); - - info!("received vecvec: {}", txs_raw.iter().map(|tx| tx.len().to_string()).into_iter().join(",")); - - let connection = - Arc::new(RwLock::new( - QuicConnectionUtils::connect( - tpu_identity, - false, - endpoint.clone(), - tpu_addr, - QUIC_CONNECTION_TIMEOUT, - CONNECTION_RETRY_COUNT, - exit_signal.clone(), - || { - // do nothing - }, - ).await.unwrap())); - - QuicConnectionUtils::send_transaction_batch( - connection.clone(), - txs_raw, - tpu_identity, - endpoint, - tpu_addr, - exit_signal.clone(), - last_stable_id, - QUIC_CONNECTION_TIMEOUT, - CONNECTION_RETRY_COUNT, - || { - // do nothing - } - ).await; - - { - let conn = connection.clone(); - conn.write().await.close(0u32.into(), b"done"); - } -} - fn serialize_to_vecvec(transactions: &Vec) -> Vec> { transactions.iter().map(|tx| { let tx_raw = bincode::serialize(tx).unwrap();