propagate ws for status

This commit is contained in:
GroovieGermanikus 2024-06-19 10:25:10 +02:00
parent f1def98652
commit c9147d850b
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
5 changed files with 33 additions and 49 deletions

View File

@ -68,7 +68,7 @@ pub async fn confirmation_slot(
let ws_addr_b = tx_status_websocket_addr_b.unwrap_or_else(|| rpc_b_url.replace("http:", "ws:").replace("https:", "wss:")); let ws_addr_b = tx_status_websocket_addr_b.unwrap_or_else(|| rpc_b_url.replace("http:", "ws:").replace("https:", "wss:"));
let ws_addr_a = Url::parse(&ws_addr_a).expect("Invalid URL"); let ws_addr_a = Url::parse(&ws_addr_a).expect("Invalid URL");
let ws_addr_b = Url::parse(&ws_addr_b).expect("Invalid URL"); let ws_addr_b = Url::parse(&ws_addr_b).expect("Invalid URL");
let rpc_a_url = let rpc_a_url =
Url::parse(&rpc_a_url).map_err(|e| anyhow!("Failed to parse RPC A URL: {}", e))?; Url::parse(&rpc_a_url).map_err(|e| anyhow!("Failed to parse RPC A URL: {}", e))?;
let rpc_b_url = let rpc_b_url =
@ -86,7 +86,7 @@ pub async fn confirmation_slot(
for _ in 0..num_of_runs { for _ in 0..num_of_runs {
let rpc_a = create_rpc_client(&rpc_a_url); let rpc_a = create_rpc_client(&rpc_a_url);
let rpc_b = create_rpc_client(&rpc_b_url); let rpc_b = create_rpc_client(&rpc_b_url);
let ws_addr_a = ws_addr_a.clone(); let ws_addr_a = ws_addr_a.clone();
let ws_addr_b = ws_addr_b.clone(); let ws_addr_b = ws_addr_b.clone();

View File

@ -26,6 +26,7 @@ use solana_sdk::pubkey::Pubkey;
use tokio::time::{Instant, timeout}; use tokio::time::{Instant, timeout};
use url::Url; use url::Url;
use websocket_tungstenite_retry::websocket_stable::WsMessage; use websocket_tungstenite_retry::websocket_stable::WsMessage;
use solana_lite_rpc_util::obfuscate_rpcurl;
use crate::benches::tx_status_websocket_collector::start_tx_status_collector; use crate::benches::tx_status_websocket_collector::start_tx_status_collector;
pub fn create_rpc_client(rpc_url: &Url) -> RpcClient { pub fn create_rpc_client(rpc_url: &Url) -> RpcClient {
@ -146,7 +147,7 @@ pub async fn send_and_confirm_bulk_transactions(
// items get moved from pending_status_set to result_status_map // items get moved from pending_status_set to result_status_map
debug!("Wating for transaction confirmations .."); debug!("Waiting for transaction confirmations from websocket source <{}> ..", obfuscate_rpcurl(tx_status_websocket_addr.as_str()));
let started_at = Instant::now(); let started_at = Instant::now();
let timeout_at = started_at + max_timeout; let timeout_at = started_at + max_timeout;
// "poll" the status dashmap // "poll" the status dashmap
@ -157,31 +158,6 @@ pub async fn send_and_confirm_bulk_transactions(
num_sent_ok, num_sent_ok,
"Items must move between pending+result" "Items must move between pending+result"
); );
// let tx_batch = pending_status_set.iter().cloned().collect_vec();
// debug!(
// "Request status for batch of remaining {} transactions in iteration {}",
// tx_batch.len(),
// iteration
// );
// let status_started_at = Instant::now();
// let mut batch_status = Vec::new();
// // "Too many inputs provided; max 256"
// for chunk in tx_batch.chunks(256) {
// // fail hard if not possible to poll status
// let chunk_responses = rpc_client
// .get_signature_statuses(chunk)
// .await
// .expect("get signature statuses");
// batch_status.extend(chunk_responses.value);
// }
// if status_started_at.elapsed() > Duration::from_millis(500) {
// warn!(
// "SLOW get_signature_statuses for {} transactions took {:?}",
// tx_batch.len(),
// status_started_at.elapsed()
// );
// }
let elapsed = started_at.elapsed(); let elapsed = started_at.elapsed();
for multi in tx_status_map.iter() { for multi in tx_status_map.iter() {
@ -223,7 +199,6 @@ pub async fn send_and_confirm_bulk_transactions(
break 'polling_loop; break 'polling_loop;
} }
// avg 2 samples per slot
tokio::time::sleep_until(iteration_ends_at).await; tokio::time::sleep_until(iteration_ends_at).await;
} // -- END polling loop } // -- END polling loop

View File

@ -2,7 +2,7 @@ use std::str::FromStr;
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use dashmap::{DashMap, DashSet}; use dashmap::{DashMap, DashSet};
use log::{info, trace}; use log::{debug, info, trace};
use serde_json::json; use serde_json::json;
use solana_rpc_client_api::response::{Response, RpcBlockUpdate}; use solana_rpc_client_api::response::{Response, RpcBlockUpdate};
use solana_sdk::clock::Slot; use solana_sdk::clock::Slot;
@ -15,7 +15,6 @@ use url::Url;
use websocket_tungstenite_retry::websocket_stable; use websocket_tungstenite_retry::websocket_stable;
use websocket_tungstenite_retry::websocket_stable::WsMessage; use websocket_tungstenite_retry::websocket_stable::WsMessage;
// TODO void race condition when this is not started up fast enough to catch transaction
// returns map of transaction signatures to the slot they were confirmed // returns map of transaction signatures to the slot they were confirmed
pub async fn start_tx_status_collector(ws_url: Url, payer_pubkey: Pubkey, commitment_config: CommitmentConfig) pub async fn start_tx_status_collector(ws_url: Url, payer_pubkey: Pubkey, commitment_config: CommitmentConfig)
-> (Arc<DashMap<Signature, Slot>>, AbortHandle) { -> (Arc<DashMap<Signature, Slot>>, AbortHandle) {
@ -44,7 +43,6 @@ pub async fn start_tx_status_collector(ws_url: Url, payer_pubkey: Pubkey, commit
).await ).await
.expect("Failed to connect to websocket"); .expect("Failed to connect to websocket");
let mut channel = web_socket_slots.subscribe_message_channel(); let mut channel = web_socket_slots.subscribe_message_channel();
let observed_transactions: Arc<DashMap<Signature, Slot>> = Arc::new(DashMap::with_capacity(64)); let observed_transactions: Arc<DashMap<Signature, Slot>> = Arc::new(DashMap::with_capacity(64));
@ -54,7 +52,6 @@ pub async fn start_tx_status_collector(ws_url: Url, payer_pubkey: Pubkey, commit
let started_at = Instant::now(); let started_at = Instant::now();
while let Ok(msg) = channel.recv().await { while let Ok(msg) = channel.recv().await {
// TOOD use this to know when we are subscribed
if let WsMessage::Text(payload) = msg { if let WsMessage::Text(payload) = msg {
let ws_result: jsonrpsee_types::SubscriptionResponse<Response<RpcBlockUpdate>> = let ws_result: jsonrpsee_types::SubscriptionResponse<Response<RpcBlockUpdate>> =
serde_json::from_str(&payload).unwrap(); serde_json::from_str(&payload).unwrap();
@ -63,7 +60,7 @@ pub async fn start_tx_status_collector(ws_url: Url, payer_pubkey: Pubkey, commit
if let Some(tx_sigs_from_block) = block_update.value.block.and_then(|b| b.signatures) { if let Some(tx_sigs_from_block) = block_update.value.block.and_then(|b| b.signatures) {
for tx_sig in tx_sigs_from_block { for tx_sig in tx_sigs_from_block {
let tx_sig = Signature::from_str(&tx_sig).unwrap(); let tx_sig = Signature::from_str(&tx_sig).unwrap();
trace!("Transaction signature: {} - slot {}", tx_sig, slot); debug!("Transaction signature found in block: {} - slot {}", tx_sig, slot);
observed_transactions_write.entry(tx_sig).or_insert(slot); observed_transactions_write.entry(tx_sig).or_insert(slot);
} }
} }
@ -71,6 +68,5 @@ pub async fn start_tx_status_collector(ws_url: Url, payer_pubkey: Pubkey, commit
} }
}); });
(observed_transactions, jh.abort_handle()) (observed_transactions, jh.abort_handle())
} }

View File

@ -39,6 +39,10 @@ enum SubCommand {
payer_path: PathBuf, payer_path: PathBuf,
#[clap(short, long)] #[clap(short, long)]
rpc_url: String, rpc_url: String,
/// Set websocket source (blockSubscribe method) for transaction status updates.
/// You might want to send tx to one RPC and listen to another (reliable) RPC for status updates.
/// Not all RPC nodes support this method.
/// If not provided, the RPC URL is used to derive the websocket URL.
#[clap(short = 'w', long)] #[clap(short = 'w', long)]
tx_status_websocket_addr: Option<String>, tx_status_websocket_addr: Option<String>,
#[clap(short, long)] #[clap(short, long)]

View File

@ -6,25 +6,34 @@ Hardware: recommend 1024MB RAM, 2 vCPUs, small disk
### Environment Variables ### Environment Variables
| Environment Variable | Purpose | Required? | Default Value | | Environment Variable | Purpose | Required? | Default Value |
|----------------------|-------------------------------------------------------|---------------|---------------| |----------------------|-------------------------------------------------------|--------------|--------------------------------------------|
| `PG_ENABLED` | Enable writing to PostgreSQL | No | false | | `PG_ENABLED` | Enable writing to PostgreSQL | No | false |
| `PG_CONFIG` | PostgreSQL connection string | if PG_ENABLED | | | `PG_CONFIG` | PostgreSQL connection string | if PG_ENABLED | |
| `TENANT1_ID` | Technical ID for the tenant | Yes | | | `TENANT1_ID` | Technical ID for the tenant | Yes | |
| `TENANT1_RPC_ADDR` | RPC address for the target RPC node | Yes | | | `TENANT1_RPC_ADDR` | RPC address for the target RPC node | Yes | |
| `TENANT2_.. | more tenants can be added using TENANT2, TENANT3, ... | | | | `TENANT1_TX_STATUS_WS_ADDR` | Websocket source for tx status |No | RPC Url<br/>(replacing schema with ws/wss) |
| `TENANT2_.. | more tenants can be added using TENANT2, TENANT3, ... | | |
### Command-line Arguments ### Command-line Arguments
``` ```
Options: Options:
-b, --bench-interval <BENCH_INTERVAL> -p, --payer-path <PAYER_PATH>
interval in milliseconds to run the benchmark [default: 60000]
-n, --tx-count <TX_COUNT> -r, --rpc-url <RPC_URL>
[default: 10]
-w, --tx-status-websocket-addr <TX_STATUS_WEBSOCKET_ADDR>
Set websocket source (blockSubscribe method) for transaction status updates. You might want to send tx to one RPC and listen to another (reliable) RPC for status updates. Not all RPC nodes support this method. If not provided, the RPC URL is used to derive the websocket URL
-s, --size-tx <SIZE_TX> -s, --size-tx <SIZE_TX>
[default: small] [possible values: small, large] [possible values: small, large]
-p, --prio-fees <PRIO_FEES> -m, --max-timeout-ms <MAX_TIMEOUT_MS>
[default: 0] Maximum confirmation time in milliseconds. After this, the txn is considered unconfirmed [default: 15000]
-t, --txs-per-run <TXS_PER_RUN>
-n, --num-of-runs <NUM_OF_RUNS>
-f, --cu-price <CU_PRICE>
The CU price in micro lamports [default: 300]
``` ```
```bash ```bash