making all solana runtime tests depend on bencher

This commit is contained in:
Godmode Galactus 2023-05-16 12:55:18 +02:00
parent 76cf386bd9
commit caffe84b73
No known key found for this signature in database
GPG Key ID: A04142C71ABB0DEA
5 changed files with 120 additions and 239 deletions

View File

@ -1,19 +1,20 @@
use std::sync::Arc;
use std::time::{Duration, Instant};
use itertools::Itertools;
use rand::{SeedableRng, Rng};
use rand::rngs::StdRng;
use serde::Serialize;
use solana_program::hash::Hash;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use tokio::sync::RwLock;
use crate::cli::Args;
use crate::config::{Config};
pub type BlockHashGetter = Arc<RwLock<Hash>>;
#[async_trait::async_trait]
pub trait Benchmark: Send + 'static {
async fn prepare(rpc_client: Arc<RpcClient>) -> anyhow::Result<Self>
where
Self: Sized;
async fn run(&mut self, rpc_client: Arc<RpcClient>, duration: Duration) -> anyhow::Result<Run>;
pub trait Benchmark: Clone + Send + 'static {
async fn run(self, rpc_client: Arc<RpcClient>, duration: Duration, args: Args, config: Config, random_number: u64) -> anyhow::Result<Run>;
}
#[derive(Default, Serialize)]
@ -39,15 +40,18 @@ pub struct Stats {
pub struct Bencher;
impl Bencher {
pub async fn bench<B: Benchmark>(args: Args) -> anyhow::Result<Stats> {
pub async fn bench<B: Benchmark + Send + Clone>(instant: B, args: Args, config: Config) -> anyhow::Result<Stats> {
let start = Instant::now();
let mut random = StdRng::seed_from_u64(0);
let futs = (0..args.threads).map(|_| {
let rpc_client = args.get_rpc_client();
let duration = args.get_duration_to_run_test();
let args = args.clone();
let config = config.clone();
let random_number = random.gen();
let instant = instant.clone();
tokio::spawn(async move {
let mut benchmark = B::prepare(rpc_client.clone()).await.unwrap();
benchmark.run(rpc_client.clone(), duration).await.unwrap()
instant.run(rpc_client.clone(), duration, args, config, random_number).await.unwrap()
})
});
@ -79,4 +83,4 @@ impl Bencher {
all_runs: all_results,
})
}
}
}

View File

