performance counter will count confirmations and finalized while processing the blocks

This commit is contained in:
godmodegalactus 2023-01-04 19:02:41 +01:00
parent eab035b07d
commit 1e631c6199
No known key found for this signature in database
GPG Key ID: A04142C71ABB0DEA
5 changed files with 54 additions and 50 deletions

View File

@ -12,13 +12,13 @@ use solana_cli_config::ConfigInput;
"
)]
pub struct Args {
#[arg(short, long, default_value_t = String::from("8899"))]
pub port: String,
#[arg(short, long, default_value_t = String::from("8900"))]
pub subscription_port: String,
#[arg(short, long, default_value_t = 9000)]
pub port: u16,
#[arg(short, long, default_value_t = 9001)]
pub subscription_port: u16,
#[arg(short, long, default_value_t = String::from("http://localhost:8899"))]
pub rpc_url: String,
#[arg(short, long, default_value_t = String::from("http://localhost:8900"))]
#[arg(short, long, default_value_t = String::from("ws://localhost:8900"))]
pub websocket_url: String,
}

View File

@ -1,6 +1,7 @@
use chrono::DateTime;
use crossbeam_channel::Sender;
use dashmap::DashMap;
use libc::file_clone_range;
use serde::Serialize;
use solana_client::{
rpc_client::RpcClient,
@ -52,28 +53,10 @@ impl BlockInformation {
pub struct SignatureStatus {
pub status: Option<CommitmentLevel>,
pub error: Option<String>,
pub error: Option<TransactionError>,
pub created: Instant,
}
impl SignatureStatus {
pub fn new() -> Self {
Self {
status: None,
error: None,
created: Instant::now(),
}
}
pub fn new_from_commitment(commitment: CommitmentLevel) -> Self {
Self {
status: Some(commitment),
error: None,
created: Instant::now(),
}
}
}
pub struct LiteRpcContext {
pub signature_status: DashMap<String, SignatureStatus>,
pub finalized_block_info: BlockInformation,
@ -290,14 +273,17 @@ impl LiteRpcSubsrciptionControl {
#[derive(Clone)]
pub struct PerformanceCounter {
pub total_finalized: Arc<AtomicU64>,
pub total_confirmations: Arc<AtomicU64>,
pub total_transactions_sent: Arc<AtomicU64>,
pub transaction_sent_error: Arc<AtomicU64>,
pub finalized_per_seconds: Arc<AtomicU64>,
pub confirmations_per_seconds: Arc<AtomicU64>,
pub transactions_per_seconds: Arc<AtomicU64>,
pub send_transactions_errors_per_seconds: Arc<AtomicU64>,
last_count_for_finalized: Arc<AtomicU64>,
last_count_for_confirmations: Arc<AtomicU64>,
last_count_for_transactions_sent: Arc<AtomicU64>,
last_count_for_sent_errors: Arc<AtomicU64>,
@ -306,10 +292,13 @@ pub struct PerformanceCounter {
impl PerformanceCounter {
pub fn new() -> Self {
Self {
total_finalized: Arc::new(AtomicU64::new(0)),
total_confirmations: Arc::new(AtomicU64::new(0)),
total_transactions_sent: Arc::new(AtomicU64::new(0)),
confirmations_per_seconds: Arc::new(AtomicU64::new(0)),
transactions_per_seconds: Arc::new(AtomicU64::new(0)),
finalized_per_seconds: Arc::new(AtomicU64::new(0)),
last_count_for_finalized: Arc::new(AtomicU64::new(0)),
last_count_for_confirmations: Arc::new(AtomicU64::new(0)),
last_count_for_transactions_sent: Arc::new(AtomicU64::new(0)),
transaction_sent_error: Arc::new(AtomicU64::new(0)),
@ -319,12 +308,17 @@ impl PerformanceCounter {
}
pub fn update_per_seconds_transactions(&self) {
let total_finalized: u64 = self.total_finalized.load(Ordering::Relaxed);
let total_confirmations: u64 = self.total_confirmations.load(Ordering::Relaxed);
let total_transactions: u64 = self.total_transactions_sent.load(Ordering::Relaxed);
let total_errors: u64 = self.transaction_sent_error.load(Ordering::Relaxed);
self.finalized_per_seconds.store(
total_finalized - self.last_count_for_finalized.load(Ordering::Relaxed),
Ordering::Release,
);
self.confirmations_per_seconds.store(
total_confirmations - self.last_count_for_confirmations.load(Ordering::Relaxed),
Ordering::Release,
@ -341,6 +335,8 @@ impl PerformanceCounter {
Ordering::Release,
);
self.last_count_for_finalized
.store(total_finalized, Ordering::Relaxed);
self.last_count_for_confirmations
.store(total_confirmations, Ordering::Relaxed);
self.last_count_for_transactions_sent
@ -348,10 +344,6 @@ impl PerformanceCounter {
self.last_count_for_sent_errors
.store(total_errors, Ordering::Relaxed);
}
pub fn update_confirm_transaction_counter(&self) {
self.total_confirmations.fetch_add(1, Ordering::Relaxed);
}
}
pub fn launch_performance_updating_thread(
@ -373,9 +365,10 @@ pub fn launch_performance_updating_thread(
let total_transactions_per_seconds = performance_counter
.transactions_per_seconds
.load(Ordering::Acquire);
let finalized_per_second = performance_counter.finalized_per_seconds.load(Ordering::Acquire);
println!(
"At {} second, Sent {} transactions and confrimed {} transactions",
nb_seconds, total_transactions_per_seconds, confirmations_per_seconds
"At {} second, Sent {} transactions, finalized {} and confirmed {} transactions",
nb_seconds, total_transactions_per_seconds, finalized_per_second, confirmations_per_seconds
);
let runtime = start.elapsed();
nb_seconds += 1;

View File

@ -3,7 +3,7 @@ mod context;
mod pubsub;
mod rpc;
use std::{net::SocketAddr, sync::Arc};
use std::{net::SocketAddr, sync::Arc, thread::sleep};
use clap::Parser;
use context::LiteRpcSubsrciptionControl;
@ -69,7 +69,7 @@ fn run(port: u16, subscription_port: u16, rpc_url: String, websocket_url: String
subscription_port,
performance_counter.clone(),
);
let broadcast_thread = {
let _broadcast_thread = {
// build broadcasting thread
let pubsub_control = pubsub_control.clone();
std::thread::Builder::new()
@ -79,6 +79,7 @@ fn run(port: u16, subscription_port: u16, rpc_url: String, websocket_url: String
})
.unwrap()
};
let mut io = MetaIoHandler::default();
let lite_rpc = lite_rpc::LightRpc;
io.extend_with(lite_rpc.to_delegate());
@ -89,14 +90,14 @@ fn run(port: u16, subscription_port: u16, rpc_url: String, websocket_url: String
notification_sender,
performance_counter.clone(),
);
let cleaning_thread = {
let _cleaning_thread = {
// build cleaning thread
let context = request_processor.context.clone();
std::thread::Builder::new()
.name("cleaning thread".to_string())
.spawn(move || {
context.remove_stale_data(60 * 10);
sleep(std::time::Duration::from_secs(60 * 5))
sleep(std::time::Duration::from_secs(60))
})
.unwrap()
};

View File

@ -161,7 +161,7 @@ enum HandleError {
async fn handle_connection(
socket: TcpStream,
subscription_control: Arc<LiteRpcSubsrciptionControl>,
performance_counter: PerformanceCounter,
_performance_counter: PerformanceCounter,
) -> core::result::Result<(), HandleError> {
let mut server = Server::new(socket.compat());
let request = server.receive_request().await?;
@ -194,7 +194,6 @@ async fn handle_connection(
result = broadcast_receiver.recv() => {
if let Ok(x) = result {
if rpc_impl.current_subscriptions.contains_key(&x.subscription_id) {
performance_counter.update_confirm_transaction_counter();
sender.send_text(&x.json).await?;
}
}

View File

@ -69,6 +69,7 @@ impl LightRpcRequestProcessor {
let context = Arc::new(LiteRpcContext::new(rpc_client.clone(), notification_sender));
println!("ws_url {}", websocket_url);
// subscribe for confirmed_blocks
let (client_confirmed, receiver_confirmed) =
Self::subscribe_block(websocket_url, CommitmentLevel::Confirmed).unwrap();
@ -85,11 +86,13 @@ impl LightRpcRequestProcessor {
receiver_confirmed,
&context,
CommitmentLevel::Confirmed,
performance_counter.clone(),
),
Self::build_thread_to_process_blocks(
receiver_finalized,
&context,
CommitmentLevel::Finalized,
performance_counter.clone(),
),
Self::build_thread_to_process_transactions(
json_rpc_url.to_string(),
@ -148,6 +151,7 @@ impl LightRpcRequestProcessor {
reciever: Receiver<RpcResponse<RpcBlockUpdate>>,
context: &Arc<LiteRpcContext>,
commitment: CommitmentLevel,
performance_counters: PerformanceCounter,
) -> JoinHandle<()> {
let context = context.clone();
Builder::new()
@ -164,6 +168,7 @@ impl LightRpcRequestProcessor {
commitment,
&context.notification_sender,
block_info,
performance_counters,
);
})
.unwrap()
@ -238,6 +243,7 @@ impl LightRpcRequestProcessor {
commitment: CommitmentLevel,
notification_sender: &crossbeam_channel::Sender<NotificationType>,
block_information: &BlockInformation,
performance_counters: PerformanceCounter,
) {
loop {
let block_data = reciever.recv();
@ -301,6 +307,12 @@ impl LightRpcRequestProcessor {
);
}
if commitment.eq(&CommitmentLevel::Finalized) {
performance_counters.finalized_per_seconds.fetch_add(1, Ordering::Relaxed);
} else {
performance_counters.confirmations_per_seconds.fetch_add(1, Ordering::Relaxed);
}
x.insert(SignatureStatus {
status: Some(commitment),
error: transaction_error,
@ -452,7 +464,11 @@ pub mod lite_rpc {
let signature = transaction.signatures[0].to_string();
meta.context
.signature_status
.insert(signature.clone(), SignatureStatus::new());
.insert(signature.clone(), SignatureStatus{
status: None,
error: None,
created: Instant::now(),
});
match meta.tpu_producer_channel.send(transaction) {
Ok(_) => Ok(signature),
@ -560,13 +576,10 @@ pub mod lite_rpc {
.slot
.load(Ordering::Relaxed)
};
meta.performance_counter
.update_confirm_transaction_counter();
match k_value {
Some(value) => match value.clone() {
Some(signature_status) => {
let commitment = signature_status.commitment_level;
Some(value) => match value.status {
Some(commitment) => {
let commitment_matches = if commitment.eq(&CommitmentLevel::Finalized) {
commitment.eq(&CommitmentLevel::Finalized)
} else {
@ -576,7 +589,7 @@ pub mod lite_rpc {
Ok(RpcResponse {
context: RpcResponseContext::new(slot),
value: commitment_matches
&& signature_status.transaction_error.is_none(),
&& value.error.is_none(),
})
}
None => Ok(RpcResponse {
@ -620,17 +633,15 @@ pub mod lite_rpc {
let singature_status = meta.context.signature_status.get(x);
let k_value = singature_status;
match k_value {
Some(value) => match value.clone() {
Some(commitment_for_signature) => {
Some(value) => match value.status {
Some(commitment_level) => {
let slot = meta
.context
.confirmed_block_info
.slot
.load(Ordering::Relaxed);
meta.performance_counter
.update_confirm_transaction_counter();
let status = match commitment_for_signature.commitment_level {
let status = match commitment_level {
CommitmentLevel::Finalized => {
TransactionConfirmationStatus::Finalized
}
@ -638,9 +649,9 @@ pub mod lite_rpc {
};
Some(TransactionStatus {
slot,
confirmations: Some(1),
confirmations: None,
status: Ok(()),
err: commitment_for_signature.transaction_error,
err: value.error.clone(),
confirmation_status: Some(status),
})
}