fix wrong agent-tpu assignment

This commit is contained in:
GroovieGermanikus 2023-07-29 00:06:02 +02:00
parent c13f0f5e7c
commit 88b6935319
1 changed files with 12 additions and 12 deletions

View File

@ -201,7 +201,7 @@ async fn tx_forwarder(tpu_quic_client: TpuQuicClient, mut transaction_channel: R
let mut agents: HashMap<SocketAddr, FanOut<ForwardPacket>> = HashMap::new();
let tpu_quic_client_copy = tpu_quic_client.clone();
let endpoint = tpu_quic_client.get_endpoint().clone();
loop {
// TODO add exit
@ -215,8 +215,8 @@ async fn tx_forwarder(tpu_quic_client: TpuQuicClient, mut transaction_channel: R
for i in 0..4 {
let (sender, mut receiver) = channel::<ForwardPacket>(100000);
senders.push(sender);
let endpoint = tpu_quic_client.get_endpoint().clone();
let exit_signal = exit_signal.clone();
let endpoint_copy = endpoint.clone();
tokio::spawn(async move {
debug!("Start Quic forwarder agent for TPU {}", tpu_address);
// TODO pass+check the tpu_address
@ -224,7 +224,7 @@ async fn tx_forwarder(tpu_quic_client: TpuQuicClient, mut transaction_channel: R
// TODO consume queue
// TODO exit signal
let auto_connection = AutoReconnect::new(endpoint, tpu_address);
let auto_connection = AutoReconnect::new(endpoint_copy, tpu_address);
// let mut connection = tpu_quic_client_copy.create_connection(tpu_address).await.expect("handshake");
loop {
@ -275,14 +275,14 @@ async fn tx_forwarder(tpu_quic_client: TpuQuicClient, mut transaction_channel: R
agent_channel.send(forward_packet).await.unwrap();
let mut batch_size = 1;
while let Ok(more) = transaction_channel.try_recv() {
agent_channel.send(more).await.unwrap();
batch_size += 1;
}
if batch_size > 1 {
debug!("encountered batch of size {}", batch_size);
}
// let mut batch_size = 1;
// while let Ok(more) = transaction_channel.try_recv() {
// agent_channel.send(more).await.unwrap();
// batch_size += 1;
// }
// if batch_size > 1 {
// debug!("encountered batch of size {}", batch_size);
// }
// check if the tpu has already a task+queue running, if not start one, sort+queue packets by tpu address
@ -297,7 +297,7 @@ async fn tx_forwarder(tpu_quic_client: TpuQuicClient, mut transaction_channel: R
}
bail!("TPU Quic forward service stopped");
panic!("not reachable");
}