From 96814a835a51d4f3aaead8fcf3ff0b37adb2ee7c Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Wed, 28 Feb 2024 09:37:00 +0100 Subject: [PATCH] TEST microbatch --- src/main.rs | 2 +- src/postgres.rs | 69 ++++++++++++++++++++++++++++++------------------- 2 files changed, 44 insertions(+), 27 deletions(-) diff --git a/src/main.rs b/src/main.rs index 0a84919..7627f25 100644 --- a/src/main.rs +++ b/src/main.rs @@ -365,7 +365,7 @@ async fn main() -> anyhow::Result<()> { .collect_vec(); let mut block_senders = vec![]; - for i in 1..=4 { + for i in 1..=1 { let s = postgres::Postgres::new_with_workmem(i) .await .spawn_block_saver(); diff --git a/src/postgres.rs b/src/postgres.rs index ac392eb..acbb8ba 100644 --- a/src/postgres.rs +++ b/src/postgres.rs @@ -9,7 +9,7 @@ use base64::Engine; use dashmap::DashMap; use futures::pin_mut; use itertools::Itertools; -use log::{debug, error, info, log, warn, Level}; +use log::{debug, error, info, log, warn, Level, trace}; use native_tls::{Certificate, Identity, TlsConnector}; use postgres_native_tls::MakeTlsConnector; use prometheus::{opts, register_int_gauge, IntGauge}; @@ -17,7 +17,7 @@ use serde::Serialize; use solana_sdk::transaction::TransactionError; use tokio::sync::mpsc::error::SendTimeoutError; use tokio::sync::mpsc::Sender; -use tokio::time::Instant; +use tokio::time::{Instant, sleep_until}; use tokio_postgres::{ binary_copy::BinaryCopyInWriter, config::SslMode, @@ -31,9 +31,10 @@ use crate::{ transaction_info::TransactionInfo, }; -const BLOCK_WRITE_BUFFER_SIZE: usize = 5; +const BLOCK_WRITE_BUFFER_SIZE: usize = 10; // requires 125.000.000 * 8 bytes * LIMIT_LATEST_TXS_PER_ACCOUNT const LIMIT_LATEST_TXS_PER_ACCOUNT: i64 = 100+20; +const ACCOUNT_WRITE_BATCHSIZE: usize = 1000; lazy_static::lazy_static! { static ref ACCOUNTS_SAVING_QUEUE: IntGauge = @@ -729,21 +730,20 @@ impl PostgresSession { ], ); pin_mut!(writer); - const LIMIT: usize = 100; let mut nb_read_accounts: usize = 0; let mut nb_write_accounts: usize = 0; for account_usage in block_info.heavily_locked_accounts.iter() { - if nb_read_accounts >= LIMIT && nb_write_accounts >= LIMIT { + if nb_read_accounts >= ACCOUNT_WRITE_BATCHSIZE && nb_write_accounts >= ACCOUNT_WRITE_BATCHSIZE { break; } let is_writable = account_usage.is_write_locked; if is_writable { - if nb_write_accounts >= LIMIT { + if nb_write_accounts >= ACCOUNT_WRITE_BATCHSIZE { continue; } nb_write_accounts += 1; } else { - if nb_read_accounts >= LIMIT { + if nb_read_accounts >= ACCOUNT_WRITE_BATCHSIZE { continue; } nb_read_accounts += 1; @@ -917,31 +917,30 @@ impl PostgresSession { self.client.copy_in(statement).await } - pub async fn save_block(&self, block_info: BlockInfo) -> anyhow::Result<()> { + pub async fn save_blocks(&self, block_batch: Vec) -> anyhow::Result<()> { + info!("batch size {}", block_batch.len()); let instant = Instant::now(); // create transaction ids let int_sig = Instant::now(); - let signatures = block_info - .transactions - .iter() - .map(|transaction| transaction.signature.clone()) + let signatures = block_batch.iter() + .flat_map(|b| b.transactions.iter()) + .map(|transaction| transaction.signature.clone()) // TODO remove clone .collect(); self.create_transaction_ids(signatures).await?; + TIME_TO_SAVE_TRANSACTION.set(int_sig.elapsed().as_millis() as i64); // create account ids let ins_acc = Instant::now(); - let accounts = block_info - .heavily_locked_accounts - .iter() + let accounts = block_batch.iter() + .flat_map(|b| b.heavily_locked_accounts.iter()) .map(|acc| acc.key.clone()) .collect(); self.create_accounts_for_transaction(accounts).await?; ACCOUNT_SAVE_TIME.set(ins_acc.elapsed().as_millis() as i64); let instant_acc_tx: Instant = Instant::now(); - let txs_accounts = block_info - .transactions - .iter() + let txs_accounts = block_batch.iter() + .flat_map(|b| b.transactions.iter()) .map(|tx| AccountsForTransaction { signature: tx.signature.clone(), accounts: tx @@ -963,17 +962,23 @@ impl PostgresSession { // insert transactions let instant_save_tx = Instant::now(); - self.insert_transactions_for_block(&block_info.transactions, block_info.slot) - .await?; + for block_info in &block_batch { + self.insert_transactions_for_block(&block_info.transactions, block_info.slot) + .await?; + } TIME_TO_SAVE_TRANSACTION_DATA.set(instant_save_tx.elapsed().as_millis() as i64); // save account usage in blocks let ins = Instant::now(); - self.save_account_usage_in_block(&block_info).await?; + for block_info in &block_batch { + self.save_account_usage_in_block(&block_info).await?; + } TIME_TO_SAVE_BLOCK_ACCOUNTS.set(ins.elapsed().as_millis() as i64); let inst_block_info = Instant::now(); - self.save_block_info(&block_info).await?; + for block_info in &block_batch { + self.save_block_info(&block_info).await?; + } BLOCK_INFO_SAVE_TIME.set(inst_block_info.elapsed().as_millis() as i64); TIME_TO_SAVE_BLOCK.set(instant.elapsed().as_millis() as i64); @@ -1318,13 +1323,23 @@ impl Postgres { } Some(block) => { - let slot = block.slot; + let started_at = Instant::now(); let instant = Instant::now(); - match session.save_block(block).await { + + let mut batch = vec![block]; + 'microbatch_loop: while let Ok(bb) = block_receiver.try_recv() { + batch.push(bb); + if batch.len() > 5 { + break 'microbatch_loop; + } + } + let batch_size = batch.len(); + + match session.save_blocks(batch).await { Ok(_) => { info!( - "saving block {} took {} ms", - slot, + "saving {} blocks took {} ms", + batch_size, instant.elapsed().as_millis() ); } @@ -1332,6 +1347,8 @@ impl Postgres { error!("saving block failed {}", err); } } + + sleep_until(started_at + Duration::from_millis(3000)).await; } }; }