From f5f9c99375599c3e10f2f58c936b8825301dbf99 Mon Sep 17 00:00:00 2001 From: Groovie | Mango <95291500+grooviegermanikus@users.noreply.github.com> Date: Wed, 10 Jan 2024 16:39:05 +0100 Subject: [PATCH] multiple write dbsessions (#40) * postgres connection handling: - use dedicated connections - set work_mem * serialize writes * brush send_block_info_to_buffer logging * remove sleep * brush log * cleanup --- src/main.rs | 18 ++++++---- src/postgres.rs | 89 +++++++++++++++++++++++++++++++++++++++---------- 2 files changed, 84 insertions(+), 23 deletions(-) diff --git a/src/main.rs b/src/main.rs index 47aad5f..bdc86e7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -20,6 +20,7 @@ use dashmap::DashMap; use futures::StreamExt; use log::{debug, error, info}; use prometheus::{opts, register_int_counter, register_int_gauge, IntCounter, IntGauge}; +use tokio::sync::mpsc::Sender; use transaction_info::TransactionInfo; mod alt_store; @@ -146,7 +147,7 @@ async fn start_tracking_blocks( rpc_client: Arc, grpc_block_addr: String, grpc_x_token: Option, - postgres: postgres::Postgres, + block_sender_postgres: Sender, slot: Arc, alts_list: Vec, ) { @@ -248,7 +249,7 @@ async fn start_tracking_blocks( BLOCK_TXS.set(block.transactions.len() as i64); BANKING_STAGE_BLOCKS_COUNTER.inc(); BANKING_STAGE_BLOCKS_TASK.inc(); - let postgres = postgres.clone(); + let block_sender_postgres = block_sender_postgres.clone(); let slot = slot.clone(); let atl_store = atl_store.clone(); tokio::spawn(async move { @@ -258,7 +259,7 @@ async fn start_tracking_blocks( TXERROR_COUNT.add( block_info.processed_transactions - block_info.successful_transactions, ); - if let Err(e) = postgres.save_block_info(block_info).await { + if let Err(e) = postgres::send_block_info_to_buffer(block_sender_postgres, block_info).await { panic!("Error saving block {}", e); } slot.store(block.slot, std::sync::atomic::Ordering::Relaxed); @@ -295,7 +296,8 @@ async fn main() -> anyhow::Result<()> { let grpc_block_addr = 
args.grpc_address_to_fetch_blocks; let map_of_infos = Arc::new(DashMap::<(String, u64), TransactionInfo>::new()); - let postgres = postgres::Postgres::new().await; + let postgres1 = postgres::Postgres::new_with_workmem().await; + let postgres2 = postgres::Postgres::new_with_workmem().await; let slot = Arc::new(AtomicU64::new(0)); let no_block_subscription = grpc_block_addr.is_none(); let alts = args.alts; @@ -311,7 +313,11 @@ async fn main() -> anyhow::Result<()> { .map(|x| Pubkey::from_str(&x).unwrap()) .collect_vec(); - postgres.spawn_transaction_infos_saver(map_of_infos.clone(), slot.clone()); + + let block_sender = postgres1.spawn_block_saver(); + + postgres2.spawn_transaction_infos_saver(map_of_infos.clone(), slot.clone()); + let jhs = args .banking_grpc_addresses .iter() @@ -335,7 +341,7 @@ async fn main() -> anyhow::Result<()> { rpc_client, gprc_block_addr, args.grpc_x_token, - postgres, + block_sender, slot, alts_list, ) diff --git a/src/postgres.rs b/src/postgres.rs index 15208c2..4b81a3a 100644 --- a/src/postgres.rs +++ b/src/postgres.rs @@ -13,6 +13,8 @@ use native_tls::{Certificate, Identity, TlsConnector}; use postgres_native_tls::MakeTlsConnector; use serde::Serialize; use solana_sdk::transaction::TransactionError; +use tokio::sync::mpsc::error::SendTimeoutError; +use tokio::sync::mpsc::Sender; use tokio::time::Instant; use tokio_postgres::{ binary_copy::BinaryCopyInWriter, @@ -27,6 +29,7 @@ use crate::{ transaction_info::TransactionInfo, }; +const BLOCK_WRITE_BUFFER_SIZE: usize = 5; const LIMIT_LATEST_TXS_PER_ACCOUNT: i64 = 1000; pub struct TempTableTracker { @@ -120,6 +123,20 @@ impl PostgresSession { Ok(client) } + pub async fn configure_work_mem(&self) { + self.client + .execute("SET work_mem TO '256MB'", &[]) + .await + .unwrap(); + let work_mem: String = self + .client + .query_one("show work_mem", &[]) + .await + .unwrap() + .get("work_mem"); + info!("Configured work_mem={}", work_mem); + } + pub async fn drop_temp_table(&self, table: String) 
-> anyhow::Result<()> { self.client .execute(format!("drop table if exists {};", table).as_str(), &[]) @@ -829,7 +846,7 @@ impl PostgresSession { // save account usage in blocks self.save_account_usage_in_block(&block_info).await?; self.save_block_info(&block_info).await?; - info!("block saved"); + Ok(()) } } @@ -851,17 +868,7 @@ impl PostgresSession { slots_to_keep ); - self.client - .execute("SET work_mem TO '256MB'", &[]) - .await - .unwrap(); - let work_mem: String = self - .client - .query_one("show work_mem", &[]) - .await - .unwrap() - .get("work_mem"); - info!("Configured work_mem={}", work_mem); + self.configure_work_mem().await; { info!("Rows before cleanup:"); @@ -1159,13 +1166,47 @@ pub struct Postgres { } impl Postgres { - pub async fn new() -> Self { + pub async fn new_with_workmem() -> Self { let session = PostgresSession::new().await.unwrap(); + let session = Arc::new(session); + session.configure_work_mem().await; Self { - session: Arc::new(session), + session } } + pub fn spawn_block_saver(&self) -> Sender { + let (block_sender, mut block_receiver) = tokio::sync::mpsc::channel::(BLOCK_WRITE_BUFFER_SIZE); + + let session = self.session.clone(); + tokio::spawn(async move { + loop { + match block_receiver.recv().await { + None => { + warn!("block_receiver closed - stopping thread"); + return; + } + + Some(block) => { + let slot = block.slot; + info!("saving block {} ..", slot); + match session.save_block(block).await { + Ok(_) => { + info!("saving block {} done", slot); + } + Err(err) => { + error!("saving block failed {}", err); + } + } + } + }; + } + }); + + return block_sender; + } + + pub fn spawn_transaction_infos_saver( &self, map_of_transaction: Arc>, @@ -1198,9 +1239,6 @@ impl Postgres { }); } - pub async fn save_block_info(&self, block: BlockInfo) -> anyhow::Result<()> { - self.session.save_block(block).await - } } #[derive(Serialize, Clone)] @@ -1220,3 +1258,20 @@ pub struct AccountsForTransaction { pub signature: String, pub accounts: Vec, 
} + + +pub async fn send_block_info_to_buffer(block_sender_postgres: Sender, block_info: BlockInfo) -> anyhow::Result<()> { + debug!("block buffer capacity: {}", block_sender_postgres.capacity()); + + const WARNING_THRESHOLD: Duration = Duration::from_millis(3000); + + let started_at = Instant::now(); + if let Err(SendTimeoutError::Timeout(block)) = block_sender_postgres.send_timeout(block_info, WARNING_THRESHOLD).await { + let slot = block.slot; + warn!("Block {} was not buffered for {:.3}s - continue waiting", slot, WARNING_THRESHOLD.as_secs_f32()); + block_sender_postgres.send(block).await?; + info!("Block {} was finally buffered after {:.3}s", slot, started_at.elapsed().as_secs_f32()); + } + + Ok(()) +}