works: split listener+forwarder

This commit is contained in:
GroovieGermanikus 2023-07-31 09:58:17 +02:00
parent 20c2710202
commit aea5657135
14 changed files with 345 additions and 280 deletions

View File

@ -39,9 +39,9 @@ use tokio::time::{sleep};
use tracing_subscriber::util::SubscriberInitExt; use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::fmt::format::FmtSpan; use tracing_subscriber::fmt::format::FmtSpan;
use solana_lite_rpc_quic_forward_proxy::outbound::validator_identity::ValidatorIdentity;
use solana_lite_rpc_quic_forward_proxy::proxy::QuicForwardProxy; use solana_lite_rpc_quic_forward_proxy::proxy::QuicForwardProxy;
use solana_lite_rpc_quic_forward_proxy::tls_config_provicer::SelfSignedTlsConfigProvider; use solana_lite_rpc_quic_forward_proxy::tls_config_provicer::SelfSignedTlsConfigProvider;
use solana_lite_rpc_quic_forward_proxy::validator_identity::ValidatorIdentity;
use solana_lite_rpc_services::tpu_utils::quic_proxy_connection_manager::QuicProxyConnectionManager; use solana_lite_rpc_services::tpu_utils::quic_proxy_connection_manager::QuicProxyConnectionManager;
#[derive(Copy, Clone, Debug)] #[derive(Copy, Clone, Debug)]
@ -680,8 +680,8 @@ async fn start_quic_proxy(proxy_listen_addr: SocketAddr) -> anyhow::Result<()> {
let _tls_configuration = SelfSignedTlsConfigProvider::new_singleton_self_signed_localhost(); let _tls_configuration = SelfSignedTlsConfigProvider::new_singleton_self_signed_localhost();
let random_unstaked_validator_identity = ValidatorIdentity::new(None); let random_unstaked_validator_identity = ValidatorIdentity::new(None);
let tls_config = SelfSignedTlsConfigProvider::new_singleton_self_signed_localhost(); let tls_config = Arc::new(SelfSignedTlsConfigProvider::new_singleton_self_signed_localhost());
let proxy_service = QuicForwardProxy::new(proxy_listen_addr, &tls_config, random_unstaked_validator_identity) let proxy_service = QuicForwardProxy::new(proxy_listen_addr, tls_config, random_unstaked_validator_identity)
.await? .await?
.start_services(); .start_services();

BIN
quic-forward-proxy/src/.DS_Store vendored Normal file

Binary file not shown.

View File

@ -8,7 +8,7 @@ pub struct Args {
#[arg(short = 'k', long, default_value_t = String::new())] #[arg(short = 'k', long, default_value_t = String::new())]
pub identity_keypair: String, pub identity_keypair: String,
#[arg(short = 'l', long)] #[arg(short = 'l', long)]
pub proxy_rpc_addr: String, pub proxy_listen_addr: String,
} }
// note this is duplicated from lite-rpc module // note this is duplicated from lite-rpc module

View File

@ -1,22 +0,0 @@
use solana_streamer::nonblocking::quic::ConnectionPeerType;
#[derive(Debug, Copy, Clone)]
pub struct IdentityStakes {
pub peer_type: ConnectionPeerType,
pub stakes: u64,
pub total_stakes: u64,
pub min_stakes: u64,
pub max_stakes: u64,
}
impl Default for IdentityStakes {
fn default() -> Self {
IdentityStakes {
peer_type: ConnectionPeerType::Unstaked,
stakes: 0,
total_stakes: 0,
max_stakes: 0,
min_stakes: 0,
}
}
}

View File

@ -0,0 +1 @@
pub(crate) mod proxy_listener;

View File

