lite-rpc/services/src/tpu_utils/quic_proxy_connection_manag...

248 lines
8.4 KiB
Rust
Raw Normal View History

2023-07-20 08:54:26 -07:00
use std::collections::HashMap;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
2023-07-20 08:54:26 -07:00
use std::sync::atomic::Ordering::Relaxed;
2023-08-02 06:22:59 -07:00
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
2023-07-29 02:26:21 -07:00
2023-08-02 06:22:59 -07:00
use anyhow::bail;
2023-07-20 08:54:26 -07:00
use std::time::Duration;
2023-07-29 02:26:21 -07:00
2023-07-20 08:54:26 -07:00
use itertools::Itertools;
use log::{debug, error, info, trace, warn};
2023-08-09 08:10:08 -07:00
use quinn::{ClientConfig, Endpoint, EndpointConfig, TokioRuntime, TransportConfig, VarInt};
2023-07-20 08:54:26 -07:00
use solana_sdk::pubkey::Pubkey;
2023-07-29 02:26:21 -07:00
2023-07-20 08:54:26 -07:00
use solana_sdk::transaction::VersionedTransaction;
use tokio::sync::{broadcast::Receiver, broadcast::Sender, RwLock};
2023-07-29 02:26:21 -07:00
2023-07-20 08:54:26 -07:00
use solana_lite_rpc_core::proxy_request_format::TpuForwardingRequest;
2023-08-02 06:22:59 -07:00
use solana_lite_rpc_core::quic_connection_utils::{
2023-08-02 07:34:38 -07:00
QuicConnectionParameters, SkipServerVerification,
2023-08-02 06:22:59 -07:00
};
2023-07-29 02:26:21 -07:00
2023-07-28 05:56:58 -07:00
use crate::tpu_utils::quinn_auto_reconnect::AutoReconnect;
2023-07-20 08:54:26 -07:00
/// One TPU node that transactions should be forwarded to.
///
/// Built from the `(identity, address)` entries handed to
/// `QuicProxyConnectionManager::update_connection` and passed on to the proxy as a
/// forwarding target.
#[derive(Clone, Copy, Debug)]
pub struct TpuNode {
    /// Public key identifying the node.
    pub tpu_identity: Pubkey,
    /// Socket address of the node's TPU port.
    pub tpu_address: SocketAddr,
}
pub struct QuicProxyConnectionManager {
endpoint: Endpoint,
simple_thread_started: AtomicBool,
proxy_addr: SocketAddr,
2023-08-02 06:22:59 -07:00
current_tpu_nodes: Arc<RwLock<Vec<TpuNode>>>,
2023-07-20 08:54:26 -07:00
}
2023-07-28 13:23:57 -07:00
/// Maximum number of transactions packed into a single forwarding request; every request is
/// handed to the proxy connection with its own `send_uni` call.
const CHUNK_SIZE_PER_STREAM: usize = 20;
2023-07-20 08:54:26 -07:00
impl QuicProxyConnectionManager {
pub async fn new(
certificate: rustls::Certificate,
key: rustls::PrivateKey,
proxy_addr: SocketAddr,
) -> Self {
2023-07-26 14:33:49 -07:00
info!("Configure Quic proxy connection manager to {}", proxy_addr);
2023-08-02 06:22:59 -07:00
let endpoint = Self::create_proxy_client_endpoint(certificate, key);
2023-07-20 08:54:26 -07:00
Self {
endpoint,
simple_thread_started: AtomicBool::from(false),
proxy_addr,
current_tpu_nodes: Arc::new(RwLock::new(vec![])),
}
}
pub async fn update_connection(
&self,
transaction_sender: Arc<Sender<(String, Vec<u8>)>>,
// for duration of this slot these tpu nodes will receive the transactions
connections_to_keep: HashMap<Pubkey, SocketAddr>,
2023-08-02 05:15:39 -07:00
connection_parameters: QuicConnectionParameters,
2023-07-20 08:54:26 -07:00
) {
2023-08-02 06:22:59 -07:00
debug!(
"reconfigure quic proxy connection (# of tpu nodes: {})",
connections_to_keep.len()
);
2023-07-20 08:54:26 -07:00
{
2023-08-02 06:22:59 -07:00
let list_of_nodes = connections_to_keep
.iter()
.map(|(identity, tpu_address)| TpuNode {
tpu_identity: *identity,
tpu_address: *tpu_address,
})
.collect_vec();
2023-07-20 08:54:26 -07:00
2023-07-25 00:04:09 -07:00
let mut lock = self.current_tpu_nodes.write().await;
2023-07-20 08:54:26 -07:00
*lock = list_of_nodes;
}
if self.simple_thread_started.load(Relaxed) {
// already started
return;
}
2023-07-25 00:04:09 -07:00
self.simple_thread_started.store(true, Relaxed);
2023-07-20 08:54:26 -07:00
info!("Starting very simple proxy thread");
2023-07-29 02:26:21 -07:00
let transaction_receiver = transaction_sender.subscribe();
2023-07-20 08:54:26 -07:00
2023-07-28 05:52:22 -07:00
let exit_signal = Arc::new(AtomicBool::new(false));
2023-07-20 08:54:26 -07:00
tokio::spawn(Self::read_transactions_and_broadcast(
transaction_receiver,
self.current_tpu_nodes.clone(),
self.proxy_addr,
self.endpoint.clone(),
2023-07-28 05:52:22 -07:00
exit_signal,
2023-08-02 05:15:39 -07:00
connection_parameters,
2023-07-20 08:54:26 -07:00
));
}
2023-08-02 06:22:59 -07:00
/// Builds the QUIC client endpoint used to talk to the proxy.
///
/// The endpoint is bound to an unspecified IPv4 address on a port in `8000..10000`,
/// presents `certificate`/`key` as its client certificate and announces the
/// `solana-tpu-forward-proxy` ALPN protocol.
///
/// NOTE(review): `SkipServerVerification` presumably disables validation of the proxy's
/// certificate - confirm this is acceptable for the deployment.
///
/// # Panics
///
/// Panics if no port in the range can be bound, if the endpoint cannot be created, or if
/// the client certificate is rejected.
fn create_proxy_client_endpoint(
    certificate: rustls::Certificate,
    key: rustls::PrivateKey,
) -> Endpoint {
    const ALPN_TPU_FORWARDPROXY_PROTOCOL_ID: &[u8] = b"solana-tpu-forward-proxy";

    // socket + endpoint
    let (_port, udp_socket) =
        solana_net_utils::bind_in_range(IpAddr::V4(Ipv4Addr::UNSPECIFIED), (8000, 10000))
            .expect("create_endpoint bind_in_range");
    let mut proxy_endpoint =
        quinn::Endpoint::new(EndpointConfig::default(), None, udp_socket, TokioRuntime)
            .expect("create_endpoint quinn::Endpoint::new");

    // TLS settings
    let mut tls = rustls::ClientConfig::builder()
        .with_safe_defaults()
        .with_custom_certificate_verifier(SkipServerVerification::new())
        .with_single_cert(vec![certificate], key)
        .expect("Failed to set QUIC client certificates");
    tls.enable_early_data = true;
    tls.alpn_protocols = vec![ALPN_TPU_FORWARDPROXY_PROTOCOL_ID.to_vec()];

    // transport settings - these must stay aligned with the quic-proxy's server config
    let mut transport = TransportConfig::default();
    transport
        // the proxy never needs to open streams towards this client
        .max_concurrent_uni_streams(VarInt::from_u32(0))
        .max_concurrent_bidi_streams(VarInt::from_u32(0))
        .max_idle_timeout(Some(Duration::from_secs(10).try_into().unwrap()))
        .keep_alive_interval(Some(Duration::from_millis(500)));

    let mut client_config = ClientConfig::new(Arc::new(tls));
    client_config.transport_config(Arc::new(transport));
    proxy_endpoint.set_default_client_config(client_config);

    proxy_endpoint
}
2023-07-20 08:54:26 -07:00
async fn read_transactions_and_broadcast(
mut transaction_receiver: Receiver<(String, Vec<u8>)>,
current_tpu_nodes: Arc<RwLock<Vec<TpuNode>>>,
proxy_addr: SocketAddr,
endpoint: Endpoint,
2023-07-28 05:52:22 -07:00
exit_signal: Arc<AtomicBool>,
2023-08-02 05:15:39 -07:00
connection_parameters: QuicConnectionParameters,
2023-07-20 08:54:26 -07:00
) {
2023-07-28 05:56:58 -07:00
let auto_connection = AutoReconnect::new(endpoint, proxy_addr);
2023-07-25 00:04:09 -07:00
2023-07-20 08:54:26 -07:00
loop {
2023-07-28 05:52:22 -07:00
// exit signal set
if exit_signal.load(Ordering::Relaxed) {
break;
}
2023-07-20 08:54:26 -07:00
tokio::select! {
2023-08-02 06:22:59 -07:00
tx = transaction_receiver.recv() => {
let first_tx: Vec<u8> = match tx {
Ok((_sig, tx)) => {
tx
},
Err(e) => {
error!(
2023-08-07 13:19:14 -07:00
"Broadcast channel error (close) on recv: {} - aborting", e);
return;
2023-07-25 03:00:39 -07:00
}
2023-08-02 06:22:59 -07:00
};
2023-07-20 08:54:26 -07:00
2023-08-02 06:22:59 -07:00
let mut txs = vec![first_tx];
for _ in 1..connection_parameters.number_of_transactions_per_unistream {
if let Ok((_signature, tx)) = transaction_receiver.try_recv() {
txs.push(tx);
}
}
2023-07-25 00:04:09 -07:00
2023-08-02 06:22:59 -07:00
let tpu_fanout_nodes = current_tpu_nodes.read().await.clone();
2023-07-20 08:54:26 -07:00
2023-08-02 06:22:59 -07:00
trace!("Sending copy of transaction batch of {} txs to {} tpu nodes via quic proxy",
txs.len(), tpu_fanout_nodes.len());
2023-07-20 08:54:26 -07:00
let send_result =
2023-08-02 06:22:59 -07:00
Self::send_copy_of_txs_to_quicproxy(
&txs, &auto_connection,
2023-08-07 13:19:14 -07:00
proxy_addr,
tpu_fanout_nodes)
.await;
if let Err(err) = send_result {
warn!("Failed to send copy of txs to quic proxy - skip (error {})", err);
2023-08-02 06:22:59 -07:00
}
},
};
} // -- loop
2023-07-20 08:54:26 -07:00
}
2023-08-02 06:22:59 -07:00
async fn send_copy_of_txs_to_quicproxy(
raw_tx_batch: &[Vec<u8>],
auto_connection: &AutoReconnect,
_proxy_address: SocketAddr,
tpu_fanout_nodes: Vec<TpuNode>,
2023-08-02 06:22:59 -07:00
) -> anyhow::Result<()> {
2023-07-20 08:54:26 -07:00
let mut txs = vec![];
2023-08-02 06:22:59 -07:00
for raw_tx in raw_tx_batch {
let tx = match bincode::deserialize::<VersionedTransaction>(raw_tx) {
2023-07-20 08:54:26 -07:00
Ok(tx) => tx,
Err(err) => {
bail!(err.to_string());
}
};
txs.push(tx);
}
2023-08-09 08:10:08 -07:00
let tpu_data = tpu_fanout_nodes
.iter()
.map(|tpu| (tpu.tpu_address, tpu.tpu_identity))
.collect_vec();
2023-07-28 13:23:57 -07:00
for chunk in txs.chunks(CHUNK_SIZE_PER_STREAM) {
2023-08-09 08:10:08 -07:00
let forwarding_request = TpuForwardingRequest::new(&tpu_data, chunk.into());
debug!("forwarding_request: {}", forwarding_request);
2023-07-28 05:56:58 -07:00
2023-08-02 06:22:59 -07:00
let proxy_request_raw =
bincode::serialize(&forwarding_request).expect("Expect to serialize transactions");
2023-07-20 08:54:26 -07:00
2023-08-09 04:29:11 -07:00
let send_result = auto_connection.send_uni(&proxy_request_raw).await;
match send_result {
Ok(()) => {
debug!("Successfully sent {} txs to quic proxy", txs.len());
}
Err(e) => {
bail!("Failed to send data to quic proxy: {:?}", e);
}
2023-07-20 08:54:26 -07:00
}
} // -- one chunk
2023-07-20 08:54:26 -07:00
Ok(())
}
}