chaning primary key on transaction infos table (#11)
This commit is contained in:
parent
80283c5d48
commit
63edab3060
|
@ -2,16 +2,17 @@ CREATE SCHEMA banking_stage_results;
|
|||
|
||||
CREATE TABLE banking_stage_results.transaction_infos (
|
||||
signature CHAR(88) PRIMARY KEY,
|
||||
first_notification_slot BIGINT NOT NULL,
|
||||
errors text,
|
||||
is_executed BOOL,
|
||||
is_confirmed BOOL,
|
||||
first_notification_slot BIGINT NOT NULL,
|
||||
cu_requested BIGINT,
|
||||
prioritization_fees BIGINT,
|
||||
utc_timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
|
||||
accounts_used text,
|
||||
processed_slot BIGINT,
|
||||
supp_infos text
|
||||
supp_infos text,
|
||||
Primary key (signature, first_notification_slot)
|
||||
);
|
||||
|
||||
CREATE TABLE banking_stage_results.blocks (
|
||||
|
|
17
src/main.rs
17
src/main.rs
|
@ -41,7 +41,7 @@ lazy_static::lazy_static! {
|
|||
|
||||
pub async fn start_tracking_banking_stage_errors(
|
||||
grpc_address: String,
|
||||
map_of_infos: Arc<DashMap<String, TransactionInfo>>,
|
||||
map_of_infos: Arc<DashMap<(String, u64), TransactionInfo>>,
|
||||
slot_by_errors: Arc<DashMap<u64, u64>>,
|
||||
slot: Arc<AtomicU64>,
|
||||
subscribe_to_slots: bool,
|
||||
|
@ -108,14 +108,14 @@ pub async fn start_tracking_banking_stage_errors(
|
|||
slot_by_errors.insert(transaction.slot, 1);
|
||||
}
|
||||
}
|
||||
match map_of_infos.get_mut(&sig) {
|
||||
match map_of_infos.get_mut(&(sig.clone(), transaction.slot)) {
|
||||
Some(mut x) => {
|
||||
x.add_notification(&transaction);
|
||||
}
|
||||
None => {
|
||||
let mut x = TransactionInfo::new(&transaction);
|
||||
x.add_notification(&transaction);
|
||||
map_of_infos.insert(sig, x);
|
||||
map_of_infos.insert((sig.clone(), transaction.slot), x);
|
||||
}
|
||||
}
|
||||
},
|
||||
|
@ -140,7 +140,7 @@ async fn start_tracking_blocks(
|
|||
postgres: postgres::Postgres,
|
||||
slot: Arc<AtomicU64>,
|
||||
slot_by_errors: Arc<DashMap<u64, u64>>,
|
||||
map_of_infos: Arc<DashMap<String, TransactionInfo>>,
|
||||
map_of_infos: Arc<DashMap<(String, u64), TransactionInfo>>,
|
||||
) {
|
||||
let mut client = yellowstone_grpc_client_original::GeyserGrpcClient::connect(
|
||||
grpc_block_addr,
|
||||
|
@ -211,8 +211,11 @@ async fn start_tracking_blocks(
|
|||
continue;
|
||||
};
|
||||
let signature = Signature::try_from(tx.signatures[0].clone()).unwrap();
|
||||
if let Some(mut info) = map_of_infos.get_mut(&signature.to_string()) {
|
||||
info.add_transaction(transaction, block.slot);
|
||||
for mut tx_data in map_of_infos.iter_mut() {
|
||||
let (sig, _) = tx_data.key();
|
||||
if *sig == signature.to_string() {
|
||||
tx_data.value_mut().add_transaction(transaction, block.slot);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -249,7 +252,7 @@ async fn main() {
|
|||
let _prometheus_jh = PrometheusSync::sync(args.prometheus_addr.clone());
|
||||
|
||||
let grpc_block_addr = args.grpc_address_to_fetch_blocks;
|
||||
let map_of_infos = Arc::new(DashMap::<String, TransactionInfo>::new());
|
||||
let map_of_infos = Arc::new(DashMap::<(String, u64), TransactionInfo>::new());
|
||||
let slot_by_errors = Arc::new(DashMap::<u64, u64>::new());
|
||||
|
||||
let postgres = postgres::Postgres::new().await;
|
||||
|
|
|
@ -118,7 +118,7 @@ impl PostgresSession {
|
|||
|
||||
pub async fn save_banking_transaction_results(
|
||||
&self,
|
||||
txs: &[TransactionInfo],
|
||||
txs: Vec<&TransactionInfo>,
|
||||
) -> anyhow::Result<()> {
|
||||
if txs.is_empty() {
|
||||
return Ok(());
|
||||
|
@ -243,7 +243,7 @@ impl Postgres {
|
|||
|
||||
pub fn spawn_transaction_infos_saver(
|
||||
&self,
|
||||
map_of_transaction: Arc<DashMap<String, TransactionInfo>>,
|
||||
map_of_transaction: Arc<DashMap<(String, u64), TransactionInfo>>,
|
||||
slot: Arc<AtomicU64>,
|
||||
) {
|
||||
let session = self.session.clone();
|
||||
|
@ -254,19 +254,17 @@ impl Postgres {
|
|||
let mut txs_to_store = vec![];
|
||||
for tx in map_of_transaction.iter() {
|
||||
if slot > tx.first_notification_slot + 300 {
|
||||
txs_to_store.push(tx.clone());
|
||||
txs_to_store.push(tx.key().clone());
|
||||
}
|
||||
}
|
||||
|
||||
if !txs_to_store.is_empty() {
|
||||
debug!("saving transaction infos for {}", txs_to_store.len());
|
||||
for tx in &txs_to_store {
|
||||
map_of_transaction.remove(&tx.signature);
|
||||
}
|
||||
let batches = txs_to_store.chunks(8).collect_vec();
|
||||
let data = txs_to_store.iter().filter_map(|key| map_of_transaction.remove(key)).collect_vec();
|
||||
let batches = data.chunks(8).collect_vec();
|
||||
for batch in batches {
|
||||
session
|
||||
.save_banking_transaction_results(batch)
|
||||
.save_banking_transaction_results(batch.iter().map(|(_, tx)| tx).collect_vec())
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
@ -306,8 +304,8 @@ pub struct AccountUsed {
|
|||
writable: bool,
|
||||
}
|
||||
|
||||
impl From<&TransactionInfo> for PostgresTransactionInfo {
|
||||
fn from(value: &TransactionInfo) -> Self {
|
||||
impl From<&&TransactionInfo> for PostgresTransactionInfo {
|
||||
fn from(value: &&TransactionInfo) -> Self {
|
||||
let errors = value
|
||||
.errors
|
||||
.iter()
|
||||
|
|
Loading…
Reference in New Issue