Merge pull request #79 from blockworks-foundation/add_more_metrics_for_prometheus

adding more tokio metrics, other metrics, forcing new instance of TPU…
This commit is contained in:
galactus 2023-03-06 14:42:24 +01:00 committed by GitHub
commit fee6daaf1a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 86 additions and 59 deletions

View File

@ -17,7 +17,7 @@ use log::info;
use jsonrpsee::{server::ServerBuilder, types::SubscriptionResult, SubscriptionSink};
use prometheus::{core::GenericGauge, opts, register_counter, register_int_gauge, Counter};
use prometheus::{core::GenericGauge, opts, register_int_counter, register_int_gauge, IntCounter};
use solana_rpc_client::{nonblocking::rpc_client::RpcClient, rpc_client::SerializableTransaction};
use solana_rpc_client_api::{
config::{RpcContextConfig, RpcRequestAirdropConfig, RpcSignatureStatusConfig},
@ -35,20 +35,20 @@ use tokio::{
};
lazy_static::lazy_static! {
static ref RPC_SEND_TX: Counter =
register_counter!(opts!("literpc_rpc_send_tx", "RPC call send transaction")).unwrap();
static ref RPC_GET_LATEST_BLOCKHASH: Counter =
register_counter!(opts!("literpc_rpc_get_latest_blockhash", "RPC call to get latest block hash")).unwrap();
static ref RPC_IS_BLOCKHASH_VALID: Counter =
register_counter!(opts!("literpc_rpc_is_blockhash_valid", "RPC call to check if blockhash is vali calld")).unwrap();
static ref RPC_GET_SIGNATURE_STATUSES: Counter =
register_counter!(opts!("literpc_rpc_get_signature_statuses", "RPC call to get signature statuses")).unwrap();
static ref RPC_GET_VERSION: Counter =
register_counter!(opts!("literpc_rpc_get_version", "RPC call to version")).unwrap();
static ref RPC_REQUEST_AIRDROP: Counter =
register_counter!(opts!("literpc_rpc_airdrop", "RPC call to request airdrop")).unwrap();
static ref RPC_SIGNATURE_SUBSCRIBE: Counter =
register_counter!(opts!("literpc_rpc_signature_subscribe", "RPC call to subscribe to signature")).unwrap();
static ref RPC_SEND_TX: IntCounter =
register_int_counter!(opts!("literpc_rpc_send_tx", "RPC call send transaction")).unwrap();
static ref RPC_GET_LATEST_BLOCKHASH: IntCounter =
register_int_counter!(opts!("literpc_rpc_get_latest_blockhash", "RPC call to get latest block hash")).unwrap();
static ref RPC_IS_BLOCKHASH_VALID: IntCounter =
register_int_counter!(opts!("literpc_rpc_is_blockhash_valid", "RPC call to check if blockhash is vali calld")).unwrap();
static ref RPC_GET_SIGNATURE_STATUSES: IntCounter =
register_int_counter!(opts!("literpc_rpc_get_signature_statuses", "RPC call to get signature statuses")).unwrap();
static ref RPC_GET_VERSION: IntCounter =
register_int_counter!(opts!("literpc_rpc_get_version", "RPC call to version")).unwrap();
static ref RPC_REQUEST_AIRDROP: IntCounter =
register_int_counter!(opts!("literpc_rpc_airdrop", "RPC call to request airdrop")).unwrap();
static ref RPC_SIGNATURE_SUBSCRIBE: IntCounter =
register_int_counter!(opts!("literpc_rpc_signature_subscribe", "RPC call to subscribe to signature")).unwrap();
pub static ref TXS_IN_CHANNEL: GenericGauge<prometheus::core::AtomicI64> = register_int_gauge!(opts!("literpc_txs_in_channel", "Transactions in channel")).unwrap();
}
@ -142,6 +142,7 @@ impl LiteBridge {
self.tx_sender.clone(),
self.block_listner.clone(),
self.block_store.clone(),
self.tpu_manager.clone(),
)
.start(clean_interval);

View File

@ -7,6 +7,7 @@ use std::{
};
use log::info;
use prometheus::{opts, register_int_counter, IntCounter};
use solana_quic_client::{QuicConfig, QuicPool};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::signature::Keypair;
@ -22,6 +23,11 @@ pub type QuicConnectionCache = TpuConnectionCache<QuicPool>;
const TPU_CONNECTION_CACHE_SIZE: usize = 8;
lazy_static::lazy_static! {
static ref TPU_CONNECTION_RESET: IntCounter =
register_int_counter!(opts!("literpc_tpu_connection_reset", "Number of times tpu connection was reseted")).unwrap();
}
#[derive(Clone)]
pub struct TpuManager {
error_count: Arc<AtomicU32>,
@ -30,7 +36,7 @@ pub struct TpuManager {
tpu_client: Arc<RwLock<Arc<QuicTpuClient>>>,
pub ws_addr: String,
fanout_slots: u64,
connection_cache: Arc<QuicConnectionCache>,
identity: Arc<Keypair>,
}
impl TpuManager {
@ -48,13 +54,9 @@ impl TpuManager {
let connection_cache =
QuicConnectionCache::new_with_config(TPU_CONNECTION_CACHE_SIZE, tpu_config);
let connection_cache = Arc::new(connection_cache);
let tpu_client = Self::new_tpu_client(
rpc_client.clone(),
&ws_addr,
fanout_slots,
connection_cache.clone(),
)
.await?;
let tpu_client =
Self::new_tpu_client(rpc_client.clone(), &ws_addr, fanout_slots, connection_cache)
.await?;
let tpu_client = Arc::new(RwLock::new(Arc::new(tpu_client)));
Ok(Self {
@ -63,7 +65,7 @@ impl TpuManager {
ws_addr,
fanout_slots,
error_count: Default::default(),
connection_cache,
identity: Arc::new(identity),
})
}
@ -82,26 +84,40 @@ impl TpuManager {
.await?)
}
pub async fn reset_tpu_client(&self) -> anyhow::Result<()> {
let mut tpu_config = QuicConfig::new().unwrap();
tpu_config
.update_client_certificate(&self.identity, IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)))
.unwrap();
let connection_cache =
QuicConnectionCache::new_with_config(TPU_CONNECTION_CACHE_SIZE, tpu_config);
let connection_cache = Arc::new(connection_cache);
let tpu_client = Self::new_tpu_client(
self.rpc_client.clone(),
&self.ws_addr,
self.fanout_slots,
connection_cache,
)
.await?;
self.error_count.store(0, Ordering::Relaxed);
*self.tpu_client.write().await = Arc::new(tpu_client);
TPU_CONNECTION_RESET.inc();
Ok(())
}
pub async fn reset(&self) -> anyhow::Result<()> {
self.error_count.fetch_add(1, Ordering::Relaxed);
if self.error_count.load(Ordering::Relaxed) > 5 {
let tpu_client = Self::new_tpu_client(
self.rpc_client.clone(),
&self.ws_addr,
self.fanout_slots,
self.connection_cache.clone(),
)
.await?;
self.error_count.store(0, Ordering::Relaxed);
*self.tpu_client.write().await = Arc::new(tpu_client);
self.reset_tpu_client().await?;
info!("TPU Reset after 5 errors");
}
Ok(())
}
pub async fn get_tpu_client(&self) -> Arc<QuicTpuClient> {
async fn get_tpu_client(&self) -> Arc<QuicTpuClient> {
self.tpu_client.read().await.clone()
}

View File

@ -8,8 +8,8 @@ use dashmap::DashMap;
use jsonrpsee::SubscriptionSink;
use log::{info, warn};
use prometheus::{
core::GenericGauge, histogram_opts, opts, register_counter, register_histogram,
register_int_gauge, Counter, Histogram,
core::GenericGauge, histogram_opts, opts, register_histogram, register_int_counter,
register_int_gauge, Histogram, IntCounter,
};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
@ -52,18 +52,18 @@ lazy_static::lazy_static! {
"Time to receive finalized block from block subscribe",
))
.unwrap();
static ref FIN_BLOCKS_RECV: Counter =
register_counter!(opts!("literpc_fin_blocks_recv", "Number of Finalized Blocks Received")).unwrap();
static ref CON_BLOCKS_RECV: Counter =
register_counter!(opts!("literpc_con_blocks_recv", "Number of Confirmed Blocks Received")).unwrap();
static ref INCOMPLETE_FIN_BLOCKS_RECV: Counter =
register_counter!(opts!("literpc_incomplete_fin_blocks_recv", "Number of Incomplete Finalized Blocks Received")).unwrap();
static ref INCOMPLETE_CON_BLOCKS_RECV: Counter =
register_counter!(opts!("literpc_incomplete_con_blocks_recv", "Number of Incomplete Confirmed Blocks Received")).unwrap();
static ref TXS_CONFIRMED: Counter =
register_counter!(opts!("literpc_txs_confirmed", "Number of Transactions Confirmed")).unwrap();
static ref TXS_FINALIZED: Counter =
register_counter!(opts!("literpc_txs_finalized", "Number of Transactions Finalized")).unwrap();
static ref FIN_BLOCKS_RECV: IntCounter =
register_int_counter!(opts!("literpc_fin_blocks_recv", "Number of Finalized Blocks Received")).unwrap();
static ref CON_BLOCKS_RECV: IntCounter =
register_int_counter!(opts!("literpc_con_blocks_recv", "Number of Confirmed Blocks Received")).unwrap();
static ref INCOMPLETE_FIN_BLOCKS_RECV: IntCounter =
register_int_counter!(opts!("literpc_incomplete_fin_blocks_recv", "Number of Incomplete Finalized Blocks Received")).unwrap();
static ref INCOMPLETE_CON_BLOCKS_RECV: IntCounter =
register_int_counter!(opts!("literpc_incomplete_con_blocks_recv", "Number of Incomplete Confirmed Blocks Received")).unwrap();
static ref TXS_CONFIRMED: IntCounter =
register_int_counter!(opts!("literpc_txs_confirmed", "Number of Transactions Confirmed")).unwrap();
static ref TXS_FINALIZED: IntCounter =
register_int_counter!(opts!("literpc_txs_finalized", "Number of Transactions Finalized")).unwrap();
static ref BLOCKS_IN_QUEUE: GenericGauge<prometheus::core::AtomicI64> = register_int_gauge!(opts!("literpc_blocks_in_queue", "Number of blocks waiting to deque")).unwrap();
static ref BLOCKS_IN_RETRY_QUEUE: GenericGauge<prometheus::core::AtomicI64> = register_int_gauge!(opts!("literpc_blocks_in_retry_queue", "Number of blocks waiting in retry")).unwrap();
static ref NUMBER_OF_SIGNATURE_SUBSCRIBERS: GenericGauge<prometheus::core::AtomicI64> = register_int_gauge!(opts!("literpc_number_of_signature_sub", "Number of signature subscriber")).unwrap();

View File

@ -1,9 +1,9 @@
use std::time::Duration;
use std::{sync::Arc, time::Duration};
use log::info;
use tokio::task::JoinHandle;
use crate::block_store::BlockStore;
use crate::{block_store::BlockStore, tpu_manager::TpuManager};
use super::{BlockListener, TxSender};
@ -13,6 +13,7 @@ pub struct Cleaner {
tx_sender: TxSender,
block_listenser: BlockListener,
block_store: BlockStore,
tpu_manager: Arc<TpuManager>,
}
impl Cleaner {
@ -20,11 +21,13 @@ impl Cleaner {
tx_sender: TxSender,
block_listenser: BlockListener,
block_store: BlockStore,
tpu_manager: Arc<TpuManager>,
) -> Self {
Self {
tx_sender,
block_listenser,
block_store: block_store,
block_store,
tpu_manager,
}
}
@ -60,6 +63,7 @@ impl Cleaner {
self.clean_tx_sender(ttl_duration);
self.clean_block_listeners(ttl_duration);
self.clean_block_store(ttl_duration).await;
let _ = self.tpu_manager.reset_tpu_client().await;
}
})
}

