From 8c4cb2fafe139151f47988c53356bcc45ee9b02f Mon Sep 17 00:00:00 2001 From: godmodegalactus Date: Tue, 5 Dec 2023 14:00:43 +0100 Subject: [PATCH] Making the blocks parsing optional, so that triton can launch side car without blocks parsing --- src/cli.rs | 4 +- src/main.rs | 197 +++++++++++++++++++++++++--------------------------- 2 files changed, 96 insertions(+), 105 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index 8a24878..f168dc7 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -3,8 +3,8 @@ use clap::Parser; #[derive(Parser, Debug, Clone)] #[command(author, version, about, long_about = None)] pub struct Args { - #[arg(short, long, default_value_t = String::from("http://127.0.0.1:10000"))] - pub grpc_address_to_fetch_blocks: String, + #[arg(short, long)] + pub grpc_address_to_fetch_blocks: Option, #[arg(short = 'x', long)] pub grpc_x_token: Option, diff --git a/src/main.rs b/src/main.rs index b96584c..a25c1dd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -39,75 +39,12 @@ lazy_static::lazy_static! { register_int_gauge!(opts!("bankingstage_blocks_in_rpc_queue", "Banking stage blocks in rpc queue")).unwrap(); } -// fn spawn_rpc_block_processor() -> tokio::task::JoinHandle<()> { -// let map_of_infos = map_of_infos.clone(); -// let postgres = postgres.clone(); -// let slot_by_errors = slot_by_errors.clone(); -// tokio::spawn(async move { -// let mut rpc_blocks_reciever = rpc_blocks_reciever; -// let rpc_client = RpcClient::new(rpc_url); -// while let Some((wait_until, slot)) = rpc_blocks_reciever.recv().await { -// tokio::time::sleep_until(wait_until).await; -// let block = if let Ok(block) = rpc_client -// .get_block_with_config( -// slot, -// solana_rpc_client_api::config::RpcBlockConfig { -// encoding: Some( -// solana_transaction_status::UiTransactionEncoding::Base64, -// ), -// transaction_details: Some( -// solana_transaction_status::TransactionDetails::Full, -// ), -// rewards: Some(true), -// commitment: Some(CommitmentConfig::confirmed()), -// max_supported_transaction_version: Some(0), -// }, -// ) -// .await -// { -// block -// } else { -// continue; -// }; - -// let Some(transactions) = &block.transactions else { -// continue; -// }; - -// for transaction in transactions { -// let Some(transaction) = &transaction.transaction.decode() else { -// continue; -// }; -// let signature = transaction.signatures[0].to_string(); -// if let Some(mut info) = map_of_infos.get_mut(&signature) { -// info.add_rpc_transaction(slot, transaction); -// } -// } - -// let banking_stage_error_count = slot_by_errors -// .get(&slot) -// .map(|x| *x.value() as i64) -// .unwrap_or_default(); -// let block_info = -// BlockInfo::new_from_rpc_block(slot, &block, banking_stage_error_count); -// if let Some(block_info) = block_info { -// BANKING_STAGE_ERROR_COUNT.add(banking_stage_error_count); -// TXERROR_COUNT.add( -// block_info.processed_transactions - block_info.successful_transactions, -// ); -// if let Err(e) = postgres.save_block_info(block_info).await { -// error!("Error saving block {}", e); -// } -// slot_by_errors.remove(&slot); -// } -// } -// }) -// } - pub async fn start_tracking_banking_stage_errors( grpc_address: String, map_of_infos: Arc>, slot_by_errors: Arc>, + slot: Arc, + subscribe_to_slots: bool, ) { loop { let token: Option = None; @@ -118,9 +55,23 @@ pub async fn start_tracking_banking_stage_errors( ) .unwrap(); + let slot_subscription: HashMap< + String, + yellowstone_grpc_proto::geyser::SubscribeRequestFilterSlots, + > = if subscribe_to_slots { + let mut slot_sub = HashMap::new(); + slot_sub.insert( + "slot_sub".to_string(), + yellowstone_grpc_proto::geyser::SubscribeRequestFilterSlots {}, + ); + slot_sub + } else { + HashMap::new() + }; + let mut geyser_stream = client .subscribe_once( - HashMap::new(), + slot_subscription, Default::default(), HashMap::new(), Default::default(), @@ -142,7 +93,8 @@ pub async fn start_tracking_banking_stage_errors( continue; }; - if let yellowstone_grpc_proto::prelude::subscribe_update::UpdateOneof::BankingTransactionErrors(transaction) = update { + match update{ + yellowstone_grpc_proto::prelude::subscribe_update::UpdateOneof::BankingTransactionErrors(transaction) => { if transaction.error.is_none() && transaction.accounts.is_empty() { continue; } @@ -166,47 +118,36 @@ pub async fn start_tracking_banking_stage_errors( map_of_infos.insert(sig, x); } } - } + }, + yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof::Slot(s) => { + let load_slot = slot.load(std::sync::atomic::Ordering::Relaxed); + if load_slot < s.slot { + // update slot to process updates + slot.store(s.slot, std::sync::atomic::Ordering::Relaxed); + } + }, + _=>{} + } } error!("geyser banking stage connection failed {}", grpc_address); tokio::time::sleep(Duration::from_secs(1)).await; } } -#[tokio::main()] -async fn main() { - tracing_subscriber::fmt::init(); - - let args = Args::parse(); - - let _prometheus_jh = PrometheusSync::sync(args.prometheus_addr.clone()); - - let grpc_block_addr = args.grpc_address_to_fetch_blocks; +async fn start_tracking_blocks( + grpc_block_addr: String, + grpc_x_token: Option, + postgres: postgres::Postgres, + slot: Arc, + slot_by_errors: Arc>, + map_of_infos: Arc>, +) { let mut client = yellowstone_grpc_client_original::GeyserGrpcClient::connect( grpc_block_addr, - args.grpc_x_token, + grpc_x_token, None, ) .unwrap(); - let map_of_infos = Arc::new(DashMap::::new()); - let slot_by_errors = Arc::new(DashMap::::new()); - - let postgres = postgres::Postgres::new().await; - let slot = Arc::new(AtomicU64::new(0)); - - postgres.spawn_transaction_infos_saver(map_of_infos.clone(), slot.clone()); - let _jhs = args - .banking_grpc_addresses - .iter() - .map(|address| { - let address = address.clone(); - let map_of_infos = map_of_infos.clone(); - let slot_by_errors = slot_by_errors.clone(); - tokio::spawn(async move { - start_tracking_banking_stage_errors(address, map_of_infos, slot_by_errors).await; - }) - }) - .collect_vec(); loop { let mut blocks_subs = HashMap::new(); @@ -244,12 +185,12 @@ async fn main() { .unwrap(); while let Some(message) = geyser_stream.next().await { let Ok(message) = message else { - continue; - }; + continue; + }; let Some(update) = message.update_oneof else { - continue; - }; + continue; + }; match update { yellowstone_grpc_proto_original::prelude::subscribe_update::UpdateOneof::Block( @@ -267,8 +208,8 @@ async fn main() { tokio::time::sleep(Duration::from_secs(30)).await; for transaction in &block.transactions { let Some(tx) = &transaction.transaction else { - continue; - }; + continue; + }; let signature = Signature::try_from(tx.signatures[0].clone()).unwrap(); if let Some(mut info) = map_of_infos.get_mut(&signature.to_string()) { info.add_transaction(transaction, block.slot); @@ -298,3 +239,53 @@ async fn main() { tokio::time::sleep(std::time::Duration::from_secs(1)).await; } } + +#[tokio::main()] +async fn main() { + tracing_subscriber::fmt::init(); + + let args = Args::parse(); + + let _prometheus_jh = PrometheusSync::sync(args.prometheus_addr.clone()); + + let grpc_block_addr = args.grpc_address_to_fetch_blocks; + let map_of_infos = Arc::new(DashMap::::new()); + let slot_by_errors = Arc::new(DashMap::::new()); + + let postgres = postgres::Postgres::new().await; + let slot = Arc::new(AtomicU64::new(0)); + let no_block_subscription = grpc_block_addr.is_none(); + postgres.spawn_transaction_infos_saver(map_of_infos.clone(), slot.clone()); + let jhs = args + .banking_grpc_addresses + .iter() + .map(|address| { + let address = address.clone(); + let map_of_infos = map_of_infos.clone(); + let slot_by_errors = slot_by_errors.clone(); + let slot = slot.clone(); + tokio::spawn(async move { + start_tracking_banking_stage_errors( + address, + map_of_infos, + slot_by_errors, + slot, + no_block_subscription, + ) + .await; + }) + }) + .collect_vec(); + if let Some(gprc_block_addr) = grpc_block_addr { + start_tracking_blocks( + gprc_block_addr, + args.grpc_x_token, + postgres, + slot, + slot_by_errors, + map_of_infos, + ) + .await; + } + futures::future::join_all(jhs).await; +}