From 1fe7f23fc51989edc3bfd3bf2ad103f10972f8ac Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Fri, 30 Jun 2023 17:37:27 +0200 Subject: [PATCH] do proper shutdown --- ...literpc_tpu_quic_server_integrationtest.rs | 78 ++++++++----------- 1 file changed, 33 insertions(+), 45 deletions(-) diff --git a/services/tests/literpc_tpu_quic_server_integrationtest.rs b/services/tests/literpc_tpu_quic_server_integrationtest.rs index 7d6c9a7b..05dd22e3 100644 --- a/services/tests/literpc_tpu_quic_server_integrationtest.rs +++ b/services/tests/literpc_tpu_quic_server_integrationtest.rs @@ -27,11 +27,11 @@ use solana_streamer::quic::StreamStats; use solana_streamer::streamer::StakedNodes; use solana_streamer::tls_certificates::new_self_signed_tls_certificate; use tokio::runtime::{Builder, Runtime}; -use tokio::{join, spawn}; +use tokio::{join, spawn, task}; use tokio::sync::broadcast; use tokio::sync::broadcast::error::SendError; use tokio::task::JoinHandle; -use tokio::time::sleep; +use tokio::time::{interval, sleep}; use tracing_subscriber::util::SubscriberInitExt; use solana_lite_rpc_core::structures::identity_stakes::IdentityStakes; use solana_lite_rpc_core::tx_store::empty_tx_store; @@ -41,9 +41,10 @@ use solana_lite_rpc_services::tpu_utils::tpu_connection_manager::TpuConnectionMa const SAMPLE_TX_COUNT: u32 = 2; const MAXIMUM_TRANSACTIONS_IN_QUEUE: usize = 200_000; +const MAX_QUIC_CONNECTIONS_PER_PEER: usize = 8; #[test] -pub fn wireup_and_send_txs_via_channel() -> anyhow::Result<()> { +pub fn wireup_and_send_txs_via_channel() { tracing_subscriber::fmt::fmt() .with_max_level(tracing::Level::TRACE).init(); @@ -67,7 +68,7 @@ pub fn wireup_and_send_txs_via_channel() -> anyhow::Result<()> { let literpc_validator_identity = Arc::new(Keypair::new()); let udp_listen_socket = UdpSocket::bind("127.0.0.1:0").unwrap(); - let listen_addr = udp_listen_socket.local_addr()?; + let listen_addr = udp_listen_socket.local_addr().unwrap(); let (inbound_packets_sender, inbound_packets_receiver) = crossbeam_channel::unbounded(); @@ -94,11 +95,11 @@ pub fn wireup_and_send_txs_via_channel() -> anyhow::Result<()> { let mut solana_quic_streamer = SolanaQuicStreamer::new_start_listening(udp_listen_socket, inbound_packets_sender, staked_nodes); - - info!("end of quic streamer runtime"); }); - runtime2.block_on(start_literpc_client(listen_addr, literpc_validator_identity))?; + runtime2.block_on(async { + tokio::spawn(start_literpc_client(listen_addr, literpc_validator_identity)); + }); let packet_consumer_jh = thread::spawn(move || { info!("start pulling packets..."); @@ -118,32 +119,18 @@ pub fn wireup_and_send_txs_via_channel() -> anyhow::Result<()> { // } } - if packet_count == 10 { - return; + if packet_count == SAMPLE_TX_COUNT { + break; } } Err(err) => { - error!("Error polling quic streaming channel: {}", err); + panic!("Error polling quic streaming channel: {}", err); } } } - while let Ok(packet_batch) = inbound_packets_receiver.recv() { - info!("received packets: {}", packet_batch.len()); - packet_count = packet_count + packet_batch.len() as u32; - for packet in packet_batch.iter() { - info!("packet: {:?}", packet); - let tx = packet.deserialize_slice::(..).unwrap(); - debug!("read transaction from quic streaming server: {:?}", tx.get_signature()); - // for ix in tx.message.instructions() { - // info!("instruction: {:?}", ix.data); - // } - } - - if packet_count == 10 { - return; - } - } + info!("got all expected packets - shutting down tokio runtime with lite-rpc client"); + runtime2.shutdown_timeout(Duration::from_millis(100)); }); @@ -153,17 +140,15 @@ pub fn wireup_and_send_txs_via_channel() -> anyhow::Result<()> { packet_consumer_jh.join().unwrap(); - Ok(()) } const STAKE_CONNECTION: bool = true; -async fn start_literpc_client(listen_addrs: SocketAddr, literpc_validator_identity: Arc) -> anyhow::Result<()> { - let fanout_slots = 4; +async fn start_literpc_client(streamer_listen_addrs: SocketAddr, literpc_validator_identity: Arc) -> anyhow::Result<()> { + let fanout_slots = 1; // TODO change // (String, Vec) (signature, transaction) - // _keeper is used to prevent the ref-count base closing to kick in - let (sender, _keeper) = tokio::sync::broadcast::channel(MAXIMUM_TRANSACTIONS_IN_QUEUE); + let (sender, _) = tokio::sync::broadcast::channel(MAXIMUM_TRANSACTIONS_IN_QUEUE); let broadcast_sender = Arc::new(sender); let (certificate, key) = new_self_signed_tls_certificate( literpc_validator_identity.as_ref(), @@ -176,20 +161,20 @@ async fn start_literpc_client(listen_addrs: SocketAddr, literpc_validator_identi // this effectively controls how many connections we will have let mut connections_to_keep: HashMap = HashMap::new(); - connections_to_keep.insert( - Pubkey::from_str("1111111jepwNWbYG87sgwnBbUJnQHrPiUJzMpqJXZ")?, - "127.0.0.1:20001".parse()?, - ); + // connections_to_keep.insert( + // Pubkey::from_str("1111111jepwNWbYG87sgwnBbUJnQHrPiUJzMpqJXZ")?, + // "127.0.0.1:20001".parse()?, + // ); - connections_to_keep.insert( - Pubkey::from_str("1111111k4AYMctpyJakWNvGcte6tR8BLyZw54R8qu")?, - "127.0.0.1:20002".parse()?, - ); + // connections_to_keep.insert( + // Pubkey::from_str("1111111k4AYMctpyJakWNvGcte6tR8BLyZw54R8qu")?, + // "127.0.0.1:20002".parse()?, + // ); // this is the real streamer connections_to_keep.insert( literpc_validator_identity.pubkey(), - listen_addrs, + streamer_listen_addrs, ); // get information about the optional validator identity stake @@ -214,16 +199,19 @@ async fn start_literpc_client(listen_addrs: SocketAddr, literpc_validator_identi ) .await; - // TODO this is a rase + // TODO this is a race sleep(Duration::from_millis(1500)).await; for i in 0..SAMPLE_TX_COUNT { let raw_sample_tx = build_raw_sample_tx(i); - debug!("broadcast transaction {}: {}", raw_sample_tx.0, format!("hi {}", i)); + debug!("broadcast transaction {} to {} receivers: {}", + raw_sample_tx.0, broadcast_sender.receiver_count(), format!("hi {}", i)); + broadcast_sender.send(raw_sample_tx)?; } - debug!("end of lite-rpc client runtime: {}", broadcast_sender.len()); + sleep(Duration::from_secs(30)).await; + Ok(()) } @@ -259,7 +247,7 @@ async fn solana_quic_streamer_start() { let port = sock.local_addr().unwrap().port(); let tpu_addr = SocketAddr::new(addr, port); - sleep(Duration::from_millis(500)).await; + // sleep(Duration::from_millis(500)).await; exit.store(true, Ordering::Relaxed); t.await.unwrap(); @@ -303,7 +291,7 @@ impl SolanaQuicStreamer { gossip_host, sender, exit.clone(), - 1, + MAX_QUIC_CONNECTIONS_PER_PEER, staked_nodes.clone(), 10, 10,