From 4238126f22f2388a304b00152597f4bd6d363d9d Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Wed, 28 Feb 2024 12:57:32 +0100 Subject: [PATCH] microbatch --- src/block_info.rs | 4 ++ src/main.rs | 1 - src/postgres.rs | 93 ++++++++++++++++++++++++++++++++++++++--------- 3 files changed, 79 insertions(+), 19 deletions(-) diff --git a/src/block_info.rs b/src/block_info.rs index 1dc100b..03b9e75 100644 --- a/src/block_info.rs +++ b/src/block_info.rs @@ -115,6 +115,8 @@ pub struct BlockInfo { pub total_cu_requested: i64, pub heavily_locked_accounts: Vec, pub sup_info: Option, + // store supinfo redundantly as json string to make querying easier + pub sup_json: Option, pub transactions: Vec, } @@ -484,6 +486,7 @@ impl BlockInfo { Self::calculate_account_usage(&writelocked_accounts, &readlocked_accounts); let sup_info = Self::calculate_supp_info(&mut prio_fees_in_block); + let sup_json = sup_info.as_ref().map(|x| serde_json::to_string(&x).expect("json error")); BlockInfo { block_hash, @@ -495,6 +498,7 @@ impl BlockInfo { total_cu_requested: total_cu_requested as i64, heavily_locked_accounts, sup_info, + sup_json, transactions: block_transactions, } } diff --git a/src/main.rs b/src/main.rs index 7627f25..f2c6f28 100644 --- a/src/main.rs +++ b/src/main.rs @@ -302,7 +302,6 @@ async fn start_tracking_blocks( let atl_store = atl_store.clone(); tokio::spawn(async move { - info!("spawn task"); // to support address lookup tables delay processing a littlebit tokio::time::sleep(Duration::from_secs(2)).await; let block_info = BlockInfo::new(atl_store, &block).await; diff --git a/src/postgres.rs b/src/postgres.rs index acbb8ba..cb424d1 100644 --- a/src/postgres.rs +++ b/src/postgres.rs @@ -25,6 +25,7 @@ use tokio_postgres::{ types::{ToSql, Type}, Client, CopyInSink, NoTls, Socket, }; +use tokio_postgres::types::BorrowToSql; use crate::{ block_info::{BlockInfo, BlockTransactionInfo}, @@ -814,8 +815,25 @@ impl PostgresSession { Ok(()) } - pub async fn save_block_info(&self, block_info: &BlockInfo) -> anyhow::Result<()> { - let statement = r#" + pub async fn save_block_info(&self, block_batch: Vec) -> anyhow::Result<()> { + + // ($1,$2,$3),($4,$5,$6) + let values_sql = values_vecvec( + 8, block_batch.len(), + // &[ + // &block_info.slot, + // &block_info.successful_transactions, + // &block_info.processed_transactions, + // &block_info.total_cu_used, + // &block_info.total_cu_requested, + // &block_info.block_hash, + // &block_info.leader_identity.clone().unwrap_or_default(), + // &serde_json::to_string(&block_info.sup_info)?, + // ], + &[] + ); + + let statement = format!(r#" INSERT INTO banking_stage_results_2.blocks ( slot, successful_transactions, @@ -825,24 +843,31 @@ impl PostgresSession { block_hash, leader_identity, supp_infos - ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + ) VALUES {values_placeholders} ON CONFLICT DO NOTHING - "#; + "#, values_placeholders = values_sql); + + + let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(8 * block_batch.len()); + // Vec<&(dyn ToSql + Sync)> + + for block_info in &block_batch { + args.push(&block_info.slot); + args.push(&block_info.successful_transactions); + args.push(&block_info.processed_transactions); + args.push(&block_info.total_cu_used); + args.push(&block_info.total_cu_requested); + args.push(&block_info.block_hash); + args.push(&block_info.leader_identity); + args.push(&block_info.sup_json); + } + let started_at = Instant::now(); let num_rows = self .client .execute( - statement, - &[ - &block_info.slot, - &block_info.successful_transactions, - &block_info.processed_transactions, - &block_info.total_cu_used, - &block_info.total_cu_requested, - &block_info.block_hash, - &block_info.leader_identity.clone().unwrap_or_default(), - &serde_json::to_string(&block_info.sup_info)?, - ], + &statement, + &args, ) .await?; debug!( @@ -976,9 +1001,7 @@ impl PostgresSession { TIME_TO_SAVE_BLOCK_ACCOUNTS.set(ins.elapsed().as_millis() as i64); let inst_block_info = Instant::now(); - for block_info in &block_batch { - self.save_block_info(&block_info).await?; - } + self.save_block_info(block_batch).await?; BLOCK_INFO_SAVE_TIME.set(inst_block_info.elapsed().as_millis() as i64); TIME_TO_SAVE_BLOCK.set(instant.elapsed().as_millis() as i64); @@ -1442,3 +1465,37 @@ pub async fn send_block_info_to_buffer( Ok(()) } + + +pub fn values_vecvec(args: usize, rows: usize, types: &[&str]) -> String { + let mut query = String::new(); + + multiline_query(&mut query, args, rows, types); + + query +} + +pub fn multiline_query(query: &mut String, args: usize, rows: usize, types: &[&str]) { + let mut arg_index = 1usize; + for row in 0..rows { + query.push('('); + + for i in 0..args { + if row == 0 && !types.is_empty() { + query.push_str(&format!("(${arg_index})::{}", types[i])); + } else { + query.push_str(&format!("${arg_index}")); + } + arg_index += 1; + if i != (args - 1) { + query.push(','); + } + } + + query.push(')'); + + if row != (rows - 1) { + query.push(','); + } + } +}