remove useless async

This commit is contained in:
GroovieGermanikus 2023-06-27 16:55:38 +02:00
parent c0d4b18df5
commit c406f447cc
1 changed files with 30 additions and 34 deletions

View File

@ -84,46 +84,42 @@ impl QuicForwardProxy {
async fn handle_connection(connecting: Connecting, exit_signal: Arc<AtomicBool>, validator_identity: Arc<Keypair>) -> anyhow::Result<()> {
let connection = connecting.await?;
debug!("inbound connection established, remote {connection}", connection = connection.remote_address());
async {
loop {
let maybe_stream = connection.accept_uni().await;
let mut recv_stream = match maybe_stream {
Err(quinn::ConnectionError::ApplicationClosed(reason)) => {
debug!("connection closed by peer - reason: {:?}", reason);
if reason.error_code != VarInt::from_u32(0) {
return Err(anyhow!("connection closed by peer with unexpected reason: {:?}", reason));
}
debug!("connection gracefully closed by peer");
return Ok(());
},
Err(e) => {
error!("failed to accept stream: {}", e);
return Err(anyhow::Error::msg("error accepting stream"));
loop {
let maybe_stream = connection.accept_uni().await;
let mut recv_stream = match maybe_stream {
Err(quinn::ConnectionError::ApplicationClosed(reason)) => {
debug!("connection closed by peer - reason: {:?}", reason);
if reason.error_code != VarInt::from_u32(0) {
return Err(anyhow!("connection closed by peer with unexpected reason: {:?}", reason));
}
Ok(s) => s,
};
let exit_signal_copy = exit_signal.clone();
let validator_identity_copy = validator_identity.clone();
tokio::spawn(async move {
let raw_request = recv_stream.read_to_end(10_000_000).await
.unwrap();
debug!("read proxy_request {} bytes", raw_request.len());
debug!("connection gracefully closed by peer");
return Ok(());
},
Err(e) => {
error!("failed to accept stream: {}", e);
return Err(anyhow::Error::msg("error accepting stream"));
}
Ok(s) => s,
};
let exit_signal_copy = exit_signal.clone();
let validator_identity_copy = validator_identity.clone();
tokio::spawn(async move {
let raw_request = recv_stream.read_to_end(10_000_000).await
.unwrap();
debug!("read proxy_request {} bytes", raw_request.len());
let proxy_request = TpuForwardingRequest::deserialize_from_raw_request(&raw_request);
let proxy_request = TpuForwardingRequest::deserialize_from_raw_request(&raw_request);
debug!("proxy request details: {}", proxy_request);
let tpu_identity = proxy_request.get_identity_tpunode();
let tpu_addr = proxy_request.get_tpu_socket_addr();
let txs = proxy_request.get_transactions();
debug!("proxy request details: {}", proxy_request);
let tpu_identity = proxy_request.get_identity_tpunode();
let tpu_addr = proxy_request.get_tpu_socket_addr();
let txs = proxy_request.get_transactions();
send_txs_to_tpu(exit_signal_copy, validator_identity_copy, tpu_identity, tpu_addr, &txs).await;
send_txs_to_tpu(exit_signal_copy, validator_identity_copy, tpu_identity, tpu_addr, &txs).await;
});
});
} // -- loop
}
.await?;
Ok(())
} // -- loop
}
mod test {