View File

@ -15,6 +15,7 @@ lazy_static::lazy_static! {
static ref TOKIO_INJQUEUEDEPTH: GenericGauge<prometheus::core::AtomicI64> = register_int_gauge!(opts!("literpc_tokio_injection_queue_depth", "Tokio tasks in injection queue in lite rpc")).unwrap();
static ref TOKIO_NB_BLOCKING_THREADS: GenericGauge<prometheus::core::AtomicI64> = register_int_gauge!(opts!("literpc_tokio_blocking_threads", "Tokio blocking threads in lite rpc")).unwrap();
static ref TOKIO_NB_IDLE_THREADS: GenericGauge<prometheus::core::AtomicI64> = register_int_gauge!(opts!("literpc_tokio_idle_threads", "Tokio idle threads in lite rpc")).unwrap();
static ref TOKIO_REMOTE_SCHEDULED_COUNT: GenericGauge<prometheus::core::AtomicI64> = register_int_gauge!(opts!("literpc_tokio_remote_scheduled", "Tokio remote scheduled tasks")).unwrap();
static ref STD_THREADS: GenericGauge<prometheus::core::AtomicI64> = register_int_gauge!(opts!("literpc_threads", "Nb of threads used by literpc")).unwrap();
}
@ -88,13 +89,14 @@ impl MetricsCapture {
metrics.txs_confirmed = txs_confirmed;
metrics.txs_finalized = txs_finalized;
TXS_IN_STORE.set(txs_sent as i64);
let metrics = tokio::runtime::Handle::current().metrics();
TOKIO_TASKS.set(metrics.num_workers() as i64);
TOKIO_QUEUEDEPTH.set(metrics.blocking_queue_depth() as i64);
TOKIO_NB_BLOCKING_THREADS.set(metrics.num_blocking_threads() as i64);
TOKIO_NB_IDLE_THREADS.set(metrics.num_idle_blocking_threads() as i64);
TOKIO_INJQUEUEDEPTH.set( metrics.injection_queue_depth() as i64 );
TOKIO_INJQUEUEDEPTH.set(metrics.injection_queue_depth() as i64);
TOKIO_REMOTE_SCHEDULED_COUNT.set(metrics.remote_schedule_count() as i64);
}
})
}

