Adding performance counters

This commit is contained in:
Godmode Galactus 2022-12-03 22:07:23 +01:00
parent 5d05abebde
commit b7df23d470
No known key found for this signature in database
GPG Key ID: A04142C71ABB0DEA
4 changed files with 182 additions and 31 deletions

View File

@ -13,8 +13,12 @@ use solana_sdk::{
signature::Signature,
};
use std::{
sync::{atomic::AtomicU64, Arc, RwLock},
time::Instant,
sync::{
atomic::{AtomicU64, Ordering},
Arc, RwLock,
},
thread::{self, Builder, JoinHandle},
time::{Duration, Instant},
};
use tokio::sync::broadcast;
@ -251,3 +255,93 @@ impl LiteRpcSubsrciptionControl {
}
}
}
#[derive(Clone)]
pub struct PerformanceCounter {
pub total_confirmations: Arc<AtomicU64>,
pub total_transactions_sent: Arc<AtomicU64>,
pub confirmations_per_seconds: Arc<AtomicU64>,
pub transactions_per_seconds: Arc<AtomicU64>,
last_count_for_confirmations: Arc<AtomicU64>,
last_count_for_transactions_sent: Arc<AtomicU64>,
}
unsafe impl Send for PerformanceCounter {}
unsafe impl Sync for PerformanceCounter {}
impl PerformanceCounter {
pub fn new() -> Self {
Self {
total_confirmations: Arc::new(AtomicU64::new(0)),
total_transactions_sent: Arc::new(AtomicU64::new(0)),
confirmations_per_seconds: Arc::new(AtomicU64::new(0)),
transactions_per_seconds: Arc::new(AtomicU64::new(0)),
last_count_for_confirmations: Arc::new(AtomicU64::new(0)),
last_count_for_transactions_sent: Arc::new(AtomicU64::new(0)),
}
}
pub fn update_per_seconds_transactions(&self) {
let total_confirmations: u64 = self.total_confirmations.load(Ordering::Relaxed);
let total_transactions: u64 = self.total_transactions_sent.load(Ordering::Relaxed);
self.confirmations_per_seconds.store(
total_confirmations - self.last_count_for_confirmations.load(Ordering::Relaxed),
Ordering::Release,
);
self.transactions_per_seconds.store(
total_transactions
- self
.last_count_for_transactions_sent
.load(Ordering::Relaxed),
Ordering::Release,
);
self.last_count_for_confirmations
.store(total_confirmations, Ordering::Relaxed);
self.last_count_for_transactions_sent
.store(total_transactions, Ordering::Relaxed);
}
pub fn update_sent_transactions_counter(&self) {
self.total_transactions_sent.fetch_add(1, Ordering::Relaxed);
}
pub fn update_confirm_transaction_counter(&self) {
self.total_confirmations.fetch_add(1, Ordering::Relaxed);
}
}
pub fn launch_performance_updating_thread(
performance_counter: PerformanceCounter,
) -> JoinHandle<()> {
Builder::new()
.name("Performance Counter".to_string())
.spawn(move || loop {
let start = Instant::now();
let wait_time = Duration::from_millis(1000);
let performance_counter = performance_counter.clone();
performance_counter.update_per_seconds_transactions();
let confirmations_per_seconds = performance_counter
.confirmations_per_seconds
.load(Ordering::Acquire);
let total_transactions_per_seconds = performance_counter
.transactions_per_seconds
.load(Ordering::Acquire);
let runtime = start.elapsed();
if let Some(remaining) = wait_time.checked_sub(runtime) {
println!(
"Sent {} transactions and confrimed {} transactions",
total_transactions_per_seconds, confirmations_per_seconds
);
thread::sleep(remaining);
}
})
.unwrap()
}

View File

