issue #58: fix INSERT INTO deadlock by ordering data (#59)

* issue #58: fix INSERT INTO deadlock by ordering data

* order transactions store in INSERT INTO batch to fix https://github.com/blockworks-foundation/BankingStageErrorsTrackingSidecar/issues/58

* log tx info batch size

* workaround for cargo-chef MSRV issue
This commit is contained in:
Groovie | Mango 2024-02-12 17:10:40 +01:00 committed by GitHub
parent 9ca22c9a46
commit 2c51ea8abf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 31 additions and 9 deletions

View File

@ -1,6 +1,6 @@
# syntax = docker/dockerfile:1.2
FROM rust:1.70.0 as base
RUN cargo install cargo-chef --locked
RUN cargo install cargo-chef@0.1.62 --locked
RUN rustup component add rustfmt
RUN apt-get update && apt-get install -y clang cmake ssh
WORKDIR /app

View File

@ -55,6 +55,7 @@ lazy_static::lazy_static! {
pub async fn start_tracking_banking_stage_errors(
grpc_address: String,
map_of_infos: Arc<DashMap<(String, u64), TransactionInfo>>,
error_plugin_write_version: Arc<AtomicU64>,
slot: Arc<AtomicU64>,
_subscribe_to_slots: bool,
) {
@ -128,7 +129,9 @@ pub async fn start_tracking_banking_stage_errors(
tx_info.add_notification(&transaction);
}
None => {
let tx_info = TransactionInfo::new(&transaction);
// map_of_infos might get populated by parallel writers if multiple geyser sources are configured
let write_version = error_plugin_write_version.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let tx_info = TransactionInfo::new(&transaction, write_version);
map_of_infos.insert((sig, transaction.slot), tx_info);
}
}
@ -341,6 +344,8 @@ async fn main() -> anyhow::Result<()> {
let grpc_block_addr = args.grpc_address_to_fetch_blocks;
let map_of_infos = Arc::new(DashMap::<(String, u64), TransactionInfo>::new());
// maintain a global serial version for deterministic transaction ordering
let error_plugin_write_version = Arc::new(AtomicU64::new(0));
let postgres1 = postgres::Postgres::new_with_workmem(0).await;
let slot = Arc::new(AtomicU64::new(0));
@ -375,10 +380,12 @@ async fn main() -> anyhow::Result<()> {
let address = address.clone();
let map_of_infos = map_of_infos.clone();
let slot = slot.clone();
let global_error_plugin_write_version = error_plugin_write_version.clone();
tokio::spawn(async move {
start_tracking_banking_stage_errors(
address,
map_of_infos,
global_error_plugin_write_version,
slot,
no_block_subscription,
)

View File

@ -234,7 +234,9 @@ impl PostgresSession {
let statement = format!(
r#"
INSERT INTO banking_stage_results_2.transactions(signature) SELECT signature from {}
INSERT INTO banking_stage_results_2.transactions(signature)
SELECT signature FROM {}
ORDER BY signature
ON CONFLICT DO NOTHING
"#,
temp_table
@ -295,7 +297,9 @@ impl PostgresSession {
let statement = format!(
r#"
INSERT INTO banking_stage_results_2.accounts(account_key) SELECT key from {}
INSERT INTO banking_stage_results_2.accounts(account_key)
SELECT key FROM {}
ORDER BY key
ON CONFLICT DO NOTHING
"#,
temp_table
@ -386,7 +390,9 @@ impl PostgresSession {
FROM (
SELECT sig, slot, error_code, count, utc_timestamp from {}
)
as t (sig, slot, error_code, count, utc_timestamp) ON CONFLICT DO NOTHING
as t (sig, slot, error_code, count, utc_timestamp)
ORDER BY 1,2,3 -- sorry
ON CONFLICT DO NOTHING
"#,
temp_table
);
@ -460,6 +466,7 @@ impl PostgresSession {
started_at.elapsed().as_millis()
);
// note: no lock ordering here, as the accounts_map_transaction does not seem to cause deadlocks (issue 58)
// merge data from temp table into accounts_map_transaction
let statement = format!(
r#"
@ -524,6 +531,7 @@ impl PostgresSession {
r#"
INSERT INTO banking_stage_results_2.accounts_map_transaction_latest(acc_id, tx_ids)
SELECT acc_id, tx_ids_agg FROM {temp_table_name}
ORDER BY acc_id
ON CONFLICT (acc_id) DO UPDATE SET tx_ids = EXCLUDED.tx_ids
"#,
temp_table_name = temp_table_latest_agged
@ -626,6 +634,7 @@ impl PostgresSession {
t.prioritization_fees,
t.supp_infos
FROM {} AS t
ORDER BY 1 -- sorry
ON CONFLICT DO NOTHING
"#,
temp_table
@ -757,6 +766,7 @@ impl PostgresSession {
total_cu_consumed,
prioritization_fees_info
)
ORDER BY 1,2 -- sorry
ON CONFLICT DO NOTHING
"#,
temp_table
@ -1309,14 +1319,16 @@ impl Postgres {
tokio::time::sleep(Duration::from_secs(60)).await;
let slot = slot.load(std::sync::atomic::Ordering::Relaxed);
let mut txs_to_store = vec![];
for tx in map_of_transaction.iter() {
if slot > tx.key().1 + 300 {
// restore transactions sort order
for tx in map_of_transaction.iter()
.sorted_by_key(|txi| txi.write_version) {
if tx.key().1 < slot - 300 {
txs_to_store.push(tx.key().clone());
}
}
if !txs_to_store.is_empty() {
debug!("saving transaction infos for {}", txs_to_store.len());
info!("saving transaction infos for {} txs", txs_to_store.len());
let batches = txs_to_store
.iter()
.filter_map(|key| map_of_transaction.remove(key))

View File

@ -79,10 +79,12 @@ pub struct TransactionInfo {
pub slot: u64,
pub utc_timestamp: DateTime<Utc>,
pub account_used: Vec<(String, bool)>,
// local write_version used in lite-rpc
pub write_version: u64,
}
impl TransactionInfo {
pub fn new(notification: &SubscribeUpdateBankingTransactionResults) -> Self {
pub fn new(notification: &SubscribeUpdateBankingTransactionResults, global_error_plugin_write_version: u64) -> Self {
let mut errors = HashMap::new();
// Get time
let utc_timestamp = Utc::now();
@ -107,6 +109,7 @@ impl TransactionInfo {
slot: notification.slot,
utc_timestamp,
account_used,
write_version: global_error_plugin_write_version,
}
}