Creating seperate task to save transaction accounts

This commit is contained in:
godmodegalactus 2024-01-22 17:48:34 +01:00
parent abbf805726
commit 64d607155c
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
2 changed files with 82 additions and 27 deletions

View File

@ -313,7 +313,6 @@ async fn main() -> anyhow::Result<()> {
.map(|x| Pubkey::from_str(&x).unwrap())
.collect_vec();
let block_sender = postgres1.spawn_block_saver();
postgres2.spawn_transaction_infos_saver(map_of_infos.clone(), slot.clone());

View File

@ -52,9 +52,12 @@ impl TempTableTracker {
}
}
#[derive(Clone)]
pub struct PostgresSession {
client: Client,
temp_table_tracker: TempTableTracker,
client: Arc<Client>,
temp_table_tracker: Arc<TempTableTracker>,
accounts_for_transaction_sender:
tokio::sync::mpsc::UnboundedSender<Vec<AccountsForTransaction>>,
}
impl PostgresSession {
@ -90,10 +93,46 @@ impl PostgresSession {
Self::spawn_connection(pg_config, MakeTlsConnector::new(connector)).await?
};
Ok(Self {
client,
temp_table_tracker: TempTableTracker::new(),
})
let (accounts_for_transaction_sender, accounts_for_transaction_reciever) =
tokio::sync::mpsc::unbounded_channel();
let instance = Self {
client: Arc::new(client),
temp_table_tracker: Arc::new(TempTableTracker::new()),
accounts_for_transaction_sender,
};
Self::spawn_account_transaction_saving_task(
instance.clone(),
accounts_for_transaction_reciever,
);
Ok(instance)
}
fn spawn_account_transaction_saving_task(
instance: PostgresSession,
mut accounts_for_transaction_reciever: tokio::sync::mpsc::UnboundedReceiver<
Vec<AccountsForTransaction>,
>,
) {
tokio::spawn(async move {
while let Some(accounts_for_transaction) =
accounts_for_transaction_reciever.recv().await
{
let instant = Instant::now();
if let Err(e) = instance
.insert_accounts_for_transaction(accounts_for_transaction)
.await
{
error!("Error inserting accounts for transactions : {e:?}");
}
log::info!(
"Took {} ms to insert accounts for transaction",
instant.elapsed().as_millis()
);
}
error!("account transaction saving task ");
panic!("account transaction saving task");
});
}
async fn spawn_connection<T>(
@ -200,7 +239,6 @@ impl PostgresSession {
started_at.elapsed().as_millis()
);
self.drop_temp_table(temp_table).await?;
Ok(())
}
@ -352,7 +390,6 @@ impl PostgresSession {
started_at.elapsed().as_millis()
);
self.drop_temp_table(temp_table).await?;
Ok(())
}
@ -722,7 +759,8 @@ impl PostgresSession {
ON CONFLICT DO NOTHING
"#;
let started_at = Instant::now();
let num_rows = self.client
let num_rows = self
.client
.execute(
statement,
&[
@ -790,7 +828,7 @@ impl PostgresSession {
})
.collect_vec();
// insert accounts for transaction
self.insert_accounts_for_transaction(txs_accounts).await?;
let _ = self.accounts_for_transaction_sender.send(txs_accounts);
Ok(())
}
@ -836,7 +874,8 @@ impl PostgresSession {
.collect(),
})
.collect_vec();
self.insert_accounts_for_transaction(txs_accounts).await?;
let _ = self.accounts_for_transaction_sender.send(txs_accounts);
// save transactions in block
self.insert_transactions_for_block(&block_info.transactions, block_info.slot)
@ -1169,13 +1208,12 @@ impl Postgres {
let session = PostgresSession::new().await.unwrap();
let session = Arc::new(session);
session.configure_work_mem().await;
Self {
session
}
Self { session }
}
pub fn spawn_block_saver(&self) -> Sender<BlockInfo> {
let (block_sender, mut block_receiver) = tokio::sync::mpsc::channel::<BlockInfo>(BLOCK_WRITE_BUFFER_SIZE);
let (block_sender, mut block_receiver) =
tokio::sync::mpsc::channel::<BlockInfo>(BLOCK_WRITE_BUFFER_SIZE);
let session = self.session.clone();
tokio::spawn(async move {
@ -1188,10 +1226,14 @@ impl Postgres {
Some(block) => {
let slot = block.slot;
info!("saving block {} ..", slot);
let instant = Instant::now();
match session.save_block(block).await {
Ok(_) => {
info!("saving block {} done", slot);
info!(
"saving block {} took {} ms",
slot,
instant.elapsed().as_millis()
);
}
Err(err) => {
error!("saving block failed {}", err);
@ -1202,10 +1244,9 @@ impl Postgres {
}
});
return block_sender;
block_sender
}
pub fn spawn_transaction_infos_saver(
&self,
map_of_transaction: Arc<DashMap<(String, u64), TransactionInfo>>,
@ -1237,7 +1278,6 @@ impl Postgres {
}
});
}
}
#[derive(Serialize, Clone)]
@ -1258,18 +1298,34 @@ pub struct AccountsForTransaction {
pub accounts: Vec<AccountUsed>,
}
pub async fn send_block_info_to_buffer(block_sender_postgres: Sender<BlockInfo>, block_info: BlockInfo) -> anyhow::Result<()> {
debug!("block buffer capacity: {}", block_sender_postgres.capacity());
pub async fn send_block_info_to_buffer(
block_sender_postgres: Sender<BlockInfo>,
block_info: BlockInfo,
) -> anyhow::Result<()> {
debug!(
"block buffer capacity: {}",
block_sender_postgres.capacity()
);
const WARNING_THRESHOLD: Duration = Duration::from_millis(3000);
let started_at = Instant::now();
if let Err(SendTimeoutError::Timeout(block)) = block_sender_postgres.send_timeout(block_info, WARNING_THRESHOLD).await {
if let Err(SendTimeoutError::Timeout(block)) = block_sender_postgres
.send_timeout(block_info, WARNING_THRESHOLD)
.await
{
let slot = block.slot;
warn!("Block {} was not buffered for {:.3}s - continue waiting", slot, WARNING_THRESHOLD.as_secs_f32());
warn!(
"Block {} was not buffered for {:.3}s - continue waiting",
slot,
WARNING_THRESHOLD.as_secs_f32()
);
block_sender_postgres.send(block).await?;
info!("Block {} was finally buffered after {:.3}s", slot, started_at.elapsed().as_secs_f32());
info!(
"Block {} was finally buffered after {:.3}s",
slot,
started_at.elapsed().as_secs_f32()
);
}
Ok(())