From 3338f638d562207be97a5c3ce5092b75a3182a10 Mon Sep 17 00:00:00 2001 From: galactus <96341601+godmodegalactus@users.noreply.github.com> Date: Sat, 25 Nov 2023 15:02:29 +0100 Subject: [PATCH] adding few more metrics, removing queue and creating more tasks for blcoks (#3) --- src/main.rs | 72 +++++++++++++++++++++++++---------------------------- 1 file changed, 34 insertions(+), 38 deletions(-) diff --git a/src/main.rs b/src/main.rs index 759ac40..caf8217 100644 --- a/src/main.rs +++ b/src/main.rs @@ -18,8 +18,7 @@ use solana_sdk::{commitment_config::CommitmentConfig, signature::Signature}; use transaction_info::TransactionInfo; use yellowstone_grpc_client::GeyserGrpcClient; use yellowstone_grpc_proto::prelude::{ - subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequestFilterBlocks, - SubscribeUpdateBlock, + subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequestFilterBlocks }; mod block_info; @@ -39,6 +38,10 @@ lazy_static::lazy_static! { register_int_counter!(opts!("bankingstage_banking_stage_events_counter", "Banking stage events received")).unwrap(); static ref BANKING_STAGE_BLOCKS_COUNTER: IntCounter = register_int_counter!(opts!("bankingstage_blocks_counter", "Banking stage blocks received")).unwrap(); + static ref BANKING_STAGE_BLOCKS_TASK: IntGauge = + register_int_gauge!(opts!("bankingstage_blocks_in_queue", "Banking stage blocks in queue")).unwrap(); + static ref BANKING_STAGE_BLOCKS_IN_RPC_QUEUE: IntGauge = + register_int_gauge!(opts!("bankingstage_blocks_in_rpc_queue", "Banking stage blocks in rpc queue")).unwrap(); } #[tokio::main()] @@ -87,39 +90,6 @@ async fn main() { postgres.spawn_transaction_infos_saver(map_of_infos.clone(), slot.clone()); - let (send_block, mut recv_block) = - tokio::sync::mpsc::unbounded_channel::(); - let slot_by_error_task = slot_by_errors.clone(); - let map_of_infos_task = map_of_infos.clone(); - - // process blocks with 2 mins delay so that we process all the banking stage errors before processing blocks - let jh = { - let postgres = postgres.clone(); - tokio::spawn(async move { - while let Some(block) = recv_block.recv().await { - for transaction in &block.transactions { - let Some(tx) = &transaction.transaction else { - continue; - }; - let signature = Signature::try_from(tx.signatures[0].clone()).unwrap(); - if let Some(mut info) = map_of_infos_task.get_mut(&signature.to_string()) { - info.add_transaction(transaction, block.slot); - } - } - - let block_info = BlockInfo::new(&block); - - TXERROR_COUNT - .set(block_info.processed_transactions - block_info.successful_transactions); - if let Err(e) = postgres.save_block_info(block_info).await { - error!("Error saving block {}", e); - } - slot.store(block.slot, std::sync::atomic::Ordering::Relaxed); - slot_by_error_task.remove(&block.slot); - } - }) - }; - // get blocks from rpc server // because validator does not send banking blocks let (rpc_blocks_sender, rpc_blocks_reciever) = @@ -177,7 +147,7 @@ async fn main() { BlockInfo::new_from_rpc_block(slot, &block, banking_stage_error_count); if let Some(block_info) = block_info { BANKING_STAGE_ERROR_COUNT.add(banking_stage_error_count); - TXERROR_COUNT.set( + TXERROR_COUNT.add( block_info.processed_transactions - block_info.successful_transactions, ); if let Err(e) = postgres.save_block_info(block_info).await { @@ -231,12 +201,38 @@ async fn main() { debug!("got block {}", block.slot); BLOCK_TXS.set(block.transactions.len() as i64); BANKING_STAGE_BLOCKS_COUNTER.inc(); - send_block.send(block).expect("should works"); + BANKING_STAGE_BLOCKS_TASK.inc(); + let postgres = postgres.clone(); + let slot = slot.clone(); + let map_of_infos = map_of_infos.clone(); + let slot_by_error = slot_by_errors.clone(); + tokio::spawn(async move { + tokio::time::sleep(Duration::from_secs(30)).await; + for transaction in &block.transactions { + let Some(tx) = &transaction.transaction else { + continue; + }; + let signature = Signature::try_from(tx.signatures[0].clone()).unwrap(); + if let Some(mut info) = map_of_infos.get_mut(&signature.to_string()) { + info.add_transaction(transaction, block.slot); + } + } + + let block_info = BlockInfo::new(&block); + + TXERROR_COUNT + .add(block_info.processed_transactions - block_info.successful_transactions); + if let Err(e) = postgres.save_block_info(block_info).await { + error!("Error saving block {}", e); + } + slot.store(block.slot, std::sync::atomic::Ordering::Relaxed); + slot_by_error.remove(&block.slot); + BANKING_STAGE_BLOCKS_TASK.dec(); + }); // delay queue so that we get all the banking stage errors before processing block } _ => {} }; } - jh.await.unwrap(); jh2.await.unwrap(); }