solving issue with banking stage errors
This commit is contained in:
parent
dfb95559da
commit
e3867c76c2
|
@ -14,8 +14,6 @@ use solana_sdk::{
|
|||
};
|
||||
use yellowstone_grpc_proto::prelude::SubscribeUpdateBlock;
|
||||
|
||||
use crate::transaction_info::TransactionInfo;
|
||||
|
||||
pub struct BlockInfo {
|
||||
pub block_hash: String,
|
||||
pub slot: i64,
|
||||
|
@ -31,7 +29,7 @@ pub struct BlockInfo {
|
|||
impl BlockInfo {
|
||||
pub fn new(
|
||||
block: &SubscribeUpdateBlock,
|
||||
map_of_infos: Arc<DashMap<String, TransactionInfo>>,
|
||||
errors_by_slots: &Arc<DashMap<u64, u64>>,
|
||||
) -> BlockInfo {
|
||||
let block_hash = block.blockhash.clone();
|
||||
let slot = block.slot;
|
||||
|
@ -52,17 +50,7 @@ impl BlockInfo {
|
|||
.filter(|x| x.meta.as_ref().map(|x| x.err.is_none()).unwrap_or(false))
|
||||
.count() as u64;
|
||||
let processed_transactions = block.transactions.len() as u64;
|
||||
let banking_stage_errors = map_of_infos
|
||||
.iter()
|
||||
.filter(|x| x.first_notification_slot == slot)
|
||||
.map(|x| {
|
||||
x.errors
|
||||
.iter()
|
||||
.filter(|(x, _)| x.slot == slot)
|
||||
.map(|(_, x)| *x)
|
||||
.sum::<usize>()
|
||||
})
|
||||
.sum::<usize>() as u64;
|
||||
let banking_stage_errors = errors_by_slots.get(&slot).map(|x| *x).unwrap_or_default();
|
||||
let total_cu_used = block
|
||||
.transactions
|
||||
.iter()
|
||||
|
@ -194,7 +182,6 @@ impl BlockInfo {
|
|||
Some(x) => {
|
||||
x.0 += cu_requested;
|
||||
x.1 += cu_consumed;
|
||||
|
||||
}
|
||||
None => {
|
||||
writelocked_accounts.insert(writable_account, (cu_requested, cu_consumed));
|
||||
|
@ -205,12 +192,14 @@ impl BlockInfo {
|
|||
|
||||
let mut heavily_writelocked_accounts = writelocked_accounts
|
||||
.iter()
|
||||
.filter(|x| x.1.1 > 1000000)
|
||||
.filter(|x| x.1 .1 > 1000000)
|
||||
.collect_vec();
|
||||
heavily_writelocked_accounts.sort_by(|lhs, rhs| (*rhs.1).cmp(lhs.1));
|
||||
let heavily_writelocked_accounts = heavily_writelocked_accounts
|
||||
.iter()
|
||||
.map(|(pubkey, (cu_req, cu_con))| format!("(k:{}, cu_req:{}, cu_con:{})", **pubkey, *cu_req, *cu_con))
|
||||
.map(|(pubkey, (cu_req, cu_con))| {
|
||||
format!("(k:{}, cu_req:{}, cu_con:{})", **pubkey, *cu_req, *cu_con)
|
||||
})
|
||||
.collect_vec();
|
||||
BlockInfo {
|
||||
block_hash,
|
||||
|
|
15
src/main.rs
15
src/main.rs
|
@ -26,6 +26,7 @@ async fn main() {
|
|||
let grpc_addr = args.grpc_address;
|
||||
let mut client = GeyserGrpcClient::connect(grpc_addr, None::<&'static str>, None).unwrap();
|
||||
let map_of_infos = Arc::new(DashMap::<String, TransactionInfo>::new());
|
||||
let slot_by_errors = Arc::new(DashMap::<u64, u64>::new());
|
||||
|
||||
let postgres = postgres::Postgres::new().await;
|
||||
let slot = Arc::new(AtomicU64::new(0));
|
||||
|
@ -65,8 +66,8 @@ async fn main() {
|
|||
};
|
||||
|
||||
let Some(update) = message.update_oneof else {
|
||||
continue;
|
||||
};
|
||||
continue;
|
||||
};
|
||||
|
||||
match update {
|
||||
UpdateOneof::BankingTransactionErrors(transaction) => {
|
||||
|
@ -75,6 +76,14 @@ async fn main() {
|
|||
}
|
||||
log::info!("got banking stage transaction erros");
|
||||
let sig = transaction.signature.to_string();
|
||||
match slot_by_errors.get_mut(&transaction.slot) {
|
||||
Some(mut value) => {
|
||||
*value = *value + 1;
|
||||
}
|
||||
None => {
|
||||
slot_by_errors.insert(transaction.slot, 1);
|
||||
}
|
||||
}
|
||||
match map_of_infos.get_mut(&sig) {
|
||||
Some(mut x) => {
|
||||
x.add_notification(&transaction);
|
||||
|
@ -99,7 +108,7 @@ async fn main() {
|
|||
}
|
||||
}
|
||||
|
||||
let block_info = BlockInfo::new(&block, map_of_infos.clone());
|
||||
let block_info = BlockInfo::new(&block, &slot_by_errors);
|
||||
if let Err(e) = postgres.save_block_info(block_info).await {
|
||||
log::error!("Error saving block {}", e);
|
||||
}
|
||||
|
|
|
@ -154,8 +154,7 @@ impl TransactionInfo {
|
|||
}
|
||||
}
|
||||
}
|
||||
None => {
|
||||
}
|
||||
None => {}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue