Compare commits

...

5 Commits

Author SHA1 Message Date
Aniket Prajapati f828cb7bce
Merge pull request #5 from blockworks-foundation/feature
Rust tests in series and other quality of life fixes
2023-06-28 15:12:59 +05:30
aniketfuryrocks 85217c223b
suggested changes 2023-06-28 01:35:50 +05:30
aniketfuryrocks d4692919d9
sync 2023-06-27 04:47:19 +05:30
aniketfuryrocks b4fec95032
bench 2023-06-16 22:17:56 +05:30
aniketfuryrocks cda950f49f
dont' error out 2023-06-16 21:50:57 +05:30
9 changed files with 63 additions and 78 deletions

View File

@ -3,10 +3,7 @@ use itertools::Itertools;
use rand::{rngs::StdRng, Rng, SeedableRng}; use rand::{rngs::StdRng, Rng, SeedableRng};
use serde::Serialize; use serde::Serialize;
use std::{ use std::time::{Duration, Instant};
time::{Duration, Instant},
};
#[async_trait::async_trait] #[async_trait::async_trait]
pub trait Benchmark: Clone + Send + 'static { pub trait Benchmark: Clone + Send + 'static {
@ -27,7 +24,7 @@ pub struct Run {
pub errors: Vec<String>, pub errors: Vec<String>,
} }
#[derive(Default, Serialize, Clone)] #[derive(Default, Serialize, Clone, Debug)]
pub struct Stats { pub struct Stats {
pub total_requests: u64, pub total_requests: u64,
pub requests_per_second: f64, pub requests_per_second: f64,
@ -46,7 +43,7 @@ pub struct Bencher;
impl Bencher { impl Bencher {
pub async fn bench<B: Benchmark + Send + Clone>( pub async fn bench<B: Benchmark + Send + Clone>(
instant: B, instant: B,
args: Args, args: &Args,
) -> anyhow::Result<Stats> { ) -> anyhow::Result<Stats> {
let start = Instant::now(); let start = Instant::now();
let mut random = StdRng::seed_from_u64(0); let mut random = StdRng::seed_from_u64(0);

View File

@ -24,17 +24,19 @@ async fn main() -> anyhow::Result<()> {
assert_ne!(args.threads, 0, "Threads can't be 0"); assert_ne!(args.threads, 0, "Threads can't be 0");
assert_ne!(args.duration_in_seconds, 0, "Duration can't be 0"); assert_ne!(args.duration_in_seconds, 0, "Duration can't be 0");
assert_ne!(args.output_file.len(), 0, "Output file name can't be empty");
let contents = std::fs::read_to_string(&args.config_file) let config = std::fs::read_to_string(&args.config_file)
.context("Should have been able to read the file")?; .context("Should have been able to read the file")?;
let config_json: Config = serde_json::from_str(&contents).context("Config file not valid")?;
if config_json.users.is_empty() { let config: Config = serde_json::from_str(&config).context("Config file not valid")?;
if config.users.is_empty() {
log::error!("Config file is missing payers"); log::error!("Config file is missing payers");
bail!("No payers"); bail!("No payers");
} }
if config_json.markets.is_empty() { if config.markets.is_empty() {
log::error!("Config file is missing markets"); log::error!("Config file is missing markets");
bail!("No markets") bail!("No markets")
} }
@ -61,9 +63,13 @@ async fn main() -> anyhow::Result<()> {
}); });
} }
args.generate_test_registry(block_hash) let results = args
.start_testing(args, config_json) .generate_test_registry(block_hash)
.run_tests(&args, &config)
.await; .await;
let results = serde_json::to_string(&results)?;
std::fs::write(args.output_file, results)?;
Ok(()) Ok(())
} }

View File

