timeout in consumer thread

This commit is contained in:
GroovieGermanikus 2023-07-17 11:36:51 +02:00
parent dea39eebda
commit 71e828fa96
1 changed files with 32 additions and 23 deletions

View File

@ -9,9 +9,9 @@ use std::thread;
use std::time::{Duration, Instant};
use anyhow::bail;
use countmap::CountMap;
use crossbeam_channel::{Receiver, RecvError, Sender};
use crossbeam_channel::{Receiver, RecvError, RecvTimeoutError, Sender};
use futures::future::join_all;
use log::{debug, error, info, trace};
use log::{debug, error, info, trace, warn};
use quinn::TokioRuntime;
use serde::de::Unexpected::Option;
use solana_rpc_client::rpc_client::SerializableTransaction;
@ -51,7 +51,7 @@ pub fn wireup_and_send_txs_via_channel() {
// value from solana - see quic streamer - see quic.rs -> rt()
const NUM_QUIC_STREAMER_WORKER_THREADS: usize = 1;
let runtime1 = Builder::new_multi_thread()
let runtime_quic1 = Builder::new_multi_thread()
.worker_threads(1)
.thread_name("quic-server")
.enable_all()
@ -59,7 +59,7 @@ pub fn wireup_and_send_txs_via_channel() {
.expect("failed to build tokio runtime for testing quic server");
// lite-rpc
let runtime2 = tokio::runtime::Builder::new_multi_thread()
let runtime_literpc = tokio::runtime::Builder::new_multi_thread()
// see lite-rpc -> main.rs
.worker_threads(16) // note: this value has changed with the "deadlock fix" - TODO experiment with it
.enable_all()
@ -73,7 +73,7 @@ pub fn wireup_and_send_txs_via_channel() {
let (inbound_packets_sender, inbound_packets_receiver) = crossbeam_channel::unbounded();
runtime1.block_on(async {
runtime_quic1.block_on(async {
/// setup solana Quic streamer
// see log "Start quic server on UdpSocket { addr: 127.0.0.1:xxxxx, fd: 10 }"
@ -97,7 +97,7 @@ pub fn wireup_and_send_txs_via_channel() {
});
runtime2.block_on(async {
runtime_literpc.block_on(async {
tokio::spawn(start_literpc_client(listen_addr, literpc_validator_identity));
});
@ -111,9 +111,19 @@ pub fn wireup_and_send_txs_via_channel() {
let mut packet_count2 = 0;
let mut count_map: CountMap<Signature> = CountMap::with_capacity(SAMPLE_TX_COUNT as usize);
const WARMUP_TX_COUNT: u32 = SAMPLE_TX_COUNT / 2;
while packet_count != SAMPLE_TX_COUNT {
let packet_batch: PacketBatch = inbound_packets_receiver.recv().expect("receive must succeed");
println!("packets: {}", packet_batch.len());
while packet_count < SAMPLE_TX_COUNT {
if timer.elapsed() > Duration::from_secs(5) {
warn!("timeout waiting for packet from quic streamer");
break;
}
let packet_batch = match inbound_packets_receiver.recv_timeout(Duration::from_millis(500)) {
Ok(batch) => batch,
Err(_) => {
debug!("consumer thread did not receive packets on inbound channel recently - continue polling");
continue;
}
};
if packet_count == 0 {
info!("time to first packet {}ms", time_to_first.elapsed().as_millis());
@ -143,7 +153,7 @@ pub fn wireup_and_send_txs_via_channel() {
} // -- while not all packets received - by count
let total_duration = timer.elapsed();
let half_duration = timer2.unwrap().elapsed();
let half_duration = timer2.map(|t| t.elapsed()).unwrap_or(Duration::from_secs(3333));
// throughput_50 is second half of transactions - should iron out startup effects
info!("consumed {} packets in {}us - throughput {:.2} tps, throughput_50 {:.2} tps, "
@ -157,7 +167,7 @@ pub fn wireup_and_send_txs_via_channel() {
assert_eq!(count_map.len() as u32, SAMPLE_TX_COUNT, "count_map size should be equal to SAMPLE_TX_COUNT");
assert!(count_map.values().all(|cnt| *cnt == 1), "all transactions should be unique");
runtime2.shutdown_timeout(Duration::from_millis(1000));
runtime_literpc.shutdown_timeout(Duration::from_millis(1000));
});
@ -213,18 +223,17 @@ async fn start_literpc_client(streamer_listen_addrs: SocketAddr, literpc_validat
// this effectively controls how many connections we will have
let mut connections_to_keep: HashMap<Pubkey, SocketAddr> = HashMap::new();
// TODO comment in
// let addr1 = UdpSocket::bind("127.0.0.1:0").unwrap().local_addr().unwrap();
// connections_to_keep.insert(
// Pubkey::from_str("1111111jepwNWbYG87sgwnBbUJnQHrPiUJzMpqJXZ")?,
// addr1,
// );
//
// let addr2 = UdpSocket::bind("127.0.0.1:0").unwrap().local_addr().unwrap();
// connections_to_keep.insert(
// Pubkey::from_str("1111111k4AYMctpyJakWNvGcte6tR8BLyZw54R8qu")?,
// addr2,
// );
let addr1 = UdpSocket::bind("127.0.0.1:0").unwrap().local_addr().unwrap();
connections_to_keep.insert(
Pubkey::from_str("1111111jepwNWbYG87sgwnBbUJnQHrPiUJzMpqJXZ")?,
addr1,
);
let addr2 = UdpSocket::bind("127.0.0.1:0").unwrap().local_addr().unwrap();
connections_to_keep.insert(
Pubkey::from_str("1111111k4AYMctpyJakWNvGcte6tR8BLyZw54R8qu")?,
addr2,
);
// this is the real streamer
connections_to_keep.insert(