Refactor benchmarks into separate binaries (#350)

make the new benchmark test cases individual bins
This commit is contained in:
Lou-Kamades 2024-03-13 11:29:23 -05:00 committed by GitHub
parent ce215e0239
commit 573c2da966
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 967 additions and 569 deletions

130
Cargo.lock generated
View File

@ -572,22 +572,22 @@ version = "0.2.4"
dependencies = [
"anyhow",
"bincode",
"clap 4.4.18",
"csv",
"dashmap 5.5.3",
"dirs",
"futures",
"itertools 0.10.5",
"lazy_static",
"log",
"rand 0.8.5",
"rand_chacha 0.3.1",
"serde",
"serde_json",
"solana-rpc-client",
"solana-rpc-client-api",
"solana-sdk",
"solana-transaction-status",
"spl-memo 4.0.0",
"tokio",
"toml 0.8.10",
"tracing",
"tracing-subscriber",
]
@ -1166,27 +1166,6 @@ dependencies = [
"subtle",
]
[[package]]
name = "csv"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac574ff4d437a7b5ad237ef331c17ccca63c46479e5b5453eb8e10bb99a759fe"
dependencies = [
"csv-core",
"itoa",
"ryu",
"serde",
]
[[package]]
name = "csv-core"
version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5efa2b3d7902f4b634a20cae3c9c4e6209dc4779feb6863329607560143efa70"
dependencies = [
"memchr",
]
[[package]]
name = "ctr"
version = "0.8.0"
@ -1376,27 +1355,6 @@ dependencies = [
"subtle",
]
[[package]]
name = "dirs"
version = "5.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44c45a9d03d6676652bcb5e724c7e988de1acad23a711b5217ab9cbecbec2225"
dependencies = [
"dirs-sys",
]
[[package]]
name = "dirs-sys"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "520f05a5cbd335fae5a99ff7a6ab8627577660ee5cfd6a94a6a929b52ff0321c"
dependencies = [
"libc",
"option-ext",
"redox_users",
"windows-sys 0.48.0",
]
[[package]]
name = "displaydoc"
version = "0.2.4"
@ -2429,17 +2387,6 @@ version = "0.2.153"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd"
[[package]]
name = "libredox"
version = "0.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85c833ca1e66078851dba29046874e38f08b2c883700aa29a03ddd3b23814ee8"
dependencies = [
"bitflags 2.4.2",
"libc",
"redox_syscall",
]
[[package]]
name = "libsecp256k1"
version = "0.6.0"
@ -3013,12 +2960,6 @@ dependencies = [
"vcpkg",
]
[[package]]
name = "option-ext"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d"
[[package]]
name = "os_str_bytes"
version = "6.6.1"
@ -3274,7 +3215,7 @@ version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d6ea3c4595b96363c13943497db34af4460fb474a95c43f4446ad341b8c9785"
dependencies = [
"toml",
"toml 0.5.11",
]
[[package]]
@ -3605,17 +3546,6 @@ dependencies = [
"bitflags 1.3.2",
]
[[package]]
name = "redox_users"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a18479200779601e498ada4e8c1e1f50e3ee19deb0259c25825a98b5603b2cb4"
dependencies = [
"getrandom 0.2.12",
"libredox",
"thiserror",
]
[[package]]
name = "regex"
version = "1.10.3"
@ -3977,6 +3907,15 @@ dependencies = [
"serde",
]
[[package]]
name = "serde_spanned"
version = "0.6.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb3622f419d1296904700073ea6cc23ad690adbd66f13ea683df73298736f0c1"
dependencies = [
"serde",
]
[[package]]
name = "serde_urlencoded"
version = "0.7.1"
@ -5920,11 +5859,26 @@ dependencies = [
"serde",
]
[[package]]
name = "toml"
version = "0.8.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a9aad4a3066010876e8dcf5a8a06e70a558751117a145c6ce2b82c2e2054290"
dependencies = [
"serde",
"serde_spanned",
"toml_datetime",
"toml_edit 0.22.6",
]
[[package]]
name = "toml_datetime"
version = "0.6.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3550f4e9685620ac18a50ed434eb3aec30db8ba93b0287467bca5826ea25baf1"
dependencies = [
"serde",
]
[[package]]
name = "toml_edit"
@ -5934,7 +5888,7 @@ checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421"
dependencies = [
"indexmap 2.2.2",
"toml_datetime",
"winnow",
"winnow 0.5.37",
]
[[package]]
@ -5945,7 +5899,20 @@ checksum = "6a8534fd7f78b5405e860340ad6575217ce99f38d4d5c8f2442cb5ecb50090e1"
dependencies = [
"indexmap 2.2.2",
"toml_datetime",
"winnow",
"winnow 0.5.37",
]
[[package]]
name = "toml_edit"
version = "0.22.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2c1b5fd4128cc8d3e0cb74d4ed9a9cc7c7284becd4df68f5f940e1ad123606f6"
dependencies = [
"indexmap 2.2.2",
"serde",
"serde_spanned",
"toml_datetime",
"winnow 0.6.5",
]
[[package]]
@ -6622,6 +6589,15 @@ dependencies = [
"memchr",
]
[[package]]
name = "winnow"
version = "0.6.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dffa400e67ed5a4dd237983829e66475f0a4a26938c4b04c21baede6262215b8"
dependencies = [
"memchr",
]
[[package]]
name = "winreg"
version = "0.50.0"

View File

@ -48,7 +48,8 @@ $ cargo test
*bench*
```bash
$ cd bench and cargo run --release
$ cd bench
$ cargo run --bin confirmation_slot ./bench-config.toml
```
Find a new file named `metrics.csv` in the project root.

View File

@ -3,26 +3,43 @@ name = "bench"
version = "0.2.4"
edition = "2021"
[lib]
name = "bench_lib"
path = "src/lib.rs"
[[bin]]
name = "api_load"
path = "src/benches/api_load.rs"
[[bin]]
name = "confirmation_slot"
path = "src/benches/confirmation_slot.rs"
[[bin]]
name = "confirmation_rate"
path = "src/benches/confirmation_rate.rs"
[dependencies]
solana-sdk = { workspace = true }
solana-rpc-client = { workspace = true }
solana-transaction-status = { workspace = true }
solana-rpc-client-api = { workspace = true }
log = { workspace = true }
tracing = { workspace = true }
anyhow = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
clap = { workspace = true }
tokio = { version = "1.28.2", features = ["full", "fs"]}
tracing-subscriber = { workspace = true }
csv = "1.2.1"
dirs = "5.0.0"
tracing-subscriber = { workspace = true, features = ["std", "env-filter"] }
rand = "0.8.5"
rand_chacha = "0.3.1"
futures = { workspace = true }
dashmap = { workspace = true }
lazy_static = "1.4.0"
bincode = { workspace = true }
itertools = "0.10.5"
spl-memo = "4.0.0"
toml = "0.8.10"
[dev-dependencies]
bincode = { workspace = true }

14
bench/bench-config.toml Normal file
View File

@ -0,0 +1,14 @@
payer_path = "/path/to/id.json"
lite_rpc_url = "http://127.0.0.1:8899"
rpc_url = "https://api.testnet.rpcpool.com/secret"
[api_load]
time_ms = 3000
[confirmation_rate]
num_txns = 20
num_runs = 5
tx_size = "Small"
[confirmation_slot]
tx_size = "Small" # Small or Large

View File

@ -0,0 +1,61 @@
use bench_lib::{config::BenchConfig, create_memo_tx_small};
use log::{info, warn};
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::signature::{read_keypair_file, Keypair, Signer};
// TC3 measure how much load the API endpoint can take
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
warn!("THIS IS WORK IN PROGRESS");
let config = BenchConfig::load().unwrap();
let rpc = Arc::new(RpcClient::new(config.lite_rpc_url.clone()));
info!("RPC: {}", rpc.as_ref().url());
let payer: Arc<Keypair> = Arc::new(read_keypair_file(&config.payer_path).unwrap());
info!("Payer: {}", payer.pubkey().to_string());
let mut txs = 0;
let failed = Arc::new(AtomicUsize::new(0));
let success = Arc::new(AtomicUsize::new(0));
let hash = rpc.get_latest_blockhash().await?;
let time_ms = config.api_load.time_ms;
let time = tokio::time::Instant::now();
while time.elapsed().as_millis() < time_ms.into() {
let rpc = rpc.clone();
let payer = payer.clone();
let failed = failed.clone();
let success = success.clone();
let msg = format!("tx: {txs}");
tokio::spawn(async move {
let msg = msg.as_bytes();
let tx = create_memo_tx_small(msg, &payer, hash);
match rpc.send_transaction(&tx).await {
Ok(_) => success.fetch_add(1, Ordering::Relaxed),
Err(_) => failed.fetch_add(1, Ordering::Relaxed),
};
});
txs += 1;
}
let calls_per_second = txs as f64 / (time_ms as f64 * 1000.0);
info!("calls_per_second: {}", calls_per_second);
info!("failed: {}", failed.load(Ordering::Relaxed));
info!("success: {}", success.load(Ordering::Relaxed));
Ok(())
}

