From e025f91650d804da9de142b811ee37e6e27402de Mon Sep 17 00:00:00 2001 From: Godmode Galactus Date: Wed, 25 Oct 2023 10:37:20 +0200 Subject: [PATCH] Adding some optimization --- Cargo.lock | 2 ++ cluster-endpoints/Cargo.toml | 4 +++- .../src/rpc_polling/poll_blocks.rs | 13 ++++++++++--- core/src/stores/data_cache.rs | 8 ++------ core/src/stores/tx_store.rs | 4 ++-- .../src/inbound/proxy_listener.rs | 2 +- .../src/tpu_utils/tpu_connection_manager.rs | 19 +++++++++++++++++-- 7 files changed, 37 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1006752f..00b18690 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4244,7 +4244,9 @@ dependencies = [ "derive_more", "futures", "itertools", + "lazy_static", "log", + "prometheus", "quinn", "rustls 0.20.9", "serde", diff --git a/cluster-endpoints/Cargo.toml b/cluster-endpoints/Cargo.toml index 1899809e..7345d557 100644 --- a/cluster-endpoints/Cargo.toml +++ b/cluster-endpoints/Cargo.toml @@ -37,4 +37,6 @@ async-channel = { workspace = true } solana-lite-rpc-core = { workspace = true } yellowstone-grpc-client = { workspace = true } yellowstone-grpc-proto = { workspace = true } -itertools = {workspace = true} \ No newline at end of file +itertools = {workspace = true} +prometheus = { workspace = true } +lazy_static = { workspace = true } \ No newline at end of file diff --git a/cluster-endpoints/src/rpc_polling/poll_blocks.rs b/cluster-endpoints/src/rpc_polling/poll_blocks.rs index af269fd2..527a1398 100644 --- a/cluster-endpoints/src/rpc_polling/poll_blocks.rs +++ b/cluster-endpoints/src/rpc_polling/poll_blocks.rs @@ -1,4 +1,5 @@ use anyhow::Context; +use prometheus::{core::GenericGauge, opts, register_int_gauge}; use solana_client::nonblocking::rpc_client::RpcClient; use solana_lite_rpc_core::{ structures::{produced_block::ProducedBlock, slot_notification::SlotNotification}, @@ -13,6 +14,10 @@ use std::{ }; use tokio::sync::broadcast::{Receiver, Sender}; +lazy_static::lazy_static! { + static ref NB_BLOCK_FETCHING_TASKS: GenericGauge = register_int_gauge!(opts!("literpc-num-blockfetching-tasks", "Transactions in store")).unwrap(); +} + pub async fn process_block( rpc_client: &RpcClient, slot: Slot, @@ -62,6 +67,7 @@ pub fn poll_block( for slot in last_processed_slot + 1..processed_slot + 1 { let premit = counting_semaphore.clone().acquire_owned().await?; + NB_BLOCK_FETCHING_TASKS.inc(); let rpc_client = rpc_client.clone(); let block_notification_sender = block_notification_sender.clone(); let current_slot = current_slot.clone(); @@ -70,7 +76,7 @@ pub fn poll_block( while current_slot .load(std::sync::atomic::Ordering::Relaxed) .saturating_sub(slot) - < 32 + < 128 { if let Some(processed_block) = process_block( rpc_client.as_ref(), @@ -85,12 +91,12 @@ pub fn poll_block( } tokio::time::sleep(Duration::from_millis(50)).await; } - + tokio::time::sleep(Duration::from_secs(5)).await; while confirmed_slot_fetch && current_slot .load(std::sync::atomic::Ordering::Relaxed) .saturating_sub(slot) - < 128 + < 256 { if let Some(processed_block) = process_block( rpc_client.as_ref(), @@ -104,6 +110,7 @@ pub fn poll_block( } tokio::time::sleep(Duration::from_millis(50)).await; } + NB_BLOCK_FETCHING_TASKS.dec(); drop(premit) }); } diff --git a/core/src/stores/data_cache.rs b/core/src/stores/data_cache.rs index 9270a1d4..4ef0673a 100644 --- a/core/src/stores/data_cache.rs +++ b/core/src/stores/data_cache.rs @@ -54,15 +54,11 @@ impl DataCache { pub async fn check_if_confirmed_or_expired_blockheight( &self, sent_transaction_info: &SentTransactionInfo, + current_blockheight: u64, ) -> bool { self.txs .is_transaction_confirmed(&sent_transaction_info.signature) - || self - .block_information_store - .get_latest_block(CommitmentConfig::processed()) - .await - .block_height - > sent_transaction_info.last_valid_block_height + || current_blockheight > sent_transaction_info.last_valid_block_height } pub async fn get_current_epoch(&self, commitment: CommitmentConfig) -> Epoch { diff --git a/core/src/stores/tx_store.rs b/core/src/stores/tx_store.rs index c9c29e24..8f0aa46e 100644 --- a/core/src/stores/tx_store.rs +++ b/core/src/stores/tx_store.rs @@ -60,10 +60,10 @@ impl TxStore { self.store.get(signature).map(|x| x.value().clone()) } - pub fn clean(&self, current_finalized_blochash: u64) { + pub fn clean(&self, current_finalized_blockhash: u64) { let length_before = self.store.len(); self.store.retain(|_k, v| { - v.last_valid_blockheight >= current_finalized_blochash + self.save_for_additional_slots + v.last_valid_blockheight >= current_finalized_blockhash + self.save_for_additional_slots }); log::info!("Cleaned {} transactions", length_before - self.store.len()); } diff --git a/quic-forward-proxy/src/inbound/proxy_listener.rs b/quic-forward-proxy/src/inbound/proxy_listener.rs index f18418db..1aa31a78 100644 --- a/quic-forward-proxy/src/inbound/proxy_listener.rs +++ b/quic-forward-proxy/src/inbound/proxy_listener.rs @@ -72,7 +72,7 @@ impl ProxyListener { let mut quinn_server_config = ServerConfig::with_crypto(Arc::new(server_tls_config)); // note: this config must be aligned with lite-rpc's client config - let mut transport_config = Arc::get_mut(&mut quinn_server_config.transport).unwrap(); + let transport_config = Arc::get_mut(&mut quinn_server_config.transport).unwrap(); transport_config.max_concurrent_uni_streams(VarInt::from_u32(MAX_CONCURRENT_UNI_STREAMS)); // no bidi streams used transport_config.max_concurrent_bidi_streams(VarInt::from_u32(0)); diff --git a/services/src/tpu_utils/tpu_connection_manager.rs b/services/src/tpu_utils/tpu_connection_manager.rs index 2ffe12ce..bf0d3126 100644 --- a/services/src/tpu_utils/tpu_connection_manager.rs +++ b/services/src/tpu_utils/tpu_connection_manager.rs @@ -21,7 +21,10 @@ use std::{ Arc, }, }; -use tokio::sync::{broadcast::Receiver, broadcast::Sender}; +use tokio::{ + sync::{broadcast::Receiver, broadcast::Sender}, + time::Instant, +}; lazy_static::lazy_static! { static ref NB_QUIC_CONNECTIONS: GenericGauge = @@ -93,6 +96,13 @@ impl ActiveConnection { max_uni_stream_connections, ); + let mut current_blockheight = self + .data_cache + .block_information_store + .get_latest_block_info(solana_sdk::commitment_config::CommitmentConfig::processed()) + .await + .block_height; + let mut block_height_update_time = Instant::now(); loop { // exit signal set if exit_signal.load(Ordering::Relaxed) { @@ -108,7 +118,12 @@ impl ActiveConnection { let tx: Vec = match tx { Ok(transaction_sent_info) => { - if self.data_cache.check_if_confirmed_or_expired_blockheight(&transaction_sent_info).await { + if Instant::now() - block_height_update_time > std::time::Duration::from_secs(1) { + // update block height information every second + current_blockheight = self.data_cache.block_information_store.get_latest_block_info(solana_sdk::commitment_config::CommitmentConfig::processed()).await.block_height; + block_height_update_time = Instant::now(); + } + if self.data_cache.check_if_confirmed_or_expired_blockheight(&transaction_sent_info, current_blockheight).await { // transaction is already confirmed/ no need to send continue; }