@ -0,0 +1,152 @@
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::time::Duration;
use anyhow::{anyhow, bail, Context};
use log::{debug, error, info, trace};
use quinn::{Connection, Endpoint, ServerConfig, VarInt};
use solana_sdk::packet::PACKET_DATA_SIZE;
use tokio::sync::mpsc::Sender;
use crate::inbound::proxy_listener;
use crate::outbound::tx_forward::tx_forwarder;
use crate::outbound::validator_identity::ValidatorIdentity;
use crate::proxy_request_format::TpuForwardingRequest;
use crate::share::ForwardPacket;
use crate::tls_config_provicer::{ProxyTlsConfigProvider, SelfSignedTlsConfigProvider};
use crate::util::AnyhowJoinHandle;
// TODO tweak this value - solana server sets 256
// setting this to "1" did not make a difference!
const MAX_CONCURRENT_UNI_STREAMS: u32 = 24;
pub struct ProxyListener {
tls_config: Arc<SelfSignedTlsConfigProvider>,
proxy_listener_addr: SocketAddr,
}
impl ProxyListener {
pub fn new(
proxy_listener_addr: SocketAddr,
tls_config: Arc<SelfSignedTlsConfigProvider>) -> Self {
Self { proxy_listener_addr, tls_config }
}
pub async fn listen(&self, exit_signal: Arc<AtomicBool>, forwarder_channel: Sender<ForwardPacket>) -> anyhow::Result<()> {
info!("TPU Quic Proxy server listening on {}", self.proxy_listener_addr);
let endpoint = Self::new_proxy_listen_endpoint(&self.tls_config, self.proxy_listener_addr).await;
while let Some(connecting) = endpoint.accept().await {
let exit_signal = exit_signal.clone();
let forwarder_channel_copy = forwarder_channel.clone();
tokio::spawn(async move {
let connection = connecting.await.context("handshake").unwrap();
match Self::accept_client_connection(connection, forwarder_channel_copy,
exit_signal)
.await {
Ok(()) => {}
Err(err) => {
error!("setup connection failed: {reason}", reason = err);
}
}
});
}
bail!("TPU Quic Proxy server stopped");
}
async fn new_proxy_listen_endpoint(tls_config: &SelfSignedTlsConfigProvider, proxy_listener_addr: SocketAddr) -> Endpoint {
let server_tls_config = tls_config.get_server_tls_crypto_config();
let mut quinn_server_config = ServerConfig::with_crypto(Arc::new(server_tls_config));
// note: this config must be aligned with lite-rpc's client config
let transport_config = Arc::get_mut(&mut quinn_server_config.transport).unwrap();
// TODO experiment with this value
transport_config.max_concurrent_uni_streams(VarInt::from_u32(MAX_CONCURRENT_UNI_STREAMS));
// no bidi streams used
transport_config.max_concurrent_bidi_streams(VarInt::from_u32(0));
let timeout = Duration::from_secs(10).try_into().unwrap();
transport_config.max_idle_timeout(Some(timeout));
transport_config.keep_alive_interval(Some(Duration::from_millis(500)));
transport_config.stream_receive_window((PACKET_DATA_SIZE as u32).into());
transport_config.receive_window((PACKET_DATA_SIZE as u32 * MAX_CONCURRENT_UNI_STREAMS).into());
Endpoint::server(quinn_server_config, proxy_listener_addr).unwrap()
}
// TODO use interface abstraction for connection_per_tpunode
#[tracing::instrument(skip_all, level = "debug")]
async fn accept_client_connection(client_connection: Connection, forwarder_channel: Sender<ForwardPacket>,
exit_signal: Arc<AtomicBool>) -> anyhow::Result<()> {
debug!("inbound connection established, client {}", client_connection.remote_address());
loop {
let maybe_stream = client_connection.accept_uni().await;
let result = match maybe_stream {
Err(quinn::ConnectionError::ApplicationClosed(reason)) => {
debug!("connection closed by client - reason: {:?}", reason);
if reason.error_code != VarInt::from_u32(0) {
return Err(anyhow!("connection closed by client with unexpected reason: {:?}", reason));
}
debug!("connection gracefully closed by client");
return Ok(());
},
Err(e) => {
error!("failed to accept stream: {}", e);
return Err(anyhow::Error::msg("error accepting stream"));
}
Ok(recv_stream) => {
let _exit_signal_copy = exit_signal.clone();
let forwarder_channel_copy = forwarder_channel.clone();
tokio::spawn(async move {
let raw_request = recv_stream.read_to_end(10_000_000).await // TODO extract to const
.unwrap();
trace!("read proxy_request {} bytes", raw_request.len());
let proxy_request = TpuForwardingRequest::deserialize_from_raw_request(&raw_request);
trace!("proxy request details: {}", proxy_request);
let _tpu_identity = proxy_request.get_identity_tpunode();
let tpu_address = proxy_request.get_tpu_socket_addr();
let txs = proxy_request.get_transactions();
debug!("enqueue transaction batch of size {} to address {}", txs.len(), tpu_address);
// tpu_quic_client_copy.send_txs_to_tpu(tpu_address, &txs, exit_signal_copy).await;
forwarder_channel_copy.send(ForwardPacket { transactions: txs, tpu_address }).await.unwrap();
// debug!("connection stats (proxy inbound): {}", connection_stats(&client_connection));
});
Ok(())
},
}; // -- result
if let Err(e) = result {
return Err(e);
}
} // -- loop
}
}

