diff --git a/src/postgres.rs b/src/postgres.rs index 4b81a3a..92adb2b 100644 --- a/src/postgres.rs +++ b/src/postgres.rs @@ -188,7 +188,7 @@ impl PostgresSession { let statement = format!( r#" INSERT INTO banking_stage_results_2.transactions(signature) SELECT signature from {} - ON CONFLICT DO NOTHING; + ON CONFLICT DO NOTHING "#, temp_table ); @@ -250,7 +250,7 @@ impl PostgresSession { let statement = format!( r#" INSERT INTO banking_stage_results_2.accounts(account_key) SELECT key from {} - ON CONFLICT DO NOTHING; + ON CONFLICT DO NOTHING "#, temp_table ); @@ -340,7 +340,7 @@ impl PostgresSession { FROM ( SELECT sig, slot, error_code, count, utc_timestamp from {} ) - as t (sig, slot, error_code, count, utc_timestamp) ON CONFLICT DO NOTHING; + as t (sig, slot, error_code, count, utc_timestamp) ON CONFLICT DO NOTHING "#, temp_table ); @@ -418,16 +418,13 @@ impl PostgresSession { r#" INSERT INTO banking_stage_results_2.accounts_map_transaction(acc_id, transaction_id, is_writable, is_signer, is_atl) SELECT - ( select acc_id from banking_stage_results_2.accounts where account_key = t.account_key ), - ( select transaction_id from banking_stage_results_2.transactions where signature = t.signature ), - t.is_writable, - t.is_signer, - t.is_atl - FROM ( - SELECT account_key, signature, is_writable, is_signer, is_atl from {} - ) - as t (account_key, signature, is_writable, is_signer, is_atl) - ON CONFLICT DO NOTHING; + ( select acc_id from banking_stage_results_2.accounts where account_key = new_amt_data.account_key ), + ( select transaction_id from banking_stage_results_2.transactions where signature = new_amt_data.signature ), + new_amt_data.is_writable, + new_amt_data.is_signer, + new_amt_data.is_atl + FROM {} AS new_amt_data + ON CONFLICT DO NOTHING "#, temp_table ); @@ -478,7 +475,7 @@ impl PostgresSession { r#" INSERT INTO banking_stage_results_2.accounts_map_transaction_latest(acc_id, tx_ids) SELECT acc_id, tx_ids_agg FROM {temp_table_name} - ON CONFLICT (acc_id) DO UPDATE SET tx_ids = EXCLUDED.tx_ids; + ON CONFLICT (acc_id) DO UPDATE SET tx_ids = EXCLUDED.tx_ids "#, temp_table_name = temp_table_latest_agged ); @@ -512,7 +509,7 @@ impl PostgresSession { cu_consumed BIGINT, prioritization_fees BIGINT, supp_infos text - );", + )", temp_table ) .as_str(), @@ -579,11 +576,8 @@ impl PostgresSession { t.cu_consumed, t.prioritization_fees, t.supp_infos - FROM ( - SELECT signature, processed_slot, is_successful, cu_requested, cu_consumed, prioritization_fees, supp_infos from {} - ) - as t (signature, processed_slot, is_successful, cu_requested, cu_consumed, prioritization_fees, supp_infos) - ON CONFLICT DO NOTHING; + FROM {} AS t + ON CONFLICT DO NOTHING "#, temp_table ); @@ -611,7 +605,7 @@ impl PostgresSession { total_cu_requested BIGINT, total_cu_consumed BIGINT, prioritization_fees_info text - );", + )", temp_table ) .as_str(), @@ -697,7 +691,7 @@ impl PostgresSession { total_cu_consumed, prioritization_fees_info ) - ON CONFLICT DO NOTHING; + ON CONFLICT DO NOTHING "#, temp_table ); @@ -724,7 +718,8 @@ impl PostgresSession { total_cu_used, total_cu_requested, supp_infos - ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8); + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + ON CONFLICT DO NOTHING "#; let started_at = Instant::now(); let num_rows = self.client @@ -748,6 +743,10 @@ impl PostgresSession { started_at.elapsed().as_millis() ); + if num_rows == 0 { + warn!("block_info already exists in blocks table - skipping insert"); + } + Ok(()) }