adding few more metrics, removing queue and creating more tasks for blcoks (#3)

This commit is contained in:
galactus 2023-11-25 15:02:29 +01:00 committed by GitHub
parent 86bc0e4e6a
commit 3338f638d5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 34 additions and 38 deletions

View File

@ -18,8 +18,7 @@ use solana_sdk::{commitment_config::CommitmentConfig, signature::Signature};
use transaction_info::TransactionInfo;
use yellowstone_grpc_client::GeyserGrpcClient;
use yellowstone_grpc_proto::prelude::{
subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequestFilterBlocks,
SubscribeUpdateBlock,
subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequestFilterBlocks
};
mod block_info;
@ -39,6 +38,10 @@ lazy_static::lazy_static! {
register_int_counter!(opts!("bankingstage_banking_stage_events_counter", "Banking stage events received")).unwrap();
static ref BANKING_STAGE_BLOCKS_COUNTER: IntCounter =
register_int_counter!(opts!("bankingstage_blocks_counter", "Banking stage blocks received")).unwrap();
static ref BANKING_STAGE_BLOCKS_TASK: IntGauge =
register_int_gauge!(opts!("bankingstage_blocks_in_queue", "Banking stage blocks in queue")).unwrap();
static ref BANKING_STAGE_BLOCKS_IN_RPC_QUEUE: IntGauge =
register_int_gauge!(opts!("bankingstage_blocks_in_rpc_queue", "Banking stage blocks in rpc queue")).unwrap();
}
#[tokio::main()]
@ -87,39 +90,6 @@ async fn main() {
postgres.spawn_transaction_infos_saver(map_of_infos.clone(), slot.clone());
let (send_block, mut recv_block) =
tokio::sync::mpsc::unbounded_channel::<SubscribeUpdateBlock>();
let slot_by_error_task = slot_by_errors.clone();
let map_of_infos_task = map_of_infos.clone();
// process blocks with 2 mins delay so that we process all the banking stage errors before processing blocks
let jh = {
let postgres = postgres.clone();
tokio::spawn(async move {
while let Some(block) = recv_block.recv().await {
for transaction in &block.transactions {
let Some(tx) = &transaction.transaction else {
continue;
};
let signature = Signature::try_from(tx.signatures[0].clone()).unwrap();
if let Some(mut info) = map_of_infos_task.get_mut(&signature.to_string()) {
info.add_transaction(transaction, block.slot);
}
}
let block_info = BlockInfo::new(&block);
TXERROR_COUNT
.set(block_info.processed_transactions - block_info.successful_transactions);
if let Err(e) = postgres.save_block_info(block_info).await {
error!("Error saving block {}", e);
}
slot.store(block.slot, std::sync::atomic::Ordering::Relaxed);
slot_by_error_task.remove(&block.slot);
}
})
};
// get blocks from rpc server
// because validator does not send banking blocks
let (rpc_blocks_sender, rpc_blocks_reciever) =
@ -177,7 +147,7 @@ async fn main() {
BlockInfo::new_from_rpc_block(slot, &block, banking_stage_error_count);
if let Some(block_info) = block_info {
BANKING_STAGE_ERROR_COUNT.add(banking_stage_error_count);
TXERROR_COUNT.set(
TXERROR_COUNT.add(
block_info.processed_transactions - block_info.successful_transactions,
);
if let Err(e) = postgres.save_block_info(block_info).await {
@ -231,12 +201,38 @@ async fn main() {
debug!("got block {}", block.slot);
BLOCK_TXS.set(block.transactions.len() as i64);
BANKING_STAGE_BLOCKS_COUNTER.inc();
send_block.send(block).expect("should works");
BANKING_STAGE_BLOCKS_TASK.inc();
let postgres = postgres.clone();
let slot = slot.clone();
let map_of_infos = map_of_infos.clone();
let slot_by_error = slot_by_errors.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(30)).await;
for transaction in &block.transactions {
let Some(tx) = &transaction.transaction else {
continue;
};
let signature = Signature::try_from(tx.signatures[0].clone()).unwrap();
if let Some(mut info) = map_of_infos.get_mut(&signature.to_string()) {
info.add_transaction(transaction, block.slot);
}
}
let block_info = BlockInfo::new(&block);
TXERROR_COUNT
.add(block_info.processed_transactions - block_info.successful_transactions);
if let Err(e) = postgres.save_block_info(block_info).await {
error!("Error saving block {}", e);
}
slot.store(block.slot, std::sync::atomic::Ordering::Relaxed);
slot_by_error.remove(&block.slot);
BANKING_STAGE_BLOCKS_TASK.dec();
});
// delay queue so that we get all the banking stage errors before processing block
}
_ => {}
};
}
jh.await.unwrap();
jh2.await.unwrap();
}