Merge pull request #235 from blockworks-foundation/some_optimizations

Adding some optimization
This commit is contained in:
galactus 2023-10-25 11:16:42 +02:00 committed by GitHub
commit ba43b4a5da
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 37 additions and 15 deletions

2
Cargo.lock generated
View File

@ -4244,7 +4244,9 @@ dependencies = [
"derive_more",
"futures",
"itertools",
"lazy_static",
"log",
"prometheus",
"quinn",
"rustls 0.20.9",
"serde",

View File

@ -37,4 +37,6 @@ async-channel = { workspace = true }
solana-lite-rpc-core = { workspace = true }
yellowstone-grpc-client = { workspace = true }
yellowstone-grpc-proto = { workspace = true }
itertools = {workspace = true}
itertools = {workspace = true}
prometheus = { workspace = true }
lazy_static = { workspace = true }

View File

@ -1,4 +1,5 @@
use anyhow::Context;
use prometheus::{core::GenericGauge, opts, register_int_gauge};
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_lite_rpc_core::{
structures::{produced_block::ProducedBlock, slot_notification::SlotNotification},
@ -13,6 +14,10 @@ use std::{
};
use tokio::sync::broadcast::{Receiver, Sender};
lazy_static::lazy_static! {
static ref NB_BLOCK_FETCHING_TASKS: GenericGauge<prometheus::core::AtomicI64> = register_int_gauge!(opts!("literpc-num-blockfetching-tasks", "Transactions in store")).unwrap();
}
pub async fn process_block(
rpc_client: &RpcClient,
slot: Slot,
@ -62,6 +67,7 @@ pub fn poll_block(
for slot in last_processed_slot + 1..processed_slot + 1 {
let premit = counting_semaphore.clone().acquire_owned().await?;
NB_BLOCK_FETCHING_TASKS.inc();
let rpc_client = rpc_client.clone();
let block_notification_sender = block_notification_sender.clone();
let current_slot = current_slot.clone();
@ -70,7 +76,7 @@ pub fn poll_block(
while current_slot
.load(std::sync::atomic::Ordering::Relaxed)
.saturating_sub(slot)
< 32
< 128
{
if let Some(processed_block) = process_block(
rpc_client.as_ref(),
@ -85,12 +91,12 @@ pub fn poll_block(
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
tokio::time::sleep(Duration::from_secs(5)).await;
while confirmed_slot_fetch
&& current_slot
.load(std::sync::atomic::Ordering::Relaxed)
.saturating_sub(slot)
< 128
< 256
{
if let Some(processed_block) = process_block(
rpc_client.as_ref(),
@ -104,6 +110,7 @@ pub fn poll_block(
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
NB_BLOCK_FETCHING_TASKS.dec();
drop(premit)
});
}

View File

@ -54,15 +54,11 @@ impl DataCache {
pub async fn check_if_confirmed_or_expired_blockheight(
&self,
sent_transaction_info: &SentTransactionInfo,
current_blockheight: u64,
) -> bool {
self.txs
.is_transaction_confirmed(&sent_transaction_info.signature)
|| self
.block_information_store
.get_latest_block(CommitmentConfig::processed())
.await
.block_height
> sent_transaction_info.last_valid_block_height
|| current_blockheight > sent_transaction_info.last_valid_block_height
}
pub async fn get_current_epoch(&self, commitment: CommitmentConfig) -> Epoch {

View File

@ -60,10 +60,10 @@ impl TxStore {
self.store.get(signature).map(|x| x.value().clone())
}
pub fn clean(&self, current_finalized_blochash: u64) {
pub fn clean(&self, current_finalized_blockhash: u64) {
let length_before = self.store.len();
self.store.retain(|_k, v| {
v.last_valid_blockheight >= current_finalized_blochash + self.save_for_additional_slots
v.last_valid_blockheight >= current_finalized_blockhash + self.save_for_additional_slots
});
log::info!("Cleaned {} transactions", length_before - self.store.len());
}

View File

@ -72,7 +72,7 @@ impl ProxyListener {
let mut quinn_server_config = ServerConfig::with_crypto(Arc::new(server_tls_config));
// note: this config must be aligned with lite-rpc's client config
let mut transport_config = Arc::get_mut(&mut quinn_server_config.transport).unwrap();
let transport_config = Arc::get_mut(&mut quinn_server_config.transport).unwrap();
transport_config.max_concurrent_uni_streams(VarInt::from_u32(MAX_CONCURRENT_UNI_STREAMS));
// no bidi streams used
transport_config.max_concurrent_bidi_streams(VarInt::from_u32(0));

View File

@ -21,7 +21,10 @@ use std::{
Arc,
},
};
use tokio::sync::{broadcast::Receiver, broadcast::Sender};
use tokio::{
sync::{broadcast::Receiver, broadcast::Sender},
time::Instant,
};
lazy_static::lazy_static! {
static ref NB_QUIC_CONNECTIONS: GenericGauge<prometheus::core::AtomicI64> =
@ -93,6 +96,13 @@ impl ActiveConnection {
max_uni_stream_connections,
);
let mut current_blockheight = self
.data_cache
.block_information_store
.get_latest_block_info(solana_sdk::commitment_config::CommitmentConfig::processed())
.await
.block_height;
let mut block_height_update_time = Instant::now();
loop {
// exit signal set
if exit_signal.load(Ordering::Relaxed) {
@ -108,7 +118,12 @@ impl ActiveConnection {
let tx: Vec<u8> = match tx {
Ok(transaction_sent_info) => {
if self.data_cache.check_if_confirmed_or_expired_blockheight(&transaction_sent_info).await {
if Instant::now() - block_height_update_time > std::time::Duration::from_secs(1) {
// update block height information every second
current_blockheight = self.data_cache.block_information_store.get_latest_block_info(solana_sdk::commitment_config::CommitmentConfig::processed()).await.block_height;
block_height_update_time = Instant::now();
}
if self.data_cache.check_if_confirmed_or_expired_blockheight(&transaction_sent_info, current_blockheight).await {
// transaction is already confirmed/ no need to send
continue;
}