diff --git a/migration.sql b/migration.sql index 8833c5e..fdea41e 100644 --- a/migration.sql +++ b/migration.sql @@ -11,6 +11,7 @@ CREATE TABLE banking_stage_results.transaction_infos ( prioritization_fees BIGINT, utc_timestamp TIMESTAMP WITH TIME ZONE NOT NULL, accounts_used text[] + processed_slot BIGINT, ); CREATE TABLE banking_stage_results.blocks ( diff --git a/src/main.rs b/src/main.rs index e865790..822b508 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,9 +1,11 @@ +use clap::Parser; use std::{ collections::HashMap, sync::{atomic::AtomicU64, Arc}, }; use block_info::BlockInfo; +use cli::Args; use dashmap::DashMap; use futures::StreamExt; use solana_sdk::signature::Signature; @@ -14,12 +16,14 @@ use yellowstone_grpc_proto::prelude::{ }; mod block_info; +mod cli; mod postgres; mod transaction_info; #[tokio::main()] async fn main() { - let grpc_addr = "http://127.0.0.1:10000"; + let args = Args::parse(); + let grpc_addr = args.grpc_address; let mut client = GeyserGrpcClient::connect(grpc_addr, None::<&'static str>, None).unwrap(); let map_of_infos = Arc::new(DashMap::::new()); @@ -87,7 +91,7 @@ async fn main() { }; let signature = Signature::try_from(tx.signatures[0].clone()).unwrap(); if let Some(mut info) = map_of_infos.get_mut(&signature.to_string()) { - info.add_transaction(&transaction); + info.add_transaction(&transaction, block.slot); } } diff --git a/src/postgres.rs b/src/postgres.rs index bee65ae..f94c7da 100644 --- a/src/postgres.rs +++ b/src/postgres.rs @@ -101,12 +101,13 @@ impl PostgresSession { args.push(&tx.prioritization_fees); args.push(&tx.utc_timestamp); args.push(&tx.accounts_used); + args.push(&tx.processed_slot); } let mut query = String::from( r#" INSERT INTO banking_stage_results.transaction_infos - (signature, message, errors, is_executed, is_confirmed, first_notification_slot, cu_requested, prioritization_fees, utc_timestamp, accounts_used) + (signature, message, errors, is_executed, is_confirmed, first_notification_slot, cu_requested, prioritization_fees, utc_timestamp, accounts_used, processed_slot) VALUES "#, ); @@ -207,6 +208,7 @@ pub struct PostgresTransactionInfo { pub prioritization_fees: Option, pub utc_timestamp: DateTime, pub accounts_used: Vec, + pub processed_slot: Option, } impl From<&TransactionInfo> for PostgresTransactionInfo { @@ -234,6 +236,7 @@ impl From<&TransactionInfo> for PostgresTransactionInfo { prioritization_fees: value.prioritization_fees.map(|x| x as i64), utc_timestamp: value.utc_timestamp, accounts_used, + processed_slot: value.processed_slot.map(|x| x as i64), } } } diff --git a/src/transaction_info.rs b/src/transaction_info.rs index 9dcfca9..8e00362 100644 --- a/src/transaction_info.rs +++ b/src/transaction_info.rs @@ -92,6 +92,7 @@ pub struct TransactionInfo { pub prioritization_fees: Option, pub utc_timestamp: DateTime, pub account_used: HashMap, + pub processed_slot: Option, } impl TransactionInfo { @@ -134,6 +135,7 @@ impl TransactionInfo { prioritization_fees: None, utc_timestamp, account_used, + processed_slot: None, } } @@ -158,7 +160,7 @@ impl TransactionInfo { } } - pub fn add_transaction(&mut self, transaction: &SubscribeUpdateTransactionInfo) { + pub fn add_transaction(&mut self, transaction: &SubscribeUpdateTransactionInfo, slot: Slot) { let Some(transaction) = &transaction.transaction else { return; }; @@ -282,5 +284,6 @@ impl TransactionInfo { self.is_confirmed = true; self.transaction_message = Some(message); self.is_executed = true; + self.processed_slot = Some(slot); } }