WIP: refactor active type connection

This commit is contained in:
GroovieGermanikus 2023-06-27 17:06:35 +02:00
parent c406f447cc
commit f5979eb098
1 changed files with 70 additions and 73 deletions

View File

@ -60,7 +60,20 @@ impl QuicForwardProxy {
while let Some(conn) = endpoint.accept().await {
trace!("connection incoming");
let fut = handle_connection(conn, exit_signal.clone(), self.validator_identity.clone());
let (certificate, key) = new_self_signed_tls_certificate(
self.validator_identity.as_ref(),
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
)
.expect("Failed to initialize QUIC client certificates");
let endpoint_outbound = QuicConnectionUtils::create_endpoint(certificate.clone(), key.clone());
let active_tpu_connection = ActiveTpuConnection {
endpoint: endpoint_outbound.clone(),
};
let fut = handle_connection(conn, active_tpu_connection, exit_signal.clone(), self.validator_identity.clone());
tokio::spawn(async move {
if let Err(e) = fut.await {
error!("connection failed: {reason}", reason = e.to_string())
@ -80,8 +93,61 @@ impl QuicForwardProxy {
}
#[derive(Clone)]
struct ActiveTpuConnection {
endpoint: Endpoint,
}
async fn handle_connection(connecting: Connecting, exit_signal: Arc<AtomicBool>, validator_identity: Arc<Keypair>) -> anyhow::Result<()> {
impl ActiveTpuConnection {
pub async fn send_txs_to_tpu(&self, exit_signal: Arc<AtomicBool>, validator_identity: Arc<Keypair>, tpu_identity: Pubkey, tpu_addr: SocketAddr, txs: &Vec<VersionedTransaction>) {
let last_stable_id: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
let txs_raw = serialize_to_vecvec(&txs);
info!("received vecvec: {}", txs_raw.iter().map(|tx| tx.len().to_string()).into_iter().join(","));
let connection =
Arc::new(RwLock::new(
QuicConnectionUtils::connect(
tpu_identity,
false,
self.endpoint.clone(),
tpu_addr,
QUIC_CONNECTION_TIMEOUT,
CONNECTION_RETRY_COUNT,
exit_signal.clone(),
|| {
// do nothing
},
).await.unwrap()));
QuicConnectionUtils::send_transaction_batch(
connection.clone(),
txs_raw,
tpu_identity,
self.endpoint.clone(),
tpu_addr,
exit_signal.clone(),
last_stable_id,
QUIC_CONNECTION_TIMEOUT,
CONNECTION_RETRY_COUNT,
|| {
// do nothing
}
).await;
{
let conn = connection.clone();
conn.write().await.close(0u32.into(), b"done");
}
}
}
async fn handle_connection(connecting: Connecting, active_tpu_connection: ActiveTpuConnection, exit_signal: Arc<AtomicBool>, validator_identity: Arc<Keypair>) -> anyhow::Result<()> {
let connection = connecting.await?;
debug!("inbound connection established, remote {connection}", connection = connection.remote_address());
loop {
@ -101,6 +167,7 @@ async fn handle_connection(connecting: Connecting, exit_signal: Arc<AtomicBool>,
}
Ok(s) => s,
};
let active_tpu_connection_copy = active_tpu_connection.clone();
let exit_signal_copy = exit_signal.clone();
let validator_identity_copy = validator_identity.clone();
tokio::spawn(async move {
@ -115,83 +182,13 @@ async fn handle_connection(connecting: Connecting, exit_signal: Arc<AtomicBool>,
let tpu_addr = proxy_request.get_tpu_socket_addr();
let txs = proxy_request.get_transactions();
send_txs_to_tpu(exit_signal_copy, validator_identity_copy, tpu_identity, tpu_addr, &txs).await;
active_tpu_connection_copy.send_txs_to_tpu(exit_signal_copy, validator_identity_copy, tpu_identity, tpu_addr, &txs).await;
});
} // -- loop
}
mod test {
use std::str::FromStr;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use solana_sdk::pubkey::Pubkey;
use crate::cli::get_identity_keypair;
use crate::proxy::send_txs_to_tpu;
#[test]
fn call() {
let exit_signal = Arc::new(AtomicBool::new(false));
let validator_identity = get_identity_keypair(&"/Users/stefan/mango/projects/quic-forward-proxy/local-testvalidator-stake-account.json".to_string());
let tpu_identity = Pubkey::from_str("asdfsdf").unwrap();
let tpu_address = "127.0.0.1:1027".parse().unwrap();
send_txs_to_tpu(exit_signal, validator_identity, tpu_identity, tpu_address, &vec![])
}
}
async fn send_txs_to_tpu(exit_signal: Arc<AtomicBool>, validator_identity: Arc<Keypair>, tpu_identity: Pubkey, tpu_addr: SocketAddr, txs: &Vec<VersionedTransaction>) {
let (certificate, key) = new_self_signed_tls_certificate(
validator_identity.as_ref(),
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
)
.expect("Failed to initialize QUIC client certificates");
let endpoint = QuicConnectionUtils::create_endpoint(certificate.clone(), key.clone());
let last_stable_id: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
let txs_raw = serialize_to_vecvec(&txs);
info!("received vecvec: {}", txs_raw.iter().map(|tx| tx.len().to_string()).into_iter().join(","));
let connection =
Arc::new(RwLock::new(
QuicConnectionUtils::connect(
tpu_identity,
false,
endpoint.clone(),
tpu_addr,
QUIC_CONNECTION_TIMEOUT,
CONNECTION_RETRY_COUNT,
exit_signal.clone(),
|| {
// do nothing
},
).await.unwrap()));
QuicConnectionUtils::send_transaction_batch(
connection.clone(),
txs_raw,
tpu_identity,
endpoint,
tpu_addr,
exit_signal.clone(),
last_stable_id,
QUIC_CONNECTION_TIMEOUT,
CONNECTION_RETRY_COUNT,
|| {
// do nothing
}
).await;
{
let conn = connection.clone();
conn.write().await.close(0u32.into(), b"done");
}
}
fn serialize_to_vecvec(transactions: &Vec<VersionedTransaction>) -> Vec<Vec<u8>> {
transactions.iter().map(|tx| {
let tx_raw = bincode::serialize(tx).unwrap();