diff --git a/Cargo.lock b/Cargo.lock index a28248e9..a1e3ff6d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -584,7 +584,6 @@ dependencies = [ "rand 0.8.5", "rand_chacha 0.3.1", "reqwest", - "scopeguard", "serde", "serde_json", "solana-lite-rpc-util", diff --git a/bench/Cargo.toml b/bench/Cargo.toml index 193cfe79..ebd586a7 100644 --- a/bench/Cargo.toml +++ b/bench/Cargo.toml @@ -42,7 +42,6 @@ spl-memo = "4.0.0" url = "*" reqwest = "0.11.26" lazy_static = "1.4.0" -scopeguard = "1.2.0" [dev-dependencies] bincode = { workspace = true } diff --git a/bench/src/benches/rpc_interface.rs b/bench/src/benches/rpc_interface.rs index 9b33b15f..f25f7c86 100644 --- a/bench/src/benches/rpc_interface.rs +++ b/bench/src/benches/rpc_interface.rs @@ -18,7 +18,6 @@ use std::str::FromStr; use std::sync::Arc; use std::time::Duration; use dashmap::mapref::multiple::RefMulti; -use scopeguard::defer; use serde::{Deserialize, Serialize}; use serde_json::json; use solana_rpc_client_api::response::{Response, RpcBlockUpdate, RpcResponseContext, SlotUpdate}; @@ -66,8 +65,7 @@ pub async fn send_and_confirm_bulk_transactions( }; // note: we get confirmed but never finaliized - let (tx_status_map, jh_collector) = start_tx_status_collector(tx_status_websocket_addr.clone(), payer_pubkey, CommitmentConfig::confirmed()).await; - defer!(jh_collector.abort()); + let (tx_status_map, _jh_collector) = start_tx_status_collector(tx_status_websocket_addr.clone(), payer_pubkey, CommitmentConfig::confirmed()).await; let started_at = Instant::now(); trace!( diff --git a/bench/src/benches/tx_status_websocket_collector.rs b/bench/src/benches/tx_status_websocket_collector.rs index 633b604e..c047589c 100644 --- a/bench/src/benches/tx_status_websocket_collector.rs +++ b/bench/src/benches/tx_status_websocket_collector.rs @@ -47,7 +47,7 @@ pub async fn start_tx_status_collector(ws_url: Url, payer_pubkey: Pubkey, commit let observed_transactions: Arc> = Arc::new(DashMap::with_capacity(64)); - let observed_transactions_write = observed_transactions.clone(); + let observed_transactions_write = Arc::downgrade(&observed_transactions); let jh = tokio::spawn(async move { let started_at = Instant::now(); @@ -57,11 +57,15 @@ pub async fn start_tx_status_collector(ws_url: Url, payer_pubkey: Pubkey, commit serde_json::from_str(&payload).unwrap(); let block_update = ws_result.params.result; let slot = block_update.value.slot; + let Some(map) = observed_transactions_write.upgrade() else { + debug!("observed_transactions map dropped - stopping task"); + return; + }; if let Some(tx_sigs_from_block) = block_update.value.block.and_then(|b| b.signatures) { for tx_sig in tx_sigs_from_block { let tx_sig = Signature::from_str(&tx_sig).unwrap(); debug!("Transaction signature found in block: {} - slot {}", tx_sig, slot); - observed_transactions_write.entry(tx_sig).or_insert(slot); + map.entry(tx_sig).or_insert(slot); } } }