optimizing the maps (#13)

This commit is contained in:
galactus 2023-12-06 11:20:20 +01:00 committed by GitHub
parent f6608e5c1c
commit 5eae4abec3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 30 additions and 19 deletions

View File

@ -1,7 +1,7 @@
use clap::Parser;
use itertools::Itertools;
use std::{
collections::HashMap,
collections::{BTreeMap, HashMap},
sync::{atomic::AtomicU64, Arc},
time::Duration,
};
@ -41,7 +41,7 @@ lazy_static::lazy_static! {
pub async fn start_tracking_banking_stage_errors(
grpc_address: String,
map_of_infos: Arc<DashMap<(String, u64), TransactionInfo>>,
map_of_infos: Arc<DashMap<String, BTreeMap<u64, TransactionInfo>>>,
slot_by_errors: Arc<DashMap<u64, u64>>,
slot: Arc<AtomicU64>,
subscribe_to_slots: bool,
@ -108,14 +108,16 @@ pub async fn start_tracking_banking_stage_errors(
slot_by_errors.insert(transaction.slot, 1);
}
}
match map_of_infos.get_mut(&(sig.clone(), transaction.slot)) {
match map_of_infos.get_mut(&sig) {
Some(mut x) => {
x.add_notification(&transaction);
x.value_mut().iter_mut().for_each(|(_, tx_info)| {
tx_info.add_notification(&transaction)
});
}
None => {
let mut x = TransactionInfo::new(&transaction);
x.add_notification(&transaction);
map_of_infos.insert((sig.clone(), transaction.slot), x);
map_of_infos.insert(sig, BTreeMap::from([(transaction.slot, x)]));
}
}
},
@ -140,7 +142,7 @@ async fn start_tracking_blocks(
postgres: postgres::Postgres,
slot: Arc<AtomicU64>,
slot_by_errors: Arc<DashMap<u64, u64>>,
map_of_infos: Arc<DashMap<(String, u64), TransactionInfo>>,
map_of_infos: Arc<DashMap<String, BTreeMap<u64, TransactionInfo>>>,
) {
let mut client = yellowstone_grpc_client_original::GeyserGrpcClient::connect(
grpc_block_addr,
@ -210,11 +212,12 @@ async fn start_tracking_blocks(
let Some(tx) = &transaction.transaction else {
continue;
};
let signature = Signature::try_from(tx.signatures[0].clone()).unwrap();
for mut tx_data in map_of_infos.iter_mut() {
let (sig, _) = tx_data.key();
if *sig == signature.to_string() {
tx_data.value_mut().add_transaction(transaction, block.slot);
let signature = Signature::try_from(tx.signatures[0].clone())
.unwrap()
.to_string();
if let Some(mut tree) = map_of_infos.get_mut(&signature) {
for (_, tx_info) in tree.iter_mut() {
tx_info.add_transaction(transaction, block.slot);
}
}
}
@ -252,7 +255,7 @@ async fn main() {
let _prometheus_jh = PrometheusSync::sync(args.prometheus_addr.clone());
let grpc_block_addr = args.grpc_address_to_fetch_blocks;
let map_of_infos = Arc::new(DashMap::<(String, u64), TransactionInfo>::new());
let map_of_infos = Arc::new(DashMap::<String, BTreeMap<u64, TransactionInfo>>::new());
let slot_by_errors = Arc::new(DashMap::<u64, u64>::new());
let postgres = postgres::Postgres::new().await;

View File

@ -1,4 +1,5 @@
use std::{
collections::BTreeMap,
sync::{atomic::AtomicU64, Arc},
time::Duration,
};
@ -118,7 +119,7 @@ impl PostgresSession {
pub async fn save_banking_transaction_results(
&self,
txs: Vec<&TransactionInfo>,
txs: Vec<TransactionInfo>,
) -> anyhow::Result<()> {
if txs.is_empty() {
return Ok(());
@ -243,7 +244,7 @@ impl Postgres {
pub fn spawn_transaction_infos_saver(
&self,
map_of_transaction: Arc<DashMap<(String, u64), TransactionInfo>>,
map_of_transaction: Arc<DashMap<String, BTreeMap<u64, TransactionInfo>>>,
slot: Arc<AtomicU64>,
) {
let session = self.session.clone();
@ -253,18 +254,25 @@ impl Postgres {
let slot = slot.load(std::sync::atomic::Ordering::Relaxed);
let mut txs_to_store = vec![];
for tx in map_of_transaction.iter() {
if slot > tx.first_notification_slot + 300 {
let slot_map = tx.value();
let first_slot = slot_map.keys().next().cloned().unwrap_or_default();
if slot > first_slot + 300 {
txs_to_store.push(tx.key().clone());
}
}
if !txs_to_store.is_empty() {
debug!("saving transaction infos for {}", txs_to_store.len());
let data = txs_to_store.iter().filter_map(|key| map_of_transaction.remove(key)).collect_vec();
let data = txs_to_store
.iter()
.filter_map(|key| map_of_transaction.remove(key))
.map(|(_, tree)| tree.iter().map(|(_, info)| info).cloned().collect_vec())
.flatten()
.collect_vec();
let batches = data.chunks(8).collect_vec();
for batch in batches {
session
.save_banking_transaction_results(batch.iter().map(|(_, tx)| tx).collect_vec())
.save_banking_transaction_results(batch.to_vec())
.await
.unwrap();
}
@ -304,8 +312,8 @@ pub struct AccountUsed {
writable: bool,
}
impl From<&&TransactionInfo> for PostgresTransactionInfo {
fn from(value: &&TransactionInfo) -> Self {
impl From<&TransactionInfo> for PostgresTransactionInfo {
fn from(value: &TransactionInfo) -> Self {
let errors = value
.errors
.iter()