From 409db5776075b2ce5d4ecd9684264234fd1157f6 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Mon, 8 Jul 2024 15:21:18 +0200 Subject: [PATCH] fix clippy --- bench/src/benches/confirmation_rate.rs | 32 +++++++++---- bench/src/benches/confirmation_slot.rs | 25 +++++++---- bench/src/benches/rpc_interface.rs | 45 +++++++++++-------- .../benches/tx_status_websocket_collector.rs | 36 ++++++++------- bench/src/service_adapter_new.rs | 3 +- benchrunner-service/src/args.rs | 2 +- benchrunner-service/src/main.rs | 1 - 7 files changed, 90 insertions(+), 54 deletions(-) diff --git a/bench/src/benches/confirmation_rate.rs b/bench/src/benches/confirmation_rate.rs index b739caa4..b0db911a 100644 --- a/bench/src/benches/confirmation_rate.rs +++ b/bench/src/benches/confirmation_rate.rs @@ -9,10 +9,10 @@ use std::time::Duration; use crate::benches::rpc_interface::{ send_and_confirm_bulk_transactions, ConfirmationResponseFromRpc, }; +use solana_lite_rpc_util::obfuscate_rpcurl; use solana_rpc_client::nonblocking::rpc_client::RpcClient; use solana_sdk::signature::{read_keypair_file, Keypair, Signature, Signer}; use url::Url; -use solana_lite_rpc_util::obfuscate_rpcurl; #[derive(Clone, Copy, Debug, Default, serde::Serialize)] pub struct Metric { @@ -42,8 +42,9 @@ pub async fn confirmation_rate( let rpc = Arc::new(RpcClient::new(rpc_url.clone())); info!("RPC: {}", obfuscate_rpcurl(&rpc.as_ref().url())); - - let ws_addr = tx_status_websocket_addr.unwrap_or_else(|| rpc_url.replace("http:", "ws:").replace("https:", "wss:")); + + let ws_addr = tx_status_websocket_addr + .unwrap_or_else(|| rpc_url.replace("http:", "ws:").replace("https:", "wss:")); info!("WS ADDR: {}", obfuscate_rpcurl(&ws_addr)); let payer: Arc = Arc::new(read_keypair_file(payer_path).unwrap()); @@ -52,9 +53,16 @@ pub async fn confirmation_rate( let mut rpc_results = Vec::with_capacity(num_of_runs); for _ in 0..num_of_runs { - match send_bulk_txs_and_wait(&rpc, Url::parse(&ws_addr).expect("Invalid Url"), &payer, txs_per_run, &tx_params, max_timeout) - .await - .context("send bulk tx and wait") + match send_bulk_txs_and_wait( + &rpc, + Url::parse(&ws_addr).expect("Invalid Url"), + &payer, + txs_per_run, + &tx_params, + max_timeout, + ) + .await + .context("send bulk tx and wait") { Ok(stat) => { rpc_results.push(stat); @@ -94,9 +102,15 @@ pub async fn send_bulk_txs_and_wait( trace!("Sending {} transactions in bulk ..", txs.len()); let tx_and_confirmations_from_rpc: Vec<(Signature, ConfirmationResponseFromRpc)> = - send_and_confirm_bulk_transactions(rpc, tx_status_websocket_addr, payer.pubkey(), &txs, max_timeout) - .await - .context("send and confirm bulk tx")?; + send_and_confirm_bulk_transactions( + rpc, + tx_status_websocket_addr, + payer.pubkey(), + &txs, + max_timeout, + ) + .await + .context("send and confirm bulk tx")?; trace!("Done sending {} transaction.", txs.len()); let mut tx_sent = 0; diff --git a/bench/src/benches/confirmation_slot.rs b/bench/src/benches/confirmation_slot.rs index 31c8e444..a054052f 100644 --- a/bench/src/benches/confirmation_slot.rs +++ b/bench/src/benches/confirmation_slot.rs @@ -1,5 +1,4 @@ use std::path::Path; -use std::str::FromStr; use std::time::Duration; use crate::benches::rpc_interface::{ @@ -11,13 +10,12 @@ use anyhow::anyhow; use log::{debug, info, warn}; use solana_lite_rpc_util::obfuscate_rpcurl; use solana_rpc_client::nonblocking::rpc_client::RpcClient; +use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{read_keypair_file, Signature, Signer}; use solana_sdk::transaction::VersionedTransaction; use solana_sdk::{commitment_config::CommitmentConfig, signature::Keypair}; -use solana_sdk::pubkey::Pubkey; use tokio::time::{sleep, Instant}; use url::Url; -use crate::benches::tx_status_websocket_collector::start_tx_status_collector; #[derive(Clone, Copy, Debug, Default)] pub struct Metric { @@ -64,8 +62,10 @@ pub async fn confirmation_slot( info!("RPC A: {}", obfuscate_rpcurl(&rpc_a_url)); info!("RPC B: {}", obfuscate_rpcurl(&rpc_b_url)); - let ws_addr_a = tx_status_websocket_addr_a.unwrap_or_else(|| rpc_a_url.replace("http:", "ws:").replace("https:", "wss:")); - let ws_addr_b = tx_status_websocket_addr_b.unwrap_or_else(|| rpc_b_url.replace("http:", "ws:").replace("https:", "wss:")); + let ws_addr_a = tx_status_websocket_addr_a + .unwrap_or_else(|| rpc_a_url.replace("http:", "ws:").replace("https:", "wss:")); + let ws_addr_b = tx_status_websocket_addr_b + .unwrap_or_else(|| rpc_b_url.replace("http:", "ws:").replace("https:", "wss:")); let ws_addr_a = Url::parse(&ws_addr_a).expect("Invalid URL"); let ws_addr_b = Url::parse(&ws_addr_b).expect("Invalid URL"); @@ -113,13 +113,15 @@ pub async fn confirmation_slot( let a_task = tokio::spawn(async move { sleep(Duration::from_secs_f64(a_delay)).await; debug!("(A) sending tx {}", rpc_a_tx.signatures[0]); - send_and_confirm_transaction(&rpc_a, ws_addr_a, payer_pubkey, rpc_a_tx, max_timeout).await + send_and_confirm_transaction(&rpc_a, ws_addr_a, payer_pubkey, rpc_a_tx, max_timeout) + .await }); let b_task = tokio::spawn(async move { sleep(Duration::from_secs_f64(b_delay)).await; debug!("(B) sending tx {}", rpc_b_tx.signatures[0]); - send_and_confirm_transaction(&rpc_b, ws_addr_b, payer_pubkey, rpc_b_tx, max_timeout).await + send_and_confirm_transaction(&rpc_b, ws_addr_b, payer_pubkey, rpc_b_tx, max_timeout) + .await }); let (a, b) = tokio::join!(a_task, b_task); @@ -180,7 +182,14 @@ async fn send_and_confirm_transaction( max_timeout: Duration, ) -> anyhow::Result { let result_vec: Vec<(Signature, ConfirmationResponseFromRpc)> = - send_and_confirm_bulk_transactions(rpc, tx_status_websocket_addr, payer_pubkey, &[tx], max_timeout).await?; + send_and_confirm_bulk_transactions( + rpc, + tx_status_websocket_addr, + payer_pubkey, + &[tx], + max_timeout, + ) + .await?; assert_eq!(result_vec.len(), 1, "expected 1 result"); let (_sig, confirmation_response) = result_vec.into_iter().next().unwrap(); diff --git a/bench/src/benches/rpc_interface.rs b/bench/src/benches/rpc_interface.rs index 84f9b996..0a31b049 100644 --- a/bench/src/benches/rpc_interface.rs +++ b/bench/src/benches/rpc_interface.rs @@ -1,32 +1,29 @@ +use crate::benches::tx_status_websocket_collector::start_tx_status_collector; use anyhow::{bail, Context, Error}; + use futures::future::join_all; use futures::TryFutureExt; use itertools::Itertools; -use log::{debug, info, trace, warn}; +use log::{debug, trace, warn}; + +use solana_lite_rpc_util::obfuscate_rpcurl; use solana_rpc_client::nonblocking::rpc_client::RpcClient; use solana_rpc_client::rpc_client::SerializableTransaction; use solana_rpc_client_api::client_error::ErrorKind; use solana_rpc_client_api::config::RpcSendTransactionConfig; + use solana_sdk::clock::Slot; use solana_sdk::commitment_config::CommitmentConfig; +use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::Signature; use solana_sdk::transaction::VersionedTransaction; use solana_transaction_status::TransactionConfirmationStatus; use std::collections::{HashMap, HashSet}; -use std::iter::zip; -use std::str::FromStr; + use std::sync::Arc; use std::time::Duration; -use dashmap::mapref::multiple::RefMulti; -use serde::{Deserialize, Serialize}; -use serde_json::json; -use solana_rpc_client_api::response::{Response, RpcBlockUpdate, RpcResponseContext, SlotUpdate}; -use solana_sdk::pubkey::Pubkey; -use tokio::time::{Instant, timeout}; +use tokio::time::Instant; use url::Url; -use websocket_tungstenite_retry::websocket_stable::WsMessage; -use solana_lite_rpc_util::obfuscate_rpcurl; -use crate::benches::tx_status_websocket_collector::start_tx_status_collector; pub fn create_rpc_client(rpc_url: &Url) -> RpcClient { RpcClient::new_with_commitment(rpc_url.to_string(), CommitmentConfig::confirmed()) @@ -65,7 +62,12 @@ pub async fn send_and_confirm_bulk_transactions( }; // note: we get confirmed but never finaliized - let (tx_status_map, _jh_collector) = start_tx_status_collector(tx_status_websocket_addr.clone(), payer_pubkey, CommitmentConfig::confirmed()).await; + let (tx_status_map, _jh_collector) = start_tx_status_collector( + tx_status_websocket_addr.clone(), + payer_pubkey, + CommitmentConfig::confirmed(), + ) + .await; let started_at = Instant::now(); trace!( @@ -145,7 +147,10 @@ pub async fn send_and_confirm_bulk_transactions( // items get moved from pending_status_set to result_status_map - debug!("Waiting for transaction confirmations from websocket source <{}> ..", obfuscate_rpcurl(tx_status_websocket_addr.as_str())); + debug!( + "Waiting for transaction confirmations from websocket source <{}> ..", + obfuscate_rpcurl(tx_status_websocket_addr.as_str()) + ); let started_at = Instant::now(); let timeout_at = started_at + max_timeout; // "poll" the status dashmap @@ -163,10 +168,14 @@ pub async fn send_and_confirm_bulk_transactions( let (tx_sig, confirmed_slot) = multi.pair(); // status is confirmed - if pending_status_set.remove(&tx_sig) { - trace!("take status for sig {:?} and confirmed_slot: {:?} from websocket source", tx_sig, confirmed_slot); + if pending_status_set.remove(tx_sig) { + trace!( + "take status for sig {:?} and confirmed_slot: {:?} from websocket source", + tx_sig, + confirmed_slot + ); let prev_value = result_status_map.insert( - tx_sig.clone(), + *tx_sig, ConfirmationResponseFromRpc::Success( send_slot, *confirmed_slot, @@ -177,10 +186,8 @@ pub async fn send_and_confirm_bulk_transactions( ); assert!(prev_value.is_none(), "Must not override existing value"); } - } // -- END for tx_status_map loop - if pending_status_set.is_empty() { debug!( "All transactions confirmed after {:?}", diff --git a/bench/src/benches/tx_status_websocket_collector.rs b/bench/src/benches/tx_status_websocket_collector.rs index c047589c..fc4c1292 100644 --- a/bench/src/benches/tx_status_websocket_collector.rs +++ b/bench/src/benches/tx_status_websocket_collector.rs @@ -1,23 +1,25 @@ -use std::str::FromStr; -use std::sync::Arc; -use std::time::{Duration, Instant}; -use dashmap::{DashMap, DashSet}; -use log::{debug, info, trace}; +use dashmap::DashMap; +use log::debug; use serde_json::json; use solana_rpc_client_api::response::{Response, RpcBlockUpdate}; use solana_sdk::clock::Slot; use solana_sdk::commitment_config::CommitmentConfig; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::Signature; +use std::str::FromStr; +use std::sync::Arc; +use std::time::Duration; use tokio::task::AbortHandle; -use tokio::time::sleep; use url::Url; use websocket_tungstenite_retry::websocket_stable; use websocket_tungstenite_retry::websocket_stable::WsMessage; // returns map of transaction signatures to the slot they were confirmed -pub async fn start_tx_status_collector(ws_url: Url, payer_pubkey: Pubkey, commitment_config: CommitmentConfig) - -> (Arc>, AbortHandle) { +pub async fn start_tx_status_collector( + ws_url: Url, + payer_pubkey: Pubkey, + commitment_config: CommitmentConfig, +) -> (Arc>, AbortHandle) { // e.g. "commitment" let commitment_str = format!("{:?}", commitment_config); @@ -40,17 +42,16 @@ pub async fn start_tx_status_collector(ws_url: Url, payer_pubkey: Pubkey, commit ] }), Duration::from_secs(10), - ).await + ) + .await .expect("Failed to connect to websocket"); let mut channel = web_socket_slots.subscribe_message_channel(); let observed_transactions: Arc> = Arc::new(DashMap::with_capacity(64)); - let observed_transactions_write = Arc::downgrade(&observed_transactions); + let observed_transactions_write = Arc::downgrade(&observed_transactions); let jh = tokio::spawn(async move { - let started_at = Instant::now(); - while let Ok(msg) = channel.recv().await { if let WsMessage::Text(payload) = msg { let ws_result: jsonrpsee_types::SubscriptionResponse> = @@ -61,10 +62,15 @@ pub async fn start_tx_status_collector(ws_url: Url, payer_pubkey: Pubkey, commit debug!("observed_transactions map dropped - stopping task"); return; }; - if let Some(tx_sigs_from_block) = block_update.value.block.and_then(|b| b.signatures) { + if let Some(tx_sigs_from_block) = + block_update.value.block.and_then(|b| b.signatures) + { for tx_sig in tx_sigs_from_block { let tx_sig = Signature::from_str(&tx_sig).unwrap(); - debug!("Transaction signature found in block: {} - slot {}", tx_sig, slot); + debug!( + "Transaction signature found in block: {} - slot {}", + tx_sig, slot + ); map.entry(tx_sig).or_insert(slot); } } @@ -73,4 +79,4 @@ pub async fn start_tx_status_collector(ws_url: Url, payer_pubkey: Pubkey, commit }); (observed_transactions, jh.abort_handle()) -} \ No newline at end of file +} diff --git a/bench/src/service_adapter_new.rs b/bench/src/service_adapter_new.rs index 15547e7f..92c58cd0 100644 --- a/bench/src/service_adapter_new.rs +++ b/bench/src/service_adapter_new.rs @@ -22,7 +22,8 @@ pub async fn benchnew_confirmation_rate_servicerunner( }; let max_timeout = Duration::from_secs(60); - let ws_addr = tx_status_websocket_addr.unwrap_or_else(|| rpc_addr.replace("http:", "ws:").replace("https:", "wss:")); + let ws_addr = tx_status_websocket_addr + .unwrap_or_else(|| rpc_addr.replace("http:", "ws:").replace("https:", "wss:")); let result = send_bulk_txs_and_wait( &rpc, diff --git a/benchrunner-service/src/args.rs b/benchrunner-service/src/args.rs index 61d4ca6a..1cc7120b 100644 --- a/benchrunner-service/src/args.rs +++ b/benchrunner-service/src/args.rs @@ -58,7 +58,7 @@ pub fn read_tenant_configs(env_vars: Vec<(String, String)>) -> Vec .iter() .at_most_one() .expect("need TENANT_X_TX_STATUS_WS_ADDR") - .map(|(_, v)| v.to_string()) + .map(|(_, v)| v.to_string()), }) .collect::>(); diff --git a/benchrunner-service/src/main.rs b/benchrunner-service/src/main.rs index edbbcb8d..d0f117c3 100644 --- a/benchrunner-service/src/main.rs +++ b/benchrunner-service/src/main.rs @@ -23,7 +23,6 @@ use std::str::FromStr; use std::sync::Arc; use std::time::{Duration, SystemTime}; use tokio::sync::OnceCell; -use solana_lite_rpc_util::obfuscate_rpcurl; #[tokio::main] async fn main() {