@ -8,9 +8,12 @@ use pubsub::LitePubSubService;
use solana_perf::thread::renice_this_thread;
use tokio::sync::broadcast;
use crate::rpc::{
lite_rpc::{self, Lite},
LightRpcRequestProcessor,
use crate::{
context::{launch_performance_updating_thread, PerformanceCounter},
rpc::{
lite_rpc::{self, Lite},
LightRpcRequestProcessor,
},
};
mod cli;
mod context;
@ -34,6 +37,9 @@ pub fn main() {
..
} = &cli_config;
let performance_counter = PerformanceCounter::new();
launch_performance_updating_thread(performance_counter.clone());
let (broadcast_sender, _broadcast_receiver) = broadcast::channel(128);
let (notification_sender, notification_reciever) = crossbeam_channel::unbounded();
@ -43,8 +49,11 @@ pub fn main() {
));
// start websocket server
let (_trigger, websocket_service) =
LitePubSubService::new(pubsub_control.clone(), *subscription_port);
let (_trigger, websocket_service) = LitePubSubService::new(
pubsub_control.clone(),
*subscription_port,
performance_counter.clone(),
);
// start recieving notifications and broadcast them
{
@ -60,8 +69,12 @@ pub fn main() {
let lite_rpc = lite_rpc::LightRpc;
io.extend_with(lite_rpc.to_delegate());
let mut request_processor =
LightRpcRequestProcessor::new(json_rpc_url, websocket_url, notification_sender);
let mut request_processor = LightRpcRequestProcessor::new(
json_rpc_url,
websocket_url,
notification_sender,
performance_counter.clone(),
);
let runtime = Arc::new(
tokio::runtime::Builder::new_multi_thread()

View File

@ -7,7 +7,7 @@ use stream_cancel::{Trigger, Tripwire};
use tokio::{net::TcpStream, pin, select};
use tokio_util::compat::TokioAsyncReadCompatExt;
use crate::context::LiteRpcSubsrciptionControl;
use crate::context::{LiteRpcSubsrciptionControl, PerformanceCounter};
use {
jsonrpc_core::{Error, Result},
jsonrpc_derive::rpc,
@ -145,6 +145,7 @@ enum HandleError {
async fn handle_connection(
socket: TcpStream,
subscription_control: Arc<LiteRpcSubsrciptionControl>,
performance_counter: PerformanceCounter,
) -> core::result::Result<(), HandleError> {
let mut server = Server::new(socket.compat());
let request = server.receive_request().await?;
@ -177,6 +178,7 @@ async fn handle_connection(
result = broadcast_receiver.recv() => {
if let Ok(x) = result {
if rpc_impl.current_subscriptions.contains_key(&x.subscription_id) {
performance_counter.update_confirm_transaction_counter();
sender.send_text(&x.json).await?;
}
}
@ -195,6 +197,7 @@ async fn listen(
listen_address: SocketAddr,
subscription_control: Arc<LiteRpcSubsrciptionControl>,
mut tripwire: Tripwire,
performance_counter: PerformanceCounter,
) -> std::io::Result<()> {
let listener = tokio::net::TcpListener::bind(&listen_address).await?;
loop {
@ -202,9 +205,10 @@ async fn listen(
result = listener.accept() => match result {
Ok((socket, addr)) => {
let subscription_control = subscription_control.clone();
let performance_counter = performance_counter.clone();
tokio::spawn(async move {
let handle = handle_connection(
socket, subscription_control
socket, subscription_control, performance_counter,
);
match handle.await {
Ok(()) => println!("connection closed ({:?})", addr),
@ -223,6 +227,7 @@ impl LitePubSubService {
pub fn new(
subscription_control: Arc<LiteRpcSubsrciptionControl>,
pubsub_addr: SocketAddr,
performance_counter: PerformanceCounter,
) -> (Trigger, Self) {
let (trigger, tripwire) = Tripwire::new();
@ -234,9 +239,12 @@ impl LitePubSubService {
.enable_all()
.build()
.expect("runtime creation failed");
if let Err(err) =
runtime.block_on(listen(pubsub_addr, subscription_control, tripwire))
{
if let Err(err) = runtime.block_on(listen(
pubsub_addr,
subscription_control,
tripwire,
performance_counter,
)) {
println!("pubsub service failed: {}", err);
};
})

View File

@ -1,6 +1,7 @@
use dashmap::DashMap;
use serde::{Deserialize, Serialize};
use solana_client::{
pubsub_client::{BlockSubscription, PubsubClientError, SignatureSubscription},
pubsub_client::{BlockSubscription, PubsubClientError},
tpu_client::TpuClientConfig,
};
use solana_pubsub_client::pubsub_client::{PubsubBlockClientSubscription, PubsubClient};
@ -11,7 +12,8 @@ use std::{
};
use crate::context::{
BlockInformation, LiteRpcContext, NotificationType, SignatureNotification, SlotNotification,
BlockInformation, LiteRpcContext, NotificationType, PerformanceCounter, SignatureNotification,
SlotNotification,
};
use crossbeam_channel::Sender;
use {
@ -48,6 +50,7 @@ pub struct LightRpcRequestProcessor {
_connection_cache: Arc<ConnectionCache>,
joinables: Arc<Mutex<Vec<JoinHandle<()>>>>,
subscribed_clients: Arc<Mutex<Vec<PubsubBlockClientSubscription>>>,
performance_counter: PerformanceCounter,
}
impl LightRpcRequestProcessor {
@ -55,6 +58,7 @@ impl LightRpcRequestProcessor {
json_rpc_url: &str,
websocket_url: &str,
notification_sender: Sender<NotificationType>,
performance_counter: PerformanceCounter,
) -> LightRpcRequestProcessor {
let rpc_client = Arc::new(RpcClient::new(json_rpc_url));
let connection_cache = Arc::new(ConnectionCache::default());
@ -101,6 +105,7 @@ impl LightRpcRequestProcessor {
_connection_cache: connection_cache,
joinables: Arc::new(Mutex::new(joinables)),
subscribed_clients: Arc::new(Mutex::new(vec![client_confirmed, client_finalized])),
performance_counter,
}
}
@ -123,21 +128,6 @@ impl LightRpcRequestProcessor {
)
}
fn subscribe_signature(
websocket_url: &String,
signature: &Signature,
commitment: CommitmentLevel,
) -> std::result::Result<SignatureSubscription, PubsubClientError> {
PubsubClient::signature_subscribe(
websocket_url.as_str(),
signature,
Some(RpcSignatureSubscribeConfig {
commitment: Some(CommitmentConfig { commitment }),
enable_received_notification: Some(false),
}),
)
}
fn build_thread_to_process_blocks(
reciever: Receiver<RpcResponse<RpcBlockUpdate>>,
context: &Arc<LiteRpcContext>,
@ -268,6 +258,15 @@ impl LightRpcRequestProcessor {
impl Metadata for LightRpcRequestProcessor {}
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RpcPerformanceCounterResults {
pub transactions_per_seconds: u64,
pub confirmations_per_seconds: u64,
pub total_transactions_count: u64,
pub total_confirmations_count: u64,
}
pub mod lite_rpc {
use std::str::FromStr;
@ -309,6 +308,12 @@ pub mod lite_rpc {
lamports: u64,
config: Option<RpcRequestAirdropConfig>,
) -> Result<String>;
#[rpc(meta, name = "getPerformanceCounters")]
fn get_performance_counters(
&self,
meta: Self::Metadata,
) -> Result<RpcPerformanceCounterResults>;
}
pub struct LightRpc;
impl Lite for LightRpc {
@ -337,6 +342,7 @@ pub mod lite_rpc {
.insert(transaction.signatures[0].to_string(), None);
println!("added {} to map", transaction.signatures[0]);
meta.tpu_client.send_wire_transaction(wire_transaction);
meta.performance_counter.update_sent_transactions_counter();
Ok(transaction.signatures[0].to_string())
}
@ -404,6 +410,8 @@ pub mod lite_rpc {
.slot
.load(Ordering::Relaxed)
};
meta.performance_counter
.update_confirm_transaction_counter();
match k_value {
Some(value) => match *value {
@ -459,6 +467,34 @@ pub mod lite_rpc {
};
Ok(signature.unwrap().to_string())
}
fn get_performance_counters(
&self,
meta: Self::Metadata,
) -> Result<RpcPerformanceCounterResults> {
let total_transactions_count = meta
.performance_counter
.total_transactions_sent
.load(Ordering::Relaxed);
let total_confirmations_count = meta
.performance_counter
.total_confirmations
.load(Ordering::Relaxed);
let transactions_per_seconds = meta
.performance_counter
.transactions_per_seconds
.load(Ordering::Acquire);
let confirmations_per_seconds = meta
.performance_counter
.confirmations_per_seconds
.load(Ordering::Acquire);
Ok(RpcPerformanceCounterResults {
confirmations_per_seconds,
transactions_per_seconds,
total_confirmations_count,
total_transactions_count,
})
}
}
}