cleanup minor todos

This commit is contained in:
GroovieGermanikus 2023-08-02 14:15:39 +02:00
parent 2f97cb8385
commit b41d05e613
9 changed files with 19 additions and 62 deletions

View File

@ -10,12 +10,12 @@ use solana_sdk::transaction::VersionedTransaction;
/// lite-rpc to proxy wire format /// lite-rpc to proxy wire format
/// compat info: non-public format ATM /// compat info: non-public format ATM
/// initial version /// initial version
const FORMAT_VERSION1: u16 = 2301; const FORMAT_VERSION1: u16 = 2302;
#[derive(Serialize, Deserialize, Debug, Clone)] #[derive(Serialize, Deserialize, Debug, Clone)]
pub struct TpuForwardingRequest { pub struct TpuForwardingRequest {
format_version: u16, format_version: u16,
tpu_socket_addr: SocketAddr, // TODO is that correct, maybe it should be V4; maybe we also need to provide a list tpu_socket_addr: SocketAddr,
identity_tpunode: Pubkey, identity_tpunode: Pubkey,
transactions: Vec<VersionedTransaction>, transactions: Vec<VersionedTransaction>,
} }
@ -43,7 +43,6 @@ impl TpuForwardingRequest {
bincode::serialize(&self).expect("Expect to serialize transactions") bincode::serialize(&self).expect("Expect to serialize transactions")
} }
// TODO reame
pub fn deserialize_from_raw_request(raw_proxy_request: &Vec<u8>) -> TpuForwardingRequest { pub fn deserialize_from_raw_request(raw_proxy_request: &Vec<u8>) -> TpuForwardingRequest {
let request = bincode::deserialize::<TpuForwardingRequest>(&raw_proxy_request) let request = bincode::deserialize::<TpuForwardingRequest>(&raw_proxy_request)
.context("deserialize proxy request") .context("deserialize proxy request")

View File

@ -559,9 +559,6 @@ async fn start_literpc_client_direct_mode(
) )
.await; .await;
// TODO this is a race
sleep(Duration::from_millis(1500)).await;
for i in 0..test_case_params.sample_tx_count { for i in 0..test_case_params.sample_tx_count {
let raw_sample_tx = build_raw_sample_tx(i); let raw_sample_tx = build_raw_sample_tx(i);
trace!( trace!(
@ -582,27 +579,22 @@ async fn start_literpc_client_direct_mode(
async fn start_literpc_client_proxy_mode( async fn start_literpc_client_proxy_mode(
test_case_params: TestCaseParams, test_case_params: TestCaseParams,
streamer_listen_addrs: SocketAddr, streamer_listen_addrs: SocketAddr,
literpc_validator_identity: Arc<Keypair>, validator_identity: Arc<Keypair>,
forward_proxy_address: SocketAddr, forward_proxy_address: SocketAddr,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
info!("Start lite-rpc test client using quic proxy at {} ...", forward_proxy_address); info!("Start lite-rpc test client using quic proxy at {} ...", forward_proxy_address);
let _fanout_slots = 4;
// (String, Vec<u8>) (signature, transaction) // (String, Vec<u8>) (signature, transaction)
let (sender, _) = tokio::sync::broadcast::channel(MAXIMUM_TRANSACTIONS_IN_QUEUE); let (sender, _) = tokio::sync::broadcast::channel(MAXIMUM_TRANSACTIONS_IN_QUEUE);
let broadcast_sender = Arc::new(sender); let broadcast_sender = Arc::new(sender);
let (certificate, key) = new_self_signed_tls_certificate( let (certificate, key) = new_self_signed_tls_certificate(
literpc_validator_identity.as_ref(), validator_identity.as_ref(),
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
) )
.expect("Failed to initialize QUIC connection certificates"); .expect("Failed to initialize QUIC connection certificates");
// let tpu_connection_manager =
// TpuConnectionManager::new(certificate, key, fanout_slots as usize).await;
let quic_proxy_connection_manager = let quic_proxy_connection_manager =
QuicProxyConnectionManager::new(certificate, key, literpc_validator_identity.clone(), forward_proxy_address).await; QuicProxyConnectionManager::new(certificate, key, forward_proxy_address).await;
// this effectively controls how many connections we will have // this effectively controls how many connections we will have
let mut connections_to_keep: HashMap<Pubkey, SocketAddr> = HashMap::new(); let mut connections_to_keep: HashMap<Pubkey, SocketAddr> = HashMap::new();
@ -625,7 +617,7 @@ async fn start_literpc_client_proxy_mode(
); );
// this is the real streamer // this is the real streamer
connections_to_keep.insert(literpc_validator_identity.pubkey(), streamer_listen_addrs); connections_to_keep.insert(validator_identity.pubkey(), streamer_listen_addrs);
// get information about the optional validator identity stake // get information about the optional validator identity stake
// populated from get_stakes_for_identity() // populated from get_stakes_for_identity()
@ -650,7 +642,8 @@ async fn start_literpc_client_proxy_mode(
// ) // )
// .await; // .await;
quic_proxy_connection_manager.update_connection(broadcast_sender.clone(), connections_to_keep).await; quic_proxy_connection_manager.update_connection(
broadcast_sender.clone(), connections_to_keep, QUIC_CONNECTION_PARAMS).await;
// TODO this is a race // TODO this is a race
sleep(Duration::from_millis(1500)).await; sleep(Duration::from_millis(1500)).await;

View File

@ -31,12 +31,9 @@ pub async fn main() -> anyhow::Result<()> {
identity_keypair, identity_keypair,
proxy_listen_addr: proxy_listen_addr, proxy_listen_addr: proxy_listen_addr,
} = Args::parse(); } = Args::parse();
dotenv().ok(); dotenv().ok();
// TODO build args struct dedicated to proxy
let proxy_listener_addr = proxy_listen_addr.parse().unwrap(); let proxy_listener_addr = proxy_listen_addr.parse().unwrap();
let _tls_configuration = SelfSignedTlsConfigProvider::new_singleton_self_signed_localhost();
let validator_identity = ValidatorIdentity::new(get_identity_keypair(&identity_keypair).await); let validator_identity = ValidatorIdentity::new(get_identity_keypair(&identity_keypair).await);
let tls_config = Arc::new(SelfSignedTlsConfigProvider::new_singleton_self_signed_localhost()); let tls_config = Arc::new(SelfSignedTlsConfigProvider::new_singleton_self_signed_localhost());

View File

@ -10,7 +10,7 @@ use std::net::SocketAddr;
/// lite-rpc to proxy wire format /// lite-rpc to proxy wire format
/// compat info: non-public format ATM /// compat info: non-public format ATM
/// initial version /// initial version
pub const FORMAT_VERSION1: u16 = 2301; pub const FORMAT_VERSION1: u16 = 2302;
#[derive(Serialize, Deserialize, Debug, Clone)] #[derive(Serialize, Deserialize, Debug, Clone)]
pub struct TpuForwardingRequest { pub struct TpuForwardingRequest {

View File

@ -1,7 +1,5 @@
use rustls::ClientConfig; use rustls::ClientConfig;
// TODO integrate with tpu_service + quic_connection_utils pub trait TpuClientTlsConfigProvider {
pub trait TpuCLientTlsConfigProvider {
fn get_client_tls_crypto_config(&self) -> ClientConfig; fn get_client_tls_crypto_config(&self) -> ClientConfig;
} }

View File

@ -1,7 +1,5 @@
use rustls::ServerConfig; use rustls::ServerConfig;
// TODO integrate with tpu_service + quic_connection_utils
pub trait ProxyTlsConfigProvider { pub trait ProxyTlsConfigProvider {
fn get_server_tls_crypto_config(&self) -> ServerConfig; fn get_server_tls_crypto_config(&self) -> ServerConfig;
} }

View File

@ -1,5 +1,5 @@
use crate::quic_util::{SkipServerVerification, ALPN_TPU_FORWARDPROXY_PROTOCOL_ID}; use crate::quic_util::{SkipServerVerification, ALPN_TPU_FORWARDPROXY_PROTOCOL_ID};
use crate::tls_config_provider_client::TpuCLientTlsConfigProvider; use crate::tls_config_provider_client::TpuClientTlsConfigProvider;
use crate::tls_config_provider_server::ProxyTlsConfigProvider; use crate::tls_config_provider_server::ProxyTlsConfigProvider;
use rcgen::generate_simple_self_signed; use rcgen::generate_simple_self_signed;
use rustls::{Certificate, ClientConfig, PrivateKey, ServerConfig}; use rustls::{Certificate, ClientConfig, PrivateKey, ServerConfig};
@ -11,7 +11,7 @@ impl ProxyTlsConfigProvider for SelfSignedTlsConfigProvider {
} }
} }
impl TpuCLientTlsConfigProvider for SelfSignedTlsConfigProvider { impl TpuClientTlsConfigProvider for SelfSignedTlsConfigProvider {
fn get_client_tls_crypto_config(&self) -> ClientConfig { fn get_client_tls_crypto_config(&self) -> ClientConfig {
self.client_crypto.clone() self.client_crypto.clone()
} }

View File

@ -21,7 +21,7 @@ use tokio::sync::{broadcast::Receiver, broadcast::Sender, RwLock};
use tokio::time::timeout; use tokio::time::timeout;
use solana_lite_rpc_core::proxy_request_format::TpuForwardingRequest; use solana_lite_rpc_core::proxy_request_format::TpuForwardingRequest;
use solana_lite_rpc_core::quic_connection_utils::{connection_stats, SkipServerVerification}; use solana_lite_rpc_core::quic_connection_utils::{connection_stats, QuicConnectionParameters, SkipServerVerification};
use crate::tpu_utils::quinn_auto_reconnect::AutoReconnect; use crate::tpu_utils::quinn_auto_reconnect::AutoReconnect;
@ -34,21 +34,17 @@ pub struct TpuNode {
pub struct QuicProxyConnectionManager { pub struct QuicProxyConnectionManager {
endpoint: Endpoint, endpoint: Endpoint,
// TODO remove
validator_identity: Arc<Keypair>,
simple_thread_started: AtomicBool, simple_thread_started: AtomicBool,
proxy_addr: SocketAddr, proxy_addr: SocketAddr,
current_tpu_nodes: Arc<RwLock<Vec<TpuNode>>> current_tpu_nodes: Arc<RwLock<Vec<TpuNode>>>
} }
// TODO consolidate with number_of_transactions_per_unistream
const CHUNK_SIZE_PER_STREAM: usize = 20; const CHUNK_SIZE_PER_STREAM: usize = 20;
impl QuicProxyConnectionManager { impl QuicProxyConnectionManager {
pub async fn new( pub async fn new(
certificate: rustls::Certificate, certificate: rustls::Certificate,
key: rustls::PrivateKey, key: rustls::PrivateKey,
validator_identity: Arc<Keypair>,
proxy_addr: SocketAddr, proxy_addr: SocketAddr,
) -> Self { ) -> Self {
info!("Configure Quic proxy connection manager to {}", proxy_addr); info!("Configure Quic proxy connection manager to {}", proxy_addr);
@ -56,7 +52,6 @@ impl QuicProxyConnectionManager {
Self { Self {
endpoint, endpoint,
validator_identity,
simple_thread_started: AtomicBool::from(false), simple_thread_started: AtomicBool::from(false),
proxy_addr, proxy_addr,
current_tpu_nodes: Arc::new(RwLock::new(vec![])), current_tpu_nodes: Arc::new(RwLock::new(vec![])),
@ -68,10 +63,10 @@ impl QuicProxyConnectionManager {
transaction_sender: Arc<Sender<(String, Vec<u8>)>>, transaction_sender: Arc<Sender<(String, Vec<u8>)>>,
// for duration of this slot these tpu nodes will receive the transactions // for duration of this slot these tpu nodes will receive the transactions
connections_to_keep: HashMap<Pubkey, SocketAddr>, connections_to_keep: HashMap<Pubkey, SocketAddr>,
connection_parameters: QuicConnectionParameters,
) { ) {
debug!("reconfigure quic proxy connection (# of tpu nodes: {})", connections_to_keep.len()); debug!("reconfigure quic proxy connection (# of tpu nodes: {})", connections_to_keep.len());
{ {
let list_of_nodes = connections_to_keep.iter().map(|(identity, tpu_address)| { let list_of_nodes = connections_to_keep.iter().map(|(identity, tpu_address)| {
TpuNode { TpuNode {
@ -95,7 +90,6 @@ impl QuicProxyConnectionManager {
let transaction_receiver = transaction_sender.subscribe(); let transaction_receiver = transaction_sender.subscribe();
// TODO use it
let exit_signal = Arc::new(AtomicBool::new(false)); let exit_signal = Arc::new(AtomicBool::new(false));
tokio::spawn(Self::read_transactions_and_broadcast( tokio::spawn(Self::read_transactions_and_broadcast(
@ -104,6 +98,7 @@ impl QuicProxyConnectionManager {
self.proxy_addr, self.proxy_addr,
self.endpoint.clone(), self.endpoint.clone(),
exit_signal, exit_signal,
connection_parameters,
)); ));
} }
@ -149,18 +144,15 @@ impl QuicProxyConnectionManager {
endpoint endpoint
} }
// blocks and loops until exit signal (TODO)
async fn read_transactions_and_broadcast( async fn read_transactions_and_broadcast(
mut transaction_receiver: Receiver<(String, Vec<u8>)>, mut transaction_receiver: Receiver<(String, Vec<u8>)>,
current_tpu_nodes: Arc<RwLock<Vec<TpuNode>>>, current_tpu_nodes: Arc<RwLock<Vec<TpuNode>>>,
proxy_addr: SocketAddr, proxy_addr: SocketAddr,
endpoint: Endpoint, endpoint: Endpoint,
exit_signal: Arc<AtomicBool>, exit_signal: Arc<AtomicBool>,
connection_parameters: QuicConnectionParameters,
) { ) {
// let mut connection = endpoint.connect(proxy_addr, "localhost").unwrap()
// .await.unwrap();
let auto_connection = AutoReconnect::new(endpoint, proxy_addr); let auto_connection = AutoReconnect::new(endpoint, proxy_addr);
loop { loop {
@ -173,13 +165,8 @@ impl QuicProxyConnectionManager {
// TODO add timeout // TODO add timeout
tx = transaction_receiver.recv() => { tx = transaction_receiver.recv() => {
let first_tx: Vec<u8> = match tx { let first_tx: Vec<u8> = match tx {
Ok((_sig, tx)) => { Ok((_sig, tx)) => {
// if Self::check_for_confirmation(&txs_sent_store, sig) {
// // transaction is already confirmed/ no need to send
// continue;
// }
tx tx
}, },
Err(e) => { Err(e) => {
@ -189,16 +176,9 @@ impl QuicProxyConnectionManager {
} }
}; };
let number_of_transactions_per_unistream = 8; // TODO read from QuicConnectionParameters
let mut txs = vec![first_tx]; let mut txs = vec![first_tx];
// TODO comment in for _ in 1..connection_parameters.number_of_transactions_per_unistream {
let _foo = PACKET_DATA_SIZE;
for _ in 1..number_of_transactions_per_unistream {
if let Ok((_signature, tx)) = transaction_receiver.try_recv() { if let Ok((_signature, tx)) = transaction_receiver.try_recv() {
// if Self::check_for_confirmation(&txs_sent_store, signature) {
// continue;
// }
txs.push(tx); txs.push(tx);
} }
} }
@ -226,9 +206,6 @@ impl QuicProxyConnectionManager {
_proxy_address: SocketAddr, tpu_target_address: SocketAddr, _proxy_address: SocketAddr, tpu_target_address: SocketAddr,
target_tpu_identity: Pubkey) -> anyhow::Result<()> { target_tpu_identity: Pubkey) -> anyhow::Result<()> {
// TODO add timeout
// let mut send_stream = timeout(Duration::from_millis(500), connection.open_uni()).await??;
let raw_tx_batch_copy = raw_tx_batch.clone(); let raw_tx_batch_copy = raw_tx_batch.clone();
let mut txs = vec![]; let mut txs = vec![];
@ -253,10 +230,6 @@ impl QuicProxyConnectionManager {
let send_result = auto_connection.send_uni(proxy_request_raw).await; let send_result = auto_connection.send_uni(proxy_request_raw).await;
// let send_result =
// timeout(Duration::from_millis(3500), Self::send_proxy_request(endpoint, proxy_address, &proxy_request_raw))
// .await.context("Timeout sending data to quic proxy")?;
match send_result { match send_result {
Ok(()) => { Ok(()) => {
debug!("Successfully sent {} txs to quic proxy", txs.len()); debug!("Successfully sent {} txs to quic proxy", txs.len());
@ -272,7 +245,6 @@ impl QuicProxyConnectionManager {
Ok(()) Ok(())
} }
// TODO optimize connection
async fn send_proxy_request(endpoint: Endpoint, proxy_address: SocketAddr, proxy_request_raw: &Vec<u8>) -> anyhow::Result<()> { async fn send_proxy_request(endpoint: Endpoint, proxy_address: SocketAddr, proxy_request_raw: &Vec<u8>) -> anyhow::Result<()> {
info!("sending {} bytes to proxy", proxy_request_raw.len()); info!("sending {} bytes to proxy", proxy_request_raw.len());

View File

@ -103,7 +103,7 @@ impl TpuService {
} }
TpuConnectionPath::QuicForwardProxyPath { forward_proxy_address } => { TpuConnectionPath::QuicForwardProxyPath { forward_proxy_address } => {
let quic_proxy_connection_manager = let quic_proxy_connection_manager =
QuicProxyConnectionManager::new(certificate, key, identity.clone(), forward_proxy_address).await; QuicProxyConnectionManager::new(certificate, key, forward_proxy_address).await;
QuicProxy { QuicProxy {
quic_proxy_connection_manager: Arc::new(quic_proxy_connection_manager), quic_proxy_connection_manager: Arc::new(quic_proxy_connection_manager),
@ -196,10 +196,10 @@ impl TpuService {
.await; .await;
}, },
QuicProxy { quic_proxy_connection_manager } => { QuicProxy { quic_proxy_connection_manager } => {
// TODO maybe we need more data (see .update_connections)
quic_proxy_connection_manager.update_connection( quic_proxy_connection_manager.update_connection(
self.broadcast_sender.clone(), self.broadcast_sender.clone(),
connections_to_keep, connections_to_keep,
self.config.quic_connection_params,
).await; ).await;
} }
} }