From 230171acea2af8c73bfea3c64798c85581c7a856 Mon Sep 17 00:00:00 2001 From: Godmode Galactus Date: Tue, 2 May 2023 17:06:20 +0200 Subject: [PATCH] save transaction in a seperate csv --- bench/src/cli.rs | 4 +- bench/src/main.rs | 76 ++++++++++++++++--- bench/src/metrics.rs | 11 +++ src/workers/block_listenser.rs | 3 +- .../tpu_utils/tpu_connection_manager.rs | 5 +- 5 files changed, 80 insertions(+), 19 deletions(-) diff --git a/bench/src/cli.rs b/bench/src/cli.rs index 39e2024a..21b7ee7a 100644 --- a/bench/src/cli.rs +++ b/bench/src/cli.rs @@ -4,7 +4,7 @@ use clap::{command, Parser}; #[command(author, version, about, long_about = None)] pub struct Args { /// Number of tx(s) sent in each run - #[arg(short = 't', long, default_value_t = 20_000)] + #[arg(short = 'n', long, default_value_t = 5_000)] pub tx_count: usize, /// Number of bench runs #[arg(short = 'r', long, default_value_t = 1)] @@ -18,4 +18,6 @@ pub struct Args { /// Lite Rpc Address #[arg(short = 'l', long, default_value_t = String::from("http://127.0.0.1:8890"))] pub lite_rpc_addr: String, + #[arg(short = 't', long, default_value_t = String::from("transactions.csv"))] + pub transaction_save_file: String, } diff --git a/bench/src/main.rs b/bench/src/main.rs index 000e7ac1..edd1176e 100644 --- a/bench/src/main.rs +++ b/bench/src/main.rs @@ -1,7 +1,7 @@ use bench::{ cli::Args, helpers::BenchHelper, - metrics::{AvgMetric, Metric}, + metrics::{AvgMetric, Metric, TxMetricData}, }; use clap::Parser; use dashmap::DashMap; @@ -10,10 +10,14 @@ use log::{error, info}; use solana_rpc_client::nonblocking::rpc_client::RpcClient; use solana_sdk::{ commitment_config::CommitmentConfig, hash::Hash, signature::Keypair, signer::Signer, + slot_history::Slot, +}; +use std::sync::{ + atomic::{AtomicU64, Ordering}, + Arc, }; -use std::sync::Arc; use tokio::{ - sync::RwLock, + sync::{mpsc::UnboundedSender, RwLock}, time::{Duration, Instant}, }; @@ -27,14 +31,13 @@ async fn main() { run_interval_ms, metrics_file_name, lite_rpc_addr, + transaction_save_file, } = Args::parse(); let mut run_interval_ms = tokio::time::interval(Duration::from_millis(run_interval_ms)); info!("Connecting to {lite_rpc_addr}"); - let mut csv_writer = csv::Writer::from_path(metrics_file_name).unwrap(); - let mut avg_metric = AvgMetric::default(); let mut tasks = vec![]; @@ -47,11 +50,14 @@ async fn main() { CommitmentConfig::confirmed(), )); let bh = rpc_client.get_latest_blockhash().await.unwrap(); + let slot = rpc_client.get_slot().await.unwrap(); let block_hash: Arc> = Arc::new(RwLock::new(bh)); - let _jh = { + let current_slot = Arc::new(AtomicU64::new(slot)); + { // block hash updater task let block_hash = block_hash.clone(); let rpc_client = rpc_client.clone(); + let current_slot = current_slot.clone(); tokio::spawn(async move { loop { let bh = rpc_client.get_latest_blockhash().await; @@ -59,14 +65,41 @@ async fn main() { Ok(bh) => { let mut lock = block_hash.write().await; *lock = bh; - }, + } Err(e) => println!("blockhash update error {}", e), } - tokio::time::sleep(Duration::from_millis(500)).await; + + let slot = rpc_client.get_slot().await; + match slot { + Ok(slot) => { + current_slot.store(slot, std::sync::atomic::Ordering::Relaxed); + } + Err(e) => println!("slot {}", e), + } + tokio::time::sleep(Duration::from_millis(100)).await; } }) }; + // transaction logger + let (tx_log_sx, mut tx_log_rx) = tokio::sync::mpsc::unbounded_channel::(); + let log_transactions = !transaction_save_file.is_empty(); + if log_transactions { + tokio::spawn(async move { + let mut tx_writer = csv::Writer::from_path(transaction_save_file).unwrap(); + loop { + match tx_log_rx.recv().await { + Some(x) => { + tx_writer.serialize(x).unwrap(); + } + None => { + break; + } + } + } + }); + } + for seed in 0..runs { let funded_payer = Keypair::from_bytes(funded_payer.to_bytes().as_slice()).unwrap(); tasks.push(tokio::spawn(bench( @@ -75,6 +108,9 @@ async fn main() { funded_payer, seed as u64, block_hash.clone(), + current_slot.clone(), + tx_log_sx.clone(), + log_transactions, ))); // wait for an interval run_interval_ms.tick().await; @@ -83,6 +119,8 @@ async fn main() { let join_res = join_all(tasks).await; let mut run_num = 1; + + let mut csv_writer = csv::Writer::from_path(metrics_file_name).unwrap(); for res in join_res { match res { Ok(metric) => { @@ -110,6 +148,7 @@ async fn main() { struct TxSendData { sent_duration: Duration, sent_instant: Instant, + sent_slot: Slot, } async fn bench( @@ -118,12 +157,16 @@ async fn bench( funded_payer: Keypair, seed: u64, block_hash: Arc>, + current_slot: Arc, + tx_metric_sx: UnboundedSender, + log_txs: bool, ) -> Metric { let map_of_txs = Arc::new(DashMap::new()); // transaction sender task { let map_of_txs = map_of_txs.clone(); let rpc_client = rpc_client.clone(); + let current_slot = current_slot.clone(); tokio::spawn(async move { let map_of_txs = map_of_txs.clone(); let rand_strings = BenchHelper::generate_random_strings(tx_count, Some(seed)); @@ -138,6 +181,7 @@ async fn bench( TxSendData { sent_duration: start_time.elapsed(), sent_instant: Instant::now(), + sent_slot: current_slot.load(std::sync::atomic::Ordering::Relaxed), }, ); } @@ -166,10 +210,18 @@ async fn bench( if let Some(_) = tx_status { let signature = signatures[i]; let tx_data = map_of_txs.get(&signature).unwrap(); - metric.add_successful_transaction( - tx_data.sent_duration, - tx_data.sent_instant.elapsed(), - ); + let time_to_confirm = tx_data.sent_instant.elapsed(); + metric.add_successful_transaction(tx_data.sent_duration, time_to_confirm); + + if log_txs { + let _ = tx_metric_sx.send(TxMetricData { + signature: signature.to_string(), + sent_slot: tx_data.sent_slot, + confirmed_slot: current_slot.load(Ordering::Relaxed), + time_to_send_in_millis: tx_data.sent_duration.as_millis() as u64, + time_to_confirm_in_millis: time_to_confirm.as_millis() as u64, + }); + } drop(tx_data); map_of_txs.remove(&signature); confirmed_count += 1; diff --git a/bench/src/metrics.rs b/bench/src/metrics.rs index f5f9615d..b4677ee3 100644 --- a/bench/src/metrics.rs +++ b/bench/src/metrics.rs @@ -3,6 +3,8 @@ use std::{ time::Duration, }; +use solana_sdk::slot_history::Slot; + #[derive(Clone, Copy, Debug, Default, serde::Serialize)] pub struct Metric { pub txs_sent: u64, @@ -104,3 +106,12 @@ impl From for Metric { avg_metric.total_metric } } + +#[derive(Clone, Debug, Default, serde::Serialize)] +pub struct TxMetricData { + pub signature: String, + pub sent_slot: Slot, + pub confirmed_slot: Slot, + pub time_to_send_in_millis: u64, + pub time_to_confirm_in_millis: u64, +} diff --git a/src/workers/block_listenser.rs b/src/workers/block_listenser.rs index fe2cf779..8076b547 100644 --- a/src/workers/block_listenser.rs +++ b/src/workers/block_listenser.rs @@ -402,8 +402,7 @@ impl BlockListener { estimated_slot: Arc, ) -> JoinHandle> { let (slot_retry_queue_sx, mut slot_retry_queue_rx) = tokio::sync::mpsc::unbounded_channel(); - let (block_schedule_queue_sx, block_schedule_queue_rx) = - async_channel::unbounded::(); + let (block_schedule_queue_sx, block_schedule_queue_rx) = async_channel::unbounded::(); // task to fetch blocks for _i in 0..8 { diff --git a/src/workers/tpu_utils/tpu_connection_manager.rs b/src/workers/tpu_utils/tpu_connection_manager.rs index 7243b519..0513459d 100644 --- a/src/workers/tpu_utils/tpu_connection_manager.rs +++ b/src/workers/tpu_utils/tpu_connection_manager.rs @@ -15,9 +15,7 @@ use quinn::{ ClientConfig, Connection, ConnectionError, Endpoint, EndpointConfig, IdleTimeout, SendStream, TokioRuntime, TransportConfig, }; -use solana_sdk::{ - pubkey::Pubkey, -}; +use solana_sdk::pubkey::Pubkey; use solana_streamer::nonblocking::quic::compute_max_allowed_uni_streams; use tokio::{ sync::{broadcast::Receiver, broadcast::Sender, RwLock}, @@ -213,7 +211,6 @@ impl ActiveConnection { exit_signal: Arc, last_stable_id: Arc, ) { - let mut queue = VecDeque::new(); for tx in txs { queue.push_back(tx);