Adding the unnecessary btree map
This commit is contained in:
parent
7a0e8a6084
commit
9b93487249
22
src/main.rs
22
src/main.rs
|
@ -1,7 +1,7 @@
|
|||
use clap::Parser;
|
||||
use itertools::Itertools;
|
||||
use std::{
|
||||
collections::{BTreeMap, HashMap},
|
||||
collections::HashMap,
|
||||
sync::{atomic::AtomicU64, Arc},
|
||||
time::Duration,
|
||||
};
|
||||
|
@ -40,7 +40,7 @@ lazy_static::lazy_static! {
|
|||
|
||||
pub async fn start_tracking_banking_stage_errors(
|
||||
grpc_address: String,
|
||||
map_of_infos: Arc<DashMap<String, BTreeMap<u64, TransactionInfo>>>,
|
||||
map_of_infos: Arc<DashMap<(String, u64), TransactionInfo>>,
|
||||
slot_by_errors: Arc<DashMap<u64, u64>>,
|
||||
slot: Arc<AtomicU64>,
|
||||
subscribe_to_slots: bool,
|
||||
|
@ -109,22 +109,14 @@ pub async fn start_tracking_banking_stage_errors(
|
|||
slot_by_errors.insert(transaction.slot, 1);
|
||||
}
|
||||
}
|
||||
match map_of_infos.get_mut(&sig) {
|
||||
match map_of_infos.get_mut(&(sig.clone(), transaction.slot)) {
|
||||
Some(mut x) => {
|
||||
let slot_map = x.value_mut();
|
||||
match slot_map.get_mut(&transaction.slot) {
|
||||
Some(tx_info) => {
|
||||
tx_info.add_notification(&transaction)
|
||||
},
|
||||
None => {
|
||||
let tx_info = TransactionInfo::new(&transaction);
|
||||
slot_map.insert(transaction.slot, tx_info);
|
||||
}
|
||||
}
|
||||
let tx_info = x.value_mut();
|
||||
tx_info.add_notification(&transaction);
|
||||
}
|
||||
None => {
|
||||
let tx_info = TransactionInfo::new(&transaction);
|
||||
map_of_infos.insert(sig, BTreeMap::from([(transaction.slot, tx_info)]));
|
||||
map_of_infos.insert((sig, transaction.slot), tx_info);
|
||||
}
|
||||
}
|
||||
},
|
||||
|
@ -242,7 +234,7 @@ async fn main() {
|
|||
let _prometheus_jh = PrometheusSync::sync(args.prometheus_addr.clone());
|
||||
|
||||
let grpc_block_addr = args.grpc_address_to_fetch_blocks;
|
||||
let map_of_infos = Arc::new(DashMap::<String, BTreeMap<u64, TransactionInfo>>::new());
|
||||
let map_of_infos = Arc::new(DashMap::<(String, u64), TransactionInfo>::new());
|
||||
let slot_by_errors = Arc::new(DashMap::<u64, u64>::new());
|
||||
|
||||
let postgres = postgres::Postgres::new().await;
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
use std::{
|
||||
collections::BTreeMap,
|
||||
sync::{atomic::AtomicU64, Arc},
|
||||
time::Duration,
|
||||
};
|
||||
|
@ -706,7 +705,7 @@ impl Postgres {
|
|||
|
||||
pub fn spawn_transaction_infos_saver(
|
||||
&self,
|
||||
map_of_transaction: Arc<DashMap<String, BTreeMap<u64, TransactionInfo>>>,
|
||||
map_of_transaction: Arc<DashMap<(String, u64), TransactionInfo>>,
|
||||
slot: Arc<AtomicU64>,
|
||||
) {
|
||||
let session = self.session.clone();
|
||||
|
@ -716,29 +715,20 @@ impl Postgres {
|
|||
let slot = slot.load(std::sync::atomic::Ordering::Relaxed);
|
||||
let mut txs_to_store = vec![];
|
||||
for tx in map_of_transaction.iter() {
|
||||
let slot_map = tx.value();
|
||||
let first_slot = slot_map.keys().next().cloned().unwrap_or_default();
|
||||
if slot > first_slot + 300 {
|
||||
if slot > tx.key().1 + 300 {
|
||||
txs_to_store.push(tx.key().clone());
|
||||
}
|
||||
}
|
||||
|
||||
if !txs_to_store.is_empty() {
|
||||
debug!("saving transaction infos for {}", txs_to_store.len());
|
||||
let data = txs_to_store
|
||||
let batches = txs_to_store
|
||||
.iter()
|
||||
.filter_map(|key| map_of_transaction.remove(key))
|
||||
.map(|(_, tree)| tree.iter().map(|(_, info)| info).cloned().collect_vec())
|
||||
.flatten()
|
||||
.map(|(_, trans)| trans)
|
||||
.collect_vec();
|
||||
let batches = data.chunks(1024).collect_vec();
|
||||
for batch in batches {
|
||||
if let Err(err) = session
|
||||
.save_banking_transaction_results(batch.to_vec())
|
||||
.await
|
||||
{
|
||||
panic!("saving transaction infos failed {}", err);
|
||||
}
|
||||
if let Err(err) = session.save_banking_transaction_results(batches).await {
|
||||
panic!("saving transaction infos failed {}", err);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue