Merge pull request #404 from blockworks-foundation/feature/benchrunner-status-from-websocket

This commit is contained in:
Lou-Kamades 2024-07-02 00:06:34 -05:00 committed by GitHub
commit 5cc0173666
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 318 additions and 110 deletions

97
Cargo.lock generated
View File

@ -578,6 +578,7 @@ dependencies = [
"dirs",
"futures",
"itertools 0.10.5",
"jsonrpsee-types 0.22.5",
"lazy_static",
"log",
"rand 0.8.5",
@ -595,6 +596,7 @@ dependencies = [
"tracing",
"tracing-subscriber",
"url",
"websocket-tungstenite-retry",
]
[[package]]
@ -2341,7 +2343,7 @@ dependencies = [
"jsonrpsee-http-client",
"jsonrpsee-proc-macros",
"jsonrpsee-server",
"jsonrpsee-types",
"jsonrpsee-types 0.20.3",
"jsonrpsee-wasm-client",
"jsonrpsee-ws-client",
"tokio",
@ -2384,7 +2386,7 @@ dependencies = [
"futures-timer",
"futures-util",
"hyper",
"jsonrpsee-types",
"jsonrpsee-types 0.20.3",
"parking_lot",
"rand 0.8.5",
"rustc-hash",
@ -2407,7 +2409,7 @@ dependencies = [
"hyper",
"hyper-rustls",
"jsonrpsee-core",
"jsonrpsee-types",
"jsonrpsee-types 0.20.3",
"serde",
"serde_json",
"thiserror",
@ -2440,7 +2442,7 @@ dependencies = [
"http",
"hyper",
"jsonrpsee-core",
"jsonrpsee-types",
"jsonrpsee-types 0.20.3",
"route-recognizer",
"serde",
"serde_json",
@ -2467,6 +2469,19 @@ dependencies = [
"tracing",
]
[[package]]
name = "jsonrpsee-types"
version = "0.22.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "150d6168405890a7a3231a3c74843f58b8959471f6df76078db2619ddee1d07d"
dependencies = [
"anyhow",
"beef",
"serde",
"serde_json",
"thiserror",
]
[[package]]
name = "jsonrpsee-wasm-client"
version = "0.20.3"
@ -2475,7 +2490,7 @@ checksum = "7c7cbb3447cf14fd4d2f407c3cc96e6c9634d5440aa1fbed868a31f3c02b27f0"
dependencies = [
"jsonrpsee-client-transport",
"jsonrpsee-core",
"jsonrpsee-types",
"jsonrpsee-types 0.20.3",
]
[[package]]
@ -2487,7 +2502,7 @@ dependencies = [
"http",
"jsonrpsee-client-transport",
"jsonrpsee-core",
"jsonrpsee-types",
"jsonrpsee-types 0.20.3",
"url",
]
@ -4149,6 +4164,17 @@ dependencies = [
"opaque-debug",
]
[[package]]
name = "sha-1"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f5058ada175748e33390e40e872bd0fe59a19f265d0158daa551c5a88a76009c"
dependencies = [
"cfg-if",
"cpufeatures",
"digest 0.10.7",
]
[[package]]
name = "sha1"
version = "0.10.6"
@ -4290,7 +4316,7 @@ dependencies = [
"httparse",
"log",
"rand 0.8.5",
"sha-1",
"sha-1 0.9.8",
]
[[package]]
@ -5085,8 +5111,8 @@ dependencies = [
"thiserror",
"tokio",
"tokio-stream",
"tokio-tungstenite",
"tungstenite",
"tokio-tungstenite 0.20.1",
"tungstenite 0.20.1",
"url",
]
@ -6054,6 +6080,20 @@ dependencies = [
"tokio-util",
]
[[package]]
name = "tokio-tungstenite"
version = "0.17.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f714dd15bead90401d77e04243611caec13726c2408afd5b31901dfcdcb3b181"
dependencies = [
"futures-util",
"log",
"native-tls",
"tokio",
"tokio-native-tls",
"tungstenite 0.17.3",
]
[[package]]
name = "tokio-tungstenite"
version = "0.20.1"
@ -6065,7 +6105,7 @@ dependencies = [
"rustls",
"tokio",
"tokio-rustls",
"tungstenite",
"tungstenite 0.20.1",
"webpki-roots 0.25.4",
]
@ -6308,6 +6348,26 @@ version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b"
[[package]]
name = "tungstenite"
version = "0.17.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e27992fd6a8c29ee7eef28fc78349aa244134e10ad447ce3b9f0ac0ed0fa4ce0"
dependencies = [
"base64 0.13.1",
"byteorder",
"bytes",
"http",
"httparse",
"log",
"native-tls",
"rand 0.8.5",
"sha-1 0.10.1",
"thiserror",
"url",
"utf-8",
]
[[package]]
name = "tungstenite"
version = "0.20.1"
@ -6598,6 +6658,23 @@ version = "0.25.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1"
[[package]]
name = "websocket-tungstenite-retry"
version = "0.6.0"
source = "git+https://github.com/grooviegermanikus/websocket-tungstenite-retry.git?tag=v0.8.0#2f423ed40171b023aa0d5c67287ad778958a6722"
dependencies = [
"anyhow",
"futures-util",
"log",
"serde",
"serde_json",
"tokio",
"tokio-tungstenite 0.17.2",
"tokio-util",
"tracing",
"url",
]
[[package]]
name = "which"
version = "4.4.2"

View File

@ -13,6 +13,10 @@ name = "benchnew"
path = "src/benchnew.rs"
[dependencies]
websocket-tungstenite-retry = { git = "https://github.com/grooviegermanikus/websocket-tungstenite-retry.git", tag = "v0.8.0" }
jsonrpsee-types = "0.22.2"
clap = { workspace = true }
csv = "1.2.1"
dirs = "5.0.0"

View File

@ -11,6 +11,8 @@ use crate::benches::rpc_interface::{
};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::signature::{read_keypair_file, Keypair, Signature, Signer};
use url::Url;
use solana_lite_rpc_util::obfuscate_rpcurl;
#[derive(Clone, Copy, Debug, Default, serde::Serialize)]
pub struct Metric {
@ -28,6 +30,7 @@ pub struct Metric {
pub async fn confirmation_rate(
payer_path: &Path,
rpc_url: String,
tx_status_websocket_addr: Option<String>,
tx_params: BenchmarkTransactionParams,
max_timeout: Duration,
txs_per_run: usize,
@ -37,8 +40,11 @@ pub async fn confirmation_rate(
assert!(num_of_runs > 0, "num_of_runs must be greater than 0");
let rpc = Arc::new(RpcClient::new(rpc_url));
info!("RPC: {}", rpc.as_ref().url());
let rpc = Arc::new(RpcClient::new(rpc_url.clone()));
info!("RPC: {}", obfuscate_rpcurl(&rpc.as_ref().url()));
let ws_addr = tx_status_websocket_addr.unwrap_or_else(|| rpc_url.replace("http:", "ws:").replace("https:", "wss:"));
info!("WS ADDR: {}", obfuscate_rpcurl(&ws_addr));
let payer: Arc<Keypair> = Arc::new(read_keypair_file(payer_path).unwrap());
info!("Payer: {}", payer.pubkey().to_string());
@ -46,7 +52,7 @@ pub async fn confirmation_rate(
let mut rpc_results = Vec::with_capacity(num_of_runs);
for _ in 0..num_of_runs {
match send_bulk_txs_and_wait(&rpc, &payer, txs_per_run, &tx_params, max_timeout)
match send_bulk_txs_and_wait(&rpc, Url::parse(&ws_addr).expect("Invalid Url"), &payer, txs_per_run, &tx_params, max_timeout)
.await
.context("send bulk tx and wait")
{
@ -72,6 +78,7 @@ pub async fn confirmation_rate(
pub async fn send_bulk_txs_and_wait(
rpc: &RpcClient,
tx_status_websocket_addr: Url,
payer: &Keypair,
num_txs: usize,
tx_params: &BenchmarkTransactionParams,
@ -87,7 +94,7 @@ pub async fn send_bulk_txs_and_wait(
trace!("Sending {} transactions in bulk ..", txs.len());
let tx_and_confirmations_from_rpc: Vec<(Signature, ConfirmationResponseFromRpc)> =
send_and_confirm_bulk_transactions(rpc, &txs, max_timeout)
send_and_confirm_bulk_transactions(rpc, tx_status_websocket_addr, payer.pubkey(), &txs, max_timeout)
.await
.context("send and confirm bulk tx")?;
trace!("Done sending {} transaction.", txs.len());
@ -127,9 +134,9 @@ pub async fn send_bulk_txs_and_wait(
}
ConfirmationResponseFromRpc::Timeout(elapsed) => {
debug!(
"Signature {} not confirmed after {:.02}ms",
"Signature {} not confirmed after {:.03}s",
tx_sig,
elapsed.as_secs_f32() * 1000.0
elapsed.as_secs_f32()
);
tx_sent += 1;
tx_unconfirmed += 1;

View File

@ -1,4 +1,5 @@
use std::path::Path;
use std::str::FromStr;
use std::time::Duration;
use crate::benches::rpc_interface::{
@ -13,8 +14,10 @@ use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::signature::{read_keypair_file, Signature, Signer};
use solana_sdk::transaction::VersionedTransaction;
use solana_sdk::{commitment_config::CommitmentConfig, signature::Keypair};
use solana_sdk::pubkey::Pubkey;
use tokio::time::{sleep, Instant};
use url::Url;
use crate::benches::tx_status_websocket_collector::start_tx_status_collector;
#[derive(Clone, Copy, Debug, Default)]
pub struct Metric {
@ -46,6 +49,8 @@ pub async fn confirmation_slot(
payer_path: &Path,
rpc_a_url: String,
rpc_b_url: String,
tx_status_websocket_addr_a: Option<String>,
tx_status_websocket_addr_b: Option<String>,
tx_params: BenchmarkTransactionParams,
max_timeout: Duration,
num_of_runs: usize,
@ -59,6 +64,11 @@ pub async fn confirmation_slot(
info!("RPC A: {}", obfuscate_rpcurl(&rpc_a_url));
info!("RPC B: {}", obfuscate_rpcurl(&rpc_b_url));
let ws_addr_a = tx_status_websocket_addr_a.unwrap_or_else(|| rpc_a_url.replace("http:", "ws:").replace("https:", "wss:"));
let ws_addr_b = tx_status_websocket_addr_b.unwrap_or_else(|| rpc_b_url.replace("http:", "ws:").replace("https:", "wss:"));
let ws_addr_a = Url::parse(&ws_addr_a).expect("Invalid URL");
let ws_addr_b = Url::parse(&ws_addr_b).expect("Invalid URL");
let rpc_a_url =
Url::parse(&rpc_a_url).map_err(|e| anyhow!("Failed to parse RPC A URL: {}", e))?;
let rpc_b_url =
@ -66,12 +76,20 @@ pub async fn confirmation_slot(
let mut rng = create_rng(None);
let payer = read_keypair_file(payer_path).expect("payer file");
info!("Payer: {}", payer.pubkey().to_string());
let payer_pubkey = payer.pubkey();
info!("Payer: {}", payer_pubkey.to_string());
// let mut ping_thing_tasks = vec![];
// FIXME
// let (tx_status_map, jh_collector) = start_tx_status_collector(Url::parse(&tx_status_websocket_addr).unwrap(), payer.pubkey(), CommitmentConfig::confirmed()).await;
for _ in 0..num_of_runs {
let rpc_a = create_rpc_client(&rpc_a_url);
let rpc_b = create_rpc_client(&rpc_b_url);
let ws_addr_a = ws_addr_a.clone();
let ws_addr_b = ws_addr_b.clone();
// measure network time to reach the respective RPC endpoints,
// used to mitigate the difference in distance by delaying the txn sending
let time_a = rpc_roundtrip_duration(&rpc_a).await?.as_secs_f64();
@ -95,13 +113,13 @@ pub async fn confirmation_slot(
let a_task = tokio::spawn(async move {
sleep(Duration::from_secs_f64(a_delay)).await;
debug!("(A) sending tx {}", rpc_a_tx.signatures[0]);
send_and_confirm_transaction(&rpc_a, rpc_a_tx, max_timeout).await
send_and_confirm_transaction(&rpc_a, ws_addr_a, payer_pubkey, rpc_a_tx, max_timeout).await
});
let b_task = tokio::spawn(async move {
sleep(Duration::from_secs_f64(b_delay)).await;
debug!("(B) sending tx {}", rpc_b_tx.signatures[0]);
send_and_confirm_transaction(&rpc_b, rpc_b_tx, max_timeout).await
send_and_confirm_transaction(&rpc_b, ws_addr_b, payer_pubkey, rpc_b_tx, max_timeout).await
});
let (a, b) = tokio::join!(a_task, b_task);
@ -156,11 +174,13 @@ async fn create_tx(
async fn send_and_confirm_transaction(
rpc: &RpcClient,
tx_status_websocket_addr: Url,
payer_pubkey: Pubkey,
tx: VersionedTransaction,
max_timeout: Duration,
) -> anyhow::Result<ConfirmationResponseFromRpc> {
let result_vec: Vec<(Signature, ConfirmationResponseFromRpc)> =
send_and_confirm_bulk_transactions(rpc, &[tx], max_timeout).await?;
send_and_confirm_bulk_transactions(rpc, tx_status_websocket_addr, payer_pubkey, &[tx], max_timeout).await?;
assert_eq!(result_vec.len(), 1, "expected 1 result");
let (_sig, confirmation_response) = result_vec.into_iter().next().unwrap();

View File

@ -2,3 +2,4 @@ pub mod api_load;
pub mod confirmation_rate;
pub mod confirmation_slot;
pub mod rpc_interface;
mod tx_status_websocket_collector;

View File

@ -2,7 +2,7 @@ use anyhow::{bail, Context, Error};
use futures::future::join_all;
use futures::TryFutureExt;
use itertools::Itertools;
use log::{debug, trace, warn};
use log::{debug, info, trace, warn};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_rpc_client::rpc_client::SerializableTransaction;
use solana_rpc_client_api::client_error::ErrorKind;
@ -14,10 +14,19 @@ use solana_sdk::transaction::VersionedTransaction;
use solana_transaction_status::TransactionConfirmationStatus;
use std::collections::{HashMap, HashSet};
use std::iter::zip;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::Instant;
use dashmap::mapref::multiple::RefMulti;
use serde::{Deserialize, Serialize};
use serde_json::json;
use solana_rpc_client_api::response::{Response, RpcBlockUpdate, RpcResponseContext, SlotUpdate};
use solana_sdk::pubkey::Pubkey;
use tokio::time::{Instant, timeout};
use url::Url;
use websocket_tungstenite_retry::websocket_stable::WsMessage;
use solana_lite_rpc_util::obfuscate_rpcurl;
use crate::benches::tx_status_websocket_collector::start_tx_status_collector;
pub fn create_rpc_client(rpc_url: &Url) -> RpcClient {
RpcClient::new_with_commitment(rpc_url.to_string(), CommitmentConfig::confirmed())
@ -28,7 +37,7 @@ pub enum ConfirmationResponseFromRpc {
// RPC error on send_transaction
SendError(Arc<ErrorKind>),
// (sent slot at confirmed commitment, confirmed slot, ..., ...)
// transaction_confirmation_status is "confirmed" or "finalized"
// transaction_confirmation_status is "confirmed" (finalized is not reported by blockSubscribe websocket
Success(Slot, Slot, TransactionConfirmationStatus, Duration),
// timout waiting for confirmation status
Timeout(Duration),
@ -36,6 +45,8 @@ pub enum ConfirmationResponseFromRpc {
pub async fn send_and_confirm_bulk_transactions(
rpc_client: &RpcClient,
tx_status_websocket_addr: Url,
payer_pubkey: Pubkey,
txs: &[VersionedTransaction],
max_timeout: Duration,
) -> anyhow::Result<Vec<(Signature, ConfirmationResponseFromRpc)>> {
@ -53,6 +64,9 @@ pub async fn send_and_confirm_bulk_transactions(
min_context_slot: None,
};
// note: we get confirmed but never finaliized
let (tx_status_map, _jh_collector) = start_tx_status_collector(tx_status_websocket_addr.clone(), payer_pubkey, CommitmentConfig::confirmed()).await;
let started_at = Instant::now();
trace!(
"Sending {} transactions via RPC (retries=off) ..",
@ -99,15 +113,16 @@ pub async fn send_and_confirm_bulk_transactions(
trace!("- tx_fail {}", tx_sig.get_signature());
}
}
let elapsed = started_at.elapsed();
debug!(
"{} transactions sent successfully in {:.02}ms",
"send_transaction successful for {} txs in {:.03}s",
num_sent_ok,
started_at.elapsed().as_secs_f32() * 1000.0
elapsed.as_secs_f32()
);
debug!(
"{} transactions failed to send in {:.02}ms",
"send_transaction failed to send {} txs in {:.03}s",
num_sent_failed,
started_at.elapsed().as_secs_f32() * 1000.0
elapsed.as_secs_f32()
);
if num_sent_failed > 0 {
@ -130,82 +145,45 @@ pub async fn send_and_confirm_bulk_transactions(
// items get moved from pending_status_set to result_status_map
debug!("Waiting for transaction confirmations from websocket source <{}> ..", obfuscate_rpcurl(tx_status_websocket_addr.as_str()));
let started_at = Instant::now();
let timeout_at = started_at + max_timeout;
// "poll" the status dashmap
'polling_loop: for iteration in 1.. {
let iteration_ends_at = started_at + Duration::from_millis(iteration * 400);
let iteration_ends_at = started_at + Duration::from_millis(iteration * 100);
assert_eq!(
pending_status_set.len() + result_status_map.len(),
num_sent_ok,
"Items must move between pending+result"
);
let tx_batch = pending_status_set.iter().cloned().collect_vec();
debug!(
"Request status for batch of remaining {} transactions in iteration {}",
tx_batch.len(),
iteration
);
let status_started_at = Instant::now();
let mut batch_status = Vec::new();
// "Too many inputs provided; max 256"
for chunk in tx_batch.chunks(256) {
// fail hard if not possible to poll status
let chunk_responses = rpc_client
.get_signature_statuses(chunk)
.await
.expect("get signature statuses");
batch_status.extend(chunk_responses.value);
}
if status_started_at.elapsed() > Duration::from_millis(500) {
warn!(
"SLOW get_signature_statuses for {} transactions took {:?}",
tx_batch.len(),
status_started_at.elapsed()
);
}
let elapsed = started_at.elapsed();
for (tx_sig, status_response) in zip(tx_batch, batch_status) {
match status_response {
Some(tx_status) => {
trace!(
"Some signature status {:?} received for {} after {:.02}ms",
tx_status.confirmation_status,
tx_sig,
elapsed.as_secs_f32() * 1000.0
);
if !tx_status.satisfies_commitment(CommitmentConfig::confirmed()) {
continue 'polling_loop;
}
// status is confirmed or finalized
pending_status_set.remove(&tx_sig);
let prev_value = result_status_map.insert(
tx_sig,
ConfirmationResponseFromRpc::Success(
send_slot,
tx_status.slot,
tx_status.confirmation_status(),
elapsed,
),
);
assert!(prev_value.is_none(), "Must not override existing value");
}
None => {
// None: not yet processed by the cluster
trace!(
"No signature status was received for {} after {:.02}ms - continue waiting",
tx_sig,
elapsed.as_secs_f32() * 1000.0
);
}
for multi in tx_status_map.iter() {
// note that we will see tx_sigs we did not send
let (tx_sig, confirmed_slot) = multi.pair();
// status is confirmed
if pending_status_set.remove(&tx_sig) {
trace!("take status for sig {:?} and confirmed_slot: {:?} from websocket source", tx_sig, confirmed_slot);
let prev_value = result_status_map.insert(
tx_sig.clone(),
ConfirmationResponseFromRpc::Success(
send_slot,
*confirmed_slot,
// note: this is not optimal as we do not cover finalized here
TransactionConfirmationStatus::Confirmed,
elapsed,
),
);
assert!(prev_value.is_none(), "Must not override existing value");
}
}
} // -- END for tx_status_map loop
if pending_status_set.is_empty() {
debug!(
"All transactions confirmed after {} iterations / {:?}",
iteration,
"All transactions confirmed after {:?}",
started_at.elapsed()
);
break 'polling_loop;
@ -213,13 +191,12 @@ pub async fn send_and_confirm_bulk_transactions(
if Instant::now() > timeout_at {
warn!(
"Timeout waiting for transactions to confirm after {} iterations",
iteration
"Timeout waiting for transactions to confirm after {:?}",
started_at.elapsed()
);
break 'polling_loop;
}
// avg 2 samples per slot
tokio::time::sleep_until(iteration_ends_at).await;
} // -- END polling loop

View File

@ -0,0 +1,76 @@
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, Instant};
use dashmap::{DashMap, DashSet};
use log::{debug, info, trace};
use serde_json::json;
use solana_rpc_client_api::response::{Response, RpcBlockUpdate};
use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::Signature;
use tokio::task::AbortHandle;
use tokio::time::sleep;
use url::Url;
use websocket_tungstenite_retry::websocket_stable;
use websocket_tungstenite_retry::websocket_stable::WsMessage;
// returns map of transaction signatures to the slot they were confirmed
pub async fn start_tx_status_collector(ws_url: Url, payer_pubkey: Pubkey, commitment_config: CommitmentConfig)
-> (Arc<DashMap<Signature, Slot>>, AbortHandle) {
// e.g. "commitment"
let commitment_str = format!("{:?}", commitment_config);
let mut web_socket_slots = websocket_stable::StableWebSocket::new_with_timeout(
ws_url,
json!({
"jsonrpc": "2.0",
"id": "1",
"method": "blockSubscribe",
"params": [
{
"mentionsAccountOrProgram": payer_pubkey.to_string(),
},
{
"commitment": commitment_str,
"encoding": "base64",
"showRewards": false,
"transactionDetails": "signatures"
}
]
}),
Duration::from_secs(10),
).await
.expect("Failed to connect to websocket");
let mut channel = web_socket_slots.subscribe_message_channel();
let observed_transactions: Arc<DashMap<Signature, Slot>> = Arc::new(DashMap::with_capacity(64));
let observed_transactions_write = Arc::downgrade(&observed_transactions);
let jh = tokio::spawn(async move {
let started_at = Instant::now();
while let Ok(msg) = channel.recv().await {
if let WsMessage::Text(payload) = msg {
let ws_result: jsonrpsee_types::SubscriptionResponse<Response<RpcBlockUpdate>> =
serde_json::from_str(&payload).unwrap();
let block_update = ws_result.params.result;
let slot = block_update.value.slot;
let Some(map) = observed_transactions_write.upgrade() else {
debug!("observed_transactions map dropped - stopping task");
return;
};
if let Some(tx_sigs_from_block) = block_update.value.block.and_then(|b| b.signatures) {
for tx_sig in tx_sigs_from_block {
let tx_sig = Signature::from_str(&tx_sig).unwrap();
debug!("Transaction signature found in block: {} - slot {}", tx_sig, slot);
map.entry(tx_sig).or_insert(slot);
}
}
}
}
});
(observed_transactions, jh.abort_handle())
}

View File

@ -39,6 +39,12 @@ enum SubCommand {
payer_path: PathBuf,
#[clap(short, long)]
rpc_url: String,
/// Set websocket source (blockSubscribe method) for transaction status updates.
/// You might want to send tx to one RPC and listen to another (reliable) RPC for status updates.
/// Not all RPC nodes support this method.
/// If not provided, the RPC URL is used to derive the websocket URL.
#[clap(short = 'w', long)]
tx_status_websocket_addr: Option<String>,
#[clap(short, long)]
size_tx: TxSize,
/// Maximum confirmation time in milliseconds. After this, the txn is considered unconfirmed
@ -65,6 +71,10 @@ enum SubCommand {
#[clap(short, long)]
#[arg(short = 'b')]
rpc_b: String,
#[clap(long)]
tx_status_websocket_addr_a: Option<String>,
#[clap(long)]
tx_status_websocket_addr_b: Option<String>,
#[clap(short, long)]
size_tx: TxSize,
/// Maximum confirmation time in milliseconds. After this, the txn is considered unconfirmed
@ -108,6 +118,7 @@ async fn main() {
SubCommand::ConfirmationRate {
payer_path,
rpc_url,
tx_status_websocket_addr,
size_tx,
max_timeout_ms,
txs_per_run,
@ -116,6 +127,7 @@ async fn main() {
} => confirmation_rate(
&payer_path,
rpc_url,
tx_status_websocket_addr,
BenchmarkTransactionParams {
tx_size: size_tx,
cu_price_micro_lamports: cu_price,
@ -130,6 +142,8 @@ async fn main() {
payer_path,
rpc_a,
rpc_b,
tx_status_websocket_addr_a,
tx_status_websocket_addr_b,
size_tx,
max_timeout_ms,
num_of_runs,
@ -139,6 +153,8 @@ async fn main() {
&payer_path,
rpc_a,
rpc_b,
tx_status_websocket_addr_a,
tx_status_websocket_addr_b,
BenchmarkTransactionParams {
tx_size: size_tx,
cu_price_micro_lamports: cu_price,

View File

@ -7,20 +7,26 @@ use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::signature::Keypair;
use std::sync::Arc;
use std::time::Duration;
use url::Url;
pub async fn benchnew_confirmation_rate_servicerunner(
bench_config: &BenchConfig,
rpc_addr: String,
tx_status_websocket_addr: Option<String>,
funded_payer: Keypair,
) -> confirmation_rate::Metric {
let rpc = Arc::new(RpcClient::new(rpc_addr));
let rpc = Arc::new(RpcClient::new(rpc_addr.clone()));
let tx_params = BenchmarkTransactionParams {
tx_size: bench_config.tx_size,
cu_price_micro_lamports: bench_config.cu_price_micro_lamports,
};
let max_timeout = Duration::from_secs(60);
let ws_addr = tx_status_websocket_addr.unwrap_or_else(|| rpc_addr.replace("http:", "ws:").replace("https:", "wss:"));
let result = send_bulk_txs_and_wait(
&rpc,
Url::parse(&ws_addr).expect("Invalid WS URL"),
&funded_payer,
bench_config.tx_count,
&tx_params,

View File

@ -6,25 +6,34 @@ Hardware: recommend 1024MB RAM, 2 vCPUs, small disk
### Environment Variables
| Environment Variable | Purpose | Required? | Default Value |
|----------------------|-------------------------------------------------------|---------------|---------------|
| `PG_ENABLED` | Enable writing to PostgreSQL | No | false |
| `PG_CONFIG` | PostgreSQL connection string | if PG_ENABLED | |
| `TENANT1_ID` | Technical ID for the tenant | Yes | |
| `TENANT1_RPC_ADDR` | RPC address for the target RPC node | Yes | |
| `TENANT2_.. | more tenants can be added using TENANT2, TENANT3, ... | | |
| Environment Variable | Purpose | Required? | Default Value |
|----------------------|-------------------------------------------------------|--------------|--------------------------------------------|
| `PG_ENABLED` | Enable writing to PostgreSQL | No | false |
| `PG_CONFIG` | PostgreSQL connection string | if PG_ENABLED | |
| `TENANT1_ID` | Technical ID for the tenant | Yes | |
| `TENANT1_RPC_ADDR` | RPC address for the target RPC node | Yes | |
| `TENANT1_TX_STATUS_WS_ADDR` | Websocket source for tx status |No | RPC Url<br/>(replacing schema with ws/wss) |
| `TENANT2_.. | more tenants can be added using TENANT2, TENANT3, ... | | |
### Command-line Arguments
```
Options:
-b, --bench-interval <BENCH_INTERVAL>
interval in milliseconds to run the benchmark [default: 60000]
-n, --tx-count <TX_COUNT>
[default: 10]
-p, --payer-path <PAYER_PATH>
-r, --rpc-url <RPC_URL>
-w, --tx-status-websocket-addr <TX_STATUS_WEBSOCKET_ADDR>
Set websocket source (blockSubscribe method) for transaction status updates. You might want to send tx to one RPC and listen to another (reliable) RPC for status updates. Not all RPC nodes support this method. If not provided, the RPC URL is used to derive the websocket URL
-s, --size-tx <SIZE_TX>
[default: small] [possible values: small, large]
-p, --prio-fees <PRIO_FEES>
[default: 0]
[possible values: small, large]
-m, --max-timeout-ms <MAX_TIMEOUT_MS>
Maximum confirmation time in milliseconds. After this, the txn is considered unconfirmed [default: 15000]
-t, --txs-per-run <TXS_PER_RUN>
-n, --num-of-runs <NUM_OF_RUNS>
-f, --cu-price <CU_PRICE>
The CU price in micro lamports [default: 300]
```
```bash

View File

@ -6,6 +6,8 @@ pub struct TenantConfig {
// technical identifier for the tenant, e.g. "solana-rpc"
pub tenant_id: String,
pub rpc_addr: String,
// needs to point to a reliable websocket server that can be used to get tx status
pub tx_status_ws_addr: Option<String>,
}
// recommend to use one payer keypair for all targets and fund that keypair with enough SOL
@ -39,7 +41,7 @@ pub fn read_tenant_configs(env_vars: Vec<(String, String)>) -> Vec<TenantConfig>
.find(|(v, _)| *v == format!("TENANT{}_ID", tc))
.iter()
.exactly_one()
.expect("need ID")
.expect("need TENANT_X_ID")
.1
.to_string(),
rpc_addr: v
@ -47,9 +49,16 @@ pub fn read_tenant_configs(env_vars: Vec<(String, String)>) -> Vec<TenantConfig>
.find(|(v, _)| *v == format!("TENANT{}_RPC_ADDR", tc))
.iter()
.exactly_one()
.expect("need RPC_ADDR")
.expect("need TENANT_X_RPC_ADDR")
.1
.to_string(),
tx_status_ws_addr: v
.iter()
.find(|(v, _)| *v == format!("TENANT{}_TX_STATUS_WS_ADDR", tc))
.iter()
.at_most_one()
.expect("need TENANT_X_TX_STATUS_WS_ADDR")
.map(|(_, v)| v.to_string())
})
.collect::<Vec<TenantConfig>>();

View File

@ -23,6 +23,7 @@ use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tokio::sync::OnceCell;
use solana_lite_rpc_util::obfuscate_rpcurl;
#[tokio::main]
async fn main() {
@ -248,6 +249,7 @@ impl BenchRunner for BenchRunnerConfirmationRateImpl {
let metric = bench::service_adapter_new::benchnew_confirmation_rate_servicerunner(
&self.bench_config,
self.tenant_config.rpc_addr.clone(),
self.tenant_config.tx_status_ws_addr.clone(),
self.funded_payer.insecure_clone(),
)
.await;

View File

@ -134,7 +134,7 @@ impl BenchMetricsPostgresSaver for BenchRunnerConfirmationRateImpl {
txs_confirmed,
txs_un_confirmed,
average_confirmation_time_ms,
average_slot_confirmation_time_ms,
average_slot_confirmation_time,
metric_json
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)

View File

@ -47,3 +47,7 @@ GRANT SELECT ON ALL TABLES IN SCHEMA benchrunner TO ro_benchrunner;
ALTER DEFAULT PRIVILEGES IN SCHEMA benchrunner GRANT SELECT ON TABLES TO ro_benchrunner;
ALTER TABLE benchrunner.bench_metrics RENAME TO bench_metrics_bench1;
ALTER TABLE benchrunner.bench_metrics_confirmation_rate ADD COLUMN average_slot_confirmation_time real;
ALTER TABLE benchrunner.bench_metrics_confirmation_rate DROP COLUMN average_slot_confirmation_time_ms;