View File

@ -0,0 +1,390 @@
use anyhow::{bail, Error};
use bench_lib::config::{BenchConfig, ConfirmationRateConfig};
use bench_lib::tx_size::TxSize;
use bench_lib::{create_rng, generate_txs};
use futures::future::join_all;
use futures::TryFutureExt;
use itertools::Itertools;
use log::{debug, info, trace, warn};
use std::collections::{HashMap, HashSet};
use std::iter::zip;
use std::sync::Arc;
use std::time::Duration;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_rpc_client::rpc_client::SerializableTransaction;
use solana_rpc_client_api::client_error::ErrorKind;
use solana_sdk::signature::{read_keypair_file, Signature, Signer};
use solana_sdk::slot_history::Slot;
use solana_sdk::transaction::Transaction;
use solana_sdk::{commitment_config::CommitmentConfig, signature::Keypair};
use solana_transaction_status::TransactionConfirmationStatus;
use tokio::time::Instant;
#[derive(Debug, serde::Serialize)]
pub struct RpcStat {
confirmation_time: f32,
mode_slot: u64,
confirmed: u64,
unconfirmed: u64,
failed: u64,
}
/// TC2 send multiple runs of num_txns, measure the confirmation rate
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
warn!("THIS IS WORK IN PROGRESS");
let config = BenchConfig::load().unwrap();
let ConfirmationRateConfig {
tx_size,
num_txns,
num_runs,
} = config.confirmation_rate;
let rpc = Arc::new(RpcClient::new(config.lite_rpc_url.clone()));
info!("RPC: {}", rpc.as_ref().url());
let payer: Arc<Keypair> = Arc::new(read_keypair_file(&config.payer_path).unwrap());
info!("Payer: {}", payer.pubkey().to_string());
let mut rpc_results = Vec::with_capacity(num_runs);
for _ in 0..num_runs {
let stat: RpcStat = send_bulk_txs_and_wait(&rpc, &payer, num_txns, tx_size).await?;
rpc_results.push(stat);
}
info!("avg_rpc: {:?}", calc_stats_avg(&rpc_results));
Ok(())
}
pub async fn send_bulk_txs_and_wait(
rpc: &RpcClient,
payer: &Keypair,
num_txns: usize,
tx_size: TxSize,
) -> anyhow::Result<RpcStat> {
let hash = rpc.get_latest_blockhash().await?;
let mut rng = create_rng(None);
let txs = generate_txs(num_txns, payer, hash, &mut rng, tx_size);
let started_at = tokio::time::Instant::now();
let tx_and_confirmations_from_rpc: Vec<(Signature, ConfirmationResponseFromRpc)> =
send_and_confirm_bulk_transactions(rpc, &txs).await?;
let elapsed_total = started_at.elapsed();
for (tx_sig, confirmation) in &tx_and_confirmations_from_rpc {
match confirmation {
ConfirmationResponseFromRpc::Success(slots_elapsed, level, elapsed) => {
debug!(
"Signature {} confirmed with level {:?} after {:.02}ms, {} slots",
tx_sig,
level,
elapsed.as_secs_f32() * 1000.0,
slots_elapsed
);
}
ConfirmationResponseFromRpc::Timeout(elapsed) => {
debug!(
"Signature {} not confirmed after {:.02}ms",
tx_sig,
elapsed.as_secs_f32() * 1000.0
);
}
ConfirmationResponseFromRpc::SendError(_) => {
unreachable!()
}
}
}
let (mut confirmed, mut unconfirmed, mut failed) = (0, 0, 0);
let mut slot_hz: HashMap<Slot, u64> = Default::default();
for (_, result_from_rpc) in tx_and_confirmations_from_rpc {
match result_from_rpc {
ConfirmationResponseFromRpc::Success(slot, _, _) => {
confirmed += 1;
*slot_hz.entry(slot).or_default() += 1;
}
ConfirmationResponseFromRpc::Timeout(_) => {
unconfirmed += 1;
}
ConfirmationResponseFromRpc::SendError(_) => {
failed += 1;
}
}
//
// match tx {
// Ok(Some(status)) => {
// if status.satisfies_commitment(CommitmentConfig::confirmed()) {
// confirmed += 1;
// *slot_hz.entry(status.slot).or_default() += 1;
// } else {
// unconfirmed += 1;
// }
// }
// Ok(None) => {
// unconfirmed += 1;
// }
// Err(_) => {
// failed += 1;
// }
// }
}
let mode_slot = slot_hz
.into_iter()
.max_by_key(|(_, v)| *v)
.map(|(k, _)| k)
.unwrap_or_default();
Ok(RpcStat {
confirmation_time: elapsed_total.as_secs_f32(),
mode_slot,
confirmed,
unconfirmed,
failed,
})
}
fn calc_stats_avg(stats: &[RpcStat]) -> RpcStat {
let len = stats.len();
let mut avg = RpcStat {
confirmation_time: 0.0,
mode_slot: 0,
confirmed: 0,
unconfirmed: 0,
failed: 0,
};
for stat in stats {
avg.confirmation_time += stat.confirmation_time;
avg.confirmed += stat.confirmed;
avg.unconfirmed += stat.unconfirmed;
avg.failed += stat.failed;
}
avg.confirmation_time /= len as f32;
avg.confirmed /= len as u64;
avg.unconfirmed /= len as u64;
avg.failed /= len as u64;
avg
}
#[derive(Clone)]
enum ConfirmationResponseFromRpc {
SendError(Arc<ErrorKind>),
// elapsed slot: current slot (confirmed) at beginning til the slot where transaction showed up with status CONFIRMED
Success(Slot, TransactionConfirmationStatus, Duration),
Timeout(Duration),
}
async fn send_and_confirm_bulk_transactions(
rpc_client: &RpcClient,
txs: &[Transaction],
) -> anyhow::Result<Vec<(Signature, ConfirmationResponseFromRpc)>> {
let send_slot = poll_next_slot_start(rpc_client).await?;
let started_at = Instant::now();
let batch_sigs_or_fails = join_all(
txs.iter()
.map(|tx| rpc_client.send_transaction(tx).map_err(|e| e.kind)),
)
.await;
let after_send_slot = rpc_client
.get_slot_with_commitment(CommitmentConfig::confirmed())
.await?;
// optimal value is "0"
info!(
"slots passed while sending: {}",
after_send_slot - send_slot
);
let num_sent_ok = batch_sigs_or_fails
.iter()
.filter(|sig_or_fail| sig_or_fail.is_ok())
.count();
let num_sent_failed = batch_sigs_or_fails
.iter()
.filter(|sig_or_fail| sig_or_fail.is_err())
.count();
for (i, tx_sig) in txs.iter().enumerate() {
let tx_sent = batch_sigs_or_fails[i].is_ok();
if tx_sent {
debug!("- tx_sent {}", tx_sig.get_signature());
} else {
debug!("- tx_fail {}", tx_sig.get_signature());
}
}
debug!(
"{} transactions sent successfully in {:.02}ms",
num_sent_ok,
started_at.elapsed().as_secs_f32() * 1000.0
);
debug!(
"{} transactions failed to send in {:.02}ms",
num_sent_failed,
started_at.elapsed().as_secs_f32() * 1000.0
);
if num_sent_failed > 0 {
warn!(
"Some transactions failed to send: {} out of {}",
num_sent_failed,
txs.len()
);
bail!("Failed to send all transactions");
}
let mut pending_status_set: HashSet<Signature> = HashSet::new();
batch_sigs_or_fails
.iter()
.filter(|sig_or_fail| sig_or_fail.is_ok())
.for_each(|sig_or_fail| {
pending_status_set.insert(sig_or_fail.as_ref().unwrap().to_owned());
});
let mut result_status_map: HashMap<Signature, ConfirmationResponseFromRpc> = HashMap::new();
// items get moved from pending_status_set to result_status_map
let started_at = Instant::now();
let mut iteration = 1;
'pooling_loop: loop {
let iteration_ends_at = started_at + Duration::from_millis(iteration * 200);
assert_eq!(
pending_status_set.len() + result_status_map.len(),
num_sent_ok,
"Items must move between pending+result"
);
let tx_batch = pending_status_set.iter().cloned().collect_vec();
debug!(
"Request status for batch of remaining {} transactions in iteration {}",
tx_batch.len(),
iteration
);
// TODO warn if get_status api calles are slow
let batch_responses = rpc_client
.get_signature_statuses(tx_batch.as_slice())
.await?;
let elapsed = started_at.elapsed();
for (tx_sig, status_response) in zip(tx_batch, batch_responses.value) {
match status_response {
Some(tx_status) => {
trace!(
"Some signature status {:?} received for {} after {:.02}ms",
tx_status.confirmation_status,
tx_sig,
elapsed.as_secs_f32() * 1000.0
);
if !tx_status.satisfies_commitment(CommitmentConfig::confirmed()) {
continue 'pooling_loop;
}
// status is confirmed or finalized
pending_status_set.remove(&tx_sig);
let prev_value = result_status_map.insert(
tx_sig,
ConfirmationResponseFromRpc::Success(
tx_status.slot - send_slot,
tx_status.confirmation_status(),
elapsed,
),
);
assert!(prev_value.is_none(), "Must not override existing value");
}
None => {
// None: not yet processed by the cluster
trace!(
"No signature status was received for {} after {:.02}ms - continue waiting",
tx_sig,
elapsed.as_secs_f32() * 1000.0
);
}
}
}
if pending_status_set.is_empty() {
debug!("All transactions confirmed after {} iterations", iteration);
break 'pooling_loop;
}
if iteration == 100 {
debug!("Timeout waiting for transactions to confirmed after {} iterations - giving up on {}", iteration, pending_status_set.len());
break 'pooling_loop;
}
iteration += 1;
// avg 2 samples per slot
tokio::time::sleep_until(iteration_ends_at).await;
} // -- END polling loop
let total_time_elapsed_polling = started_at.elapsed();
// all transactions which remain in pending list are considered timed out
for tx_sig in pending_status_set.clone() {
pending_status_set.remove(&tx_sig);
result_status_map.insert(
tx_sig,
ConfirmationResponseFromRpc::Timeout(total_time_elapsed_polling),
);
}
let result_as_vec = batch_sigs_or_fails
.into_iter()
.enumerate()
.map(|(i, sig_or_fail)| match sig_or_fail {
Ok(tx_sig) => {
let confirmation = result_status_map
.get(&tx_sig)
.expect("consistent map with all tx")
.clone()
.to_owned();
(tx_sig, confirmation)
}
Err(send_error) => {
let tx_sig = txs[i].get_signature();
let confirmation = ConfirmationResponseFromRpc::SendError(Arc::new(send_error));
(*tx_sig, confirmation)
}
})
.collect_vec();
Ok(result_as_vec)
}
async fn poll_next_slot_start(rpc_client: &RpcClient) -> Result<Slot, Error> {
let started_at = Instant::now();
let mut last_slot: Option<Slot> = None;
let mut i = 1;
// try to catch slot start
let send_slot = loop {
if i > 500 {
bail!("Timeout waiting for slot change");
}
let iteration_ends_at = started_at + Duration::from_millis(i * 30);
let slot = rpc_client
.get_slot_with_commitment(CommitmentConfig::confirmed())
.await?;
trace!("polling slot {}", slot);
if let Some(last_slot) = last_slot {
if last_slot + 1 == slot {
break slot;
}
}
last_slot = Some(slot);
tokio::time::sleep_until(iteration_ends_at).await;
i += 1;
};
Ok(send_slot)
}

