Merge pull request #136 from blockworks-foundation/bench_save_transactions_in_csv

save transaction in a seperate csv
This commit is contained in:
galactus 2023-06-03 20:59:45 +02:00 committed by GitHub
commit a3aa9993b1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 80 additions and 19 deletions

View File

@ -4,7 +4,7 @@ use clap::{command, Parser};
#[command(author, version, about, long_about = None)]
pub struct Args {
/// Number of tx(s) sent in each run
#[arg(short = 't', long, default_value_t = 20_000)]
#[arg(short = 'n', long, default_value_t = 5_000)]
pub tx_count: usize,
/// Number of bench runs
#[arg(short = 'r', long, default_value_t = 1)]
@ -18,4 +18,6 @@ pub struct Args {
/// Lite Rpc Address
#[arg(short = 'l', long, default_value_t = String::from("http://127.0.0.1:8890"))]
pub lite_rpc_addr: String,
#[arg(short = 't', long, default_value_t = String::from("transactions.csv"))]
pub transaction_save_file: String,
}

View File

@ -1,7 +1,7 @@
use bench::{
cli::Args,
helpers::BenchHelper,
metrics::{AvgMetric, Metric},
metrics::{AvgMetric, Metric, TxMetricData},
};
use clap::Parser;
use dashmap::DashMap;
@ -10,10 +10,14 @@ use log::{error, info};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::{
commitment_config::CommitmentConfig, hash::Hash, signature::Keypair, signer::Signer,
slot_history::Slot,
};
use std::sync::{
atomic::{AtomicU64, Ordering},
Arc,
};
use std::sync::Arc;
use tokio::{
sync::RwLock,
sync::{mpsc::UnboundedSender, RwLock},
time::{Duration, Instant},
};
@ -27,14 +31,13 @@ async fn main() {
run_interval_ms,
metrics_file_name,
lite_rpc_addr,
transaction_save_file,
} = Args::parse();
let mut run_interval_ms = tokio::time::interval(Duration::from_millis(run_interval_ms));
info!("Connecting to {lite_rpc_addr}");
let mut csv_writer = csv::Writer::from_path(metrics_file_name).unwrap();
let mut avg_metric = AvgMetric::default();
let mut tasks = vec![];
@ -47,11 +50,14 @@ async fn main() {
CommitmentConfig::confirmed(),
));
let bh = rpc_client.get_latest_blockhash().await.unwrap();
let slot = rpc_client.get_slot().await.unwrap();
let block_hash: Arc<RwLock<Hash>> = Arc::new(RwLock::new(bh));
let _jh = {
let current_slot = Arc::new(AtomicU64::new(slot));
{
// block hash updater task
let block_hash = block_hash.clone();
let rpc_client = rpc_client.clone();
let current_slot = current_slot.clone();
tokio::spawn(async move {
loop {
let bh = rpc_client.get_latest_blockhash().await;
@ -59,14 +65,41 @@ async fn main() {
Ok(bh) => {
let mut lock = block_hash.write().await;
*lock = bh;
},
}
Err(e) => println!("blockhash update error {}", e),
}
tokio::time::sleep(Duration::from_millis(500)).await;
let slot = rpc_client.get_slot().await;
match slot {
Ok(slot) => {
current_slot.store(slot, std::sync::atomic::Ordering::Relaxed);
}
Err(e) => println!("slot {}", e),
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
})
};
// transaction logger
let (tx_log_sx, mut tx_log_rx) = tokio::sync::mpsc::unbounded_channel::<TxMetricData>();
let log_transactions = !transaction_save_file.is_empty();
if log_transactions {
tokio::spawn(async move {
let mut tx_writer = csv::Writer::from_path(transaction_save_file).unwrap();
loop {
match tx_log_rx.recv().await {
Some(x) => {
tx_writer.serialize(x).unwrap();
}
None => {
break;
}
}
}
});
}
for seed in 0..runs {
let funded_payer = Keypair::from_bytes(funded_payer.to_bytes().as_slice()).unwrap();
tasks.push(tokio::spawn(bench(
@ -75,6 +108,9 @@ async fn main() {
funded_payer,
seed as u64,
block_hash.clone(),
current_slot.clone(),
tx_log_sx.clone(),
log_transactions,
)));
// wait for an interval
run_interval_ms.tick().await;
@ -83,6 +119,8 @@ async fn main() {
let join_res = join_all(tasks).await;
let mut run_num = 1;
let mut csv_writer = csv::Writer::from_path(metrics_file_name).unwrap();
for res in join_res {
match res {
Ok(metric) => {
@ -110,6 +148,7 @@ async fn main() {
struct TxSendData {
sent_duration: Duration,
sent_instant: Instant,
sent_slot: Slot,
}
async fn bench(
@ -118,12 +157,16 @@ async fn bench(
funded_payer: Keypair,
seed: u64,
block_hash: Arc<RwLock<Hash>>,
current_slot: Arc<AtomicU64>,
tx_metric_sx: UnboundedSender<TxMetricData>,
log_txs: bool,
) -> Metric {
let map_of_txs = Arc::new(DashMap::new());
// transaction sender task
{
let map_of_txs = map_of_txs.clone();
let rpc_client = rpc_client.clone();
let current_slot = current_slot.clone();
tokio::spawn(async move {
let map_of_txs = map_of_txs.clone();
let rand_strings = BenchHelper::generate_random_strings(tx_count, Some(seed));
@ -138,6 +181,7 @@ async fn bench(
TxSendData {
sent_duration: start_time.elapsed(),
sent_instant: Instant::now(),
sent_slot: current_slot.load(std::sync::atomic::Ordering::Relaxed),
},
);
}
@ -166,10 +210,18 @@ async fn bench(
if let Some(_) = tx_status {
let signature = signatures[i];
let tx_data = map_of_txs.get(&signature).unwrap();
metric.add_successful_transaction(
tx_data.sent_duration,
tx_data.sent_instant.elapsed(),
);
let time_to_confirm = tx_data.sent_instant.elapsed();
metric.add_successful_transaction(tx_data.sent_duration, time_to_confirm);
if log_txs {
let _ = tx_metric_sx.send(TxMetricData {
signature: signature.to_string(),
sent_slot: tx_data.sent_slot,
confirmed_slot: current_slot.load(Ordering::Relaxed),
time_to_send_in_millis: tx_data.sent_duration.as_millis() as u64,
time_to_confirm_in_millis: time_to_confirm.as_millis() as u64,
});
}
drop(tx_data);
map_of_txs.remove(&signature);
confirmed_count += 1;

View File

@ -3,6 +3,8 @@ use std::{
time::Duration,
};
use solana_sdk::slot_history::Slot;
#[derive(Clone, Copy, Debug, Default, serde::Serialize)]
pub struct Metric {
pub txs_sent: u64,
@ -104,3 +106,12 @@ impl From<AvgMetric> for Metric {
avg_metric.total_metric
}
}
#[derive(Clone, Debug, Default, serde::Serialize)]
pub struct TxMetricData {
pub signature: String,
pub sent_slot: Slot,
pub confirmed_slot: Slot,
pub time_to_send_in_millis: u64,
pub time_to_confirm_in_millis: u64,
}

View File

@ -402,8 +402,7 @@ impl BlockListener {
estimated_slot: Arc<AtomicU64>,
) -> JoinHandle<anyhow::Result<()>> {
let (slot_retry_queue_sx, mut slot_retry_queue_rx) = tokio::sync::mpsc::unbounded_channel();
let (block_schedule_queue_sx, block_schedule_queue_rx) =
async_channel::unbounded::<Slot>();
let (block_schedule_queue_sx, block_schedule_queue_rx) = async_channel::unbounded::<Slot>();
// task to fetch blocks
for _i in 0..8 {

View File

@ -15,9 +15,7 @@ use quinn::{
ClientConfig, Connection, ConnectionError, Endpoint, EndpointConfig, IdleTimeout, SendStream,
TokioRuntime, TransportConfig,
};
use solana_sdk::{
pubkey::Pubkey,
};
use solana_sdk::pubkey::Pubkey;
use solana_streamer::nonblocking::quic::compute_max_allowed_uni_streams;
use tokio::{
sync::{broadcast::Receiver, broadcast::Sender, RwLock},
@ -213,7 +211,6 @@ impl ActiveConnection {
exit_signal: Arc<AtomicBool>,
last_stable_id: Arc<AtomicU64>,
) {
let mut queue = VecDeque::new();
for tx in txs {
queue.push_back(tx);