Revert "disable prometheus to check performance impact of prometheus on fly"

This reverts commit 2945153bd2.
Godmode Galactus 2023-09-09 17:43:51 +02:00
parent 9b4778373b
commit 2f4cb1f839
8 changed files with 196 additions and 27 deletions
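
This revert restores Prometheus instrumentation across eight files. The pattern is the same everywhere: each module declares its metrics in a lazy_static! block, registers them against prometheus's default registry, and bumps them inline on the hot path. A minimal sketch of that shape (metric and handler names are illustrative, not taken from the diff):

    use lazy_static::lazy_static;
    use prometheus::{opts, register_int_counter, IntCounter};

    lazy_static! {
        // Registered in the default registry on first access; unwrap() only
        // panics if the same metric name was already registered in this process.
        static ref EXAMPLE_CALLS: IntCounter =
            register_int_counter!(opts!("example_calls", "Calls to the example handler")).unwrap();
    }

    fn handle_request() {
        EXAMPLE_CALLS.inc(); // one cheap atomic increment per request
    }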

View File

@@ -4,11 +4,14 @@ use crate::{
rpc::LiteRpcServer,
};
use solana_lite_rpc_services::transaction_service::TransactionService;
use solana_lite_rpc_services::{
transaction_service::TransactionService, tx_sender::TXS_IN_CHANNEL,
};
use anyhow::Context;
use jsonrpsee::{core::SubscriptionResult, server::ServerBuilder, PendingSubscriptionSink};
use log::info;
use prometheus::{opts, register_int_counter, IntCounter};
use solana_lite_rpc_core::{
block_information_store::BlockInformation, data_cache::DataCache, AnyhowJoinHandle,
};
@@ -24,6 +27,23 @@ use solana_transaction_status::TransactionStatus;
use std::{str::FromStr, sync::Arc};
use tokio::net::ToSocketAddrs;
lazy_static::lazy_static! {
static ref RPC_SEND_TX: IntCounter =
register_int_counter!(opts!("literpc_rpc_send_tx", "RPC call send transaction")).unwrap();
static ref RPC_GET_LATEST_BLOCKHASH: IntCounter =
register_int_counter!(opts!("literpc_rpc_get_latest_blockhash", "RPC call to get latest block hash")).unwrap();
static ref RPC_IS_BLOCKHASH_VALID: IntCounter =
register_int_counter!(opts!("literpc_rpc_is_blockhash_valid", "RPC call to check if blockhash is vali calld")).unwrap();
static ref RPC_GET_SIGNATURE_STATUSES: IntCounter =
register_int_counter!(opts!("literpc_rpc_get_signature_statuses", "RPC call to get signature statuses")).unwrap();
static ref RPC_GET_VERSION: IntCounter =
register_int_counter!(opts!("literpc_rpc_get_version", "RPC call to version")).unwrap();
static ref RPC_REQUEST_AIRDROP: IntCounter =
register_int_counter!(opts!("literpc_rpc_airdrop", "RPC call to request airdrop")).unwrap();
static ref RPC_SIGNATURE_SUBSCRIBE: IntCounter =
register_int_counter!(opts!("literpc_rpc_signature_subscribe", "RPC call to subscribe to signature")).unwrap();
}
/// A bridge between clients and tpu
pub struct LiteBridge {
data_cache: DataCache,
@@ -95,6 +115,8 @@ impl LiteRpcServer for LiteBridge {
tx: String,
send_transaction_config: Option<SendTransactionConfig>,
) -> crate::rpc::Result<String> {
RPC_SEND_TX.inc();
let SendTransactionConfig {
encoding,
max_retries,
@@ -112,7 +134,11 @@ impl LiteRpcServer for LiteBridge {
.send_transaction(raw_tx, max_retries)
.await
{
Ok(sig) => Ok(sig),
Ok(sig) => {
TXS_IN_CHANNEL.inc();
Ok(sig)
}
Err(e) => Err(jsonrpsee::core::Error::Custom(e.to_string())),
}
}
@@ -121,6 +147,8 @@ impl LiteRpcServer for LiteBridge {
&self,
config: Option<RpcContextConfig>,
) -> crate::rpc::Result<RpcResponse<RpcBlockhash>> {
RPC_GET_LATEST_BLOCKHASH.inc();
let commitment_config = config
.map(|config| config.commitment.unwrap_or_default())
.unwrap_or_default();
@@ -155,6 +183,8 @@ impl LiteRpcServer for LiteBridge {
blockhash: String,
config: Option<IsBlockHashValidConfig>,
) -> crate::rpc::Result<RpcResponse<bool>> {
RPC_IS_BLOCKHASH_VALID.inc();
let commitment = config.unwrap_or_default().commitment.unwrap_or_default();
let commitment = CommitmentConfig { commitment };
@@ -198,6 +228,8 @@ impl LiteRpcServer for LiteBridge {
sigs: Vec<String>,
_config: Option<RpcSignatureStatusConfig>,
) -> crate::rpc::Result<RpcResponse<Vec<Option<TransactionStatus>>>> {
RPC_GET_SIGNATURE_STATUSES.inc();
let sig_statuses = sigs
.iter()
.map(|sig| self.data_cache.txs.get(sig).and_then(|v| v.status))
@@ -218,6 +250,8 @@ impl LiteRpcServer for LiteBridge {
}
fn get_version(&self) -> crate::rpc::Result<RpcVersionInfo> {
RPC_GET_VERSION.inc();
let version = solana_version::Version::default();
Ok(RpcVersionInfo {
solana_core: version.to_string(),
@@ -231,6 +265,8 @@ impl LiteRpcServer for LiteBridge {
lamports: u64,
config: Option<RpcRequestAirdropConfig>,
) -> crate::rpc::Result<String> {
RPC_REQUEST_AIRDROP.inc();
let pubkey = match Pubkey::from_str(&pubkey_str) {
Ok(pubkey) => pubkey,
Err(err) => {
@@ -285,6 +321,7 @@ impl LiteRpcServer for LiteBridge {
signature: String,
commitment_config: CommitmentConfig,
) -> SubscriptionResult {
RPC_SIGNATURE_SUBSCRIBE.inc();
let sink = pending.accept().await?;
let jsonrpsee_sink = JsonRpseeSubscriptionHandlerSink::new(sink);
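
Every RPC handler in the bridge gains one counter, incremented at the top of the method before any early return, so the metrics measure call volume rather than success rate. A sketch of the same shape outside jsonrpsee (all names hypothetical):

    use prometheus::{opts, register_int_counter, IntCounter};

    lazy_static::lazy_static! {
        static ref SEND_TX_CALLS: IntCounter =
            register_int_counter!(opts!("example_send_tx_calls", "send_transaction invocations")).unwrap();
    }

    async fn send_transaction(raw_tx: Vec<u8>) -> Result<String, String> {
        SEND_TX_CALLS.inc(); // counted even when decoding fails below
        if raw_tx.is_empty() {
            return Err("empty transaction".into());
        }
        Ok(format!("{} bytes forwarded", raw_tx.len()))
    }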

View File

@@ -3,6 +3,7 @@ use chrono::{DateTime, Utc};
use futures::join;
use log::{info, warn};
use postgres_native_tls::MakeTlsConnector;
use prometheus::{core::GenericGauge, opts, register_int_gauge};
use std::{sync::Arc, time::Duration};
use tokio::sync::{RwLock, RwLockReadGuard};
@@ -19,6 +20,11 @@ use solana_lite_rpc_core::{
AnyhowJoinHandle,
};
lazy_static::lazy_static! {
pub static ref MESSAGES_IN_POSTGRES_CHANNEL: GenericGauge<prometheus::core::AtomicI64> = register_int_gauge!(opts!("literpc_messages_in_postgres", "Number of messages in postgres")).unwrap();
pub static ref POSTGRES_SESSION_ERRORS: GenericGauge<prometheus::core::AtomicI64> = register_int_gauge!(opts!("literpc_session_errors", "Number of failures while establishing postgres session")).unwrap();
}
use std::convert::From;
const MAX_QUERY_SIZE: usize = 200_000; // 0.2 mb
@@ -434,21 +440,25 @@ impl Postgres {
}
match recv.try_recv() {
Ok(msg) => match msg {
NotificationMsg::TxNotificationMsg(tx) => {
let mut tx = tx.iter().map(|x| x.into()).collect::<Vec<_>>();
tx_batch.append(&mut tx)
}
NotificationMsg::BlockNotificationMsg(block) => {
block_batch.push(block.into())
}
NotificationMsg::UpdateTransactionMsg(update) => {
let mut update = update.iter().map(|x| x.into()).collect();
update_batch.append(&mut update)
}
NotificationMsg::AccountAddrMsg(_) => todo!(),
},
Ok(msg) => {
MESSAGES_IN_POSTGRES_CHANNEL.dec();
match msg {
NotificationMsg::TxNotificationMsg(tx) => {
let mut tx = tx.iter().map(|x| x.into()).collect::<Vec<_>>();
tx_batch.append(&mut tx)
}
NotificationMsg::BlockNotificationMsg(block) => {
block_batch.push(block.into())
}
NotificationMsg::UpdateTransactionMsg(update) => {
let mut update = update.iter().map(|x| x.into()).collect();
update_batch.append(&mut update)
}
NotificationMsg::AccountAddrMsg(_) => todo!(),
}
}
Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
log::error!("Postgres channel broke");
@@ -468,6 +478,8 @@ impl Postgres {
session_establish_error = session.is_err();
let Ok(session) = session else {
POSTGRES_SESSION_ERRORS.inc();
const TIME_OUT: Duration = Duration::from_millis(1000);
warn!("Unable to get postgres session. Retrying in {TIME_OUT:?}");
tokio::time::sleep(TIME_OUT).await;
@@ -475,6 +487,8 @@ impl Postgres {
continue;
};
POSTGRES_SESSION_ERRORS.set(0);
// write to database when a successful connection is made
let (res_txs, res_blocks, res_update) = join!(
session.send_txs(&tx_batch),
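
The postgres loop pairs two gauges: MESSAGES_IN_POSTGRES_CHANNEL is decremented as messages are drained (the matching increment sits wherever messages are pushed into the channel), and POSTGRES_SESSION_ERRORS grows while the database is unreachable, then is reset with set(0) once a session is established. A self-contained sketch of that drain-and-reset shape, with hypothetical names:

    use prometheus::{opts, register_int_gauge, IntGauge};

    lazy_static::lazy_static! {
        static ref QUEUE_DEPTH: IntGauge =
            register_int_gauge!(opts!("example_queue_depth", "Messages waiting in the channel")).unwrap();
        static ref SESSION_ERRORS: IntGauge =
            register_int_gauge!(opts!("example_session_errors", "Consecutive failed session attempts")).unwrap();
    }

    async fn drain_and_connect(rx: &mut tokio::sync::mpsc::UnboundedReceiver<String>) {
        while let Ok(_msg) = rx.try_recv() {
            QUEUE_DEPTH.dec(); // mirrors the inc() performed by the producer
        }
        match connect().await {
            Err(_) => SESSION_ERRORS.inc(),        // keeps growing across retries
            Ok(_session) => SESSION_ERRORS.set(0), // healthy again: reset, not dec()
        }
    }

    async fn connect() -> Result<(), ()> { Ok(()) }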

View File

@@ -1,6 +1,8 @@
use std::time::Duration;
use anyhow::{bail, Context};
use prometheus::core::GenericGauge;
use prometheus::{opts, register_int_counter, register_int_gauge, IntCounter};
use solana_lite_rpc_core::block_information_store::BlockInformation;
use solana_lite_rpc_core::data_cache::DataCache;
use solana_lite_rpc_core::streams::{
@@ -10,6 +12,26 @@ use solana_lite_rpc_core::AnyhowJoinHandle;
use solana_sdk::commitment_config::CommitmentLevel;
use solana_transaction_status::{TransactionConfirmationStatus, TransactionStatus};
lazy_static::lazy_static! {
static ref NB_CLUSTER_NODES: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_nb_cluster_nodes", "Number of cluster nodes in saved")).unwrap();
static ref CURRENT_SLOT: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_current_slot", "Current slot seen by last rpc")).unwrap();
static ref ESTIMATED_SLOT: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_estimated_slot", "Estimated slot seen by last rpc")).unwrap();
static ref TXS_CONFIRMED: IntCounter =
register_int_counter!(opts!("literpc_txs_confirmed", "Number of Transactions Confirmed")).unwrap();
static ref TXS_FINALIZED: IntCounter =
register_int_counter!(opts!("literpc_txs_finalized", "Number of Transactions Finalized")).unwrap();
static ref TXS_PROCESSED: IntCounter =
register_int_counter!(opts!("literpc_txs_processed", "Number of Transactions Processed")).unwrap();
}
pub struct DataCachingService {
pub data_cache: DataCache,
pub clean_duration: Duration,
@@ -45,7 +67,7 @@ impl DataCachingService {
};
for tx in block.txs {
data_cache.txs.update_status(
if data_cache.txs.update_status(
&tx.signature,
TransactionStatus {
slot: block.slot,
@@ -54,7 +76,20 @@ impl DataCachingService {
err: tx.err.clone(),
confirmation_status: Some(confirmation_status.clone()),
},
);
) {
// transaction updated
match confirmation_status {
TransactionConfirmationStatus::Finalized => {
TXS_FINALIZED.inc();
}
TransactionConfirmationStatus::Confirmed => {
TXS_CONFIRMED.inc();
}
TransactionConfirmationStatus::Processed => {
TXS_PROCESSED.inc();
}
}
}
// notify
data_cache
.tx_subs
@@ -70,6 +105,8 @@ impl DataCachingService {
loop {
match slot_notification.recv().await {
Ok(slot_notification) => {
CURRENT_SLOT.set(slot_notification.processed_slot as i64);
ESTIMATED_SLOT.set(slot_notification.estimated_processed_slot as i64);
data_cache.slot_cache.update(slot_notification);
}
Err(e) => {
@@ -87,6 +124,7 @@ impl DataCachingService {
.cluster_info
.load_cluster_info(&mut cluster_info_notification)
.await?;
NB_CLUSTER_NODES.set(data_cache.cluster_info.cluster_nodes.len() as i64);
}
});
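
The confirmation counters are bumped only when update_status reports that the cached entry actually changed, which keeps repeated notifications for the same transaction from double-counting. The guard in isolation (types and names are illustrative):

    use prometheus::{register_int_counter, IntCounter};

    lazy_static::lazy_static! {
        static ref PROCESSED: IntCounter = register_int_counter!("example_txs_processed", "Processed txs").unwrap();
        static ref CONFIRMED: IntCounter = register_int_counter!("example_txs_confirmed", "Confirmed txs").unwrap();
        static ref FINALIZED: IntCounter = register_int_counter!("example_txs_finalized", "Finalized txs").unwrap();
    }

    enum Status { Processed, Confirmed, Finalized }

    fn record(status: Status, updated: bool) {
        if !updated {
            return; // status unchanged; counting again would inflate the totals
        }
        match status {
            Status::Processed => PROCESSED.inc(),
            Status::Confirmed => CONFIRMED.inc(),
            Status::Finalized => FINALIZED.inc(),
        }
    }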

View File

@@ -1,5 +1,6 @@
use dashmap::DashMap;
use log::{error, trace};
use prometheus::{core::GenericGauge, opts, register_int_gauge};
use quinn::Endpoint;
use solana_lite_rpc_core::{
quic_connection::QuicConnectionPool,
@@ -20,6 +21,17 @@ use std::{
};
use tokio::sync::{broadcast::Receiver, broadcast::Sender};
lazy_static::lazy_static! {
static ref NB_QUIC_CONNECTIONS: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_nb_active_quic_connections", "Number of quic connections open")).unwrap();
static ref NB_QUIC_ACTIVE_CONNECTIONS: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_nb_active_connections", "Number quic tasks that are running")).unwrap();
static ref NB_CONNECTIONS_TO_KEEP: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_connections_to_keep", "Number of connections to keep asked by tpu service")).unwrap();
static ref NB_QUIC_TASKS: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_quic_tasks", "Number of connections to keep asked by tpu service")).unwrap();
}
#[derive(Clone)]
struct ActiveConnection {
endpoints: RotatingQueue<Endpoint>,
@@ -64,6 +76,7 @@ impl ActiveConnection {
identity_stakes: IdentityStakesData,
txs_sent_store: TxStore,
) {
NB_QUIC_ACTIVE_CONNECTIONS.inc();
let mut transaction_reciever = transaction_reciever;
let mut exit_oneshot_channel = exit_oneshot_channel;
let identity = self.identity;
@@ -138,6 +151,7 @@ impl ActiveConnection {
// add more connections to the pool
if connection_pool.len() < max_number_of_connections {
connection_pool.add_connection().await;
NB_QUIC_CONNECTIONS.inc();
}
let task_counter = task_counter.clone();
@@ -145,7 +159,9 @@ impl ActiveConnection {
tokio::spawn(async move {
task_counter.fetch_add(1, Ordering::Relaxed);
NB_QUIC_TASKS.inc();
connection_pool.send_transaction_batch(txs).await;
NB_QUIC_TASKS.dec();
task_counter.fetch_sub(1, Ordering::Relaxed);
});
},
@@ -155,6 +171,8 @@ impl ActiveConnection {
}
}
drop(transaction_reciever);
NB_QUIC_CONNECTIONS.dec();
NB_QUIC_ACTIVE_CONNECTIONS.dec();
}
pub fn start_listening(
@@ -213,6 +231,7 @@ impl TpuConnectionManager {
txs_sent_store: TxStore,
connection_parameters: QuicConnectionParameters,
) {
NB_CONNECTIONS_TO_KEEP.set(connections_to_keep.len() as i64);
for (identity, socket_addr) in &connections_to_keep {
if self.identity_to_active_connection.get(identity).is_none() {
trace!("added a connection for {}, {}", identity, socket_addr);

View File

@@ -1,4 +1,5 @@
use anyhow::Context;
use prometheus::{core::GenericGauge, opts, register_int_gauge};
use super::tpu_connection_manager::TpuConnectionManager;
use crate::tpu_utils::quic_proxy_connection_manager::QuicProxyConnectionManager;
@@ -17,6 +18,20 @@ use std::{
};
use tokio::time::Duration;
lazy_static::lazy_static! {
static ref NB_CLUSTER_NODES: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_nb_cluster_nodes", "Number of cluster nodes in saved")).unwrap();
static ref NB_OF_LEADERS_IN_SCHEDULE: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_cached_leader", "Number of leaders in schedule cache")).unwrap();
static ref CURRENT_SLOT: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_current_slot", "Current slot seen by last rpc")).unwrap();
static ref ESTIMATED_SLOT: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_estimated_slot", "Estimated slot seen by last rpc")).unwrap();
}
#[derive(Clone, Copy)]
pub struct TpuServiceConfig {
pub fanout_slots: u64,
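
Note that this block re-declares literpc_nb_cluster_nodes, literpc_current_slot, and literpc_estimated_slot with the same names and help strings already used by the data-caching service. prometheus's default registry rejects a second registration of an identical metric, so whichever static is initialized second would panic on unwrap(); because lazy_static defers registration until first access, the collision only fires if both copies are actually touched in the same process. A sketch of the failure mode (illustrative name):

    use prometheus::register_int_gauge;

    fn main() {
        let first = register_int_gauge!("example_slot", "Current slot");
        assert!(first.is_ok());
        // Same fully-qualified name and help text: AlreadyReg error, and an
        // unwrap() here would panic.
        let second = register_int_gauge!("example_slot", "Current slot");
        assert!(second.is_err());
    }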

View File

@@ -1,6 +1,7 @@
use crate::tpu_utils::tpu_service::TpuService;
use anyhow::bail;
use log::error;
use prometheus::{core::GenericGauge, opts, register_int_gauge};
use solana_lite_rpc_core::{tx_store::TxStore, AnyhowJoinHandle};
use std::time::Duration;
use tokio::{
@@ -8,6 +9,11 @@ use tokio::{
time::Instant,
};
lazy_static::lazy_static! {
pub static ref MESSAGES_IN_REPLAY_QUEUE: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_messages_in_replay_queue", "Number of transactions waiting for replay")).unwrap();
}
#[derive(Debug, Clone)]
pub struct TransactionReplay {
pub signature: String,
@@ -44,6 +50,7 @@ impl TransactionReplayer {
tokio::spawn(async move {
while let Some(mut tx_replay) = reciever.recv().await {
MESSAGES_IN_REPLAY_QUEUE.dec();
if Instant::now() < tx_replay.replay_at {
tokio::time::sleep_until(tx_replay.replay_at).await;
}
@@ -66,6 +73,8 @@ impl TransactionReplayer {
if let Err(e) = sender.send(tx_replay) {
error!("error while scheduling replay ({})", e);
continue;
} else {
MESSAGES_IN_REPLAY_QUEUE.inc();
}
}
}
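
The replay worker decrements MESSAGES_IN_REPLAY_QUEUE as soon as it receives an item, then sleeps until the item's scheduled replay_at before re-sending; items that get re-queued increment the gauge again only when the send succeeds. A compact sketch of the deadline loop (struct and names are illustrative):

    use tokio::time::{sleep_until, Instant};

    struct Replay {
        payload: Vec<u8>,
        replay_at: Instant,
    }

    async fn replay_worker(mut rx: tokio::sync::mpsc::UnboundedReceiver<Replay>) {
        while let Some(item) = rx.recv().await {
            // queue-depth dec() would go here: the item has left the channel
            if Instant::now() < item.replay_at {
                sleep_until(item.replay_at).await; // wait out the scheduled delay
            }
            resend(&item.payload).await;
        }
    }

    async fn resend(_payload: &[u8]) {}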

View File

@@ -5,7 +5,7 @@ use std::time::Duration;
use crate::{
tpu_utils::tpu_service::TpuService,
transaction_replayer::{TransactionReplay, TransactionReplayer},
transaction_replayer::{TransactionReplay, TransactionReplayer, MESSAGES_IN_REPLAY_QUEUE},
tx_sender::{TransactionInfo, TxSender},
};
use anyhow::bail;
@@ -148,13 +148,19 @@ impl TransactionService {
}
let replay_at = Instant::now() + self.replay_after;
// ignore error for replay service
let _ = self.replay_channel.send(TransactionReplay {
signature: signature.to_string(),
tx: raw_tx_clone,
replay_count: 0,
max_replay,
replay_at,
});
if self
.replay_channel
.send(TransactionReplay {
signature: signature.to_string(),
tx: raw_tx_clone,
replay_count: 0,
max_replay,
replay_at,
})
.is_ok()
{
MESSAGES_IN_REPLAY_QUEUE.inc();
}
Ok(signature.to_string())
}
}
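
The rewrite above turns a fire-and-forget send into a checked one: the gauge is incremented only when the replay channel actually accepts the message, so a closed channel can no longer leave MESSAGES_IN_REPLAY_QUEUE permanently inflated. The guard in isolation (gauge and names are illustrative):

    use prometheus::{opts, register_int_gauge, IntGauge};

    lazy_static::lazy_static! {
        static ref REPLAY_QUEUE_DEPTH: IntGauge =
            register_int_gauge!(opts!("example_replay_queue", "Transactions waiting for replay")).unwrap();
    }

    fn schedule_replay(tx: &tokio::sync::mpsc::UnboundedSender<String>, signature: String) {
        if tx.send(signature).is_ok() {
            REPLAY_QUEUE_DEPTH.inc(); // the worker dec()s when it picks the item up
        }
        // on Err the receiver is gone; incrementing would skew the gauge forever
    }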

View File

@@ -3,6 +3,11 @@ use std::time::{Duration, Instant};
use anyhow::bail;
use chrono::Utc;
use log::{trace, warn};
use prometheus::{
core::GenericGauge, histogram_opts, opts, register_histogram, register_int_counter,
register_int_gauge, Histogram, IntCounter,
};
use solana_sdk::slot_history::Slot;
use tokio::sync::mpsc::Receiver;
@@ -14,6 +19,22 @@ use solana_lite_rpc_core::{
AnyhowJoinHandle,
};
lazy_static::lazy_static! {
static ref TXS_SENT: IntCounter =
register_int_counter!("literpc_txs_sent", "Number of transactions forwarded to tpu").unwrap();
static ref TXS_SENT_ERRORS: IntCounter =
register_int_counter!("literpc_txs_sent_errors", "Number of errors while transactions forwarded to tpu").unwrap();
static ref TX_BATCH_SIZES: GenericGauge<prometheus::core::AtomicI64> = register_int_gauge!(opts!("literpc_tx_batch_size", "batchsize of tx sent by literpc")).unwrap();
static ref TT_SENT_TIMER: Histogram = register_histogram!(histogram_opts!(
"literpc_txs_send_timer",
"Time to send transaction batch",
))
.unwrap();
static ref TX_TIMED_OUT: GenericGauge<prometheus::core::AtomicI64> = register_int_gauge!(opts!("literpc_tx_timeout", "Number of transactions that timeout")).unwrap();
pub static ref TXS_IN_CHANNEL: GenericGauge<prometheus::core::AtomicI64> = register_int_gauge!(opts!("literpc_txs_in_channel", "Transactions in channel")).unwrap();
}
pub type WireTransaction = Vec<u8>;
#[derive(Clone, Debug)]
@@ -54,6 +75,7 @@ impl TxSender {
return;
}
let histo_timer = TT_SENT_TIMER.start_timer();
let start = Instant::now();
let tpu_client = self.tpu_service.clone();
@@ -83,8 +105,12 @@ impl TxSender {
transaction_info.signature.clone(),
transaction_info.transaction.clone(),
) {
Ok(_) => 1,
Ok(_) => {
TXS_SENT.inc_by(1);
1
}
Err(err) => {
TXS_SENT_ERRORS.inc_by(1);
warn!("{err}");
0
}
@@ -109,6 +135,7 @@ impl TxSender {
// ignore error on sent because the channel may be already closed
let _ = notifier.send(NotificationMsg::TxNotificationMsg(notification_msgs));
}
histo_timer.observe_duration();
trace!(
"It took {} ms to send a batch of {} transaction(s)",
start.elapsed().as_millis(),
@@ -136,6 +163,8 @@ impl TxSender {
{
Ok(value) => match value {
Some(transaction_info) => {
TXS_IN_CHANNEL.dec();
// duplicate transaction
if self
.data_cache
@@ -165,6 +194,8 @@ impl TxSender {
continue;
}
TX_BATCH_SIZES.set(transaction_infos.len() as i64);
self.forward_txs(transaction_infos, notifier.clone()).await;
}
})
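
forward_txs wraps the batch send in a histogram timer: start_timer() returns a guard when the send begins, and observe_duration() records the elapsed seconds into TT_SENT_TIMER's buckets at the end. The timer pattern on its own (metric name is illustrative):

    use prometheus::{histogram_opts, register_histogram, Histogram};

    lazy_static::lazy_static! {
        static ref SEND_TIMER: Histogram = register_histogram!(histogram_opts!(
            "example_send_seconds",
            "Time to send a transaction batch"
        ))
        .unwrap();
    }

    async fn forward_batch(batch: Vec<Vec<u8>>) {
        let timer = SEND_TIMER.start_timer();
        send(batch).await;
        timer.observe_duration(); // explicit stop; dropping the guard would also record
    }

    async fn send(_batch: Vec<Vec<u8>>) {}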