This commit is contained in:
GroovieGermanikus 2023-07-27 11:27:31 +02:00
parent d378209195
commit 23ffbc50ab
3 changed files with 27 additions and 29 deletions

View File

@ -15,7 +15,7 @@ const FORMAT_VERSION1: u16 = 2301;
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct TpuForwardingRequest {
format_version: u16,
tpu_socket_addr: SocketAddr, // TODO is that correct
tpu_socket_addr: SocketAddr, // TODO is that correct, maybe it should be V4; maybe we also need to provide a list
identity_tpunode: Pubkey,
transactions: Vec<VersionedTransaction>,
}

View File

@ -1,3 +1,4 @@
use std::future::Future;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4};
use std::path::Path;
use std::sync::Arc;
@ -5,7 +6,7 @@ use std::sync::atomic::{AtomicBool, AtomicU64};
use std::thread::sleep;
use std::time::Duration;
use tracing::{debug_span, instrument, Instrument, span};
use anyhow::{anyhow, bail, Context};
use anyhow::{anyhow, bail, Context, Error};
use dashmap::DashMap;
use itertools::{any, Itertools};
use log::{debug, error, info, trace, warn};
@ -75,31 +76,7 @@ impl QuicForwardProxy {
let exit_signal = Arc::new(AtomicBool::new(false));
let endpoint = self.endpoint.clone();
let quic_proxy: AnyhowJoinHandle = tokio::spawn(async move {
info!("TPU Quic Proxy server listening on {}", endpoint.local_addr()?);
while let Some(connecting) = endpoint.accept().await {
let exit_signal = exit_signal.clone();
let validator_identity_copy = self.validator_identity.clone();
let tpu_quic_client = self.tpu_quic_client.clone();
tokio::spawn(async move {
let connection = connecting.await.context("handshake").unwrap();
match accept_client_connection(connection, tpu_quic_client, exit_signal, validator_identity_copy)
.await {
Ok(()) => {}
Err(err) => {
error!("setup connection failed: {reason}", reason = err);
}
}
});
}
bail!("TPU Quic Proxy server stopped");
});
let quic_proxy: AnyhowJoinHandle = tokio::spawn(self.listen(exit_signal, endpoint));
tokio::select! {
res = quic_proxy => {
@ -108,6 +85,27 @@ impl QuicForwardProxy {
}
}
async fn listen(mut self, exit_signal: Arc<AtomicBool>, endpoint: Endpoint) -> anyhow::Result<()> {
info!("TPU Quic Proxy server listening on {}", endpoint.local_addr()?);
while let Some(connecting) = endpoint.accept().await {
let exit_signal = exit_signal.clone();
let validator_identity_copy = self.validator_identity.clone();
let tpu_quic_client = self.tpu_quic_client.clone();
tokio::spawn(async move {
let connection = connecting.await.context("handshake").unwrap();
match accept_client_connection(connection, tpu_quic_client, exit_signal, validator_identity_copy)
.await {
Ok(()) => {}
Err(err) => {
error!("setup connection failed: {reason}", reason = err);
}
}
});
}
bail!("TPU Quic Proxy server stopped");
}
}

View File

@ -10,7 +10,7 @@ use solana_sdk::transaction::VersionedTransaction;
/// lite-rpc to proxy wire format
/// compat info: non-public format ATM
/// initial version
const FORMAT_VERSION1: u16 = 2301;
pub const FORMAT_VERSION1: u16 = 2301;
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct TpuForwardingRequest {
@ -50,7 +50,7 @@ impl TpuForwardingRequest {
.context("deserialize proxy request")
.unwrap();
assert_eq!(request.format_version, 2301);
assert_eq!(request.format_version, FORMAT_VERSION1);
request
}