save transaction in a seperate csv
This commit is contained in:
parent
44922e3d7f
commit
230171acea
|
@ -4,7 +4,7 @@ use clap::{command, Parser};
|
|||
#[command(author, version, about, long_about = None)]
|
||||
pub struct Args {
|
||||
/// Number of tx(s) sent in each run
|
||||
#[arg(short = 't', long, default_value_t = 20_000)]
|
||||
#[arg(short = 'n', long, default_value_t = 5_000)]
|
||||
pub tx_count: usize,
|
||||
/// Number of bench runs
|
||||
#[arg(short = 'r', long, default_value_t = 1)]
|
||||
|
@ -18,4 +18,6 @@ pub struct Args {
|
|||
/// Lite Rpc Address
|
||||
#[arg(short = 'l', long, default_value_t = String::from("http://127.0.0.1:8890"))]
|
||||
pub lite_rpc_addr: String,
|
||||
#[arg(short = 't', long, default_value_t = String::from("transactions.csv"))]
|
||||
pub transaction_save_file: String,
|
||||
}
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
use bench::{
|
||||
cli::Args,
|
||||
helpers::BenchHelper,
|
||||
metrics::{AvgMetric, Metric},
|
||||
metrics::{AvgMetric, Metric, TxMetricData},
|
||||
};
|
||||
use clap::Parser;
|
||||
use dashmap::DashMap;
|
||||
|
@ -10,10 +10,14 @@ use log::{error, info};
|
|||
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
|
||||
use solana_sdk::{
|
||||
commitment_config::CommitmentConfig, hash::Hash, signature::Keypair, signer::Signer,
|
||||
slot_history::Slot,
|
||||
};
|
||||
use std::sync::{
|
||||
atomic::{AtomicU64, Ordering},
|
||||
Arc,
|
||||
};
|
||||
use std::sync::Arc;
|
||||
use tokio::{
|
||||
sync::RwLock,
|
||||
sync::{mpsc::UnboundedSender, RwLock},
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
|
@ -27,14 +31,13 @@ async fn main() {
|
|||
run_interval_ms,
|
||||
metrics_file_name,
|
||||
lite_rpc_addr,
|
||||
transaction_save_file,
|
||||
} = Args::parse();
|
||||
|
||||
let mut run_interval_ms = tokio::time::interval(Duration::from_millis(run_interval_ms));
|
||||
|
||||
info!("Connecting to {lite_rpc_addr}");
|
||||
|
||||
let mut csv_writer = csv::Writer::from_path(metrics_file_name).unwrap();
|
||||
|
||||
let mut avg_metric = AvgMetric::default();
|
||||
|
||||
let mut tasks = vec![];
|
||||
|
@ -47,11 +50,14 @@ async fn main() {
|
|||
CommitmentConfig::confirmed(),
|
||||
));
|
||||
let bh = rpc_client.get_latest_blockhash().await.unwrap();
|
||||
let slot = rpc_client.get_slot().await.unwrap();
|
||||
let block_hash: Arc<RwLock<Hash>> = Arc::new(RwLock::new(bh));
|
||||
let _jh = {
|
||||
let current_slot = Arc::new(AtomicU64::new(slot));
|
||||
{
|
||||
// block hash updater task
|
||||
let block_hash = block_hash.clone();
|
||||
let rpc_client = rpc_client.clone();
|
||||
let current_slot = current_slot.clone();
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
let bh = rpc_client.get_latest_blockhash().await;
|
||||
|
@ -59,14 +65,41 @@ async fn main() {
|
|||
Ok(bh) => {
|
||||
let mut lock = block_hash.write().await;
|
||||
*lock = bh;
|
||||
},
|
||||
}
|
||||
Err(e) => println!("blockhash update error {}", e),
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(500)).await;
|
||||
|
||||
let slot = rpc_client.get_slot().await;
|
||||
match slot {
|
||||
Ok(slot) => {
|
||||
current_slot.store(slot, std::sync::atomic::Ordering::Relaxed);
|
||||
}
|
||||
Err(e) => println!("slot {}", e),
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
}
|
||||
})
|
||||
};
|
||||
|
||||
// transaction logger
|
||||
let (tx_log_sx, mut tx_log_rx) = tokio::sync::mpsc::unbounded_channel::<TxMetricData>();
|
||||
let log_transactions = !transaction_save_file.is_empty();
|
||||
if log_transactions {
|
||||
tokio::spawn(async move {
|
||||
let mut tx_writer = csv::Writer::from_path(transaction_save_file).unwrap();
|
||||
loop {
|
||||
match tx_log_rx.recv().await {
|
||||
Some(x) => {
|
||||
tx_writer.serialize(x).unwrap();
|
||||
}
|
||||
None => {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
for seed in 0..runs {
|
||||
let funded_payer = Keypair::from_bytes(funded_payer.to_bytes().as_slice()).unwrap();
|
||||
tasks.push(tokio::spawn(bench(
|
||||
|
@ -75,6 +108,9 @@ async fn main() {
|
|||
funded_payer,
|
||||
seed as u64,
|
||||
block_hash.clone(),
|
||||
current_slot.clone(),
|
||||
tx_log_sx.clone(),
|
||||
log_transactions,
|
||||
)));
|
||||
// wait for an interval
|
||||
run_interval_ms.tick().await;
|
||||
|
@ -83,6 +119,8 @@ async fn main() {
|
|||
let join_res = join_all(tasks).await;
|
||||
|
||||
let mut run_num = 1;
|
||||
|
||||
let mut csv_writer = csv::Writer::from_path(metrics_file_name).unwrap();
|
||||
for res in join_res {
|
||||
match res {
|
||||
Ok(metric) => {
|
||||
|
@ -110,6 +148,7 @@ async fn main() {
|
|||
struct TxSendData {
|
||||
sent_duration: Duration,
|
||||
sent_instant: Instant,
|
||||
sent_slot: Slot,
|
||||
}
|
||||
|
||||
async fn bench(
|
||||
|
@ -118,12 +157,16 @@ async fn bench(
|
|||
funded_payer: Keypair,
|
||||
seed: u64,
|
||||
block_hash: Arc<RwLock<Hash>>,
|
||||
current_slot: Arc<AtomicU64>,
|
||||
tx_metric_sx: UnboundedSender<TxMetricData>,
|
||||
log_txs: bool,
|
||||
) -> Metric {
|
||||
let map_of_txs = Arc::new(DashMap::new());
|
||||
// transaction sender task
|
||||
{
|
||||
let map_of_txs = map_of_txs.clone();
|
||||
let rpc_client = rpc_client.clone();
|
||||
let current_slot = current_slot.clone();
|
||||
tokio::spawn(async move {
|
||||
let map_of_txs = map_of_txs.clone();
|
||||
let rand_strings = BenchHelper::generate_random_strings(tx_count, Some(seed));
|
||||
|
@ -138,6 +181,7 @@ async fn bench(
|
|||
TxSendData {
|
||||
sent_duration: start_time.elapsed(),
|
||||
sent_instant: Instant::now(),
|
||||
sent_slot: current_slot.load(std::sync::atomic::Ordering::Relaxed),
|
||||
},
|
||||
);
|
||||
}
|
||||
|
@ -166,10 +210,18 @@ async fn bench(
|
|||
if let Some(_) = tx_status {
|
||||
let signature = signatures[i];
|
||||
let tx_data = map_of_txs.get(&signature).unwrap();
|
||||
metric.add_successful_transaction(
|
||||
tx_data.sent_duration,
|
||||
tx_data.sent_instant.elapsed(),
|
||||
);
|
||||
let time_to_confirm = tx_data.sent_instant.elapsed();
|
||||
metric.add_successful_transaction(tx_data.sent_duration, time_to_confirm);
|
||||
|
||||
if log_txs {
|
||||
let _ = tx_metric_sx.send(TxMetricData {
|
||||
signature: signature.to_string(),
|
||||
sent_slot: tx_data.sent_slot,
|
||||
confirmed_slot: current_slot.load(Ordering::Relaxed),
|
||||
time_to_send_in_millis: tx_data.sent_duration.as_millis() as u64,
|
||||
time_to_confirm_in_millis: time_to_confirm.as_millis() as u64,
|
||||
});
|
||||
}
|
||||
drop(tx_data);
|
||||
map_of_txs.remove(&signature);
|
||||
confirmed_count += 1;
|
||||
|
|
|
@ -3,6 +3,8 @@ use std::{
|
|||
time::Duration,
|
||||
};
|
||||
|
||||
use solana_sdk::slot_history::Slot;
|
||||
|
||||
#[derive(Clone, Copy, Debug, Default, serde::Serialize)]
|
||||
pub struct Metric {
|
||||
pub txs_sent: u64,
|
||||
|
@ -104,3 +106,12 @@ impl From<AvgMetric> for Metric {
|
|||
avg_metric.total_metric
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Default, serde::Serialize)]
|
||||
pub struct TxMetricData {
|
||||
pub signature: String,
|
||||
pub sent_slot: Slot,
|
||||
pub confirmed_slot: Slot,
|
||||
pub time_to_send_in_millis: u64,
|
||||
pub time_to_confirm_in_millis: u64,
|
||||
}
|
||||
|
|
|
@ -402,8 +402,7 @@ impl BlockListener {
|
|||
estimated_slot: Arc<AtomicU64>,
|
||||
) -> JoinHandle<anyhow::Result<()>> {
|
||||
let (slot_retry_queue_sx, mut slot_retry_queue_rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
let (block_schedule_queue_sx, block_schedule_queue_rx) =
|
||||
async_channel::unbounded::<Slot>();
|
||||
let (block_schedule_queue_sx, block_schedule_queue_rx) = async_channel::unbounded::<Slot>();
|
||||
|
||||
// task to fetch blocks
|
||||
for _i in 0..8 {
|
||||
|
|
|
@ -15,9 +15,7 @@ use quinn::{
|
|||
ClientConfig, Connection, ConnectionError, Endpoint, EndpointConfig, IdleTimeout, SendStream,
|
||||
TokioRuntime, TransportConfig,
|
||||
};
|
||||
use solana_sdk::{
|
||||
pubkey::Pubkey,
|
||||
};
|
||||
use solana_sdk::pubkey::Pubkey;
|
||||
use solana_streamer::nonblocking::quic::compute_max_allowed_uni_streams;
|
||||
use tokio::{
|
||||
sync::{broadcast::Receiver, broadcast::Sender, RwLock},
|
||||
|
@ -213,7 +211,6 @@ impl ActiveConnection {
|
|||
exit_signal: Arc<AtomicBool>,
|
||||
last_stable_id: Arc<AtomicU64>,
|
||||
) {
|
||||
|
||||
let mut queue = VecDeque::new();
|
||||
for tx in txs {
|
||||
queue.push_back(tx);
|
||||
|
|
Loading…
Reference in New Issue