From f0727de608e5c75bf05227e9e33c1d50a121908c Mon Sep 17 00:00:00 2001 From: Groovie | Mango <95291500+grooviegermanikus@users.noreply.github.com> Date: Wed, 10 Jan 2024 14:13:19 +0100 Subject: [PATCH 1/2] log metrics for postgres write statements (#39) * add debug logs for query execution * use tokio instant * words --- src/postgres.rs | 127 ++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 108 insertions(+), 19 deletions(-) diff --git a/src/postgres.rs b/src/postgres.rs index 75d5517..15208c2 100644 --- a/src/postgres.rs +++ b/src/postgres.rs @@ -1,4 +1,3 @@ -use std::time::Instant; use std::{ sync::{atomic::AtomicU64, Arc}, time::Duration, @@ -14,6 +13,7 @@ use native_tls::{Certificate, Identity, TlsConnector}; use postgres_native_tls::MakeTlsConnector; use serde::Serialize; use solana_sdk::transaction::TransactionError; +use tokio::time::Instant; use tokio_postgres::{ binary_copy::BinaryCopyInWriter, config::SslMode, @@ -154,13 +154,19 @@ impl PostgresSession { "#, temp_table ); + let started_at = Instant::now(); let sink: CopyInSink = self.copy_in(statement.as_str()).await?; let writer = BinaryCopyInWriter::new(sink, &[Type::TEXT]); pin_mut!(writer); for signature in signatures { writer.as_mut().write(&[&signature]).await?; } - writer.finish().await?; + let num_rows = writer.finish().await?; + debug!( + "inserted {} signatures into temp table in {}ms", + num_rows, + started_at.elapsed().as_millis() + ); let statement = format!( r#" @@ -169,7 +175,14 @@ impl PostgresSession { "#, temp_table ); - self.client.execute(statement.as_str(), &[]).await?; + let started_at = Instant::now(); + let num_rows = self.client.execute(statement.as_str(), &[]).await?; + debug!( + "inserted {} signatures in transactions table in {}ms", + num_rows, + started_at.elapsed().as_millis() + ); + self.drop_temp_table(temp_table).await?; Ok(()) @@ -203,13 +216,19 @@ impl PostgresSession { "#, temp_table ); + let started_at = Instant::now(); let sink: CopyInSink = self.copy_in(statement.as_str()).await?; let writer = BinaryCopyInWriter::new(sink, &[Type::TEXT]); pin_mut!(writer); for account in accounts { writer.as_mut().write(&[&account]).await?; } - writer.finish().await?; + let num_rows = writer.finish().await?; + debug!( + "inserted {} account keys into temp table in {}ms", + num_rows, + started_at.elapsed().as_millis() + ); let statement = format!( r#" @@ -218,7 +237,14 @@ impl PostgresSession { "#, temp_table ); + let started_at = Instant::now(); self.client.execute(statement.as_str(), &[]).await?; + debug!( + "inserted {} account keys into accounts table in {}ms", + num_rows, + started_at.elapsed().as_millis() + ); + self.drop_temp_table(temp_table).await?; Ok(()) } @@ -254,6 +280,7 @@ impl PostgresSession { "#, temp_table ); + let started_at = Instant::now(); let sink: CopyInSink = self.copy_in(statement.as_str()).await?; let writer = BinaryCopyInWriter::new( sink, @@ -282,7 +309,12 @@ impl PostgresSession { writer.as_mut().write(&args).await?; } } - writer.finish().await?; + let num_rows = writer.finish().await?; + debug!( + "inserted {} txs for tx_slot into temp table in {}ms", + num_rows, + started_at.elapsed().as_millis() + ); let statement = format!( r#" @@ -295,7 +327,15 @@ impl PostgresSession { "#, temp_table ); + let started_at = Instant::now(); self.client.execute(statement.as_str(), &[]).await?; + debug!( + "inserted {} txs into transaction_slot table in {}ms", + num_rows, + started_at.elapsed().as_millis() + ); + + self.drop_temp_table(temp_table).await?; Ok(()) } @@ -331,7 +371,7 @@ impl PostgresSession { "#, temp_table ); - + let started_at = Instant::now(); let sink: CopyInSink = self.copy_in(statement.as_str()).await?; let writer = BinaryCopyInWriter::new( sink, @@ -349,7 +389,12 @@ impl PostgresSession { writer.as_mut().write(&args).await?; } } - writer.finish().await?; + let num_rows = writer.finish().await?; + debug!( + "inserted {} accounts for transaction into temp table in {}ms", + num_rows, + started_at.elapsed().as_millis() + ); // merge data from temp table into accounts_map_transaction let statement = format!( @@ -369,8 +414,13 @@ impl PostgresSession { "#, temp_table ); + let started_at = Instant::now(); let rows = self.client.execute(statement.as_str(), &[]).await?; - debug!("inserted into accounts_map_transaction: {}", rows); + debug!( + "inserted {} accounts into accounts_map_transaction in {}ms", + rows, + started_at.elapsed().as_millis() + ); // merge data from temp table into accounts_map_transaction_latest // note: query uses the array_dedup_append postgres function to deduplicate and limit the array size @@ -399,8 +449,13 @@ impl PostgresSession { temp_table_name = temp_table_latest_agged, limit = LIMIT_LATEST_TXS_PER_ACCOUNT ); - let rows = self.client.execute(statement.as_str(), &[]).await?; - info!("inserted into {}: {}", temp_table_latest_agged, rows); + let started_at = Instant::now(); + let num_rows = self.client.execute(statement.as_str(), &[]).await?; + debug!( + "merged new transactions into accounts_map_transaction_latest for {} accounts in {}ms", + num_rows, + started_at.elapsed().as_millis() + ); let statement = format!( r#" @@ -410,8 +465,13 @@ impl PostgresSession { "#, temp_table_name = temp_table_latest_agged ); - let rows = self.client.execute(statement.as_str(), &[]).await?; - info!("upserted in accounts_map_transaction_latest: {}", rows); + let started_at = Instant::now(); + let num_rows = self.client.execute(statement.as_str(), &[]).await?; + debug!( + "upserted {} merged transaction arrays into accounts_map_transaction_latest in {}ms", + num_rows, + started_at.elapsed().as_millis() + ); self.drop_temp_table(temp_table_latest_agged).await?; self.drop_temp_table(temp_table).await?; @@ -457,7 +517,7 @@ impl PostgresSession { "#, temp_table ); - + let started_at = Instant::now(); let sink: CopyInSink = self.copy_in(statement.as_str()).await?; let writer = BinaryCopyInWriter::new( sink, @@ -483,7 +543,12 @@ impl PostgresSession { args.push(&transaction.supp_infos); writer.as_mut().write(&args).await?; } - writer.finish().await?; + let num_rows = writer.finish().await?; + debug!( + "inserted {} transactions for block into temp table in {}ms", + num_rows, + started_at.elapsed().as_millis() + ); let statement = format!( r#" @@ -505,7 +570,13 @@ impl PostgresSession { "#, temp_table ); - self.client.execute(statement.as_str(), &[]).await?; + let started_at = Instant::now(); + let num_rows = self.client.execute(statement.as_str(), &[]).await?; + debug!( + "inserted {} transactions for block into transaction_infos table in {}ms", + num_rows, + started_at.elapsed().as_millis() + ); self.drop_temp_table(temp_table).await?; Ok(()) @@ -544,7 +615,7 @@ impl PostgresSession { "#, temp_table ); - + let started_at = Instant::now(); let sink: CopyInSink = self.copy_in(statement.as_str()).await?; let writer = BinaryCopyInWriter::new( sink, @@ -570,7 +641,12 @@ impl PostgresSession { args.push(&pf_json); writer.as_mut().write(&args).await?; } - writer.finish().await?; + let num_rows = writer.finish().await?; + debug!( + "inserted {} heavily_locked_accounts into temp table in {}ms", + num_rows, + started_at.elapsed().as_millis() + ); let statement = format!( r#" @@ -608,7 +684,13 @@ impl PostgresSession { "#, temp_table ); - self.client.execute(statement.as_str(), &[]).await?; + let started_at = Instant::now(); + let num_rows = self.client.execute(statement.as_str(), &[]).await?; + debug!( + "inserted {} heavily_locked_accounts into accounts_map_blocks table in {}ms", + num_rows, + started_at.elapsed().as_millis() + ); self.drop_temp_table(temp_table).await?; Ok(()) @@ -627,7 +709,8 @@ impl PostgresSession { supp_infos ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8); "#; - self.client + let started_at = Instant::now(); + let num_rows = self.client .execute( statement, &[ @@ -642,6 +725,12 @@ impl PostgresSession { ], ) .await?; + debug!( + "inserted {} block info into blocks table in {}ms", + num_rows, + started_at.elapsed().as_millis() + ); + Ok(()) } From f5f9c99375599c3e10f2f58c936b8825301dbf99 Mon Sep 17 00:00:00 2001 From: Groovie | Mango <95291500+grooviegermanikus@users.noreply.github.com> Date: Wed, 10 Jan 2024 16:39:05 +0100 Subject: [PATCH 2/2] multiple write dbsessions (#40) * postgres connection handling: - use dedicated connections - set work_mem * serialize writes * brush send_block_info_to_buffer logging * remove sleep * brush log * cleanu --- src/main.rs | 18 ++++++---- src/postgres.rs | 89 +++++++++++++++++++++++++++++++++++++++---------- 2 files changed, 84 insertions(+), 23 deletions(-) diff --git a/src/main.rs b/src/main.rs index 47aad5f..bdc86e7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -20,6 +20,7 @@ use dashmap::DashMap; use futures::StreamExt; use log::{debug, error, info}; use prometheus::{opts, register_int_counter, register_int_gauge, IntCounter, IntGauge}; +use tokio::sync::mpsc::Sender; use transaction_info::TransactionInfo; mod alt_store; @@ -146,7 +147,7 @@ async fn start_tracking_blocks( rpc_client: Arc, grpc_block_addr: String, grpc_x_token: Option, - postgres: postgres::Postgres, + block_sender_postgres: Sender, slot: Arc, alts_list: Vec, ) { @@ -248,7 +249,7 @@ async fn start_tracking_blocks( BLOCK_TXS.set(block.transactions.len() as i64); BANKING_STAGE_BLOCKS_COUNTER.inc(); BANKING_STAGE_BLOCKS_TASK.inc(); - let postgres = postgres.clone(); + let block_sender_postgres = block_sender_postgres.clone(); let slot = slot.clone(); let atl_store = atl_store.clone(); tokio::spawn(async move { @@ -258,7 +259,7 @@ async fn start_tracking_blocks( TXERROR_COUNT.add( block_info.processed_transactions - block_info.successful_transactions, ); - if let Err(e) = postgres.save_block_info(block_info).await { + if let Err(e) = postgres::send_block_info_to_buffer(block_sender_postgres, block_info).await { panic!("Error saving block {}", e); } slot.store(block.slot, std::sync::atomic::Ordering::Relaxed); @@ -295,7 +296,8 @@ async fn main() -> anyhow::Result<()> { let grpc_block_addr = args.grpc_address_to_fetch_blocks; let map_of_infos = Arc::new(DashMap::<(String, u64), TransactionInfo>::new()); - let postgres = postgres::Postgres::new().await; + let postgres1 = postgres::Postgres::new_with_workmem().await; + let postgres2 = postgres::Postgres::new_with_workmem().await; let slot = Arc::new(AtomicU64::new(0)); let no_block_subscription = grpc_block_addr.is_none(); let alts = args.alts; @@ -311,7 +313,11 @@ async fn main() -> anyhow::Result<()> { .map(|x| Pubkey::from_str(&x).unwrap()) .collect_vec(); - postgres.spawn_transaction_infos_saver(map_of_infos.clone(), slot.clone()); + + let block_sender = postgres1.spawn_block_saver(); + + postgres2.spawn_transaction_infos_saver(map_of_infos.clone(), slot.clone()); + let jhs = args .banking_grpc_addresses .iter() @@ -335,7 +341,7 @@ async fn main() -> anyhow::Result<()> { rpc_client, gprc_block_addr, args.grpc_x_token, - postgres, + block_sender, slot, alts_list, ) diff --git a/src/postgres.rs b/src/postgres.rs index 15208c2..4b81a3a 100644 --- a/src/postgres.rs +++ b/src/postgres.rs @@ -13,6 +13,8 @@ use native_tls::{Certificate, Identity, TlsConnector}; use postgres_native_tls::MakeTlsConnector; use serde::Serialize; use solana_sdk::transaction::TransactionError; +use tokio::sync::mpsc::error::SendTimeoutError; +use tokio::sync::mpsc::Sender; use tokio::time::Instant; use tokio_postgres::{ binary_copy::BinaryCopyInWriter, @@ -27,6 +29,7 @@ use crate::{ transaction_info::TransactionInfo, }; +const BLOCK_WRITE_BUFFER_SIZE: usize = 5; const LIMIT_LATEST_TXS_PER_ACCOUNT: i64 = 1000; pub struct TempTableTracker { @@ -120,6 +123,20 @@ impl PostgresSession { Ok(client) } + pub async fn configure_work_mem(&self) { + self.client + .execute("SET work_mem TO '256MB'", &[]) + .await + .unwrap(); + let work_mem: String = self + .client + .query_one("show work_mem", &[]) + .await + .unwrap() + .get("work_mem"); + info!("Configured work_mem={}", work_mem); + } + pub async fn drop_temp_table(&self, table: String) -> anyhow::Result<()> { self.client .execute(format!("drop table if exists {};", table).as_str(), &[]) @@ -829,7 +846,7 @@ impl PostgresSession { // save account usage in blocks self.save_account_usage_in_block(&block_info).await?; self.save_block_info(&block_info).await?; - info!("block saved"); + Ok(()) } } @@ -851,17 +868,7 @@ impl PostgresSession { slots_to_keep ); - self.client - .execute("SET work_mem TO '256MB'", &[]) - .await - .unwrap(); - let work_mem: String = self - .client - .query_one("show work_mem", &[]) - .await - .unwrap() - .get("work_mem"); - info!("Configured work_mem={}", work_mem); + self.configure_work_mem().await; { info!("Rows before cleanup:"); @@ -1159,13 +1166,47 @@ pub struct Postgres { } impl Postgres { - pub async fn new() -> Self { + pub async fn new_with_workmem() -> Self { let session = PostgresSession::new().await.unwrap(); + let session = Arc::new(session); + session.configure_work_mem().await; Self { - session: Arc::new(session), + session } } + pub fn spawn_block_saver(&self) -> Sender { + let (block_sender, mut block_receiver) = tokio::sync::mpsc::channel::(BLOCK_WRITE_BUFFER_SIZE); + + let session = self.session.clone(); + tokio::spawn(async move { + loop { + match block_receiver.recv().await { + None => { + warn!("block_receiver closed - stopping thread"); + return; + } + + Some(block) => { + let slot = block.slot; + info!("saving block {} ..", slot); + match session.save_block(block).await { + Ok(_) => { + info!("saving block {} done", slot); + } + Err(err) => { + error!("saving block failed {}", err); + } + } + } + }; + } + }); + + return block_sender; + } + + pub fn spawn_transaction_infos_saver( &self, map_of_transaction: Arc>, @@ -1198,9 +1239,6 @@ impl Postgres { }); } - pub async fn save_block_info(&self, block: BlockInfo) -> anyhow::Result<()> { - self.session.save_block(block).await - } } #[derive(Serialize, Clone)] @@ -1220,3 +1258,20 @@ pub struct AccountsForTransaction { pub signature: String, pub accounts: Vec, } + + +pub async fn send_block_info_to_buffer(block_sender_postgres: Sender, block_info: BlockInfo) -> anyhow::Result<()> { + debug!("block buffer capacity: {}", block_sender_postgres.capacity()); + + const WARNING_THRESHOLD: Duration = Duration::from_millis(3000); + + let started_at = Instant::now(); + if let Err(SendTimeoutError::Timeout(block)) = block_sender_postgres.send_timeout(block_info, WARNING_THRESHOLD).await { + let slot = block.slot; + warn!("Block {} was not buffered for {:.3}s - continue waiting", slot, WARNING_THRESHOLD.as_secs_f32()); + block_sender_postgres.send(block).await?; + info!("Block {} was finally buffered after {:.3}s", slot, started_at.elapsed().as_secs_f32()); + } + + Ok(()) +}