View File

@ -0,0 +1,65 @@
use anyhow::Context;
use bench_lib::config::BenchConfig;
use bench_lib::tx_size::TxSize;
use bench_lib::{create_memo_tx, create_rng, send_and_confirm_transactions, Rng8};
use log::{info, warn};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::signature::{read_keypair_file, Signer};
use solana_sdk::transaction::Transaction;
use solana_sdk::{commitment_config::CommitmentConfig, signature::Keypair};
/// TC1 send 2 txs (one via LiteRPC, one via Solana RPC) and compare confirmation slot (=slot distance)
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
warn!("THIS IS WORK IN PROGRESS");
let config = BenchConfig::load().unwrap();
let tx_size = config.confirmation_slot.tx_size;
let lite_rpc = RpcClient::new(config.lite_rpc_url.clone());
info!("Lite RPC: {}", lite_rpc.url());
let rpc = RpcClient::new(config.rpc_url.clone());
info!("RPC: {}", rpc.url());
let mut rng = create_rng(None);
let payer = read_keypair_file(&config.payer_path).expect("payer file");
info!("Payer: {}", payer.pubkey().to_string());
let rpc_tx = create_tx(&rpc, &payer, &mut rng, tx_size).await?;
let lite_rpc_tx = create_tx(&lite_rpc, &payer, &mut rng, tx_size).await?;
let (rpc_slot, lite_rpc_slot) = tokio::join!(
send_transaction_and_get_slot(&rpc, rpc_tx),
send_transaction_and_get_slot(&lite_rpc, lite_rpc_tx)
);
info!("rpc_slot: {}", rpc_slot?);
info!("lite_rpc_slot: {}", lite_rpc_slot?);
Ok(())
}
async fn create_tx(
rpc: &RpcClient,
payer: &Keypair,
rng: &mut Rng8,
tx_size: TxSize,
) -> anyhow::Result<Transaction> {
let hash = rpc.get_latest_blockhash().await?;
Ok(create_memo_tx(payer, hash, rng, tx_size))
}
async fn send_transaction_and_get_slot(client: &RpcClient, tx: Transaction) -> anyhow::Result<u64> {
let status = send_and_confirm_transactions(client, &[tx], CommitmentConfig::confirmed(), None)
.await?
.into_iter()
.next()
.unwrap()?
.context("unable to confirm tx")?;
Ok(status.slot)
}

