create stats

This commit is contained in:
GroovieGermanikus 2023-07-01 12:48:14 +02:00
parent 89d6c343af
commit 93a84df68b
1 changed files with 38 additions and 5 deletions

View File

@ -13,6 +13,7 @@ use futures::future::join_all;
use itertools::join;
use log::{debug, error, info};
use quinn::TokioRuntime;
use serde::de::Unexpected::Option;
use solana_rpc_client::rpc_client::SerializableTransaction;
use solana_sdk::hash::Hash;
use solana_sdk::instruction::Instruction;
@ -37,16 +38,22 @@ use solana_lite_rpc_core::structures::identity_stakes::IdentityStakes;
use solana_lite_rpc_core::tx_store::empty_tx_store;
use solana_lite_rpc_services::tpu_utils::tpu_connection_manager::TpuConnectionManager;
const SAMPLE_TX_COUNT: u32 = 10;
// note: logging will be auto-adjusted
const SAMPLE_TX_COUNT: u32 = 1000;
const MAXIMUM_TRANSACTIONS_IN_QUEUE: usize = 200_000;
const MAX_QUIC_CONNECTIONS_PER_PEER: usize = 2; // prod=8
#[test]
pub fn wireup_and_send_txs_via_channel() {
let env_filter = if SAMPLE_TX_COUNT < 100 {
"trace,rustls=info"
} else {
"trace,rustls=info,quinn_proto=debug"
};
tracing_subscriber::fmt::fmt()
.with_max_level(LevelFilter::DEBUG)
// .with_max_level(LevelFilter::DEBUG)
.with_env_filter(env_filter)
.init();
// solana quic streamer - see quic.rs -> rt()
@ -105,20 +112,36 @@ pub fn wireup_and_send_txs_via_channel() {
let packet_consumer_jh = thread::spawn(move || {
info!("start pulling packets...");
let mut packet_count = 0;
let time_to_first = Instant::now();
let timer = Instant::now();
// second half
let mut timer2 = None;
let mut packet_count2 = 0;
const WARMUP_TX_COUNT: u32 = SAMPLE_TX_COUNT / 2;
while packet_count != SAMPLE_TX_COUNT {
match inbound_packets_receiver.recv() {
Ok(packet_batch) => {
if packet_count == 0 {
info!("time to first packet {}ms", time_to_first.elapsed().as_millis());
}
packet_count = packet_count + packet_batch.len() as u32;
if timer2.is_some() {
packet_count2 = packet_count2 + packet_batch.len() as u32;
}
for packet in packet_batch.iter() {
let tx = packet.deserialize_slice::<VersionedTransaction, _>(..).unwrap();
debug!("read transaction from quic streaming server: {:?}", tx.get_signature());
// debug!("read transaction from quic streaming server: {:?}", tx.get_signature());
// for ix in tx.message.instructions() {
// info!("instruction: {:?}", ix.data);
// }
}
info!("received packets so far: {}", packet_count);
if packet_count == WARMUP_TX_COUNT {
timer2 = Some(Instant::now());
}
// info!("received packets so far: {}", packet_count);
if packet_count == SAMPLE_TX_COUNT {
break;
}
@ -129,6 +152,16 @@ pub fn wireup_and_send_txs_via_channel() {
}
}
let total_duration = timer.elapsed();
let half_duration = timer2.unwrap().elapsed();
// throughput_50 is second half of transactions - should iron out startup effects
info!("consumed {} packets in {}us - throughput {:.2} tps, throughput_50 {:.2} tps, "
, packet_count, total_duration.as_micros(),
packet_count as f64 / total_duration.as_secs_f64(),
packet_count2 as f64 / half_duration.as_secs_f64(),
);
info!("got all expected packets - shutting down tokio runtime with lite-rpc client");
runtime2.shutdown_timeout(Duration::from_millis(100));
});