Adding more metrics, and having multiple postgres sessions

This commit is contained in:
godmodegalactus 2024-01-29 14:51:21 +01:00
parent 681b042ba1
commit f0e37643c5
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
2 changed files with 43 additions and 84 deletions

View File

@ -152,7 +152,7 @@ async fn start_tracking_blocks(
rpc_client: Arc<RpcClient>,
grpc_block_addr: String,
grpc_x_token: Option<String>,
block_sender_postgres: Sender<BlockInfo>,
block_sender_postgres: Vec<Sender<BlockInfo>>,
slot: Arc<AtomicU64>,
alts_list: Vec<Pubkey>,
) {
@ -225,7 +225,6 @@ async fn start_tracking_blocks(
// let data = atl_store.serialize();
// let mut alts_file = tokio::fs::File::create("alt_binary.bin").await.unwrap();
// alts_file.write_all(&data).await.unwrap();
loop {
let mut blocks_subs = HashMap::new();
blocks_subs.insert(
@ -284,7 +283,6 @@ async fn start_tracking_blocks(
let Some(update) = message.update_oneof else {
continue;
};
block_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
match update {
yellowstone_grpc_proto_original::prelude::subscribe_update::UpdateOneof::Block(
block,
@ -293,9 +291,13 @@ async fn start_tracking_blocks(
BLOCK_TXS.set(block.transactions.len() as i64);
BANKING_STAGE_BLOCKS_COUNTER.inc();
BANKING_STAGE_BLOCKS_TASK.inc();
let block_sender_postgres = block_sender_postgres.clone();
let count = block_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed) as usize;
let pg_size = block_sender_postgres.len();
let block_sender = block_sender_postgres[count%pg_size].clone();
let slot = slot.clone();
let atl_store = atl_store.clone();
tokio::spawn(async move {
// to support address lookup tables delay processing a littlebit
tokio::time::sleep(Duration::from_secs(2)).await;
@ -303,7 +305,7 @@ async fn start_tracking_blocks(
TXERROR_COUNT.add(
block_info.processed_transactions - block_info.successful_transactions,
);
if let Err(e) = postgres::send_block_info_to_buffer(block_sender_postgres, block_info).await {
if let Err(e) = postgres::send_block_info_to_buffer(block_sender, block_info).await {
panic!("Error saving block {}", e);
}
slot.store(block.slot, std::sync::atomic::Ordering::Relaxed);
@ -357,7 +359,7 @@ async fn main() -> anyhow::Result<()> {
.map(|x| Pubkey::from_str(&x).unwrap())
.collect_vec();
let block_sender = postgres1.spawn_block_saver();
let block_senders = (0..4).map(|x| postgres1.spawn_block_saver()).collect_vec();
postgres2.spawn_transaction_infos_saver(map_of_infos.clone(), slot.clone());
@ -384,7 +386,7 @@ async fn main() -> anyhow::Result<()> {
rpc_client,
gprc_block_addr,
args.grpc_x_token,
block_sender,
block_senders,
slot,
alts_list,
)

View File

@ -1,5 +1,7 @@
use std::{
collections::HashSet, sync::{atomic::AtomicU64, Arc}, time::Duration
collections::HashSet,
sync::{atomic::AtomicU64, Arc},
time::Duration,
};
use anyhow::Context;
@ -12,7 +14,7 @@ use native_tls::{Certificate, Identity, TlsConnector};
use postgres_native_tls::MakeTlsConnector;
use prometheus::{opts, register_int_gauge, IntGauge};
use serde::Serialize;
use solana_sdk::transaction::TransactionError;
use solana_sdk::{pubkey::Pubkey, transaction::TransactionError};
use tokio::sync::mpsc::error::SendTimeoutError;
use tokio::sync::mpsc::Sender;
use tokio::time::Instant;
@ -29,7 +31,7 @@ use crate::{
transaction_info::TransactionInfo,
};
const BLOCK_WRITE_BUFFER_SIZE: usize = 256;
const BLOCK_WRITE_BUFFER_SIZE: usize = 5;
const LIMIT_LATEST_TXS_PER_ACCOUNT: i64 = 100;
lazy_static::lazy_static! {
@ -54,34 +56,13 @@ lazy_static::lazy_static! {
static ref TIME_TO_SAVE_TRANSACTION_DATA: IntGauge =
register_int_gauge!(opts!("banking_stage_sidecar_transaction_data_time", "Account in tx save transactions")).unwrap();
}
pub struct TempTableTracker {
count: AtomicU64,
}
impl TempTableTracker {
pub fn new() -> Self {
Self {
count: AtomicU64::new(1),
}
}
pub fn get_new_temp_table(&self) -> String {
format!(
"temp_table_{}",
self.count
.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
)
}
static ref ACCOUNT_SAVE_TIME: IntGauge =
register_int_gauge!(opts!("banking_stage_sidecar_account_save_time", "Account save time")).unwrap();
}
#[derive(Clone)]
pub struct PostgresSession {
client: Arc<Client>,
temp_table_tracker: Arc<TempTableTracker>,
accounts_for_transaction_sender:
tokio::sync::mpsc::UnboundedSender<Vec<AccountsForTransaction>>,
}
impl PostgresSession {
@ -117,48 +98,14 @@ impl PostgresSession {
Self::spawn_connection(pg_config, MakeTlsConnector::new(connector)).await?
};
let (accounts_for_transaction_sender, accounts_for_transaction_reciever) =
tokio::sync::mpsc::unbounded_channel();
let instance = Self {
client: Arc::new(client),
temp_table_tracker: Arc::new(TempTableTracker::new()),
accounts_for_transaction_sender,
};
Self::spawn_account_transaction_saving_task(
instance.clone(),
accounts_for_transaction_reciever,
);
Ok(instance)
}
fn spawn_account_transaction_saving_task(
instance: PostgresSession,
mut accounts_for_transaction_reciever: tokio::sync::mpsc::UnboundedReceiver<
Vec<AccountsForTransaction>,
>,
) {
tokio::spawn(async move {
while let Some(accounts_for_transaction) =
accounts_for_transaction_reciever.recv().await
{
let instant: Instant = Instant::now();
ACCOUNTS_SAVING_QUEUE.dec();
if let Err(e) = instance
.insert_accounts_for_transaction(accounts_for_transaction)
.await
{
error!("Error inserting accounts for transactions : {e:?}");
}
TIME_TO_STORE_TX_ACCOUNT.set(instant.elapsed().as_millis() as i64);
log::info!(
"Took {} ms to insert accounts for transaction",
instant.elapsed().as_millis()
);
}
error!("account transaction saving task ");
panic!("account transaction saving task");
});
pub fn get_new_temp_table(&self) -> String {
Pubkey::new_unique().to_string()
}
async fn spawn_connection<T>(
@ -209,9 +156,9 @@ impl PostgresSession {
Ok(())
}
pub async fn create_transaction_ids(&self, signatures: Vec<String>) -> anyhow::Result<()> {
pub async fn create_transaction_ids(&self, signatures: HashSet<String>) -> anyhow::Result<()> {
// create temp table
let temp_table = self.temp_table_tracker.get_new_temp_table();
let temp_table = self.get_new_temp_table();
self.client
.execute(
@ -274,7 +221,7 @@ impl PostgresSession {
accounts: HashSet<String>,
) -> anyhow::Result<()> {
// create temp table
let temp_table = self.temp_table_tracker.get_new_temp_table();
let temp_table = self.get_new_temp_table();
self.client
.execute(
@ -334,7 +281,7 @@ impl PostgresSession {
&self,
txs: &[TransactionInfo],
) -> anyhow::Result<()> {
let temp_table = self.temp_table_tracker.get_new_temp_table();
let temp_table = self.get_new_temp_table();
self.client
.execute(
@ -425,7 +372,7 @@ impl PostgresSession {
&self,
accounts_for_transaction: Vec<AccountsForTransaction>,
) -> anyhow::Result<()> {
let temp_table = self.temp_table_tracker.get_new_temp_table();
let temp_table = self.get_new_temp_table();
self.client
.execute(
format!(
@ -502,7 +449,7 @@ impl PostgresSession {
// merge data from temp table into accounts_map_transaction_latest
// note: query uses the array_dedup_append postgres function to deduplicate and limit the array size
// example: array_dedup_append('{8,3,2,1}', '{5,3}', 4) -> {2,1,5,3}
let temp_table_latest_agged = self.temp_table_tracker.get_new_temp_table();
let temp_table_latest_agged = self.get_new_temp_table();
let statement = format!(
r#"
CREATE TEMP TABLE {temp_table_name} AS
@ -560,7 +507,7 @@ impl PostgresSession {
transactions: &Vec<BlockTransactionInfo>,
slot: i64,
) -> anyhow::Result<()> {
let temp_table = self.temp_table_tracker.get_new_temp_table();
let temp_table = self.get_new_temp_table();
self.client
.execute(
format!(
@ -657,7 +604,7 @@ impl PostgresSession {
}
pub async fn save_account_usage_in_block(&self, block_info: &BlockInfo) -> anyhow::Result<()> {
let temp_table = self.temp_table_tracker.get_new_temp_table();
let temp_table = self.get_new_temp_table();
self.client
.execute(
format!(
@ -704,8 +651,8 @@ impl PostgresSession {
);
pin_mut!(writer);
const LIMIT: usize = 100;
let mut nb_read_accounts : usize = 0;
let mut nb_write_accounts : usize = 0;
let mut nb_read_accounts: usize = 0;
let mut nb_write_accounts: usize = 0;
for account_usage in block_info.heavily_locked_accounts.iter() {
if nb_read_accounts >= LIMIT && nb_write_accounts >= LIMIT {
break;
@ -842,7 +789,7 @@ impl PostgresSession {
let signatures = txs
.iter()
.map(|transaction| transaction.signature.clone())
.collect_vec();
.collect();
self.create_transaction_ids(signatures).await?;
// create account ids
let accounts = txs
@ -872,7 +819,12 @@ impl PostgresSession {
.collect_vec();
// insert accounts for transaction
ACCOUNTS_SAVING_QUEUE.inc();
let _ = self.accounts_for_transaction_sender.send(txs_accounts);
let instant: Instant = Instant::now();
ACCOUNTS_SAVING_QUEUE.dec();
if let Err(e) = self.insert_accounts_for_transaction(txs_accounts).await {
error!("Error inserting accounts for transactions : {e:?}");
}
TIME_TO_STORE_TX_ACCOUNT.set(instant.elapsed().as_millis() as i64);
Ok(())
}
@ -893,16 +845,18 @@ impl PostgresSession {
.transactions
.iter()
.map(|transaction| transaction.signature.clone())
.collect_vec();
.collect();
self.create_transaction_ids(signatures).await?;
TIME_TO_SAVE_TRANSACTION.set(int_sig.elapsed().as_millis() as i64);
// create account ids
let ins_acc = Instant::now();
let accounts = block_info
.heavily_locked_accounts
.iter()
.map(|acc| acc.key.clone())
.collect();
self.create_accounts_for_transaction(accounts).await?;
ACCOUNT_SAVE_TIME.set(ins_acc.elapsed().as_millis() as i64);
let txs_accounts = block_info
.transactions
@ -922,8 +876,11 @@ impl PostgresSession {
})
.collect_vec();
ACCOUNTS_SAVING_QUEUE.inc();
let _ = self.accounts_for_transaction_sender.send(txs_accounts);
let instant_acc_tx: Instant = Instant::now();
if let Err(e) = self.insert_accounts_for_transaction(txs_accounts).await {
error!("Error inserting accounts for transactions : {e:?}");
}
TIME_TO_STORE_TX_ACCOUNT.set(instant_acc_tx.elapsed().as_millis() as i64);
// insert transactions
let instant_save_tx = Instant::now();