View File

@ -0,0 +1,37 @@
// use std::fs::File;
// use csv::Writer;
// use crate::cli::RpcArgs;
// use crate::strategies::tc1::{Tc1, Tc1Result};
// use super::Strategy;
// #[derive(Debug, serde::Serialize)]
// pub struct Tc4Result {
// // TODO
// }
// ///- send txs on LiteRPC broadcast channel and consume them using the Solana quic-streamer
// /// - see quic_proxy_tpu_integrationtest.rs (note: not only about proxy)
// /// - run cargo test (maybe need to use release build)
// /// - Goal: measure performance of LiteRPC internal channel/thread structure and the TPU_service performance
// #[derive(clap::Args, Debug)]
// pub struct Tc4 {
// #[command(flatten)]
// foobar: RpcArgs,
// }
// #[async_trait::async_trait]
// impl Strategy for Tc4 {
// type Output = Tc4Result;
// async fn execute(&self) -> anyhow::Result<Self::Output> {
// todo!()
// }
// }
// impl Tc4 {
// pub fn write_csv(csv_writer: &mut Writer<File>, result: &Tc4Result) -> anyhow::Result<()> {
// // TODO
// Ok(())
// }
// }

View File

@ -1,26 +0,0 @@
use clap::{command, Parser};
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
pub struct Args {
/// Number of tx(s) sent in each run
#[arg(short = 'n', long, default_value_t = 5_000)]
pub tx_count: usize,
/// Number of bench runs
#[arg(short = 'r', long, default_value_t = 1)]
pub runs: usize,
/// Interval between each bench run (ms)
#[arg(short = 'i', long, default_value_t = 1000)]
pub run_interval_ms: u64,
/// Metrics output file name
#[arg(short = 'm', long, default_value_t = String::from("metrics.csv"))]
pub metrics_file_name: String,
/// Lite Rpc Address
#[arg(short = 'l', long, default_value_t = String::from("http://127.0.0.1:8890"))]
pub lite_rpc_addr: String,
#[arg(short = 't', long, default_value_t = String::from("transactions.csv"))]
pub transaction_save_file: String,
// choose between small (179 bytes) and large (1186 bytes) transactions
#[arg(short = 'L', long, default_value_t = false)]
pub large_transactions: bool,
}