@ -1,6 +1,7 @@
use crate::{ use crate::{
bencher::{Bencher, Benchmark, Stats}, bencher::{Bencher, Benchmark, Stats},
config::{Market, User}, cli::Args,
config::{Config, Market, User},
rpc_client::CustomRpcClient, rpc_client::CustomRpcClient,
test_registry::TestingTask, test_registry::TestingTask,
utils::noop, utils::noop,
@ -51,11 +52,7 @@ impl ToPubkey for String {
#[async_trait] #[async_trait]
impl TestingTask for SimulateOpenbookV2PlaceOrder { impl TestingTask for SimulateOpenbookV2PlaceOrder {
async fn test( async fn test(&self, args: &Args, config: &Config) -> anyhow::Result<Stats> {
&self,
args: crate::cli::Args,
config: crate::config::Config,
) -> anyhow::Result<Stats> {
let openbook_data = config let openbook_data = config
.programs .programs
.iter() .iter()
@ -80,7 +77,6 @@ impl TestingTask for SimulateOpenbookV2PlaceOrder {
openbook_pid, openbook_pid,
}; };
let metric = Bencher::bench::<SimulateOpenbookV2PlaceOrderBench>(instant, args).await?; let metric = Bencher::bench::<SimulateOpenbookV2PlaceOrderBench>(instant, args).await?;
log::info!("{} {}", self.get_name(), serde_json::to_string(&metric)?);
Ok(metric) Ok(metric)
} }

View File

@ -96,15 +96,17 @@ impl CustomRpcClient {
Ok(res_bytes) => { Ok(res_bytes) => {
self.metric.bytes_received += res_bytes.len() as u64; self.metric.bytes_received += res_bytes.len() as u64;
let res: Value = match serde_json::from_slice::<Value>(&res_bytes) {
serde_json::from_slice(&res_bytes).expect("Server invalid response json"); Ok(res) => {
if res.get("result").is_some() {
self.metric.requests_completed += 1;
return;
}
if res.get("result").is_some() { res["error"].to_string()
self.metric.requests_completed += 1; }
return; Err(err) => err.to_string(),
} }
res["error"].to_string()
} }
Err(err) => err.to_string(), Err(err) => err.to_string(),
}; };
@ -114,14 +116,16 @@ impl CustomRpcClient {
} }
pub async fn send_raw(&self, req_raw_body: Vec<u8>) -> anyhow::Result<Bytes> { pub async fn send_raw(&self, req_raw_body: Vec<u8>) -> anyhow::Result<Bytes> {
Ok(self let response = self
.client .client
.post(&self.url) .post(&self.url)
.header(CONTENT_TYPE, "application/json") .header(CONTENT_TYPE, "application/json")
.body(req_raw_body) .body(req_raw_body)
.send() .send()
.await? .await?;
.bytes()
.await?) response.error_for_status_ref()?;
Ok(response.bytes().await?)
} }
} }

View File

