diff --git a/Cargo.lock b/Cargo.lock index 141db9f030..ed7d71e484 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4560,6 +4560,7 @@ dependencies = [ "solana-clap-utils", "solana-faucet", "solana-logger 1.9.0", + "solana-measure", "solana-net-utils", "solana-sdk", "solana-transaction-status", diff --git a/accounts-cluster-bench/src/main.rs b/accounts-cluster-bench/src/main.rs index 14a673058d..0c2a926676 100644 --- a/accounts-cluster-bench/src/main.rs +++ b/accounts-cluster-bench/src/main.rs @@ -5,10 +5,9 @@ use rand::{thread_rng, Rng}; use rayon::prelude::*; use solana_account_decoder::parse_token::spl_token_v2_0_pubkey; use solana_clap_utils::input_parsers::pubkey_of; -use solana_client::rpc_client::RpcClient; +use solana_client::{rpc_client::RpcClient, transaction_executor::TransactionExecutor}; use solana_faucet::faucet::{request_airdrop_transaction, FAUCET_PORT}; use solana_gossip::gossip_service::discover; -use solana_measure::measure::Measure; use solana_runtime::inline_spl_token_v2_0; use solana_sdk::{ commitment_config::CommitmentConfig, @@ -16,9 +15,8 @@ use solana_sdk::{ message::Message, pubkey::Pubkey, rpc_port::DEFAULT_RPC_PORT, - signature::{read_keypair_file, Keypair, Signature, Signer}, + signature::{read_keypair_file, Keypair, Signer}, system_instruction, system_program, - timing::timestamp, transaction::Transaction, }; use solana_streamer::socket::SocketAddrSpace; @@ -27,10 +25,10 @@ use std::{ net::SocketAddr, process::exit, sync::{ - atomic::{AtomicBool, AtomicU64, Ordering}, - Arc, RwLock, + atomic::{AtomicU64, Ordering}, + Arc, }, - thread::{sleep, Builder, JoinHandle}, + thread::sleep, time::{Duration, Instant}, }; @@ -97,160 +95,6 @@ pub fn airdrop_lamports( true } -// signature, timestamp, id -type PendingQueue = Vec<(Signature, u64, u64)>; - -struct TransactionExecutor { - sig_clear_t: JoinHandle<()>, - sigs: Arc>, - cleared: Arc>>, - exit: Arc, - counter: AtomicU64, - client: RpcClient, -} - -impl TransactionExecutor { - fn new(entrypoint_addr: SocketAddr) -> Self { - let sigs = Arc::new(RwLock::new(Vec::new())); - let cleared = Arc::new(RwLock::new(Vec::new())); - let exit = Arc::new(AtomicBool::new(false)); - let sig_clear_t = Self::start_sig_clear_thread(&exit, &sigs, &cleared, entrypoint_addr); - let client = - RpcClient::new_socket_with_commitment(entrypoint_addr, CommitmentConfig::confirmed()); - Self { - sigs, - cleared, - sig_clear_t, - exit, - counter: AtomicU64::new(0), - client, - } - } - - fn num_outstanding(&self) -> usize { - self.sigs.read().unwrap().len() - } - - fn push_transactions(&self, txs: Vec) -> Vec { - let mut ids = vec![]; - let new_sigs = txs.into_iter().filter_map(|tx| { - let id = self.counter.fetch_add(1, Ordering::Relaxed); - ids.push(id); - match self.client.send_transaction(&tx) { - Ok(sig) => { - return Some((sig, timestamp(), id)); - } - Err(e) => { - info!("error: {:#?}", e); - } - } - None - }); - let mut sigs_w = self.sigs.write().unwrap(); - sigs_w.extend(new_sigs); - ids - } - - fn drain_cleared(&self) -> Vec { - std::mem::take(&mut *self.cleared.write().unwrap()) - } - - fn close(self) { - self.exit.store(true, Ordering::Relaxed); - self.sig_clear_t.join().unwrap(); - } - - fn start_sig_clear_thread( - exit: &Arc, - sigs: &Arc>, - cleared: &Arc>>, - entrypoint_addr: SocketAddr, - ) -> JoinHandle<()> { - let sigs = sigs.clone(); - let exit = exit.clone(); - let cleared = cleared.clone(); - Builder::new() - .name("sig_clear".to_string()) - .spawn(move || { - let client = RpcClient::new_socket_with_commitment( - entrypoint_addr, - CommitmentConfig::confirmed(), - ); - let mut success = 0; - let mut error_count = 0; - let mut timed_out = 0; - let mut last_log = Instant::now(); - while !exit.load(Ordering::Relaxed) { - let sigs_len = sigs.read().unwrap().len(); - if sigs_len > 0 { - let mut sigs_w = sigs.write().unwrap(); - let mut start = Measure::start("sig_status"); - let statuses: Vec<_> = sigs_w - .chunks(200) - .flat_map(|sig_chunk| { - let only_sigs: Vec<_> = sig_chunk.iter().map(|s| s.0).collect(); - client - .get_signature_statuses(&only_sigs) - .expect("status fail") - .value - }) - .collect(); - let mut num_cleared = 0; - let start_len = sigs_w.len(); - let now = timestamp(); - let mut new_ids = vec![]; - let mut i = 0; - let mut j = 0; - while i != sigs_w.len() { - let mut retain = true; - let sent_ts = sigs_w[i].1; - if let Some(e) = &statuses[j] { - debug!("error: {:?}", e); - if e.status.is_ok() { - success += 1; - } else { - error_count += 1; - } - num_cleared += 1; - retain = false; - } else if now - sent_ts > 30_000 { - retain = false; - timed_out += 1; - } - if !retain { - new_ids.push(sigs_w.remove(i).2); - } else { - i += 1; - } - j += 1; - } - let final_sigs_len = sigs_w.len(); - drop(sigs_w); - cleared.write().unwrap().extend(new_ids); - start.stop(); - debug!( - "sigs len: {:?} success: {} took: {}ms cleared: {}/{}", - final_sigs_len, - success, - start.as_ms(), - num_cleared, - start_len, - ); - if last_log.elapsed().as_millis() > 5000 { - info!( - "success: {} error: {} timed_out: {}", - success, error_count, timed_out, - ); - last_log = Instant::now(); - } - } - sleep(Duration::from_millis(200)); - } - }) - .unwrap() - } -} - struct SeedTracker { max_created: Arc, max_closed: Arc, @@ -720,6 +564,7 @@ pub mod test { local_cluster::{ClusterConfig, LocalCluster}, validator_configs::make_identical_validator_configs, }; + use solana_measure::measure::Measure; use solana_sdk::poh_config::PohConfig; #[test] diff --git a/client/Cargo.toml b/client/Cargo.toml index afca9df0d3..4776388f63 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -27,6 +27,7 @@ solana-account-decoder = { path = "../account-decoder", version = "=1.9.0" } solana-clap-utils = { path = "../clap-utils", version = "=1.9.0" } solana-faucet = { path = "../faucet", version = "=1.9.0" } solana-net-utils = { path = "../net-utils", version = "=1.9.0" } +solana-measure = { path = "../measure", version = "=1.9.0" } solana-sdk = { path = "../sdk", version = "=1.9.0" } solana-transaction-status = { path = "../transaction-status", version = "=1.9.0" } solana-version = { path = "../version", version = "=1.9.0" } diff --git a/client/src/lib.rs b/client/src/lib.rs index 61f354a62e..33d96e5def 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -20,3 +20,4 @@ pub mod rpc_response; pub mod rpc_sender; pub mod thin_client; pub mod tpu_client; +pub mod transaction_executor; diff --git a/client/src/transaction_executor.rs b/client/src/transaction_executor.rs new file mode 100644 index 0000000000..377e52fb15 --- /dev/null +++ b/client/src/transaction_executor.rs @@ -0,0 +1,171 @@ +#![allow(clippy::integer_arithmetic)] +use crate::rpc_client::RpcClient; +use log::*; +use solana_measure::measure::Measure; +use solana_sdk::{ + commitment_config::CommitmentConfig, signature::Signature, timing::timestamp, + transaction::Transaction, +}; +use std::{ + net::SocketAddr, + sync::{ + atomic::{AtomicBool, AtomicU64, Ordering}, + Arc, RwLock, + }, + thread::{sleep, Builder, JoinHandle}, + time::{Duration, Instant}, +}; + +// signature, timestamp, id +type PendingQueue = Vec<(Signature, u64, u64)>; + +pub struct TransactionExecutor { + sig_clear_t: JoinHandle<()>, + sigs: Arc>, + cleared: Arc>>, + exit: Arc, + counter: AtomicU64, + client: RpcClient, +} + +impl TransactionExecutor { + pub fn new(entrypoint_addr: SocketAddr) -> Self { + let sigs = Arc::new(RwLock::new(Vec::new())); + let cleared = Arc::new(RwLock::new(Vec::new())); + let exit = Arc::new(AtomicBool::new(false)); + let sig_clear_t = Self::start_sig_clear_thread(&exit, &sigs, &cleared, entrypoint_addr); + let client = + RpcClient::new_socket_with_commitment(entrypoint_addr, CommitmentConfig::confirmed()); + Self { + sigs, + cleared, + sig_clear_t, + exit, + counter: AtomicU64::new(0), + client, + } + } + + pub fn num_outstanding(&self) -> usize { + self.sigs.read().unwrap().len() + } + + pub fn push_transactions(&self, txs: Vec) -> Vec { + let mut ids = vec![]; + let new_sigs = txs.into_iter().filter_map(|tx| { + let id = self.counter.fetch_add(1, Ordering::Relaxed); + ids.push(id); + match self.client.send_transaction(&tx) { + Ok(sig) => { + return Some((sig, timestamp(), id)); + } + Err(e) => { + info!("error: {:#?}", e); + } + } + None + }); + let mut sigs_w = self.sigs.write().unwrap(); + sigs_w.extend(new_sigs); + ids + } + + pub fn drain_cleared(&self) -> Vec { + std::mem::take(&mut *self.cleared.write().unwrap()) + } + + pub fn close(self) { + self.exit.store(true, Ordering::Relaxed); + self.sig_clear_t.join().unwrap(); + } + + fn start_sig_clear_thread( + exit: &Arc, + sigs: &Arc>, + cleared: &Arc>>, + entrypoint_addr: SocketAddr, + ) -> JoinHandle<()> { + let sigs = sigs.clone(); + let exit = exit.clone(); + let cleared = cleared.clone(); + Builder::new() + .name("sig_clear".to_string()) + .spawn(move || { + let client = RpcClient::new_socket_with_commitment( + entrypoint_addr, + CommitmentConfig::confirmed(), + ); + let mut success = 0; + let mut error_count = 0; + let mut timed_out = 0; + let mut last_log = Instant::now(); + while !exit.load(Ordering::Relaxed) { + let sigs_len = sigs.read().unwrap().len(); + if sigs_len > 0 { + let mut sigs_w = sigs.write().unwrap(); + let mut start = Measure::start("sig_status"); + let statuses: Vec<_> = sigs_w + .chunks(200) + .flat_map(|sig_chunk| { + let only_sigs: Vec<_> = sig_chunk.iter().map(|s| s.0).collect(); + client + .get_signature_statuses(&only_sigs) + .expect("status fail") + .value + }) + .collect(); + let mut num_cleared = 0; + let start_len = sigs_w.len(); + let now = timestamp(); + let mut new_ids = vec![]; + let mut i = 0; + let mut j = 0; + while i != sigs_w.len() { + let mut retain = true; + let sent_ts = sigs_w[i].1; + if let Some(e) = &statuses[j] { + debug!("error: {:?}", e); + if e.status.is_ok() { + success += 1; + } else { + error_count += 1; + } + num_cleared += 1; + retain = false; + } else if now - sent_ts > 30_000 { + retain = false; + timed_out += 1; + } + if !retain { + new_ids.push(sigs_w.remove(i).2); + } else { + i += 1; + } + j += 1; + } + let final_sigs_len = sigs_w.len(); + drop(sigs_w); + cleared.write().unwrap().extend(new_ids); + start.stop(); + debug!( + "sigs len: {:?} success: {} took: {}ms cleared: {}/{}", + final_sigs_len, + success, + start.as_ms(), + num_cleared, + start_len, + ); + if last_log.elapsed().as_millis() > 5000 { + info!( + "success: {} error: {} timed_out: {}", + success, error_count, timed_out, + ); + last_log = Instant::now(); + } + } + sleep(Duration::from_millis(200)); + } + }) + .unwrap() + } +} diff --git a/programs/bpf/Cargo.lock b/programs/bpf/Cargo.lock index a60504eaaa..7d7ff7dae3 100644 --- a/programs/bpf/Cargo.lock +++ b/programs/bpf/Cargo.lock @@ -2904,6 +2904,7 @@ dependencies = [ "solana-account-decoder", "solana-clap-utils", "solana-faucet", + "solana-measure", "solana-net-utils", "solana-sdk", "solana-transaction-status",