diff --git a/src/main.rs b/src/main.rs
index 1116af4..bd8b698 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,7 +1,7 @@
 use clap::Parser;
 use itertools::Itertools;
 use std::{
-    collections::{BTreeMap, HashMap},
+    collections::HashMap,
     sync::{atomic::AtomicU64, Arc},
     time::Duration,
 };
@@ -40,7 +40,7 @@ lazy_static::lazy_static! {
 
 pub async fn start_tracking_banking_stage_errors(
     grpc_address: String,
-    map_of_infos: Arc<DashMap<String, BTreeMap<u64, TransactionInfo>>>,
+    map_of_infos: Arc<DashMap<(String, u64), TransactionInfo>>,
     slot_by_errors: Arc<DashMap<u64, u64>>,
     slot: Arc<AtomicU64>,
     subscribe_to_slots: bool,
@@ -109,22 +109,14 @@ pub async fn start_tracking_banking_stage_errors(
                         slot_by_errors.insert(transaction.slot, 1);
                     }
                 }
-                match map_of_infos.get_mut(&sig) {
+                match map_of_infos.get_mut(&(sig.clone(), transaction.slot)) {
                     Some(mut x) => {
-                        let slot_map = x.value_mut();
-                        match slot_map.get_mut(&transaction.slot) {
-                            Some(tx_info) => {
-                                tx_info.add_notification(&transaction)
-                            },
-                            None => {
-                                let tx_info = TransactionInfo::new(&transaction);
-                                slot_map.insert(transaction.slot, tx_info);
-                            }
-                        }
+                        let tx_info = x.value_mut();
+                        tx_info.add_notification(&transaction);
                     }
                     None => {
                         let tx_info = TransactionInfo::new(&transaction);
-                        map_of_infos.insert(sig, BTreeMap::from([(transaction.slot, tx_info)]));
+                        map_of_infos.insert((sig, transaction.slot), tx_info);
                     }
                 }
             },
@@ -242,7 +234,7 @@ async fn main() {
     let _prometheus_jh = PrometheusSync::sync(args.prometheus_addr.clone());
 
     let grpc_block_addr = args.grpc_address_to_fetch_blocks;
-    let map_of_infos = Arc::new(DashMap::<String, BTreeMap<u64, TransactionInfo>>::new());
+    let map_of_infos = Arc::new(DashMap::<(String, u64), TransactionInfo>::new());
     let slot_by_errors = Arc::new(DashMap::<u64, u64>::new());
     let postgres = postgres::Postgres::new().await;
 
diff --git a/src/postgres.rs b/src/postgres.rs
index 08d85f7..189eb24 100644
--- a/src/postgres.rs
+++ b/src/postgres.rs
@@ -1,5 +1,4 @@
 use std::{
-    collections::BTreeMap,
     sync::{atomic::AtomicU64, Arc},
     time::Duration,
 };
@@ -706,7 +705,7 @@ impl Postgres {
 
     pub fn spawn_transaction_infos_saver(
         &self,
-        map_of_transaction: Arc<DashMap<String, BTreeMap<u64, TransactionInfo>>>,
+        map_of_transaction: Arc<DashMap<(String, u64), TransactionInfo>>,
         slot: Arc<AtomicU64>,
     ) {
         let session = self.session.clone();
@@ -716,29 +715,20 @@ impl Postgres {
                 let slot = slot.load(std::sync::atomic::Ordering::Relaxed);
                 let mut txs_to_store = vec![];
                 for tx in map_of_transaction.iter() {
-                    let slot_map = tx.value();
-                    let first_slot = slot_map.keys().next().cloned().unwrap_or_default();
-                    if slot > first_slot + 300 {
+                    if slot > tx.key().1 + 300 {
                         txs_to_store.push(tx.key().clone());
                     }
                 }
 
                 if !txs_to_store.is_empty() {
                     debug!("saving transaction infos for {}", txs_to_store.len());
-                    let data = txs_to_store
+                    let batches = txs_to_store
                         .iter()
                         .filter_map(|key| map_of_transaction.remove(key))
-                        .map(|(_, tree)| tree.iter().map(|(_, info)| info).cloned().collect_vec())
-                        .flatten()
+                        .map(|(_, trans)| trans)
                         .collect_vec();
-                    let batches = data.chunks(1024).collect_vec();
-                    for batch in batches {
-                        if let Err(err) = session
-                            .save_banking_transaction_results(batch.to_vec())
-                            .await
-                        {
-                            panic!("saving transaction infos failed {}", err);
-                        }
+                    if let Err(err) = session.save_banking_transaction_results(batches).await {
+                        panic!("saving transaction infos failed {}", err);
                     }
                 }
             }
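
The diff flattens the per-signature BTreeMap so that transaction infos are keyed directly by a (signature, slot) pair. Below is a minimal standalone sketch of that access pattern, assuming the dashmap crate already used in the diff; TransactionInfo and add_notification here are simplified stand-ins for the repository's real types, and the 300-slot retention threshold is taken from the saver loop above.

use dashmap::DashMap;
use std::sync::Arc;

// Simplified stand-in for the repository's TransactionInfo.
#[derive(Clone, Debug, Default)]
struct TransactionInfo {
    notifications: u64,
}

impl TransactionInfo {
    fn add_notification(&mut self) {
        self.notifications += 1;
    }
}

fn main() {
    // New layout: one entry per (signature, slot) pair instead of
    // signature -> BTreeMap<slot, TransactionInfo>.
    let map_of_infos = Arc::new(DashMap::<(String, u64), TransactionInfo>::new());

    let sig = "some-signature".to_string();
    let slot = 42u64;

    // Upsert without walking a nested per-slot map.
    map_of_infos
        .entry((sig.clone(), slot))
        .or_insert_with(TransactionInfo::default)
        .add_notification();

    // Eviction mirrors the saver task: an entry becomes storable once the
    // current slot is more than 300 slots past the slot stored in its key.
    let current_slot = 400u64;
    let expired: Vec<(String, u64)> = map_of_infos
        .iter()
        .filter(|entry| current_slot > entry.key().1 + 300)
        .map(|entry| entry.key().clone())
        .collect();
    for key in expired {
        if let Some((key, info)) = map_of_infos.remove(&key) {
            println!("would persist {:?} for {:?}", info, key);
        }
    }
}

On the persistence side, the diff drains expired keys the same way and then hands the whole collected batch to save_banking_transaction_results in a single call, rather than chunking it into groups of 1024 as the previous code did.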