redesign insert_transactions_for_block

This commit is contained in:
GroovieGermanikus 2024-01-31 12:06:40 +01:00
parent bd43f3717e
commit 52b8ea75c2
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
1 changed files with 12 additions and 28 deletions

View File

@ -527,11 +527,7 @@ impl PostgresSession {
transactions: &Vec<BlockTransactionInfo>,
slot: i64,
) -> anyhow::Result<()> {
let temp_table = self.get_new_temp_table();
self.client
.execute(
format!(
"CREATE TEMP TABLE {}(
let statement = "CREATE TEMP TABLE temp_new_txblock_data(
signature char(88),
processed_slot BIGINT,
is_successful BOOL,
@ -539,17 +535,11 @@ impl PostgresSession {
cu_consumed BIGINT,
prioritization_fees BIGINT,
supp_infos text
)",
temp_table
)
.as_str(),
&[],
)
.await?;
)";
self.client.execute(statement, &[]).await?;
let statement = format!(
r#"
COPY {}(
let statement = r#"
COPY temp_new_txblock_data(
signature,
processed_slot,
is_successful,
@ -558,11 +548,9 @@ impl PostgresSession {
prioritization_fees,
supp_infos
) FROM STDIN BINARY
"#,
temp_table
);
"#;
let started_at = Instant::now();
let sink: CopyInSink<bytes::Bytes> = self.copy_in(statement.as_str()).await?;
let sink: CopyInSink<bytes::Bytes> = self.copy_in(statement).await?;
let writer = BinaryCopyInWriter::new(
sink,
&[
@ -594,11 +582,10 @@ impl PostgresSession {
started_at.elapsed().as_millis()
);
let statement = format!(
r#"
let statement = r#"
INSERT INTO banking_stage_results_2.transaction_infos
(transaction_id, processed_slot, is_successful, cu_requested, cu_consumed, prioritization_fees, supp_infos)
SELECT
SELECT
( select transaction_id from banking_stage_results_2.transactions where signature = t.signature ),
t.processed_slot,
t.is_successful,
@ -606,20 +593,17 @@ impl PostgresSession {
t.cu_consumed,
t.prioritization_fees,
t.supp_infos
FROM {} AS t
FROM temp_new_txblock_data AS t
ON CONFLICT DO NOTHING
"#,
temp_table
);
"#;
let started_at = Instant::now();
let num_rows = self.client.execute(statement.as_str(), &[]).await?;
let num_rows = self.client.execute(statement, &[]).await?;
debug!(
"inserted {} transactions for block into transaction_infos table in {}ms",
num_rows,
started_at.elapsed().as_millis()
);
self.drop_temp_table(temp_table).await?;
Ok(())
}