From 71e828fa96fe3621dc810c5b2c0ec196462cfda4 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Mon, 17 Jul 2023 11:36:51 +0200 Subject: [PATCH] timeout in consumer thread --- ...literpc_tpu_quic_server_integrationtest.rs | 55 +++++++++++-------- 1 file changed, 32 insertions(+), 23 deletions(-) diff --git a/services/tests/literpc_tpu_quic_server_integrationtest.rs b/services/tests/literpc_tpu_quic_server_integrationtest.rs index b6a44acb..5f017e50 100644 --- a/services/tests/literpc_tpu_quic_server_integrationtest.rs +++ b/services/tests/literpc_tpu_quic_server_integrationtest.rs @@ -9,9 +9,9 @@ use std::thread; use std::time::{Duration, Instant}; use anyhow::bail; use countmap::CountMap; -use crossbeam_channel::{Receiver, RecvError, Sender}; +use crossbeam_channel::{Receiver, RecvError, RecvTimeoutError, Sender}; use futures::future::join_all; -use log::{debug, error, info, trace}; +use log::{debug, error, info, trace, warn}; use quinn::TokioRuntime; use serde::de::Unexpected::Option; use solana_rpc_client::rpc_client::SerializableTransaction; @@ -51,7 +51,7 @@ pub fn wireup_and_send_txs_via_channel() { // value from solana - see quic streamer - see quic.rs -> rt() const NUM_QUIC_STREAMER_WORKER_THREADS: usize = 1; - let runtime1 = Builder::new_multi_thread() + let runtime_quic1 = Builder::new_multi_thread() .worker_threads(1) .thread_name("quic-server") .enable_all() @@ -59,7 +59,7 @@ pub fn wireup_and_send_txs_via_channel() { .expect("failed to build tokio runtime for testing quic server"); // lite-rpc - let runtime2 = tokio::runtime::Builder::new_multi_thread() + let runtime_literpc = tokio::runtime::Builder::new_multi_thread() // see lite-rpc -> main.rs .worker_threads(16) // note: this value has changed with the "deadlock fix" - TODO experiment with it .enable_all() @@ -73,7 +73,7 @@ pub fn wireup_and_send_txs_via_channel() { let (inbound_packets_sender, inbound_packets_receiver) = crossbeam_channel::unbounded(); - runtime1.block_on(async { + runtime_quic1.block_on(async { /// setup solana Quic streamer // see log "Start quic server on UdpSocket { addr: 127.0.0.1:xxxxx, fd: 10 }" @@ -97,7 +97,7 @@ pub fn wireup_and_send_txs_via_channel() { }); - runtime2.block_on(async { + runtime_literpc.block_on(async { tokio::spawn(start_literpc_client(listen_addr, literpc_validator_identity)); }); @@ -111,9 +111,19 @@ pub fn wireup_and_send_txs_via_channel() { let mut packet_count2 = 0; let mut count_map: CountMap = CountMap::with_capacity(SAMPLE_TX_COUNT as usize); const WARMUP_TX_COUNT: u32 = SAMPLE_TX_COUNT / 2; - while packet_count != SAMPLE_TX_COUNT { - let packet_batch: PacketBatch = inbound_packets_receiver.recv().expect("receive must succeed"); - println!("packets: {}", packet_batch.len()); + while packet_count < SAMPLE_TX_COUNT { + if timer.elapsed() > Duration::from_secs(5) { + warn!("timeout waiting for packet from quic streamer"); + break; + } + + let packet_batch = match inbound_packets_receiver.recv_timeout(Duration::from_millis(500)) { + Ok(batch) => batch, + Err(_) => { + debug!("consumer thread did not receive packets on inbound channel recently - continue polling"); + continue; + } + }; if packet_count == 0 { info!("time to first packet {}ms", time_to_first.elapsed().as_millis()); @@ -143,7 +153,7 @@ pub fn wireup_and_send_txs_via_channel() { } // -- while not all packets received - by count let total_duration = timer.elapsed(); - let half_duration = timer2.unwrap().elapsed(); + let half_duration = timer2.map(|t| t.elapsed()).unwrap_or(Duration::from_secs(3333)); // throughput_50 is second half of transactions - should iron out startup effects info!("consumed {} packets in {}us - throughput {:.2} tps, throughput_50 {:.2} tps, " @@ -157,7 +167,7 @@ pub fn wireup_and_send_txs_via_channel() { assert_eq!(count_map.len() as u32, SAMPLE_TX_COUNT, "count_map size should be equal to SAMPLE_TX_COUNT"); assert!(count_map.values().all(|cnt| *cnt == 1), "all transactions should be unique"); - runtime2.shutdown_timeout(Duration::from_millis(1000)); + runtime_literpc.shutdown_timeout(Duration::from_millis(1000)); }); @@ -213,18 +223,17 @@ async fn start_literpc_client(streamer_listen_addrs: SocketAddr, literpc_validat // this effectively controls how many connections we will have let mut connections_to_keep: HashMap = HashMap::new(); - // TODO comment in - // let addr1 = UdpSocket::bind("127.0.0.1:0").unwrap().local_addr().unwrap(); - // connections_to_keep.insert( - // Pubkey::from_str("1111111jepwNWbYG87sgwnBbUJnQHrPiUJzMpqJXZ")?, - // addr1, - // ); - // - // let addr2 = UdpSocket::bind("127.0.0.1:0").unwrap().local_addr().unwrap(); - // connections_to_keep.insert( - // Pubkey::from_str("1111111k4AYMctpyJakWNvGcte6tR8BLyZw54R8qu")?, - // addr2, - // ); + let addr1 = UdpSocket::bind("127.0.0.1:0").unwrap().local_addr().unwrap(); + connections_to_keep.insert( + Pubkey::from_str("1111111jepwNWbYG87sgwnBbUJnQHrPiUJzMpqJXZ")?, + addr1, + ); + + let addr2 = UdpSocket::bind("127.0.0.1:0").unwrap().local_addr().unwrap(); + connections_to_keep.insert( + Pubkey::from_str("1111111k4AYMctpyJakWNvGcte6tR8BLyZw54R8qu")?, + addr2, + ); // this is the real streamer connections_to_keep.insert(