41
bench/src/config.rs Normal file
View File

@ -0,0 +1,41 @@
use crate::tx_size::TxSize;
use serde::Deserialize;
use std::fs::File;
use std::io::Read;
#[derive(Clone, Debug, Deserialize)]
pub struct BenchConfig {
pub payer_path: String,
pub lite_rpc_url: String,
pub rpc_url: String,
pub api_load: ApiLoadConfig,
pub confirmation_rate: ConfirmationRateConfig,
pub confirmation_slot: ConfirmationSlotConfig,
}
#[derive(Clone, Debug, Deserialize)]
pub struct ApiLoadConfig {
pub time_ms: u64,
}
#[derive(Clone, Debug, Deserialize)]
pub struct ConfirmationRateConfig {
pub num_txns: usize,
pub num_runs: usize,
pub tx_size: TxSize,
}
#[derive(Clone, Debug, Deserialize)]
pub struct ConfirmationSlotConfig {
pub tx_size: TxSize,
}
impl BenchConfig {
pub fn load() -> anyhow::Result<Self> {
let args: Vec<String> = std::env::args().collect();
let mut file = File::open(&args[1])?;
let mut contents = String::new();
file.read_to_string(&mut contents).unwrap();
Ok(toml::from_str(&contents)?)
}
}

View File

