From cf5a6717d2714d848d811a66ee3577dd8789a155 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Wed, 28 Feb 2024 08:30:07 +0100 Subject: [PATCH] use one linear stream --- src/main.rs | 46 +++++++++++++++++++++------------------------- 1 file changed, 21 insertions(+), 25 deletions(-) diff --git a/src/main.rs b/src/main.rs index 0a84919..c54f1df 100644 --- a/src/main.rs +++ b/src/main.rs @@ -155,7 +155,7 @@ async fn start_tracking_blocks( rpc_client: Arc, grpc_block_addr: String, grpc_x_token: Option, - block_sender_postgres: Vec>, + block_saver: Sender, slot: Arc, alts_list: Vec, ) { @@ -296,25 +296,25 @@ async fn start_tracking_blocks( BANKING_STAGE_BLOCKS_TASK.inc(); let count = block_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed) as usize; - let pg_size = block_sender_postgres.len(); - let block_sender = block_sender_postgres[count%pg_size].clone(); + // let pg_size = block_sender_postgres.len(); + // let block_sender = block_sender_postgres[count%pg_size].clone(); let slot = slot.clone(); let atl_store = atl_store.clone(); - tokio::spawn(async move { - info!("spawn task"); - // to support address lookup tables delay processing a littlebit - tokio::time::sleep(Duration::from_secs(2)).await; - let block_info = BlockInfo::new(atl_store, &block).await; - TXERROR_COUNT.add( - block_info.processed_transactions - block_info.successful_transactions, - ); - if let Err(e) = postgres::send_block_info_to_buffer(block_sender, block_info).await { - panic!("Error saving block {}", e); - } - slot.store(block.slot, std::sync::atomic::Ordering::Relaxed); - BANKING_STAGE_BLOCKS_TASK.dec(); - }); + // tokio::spawn(async move { + info!("spawn task"); + // to support address lookup tables delay processing a littlebit + tokio::time::sleep(Duration::from_secs(2)).await; + let block_info = BlockInfo::new(atl_store, &block).await; + TXERROR_COUNT.add( + block_info.processed_transactions - block_info.successful_transactions, + ); + if let Err(e) = postgres::send_block_info_to_buffer(block_saver.clone(), block_info).await { + panic!("Error saving block {}", e); + } + slot.store(block.slot, std::sync::atomic::Ordering::Relaxed); + BANKING_STAGE_BLOCKS_TASK.dec(); + // }); // delay queue so that we get all the banking stage errors before processing block }, yellowstone_grpc_proto_original::prelude::subscribe_update::UpdateOneof::Account(account_update) => { @@ -364,13 +364,9 @@ async fn main() -> anyhow::Result<()> { .map(|x| Pubkey::from_str(&x).unwrap()) .collect_vec(); - let mut block_senders = vec![]; - for i in 1..=4 { - let s = postgres::Postgres::new_with_workmem(i) - .await - .spawn_block_saver(); - block_senders.push(s); - } + let block_saver = postgres::Postgres::new_with_workmem(33) + .await + .spawn_block_saver(); postgres1.spawn_transaction_infos_saver(map_of_infos.clone(), slot.clone()); @@ -399,7 +395,7 @@ async fn main() -> anyhow::Result<()> { rpc_client, gprc_block_addr, args.grpc_x_token, - block_senders, + block_saver, slot, alts_list, )