making a task for each transaction send, and tracking metrics correctly

This commit is contained in:
Godmode Galactus 2023-04-26 16:36:34 +02:00
parent 7941855c74
commit f17b346e08
No known key found for this signature in database
GPG Key ID: A04142C71ABB0DEA
2 changed files with 73 additions and 86 deletions

View File

@ -1,8 +1,4 @@
use std::{
collections::HashMap,
sync::Arc,
time::{Duration, Instant},
};
use std::sync::Arc;
use bench::{
cli::Args,
@ -12,8 +8,12 @@ use bench::{
use clap::Parser;
use futures::future::join_all;
use log::{error, info};
use solana_rpc_client::{nonblocking::rpc_client::RpcClient, rpc_client::SerializableTransaction};
use solana_sdk::{commitment_config::CommitmentConfig, signature::Signature, signer::Signer};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::{commitment_config::CommitmentConfig, signer::Signer};
use tokio::{
sync::Mutex,
time::{Duration, Instant},
};
#[tokio::main]
async fn main() {
@ -31,11 +31,6 @@ async fn main() {
info!("Connecting to {lite_rpc_addr}");
let rpc_client = Arc::new(RpcClient::new_with_commitment(
lite_rpc_addr,
CommitmentConfig::confirmed(),
));
let mut csv_writer = csv::Writer::from_path(metrics_file_name).unwrap();
let mut avg_metric = AvgMetric::default();
@ -43,7 +38,10 @@ async fn main() {
let mut tasks = vec![];
for _ in 0..runs {
let rpc_client = rpc_client.clone();
let rpc_client = Arc::new(RpcClient::new_with_commitment(
lite_rpc_addr.clone(),
CommitmentConfig::confirmed(),
));
tasks.push(tokio::spawn(bench(rpc_client.clone(), tx_count)));
// wait for an interval
run_interval_ms.tick().await;
@ -83,71 +81,36 @@ async fn bench(rpc_client: Arc<RpcClient>, tx_count: usize) -> Metric {
let txs = BenchHelper::generate_txs(tx_count, &funded_payer, blockhash, None);
let mut un_confirmed_txs: HashMap<Signature, Option<Instant>> =
HashMap::with_capacity(txs.len());
for tx in &txs {
un_confirmed_txs.insert(*tx.get_signature(), None);
}
let start_time = Instant::now();
info!("Sending and Confirming {tx_count} tx(s)",);
let send_fut = {
let metric = Arc::new(Mutex::new(Metric::default()));
let mut tx_sent_tasks = vec![];
for tx in txs {
let rpc_client = rpc_client.clone();
tokio::spawn(async move {
for tx in txs {
rpc_client.send_transaction(&tx).await.unwrap();
info!("Tx {}", &tx.signatures[0]);
}
info!("Sent {tx_count} tx(s)");
start_time.elapsed()
})
};
let confirm_fut = tokio::spawn(async move {
let mut metrics = Metric::default();
while !un_confirmed_txs.is_empty() {
let mut to_remove_txs = Vec::new();
for (sig, time_elapsed_since_last_confirmed) in un_confirmed_txs.iter_mut() {
let sig = *sig;
if time_elapsed_since_last_confirmed.is_none() {
*time_elapsed_since_last_confirmed = Some(Instant::now())
}
if rpc_client.confirm_transaction(&sig).await.unwrap() {
metrics.txs_confirmed += 1;
to_remove_txs.push(sig);
} else if time_elapsed_since_last_confirmed.unwrap().elapsed()
> Duration::from_secs(30)
{
metrics.txs_un_confirmed += 1;
to_remove_txs.push(sig);
let metric = metric.clone();
let task = tokio::spawn(async move {
let start_time = Instant::now();
let signature = rpc_client.send_transaction(&tx).await.unwrap();
let sent_duration = start_time.elapsed();
let mut confirmed = false;
while start_time.elapsed() < Duration::from_secs(60) {
tokio::time::sleep(Duration::from_millis(1)).await;
if let Ok(b) = rpc_client.confirm_transaction(&signature).await {
if b == true {
confirmed = true;
break;
}
}
}
for to_remove_tx in to_remove_txs {
un_confirmed_txs.remove(&to_remove_tx);
let send_and_confirm_time = start_time.elapsed();
let mut metric = metric.lock().await;
if confirmed {
metric.add_successful_transaction(sent_duration, send_and_confirm_time);
} else {
metric.add_unsuccessful_transaction();
}
}
metrics.total_time_elapsed_sec = start_time.elapsed().as_secs_f64();
metrics.txs_sent = tx_count as u64;
metrics
});
let (send_fut, confirm_fut) = tokio::join!(send_fut, confirm_fut);
let time_to_send_txs = send_fut.unwrap();
let mut metrics = confirm_fut.unwrap();
metrics.time_to_send_txs = time_to_send_txs.as_secs_f64();
metrics.calc_tps();
metrics
});
tx_sent_tasks.push(task);
}
futures::future::join_all(tx_sent_tasks).await;
let m = *metric.lock().await;
m
}

View File

@ -1,13 +1,39 @@
use std::ops::{AddAssign, DivAssign};
use std::{
ops::{AddAssign, Div, DivAssign, Mul},
time::Duration,
};
#[derive(Debug, Default, serde::Serialize)]
#[derive(Clone, Copy, Debug, Default, serde::Serialize)]
pub struct Metric {
pub total_time_elapsed_sec: f64,
pub txs_sent: u64,
pub time_to_send_txs: f64,
pub txs_confirmed: u64,
pub txs_un_confirmed: u64,
pub tps: f64,
pub average_confirmation_time_ms: f64,
}
impl Metric {
pub fn add_successful_transaction(
&mut self,
time_to_send: Duration,
time_to_send_and_confrim: Duration,
) {
self.time_to_send_txs = (self.time_to_send_txs.mul(self.txs_confirmed as f64)
+ time_to_send.as_millis() as f64)
.div(self.txs_confirmed as f64 + 1.0);
self.average_confirmation_time_ms = (self
.average_confirmation_time_ms
.mul(self.txs_confirmed as f64)
+ time_to_send_and_confrim.as_millis() as f64)
.div(self.txs_confirmed as f64 + 1.0);
self.txs_confirmed += 1;
self.txs_sent += 1;
}
pub fn add_unsuccessful_transaction(&mut self) {
self.txs_un_confirmed += 1;
self.txs_sent += 1;
}
}
#[derive(Default)]
@ -17,19 +43,18 @@ pub struct AvgMetric {
}
impl Metric {
pub fn calc_tps(&mut self) {
self.tps = self.txs_confirmed as f64 / self.total_time_elapsed_sec
pub fn calc_tps(&mut self) -> f64 {
self.txs_confirmed as f64
}
}
impl AddAssign<&Self> for Metric {
fn add_assign(&mut self, rhs: &Self) {
self.total_time_elapsed_sec += rhs.total_time_elapsed_sec;
self.txs_sent += rhs.txs_sent;
self.time_to_send_txs += rhs.time_to_send_txs;
self.txs_confirmed += rhs.txs_confirmed;
self.txs_un_confirmed += rhs.txs_un_confirmed;
self.tps += rhs.tps
self.average_confirmation_time_ms += rhs.average_confirmation_time_ms;
}
}
@ -39,12 +64,11 @@ impl DivAssign<u64> for Metric {
if rhs == 0 {
return;
}
self.total_time_elapsed_sec /= rhs as f64;
self.txs_sent /= rhs;
self.time_to_send_txs /= rhs as f64;
self.txs_confirmed /= rhs;
self.txs_un_confirmed /= rhs;
self.tps /= rhs as f64;
self.average_confirmation_time_ms /= rhs as f64;
}
}