diff --git a/Cargo.lock b/Cargo.lock index e64fddc2..bba80a90 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2254,17 +2254,6 @@ version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" -[[package]] -name = "jemalloc-ctl" -version = "0.5.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7cffc705424a344c054e135d12ee591402f4539245e8bbd64e6c9eaa9458b63c" -dependencies = [ - "jemalloc-sys", - "libc", - "paste", -] - [[package]] name = "jemalloc-sys" version = "0.5.4+5.3.0-patched" @@ -2599,7 +2588,6 @@ dependencies = [ "futures-util", "hyper", "itertools 0.10.5", - "jemalloc-ctl", "jemallocator", "jsonrpsee", "lazy_static", @@ -4849,8 +4837,6 @@ dependencies = [ "crossbeam-channel", "dashmap 5.5.3", "itertools 0.10.5", - "jemalloc-ctl", - "jemallocator", "lite-rpc", "log", "serde", @@ -4859,6 +4845,7 @@ dependencies = [ "solana-lite-rpc-cluster-endpoints", "solana-lite-rpc-core", "solana-lite-rpc-prioritization-fees", + "solana-lite-rpc-quic-forward-proxy-integration-test", "solana-lite-rpc-services", "solana-lite-rpc-util", "solana-net-utils", diff --git a/lite-rpc/Cargo.toml b/lite-rpc/Cargo.toml index b9c31436..3a8ea8a7 100644 --- a/lite-rpc/Cargo.toml +++ b/lite-rpc/Cargo.toml @@ -51,7 +51,6 @@ tower = "0.4.13" hyper = { version = "0.14", features = ["server", "http1", "http2"] } tower-http = { version = "0.4.0", features = ["full"] } jemallocator = { workspace = true, features = ["profiling"] } -jemalloc-ctl = "0.5.4" solana-lite-rpc-core = { workspace = true } solana-lite-rpc-util = { workspace = true } diff --git a/lite-rpc/src/main.rs b/lite-rpc/src/main.rs index 4804c2ab..b21311e4 100644 --- a/lite-rpc/src/main.rs +++ b/lite-rpc/src/main.rs @@ -8,7 +8,7 @@ use lite_rpc::bridge::LiteBridge; use lite_rpc::bridge_pubsub::LitePubSubBridge; use lite_rpc::cli::Config; use lite_rpc::postgres_logger::PostgresLogger; -use lite_rpc::service_spawner::ServiceSpawner; +use lite_rpc::service_spawner::{configure_tpu_connection_path, ServiceSpawner}; use lite_rpc::start_server::start_servers; use lite_rpc::DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE; use log::info; @@ -368,30 +368,6 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc) -> anyhow: )); drop(slot_notifier); - // tokio::spawn(async move { - - // let last_allocated = stats::allocated::read().unwrap(); - // let last_resident = stats::resident::read().unwrap(); - // let last_mapped = stats::mapped::read().unwrap(); - // let last_active = stats::active::read().unwrap(); - - // loop { - // thread::sleep(time::Duration::from_secs(10)); - // // Retrieve memory statistics - // // let stats = stats::active::mib().unwrap().read().unwrap(); - - // let allocated = stats::allocated::read().unwrap(); - // let resident = stats::resident::read().unwrap(); - // let mapped = stats::mapped::read().unwrap(); - // let active = stats::active::read().unwrap(); - - // info!("Current allocated memory: {} bytes -- diff {}", allocated, last_allocated as i64 - allocated as i64); - // info!("Current resident memory: {} bytes -- diff {}", resident, last_resident as i64 - resident as i64); - // info!("Current mapped memory: {} bytes -- diff {}", mapped, last_mapped as i64 - mapped as i64); - // info!("Current active memory: {} bytes -- diff {}\n", active, last_active as i64 - active as i64); - // } - // }); - tokio::select! { res = tx_service_jh => { anyhow::bail!("Tx Services {res:?}") @@ -471,33 +447,6 @@ pub async fn main() -> anyhow::Result<()> { } } -fn configure_tpu_connection_path(quic_proxy_addr: Option) -> TpuConnectionPath { - match quic_proxy_addr { - None => TpuConnectionPath::QuicDirectPath, - Some(prox_address) => { - let proxy_socket_addr = parse_host_port(prox_address.as_str()).unwrap(); - TpuConnectionPath::QuicForwardProxyPath { - // e.g. "127.0.0.1:11111" or "localhost:11111" - forward_proxy_address: proxy_socket_addr, - } - } - } -} - -fn parse_host_port(host_port: &str) -> Result { - let addrs: Vec<_> = host_port - .to_socket_addrs() - .map_err(|err| format!("Unable to resolve host {host_port}: {err}"))? - .collect(); - if addrs.is_empty() { - Err(format!("Unable to resolve host: {host_port}")) - } else if addrs.len() > 1 { - Err(format!("Multiple addresses resolved for host: {host_port}")) - } else { - Ok(addrs[0]) - } -} - fn setup_tracing_subscriber() { let enable_instrument_tracing = std::env::var("ENABLE_INSTRUMENT_TRACING") .unwrap_or("false".to_string()) diff --git a/lite-rpc/src/service_spawner.rs b/lite-rpc/src/service_spawner.rs index ff2de631..5a5682be 100644 --- a/lite-rpc/src/service_spawner.rs +++ b/lite-rpc/src/service_spawner.rs @@ -5,6 +5,7 @@ use solana_lite_rpc_core::{ types::{BlockStream, ClusterInfoStream, SlotStream, VoteAccountStream}, AnyhowJoinHandle, }; +use solana_lite_rpc_services::tpu_utils::tpu_connection_path::TpuConnectionPath; use solana_lite_rpc_services::{ data_caching_service::DataCachingService, metrics_capture::MetricsCapture, @@ -15,6 +16,7 @@ use solana_lite_rpc_services::{ tx_sender::TxSender, }; use std::time::Duration; +use std::net::{SocketAddr, ToSocketAddrs}; pub struct ServiceSpawner { pub data_cache: DataCache, @@ -87,3 +89,31 @@ impl ServiceSpawner { ) } } + + +pub fn configure_tpu_connection_path(quic_proxy_addr: Option) -> TpuConnectionPath { + match quic_proxy_addr { + None => TpuConnectionPath::QuicDirectPath, + Some(prox_address) => { + let proxy_socket_addr = parse_host_port(prox_address.as_str()).unwrap(); + TpuConnectionPath::QuicForwardProxyPath { + // e.g. "127.0.0.1:11111" or "localhost:11111" + forward_proxy_address: proxy_socket_addr, + } + } + } +} + +fn parse_host_port(host_port: &str) -> Result { + let addrs: Vec<_> = host_port + .to_socket_addrs() + .map_err(|err| format!("Unable to resolve host {host_port}: {err}"))? + .collect(); + if addrs.is_empty() { + Err(format!("Unable to resolve host: {host_port}")) + } else if addrs.len() > 1 { + Err(format!("Multiple addresses resolved for host: {host_port}")) + } else { + Ok(addrs[0]) + } +} \ No newline at end of file diff --git a/quic-forward-proxy-integration-test/src/lib.rs b/quic-forward-proxy-integration-test/src/lib.rs new file mode 100644 index 00000000..7633fd0b --- /dev/null +++ b/quic-forward-proxy-integration-test/src/lib.rs @@ -0,0 +1 @@ +pub mod setup; \ No newline at end of file diff --git a/quic-forward-proxy-integration-test/src/setup.rs b/quic-forward-proxy-integration-test/src/setup.rs new file mode 100644 index 00000000..43e4346d --- /dev/null +++ b/quic-forward-proxy-integration-test/src/setup.rs @@ -0,0 +1,284 @@ +use log::{debug, error, info, trace, warn}; + +use solana_lite_rpc_core::solana_utils::SerializableTransaction; +use solana_lite_rpc_core::stores::data_cache::DataCache; +use solana_lite_rpc_core::structures::identity_stakes::IdentityStakesData; +use solana_lite_rpc_core::structures::transaction_sent_info::SentTransactionInfo; +use solana_lite_rpc_services::quic_connection_utils::QuicConnectionParameters; +use solana_lite_rpc_services::tpu_utils::tpu_connection_manager::TpuConnectionManager; +use solana_sdk::instruction::Instruction; +use solana_sdk::message::Message; +use solana_sdk::pubkey::Pubkey; +use solana_sdk::signature::{Keypair, Signature, Signer}; +use solana_sdk::hash::Hash; + +use solana_sdk::transaction::{Transaction, VersionedTransaction}; +use solana_streamer::nonblocking::quic::ConnectionPeerType; +use solana_streamer::tls_certificates::new_self_signed_tls_certificate; +use std::collections::{HashMap, HashSet}; +use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}; + +use std::str::FromStr; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use tokio::task::{yield_now, JoinHandle}; +use tokio::time::sleep; +use solana_lite_rpc_services::tpu_utils::quic_proxy_connection_manager::QuicProxyConnectionManager; + +pub const MAXIMUM_TRANSACTIONS_IN_QUEUE: usize = 16_384; +pub const MAX_QUIC_CONNECTIONS_PER_PEER: usize = 8; // like solana repo + +pub const QUIC_CONNECTION_PARAMS: QuicConnectionParameters = QuicConnectionParameters { + connection_timeout: Duration::from_secs(2), + connection_retry_count: 10, + finalize_timeout: Duration::from_secs(2), + max_number_of_connections: 8, + unistream_timeout: Duration::from_secs(2), + write_timeout: Duration::from_secs(2), + number_of_transactions_per_unistream: 10, + unistreams_to_create_new_connection_in_percentage: 10, + prioritization_heap_size: None, +}; + +#[derive(Copy, Clone, Debug)] +pub struct TestCaseParams { + pub sample_tx_count: u32, + pub stake_connection: bool, + pub proxy_mode: bool, +} + +pub async fn start_literpc_client_proxy_mode( + test_case_params: TestCaseParams, + streamer_listen_addrs: SocketAddr, + validator_identity: Arc, + forward_proxy_address: SocketAddr, +) -> anyhow::Result<()> { + info!( + "Start lite-rpc test client using quic proxy at {} ...", + forward_proxy_address + ); + + // (String, Vec) (signature, transaction) + let (sender, _) = tokio::sync::broadcast::channel(MAXIMUM_TRANSACTIONS_IN_QUEUE); + let broadcast_sender = Arc::new(sender); + let (certificate, key) = new_self_signed_tls_certificate( + validator_identity.as_ref(), + IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), + ) + .expect("Failed to initialize QUIC connection certificates"); + + let quic_proxy_connection_manager = + QuicProxyConnectionManager::new(certificate, key, forward_proxy_address).await; + + // this effectively controls how many connections we will have + let mut connections_to_keep: HashMap = HashMap::new(); + let addr1 = UdpSocket::bind("127.0.0.1:0") + .unwrap() + .local_addr() + .unwrap(); + connections_to_keep.insert( + Pubkey::from_str("1111111jepwNWbYG87sgwnBbUJnQHrPiUJzMpqJXZ")?, + addr1, + ); + + let addr2 = UdpSocket::bind("127.0.0.1:0") + .unwrap() + .local_addr() + .unwrap(); + connections_to_keep.insert( + Pubkey::from_str("1111111k4AYMctpyJakWNvGcte6tR8BLyZw54R8qu")?, + addr2, + ); + + // this is the real streamer + connections_to_keep.insert(validator_identity.pubkey(), streamer_listen_addrs); + + // get information about the optional validator identity stake + // populated from get_stakes_for_identity() + let _identity_stakes = IdentityStakesData { + peer_type: ConnectionPeerType::Staked, + stakes: if test_case_params.stake_connection { + 30 + } else { + 0 + }, // stake of lite-rpc + min_stakes: 0, + max_stakes: 40, + total_stakes: 100, + }; + + let transaction_receiver = broadcast_sender.subscribe(); + quic_proxy_connection_manager + .update_connection( + transaction_receiver, + connections_to_keep, + QUIC_CONNECTION_PARAMS, + ) + .await; + + for i in 0..test_case_params.sample_tx_count { + let raw_sample_tx = build_raw_sample_tx(i); + trace!( + "broadcast transaction {} to {} receivers: {}", + raw_sample_tx.signature, + broadcast_sender.receiver_count(), + format!("hi {}", i) + ); + + broadcast_sender.send(raw_sample_tx)?; + if (i + 1) % 1000 == 0 { + yield_now().await; + } + } + + while !broadcast_sender.is_empty() { + sleep(Duration::from_millis(1000)).await; + warn!("broadcast channel is not empty - wait before shutdown test client thread"); + } + + assert!( + broadcast_sender.is_empty(), + "broadcast channel must be empty" + ); + + quic_proxy_connection_manager.signal_shutdown(); + + sleep(Duration::from_secs(3)).await; + + Ok(()) +} + + +// no quic proxy +pub async fn start_literpc_client_direct_mode( + test_case_params: TestCaseParams, + streamer_listen_addrs: SocketAddr, + literpc_validator_identity: Arc, +) -> anyhow::Result<()> { + info!("Start lite-rpc test client in direct-mode..."); + + let fanout_slots = 4; + + // (String, Vec) (signature, transaction) + let (sender, _) = tokio::sync::broadcast::channel(MAXIMUM_TRANSACTIONS_IN_QUEUE); + let broadcast_sender = Arc::new(sender); + let (certificate, key) = new_self_signed_tls_certificate( + literpc_validator_identity.as_ref(), + IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), + ) + .expect("Failed to initialize QUIC connection certificates"); + + let tpu_connection_manager = + TpuConnectionManager::new(certificate, key, fanout_slots as usize).await; + + // this effectively controls how many connections we will have + let mut connections_to_keep: HashMap = HashMap::new(); + let addr1 = UdpSocket::bind("127.0.0.1:0") + .unwrap() + .local_addr() + .unwrap(); + connections_to_keep.insert( + Pubkey::from_str("1111111jepwNWbYG87sgwnBbUJnQHrPiUJzMpqJXZ")?, + addr1, + ); + + let addr2 = UdpSocket::bind("127.0.0.1:0") + .unwrap() + .local_addr() + .unwrap(); + connections_to_keep.insert( + Pubkey::from_str("1111111k4AYMctpyJakWNvGcte6tR8BLyZw54R8qu")?, + addr2, + ); + + // this is the real streamer + connections_to_keep.insert(literpc_validator_identity.pubkey(), streamer_listen_addrs); + + // get information about the optional validator identity stake + // populated from get_stakes_for_identity() + let identity_stakes = IdentityStakesData { + peer_type: ConnectionPeerType::Staked, + stakes: if test_case_params.stake_connection { + 30 + } else { + 0 + }, // stake of lite-rpc + min_stakes: 0, + max_stakes: 40, + total_stakes: 100, + }; + + // solana_streamer::nonblocking::quic: Peer type: Staked, stake 30, total stake 0, max streams 128 receive_window Ok(12320) from peer 127.0.0.1:8000 + + tpu_connection_manager + .update_connections( + broadcast_sender.clone(), + connections_to_keep, + identity_stakes, + // note: tx_store is useless in this scenario as it is never changed; it's only used to check for duplicates + DataCache::new_for_tests(), + QUIC_CONNECTION_PARAMS, + ) + .await; + + for i in 0..test_case_params.sample_tx_count { + let raw_sample_tx = build_raw_sample_tx(i); + trace!( + "broadcast transaction {} to {} receivers: {}", + raw_sample_tx.signature, + broadcast_sender.receiver_count(), + format!("hi {}", i) + ); + + broadcast_sender.send(raw_sample_tx)?; + } + + while !broadcast_sender.is_empty() { + sleep(Duration::from_millis(1000)).await; + warn!("broadcast channel is not empty - wait before shutdown test client thread"); + } + + assert!( + broadcast_sender.is_empty(), + "broadcast channel must be empty" + ); + + sleep(Duration::from_secs(3)).await; + + Ok(()) +} + +const MEMO_PROGRAM_ID: &str = "MemoSq4gqABAXKb96qnH8TysNcWxMyWCqXgDLGmfcHr"; + +pub fn build_raw_sample_tx(i: u32) -> SentTransactionInfo { + let payer_keypair = Keypair::from_base58_string( + "rKiJ7H5UUp3JR18kNyTF1XPuwPKHEM7gMLWHZPWP5djrW1vSjfwjhvJrevxF9MPmUmN9gJMLHZdLMgc9ao78eKr", + ); + + let tx = build_sample_tx(&payer_keypair, i); + + let transaction = + Arc::new(bincode::serialize::(&tx).expect("failed to serialize tx")); + + SentTransactionInfo { + signature: *tx.get_signature(), + slot: 1, + transaction, + last_valid_block_height: 300, + prioritization_fee: 0, + } +} + +fn build_sample_tx(payer_keypair: &Keypair, i: u32) -> VersionedTransaction { + let blockhash = Hash::default(); + create_memo_tx(format!("hi {}", i).as_bytes(), payer_keypair, blockhash).into() +} + +fn create_memo_tx(msg: &[u8], payer: &Keypair, blockhash: Hash) -> Transaction { + let memo = Pubkey::from_str(MEMO_PROGRAM_ID).unwrap(); + + let instruction = Instruction::new_with_bytes(memo, msg, vec![]); + let message = Message::new(&[instruction], Some(&payer.pubkey())); + Transaction::new(&[payer], message, blockhash) +} \ No newline at end of file diff --git a/quic-forward-proxy-integration-test/tests/quic_proxy_tpu_integrationtest.rs b/quic-forward-proxy-integration-test/tests/quic_proxy_tpu_integrationtest.rs index b9f61914..1d68c269 100644 --- a/quic-forward-proxy-integration-test/tests/quic_proxy_tpu_integrationtest.rs +++ b/quic-forward-proxy-integration-test/tests/quic_proxy_tpu_integrationtest.rs @@ -4,24 +4,15 @@ use crossbeam_channel::Sender; use log::{debug, error, info, trace, warn}; use solana_lite_rpc_core::solana_utils::SerializableTransaction; -use solana_lite_rpc_core::stores::data_cache::DataCache; -use solana_lite_rpc_core::structures::identity_stakes::IdentityStakesData; -use solana_lite_rpc_core::structures::transaction_sent_info::SentTransactionInfo; -use solana_lite_rpc_services::tpu_utils::tpu_connection_manager::TpuConnectionManager; -use solana_sdk::hash::Hash; -use solana_sdk::instruction::Instruction; -use solana_sdk::message::Message; -use solana_sdk::pubkey::Pubkey; +use solana_lite_rpc_quic_forward_proxy_integration_test::setup::{start_literpc_client_direct_mode, start_literpc_client_proxy_mode, TestCaseParams, MAX_QUIC_CONNECTIONS_PER_PEER}; use solana_sdk::signature::{Keypair, Signature, Signer}; -use solana_sdk::transaction::{Transaction, VersionedTransaction}; -use solana_streamer::nonblocking::quic::ConnectionPeerType; +use solana_sdk::transaction::VersionedTransaction; use solana_streamer::packet::PacketBatch; use solana_streamer::quic::StreamStats; use solana_streamer::streamer::StakedNodes; -use solana_streamer::tls_certificates::new_self_signed_tls_certificate; use std::collections::{HashMap, HashSet}; -use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}; +use std::net::{SocketAddr, UdpSocket}; use itertools::Itertools; use std::str::FromStr; @@ -31,39 +22,14 @@ use std::thread; use std::time::{Duration, Instant}; use tokio::runtime::Builder; -use tokio::task::{yield_now, JoinHandle}; -use tokio::time::sleep; +use tokio::task::JoinHandle; use tracing_subscriber::EnvFilter; use solana_lite_rpc_quic_forward_proxy::proxy::QuicForwardProxy; use solana_lite_rpc_quic_forward_proxy::tls_self_signed_pair_generator::SelfSignedTlsConfigProvider; use solana_lite_rpc_quic_forward_proxy::validator_identity::ValidatorIdentity; -use solana_lite_rpc_services::quic_connection_utils::QuicConnectionParameters; -use solana_lite_rpc_services::tpu_utils::quic_proxy_connection_manager::QuicProxyConnectionManager; use tracing_subscriber::fmt::format::FmtSpan; -#[derive(Copy, Clone, Debug)] -struct TestCaseParams { - sample_tx_count: u32, - stake_connection: bool, - proxy_mode: bool, -} - -const MAXIMUM_TRANSACTIONS_IN_QUEUE: usize = 16_384; -const MAX_QUIC_CONNECTIONS_PER_PEER: usize = 8; // like solana repo - -const QUIC_CONNECTION_PARAMS: QuicConnectionParameters = QuicConnectionParameters { - connection_timeout: Duration::from_secs(2), - connection_retry_count: 10, - finalize_timeout: Duration::from_secs(2), - max_number_of_connections: 8, - unistream_timeout: Duration::from_secs(2), - write_timeout: Duration::from_secs(2), - number_of_transactions_per_unistream: 10, - unistreams_to_create_new_connection_in_percentage: 10, - prioritization_heap_size: None, -}; - #[test] pub fn small_tx_batch_staked_direct() { configure_logging(true); @@ -452,218 +418,6 @@ async fn solana_quic_streamer_start() { stats.report("test-quic-streamer"); } -// no quic proxy -async fn start_literpc_client_direct_mode( - test_case_params: TestCaseParams, - streamer_listen_addrs: SocketAddr, - literpc_validator_identity: Arc, -) -> anyhow::Result<()> { - info!("Start lite-rpc test client in direct-mode..."); - - let fanout_slots = 4; - - // (String, Vec) (signature, transaction) - let (sender, _) = tokio::sync::broadcast::channel(MAXIMUM_TRANSACTIONS_IN_QUEUE); - let broadcast_sender = Arc::new(sender); - let (certificate, key) = new_self_signed_tls_certificate( - literpc_validator_identity.as_ref(), - IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), - ) - .expect("Failed to initialize QUIC connection certificates"); - - let tpu_connection_manager = - TpuConnectionManager::new(certificate, key, fanout_slots as usize).await; - - // this effectively controls how many connections we will have - let mut connections_to_keep: HashMap = HashMap::new(); - let addr1 = UdpSocket::bind("127.0.0.1:0") - .unwrap() - .local_addr() - .unwrap(); - connections_to_keep.insert( - Pubkey::from_str("1111111jepwNWbYG87sgwnBbUJnQHrPiUJzMpqJXZ")?, - addr1, - ); - - let addr2 = UdpSocket::bind("127.0.0.1:0") - .unwrap() - .local_addr() - .unwrap(); - connections_to_keep.insert( - Pubkey::from_str("1111111k4AYMctpyJakWNvGcte6tR8BLyZw54R8qu")?, - addr2, - ); - - // this is the real streamer - connections_to_keep.insert(literpc_validator_identity.pubkey(), streamer_listen_addrs); - - // get information about the optional validator identity stake - // populated from get_stakes_for_identity() - let identity_stakes = IdentityStakesData { - peer_type: ConnectionPeerType::Staked, - stakes: if test_case_params.stake_connection { - 30 - } else { - 0 - }, // stake of lite-rpc - min_stakes: 0, - max_stakes: 40, - total_stakes: 100, - }; - - // solana_streamer::nonblocking::quic: Peer type: Staked, stake 30, total stake 0, max streams 128 receive_window Ok(12320) from peer 127.0.0.1:8000 - - tpu_connection_manager - .update_connections( - broadcast_sender.clone(), - connections_to_keep, - identity_stakes, - // note: tx_store is useless in this scenario as it is never changed; it's only used to check for duplicates - DataCache::new_for_tests(), - QUIC_CONNECTION_PARAMS, - ) - .await; - - for i in 0..test_case_params.sample_tx_count { - let raw_sample_tx = build_raw_sample_tx(i); - trace!( - "broadcast transaction {} to {} receivers: {}", - raw_sample_tx.signature, - broadcast_sender.receiver_count(), - format!("hi {}", i) - ); - - broadcast_sender.send(raw_sample_tx)?; - } - - while !broadcast_sender.is_empty() { - sleep(Duration::from_millis(1000)).await; - warn!("broadcast channel is not empty - wait before shutdown test client thread"); - } - - assert!( - broadcast_sender.is_empty(), - "broadcast channel must be empty" - ); - - sleep(Duration::from_secs(3)).await; - - Ok(()) -} - -async fn start_literpc_client_proxy_mode( - test_case_params: TestCaseParams, - streamer_listen_addrs: SocketAddr, - validator_identity: Arc, - forward_proxy_address: SocketAddr, -) -> anyhow::Result<()> { - info!( - "Start lite-rpc test client using quic proxy at {} ...", - forward_proxy_address - ); - - // (String, Vec) (signature, transaction) - let (sender, _) = tokio::sync::broadcast::channel(MAXIMUM_TRANSACTIONS_IN_QUEUE); - let broadcast_sender = Arc::new(sender); - let (certificate, key) = new_self_signed_tls_certificate( - validator_identity.as_ref(), - IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), - ) - .expect("Failed to initialize QUIC connection certificates"); - - let quic_proxy_connection_manager = - QuicProxyConnectionManager::new(certificate, key, forward_proxy_address).await; - - // this effectively controls how many connections we will have - let mut connections_to_keep: HashMap = HashMap::new(); - let addr1 = UdpSocket::bind("127.0.0.1:0") - .unwrap() - .local_addr() - .unwrap(); - connections_to_keep.insert( - Pubkey::from_str("1111111jepwNWbYG87sgwnBbUJnQHrPiUJzMpqJXZ")?, - addr1, - ); - - let addr2 = UdpSocket::bind("127.0.0.1:0") - .unwrap() - .local_addr() - .unwrap(); - connections_to_keep.insert( - Pubkey::from_str("1111111k4AYMctpyJakWNvGcte6tR8BLyZw54R8qu")?, - addr2, - ); - - // this is the real streamer - connections_to_keep.insert(validator_identity.pubkey(), streamer_listen_addrs); - - // get information about the optional validator identity stake - // populated from get_stakes_for_identity() - let _identity_stakes = IdentityStakesData { - peer_type: ConnectionPeerType::Staked, - stakes: if test_case_params.stake_connection { - 30 - } else { - 0 - }, // stake of lite-rpc - min_stakes: 0, - max_stakes: 40, - total_stakes: 100, - }; - - // solana_streamer::nonblocking::quic: Peer type: Staked, stake 30, total stake 0, max streams 128 receive_window Ok(12320) from peer 127.0.0.1:8000 - // - // tpu_connection_manager - // .update_connections( - // broadcast_sender.clone(), - // connections_to_keep, - // identity_stakes, - // // note: tx_store is useless in this scenario as it is never changed; it's only used to check for duplicates - // empty_tx_store().clone(), - // QUIC_CONNECTION_PARAMS, - // ) - // .await; - - let transaction_receiver = broadcast_sender.subscribe(); - quic_proxy_connection_manager - .update_connection( - transaction_receiver, - connections_to_keep, - QUIC_CONNECTION_PARAMS, - ) - .await; - - for i in 0..test_case_params.sample_tx_count { - let raw_sample_tx = build_raw_sample_tx(i); - trace!( - "broadcast transaction {} to {} receivers: {}", - raw_sample_tx.signature, - broadcast_sender.receiver_count(), - format!("hi {}", i) - ); - - broadcast_sender.send(raw_sample_tx)?; - if (i + 1) % 1000 == 0 { - yield_now().await; - } - } - - while !broadcast_sender.is_empty() { - sleep(Duration::from_millis(1000)).await; - warn!("broadcast channel is not empty - wait before shutdown test client thread"); - } - - assert!( - broadcast_sender.is_empty(), - "broadcast channel must be empty" - ); - - quic_proxy_connection_manager.signal_shutdown(); - - sleep(Duration::from_secs(3)).await; - - Ok(()) -} async fn start_quic_proxy(proxy_listen_addr: SocketAddr) -> anyhow::Result<()> { let _tls_configuration = SelfSignedTlsConfigProvider::new_singleton_self_signed_localhost(); @@ -734,43 +488,9 @@ impl SolanaQuicStreamer { } } -const MEMO_PROGRAM_ID: &str = "MemoSq4gqABAXKb96qnH8TysNcWxMyWCqXgDLGmfcHr"; - -pub fn build_raw_sample_tx(i: u32) -> SentTransactionInfo { - let payer_keypair = Keypair::from_base58_string( - "rKiJ7H5UUp3JR18kNyTF1XPuwPKHEM7gMLWHZPWP5djrW1vSjfwjhvJrevxF9MPmUmN9gJMLHZdLMgc9ao78eKr", - ); - - let tx = build_sample_tx(&payer_keypair, i); - - let transaction = - Arc::new(bincode::serialize::(&tx).expect("failed to serialize tx")); - - SentTransactionInfo { - signature: *tx.get_signature(), - slot: 1, - transaction, - last_valid_block_height: 300, - prioritization_fee: 0, - } -} - // "hi 1234 " -> 1234 fn parse_hi(input: &str) -> Option { let input = input.trim(); let input = input.replace("hi ", ""); input.parse::().ok() -} - -fn build_sample_tx(payer_keypair: &Keypair, i: u32) -> VersionedTransaction { - let blockhash = Hash::default(); - create_memo_tx(format!("hi {}", i).as_bytes(), payer_keypair, blockhash).into() -} - -fn create_memo_tx(msg: &[u8], payer: &Keypair, blockhash: Hash) -> Transaction { - let memo = Pubkey::from_str(MEMO_PROGRAM_ID).unwrap(); - - let instruction = Instruction::new_with_bytes(memo, msg, vec![]); - let message = Message::new(&[instruction], Some(&payer.pubkey())); - Transaction::new(&[payer], message, blockhash) -} +} \ No newline at end of file diff --git a/services-integration-test/Cargo.toml b/services-integration-test/Cargo.toml index 7d6a63ec..7f703325 100644 --- a/services-integration-test/Cargo.toml +++ b/services-integration-test/Cargo.toml @@ -18,6 +18,7 @@ solana-lite-rpc-util = { workspace = true } solana-lite-rpc-services = { workspace = true } solana-lite-rpc-prioritization-fees = { workspace = true } solana-lite-rpc-cluster-endpoints = { workspace = true } +solana-lite-rpc-quic-forward-proxy-integration-test = { path = "../quic-forward-proxy-integration-test" } solana-sdk = { workspace = true } solana-streamer = { workspace = true } solana-transaction-status = { workspace = true } @@ -31,8 +32,6 @@ dashmap = { workspace = true } itertools = { workspace = true } tracing-subscriber = { workspace = true, features = ["std", "env-filter"] } tokio = { version = "1.28.2", features = ["full", "fs"]} -jemallocator = "0.5.4" -jemalloc-ctl = "0.5.4" [dev-dependencies] crossbeam-channel = "0.5.6" diff --git a/services/src/transaction_service.rs b/services/src/transaction_service.rs index 2eae1624..86b8edae 100644 --- a/services/src/transaction_service.rs +++ b/services/src/transaction_service.rs @@ -9,6 +9,7 @@ use crate::{ tx_sender::TxSender, }; use anyhow::bail; +use log::trace; use prometheus::{histogram_opts, register_histogram, Histogram}; use solana_lite_rpc_core::{ solana_utils::SerializableTransaction, structures::transaction_sent_info::SentTransactionInfo, @@ -123,10 +124,10 @@ pub struct TransactionService { impl TransactionService { pub async fn send_transaction( &self, - tx: Transaction, + tx: VersionedTransaction, max_retries: Option, ) -> anyhow::Result { - let raw_tx = bincode::serialize(&tx)?; + let raw_tx = bincode::serialize(&tx).expect("Could not serialize tx: {&tx.signatures[0]}"); self.send_wire_transaction(raw_tx, max_retries).await }