move proxy send code

This commit is contained in:
GroovieGermanikus 2023-07-24 10:45:17 +02:00
parent 9ab08d241b
commit 0d1c95c327
2 changed files with 19 additions and 14 deletions

View File

@ -61,10 +61,12 @@ impl ActiveConnection {
}
fn check_for_confirmation(txs_sent_store: &TxStore, signature: String) -> bool {
match txs_sent_store.get(&signature) {
Some(props) => props.status.is_some(),
None => false,
}
// TODO build a smarter duplication check
false
// match txs_sent_store.get(&signature) {
// Some(props) => props.status.is_some(),
// None => false,
// }
}
#[allow(clippy::too_many_arguments)]

View File

@ -56,19 +56,18 @@ impl QuicForwardProxy {
let quic_proxy: AnyhowJoinHandle = tokio::spawn(async move {
info!("TPU Quic Proxy server start on {}", endpoint.local_addr()?);
let identity_keypair = Keypair::new(); // TODO
while let Some(conn) = endpoint.accept().await {
trace!("connection incoming");
let active_tpu_connection =
TpuQuicConnection::new_with_validator_identity(self.validator_identity.as_ref());
let fut = handle_connection(conn, active_tpu_connection, exit_signal.clone(), self.validator_identity.clone());
let exit_signal = exit_signal.clone();
let validator_identity_copy = self.validator_identity.clone();
tokio::spawn(async move {
if let Err(e) = fut.await {
error!("connection failed: {reason}", reason = e.to_string())
match setup_connection(conn, exit_signal, validator_identity_copy).await {
Ok(()) => {}
Err(err) => {
error!("setup connection failed: {reason}", reason = err);
}
}
});
}
@ -85,9 +84,13 @@ impl QuicForwardProxy {
}
async fn handle_connection(connecting: Connecting, active_tpu_connection: TpuQuicConnection, exit_signal: Arc<AtomicBool>, validator_identity: Arc<Keypair>) -> anyhow::Result<()> {
async fn setup_connection(connecting: Connecting, exit_signal: Arc<AtomicBool>, validator_identity: Arc<Keypair>) -> anyhow::Result<()> {
let connection = connecting.await?;
debug!("inbound connection established, remote {connection}", connection = connection.remote_address());
let active_tpu_connection =
TpuQuicConnection::new_with_validator_identity(validator_identity.as_ref());
loop {
let maybe_stream = connection.accept_uni().await;
let mut recv_stream = match maybe_stream {