@ -2,11 +2,12 @@ use async_trait::async_trait;
use const_env::from_env; use const_env::from_env;
use rand::{seq::IteratorRandom, SeedableRng}; use rand::{seq::IteratorRandom, SeedableRng};
use solana_sdk::{pubkey::Pubkey, signature::Keypair, signer::Signer}; use solana_sdk::{pubkey::Pubkey, signature::Keypair, signer::Signer};
use std::str::FromStr; use std::{str::FromStr, sync::Arc};
use tokio::time::Instant; use tokio::time::Instant;
use crate::{ use crate::{
bencher::{Bencher, Benchmark, Stats}, bencher::{Bencher, Benchmark, Stats},
cli::Args,
config::Config, config::Config,
rpc_client::CustomRpcClient, rpc_client::CustomRpcClient,
test_registry::TestingTask, test_registry::TestingTask,
@ -25,7 +26,7 @@ impl AccountsFetchingTests {
#[async_trait] #[async_trait]
impl TestingTask for AccountsFetchingTests { impl TestingTask for AccountsFetchingTests {
async fn test(&self, args: crate::cli::Args, config: Config) -> anyhow::Result<Stats> { async fn test(&self, args: &Args, config: &Config) -> anyhow::Result<Stats> {
let accounts = config let accounts = config
.known_accounts .known_accounts
.iter() .iter()
@ -35,13 +36,13 @@ impl TestingTask for AccountsFetchingTests {
AccountsFetchingTests::create_random_address(accounts.len()); AccountsFetchingTests::create_random_address(accounts.len());
let instant = GetAccountsBench { let instant = GetAccountsBench {
accounts_list: [accounts, unknown_accounts].concat(), accounts_list: Arc::new([accounts, unknown_accounts].concat()),
}; };
let metric = Bencher::bench::<GetAccountsBench>(instant, args).await?; let metric = Bencher::bench::<GetAccountsBench>(instant, args).await?;
log::info!("{} {}", self.get_name(), serde_json::to_string(&metric)?);
Ok(metric) Ok(metric)
} }
fn get_name(&self) -> String { fn get_name(&self) -> String {
"Accounts Fetching".to_string() "Accounts Fetching".to_string()
} }
@ -49,7 +50,7 @@ impl TestingTask for AccountsFetchingTests {
#[derive(Clone)] #[derive(Clone)]
pub struct GetAccountsBench { pub struct GetAccountsBench {
accounts_list: Vec<Pubkey>, accounts_list: Arc<Vec<Pubkey>>,
} }
#[async_trait::async_trait] #[async_trait::async_trait]

View File

@ -1,6 +1,5 @@
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use log::info;
use solana_sdk::slot_history::Slot; use solana_sdk::slot_history::Slot;
use crate::{ use crate::{
@ -15,11 +14,10 @@ pub struct GetBlockTest;
#[async_trait::async_trait] #[async_trait::async_trait]
impl TestingTask for GetBlockTest { impl TestingTask for GetBlockTest {
async fn test(&self, args: Args, _: Config) -> anyhow::Result<Stats> { async fn test(&self, args: &Args, _: &Config) -> anyhow::Result<Stats> {
let slot = args.get_rpc_client().get_slot().await.unwrap(); let slot = args.get_rpc_client().get_slot().await.unwrap();
let instant = GetBlockBench { slot }; let instant = GetBlockBench { slot };
let metric = Bencher::bench::<GetBlockBench>(instant, args).await?; let metric = Bencher::bench::<GetBlockBench>(instant, args).await?;
info!("{} {}", self.get_name(), serde_json::to_string(&metric)?);
Ok(metric) Ok(metric)
} }

View File

@ -15,7 +15,7 @@ pub struct GetSlotTest;
#[async_trait::async_trait] #[async_trait::async_trait]
impl TestingTask for GetSlotTest { impl TestingTask for GetSlotTest {
async fn test(&self, args: Args, _: Config) -> anyhow::Result<Stats> { async fn test(&self, args: &Args, _: &Config) -> anyhow::Result<Stats> {
let instant = GetSlotTest; let instant = GetSlotTest;
let stats = Bencher::bench::<Self>(instant, args).await?; let stats = Bencher::bench::<Self>(instant, args).await?;
info!("{} {}", self.get_name(), serde_json::to_string(&stats)?); info!("{} {}", self.get_name(), serde_json::to_string(&stats)?);

View File

@ -1,5 +1,7 @@
use crate::{ use crate::{
bencher::{Bencher, Benchmark, Stats}, bencher::{Bencher, Benchmark, Stats},
cli::Args,
config::Config,
rpc_client::CustomRpcClient, rpc_client::CustomRpcClient,
test_registry::TestingTask, test_registry::TestingTask,
}; };
@ -28,21 +30,12 @@ fn create_memo_tx(msg: &[u8], payer: &Keypair, blockhash: Hash) -> Transaction {
#[async_trait] #[async_trait]
impl TestingTask for SendAndConfrimTesting { impl TestingTask for SendAndConfrimTesting {
async fn test( async fn test(&self, args: &Args, config: &Config) -> anyhow::Result<Stats> {
&self,
args: crate::cli::Args,
config: crate::config::Config,
) -> anyhow::Result<Stats> {
let instant = SendMemoTransactionsBench { let instant = SendMemoTransactionsBench {
block_hash: self.block_hash.clone(), block_hash: self.block_hash.clone(),
payers: config payers: Arc::new(config.users.iter().map(|x| x.get_keypair()).collect()),
.users
.iter()
.map(|x| Arc::new(x.get_keypair()))
.collect(),
}; };
let metric = Bencher::bench::<SendMemoTransactionsBench>(instant, args).await?; let metric = Bencher::bench::<SendMemoTransactionsBench>(instant, args).await?;
log::info!("{} {}", self.get_name(), serde_json::to_string(&metric)?);
Ok(metric) Ok(metric)
} }
@ -54,7 +47,7 @@ impl TestingTask for SendAndConfrimTesting {
#[derive(Clone)] #[derive(Clone)]
struct SendMemoTransactionsBench { struct SendMemoTransactionsBench {
block_hash: Arc<RwLock<Hash>>, block_hash: Arc<RwLock<Hash>>,
payers: Vec<Arc<Keypair>>, payers: Arc<Vec<Keypair>>,
} }
#[async_trait::async_trait] #[async_trait::async_trait]

View File

@ -1,12 +1,11 @@
use std::{collections::HashMap, sync::Arc}; use std::collections::HashMap;
use crate::{bencher::Stats, cli::Args, config::Config}; use crate::{bencher::Stats, cli::Args, config::Config};
use async_trait::async_trait; use async_trait::async_trait;
use tokio::sync::RwLock;
#[async_trait] #[async_trait]
pub trait TestingTask: Send + Sync { pub trait TestingTask: Send + Sync {
async fn test(&self, args: Args, config: Config) -> anyhow::Result<Stats>; async fn test(&self, args: &Args, config: &Config) -> anyhow::Result<Stats>;
fn get_name(&self) -> String; fn get_name(&self) -> String;
} }
@ -20,35 +19,26 @@ impl TestRegistry {
self.tests.push(test); self.tests.push(test);
} }
pub async fn start_testing(self, args: Args, config: Config) { pub async fn run_tests(&self, args: &Args, config: &Config) -> HashMap<String, Stats> {
let results = Arc::new(RwLock::new(HashMap::new())); let mut results = HashMap::new();
let tasks = self.tests.into_iter().map(|test| {
let args = args.clone(); for test in &self.tests {
let config = config.clone();
let name = test.get_name(); let name = test.get_name();
let results = results.clone();
tokio::spawn(async move { log::info!("Test: {name}");
log::info!("test {name}");
match test.test(args, config).await { match test.test(args, config).await {
Ok(metric) => { Ok(stat) => {
log::info!("test {name} passed"); log::info!("Test {name} passed");
let mut lock = results.write().await; log::info!("Metric:\n{stat:#?}");
lock.insert(test.get_name(), metric);
} results.insert(name, stat);
Err(e) => log::info!("test {name} failed with error {e}"),
} }
}) Err(e) => log::error!("test {name} failed with error {e}"),
});
futures::future::join_all(tasks).await;
let res = results.read().await.clone();
if !args.output_file.is_empty() {
let result_string = serde_json::to_string(&res);
if let Ok(result) = result_string {
std::fs::write(args.output_file, result).expect("Could not write output file");
} }
} }
results
} }
} }