Revert "use one linear stream"

This reverts commit cf5a6717d2.
This commit is contained in:
GroovieGermanikus 2024-02-28 09:11:37 +01:00
parent cf5a6717d2
commit 1aea32786c
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
1 changed files with 25 additions and 21 deletions

View File

@ -155,7 +155,7 @@ async fn start_tracking_blocks(
rpc_client: Arc<RpcClient>,
grpc_block_addr: String,
grpc_x_token: Option<String>,
block_saver: Sender<BlockInfo>,
block_sender_postgres: Vec<Sender<BlockInfo>>,
slot: Arc<AtomicU64>,
alts_list: Vec<Pubkey>,
) {
@ -296,25 +296,25 @@ async fn start_tracking_blocks(
BANKING_STAGE_BLOCKS_TASK.inc();
let count = block_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed) as usize;
// let pg_size = block_sender_postgres.len();
// let block_sender = block_sender_postgres[count%pg_size].clone();
let pg_size = block_sender_postgres.len();
let block_sender = block_sender_postgres[count%pg_size].clone();
let slot = slot.clone();
let atl_store = atl_store.clone();
// tokio::spawn(async move {
info!("spawn task");
// to support address lookup tables delay processing a littlebit
tokio::time::sleep(Duration::from_secs(2)).await;
let block_info = BlockInfo::new(atl_store, &block).await;
TXERROR_COUNT.add(
block_info.processed_transactions - block_info.successful_transactions,
);
if let Err(e) = postgres::send_block_info_to_buffer(block_saver.clone(), block_info).await {
panic!("Error saving block {}", e);
}
slot.store(block.slot, std::sync::atomic::Ordering::Relaxed);
BANKING_STAGE_BLOCKS_TASK.dec();
// });
tokio::spawn(async move {
info!("spawn task");
// to support address lookup tables delay processing a littlebit
tokio::time::sleep(Duration::from_secs(2)).await;
let block_info = BlockInfo::new(atl_store, &block).await;
TXERROR_COUNT.add(
block_info.processed_transactions - block_info.successful_transactions,
);
if let Err(e) = postgres::send_block_info_to_buffer(block_sender, block_info).await {
panic!("Error saving block {}", e);
}
slot.store(block.slot, std::sync::atomic::Ordering::Relaxed);
BANKING_STAGE_BLOCKS_TASK.dec();
});
// delay queue so that we get all the banking stage errors before processing block
},
yellowstone_grpc_proto_original::prelude::subscribe_update::UpdateOneof::Account(account_update) => {
@ -364,9 +364,13 @@ async fn main() -> anyhow::Result<()> {
.map(|x| Pubkey::from_str(&x).unwrap())
.collect_vec();
let block_saver = postgres::Postgres::new_with_workmem(33)
.await
.spawn_block_saver();
let mut block_senders = vec![];
for i in 1..=4 {
let s = postgres::Postgres::new_with_workmem(i)
.await
.spawn_block_saver();
block_senders.push(s);
}
postgres1.spawn_transaction_infos_saver(map_of_infos.clone(), slot.clone());
@ -395,7 +399,7 @@ async fn main() -> anyhow::Result<()> {
rpc_client,
gprc_block_addr,
args.grpc_x_token,
block_saver,
block_senders,
slot,
alts_list,
)