Fixing perofmrance issues after checking last block height

This commit is contained in:
godmodegalactus 2024-03-08 17:58:49 +01:00
parent 60902db5bc
commit 3e01ebb7cc
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
4 changed files with 34 additions and 57 deletions

View File

@ -3,6 +3,7 @@ use log::info;
use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel};
use solana_sdk::{clock::MAX_RECENT_BLOCKHASHES, slot_history::Slot};
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use tokio::sync::RwLock;
@ -34,6 +35,7 @@ impl BlockInformation {
#[derive(Clone)]
pub struct BlockInformationStore {
blocks: Arc<DashMap<String, BlockInformation>>,
last_blockheight: Arc<AtomicU64>,
latest_confirmed_block: Arc<RwLock<BlockInformation>>,
latest_finalized_block: Arc<RwLock<BlockInformation>>,
}
@ -48,6 +50,7 @@ impl BlockInformationStore {
);
Self {
last_blockheight: Arc::new(AtomicU64::new(latest_finalized_block.block_height)),
latest_confirmed_block: Arc::new(RwLock::new(latest_finalized_block.clone())),
latest_finalized_block: Arc::new(RwLock::new(latest_finalized_block)),
blocks,
@ -100,6 +103,17 @@ impl BlockInformationStore {
// save slot copy to avoid borrow issues
let slot = block_info.slot;
let commitment_config = block_info.commitment_config;
if self
.last_blockheight
.load(std::sync::atomic::Ordering::Relaxed)
< block_info.block_height
{
// update last seen blockheight
self.last_blockheight.store(
block_info.block_height,
std::sync::atomic::Ordering::Relaxed,
);
}
// check if the block has already been added with higher commitment level
match self.blocks.get_mut(&block_info.blockhash) {
Some(mut prev_block_info) => {
@ -164,4 +178,9 @@ impl BlockInformationStore {
None => (false, latest_block.slot),
}
}
pub fn get_last_blockheight(&self) -> u64 {
self.last_blockheight
.load(std::sync::atomic::Ordering::Relaxed)
}
}

View File

@ -53,17 +53,14 @@ impl DataCache {
self.tx_subs.clean(ttl_duration);
}
pub async fn check_if_confirmed_or_expired_blockheight(
pub fn check_if_confirmed_or_expired_blockheight(
&self,
sent_transaction_info: &SentTransactionInfo,
) -> bool {
let last_block = self
.block_information_store
.get_latest_block_info(CommitmentConfig::processed())
.await;
let last_block_height = self.block_information_store.get_last_blockheight();
self.txs
.is_transaction_confirmed(&sent_transaction_info.signature)
|| last_block.block_height > sent_transaction_info.last_valid_block_height
|| last_block_height > sent_transaction_info.last_valid_block_height
}
pub async fn get_current_epoch(&self, commitment: CommitmentConfig) -> Epoch {

View File

@ -111,7 +111,7 @@ impl ActiveConnection {
let tx: Vec<u8> = match tx {
Ok(transaction_sent_info) => {
if self.data_cache.check_if_confirmed_or_expired_blockheight(&transaction_sent_info).await {
if self.data_cache.check_if_confirmed_or_expired_blockheight(&transaction_sent_info) {
// transaction is already confirmed/ no need to send
continue;
}

View File

@ -1,6 +1,5 @@
use std::time::{Duration, Instant};
use std::time::Instant;
use anyhow::bail;
use chrono::Utc;
use log::{trace, warn};
@ -36,11 +35,6 @@ lazy_static::lazy_static! {
}
// making 250 as sleep time will effectively make lite rpc send
// (1000/250) * 5 * 512 = 10240 tps
const INTERVAL_PER_BATCH_IN_MS: u64 = 50;
const MAX_BATCH_SIZE_IN_PER_INTERVAL: usize = 2000;
/// Retry transactions to a maximum of `u16` times, keep a track of confirmed transactions
#[derive(Clone)]
pub struct TxSender {
@ -134,52 +128,19 @@ impl TxSender {
) -> AnyhowJoinHandle {
tokio::spawn(async move {
loop {
let mut transaction_infos = Vec::with_capacity(MAX_BATCH_SIZE_IN_PER_INTERVAL);
let mut timeout_interval = INTERVAL_PER_BATCH_IN_MS;
// In solana there in sig verify stage rate is limited to 2000 txs in 50ms
// taking this as reference
while transaction_infos.len() <= MAX_BATCH_SIZE_IN_PER_INTERVAL {
let instance = tokio::time::Instant::now();
match tokio::time::timeout(Duration::from_millis(timeout_interval), recv.recv())
.await
while let Some(transaction_info) = recv.recv().await {
// duplicate transaction
if self
.data_cache
.txs
.contains_key(&transaction_info.signature)
{
Ok(value) => match value {
Some(transaction_info) => {
TXS_IN_CHANNEL.dec();
// duplicate transaction
if self
.data_cache
.txs
.contains_key(&transaction_info.signature)
{
continue;
}
transaction_infos.push(transaction_info);
// update the timeout inteval
timeout_interval = timeout_interval
.saturating_sub(instance.elapsed().as_millis() as u64)
.max(1);
}
None => {
log::error!("Channel Disconnected");
bail!("Channel Disconnected");
}
},
Err(_) => {
break;
}
continue;
}
self.forward_txs(vec![transaction_info], notifier.clone())
.await;
}
if transaction_infos.is_empty() {
continue;
}
TX_BATCH_SIZES.set(transaction_infos.len() as i64);
self.forward_txs(transaction_infos, notifier.clone()).await;
}
})
}