From 2c51ea8abf229f415fcb74f9cc1ca0c617834671 Mon Sep 17 00:00:00 2001 From: Groovie | Mango <95291500+grooviegermanikus@users.noreply.github.com> Date: Mon, 12 Feb 2024 17:10:40 +0100 Subject: [PATCH] issue #58: fix INSERT INTO deadlock by ordering data (#59) * issue #58: fix INSERT INTO deadlock by ordering data * order transactions store in INSERT INTO batch to fix https://github.com/blockworks-foundation/BankingStageErrorsTrackingSidecar/issues/58 * log tx info batch size * workaround for cargo-chef MSRV issue --- Dockerfile | 2 +- src/main.rs | 9 ++++++++- src/postgres.rs | 24 ++++++++++++++++++------ src/transaction_info.rs | 5 ++++- 4 files changed, 31 insertions(+), 9 deletions(-) diff --git a/Dockerfile b/Dockerfile index 088ba4f..367913d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,6 @@ # syntax = docker/dockerfile:1.2 FROM rust:1.70.0 as base -RUN cargo install cargo-chef --locked +RUN cargo install cargo-chef@0.1.62 --locked RUN rustup component add rustfmt RUN apt-get update && apt-get install -y clang cmake ssh WORKDIR /app diff --git a/src/main.rs b/src/main.rs index 30c95c0..f550b85 100644 --- a/src/main.rs +++ b/src/main.rs @@ -55,6 +55,7 @@ lazy_static::lazy_static! 
{ pub async fn start_tracking_banking_stage_errors( grpc_address: String, map_of_infos: Arc<DashMap<(String, u64), TransactionInfo>>, + error_plugin_write_version: Arc<AtomicU64>, slot: Arc<AtomicU64>, _subscribe_to_slots: bool, ) { @@ -128,7 +129,9 @@ pub async fn start_tracking_banking_stage_errors( tx_info.add_notification(&transaction); } None => { - let tx_info = TransactionInfo::new(&transaction); + // map_of_infos might get populated by parallel writers if multiple geyser sources are configured + let write_version = error_plugin_write_version.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + let tx_info = TransactionInfo::new(&transaction, write_version); map_of_infos.insert((sig, transaction.slot), tx_info); } } @@ -341,6 +344,8 @@ async fn main() -> anyhow::Result<()> { let grpc_block_addr = args.grpc_address_to_fetch_blocks; let map_of_infos = Arc::new(DashMap::<(String, u64), TransactionInfo>::new()); + // maintain a global serial version for deterministic transaction ordering + let error_plugin_write_version = Arc::new(AtomicU64::new(0)); let postgres1 = postgres::Postgres::new_with_workmem(0).await; let slot = Arc::new(AtomicU64::new(0)); @@ -375,10 +380,12 @@ async fn main() -> anyhow::Result<()> { let address = address.clone(); let map_of_infos = map_of_infos.clone(); let slot = slot.clone(); + let global_error_plugin_write_version = error_plugin_write_version.clone(); tokio::spawn(async move { start_tracking_banking_stage_errors( address, map_of_infos, + global_error_plugin_write_version, slot, no_block_subscription, ) diff --git a/src/postgres.rs b/src/postgres.rs index 7b527c6..0a975a4 100644 --- a/src/postgres.rs +++ b/src/postgres.rs @@ -234,7 +234,9 @@ impl PostgresSession { let statement = format!( r#" - INSERT INTO banking_stage_results_2.transactions(signature) SELECT signature from {} + INSERT INTO banking_stage_results_2.transactions(signature) + SELECT signature FROM {} + ORDER BY signature ON CONFLICT DO NOTHING "#, temp_table @@ -295,7 +297,9 @@ impl PostgresSession { let statement = format!( 
r#" - INSERT INTO banking_stage_results_2.accounts(account_key) SELECT key from {} + INSERT INTO banking_stage_results_2.accounts(account_key) + SELECT key FROM {} + ORDER BY key ON CONFLICT DO NOTHING "#, temp_table @@ -386,7 +390,9 @@ impl PostgresSession { FROM ( SELECT sig, slot, error_code, count, utc_timestamp from {} ) - as t (sig, slot, error_code, count, utc_timestamp) ON CONFLICT DO NOTHING + as t (sig, slot, error_code, count, utc_timestamp) + ORDER BY 1,2,3 -- sorry + ON CONFLICT DO NOTHING "#, temp_table ); @@ -460,6 +466,7 @@ impl PostgresSession { started_at.elapsed().as_millis() ); + // note: no lock ordering here, as the accounts_map_transaction does not seem to cause deadlocks (issue 58) // merge data from temp table into accounts_map_transaction let statement = format!( r#" @@ -524,6 +531,7 @@ impl PostgresSession { r#" INSERT INTO banking_stage_results_2.accounts_map_transaction_latest(acc_id, tx_ids) SELECT acc_id, tx_ids_agg FROM {temp_table_name} + ORDER BY acc_id ON CONFLICT (acc_id) DO UPDATE SET tx_ids = EXCLUDED.tx_ids "#, temp_table_name = temp_table_latest_agged @@ -626,6 +634,7 @@ impl PostgresSession { t.prioritization_fees, t.supp_infos FROM {} AS t + ORDER BY 1 -- sorry ON CONFLICT DO NOTHING "#, temp_table @@ -757,6 +766,7 @@ impl PostgresSession { total_cu_consumed, prioritization_fees_info ) + ORDER BY 1,2 -- sorry ON CONFLICT DO NOTHING "#, temp_table @@ -1309,14 +1319,16 @@ impl Postgres { tokio::time::sleep(Duration::from_secs(60)).await; let slot = slot.load(std::sync::atomic::Ordering::Relaxed); let mut txs_to_store = vec![]; - for tx in map_of_transaction.iter() { - if slot > tx.key().1 + 300 { + // restore transactions sort order + for tx in map_of_transaction.iter() + .sorted_by_key(|txi| txi.write_version) { + if tx.key().1 < slot - 300 { txs_to_store.push(tx.key().clone()); } } if !txs_to_store.is_empty() { - debug!("saving transaction infos for {}", txs_to_store.len()); + info!("saving transaction infos for {} txs", 
txs_to_store.len()); let batches = txs_to_store .iter() .filter_map(|key| map_of_transaction.remove(key)) diff --git a/src/transaction_info.rs b/src/transaction_info.rs index e884a21..306b82a 100644 --- a/src/transaction_info.rs +++ b/src/transaction_info.rs @@ -79,10 +79,12 @@ pub struct TransactionInfo { pub slot: u64, pub utc_timestamp: DateTime<Utc>, pub account_used: Vec<(String, bool)>, + // local write_version used in lite-rpc + pub write_version: u64, } impl TransactionInfo { - pub fn new(notification: &SubscribeUpdateBankingTransactionResults) -> Self { + pub fn new(notification: &SubscribeUpdateBankingTransactionResults, global_error_plugin_write_version: u64) -> Self { let mut errors = HashMap::new(); // Get time let utc_timestamp = Utc::now(); @@ -107,6 +109,7 @@ impl TransactionInfo { slot: notification.slot, utc_timestamp, account_used, + write_version: global_error_plugin_write_version, } }