From 88b6935319fb71b1d56160892954d04468cfeee6 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Sat, 29 Jul 2023 00:06:02 +0200 Subject: [PATCH] fix wrong agent-tpu assignment --- quic-forward-proxy/src/proxy.rs | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/quic-forward-proxy/src/proxy.rs b/quic-forward-proxy/src/proxy.rs index 0a28276a..5def09dd 100644 --- a/quic-forward-proxy/src/proxy.rs +++ b/quic-forward-proxy/src/proxy.rs @@ -201,7 +201,7 @@ async fn tx_forwarder(tpu_quic_client: TpuQuicClient, mut transaction_channel: R let mut agents: HashMap> = HashMap::new(); - let tpu_quic_client_copy = tpu_quic_client.clone(); + let endpoint = tpu_quic_client.get_endpoint().clone(); loop { // TODO add exit @@ -215,8 +215,8 @@ async fn tx_forwarder(tpu_quic_client: TpuQuicClient, mut transaction_channel: R for i in 0..4 { let (sender, mut receiver) = channel::(100000); senders.push(sender); - let endpoint = tpu_quic_client.get_endpoint().clone(); let exit_signal = exit_signal.clone(); + let endpoint_copy = endpoint.clone(); tokio::spawn(async move { debug!("Start Quic forwarder agent for TPU {}", tpu_address); // TODO pass+check the tpu_address @@ -224,7 +224,7 @@ async fn tx_forwarder(tpu_quic_client: TpuQuicClient, mut transaction_channel: R // TODO consume queue // TODO exit signal - let auto_connection = AutoReconnect::new(endpoint, tpu_address); + let auto_connection = AutoReconnect::new(endpoint_copy, tpu_address); // let mut connection = tpu_quic_client_copy.create_connection(tpu_address).await.expect("handshake"); loop { @@ -275,14 +275,14 @@ async fn tx_forwarder(tpu_quic_client: TpuQuicClient, mut transaction_channel: R agent_channel.send(forward_packet).await.unwrap(); - let mut batch_size = 1; - while let Ok(more) = transaction_channel.try_recv() { - agent_channel.send(more).await.unwrap(); - batch_size += 1; - } - if batch_size > 1 { - debug!("encountered batch of size {}", batch_size); - } + // let mut batch_size = 1; + // while let Ok(more) = transaction_channel.try_recv() { + // agent_channel.send(more).await.unwrap(); + // batch_size += 1; + // } + // if batch_size > 1 { + // debug!("encountered batch of size {}", batch_size); + // } // check if the tpu has already a task+queue running, if not start one, sort+queue packets by tpu address @@ -297,7 +297,7 @@ async fn tx_forwarder(tpu_quic_client: TpuQuicClient, mut transaction_channel: R } - bail!("TPU Quic forward service stopped"); + panic!("not reachable"); }