@ -1,168 +0,0 @@
use anyhow::Context;
use itertools::Itertools;
use lazy_static::lazy_static;
use rand::{distributions::Alphanumeric, prelude::Distribution, SeedableRng};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::instruction::AccountMeta;
use solana_sdk::{
commitment_config::CommitmentConfig,
hash::Hash,
instruction::Instruction,
message::Message,
pubkey::Pubkey,
signature::{Keypair, Signature},
signer::Signer,
system_instruction,
transaction::Transaction,
};
use std::path::PathBuf;
use std::{str::FromStr, time::Duration};
use tokio::time::Instant;
const MEMO_PROGRAM_ID: &str = "MemoSq4gqABAXKb96qnH8TysNcWxMyWCqXgDLGmfcHr";
const WAIT_LIMIT_IN_SECONDS: u64 = 60;
lazy_static! {
static ref USER_KEYPAIR: PathBuf = {
dirs::home_dir()
.unwrap()
.join(".config")
.join("solana")
.join("id.json")
};
}
pub struct BenchHelper;
impl BenchHelper {
pub async fn get_payer() -> anyhow::Result<Keypair> {
let payer = tokio::fs::read_to_string(USER_KEYPAIR.as_path())
.await
.context("Error reading payer file")?;
let payer: Vec<u8> = serde_json::from_str(&payer)?;
let payer = Keypair::from_bytes(&payer)?;
Ok(payer)
}
pub async fn wait_till_signature_status(
rpc_client: &RpcClient,
sig: &Signature,
commitment_config: CommitmentConfig,
) -> anyhow::Result<()> {
let instant = Instant::now();
loop {
if instant.elapsed() > Duration::from_secs(WAIT_LIMIT_IN_SECONDS) {
return Err(anyhow::Error::msg("Timedout waiting"));
}
if let Some(err) = rpc_client
.get_signature_status_with_commitment(sig, commitment_config)
.await?
{
err?;
return Ok(());
}
}
}
pub fn create_transaction(funded_payer: &Keypair, blockhash: Hash) -> Transaction {
let to_pubkey = Pubkey::new_unique();
// transfer instruction
let instruction =
system_instruction::transfer(&funded_payer.pubkey(), &to_pubkey, 1_000_000);
let message = Message::new(&[instruction], Some(&funded_payer.pubkey()));
Transaction::new(&[funded_payer], message, blockhash)
}
pub fn generate_random_strings(
num_of_txs: usize,
random_seed: Option<u64>,
n_chars: usize,
) -> Vec<Vec<u8>> {
let seed = random_seed.map_or(0, |x| x);
let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(seed);
(0..num_of_txs)
.map(|_| Alphanumeric.sample_iter(&mut rng).take(n_chars).collect())
.collect()
}
#[inline]
pub fn generate_txs(
num_of_txs: usize,
funded_payer: &Keypair,
blockhash: Hash,
random_seed: Option<u64>,
) -> Vec<Transaction> {
let seed = random_seed.map_or(0, |x| x);
let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(seed);
(0..num_of_txs)
.map(|_| {
let random_bytes: Vec<u8> = Alphanumeric.sample_iter(&mut rng).take(10).collect();
Self::create_memo_tx_small(&random_bytes, funded_payer, blockhash)
})
.collect()
}
pub fn create_memo_tx_small(msg: &[u8], payer: &Keypair, blockhash: Hash) -> Transaction {
let memo = Pubkey::from_str(MEMO_PROGRAM_ID).unwrap();
let instruction = Instruction::new_with_bytes(memo, msg, vec![]);
let message = Message::new(&[instruction], Some(&payer.pubkey()));
Transaction::new(&[payer], message, blockhash)
}
pub fn create_memo_tx_large(msg: &[u8], payer: &Keypair, blockhash: Hash) -> Transaction {
let accounts = (0..8).map(|_| Keypair::new()).collect_vec();
let memo = Pubkey::from_str(MEMO_PROGRAM_ID).unwrap();
let instruction = Instruction::new_with_bytes(
memo,
msg,
accounts
.iter()
.map(|keypair| AccountMeta::new_readonly(keypair.pubkey(), true))
.collect_vec(),
);
let message = Message::new(&[instruction], Some(&payer.pubkey()));
let mut signers = vec![payer];
signers.extend(accounts.iter());
Transaction::new(&signers, message, blockhash)
}
}
#[test]
fn transaction_size_small() {
let blockhash = Hash::default();
let payer_keypair = Keypair::from_base58_string(
"rKiJ7H5UUp3JR18kNyTF1XPuwPKHEM7gMLWHZPWP5djrW1vSjfwjhvJrevxF9MPmUmN9gJMLHZdLMgc9ao78eKr",
);
let seed = 42;
let random_strings = BenchHelper::generate_random_strings(1, Some(seed), 10);
let rand_string = random_strings.first().unwrap();
let tx = BenchHelper::create_memo_tx_small(rand_string, &payer_keypair, blockhash);
assert_eq!(bincode::serialized_size(&tx).unwrap(), 179);
}
#[test]
fn transaction_size_large() {
let blockhash = Hash::default();
let payer_keypair = Keypair::from_base58_string(
"rKiJ7H5UUp3JR18kNyTF1XPuwPKHEM7gMLWHZPWP5djrW1vSjfwjhvJrevxF9MPmUmN9gJMLHZdLMgc9ao78eKr",
);
let seed = 42;
let random_strings = BenchHelper::generate_random_strings(1, Some(seed), 240);
let rand_string = random_strings.first().unwrap();
let tx = BenchHelper::create_memo_tx_large(rand_string, &payer_keypair, blockhash);
assert_eq!(bincode::serialized_size(&tx).unwrap(), 1186);
}

