Merge pull request #132 from blockworks-foundation/improving_bench
making a task for each transaction send, and tracking metrics correctly
This commit is contained in:
commit
58cccaf6ef
|
@ -18,4 +18,3 @@ dirs = "5.0.0"
|
|||
rand = "0.8.5"
|
||||
rand_chacha = "0.3.1"
|
||||
futures = { workspace = true }
|
||||
|
||||
|
|
|
@ -1,8 +1,4 @@
|
|||
use std::{
|
||||
collections::HashMap,
|
||||
sync::Arc,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
use std::{sync::Arc, collections::HashMap};
|
||||
|
||||
use bench::{
|
||||
cli::Args,
|
||||
|
@ -12,8 +8,11 @@ use bench::{
|
|||
use clap::Parser;
|
||||
use futures::future::join_all;
|
||||
use log::{error, info};
|
||||
use solana_rpc_client::{nonblocking::rpc_client::RpcClient, rpc_client::SerializableTransaction};
|
||||
use solana_sdk::{commitment_config::CommitmentConfig, signature::Signature, signer::Signer};
|
||||
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
|
||||
use solana_sdk::{commitment_config::CommitmentConfig, signer::Signer, signature::Keypair};
|
||||
use tokio::{
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
|
@ -31,20 +30,22 @@ async fn main() {
|
|||
|
||||
info!("Connecting to {lite_rpc_addr}");
|
||||
|
||||
let rpc_client = Arc::new(RpcClient::new_with_commitment(
|
||||
lite_rpc_addr,
|
||||
CommitmentConfig::confirmed(),
|
||||
));
|
||||
|
||||
let mut csv_writer = csv::Writer::from_path(metrics_file_name).unwrap();
|
||||
|
||||
let mut avg_metric = AvgMetric::default();
|
||||
|
||||
let mut tasks = vec![];
|
||||
|
||||
for _ in 0..runs {
|
||||
let rpc_client = rpc_client.clone();
|
||||
tasks.push(tokio::spawn(bench(rpc_client.clone(), tx_count)));
|
||||
let funded_payer = BenchHelper::get_payer().await.unwrap();
|
||||
println!("payer : {}", funded_payer.pubkey());
|
||||
|
||||
let rpc_client = Arc::new(RpcClient::new_with_commitment(
|
||||
lite_rpc_addr.clone(),
|
||||
CommitmentConfig::confirmed(),
|
||||
));
|
||||
for seed in 0..runs {
|
||||
let funded_payer = Keypair::from_bytes(funded_payer.to_bytes().as_slice()).unwrap();
|
||||
tasks.push(tokio::spawn(bench(rpc_client.clone(), tx_count, funded_payer, seed as u64)));
|
||||
// wait for an interval
|
||||
run_interval_ms.tick().await;
|
||||
}
|
||||
|
@ -75,79 +76,48 @@ async fn main() {
|
|||
csv_writer.flush().unwrap();
|
||||
}
|
||||
|
||||
async fn bench(rpc_client: Arc<RpcClient>, tx_count: usize) -> Metric {
|
||||
let funded_payer = BenchHelper::get_payer().await.unwrap();
|
||||
#[derive(Clone, Debug, Copy)]
|
||||
struct TxSendData {
|
||||
sent_duration: Duration,
|
||||
sent_instant: Instant,
|
||||
}
|
||||
|
||||
println!("payer {}", funded_payer.pubkey());
|
||||
async fn bench(rpc_client: Arc<RpcClient>, tx_count: usize, funded_payer: Keypair, seed: u64) -> Metric {
|
||||
let blockhash = rpc_client.get_latest_blockhash().await.unwrap();
|
||||
|
||||
let txs = BenchHelper::generate_txs(tx_count, &funded_payer, blockhash, None);
|
||||
let txs = BenchHelper::generate_txs(tx_count, &funded_payer, blockhash, Some(seed));
|
||||
|
||||
let mut un_confirmed_txs: HashMap<Signature, Option<Instant>> =
|
||||
HashMap::with_capacity(txs.len());
|
||||
|
||||
for tx in &txs {
|
||||
un_confirmed_txs.insert(*tx.get_signature(), None);
|
||||
}
|
||||
|
||||
let start_time = Instant::now();
|
||||
|
||||
info!("Sending and Confirming {tx_count} tx(s)",);
|
||||
|
||||
let send_fut = {
|
||||
let mut metric = Metric::default();
|
||||
let mut map_of_txs = HashMap::new();
|
||||
for tx in txs {
|
||||
let rpc_client = rpc_client.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
for tx in txs {
|
||||
rpc_client.send_transaction(&tx).await.unwrap();
|
||||
info!("Tx {}", &tx.signatures[0]);
|
||||
}
|
||||
info!("Sent {tx_count} tx(s)");
|
||||
|
||||
start_time.elapsed()
|
||||
})
|
||||
};
|
||||
|
||||
let confirm_fut = tokio::spawn(async move {
|
||||
let mut metrics = Metric::default();
|
||||
|
||||
while !un_confirmed_txs.is_empty() {
|
||||
let mut to_remove_txs = Vec::new();
|
||||
|
||||
for (sig, time_elapsed_since_last_confirmed) in un_confirmed_txs.iter_mut() {
|
||||
let sig = *sig;
|
||||
|
||||
if time_elapsed_since_last_confirmed.is_none() {
|
||||
*time_elapsed_since_last_confirmed = Some(Instant::now())
|
||||
let start_time = Instant::now();
|
||||
if let Ok(signature) = rpc_client.send_transaction(&tx).await {
|
||||
map_of_txs.insert( signature, TxSendData {
|
||||
sent_duration: start_time.elapsed(),
|
||||
sent_instant: Instant::now(),
|
||||
});
|
||||
}
|
||||
}
|
||||
let confirmation_time = Instant::now();
|
||||
while confirmation_time.elapsed() < Duration::from_secs(60) && !map_of_txs.is_empty() {
|
||||
let signatures = map_of_txs.iter().map(|x| x.0.clone()).collect::<Vec<_>>();
|
||||
if let Ok(res) = rpc_client.get_signature_statuses(&signatures).await {
|
||||
for i in 0..signatures.len() {
|
||||
let tx_status = &res.value[i];
|
||||
if let Some(_) = tx_status {
|
||||
let signature = signatures[i];
|
||||
let tx_data = map_of_txs.get(&signature).unwrap();
|
||||
metric.add_successful_transaction( tx_data.sent_duration, tx_data.sent_instant.elapsed());
|
||||
map_of_txs.remove(&signature);
|
||||
}
|
||||
|
||||
if rpc_client.confirm_transaction(&sig).await.unwrap() {
|
||||
metrics.txs_confirmed += 1;
|
||||
to_remove_txs.push(sig);
|
||||
} else if time_elapsed_since_last_confirmed.unwrap().elapsed()
|
||||
> Duration::from_secs(30)
|
||||
{
|
||||
metrics.txs_un_confirmed += 1;
|
||||
to_remove_txs.push(sig);
|
||||
}
|
||||
}
|
||||
|
||||
for to_remove_tx in to_remove_txs {
|
||||
un_confirmed_txs.remove(&to_remove_tx);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
metrics.total_time_elapsed_sec = start_time.elapsed().as_secs_f64();
|
||||
metrics.txs_sent = tx_count as u64;
|
||||
|
||||
metrics
|
||||
});
|
||||
|
||||
let (send_fut, confirm_fut) = tokio::join!(send_fut, confirm_fut);
|
||||
let time_to_send_txs = send_fut.unwrap();
|
||||
let mut metrics = confirm_fut.unwrap();
|
||||
metrics.time_to_send_txs = time_to_send_txs.as_secs_f64();
|
||||
metrics.calc_tps();
|
||||
|
||||
metrics
|
||||
for (_, tx) in map_of_txs {
|
||||
metric.add_unsuccessful_transaction(tx.sent_duration);
|
||||
}
|
||||
metric.finalize();
|
||||
metric
|
||||
}
|
||||
|
|
|
@ -1,13 +1,50 @@
|
|||
use std::ops::{AddAssign, DivAssign};
|
||||
use std::{
|
||||
ops::{AddAssign, DivAssign},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
#[derive(Debug, Default, serde::Serialize)]
|
||||
#[derive(Clone, Copy, Debug, Default, serde::Serialize)]
|
||||
pub struct Metric {
|
||||
pub total_time_elapsed_sec: f64,
|
||||
pub txs_sent: u64,
|
||||
pub time_to_send_txs: f64,
|
||||
pub txs_confirmed: u64,
|
||||
pub txs_un_confirmed: u64,
|
||||
pub tps: f64,
|
||||
pub average_confirmation_time_ms: f64,
|
||||
pub average_time_to_send_txs: f64,
|
||||
|
||||
#[serde(skip_serializing)]
|
||||
total_sent_time: Duration,
|
||||
#[serde(skip_serializing)]
|
||||
total_confirmation_time: Duration,
|
||||
}
|
||||
|
||||
impl Metric {
|
||||
pub fn add_successful_transaction(
|
||||
&mut self,
|
||||
time_to_send: Duration,
|
||||
time_to_confrim: Duration,
|
||||
) {
|
||||
self.total_sent_time += time_to_send;
|
||||
self.total_confirmation_time += time_to_confrim;
|
||||
|
||||
self.txs_confirmed += 1;
|
||||
self.txs_sent += 1;
|
||||
}
|
||||
|
||||
pub fn add_unsuccessful_transaction(&mut self, time_to_send: Duration) {
|
||||
self.total_sent_time += time_to_send;
|
||||
self.txs_un_confirmed += 1;
|
||||
self.txs_sent += 1;
|
||||
}
|
||||
|
||||
pub fn finalize(&mut self) {
|
||||
if self.txs_sent > 0 {
|
||||
self.average_time_to_send_txs = self.total_sent_time.as_millis() as f64 / self.txs_sent as f64;
|
||||
}
|
||||
|
||||
if self.txs_confirmed > 0 {
|
||||
self.average_confirmation_time_ms = self.total_confirmation_time.as_millis() as f64 / self.txs_confirmed as f64;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
|
@ -17,19 +54,20 @@ pub struct AvgMetric {
|
|||
}
|
||||
|
||||
impl Metric {
|
||||
pub fn calc_tps(&mut self) {
|
||||
self.tps = self.txs_confirmed as f64 / self.total_time_elapsed_sec
|
||||
pub fn calc_tps(&mut self) -> f64 {
|
||||
self.txs_confirmed as f64
|
||||
}
|
||||
}
|
||||
|
||||
impl AddAssign<&Self> for Metric {
|
||||
fn add_assign(&mut self, rhs: &Self) {
|
||||
self.total_time_elapsed_sec += rhs.total_time_elapsed_sec;
|
||||
self.txs_sent += rhs.txs_sent;
|
||||
self.time_to_send_txs += rhs.time_to_send_txs;
|
||||
self.txs_confirmed += rhs.txs_confirmed;
|
||||
self.txs_un_confirmed += rhs.txs_un_confirmed;
|
||||
self.tps += rhs.tps
|
||||
|
||||
self.total_confirmation_time += rhs.total_confirmation_time;
|
||||
self.total_sent_time += rhs.total_sent_time;
|
||||
self.finalize();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -39,12 +77,13 @@ impl DivAssign<u64> for Metric {
|
|||
if rhs == 0 {
|
||||
return;
|
||||
}
|
||||
self.total_time_elapsed_sec /= rhs as f64;
|
||||
self.txs_sent /= rhs;
|
||||
self.time_to_send_txs /= rhs as f64;
|
||||
self.txs_confirmed /= rhs;
|
||||
self.txs_un_confirmed /= rhs;
|
||||
self.tps /= rhs as f64;
|
||||
|
||||
self.total_confirmation_time = Duration::from_micros((self.total_confirmation_time.as_micros() / rhs as u128) as u64);
|
||||
self.total_sent_time = Duration::from_micros((self.total_sent_time.as_micros() / rhs as u128) as u64);
|
||||
self.finalize();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue