From 0966211555e8ece09259cc52c0905f3c2a6b5879 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Mon, 17 Jun 2024 20:35:12 +0200 Subject: [PATCH 1/9] listen status from ws --- Cargo.lock | 98 +++++++++++-- bench/Cargo.toml | 5 + bench/src/benches/confirmation_rate.rs | 7 +- bench/src/benches/confirmation_slot.rs | 22 ++- bench/src/benches/mod.rs | 1 + bench/src/benches/rpc_interface.rs | 132 +++++++++--------- .../benches/tx_status_websocket_collector.rs | 80 +++++++++++ bench/src/service_adapter_new.rs | 3 + benchrunner-service/src/args.rs | 1 + benchrunner-service/src/main.rs | 8 ++ 10 files changed, 277 insertions(+), 80 deletions(-) create mode 100644 bench/src/benches/tx_status_websocket_collector.rs diff --git a/Cargo.lock b/Cargo.lock index aa745b06..a28248e9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -578,11 +578,13 @@ dependencies = [ "dirs", "futures", "itertools 0.10.5", + "jsonrpsee-types 0.22.5", "lazy_static", "log", "rand 0.8.5", "rand_chacha 0.3.1", "reqwest", + "scopeguard", "serde", "serde_json", "solana-lite-rpc-util", @@ -595,6 +597,7 @@ dependencies = [ "tracing", "tracing-subscriber", "url", + "websocket-tungstenite-retry", ] [[package]] @@ -2341,7 +2344,7 @@ dependencies = [ "jsonrpsee-http-client", "jsonrpsee-proc-macros", "jsonrpsee-server", - "jsonrpsee-types", + "jsonrpsee-types 0.20.3", "jsonrpsee-wasm-client", "jsonrpsee-ws-client", "tokio", @@ -2384,7 +2387,7 @@ dependencies = [ "futures-timer", "futures-util", "hyper", - "jsonrpsee-types", + "jsonrpsee-types 0.20.3", "parking_lot", "rand 0.8.5", "rustc-hash", @@ -2407,7 +2410,7 @@ dependencies = [ "hyper", "hyper-rustls", "jsonrpsee-core", - "jsonrpsee-types", + "jsonrpsee-types 0.20.3", "serde", "serde_json", "thiserror", @@ -2440,7 +2443,7 @@ dependencies = [ "http", "hyper", "jsonrpsee-core", - "jsonrpsee-types", + "jsonrpsee-types 0.20.3", "route-recognizer", "serde", "serde_json", @@ -2467,6 +2470,19 @@ dependencies = [ "tracing", ] +[[package]] +name = "jsonrpsee-types" +version = "0.22.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "150d6168405890a7a3231a3c74843f58b8959471f6df76078db2619ddee1d07d" +dependencies = [ + "anyhow", + "beef", + "serde", + "serde_json", + "thiserror", +] + [[package]] name = "jsonrpsee-wasm-client" version = "0.20.3" @@ -2475,7 +2491,7 @@ checksum = "7c7cbb3447cf14fd4d2f407c3cc96e6c9634d5440aa1fbed868a31f3c02b27f0" dependencies = [ "jsonrpsee-client-transport", "jsonrpsee-core", - "jsonrpsee-types", + "jsonrpsee-types 0.20.3", ] [[package]] @@ -2487,7 +2503,7 @@ dependencies = [ "http", "jsonrpsee-client-transport", "jsonrpsee-core", - "jsonrpsee-types", + "jsonrpsee-types 0.20.3", "url", ] @@ -4149,6 +4165,17 @@ dependencies = [ "opaque-debug", ] +[[package]] +name = "sha-1" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f5058ada175748e33390e40e872bd0fe59a19f265d0158daa551c5a88a76009c" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest 0.10.7", +] + [[package]] name = "sha1" version = "0.10.6" @@ -4290,7 +4317,7 @@ dependencies = [ "httparse", "log", "rand 0.8.5", - "sha-1", + "sha-1 0.9.8", ] [[package]] @@ -5085,8 +5112,8 @@ dependencies = [ "thiserror", "tokio", "tokio-stream", - "tokio-tungstenite", - "tungstenite", + "tokio-tungstenite 0.20.1", + "tungstenite 0.20.1", "url", ] @@ -6054,6 +6081,20 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "tokio-tungstenite" +version = "0.17.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f714dd15bead90401d77e04243611caec13726c2408afd5b31901dfcdcb3b181" +dependencies = [ + "futures-util", + "log", + "native-tls", + "tokio", + "tokio-native-tls", + "tungstenite 0.17.3", +] + [[package]] name = "tokio-tungstenite" version = "0.20.1" @@ -6065,7 +6106,7 @@ dependencies = [ "rustls", "tokio", "tokio-rustls", - "tungstenite", + "tungstenite 0.20.1", "webpki-roots 0.25.4", ] @@ -6308,6 +6349,26 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tungstenite" +version = "0.17.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e27992fd6a8c29ee7eef28fc78349aa244134e10ad447ce3b9f0ac0ed0fa4ce0" +dependencies = [ + "base64 0.13.1", + "byteorder", + "bytes", + "http", + "httparse", + "log", + "native-tls", + "rand 0.8.5", + "sha-1 0.10.1", + "thiserror", + "url", + "utf-8", +] + [[package]] name = "tungstenite" version = "0.20.1" @@ -6598,6 +6659,23 @@ version = "0.25.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" +[[package]] +name = "websocket-tungstenite-retry" +version = "0.6.0" +source = "git+https://github.com/grooviegermanikus/websocket-tungstenite-retry.git?tag=v0.8.0#2f423ed40171b023aa0d5c67287ad778958a6722" +dependencies = [ + "anyhow", + "futures-util", + "log", + "serde", + "serde_json", + "tokio", + "tokio-tungstenite 0.17.2", + "tokio-util", + "tracing", + "url", +] + [[package]] name = "which" version = "4.4.2" diff --git a/bench/Cargo.toml b/bench/Cargo.toml index 9dc526af..193cfe79 100644 --- a/bench/Cargo.toml +++ b/bench/Cargo.toml @@ -13,6 +13,10 @@ name = "benchnew" path = "src/benchnew.rs" [dependencies] + +websocket-tungstenite-retry = { git = "https://github.com/grooviegermanikus/websocket-tungstenite-retry.git", tag = "v0.8.0" } +jsonrpsee-types = "0.22.2" + clap = { workspace = true } csv = "1.2.1" dirs = "5.0.0" @@ -38,6 +42,7 @@ spl-memo = "4.0.0" url = "*" reqwest = "0.11.26" lazy_static = "1.4.0" +scopeguard = "1.2.0" [dev-dependencies] bincode = { workspace = true } diff --git a/bench/src/benches/confirmation_rate.rs b/bench/src/benches/confirmation_rate.rs index e24c81e0..5b40436d 100644 --- a/bench/src/benches/confirmation_rate.rs +++ b/bench/src/benches/confirmation_rate.rs @@ -11,6 +11,7 @@ use crate::benches::rpc_interface::{ }; use solana_rpc_client::nonblocking::rpc_client::RpcClient; use solana_sdk::signature::{read_keypair_file, Keypair, Signature, Signer}; +use url::Url; #[derive(Clone, Copy, Debug, Default, serde::Serialize)] pub struct Metric { @@ -28,6 +29,7 @@ pub struct Metric { pub async fn confirmation_rate( payer_path: &Path, rpc_url: String, + tx_status_websocket_addr: String, tx_params: BenchmarkTransactionParams, max_timeout: Duration, txs_per_run: usize, @@ -46,7 +48,7 @@ pub async fn confirmation_rate( let mut rpc_results = Vec::with_capacity(num_of_runs); for _ in 0..num_of_runs { - match send_bulk_txs_and_wait(&rpc, &payer, txs_per_run, &tx_params, max_timeout) + match send_bulk_txs_and_wait(&rpc, Url::parse(&tx_status_websocket_addr).expect("Invalid Url"), &payer, txs_per_run, &tx_params, max_timeout) .await .context("send bulk tx and wait") { @@ -72,6 +74,7 @@ pub async fn confirmation_rate( pub async fn send_bulk_txs_and_wait( rpc: &RpcClient, + tx_status_websocket_addr: Url, payer: &Keypair, num_txs: usize, tx_params: &BenchmarkTransactionParams, @@ -87,7 +90,7 @@ pub async fn send_bulk_txs_and_wait( trace!("Sending {} transactions in bulk ..", txs.len()); let tx_and_confirmations_from_rpc: Vec<(Signature, ConfirmationResponseFromRpc)> = - send_and_confirm_bulk_transactions(rpc, &txs, max_timeout) + send_and_confirm_bulk_transactions(rpc, tx_status_websocket_addr, payer.pubkey(), &txs, max_timeout) .await .context("send and confirm bulk tx")?; trace!("Done sending {} transaction.", txs.len()); diff --git a/bench/src/benches/confirmation_slot.rs b/bench/src/benches/confirmation_slot.rs index 85160cf2..7c8ea3ee 100644 --- a/bench/src/benches/confirmation_slot.rs +++ b/bench/src/benches/confirmation_slot.rs @@ -1,4 +1,5 @@ use std::path::Path; +use std::str::FromStr; use std::time::Duration; use crate::benches::rpc_interface::{ @@ -13,8 +14,10 @@ use solana_rpc_client::nonblocking::rpc_client::RpcClient; use solana_sdk::signature::{read_keypair_file, Signature, Signer}; use solana_sdk::transaction::VersionedTransaction; use solana_sdk::{commitment_config::CommitmentConfig, signature::Keypair}; +use solana_sdk::pubkey::Pubkey; use tokio::time::{sleep, Instant}; use url::Url; +use crate::benches::tx_status_websocket_collector::start_tx_status_collector; #[derive(Clone, Copy, Debug, Default)] pub struct Metric { @@ -46,6 +49,7 @@ pub async fn confirmation_slot( payer_path: &Path, rpc_a_url: String, rpc_b_url: String, + tx_status_websocket_addr: String, tx_params: BenchmarkTransactionParams, max_timeout: Duration, num_of_runs: usize, @@ -66,12 +70,20 @@ pub async fn confirmation_slot( let mut rng = create_rng(None); let payer = read_keypair_file(payer_path).expect("payer file"); - info!("Payer: {}", payer.pubkey().to_string()); + let payer_pubkey = payer.pubkey(); + info!("Payer: {}", payer_pubkey.to_string()); // let mut ping_thing_tasks = vec![]; + // FIXME + // let (tx_status_map, jh_collector) = start_tx_status_collector(Url::parse(&tx_status_websocket_addr).unwrap(), payer.pubkey(), CommitmentConfig::confirmed()).await; + for _ in 0..num_of_runs { let rpc_a = create_rpc_client(&rpc_a_url); let rpc_b = create_rpc_client(&rpc_b_url); + + let tx_status_websocket_addr_a = Url::parse(&tx_status_websocket_addr).expect("Invalid URL"); + let tx_status_websocket_addr_b = Url::parse(&tx_status_websocket_addr).expect("Invalid URL"); + // measure network time to reach the respective RPC endpoints, // used to mitigate the difference in distance by delaying the txn sending let time_a = rpc_roundtrip_duration(&rpc_a).await?.as_secs_f64(); @@ -95,13 +107,13 @@ pub async fn confirmation_slot( let a_task = tokio::spawn(async move { sleep(Duration::from_secs_f64(a_delay)).await; debug!("(A) sending tx {}", rpc_a_tx.signatures[0]); - send_and_confirm_transaction(&rpc_a, rpc_a_tx, max_timeout).await + send_and_confirm_transaction(&rpc_a, tx_status_websocket_addr_a, payer_pubkey, rpc_a_tx, max_timeout).await }); let b_task = tokio::spawn(async move { sleep(Duration::from_secs_f64(b_delay)).await; debug!("(B) sending tx {}", rpc_b_tx.signatures[0]); - send_and_confirm_transaction(&rpc_b, rpc_b_tx, max_timeout).await + send_and_confirm_transaction(&rpc_b, tx_status_websocket_addr_b, payer_pubkey, rpc_b_tx, max_timeout).await }); let (a, b) = tokio::join!(a_task, b_task); @@ -156,11 +168,13 @@ async fn create_tx( async fn send_and_confirm_transaction( rpc: &RpcClient, + tx_status_websocket_addr: Url, + payer_pubkey: Pubkey, tx: VersionedTransaction, max_timeout: Duration, ) -> anyhow::Result { let result_vec: Vec<(Signature, ConfirmationResponseFromRpc)> = - send_and_confirm_bulk_transactions(rpc, &[tx], max_timeout).await?; + send_and_confirm_bulk_transactions(rpc, tx_status_websocket_addr, payer_pubkey, &[tx], max_timeout).await?; assert_eq!(result_vec.len(), 1, "expected 1 result"); let (_sig, confirmation_response) = result_vec.into_iter().next().unwrap(); diff --git a/bench/src/benches/mod.rs b/bench/src/benches/mod.rs index c560a93c..bac4ce38 100644 --- a/bench/src/benches/mod.rs +++ b/bench/src/benches/mod.rs @@ -2,3 +2,4 @@ pub mod api_load; pub mod confirmation_rate; pub mod confirmation_slot; pub mod rpc_interface; +mod tx_status_websocket_collector; diff --git a/bench/src/benches/rpc_interface.rs b/bench/src/benches/rpc_interface.rs index 591db446..87be37e3 100644 --- a/bench/src/benches/rpc_interface.rs +++ b/bench/src/benches/rpc_interface.rs @@ -2,7 +2,7 @@ use anyhow::{bail, Context, Error}; use futures::future::join_all; use futures::TryFutureExt; use itertools::Itertools; -use log::{debug, trace, warn}; +use log::{debug, info, trace, warn}; use solana_rpc_client::nonblocking::rpc_client::RpcClient; use solana_rpc_client::rpc_client::SerializableTransaction; use solana_rpc_client_api::client_error::ErrorKind; @@ -14,10 +14,19 @@ use solana_sdk::transaction::VersionedTransaction; use solana_transaction_status::TransactionConfirmationStatus; use std::collections::{HashMap, HashSet}; use std::iter::zip; +use std::str::FromStr; use std::sync::Arc; use std::time::Duration; -use tokio::time::Instant; +use dashmap::mapref::multiple::RefMulti; +use scopeguard::defer; +use serde::{Deserialize, Serialize}; +use serde_json::json; +use solana_rpc_client_api::response::{Response, RpcBlockUpdate, RpcResponseContext, SlotUpdate}; +use solana_sdk::pubkey::Pubkey; +use tokio::time::{Instant, timeout}; use url::Url; +use websocket_tungstenite_retry::websocket_stable::WsMessage; +use crate::benches::tx_status_websocket_collector::start_tx_status_collector; pub fn create_rpc_client(rpc_url: &Url) -> RpcClient { RpcClient::new_with_commitment(rpc_url.to_string(), CommitmentConfig::confirmed()) @@ -28,7 +37,7 @@ pub enum ConfirmationResponseFromRpc { // RPC error on send_transaction SendError(Arc), // (sent slot at confirmed commitment, confirmed slot, ..., ...) - // transaction_confirmation_status is "confirmed" or "finalized" + // transaction_confirmation_status is "confirmed" (finalized is not reported by blockSubscribe websocket Success(Slot, Slot, TransactionConfirmationStatus, Duration), // timout waiting for confirmation status Timeout(Duration), @@ -36,6 +45,8 @@ pub enum ConfirmationResponseFromRpc { pub async fn send_and_confirm_bulk_transactions( rpc_client: &RpcClient, + tx_status_websocket_addr: Url, + payer_pubkey: Pubkey, txs: &[VersionedTransaction], max_timeout: Duration, ) -> anyhow::Result> { @@ -53,6 +64,10 @@ pub async fn send_and_confirm_bulk_transactions( min_context_slot: None, }; + // note: we get confirmed but never finaliized + let (tx_status_map, jh_collector) = start_tx_status_collector(tx_status_websocket_addr.clone(), payer_pubkey, CommitmentConfig::confirmed()).await; + defer!(jh_collector.abort()); + let started_at = Instant::now(); trace!( "Sending {} transactions via RPC (retries=off) ..", @@ -99,15 +114,16 @@ pub async fn send_and_confirm_bulk_transactions( trace!("- tx_fail {}", tx_sig.get_signature()); } } + let elapsed = started_at.elapsed(); debug!( "{} transactions sent successfully in {:.02}ms", num_sent_ok, - started_at.elapsed().as_secs_f32() * 1000.0 + elapsed.as_secs_f32() * 1000.0 ); debug!( "{} transactions failed to send in {:.02}ms", num_sent_failed, - started_at.elapsed().as_secs_f32() * 1000.0 + elapsed.as_secs_f32() * 1000.0 ); if num_sent_failed > 0 { @@ -132,75 +148,63 @@ pub async fn send_and_confirm_bulk_transactions( let started_at = Instant::now(); let timeout_at = started_at + max_timeout; + // "poll" the status dashmap 'polling_loop: for iteration in 1.. { - let iteration_ends_at = started_at + Duration::from_millis(iteration * 400); + let iteration_ends_at = started_at + Duration::from_millis(iteration * 100); assert_eq!( pending_status_set.len() + result_status_map.len(), num_sent_ok, "Items must move between pending+result" ); - let tx_batch = pending_status_set.iter().cloned().collect_vec(); - debug!( - "Request status for batch of remaining {} transactions in iteration {}", - tx_batch.len(), - iteration - ); + // let tx_batch = pending_status_set.iter().cloned().collect_vec(); + // debug!( + // "Request status for batch of remaining {} transactions in iteration {}", + // tx_batch.len(), + // iteration + // ); - let status_started_at = Instant::now(); - let mut batch_status = Vec::new(); - // "Too many inputs provided; max 256" - for chunk in tx_batch.chunks(256) { - // fail hard if not possible to poll status - let chunk_responses = rpc_client - .get_signature_statuses(chunk) - .await - .expect("get signature statuses"); - batch_status.extend(chunk_responses.value); - } - if status_started_at.elapsed() > Duration::from_millis(500) { - warn!( - "SLOW get_signature_statuses for {} transactions took {:?}", - tx_batch.len(), - status_started_at.elapsed() - ); - } + // let status_started_at = Instant::now(); + // let mut batch_status = Vec::new(); + // // "Too many inputs provided; max 256" + // for chunk in tx_batch.chunks(256) { + // // fail hard if not possible to poll status + // let chunk_responses = rpc_client + // .get_signature_statuses(chunk) + // .await + // .expect("get signature statuses"); + // batch_status.extend(chunk_responses.value); + // } + // if status_started_at.elapsed() > Duration::from_millis(500) { + // warn!( + // "SLOW get_signature_statuses for {} transactions took {:?}", + // tx_batch.len(), + // status_started_at.elapsed() + // ); + // } let elapsed = started_at.elapsed(); - for (tx_sig, status_response) in zip(tx_batch, batch_status) { - match status_response { - Some(tx_status) => { - trace!( - "Some signature status {:?} received for {} after {:.02}ms", - tx_status.confirmation_status, - tx_sig, - elapsed.as_secs_f32() * 1000.0 - ); - if !tx_status.satisfies_commitment(CommitmentConfig::confirmed()) { - continue 'polling_loop; - } - // status is confirmed or finalized - pending_status_set.remove(&tx_sig); - let prev_value = result_status_map.insert( - tx_sig, - ConfirmationResponseFromRpc::Success( - send_slot, - tx_status.slot, - tx_status.confirmation_status(), - elapsed, - ), - ); - assert!(prev_value.is_none(), "Must not override existing value"); - } - None => { - // None: not yet processed by the cluster - trace!( - "No signature status was received for {} after {:.02}ms - continue waiting", - tx_sig, - elapsed.as_secs_f32() * 1000.0 - ); - } + for multi in tx_status_map.iter() { + // note that we will see tx_sigs we did not send + let (tx_sig, confirmed_slot) = multi.pair(); + + // status is confirmed or finalized + if pending_status_set.remove(&tx_sig) { + trace!("take status for sig {:?} and confirmed_slot: {:?} from websocket source", tx_sig, confirmed_slot); + let prev_value = result_status_map.insert( + tx_sig.clone(), + ConfirmationResponseFromRpc::Success( + send_slot, + *confirmed_slot, + // note: this is not optimal + TransactionConfirmationStatus::Confirmed, + elapsed, + ), + ); + assert!(prev_value.is_none(), "Must not override existing value"); } - } + + } // -- END for tx_status_map loop + if pending_status_set.is_empty() { debug!( diff --git a/bench/src/benches/tx_status_websocket_collector.rs b/bench/src/benches/tx_status_websocket_collector.rs new file mode 100644 index 00000000..6a7a7fe2 --- /dev/null +++ b/bench/src/benches/tx_status_websocket_collector.rs @@ -0,0 +1,80 @@ +use std::str::FromStr; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use dashmap::{DashMap, DashSet}; +use log::info; +use serde_json::json; +use solana_rpc_client_api::response::{Response, RpcBlockUpdate}; +use solana_sdk::clock::Slot; +use solana_sdk::commitment_config::CommitmentConfig; +use solana_sdk::pubkey::Pubkey; +use solana_sdk::signature::Signature; +use tokio::task::AbortHandle; +use tokio::time::sleep; +use url::Url; +use websocket_tungstenite_retry::websocket_stable; +use websocket_tungstenite_retry::websocket_stable::WsMessage; + +// TODO void race condition when this is not started up fast enough to catch transaction +// returns map of transaction signatures to the slot they were confirmed +pub async fn start_tx_status_collector(ws_url: Url, payer_pubkey: Pubkey, commitment_config: CommitmentConfig) + -> (Arc>, AbortHandle) { + // e.g. "commitment" + let commitment_str = format!("{:?}", commitment_config); + + let mut web_socket_slots = websocket_stable::StableWebSocket::new_with_timeout( + ws_url, + json!({ + "jsonrpc": "2.0", + "id": "1", + "method": "blockSubscribe", + "params": [ + { + "mentionsAccountOrProgram": payer_pubkey.to_string(), + }, + { + "commitment": commitment_str, + "encoding": "base64", + "showRewards": false, + "transactionDetails": "signatures" + } + ] + }), + Duration::from_secs(10), + ).await + .expect("Failed to connect to websocket"); + + + let mut channel = web_socket_slots.subscribe_message_channel(); + + let observed_transactions: Arc> = Arc::new(DashMap::with_capacity(64)); + + let observed_transactions_write = observed_transactions.clone(); + let jh = tokio::spawn(async move { + let started_at = Instant::now(); + + while let Ok(msg) = channel.recv().await { + // TOOD use this to know when we are subscribed + info!("SOME MESSGE FROM SUbsCRIPTN"); + if let WsMessage::Text(payload) = msg { + let ws_result: jsonrpsee_types::SubscriptionResponse> = + serde_json::from_str(&payload).unwrap(); + let block_update = ws_result.params.result; + let slot = block_update.value.slot; + if let Some(tx_sigs_from_block) = block_update.value.block.and_then(|b| b.signatures) { + for tx_sig in tx_sigs_from_block { + let tx_sig = Signature::from_str(&tx_sig).unwrap(); + info!("Transaction signature: {} _> slot {}", tx_sig, slot); + info!("status map size: {} - up {:?}", observed_transactions_write.len(), started_at.elapsed()); + observed_transactions_write.entry(tx_sig).or_insert(slot); + } + } + } + } + }); + + // FIXME avoid race condition + sleep(Duration::from_secs(3)).await; + + (observed_transactions, jh.abort_handle()) +} \ No newline at end of file diff --git a/bench/src/service_adapter_new.rs b/bench/src/service_adapter_new.rs index 06d71d14..d8583c3b 100644 --- a/bench/src/service_adapter_new.rs +++ b/bench/src/service_adapter_new.rs @@ -7,10 +7,12 @@ use solana_rpc_client::nonblocking::rpc_client::RpcClient; use solana_sdk::signature::Keypair; use std::sync::Arc; use std::time::Duration; +use url::Url; pub async fn benchnew_confirmation_rate_servicerunner( bench_config: &BenchConfig, rpc_addr: String, + tx_status_websocket_addr: String, funded_payer: Keypair, ) -> confirmation_rate::Metric { let rpc = Arc::new(RpcClient::new(rpc_addr)); @@ -21,6 +23,7 @@ pub async fn benchnew_confirmation_rate_servicerunner( let max_timeout = Duration::from_secs(60); let result = send_bulk_txs_and_wait( &rpc, + Url::parse(&tx_status_websocket_addr).expect("Invalid URL"), &funded_payer, bench_config.tx_count, &tx_params, diff --git a/benchrunner-service/src/args.rs b/benchrunner-service/src/args.rs index 5e4a25ea..801c829c 100644 --- a/benchrunner-service/src/args.rs +++ b/benchrunner-service/src/args.rs @@ -6,6 +6,7 @@ pub struct TenantConfig { // technical identifier for the tenant, e.g. "solana-rpc" pub tenant_id: String, pub rpc_addr: String, + pub tx_status_ws_addr: String, } // recommend to use one payer keypair for all targets and fund that keypair with enough SOL diff --git a/benchrunner-service/src/main.rs b/benchrunner-service/src/main.rs index fea54323..2e5d2678 100644 --- a/benchrunner-service/src/main.rs +++ b/benchrunner-service/src/main.rs @@ -23,6 +23,7 @@ use std::str::FromStr; use std::sync::Arc; use std::time::{Duration, SystemTime}; use tokio::sync::OnceCell; +use solana_lite_rpc_util::obfuscate_rpcurl; #[tokio::main] async fn main() { @@ -42,10 +43,13 @@ async fn main() { let funded_payer = Arc::new(get_funded_payer_from_env()); let tenant_configs = read_tenant_configs(std::env::vars().collect::>()); + // this should point to a reliable websocket RPC node + let tx_status_websocket_addr: String = std::env::var("TX_STATUS_WS_ADDR").expect("need TX_STATUS_WS_ADDR env var"); info!("Use postgres config: {:?}", postgres_config.is_some()); info!("Use prio fees: [{}]", prio_fees.iter().join(",")); info!("Start running benchmarks every {:?}", bench_interval); + info!("Use websocket for tx status: {:?}", obfuscate_rpcurl(&tx_status_websocket_addr)); info!( "Found tenants: {}", tenant_configs.iter().map(|tc| &tc.tenant_id).join(", ") @@ -87,6 +91,7 @@ async fn main() { let tenant_id = tenant_config.tenant_id.clone(); let postgres_session = postgres_session.clone(); let tenant_config = tenant_config.clone(); + let tx_status_websocket_addr = tx_status_websocket_addr.clone(); let bench_configs = bench_configs.clone(); let jh_runner = tokio::spawn(async move { let mut interval = tokio::time::interval(bench_interval); @@ -103,6 +108,7 @@ async fn main() { 0 => Box::new(BenchRunnerConfirmationRateImpl { benchrun_at, tenant_config: tenant_config.clone(), + tx_status_websocket_addr: tx_status_websocket_addr.clone(), bench_config: bench_config.clone(), funded_payer: funded_payer.clone(), metric: OnceCell::new(), @@ -237,6 +243,7 @@ impl BenchTrait for BenchRunnerConfirmationRateImpl {} struct BenchRunnerConfirmationRateImpl { pub benchrun_at: SystemTime, pub tenant_config: TenantConfig, + pub tx_status_websocket_addr: String, pub bench_config: BenchConfig, pub funded_payer: Arc, pub metric: OnceCell, @@ -248,6 +255,7 @@ impl BenchRunner for BenchRunnerConfirmationRateImpl { let metric = bench::service_adapter_new::benchnew_confirmation_rate_servicerunner( &self.bench_config, self.tenant_config.rpc_addr.clone(), + self.tx_status_websocket_addr.clone(), self.funded_payer.insecure_clone(), ) .await; From f1def98652a48a1e91439315f595d31b1c0bd355 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Wed, 19 Jun 2024 09:21:47 +0200 Subject: [PATCH 2/9] add ws addr --- bench/src/benches/confirmation_rate.rs | 16 ++++++++++------ bench/src/benches/confirmation_slot.rs | 18 ++++++++++++------ bench/src/benches/rpc_interface.rs | 16 ++++++++-------- .../benches/tx_status_websocket_collector.rs | 8 ++------ bench/src/benchnew.rs | 12 ++++++++++++ benchrunner-service/src/args.rs | 9 +++++++++ benchrunner-service/src/main.rs | 8 +------- 7 files changed, 54 insertions(+), 33 deletions(-) diff --git a/bench/src/benches/confirmation_rate.rs b/bench/src/benches/confirmation_rate.rs index 5b40436d..b739caa4 100644 --- a/bench/src/benches/confirmation_rate.rs +++ b/bench/src/benches/confirmation_rate.rs @@ -12,6 +12,7 @@ use crate::benches::rpc_interface::{ use solana_rpc_client::nonblocking::rpc_client::RpcClient; use solana_sdk::signature::{read_keypair_file, Keypair, Signature, Signer}; use url::Url; +use solana_lite_rpc_util::obfuscate_rpcurl; #[derive(Clone, Copy, Debug, Default, serde::Serialize)] pub struct Metric { @@ -29,7 +30,7 @@ pub struct Metric { pub async fn confirmation_rate( payer_path: &Path, rpc_url: String, - tx_status_websocket_addr: String, + tx_status_websocket_addr: Option, tx_params: BenchmarkTransactionParams, max_timeout: Duration, txs_per_run: usize, @@ -39,8 +40,11 @@ pub async fn confirmation_rate( assert!(num_of_runs > 0, "num_of_runs must be greater than 0"); - let rpc = Arc::new(RpcClient::new(rpc_url)); - info!("RPC: {}", rpc.as_ref().url()); + let rpc = Arc::new(RpcClient::new(rpc_url.clone())); + info!("RPC: {}", obfuscate_rpcurl(&rpc.as_ref().url())); + + let ws_addr = tx_status_websocket_addr.unwrap_or_else(|| rpc_url.replace("http:", "ws:").replace("https:", "wss:")); + info!("WS ADDR: {}", obfuscate_rpcurl(&ws_addr)); let payer: Arc = Arc::new(read_keypair_file(payer_path).unwrap()); info!("Payer: {}", payer.pubkey().to_string()); @@ -48,7 +52,7 @@ pub async fn confirmation_rate( let mut rpc_results = Vec::with_capacity(num_of_runs); for _ in 0..num_of_runs { - match send_bulk_txs_and_wait(&rpc, Url::parse(&tx_status_websocket_addr).expect("Invalid Url"), &payer, txs_per_run, &tx_params, max_timeout) + match send_bulk_txs_and_wait(&rpc, Url::parse(&ws_addr).expect("Invalid Url"), &payer, txs_per_run, &tx_params, max_timeout) .await .context("send bulk tx and wait") { @@ -130,9 +134,9 @@ pub async fn send_bulk_txs_and_wait( } ConfirmationResponseFromRpc::Timeout(elapsed) => { debug!( - "Signature {} not confirmed after {:.02}ms", + "Signature {} not confirmed after {:.03}s", tx_sig, - elapsed.as_secs_f32() * 1000.0 + elapsed.as_secs_f32() ); tx_sent += 1; tx_unconfirmed += 1; diff --git a/bench/src/benches/confirmation_slot.rs b/bench/src/benches/confirmation_slot.rs index 7c8ea3ee..3382d931 100644 --- a/bench/src/benches/confirmation_slot.rs +++ b/bench/src/benches/confirmation_slot.rs @@ -49,7 +49,8 @@ pub async fn confirmation_slot( payer_path: &Path, rpc_a_url: String, rpc_b_url: String, - tx_status_websocket_addr: String, + tx_status_websocket_addr_a: Option, + tx_status_websocket_addr_b: Option, tx_params: BenchmarkTransactionParams, max_timeout: Duration, num_of_runs: usize, @@ -63,6 +64,11 @@ pub async fn confirmation_slot( info!("RPC A: {}", obfuscate_rpcurl(&rpc_a_url)); info!("RPC B: {}", obfuscate_rpcurl(&rpc_b_url)); + let ws_addr_a = tx_status_websocket_addr_a.unwrap_or_else(|| rpc_a_url.replace("http:", "ws:").replace("https:", "wss:")); + let ws_addr_b = tx_status_websocket_addr_b.unwrap_or_else(|| rpc_b_url.replace("http:", "ws:").replace("https:", "wss:")); + let ws_addr_a = Url::parse(&ws_addr_a).expect("Invalid URL"); + let ws_addr_b = Url::parse(&ws_addr_b).expect("Invalid URL"); + let rpc_a_url = Url::parse(&rpc_a_url).map_err(|e| anyhow!("Failed to parse RPC A URL: {}", e))?; let rpc_b_url = @@ -80,9 +86,9 @@ pub async fn confirmation_slot( for _ in 0..num_of_runs { let rpc_a = create_rpc_client(&rpc_a_url); let rpc_b = create_rpc_client(&rpc_b_url); - - let tx_status_websocket_addr_a = Url::parse(&tx_status_websocket_addr).expect("Invalid URL"); - let tx_status_websocket_addr_b = Url::parse(&tx_status_websocket_addr).expect("Invalid URL"); + + let ws_addr_a = ws_addr_a.clone(); + let ws_addr_b = ws_addr_b.clone(); // measure network time to reach the respective RPC endpoints, // used to mitigate the difference in distance by delaying the txn sending @@ -107,13 +113,13 @@ pub async fn confirmation_slot( let a_task = tokio::spawn(async move { sleep(Duration::from_secs_f64(a_delay)).await; debug!("(A) sending tx {}", rpc_a_tx.signatures[0]); - send_and_confirm_transaction(&rpc_a, tx_status_websocket_addr_a, payer_pubkey, rpc_a_tx, max_timeout).await + send_and_confirm_transaction(&rpc_a, ws_addr_a, payer_pubkey, rpc_a_tx, max_timeout).await }); let b_task = tokio::spawn(async move { sleep(Duration::from_secs_f64(b_delay)).await; debug!("(B) sending tx {}", rpc_b_tx.signatures[0]); - send_and_confirm_transaction(&rpc_b, tx_status_websocket_addr_b, payer_pubkey, rpc_b_tx, max_timeout).await + send_and_confirm_transaction(&rpc_b, ws_addr_b, payer_pubkey, rpc_b_tx, max_timeout).await }); let (a, b) = tokio::join!(a_task, b_task); diff --git a/bench/src/benches/rpc_interface.rs b/bench/src/benches/rpc_interface.rs index 87be37e3..3bed54d1 100644 --- a/bench/src/benches/rpc_interface.rs +++ b/bench/src/benches/rpc_interface.rs @@ -116,14 +116,14 @@ pub async fn send_and_confirm_bulk_transactions( } let elapsed = started_at.elapsed(); debug!( - "{} transactions sent successfully in {:.02}ms", + "send_transaction successful for {} txs in {:.03}s", num_sent_ok, - elapsed.as_secs_f32() * 1000.0 + elapsed.as_secs_f32() ); debug!( - "{} transactions failed to send in {:.02}ms", + "send_transaction failed to send {} txs in {:.03}s", num_sent_failed, - elapsed.as_secs_f32() * 1000.0 + elapsed.as_secs_f32() ); if num_sent_failed > 0 { @@ -146,6 +146,7 @@ pub async fn send_and_confirm_bulk_transactions( // items get moved from pending_status_set to result_status_map + debug!("Wating for transaction confirmations .."); let started_at = Instant::now(); let timeout_at = started_at + max_timeout; // "poll" the status dashmap @@ -208,8 +209,7 @@ pub async fn send_and_confirm_bulk_transactions( if pending_status_set.is_empty() { debug!( - "All transactions confirmed after {} iterations / {:?}", - iteration, + "All transactions confirmed after {:?}", started_at.elapsed() ); break 'polling_loop; @@ -217,8 +217,8 @@ pub async fn send_and_confirm_bulk_transactions( if Instant::now() > timeout_at { warn!( - "Timeout waiting for transactions to confirm after {} iterations", - iteration + "Timeout waiting for transactions to confirm after {:?}", + started_at.elapsed() ); break 'polling_loop; } diff --git a/bench/src/benches/tx_status_websocket_collector.rs b/bench/src/benches/tx_status_websocket_collector.rs index 6a7a7fe2..6866fb5d 100644 --- a/bench/src/benches/tx_status_websocket_collector.rs +++ b/bench/src/benches/tx_status_websocket_collector.rs @@ -2,7 +2,7 @@ use std::str::FromStr; use std::sync::Arc; use std::time::{Duration, Instant}; use dashmap::{DashMap, DashSet}; -use log::info; +use log::{info, trace}; use serde_json::json; use solana_rpc_client_api::response::{Response, RpcBlockUpdate}; use solana_sdk::clock::Slot; @@ -55,7 +55,6 @@ pub async fn start_tx_status_collector(ws_url: Url, payer_pubkey: Pubkey, commit while let Ok(msg) = channel.recv().await { // TOOD use this to know when we are subscribed - info!("SOME MESSGE FROM SUbsCRIPTN"); if let WsMessage::Text(payload) = msg { let ws_result: jsonrpsee_types::SubscriptionResponse> = serde_json::from_str(&payload).unwrap(); @@ -64,8 +63,7 @@ pub async fn start_tx_status_collector(ws_url: Url, payer_pubkey: Pubkey, commit if let Some(tx_sigs_from_block) = block_update.value.block.and_then(|b| b.signatures) { for tx_sig in tx_sigs_from_block { let tx_sig = Signature::from_str(&tx_sig).unwrap(); - info!("Transaction signature: {} _> slot {}", tx_sig, slot); - info!("status map size: {} - up {:?}", observed_transactions_write.len(), started_at.elapsed()); + trace!("Transaction signature: {} - slot {}", tx_sig, slot); observed_transactions_write.entry(tx_sig).or_insert(slot); } } @@ -73,8 +71,6 @@ pub async fn start_tx_status_collector(ws_url: Url, payer_pubkey: Pubkey, commit } }); - // FIXME avoid race condition - sleep(Duration::from_secs(3)).await; (observed_transactions, jh.abort_handle()) } \ No newline at end of file diff --git a/bench/src/benchnew.rs b/bench/src/benchnew.rs index 5d5ebf70..a03d5d01 100644 --- a/bench/src/benchnew.rs +++ b/bench/src/benchnew.rs @@ -39,6 +39,8 @@ enum SubCommand { payer_path: PathBuf, #[clap(short, long)] rpc_url: String, + #[clap(short = 'w', long)] + tx_status_websocket_addr: Option, #[clap(short, long)] size_tx: TxSize, /// Maximum confirmation time in milliseconds. After this, the txn is considered unconfirmed @@ -65,6 +67,10 @@ enum SubCommand { #[clap(short, long)] #[arg(short = 'b')] rpc_b: String, + #[clap(long)] + tx_status_websocket_addr_a: Option, + #[clap(long)] + tx_status_websocket_addr_b: Option, #[clap(short, long)] size_tx: TxSize, /// Maximum confirmation time in milliseconds. After this, the txn is considered unconfirmed @@ -108,6 +114,7 @@ async fn main() { SubCommand::ConfirmationRate { payer_path, rpc_url, + tx_status_websocket_addr, size_tx, max_timeout_ms, txs_per_run, @@ -116,6 +123,7 @@ async fn main() { } => confirmation_rate( &payer_path, rpc_url, + tx_status_websocket_addr, BenchmarkTransactionParams { tx_size: size_tx, cu_price_micro_lamports: cu_price, @@ -130,6 +138,8 @@ async fn main() { payer_path, rpc_a, rpc_b, + tx_status_websocket_addr_a, + tx_status_websocket_addr_b, size_tx, max_timeout_ms, num_of_runs, @@ -139,6 +149,8 @@ async fn main() { &payer_path, rpc_a, rpc_b, + tx_status_websocket_addr_a, + tx_status_websocket_addr_b, BenchmarkTransactionParams { tx_size: size_tx, cu_price_micro_lamports: cu_price, diff --git a/benchrunner-service/src/args.rs b/benchrunner-service/src/args.rs index 801c829c..e0f1cc92 100644 --- a/benchrunner-service/src/args.rs +++ b/benchrunner-service/src/args.rs @@ -6,6 +6,7 @@ pub struct TenantConfig { // technical identifier for the tenant, e.g. "solana-rpc" pub tenant_id: String, pub rpc_addr: String, + // needs to point to a reliable websocket server that can be used to get tx status pub tx_status_ws_addr: String, } @@ -51,6 +52,14 @@ pub fn read_tenant_configs(env_vars: Vec<(String, String)>) -> Vec .expect("need RPC_ADDR") .1 .to_string(), + tx_status_ws_addr: v + .iter() + .find(|(v, _)| *v == format!("TENANT{}_TX_STATUS_WS_ADDR", tc)) + .iter() + .exactly_one() + .expect("need TX_STATUS_WS_ADDR") + .1 + .to_string(), }) .collect::>(); diff --git a/benchrunner-service/src/main.rs b/benchrunner-service/src/main.rs index 2e5d2678..edbbcb8d 100644 --- a/benchrunner-service/src/main.rs +++ b/benchrunner-service/src/main.rs @@ -43,13 +43,10 @@ async fn main() { let funded_payer = Arc::new(get_funded_payer_from_env()); let tenant_configs = read_tenant_configs(std::env::vars().collect::>()); - // this should point to a reliable websocket RPC node - let tx_status_websocket_addr: String = std::env::var("TX_STATUS_WS_ADDR").expect("need TX_STATUS_WS_ADDR env var"); info!("Use postgres config: {:?}", postgres_config.is_some()); info!("Use prio fees: [{}]", prio_fees.iter().join(",")); info!("Start running benchmarks every {:?}", bench_interval); - info!("Use websocket for tx status: {:?}", obfuscate_rpcurl(&tx_status_websocket_addr)); info!( "Found tenants: {}", tenant_configs.iter().map(|tc| &tc.tenant_id).join(", ") @@ -91,7 +88,6 @@ async fn main() { let tenant_id = tenant_config.tenant_id.clone(); let postgres_session = postgres_session.clone(); let tenant_config = tenant_config.clone(); - let tx_status_websocket_addr = tx_status_websocket_addr.clone(); let bench_configs = bench_configs.clone(); let jh_runner = tokio::spawn(async move { let mut interval = tokio::time::interval(bench_interval); @@ -108,7 +104,6 @@ async fn main() { 0 => Box::new(BenchRunnerConfirmationRateImpl { benchrun_at, tenant_config: tenant_config.clone(), - tx_status_websocket_addr: tx_status_websocket_addr.clone(), bench_config: bench_config.clone(), funded_payer: funded_payer.clone(), metric: OnceCell::new(), @@ -243,7 +238,6 @@ impl BenchTrait for BenchRunnerConfirmationRateImpl {} struct BenchRunnerConfirmationRateImpl { pub benchrun_at: SystemTime, pub tenant_config: TenantConfig, - pub tx_status_websocket_addr: String, pub bench_config: BenchConfig, pub funded_payer: Arc, pub metric: OnceCell, @@ -255,7 +249,7 @@ impl BenchRunner for BenchRunnerConfirmationRateImpl { let metric = bench::service_adapter_new::benchnew_confirmation_rate_servicerunner( &self.bench_config, self.tenant_config.rpc_addr.clone(), - self.tx_status_websocket_addr.clone(), + self.tenant_config.tx_status_ws_addr.clone(), self.funded_payer.insecure_clone(), ) .await; From c9147d850b5da7e275cf5ea4a25f448243fea454 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Wed, 19 Jun 2024 10:25:10 +0200 Subject: [PATCH 3/9] propagate ws for status --- bench/src/benches/confirmation_slot.rs | 4 +- bench/src/benches/rpc_interface.rs | 29 +-------------- .../benches/tx_status_websocket_collector.rs | 8 +--- bench/src/benchnew.rs | 4 ++ benchrunner-service/README.md | 37 ++++++++++++------- 5 files changed, 33 insertions(+), 49 deletions(-) diff --git a/bench/src/benches/confirmation_slot.rs b/bench/src/benches/confirmation_slot.rs index 3382d931..31c8e444 100644 --- a/bench/src/benches/confirmation_slot.rs +++ b/bench/src/benches/confirmation_slot.rs @@ -68,7 +68,7 @@ pub async fn confirmation_slot( let ws_addr_b = tx_status_websocket_addr_b.unwrap_or_else(|| rpc_b_url.replace("http:", "ws:").replace("https:", "wss:")); let ws_addr_a = Url::parse(&ws_addr_a).expect("Invalid URL"); let ws_addr_b = Url::parse(&ws_addr_b).expect("Invalid URL"); - + let rpc_a_url = Url::parse(&rpc_a_url).map_err(|e| anyhow!("Failed to parse RPC A URL: {}", e))?; let rpc_b_url = @@ -86,7 +86,7 @@ pub async fn confirmation_slot( for _ in 0..num_of_runs { let rpc_a = create_rpc_client(&rpc_a_url); let rpc_b = create_rpc_client(&rpc_b_url); - + let ws_addr_a = ws_addr_a.clone(); let ws_addr_b = ws_addr_b.clone(); diff --git a/bench/src/benches/rpc_interface.rs b/bench/src/benches/rpc_interface.rs index 3bed54d1..9b33b15f 100644 --- a/bench/src/benches/rpc_interface.rs +++ b/bench/src/benches/rpc_interface.rs @@ -26,6 +26,7 @@ use solana_sdk::pubkey::Pubkey; use tokio::time::{Instant, timeout}; use url::Url; use websocket_tungstenite_retry::websocket_stable::WsMessage; +use solana_lite_rpc_util::obfuscate_rpcurl; use crate::benches::tx_status_websocket_collector::start_tx_status_collector; pub fn create_rpc_client(rpc_url: &Url) -> RpcClient { @@ -146,7 +147,7 @@ pub async fn send_and_confirm_bulk_transactions( // items get moved from pending_status_set to result_status_map - debug!("Wating for transaction confirmations .."); + debug!("Waiting for transaction confirmations from websocket source <{}> ..", obfuscate_rpcurl(tx_status_websocket_addr.as_str())); let started_at = Instant::now(); let timeout_at = started_at + max_timeout; // "poll" the status dashmap @@ -157,31 +158,6 @@ pub async fn send_and_confirm_bulk_transactions( num_sent_ok, "Items must move between pending+result" ); - // let tx_batch = pending_status_set.iter().cloned().collect_vec(); - // debug!( - // "Request status for batch of remaining {} transactions in iteration {}", - // tx_batch.len(), - // iteration - // ); - - // let status_started_at = Instant::now(); - // let mut batch_status = Vec::new(); - // // "Too many inputs provided; max 256" - // for chunk in tx_batch.chunks(256) { - // // fail hard if not possible to poll status - // let chunk_responses = rpc_client - // .get_signature_statuses(chunk) - // .await - // .expect("get signature statuses"); - // batch_status.extend(chunk_responses.value); - // } - // if status_started_at.elapsed() > Duration::from_millis(500) { - // warn!( - // "SLOW get_signature_statuses for {} transactions took {:?}", - // tx_batch.len(), - // status_started_at.elapsed() - // ); - // } let elapsed = started_at.elapsed(); for multi in tx_status_map.iter() { @@ -223,7 +199,6 @@ pub async fn send_and_confirm_bulk_transactions( break 'polling_loop; } - // avg 2 samples per slot tokio::time::sleep_until(iteration_ends_at).await; } // -- END polling loop diff --git a/bench/src/benches/tx_status_websocket_collector.rs b/bench/src/benches/tx_status_websocket_collector.rs index 6866fb5d..633b604e 100644 --- a/bench/src/benches/tx_status_websocket_collector.rs +++ b/bench/src/benches/tx_status_websocket_collector.rs @@ -2,7 +2,7 @@ use std::str::FromStr; use std::sync::Arc; use std::time::{Duration, Instant}; use dashmap::{DashMap, DashSet}; -use log::{info, trace}; +use log::{debug, info, trace}; use serde_json::json; use solana_rpc_client_api::response::{Response, RpcBlockUpdate}; use solana_sdk::clock::Slot; @@ -15,7 +15,6 @@ use url::Url; use websocket_tungstenite_retry::websocket_stable; use websocket_tungstenite_retry::websocket_stable::WsMessage; -// TODO void race condition when this is not started up fast enough to catch transaction // returns map of transaction signatures to the slot they were confirmed pub async fn start_tx_status_collector(ws_url: Url, payer_pubkey: Pubkey, commitment_config: CommitmentConfig) -> (Arc>, AbortHandle) { @@ -44,7 +43,6 @@ pub async fn start_tx_status_collector(ws_url: Url, payer_pubkey: Pubkey, commit ).await .expect("Failed to connect to websocket"); - let mut channel = web_socket_slots.subscribe_message_channel(); let observed_transactions: Arc> = Arc::new(DashMap::with_capacity(64)); @@ -54,7 +52,6 @@ pub async fn start_tx_status_collector(ws_url: Url, payer_pubkey: Pubkey, commit let started_at = Instant::now(); while let Ok(msg) = channel.recv().await { - // TOOD use this to know when we are subscribed if let WsMessage::Text(payload) = msg { let ws_result: jsonrpsee_types::SubscriptionResponse> = serde_json::from_str(&payload).unwrap(); @@ -63,7 +60,7 @@ pub async fn start_tx_status_collector(ws_url: Url, payer_pubkey: Pubkey, commit if let Some(tx_sigs_from_block) = block_update.value.block.and_then(|b| b.signatures) { for tx_sig in tx_sigs_from_block { let tx_sig = Signature::from_str(&tx_sig).unwrap(); - trace!("Transaction signature: {} - slot {}", tx_sig, slot); + debug!("Transaction signature found in block: {} - slot {}", tx_sig, slot); observed_transactions_write.entry(tx_sig).or_insert(slot); } } @@ -71,6 +68,5 @@ pub async fn start_tx_status_collector(ws_url: Url, payer_pubkey: Pubkey, commit } }); - (observed_transactions, jh.abort_handle()) } \ No newline at end of file diff --git a/bench/src/benchnew.rs b/bench/src/benchnew.rs index a03d5d01..f25bb743 100644 --- a/bench/src/benchnew.rs +++ b/bench/src/benchnew.rs @@ -39,6 +39,10 @@ enum SubCommand { payer_path: PathBuf, #[clap(short, long)] rpc_url: String, + /// Set websocket source (blockSubscribe method) for transaction status updates. + /// You might want to send tx to one RPC and listen to another (reliable) RPC for status updates. + /// Not all RPC nodes support this method. + /// If not provided, the RPC URL is used to derive the websocket URL. #[clap(short = 'w', long)] tx_status_websocket_addr: Option, #[clap(short, long)] diff --git a/benchrunner-service/README.md b/benchrunner-service/README.md index c43b9786..51ba9938 100644 --- a/benchrunner-service/README.md +++ b/benchrunner-service/README.md @@ -6,25 +6,34 @@ Hardware: recommend 1024MB RAM, 2 vCPUs, small disk ### Environment Variables -| Environment Variable | Purpose | Required? | Default Value | -|----------------------|-------------------------------------------------------|---------------|---------------| -| `PG_ENABLED` | Enable writing to PostgreSQL | No | false | -| `PG_CONFIG` | PostgreSQL connection string | if PG_ENABLED | | -| `TENANT1_ID` | Technical ID for the tenant | Yes | | -| `TENANT1_RPC_ADDR` | RPC address for the target RPC node | Yes | | -| `TENANT2_.. | more tenants can be added using TENANT2, TENANT3, ... | | | +| Environment Variable | Purpose | Required? | Default Value | +|----------------------|-------------------------------------------------------|--------------|--------------------------------------------| +| `PG_ENABLED` | Enable writing to PostgreSQL | No | false | +| `PG_CONFIG` | PostgreSQL connection string | if PG_ENABLED | | +| `TENANT1_ID` | Technical ID for the tenant | Yes | | +| `TENANT1_RPC_ADDR` | RPC address for the target RPC node | Yes | | +| `TENANT1_TX_STATUS_WS_ADDR` | Websocket source for tx status |No | RPC Url
(replacing schema with ws/wss) | +| `TENANT2_.. | more tenants can be added using TENANT2, TENANT3, ... | | | ### Command-line Arguments ``` Options: - -b, --bench-interval - interval in milliseconds to run the benchmark [default: 60000] - -n, --tx-count - [default: 10] + -p, --payer-path + + -r, --rpc-url + + -w, --tx-status-websocket-addr + Set websocket source (blockSubscribe method) for transaction status updates. You might want to send tx to one RPC and listen to another (reliable) RPC for status updates. Not all RPC nodes support this method. If not provided, the RPC URL is used to derive the websocket URL -s, --size-tx - [default: small] [possible values: small, large] - -p, --prio-fees - [default: 0] + [possible values: small, large] + -m, --max-timeout-ms + Maximum confirmation time in milliseconds. After this, the txn is considered unconfirmed [default: 15000] + -t, --txs-per-run + + -n, --num-of-runs + + -f, --cu-price + The CU price in micro lamports [default: 300] ``` ```bash From c01fc3efedd513dc9820fc2c48f371c6d589c2ef Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Wed, 19 Jun 2024 11:11:48 +0200 Subject: [PATCH 4/9] cleanup --- Cargo.lock | 1 - bench/Cargo.toml | 1 - bench/src/benches/rpc_interface.rs | 4 +--- bench/src/benches/tx_status_websocket_collector.rs | 8 ++++++-- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a28248e9..a1e3ff6d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -584,7 +584,6 @@ dependencies = [ "rand 0.8.5", "rand_chacha 0.3.1", "reqwest", - "scopeguard", "serde", "serde_json", "solana-lite-rpc-util", diff --git a/bench/Cargo.toml b/bench/Cargo.toml index 193cfe79..ebd586a7 100644 --- a/bench/Cargo.toml +++ b/bench/Cargo.toml @@ -42,7 +42,6 @@ spl-memo = "4.0.0" url = "*" reqwest = "0.11.26" lazy_static = "1.4.0" -scopeguard = "1.2.0" [dev-dependencies] bincode = { workspace = true } diff --git a/bench/src/benches/rpc_interface.rs b/bench/src/benches/rpc_interface.rs index 9b33b15f..f25f7c86 100644 --- a/bench/src/benches/rpc_interface.rs +++ b/bench/src/benches/rpc_interface.rs @@ -18,7 +18,6 @@ use std::str::FromStr; use std::sync::Arc; use std::time::Duration; use dashmap::mapref::multiple::RefMulti; -use scopeguard::defer; use serde::{Deserialize, Serialize}; use serde_json::json; use solana_rpc_client_api::response::{Response, RpcBlockUpdate, RpcResponseContext, SlotUpdate}; @@ -66,8 +65,7 @@ pub async fn send_and_confirm_bulk_transactions( }; // note: we get confirmed but never finaliized - let (tx_status_map, jh_collector) = start_tx_status_collector(tx_status_websocket_addr.clone(), payer_pubkey, CommitmentConfig::confirmed()).await; - defer!(jh_collector.abort()); + let (tx_status_map, _jh_collector) = start_tx_status_collector(tx_status_websocket_addr.clone(), payer_pubkey, CommitmentConfig::confirmed()).await; let started_at = Instant::now(); trace!( diff --git a/bench/src/benches/tx_status_websocket_collector.rs b/bench/src/benches/tx_status_websocket_collector.rs index 633b604e..c047589c 100644 --- a/bench/src/benches/tx_status_websocket_collector.rs +++ b/bench/src/benches/tx_status_websocket_collector.rs @@ -47,7 +47,7 @@ pub async fn start_tx_status_collector(ws_url: Url, payer_pubkey: Pubkey, commit let observed_transactions: Arc> = Arc::new(DashMap::with_capacity(64)); - let observed_transactions_write = observed_transactions.clone(); + let observed_transactions_write = Arc::downgrade(&observed_transactions); let jh = tokio::spawn(async move { let started_at = Instant::now(); @@ -57,11 +57,15 @@ pub async fn start_tx_status_collector(ws_url: Url, payer_pubkey: Pubkey, commit serde_json::from_str(&payload).unwrap(); let block_update = ws_result.params.result; let slot = block_update.value.slot; + let Some(map) = observed_transactions_write.upgrade() else { + debug!("observed_transactions map dropped - stopping task"); + return; + }; if let Some(tx_sigs_from_block) = block_update.value.block.and_then(|b| b.signatures) { for tx_sig in tx_sigs_from_block { let tx_sig = Signature::from_str(&tx_sig).unwrap(); debug!("Transaction signature found in block: {} - slot {}", tx_sig, slot); - observed_transactions_write.entry(tx_sig).or_insert(slot); + map.entry(tx_sig).or_insert(slot); } } } From 04580abbe7e6aa6a7b76d818fe5224b6cbaa8ac1 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Thu, 20 Jun 2024 13:23:54 +0200 Subject: [PATCH 5/9] imrpove error message --- benchrunner-service/src/args.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/benchrunner-service/src/args.rs b/benchrunner-service/src/args.rs index e0f1cc92..9a4cf31d 100644 --- a/benchrunner-service/src/args.rs +++ b/benchrunner-service/src/args.rs @@ -41,7 +41,7 @@ pub fn read_tenant_configs(env_vars: Vec<(String, String)>) -> Vec .find(|(v, _)| *v == format!("TENANT{}_ID", tc)) .iter() .exactly_one() - .expect("need ID") + .expect("need TENANT_X_ID") .1 .to_string(), rpc_addr: v @@ -49,7 +49,7 @@ pub fn read_tenant_configs(env_vars: Vec<(String, String)>) -> Vec .find(|(v, _)| *v == format!("TENANT{}_RPC_ADDR", tc)) .iter() .exactly_one() - .expect("need RPC_ADDR") + .expect("need TENANT_X_RPC_ADDR") .1 .to_string(), tx_status_ws_addr: v @@ -57,7 +57,7 @@ pub fn read_tenant_configs(env_vars: Vec<(String, String)>) -> Vec .find(|(v, _)| *v == format!("TENANT{}_TX_STATUS_WS_ADDR", tc)) .iter() .exactly_one() - .expect("need TX_STATUS_WS_ADDR") + .expect("need TENANT_X_TX_STATUS_WS_ADDR") .1 .to_string(), }) From 819164671b264b2949aad740b02fc5f8142856d5 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Thu, 20 Jun 2024 13:31:53 +0200 Subject: [PATCH 6/9] improve ws_addr arg handling --- bench/src/service_adapter_new.rs | 9 ++++++--- benchrunner-service/src/args.rs | 7 +++---- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/bench/src/service_adapter_new.rs b/bench/src/service_adapter_new.rs index d8583c3b..15547e7f 100644 --- a/bench/src/service_adapter_new.rs +++ b/bench/src/service_adapter_new.rs @@ -12,18 +12,21 @@ use url::Url; pub async fn benchnew_confirmation_rate_servicerunner( bench_config: &BenchConfig, rpc_addr: String, - tx_status_websocket_addr: String, + tx_status_websocket_addr: Option, funded_payer: Keypair, ) -> confirmation_rate::Metric { - let rpc = Arc::new(RpcClient::new(rpc_addr)); + let rpc = Arc::new(RpcClient::new(rpc_addr.clone())); let tx_params = BenchmarkTransactionParams { tx_size: bench_config.tx_size, cu_price_micro_lamports: bench_config.cu_price_micro_lamports, }; let max_timeout = Duration::from_secs(60); + + let ws_addr = tx_status_websocket_addr.unwrap_or_else(|| rpc_addr.replace("http:", "ws:").replace("https:", "wss:")); + let result = send_bulk_txs_and_wait( &rpc, - Url::parse(&tx_status_websocket_addr).expect("Invalid URL"), + Url::parse(&ws_addr).expect("Invalid WS URL"), &funded_payer, bench_config.tx_count, &tx_params, diff --git a/benchrunner-service/src/args.rs b/benchrunner-service/src/args.rs index 9a4cf31d..61d4ca6a 100644 --- a/benchrunner-service/src/args.rs +++ b/benchrunner-service/src/args.rs @@ -7,7 +7,7 @@ pub struct TenantConfig { pub tenant_id: String, pub rpc_addr: String, // needs to point to a reliable websocket server that can be used to get tx status - pub tx_status_ws_addr: String, + pub tx_status_ws_addr: Option, } // recommend to use one payer keypair for all targets and fund that keypair with enough SOL @@ -56,10 +56,9 @@ pub fn read_tenant_configs(env_vars: Vec<(String, String)>) -> Vec .iter() .find(|(v, _)| *v == format!("TENANT{}_TX_STATUS_WS_ADDR", tc)) .iter() - .exactly_one() + .at_most_one() .expect("need TENANT_X_TX_STATUS_WS_ADDR") - .1 - .to_string(), + .map(|(_, v)| v.to_string()) }) .collect::>(); From a2dee849bd9828ba98a6a865dd476f81ef03f3b1 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Thu, 20 Jun 2024 14:30:07 +0200 Subject: [PATCH 7/9] rename column average_slot_confirmation_time_ms --- benchrunner-service/src/postgres/metrics_dbstore.rs | 2 +- migrations/create_benchrunner.sql | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/benchrunner-service/src/postgres/metrics_dbstore.rs b/benchrunner-service/src/postgres/metrics_dbstore.rs index 55054ccf..b2784e83 100644 --- a/benchrunner-service/src/postgres/metrics_dbstore.rs +++ b/benchrunner-service/src/postgres/metrics_dbstore.rs @@ -134,7 +134,7 @@ impl BenchMetricsPostgresSaver for BenchRunnerConfirmationRateImpl { txs_confirmed, txs_un_confirmed, average_confirmation_time_ms, - average_slot_confirmation_time_ms, + average_slot_confirmation_time, metric_json ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) diff --git a/migrations/create_benchrunner.sql b/migrations/create_benchrunner.sql index 240b3a5b..3aff4d65 100644 --- a/migrations/create_benchrunner.sql +++ b/migrations/create_benchrunner.sql @@ -47,3 +47,7 @@ GRANT SELECT ON ALL TABLES IN SCHEMA benchrunner TO ro_benchrunner; ALTER DEFAULT PRIVILEGES IN SCHEMA benchrunner GRANT SELECT ON TABLES TO ro_benchrunner; ALTER TABLE benchrunner.bench_metrics RENAME TO bench_metrics_bench1; + +ALTER TABLE benchrunner.bench_metrics_confirmation_rate ADD COLUMN bench_metrics_confirmation_rate real; + +ALTER TABLE benchrunner.bench_metrics_confirmation_rate DROP COLUMN average_slot_confirmation_time_ms; From 95fb7089cc480b1db88f50f5c901f1f80a3eafaa Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Thu, 27 Jun 2024 11:55:08 +0200 Subject: [PATCH 8/9] fix table name --- migrations/create_benchrunner.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/migrations/create_benchrunner.sql b/migrations/create_benchrunner.sql index 3aff4d65..f2135070 100644 --- a/migrations/create_benchrunner.sql +++ b/migrations/create_benchrunner.sql @@ -48,6 +48,6 @@ ALTER DEFAULT PRIVILEGES IN SCHEMA benchrunner GRANT SELECT ON TABLES TO ro_benc ALTER TABLE benchrunner.bench_metrics RENAME TO bench_metrics_bench1; -ALTER TABLE benchrunner.bench_metrics_confirmation_rate ADD COLUMN bench_metrics_confirmation_rate real; +ALTER TABLE benchrunner.bench_metrics_confirmation_rate ADD COLUMN average_slot_confirmation_time real; ALTER TABLE benchrunner.bench_metrics_confirmation_rate DROP COLUMN average_slot_confirmation_time_ms; From ff82bd69a250baec9e5cb55e18ae19d05b696491 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Thu, 27 Jun 2024 11:58:38 +0200 Subject: [PATCH 9/9] improve comment --- bench/src/benches/rpc_interface.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bench/src/benches/rpc_interface.rs b/bench/src/benches/rpc_interface.rs index f25f7c86..84f9b996 100644 --- a/bench/src/benches/rpc_interface.rs +++ b/bench/src/benches/rpc_interface.rs @@ -162,7 +162,7 @@ pub async fn send_and_confirm_bulk_transactions( // note that we will see tx_sigs we did not send let (tx_sig, confirmed_slot) = multi.pair(); - // status is confirmed or finalized + // status is confirmed if pending_status_set.remove(&tx_sig) { trace!("take status for sig {:?} and confirmed_slot: {:?} from websocket source", tx_sig, confirmed_slot); let prev_value = result_status_map.insert( @@ -170,7 +170,7 @@ pub async fn send_and_confirm_bulk_transactions( ConfirmationResponseFromRpc::Success( send_slot, *confirmed_slot, - // note: this is not optimal + // note: this is not optimal as we do not cover finalized here TransactionConfirmationStatus::Confirmed, elapsed, ),