multiple write dbsessions (#40)

* postgres connection handling:

- use dedicated connections
- set work_mem

* serialize writes

* brush send_block_info_to_buffer logging

* remove sleep

* brush log

* cleanu
This commit is contained in:
Groovie | Mango 2024-01-10 16:39:05 +01:00 committed by GitHub
parent f0727de608
commit f5f9c99375
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 84 additions and 23 deletions

View File

@ -20,6 +20,7 @@ use dashmap::DashMap;
use futures::StreamExt;
use log::{debug, error, info};
use prometheus::{opts, register_int_counter, register_int_gauge, IntCounter, IntGauge};
use tokio::sync::mpsc::Sender;
use transaction_info::TransactionInfo;
mod alt_store;
@ -146,7 +147,7 @@ async fn start_tracking_blocks(
rpc_client: Arc<RpcClient>,
grpc_block_addr: String,
grpc_x_token: Option<String>,
postgres: postgres::Postgres,
block_sender_postgres: Sender<BlockInfo>,
slot: Arc<AtomicU64>,
alts_list: Vec<Pubkey>,
) {
@ -248,7 +249,7 @@ async fn start_tracking_blocks(
BLOCK_TXS.set(block.transactions.len() as i64);
BANKING_STAGE_BLOCKS_COUNTER.inc();
BANKING_STAGE_BLOCKS_TASK.inc();
let postgres = postgres.clone();
let block_sender_postgres = block_sender_postgres.clone();
let slot = slot.clone();
let atl_store = atl_store.clone();
tokio::spawn(async move {
@ -258,7 +259,7 @@ async fn start_tracking_blocks(
TXERROR_COUNT.add(
block_info.processed_transactions - block_info.successful_transactions,
);
if let Err(e) = postgres.save_block_info(block_info).await {
if let Err(e) = postgres::send_block_info_to_buffer(block_sender_postgres, block_info).await {
panic!("Error saving block {}", e);
}
slot.store(block.slot, std::sync::atomic::Ordering::Relaxed);
@ -295,7 +296,8 @@ async fn main() -> anyhow::Result<()> {
let grpc_block_addr = args.grpc_address_to_fetch_blocks;
let map_of_infos = Arc::new(DashMap::<(String, u64), TransactionInfo>::new());
let postgres = postgres::Postgres::new().await;
let postgres1 = postgres::Postgres::new_with_workmem().await;
let postgres2 = postgres::Postgres::new_with_workmem().await;
let slot = Arc::new(AtomicU64::new(0));
let no_block_subscription = grpc_block_addr.is_none();
let alts = args.alts;
@ -311,7 +313,11 @@ async fn main() -> anyhow::Result<()> {
.map(|x| Pubkey::from_str(&x).unwrap())
.collect_vec();
postgres.spawn_transaction_infos_saver(map_of_infos.clone(), slot.clone());
let block_sender = postgres1.spawn_block_saver();
postgres2.spawn_transaction_infos_saver(map_of_infos.clone(), slot.clone());
let jhs = args
.banking_grpc_addresses
.iter()
@ -335,7 +341,7 @@ async fn main() -> anyhow::Result<()> {
rpc_client,
gprc_block_addr,
args.grpc_x_token,
postgres,
block_sender,
slot,
alts_list,
)

View File

@ -13,6 +13,8 @@ use native_tls::{Certificate, Identity, TlsConnector};
use postgres_native_tls::MakeTlsConnector;
use serde::Serialize;
use solana_sdk::transaction::TransactionError;
use tokio::sync::mpsc::error::SendTimeoutError;
use tokio::sync::mpsc::Sender;
use tokio::time::Instant;
use tokio_postgres::{
binary_copy::BinaryCopyInWriter,
@ -27,6 +29,7 @@ use crate::{
transaction_info::TransactionInfo,
};
const BLOCK_WRITE_BUFFER_SIZE: usize = 5;
const LIMIT_LATEST_TXS_PER_ACCOUNT: i64 = 1000;
pub struct TempTableTracker {
@ -120,6 +123,20 @@ impl PostgresSession {
Ok(client)
}
pub async fn configure_work_mem(&self) {
self.client
.execute("SET work_mem TO '256MB'", &[])
.await
.unwrap();
let work_mem: String = self
.client
.query_one("show work_mem", &[])
.await
.unwrap()
.get("work_mem");
info!("Configured work_mem={}", work_mem);
}
pub async fn drop_temp_table(&self, table: String) -> anyhow::Result<()> {
self.client
.execute(format!("drop table if exists {};", table).as_str(), &[])
@ -829,7 +846,7 @@ impl PostgresSession {
// save account usage in blocks
self.save_account_usage_in_block(&block_info).await?;
self.save_block_info(&block_info).await?;
info!("block saved");
Ok(())
}
}
@ -851,17 +868,7 @@ impl PostgresSession {
slots_to_keep
);
self.client
.execute("SET work_mem TO '256MB'", &[])
.await
.unwrap();
let work_mem: String = self
.client
.query_one("show work_mem", &[])
.await
.unwrap()
.get("work_mem");
info!("Configured work_mem={}", work_mem);
self.configure_work_mem().await;
{
info!("Rows before cleanup:");
@ -1159,13 +1166,47 @@ pub struct Postgres {
}
impl Postgres {
pub async fn new() -> Self {
pub async fn new_with_workmem() -> Self {
let session = PostgresSession::new().await.unwrap();
let session = Arc::new(session);
session.configure_work_mem().await;
Self {
session: Arc::new(session),
session
}
}
pub fn spawn_block_saver(&self) -> Sender<BlockInfo> {
let (block_sender, mut block_receiver) = tokio::sync::mpsc::channel::<BlockInfo>(BLOCK_WRITE_BUFFER_SIZE);
let session = self.session.clone();
tokio::spawn(async move {
loop {
match block_receiver.recv().await {
None => {
warn!("block_receiver closed - stopping thread");
return;
}
Some(block) => {
let slot = block.slot;
info!("saving block {} ..", slot);
match session.save_block(block).await {
Ok(_) => {
info!("saving block {} done", slot);
}
Err(err) => {
error!("saving block failed {}", err);
}
}
}
};
}
});
return block_sender;
}
pub fn spawn_transaction_infos_saver(
&self,
map_of_transaction: Arc<DashMap<(String, u64), TransactionInfo>>,
@ -1198,9 +1239,6 @@ impl Postgres {
});
}
pub async fn save_block_info(&self, block: BlockInfo) -> anyhow::Result<()> {
self.session.save_block(block).await
}
}
#[derive(Serialize, Clone)]
@ -1220,3 +1258,20 @@ pub struct AccountsForTransaction {
pub signature: String,
pub accounts: Vec<AccountUsed>,
}
pub async fn send_block_info_to_buffer(block_sender_postgres: Sender<BlockInfo>, block_info: BlockInfo) -> anyhow::Result<()> {
debug!("block buffer capacity: {}", block_sender_postgres.capacity());
const WARNING_THRESHOLD: Duration = Duration::from_millis(3000);
let started_at = Instant::now();
if let Err(SendTimeoutError::Timeout(block)) = block_sender_postgres.send_timeout(block_info, WARNING_THRESHOLD).await {
let slot = block.slot;
warn!("Block {} was not buffered for {:.3}s - continue waiting", slot, WARNING_THRESHOLD.as_secs_f32());
block_sender_postgres.send(block).await?;
info!("Block {} was finally buffered after {:.3}s", slot, started_at.elapsed().as_secs_f32());
}
Ok(())
}