diff --git a/migration.sql b/migration.sql index a2290af..e8fe1f7 100644 --- a/migration.sql +++ b/migration.sql @@ -2,16 +2,17 @@ CREATE SCHEMA banking_stage_results; CREATE TABLE banking_stage_results.transaction_infos ( signature CHAR(88) PRIMARY KEY, + first_notification_slot BIGINT NOT NULL, errors text, is_executed BOOL, is_confirmed BOOL, - first_notification_slot BIGINT NOT NULL, cu_requested BIGINT, prioritization_fees BIGINT, utc_timestamp TIMESTAMP WITH TIME ZONE NOT NULL, accounts_used text, processed_slot BIGINT, - supp_infos text + supp_infos text, + Primary key (signature, first_notification_slot) ); CREATE TABLE banking_stage_results.blocks ( diff --git a/src/main.rs b/src/main.rs index a25c1dd..b621eda 100644 --- a/src/main.rs +++ b/src/main.rs @@ -41,7 +41,7 @@ lazy_static::lazy_static! { pub async fn start_tracking_banking_stage_errors( grpc_address: String, - map_of_infos: Arc>, + map_of_infos: Arc>, slot_by_errors: Arc>, slot: Arc, subscribe_to_slots: bool, @@ -108,14 +108,14 @@ pub async fn start_tracking_banking_stage_errors( slot_by_errors.insert(transaction.slot, 1); } } - match map_of_infos.get_mut(&sig) { + match map_of_infos.get_mut(&(sig.clone(), transaction.slot)) { Some(mut x) => { x.add_notification(&transaction); } None => { let mut x = TransactionInfo::new(&transaction); x.add_notification(&transaction); - map_of_infos.insert(sig, x); + map_of_infos.insert((sig.clone(), transaction.slot), x); } } }, @@ -140,7 +140,7 @@ async fn start_tracking_blocks( postgres: postgres::Postgres, slot: Arc, slot_by_errors: Arc>, - map_of_infos: Arc>, + map_of_infos: Arc>, ) { let mut client = yellowstone_grpc_client_original::GeyserGrpcClient::connect( grpc_block_addr, @@ -211,8 +211,11 @@ async fn start_tracking_blocks( continue; }; let signature = Signature::try_from(tx.signatures[0].clone()).unwrap(); - if let Some(mut info) = map_of_infos.get_mut(&signature.to_string()) { - info.add_transaction(transaction, block.slot); + for mut tx_data in map_of_infos.iter_mut() { + let (sig, _) = tx_data.key(); + if *sig == signature.to_string() { + tx_data.value_mut().add_transaction(transaction, block.slot); + } } } @@ -249,7 +252,7 @@ async fn main() { let _prometheus_jh = PrometheusSync::sync(args.prometheus_addr.clone()); let grpc_block_addr = args.grpc_address_to_fetch_blocks; - let map_of_infos = Arc::new(DashMap::::new()); + let map_of_infos = Arc::new(DashMap::<(String, u64), TransactionInfo>::new()); let slot_by_errors = Arc::new(DashMap::::new()); let postgres = postgres::Postgres::new().await; diff --git a/src/postgres.rs b/src/postgres.rs index ef16ba6..c172bf8 100644 --- a/src/postgres.rs +++ b/src/postgres.rs @@ -118,7 +118,7 @@ impl PostgresSession { pub async fn save_banking_transaction_results( &self, - txs: &[TransactionInfo], + txs: Vec<&TransactionInfo>, ) -> anyhow::Result<()> { if txs.is_empty() { return Ok(()); @@ -243,7 +243,7 @@ impl Postgres { pub fn spawn_transaction_infos_saver( &self, - map_of_transaction: Arc>, + map_of_transaction: Arc>, slot: Arc, ) { let session = self.session.clone(); @@ -254,19 +254,17 @@ impl Postgres { let mut txs_to_store = vec![]; for tx in map_of_transaction.iter() { if slot > tx.first_notification_slot + 300 { - txs_to_store.push(tx.clone()); + txs_to_store.push(tx.key().clone()); } } if !txs_to_store.is_empty() { debug!("saving transaction infos for {}", txs_to_store.len()); - for tx in &txs_to_store { - map_of_transaction.remove(&tx.signature); - } - let batches = txs_to_store.chunks(8).collect_vec(); + let data = txs_to_store.iter().filter_map(|key| map_of_transaction.remove(key)).collect_vec(); + let batches = data.chunks(8).collect_vec(); for batch in batches { session - .save_banking_transaction_results(batch) + .save_banking_transaction_results(batch.iter().map(|(_, tx)| tx).collect_vec()) .await .unwrap(); } @@ -306,8 +304,8 @@ pub struct AccountUsed { writable: bool, } -impl From<&TransactionInfo> for PostgresTransactionInfo { - fn from(value: &TransactionInfo) -> Self { +impl From<&&TransactionInfo> for PostgresTransactionInfo { + fn from(value: &&TransactionInfo) -> Self { let errors = value .errors .iter()