From 50b74470639063d757c7a37de3ee3bdc7976c38e Mon Sep 17 00:00:00 2001 From: Godmode Galactus Date: Fri, 27 Oct 2023 16:49:31 +0200 Subject: [PATCH] process blocks with a delay --- src/main.rs | 47 ++++++++++++++++++++++++++++++++--------------- 1 file changed, 32 insertions(+), 15 deletions(-) diff --git a/src/main.rs b/src/main.rs index 2774cf6..23242bb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,8 @@ use clap::Parser; +use tokio::time::Instant; use std::{ collections::HashMap, - sync::{atomic::AtomicU64, Arc}, + sync::{atomic::AtomicU64, Arc}, time::Duration, }; use block_info::BlockInfo; @@ -12,7 +13,7 @@ use solana_sdk::signature::Signature; use transaction_info::TransactionInfo; use yellowstone_grpc_client::GeyserGrpcClient; use yellowstone_grpc_proto::prelude::{ - subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequestFilterBlocks, + subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequestFilterBlocks, SubscribeUpdateBlock, }; mod block_info; @@ -60,6 +61,33 @@ async fn main() { postgres.start_saving_transaction(map_of_infos.clone(), slot.clone()); + let (send_block, mut recv_block) = tokio::sync::mpsc::unbounded_channel::<(Instant, SubscribeUpdateBlock)>(); + let slot_by_error_task = slot_by_errors.clone(); + let map_of_infos_task = map_of_infos.clone(); + + // process blocks with 2 mins delay so that we process all the banking stage errors before processing blocks + tokio::spawn(async move { + while let Some((wait_until, block)) = recv_block.recv().await { + println!("b"); + tokio::time::sleep_until(wait_until).await; + for transaction in &block.transactions { + let Some(tx) = &transaction.transaction else { + continue; + }; + let signature = Signature::try_from(tx.signatures[0].clone()).unwrap(); + if let Some(mut info) = map_of_infos_task.get_mut(&signature.to_string()) { + info.add_transaction(&transaction, block.slot); + } + } + + let block_info = BlockInfo::new(&block, &slot_by_error_task); + if let Err(e) = postgres.save_block_info(block_info).await { + log::error!("Error saving block {}", e); + } + } + }); + + while let Some(message) = stream.next().await { let Ok(message) = message else { continue; @@ -98,20 +126,9 @@ async fn main() { UpdateOneof::Block(block) => { log::info!("got block"); slot.store(block.slot, std::sync::atomic::Ordering::Relaxed); - for transaction in &block.transactions { - let Some(tx) = &transaction.transaction else { - continue; - }; - let signature = Signature::try_from(tx.signatures[0].clone()).unwrap(); - if let Some(mut info) = map_of_infos.get_mut(&signature.to_string()) { - info.add_transaction(&transaction, block.slot); - } - } - let block_info = BlockInfo::new(&block, &slot_by_errors); - if let Err(e) = postgres.save_block_info(block_info).await { - log::error!("Error saving block {}", e); - } + send_block.send(( Instant::now() + Duration::from_secs(120), block)).expect("should works"); + // delay queue so that we get all the banking stage errors before processing block } _ => {} };