microbatch

This commit is contained in:
GroovieGermanikus 2024-02-28 12:57:32 +01:00
parent 96814a835a
commit 4238126f22
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
3 changed files with 79 additions and 19 deletions

View File

@ -115,6 +115,8 @@ pub struct BlockInfo {
pub total_cu_requested: i64,
pub heavily_locked_accounts: Vec<AccountUsage>,
pub sup_info: Option<PrioritizationFeesInfo>,
// store supinfo redundantly as json string to make querying easier
pub sup_json: Option<String>,
pub transactions: Vec<BlockTransactionInfo>,
}
@ -484,6 +486,7 @@ impl BlockInfo {
Self::calculate_account_usage(&writelocked_accounts, &readlocked_accounts);
let sup_info = Self::calculate_supp_info(&mut prio_fees_in_block);
let sup_json = sup_info.as_ref().map(|x| serde_json::to_string(&x).expect("json error"));
BlockInfo {
block_hash,
@ -495,6 +498,7 @@ impl BlockInfo {
total_cu_requested: total_cu_requested as i64,
heavily_locked_accounts,
sup_info,
sup_json,
transactions: block_transactions,
}
}

View File

@ -302,7 +302,6 @@ async fn start_tracking_blocks(
let atl_store = atl_store.clone();
tokio::spawn(async move {
info!("spawn task");
// to support address lookup tables delay processing a littlebit
tokio::time::sleep(Duration::from_secs(2)).await;
let block_info = BlockInfo::new(atl_store, &block).await;

View File

@ -25,6 +25,7 @@ use tokio_postgres::{
types::{ToSql, Type},
Client, CopyInSink, NoTls, Socket,
};
use tokio_postgres::types::BorrowToSql;
use crate::{
block_info::{BlockInfo, BlockTransactionInfo},
@ -814,8 +815,25 @@ impl PostgresSession {
Ok(())
}
pub async fn save_block_info(&self, block_info: &BlockInfo) -> anyhow::Result<()> {
let statement = r#"
pub async fn save_block_info(&self, block_batch: Vec<BlockInfo>) -> anyhow::Result<()> {
// ($1,$2,$3),($4,$5,$6)
let values_sql = values_vecvec(
8, block_batch.len(),
// &[
// &block_info.slot,
// &block_info.successful_transactions,
// &block_info.processed_transactions,
// &block_info.total_cu_used,
// &block_info.total_cu_requested,
// &block_info.block_hash,
// &block_info.leader_identity.clone().unwrap_or_default(),
// &serde_json::to_string(&block_info.sup_info)?,
// ],
&[]
);
let statement = format!(r#"
INSERT INTO banking_stage_results_2.blocks (
slot,
successful_transactions,
@ -825,24 +843,31 @@ impl PostgresSession {
block_hash,
leader_identity,
supp_infos
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
) VALUES {values_placeholders}
ON CONFLICT DO NOTHING
"#;
"#, values_placeholders = values_sql);
let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(8 * block_batch.len());
// Vec<&(dyn ToSql + Sync)>
for block_info in &block_batch {
args.push(&block_info.slot);
args.push(&block_info.successful_transactions);
args.push(&block_info.processed_transactions);
args.push(&block_info.total_cu_used);
args.push(&block_info.total_cu_requested);
args.push(&block_info.block_hash);
args.push(&block_info.leader_identity);
args.push(&block_info.sup_json);
}
let started_at = Instant::now();
let num_rows = self
.client
.execute(
statement,
&[
&block_info.slot,
&block_info.successful_transactions,
&block_info.processed_transactions,
&block_info.total_cu_used,
&block_info.total_cu_requested,
&block_info.block_hash,
&block_info.leader_identity.clone().unwrap_or_default(),
&serde_json::to_string(&block_info.sup_info)?,
],
&statement,
&args,
)
.await?;
debug!(
@ -976,9 +1001,7 @@ impl PostgresSession {
TIME_TO_SAVE_BLOCK_ACCOUNTS.set(ins.elapsed().as_millis() as i64);
let inst_block_info = Instant::now();
for block_info in &block_batch {
self.save_block_info(&block_info).await?;
}
self.save_block_info(block_batch).await?;
BLOCK_INFO_SAVE_TIME.set(inst_block_info.elapsed().as_millis() as i64);
TIME_TO_SAVE_BLOCK.set(instant.elapsed().as_millis() as i64);
@ -1442,3 +1465,37 @@ pub async fn send_block_info_to_buffer(
Ok(())
}
pub fn values_vecvec(args: usize, rows: usize, types: &[&str]) -> String {
let mut query = String::new();
multiline_query(&mut query, args, rows, types);
query
}
pub fn multiline_query(query: &mut String, args: usize, rows: usize, types: &[&str]) {
let mut arg_index = 1usize;
for row in 0..rows {
query.push('(');
for i in 0..args {
if row == 0 && !types.is_empty() {
query.push_str(&format!("(${arg_index})::{}", types[i]));
} else {
query.push_str(&format!("${arg_index}"));
}
arg_index += 1;
if i != (args - 1) {
query.push(',');
}
}
query.push(')');
if row != (rows - 1) {
query.push(',');
}
}
}