use one linear stream

This commit is contained in:
GroovieGermanikus 2024-02-28 08:30:07 +01:00
parent c46f557a49
commit cf5a6717d2
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
1 changed files with 21 additions and 25 deletions

View File

@ -155,7 +155,7 @@ async fn start_tracking_blocks(
rpc_client: Arc<RpcClient>,
grpc_block_addr: String,
grpc_x_token: Option<String>,
block_sender_postgres: Vec<Sender<BlockInfo>>,
block_saver: Sender<BlockInfo>,
slot: Arc<AtomicU64>,
alts_list: Vec<Pubkey>,
) {
@ -296,25 +296,25 @@ async fn start_tracking_blocks(
BANKING_STAGE_BLOCKS_TASK.inc();
let count = block_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed) as usize;
let pg_size = block_sender_postgres.len();
let block_sender = block_sender_postgres[count%pg_size].clone();
// let pg_size = block_sender_postgres.len();
// let block_sender = block_sender_postgres[count%pg_size].clone();
let slot = slot.clone();
let atl_store = atl_store.clone();
tokio::spawn(async move {
info!("spawn task");
// to support address lookup tables delay processing a littlebit
tokio::time::sleep(Duration::from_secs(2)).await;
let block_info = BlockInfo::new(atl_store, &block).await;
TXERROR_COUNT.add(
block_info.processed_transactions - block_info.successful_transactions,
);
if let Err(e) = postgres::send_block_info_to_buffer(block_sender, block_info).await {
panic!("Error saving block {}", e);
}
slot.store(block.slot, std::sync::atomic::Ordering::Relaxed);
BANKING_STAGE_BLOCKS_TASK.dec();
});
// tokio::spawn(async move {
info!("spawn task");
// to support address lookup tables delay processing a littlebit
tokio::time::sleep(Duration::from_secs(2)).await;
let block_info = BlockInfo::new(atl_store, &block).await;
TXERROR_COUNT.add(
block_info.processed_transactions - block_info.successful_transactions,
);
if let Err(e) = postgres::send_block_info_to_buffer(block_saver.clone(), block_info).await {
panic!("Error saving block {}", e);
}
slot.store(block.slot, std::sync::atomic::Ordering::Relaxed);
BANKING_STAGE_BLOCKS_TASK.dec();
// });
// delay queue so that we get all the banking stage errors before processing block
},
yellowstone_grpc_proto_original::prelude::subscribe_update::UpdateOneof::Account(account_update) => {
@ -364,13 +364,9 @@ async fn main() -> anyhow::Result<()> {
.map(|x| Pubkey::from_str(&x).unwrap())
.collect_vec();
let mut block_senders = vec![];
for i in 1..=4 {
let s = postgres::Postgres::new_with_workmem(i)
.await
.spawn_block_saver();
block_senders.push(s);
}
let block_saver = postgres::Postgres::new_with_workmem(33)
.await
.spawn_block_saver();
postgres1.spawn_transaction_infos_saver(map_of_infos.clone(), slot.clone());
@ -399,7 +395,7 @@ async fn main() -> anyhow::Result<()> {
rpc_client,
gprc_block_addr,
args.grpc_x_token,
block_senders,
block_saver,
slot,
alts_list,
)