diff --git a/bench_transactions_send.ts b/bench_transactions_send.ts index 04f78f02..30c307df 100644 --- a/bench_transactions_send.ts +++ b/bench_transactions_send.ts @@ -36,6 +36,7 @@ export async function main() { const users = InFile.users.map(x => Keypair.fromSecretKey(Uint8Array.from(x.secretKey))); const userAccounts = InFile.tokenAccounts.map(x => new PublicKey(x)); let promises_to_unpack : Promise[][] = []; + let time_taken_to_send = []; for (let i = 0; i 0) { + time_taken_to_send[i] = diff; + if (diff > 0 && diff < 1000) { await sleep(1000 - diff) } } @@ -81,19 +83,20 @@ export async function main() { for (let i=0; i< size; ++i) { const promises = promises_to_unpack[i]; - const signatures = await Promise.all(promises); - let statuses = await connection.getSignatureStatuses(signatures, {searchTransactionHistory: false}) - for (const status of statuses.value) { - if(status != null && status.confirmationStatus && status.err == null) { - successes[i] += 1; - } - else { - failures[i] += 1; + for (const promise of promises) { + const signature = await promise; + const confirmed = await connection.getSignatureStatus(signature); + if (confirmed != null && confirmed.value != null && confirmed.value.err == null) { + successes[i]++; + } else { + failures[i]++; } } + } - console.log("sucesses " + successes) - console.log("failures " + failures) + console.log("sucesses : " + successes) + console.log("failures : " + failures) + console.log("time taken to send : " + time_taken_to_send) } } diff --git a/src/context.rs b/src/context.rs index f4ea0cd6..3d5f65b1 100644 --- a/src/context.rs +++ b/src/context.rs @@ -1,7 +1,5 @@ -use chrono::DateTime; use crossbeam_channel::Sender; use dashmap::DashMap; -use libc::file_clone_range; use serde::Serialize; use solana_client::{ rpc_client::RpcClient, @@ -21,7 +19,7 @@ use std::{ Arc, RwLock, }, thread::{self, Builder, JoinHandle}, - time::{Duration, Instant, SystemTime}, + time::{Duration, Instant}, }; use tokio::sync::broadcast; @@ -277,16 +275,21 @@ pub struct PerformanceCounter { pub total_confirmations: Arc, pub total_transactions_sent: Arc, pub transaction_sent_error: Arc, - - pub finalized_per_seconds: Arc, - pub confirmations_per_seconds: Arc, - pub transactions_per_seconds: Arc, - pub send_transactions_errors_per_seconds: Arc, + pub total_transactions_recieved: Arc, last_count_for_finalized: Arc, last_count_for_confirmations: Arc, last_count_for_transactions_sent: Arc, last_count_for_sent_errors: Arc, + last_count_for_transactions_recieved: Arc, +} + +pub struct PerformancePerSec { + pub finalized_per_seconds: u64, + pub confirmations_per_seconds: u64, + pub transactions_per_seconds: u64, + pub send_transactions_errors_per_seconds: u64, + pub transaction_recieved_per_second: u64, } impl PerformanceCounter { @@ -295,54 +298,52 @@ impl PerformanceCounter { total_finalized: Arc::new(AtomicU64::new(0)), total_confirmations: Arc::new(AtomicU64::new(0)), total_transactions_sent: Arc::new(AtomicU64::new(0)), - confirmations_per_seconds: Arc::new(AtomicU64::new(0)), - transactions_per_seconds: Arc::new(AtomicU64::new(0)), - finalized_per_seconds: Arc::new(AtomicU64::new(0)), + total_transactions_recieved: Arc::new(AtomicU64::new(0)), + transaction_sent_error: Arc::new(AtomicU64::new(0)), last_count_for_finalized: Arc::new(AtomicU64::new(0)), last_count_for_confirmations: Arc::new(AtomicU64::new(0)), last_count_for_transactions_sent: Arc::new(AtomicU64::new(0)), - transaction_sent_error: Arc::new(AtomicU64::new(0)), + last_count_for_transactions_recieved: Arc::new(AtomicU64::new(0)), last_count_for_sent_errors: Arc::new(AtomicU64::new(0)), - send_transactions_errors_per_seconds: Arc::new(AtomicU64::new(0)), } } - pub fn update_per_seconds_transactions(&self) { + pub fn update_per_seconds_transactions(&self) -> PerformancePerSec { let total_finalized: u64 = self.total_finalized.load(Ordering::Relaxed); let total_confirmations: u64 = self.total_confirmations.load(Ordering::Relaxed); - let total_transactions: u64 = self.total_transactions_sent.load(Ordering::Relaxed); - let total_errors: u64 = self.transaction_sent_error.load(Ordering::Relaxed); + let total_transactions_recieved: u64 = + self.total_transactions_recieved.load(Ordering::Relaxed); - self.finalized_per_seconds.store( - total_finalized - self.last_count_for_finalized.load(Ordering::Relaxed), - Ordering::Release, - ); - self.confirmations_per_seconds.store( - total_confirmations - self.last_count_for_confirmations.load(Ordering::Relaxed), - Ordering::Release, - ); - self.transactions_per_seconds.store( - total_transactions - - self - .last_count_for_transactions_sent - .load(Ordering::Relaxed), - Ordering::Release, - ); - self.send_transactions_errors_per_seconds.store( - total_errors - self.last_count_for_sent_errors.load(Ordering::Relaxed), - Ordering::Release, - ); + let finalized_per_seconds = total_finalized + - self + .last_count_for_finalized + .swap(total_finalized, Ordering::Relaxed); + let confirmations_per_seconds = total_confirmations + - self + .last_count_for_confirmations + .swap(total_confirmations, Ordering::Relaxed); + let transactions_per_seconds = total_transactions + - self + .last_count_for_transactions_sent + .swap(total_transactions, Ordering::Relaxed); + let send_transactions_errors_per_seconds = total_errors + - self + .last_count_for_sent_errors + .swap(total_errors, Ordering::Relaxed); + let transaction_recieved_per_second = total_transactions_recieved + - self + .last_count_for_transactions_recieved + .swap(total_transactions_recieved, Ordering::Relaxed); - self.last_count_for_finalized - .store(total_finalized, Ordering::Relaxed); - self.last_count_for_confirmations - .store(total_confirmations, Ordering::Relaxed); - self.last_count_for_transactions_sent - .store(total_transactions, Ordering::Relaxed); - self.last_count_for_sent_errors - .store(total_errors, Ordering::Relaxed); + PerformancePerSec { + confirmations_per_seconds, + finalized_per_seconds, + send_transactions_errors_per_seconds, + transaction_recieved_per_second, + transactions_per_seconds, + } } } @@ -358,17 +359,10 @@ pub fn launch_performance_updating_thread( let wait_time = Duration::from_millis(1000); let performance_counter = performance_counter.clone(); - performance_counter.update_per_seconds_transactions(); - let confirmations_per_seconds = performance_counter - .confirmations_per_seconds - .load(Ordering::Acquire); - let total_transactions_per_seconds = performance_counter - .transactions_per_seconds - .load(Ordering::Acquire); - let finalized_per_second = performance_counter.finalized_per_seconds.load(Ordering::Acquire); + let data = performance_counter.update_per_seconds_transactions(); println!( - "At {} second, Sent {} transactions, finalized {} and confirmed {} transactions", - nb_seconds, total_transactions_per_seconds, finalized_per_second, confirmations_per_seconds + "At {} second, Recieved {}, Sent {} transactions, finalized {} and confirmed {} transactions", + nb_seconds, data.transaction_recieved_per_second, data.transactions_per_seconds, data.finalized_per_seconds, data.confirmations_per_seconds ); let runtime = start.elapsed(); nb_seconds += 1; diff --git a/src/rpc.rs b/src/rpc.rs index 72f17d94..a3007004 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -65,10 +65,9 @@ impl LightRpcRequestProcessor { performance_counter: PerformanceCounter, ) -> LightRpcRequestProcessor { let rpc_client = Arc::new(RpcClient::new(json_rpc_url)); - let connection_cache = Arc::new(ConnectionCache::default()); - let context = Arc::new(LiteRpcContext::new(rpc_client.clone(), notification_sender)); + let connection_cache = Arc::new(ConnectionCache::default()); println!("ws_url {}", websocket_url); // subscribe for confirmed_blocks let (client_confirmed, receiver_confirmed) = @@ -97,24 +96,10 @@ impl LightRpcRequestProcessor { Self::build_thread_to_process_transactions( json_rpc_url.to_string(), websocket_url.to_string(), - connection_cache.clone(), + &context, tpu_consumer.clone(), performance_counter.clone(), ), - Self::build_thread_to_process_transactions( - json_rpc_url.to_string(), - websocket_url.to_string(), - connection_cache.clone(), - tpu_consumer.clone(), - performance_counter.clone(), - ), - Self::build_thread_to_process_transactions( - json_rpc_url.to_string(), - websocket_url.to_string(), - connection_cache.clone(), - tpu_consumer, - performance_counter.clone(), - ), ]; LightRpcRequestProcessor { @@ -177,29 +162,33 @@ impl LightRpcRequestProcessor { fn build_thread_to_process_transactions( json_rpc_url: String, websocket_url: String, - connection_cache: Arc, + context: &Arc, receiver: Receiver, performance_counters: PerformanceCounter, ) -> JoinHandle<()> { + let context = context.clone(); Builder::new() .name("thread working on confirmation block".to_string()) .spawn(move || { let rpc_client = Arc::new(RpcClient::new(json_rpc_url.to_string())); + + let mut connection_cache = Arc::new(ConnectionCache::default()); let tpu_client = TpuClient::new_with_connection_cache( - rpc_client, + rpc_client.clone(), websocket_url.as_str(), TpuClientConfig::default(), // value for max fanout slots connection_cache.clone(), ); - let tpu_client = Arc::new(tpu_client.unwrap()); + let mut tpu_client = Arc::new(tpu_client.unwrap()); + let mut consecutive_errors: u8 = 0; loop { let recv_res = receiver.recv(); match recv_res { Ok(transaction) => { let mut transactions_vec = vec![transaction]; - let mut time_remaining = Duration::from_micros(200); + let mut time_remaining = Duration::from_micros(1000); for _i in 1..TPU_BATCH_SIZE { let start = std::time::Instant::now(); let another = receiver.recv_timeout(time_remaining); @@ -216,15 +205,46 @@ impl LightRpcRequestProcessor { let count: u64 = transactions_vec.len() as u64; let slice = transactions_vec.as_slice(); let fut_res = tpu_client.try_send_transaction_batch(slice); + + // insert sent transactions into signature status map + transactions_vec.iter().for_each(|x| { + let signature = x.signatures[0].to_string(); + context.signature_status.insert( + signature.clone(), + SignatureStatus { + status: None, + error: None, + created: Instant::now(), + }, + ); + }); + match fut_res { - Ok(_) => performance_counters + Ok(_) => { + consecutive_errors = 0; + + performance_counters .total_transactions_sent - .fetch_add(count, Ordering::Relaxed), + .fetch_add(count, Ordering::Relaxed); + }, Err(e) => { println!("Got error while sending transaction batch of size {}, error {}", count, e.to_string()); + consecutive_errors += 1; + if consecutive_errors > 3 { + connection_cache = Arc::new(ConnectionCache::default()); + + let new_tpu_client = TpuClient::new_with_connection_cache( + rpc_client.clone(), + websocket_url.as_str(), + TpuClientConfig::default(), // value for max fanout slots + connection_cache.clone(), + ); + // reset TPU connection + tpu_client = Arc::new(new_tpu_client.unwrap()); + } performance_counters .transaction_sent_error - .fetch_add(count, Ordering::Relaxed) + .fetch_add(count, Ordering::Relaxed); } }; } @@ -418,12 +438,6 @@ pub mod lite_rpc { config: Option, ) -> Result; - #[rpc(meta, name = "getPerformanceCounters")] - fn get_performance_counters( - &self, - meta: Self::Metadata, - ) -> Result; - #[rpc(meta, name = "getLatestBlockhash")] fn get_latest_blockhash( &self, @@ -462,17 +476,11 @@ pub mod lite_rpc { tx_encoding )) })?; - let (_wire_transaction, transaction) = - decode_and_deserialize::(data, binary_encoding)?; + let transaction = decode_and_deserialize::(data, binary_encoding)?; let signature = transaction.signatures[0].to_string(); - meta.context.signature_status.insert( - signature.clone(), - SignatureStatus { - status: None, - error: None, - created: Instant::now(), - }, - ); + meta.performance_counter + .total_transactions_recieved + .fetch_add(1, Ordering::Relaxed); match meta.tpu_producer_channel.send(transaction) { Ok(_) => Ok(signature), @@ -694,49 +702,12 @@ pub mod lite_rpc { feature_set: Some(version.feature_set), }) } - - fn get_performance_counters( - &self, - meta: Self::Metadata, - ) -> Result { - let total_transactions_count = meta - .performance_counter - .total_transactions_sent - .load(Ordering::Relaxed); - let total_confirmations_count = meta - .performance_counter - .total_confirmations - .load(Ordering::Relaxed); - let transactions_per_seconds = meta - .performance_counter - .transactions_per_seconds - .load(Ordering::Acquire); - let confirmations_per_seconds = meta - .performance_counter - .confirmations_per_seconds - .load(Ordering::Acquire); - - let procinfo::pid::Statm { size, .. } = procinfo::pid::statm_self().unwrap(); - let procinfo::pid::Stat { num_threads, .. } = procinfo::pid::stat_self().unwrap(); - - Ok(RpcPerformanceCounterResults { - confirmations_per_seconds, - transactions_per_seconds, - total_confirmations_count, - total_transactions_count, - memory_used: size as u64, - nb_threads: num_threads as u64, - }) - } } } const MAX_BASE58_SIZE: usize = 1683; // Golden, bump if PACKET_DATA_SIZE changes const MAX_BASE64_SIZE: usize = 1644; // Golden, bump if PACKET_DATA_SIZE changes -fn decode_and_deserialize( - encoded: String, - encoding: TransactionBinaryEncoding, -) -> Result<(Vec, T)> +fn decode_and_deserialize(encoded: String, encoding: TransactionBinaryEncoding) -> Result where T: serde::de::DeserializeOwned, { @@ -789,5 +760,4 @@ where &err.to_string() )) }) - .map(|output| (wire_output, output)) }