fix clippy

This commit is contained in:
GroovieGermanikus 2024-07-08 15:21:18 +02:00
parent 5cc0173666
commit 409db57760
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
7 changed files with 90 additions and 54 deletions

View File

@ -9,10 +9,10 @@ use std::time::Duration;
use crate::benches::rpc_interface::{ use crate::benches::rpc_interface::{
send_and_confirm_bulk_transactions, ConfirmationResponseFromRpc, send_and_confirm_bulk_transactions, ConfirmationResponseFromRpc,
}; };
use solana_lite_rpc_util::obfuscate_rpcurl;
use solana_rpc_client::nonblocking::rpc_client::RpcClient; use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::signature::{read_keypair_file, Keypair, Signature, Signer}; use solana_sdk::signature::{read_keypair_file, Keypair, Signature, Signer};
use url::Url; use url::Url;
use solana_lite_rpc_util::obfuscate_rpcurl;
#[derive(Clone, Copy, Debug, Default, serde::Serialize)] #[derive(Clone, Copy, Debug, Default, serde::Serialize)]
pub struct Metric { pub struct Metric {
@ -42,8 +42,9 @@ pub async fn confirmation_rate(
let rpc = Arc::new(RpcClient::new(rpc_url.clone())); let rpc = Arc::new(RpcClient::new(rpc_url.clone()));
info!("RPC: {}", obfuscate_rpcurl(&rpc.as_ref().url())); info!("RPC: {}", obfuscate_rpcurl(&rpc.as_ref().url()));
let ws_addr = tx_status_websocket_addr.unwrap_or_else(|| rpc_url.replace("http:", "ws:").replace("https:", "wss:")); let ws_addr = tx_status_websocket_addr
.unwrap_or_else(|| rpc_url.replace("http:", "ws:").replace("https:", "wss:"));
info!("WS ADDR: {}", obfuscate_rpcurl(&ws_addr)); info!("WS ADDR: {}", obfuscate_rpcurl(&ws_addr));
let payer: Arc<Keypair> = Arc::new(read_keypair_file(payer_path).unwrap()); let payer: Arc<Keypair> = Arc::new(read_keypair_file(payer_path).unwrap());
@ -52,9 +53,16 @@ pub async fn confirmation_rate(
let mut rpc_results = Vec::with_capacity(num_of_runs); let mut rpc_results = Vec::with_capacity(num_of_runs);
for _ in 0..num_of_runs { for _ in 0..num_of_runs {
match send_bulk_txs_and_wait(&rpc, Url::parse(&ws_addr).expect("Invalid Url"), &payer, txs_per_run, &tx_params, max_timeout) match send_bulk_txs_and_wait(
.await &rpc,
.context("send bulk tx and wait") Url::parse(&ws_addr).expect("Invalid Url"),
&payer,
txs_per_run,
&tx_params,
max_timeout,
)
.await
.context("send bulk tx and wait")
{ {
Ok(stat) => { Ok(stat) => {
rpc_results.push(stat); rpc_results.push(stat);
@ -94,9 +102,15 @@ pub async fn send_bulk_txs_and_wait(
trace!("Sending {} transactions in bulk ..", txs.len()); trace!("Sending {} transactions in bulk ..", txs.len());
let tx_and_confirmations_from_rpc: Vec<(Signature, ConfirmationResponseFromRpc)> = let tx_and_confirmations_from_rpc: Vec<(Signature, ConfirmationResponseFromRpc)> =
send_and_confirm_bulk_transactions(rpc, tx_status_websocket_addr, payer.pubkey(), &txs, max_timeout) send_and_confirm_bulk_transactions(
.await rpc,
.context("send and confirm bulk tx")?; tx_status_websocket_addr,
payer.pubkey(),
&txs,
max_timeout,
)
.await
.context("send and confirm bulk tx")?;
trace!("Done sending {} transaction.", txs.len()); trace!("Done sending {} transaction.", txs.len());
let mut tx_sent = 0; let mut tx_sent = 0;

View File

@ -1,5 +1,4 @@
use std::path::Path; use std::path::Path;
use std::str::FromStr;
use std::time::Duration; use std::time::Duration;
use crate::benches::rpc_interface::{ use crate::benches::rpc_interface::{
@ -11,13 +10,12 @@ use anyhow::anyhow;
use log::{debug, info, warn}; use log::{debug, info, warn};
use solana_lite_rpc_util::obfuscate_rpcurl; use solana_lite_rpc_util::obfuscate_rpcurl;
use solana_rpc_client::nonblocking::rpc_client::RpcClient; use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{read_keypair_file, Signature, Signer}; use solana_sdk::signature::{read_keypair_file, Signature, Signer};
use solana_sdk::transaction::VersionedTransaction; use solana_sdk::transaction::VersionedTransaction;
use solana_sdk::{commitment_config::CommitmentConfig, signature::Keypair}; use solana_sdk::{commitment_config::CommitmentConfig, signature::Keypair};
use solana_sdk::pubkey::Pubkey;
use tokio::time::{sleep, Instant}; use tokio::time::{sleep, Instant};
use url::Url; use url::Url;
use crate::benches::tx_status_websocket_collector::start_tx_status_collector;
#[derive(Clone, Copy, Debug, Default)] #[derive(Clone, Copy, Debug, Default)]
pub struct Metric { pub struct Metric {
@ -64,8 +62,10 @@ pub async fn confirmation_slot(
info!("RPC A: {}", obfuscate_rpcurl(&rpc_a_url)); info!("RPC A: {}", obfuscate_rpcurl(&rpc_a_url));
info!("RPC B: {}", obfuscate_rpcurl(&rpc_b_url)); info!("RPC B: {}", obfuscate_rpcurl(&rpc_b_url));
let ws_addr_a = tx_status_websocket_addr_a.unwrap_or_else(|| rpc_a_url.replace("http:", "ws:").replace("https:", "wss:")); let ws_addr_a = tx_status_websocket_addr_a
let ws_addr_b = tx_status_websocket_addr_b.unwrap_or_else(|| rpc_b_url.replace("http:", "ws:").replace("https:", "wss:")); .unwrap_or_else(|| rpc_a_url.replace("http:", "ws:").replace("https:", "wss:"));
let ws_addr_b = tx_status_websocket_addr_b
.unwrap_or_else(|| rpc_b_url.replace("http:", "ws:").replace("https:", "wss:"));
let ws_addr_a = Url::parse(&ws_addr_a).expect("Invalid URL"); let ws_addr_a = Url::parse(&ws_addr_a).expect("Invalid URL");
let ws_addr_b = Url::parse(&ws_addr_b).expect("Invalid URL"); let ws_addr_b = Url::parse(&ws_addr_b).expect("Invalid URL");
@ -113,13 +113,15 @@ pub async fn confirmation_slot(
let a_task = tokio::spawn(async move { let a_task = tokio::spawn(async move {
sleep(Duration::from_secs_f64(a_delay)).await; sleep(Duration::from_secs_f64(a_delay)).await;
debug!("(A) sending tx {}", rpc_a_tx.signatures[0]); debug!("(A) sending tx {}", rpc_a_tx.signatures[0]);
send_and_confirm_transaction(&rpc_a, ws_addr_a, payer_pubkey, rpc_a_tx, max_timeout).await send_and_confirm_transaction(&rpc_a, ws_addr_a, payer_pubkey, rpc_a_tx, max_timeout)
.await
}); });
let b_task = tokio::spawn(async move { let b_task = tokio::spawn(async move {
sleep(Duration::from_secs_f64(b_delay)).await; sleep(Duration::from_secs_f64(b_delay)).await;
debug!("(B) sending tx {}", rpc_b_tx.signatures[0]); debug!("(B) sending tx {}", rpc_b_tx.signatures[0]);
send_and_confirm_transaction(&rpc_b, ws_addr_b, payer_pubkey, rpc_b_tx, max_timeout).await send_and_confirm_transaction(&rpc_b, ws_addr_b, payer_pubkey, rpc_b_tx, max_timeout)
.await
}); });
let (a, b) = tokio::join!(a_task, b_task); let (a, b) = tokio::join!(a_task, b_task);
@ -180,7 +182,14 @@ async fn send_and_confirm_transaction(
max_timeout: Duration, max_timeout: Duration,
) -> anyhow::Result<ConfirmationResponseFromRpc> { ) -> anyhow::Result<ConfirmationResponseFromRpc> {
let result_vec: Vec<(Signature, ConfirmationResponseFromRpc)> = let result_vec: Vec<(Signature, ConfirmationResponseFromRpc)> =
send_and_confirm_bulk_transactions(rpc, tx_status_websocket_addr, payer_pubkey, &[tx], max_timeout).await?; send_and_confirm_bulk_transactions(
rpc,
tx_status_websocket_addr,
payer_pubkey,
&[tx],
max_timeout,
)
.await?;
assert_eq!(result_vec.len(), 1, "expected 1 result"); assert_eq!(result_vec.len(), 1, "expected 1 result");
let (_sig, confirmation_response) = result_vec.into_iter().next().unwrap(); let (_sig, confirmation_response) = result_vec.into_iter().next().unwrap();

View File

@ -1,32 +1,29 @@
use crate::benches::tx_status_websocket_collector::start_tx_status_collector;
use anyhow::{bail, Context, Error}; use anyhow::{bail, Context, Error};
use futures::future::join_all; use futures::future::join_all;
use futures::TryFutureExt; use futures::TryFutureExt;
use itertools::Itertools; use itertools::Itertools;
use log::{debug, info, trace, warn}; use log::{debug, trace, warn};
use solana_lite_rpc_util::obfuscate_rpcurl;
use solana_rpc_client::nonblocking::rpc_client::RpcClient; use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_rpc_client::rpc_client::SerializableTransaction; use solana_rpc_client::rpc_client::SerializableTransaction;
use solana_rpc_client_api::client_error::ErrorKind; use solana_rpc_client_api::client_error::ErrorKind;
use solana_rpc_client_api::config::RpcSendTransactionConfig; use solana_rpc_client_api::config::RpcSendTransactionConfig;
use solana_sdk::clock::Slot; use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::CommitmentConfig; use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::Signature; use solana_sdk::signature::Signature;
use solana_sdk::transaction::VersionedTransaction; use solana_sdk::transaction::VersionedTransaction;
use solana_transaction_status::TransactionConfirmationStatus; use solana_transaction_status::TransactionConfirmationStatus;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::iter::zip;
use std::str::FromStr;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use dashmap::mapref::multiple::RefMulti; use tokio::time::Instant;
use serde::{Deserialize, Serialize};
use serde_json::json;
use solana_rpc_client_api::response::{Response, RpcBlockUpdate, RpcResponseContext, SlotUpdate};
use solana_sdk::pubkey::Pubkey;
use tokio::time::{Instant, timeout};
use url::Url; use url::Url;
use websocket_tungstenite_retry::websocket_stable::WsMessage;
use solana_lite_rpc_util::obfuscate_rpcurl;
use crate::benches::tx_status_websocket_collector::start_tx_status_collector;
pub fn create_rpc_client(rpc_url: &Url) -> RpcClient { pub fn create_rpc_client(rpc_url: &Url) -> RpcClient {
RpcClient::new_with_commitment(rpc_url.to_string(), CommitmentConfig::confirmed()) RpcClient::new_with_commitment(rpc_url.to_string(), CommitmentConfig::confirmed())
@ -65,7 +62,12 @@ pub async fn send_and_confirm_bulk_transactions(
}; };
// note: we get confirmed but never finaliized // note: we get confirmed but never finaliized
let (tx_status_map, _jh_collector) = start_tx_status_collector(tx_status_websocket_addr.clone(), payer_pubkey, CommitmentConfig::confirmed()).await; let (tx_status_map, _jh_collector) = start_tx_status_collector(
tx_status_websocket_addr.clone(),
payer_pubkey,
CommitmentConfig::confirmed(),
)
.await;
let started_at = Instant::now(); let started_at = Instant::now();
trace!( trace!(
@ -145,7 +147,10 @@ pub async fn send_and_confirm_bulk_transactions(
// items get moved from pending_status_set to result_status_map // items get moved from pending_status_set to result_status_map
debug!("Waiting for transaction confirmations from websocket source <{}> ..", obfuscate_rpcurl(tx_status_websocket_addr.as_str())); debug!(
"Waiting for transaction confirmations from websocket source <{}> ..",
obfuscate_rpcurl(tx_status_websocket_addr.as_str())
);
let started_at = Instant::now(); let started_at = Instant::now();
let timeout_at = started_at + max_timeout; let timeout_at = started_at + max_timeout;
// "poll" the status dashmap // "poll" the status dashmap
@ -163,10 +168,14 @@ pub async fn send_and_confirm_bulk_transactions(
let (tx_sig, confirmed_slot) = multi.pair(); let (tx_sig, confirmed_slot) = multi.pair();
// status is confirmed // status is confirmed
if pending_status_set.remove(&tx_sig) { if pending_status_set.remove(tx_sig) {
trace!("take status for sig {:?} and confirmed_slot: {:?} from websocket source", tx_sig, confirmed_slot); trace!(
"take status for sig {:?} and confirmed_slot: {:?} from websocket source",
tx_sig,
confirmed_slot
);
let prev_value = result_status_map.insert( let prev_value = result_status_map.insert(
tx_sig.clone(), *tx_sig,
ConfirmationResponseFromRpc::Success( ConfirmationResponseFromRpc::Success(
send_slot, send_slot,
*confirmed_slot, *confirmed_slot,
@ -177,10 +186,8 @@ pub async fn send_and_confirm_bulk_transactions(
); );
assert!(prev_value.is_none(), "Must not override existing value"); assert!(prev_value.is_none(), "Must not override existing value");
} }
} // -- END for tx_status_map loop } // -- END for tx_status_map loop
if pending_status_set.is_empty() { if pending_status_set.is_empty() {
debug!( debug!(
"All transactions confirmed after {:?}", "All transactions confirmed after {:?}",

View File

@ -1,23 +1,25 @@
use std::str::FromStr; use dashmap::DashMap;
use std::sync::Arc; use log::debug;
use std::time::{Duration, Instant};
use dashmap::{DashMap, DashSet};
use log::{debug, info, trace};
use serde_json::json; use serde_json::json;
use solana_rpc_client_api::response::{Response, RpcBlockUpdate}; use solana_rpc_client_api::response::{Response, RpcBlockUpdate};
use solana_sdk::clock::Slot; use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::CommitmentConfig; use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::Signature; use solana_sdk::signature::Signature;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio::task::AbortHandle; use tokio::task::AbortHandle;
use tokio::time::sleep;
use url::Url; use url::Url;
use websocket_tungstenite_retry::websocket_stable; use websocket_tungstenite_retry::websocket_stable;
use websocket_tungstenite_retry::websocket_stable::WsMessage; use websocket_tungstenite_retry::websocket_stable::WsMessage;
// returns map of transaction signatures to the slot they were confirmed // returns map of transaction signatures to the slot they were confirmed
pub async fn start_tx_status_collector(ws_url: Url, payer_pubkey: Pubkey, commitment_config: CommitmentConfig) pub async fn start_tx_status_collector(
-> (Arc<DashMap<Signature, Slot>>, AbortHandle) { ws_url: Url,
payer_pubkey: Pubkey,
commitment_config: CommitmentConfig,
) -> (Arc<DashMap<Signature, Slot>>, AbortHandle) {
// e.g. "commitment" // e.g. "commitment"
let commitment_str = format!("{:?}", commitment_config); let commitment_str = format!("{:?}", commitment_config);
@ -40,17 +42,16 @@ pub async fn start_tx_status_collector(ws_url: Url, payer_pubkey: Pubkey, commit
] ]
}), }),
Duration::from_secs(10), Duration::from_secs(10),
).await )
.await
.expect("Failed to connect to websocket"); .expect("Failed to connect to websocket");
let mut channel = web_socket_slots.subscribe_message_channel(); let mut channel = web_socket_slots.subscribe_message_channel();
let observed_transactions: Arc<DashMap<Signature, Slot>> = Arc::new(DashMap::with_capacity(64)); let observed_transactions: Arc<DashMap<Signature, Slot>> = Arc::new(DashMap::with_capacity(64));
let observed_transactions_write = Arc::downgrade(&observed_transactions); let observed_transactions_write = Arc::downgrade(&observed_transactions);
let jh = tokio::spawn(async move { let jh = tokio::spawn(async move {
let started_at = Instant::now();
while let Ok(msg) = channel.recv().await { while let Ok(msg) = channel.recv().await {
if let WsMessage::Text(payload) = msg { if let WsMessage::Text(payload) = msg {
let ws_result: jsonrpsee_types::SubscriptionResponse<Response<RpcBlockUpdate>> = let ws_result: jsonrpsee_types::SubscriptionResponse<Response<RpcBlockUpdate>> =
@ -61,10 +62,15 @@ pub async fn start_tx_status_collector(ws_url: Url, payer_pubkey: Pubkey, commit
debug!("observed_transactions map dropped - stopping task"); debug!("observed_transactions map dropped - stopping task");
return; return;
}; };
if let Some(tx_sigs_from_block) = block_update.value.block.and_then(|b| b.signatures) { if let Some(tx_sigs_from_block) =
block_update.value.block.and_then(|b| b.signatures)
{
for tx_sig in tx_sigs_from_block { for tx_sig in tx_sigs_from_block {
let tx_sig = Signature::from_str(&tx_sig).unwrap(); let tx_sig = Signature::from_str(&tx_sig).unwrap();
debug!("Transaction signature found in block: {} - slot {}", tx_sig, slot); debug!(
"Transaction signature found in block: {} - slot {}",
tx_sig, slot
);
map.entry(tx_sig).or_insert(slot); map.entry(tx_sig).or_insert(slot);
} }
} }
@ -73,4 +79,4 @@ pub async fn start_tx_status_collector(ws_url: Url, payer_pubkey: Pubkey, commit
}); });
(observed_transactions, jh.abort_handle()) (observed_transactions, jh.abort_handle())
} }

View File

@ -22,7 +22,8 @@ pub async fn benchnew_confirmation_rate_servicerunner(
}; };
let max_timeout = Duration::from_secs(60); let max_timeout = Duration::from_secs(60);
let ws_addr = tx_status_websocket_addr.unwrap_or_else(|| rpc_addr.replace("http:", "ws:").replace("https:", "wss:")); let ws_addr = tx_status_websocket_addr
.unwrap_or_else(|| rpc_addr.replace("http:", "ws:").replace("https:", "wss:"));
let result = send_bulk_txs_and_wait( let result = send_bulk_txs_and_wait(
&rpc, &rpc,

View File

@ -58,7 +58,7 @@ pub fn read_tenant_configs(env_vars: Vec<(String, String)>) -> Vec<TenantConfig>
.iter() .iter()
.at_most_one() .at_most_one()
.expect("need TENANT_X_TX_STATUS_WS_ADDR") .expect("need TENANT_X_TX_STATUS_WS_ADDR")
.map(|(_, v)| v.to_string()) .map(|(_, v)| v.to_string()),
}) })
.collect::<Vec<TenantConfig>>(); .collect::<Vec<TenantConfig>>();

View File

@ -23,7 +23,6 @@ use std::str::FromStr;
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, SystemTime}; use std::time::{Duration, SystemTime};
use tokio::sync::OnceCell; use tokio::sync::OnceCell;
use solana_lite_rpc_util::obfuscate_rpcurl;
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {