log metrics for postgres write statements (#39)

* add debug logs for query execution

* use tokio instant

* words
This commit is contained in:
Groovie | Mango 2024-01-10 14:13:19 +01:00 committed by GitHub
parent 527fe545c1
commit f0727de608
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 108 additions and 19 deletions

View File

@ -1,4 +1,3 @@
use std::time::Instant;
use std::{
sync::{atomic::AtomicU64, Arc},
time::Duration,
@ -14,6 +13,7 @@ use native_tls::{Certificate, Identity, TlsConnector};
use postgres_native_tls::MakeTlsConnector;
use serde::Serialize;
use solana_sdk::transaction::TransactionError;
use tokio::time::Instant;
use tokio_postgres::{
binary_copy::BinaryCopyInWriter,
config::SslMode,
@ -154,13 +154,19 @@ impl PostgresSession {
"#,
temp_table
);
let started_at = Instant::now();
let sink: CopyInSink<bytes::Bytes> = self.copy_in(statement.as_str()).await?;
let writer = BinaryCopyInWriter::new(sink, &[Type::TEXT]);
pin_mut!(writer);
for signature in signatures {
writer.as_mut().write(&[&signature]).await?;
}
writer.finish().await?;
let num_rows = writer.finish().await?;
debug!(
"inserted {} signatures into temp table in {}ms",
num_rows,
started_at.elapsed().as_millis()
);
let statement = format!(
r#"
@ -169,7 +175,14 @@ impl PostgresSession {
"#,
temp_table
);
self.client.execute(statement.as_str(), &[]).await?;
let started_at = Instant::now();
let num_rows = self.client.execute(statement.as_str(), &[]).await?;
debug!(
"inserted {} signatures in transactions table in {}ms",
num_rows,
started_at.elapsed().as_millis()
);
self.drop_temp_table(temp_table).await?;
Ok(())
@ -203,13 +216,19 @@ impl PostgresSession {
"#,
temp_table
);
let started_at = Instant::now();
let sink: CopyInSink<bytes::Bytes> = self.copy_in(statement.as_str()).await?;
let writer = BinaryCopyInWriter::new(sink, &[Type::TEXT]);
pin_mut!(writer);
for account in accounts {
writer.as_mut().write(&[&account]).await?;
}
writer.finish().await?;
let num_rows = writer.finish().await?;
debug!(
"inserted {} account keys into temp table in {}ms",
num_rows,
started_at.elapsed().as_millis()
);
let statement = format!(
r#"
@ -218,7 +237,14 @@ impl PostgresSession {
"#,
temp_table
);
let started_at = Instant::now();
self.client.execute(statement.as_str(), &[]).await?;
debug!(
"inserted {} account keys into accounts table in {}ms",
num_rows,
started_at.elapsed().as_millis()
);
self.drop_temp_table(temp_table).await?;
Ok(())
}
@ -254,6 +280,7 @@ impl PostgresSession {
"#,
temp_table
);
let started_at = Instant::now();
let sink: CopyInSink<bytes::Bytes> = self.copy_in(statement.as_str()).await?;
let writer = BinaryCopyInWriter::new(
sink,
@ -282,7 +309,12 @@ impl PostgresSession {
writer.as_mut().write(&args).await?;
}
}
writer.finish().await?;
let num_rows = writer.finish().await?;
debug!(
"inserted {} txs for tx_slot into temp table in {}ms",
num_rows,
started_at.elapsed().as_millis()
);
let statement = format!(
r#"
@ -295,7 +327,15 @@ impl PostgresSession {
"#,
temp_table
);
let started_at = Instant::now();
self.client.execute(statement.as_str(), &[]).await?;
debug!(
"inserted {} txs into transaction_slot table in {}ms",
num_rows,
started_at.elapsed().as_millis()
);
self.drop_temp_table(temp_table).await?;
Ok(())
}
@ -331,7 +371,7 @@ impl PostgresSession {
"#,
temp_table
);
let started_at = Instant::now();
let sink: CopyInSink<bytes::Bytes> = self.copy_in(statement.as_str()).await?;
let writer = BinaryCopyInWriter::new(
sink,
@ -349,7 +389,12 @@ impl PostgresSession {
writer.as_mut().write(&args).await?;
}
}
writer.finish().await?;
let num_rows = writer.finish().await?;
debug!(
"inserted {} accounts for transaction into temp table in {}ms",
num_rows,
started_at.elapsed().as_millis()
);
// merge data from temp table into accounts_map_transaction
let statement = format!(
@ -369,8 +414,13 @@ impl PostgresSession {
"#,
temp_table
);
let started_at = Instant::now();
let rows = self.client.execute(statement.as_str(), &[]).await?;
debug!("inserted into accounts_map_transaction: {}", rows);
debug!(
"inserted {} accounts into accounts_map_transaction in {}ms",
rows,
started_at.elapsed().as_millis()
);
// merge data from temp table into accounts_map_transaction_latest
// note: query uses the array_dedup_append postgres function to deduplicate and limit the array size
@ -399,8 +449,13 @@ impl PostgresSession {
temp_table_name = temp_table_latest_agged,
limit = LIMIT_LATEST_TXS_PER_ACCOUNT
);
let rows = self.client.execute(statement.as_str(), &[]).await?;
info!("inserted into {}: {}", temp_table_latest_agged, rows);
let started_at = Instant::now();
let num_rows = self.client.execute(statement.as_str(), &[]).await?;
debug!(
"merged new transactions into accounts_map_transaction_latest for {} accounts in {}ms",
num_rows,
started_at.elapsed().as_millis()
);
let statement = format!(
r#"
@ -410,8 +465,13 @@ impl PostgresSession {
"#,
temp_table_name = temp_table_latest_agged
);
let rows = self.client.execute(statement.as_str(), &[]).await?;
info!("upserted in accounts_map_transaction_latest: {}", rows);
let started_at = Instant::now();
let num_rows = self.client.execute(statement.as_str(), &[]).await?;
debug!(
"upserted {} merged transaction arrays into accounts_map_transaction_latest in {}ms",
num_rows,
started_at.elapsed().as_millis()
);
self.drop_temp_table(temp_table_latest_agged).await?;
self.drop_temp_table(temp_table).await?;
@ -457,7 +517,7 @@ impl PostgresSession {
"#,
temp_table
);
let started_at = Instant::now();
let sink: CopyInSink<bytes::Bytes> = self.copy_in(statement.as_str()).await?;
let writer = BinaryCopyInWriter::new(
sink,
@ -483,7 +543,12 @@ impl PostgresSession {
args.push(&transaction.supp_infos);
writer.as_mut().write(&args).await?;
}
writer.finish().await?;
let num_rows = writer.finish().await?;
debug!(
"inserted {} transactions for block into temp table in {}ms",
num_rows,
started_at.elapsed().as_millis()
);
let statement = format!(
r#"
@ -505,7 +570,13 @@ impl PostgresSession {
"#,
temp_table
);
self.client.execute(statement.as_str(), &[]).await?;
let started_at = Instant::now();
let num_rows = self.client.execute(statement.as_str(), &[]).await?;
debug!(
"inserted {} transactions for block into transaction_infos table in {}ms",
num_rows,
started_at.elapsed().as_millis()
);
self.drop_temp_table(temp_table).await?;
Ok(())
@ -544,7 +615,7 @@ impl PostgresSession {
"#,
temp_table
);
let started_at = Instant::now();
let sink: CopyInSink<bytes::Bytes> = self.copy_in(statement.as_str()).await?;
let writer = BinaryCopyInWriter::new(
sink,
@ -570,7 +641,12 @@ impl PostgresSession {
args.push(&pf_json);
writer.as_mut().write(&args).await?;
}
writer.finish().await?;
let num_rows = writer.finish().await?;
debug!(
"inserted {} heavily_locked_accounts into temp table in {}ms",
num_rows,
started_at.elapsed().as_millis()
);
let statement = format!(
r#"
@ -608,7 +684,13 @@ impl PostgresSession {
"#,
temp_table
);
self.client.execute(statement.as_str(), &[]).await?;
let started_at = Instant::now();
let num_rows = self.client.execute(statement.as_str(), &[]).await?;
debug!(
"inserted {} heavily_locked_accounts into accounts_map_blocks table in {}ms",
num_rows,
started_at.elapsed().as_millis()
);
self.drop_temp_table(temp_table).await?;
Ok(())
@ -627,7 +709,8 @@ impl PostgresSession {
supp_infos
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8);
"#;
self.client
let started_at = Instant::now();
let num_rows = self.client
.execute(
statement,
&[
@ -642,6 +725,12 @@ impl PostgresSession {
],
)
.await?;
debug!(
"inserted {} block info into blocks table in {}ms",
num_rows,
started_at.elapsed().as_millis()
);
Ok(())
}