move bench run+persist logic around

This commit is contained in:
GroovieGermanikus 2024-03-27 17:41:56 +01:00
parent bd9d15ab3b
commit 4d5b9f23f3
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
6 changed files with 128 additions and 54 deletions

View File

@ -26,7 +26,8 @@ pub mod bench1;
pub mod benches; pub mod benches;
pub mod helpers; pub mod helpers;
pub mod metrics; pub mod metrics;
pub mod service_adapter; pub mod service_adapter1;
pub mod service_adapter_new;
pub mod tx_size; pub mod tx_size;
#[derive(Parser, Debug)] #[derive(Parser, Debug)]

View File

@ -0,0 +1,18 @@
use solana_sdk::signature::Keypair;
use tokio::time::Instant;
use crate::metrics::Metric;
use crate::service_adapter1::BenchConfig;
use crate::tx_size::TxSize;
pub async fn benchnew_confirmation_rate_servicerunner(
bench_config: &BenchConfig,
rpc_addr: String,
funded_payer: Keypair,
size_tx: TxSize,
) -> Metric {
let started_at = Instant::now();
todo!()
}

View File

@ -3,23 +3,29 @@ mod cli;
mod postgres; mod postgres;
mod prometheus; mod prometheus;
use crate::args::{get_funded_payer_from_env, read_tenant_configs}; use crate::args::{get_funded_payer_from_env, read_tenant_configs, TenantConfig};
use crate::cli::Args; use crate::cli::Args;
use crate::postgres::metrics_dbstore::{ use crate::postgres::metrics_dbstore::{
save_metrics_to_postgres, upsert_benchrun_status, BenchRunStatus, upsert_benchrun_status, BenchRunStatus,
}; };
use crate::postgres::postgres_session::PostgresSessionConfig; use crate::postgres::postgres_session::PostgresSessionConfig;
use crate::postgres::postgres_session_cache::PostgresSessionCache; use crate::postgres::postgres_session_cache::PostgresSessionCache;
use crate::prometheus::metrics_prometheus::publish_metrics_on_prometheus; use crate::prometheus::metrics_prometheus::publish_metrics_on_prometheus;
use crate::prometheus::prometheus_sync::PrometheusSync; use crate::prometheus::prometheus_sync::PrometheusSync;
use bench::service_adapter::BenchConfig; use bench::service_adapter1::BenchConfig;
use clap::Parser; use clap::Parser;
use futures_util::future::join_all; use futures_util::future::join_all;
use itertools::Itertools; use itertools::Itertools;
use log::{debug, error, info}; use log::{debug, error, info, warn};
use std::net::SocketAddr; use std::net::SocketAddr;
use std::str::FromStr; use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, SystemTime}; use std::time::{Duration, SystemTime};
use async_trait::async_trait;
use postgres_types::ToSql;
use solana_sdk::signature::Keypair;
use bench::metrics::Metric;
use bench::tx_size::TxSize;
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
@ -36,7 +42,7 @@ async fn main() {
let bench_interval = Duration::from_millis(bench_interval); let bench_interval = Duration::from_millis(bench_interval);
let funded_payer = get_funded_payer_from_env(); let funded_payer = Arc::new(get_funded_payer_from_env());
let tenant_configs = read_tenant_configs(std::env::vars().collect::<Vec<(String, String)>>()); let tenant_configs = read_tenant_configs(std::env::vars().collect::<Vec<(String, String)>>());
@ -76,8 +82,10 @@ async fn main() {
}) })
.collect_vec(); .collect_vec();
// 1 task per tenant - each task will perform bench runs in sequence
// (the slot comparison bench is done somewhere else)
for tenant_config in &tenant_configs { for tenant_config in &tenant_configs {
let funded_payer = funded_payer.insecure_clone(); let funded_payer = funded_payer.clone();
let tenant_id = tenant_config.tenant_id.clone(); let tenant_id = tenant_config.tenant_id.clone();
let postgres_session = postgres_session.clone(); let postgres_session = postgres_session.clone();
let tenant_config = tenant_config.clone(); let tenant_config = tenant_config.clone();
@ -92,6 +100,14 @@ async fn main() {
); );
let benchrun_at = SystemTime::now(); let benchrun_at = SystemTime::now();
let bench_impl = BenchRunnerOldBenchImpl {
benchrun_at,
tenant_config: tenant_config.clone(),
bench_config: bench_config.clone(),
funded_payer: funded_payer.clone(),
size_tx,
};
if let Some(postgres_session) = postgres_session.as_ref() { if let Some(postgres_session) = postgres_session.as_ref() {
let _dbstatus = upsert_benchrun_status( let _dbstatus = upsert_benchrun_status(
postgres_session, postgres_session,
@ -103,23 +119,29 @@ async fn main() {
.await; .await;
} }
let metric = bench::service_adapter::bench_servicerunner( let metric = bench_impl.run_bench().await;
&bench_config, // let metric = bench::service_adapter1::bench_servicerunner(
tenant_config.rpc_addr.clone(), // &bench_config,
funded_payer.insecure_clone(), // tenant_config.rpc_addr.clone(),
size_tx, // funded_payer.insecure_clone(),
) // size_tx,
.await; // )
// .await;
if let Some(postgres_session) = postgres_session.as_ref() { if let Some(postgres_session) = postgres_session.as_ref() {
let _dbstatus = save_metrics_to_postgres( // let _dbstatus = save_metrics_to_postgres(
postgres_session, // postgres_session,
&tenant_config, // &tenant_config,
&bench_config, // &bench_config,
&metric, // &metric,
benchrun_at, // benchrun_at,
) // )
.await; // .await;
let save_result = bench_impl.try_save_results_postgres(&metric, postgres_session).await;
if let Err(err) = save_result {
warn!("Failed to save metrics to postgres (err {:?}) - continue", err);
}
} }
publish_metrics_on_prometheus(&tenant_config, &bench_config, &metric).await; publish_metrics_on_prometheus(&tenant_config, &bench_config, &metric).await;
@ -147,3 +169,36 @@ async fn main() {
join_all(jh_tenant_task).await; join_all(jh_tenant_task).await;
} }
// R: result
#[async_trait]
trait BenchRunner<M> {
async fn run_bench(&self) -> M;
}
// R: result
#[async_trait]
trait BenchMetricsPostgresSaver<M> {
async fn try_save_results_postgres(&self, metric: &M, postgres_session: &PostgresSessionCache) -> anyhow::Result<()>;
}
struct BenchRunnerOldBenchImpl {
pub benchrun_at: SystemTime,
pub tenant_config: TenantConfig,
pub bench_config: BenchConfig,
pub funded_payer: Arc<Keypair>,
pub size_tx: TxSize,
}
#[async_trait]
impl BenchRunner<Metric> for BenchRunnerOldBenchImpl {
async fn run_bench(&self) -> Metric {
bench::service_adapter1::bench_servicerunner(
&self.bench_config,
self.tenant_config.rpc_addr.clone(),
self.funded_payer.insecure_clone(),
self.size_tx,
)
.await
}
}

View File

@ -1,10 +1,12 @@
use crate::args::TenantConfig; use crate::args::TenantConfig;
use crate::postgres::postgres_session_cache::PostgresSessionCache; use crate::postgres::postgres_session_cache::PostgresSessionCache;
use bench::metrics::Metric; use bench::metrics::Metric;
use bench::service_adapter::BenchConfig; use bench::service_adapter1::BenchConfig;
use log::warn; use log::warn;
use postgres_types::ToSql; use postgres_types::ToSql;
use std::time::SystemTime; use std::time::SystemTime;
use async_trait::async_trait;
use crate::{BenchMetricsPostgresSaver, BenchRunner, BenchRunnerOldBenchImpl};
#[allow(clippy::upper_case_acronyms)] #[allow(clippy::upper_case_acronyms)]
pub enum BenchRunStatus { pub enum BenchRunStatus {
@ -57,29 +59,27 @@ pub async fn upsert_benchrun_status(
Ok(()) Ok(())
} }
pub async fn save_metrics_to_postgres(
postgres_session: &PostgresSessionCache,
tenant_config: &TenantConfig, #[async_trait]
bench_config: &BenchConfig, impl BenchMetricsPostgresSaver<Metric> for BenchRunnerOldBenchImpl {
metric: &Metric, async fn try_save_results_postgres(&self, metric: &Metric, postgres_session: &PostgresSessionCache) -> anyhow::Result<()> {
benchrun_at: SystemTime, let metricjson = serde_json::to_value(metric).unwrap();
) -> anyhow::Result<()> { let values: &[&(dyn ToSql + Sync)] = &[
let metricjson = serde_json::to_value(metric).unwrap(); &self.tenant_config.tenant_id,
let values: &[&(dyn ToSql + Sync)] = &[ &self.benchrun_at,
&tenant_config.tenant_id, &(self.bench_config.cu_price_micro_lamports as i64),
&benchrun_at, &(metric.txs_sent as i64),
&(bench_config.cu_price_micro_lamports as i64), &(metric.txs_confirmed as i64),
&(metric.txs_sent as i64), &(metric.txs_un_confirmed as i64),
&(metric.txs_confirmed as i64), &(metric.average_confirmation_time_ms as f32),
&(metric.txs_un_confirmed as i64), &metricjson,
&(metric.average_confirmation_time_ms as f32), ];
&metricjson, postgres_session
]; .get_session()
let write_result = postgres_session .await?
.get_session() .execute(
.await? r#"
.execute(
r#"
INSERT INTO INSERT INTO
benchrunner.bench_metrics ( benchrunner.bench_metrics (
tenant, tenant,
@ -92,13 +92,13 @@ pub async fn save_metrics_to_postgres(
) )
VALUES ($1, $2, $3, $4, $5, $6, $7, $8) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
"#, "#,
values, values,
) )
.await; .await?;
if let Err(err) = write_result {
warn!("Failed to insert metrics (err {:?}) - continue", err); Ok(())
} }
Ok(())
} }

View File

@ -1,7 +1,7 @@
use bench::metrics::Metric; use bench::metrics::Metric;
use crate::args::TenantConfig; use crate::args::TenantConfig;
use bench::service_adapter::BenchConfig; use bench::service_adapter1::BenchConfig;
use prometheus::{opts, register_gauge_vec, register_int_gauge_vec, GaugeVec, IntGaugeVec}; use prometheus::{opts, register_gauge_vec, register_int_gauge_vec, GaugeVec, IntGaugeVec};
// https://github.com/blockworks-foundation/lite-rpc/blob/production/bench/src/metrics.rs // https://github.com/blockworks-foundation/lite-rpc/blob/production/bench/src/metrics.rs