clear pg session, redesign insert_accounts_for_transaction

GroovieGermanikus 2024-01-31 12:03:10 +01:00
parent 7a577a43fa
commit bd43f3717e
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
1 changed file with 50 additions and 68 deletions


@@ -170,6 +170,14 @@ impl PostgresSession {
Ok(client)
}
pub async fn clear_session(&self) {
self.client
.execute("DISCARD ALL", &[])
.await
.unwrap();
debug!("Clear postgres session");
}
pub async fn configure_work_mem(&self) {
self.client
.execute("SET work_mem TO '256MB'", &[])
@@ -409,34 +417,25 @@ impl PostgresSession {
) -> anyhow::Result<()> {
let instant = Instant::now();
let temp_table = self.get_new_temp_table();
self.client
.execute(
format!(
"CREATE TEMP TABLE {}(
account_key char(44),
signature char(88),
is_writable BOOL,
is_signer BOOL,
is_atl BOOL
);",
temp_table
)
.as_str(),
r#"CREATE TEMP TABLE temp_new_amt_data_raw (
account_key char(44),
signature char(88),
is_writable BOOL,
is_signer BOOL,
is_atl BOOL
)"#,
&[],
)
.await?;
let statement = format!(
r#"
COPY {}(
let statement =
r#"COPY temp_new_amt_data_raw(
account_key, signature, is_writable, is_signer, is_atl
) FROM STDIN BINARY
"#,
temp_table
);
) FROM STDIN BINARY"#;
let started_at = Instant::now();
let sink: CopyInSink<bytes::Bytes> = self.copy_in(statement.as_str()).await?;
let sink: CopyInSink<bytes::Bytes> = self.copy_in(statement).await?;
let writer = BinaryCopyInWriter::new(
sink,
&[Type::TEXT, Type::TEXT, Type::BOOL, Type::BOOL, Type::BOOL],
@@ -461,23 +460,33 @@ impl PostgresSession {
started_at.elapsed().as_millis()
);
// merge data from temp table into accounts_map_transaction
let statement = format!(
r#"
INSERT INTO banking_stage_results_2.accounts_map_transaction(acc_id, transaction_id, is_writable, is_signer, is_atl)
SELECT
let started_at = Instant::now();
let statement = r#"
CREATE TEMP TABLE temp_new_amt_data AS
SELECT
( select acc_id from banking_stage_results_2.accounts where account_key = new_amt_data.account_key ),
( select transaction_id from banking_stage_results_2.transactions where signature = new_amt_data.signature ),
new_amt_data.is_writable,
new_amt_data.is_signer,
new_amt_data.is_atl
FROM {} AS new_amt_data
ON CONFLICT DO NOTHING
"#,
temp_table
FROM temp_new_amt_data_raw AS new_amt_data
"#;
let rows = self.client.execute(statement, &[]).await?;
debug!(
"resolve {} account_keys+signatures into new_amt_data in {}ms",
rows,
started_at.elapsed().as_millis()
);
// merge data from temp table into accounts_map_transaction
let statement =
r#"
INSERT INTO banking_stage_results_2.accounts_map_transaction(acc_id, transaction_id, is_writable, is_signer, is_atl)
SELECT * FROM temp_new_amt_data
ON CONFLICT DO NOTHING
"#;
let started_at = Instant::now();
let rows = self.client.execute(statement.as_str(), &[]).await?;
let rows = self.client.execute(statement, &[]).await?;
debug!(
"inserted {} accounts into accounts_map_transaction in {}ms",
rows,
@@ -486,31 +495,20 @@ impl PostgresSession {
TIME_TO_STORE_TX_ACCOUNT_OLD.set(instant.elapsed().as_millis() as i64);
let instant = Instant::now();
// merge data from temp table into accounts_map_transaction_latest
// note: query uses the array_dedup_append postgres function to deduplicate and limit the array size
// example: array_dedup_append('{8,3,2,1}', '{5,3}', 4) -> {2,1,5,3}
let temp_table_latest_agged = self.get_new_temp_table();
let statement = format!(
r#"
CREATE TEMP TABLE {temp_table_name} AS
WITH amt_new AS (
SELECT
acc_id, array_agg(transactions.transaction_id) AS tx_agged
FROM {temp_table_newdata} AS newdata
inner join banking_stage_results_2.accounts on accounts.account_key=newdata.account_key
inner join banking_stage_results_2.transactions on transactions.signature=newdata.signature
GROUP BY acc_id
)
SELECT
INSERT INTO banking_stage_results_2.accounts_map_transaction_latest(acc_id, tx_ids)
SELECT
acc_id,
banking_stage_results_2.array_prepend_and_truncate(
(SELECT tx_ids FROM banking_stage_results_2.accounts_map_transaction_latest WHERE acc_id=amt_new.acc_id),
amt_new.tx_agged,
{limit}) AS tx_ids_agg
FROM amt_new
(SELECT tx_ids FROM banking_stage_results_2.accounts_map_transaction_latest prev_latest WHERE prev_latest.acc_id=newdata.acc_id),
array_agg(newdata.transaction_id),
{limit}
) AS tx_ids
FROM temp_new_amt_data as newdata
GROUP BY acc_id
ON CONFLICT (acc_id) DO UPDATE SET tx_ids = EXCLUDED.tx_ids
"#,
temp_table_newdata = temp_table,
temp_table_name = temp_table_latest_agged,
limit = LIMIT_LATEST_TXS_PER_ACCOUNT
);
let started_at = Instant::now();
@@ -520,25 +518,7 @@ impl PostgresSession {
num_rows,
started_at.elapsed().as_millis()
);
let statement = format!(
r#"
INSERT INTO banking_stage_results_2.accounts_map_transaction_latest(acc_id, tx_ids)
SELECT acc_id, tx_ids_agg FROM {temp_table_name}
ON CONFLICT (acc_id) DO UPDATE SET tx_ids = EXCLUDED.tx_ids
"#,
temp_table_name = temp_table_latest_agged
);
let started_at = Instant::now();
let num_rows = self.client.execute(statement.as_str(), &[]).await?;
debug!(
"upserted {} merged transaction arrays into accounts_map_transaction_latest in {}ms",
num_rows,
started_at.elapsed().as_millis()
);
TIME_TO_STORE_TX_ACCOUNT_NEW.set(instant.elapsed().as_millis() as i64);
self.drop_temp_table(temp_table_latest_agged).await?;
self.drop_temp_table(temp_table).await?;
Ok(())
}
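
Taken together, the redesigned insert_accounts_for_transaction now works against two fixed-name temp tables (temp_new_amt_data_raw and temp_new_amt_data) instead of per-call tables from get_new_temp_table(), resolves account keys and signatures to ids exactly once, and feeds both accounts_map_transaction and accounts_map_transaction_latest from the resolved rows; the explicit drop_temp_table calls are gone, presumably because clear_session's DISCARD ALL removes the temp tables between batches anyway. A consolidated sketch of the new SQL flow, condensed from the hunks above (the binary COPY step is omitted; this is an illustration, not a drop-in replacement for the Rust code):

use tokio_postgres::Client;

// Condensed illustration of the redesigned flow. Relies on DISCARD ALL
// (clear_session) having removed the fixed-name temp tables beforehand.
async fn insert_accounts_sketch(client: &Client, limit: usize) -> anyhow::Result<()> {
    client
        .batch_execute(&format!(
            r#"
            -- step 1 (not shown): rows arrive in temp_new_amt_data_raw
            -- via COPY ... FROM STDIN BINARY

            -- step 2: resolve natural keys to ids exactly once
            CREATE TEMP TABLE temp_new_amt_data AS
            SELECT
                (SELECT acc_id FROM banking_stage_results_2.accounts
                  WHERE account_key = raw.account_key) AS acc_id,
                (SELECT transaction_id FROM banking_stage_results_2.transactions
                  WHERE signature = raw.signature) AS transaction_id,
                raw.is_writable, raw.is_signer, raw.is_atl
            FROM temp_new_amt_data_raw AS raw;

            -- step 3: per-transaction account mapping
            INSERT INTO banking_stage_results_2.accounts_map_transaction
                (acc_id, transaction_id, is_writable, is_signer, is_atl)
            SELECT * FROM temp_new_amt_data
            ON CONFLICT DO NOTHING;

            -- step 4: bounded "latest transactions per account" list
            INSERT INTO banking_stage_results_2.accounts_map_transaction_latest(acc_id, tx_ids)
            SELECT
                acc_id,
                banking_stage_results_2.array_prepend_and_truncate(
                    (SELECT tx_ids FROM banking_stage_results_2.accounts_map_transaction_latest prev
                      WHERE prev.acc_id = newdata.acc_id),
                    array_agg(newdata.transaction_id),
                    {limit}) AS tx_ids
            FROM temp_new_amt_data AS newdata
            GROUP BY acc_id
            ON CONFLICT (acc_id) DO UPDATE SET tx_ids = EXCLUDED.tx_ids;
            "#,
            limit = limit
        ))
        .await?;
    Ok(())
}
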
@@ -1279,6 +1259,7 @@ impl Postgres {
Some(block) => {
let slot = block.slot;
let instant = Instant::now();
session.clear_session().await;
match session.save_block(block).await {
Ok(_) => {
info!(
@@ -1317,6 +1298,7 @@ impl Postgres {
}
if !txs_to_store.is_empty() {
session.clear_session().await;
debug!("saving transaction infos for {}", txs_to_store.len());
let batches = txs_to_store
.iter()