diff --git a/services/tests/literpc_tpu_quic_server_integrationtest.rs b/services/tests/literpc_tpu_quic_server_integrationtest.rs index df901cb3..28f25a11 100644 --- a/services/tests/literpc_tpu_quic_server_integrationtest.rs +++ b/services/tests/literpc_tpu_quic_server_integrationtest.rs @@ -1,12 +1,3 @@ -use std::collections::HashMap; -use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}; -use std::ops::Deref; -use std::path::Path; -use std::str::FromStr; -use std::sync::{Arc, RwLock}; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::thread; -use std::time::{Duration, Instant}; use anyhow::bail; use countmap::CountMap; use crossbeam_channel::{Receiver, RecvError, RecvTimeoutError, Sender}; @@ -14,6 +5,10 @@ use futures::future::join_all; use log::{debug, error, info, trace, warn}; use quinn::TokioRuntime; use serde::de::Unexpected::Option; +use solana_lite_rpc_core::quic_connection_utils::QuicConnectionParameters; +use solana_lite_rpc_core::structures::identity_stakes::IdentityStakes; +use solana_lite_rpc_core::tx_store::empty_tx_store; +use solana_lite_rpc_services::tpu_utils::tpu_connection_manager::TpuConnectionManager; use solana_rpc_client::rpc_client::SerializableTransaction; use solana_sdk::hash::Hash; use solana_sdk::instruction::Instruction; @@ -27,17 +22,22 @@ use solana_streamer::packet::PacketBatch; use solana_streamer::quic::StreamStats; use solana_streamer::streamer::StakedNodes; use solana_streamer::tls_certificates::new_self_signed_tls_certificate; +use std::collections::HashMap; +use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}; +use std::ops::Deref; +use std::path::Path; +use std::str::FromStr; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, RwLock}; +use std::thread; +use std::time::{Duration, Instant}; use tokio::runtime::{Builder, Runtime}; -use tokio::{join, spawn, task}; use tokio::sync::broadcast; use tokio::sync::broadcast::error::SendError; use tokio::task::JoinHandle; use tokio::time::{interval, sleep}; -use tracing_subscriber::{fmt, filter::LevelFilter}; -use solana_lite_rpc_core::quic_connection_utils::QuicConnectionParameters; -use solana_lite_rpc_core::structures::identity_stakes::IdentityStakes; -use solana_lite_rpc_core::tx_store::empty_tx_store; -use solana_lite_rpc_services::tpu_utils::tpu_connection_manager::TpuConnectionManager; +use tokio::{join, spawn, task}; +use tracing_subscriber::{filter::LevelFilter, fmt}; // note: logging will be auto-adjusted const SAMPLE_TX_COUNT: u32 = 20; @@ -66,7 +66,6 @@ pub fn wireup_and_send_txs_via_channel() { .build() .expect("failed to build tokio runtime for lite-rpc-tpu-client"); - let literpc_validator_identity = Arc::new(Keypair::new()); let udp_listen_socket = UdpSocket::bind("127.0.0.1:0").unwrap(); let listen_addr = udp_listen_socket.local_addr().unwrap(); @@ -74,10 +73,8 @@ pub fn wireup_and_send_txs_via_channel() { let (inbound_packets_sender, inbound_packets_receiver) = crossbeam_channel::unbounded(); runtime_quic1.block_on(async { - /// setup solana Quic streamer // see log "Start quic server on UdpSocket { addr: 127.0.0.1:xxxxx, fd: 10 }" - let staked_nodes = StakedNodes { total_stake: 100, max_stake: 40, @@ -93,12 +90,17 @@ pub fn wireup_and_send_txs_via_channel() { }; let _solana_quic_streamer = SolanaQuicStreamer::new_start_listening( - udp_listen_socket, inbound_packets_sender, staked_nodes); - + udp_listen_socket, + inbound_packets_sender, + staked_nodes, + ); }); runtime_literpc.block_on(async { - tokio::spawn(start_literpc_client(listen_addr, literpc_validator_identity)); + tokio::spawn(start_literpc_client( + listen_addr, + literpc_validator_identity, + )); }); let packet_consumer_jh = thread::spawn(move || { @@ -118,7 +120,9 @@ pub fn wireup_and_send_txs_via_channel() { break; } - let packet_batch = match inbound_packets_receiver.recv_timeout(Duration::from_millis(500)) { + let packet_batch = match inbound_packets_receiver + .recv_timeout(Duration::from_millis(500)) + { Ok(batch) => batch, Err(_) => { debug!("consumer thread did not receive packets on inbound channel recently - continue polling"); @@ -130,7 +134,10 @@ pub fn wireup_and_send_txs_via_channel() { latest_tx = Instant::now(); if packet_count == 0 { - info!("time to first packet {}ms", time_to_first.elapsed().as_millis()); + info!( + "time to first packet {}ms", + time_to_first.elapsed().as_millis() + ); } packet_count = packet_count + packet_batch.len() as u32; @@ -139,8 +146,13 @@ pub fn wireup_and_send_txs_via_channel() { } for packet in packet_batch.iter() { - let tx = packet.deserialize_slice::(..).unwrap(); - trace!("read transaction from quic streaming server: {:?}", tx.get_signature()); + let tx = packet + .deserialize_slice::(..) + .unwrap(); + trace!( + "read transaction from quic streaming server: {:?}", + tx.get_signature() + ); count_map.insert_or_increment(*tx.get_signature()); // for ix in tx.message.instructions() { // info!("instruction: {:?}", ix.data); @@ -157,31 +169,39 @@ pub fn wireup_and_send_txs_via_channel() { } // -- while not all packets received - by count let total_duration = timer.elapsed(); - let half_duration = timer2.map(|t| t.elapsed()).unwrap_or(Duration::from_secs(3333)); + let half_duration = timer2 + .map(|t| t.elapsed()) + .unwrap_or(Duration::from_secs(3333)); // throughput_50 is second half of transactions - should iron out startup effects - info!("consumed {} packets in {}us - throughput {:.2} tps, throughput_50 {:.2} tps, " - , packet_count, total_duration.as_micros(), + info!( + "consumed {} packets in {}us - throughput {:.2} tps, throughput_50 {:.2} tps, ", + packet_count, + total_duration.as_micros(), packet_count as f64 / total_duration.as_secs_f64(), packet_count2 as f64 / half_duration.as_secs_f64(), ); info!("got all expected packets - shutting down tokio runtime with lite-rpc client"); - assert_eq!(count_map.len() as u32, SAMPLE_TX_COUNT, "count_map size should be equal to SAMPLE_TX_COUNT"); + assert_eq!( + count_map.len() as u32, + SAMPLE_TX_COUNT, + "count_map size should be equal to SAMPLE_TX_COUNT" + ); // note: this assumption will not hold as soon as test is configured to do fanout - assert!(count_map.values().all(|cnt| *cnt == 1), "all transactions should be unique"); + assert!( + count_map.values().all(|cnt| *cnt == 1), + "all transactions should be unique" + ); runtime_literpc.shutdown_timeout(Duration::from_millis(1000)); }); - - // shutdown streamer // solana_quic_streamer.shutdown().await; packet_consumer_jh.join().unwrap(); - } fn configure_logging() { @@ -208,7 +228,10 @@ const QUIC_CONNECTION_PARAMS: QuicConnectionParameters = QuicConnectionParameter number_of_transactions_per_unistream: 10, }; -async fn start_literpc_client(streamer_listen_addrs: SocketAddr, literpc_validator_identity: Arc) -> anyhow::Result<()> { +async fn start_literpc_client( + streamer_listen_addrs: SocketAddr, + literpc_validator_identity: Arc, +) -> anyhow::Result<()> { info!("Start lite-rpc test client ..."); let fanout_slots = 4; @@ -220,31 +243,33 @@ async fn start_literpc_client(streamer_listen_addrs: SocketAddr, literpc_validat literpc_validator_identity.as_ref(), IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), ) - .expect("Failed to initialize QUIC connection certificates"); + .expect("Failed to initialize QUIC connection certificates"); let tpu_connection_manager = TpuConnectionManager::new(certificate, key, fanout_slots as usize).await; - // this effectively controls how many connections we will have let mut connections_to_keep: HashMap = HashMap::new(); - let addr1 = UdpSocket::bind("127.0.0.1:0").unwrap().local_addr().unwrap(); + let addr1 = UdpSocket::bind("127.0.0.1:0") + .unwrap() + .local_addr() + .unwrap(); connections_to_keep.insert( Pubkey::from_str("1111111jepwNWbYG87sgwnBbUJnQHrPiUJzMpqJXZ")?, addr1, ); - let addr2 = UdpSocket::bind("127.0.0.1:0").unwrap().local_addr().unwrap(); + let addr2 = UdpSocket::bind("127.0.0.1:0") + .unwrap() + .local_addr() + .unwrap(); connections_to_keep.insert( Pubkey::from_str("1111111k4AYMctpyJakWNvGcte6tR8BLyZw54R8qu")?, addr2, ); // this is the real streamer - connections_to_keep.insert( - literpc_validator_identity.pubkey(), - streamer_listen_addrs, - ); + connections_to_keep.insert(literpc_validator_identity.pubkey(), streamer_listen_addrs); // get information about the optional validator identity stake // populated from get_stakes_for_identity() @@ -266,15 +291,20 @@ async fn start_literpc_client(streamer_listen_addrs: SocketAddr, literpc_validat // note: tx_store is useless in this scenario as it is never changed; it's only used to check for duplicates empty_tx_store().clone(), QUIC_CONNECTION_PARAMS, - ).await; + ) + .await; // TODO this is a race sleep(Duration::from_millis(1500)).await; for i in 0..SAMPLE_TX_COUNT { let raw_sample_tx = build_raw_sample_tx(i); - debug!("broadcast transaction {} to {} receivers: {}", - raw_sample_tx.0, broadcast_sender.receiver_count(), format!("hi {}", i)); + debug!( + "broadcast transaction {} to {} receivers: {}", + raw_sample_tx.0, + broadcast_sender.receiver_count(), + format!("hi {}", i) + ); broadcast_sender.send(raw_sample_tx)?; } @@ -310,7 +340,7 @@ async fn solana_quic_streamer_start() { stats.clone(), 1000, ) - .unwrap(); + .unwrap(); let addr = sock.local_addr().unwrap().ip(); let port = sock.local_addr().unwrap().port(); @@ -346,7 +376,11 @@ impl SolanaQuicStreamer { } impl SolanaQuicStreamer { - fn new_start_listening(udp_socket: UdpSocket, sender: Sender, staked_nodes: StakedNodes) -> Self { + fn new_start_listening( + udp_socket: UdpSocket, + sender: Sender, + staked_nodes: StakedNodes, + ) -> Self { let staked_nodes = Arc::new(RwLock::new(staked_nodes)); let exit = Arc::new(AtomicBool::new(false)); // keypair to derive the server tls certificate @@ -367,7 +401,7 @@ impl SolanaQuicStreamer { stats.clone(), 1000, ) - .unwrap(); + .unwrap(); let addr = udp_socket.local_addr().unwrap().ip(); let port = udp_socket.local_addr().unwrap().port(); @@ -384,10 +418,10 @@ impl SolanaQuicStreamer { const MEMO_PROGRAM_ID: &str = "MemoSq4gqABAXKb96qnH8TysNcWxMyWCqXgDLGmfcHr"; - pub fn build_raw_sample_tx(i: u32) -> (String, Vec) { - - let payer_keypair = Keypair::from_base58_string("rKiJ7H5UUp3JR18kNyTF1XPuwPKHEM7gMLWHZPWP5djrW1vSjfwjhvJrevxF9MPmUmN9gJMLHZdLMgc9ao78eKr"); + let payer_keypair = Keypair::from_base58_string( + "rKiJ7H5UUp3JR18kNyTF1XPuwPKHEM7gMLWHZPWP5djrW1vSjfwjhvJrevxF9MPmUmN9gJMLHZdLMgc9ao78eKr", + ); let tx = build_sample_tx(&payer_keypair, i); @@ -408,4 +442,3 @@ fn create_memo_tx(msg: &[u8], payer: &Keypair, blockhash: Hash) -> Transaction { let message = Message::new(&[instruction], Some(&payer.pubkey())); Transaction::new(&[payer], message, blockhash) } -