View File

@ -2,12 +2,12 @@ pub mod quic_util;
pub mod tls_config_provicer; pub mod tls_config_provicer;
pub mod proxy; pub mod proxy;
pub mod proxy_request_format; pub mod proxy_request_format;
pub mod tpu_quic_client;
pub mod cli; pub mod cli;
pub mod test_client; pub mod test_client;
pub mod validator_identity;
mod util; mod util;
mod tx_store; mod tx_store;
mod identity_stakes;
mod quic_connection_utils; mod quic_connection_utils;
mod quinn_auto_reconnect; mod quinn_auto_reconnect;
pub mod outbound;
mod inbound;
mod share;

View File

@ -8,22 +8,22 @@ use crate::cli::{Args, get_identity_keypair};
use crate::proxy::QuicForwardProxy; use crate::proxy::QuicForwardProxy;
pub use tls_config_provicer::SelfSignedTlsConfigProvider; pub use tls_config_provicer::SelfSignedTlsConfigProvider;
use crate::validator_identity::ValidatorIdentity; use crate::outbound::validator_identity::ValidatorIdentity;
pub mod quic_util; pub mod quic_util;
pub mod tls_config_provicer; pub mod tls_config_provicer;
pub mod proxy; pub mod proxy;
pub mod proxy_request_format; pub mod proxy_request_format;
pub mod tpu_quic_client;
pub mod cli; pub mod cli;
pub mod test_client; pub mod test_client;
mod util; mod util;
mod tx_store; mod tx_store;
mod identity_stakes;
mod quic_connection_utils; mod quic_connection_utils;
mod quinn_auto_reconnect; mod quinn_auto_reconnect;
mod validator_identity; mod outbound;
mod inbound;
mod share;
#[tokio::main(flavor = "multi_thread", worker_threads = 16)] #[tokio::main(flavor = "multi_thread", worker_threads = 16)]
@ -32,7 +32,7 @@ pub async fn main() -> anyhow::Result<()> {
let Args { let Args {
identity_keypair, identity_keypair,
proxy_rpc_addr, proxy_listen_addr: proxy_rpc_addr,
} = Args::parse(); } = Args::parse();
dotenv().ok(); dotenv().ok();

View File

@ -0,0 +1,3 @@
mod tpu_quic_client;
pub mod validator_identity;
pub mod tx_forward;

View File

@ -24,11 +24,10 @@ use solana_sdk::transaction::VersionedTransaction;
use solana_streamer::tls_certificates::new_self_signed_tls_certificate; use solana_streamer::tls_certificates::new_self_signed_tls_certificate;
use tokio::sync::RwLock; use tokio::sync::RwLock;
use crate::outbound::validator_identity::ValidatorIdentity;
use crate::quic_connection_utils::{connection_stats, QuicConnectionError, QuicConnectionParameters, QuicConnectionUtils}; use crate::quic_connection_utils::{connection_stats, QuicConnectionError, QuicConnectionParameters, QuicConnectionUtils};
use crate::quinn_auto_reconnect::AutoReconnect; use crate::quinn_auto_reconnect::AutoReconnect;
use crate::validator_identity::ValidatorIdentity;
const QUIC_CONNECTION_TIMEOUT: Duration = Duration::from_secs(5); const QUIC_CONNECTION_TIMEOUT: Duration = Duration::from_secs(5);
pub const CONNECTION_RETRY_COUNT: usize = 10; pub const CONNECTION_RETRY_COUNT: usize = 10;
@ -151,7 +150,7 @@ impl TpuQuicClient {
/// takes a validator identity and creates a new QUIC client; appears as staked peer to TPU /// takes a validator identity and creates a new QUIC client; appears as staked peer to TPU
// note: ATM the provided identity might or might not be a valid validator keypair // note: ATM the provided identity might or might not be a valid validator keypair
pub async fn new_with_validator_identity(validator_identity: ValidatorIdentity) -> TpuQuicClient { pub async fn new_with_validator_identity_delme(validator_identity: ValidatorIdentity) -> TpuQuicClient {
info!("Setup TPU Quic stable connection with validator identity {} ...", validator_identity); info!("Setup TPU Quic stable connection with validator identity {} ...", validator_identity);
let (certificate, key) = new_self_signed_tls_certificate( let (certificate, key) = new_self_signed_tls_certificate(
&validator_identity.get_keypair_for_tls(), &validator_identity.get_keypair_for_tls(),
@ -338,4 +337,4 @@ pub async fn send_txs_to_tpu_static(
} }
} }

View File

@ -0,0 +1,137 @@
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use log::{debug, info, warn};
use std::collections::HashMap;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use fan::tokio::mpsc::FanOut;
use std::time::Duration;
use quinn::Endpoint;
use solana_streamer::tls_certificates::new_self_signed_tls_certificate;
use tokio::sync::mpsc::{channel, Receiver};
use tokio::time::timeout;
use crate::outbound::tpu_quic_client::{send_txs_to_tpu_static, TpuQuicClient};
use crate::outbound::validator_identity::ValidatorIdentity;
use crate::quic_connection_utils::QuicConnectionUtils;
use crate::quinn_auto_reconnect::AutoReconnect;
use crate::share::ForwardPacket;
// takes transactions from upstream clients and forwards them to the TPU
pub async fn tx_forwarder(validator_identity: ValidatorIdentity, mut transaction_channel: Receiver<ForwardPacket>, exit_signal: Arc<AtomicBool>) -> anyhow::Result<()> {
info!("TPU Quic forwarder started");
let endpoint = new_endpoint_with_validator_identity(validator_identity).await;
let mut agents: HashMap<SocketAddr, FanOut<ForwardPacket>> = HashMap::new();
loop {
// TODO add exit
let forward_packet = transaction_channel.recv().await.expect("channel closed unexpectedly");
let tpu_address = forward_packet.tpu_address;
if !agents.contains_key(&tpu_address) {
// TODO cleanup agent after a while of iactivity
let mut senders = Vec::new();
for _i in 0..4 {
let (sender, mut receiver) = channel::<ForwardPacket>(100000);
senders.push(sender);
let exit_signal = exit_signal.clone();
let endpoint_copy = endpoint.clone();
tokio::spawn(async move {
debug!("Start Quic forwarder agent for TPU {}", tpu_address);
// TODO pass+check the tpu_address
// TODO connect
// TODO consume queue
// TODO exit signal
let auto_connection = AutoReconnect::new(endpoint_copy, tpu_address);
// let mut connection = tpu_quic_client_copy.create_connection(tpu_address).await.expect("handshake");
loop {
let _exit_signal = exit_signal.clone();
loop {
let packet = receiver.recv().await.unwrap();
assert_eq!(packet.tpu_address, tpu_address, "routing error");
let mut transactions_batch = packet.transactions;
let mut batch_size = 1;
while let Ok(more) = receiver.try_recv() {
transactions_batch.extend(more.transactions);
batch_size += 1;
}
if batch_size > 1 {
debug!("encountered batch of size {}", batch_size);
}
debug!("forwarding transaction batch of size {} to address {}", transactions_batch.len(), packet.tpu_address);
// TODo move send_txs_to_tpu_static to tpu_quic_client
let result = timeout(Duration::from_millis(500),
send_txs_to_tpu_static(&auto_connection, &transactions_batch)).await;
// .expect("timeout sending data to TPU node");
if result.is_err() {
warn!("send_txs_to_tpu_static result {:?} - loop over errors", result);
} else {
debug!("send_txs_to_tpu_static sent {}", transactions_batch.len());
}
}
}
});
}
let fanout = FanOut::new(senders);
agents.insert(tpu_address, fanout);
} // -- new agent
let agent_channel = agents.get(&tpu_address).unwrap();
agent_channel.send(forward_packet).await.unwrap();
// let mut batch_size = 1;
// while let Ok(more) = transaction_channel.try_recv() {
// agent_channel.send(more).await.unwrap();
// batch_size += 1;
// }
// if batch_size > 1 {
// debug!("encountered batch of size {}", batch_size);
// }
// check if the tpu has already a task+queue running, if not start one, sort+queue packets by tpu address
// maintain the health of a TPU connection, debounce errors; if failing, drop the respective messages
// let exit_signal_copy = exit_signal.clone();
// debug!("send transaction batch of size {} to address {}", forward_packet.transactions.len(), forward_packet.tpu_address);
// // TODO: this will block/timeout if the TPU is not available
// timeout(Duration::from_millis(500),
// tpu_quic_client_copy.send_txs_to_tpu(tpu_address, &forward_packet.transactions, exit_signal_copy)).await;
// tpu_quic_client_copy.send_txs_to_tpu(forward_packet.tpu_address, &forward_packet.transactions, exit_signal_copy).await;
} // -- loop over transactions from ustream channels
// not reachable
}
/// takes a validator identity and creates a new QUIC client; appears as staked peer to TPU
// note: ATM the provided identity might or might not be a valid validator keypair
async fn new_endpoint_with_validator_identity(validator_identity: ValidatorIdentity) -> Endpoint {
info!("Setup TPU Quic stable connection with validator identity {} ...", validator_identity);
let (certificate, key) = new_self_signed_tls_certificate(
&validator_identity.get_keypair_for_tls(),
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
)
.expect("Failed to initialize QUIC connection certificates");
let endpoint_outbound = QuicConnectionUtils::create_tpu_client_endpoint(certificate.clone(), key.clone());
endpoint_outbound
}

View File

@ -1,19 +1,14 @@
use std::collections::HashMap; use std::net::SocketAddr;
use std::net::{SocketAddr};
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::{AtomicBool}; use std::sync::atomic::AtomicBool;
use std::time::Duration; use std::time::Duration;
use anyhow::{anyhow, bail, Context}; use anyhow::{anyhow, bail, Context};
use fan::tokio::mpsc::FanOut; use log::{debug, error, info, trace};
use log::{debug, error, info, trace, warn};
use quinn::{Connection, Endpoint, ServerConfig, VarInt}; use quinn::{Connection, Endpoint, ServerConfig, VarInt};
@ -22,69 +17,36 @@ use quinn::{Connection, Endpoint, ServerConfig, VarInt};
use solana_sdk::packet::PACKET_DATA_SIZE; use solana_sdk::packet::PACKET_DATA_SIZE;
use tokio::sync::mpsc::Sender;
use crate::inbound::proxy_listener;
use crate::outbound::tx_forward::tx_forwarder;
use crate::outbound::validator_identity::ValidatorIdentity;
use solana_sdk::transaction::VersionedTransaction;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::time::timeout;
use crate::proxy_request_format::TpuForwardingRequest; use crate::proxy_request_format::TpuForwardingRequest;
use crate::share::ForwardPacket;
use crate::quinn_auto_reconnect::AutoReconnect;
use crate::tpu_quic_client::{send_txs_to_tpu_static, TpuQuicClient};
use crate::tls_config_provicer::{ProxyTlsConfigProvider, SelfSignedTlsConfigProvider}; use crate::tls_config_provicer::{ProxyTlsConfigProvider, SelfSignedTlsConfigProvider};
use crate::util::AnyhowJoinHandle; use crate::util::AnyhowJoinHandle;
use crate::validator_identity::ValidatorIdentity;
// TODO tweak this value - solana server sets 256
// setting this to "1" did not make a difference!
const MAX_CONCURRENT_UNI_STREAMS: u32 = 24;
pub struct QuicForwardProxy { pub struct QuicForwardProxy {
endpoint: Endpoint, // endpoint: Endpoint,
// validator_identity: ValidatorIdentity, validator_identity: ValidatorIdentity,
tpu_quic_client: TpuQuicClient, tls_config: Arc<SelfSignedTlsConfigProvider>,
} pub proxy_listener_addr: SocketAddr,
/// internal structure with transactions and target TPU
#[derive(Debug)]
struct ForwardPacket {
pub transactions: Vec<VersionedTransaction>,
pub tpu_address: SocketAddr,
} }
impl QuicForwardProxy { impl QuicForwardProxy {
pub async fn new( pub async fn new(
proxy_listener_addr: SocketAddr, proxy_listener_addr: SocketAddr,
tls_config: &SelfSignedTlsConfigProvider, tls_config: Arc<SelfSignedTlsConfigProvider>,
validator_identity: ValidatorIdentity) -> anyhow::Result<Self> { validator_identity: ValidatorIdentity) -> anyhow::Result<Self> {
let server_tls_config = tls_config.get_server_tls_crypto_config();
let mut quinn_server_config = ServerConfig::with_crypto(Arc::new(server_tls_config));
// note: this config must be aligned with lite-rpc's client config
let transport_config = Arc::get_mut(&mut quinn_server_config.transport).unwrap();
// TODO experiment with this value
transport_config.max_concurrent_uni_streams(VarInt::from_u32(MAX_CONCURRENT_UNI_STREAMS));
// no bidi streams used
transport_config.max_concurrent_bidi_streams(VarInt::from_u32(0));
let timeout = Duration::from_secs(10).try_into().unwrap();
transport_config.max_idle_timeout(Some(timeout));
transport_config.keep_alive_interval(Some(Duration::from_millis(500)));
transport_config.stream_receive_window((PACKET_DATA_SIZE as u32).into());
transport_config.receive_window((PACKET_DATA_SIZE as u32 * MAX_CONCURRENT_UNI_STREAMS).into());
let endpoint = Endpoint::server(quinn_server_config, proxy_listener_addr).unwrap();
info!("Quic proxy uses validator identity {}", validator_identity); info!("Quic proxy uses validator identity {}", validator_identity);
let tpu_quic_client = Ok(Self { proxy_listener_addr, validator_identity, tls_config })
TpuQuicClient::new_with_validator_identity(validator_identity).await;
Ok(Self { endpoint, tpu_quic_client })
} }
@ -93,13 +55,22 @@ impl QuicForwardProxy {
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let exit_signal = Arc::new(AtomicBool::new(false)); let exit_signal = Arc::new(AtomicBool::new(false));
let tpu_quic_client_copy = self.tpu_quic_client.clone();
let endpoint = self.endpoint.clone();
let (forwarder_channel, forward_receiver) = tokio::sync::mpsc::channel(1000); let (forwarder_channel, forward_receiver) = tokio::sync::mpsc::channel(1000);
let quic_proxy: AnyhowJoinHandle = tokio::spawn(self.listen(exit_signal.clone(), endpoint, forwarder_channel)); let proxy_listener = proxy_listener::ProxyListener::new(
self.proxy_listener_addr,
self.tls_config);
let forwarder: AnyhowJoinHandle = tokio::spawn(tx_forwarder(tpu_quic_client_copy, forward_receiver, exit_signal.clone())); let exit_signal_clone = exit_signal.clone();
let quic_proxy = tokio::spawn(async move {
proxy_listener.listen(exit_signal_clone.clone(), forwarder_channel).await;
});
let validator_identity = self.validator_identity.clone();
let exit_signal_clone = exit_signal.clone();
let forwarder: AnyhowJoinHandle = tokio::spawn(tx_forwarder(validator_identity,
forward_receiver, exit_signal_clone));
tokio::select! { tokio::select! {
res = quic_proxy => { res = quic_proxy => {
@ -111,193 +82,8 @@ impl QuicForwardProxy {
} }
} }
async fn listen(self, exit_signal: Arc<AtomicBool>, endpoint: Endpoint, forwarder_channel: Sender<ForwardPacket>) -> anyhow::Result<()> {
info!("TPU Quic Proxy server listening on {}", endpoint.local_addr()?);
while let Some(connecting) = endpoint.accept().await {
let exit_signal = exit_signal.clone();
let tpu_quic_client = self.tpu_quic_client.clone();
let forwarder_channel_copy = forwarder_channel.clone();
tokio::spawn(async move {
let connection = connecting.await.context("handshake").unwrap();
match accept_client_connection(connection, forwarder_channel_copy,
tpu_quic_client, exit_signal)
.await {
Ok(()) => {}
Err(err) => {
error!("setup connection failed: {reason}", reason = err);
}
}
});
}
bail!("TPU Quic Proxy server stopped");
}
} }
// TODO use interface abstraction for connection_per_tpunode
#[tracing::instrument(skip_all, level = "debug")]
async fn accept_client_connection(client_connection: Connection, forwarder_channel: Sender<ForwardPacket>,
tpu_quic_client: TpuQuicClient,
exit_signal: Arc<AtomicBool>) -> anyhow::Result<()> {
debug!("inbound connection established, client {}", client_connection.remote_address());
loop {
let maybe_stream = client_connection.accept_uni().await;
let result = match maybe_stream {
Err(quinn::ConnectionError::ApplicationClosed(reason)) => {
debug!("connection closed by client - reason: {:?}", reason);
if reason.error_code != VarInt::from_u32(0) {
return Err(anyhow!("connection closed by client with unexpected reason: {:?}", reason));
}
debug!("connection gracefully closed by client");
return Ok(());
},
Err(e) => {
error!("failed to accept stream: {}", e);
return Err(anyhow::Error::msg("error accepting stream"));
}
Ok(recv_stream) => {
let _exit_signal_copy = exit_signal.clone();
let _tpu_quic_client_copy = tpu_quic_client.clone();
let forwarder_channel_copy = forwarder_channel.clone();
tokio::spawn(async move {
let raw_request = recv_stream.read_to_end(10_000_000).await // TODO extract to const
.unwrap();
trace!("read proxy_request {} bytes", raw_request.len());
let proxy_request = TpuForwardingRequest::deserialize_from_raw_request(&raw_request);
trace!("proxy request details: {}", proxy_request);
let _tpu_identity = proxy_request.get_identity_tpunode();
let tpu_address = proxy_request.get_tpu_socket_addr();
let txs = proxy_request.get_transactions();
debug!("enqueue transaction batch of size {} to address {}", txs.len(), tpu_address);
// tpu_quic_client_copy.send_txs_to_tpu(tpu_address, &txs, exit_signal_copy).await;
forwarder_channel_copy.send(ForwardPacket { transactions: txs, tpu_address }).await.unwrap();
// debug!("connection stats (proxy inbound): {}", connection_stats(&client_connection));
});
Ok(())
},
}; // -- result
if let Err(e) = result {
return Err(e);
}
} // -- loop
}
// takes transactions from upstream clients and forwards them to the TPU
async fn tx_forwarder(tpu_quic_client: TpuQuicClient, mut transaction_channel: Receiver<ForwardPacket>, exit_signal: Arc<AtomicBool>) -> anyhow::Result<()> {
info!("TPU Quic forwarder started");
let mut agents: HashMap<SocketAddr, FanOut<ForwardPacket>> = HashMap::new();
let endpoint = tpu_quic_client.get_endpoint().clone();
loop {
// TODO add exit
let forward_packet = transaction_channel.recv().await.expect("channel closed unexpectedly");
let tpu_address = forward_packet.tpu_address;
if !agents.contains_key(&tpu_address) {
// TODO cleanup agent after a while of iactivity
let mut senders = Vec::new();
for _i in 0..4 {
let (sender, mut receiver) = channel::<ForwardPacket>(100000);
senders.push(sender);
let exit_signal = exit_signal.clone();
let endpoint_copy = endpoint.clone();
tokio::spawn(async move {
debug!("Start Quic forwarder agent for TPU {}", tpu_address);
// TODO pass+check the tpu_address
// TODO connect
// TODO consume queue
// TODO exit signal
let auto_connection = AutoReconnect::new(endpoint_copy, tpu_address);
// let mut connection = tpu_quic_client_copy.create_connection(tpu_address).await.expect("handshake");
loop {
let _exit_signal = exit_signal.clone();
loop {
let packet = receiver.recv().await.unwrap();
assert_eq!(packet.tpu_address, tpu_address, "routing error");
let mut transactions_batch = packet.transactions;
let mut batch_size = 1;
while let Ok(more) = receiver.try_recv() {
transactions_batch.extend(more.transactions);
batch_size += 1;
}
if batch_size > 1 {
debug!("encountered batch of size {}", batch_size);
}
debug!("forwarding transaction batch of size {} to address {}", transactions_batch.len(), packet.tpu_address);
// TODo move send_txs_to_tpu_static to tpu_quic_client
let result = timeout(Duration::from_millis(500),
send_txs_to_tpu_static(&auto_connection, &transactions_batch)).await;
// .expect("timeout sending data to TPU node");
if result.is_err() {
warn!("send_txs_to_tpu_static result {:?} - loop over errors", result);
} else {
debug!("send_txs_to_tpu_static sent {}", transactions_batch.len());
}
}
}
});
}
let fanout = FanOut::new(senders);
agents.insert(tpu_address, fanout);
} // -- new agent
let agent_channel = agents.get(&tpu_address).unwrap();
agent_channel.send(forward_packet).await.unwrap();
// let mut batch_size = 1;
// while let Ok(more) = transaction_channel.try_recv() {
// agent_channel.send(more).await.unwrap();
// batch_size += 1;
// }
// if batch_size > 1 {
// debug!("encountered batch of size {}", batch_size);
// }
// check if the tpu has already a task+queue running, if not start one, sort+queue packets by tpu address
// maintain the health of a TPU connection, debounce errors; if failing, drop the respective messages
// let exit_signal_copy = exit_signal.clone();
// debug!("send transaction batch of size {} to address {}", forward_packet.transactions.len(), forward_packet.tpu_address);
// // TODO: this will block/timeout if the TPU is not available
// timeout(Duration::from_millis(500),
// tpu_quic_client_copy.send_txs_to_tpu(tpu_address, &forward_packet.transactions, exit_signal_copy)).await;
// tpu_quic_client_copy.send_txs_to_tpu(forward_packet.tpu_address, &forward_packet.transactions, exit_signal_copy).await;
} // -- loop over transactions from ustream channels
// not reachable
}

View File

@ -0,0 +1,9 @@
use solana_sdk::transaction::VersionedTransaction;
use std::net::SocketAddr;
/// internal structure with transactions and target TPU
#[derive(Debug)]
pub struct ForwardPacket {
pub transactions: Vec<VersionedTransaction>,
pub tpu_address: SocketAddr,
}