Fix minor bug

This commit is contained in:
godmodegalactus 2024-01-29 15:09:03 +01:00
parent f0e37643c5
commit 343bb70428
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
2 changed files with 41 additions and 12 deletions

View File

@ -342,8 +342,7 @@ async fn main() -> anyhow::Result<()> {
let grpc_block_addr = args.grpc_address_to_fetch_blocks;
let map_of_infos = Arc::new(DashMap::<(String, u64), TransactionInfo>::new());
let postgres1 = postgres::Postgres::new_with_workmem().await;
let postgres2 = postgres::Postgres::new_with_workmem().await;
let postgres1 = postgres::Postgres::new_with_workmem(0).await;
let slot = Arc::new(AtomicU64::new(0));
let no_block_subscription = grpc_block_addr.is_none();
let alts = args.alts;
@ -359,9 +358,14 @@ async fn main() -> anyhow::Result<()> {
.map(|x| Pubkey::from_str(&x).unwrap())
.collect_vec();
let block_senders = (0..4).map(|x| postgres1.spawn_block_saver()).collect_vec();
let mut block_senders = vec![];
for i in 1..=4
{
let s = postgres::Postgres::new_with_workmem(i).await.spawn_block_saver();
block_senders.push(s);
}
postgres2.spawn_transaction_infos_saver(map_of_infos.clone(), slot.clone());
postgres1.spawn_transaction_infos_saver(map_of_infos.clone(), slot.clone());
let jhs = args
.banking_grpc_addresses

View File

@ -14,7 +14,7 @@ use native_tls::{Certificate, Identity, TlsConnector};
use postgres_native_tls::MakeTlsConnector;
use prometheus::{opts, register_int_gauge, IntGauge};
use serde::Serialize;
use solana_sdk::{pubkey::Pubkey, transaction::TransactionError};
use solana_sdk::transaction::TransactionError;
use tokio::sync::mpsc::error::SendTimeoutError;
use tokio::sync::mpsc::Sender;
use tokio::time::Instant;
@ -60,13 +60,38 @@ lazy_static::lazy_static! {
register_int_gauge!(opts!("banking_stage_sidecar_account_save_time", "Account save time")).unwrap();
}
#[derive(Clone)]
pub struct TempTableTracker {
nb : usize,
count: Arc<AtomicU64>,
}
impl TempTableTracker {
pub fn new(nb:usize) -> Self {
Self {
nb,
count: Arc::new(AtomicU64::new(1)),
}
}
pub fn get_new_temp_table(&self) -> String {
format!(
"temp_table_{}_{}",
self.nb,
self.count
.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
)
}
}
#[derive(Clone)]
pub struct PostgresSession {
client: Arc<Client>,
temp_table_tracker : TempTableTracker,
}
impl PostgresSession {
pub async fn new() -> anyhow::Result<Self> {
pub async fn new(nb : usize) -> anyhow::Result<Self> {
let pg_config = std::env::var("PG_CONFIG").context("env PG_CONFIG not found")?;
let pg_config = pg_config.parse::<tokio_postgres::Config>()?;
@ -98,14 +123,14 @@ impl PostgresSession {
Self::spawn_connection(pg_config, MakeTlsConnector::new(connector)).await?
};
let instance = Self {
Ok(Self {
client: Arc::new(client),
};
Ok(instance)
temp_table_tracker: TempTableTracker::new(nb)
})
}
pub fn get_new_temp_table(&self) -> String {
Pubkey::new_unique().to_string()
self.temp_table_tracker.get_new_temp_table()
}
async fn spawn_connection<T>(
@ -1213,8 +1238,8 @@ pub struct Postgres {
}
impl Postgres {
pub async fn new_with_workmem() -> Self {
let session = PostgresSession::new().await.unwrap();
pub async fn new_with_workmem(nb: usize) -> Self {
let session = PostgresSession::new(nb).await.unwrap();
let session = Arc::new(session);
session.configure_work_mem().await;
Self { session }