View File

@ -1,3 +1,247 @@
pub mod cli;
pub mod helpers;
pub mod metrics;
use anyhow::bail;
use futures::future::join_all;
use itertools::Itertools;
use log::{debug, warn};
use rand::{distributions::Alphanumeric, prelude::Distribution, SeedableRng};
use solana_rpc_client::{nonblocking::rpc_client::RpcClient, rpc_client::SerializableTransaction};
use solana_sdk::compute_budget::ComputeBudgetInstruction;
use solana_sdk::{
commitment_config::CommitmentConfig,
hash::Hash,
instruction::{AccountMeta, Instruction},
message::Message,
pubkey::Pubkey,
signature::{Keypair, Signature},
signer::Signer,
system_instruction,
transaction::Transaction,
};
use solana_transaction_status::TransactionStatus;
use std::{str::FromStr, time::Duration};
use tokio::time::Instant;
pub mod config;
pub mod tx_size;
//TODO: use CLAP
const MEMO_PROGRAM_ID: &str = "MemoSq4gqABAXKb96qnH8TysNcWxMyWCqXgDLGmfcHr";
const WAIT_LIMIT_IN_SECONDS: u64 = 60;
pub type Rng8 = rand_chacha::ChaCha8Rng;
pub async fn wait_till_signature_status(
rpc_client: &RpcClient,
sig: &Signature,
commitment_config: CommitmentConfig,
) -> anyhow::Result<()> {
let instant = Instant::now();
loop {
if instant.elapsed() > Duration::from_secs(WAIT_LIMIT_IN_SECONDS) {
return Err(anyhow::Error::msg("Timedout waiting"));
}
if let Some(err) = rpc_client
.get_signature_status_with_commitment(sig, commitment_config)
.await?
{
err?;
return Ok(());
}
}
}
pub async fn send_and_confirm_transactions(
rpc_client: &RpcClient,
txs: &[impl SerializableTransaction],
commitment_config: CommitmentConfig,
tries: Option<usize>,
) -> anyhow::Result<Vec<anyhow::Result<Option<TransactionStatus>>>> {
let started_at = Instant::now();
let sigs_or_fails = join_all(txs.iter().map(|tx| rpc_client.send_transaction(tx))).await;
debug!(
"sent {} transactions in {:.02}ms",
txs.len(),
started_at.elapsed().as_secs_f32() * 1000.0
);
let num_sent_ok = sigs_or_fails
.iter()
.filter(|sig_or_fail| sig_or_fail.is_ok())
.count();
let num_sent_failed = sigs_or_fails
.iter()
.filter(|sig_or_fail| sig_or_fail.is_err())
.count();
debug!("{} transactions sent successfully", num_sent_ok);
debug!("{} transactions failed to send", num_sent_failed);
if num_sent_failed > 0 {
warn!(
"Some transactions failed to send: {} out of {}",
num_sent_failed,
txs.len()
);
bail!("Failed to send all transactions");
}
let mut results = sigs_or_fails
.iter()
.map(|sig| {
let Err(err) = sig else {
return Ok(None);
};
bail!("Error sending transaction: {:?}", err)
})
.collect_vec();
// 5 tries
for _ in 0..tries.unwrap_or(5) {
let sigs = results
.iter()
.enumerate()
.filter_map(|(index, result)| match result {
Ok(None) => Some(sigs_or_fails[index].as_ref().unwrap().to_owned()),
_ => None,
})
.collect_vec();
let mut statuses = rpc_client
.get_signature_statuses(&sigs)
.await?
.value
.into_iter();
results.iter_mut().for_each(|result| {
if let Ok(None) = result {
*result = Ok(statuses.next().unwrap());
}
});
if results.iter().all(|result| {
let Ok(result) = result else { return true };
if let Some(result) = result {
result.satisfies_commitment(commitment_config)
} else {
false
}
}) {
break;
}
tokio::time::sleep(Duration::from_millis(500)).await;
}
Ok(results)
}
pub fn create_transaction(funded_payer: &Keypair, blockhash: Hash) -> Transaction {
let to_pubkey = funded_payer.pubkey();
let instruction = system_instruction::transfer(&funded_payer.pubkey(), &to_pubkey, 5000);
let message = Message::new(&[instruction], Some(&funded_payer.pubkey()));
Transaction::new(&[funded_payer], message, blockhash)
}
#[inline]
pub fn create_rng(seed: Option<u64>) -> Rng8 {
let seed = seed.map_or(0, |x| x);
Rng8::seed_from_u64(seed)
}
#[inline]
pub fn generate_random_string(rng: &mut Rng8, n_chars: usize) -> Vec<u8> {
Alphanumeric.sample_iter(rng).take(n_chars).collect()
}
#[inline]
pub fn generate_txs(
num_of_txs: usize,
payer: &Keypair,
blockhash: Hash,
rng: &mut Rng8,
size: tx_size::TxSize,
) -> Vec<Transaction> {
(0..num_of_txs)
.map(|_| create_memo_tx(payer, blockhash, rng, size))
.collect()
}
pub fn create_memo_tx(
payer: &Keypair,
blockhash: Hash,
rng: &mut Rng8,
size: tx_size::TxSize,
) -> Transaction {
let rand_str = generate_random_string(rng, size.memo_size());
match size {
tx_size::TxSize::Small => create_memo_tx_small(&rand_str, payer, blockhash),
tx_size::TxSize::Large => create_memo_tx_large(&rand_str, payer, blockhash),
}
}
pub fn create_memo_tx_small(msg: &[u8], payer: &Keypair, blockhash: Hash) -> Transaction {
let memo = Pubkey::from_str(MEMO_PROGRAM_ID).unwrap();
// TODO make configurable
// 3 -> 6 slots
// 1 -> 31 slots
let cu_budget: Instruction = ComputeBudgetInstruction::set_compute_unit_price(3);
// Program consumed: 12775 of 13700 compute units
let priority_fees: Instruction = ComputeBudgetInstruction::set_compute_unit_limit(14000);
let instruction = Instruction::new_with_bytes(memo, msg, vec![]);
let message = Message::new(
&[cu_budget, priority_fees, instruction],
Some(&payer.pubkey()),
);
Transaction::new(&[payer], message, blockhash)
}
pub fn create_memo_tx_large(msg: &[u8], payer: &Keypair, blockhash: Hash) -> Transaction {
let accounts = (0..8).map(|_| Keypair::new()).collect_vec();
let memo = Pubkey::from_str(MEMO_PROGRAM_ID).unwrap();
let instruction = Instruction::new_with_bytes(
memo,
msg,
accounts
.iter()
.map(|keypair| AccountMeta::new_readonly(keypair.pubkey(), true))
.collect_vec(),
);
let message = Message::new(&[instruction], Some(&payer.pubkey()));
let mut signers = vec![payer];
signers.extend(accounts.iter());
Transaction::new(&signers, message, blockhash)
}
#[test]
fn transaction_size_small() {
let blockhash = Hash::default();
let payer_keypair = Keypair::from_base58_string(
"rKiJ7H5UUp3JR18kNyTF1XPuwPKHEM7gMLWHZPWP5djrW1vSjfwjhvJrevxF9MPmUmN9gJMLHZdLMgc9ao78eKr",
);
let mut rng = create_rng(Some(42));
let rand_string = generate_random_string(&mut rng, 10);
let tx = create_memo_tx_small(&rand_string, &payer_keypair, blockhash);
assert_eq!(bincode::serialized_size(&tx).unwrap(), 231);
}
#[test]
fn transaction_size_large() {
let blockhash = Hash::default();
let payer_keypair = Keypair::from_base58_string(
"rKiJ7H5UUp3JR18kNyTF1XPuwPKHEM7gMLWHZPWP5djrW1vSjfwjhvJrevxF9MPmUmN9gJMLHZdLMgc9ao78eKr",
);
let mut rng = create_rng(Some(42));
let rand_string = generate_random_string(&mut rng, 240);
let tx = create_memo_tx_large(&rand_string, &payer_keypair, blockhash);
assert_eq!(bincode::serialized_size(&tx).unwrap(), 1186);
}

