Merge pull request #376 from blockworks-foundation/feature/benchrunner-tc1-tc2

benchrunner tc2 integration
This commit is contained in:
Lou-Kamades 2024-03-27 22:36:36 -05:00 committed by GitHub
commit 9926d2e5c1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 330 additions and 101 deletions

View File

@ -12,16 +12,16 @@ use crate::benches::rpc_interface::{
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::signature::{read_keypair_file, Keypair, Signature, Signer};
#[derive(Debug, serde::Serialize)]
pub struct RpcStat {
tx_sent: u64,
tx_confirmed: u64,
#[derive(Clone, Copy, Debug, Default, serde::Serialize)]
pub struct Metric {
pub txs_sent: u64,
pub txs_confirmed: u64,
// in ms
average_confirmation_time: f32,
pub average_confirmation_time: f32,
// in slots
average_slot_confirmation_time: f32,
tx_send_errors: u64,
tx_unconfirmed: u64,
pub average_slot_confirmation_time: f32,
pub txs_send_errors: u64,
pub txs_un_confirmed: u64,
}
/// TC2 send multiple runs of num_txs, measure the confirmation rate
@ -76,7 +76,7 @@ pub async fn send_bulk_txs_and_wait(
num_txs: usize,
tx_params: &BenchmarkTransactionParams,
max_timeout: Duration,
) -> anyhow::Result<RpcStat> {
) -> anyhow::Result<Metric> {
trace!("Get latest blockhash and generate transactions");
let hash = rpc.get_latest_blockhash().await.map_err(|err| {
log::error!("Error get latest blockhash : {err:?}");
@ -148,41 +148,41 @@ pub async fn send_bulk_txs_and_wait(
0.0
};
Ok(RpcStat {
tx_sent,
tx_send_errors,
tx_confirmed,
tx_unconfirmed,
Ok(Metric {
txs_sent: tx_sent,
txs_send_errors: tx_send_errors,
txs_confirmed: tx_confirmed,
txs_un_confirmed: tx_unconfirmed,
average_confirmation_time: average_confirmation_time_ms,
average_slot_confirmation_time,
})
}
fn calc_stats_avg(stats: &[RpcStat]) -> RpcStat {
fn calc_stats_avg(stats: &[Metric]) -> Metric {
let len = stats.len();
let mut avg = RpcStat {
tx_sent: 0,
tx_send_errors: 0,
tx_confirmed: 0,
tx_unconfirmed: 0,
let mut avg = Metric {
txs_sent: 0,
txs_send_errors: 0,
txs_confirmed: 0,
txs_un_confirmed: 0,
average_confirmation_time: 0.0,
average_slot_confirmation_time: 0.0,
};
for stat in stats {
avg.tx_sent += stat.tx_sent;
avg.tx_send_errors += stat.tx_send_errors;
avg.tx_confirmed += stat.tx_confirmed;
avg.tx_unconfirmed += stat.tx_unconfirmed;
avg.txs_sent += stat.txs_sent;
avg.txs_send_errors += stat.txs_send_errors;
avg.txs_confirmed += stat.txs_confirmed;
avg.txs_un_confirmed += stat.txs_un_confirmed;
avg.average_confirmation_time += stat.average_confirmation_time;
avg.average_slot_confirmation_time += stat.average_slot_confirmation_time;
}
avg.tx_sent /= len as u64;
avg.tx_send_errors /= len as u64;
avg.tx_confirmed /= len as u64;
avg.tx_unconfirmed /= len as u64;
avg.txs_sent /= len as u64;
avg.txs_send_errors /= len as u64;
avg.txs_confirmed /= len as u64;
avg.txs_un_confirmed /= len as u64;
avg.average_confirmation_time /= len as f32;
avg.average_slot_confirmation_time /= len as f32;

View File

@ -28,7 +28,8 @@ pub mod bench1;
pub mod benches;
pub mod helpers;
pub mod metrics;
pub mod service_adapter;
pub mod service_adapter1;
pub mod service_adapter_new;
pub mod tx_size;
#[derive(Parser, Debug)]

View File

@ -20,6 +20,7 @@ use tokio::time::Instant;
#[derive(Debug, Clone)]
pub struct BenchConfig {
pub tx_count: usize,
pub tx_size: TxSize,
pub cu_price_micro_lamports: u64,
}
@ -33,11 +34,10 @@ pub async fn bench_servicerunner(
bench_config: &BenchConfig,
rpc_addr: String,
funded_payer: Keypair,
size_tx: TxSize,
) -> Metric {
let started_at = Instant::now();
let transaction_size = match size_tx {
let transaction_size = match bench_config.tx_size {
TxSize::Small => TransactionSize::Small,
TxSize::Large => TransactionSize::Large,
};

View File

@ -0,0 +1,34 @@
use crate::benches::confirmation_rate;
use crate::benches::confirmation_rate::send_bulk_txs_and_wait;
use crate::service_adapter1::BenchConfig;
use crate::BenchmarkTransactionParams;
use log::error;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::signature::Keypair;
use std::sync::Arc;
use std::time::Duration;
pub async fn benchnew_confirmation_rate_servicerunner(
bench_config: &BenchConfig,
rpc_addr: String,
funded_payer: Keypair,
) -> confirmation_rate::Metric {
let rpc = Arc::new(RpcClient::new(rpc_addr));
let tx_params = BenchmarkTransactionParams {
tx_size: bench_config.tx_size,
cu_price_micro_lamports: bench_config.cu_price_micro_lamports,
};
let max_timeout = Duration::from_secs(60);
let result = send_bulk_txs_and_wait(
&rpc,
&funded_payer,
bench_config.tx_count,
&tx_params,
max_timeout,
)
.await;
result.unwrap_or_else(|err| {
error!("Failed to send bulk txs and wait: {}", err);
confirmation_rate::Metric::default()
})
}

View File

@ -0,0 +1,19 @@
### Inspect the BenchRunner executions
```postgresql
SELECT * FROM benchrunner.bench_runs
ORDER BY ts DESC
```
### Bench1 (old bench)
```postgresql
SELECT * FROM benchrunner.bench_metrics_bench1
ORDER BY ts DESC
```
### Bench Confirmation Rate
```postgresql
SELECT * FROM benchrunner.bench_metrics_confirmation_rate
ORDER BY ts DESC
```

View File

@ -3,23 +3,26 @@ mod cli;
mod postgres;
mod prometheus;
use crate::args::{get_funded_payer_from_env, read_tenant_configs};
use crate::args::{get_funded_payer_from_env, read_tenant_configs, TenantConfig};
use crate::cli::Args;
use crate::postgres::metrics_dbstore::{
save_metrics_to_postgres, upsert_benchrun_status, BenchRunStatus,
};
use crate::postgres::metrics_dbstore::{upsert_benchrun_status, BenchRunStatus};
use crate::postgres::postgres_session::PostgresSessionConfig;
use crate::postgres::postgres_session_cache::PostgresSessionCache;
use crate::prometheus::metrics_prometheus::publish_metrics_on_prometheus;
use crate::prometheus::prometheus_sync::PrometheusSync;
use bench::service_adapter::BenchConfig;
use async_trait::async_trait;
use bench::benches::confirmation_rate;
use bench::metrics;
use bench::service_adapter1::BenchConfig;
use clap::Parser;
use futures_util::future::join_all;
use itertools::Itertools;
use log::{debug, error, info};
use log::{debug, error, info, warn};
use solana_sdk::signature::Keypair;
use std::net::SocketAddr;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tokio::sync::OnceCell;
#[tokio::main]
async fn main() {
@ -36,7 +39,7 @@ async fn main() {
let bench_interval = Duration::from_millis(bench_interval);
let funded_payer = get_funded_payer_from_env();
let funded_payer = Arc::new(get_funded_payer_from_env());
let tenant_configs = read_tenant_configs(std::env::vars().collect::<Vec<(String, String)>>());
@ -72,12 +75,15 @@ async fn main() {
.iter()
.map(|prio_fees| BenchConfig {
tx_count,
tx_size: size_tx,
cu_price_micro_lamports: *prio_fees,
})
.collect_vec();
// 1 task per tenant - each task will perform bench runs in sequence
// (the slot comparison bench is done somewhere else)
for tenant_config in &tenant_configs {
let funded_payer = funded_payer.insecure_clone();
let funded_payer = funded_payer.clone();
let tenant_id = tenant_config.tenant_id.clone();
let postgres_session = postgres_session.clone();
let tenant_config = tenant_config.clone();
@ -85,12 +91,36 @@ async fn main() {
let jh_runner = tokio::spawn(async move {
let mut interval = tokio::time::interval(bench_interval);
for run_count in 1.. {
let bench_config = bench_configs[run_count % bench_configs.len()].clone();
const NUM_BENCH_IMPLS: usize = 2;
let benchrun_at = SystemTime::now();
let permutation = factorize(run_count, &[NUM_BENCH_IMPLS, bench_configs.len()]);
let bench_config = bench_configs[permutation[1]].clone();
let bench_impl: Box<dyn BenchTrait> = match permutation[0] {
0 => Box::new(BenchRunnerConfirmationRateImpl {
benchrun_at,
tenant_config: tenant_config.clone(),
bench_config: bench_config.clone(),
funded_payer: funded_payer.clone(),
metric: OnceCell::new(),
}),
1 => Box::new(BenchRunnerBench1Impl {
benchrun_at,
tenant_config: tenant_config.clone(),
bench_config: bench_config.clone(),
funded_payer: funded_payer.clone(),
metric: OnceCell::new(),
}),
_ => unreachable!(),
};
debug!(
"Invoke bench execution (#{}) on tenant <{}> using {}",
run_count, tenant_id, bench_config
run_count, tenant_id, &bench_config
);
let benchrun_at = SystemTime::now();
if let Some(postgres_session) = postgres_session.as_ref() {
let _dbstatus = upsert_benchrun_status(
@ -103,26 +133,19 @@ async fn main() {
.await;
}
let metric = bench::service_adapter::bench_servicerunner(
&bench_config,
tenant_config.rpc_addr.clone(),
funded_payer.insecure_clone(),
size_tx,
)
.await;
bench_impl.run_bench().await;
if let Some(postgres_session) = postgres_session.as_ref() {
let _dbstatus = save_metrics_to_postgres(
postgres_session,
&tenant_config,
&bench_config,
&metric,
benchrun_at,
)
.await;
let save_result = bench_impl.try_save_results_postgres(postgres_session).await;
if let Err(err) = save_result {
warn!(
"Failed to save metrics to postgres (err {:?}) - continue",
err
);
}
}
publish_metrics_on_prometheus(&tenant_config, &bench_config, &metric).await;
// publish_metrics_on_prometheus(&tenant_config, &bench_config).await;
if let Some(postgres_session) = postgres_session.as_ref() {
let _dbstatus = upsert_benchrun_status(
@ -147,3 +170,87 @@ async fn main() {
join_all(jh_tenant_task).await;
}
// dimensions: least-significant first
fn factorize(i: usize, dimensions: &[usize]) -> Vec<usize> {
let mut i = i;
let mut result = Vec::new();
for &dim in dimensions {
result.push(i % dim);
i /= dim;
}
result
}
#[test]
fn test_factorize() {
assert_eq!(factorize(0, &[2, 3]), vec![0, 0]);
assert_eq!(factorize(1, &[2, 3]), vec![1, 0]);
assert_eq!(factorize(2, &[2, 3]), vec![0, 1]);
assert_eq!(factorize(3, &[2, 3]), vec![1, 1]);
assert_eq!(factorize(4, &[2, 3]), vec![0, 2]);
assert_eq!(factorize(5, &[2, 3]), vec![1, 2]);
}
// R: result
#[async_trait]
trait BenchRunner: Send + Sync + 'static {
async fn run_bench(&self);
}
trait BenchTrait: BenchRunner + BenchMetricsPostgresSaver {}
// R: result
#[async_trait]
trait BenchMetricsPostgresSaver: Send + Sync + 'static {
async fn try_save_results_postgres(
&self,
postgres_session: &PostgresSessionCache,
) -> anyhow::Result<()>;
}
struct BenchRunnerBench1Impl {
pub benchrun_at: SystemTime,
pub tenant_config: TenantConfig,
pub bench_config: BenchConfig,
pub funded_payer: Arc<Keypair>,
pub metric: OnceCell<metrics::Metric>,
}
impl BenchTrait for BenchRunnerBench1Impl {}
#[async_trait]
impl BenchRunner for BenchRunnerBench1Impl {
async fn run_bench(&self) {
let metric = bench::service_adapter1::bench_servicerunner(
&self.bench_config,
self.tenant_config.rpc_addr.clone(),
self.funded_payer.insecure_clone(),
)
.await;
self.metric.set(metric).unwrap();
}
}
impl BenchTrait for BenchRunnerConfirmationRateImpl {}
struct BenchRunnerConfirmationRateImpl {
pub benchrun_at: SystemTime,
pub tenant_config: TenantConfig,
pub bench_config: BenchConfig,
pub funded_payer: Arc<Keypair>,
pub metric: OnceCell<confirmation_rate::Metric>,
}
#[async_trait]
impl BenchRunner for BenchRunnerConfirmationRateImpl {
async fn run_bench(&self) {
let metric = bench::service_adapter_new::benchnew_confirmation_rate_servicerunner(
&self.bench_config,
self.tenant_config.rpc_addr.clone(),
self.funded_payer.insecure_clone(),
)
.await;
self.metric.set(metric).unwrap();
}
}

View File

@ -1,7 +1,8 @@
use crate::args::TenantConfig;
use crate::postgres::postgres_session_cache::PostgresSessionCache;
use bench::metrics::Metric;
use bench::service_adapter::BenchConfig;
use crate::{BenchMetricsPostgresSaver, BenchRunnerBench1Impl, BenchRunnerConfirmationRateImpl};
use async_trait::async_trait;
use bench::service_adapter1::BenchConfig;
use log::warn;
use postgres_types::ToSql;
use std::time::SystemTime;
@ -57,31 +58,31 @@ pub async fn upsert_benchrun_status(
Ok(())
}
pub async fn save_metrics_to_postgres(
postgres_session: &PostgresSessionCache,
tenant_config: &TenantConfig,
bench_config: &BenchConfig,
metric: &Metric,
benchrun_at: SystemTime,
) -> anyhow::Result<()> {
let metricjson = serde_json::to_value(metric).unwrap();
let values: &[&(dyn ToSql + Sync)] = &[
&tenant_config.tenant_id,
&benchrun_at,
&(bench_config.cu_price_micro_lamports as i64),
&(metric.txs_sent as i64),
&(metric.txs_confirmed as i64),
&(metric.txs_un_confirmed as i64),
&(metric.average_confirmation_time_ms as f32),
&metricjson,
];
let write_result = postgres_session
.get_session()
.await?
.execute(
r#"
#[async_trait]
impl BenchMetricsPostgresSaver for BenchRunnerBench1Impl {
async fn try_save_results_postgres(
&self,
postgres_session: &PostgresSessionCache,
) -> anyhow::Result<()> {
let metric = self.metric.get().expect("metric not set");
let metricjson = serde_json::to_value(metric).unwrap();
let values: &[&(dyn ToSql + Sync)] = &[
&self.tenant_config.tenant_id,
&self.benchrun_at,
&(self.bench_config.cu_price_micro_lamports as i64),
&(metric.txs_sent as i64),
&(metric.txs_confirmed as i64),
&(metric.txs_un_confirmed as i64),
&(metric.average_confirmation_time_ms as f32),
&metricjson,
];
postgres_session
.get_session()
.await?
.execute(
r#"
INSERT INTO
benchrunner.bench_metrics (
benchrunner.bench_metrics_bench1 (
tenant,
ts,
prio_fees,
@ -92,13 +93,56 @@ pub async fn save_metrics_to_postgres(
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
"#,
values,
)
.await;
values,
)
.await?;
if let Err(err) = write_result {
warn!("Failed to insert metrics (err {:?}) - continue", err);
Ok(())
}
}
#[async_trait]
impl BenchMetricsPostgresSaver for BenchRunnerConfirmationRateImpl {
async fn try_save_results_postgres(
&self,
postgres_session: &PostgresSessionCache,
) -> anyhow::Result<()> {
let metric = self.metric.get().expect("metric not set");
let metricjson = serde_json::to_value(metric).unwrap();
let values: &[&(dyn ToSql + Sync)] = &[
&self.tenant_config.tenant_id,
&self.benchrun_at,
&(self.bench_config.cu_price_micro_lamports as i64),
&(metric.txs_sent as i64),
&(metric.txs_confirmed as i64),
&(metric.txs_un_confirmed as i64),
&(metric.average_confirmation_time),
&(metric.average_slot_confirmation_time),
&metricjson,
];
postgres_session
.get_session()
.await?
.execute(
r#"
INSERT INTO
benchrunner.bench_metrics_confirmation_rate (
tenant,
ts,
prio_fees,
txs_sent,
txs_confirmed,
txs_un_confirmed,
average_confirmation_time_ms,
average_slot_confirmation_time_ms,
metric_json
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
"#,
values,
)
.await?;
Ok(())
}
Ok(())
}

View File

@ -1,7 +1,7 @@
use bench::metrics::Metric;
use crate::args::TenantConfig;
use bench::service_adapter::BenchConfig;
use bench::service_adapter1::BenchConfig;
use prometheus::{opts, register_gauge_vec, register_int_gauge_vec, GaugeVec, IntGaugeVec};
// https://github.com/blockworks-foundation/lite-rpc/blob/production/bench/src/metrics.rs
@ -14,7 +14,8 @@ lazy_static::lazy_static! {
// TODO add more
}
pub async fn publish_metrics_on_prometheus(
// TODO implement
pub async fn _publish_metrics_on_prometheus(
tenant_config: &TenantConfig,
_bench_config: &BenchConfig,
metric: &Metric,

View File

@ -1,7 +1,16 @@
CREATE SCHEMA benchrunner;
CREATE TABLE benchrunner.bench_metrics (
CREATE TABLE benchrunner.bench_runs (
tenant text NOT NULL,
ts timestamp NOT NULL,
status text NOT NULL,
PRIMARY KEY (tenant, ts)
);
CREATE TABLE benchrunner.bench_metrics_bench1 (
tenant text NOT NULL,
ts timestamp NOT NULL,
prio_fees int8 NOT NULL,
@ -13,14 +22,28 @@ CREATE TABLE benchrunner.bench_metrics (
PRIMARY KEY (tenant, ts)
);
CREATE TABLE benchrunner.bench_runs (
tenant text NOT NULL,
ts timestamp NOT NULL,
status text NOT NULL,
PRIMARY KEY (tenant, ts)
CREATE TABLE benchrunner.bench_metrics_confirmation_rate (
tenant text NOT NULL,
ts timestamp NOT NULL,
prio_fees int8 NOT NULL,
txs_sent int8 NOT NULL,
txs_confirmed int8 NOT NULL,
txs_un_confirmed int8 NOT NULL,
average_confirmation_time_ms real NOT NULL,
average_slot_confirmation_time_ms real NOT NULL,
metric_json jsonb NOT NULL,
PRIMARY KEY (tenant, ts)
);
GRANT USAGE ON SCHEMA benchrunner TO r_benchrunner;
GRANT SELECT, INSERT, UPDATE ON ALL TABLES IN SCHEMA benchrunner TO r_benchrunner;
GRANT USAGE ON SCHEMA benchrunner TO ro_benchrunner;
GRANT SELECT, INSERT, UPDATE ON ALL TABLES IN SCHEMA benchrunner TO r_benchrunner;
ALTER DEFAULT PRIVILEGES IN SCHEMA benchrunner GRANT SELECT, INSERT, UPDATE ON TABLES TO r_benchrunner;
GRANT SELECT ON ALL TABLES IN SCHEMA benchrunner TO ro_benchrunner;
ALTER DEFAULT PRIVILEGES IN SCHEMA benchrunner GRANT SELECT ON TABLES TO ro_benchrunner;
ALTER TABLE benchrunner.bench_metrics RENAME TO bench_metrics_bench1;