From 7cae76bb3dd2b00767ef47f41244c91d8660ae22 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Fri, 28 Jul 2023 14:47:38 +0200 Subject: [PATCH] drain queues --- quic-forward-proxy/src/proxy.rs | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/quic-forward-proxy/src/proxy.rs b/quic-forward-proxy/src/proxy.rs index 4651c5d9..761f9401 100644 --- a/quic-forward-proxy/src/proxy.rs +++ b/quic-forward-proxy/src/proxy.rs @@ -233,11 +233,22 @@ async fn tx_forwarder(tpu_quic_client: TpuQuicClient, mut transaction_channel: R let packet = receiver.recv().await.unwrap(); assert_eq!(packet.tpu_address, tpu_address, "routing error"); - debug!("forwarding transaction batch of size {} to address {}", packet.transactions.len(), packet.tpu_address); + let mut transactions_batch = packet.transactions; + + let mut batch_size = 1; + while let Ok(more) = receiver.try_recv() { + transactions_batch.extend(more.transactions); + batch_size += 1; + } + if batch_size > 1 { + debug!("encountered batch of size {}", batch_size); + } + + debug!("forwarding transaction batch of size {} to address {}", transactions_batch.len(), packet.tpu_address); // TODo move send_txs_to_tpu_static to tpu_quic_client let result = timeout(Duration::from_millis(500), - send_txs_to_tpu_static(&auto_connection, &packet.transactions)).await; + send_txs_to_tpu_static(&auto_connection, &transactions_batch)).await; // .expect("timeout sending data to TPU node"); debug!("send_txs_to_tpu_static result {:?} - loop over errors", result); @@ -251,8 +262,17 @@ async fn tx_forwarder(tpu_quic_client: TpuQuicClient, mut transaction_channel: R } // -- new agent let agent_channel = agents.get(&tpu_address).unwrap(); + agent_channel.send(forward_packet).await.unwrap(); + let mut batch_size = 1; + while let Ok(more) = transaction_channel.try_recv() { + agent_channel.send(more).await.unwrap(); + batch_size += 1; + } + if batch_size > 1 { + debug!("encountered batch of size {}", batch_size); + } // check if the tpu has already a task+queue running, if not start one, sort+queue packets by tpu address