View File

@ -1,289 +0,0 @@
use bench::{
cli::Args,
helpers::BenchHelper,
metrics::{AvgMetric, Metric, TxMetricData},
};
use clap::Parser;
use dashmap::DashMap;
use futures::future::join_all;
use log::{error, info, warn};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::signature::Signature;
use solana_sdk::{
commitment_config::CommitmentConfig, hash::Hash, signature::Keypair, signer::Signer,
slot_history::Slot,
};
use std::sync::{
atomic::{AtomicU64, Ordering},
Arc,
};
use tokio::{
sync::{mpsc::UnboundedSender, RwLock},
time::{Duration, Instant},
};
#[tokio::main(flavor = "multi_thread", worker_threads = 16)]
async fn main() {
tracing_subscriber::fmt::init();
let Args {
tx_count,
runs,
run_interval_ms,
metrics_file_name,
lite_rpc_addr,
transaction_save_file,
large_transactions,
} = Args::parse();
let mut run_interval_ms = tokio::time::interval(Duration::from_millis(run_interval_ms));
let transaction_size = if large_transactions {
TransactionSize::Large
} else {
TransactionSize::Small
};
info!("Connecting to LiteRPC using {lite_rpc_addr}");
let mut avg_metric = AvgMetric::default();
let mut tasks = vec![];
let funded_payer = BenchHelper::get_payer().await.unwrap();
info!("Payer: {}", funded_payer.pubkey());
let rpc_client = Arc::new(RpcClient::new_with_commitment(
lite_rpc_addr.clone(),
CommitmentConfig::confirmed(),
));
let bh = rpc_client.get_latest_blockhash().await.unwrap();
let slot = rpc_client.get_slot().await.unwrap();
let block_hash: Arc<RwLock<Hash>> = Arc::new(RwLock::new(bh));
let current_slot = Arc::new(AtomicU64::new(slot));
{
// block hash updater task
let block_hash = block_hash.clone();
let rpc_client = rpc_client.clone();
let current_slot = current_slot.clone();