diff --git a/src/main.rs b/src/main.rs
index 1e6d11f..72b900e 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -152,7 +152,7 @@ async fn start_tracking_blocks(
     rpc_client: Arc<RpcClient>,
     grpc_block_addr: String,
     grpc_x_token: Option<String>,
-    block_sender_postgres: Sender<BlockInfo>,
+    block_sender_postgres: Vec<Sender<BlockInfo>>,
     slot: Arc<AtomicU64>,
     alts_list: Vec<Pubkey>,
 ) {
@@ -225,7 +225,6 @@ async fn start_tracking_blocks(
     // let data = atl_store.serialize();
     // let mut alts_file = tokio::fs::File::create("alt_binary.bin").await.unwrap();
     // alts_file.write_all(&data).await.unwrap();
-
     loop {
         let mut blocks_subs = HashMap::new();
         blocks_subs.insert(
@@ -284,7 +283,6 @@ async fn start_tracking_blocks(
             let Some(update) = message.update_oneof else {
                 continue;
             };
-            block_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
             match update {
                 yellowstone_grpc_proto_original::prelude::subscribe_update::UpdateOneof::Block(
                     block,
@@ -293,9 +291,13 @@ async fn start_tracking_blocks(
                     BLOCK_TXS.set(block.transactions.len() as i64);
                     BANKING_STAGE_BLOCKS_COUNTER.inc();
                     BANKING_STAGE_BLOCKS_TASK.inc();
-                    let block_sender_postgres = block_sender_postgres.clone();
+
+                    let count = block_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed) as usize;
+                    let pg_size = block_sender_postgres.len();
+                    let block_sender = block_sender_postgres[count%pg_size].clone();
                     let slot = slot.clone();
                     let atl_store = atl_store.clone();
+
                     tokio::spawn(async move {
                         // to support address lookup tables delay processing a littlebit
                         tokio::time::sleep(Duration::from_secs(2)).await;
@@ -303,7 +305,7 @@ async fn start_tracking_blocks(
                         TXERROR_COUNT.add(
                             block_info.processed_transactions - block_info.successful_transactions,
                         );
-                        if let Err(e) = postgres::send_block_info_to_buffer(block_sender_postgres, block_info).await {
+                        if let Err(e) = postgres::send_block_info_to_buffer(block_sender, block_info).await {
                             panic!("Error saving block {}", e);
                         }
                         slot.store(block.slot, std::sync::atomic::Ordering::Relaxed);
@@ -357,7 +359,7 @@ async fn main() -> anyhow::Result<()> {
         .map(|x| Pubkey::from_str(&x).unwrap())
         .collect_vec();
 
-    let block_sender = postgres1.spawn_block_saver();
+    let block_senders = (0..4).map(|x| postgres1.spawn_block_saver()).collect_vec();
 
     postgres2.spawn_transaction_infos_saver(map_of_infos.clone(), slot.clone());
 
@@ -384,7 +386,7 @@ async fn main() -> anyhow::Result<()> {
         rpc_client,
         gprc_block_addr,
         args.grpc_x_token,
-        block_sender,
+        block_senders,
         slot,
         alts_list,
     )
diff --git a/src/postgres.rs b/src/postgres.rs
index ddf7455..b26e4b4 100644
--- a/src/postgres.rs
+++ b/src/postgres.rs
@@ -1,5 +1,7 @@
 use std::{
-    collections::HashSet, sync::{atomic::AtomicU64, Arc}, time::Duration
+    collections::HashSet,
+    sync::{atomic::AtomicU64, Arc},
+    time::Duration,
 };
 
 use anyhow::Context;
@@ -12,7 +14,7 @@ use native_tls::{Certificate, Identity, TlsConnector};
 use postgres_native_tls::MakeTlsConnector;
 use prometheus::{opts, register_int_gauge, IntGauge};
 use serde::Serialize;
-use solana_sdk::transaction::TransactionError;
+use solana_sdk::{pubkey::Pubkey, transaction::TransactionError};
 use tokio::sync::mpsc::error::SendTimeoutError;
 use tokio::sync::mpsc::Sender;
 use tokio::time::Instant;
@@ -29,7 +31,7 @@ use crate::{
     transaction_info::TransactionInfo,
 };
 
-const BLOCK_WRITE_BUFFER_SIZE: usize = 256;
+const BLOCK_WRITE_BUFFER_SIZE: usize = 5;
 const LIMIT_LATEST_TXS_PER_ACCOUNT: i64 = 100;
 
 lazy_static::lazy_static! {
@@ -54,34 +56,13 @@ lazy_static::lazy_static! {
     static ref TIME_TO_SAVE_TRANSACTION_DATA: IntGauge =
         register_int_gauge!(opts!("banking_stage_sidecar_transaction_data_time", "Account in tx save transactions"))
             .unwrap();
-}
-
-pub struct TempTableTracker {
-    count: AtomicU64,
-}
-
-impl TempTableTracker {
-    pub fn new() -> Self {
-        Self {
-            count: AtomicU64::new(1),
-        }
-    }
-
-    pub fn get_new_temp_table(&self) -> String {
-        format!(
-            "temp_table_{}",
-            self.count
-                .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
-        )
-    }
+    static ref ACCOUNT_SAVE_TIME: IntGauge =
+        register_int_gauge!(opts!("banking_stage_sidecar_account_save_time", "Account save time")).unwrap();
 }
 
 #[derive(Clone)]
 pub struct PostgresSession {
     client: Arc<Client>,
-    temp_table_tracker: Arc<TempTableTracker>,
-    accounts_for_transaction_sender:
-        tokio::sync::mpsc::UnboundedSender>,
 }
 
 impl PostgresSession {
@@ -117,48 +98,14 @@ impl PostgresSession {
             Self::spawn_connection(pg_config, MakeTlsConnector::new(connector)).await?
         };
 
-        let (accounts_for_transaction_sender, accounts_for_transaction_reciever) =
-            tokio::sync::mpsc::unbounded_channel();
-
         let instance = Self {
             client: Arc::new(client),
-            temp_table_tracker: Arc::new(TempTableTracker::new()),
-            accounts_for_transaction_sender,
         };
-        Self::spawn_account_transaction_saving_task(
-            instance.clone(),
-            accounts_for_transaction_reciever,
-        );
         Ok(instance)
     }
 
-    fn spawn_account_transaction_saving_task(
-        instance: PostgresSession,
-        mut accounts_for_transaction_reciever: tokio::sync::mpsc::UnboundedReceiver<
-            Vec,
-        >,
-    ) {
-        tokio::spawn(async move {
-            while let Some(accounts_for_transaction) =
-                accounts_for_transaction_reciever.recv().await
-            {
-                let instant: Instant = Instant::now();
-                ACCOUNTS_SAVING_QUEUE.dec();
-                if let Err(e) = instance
-                    .insert_accounts_for_transaction(accounts_for_transaction)
-                    .await
-                {
-                    error!("Error inserting accounts for transactions : {e:?}");
-                }
-                TIME_TO_STORE_TX_ACCOUNT.set(instant.elapsed().as_millis() as i64);
-                log::info!(
-                    "Took {} ms to insert accounts for transaction",
-                    instant.elapsed().as_millis()
-                );
-            }
-            error!("account transaction saving task ");
-            panic!("account transaction saving task");
-        });
+    pub fn get_new_temp_table(&self) -> String {
+        Pubkey::new_unique().to_string()
     }
 
     async fn spawn_connection(
@@ -209,9 +156,9 @@ impl PostgresSession {
         Ok(())
     }
 
-    pub async fn create_transaction_ids(&self, signatures: Vec<String>) -> anyhow::Result<()> {
+    pub async fn create_transaction_ids(&self, signatures: HashSet<String>) -> anyhow::Result<()> {
         // create temp table
-        let temp_table = self.temp_table_tracker.get_new_temp_table();
+        let temp_table = self.get_new_temp_table();
         self.client
             .execute(
@@ -274,7 +221,7 @@ impl PostgresSession {
         accounts: HashSet<String>,
     ) -> anyhow::Result<()> {
         // create temp table
-        let temp_table = self.temp_table_tracker.get_new_temp_table();
+        let temp_table = self.get_new_temp_table();
         self.client
             .execute(
@@ -334,7 +281,7 @@ impl PostgresSession {
         &self,
         txs: &[TransactionInfo],
     ) -> anyhow::Result<()> {
-        let temp_table = self.temp_table_tracker.get_new_temp_table();
+        let temp_table = self.get_new_temp_table();
         self.client
             .execute(
@@ -425,7 +372,7 @@ impl PostgresSession {
         &self,
         accounts_for_transaction: Vec,
     ) -> anyhow::Result<()> {
-        let temp_table = self.temp_table_tracker.get_new_temp_table();
+        let temp_table = self.get_new_temp_table();
         self.client
             .execute(
                 format!(
@@ -502,7 +449,7 @@ impl PostgresSession {
         // merge data from temp table into accounts_map_transaction_latest
         // note: query uses the array_dedup_append postgres function to deduplicate and limit the array size
         // example: array_dedup_append('{8,3,2,1}', '{5,3}', 4) -> {2,1,5,3}
-        let temp_table_latest_agged = self.temp_table_tracker.get_new_temp_table();
+        let temp_table_latest_agged = self.get_new_temp_table();
         let statement = format!(
             r#"
                 CREATE TEMP TABLE {temp_table_name} AS
@@ -560,7 +507,7 @@ impl PostgresSession {
         transactions: &Vec,
         slot: i64,
     ) -> anyhow::Result<()> {
-        let temp_table = self.temp_table_tracker.get_new_temp_table();
+        let temp_table = self.get_new_temp_table();
         self.client
             .execute(
                 format!(
@@ -657,7 +604,7 @@ impl PostgresSession {
     }
 
     pub async fn save_account_usage_in_block(&self, block_info: &BlockInfo) -> anyhow::Result<()> {
-        let temp_table = self.temp_table_tracker.get_new_temp_table();
+        let temp_table = self.get_new_temp_table();
         self.client
             .execute(
                 format!(
@@ -704,8 +651,8 @@ impl PostgresSession {
         );
         pin_mut!(writer);
         const LIMIT: usize = 100;
-        let mut nb_read_accounts : usize = 0;
-        let mut nb_write_accounts : usize = 0;
+        let mut nb_read_accounts: usize = 0;
+        let mut nb_write_accounts: usize = 0;
         for account_usage in block_info.heavily_locked_accounts.iter() {
             if nb_read_accounts >= LIMIT && nb_write_accounts >= LIMIT {
                 break;
             }
@@ -842,7 +789,7 @@ impl PostgresSession {
         let signatures = txs
             .iter()
             .map(|transaction| transaction.signature.clone())
-            .collect_vec();
+            .collect();
         self.create_transaction_ids(signatures).await?;
         // create account ids
         let accounts = txs
@@ -872,7 +819,12 @@ impl PostgresSession {
            .collect_vec();
         // insert accounts for transaction
         ACCOUNTS_SAVING_QUEUE.inc();
-        let _ = self.accounts_for_transaction_sender.send(txs_accounts);
+        let instant: Instant = Instant::now();
+        ACCOUNTS_SAVING_QUEUE.dec();
+        if let Err(e) = self.insert_accounts_for_transaction(txs_accounts).await {
+            error!("Error inserting accounts for transactions : {e:?}");
+        }
+        TIME_TO_STORE_TX_ACCOUNT.set(instant.elapsed().as_millis() as i64);
 
         Ok(())
     }
@@ -893,16 +845,18 @@ impl PostgresSession {
             .transactions
             .iter()
             .map(|transaction| transaction.signature.clone())
-            .collect_vec();
+            .collect();
         self.create_transaction_ids(signatures).await?;
         TIME_TO_SAVE_TRANSACTION.set(int_sig.elapsed().as_millis() as i64);
         // create account ids
+        let ins_acc = Instant::now();
         let accounts = block_info
             .heavily_locked_accounts
             .iter()
             .map(|acc| acc.key.clone())
             .collect();
         self.create_accounts_for_transaction(accounts).await?;
+        ACCOUNT_SAVE_TIME.set(ins_acc.elapsed().as_millis() as i64);
 
         let txs_accounts = block_info
             .transactions
@@ -922,8 +876,11 @@ impl PostgresSession {
             })
             .collect_vec();
 
-        ACCOUNTS_SAVING_QUEUE.inc();
-        let _ = self.accounts_for_transaction_sender.send(txs_accounts);
+        let instant_acc_tx: Instant = Instant::now();
+        if let Err(e) = self.insert_accounts_for_transaction(txs_accounts).await {
+            error!("Error inserting accounts for transactions : {e:?}");
+        }
+        TIME_TO_STORE_TX_ACCOUNT.set(instant_acc_tx.elapsed().as_millis() as i64);
 
         // insert transactions
         let instant_save_tx = Instant::now();