@ -1,9 +1,7 @@
use std::{
collections::HashSet,
str::FromStr,
sync::{atomic::AtomicU64, Arc},
sync::{Arc},
};
use async_trait::async_trait;
use const_env::from_env;
use rand::{seq::IteratorRandom, SeedableRng};
@ -11,10 +9,7 @@ use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::{pubkey::Pubkey, signature::Keypair, signer::Signer};
use tokio::time::Instant;
use crate::{config::Config, test_registry::TestingTask};
#[from_env]
const NB_ACCOUNT_FETCHING_TASKS: usize = 10;
use crate::{config::Config, test_registry::TestingTask, bencher::{Bencher, Benchmark, Run}};
#[from_env]
const NB_OF_ACCOUNTS_FETCHED_PER_TASK: usize = 100;
@ -30,96 +25,19 @@ impl AccountsFetchingTests {
#[async_trait]
impl TestingTask for AccountsFetchingTests {
async fn test(&self, args: crate::cli::Args, config: Config) -> anyhow::Result<()> {
let rpc_client = Arc::new(RpcClient::new(args.rpc_addr.clone()));
let total_fetches = Arc::new(AtomicU64::new(0));
let known_accounts = config
let accounts = config
.known_accounts
.iter()
.map(|x| Pubkey::from_str(x.as_str()).unwrap())
.collect::<Vec<_>>();
let unknown_accounts: Vec<Pubkey> =
AccountsFetchingTests::create_random_address(known_accounts.len());
let number_of_fetched_accounts = NB_OF_ACCOUNTS_FETCHED_PER_TASK.min(known_accounts.len());
let hash_set_known = Arc::new(known_accounts.iter().copied().collect::<HashSet<_>>());
let mut tasks = vec![];
for i in 0..NB_ACCOUNT_FETCHING_TASKS {
// each new task will fetch (100/NB_ACCOUNT_FETCHING_TASKS) * i percent of unknown accounts
// so first task will fetch no unknown accounts and last will fetch almost all unknown accounts
let known_accounts = known_accounts.clone();
let unknown_accounts = unknown_accounts.clone();
let duration = args.get_duration_to_run_test();
let rpc_client = rpc_client.clone();
let hash_set_known = hash_set_known.clone();
let total_fetches = total_fetches.clone();
let task = tokio::spawn(async move {
let percentage_of_unknown_tasks =
(100.0 / (NB_ACCOUNT_FETCHING_TASKS as f64)) * (i as f64);
println!("started fetching accounts task #{}", i);
let unknown_accounts_to_take = ((number_of_fetched_accounts as f64)
* percentage_of_unknown_tasks
/ 100.0) as usize;
let known_accounts_to_take =
number_of_fetched_accounts.saturating_sub(unknown_accounts_to_take);
let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(i as u64);
let instant = Instant::now();
while instant.elapsed() < duration {
let known_accounts = known_accounts
.iter()
.choose_multiple(&mut rng, known_accounts_to_take);
let unknown_accounts = unknown_accounts
.iter()
.choose_multiple(&mut rng, unknown_accounts_to_take);
let accounts_to_fetch = [known_accounts, unknown_accounts]
.concat()
.iter()
.map(|x| *(*x))
.collect::<Vec<_>>();
let res = rpc_client
.get_multiple_accounts(accounts_to_fetch.as_slice())
.await;
total_fetches.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
match res {
Ok(res) => {
if res.len() == accounts_to_fetch.len() {
for i in 0..accounts_to_fetch.len() {
if hash_set_known.contains(&accounts_to_fetch[i]) {
if res[i].is_none() {
println!(
"unable to fetch known account {}",
accounts_to_fetch[i]
);
}
} else if res[i].is_some() {
println!("fetched unknown account should not be possible");
}
}
} else {
println!("fetched accounts results mismatched");
}
}
Err(e) => {
println!("accounts fetching failed because {}", e);
}
}
}
});
tasks.push(task);
}
futures::future::join_all(tasks).await;
println!(
"Accounts fetching did {} requests of {} accounts in {} tasks",
total_fetches.load(std::sync::atomic::Ordering::Relaxed),
number_of_fetched_accounts,
NB_ACCOUNT_FETCHING_TASKS
);
AccountsFetchingTests::create_random_address(accounts.len());
let instant = GetAccountsBench {
accounts_list: [accounts, unknown_accounts].concat(),
};
let metric = Bencher::bench::<GetAccountsBench>(instant, args, config).await?;
log::info!("{}", serde_json::to_string(&metric)?);
Ok(())
}
@ -127,3 +45,40 @@ impl TestingTask for AccountsFetchingTests {
"Accounts Fetching".to_string()
}
}
#[derive(Clone)]
pub struct GetAccountsBench {
accounts_list: Vec<Pubkey>,
}
#[async_trait::async_trait]
impl Benchmark for GetAccountsBench {
async fn run(self, rpc_client: Arc<RpcClient>, duration: std::time::Duration, _: crate::cli::Args, _: Config, random_number: u64) -> anyhow::Result<crate::bencher::Run> {
let mut result = Run::default();
let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(random_number);
let number_of_fetched_accounts = NB_OF_ACCOUNTS_FETCHED_PER_TASK.min(self.accounts_list.len());
let start = Instant::now();
while start.elapsed() < duration {
let accounts = self.accounts_list
.iter()
.copied()
.choose_multiple(&mut rng, number_of_fetched_accounts);
match rpc_client
.get_multiple_accounts(accounts.as_slice())
.await {
Ok(_) => {
result.requests_completed += 1;
result.bytes_received += 0;
}
Err(e) => {
result.requests_failed += 1;
result.errors.push(format!("{:?}", e.kind()));
}
}
result.bytes_sent += 0;
}
Ok(result)
}
}

View File

@ -18,8 +18,14 @@ pub struct GetBlockTest;
#[async_trait::async_trait]
impl TestingTask for GetBlockTest {
async fn test(&self, args: Args, _config: Config) -> anyhow::Result<()> {
let metric = Bencher::bench::<GetBlockBench>(args).await?;
async fn test(&self, args: Args, config: Config) -> anyhow::Result<()> {
let slot = {
args.get_rpc_client().get_slot().await.unwrap()
};
let instant = GetBlockBench {
slot
};
let metric = Bencher::bench::<GetBlockBench>( instant, args, config).await?;
info!("{}", serde_json::to_string(&metric)?);
Ok(())
}
@ -29,19 +35,15 @@ impl TestingTask for GetBlockTest {
}
}
#[derive(Clone)]
pub struct GetBlockBench {
slot: Slot,
}
#[async_trait::async_trait]
impl Benchmark for GetBlockBench {
async fn prepare(rpc_client: Arc<RpcClient>) -> anyhow::Result<Self> {
Ok(Self {
slot: rpc_client.get_slot().await?,
})
}
async fn run(&mut self, rpc_client: Arc<RpcClient>, duration: Duration) -> anyhow::Result<Run> {
async fn run(self, rpc_client: Arc<RpcClient>, duration: Duration, _: Args, _: Config, _: u64) -> anyhow::Result<Run> {
let mut result = Run::default();
let start = Instant::now();

View File

@ -7,12 +7,14 @@ use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use crate::bencher::{Bencher, Benchmark, Run};
use crate::{cli::Args, config::Config, test_registry::TestingTask};
#[derive(Clone)]
pub struct GetSlotTest;
#[async_trait::async_trait]
impl TestingTask for GetSlotTest {
async fn test(&self, args: Args, _config: Config) -> anyhow::Result<()> {
let stats = Bencher::bench::<Self>(args).await?;
async fn test(&self, args: Args, config: Config) -> anyhow::Result<()> {
let instant = GetSlotTest;
let stats = Bencher::bench::<Self>(instant, args, config).await?;
info!("GetSlotTest {}", serde_json::to_string(&stats)?);
Ok(())
}
@ -24,11 +26,8 @@ impl TestingTask for GetSlotTest {
#[async_trait::async_trait]
impl Benchmark for GetSlotTest {
async fn prepare(_: Arc<RpcClient>) -> anyhow::Result<Self> {
Ok(Self)
}
async fn run(&mut self, rpc_client: Arc<RpcClient>, duration: Duration) -> anyhow::Result<Run> {
async fn run(self, rpc_client: Arc<RpcClient>, duration: Duration, _: Args, _: Config, _: u64) -> anyhow::Result<Run> {
let mut result = Run::default();
let start = Instant::now();

View File

@ -1,7 +1,5 @@
use crate::test_registry::TestingTask;
use crate::{test_registry::TestingTask, bencher::{Benchmark, Run, Bencher}, config::Config};
use async_trait::async_trait;
use const_env::from_env;
use dashmap::DashSet;
use rand::{distributions::Alphanumeric, prelude::Distribution, seq::SliceRandom, SeedableRng};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::{
@ -11,21 +9,12 @@ use solana_sdk::{
use std::{
str::FromStr,
sync::Arc,
time::{Duration, Instant},
time::{Instant},
};
use tokio::sync::RwLock;
const MEMO_PROGRAM_ID: &str = "MemoSq4gqABAXKb96qnH8TysNcWxMyWCqXgDLGmfcHr";
#[from_env]
const NB_MEMO_TRANSACTIONS_SENT_PER_SECOND: usize = 256;
#[derive(Clone, Debug, Copy)]
struct Metric {
pub number_of_confirmed_txs: usize,
pub number_of_unconfirmed_txs: usize,
}
pub struct SendAndConfrimTesting {
pub block_hash: Arc<RwLock<Hash>>,
}
@ -38,78 +27,6 @@ fn create_memo_tx(msg: &[u8], payer: &Keypair, blockhash: Hash) -> Transaction {
Transaction::new(&[payer], message, blockhash)
}
fn generate_random_strings(num_of_txs: usize, random_seed: Option<u64>) -> Vec<Vec<u8>> {
let seed = random_seed.map_or(0, |x| x);
let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(seed);
(0..num_of_txs)
.map(|_| Alphanumeric.sample_iter(&mut rng).take(10).collect())
.collect()
}
async fn send_and_confirm_transactions(
rpc_client: Arc<RpcClient>,
tx_count: usize,
funded_payer: Keypair,
seed: u64,
block_hash: Arc<RwLock<Hash>>,
) -> Metric {
let map_of_txs = Arc::new(DashSet::new());
// transaction sender task
{
let map_of_txs = map_of_txs.clone();
let rpc_client = rpc_client.clone();
tokio::spawn(async move {
let map_of_txs = map_of_txs.clone();
let rand_strings = generate_random_strings(tx_count, Some(seed));
for rand_string in rand_strings {
let blockhash = { *block_hash.read().await };
let tx = create_memo_tx(&rand_string, &funded_payer, blockhash);
if let Ok(signature) = rpc_client.send_transaction(&tx).await {
map_of_txs.insert(signature);
}
}
});
}
let confirmation_time = Instant::now();
let mut confirmed_count = 0;
let mut metric = Metric {
number_of_confirmed_txs: 0,
number_of_unconfirmed_txs: 0,
};
while confirmation_time.elapsed() < Duration::from_secs(120)
&& !(map_of_txs.is_empty() && confirmed_count == tx_count)
{
let signatures = map_of_txs.iter().map(|x| *x.key()).collect::<Vec<_>>();
if signatures.is_empty() {
tokio::time::sleep(Duration::from_millis(1)).await;
continue;
}
if let Ok(res) = rpc_client.get_signature_statuses(&signatures).await {
for i in 0..signatures.len() {
let tx_status = &res.value[i];
if tx_status.is_some() {
let signature = signatures[i];
let tx_data = map_of_txs.get(&signature).unwrap();
metric.number_of_confirmed_txs += 1;
drop(tx_data);
map_of_txs.remove(&signature);
confirmed_count += 1;
}
}
}
}
for _ in map_of_txs.iter() {
metric.number_of_unconfirmed_txs += 1;
}
metric
}
#[async_trait]
impl TestingTask for SendAndConfrimTesting {
async fn test(
@ -117,42 +34,11 @@ impl TestingTask for SendAndConfrimTesting {
args: crate::cli::Args,
config: crate::config::Config,
) -> anyhow::Result<()> {
println!("started sending and confirming memo transactions");
let rpc_client = Arc::new(RpcClient::new(args.rpc_addr.clone()));
let mut run_interval_ms = tokio::time::interval(Duration::from_secs(1));
let nb_runs = args.duration_in_seconds;
let mut tasks = vec![];
let payers = config.get_payers();
for seed in 0..nb_runs {
let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(seed);
let payer = payers.choose(&mut rng).unwrap();
let payer = Keypair::from_bytes(payer.to_bytes().as_slice()).unwrap();
let block_hash = self.block_hash.clone();
tasks.push(tokio::spawn(send_and_confirm_transactions(
rpc_client.clone(),
NB_MEMO_TRANSACTIONS_SENT_PER_SECOND,
payer,
seed,
block_hash.clone(),
)));
// wait for an interval
run_interval_ms.tick().await;
}
let instant = Instant::now();
let tasks_res = futures::future::join_all(tasks).await;
let duration = instant.elapsed();
let mut total_txs_confirmed = 0;
let mut total_txs_unconfirmed = 0;
for metric in tasks_res.into_iter().flatten() {
total_txs_confirmed += metric.number_of_confirmed_txs;
total_txs_unconfirmed += metric.number_of_unconfirmed_txs;
}
println!("Memo transaction sent and confrim results \n Number of transaction confirmed : {}, \n Number of transactions unconfirmed {}, took {}s", total_txs_confirmed, total_txs_unconfirmed, duration.as_secs());
let instant = SendMemoTransactionsBench {
block_hash: self.block_hash.clone(),
};
let metric = Bencher::bench::<SendMemoTransactionsBench>( instant, args, config).await?;
log::info!("{} {}", self.get_name(), serde_json::to_string(&metric)?);
Ok(())
}
@ -160,3 +46,38 @@ impl TestingTask for SendAndConfrimTesting {
"Send and confirm memo transaction".to_string()
}
}
#[derive(Clone)]
struct SendMemoTransactionsBench {
block_hash: Arc<RwLock<Hash>>,
}
#[async_trait::async_trait]
impl Benchmark for SendMemoTransactionsBench {
async fn run(self, rpc_client: Arc<RpcClient>, duration: std::time::Duration, _: crate::cli::Args, config: Config, random_number: u64) -> anyhow::Result<crate::bencher::Run> {
let mut result = Run::default();
let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(random_number);
let start = Instant::now();
while start.elapsed() < duration {
let msg: Vec<u8> = Alphanumeric.sample_iter(&mut rng).take(10).collect();
let payer = config.users.choose(&mut rng).unwrap();
let blockhash = { *self.block_hash.read().await };
let tx = create_memo_tx(&msg, &payer.get_keypair(), blockhash);
match rpc_client.send_transaction(&tx).await {
Ok(_) => {
result.requests_completed += 1;
result.bytes_received += 0;
}
Err(e) => {
result.requests_failed += 1;
result.errors.push(format!("{:?}", e.kind()));
}
}
result.bytes_sent += 0;
}
Ok(result)
}
}