Revert "clear postgres session and use hardcoded temptable names (#52)"

This reverts commit 7afa8fc45f.
This commit is contained in:
GroovieGermanikus 2024-02-01 09:43:34 +01:00
parent 7afa8fc45f
commit 53128eeca4
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
1 changed files with 178 additions and 104 deletions

View File

@ -170,22 +170,6 @@ impl PostgresSession {
Ok(client)
}
pub async fn clear_session(&self) {
// see https://www.postgresql.org/docs/current/sql-discard.html
// CLOSE ALL -> drop potental cursors
// RESET ALL -> we do not want (would reset work_mem)
// DEALLOCATE -> would drop prepared statements which we do not use ATM
// DISCARD PLANS -> we want to keep the plans
// DISCARD SEQUENCES -> we want to keep the sequences
self.client
.batch_execute(r#"
DISCARD TEMP;
CLOSE ALL;"#)
.await
.unwrap();
debug!("Clear postgres session");
}
pub async fn configure_work_mem(&self) {
self.client
.execute("SET work_mem TO '256MB'", &[])
@ -208,17 +192,32 @@ impl PostgresSession {
}
pub async fn create_transaction_ids(&self, signatures: HashSet<String>) -> anyhow::Result<()> {
let statement = r#"
CREATE TEMP TABLE temp_new_transactions_raw(
signature char(88)
)"#;
self.client.execute(statement, &[]).await?;
// create temp table
let temp_table = self.get_new_temp_table();
let statement = r#"
COPY temp_new_transactions_raw(
self.client
.execute(
format!(
r#"
CREATE TEMP TABLE {}(
signature char(88)
);
"#,
temp_table
)
.as_str(),
&[],
)
.await?;
let statement = format!(
r#"
COPY {}(
signature
) FROM STDIN BINARY
"#.to_string();
"#,
temp_table
);
let started_at = Instant::now();
let sink: CopyInSink<bytes::Bytes> = self.copy_in(statement.as_str()).await?;
let writer = BinaryCopyInWriter::new(sink, &[Type::TEXT]);
@ -233,18 +232,22 @@ impl PostgresSession {
started_at.elapsed().as_millis()
);
let statement = r#"
INSERT INTO banking_stage_results_2.transactions(signature) SELECT signature FROM temp_new_transactions_raw
ON CONFLICT DO NOTHING
"#;
let statement = format!(
r#"
INSERT INTO banking_stage_results_2.transactions(signature) SELECT signature from {}
ON CONFLICT DO NOTHING
"#,
temp_table
);
let started_at = Instant::now();
let num_rows = self.client.execute(statement, &[]).await?;
let num_rows = self.client.execute(statement.as_str(), &[]).await?;
debug!(
"inserted {} signatures in transactions table in {}ms",
num_rows,
started_at.elapsed().as_millis()
);
self.drop_temp_table(temp_table).await?;
Ok(())
}
@ -252,16 +255,30 @@ impl PostgresSession {
&self,
accounts: HashSet<String>,
) -> anyhow::Result<()> {
let statement = "CREATE TEMP TABLE temp_new_accounts_raw(
key TEXT
)";
self.client.execute(statement, &[]).await?;
// create temp table
let temp_table = self.get_new_temp_table();
let statement = r#"
COPY temp_new_accounts_raw(
self.client
.execute(
format!(
"CREATE TEMP TABLE {}(
key TEXT
);",
temp_table
)
.as_str(),
&[],
)
.await?;
let statement = format!(
r#"
COPY {}(
key
) FROM STDIN BINARY
"#.to_string();
"#,
temp_table
);
let started_at = Instant::now();
let sink: CopyInSink<bytes::Bytes> = self.copy_in(statement.as_str()).await?;
let writer = BinaryCopyInWriter::new(sink, &[Type::TEXT]);
@ -276,18 +293,22 @@ impl PostgresSession {
started_at.elapsed().as_millis()
);
let statement = r#"
INSERT INTO banking_stage_results_2.accounts(account_key) SELECT key FROM temp_new_accounts_raw
ON CONFLICT DO NOTHING
"#;
let statement = format!(
r#"
INSERT INTO banking_stage_results_2.accounts(account_key) SELECT key from {}
ON CONFLICT DO NOTHING
"#,
temp_table
);
let started_at = Instant::now();
self.client.execute(statement, &[]).await?;
self.client.execute(statement.as_str(), &[]).await?;
debug!(
"inserted {} account keys into accounts table in {}ms",
num_rows,
started_at.elapsed().as_millis()
);
self.drop_temp_table(temp_table).await?;
Ok(())
}
@ -388,25 +409,34 @@ impl PostgresSession {
) -> anyhow::Result<()> {
let instant = Instant::now();
let temp_table = self.get_new_temp_table();
self.client
.execute(
r#"CREATE TEMP TABLE temp_new_amt_data_raw (
account_key char(44),
signature char(88),
is_writable BOOL,
is_signer BOOL,
is_atl BOOL
)"#,
format!(
"CREATE TEMP TABLE {}(
account_key char(44),
signature char(88),
is_writable BOOL,
is_signer BOOL,
is_atl BOOL
);",
temp_table
)
.as_str(),
&[],
)
.await?;
let statement =
r#"COPY temp_new_amt_data_raw(
let statement = format!(
r#"
COPY {}(
account_key, signature, is_writable, is_signer, is_atl
) FROM STDIN BINARY"#;
) FROM STDIN BINARY
"#,
temp_table
);
let started_at = Instant::now();
let sink: CopyInSink<bytes::Bytes> = self.copy_in(statement).await?;
let sink: CopyInSink<bytes::Bytes> = self.copy_in(statement.as_str()).await?;
let writer = BinaryCopyInWriter::new(
sink,
&[Type::TEXT, Type::TEXT, Type::BOOL, Type::BOOL, Type::BOOL],
@ -431,33 +461,23 @@ impl PostgresSession {
started_at.elapsed().as_millis()
);
let started_at = Instant::now();
let statement = r#"
CREATE TEMP TABLE temp_new_amt_data AS
SELECT
// merge data from temp table into accounts_map_transaction
let statement = format!(
r#"
INSERT INTO banking_stage_results_2.accounts_map_transaction(acc_id, transaction_id, is_writable, is_signer, is_atl)
SELECT
( select acc_id from banking_stage_results_2.accounts where account_key = new_amt_data.account_key ),
( select transaction_id from banking_stage_results_2.transactions where signature = new_amt_data.signature ),
new_amt_data.is_writable,
new_amt_data.is_signer,
new_amt_data.is_atl
FROM temp_new_amt_data_raw AS new_amt_data
"#;
let rows = self.client.execute(statement, &[]).await?;
debug!(
"resolve {} account_keys+signatures into new_amt_data in {}ms",
rows,
started_at.elapsed().as_millis()
);
// merge data from temp table into accounts_map_transaction
let statement =
r#"
INSERT INTO banking_stage_results_2.accounts_map_transaction(acc_id, transaction_id, is_writable, is_signer, is_atl)
SELECT * FROM temp_new_amt_data
FROM {} AS new_amt_data
ON CONFLICT DO NOTHING
"#;
"#,
temp_table
);
let started_at = Instant::now();
let rows = self.client.execute(statement, &[]).await?;
let rows = self.client.execute(statement.as_str(), &[]).await?;
debug!(
"inserted {} accounts into accounts_map_transaction in {}ms",
rows,
@ -466,20 +486,31 @@ impl PostgresSession {
TIME_TO_STORE_TX_ACCOUNT_OLD.set(instant.elapsed().as_millis() as i64);
let instant = Instant::now();
// merge data from temp table into accounts_map_transaction_latest
// note: query uses the array_dedup_append postgres function to deduplicate and limit the array size
// example: array_dedup_append('{8,3,2,1}', '{5,3}', 4) -> {2,1,5,3}
let temp_table_latest_agged = self.get_new_temp_table();
let statement = format!(
r#"
INSERT INTO banking_stage_results_2.accounts_map_transaction_latest(acc_id, tx_ids)
SELECT
CREATE TEMP TABLE {temp_table_name} AS
WITH amt_new AS (
SELECT
acc_id, array_agg(transactions.transaction_id) AS tx_agged
FROM {temp_table_newdata} AS newdata
inner join banking_stage_results_2.accounts on accounts.account_key=newdata.account_key
inner join banking_stage_results_2.transactions on transactions.signature=newdata.signature
GROUP BY acc_id
)
SELECT
acc_id,
banking_stage_results_2.array_prepend_and_truncate(
(SELECT tx_ids FROM banking_stage_results_2.accounts_map_transaction_latest prev_latest WHERE prev_latest.acc_id=newdata.acc_id),
array_agg(newdata.transaction_id),
{limit}
) AS tx_ids
FROM temp_new_amt_data as newdata
GROUP BY acc_id
ON CONFLICT (acc_id) DO UPDATE SET tx_ids = EXCLUDED.tx_ids
(SELECT tx_ids FROM banking_stage_results_2.accounts_map_transaction_latest WHERE acc_id=amt_new.acc_id),
amt_new.tx_agged,
{limit}) AS tx_ids_agg
FROM amt_new
"#,
temp_table_newdata = temp_table,
temp_table_name = temp_table_latest_agged,
limit = LIMIT_LATEST_TXS_PER_ACCOUNT
);
let started_at = Instant::now();
@ -489,7 +520,25 @@ impl PostgresSession {
num_rows,
started_at.elapsed().as_millis()
);
let statement = format!(
r#"
INSERT INTO banking_stage_results_2.accounts_map_transaction_latest(acc_id, tx_ids)
SELECT acc_id, tx_ids_agg FROM {temp_table_name}
ON CONFLICT (acc_id) DO UPDATE SET tx_ids = EXCLUDED.tx_ids
"#,
temp_table_name = temp_table_latest_agged
);
let started_at = Instant::now();
let num_rows = self.client.execute(statement.as_str(), &[]).await?;
debug!(
"upserted {} merged transaction arrays into accounts_map_transaction_latest in {}ms",
num_rows,
started_at.elapsed().as_millis()
);
TIME_TO_STORE_TX_ACCOUNT_NEW.set(instant.elapsed().as_millis() as i64);
self.drop_temp_table(temp_table_latest_agged).await?;
self.drop_temp_table(temp_table).await?;
Ok(())
}
@ -498,7 +547,11 @@ impl PostgresSession {
transactions: &Vec<BlockTransactionInfo>,
slot: i64,
) -> anyhow::Result<()> {
let statement = "CREATE TEMP TABLE temp_new_txblock_data(
let temp_table = self.get_new_temp_table();
self.client
.execute(
format!(
"CREATE TEMP TABLE {}(
signature char(88),
processed_slot BIGINT,
is_successful BOOL,
@ -506,11 +559,17 @@ impl PostgresSession {
cu_consumed BIGINT,
prioritization_fees BIGINT,
supp_infos text
)";
self.client.execute(statement, &[]).await?;
)",
temp_table
)
.as_str(),
&[],
)
.await?;
let statement = r#"
COPY temp_new_txblock_data(
let statement = format!(
r#"
COPY {}(
signature,
processed_slot,
is_successful,
@ -519,9 +578,11 @@ impl PostgresSession {
prioritization_fees,
supp_infos
) FROM STDIN BINARY
"#;
"#,
temp_table
);
let started_at = Instant::now();
let sink: CopyInSink<bytes::Bytes> = self.copy_in(statement).await?;
let sink: CopyInSink<bytes::Bytes> = self.copy_in(statement.as_str()).await?;
let writer = BinaryCopyInWriter::new(
sink,
&[
@ -553,10 +614,11 @@ impl PostgresSession {
started_at.elapsed().as_millis()
);
let statement = r#"
let statement = format!(
r#"
INSERT INTO banking_stage_results_2.transaction_infos
(transaction_id, processed_slot, is_successful, cu_requested, cu_consumed, prioritization_fees, supp_infos)
SELECT
SELECT
( select transaction_id from banking_stage_results_2.transactions where signature = t.signature ),
t.processed_slot,
t.is_successful,
@ -564,38 +626,46 @@ impl PostgresSession {
t.cu_consumed,
t.prioritization_fees,
t.supp_infos
FROM temp_new_txblock_data AS t
FROM {} AS t
ON CONFLICT DO NOTHING
"#;
"#,
temp_table
);
let started_at = Instant::now();
let num_rows = self.client.execute(statement, &[]).await?;
let num_rows = self.client.execute(statement.as_str(), &[]).await?;
debug!(
"inserted {} transactions for block into transaction_infos table in {}ms",
num_rows,
started_at.elapsed().as_millis()
);
self.drop_temp_table(temp_table).await?;
Ok(())
}
pub async fn save_account_usage_in_block(&self, block_info: &BlockInfo) -> anyhow::Result<()> {
let temp_table = self.get_new_temp_table();
self.client
.execute(
"CREATE TEMP TABLE temp_new_account_usage_raw(
format!(
"CREATE TEMP TABLE {}(
account_key char(44),
slot BIGINT,
is_write_locked BOOL,
total_cu_requested BIGINT,
total_cu_consumed BIGINT,
prioritization_fees_info text
)".to_string()
)",
temp_table
)
.as_str(),
&[],
)
.await?;
let statement = r#"
COPY temp_new_account_usage_raw(
let statement = format!(
r#"
COPY {}(
account_key,
slot,
is_write_locked,
@ -603,7 +673,9 @@ impl PostgresSession {
total_cu_consumed,
prioritization_fees_info
) FROM STDIN BINARY
"#.to_string();
"#,
temp_table
);
let started_at = Instant::now();
let sink: CopyInSink<bytes::Bytes> = self.copy_in(statement.as_str()).await?;
let writer = BinaryCopyInWriter::new(
@ -654,7 +726,8 @@ impl PostgresSession {
started_at.elapsed().as_millis()
);
let statement = r#"
let statement = format!(
r#"
INSERT INTO banking_stage_results_2.accounts_map_blocks
( acc_id,
slot,
@ -663,7 +736,7 @@ impl PostgresSession {
total_cu_consumed,
prioritization_fees_info
)
SELECT
SELECT
( select acc_id from banking_stage_results_2.accounts where account_key = t.account_key ),
t.slot,
t.is_write_locked,
@ -676,7 +749,7 @@ impl PostgresSession {
is_write_locked,
total_cu_requested,
total_cu_consumed,
prioritization_fees_info from temp_new_account_usage_raw
prioritization_fees_info from {}
)
as t (account_key,
slot,
@ -686,7 +759,9 @@ impl PostgresSession {
prioritization_fees_info
)
ON CONFLICT DO NOTHING
"#.to_string();
"#,
temp_table
);
let started_at = Instant::now();
let num_rows = self.client.execute(statement.as_str(), &[]).await?;
debug!(
@ -695,6 +770,7 @@ impl PostgresSession {
started_at.elapsed().as_millis()
);
self.drop_temp_table(temp_table).await?;
Ok(())
}
@ -1203,7 +1279,6 @@ impl Postgres {
Some(block) => {
let slot = block.slot;
let instant = Instant::now();
session.clear_session().await;
match session.save_block(block).await {
Ok(_) => {
info!(
@ -1242,7 +1317,6 @@ impl Postgres {
}
if !txs_to_store.is_empty() {
session.clear_session().await;
debug!("saving transaction infos for {}", txs_to_store.len());
let batches = txs_to_store
.iter()