From 1e631c6199fe4d43c886e15b004e8947f68780c3 Mon Sep 17 00:00:00 2001 From: godmodegalactus Date: Wed, 4 Jan 2023 19:02:41 +0100 Subject: [PATCH] performance counter will count confirmations and finalized while processing the blocks --- src/cli.rs | 10 +++++----- src/context.rs | 43 ++++++++++++++++++------------------------- src/main.rs | 9 +++++---- src/pubsub.rs | 3 +-- src/rpc.rs | 39 +++++++++++++++++++++++++-------------- 5 files changed, 54 insertions(+), 50 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index 1f865360..3b796227 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -12,13 +12,13 @@ use solana_cli_config::ConfigInput; " )] pub struct Args { - #[arg(short, long, default_value_t = String::from("8899"))] - pub port: String, - #[arg(short, long, default_value_t = String::from("8900"))] - pub subscription_port: String, + #[arg(short, long, default_value_t = 9000)] + pub port: u16, + #[arg(short, long, default_value_t = 9001)] + pub subscription_port: u16, #[arg(short, long, default_value_t = String::from("http://localhost:8899"))] pub rpc_url: String, - #[arg(short, long, default_value_t = String::from("http://localhost:8900"))] + #[arg(short, long, default_value_t = String::from("ws://localhost:8900"))] pub websocket_url: String, } diff --git a/src/context.rs b/src/context.rs index c72e138a..5b339211 100644 --- a/src/context.rs +++ b/src/context.rs @@ -1,6 +1,7 @@ use chrono::DateTime; use crossbeam_channel::Sender; use dashmap::DashMap; +use libc::file_clone_range; use serde::Serialize; use solana_client::{ rpc_client::RpcClient, @@ -52,28 +53,10 @@ impl BlockInformation { pub struct SignatureStatus { pub status: Option, - pub error: Option, + pub error: Option, pub created: Instant, } -impl SignatureStatus { - pub fn new() -> Self { - Self { - status: None, - error: None, - created: Instant::now(), - } - } - - pub fn new_from_commitment(commitment: CommitmentLevel) -> Self { - Self { - status: Some(commitment), - error: None, - created: Instant::now(), - } - } -} - pub struct LiteRpcContext { pub signature_status: DashMap, pub finalized_block_info: BlockInformation, @@ -290,14 +273,17 @@ impl LiteRpcSubsrciptionControl { #[derive(Clone)] pub struct PerformanceCounter { + pub total_finalized: Arc, pub total_confirmations: Arc, pub total_transactions_sent: Arc, pub transaction_sent_error: Arc, + pub finalized_per_seconds: Arc, pub confirmations_per_seconds: Arc, pub transactions_per_seconds: Arc, pub send_transactions_errors_per_seconds: Arc, + last_count_for_finalized: Arc, last_count_for_confirmations: Arc, last_count_for_transactions_sent: Arc, last_count_for_sent_errors: Arc, @@ -306,10 +292,13 @@ pub struct PerformanceCounter { impl PerformanceCounter { pub fn new() -> Self { Self { + total_finalized: Arc::new(AtomicU64::new(0)), total_confirmations: Arc::new(AtomicU64::new(0)), total_transactions_sent: Arc::new(AtomicU64::new(0)), confirmations_per_seconds: Arc::new(AtomicU64::new(0)), transactions_per_seconds: Arc::new(AtomicU64::new(0)), + finalized_per_seconds: Arc::new(AtomicU64::new(0)), + last_count_for_finalized: Arc::new(AtomicU64::new(0)), last_count_for_confirmations: Arc::new(AtomicU64::new(0)), last_count_for_transactions_sent: Arc::new(AtomicU64::new(0)), transaction_sent_error: Arc::new(AtomicU64::new(0)), @@ -319,12 +308,17 @@ impl PerformanceCounter { } pub fn update_per_seconds_transactions(&self) { + let total_finalized: u64 = self.total_finalized.load(Ordering::Relaxed); let total_confirmations: u64 = self.total_confirmations.load(Ordering::Relaxed); let total_transactions: u64 = self.total_transactions_sent.load(Ordering::Relaxed); let total_errors: u64 = self.transaction_sent_error.load(Ordering::Relaxed); + self.finalized_per_seconds.store( + total_finalized - self.last_count_for_finalized.load(Ordering::Relaxed), + Ordering::Release, + ); self.confirmations_per_seconds.store( total_confirmations - self.last_count_for_confirmations.load(Ordering::Relaxed), Ordering::Release, @@ -341,6 +335,8 @@ impl PerformanceCounter { Ordering::Release, ); + self.last_count_for_finalized + .store(total_finalized, Ordering::Relaxed); self.last_count_for_confirmations .store(total_confirmations, Ordering::Relaxed); self.last_count_for_transactions_sent @@ -348,10 +344,6 @@ impl PerformanceCounter { self.last_count_for_sent_errors .store(total_errors, Ordering::Relaxed); } - - pub fn update_confirm_transaction_counter(&self) { - self.total_confirmations.fetch_add(1, Ordering::Relaxed); - } } pub fn launch_performance_updating_thread( @@ -373,9 +365,10 @@ pub fn launch_performance_updating_thread( let total_transactions_per_seconds = performance_counter .transactions_per_seconds .load(Ordering::Acquire); + let finalized_per_second = performance_counter.finalized_per_seconds.load(Ordering::Acquire); println!( - "At {} second, Sent {} transactions and confrimed {} transactions", - nb_seconds, total_transactions_per_seconds, confirmations_per_seconds + "At {} second, Sent {} transactions, finalized {} and confirmed {} transactions", + nb_seconds, total_transactions_per_seconds, finalized_per_second, confirmations_per_seconds ); let runtime = start.elapsed(); nb_seconds += 1; diff --git a/src/main.rs b/src/main.rs index 700dfe77..26308460 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,7 +3,7 @@ mod context; mod pubsub; mod rpc; -use std::{net::SocketAddr, sync::Arc}; +use std::{net::SocketAddr, sync::Arc, thread::sleep}; use clap::Parser; use context::LiteRpcSubsrciptionControl; @@ -69,7 +69,7 @@ fn run(port: u16, subscription_port: u16, rpc_url: String, websocket_url: String subscription_port, performance_counter.clone(), ); - let broadcast_thread = { + let _broadcast_thread = { // build broadcasting thread let pubsub_control = pubsub_control.clone(); std::thread::Builder::new() @@ -79,6 +79,7 @@ fn run(port: u16, subscription_port: u16, rpc_url: String, websocket_url: String }) .unwrap() }; + let mut io = MetaIoHandler::default(); let lite_rpc = lite_rpc::LightRpc; io.extend_with(lite_rpc.to_delegate()); @@ -89,14 +90,14 @@ fn run(port: u16, subscription_port: u16, rpc_url: String, websocket_url: String notification_sender, performance_counter.clone(), ); - let cleaning_thread = { + let _cleaning_thread = { // build cleaning thread let context = request_processor.context.clone(); std::thread::Builder::new() .name("cleaning thread".to_string()) .spawn(move || { context.remove_stale_data(60 * 10); - sleep(std::time::Duration::from_secs(60 * 5)) + sleep(std::time::Duration::from_secs(60)) }) .unwrap() }; diff --git a/src/pubsub.rs b/src/pubsub.rs index 6fc408eb..be779f4f 100644 --- a/src/pubsub.rs +++ b/src/pubsub.rs @@ -161,7 +161,7 @@ enum HandleError { async fn handle_connection( socket: TcpStream, subscription_control: Arc, - performance_counter: PerformanceCounter, + _performance_counter: PerformanceCounter, ) -> core::result::Result<(), HandleError> { let mut server = Server::new(socket.compat()); let request = server.receive_request().await?; @@ -194,7 +194,6 @@ async fn handle_connection( result = broadcast_receiver.recv() => { if let Ok(x) = result { if rpc_impl.current_subscriptions.contains_key(&x.subscription_id) { - performance_counter.update_confirm_transaction_counter(); sender.send_text(&x.json).await?; } } diff --git a/src/rpc.rs b/src/rpc.rs index 5ed85af4..26eb0c01 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -69,6 +69,7 @@ impl LightRpcRequestProcessor { let context = Arc::new(LiteRpcContext::new(rpc_client.clone(), notification_sender)); + println!("ws_url {}", websocket_url); // subscribe for confirmed_blocks let (client_confirmed, receiver_confirmed) = Self::subscribe_block(websocket_url, CommitmentLevel::Confirmed).unwrap(); @@ -85,11 +86,13 @@ impl LightRpcRequestProcessor { receiver_confirmed, &context, CommitmentLevel::Confirmed, + performance_counter.clone(), ), Self::build_thread_to_process_blocks( receiver_finalized, &context, CommitmentLevel::Finalized, + performance_counter.clone(), ), Self::build_thread_to_process_transactions( json_rpc_url.to_string(), @@ -148,6 +151,7 @@ impl LightRpcRequestProcessor { reciever: Receiver>, context: &Arc, commitment: CommitmentLevel, + performance_counters: PerformanceCounter, ) -> JoinHandle<()> { let context = context.clone(); Builder::new() @@ -164,6 +168,7 @@ impl LightRpcRequestProcessor { commitment, &context.notification_sender, block_info, + performance_counters, ); }) .unwrap() @@ -238,6 +243,7 @@ impl LightRpcRequestProcessor { commitment: CommitmentLevel, notification_sender: &crossbeam_channel::Sender, block_information: &BlockInformation, + performance_counters: PerformanceCounter, ) { loop { let block_data = reciever.recv(); @@ -301,6 +307,12 @@ impl LightRpcRequestProcessor { ); } + if commitment.eq(&CommitmentLevel::Finalized) { + performance_counters.finalized_per_seconds.fetch_add(1, Ordering::Relaxed); + } else { + performance_counters.confirmations_per_seconds.fetch_add(1, Ordering::Relaxed); + } + x.insert(SignatureStatus { status: Some(commitment), error: transaction_error, @@ -452,7 +464,11 @@ pub mod lite_rpc { let signature = transaction.signatures[0].to_string(); meta.context .signature_status - .insert(signature.clone(), SignatureStatus::new()); + .insert(signature.clone(), SignatureStatus{ + status: None, + error: None, + created: Instant::now(), + }); match meta.tpu_producer_channel.send(transaction) { Ok(_) => Ok(signature), @@ -560,13 +576,10 @@ pub mod lite_rpc { .slot .load(Ordering::Relaxed) }; - meta.performance_counter - .update_confirm_transaction_counter(); match k_value { - Some(value) => match value.clone() { - Some(signature_status) => { - let commitment = signature_status.commitment_level; + Some(value) => match value.status { + Some(commitment) => { let commitment_matches = if commitment.eq(&CommitmentLevel::Finalized) { commitment.eq(&CommitmentLevel::Finalized) } else { @@ -576,7 +589,7 @@ pub mod lite_rpc { Ok(RpcResponse { context: RpcResponseContext::new(slot), value: commitment_matches - && signature_status.transaction_error.is_none(), + && value.error.is_none(), }) } None => Ok(RpcResponse { @@ -620,17 +633,15 @@ pub mod lite_rpc { let singature_status = meta.context.signature_status.get(x); let k_value = singature_status; match k_value { - Some(value) => match value.clone() { - Some(commitment_for_signature) => { + Some(value) => match value.status { + Some(commitment_level) => { let slot = meta .context .confirmed_block_info .slot .load(Ordering::Relaxed); - meta.performance_counter - .update_confirm_transaction_counter(); - let status = match commitment_for_signature.commitment_level { + let status = match commitment_level { CommitmentLevel::Finalized => { TransactionConfirmationStatus::Finalized } @@ -638,9 +649,9 @@ pub mod lite_rpc { }; Some(TransactionStatus { slot, - confirmations: Some(1), + confirmations: None, status: Ok(()), - err: commitment_for_signature.transaction_error, + err: value.error.clone(), confirmation_status: Some(status), }) }