add benchnew with confirmation-slot and confirmation-rate (TC-1 and TC-2)
This commit is contained in:
Lou-Kamades 2024-03-27 06:45:30 -04:00 committed by GitHub
parent 7d39a947ac
commit 4f51c27a08
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 1005 additions and 477 deletions

19
Cargo.lock generated
View File

@ -582,8 +582,10 @@ dependencies = [
"log",
"rand 0.8.5",
"rand_chacha 0.3.1",
"reqwest",
"serde",
"serde_json",
"solana-lite-rpc-util",
"solana-rpc-client",
"solana-rpc-client-api",
"solana-sdk",
@ -592,6 +594,7 @@ dependencies = [
"tokio",
"tracing",
"tracing-subscriber",
"url",
]
[[package]]
@ -2087,6 +2090,19 @@ dependencies = [
"tokio-io-timeout",
]
[[package]]
name = "hyper-tls"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905"
dependencies = [
"bytes",
"hyper",
"native-tls",
"tokio",
"tokio-native-tls",
]
[[package]]
name = "iana-time-zone"
version = "0.1.60"
@ -3722,10 +3738,12 @@ dependencies = [
"http-body",
"hyper",
"hyper-rustls",
"hyper-tls",
"ipnet",
"js-sys",
"log",
"mime",
"native-tls",
"once_cell",
"percent-encoding",
"pin-project-lite",
@ -3737,6 +3755,7 @@ dependencies = [
"sync_wrapper",
"system-configuration",
"tokio",
"tokio-native-tls",
"tokio-rustls",
"tokio-util",
"tower-service",

View File

@ -10,12 +10,13 @@ path = "src/main.rs"
[[bin]]
# WIP
name = "benchnew"
path = "src/cli.rs"
path = "src/benchnew.rs"
[dependencies]
clap = { workspace = true }
csv = "1.2.1"
dirs = "5.0.0"
solana-lite-rpc-util = { workspace = true }
solana-sdk = { workspace = true }
solana-rpc-client = { workspace = true }
solana-transaction-status = { workspace = true }
@ -34,6 +35,8 @@ dashmap = { workspace = true }
bincode = { workspace = true }
itertools = "0.10.5"
spl-memo = "4.0.0"
url = "*"
reqwest = "0.11.26"
lazy_static = "1.4.0"
[dev-dependencies]

View File

@ -13,7 +13,12 @@ use solana_sdk::signature::{read_keypair_file, Keypair, Signer};
use crate::create_memo_tx_small;
// TC3 measure how much load the API endpoint can take
pub async fn api_load(payer_path: &Path, rpc_url: String, time_ms: u64) -> anyhow::Result<()> {
pub async fn api_load(
payer_path: &Path,
rpc_url: String,
test_duration_ms: u64,
cu_price_micro_lamports: u64,
) -> anyhow::Result<()> {
warn!("THIS IS WORK IN PROGRESS");
let rpc = Arc::new(RpcClient::new(rpc_url));
@ -29,7 +34,7 @@ pub async fn api_load(payer_path: &Path, rpc_url: String, time_ms: u64) -> anyho
let hash = rpc.get_latest_blockhash().await?;
let time = tokio::time::Instant::now();
while time.elapsed().as_millis() < time_ms.into() {
while time.elapsed().as_millis() < test_duration_ms.into() {
let rpc = rpc.clone();
let payer = payer.clone();
@ -40,7 +45,7 @@ pub async fn api_load(payer_path: &Path, rpc_url: String, time_ms: u64) -> anyho
tokio::spawn(async move {
let msg = msg.as_bytes();
let tx = create_memo_tx_small(msg, &payer, hash);
let tx = create_memo_tx_small(msg, &payer, hash, cu_price_micro_lamports);
match rpc.send_transaction(&tx).await {
Ok(_) => success.fetch_add(1, Ordering::Relaxed),
Err(_) => failed.fetch_add(1, Ordering::Relaxed),
@ -50,7 +55,7 @@ pub async fn api_load(payer_path: &Path, rpc_url: String, time_ms: u64) -> anyho
txs += 1;
}
let calls_per_second = txs as f64 / (time_ms as f64 * 1000.0);
let calls_per_second = txs as f64 / (test_duration_ms as f64 * 1000.0);
info!("calls_per_second: {}", calls_per_second);
info!("failed: {}", failed.load(Ordering::Relaxed));
info!("success: {}", success.load(Ordering::Relaxed));

View File

@ -1,89 +1,129 @@
use crate::tx_size::TxSize;
use crate::{create_rng, generate_txs};
use anyhow::{bail, Error};
use futures::future::join_all;
use futures::TryFutureExt;
use itertools::Itertools;
use crate::{create_rng, generate_txs, BenchmarkTransactionParams};
use anyhow::Context;
use log::{debug, info, trace, warn};
use std::collections::{HashMap, HashSet};
use std::iter::zip;
use std::ops::Add;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use crate::benches::rpc_interface::{
send_and_confirm_bulk_transactions, ConfirmationResponseFromRpc,
};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_rpc_client::rpc_client::SerializableTransaction;
use solana_rpc_client_api::client_error::ErrorKind;
use solana_sdk::signature::{read_keypair_file, Signature, Signer};
use solana_sdk::slot_history::Slot;
use solana_sdk::transaction::Transaction;
use solana_sdk::{commitment_config::CommitmentConfig, signature::Keypair};
use solana_transaction_status::TransactionConfirmationStatus;
use tokio::time::Instant;
use solana_sdk::signature::{read_keypair_file, Keypair, Signature, Signer};
#[derive(Debug, serde::Serialize)]
pub struct RpcStat {
confirmation_time: f32,
mode_slot: u64,
confirmed: u64,
unconfirmed: u64,
failed: u64,
tx_sent: u64,
tx_confirmed: u64,
// in ms
average_confirmation_time: f32,
// in slots
average_slot_confirmation_time: f32,
tx_send_errors: u64,
tx_unconfirmed: u64,
}
/// TC2 send multiple runs of num_txns, measure the confirmation rate
/// TC2 send multiple runs of num_txs, measure the confirmation rate
pub async fn confirmation_rate(
payer_path: &Path,
rpc_url: String,
tx_size: TxSize,
txns_per_round: usize,
num_rounds: usize,
tx_params: BenchmarkTransactionParams,
max_timeout: Duration,
txs_per_run: usize,
num_of_runs: usize,
) -> anyhow::Result<()> {
warn!("THIS IS WORK IN PROGRESS");
assert!(num_of_runs > 0, "num_of_runs must be greater than 0");
let rpc = Arc::new(RpcClient::new(rpc_url));
info!("RPC: {}", rpc.as_ref().url());
let payer: Arc<Keypair> = Arc::new(read_keypair_file(payer_path).unwrap());
info!("Payer: {}", payer.pubkey().to_string());
let mut rpc_results = Vec::with_capacity(num_rounds);
let mut rpc_results = Vec::with_capacity(num_of_runs);
for _ in 0..num_rounds {
let stat: RpcStat = send_bulk_txs_and_wait(&rpc, &payer, txns_per_round, tx_size).await?;
rpc_results.push(stat);
for _ in 0..num_of_runs {
match send_bulk_txs_and_wait(&rpc, &payer, txs_per_run, &tx_params, max_timeout)
.await
.context("send bulk tx and wait")
{
Ok(stat) => {
rpc_results.push(stat);
}
Err(err) => {
warn!(
"Failed to send bulk txs and wait - no rpc stats available: {}",
err
);
}
}
}
info!("avg_rpc: {:?}", calc_stats_avg(&rpc_results));
if !rpc_results.is_empty() {
info!("avg_rpc: {:?}", calc_stats_avg(&rpc_results));
} else {
info!("avg_rpc: n/a");
}
Ok(())
}
pub async fn send_bulk_txs_and_wait(
rpc: &RpcClient,
payer: &Keypair,
num_txns: usize,
tx_size: TxSize,
num_txs: usize,
tx_params: &BenchmarkTransactionParams,
max_timeout: Duration,
) -> anyhow::Result<RpcStat> {
let hash = rpc.get_latest_blockhash().await?;
trace!("Get latest blockhash and generate transactions");
let hash = rpc.get_latest_blockhash().await.map_err(|err| {
log::error!("Error get latest blockhash : {err:?}");
err
})?;
let mut rng = create_rng(None);
let txs = generate_txs(num_txns, payer, hash, &mut rng, tx_size);
let started_at = tokio::time::Instant::now();
let txs = generate_txs(num_txs, payer, hash, &mut rng, tx_params);
trace!("Sending {} transactions in bulk ..", txs.len());
let tx_and_confirmations_from_rpc: Vec<(Signature, ConfirmationResponseFromRpc)> =
send_and_confirm_bulk_transactions(rpc, &txs).await?;
send_and_confirm_bulk_transactions(rpc, &txs, max_timeout)
.await
.context("send and confirm bulk tx")?;
trace!("Done sending {} transaction.", txs.len());
let elapsed_total = started_at.elapsed();
for (tx_sig, confirmation) in &tx_and_confirmations_from_rpc {
match confirmation {
ConfirmationResponseFromRpc::Success(slots_elapsed, level, elapsed) => {
let mut tx_sent = 0;
let mut tx_send_errors = 0;
let mut tx_confirmed = 0;
let mut tx_unconfirmed = 0;
let mut sum_confirmation_time = Duration::default();
let mut sum_slot_confirmation_time = 0;
for (tx_sig, confirmation_response) in tx_and_confirmations_from_rpc {
match confirmation_response {
ConfirmationResponseFromRpc::Success(
slot_sent,
slot_confirmed,
commitment_status,
confirmation_time,
) => {
debug!(
"Signature {} confirmed with level {:?} after {:.02}ms, {} slots",
tx_sig,
level,
elapsed.as_secs_f32() * 1000.0,
slots_elapsed
commitment_status,
confirmation_time.as_secs_f64() * 1000.0,
slot_confirmed - slot_sent
);
tx_sent += 1;
tx_confirmed += 1;
sum_confirmation_time = sum_confirmation_time.add(confirmation_time);
sum_slot_confirmation_time += slot_confirmed - slot_sent;
}
ConfirmationResponseFromRpc::SendError(error_kind) => {
debug!(
"Signature {} failed to get send via RPC: {:?}",
tx_sig, error_kind
);
tx_send_errors += 1;
}
ConfirmationResponseFromRpc::Timeout(elapsed) => {
debug!(
@ -91,60 +131,30 @@ pub async fn send_bulk_txs_and_wait(
tx_sig,
elapsed.as_secs_f32() * 1000.0
);
}
ConfirmationResponseFromRpc::SendError(_) => {
unreachable!()
tx_sent += 1;
tx_unconfirmed += 1;
}
}
}
let (mut confirmed, mut unconfirmed, mut failed) = (0, 0, 0);
let mut slot_hz: HashMap<Slot, u64> = Default::default();
for (_, result_from_rpc) in tx_and_confirmations_from_rpc {
match result_from_rpc {
ConfirmationResponseFromRpc::Success(slot, _, _) => {
confirmed += 1;
*slot_hz.entry(slot).or_default() += 1;
}
ConfirmationResponseFromRpc::Timeout(_) => {
unconfirmed += 1;
}
ConfirmationResponseFromRpc::SendError(_) => {
failed += 1;
}
}
//
// match tx {
// Ok(Some(status)) => {
// if status.satisfies_commitment(CommitmentConfig::confirmed()) {
// confirmed += 1;
// *slot_hz.entry(status.slot).or_default() += 1;
// } else {
// unconfirmed += 1;
// }
// }
// Ok(None) => {
// unconfirmed += 1;
// }
// Err(_) => {
// failed += 1;
// }
// }
}
let mode_slot = slot_hz
.into_iter()
.max_by_key(|(_, v)| *v)
.map(|(k, _)| k)
.unwrap_or_default();
let average_confirmation_time_ms = if tx_confirmed > 0 {
sum_confirmation_time.as_secs_f32() * 1000.0 / tx_confirmed as f32
} else {
0.0
};
let average_slot_confirmation_time = if tx_confirmed > 0 {
sum_slot_confirmation_time as f32 / tx_confirmed as f32
} else {
0.0
};
Ok(RpcStat {
confirmation_time: elapsed_total.as_secs_f32(),
mode_slot,
confirmed,
unconfirmed,
failed,
tx_sent,
tx_send_errors,
tx_confirmed,
tx_unconfirmed,
average_confirmation_time: average_confirmation_time_ms,
average_slot_confirmation_time,
})
}
@ -152,235 +162,29 @@ fn calc_stats_avg(stats: &[RpcStat]) -> RpcStat {
let len = stats.len();
let mut avg = RpcStat {
confirmation_time: 0.0,
mode_slot: 0,
confirmed: 0,
unconfirmed: 0,
failed: 0,
tx_sent: 0,
tx_send_errors: 0,
tx_confirmed: 0,
tx_unconfirmed: 0,
average_confirmation_time: 0.0,
average_slot_confirmation_time: 0.0,
};
for stat in stats {
avg.confirmation_time += stat.confirmation_time;
avg.confirmed += stat.confirmed;
avg.unconfirmed += stat.unconfirmed;
avg.failed += stat.failed;
avg.tx_sent += stat.tx_sent;
avg.tx_send_errors += stat.tx_send_errors;
avg.tx_confirmed += stat.tx_confirmed;
avg.tx_unconfirmed += stat.tx_unconfirmed;
avg.average_confirmation_time += stat.average_confirmation_time;
avg.average_slot_confirmation_time += stat.average_slot_confirmation_time;
}
avg.confirmation_time /= len as f32;
avg.confirmed /= len as u64;
avg.unconfirmed /= len as u64;
avg.failed /= len as u64;
avg.tx_sent /= len as u64;
avg.tx_send_errors /= len as u64;
avg.tx_confirmed /= len as u64;
avg.tx_unconfirmed /= len as u64;
avg.average_confirmation_time /= len as f32;
avg.average_slot_confirmation_time /= len as f32;
avg
}
#[derive(Clone)]
enum ConfirmationResponseFromRpc {
SendError(Arc<ErrorKind>),
// elapsed slot: current slot (confirmed) at beginning til the slot where transaction showed up with status CONFIRMED
Success(Slot, TransactionConfirmationStatus, Duration),
Timeout(Duration),
}
async fn send_and_confirm_bulk_transactions(
rpc_client: &RpcClient,
txs: &[Transaction],
) -> anyhow::Result<Vec<(Signature, ConfirmationResponseFromRpc)>> {
let send_slot = poll_next_slot_start(rpc_client).await?;
let started_at = Instant::now();
let batch_sigs_or_fails = join_all(
txs.iter()
.map(|tx| rpc_client.send_transaction(tx).map_err(|e| e.kind)),
)
.await;
let after_send_slot = rpc_client
.get_slot_with_commitment(CommitmentConfig::confirmed())
.await?;
// optimal value is "0"
info!(
"slots passed while sending: {}",
after_send_slot - send_slot
);
let num_sent_ok = batch_sigs_or_fails
.iter()
.filter(|sig_or_fail| sig_or_fail.is_ok())
.count();
let num_sent_failed = batch_sigs_or_fails
.iter()
.filter(|sig_or_fail| sig_or_fail.is_err())
.count();
for (i, tx_sig) in txs.iter().enumerate() {
let tx_sent = batch_sigs_or_fails[i].is_ok();
if tx_sent {
debug!("- tx_sent {}", tx_sig.get_signature());
} else {
debug!("- tx_fail {}", tx_sig.get_signature());
}
}
debug!(
"{} transactions sent successfully in {:.02}ms",
num_sent_ok,
started_at.elapsed().as_secs_f32() * 1000.0
);
debug!(
"{} transactions failed to send in {:.02}ms",
num_sent_failed,
started_at.elapsed().as_secs_f32() * 1000.0
);
if num_sent_failed > 0 {
warn!(
"Some transactions failed to send: {} out of {}",
num_sent_failed,
txs.len()
);
bail!("Failed to send all transactions");
}
let mut pending_status_set: HashSet<Signature> = HashSet::new();
batch_sigs_or_fails
.iter()
.filter(|sig_or_fail| sig_or_fail.is_ok())
.for_each(|sig_or_fail| {
pending_status_set.insert(sig_or_fail.as_ref().unwrap().to_owned());
});
let mut result_status_map: HashMap<Signature, ConfirmationResponseFromRpc> = HashMap::new();
// items get moved from pending_status_set to result_status_map
let started_at = Instant::now();
let mut iteration = 1;
'pooling_loop: loop {
let iteration_ends_at = started_at + Duration::from_millis(iteration * 200);
assert_eq!(
pending_status_set.len() + result_status_map.len(),
num_sent_ok,
"Items must move between pending+result"
);
let tx_batch = pending_status_set.iter().cloned().collect_vec();
debug!(
"Request status for batch of remaining {} transactions in iteration {}",
tx_batch.len(),
iteration
);
// TODO warn if get_status api calles are slow
let batch_responses = rpc_client
.get_signature_statuses(tx_batch.as_slice())
.await?;
let elapsed = started_at.elapsed();
for (tx_sig, status_response) in zip(tx_batch, batch_responses.value) {
match status_response {
Some(tx_status) => {
trace!(
"Some signature status {:?} received for {} after {:.02}ms",
tx_status.confirmation_status,
tx_sig,
elapsed.as_secs_f32() * 1000.0
);
if !tx_status.satisfies_commitment(CommitmentConfig::confirmed()) {
continue 'pooling_loop;
}
// status is confirmed or finalized
pending_status_set.remove(&tx_sig);
let prev_value = result_status_map.insert(
tx_sig,
ConfirmationResponseFromRpc::Success(
tx_status.slot - send_slot,
tx_status.confirmation_status(),
elapsed,
),
);
assert!(prev_value.is_none(), "Must not override existing value");
}
None => {
// None: not yet processed by the cluster
trace!(
"No signature status was received for {} after {:.02}ms - continue waiting",
tx_sig,
elapsed.as_secs_f32() * 1000.0
);
}
}
}
if pending_status_set.is_empty() {
debug!("All transactions confirmed after {} iterations", iteration);
break 'pooling_loop;
}
if iteration == 100 {
debug!("Timeout waiting for transactions to confirmed after {} iterations - giving up on {}", iteration, pending_status_set.len());
break 'pooling_loop;
}
iteration += 1;
// avg 2 samples per slot
tokio::time::sleep_until(iteration_ends_at).await;
} // -- END polling loop
let total_time_elapsed_polling = started_at.elapsed();
// all transactions which remain in pending list are considered timed out
for tx_sig in pending_status_set.clone() {
pending_status_set.remove(&tx_sig);
result_status_map.insert(
tx_sig,
ConfirmationResponseFromRpc::Timeout(total_time_elapsed_polling),
);
}
let result_as_vec = batch_sigs_or_fails
.into_iter()
.enumerate()
.map(|(i, sig_or_fail)| match sig_or_fail {
Ok(tx_sig) => {
let confirmation = result_status_map
.get(&tx_sig)
.expect("consistent map with all tx")
.clone()
.to_owned();
(tx_sig, confirmation)
}
Err(send_error) => {
let tx_sig = txs[i].get_signature();
let confirmation = ConfirmationResponseFromRpc::SendError(Arc::new(send_error));
(*tx_sig, confirmation)
}
})
.collect_vec();
Ok(result_as_vec)
}
async fn poll_next_slot_start(rpc_client: &RpcClient) -> Result<Slot, Error> {
let started_at = Instant::now();
let mut last_slot: Option<Slot> = None;
let mut i = 1;
// try to catch slot start
let send_slot = loop {
if i > 500 {
bail!("Timeout waiting for slot change");
}
let iteration_ends_at = started_at + Duration::from_millis(i * 30);
let slot = rpc_client
.get_slot_with_commitment(CommitmentConfig::confirmed())
.await?;
trace!("polling slot {}", slot);
if let Some(last_slot) = last_slot {
if last_slot + 1 == slot {
break slot;
}
}
last_slot = Some(slot);
tokio::time::sleep_until(iteration_ends_at).await;
i += 1;
};
Ok(send_slot)
}

View File

@ -1,43 +1,142 @@
use std::path::Path;
use std::time::Duration;
use crate::tx_size::TxSize;
use crate::{create_memo_tx, create_rng, send_and_confirm_transactions, Rng8};
use anyhow::Context;
use log::{info, warn};
use crate::benches::rpc_interface::{
create_rpc_client, send_and_confirm_bulk_transactions, ConfirmationResponseFromRpc,
};
use crate::metrics::PingThing;
use crate::{create_memo_tx, create_rng, BenchmarkTransactionParams, Rng8};
use anyhow::anyhow;
use log::{debug, info, warn};
use solana_lite_rpc_util::obfuscate_rpcurl;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::signature::{read_keypair_file, Signer};
use solana_sdk::signature::{read_keypair_file, Signature, Signer};
use solana_sdk::transaction::Transaction;
use solana_sdk::{commitment_config::CommitmentConfig, signature::Keypair};
use tokio::time::{sleep, Instant};
use url::Url;
/// TC1 send 2 txs (one via LiteRPC, one via Solana RPC) and compare confirmation slot (=slot distance)
#[derive(Clone, Copy, Debug, Default)]
pub struct Metric {
pub txs_sent: u64,
pub txs_confirmed: u64,
pub txs_un_confirmed: u64,
pub average_confirmation_time_ms: f64,
pub average_time_to_send_txs: f64,
}
#[derive(Clone)]
pub enum ConfirmationSlotResult {
Success(ConfirmationSlotSuccess),
}
#[derive(Clone)]
pub struct ConfirmationSlotSuccess {
pub slot_sent: u64,
pub slot_confirmed: u64,
pub confirmation_time: Duration,
}
#[allow(clippy::too_many_arguments)]
/// TC1 -- Send 2 txs to separate RPCs and compare confirmation slot.
/// The benchmark attempts to minimize the effect of real-world distance and synchronize the time that each transaction reaches the RPC.
/// This is achieved by delaying submission of the transaction to the "nearer" RPC.
/// Delay time is calculated as half of the difference in duration of [getHealth](https://solana.com/docs/rpc/http/gethealth) calls to both RPCs.
pub async fn confirmation_slot(
payer_path: &Path,
rpc_a_url: String,
rpc_b_url: String,
tx_size: TxSize,
tx_params: BenchmarkTransactionParams,
max_timeout: Duration,
num_of_runs: usize,
_maybe_ping_thing: Option<PingThing>,
) -> anyhow::Result<()> {
info!(
"START BENCHMARK: confirmation_slot (prio_fees={})",
tx_params.cu_price_micro_lamports
);
warn!("THIS IS WORK IN PROGRESS");
info!("RPC A: {}", obfuscate_rpcurl(&rpc_a_url));
info!("RPC B: {}", obfuscate_rpcurl(&rpc_b_url));
let rpc_a = RpcClient::new(rpc_a_url);
info!("RPC A: {}", rpc_a.url());
let rpc_b = RpcClient::new(rpc_b_url);
info!("RPC B: {}", rpc_b.url());
let rpc_a_url =
Url::parse(&rpc_a_url).map_err(|e| anyhow!("Failed to parse RPC A URL: {}", e))?;
let rpc_b_url =
Url::parse(&rpc_b_url).map_err(|e| anyhow!("Failed to parse RPC B URL: {}", e))?;
let mut rng = create_rng(None);
let payer = read_keypair_file(payer_path).expect("payer file");
info!("Payer: {}", payer.pubkey().to_string());
// let mut ping_thing_tasks = vec![];
let rpc_a_tx = create_tx(&rpc_a, &payer, &mut rng, tx_size).await?;
let rpc_b_tx = create_tx(&rpc_b, &payer, &mut rng, tx_size).await?;
for _ in 0..num_of_runs {
let rpc_a = create_rpc_client(&rpc_a_url);
let rpc_b = create_rpc_client(&rpc_b_url);
// measure network time to reach the respective RPC endpoints,
// used to mitigate the difference in distance by delaying the txn sending
let time_a = rpc_roundtrip_duration(&rpc_a).await?.as_secs_f64();
let time_b = rpc_roundtrip_duration(&rpc_b).await?.as_secs_f64();
let (rpc_slot, lite_rpc_slot) = tokio::join!(
send_transaction_and_get_slot(&rpc_a, rpc_a_tx),
send_transaction_and_get_slot(&rpc_b, rpc_b_tx)
);
debug!("(A) rpc network latency: {}", time_a);
debug!("(B) rpc network latency: {}", time_b);
info!("rpc_slot: {}", rpc_slot?);
info!("lite_rpc_slot: {}", lite_rpc_slot?);
let rpc_a_tx = create_tx(&rpc_a, &payer, &mut rng, &tx_params).await?;
let rpc_b_tx = create_tx(&rpc_b, &payer, &mut rng, &tx_params).await?;
let one_way_delay = (time_a - time_b).abs() / 2.0;
let (a_delay, b_delay) = if time_a > time_b {
(0f64, one_way_delay)
} else {
(one_way_delay, 0f64)
};
debug!("A delay: {}s, B delay: {}s", a_delay, b_delay);
let a_task = tokio::spawn(async move {
sleep(Duration::from_secs_f64(a_delay)).await;
debug!("(A) sending tx {}", rpc_a_tx.signatures[0]);
send_and_confirm_transaction(&rpc_a, rpc_a_tx, max_timeout).await
});
let b_task = tokio::spawn(async move {
sleep(Duration::from_secs_f64(b_delay)).await;
debug!("(B) sending tx {}", rpc_b_tx.signatures[0]);
send_and_confirm_transaction(&rpc_b, rpc_b_tx, max_timeout).await
});
let (a, b) = tokio::join!(a_task, b_task);
// only continue if both paths suceed
let a_result: ConfirmationResponseFromRpc = a??;
let b_result: ConfirmationResponseFromRpc = b??;
if let (
ConfirmationResponseFromRpc::Success(a_slot_sent, a_slot_confirmed, _, _),
ConfirmationResponseFromRpc::Success(b_slot_sent, b_slot_confirmed, _, _),
) = (a_result, b_result)
{
info!(
"txn A landed after {} slots",
a_slot_confirmed - a_slot_sent
);
info!(
"txn B landed after {} slots",
b_slot_confirmed - b_slot_sent
);
}
// if let Some(ping_thing) = maybe_ping_thing.clone() {
// ping_thing_tasks.push(tokio::spawn(async move {
// submit_ping_thing_stats(&a_result, &ping_thing)
// .await
// .unwrap();
// submit_ping_thing_stats(&b_result, &ping_thing)
// .await
// .unwrap();
// }));
// };
}
// futures::future::join_all(ping_thing_tasks).await;
Ok(())
}
@ -46,20 +145,52 @@ async fn create_tx(
rpc: &RpcClient,
payer: &Keypair,
rng: &mut Rng8,
tx_size: TxSize,
tx_params: &BenchmarkTransactionParams,
) -> anyhow::Result<Transaction> {
let hash = rpc.get_latest_blockhash().await?;
let (blockhash, _) = rpc
.get_latest_blockhash_with_commitment(CommitmentConfig::confirmed())
.await?;
Ok(create_memo_tx(payer, hash, rng, tx_size))
Ok(create_memo_tx(payer, blockhash, rng, tx_params))
}
async fn send_transaction_and_get_slot(client: &RpcClient, tx: Transaction) -> anyhow::Result<u64> {
let status = send_and_confirm_transactions(client, &[tx], CommitmentConfig::confirmed(), None)
.await?
.into_iter()
.next()
.unwrap()?
.context("unable to confirm tx")?;
async fn send_and_confirm_transaction(
rpc: &RpcClient,
tx: Transaction,
max_timeout: Duration,
) -> anyhow::Result<ConfirmationResponseFromRpc> {
let result_vec: Vec<(Signature, ConfirmationResponseFromRpc)> =
send_and_confirm_bulk_transactions(rpc, &[tx], max_timeout).await?;
assert_eq!(result_vec.len(), 1, "expected 1 result");
let (_sig, confirmation_response) = result_vec.into_iter().next().unwrap();
Ok(status.slot)
Ok(confirmation_response)
}
pub async fn rpc_roundtrip_duration(rpc: &RpcClient) -> anyhow::Result<Duration> {
let started_at = Instant::now();
rpc.get_health().await?;
let duration = started_at.elapsed();
Ok(duration)
}
// async fn submit_ping_thing_stats(
// confirmation_info: &ConfirmationSlotResult,
// ping_thing: &PingThing,
// ) -> anyhow::Result<()> {
// match confirmation_info.result {
// ConfirmationSlotResult::Timeout(_) => Ok(()),
// ConfirmationSlotResult::Success(slot_landed) => {
// ping_thing
// .submit_confirmed_stats(
// confirmation_info.confirmation_time,
// confirmation_info.signature,
// PingThingTxType::Memo,
// true,
// confirmation_info.slot_sent,
// slot_landed,
// )
// .await
// }
// }
// }

View File

@ -1,3 +1,4 @@
pub mod api_load;
pub mod confirmation_rate;
pub mod confirmation_slot;
pub mod rpc_interface;

View File

@ -0,0 +1,285 @@
use anyhow::{bail, Context, Error};
use futures::future::join_all;
use futures::TryFutureExt;
use itertools::Itertools;
use log::{debug, trace, warn};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_rpc_client::rpc_client::SerializableTransaction;
use solana_rpc_client_api::client_error::ErrorKind;
use solana_rpc_client_api::config::RpcSendTransactionConfig;
use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::signature::Signature;
use solana_sdk::transaction::Transaction;
use solana_transaction_status::TransactionConfirmationStatus;
use std::collections::{HashMap, HashSet};
use std::iter::zip;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::Instant;
use url::Url;
pub fn create_rpc_client(rpc_url: &Url) -> RpcClient {
RpcClient::new_with_commitment(rpc_url.to_string(), CommitmentConfig::confirmed())
}
#[derive(Clone)]
pub enum ConfirmationResponseFromRpc {
// RPC error on send_transaction
SendError(Arc<ErrorKind>),
// (sent slot at confirmed commitment, confirmed slot, ..., ...)
// transaction_confirmation_status is "confirmed" or "finalized"
Success(Slot, Slot, TransactionConfirmationStatus, Duration),
// timout waiting for confirmation status
Timeout(Duration),
}
pub async fn send_and_confirm_bulk_transactions(
rpc_client: &RpcClient,
txs: &[Transaction],
max_timeout: Duration,
) -> anyhow::Result<Vec<(Signature, ConfirmationResponseFromRpc)>> {
trace!("Polling for next slot ..");
let send_slot = poll_next_slot_start(rpc_client)
.await
.context("poll for next start slot")?;
trace!("Send slot: {}", send_slot);
let send_config = RpcSendTransactionConfig {
skip_preflight: true,
preflight_commitment: None,
encoding: None,
max_retries: None,
min_context_slot: None,
};
let started_at = Instant::now();
trace!(
"Sending {} transactions via RPC (retries=off) ..",
txs.len()
);
let batch_sigs_or_fails = join_all(txs.iter().map(|tx| {
rpc_client
.send_transaction_with_config(tx, send_config)
.map_err(|e| e.kind)
}))
.await;
let after_send_slot = rpc_client
.get_slot_with_commitment(CommitmentConfig::confirmed())
.await
.context("get slot afterwards")?;
if after_send_slot - send_slot > 0 {
warn!(
"Slot advanced during sending transactions: {} -> {}",
send_slot, after_send_slot
);
} else {
debug!(
"Slot did not advance during sending transactions: {} -> {}",
send_slot, after_send_slot
);
}
let num_sent_ok = batch_sigs_or_fails
.iter()
.filter(|sig_or_fail| sig_or_fail.is_ok())
.count();
let num_sent_failed = batch_sigs_or_fails
.iter()
.filter(|sig_or_fail| sig_or_fail.is_err())
.count();
for (i, tx_sig) in txs.iter().enumerate() {
let tx_sent = batch_sigs_or_fails[i].is_ok();
if tx_sent {
trace!("- tx_sent {}", tx_sig.get_signature());
} else {
trace!("- tx_fail {}", tx_sig.get_signature());
}
}
debug!(
"{} transactions sent successfully in {:.02}ms",
num_sent_ok,
started_at.elapsed().as_secs_f32() * 1000.0
);
debug!(
"{} transactions failed to send in {:.02}ms",
num_sent_failed,
started_at.elapsed().as_secs_f32() * 1000.0
);
if num_sent_failed > 0 {
warn!(
"Some transactions failed to send: {} out of {}",
num_sent_failed,
txs.len()
);
bail!("Failed to send all transactions");
}
let mut pending_status_set: HashSet<Signature> = HashSet::new();
batch_sigs_or_fails
.iter()
.filter(|sig_or_fail| sig_or_fail.is_ok())
.for_each(|sig_or_fail| {
pending_status_set.insert(sig_or_fail.as_ref().unwrap().to_owned());
});
let mut result_status_map: HashMap<Signature, ConfirmationResponseFromRpc> = HashMap::new();
// items get moved from pending_status_set to result_status_map
let started_at = Instant::now();
let timeout_at = started_at + max_timeout;
'polling_loop: for iteration in 1.. {
let iteration_ends_at = started_at + Duration::from_millis(iteration * 400);
assert_eq!(
pending_status_set.len() + result_status_map.len(),
num_sent_ok,
"Items must move between pending+result"
);
let tx_batch = pending_status_set.iter().cloned().collect_vec();
debug!(
"Request status for batch of remaining {} transactions in iteration {}",
tx_batch.len(),
iteration
);
let status_started_at = Instant::now();
let mut batch_status = Vec::new();
// "Too many inputs provided; max 256"
for chunk in tx_batch.chunks(256) {
// fail hard if not possible to poll status
let chunk_responses = rpc_client
.get_signature_statuses(chunk)
.await
.expect("get signature statuses");
batch_status.extend(chunk_responses.value);
}
if status_started_at.elapsed() > Duration::from_millis(500) {
warn!(
"SLOW get_signature_statuses for {} transactions took {:?}",
tx_batch.len(),
status_started_at.elapsed()
);
}
let elapsed = started_at.elapsed();
for (tx_sig, status_response) in zip(tx_batch, batch_status) {
match status_response {
Some(tx_status) => {
trace!(
"Some signature status {:?} received for {} after {:.02}ms",
tx_status.confirmation_status,
tx_sig,
elapsed.as_secs_f32() * 1000.0
);
if !tx_status.satisfies_commitment(CommitmentConfig::confirmed()) {
continue 'polling_loop;
}
// status is confirmed or finalized
pending_status_set.remove(&tx_sig);
let prev_value = result_status_map.insert(
tx_sig,
ConfirmationResponseFromRpc::Success(
send_slot,
tx_status.slot,
tx_status.confirmation_status(),
elapsed,
),
);
assert!(prev_value.is_none(), "Must not override existing value");
}
None => {
// None: not yet processed by the cluster
trace!(
"No signature status was received for {} after {:.02}ms - continue waiting",
tx_sig,
elapsed.as_secs_f32() * 1000.0
);
}
}
}
if pending_status_set.is_empty() {
debug!(
"All transactions confirmed after {} iterations / {:?}",
iteration,
started_at.elapsed()
);
break 'polling_loop;
}
if Instant::now() > timeout_at {
warn!(
"Timeout waiting for transactions to confirm after {} iterations",
iteration
);
break 'polling_loop;
}
// avg 2 samples per slot
tokio::time::sleep_until(iteration_ends_at).await;
} // -- END polling loop
let total_time_elapsed_polling = started_at.elapsed();
// all transactions which remain in pending list are considered timed out
for tx_sig in pending_status_set.clone() {
pending_status_set.remove(&tx_sig);
result_status_map.insert(
tx_sig,
ConfirmationResponseFromRpc::Timeout(total_time_elapsed_polling),
);
}
let result_as_vec = batch_sigs_or_fails
.into_iter()
.enumerate()
.map(|(i, sig_or_fail)| match sig_or_fail {
Ok(tx_sig) => {
let confirmation = result_status_map
.get(&tx_sig)
.expect("consistent map with all tx")
.clone()
.to_owned();
(tx_sig, confirmation)
}
Err(send_error) => {
let tx_sig = txs[i].get_signature();
let confirmation = ConfirmationResponseFromRpc::SendError(Arc::new(send_error));
(*tx_sig, confirmation)
}
})
.collect_vec();
Ok(result_as_vec)
}
pub async fn poll_next_slot_start(rpc_client: &RpcClient) -> Result<Slot, Error> {
let started_at = Instant::now();
let mut last_slot: Option<Slot> = None;
let mut i = 1;
// try to catch slot start
let send_slot = loop {
if i > 500 {
bail!("Timeout waiting for slot change");
}
let iteration_ends_at = started_at + Duration::from_millis(i * 30);
let slot = rpc_client
.get_slot_with_commitment(CommitmentConfig::confirmed())
.await?;
trace!("polling slot {}", slot);
if let Some(last_slot) = last_slot {
if last_slot + 1 == slot {
break slot;
}
}
last_slot = Some(slot);
tokio::time::sleep_until(iteration_ends_at).await;
i += 1;
};
Ok(send_slot)
}

156
bench/src/benchnew.rs Normal file
View File

@ -0,0 +1,156 @@
use std::path::PathBuf;
use std::time::Duration;
use bench::{
benches::{
api_load::api_load, confirmation_rate::confirmation_rate,
confirmation_slot::confirmation_slot,
},
metrics::{PingThing, PingThingCluster},
tx_size::TxSize,
BenchmarkTransactionParams,
};
use clap::{Parser, Subcommand};
#[derive(Parser, Debug)]
#[clap(version, about)]
struct Arguments {
#[clap(subcommand)]
subcommand: SubCommand,
}
#[derive(Subcommand, Debug)]
enum SubCommand {
ApiLoad {
#[clap(short, long)]
payer_path: PathBuf,
#[clap(short, long)]
rpc_url: String,
#[clap(short, long)]
test_duration_ms: u64,
/// The CU price in micro lamports
#[clap(short, long, default_value_t = 3)]
#[arg(short = 'f')]
cu_price: u64,
},
ConfirmationRate {
#[clap(short, long)]
payer_path: PathBuf,
#[clap(short, long)]
rpc_url: String,
#[clap(short, long)]
size_tx: TxSize,
/// Maximum confirmation time in milliseconds. After this, the txn is considered unconfirmed
#[clap(short, long, default_value_t = 15_000)]
max_timeout_ms: u64,
#[clap(short, long)]
txs_per_run: usize,
#[clap(short, long)]
num_of_runs: usize,
/// The CU price in micro lamports
#[clap(short, long, default_value_t = 300)]
#[arg(short = 'f')]
cu_price: u64,
},
/// Compares the confirmation slot of txs sent to 2 different RPCs
ConfirmationSlot {
#[clap(short, long)]
payer_path: PathBuf,
/// URL of the 1st RPC
#[clap(short, long)]
#[arg(short = 'a')]
rpc_a: String,
/// URL of the 2nd RPC
#[clap(short, long)]
#[arg(short = 'b')]
rpc_b: String,
#[clap(short, long)]
size_tx: TxSize,
/// Maximum confirmation time in milliseconds. After this, the txn is considered unconfirmed
#[clap(short, long, default_value_t = 15_000)]
max_timeout_ms: u64,
#[clap(short, long)]
num_of_runs: usize,
/// The CU price in micro lamports
#[clap(short, long, default_value_t = 300)]
#[arg(short = 'f')]
cu_price: u64,
#[clap(long)]
ping_thing_token: Option<String>,
},
}
pub fn initialize_logger() {
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.with_thread_ids(true)
.with_line_number(true)
.init();
}
#[tokio::main(flavor = "multi_thread", worker_threads = 16)]
async fn main() {
let args = Arguments::parse();
initialize_logger();
match args.subcommand {
SubCommand::ApiLoad {
payer_path,
rpc_url,
test_duration_ms,
cu_price,
} => {
api_load(&payer_path, rpc_url, test_duration_ms, cu_price)
.await
.unwrap();
}
SubCommand::ConfirmationRate {
payer_path,
rpc_url,
size_tx,
max_timeout_ms,
txs_per_run,
num_of_runs,
cu_price,
} => confirmation_rate(
&payer_path,
rpc_url,
BenchmarkTransactionParams {
tx_size: size_tx,
cu_price_micro_lamports: cu_price,
},
Duration::from_millis(max_timeout_ms),
txs_per_run,
num_of_runs,
)
.await
.unwrap(),
SubCommand::ConfirmationSlot {
payer_path,
rpc_a,
rpc_b,
size_tx,
max_timeout_ms,
num_of_runs,
cu_price,
ping_thing_token,
} => confirmation_slot(
&payer_path,
rpc_a,
rpc_b,
BenchmarkTransactionParams {
tx_size: size_tx,
cu_price_micro_lamports: cu_price,
},
Duration::from_millis(max_timeout_ms),
num_of_runs,
ping_thing_token.map(|t| PingThing {
cluster: PingThingCluster::Mainnet,
va_api_key: t,
}),
)
.await
.unwrap(),
}
}

View File

@ -1,95 +0,0 @@
use std::path::PathBuf;
use bench::{
benches::{
api_load::api_load, confirmation_rate::confirmation_rate,
confirmation_slot::confirmation_slot,
},
tx_size::TxSize,
};
use clap::{Parser, Subcommand};
#[derive(Parser, Debug)]
#[clap(version, about)]
struct Arguments {
#[clap(subcommand)]
subcommand: SubCommand,
}
#[derive(Subcommand, Debug)]
enum SubCommand {
ApiLoad {
#[clap(short, long)]
payer_path: PathBuf,
#[clap(short, long)]
rpc_url: String,
#[clap(short, long)]
time_ms: u64,
},
ConfirmationRate {
#[clap(short, long)]
payer_path: PathBuf,
#[clap(short, long)]
rpc_url: String,
#[clap(short, long)]
size_tx: TxSize,
#[clap(short, long)]
txns_per_round: usize,
#[clap(short, long)]
num_rounds: usize,
},
ConfirmationSlot {
#[clap(short, long)]
payer_path: PathBuf,
#[clap(short, long)]
#[arg(short = 'a')]
rpc_a: String,
#[clap(short, long)]
#[arg(short = 'b')]
rpc_b: String,
#[clap(short, long)]
size_tx: TxSize,
},
}
pub fn initialize_logger() {
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.with_thread_ids(true)
.with_line_number(true)
.init();
}
#[tokio::main(flavor = "multi_thread", worker_threads = 16)]
async fn main() {
let args = Arguments::parse();
initialize_logger();
match args.subcommand {
SubCommand::ApiLoad {
payer_path,
rpc_url,
time_ms,
} => {
api_load(&payer_path, rpc_url, time_ms).await.unwrap();
}
SubCommand::ConfirmationRate {
payer_path,
rpc_url,
size_tx,
txns_per_round,
num_rounds,
} => confirmation_rate(&payer_path, rpc_url, size_tx, txns_per_round, num_rounds)
.await
.unwrap(),
SubCommand::ConfirmationSlot {
payer_path,
rpc_a,
rpc_b,
size_tx,
} => confirmation_slot(&payer_path, rpc_a, rpc_b, size_tx)
.await
.unwrap(),
}
}

View File

@ -20,6 +20,7 @@ use solana_sdk::{
use solana_transaction_status::TransactionStatus;
use std::{str::FromStr, time::Duration};
use tokio::time::Instant;
use tx_size::TxSize;
pub mod bench1;
pub mod benches;
@ -53,6 +54,11 @@ pub struct Args {
pub large_transactions: bool,
}
pub struct BenchmarkTransactionParams {
pub tx_size: TxSize,
pub cu_price_micro_lamports: u64,
}
const MEMO_PROGRAM_ID: &str = "MemoSq4gqABAXKb96qnH8TysNcWxMyWCqXgDLGmfcHr";
const WAIT_LIMIT_IN_SECONDS: u64 = 60;
@ -189,10 +195,10 @@ pub fn generate_txs(
payer: &Keypair,
blockhash: Hash,
rng: &mut Rng8,
size: tx_size::TxSize,
tx_params: &BenchmarkTransactionParams,
) -> Vec<Transaction> {
(0..num_of_txs)
.map(|_| create_memo_tx(payer, blockhash, rng, size))
.map(|_| create_memo_tx(payer, blockhash, rng, tx_params))
.collect()
}
@ -200,38 +206,58 @@ pub fn create_memo_tx(
payer: &Keypair,
blockhash: Hash,
rng: &mut Rng8,
size: tx_size::TxSize,
tx_params: &BenchmarkTransactionParams,
) -> Transaction {
let rand_str = generate_random_string(rng, size.memo_size());
let rand_str = generate_random_string(rng, tx_params.tx_size.memo_size());
match size {
tx_size::TxSize::Small => create_memo_tx_small(&rand_str, payer, blockhash),
tx_size::TxSize::Large => create_memo_tx_large(&rand_str, payer, blockhash),
match tx_params.tx_size {
tx_size::TxSize::Small => create_memo_tx_small(
&rand_str,
payer,
blockhash,
tx_params.cu_price_micro_lamports,
),
tx_size::TxSize::Large => create_memo_tx_large(
&rand_str,
payer,
blockhash,
tx_params.cu_price_micro_lamports,
),
}
}
// note: there is another version of this
pub fn create_memo_tx_small(msg: &[u8], payer: &Keypair, blockhash: Hash) -> Transaction {
pub fn create_memo_tx_small(
msg: &[u8],
payer: &Keypair,
blockhash: Hash,
cu_price_micro_lamports: u64,
) -> Transaction {
let memo = Pubkey::from_str(MEMO_PROGRAM_ID).unwrap();
// TODO make configurable
// 3 -> 6 slots
// 1 -> 31 slots
let cu_budget: Instruction = ComputeBudgetInstruction::set_compute_unit_price(3);
let cu_budget_ix: Instruction =
ComputeBudgetInstruction::set_compute_unit_price(cu_price_micro_lamports);
// Program consumed: 12775 of 13700 compute units
let priority_fees: Instruction = ComputeBudgetInstruction::set_compute_unit_limit(14000);
let cu_limit_ix: Instruction = ComputeBudgetInstruction::set_compute_unit_limit(14000);
let instruction = Instruction::new_with_bytes(memo, msg, vec![]);
let message = Message::new(
&[cu_budget, priority_fees, instruction],
&[cu_budget_ix, cu_limit_ix, instruction],
Some(&payer.pubkey()),
);
Transaction::new(&[payer], message, blockhash)
}
pub fn create_memo_tx_large(msg: &[u8], payer: &Keypair, blockhash: Hash) -> Transaction {
pub fn create_memo_tx_large(
msg: &[u8],
payer: &Keypair,
blockhash: Hash,
cu_price_micro_lamports: u64,
) -> Transaction {
let accounts = (0..8).map(|_| Keypair::new()).collect_vec();
let memo = Pubkey::from_str(MEMO_PROGRAM_ID).unwrap();
let cu_budget_ix: Instruction =
ComputeBudgetInstruction::set_compute_unit_price(cu_price_micro_lamports);
let cu_limit_ix: Instruction = ComputeBudgetInstruction::set_compute_unit_limit(14000);
let instruction = Instruction::new_with_bytes(
memo,
@ -241,7 +267,10 @@ pub fn create_memo_tx_large(msg: &[u8], payer: &Keypair, blockhash: Hash) -> Tra
.map(|keypair| AccountMeta::new_readonly(keypair.pubkey(), true))
.collect_vec(),
);
let message = Message::new(&[instruction], Some(&payer.pubkey()));
let message = Message::new(
&[cu_budget_ix, cu_limit_ix, instruction],
Some(&payer.pubkey()),
);
let mut signers = vec![payer];
signers.extend(accounts.iter());
@ -257,8 +286,9 @@ fn transaction_size_small() {
);
let mut rng = create_rng(Some(42));
let rand_string = generate_random_string(&mut rng, 10);
let priority_fee = 100;
let tx = create_memo_tx_small(&rand_string, &payer_keypair, blockhash);
let tx = create_memo_tx_small(&rand_string, &payer_keypair, blockhash, priority_fee);
assert_eq!(bincode::serialized_size(&tx).unwrap(), 231);
}
@ -270,7 +300,8 @@ fn transaction_size_large() {
);
let mut rng = create_rng(Some(42));
let rand_string = generate_random_string(&mut rng, 240);
let priority_fee = 100;
let tx = create_memo_tx_large(&rand_string, &payer_keypair, blockhash);
assert_eq!(bincode::serialized_size(&tx).unwrap(), 1186);
let tx = create_memo_tx_large(&rand_string, &payer_keypair, blockhash, priority_fee);
assert_eq!(bincode::serialized_size(&tx).unwrap(), 1238);
}

View File

@ -1,9 +1,13 @@
use std::{
fmt::{self, Display},
ops::{AddAssign, DivAssign},
time::Duration,
};
use solana_sdk::slot_history::Slot;
use reqwest::StatusCode;
use serde::{Deserialize, Serialize};
use solana_sdk::{signature::Signature, slot_history::Slot};
use tracing::debug;
#[derive(Clone, Copy, Debug, Default, serde::Serialize)]
pub struct Metric {
@ -143,3 +147,158 @@ pub struct TxMetricData {
pub time_to_send_in_millis: u64,
pub time_to_confirm_in_millis: u64,
}
#[derive(Clone, Debug)]
pub enum PingThingCluster {
Mainnet,
Testnet,
Devnet,
}
impl PingThingCluster {
pub fn from_arg(cluster: String) -> Self {
match cluster.to_lowercase().as_str() {
"mainnet" => PingThingCluster::Mainnet,
"testnet" => PingThingCluster::Testnet,
"devnet" => PingThingCluster::Devnet,
_ => panic!("incorrect cluster name"),
}
}
}
impl PingThingCluster {
pub fn to_url_part(&self) -> String {
match self {
PingThingCluster::Mainnet => "mainnet",
PingThingCluster::Testnet => "testnet",
PingThingCluster::Devnet => "devnet",
}
.to_string()
}
}
#[derive(Clone, Debug)]
pub enum PingThingTxType {
Transfer,
Memo,
}
impl Display for PingThingTxType {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
PingThingTxType::Transfer => write!(f, "transfer"),
PingThingTxType::Memo => write!(f, "memo"),
}
}
}
#[derive(Clone)]
pub struct PingThing {
pub cluster: PingThingCluster,
pub va_api_key: String,
}
/// request format see https://github.com/Block-Logic/ping-thing-client/blob/4c008c741164702a639c282f1503a237f7d95e64/ping-thing-client.mjs#L160
#[derive(Debug, Serialize, Deserialize)]
struct PingThingData {
pub time: u128,
pub signature: String, // Tx sig
pub transaction_type: String, // 'transfer',
pub success: bool, // txSuccess
pub application: String, // e.g. 'web3'
pub commitment_level: String, // e.g. 'confirmed'
pub slot_sent: Slot,
pub slot_landed: Slot,
}
impl PingThing {
pub async fn submit_confirmed_stats(
&self,
tx_elapsed: Duration,
tx_sig: Signature,
tx_type: PingThingTxType,
tx_success: bool,
slot_sent: Slot,
slot_landed: Slot,
) -> anyhow::Result<()> {
submit_stats_to_ping_thing(
self.cluster.clone(),
self.va_api_key.clone(),
tx_elapsed,
tx_sig,
tx_type,
tx_success,
slot_sent,
slot_landed,
)
.await
}
}
/// submits to https://www.validators.app/ping-thing?network=mainnet
/// Assumes that the txn was sent on Mainnet and had the "confirmed" commitment level
#[allow(clippy::too_many_arguments)]
async fn submit_stats_to_ping_thing(
cluster: PingThingCluster,
va_api_key: String,
tx_elapsed: Duration,
tx_sig: Signature,
tx_type: PingThingTxType,
tx_success: bool,
slot_sent: Slot,
slot_landed: Slot,
) -> anyhow::Result<()> {
let submit_data_request = PingThingData {
time: tx_elapsed.as_millis(),
signature: tx_sig.to_string(),
transaction_type: tx_type.to_string(),
success: tx_success,
application: "LiteRPC.bench".to_string(),
commitment_level: "confirmed".to_string(),
slot_sent,
slot_landed,
};
let client = reqwest::Client::new();
// cluster: 'mainnet'
let response = client
.post(format!(
"https://www.validators.app/api/v1/ping-thing/{}",
cluster.to_url_part()
))
.header("Content-Type", "application/json")
.header("Token", va_api_key)
.json(&submit_data_request)
.send()
.await?
.error_for_status()?;
assert_eq!(response.status(), StatusCode::CREATED);
debug!("Sent data for tx {} to ping-thing server", tx_sig);
Ok(())
}
#[ignore]
#[tokio::test]
async fn test_ping_thing() {
let token = "".to_string();
assert!(token.is_empty(), "Empty token for ping thing test");
let ping_thing = PingThing {
cluster: PingThingCluster::Mainnet,
va_api_key: token,
};
ping_thing
.submit_confirmed_stats(
Duration::from_secs(2),
Signature::new_unique(),
PingThingTxType::Transfer,
true,
123,
124,
)
.await
.unwrap();
}

View File

@ -0,0 +1,27 @@
use std::time::SystemTime;
#[derive(Debug)]
pub struct PostgresConfirmationSlot {
pub signature: String,
pub bench_datetime: SystemTime,
pub slot_sent: u64,
pub slot_confirmed: u64,
pub endpoint: String,
pub confirmed: bool,
pub confirmation_time_ms: f32,
}
// impl PostgresConfirmationSlot {
// pub fn to_values() -> &[&(dyn ToSql + Sync)] {
// let values: &[&(dyn ToSql + Sync)] = &[
// &self.signature,
// &self.bench_datetime,
// &(self.slot_sent as i64),
// &(self.slot_confirmed as i64),
// &self.endpoint,
// &self.confirmed,
// &self.confirmation_time_ms,
// ];
// values
// }
// }

View File

@ -1,3 +1,4 @@
pub mod confirmation_slot;
pub mod metrics_dbstore;
pub mod postgres_session;
pub mod postgres_session_cache;

View File

@ -56,6 +56,7 @@ use solana_lite_rpc_services::tx_sender::TxSender;
use lite_rpc::postgres_logger;
use solana_lite_rpc_core::structures::block_info::BlockInfo;
use solana_lite_rpc_prioritization_fees::start_block_priofees_task;
use solana_lite_rpc_util::obfuscate_rpcurl;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::signature::Keypair;
@ -500,14 +501,6 @@ fn parse_host_port(host_port: &str) -> Result<SocketAddr, String> {
}
}
// http://mango.rpcpool.com/c232ab232ba2323
fn obfuscate_rpcurl(rpc_addr: &str) -> String {
if rpc_addr.contains("rpcpool.com") {
return rpc_addr.replacen(char::is_numeric, "X", 99);
}
rpc_addr.to_string()
}
fn setup_tracing_subscriber() {
let enable_instrument_tracing = std::env::var("ENABLE_INSTRUMENT_TRACING")
.unwrap_or("false".to_string())

View File

@ -1,3 +1,11 @@
pub mod encoding;
pub mod secrets;
pub mod statistics;
// http://mango.rpcpool.com/c232ab232ba2323
pub fn obfuscate_rpcurl(rpc_addr: &str) -> String {
if rpc_addr.contains("rpcpool.com") {
return rpc_addr.replacen(char::is_numeric, "X", 99);
}
rpc_addr.to_string()
}