View File

@ -7,7 +7,7 @@ use anyhow::bail;
use dashmap::DashMap;
use log::{info, warn};
use prometheus::{register_counter, Counter};
use prometheus::{core::GenericGauge, opts, register_int_counter, register_int_gauge, IntCounter};
use solana_transaction_status::TransactionStatus;
use tokio::{sync::mpsc::UnboundedReceiver, task::JoinHandle};
@ -20,8 +20,11 @@ use crate::{
use super::PostgresMpscSend;
lazy_static::lazy_static! {
static ref TXS_SENT: Counter =
register_counter!("literpc_txs_sent", "Number of transactions forwarded to tpu").unwrap();
static ref TXS_SENT: IntCounter =
register_int_counter!("literpc_txs_sent", "Number of transactions forwarded to tpu").unwrap();
static ref TXS_SENT_ERRORS: IntCounter =
register_int_counter!("literpc_txs_sent_errors", "Number of errors while transactions forwarded to tpu").unwrap();
static ref TX_BATCH_SIZES: GenericGauge<prometheus::core::AtomicI64> = register_int_gauge!(opts!("literpc_tx_batch_size", "batchsize of tx sent by literpc")).unwrap();
}
pub type WireTransaction = Vec<u8>;
@ -81,18 +84,19 @@ impl TxSender {
txs_sent.insert(sig.to_owned(), TxProps::default());
}
// metrics
TXS_SENT.inc_by(sigs_and_slots.len() as f64);
TXS_SENT.inc_by(sigs_and_slots.len() as u64);
1
}
Err(err) => {
TXS_SENT_ERRORS.inc_by(sigs_and_slots.len() as u64);
warn!("{err}");
0
}
};
if let Some(postgres) = postgres {
let forwarded_slot = tpu_client.get_tpu_client().await.estimated_current_slot();
let forwarded_slot = tpu_client.estimated_current_slot().await;
for (sig, recent_slot) in sigs_and_slots {
MESSAGES_IN_POSTGRES_CHANNEL.inc();
@ -161,7 +165,7 @@ impl TxSender {
}
}
}
TX_BATCH_SIZES.set(txs.len() as i64);
batch_send.send((sigs_and_slots, txs)).await?;
}
})