process blocks with a delay
This commit is contained in:
parent
e3867c76c2
commit
50b7447063
47
src/main.rs
47
src/main.rs
|
@ -1,7 +1,8 @@
|
|||
use clap::Parser;
|
||||
use tokio::time::Instant;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
sync::{atomic::AtomicU64, Arc},
|
||||
sync::{atomic::AtomicU64, Arc}, time::Duration,
|
||||
};
|
||||
|
||||
use block_info::BlockInfo;
|
||||
|
@ -12,7 +13,7 @@ use solana_sdk::signature::Signature;
|
|||
use transaction_info::TransactionInfo;
|
||||
use yellowstone_grpc_client::GeyserGrpcClient;
|
||||
use yellowstone_grpc_proto::prelude::{
|
||||
subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequestFilterBlocks,
|
||||
subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequestFilterBlocks, SubscribeUpdateBlock,
|
||||
};
|
||||
|
||||
mod block_info;
|
||||
|
@ -60,6 +61,33 @@ async fn main() {
|
|||
|
||||
postgres.start_saving_transaction(map_of_infos.clone(), slot.clone());
|
||||
|
||||
let (send_block, mut recv_block) = tokio::sync::mpsc::unbounded_channel::<(Instant, SubscribeUpdateBlock)>();
|
||||
let slot_by_error_task = slot_by_errors.clone();
|
||||
let map_of_infos_task = map_of_infos.clone();
|
||||
|
||||
// process blocks with 2 mins delay so that we process all the banking stage errors before processing blocks
|
||||
tokio::spawn(async move {
|
||||
while let Some((wait_until, block)) = recv_block.recv().await {
|
||||
println!("b");
|
||||
tokio::time::sleep_until(wait_until).await;
|
||||
for transaction in &block.transactions {
|
||||
let Some(tx) = &transaction.transaction else {
|
||||
continue;
|
||||
};
|
||||
let signature = Signature::try_from(tx.signatures[0].clone()).unwrap();
|
||||
if let Some(mut info) = map_of_infos_task.get_mut(&signature.to_string()) {
|
||||
info.add_transaction(&transaction, block.slot);
|
||||
}
|
||||
}
|
||||
|
||||
let block_info = BlockInfo::new(&block, &slot_by_error_task);
|
||||
if let Err(e) = postgres.save_block_info(block_info).await {
|
||||
log::error!("Error saving block {}", e);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
while let Some(message) = stream.next().await {
|
||||
let Ok(message) = message else {
|
||||
continue;
|
||||
|
@ -98,20 +126,9 @@ async fn main() {
|
|||
UpdateOneof::Block(block) => {
|
||||
log::info!("got block");
|
||||
slot.store(block.slot, std::sync::atomic::Ordering::Relaxed);
|
||||
for transaction in &block.transactions {
|
||||
let Some(tx) = &transaction.transaction else {
|
||||
continue;
|
||||
};
|
||||
let signature = Signature::try_from(tx.signatures[0].clone()).unwrap();
|
||||
if let Some(mut info) = map_of_infos.get_mut(&signature.to_string()) {
|
||||
info.add_transaction(&transaction, block.slot);
|
||||
}
|
||||
}
|
||||
|
||||
let block_info = BlockInfo::new(&block, &slot_by_errors);
|
||||
if let Err(e) = postgres.save_block_info(block_info).await {
|
||||
log::error!("Error saving block {}", e);
|
||||
}
|
||||
send_block.send(( Instant::now() + Duration::from_secs(120), block)).expect("should works");
|
||||
// delay queue so that we get all the banking stage errors before processing block
|
||||
}
|
||||
_ => {}
|
||||
};
|
||||
|
|
Loading…
Reference in New Issue