refactor bencher to collect stats

This commit is contained in:
Maximilian Schneider 2023-05-14 23:37:26 +02:00
parent c4cf616eb0
commit 46ece646df
8 changed files with 111 additions and 106 deletions

1
Cargo.lock generated
View File

@ -3595,6 +3595,7 @@ dependencies = [
"dashmap",
"derive_more",
"futures",
"itertools",
"lazy_static",
"log",
"pretty_env_logger",

View File

@ -32,4 +32,5 @@ pretty_env_logger = "0.4.0"
tracing-subscriber = "0.3.17"
spl-token = "3.5.0"
solana-client = "1.15.2"
solana-client = "1.15.2"
itertools = "0.10.5"

View File

@ -1,54 +1,82 @@
use std::sync::Arc;
use std::time::{Duration, Instant};
use itertools::Itertools;
use serde::Serialize;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use crate::cli::Args;
use crate::metrics::{Metric, PartialMetric};
#[async_trait::async_trait]
pub trait BenchFn: Send + 'static {
async fn new(rpc_client: Arc<RpcClient>) -> anyhow::Result<Self>
pub trait Benchmark: Send + 'static {
async fn prepare(rpc_client: Arc<RpcClient>) -> anyhow::Result<Self>
where
Self: Sized;
async fn bench_fn(&mut self, rpc_client: Arc<RpcClient>) -> anyhow::Result<()>;
async fn run(&mut self, rpc_client: Arc<RpcClient>, duration: Duration) -> anyhow::Result<Run>;
}
#[derive(Default, Serialize)]
pub struct Run {
pub requests_completed: u64,
pub requests_failed: u64,
pub bytes_sent: u64,
pub bytes_received: u64,
pub errors: Vec<String>,
}
#[derive(Default, Serialize)]
pub struct Stats {
pub total_requests: u64,
pub requests_per_second: f64,
pub time_per_request: f64,
pub total_transferred: u64,
pub top_5_errors: Vec<(String, usize)>,
#[serde(flatten)]
pub all_runs: Vec<Run>,
}
pub struct Bencher;
impl Bencher {
pub async fn bench<B: BenchFn>(args: Args) -> anyhow::Result<Metric> {
pub async fn bench<B: Benchmark>(args: Args) -> anyhow::Result<Stats> {
let start = Instant::now();
let futs = (0..args.threads).map(|_| {
let rpc_client = args.get_rpc_client();
let duration = args.get_duration_to_run_test();
tokio::spawn(async move {
let mut bench_fn = B::new(rpc_client.clone()).await.unwrap();
let mut part_metric = PartialMetric::default();
let thread_start = tokio::time::Instant::now();
while thread_start.elapsed() <= duration {
let Err(err) = bench_fn.bench_fn(rpc_client.clone()).await else {
part_metric.passed += 1;
continue;
};
part_metric.failed += 1;
log::warn!("{err}");
}
part_metric.total_time = thread_start.elapsed();
Metric::from(part_metric)
let mut benchmark = B::prepare(rpc_client.clone()).await.unwrap();
benchmark.run(rpc_client.clone(), duration).await.unwrap()
})
});
let avg_metric = futures::future::try_join_all(futs)
.await?
.into_iter()
.sum::<Metric>()
/ args.threads;
let all_results = futures::future::try_join_all(futs).await?;
Ok(avg_metric)
let time = start.elapsed();
let total_requests = all_results
.iter()
.fold(0, |acc, r| acc + r.requests_completed + r.requests_failed);
let total_transferred = all_results
.iter()
.fold(0, |acc, r| acc + r.bytes_sent + r.bytes_received);
let all_errors = all_results.iter().flat_map(|r| &r.errors).counts();
let top_5_errors = all_errors
.iter()
.sorted_by_key(|(_e, c)| *c)
.rev()
.take(5)
.map(|(e, c)| ((*e).clone(), c.clone()))
.collect_vec();
Ok(Stats {
total_requests,
requests_per_second: total_requests as f64 / time.as_secs_f64(),
time_per_request: time.as_secs_f64() / total_requests as f64,
total_transferred,
top_5_errors,
all_runs: all_results,
})
}
}

View File

@ -51,7 +51,7 @@ pub struct Args {
#[arg(short = 'd', long, default_value_t = 60)]
pub duration_in_seconds: u64,
#[arg(short = 't', long, default_value_t = 4)]
#[arg(short = 't', long, default_value_t = 32)]
pub threads: u64,
#[arg(short = 'p', long)]

View File

@ -1,7 +1,6 @@
pub mod bencher;
mod cli;
mod config;
mod metrics;
mod openbook;
mod solana_runtime;
mod test_registry;

View File

@ -1,59 +0,0 @@
use derive_more::{Add, Sum};
use std::time::Duration;
use serde::Serialize;
#[derive(Default, Serialize, Add, Sum)]
pub struct Metric {
pub requests_per_second: f64,
pub time_per_request: f64,
pub total_transferred: u64,
#[serde(flatten)]
pub partial_metric: PartialMetric,
}
#[derive(Default, Serialize, Add, Sum)]
pub struct PartialMetric {
pub total_time: Duration,
pub passed: u64,
pub failed: u64,
}
impl std::ops::Div<u64> for Metric {
type Output = Self;
fn div(self, rhs: u64) -> Self::Output {
Self {
requests_per_second: self.requests_per_second / rhs as f64,
time_per_request: self.time_per_request / rhs as f64,
total_transferred: self.total_transferred / rhs,
partial_metric: self.partial_metric / rhs,
}
}
}
impl std::ops::Div<u64> for PartialMetric {
type Output = Self;
fn div(self, rhs: u64) -> Self::Output {
Self {
total_time: Duration::from_secs_f64(self.total_time.as_secs_f64() / rhs as f64),
passed: self.passed / rhs,
failed: self.failed / rhs,
}
}
}
impl From<PartialMetric> for Metric {
fn from(partial_metric: PartialMetric) -> Self {
let total_transferred = partial_metric.passed + partial_metric.failed;
let total_time_secs = partial_metric.total_time.as_secs_f64();
Metric {
requests_per_second: total_transferred as f64 / total_time_secs,
time_per_request: total_time_secs / total_transferred as f64,
total_transferred,
partial_metric,
}
}
}

View File

@ -1,11 +1,14 @@
use std::sync::Arc;
use std::{
sync::Arc,
time::{Duration, Instant},
};
use log::info;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::slot_history::Slot;
use crate::{
bencher::{BenchFn, Bencher},
bencher::{Bencher, Benchmark, Run},
cli::Args,
config::Config,
test_registry::TestingTask,
@ -31,16 +34,31 @@ pub struct GetBlockBench {
}
#[async_trait::async_trait]
impl BenchFn for GetBlockBench {
async fn new(rpc_client: Arc<RpcClient>) -> anyhow::Result<Self> {
impl Benchmark for GetBlockBench {
async fn prepare(rpc_client: Arc<RpcClient>) -> anyhow::Result<Self> {
Ok(Self {
slot: rpc_client.get_slot().await?,
})
}
async fn bench_fn(&mut self, rpc_client: Arc<RpcClient>) -> anyhow::Result<()> {
// self.slot += 1;
rpc_client.get_block(self.slot).await?;
Ok(())
async fn run(&mut self, rpc_client: Arc<RpcClient>, duration: Duration) -> anyhow::Result<Run> {
let mut result = Run::default();
let start = Instant::now();
while start.elapsed() < duration {
match rpc_client.get_block(self.slot).await {
Ok(_) => {
result.requests_completed += 1;
result.bytes_received += 0;
}
Err(e) => {
result.requests_failed += 1;
result.errors.push(format!("{:?}", e.kind()));
}
}
result.bytes_sent += 0;
}
Ok(result)
}
}

View File

@ -1,9 +1,10 @@
use std::sync::Arc;
use std::time::{Duration, Instant};
use log::info;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use crate::bencher::{BenchFn, Bencher};
use crate::bencher::{Bencher, Benchmark, Run};
use crate::{cli::Args, config::Config, test_registry::TestingTask};
pub struct GetSlotTest;
@ -11,8 +12,8 @@ pub struct GetSlotTest;
#[async_trait::async_trait]
impl TestingTask for GetSlotTest {
async fn test(&self, args: Args, _config: Config) -> anyhow::Result<()> {
let metric = Bencher::bench::<Self>(args).await?;
info!("metric {}", serde_json::to_string(&metric)?);
let stats = Bencher::bench::<Self>(args).await?;
info!("GetSlotTest {}", serde_json::to_string(&stats)?);
Ok(())
}
@ -22,13 +23,29 @@ impl TestingTask for GetSlotTest {
}
#[async_trait::async_trait]
impl BenchFn for GetSlotTest {
async fn new(_: Arc<RpcClient>) -> anyhow::Result<Self> {
impl Benchmark for GetSlotTest {
async fn prepare(_: Arc<RpcClient>) -> anyhow::Result<Self> {
Ok(Self)
}
async fn bench_fn(&mut self, rpc_client: Arc<RpcClient>) -> anyhow::Result<()> {
rpc_client.get_slot().await?;
Ok(())
async fn run(&mut self, rpc_client: Arc<RpcClient>, duration: Duration) -> anyhow::Result<Run> {
let mut result = Run::default();
let start = Instant::now();
while start.elapsed() < duration {
match rpc_client.get_slot().await {
Ok(_) => {
result.requests_completed += 1;
result.bytes_received += 0;
}
Err(e) => {
result.requests_failed += 1;
result.errors.push(format!("{:?}", e.kind()));
}
}
result.bytes_sent += 0;
}
Ok(result)
}
}