From c9147d850b5da7e275cf5ea4a25f448243fea454 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Wed, 19 Jun 2024 10:25:10 +0200 Subject: [PATCH] propagate ws for status --- bench/src/benches/confirmation_slot.rs | 4 +- bench/src/benches/rpc_interface.rs | 29 +-------------- .../benches/tx_status_websocket_collector.rs | 8 +--- bench/src/benchnew.rs | 4 ++ benchrunner-service/README.md | 37 ++++++++++++------- 5 files changed, 33 insertions(+), 49 deletions(-) diff --git a/bench/src/benches/confirmation_slot.rs b/bench/src/benches/confirmation_slot.rs index 3382d931..31c8e444 100644 --- a/bench/src/benches/confirmation_slot.rs +++ b/bench/src/benches/confirmation_slot.rs @@ -68,7 +68,7 @@ pub async fn confirmation_slot( let ws_addr_b = tx_status_websocket_addr_b.unwrap_or_else(|| rpc_b_url.replace("http:", "ws:").replace("https:", "wss:")); let ws_addr_a = Url::parse(&ws_addr_a).expect("Invalid URL"); let ws_addr_b = Url::parse(&ws_addr_b).expect("Invalid URL"); - + let rpc_a_url = Url::parse(&rpc_a_url).map_err(|e| anyhow!("Failed to parse RPC A URL: {}", e))?; let rpc_b_url = @@ -86,7 +86,7 @@ pub async fn confirmation_slot( for _ in 0..num_of_runs { let rpc_a = create_rpc_client(&rpc_a_url); let rpc_b = create_rpc_client(&rpc_b_url); - + let ws_addr_a = ws_addr_a.clone(); let ws_addr_b = ws_addr_b.clone(); diff --git a/bench/src/benches/rpc_interface.rs b/bench/src/benches/rpc_interface.rs index 3bed54d1..9b33b15f 100644 --- a/bench/src/benches/rpc_interface.rs +++ b/bench/src/benches/rpc_interface.rs @@ -26,6 +26,7 @@ use solana_sdk::pubkey::Pubkey; use tokio::time::{Instant, timeout}; use url::Url; use websocket_tungstenite_retry::websocket_stable::WsMessage; +use solana_lite_rpc_util::obfuscate_rpcurl; use crate::benches::tx_status_websocket_collector::start_tx_status_collector; pub fn create_rpc_client(rpc_url: &Url) -> RpcClient { @@ -146,7 +147,7 @@ pub async fn send_and_confirm_bulk_transactions( // items get moved from pending_status_set to result_status_map - debug!("Wating for transaction confirmations .."); + debug!("Waiting for transaction confirmations from websocket source <{}> ..", obfuscate_rpcurl(tx_status_websocket_addr.as_str())); let started_at = Instant::now(); let timeout_at = started_at + max_timeout; // "poll" the status dashmap @@ -157,31 +158,6 @@ pub async fn send_and_confirm_bulk_transactions( num_sent_ok, "Items must move between pending+result" ); - // let tx_batch = pending_status_set.iter().cloned().collect_vec(); - // debug!( - // "Request status for batch of remaining {} transactions in iteration {}", - // tx_batch.len(), - // iteration - // ); - - // let status_started_at = Instant::now(); - // let mut batch_status = Vec::new(); - // // "Too many inputs provided; max 256" - // for chunk in tx_batch.chunks(256) { - // // fail hard if not possible to poll status - // let chunk_responses = rpc_client - // .get_signature_statuses(chunk) - // .await - // .expect("get signature statuses"); - // batch_status.extend(chunk_responses.value); - // } - // if status_started_at.elapsed() > Duration::from_millis(500) { - // warn!( - // "SLOW get_signature_statuses for {} transactions took {:?}", - // tx_batch.len(), - // status_started_at.elapsed() - // ); - // } let elapsed = started_at.elapsed(); for multi in tx_status_map.iter() { @@ -223,7 +199,6 @@ pub async fn send_and_confirm_bulk_transactions( break 'polling_loop; } - // avg 2 samples per slot tokio::time::sleep_until(iteration_ends_at).await; } // -- END polling loop diff --git a/bench/src/benches/tx_status_websocket_collector.rs b/bench/src/benches/tx_status_websocket_collector.rs index 6866fb5d..633b604e 100644 --- a/bench/src/benches/tx_status_websocket_collector.rs +++ b/bench/src/benches/tx_status_websocket_collector.rs @@ -2,7 +2,7 @@ use std::str::FromStr; use std::sync::Arc; use std::time::{Duration, Instant}; use dashmap::{DashMap, DashSet}; -use log::{info, trace}; +use log::{debug, info, trace}; use serde_json::json; use solana_rpc_client_api::response::{Response, RpcBlockUpdate}; use solana_sdk::clock::Slot; @@ -15,7 +15,6 @@ use url::Url; use websocket_tungstenite_retry::websocket_stable; use websocket_tungstenite_retry::websocket_stable::WsMessage; -// TODO void race condition when this is not started up fast enough to catch transaction // returns map of transaction signatures to the slot they were confirmed pub async fn start_tx_status_collector(ws_url: Url, payer_pubkey: Pubkey, commitment_config: CommitmentConfig) -> (Arc>, AbortHandle) { @@ -44,7 +43,6 @@ pub async fn start_tx_status_collector(ws_url: Url, payer_pubkey: Pubkey, commit ).await .expect("Failed to connect to websocket"); - let mut channel = web_socket_slots.subscribe_message_channel(); let observed_transactions: Arc> = Arc::new(DashMap::with_capacity(64)); @@ -54,7 +52,6 @@ pub async fn start_tx_status_collector(ws_url: Url, payer_pubkey: Pubkey, commit let started_at = Instant::now(); while let Ok(msg) = channel.recv().await { - // TOOD use this to know when we are subscribed if let WsMessage::Text(payload) = msg { let ws_result: jsonrpsee_types::SubscriptionResponse> = serde_json::from_str(&payload).unwrap(); @@ -63,7 +60,7 @@ pub async fn start_tx_status_collector(ws_url: Url, payer_pubkey: Pubkey, commit if let Some(tx_sigs_from_block) = block_update.value.block.and_then(|b| b.signatures) { for tx_sig in tx_sigs_from_block { let tx_sig = Signature::from_str(&tx_sig).unwrap(); - trace!("Transaction signature: {} - slot {}", tx_sig, slot); + debug!("Transaction signature found in block: {} - slot {}", tx_sig, slot); observed_transactions_write.entry(tx_sig).or_insert(slot); } } @@ -71,6 +68,5 @@ pub async fn start_tx_status_collector(ws_url: Url, payer_pubkey: Pubkey, commit } }); - (observed_transactions, jh.abort_handle()) } \ No newline at end of file diff --git a/bench/src/benchnew.rs b/bench/src/benchnew.rs index a03d5d01..f25bb743 100644 --- a/bench/src/benchnew.rs +++ b/bench/src/benchnew.rs @@ -39,6 +39,10 @@ enum SubCommand { payer_path: PathBuf, #[clap(short, long)] rpc_url: String, + /// Set websocket source (blockSubscribe method) for transaction status updates. + /// You might want to send tx to one RPC and listen to another (reliable) RPC for status updates. + /// Not all RPC nodes support this method. + /// If not provided, the RPC URL is used to derive the websocket URL. #[clap(short = 'w', long)] tx_status_websocket_addr: Option, #[clap(short, long)] diff --git a/benchrunner-service/README.md b/benchrunner-service/README.md index c43b9786..51ba9938 100644 --- a/benchrunner-service/README.md +++ b/benchrunner-service/README.md @@ -6,25 +6,34 @@ Hardware: recommend 1024MB RAM, 2 vCPUs, small disk ### Environment Variables -| Environment Variable | Purpose | Required? | Default Value | -|----------------------|-------------------------------------------------------|---------------|---------------| -| `PG_ENABLED` | Enable writing to PostgreSQL | No | false | -| `PG_CONFIG` | PostgreSQL connection string | if PG_ENABLED | | -| `TENANT1_ID` | Technical ID for the tenant | Yes | | -| `TENANT1_RPC_ADDR` | RPC address for the target RPC node | Yes | | -| `TENANT2_.. | more tenants can be added using TENANT2, TENANT3, ... | | | +| Environment Variable | Purpose | Required? | Default Value | +|----------------------|-------------------------------------------------------|--------------|--------------------------------------------| +| `PG_ENABLED` | Enable writing to PostgreSQL | No | false | +| `PG_CONFIG` | PostgreSQL connection string | if PG_ENABLED | | +| `TENANT1_ID` | Technical ID for the tenant | Yes | | +| `TENANT1_RPC_ADDR` | RPC address for the target RPC node | Yes | | +| `TENANT1_TX_STATUS_WS_ADDR` | Websocket source for tx status |No | RPC Url
(replacing schema with ws/wss) | +| `TENANT2_.. | more tenants can be added using TENANT2, TENANT3, ... | | | ### Command-line Arguments ``` Options: - -b, --bench-interval - interval in milliseconds to run the benchmark [default: 60000] - -n, --tx-count - [default: 10] + -p, --payer-path + + -r, --rpc-url + + -w, --tx-status-websocket-addr + Set websocket source (blockSubscribe method) for transaction status updates. You might want to send tx to one RPC and listen to another (reliable) RPC for status updates. Not all RPC nodes support this method. If not provided, the RPC URL is used to derive the websocket URL -s, --size-tx - [default: small] [possible values: small, large] - -p, --prio-fees - [default: 0] + [possible values: small, large] + -m, --max-timeout-ms + Maximum confirmation time in milliseconds. After this, the txn is considered unconfirmed [default: 15000] + -t, --txs-per-run + + -n, --num-of-runs + + -f, --cu-price + The CU price in micro lamports [default: 300] ``` ```bash