diff --git a/src/block_info.rs b/src/block_info.rs index 153ee81..b1b9217 100644 --- a/src/block_info.rs +++ b/src/block_info.rs @@ -140,9 +140,7 @@ impl BlockInfo { Some(x) => { x.cu_requested += cu_requested; x.cu_consumed += cu_consumed; - if prioritization_fees > 0 { - x.vec_pf.push(prioritization_fees); - } + x.vec_pf.push(prioritization_fees); } None => { writelocked_accounts.insert( @@ -163,9 +161,7 @@ impl BlockInfo { Some(x) => { x.cu_requested += cu_requested; x.cu_consumed += cu_consumed; - if prioritization_fees > 0 { - x.vec_pf.push(prioritization_fees); - } + x.vec_pf.push(prioritization_fees); } None => { readlocked_accounts.insert( diff --git a/src/main.rs b/src/main.rs index c672e88..1111e3e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -186,42 +186,8 @@ async fn main() { let postgres = postgres::Postgres::new().await; let slot = Arc::new(AtomicU64::new(0)); - let mut blocks_subs = HashMap::new(); - blocks_subs.insert( - "sidecar_block_subscription".to_string(), - yellowstone_grpc_proto_original::prelude::SubscribeRequestFilterBlocks { - account_include: Default::default(), - include_transactions: Some(true), - include_accounts: Some(false), - include_entries: Some(false), - }, - ); - - let mut slot_sub = HashMap::new(); - slot_sub.insert( - "slot_sub".to_string(), - yellowstone_grpc_proto_original::prelude::SubscribeRequestFilterSlots { - filter_by_commitment: None, - }, - ); - - let mut geyser_stream = client - .subscribe_once( - slot_sub, - Default::default(), - HashMap::new(), - Default::default(), - blocks_subs, - Default::default(), - None, - Default::default(), - None, - ) - .await - .unwrap(); - postgres.spawn_transaction_infos_saver(map_of_infos.clone(), slot.clone()); - let jhs = args + let _jhs = args .banking_grpc_addresses .iter() .map(|address| { @@ -234,58 +200,93 @@ async fn main() { }) .collect_vec(); - while let Some(message) = geyser_stream.next().await { - let Ok(message) = message else { - continue; - }; + loop { + let mut blocks_subs = HashMap::new(); + blocks_subs.insert( + "sidecar_block_subscription".to_string(), + yellowstone_grpc_proto_original::prelude::SubscribeRequestFilterBlocks { + account_include: Default::default(), + include_transactions: Some(true), + include_accounts: Some(false), + include_entries: Some(false), + }, + ); - let Some(update) = message.update_oneof else { - continue; - }; + let mut slot_sub = HashMap::new(); + slot_sub.insert( + "slot_sub".to_string(), + yellowstone_grpc_proto_original::prelude::SubscribeRequestFilterSlots { + filter_by_commitment: None, + }, + ); - match update { - yellowstone_grpc_proto_original::prelude::subscribe_update::UpdateOneof::Block( - block, - ) => { - debug!("got block {}", block.slot); - BLOCK_TXS.set(block.transactions.len() as i64); - BANKING_STAGE_BLOCKS_COUNTER.inc(); - BANKING_STAGE_BLOCKS_TASK.inc(); - let postgres = postgres.clone(); - let slot = slot.clone(); - let map_of_infos = map_of_infos.clone(); - let slot_by_errors = slot_by_errors.clone(); - tokio::spawn(async move { - tokio::time::sleep(Duration::from_secs(30)).await; - for transaction in &block.transactions { - let Some(tx) = &transaction.transaction else { - continue; - }; - let signature = Signature::try_from(tx.signatures[0].clone()).unwrap(); - if let Some(mut info) = map_of_infos.get_mut(&signature.to_string()) { - info.add_transaction(transaction, block.slot); + let mut geyser_stream = client + .subscribe_once( + slot_sub, + Default::default(), + HashMap::new(), + Default::default(), + blocks_subs, + Default::default(), + None, + Default::default(), + None, + ) + .await + .unwrap(); + while let Some(message) = geyser_stream.next().await { + let Ok(message) = message else { + continue; + }; + + let Some(update) = message.update_oneof else { + continue; + }; + + match update { + yellowstone_grpc_proto_original::prelude::subscribe_update::UpdateOneof::Block( + block, + ) => { + debug!("got block {}", block.slot); + BLOCK_TXS.set(block.transactions.len() as i64); + BANKING_STAGE_BLOCKS_COUNTER.inc(); + BANKING_STAGE_BLOCKS_TASK.inc(); + let postgres = postgres.clone(); + let slot = slot.clone(); + let map_of_infos = map_of_infos.clone(); + let slot_by_errors = slot_by_errors.clone(); + tokio::spawn(async move { + tokio::time::sleep(Duration::from_secs(30)).await; + for transaction in &block.transactions { + let Some(tx) = &transaction.transaction else { + continue; + }; + let signature = Signature::try_from(tx.signatures[0].clone()).unwrap(); + if let Some(mut info) = map_of_infos.get_mut(&signature.to_string()) { + info.add_transaction(transaction, block.slot); + } } - } - let banking_stage_error_count = - slot_by_errors.get(&block.slot).map(|x| *x.value() as i64); - let block_info = BlockInfo::new(&block, banking_stage_error_count); + let banking_stage_error_count = + slot_by_errors.get(&block.slot).map(|x| *x.value() as i64); + let block_info = BlockInfo::new(&block, banking_stage_error_count); - TXERROR_COUNT.add( - block_info.processed_transactions - block_info.successful_transactions, - ); - if let Err(e) = postgres.save_block_info(block_info).await { - error!("Error saving block {}", e); - } - slot.store(block.slot, std::sync::atomic::Ordering::Relaxed); - slot_by_errors.remove(&block.slot); - BANKING_STAGE_BLOCKS_TASK.dec(); - }); - // delay queue so that we get all the banking stage errors before processing block - } - _ => {} - }; + TXERROR_COUNT.add( + block_info.processed_transactions - block_info.successful_transactions, + ); + if let Err(e) = postgres.save_block_info(block_info).await { + error!("Error saving block {}", e); + } + slot.store(block.slot, std::sync::atomic::Ordering::Relaxed); + slot_by_errors.remove(&block.slot); + BANKING_STAGE_BLOCKS_TASK.dec(); + }); + // delay queue so that we get all the banking stage errors before processing block + } + _ => {} + }; + } + log::error!("stopping the sidecar, geyser block stream is broken"); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; } - log::error!("stopping the sidecar, geyser block stream is broken"); - let _ = futures::future::select_all(jhs).await; }