TEST microbatch

This commit is contained in:
GroovieGermanikus 2024-02-28 09:37:00 +01:00
parent 1aea32786c
commit 96814a835a
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
2 changed files with 44 additions and 27 deletions

View File

@ -365,7 +365,7 @@ async fn main() -> anyhow::Result<()> {
.collect_vec();
let mut block_senders = vec![];
for i in 1..=4 {
for i in 1..=1 {
let s = postgres::Postgres::new_with_workmem(i)
.await
.spawn_block_saver();

View File

@ -9,7 +9,7 @@ use base64::Engine;
use dashmap::DashMap;
use futures::pin_mut;
use itertools::Itertools;
use log::{debug, error, info, log, warn, Level};
use log::{debug, error, info, log, warn, Level, trace};
use native_tls::{Certificate, Identity, TlsConnector};
use postgres_native_tls::MakeTlsConnector;
use prometheus::{opts, register_int_gauge, IntGauge};
@ -17,7 +17,7 @@ use serde::Serialize;
use solana_sdk::transaction::TransactionError;
use tokio::sync::mpsc::error::SendTimeoutError;
use tokio::sync::mpsc::Sender;
use tokio::time::Instant;
use tokio::time::{Instant, sleep_until};
use tokio_postgres::{
binary_copy::BinaryCopyInWriter,
config::SslMode,
@ -31,9 +31,10 @@ use crate::{
transaction_info::TransactionInfo,
};
const BLOCK_WRITE_BUFFER_SIZE: usize = 5;
const BLOCK_WRITE_BUFFER_SIZE: usize = 10;
// requires 125.000.000 * 8 bytes * LIMIT_LATEST_TXS_PER_ACCOUNT
const LIMIT_LATEST_TXS_PER_ACCOUNT: i64 = 100+20;
const ACCOUNT_WRITE_BATCHSIZE: usize = 1000;
lazy_static::lazy_static! {
static ref ACCOUNTS_SAVING_QUEUE: IntGauge =
@ -729,21 +730,20 @@ impl PostgresSession {
],
);
pin_mut!(writer);
const LIMIT: usize = 100;
let mut nb_read_accounts: usize = 0;
let mut nb_write_accounts: usize = 0;
for account_usage in block_info.heavily_locked_accounts.iter() {
if nb_read_accounts >= LIMIT && nb_write_accounts >= LIMIT {
if nb_read_accounts >= ACCOUNT_WRITE_BATCHSIZE && nb_write_accounts >= ACCOUNT_WRITE_BATCHSIZE {
break;
}
let is_writable = account_usage.is_write_locked;
if is_writable {
if nb_write_accounts >= LIMIT {
if nb_write_accounts >= ACCOUNT_WRITE_BATCHSIZE {
continue;
}
nb_write_accounts += 1;
} else {
if nb_read_accounts >= LIMIT {
if nb_read_accounts >= ACCOUNT_WRITE_BATCHSIZE {
continue;
}
nb_read_accounts += 1;
@ -917,31 +917,30 @@ impl PostgresSession {
self.client.copy_in(statement).await
}
pub async fn save_block(&self, block_info: BlockInfo) -> anyhow::Result<()> {
pub async fn save_blocks(&self, block_batch: Vec<BlockInfo>) -> anyhow::Result<()> {
info!("batch size {}", block_batch.len());
let instant = Instant::now();
// create transaction ids
let int_sig = Instant::now();
let signatures = block_info
.transactions
.iter()
.map(|transaction| transaction.signature.clone())
let signatures = block_batch.iter()
.flat_map(|b| b.transactions.iter())
.map(|transaction| transaction.signature.clone()) // TODO remove clone
.collect();
self.create_transaction_ids(signatures).await?;
TIME_TO_SAVE_TRANSACTION.set(int_sig.elapsed().as_millis() as i64);
// create account ids
let ins_acc = Instant::now();
let accounts = block_info
.heavily_locked_accounts
.iter()
let accounts = block_batch.iter()
.flat_map(|b| b.heavily_locked_accounts.iter())
.map(|acc| acc.key.clone())
.collect();
self.create_accounts_for_transaction(accounts).await?;
ACCOUNT_SAVE_TIME.set(ins_acc.elapsed().as_millis() as i64);
let instant_acc_tx: Instant = Instant::now();
let txs_accounts = block_info
.transactions
.iter()
let txs_accounts = block_batch.iter()
.flat_map(|b| b.transactions.iter())
.map(|tx| AccountsForTransaction {
signature: tx.signature.clone(),
accounts: tx
@ -963,17 +962,23 @@ impl PostgresSession {
// insert transactions
let instant_save_tx = Instant::now();
self.insert_transactions_for_block(&block_info.transactions, block_info.slot)
.await?;
for block_info in &block_batch {
self.insert_transactions_for_block(&block_info.transactions, block_info.slot)
.await?;
}
TIME_TO_SAVE_TRANSACTION_DATA.set(instant_save_tx.elapsed().as_millis() as i64);
// save account usage in blocks
let ins = Instant::now();
self.save_account_usage_in_block(&block_info).await?;
for block_info in &block_batch {
self.save_account_usage_in_block(&block_info).await?;
}
TIME_TO_SAVE_BLOCK_ACCOUNTS.set(ins.elapsed().as_millis() as i64);
let inst_block_info = Instant::now();
self.save_block_info(&block_info).await?;
for block_info in &block_batch {
self.save_block_info(&block_info).await?;
}
BLOCK_INFO_SAVE_TIME.set(inst_block_info.elapsed().as_millis() as i64);
TIME_TO_SAVE_BLOCK.set(instant.elapsed().as_millis() as i64);
@ -1318,13 +1323,23 @@ impl Postgres {
}
Some(block) => {
let slot = block.slot;
let started_at = Instant::now();
let instant = Instant::now();
match session.save_block(block).await {
let mut batch = vec![block];
'microbatch_loop: while let Ok(bb) = block_receiver.try_recv() {
batch.push(bb);
if batch.len() > 5 {
break 'microbatch_loop;
}
}
let batch_size = batch.len();
match session.save_blocks(batch).await {
Ok(_) => {
info!(
"saving block {} took {} ms",
slot,
"saving {} blocks took {} ms",
batch_size,
instant.elapsed().as_millis()
);
}
@ -1332,6 +1347,8 @@ impl Postgres {
error!("saving block failed {}", err);
}
}
sleep_until(started_at + Duration::from_millis(3000)).await;
}
};
}