2023-07-20 08:54:26 -07:00
|
|
|
use std::collections::HashMap;
|
2023-09-08 06:13:20 -07:00
|
|
|
use std::net::{SocketAddr, UdpSocket};
|
2023-08-10 04:04:30 -07:00
|
|
|
use std::sync::atomic::AtomicBool;
|
2023-07-20 08:54:26 -07:00
|
|
|
use std::sync::atomic::Ordering::Relaxed;
|
2023-08-02 06:22:59 -07:00
|
|
|
use std::sync::Arc;
|
2023-07-29 02:26:21 -07:00
|
|
|
|
2023-08-02 06:22:59 -07:00
|
|
|
use anyhow::bail;
|
2023-09-20 07:57:01 -07:00
|
|
|
use solana_lite_rpc_core::structures::transaction_sent_info::SentTransactionInfo;
|
2023-07-20 08:54:26 -07:00
|
|
|
use std::time::Duration;
|
2023-07-29 02:26:21 -07:00
|
|
|
|
2023-07-20 08:54:26 -07:00
|
|
|
use itertools::Itertools;
|
2023-08-10 01:43:03 -07:00
|
|
|
use log::{debug, info, trace, warn};
|
2023-08-09 08:10:08 -07:00
|
|
|
use quinn::{ClientConfig, Endpoint, EndpointConfig, TokioRuntime, TransportConfig, VarInt};
|
2023-07-20 08:54:26 -07:00
|
|
|
use solana_sdk::pubkey::Pubkey;
|
2023-07-29 02:26:21 -07:00
|
|
|
|
2023-08-10 01:31:31 -07:00
|
|
|
use tokio::sync::broadcast::error::TryRecvError;
|
2023-08-10 01:43:03 -07:00
|
|
|
use tokio::sync::{broadcast::Receiver, RwLock};
|
2023-07-29 02:26:21 -07:00
|
|
|
|
2024-01-16 02:30:40 -08:00
|
|
|
use crate::quic_connection_utils::{QuicConnectionParameters, SkipServerVerification};
|
|
|
|
use solana_lite_rpc_core::network_utils::apply_gso_workaround;
|
2023-09-13 08:15:28 -07:00
|
|
|
use solana_lite_rpc_core::structures::proxy_request_format::{TpuForwardingRequest, TxData};
|
2023-07-29 02:26:21 -07:00
|
|
|
|
2023-07-28 05:56:58 -07:00
|
|
|
use crate::tpu_utils::quinn_auto_reconnect::AutoReconnect;
|
2023-07-20 08:54:26 -07:00
|
|
|
|
|
|
|
#[derive(Clone, Copy, Debug)]
|
|
|
|
pub struct TpuNode {
|
|
|
|
pub tpu_identity: Pubkey,
|
|
|
|
pub tpu_address: SocketAddr,
|
|
|
|
}
|
|
|
|
|
|
|
|
pub struct QuicProxyConnectionManager {
|
|
|
|
endpoint: Endpoint,
|
|
|
|
simple_thread_started: AtomicBool,
|
|
|
|
proxy_addr: SocketAddr,
|
2023-08-02 06:22:59 -07:00
|
|
|
current_tpu_nodes: Arc<RwLock<Vec<TpuNode>>>,
|
2023-08-10 04:50:47 -07:00
|
|
|
exit_signal: Arc<AtomicBool>,
|
2023-07-20 08:54:26 -07:00
|
|
|
}
|
|
|
|
|
2023-07-28 13:23:57 -07:00
|
|
|
/// Upper bound on the number of transactions packed into one forwarding request; each
/// request is handed to a single `send_uni` call.
const CHUNK_SIZE_PER_STREAM: usize = 20_usize;
|
2023-07-28 06:30:55 -07:00
|
|
|
|
2023-07-20 08:54:26 -07:00
|
|
|
impl QuicProxyConnectionManager {
|
|
|
|
pub async fn new(
|
|
|
|
certificate: rustls::Certificate,
|
|
|
|
key: rustls::PrivateKey,
|
|
|
|
proxy_addr: SocketAddr,
|
|
|
|
) -> Self {
|
2023-07-26 14:33:49 -07:00
|
|
|
info!("Configure Quic proxy connection manager to {}", proxy_addr);
|
2023-08-02 06:22:59 -07:00
|
|
|
let endpoint = Self::create_proxy_client_endpoint(certificate, key);
|
2023-07-20 08:54:26 -07:00
|
|
|
|
|
|
|
Self {
|
|
|
|
endpoint,
|
|
|
|
simple_thread_started: AtomicBool::from(false),
|
|
|
|
proxy_addr,
|
|
|
|
current_tpu_nodes: Arc::new(RwLock::new(vec![])),
|
2023-08-10 04:50:47 -07:00
|
|
|
exit_signal: Arc::new(AtomicBool::from(false)),
|
2023-07-20 08:54:26 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-08-10 04:50:47 -07:00
|
|
|
pub fn signal_shutdown(&self) {
|
|
|
|
self.exit_signal.store(true, Relaxed);
|
|
|
|
}
|
|
|
|
|
2023-07-20 08:54:26 -07:00
|
|
|
pub async fn update_connection(
|
|
|
|
&self,
|
2023-09-20 07:57:01 -07:00
|
|
|
broadcast_receiver: Receiver<SentTransactionInfo>,
|
2023-07-20 08:54:26 -07:00
|
|
|
// for duration of this slot these tpu nodes will receive the transactions
|
|
|
|
connections_to_keep: HashMap<Pubkey, SocketAddr>,
|
2023-08-02 05:15:39 -07:00
|
|
|
connection_parameters: QuicConnectionParameters,
|
2023-07-20 08:54:26 -07:00
|
|
|
) {
|
2023-08-02 06:22:59 -07:00
|
|
|
debug!(
|
|
|
|
"reconfigure quic proxy connection (# of tpu nodes: {})",
|
|
|
|
connections_to_keep.len()
|
|
|
|
);
|
2023-07-20 08:54:26 -07:00
|
|
|
|
|
|
|
{
|
2023-08-02 06:22:59 -07:00
|
|
|
let list_of_nodes = connections_to_keep
|
|
|
|
.iter()
|
|
|
|
.map(|(identity, tpu_address)| TpuNode {
|
|
|
|
tpu_identity: *identity,
|
|
|
|
tpu_address: *tpu_address,
|
|
|
|
})
|
|
|
|
.collect_vec();
|
2023-07-20 08:54:26 -07:00
|
|
|
|
2023-07-25 00:04:09 -07:00
|
|
|
let mut lock = self.current_tpu_nodes.write().await;
|
2023-07-20 08:54:26 -07:00
|
|
|
*lock = list_of_nodes;
|
|
|
|
}
|
|
|
|
|
|
|
|
if self.simple_thread_started.load(Relaxed) {
|
|
|
|
// already started
|
|
|
|
return;
|
|
|
|
}
|
2023-07-25 00:04:09 -07:00
|
|
|
self.simple_thread_started.store(true, Relaxed);
|
2023-07-20 08:54:26 -07:00
|
|
|
|
|
|
|
info!("Starting very simple proxy thread");
|
|
|
|
|
2023-08-10 04:50:47 -07:00
|
|
|
let exit_signal = self.exit_signal.clone();
|
2023-07-20 08:54:26 -07:00
|
|
|
tokio::spawn(Self::read_transactions_and_broadcast(
|
2023-08-10 01:31:31 -07:00
|
|
|
broadcast_receiver,
|
2023-07-20 08:54:26 -07:00
|
|
|
self.current_tpu_nodes.clone(),
|
|
|
|
self.proxy_addr,
|
|
|
|
self.endpoint.clone(),
|
2023-07-28 05:52:22 -07:00
|
|
|
exit_signal,
|
2023-08-02 05:15:39 -07:00
|
|
|
connection_parameters,
|
2023-07-20 08:54:26 -07:00
|
|
|
));
|
|
|
|
}
|
|
|
|
|
2023-08-02 06:22:59 -07:00
|
|
|
fn create_proxy_client_endpoint(
|
|
|
|
certificate: rustls::Certificate,
|
|
|
|
key: rustls::PrivateKey,
|
|
|
|
) -> Endpoint {
|
2023-07-20 09:07:50 -07:00
|
|
|
const ALPN_TPU_FORWARDPROXY_PROTOCOL_ID: &[u8] = b"solana-tpu-forward-proxy";
|
|
|
|
|
|
|
|
let mut endpoint = {
|
2023-09-08 06:13:20 -07:00
|
|
|
let client_socket = UdpSocket::bind("[::]:0").unwrap();
|
2023-07-20 09:07:50 -07:00
|
|
|
let config = EndpointConfig::default();
|
2024-01-15 09:43:48 -08:00
|
|
|
Endpoint::new(config, None, client_socket, Arc::new(TokioRuntime))
|
2023-07-20 09:07:50 -07:00
|
|
|
.expect("create_endpoint quinn::Endpoint::new")
|
|
|
|
};
|
|
|
|
|
|
|
|
let mut crypto = rustls::ClientConfig::builder()
|
|
|
|
.with_safe_defaults()
|
|
|
|
.with_custom_certificate_verifier(SkipServerVerification::new())
|
2024-01-15 09:43:48 -08:00
|
|
|
.with_client_auth_cert(vec![certificate], key)
|
2023-07-20 09:07:50 -07:00
|
|
|
.expect("Failed to set QUIC client certificates");
|
|
|
|
|
|
|
|
crypto.enable_early_data = true;
|
|
|
|
crypto.alpn_protocols = vec![ALPN_TPU_FORWARDPROXY_PROTOCOL_ID.to_vec()];
|
|
|
|
|
|
|
|
let mut config = ClientConfig::new(Arc::new(crypto));
|
|
|
|
|
2023-07-26 02:10:12 -07:00
|
|
|
// note: this config must be aligned with quic-proxy's server config
|
|
|
|
let mut transport_config = TransportConfig::default();
|
|
|
|
// no remotely-initiated streams required
|
|
|
|
transport_config.max_concurrent_uni_streams(VarInt::from_u32(0));
|
|
|
|
transport_config.max_concurrent_bidi_streams(VarInt::from_u32(0));
|
|
|
|
let timeout = Duration::from_secs(10).try_into().unwrap();
|
2023-07-20 09:07:50 -07:00
|
|
|
transport_config.max_idle_timeout(Some(timeout));
|
|
|
|
transport_config.keep_alive_interval(Some(Duration::from_millis(500)));
|
2023-10-09 08:58:43 -07:00
|
|
|
apply_gso_workaround(&mut transport_config);
|
2023-07-20 09:07:50 -07:00
|
|
|
|
2023-07-26 02:10:12 -07:00
|
|
|
config.transport_config(Arc::new(transport_config));
|
2023-07-20 09:07:50 -07:00
|
|
|
endpoint.set_default_client_config(config);
|
|
|
|
|
|
|
|
endpoint
|
|
|
|
}
|
|
|
|
|
2023-08-10 04:50:47 -07:00
|
|
|
// send transactions to quic proxy
|
2023-07-20 08:54:26 -07:00
|
|
|
async fn read_transactions_and_broadcast(
|
2023-09-20 07:57:01 -07:00
|
|
|
mut transaction_receiver: Receiver<SentTransactionInfo>,
|
2023-07-20 08:54:26 -07:00
|
|
|
current_tpu_nodes: Arc<RwLock<Vec<TpuNode>>>,
|
|
|
|
proxy_addr: SocketAddr,
|
|
|
|
endpoint: Endpoint,
|
2023-07-28 05:52:22 -07:00
|
|
|
exit_signal: Arc<AtomicBool>,
|
2023-08-02 05:15:39 -07:00
|
|
|
connection_parameters: QuicConnectionParameters,
|
2023-07-20 08:54:26 -07:00
|
|
|
) {
|
2023-07-28 05:56:58 -07:00
|
|
|
let auto_connection = AutoReconnect::new(endpoint, proxy_addr);
|
2023-07-25 00:04:09 -07:00
|
|
|
|
2023-07-20 08:54:26 -07:00
|
|
|
loop {
|
2023-07-28 05:52:22 -07:00
|
|
|
// exit signal set
|
2023-08-10 01:44:24 -07:00
|
|
|
if exit_signal.load(Relaxed) {
|
2023-08-10 04:50:47 -07:00
|
|
|
warn!("Caught exit signal - stopping sending transactions to quic proxy");
|
2023-07-28 05:52:22 -07:00
|
|
|
break;
|
|
|
|
}
|
2023-07-20 08:54:26 -07:00
|
|
|
|
|
|
|
tokio::select! {
|
2023-08-02 06:22:59 -07:00
|
|
|
tx = transaction_receiver.recv() => {
|
|
|
|
|
2023-08-10 01:43:03 -07:00
|
|
|
let first_tx: TxData = match tx {
|
2023-09-20 07:57:01 -07:00
|
|
|
Ok(SentTransactionInfo{
|
|
|
|
signature,
|
|
|
|
transaction,
|
|
|
|
..
|
|
|
|
}) => {
|
|
|
|
TxData::new(signature, transaction)
|
2023-08-02 06:22:59 -07:00
|
|
|
},
|
|
|
|
Err(e) => {
|
2023-08-10 01:31:31 -07:00
|
|
|
warn!("Broadcast channel error (close) on recv: {} - aborting", e);
|
2023-08-07 13:19:14 -07:00
|
|
|
return;
|
2023-07-25 03:00:39 -07:00
|
|
|
}
|
2023-08-02 06:22:59 -07:00
|
|
|
};
|
2023-07-20 08:54:26 -07:00
|
|
|
|
2023-08-10 01:43:03 -07:00
|
|
|
let mut txs: Vec<TxData> = vec![first_tx];
|
2023-08-02 06:22:59 -07:00
|
|
|
for _ in 1..connection_parameters.number_of_transactions_per_unistream {
|
2023-08-10 01:31:31 -07:00
|
|
|
match transaction_receiver.try_recv() {
|
2023-09-20 07:57:01 -07:00
|
|
|
Ok(SentTransactionInfo{
|
|
|
|
signature,
|
|
|
|
transaction,
|
|
|
|
..
|
|
|
|
}) => {
|
|
|
|
txs.push(TxData::new(signature, transaction));
|
2023-08-10 01:31:31 -07:00
|
|
|
},
|
|
|
|
Err(TryRecvError::Empty) => {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
Err(e) => {
|
|
|
|
warn!(
|
|
|
|
"Broadcast channel error (close) on more recv: {} - aborting", e);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
};
|
2023-08-02 06:22:59 -07:00
|
|
|
}
|
2023-07-25 00:04:09 -07:00
|
|
|
|
2023-08-02 06:22:59 -07:00
|
|
|
let tpu_fanout_nodes = current_tpu_nodes.read().await.clone();
|
2023-07-20 08:54:26 -07:00
|
|
|
|
2023-08-09 08:29:42 -07:00
|
|
|
if tpu_fanout_nodes.is_empty() {
|
|
|
|
warn!("No tpu nodes to send transactions to - skip");
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
|
2023-08-02 06:22:59 -07:00
|
|
|
trace!("Sending copy of transaction batch of {} txs to {} tpu nodes via quic proxy",
|
|
|
|
txs.len(), tpu_fanout_nodes.len());
|
2023-07-20 08:54:26 -07:00
|
|
|
|
2023-08-09 08:00:17 -07:00
|
|
|
let send_result =
|
2023-08-02 06:22:59 -07:00
|
|
|
Self::send_copy_of_txs_to_quicproxy(
|
|
|
|
&txs, &auto_connection,
|
2023-08-07 13:19:14 -07:00
|
|
|
proxy_addr,
|
2023-08-09 08:00:17 -07:00
|
|
|
tpu_fanout_nodes)
|
|
|
|
.await;
|
2023-08-10 01:31:31 -07:00
|
|
|
if let Err(e) = send_result {
|
|
|
|
warn!("Failed to send copy of txs to quic proxy - skip (error {})", e);
|
2023-08-02 06:22:59 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
},
|
2023-08-10 01:44:24 -07:00
|
|
|
}
|
2023-08-09 08:00:17 -07:00
|
|
|
} // -- loop
|
2023-07-20 08:54:26 -07:00
|
|
|
}
|
|
|
|
|
2023-08-02 06:22:59 -07:00
|
|
|
async fn send_copy_of_txs_to_quicproxy(
|
2023-08-10 01:43:03 -07:00
|
|
|
txs: &[TxData],
|
2023-08-02 06:22:59 -07:00
|
|
|
auto_connection: &AutoReconnect,
|
|
|
|
_proxy_address: SocketAddr,
|
2023-08-09 08:00:17 -07:00
|
|
|
tpu_fanout_nodes: Vec<TpuNode>,
|
2023-08-02 06:22:59 -07:00
|
|
|
) -> anyhow::Result<()> {
|
2023-08-09 08:10:08 -07:00
|
|
|
let tpu_data = tpu_fanout_nodes
|
|
|
|
.iter()
|
2023-08-09 08:00:17 -07:00
|
|
|
.map(|tpu| (tpu.tpu_address, tpu.tpu_identity))
|
|
|
|
.collect_vec();
|
|
|
|
|
2023-07-28 13:23:57 -07:00
|
|
|
for chunk in txs.chunks(CHUNK_SIZE_PER_STREAM) {
|
2023-08-10 01:43:03 -07:00
|
|
|
let forwarding_request = TpuForwardingRequest::new(&tpu_data, chunk);
|
2023-07-28 06:30:55 -07:00
|
|
|
debug!("forwarding_request: {}", forwarding_request);
|
2023-07-28 05:56:58 -07:00
|
|
|
|
2023-08-02 06:22:59 -07:00
|
|
|
let proxy_request_raw =
|
|
|
|
bincode::serialize(&forwarding_request).expect("Expect to serialize transactions");
|
2023-07-20 08:54:26 -07:00
|
|
|
|
2023-08-09 04:29:11 -07:00
|
|
|
let send_result = auto_connection.send_uni(&proxy_request_raw).await;
|
2023-07-28 06:30:55 -07:00
|
|
|
|
|
|
|
match send_result {
|
|
|
|
Ok(()) => {
|
|
|
|
debug!("Successfully sent {} txs to quic proxy", txs.len());
|
|
|
|
}
|
|
|
|
Err(e) => {
|
|
|
|
bail!("Failed to send data to quic proxy: {:?}", e);
|
|
|
|
}
|
2023-07-20 08:54:26 -07:00
|
|
|
}
|
2023-07-28 06:30:55 -07:00
|
|
|
} // -- one chunk
|
|
|
|
|
2023-07-20 08:54:26 -07:00
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
}
|