clear postgres session and use hardcoded temptable names (#52)
* clear pg session, redesign insert_accounts_for_transaction * redesign insert_transactions_for_block * redesign save_account_usage_in_block * redesign create_transaction_ids * redesign create_accounts_for_transaction * narrow resources cleared in clear_session
This commit is contained in:
parent
7a577a43fa
commit
7afa8fc45f
278
src/postgres.rs
278
src/postgres.rs
|
@ -170,6 +170,22 @@ impl PostgresSession {
|
|||
Ok(client)
|
||||
}
|
||||
|
||||
pub async fn clear_session(&self) {
|
||||
// see https://www.postgresql.org/docs/current/sql-discard.html
|
||||
// CLOSE ALL -> drop potental cursors
|
||||
// RESET ALL -> we do not want (would reset work_mem)
|
||||
// DEALLOCATE -> would drop prepared statements which we do not use ATM
|
||||
// DISCARD PLANS -> we want to keep the plans
|
||||
// DISCARD SEQUENCES -> we want to keep the sequences
|
||||
self.client
|
||||
.batch_execute(r#"
|
||||
DISCARD TEMP;
|
||||
CLOSE ALL;"#)
|
||||
.await
|
||||
.unwrap();
|
||||
debug!("Clear postgres session");
|
||||
}
|
||||
|
||||
pub async fn configure_work_mem(&self) {
|
||||
self.client
|
||||
.execute("SET work_mem TO '256MB'", &[])
|
||||
|
@ -192,32 +208,17 @@ impl PostgresSession {
|
|||
}
|
||||
|
||||
pub async fn create_transaction_ids(&self, signatures: HashSet<String>) -> anyhow::Result<()> {
|
||||
// create temp table
|
||||
let temp_table = self.get_new_temp_table();
|
||||
|
||||
self.client
|
||||
.execute(
|
||||
format!(
|
||||
r#"
|
||||
CREATE TEMP TABLE {}(
|
||||
let statement = r#"
|
||||
CREATE TEMP TABLE temp_new_transactions_raw(
|
||||
signature char(88)
|
||||
);
|
||||
"#,
|
||||
temp_table
|
||||
)
|
||||
.as_str(),
|
||||
&[],
|
||||
)
|
||||
.await?;
|
||||
)"#;
|
||||
self.client.execute(statement, &[]).await?;
|
||||
|
||||
let statement = format!(
|
||||
r#"
|
||||
COPY {}(
|
||||
let statement = r#"
|
||||
COPY temp_new_transactions_raw(
|
||||
signature
|
||||
) FROM STDIN BINARY
|
||||
"#,
|
||||
temp_table
|
||||
);
|
||||
"#.to_string();
|
||||
let started_at = Instant::now();
|
||||
let sink: CopyInSink<bytes::Bytes> = self.copy_in(statement.as_str()).await?;
|
||||
let writer = BinaryCopyInWriter::new(sink, &[Type::TEXT]);
|
||||
|
@ -232,22 +233,18 @@ impl PostgresSession {
|
|||
started_at.elapsed().as_millis()
|
||||
);
|
||||
|
||||
let statement = format!(
|
||||
r#"
|
||||
INSERT INTO banking_stage_results_2.transactions(signature) SELECT signature from {}
|
||||
ON CONFLICT DO NOTHING
|
||||
"#,
|
||||
temp_table
|
||||
);
|
||||
let statement = r#"
|
||||
INSERT INTO banking_stage_results_2.transactions(signature) SELECT signature FROM temp_new_transactions_raw
|
||||
ON CONFLICT DO NOTHING
|
||||
"#;
|
||||
let started_at = Instant::now();
|
||||
let num_rows = self.client.execute(statement.as_str(), &[]).await?;
|
||||
let num_rows = self.client.execute(statement, &[]).await?;
|
||||
debug!(
|
||||
"inserted {} signatures in transactions table in {}ms",
|
||||
num_rows,
|
||||
started_at.elapsed().as_millis()
|
||||
);
|
||||
|
||||
self.drop_temp_table(temp_table).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -255,30 +252,16 @@ impl PostgresSession {
|
|||
&self,
|
||||
accounts: HashSet<String>,
|
||||
) -> anyhow::Result<()> {
|
||||
// create temp table
|
||||
let temp_table = self.get_new_temp_table();
|
||||
|
||||
self.client
|
||||
.execute(
|
||||
format!(
|
||||
"CREATE TEMP TABLE {}(
|
||||
let statement = "CREATE TEMP TABLE temp_new_accounts_raw(
|
||||
key TEXT
|
||||
);",
|
||||
temp_table
|
||||
)
|
||||
.as_str(),
|
||||
&[],
|
||||
)
|
||||
.await?;
|
||||
)";
|
||||
self.client.execute(statement, &[]).await?;
|
||||
|
||||
let statement = format!(
|
||||
r#"
|
||||
COPY {}(
|
||||
let statement = r#"
|
||||
COPY temp_new_accounts_raw(
|
||||
key
|
||||
) FROM STDIN BINARY
|
||||
"#,
|
||||
temp_table
|
||||
);
|
||||
"#.to_string();
|
||||
let started_at = Instant::now();
|
||||
let sink: CopyInSink<bytes::Bytes> = self.copy_in(statement.as_str()).await?;
|
||||
let writer = BinaryCopyInWriter::new(sink, &[Type::TEXT]);
|
||||
|
@ -293,22 +276,18 @@ impl PostgresSession {
|
|||
started_at.elapsed().as_millis()
|
||||
);
|
||||
|
||||
let statement = format!(
|
||||
r#"
|
||||
INSERT INTO banking_stage_results_2.accounts(account_key) SELECT key from {}
|
||||
ON CONFLICT DO NOTHING
|
||||
"#,
|
||||
temp_table
|
||||
);
|
||||
let statement = r#"
|
||||
INSERT INTO banking_stage_results_2.accounts(account_key) SELECT key FROM temp_new_accounts_raw
|
||||
ON CONFLICT DO NOTHING
|
||||
"#;
|
||||
let started_at = Instant::now();
|
||||
self.client.execute(statement.as_str(), &[]).await?;
|
||||
self.client.execute(statement, &[]).await?;
|
||||
debug!(
|
||||
"inserted {} account keys into accounts table in {}ms",
|
||||
num_rows,
|
||||
started_at.elapsed().as_millis()
|
||||
);
|
||||
|
||||
self.drop_temp_table(temp_table).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -409,34 +388,25 @@ impl PostgresSession {
|
|||
) -> anyhow::Result<()> {
|
||||
|
||||
let instant = Instant::now();
|
||||
let temp_table = self.get_new_temp_table();
|
||||
self.client
|
||||
.execute(
|
||||
format!(
|
||||
"CREATE TEMP TABLE {}(
|
||||
account_key char(44),
|
||||
signature char(88),
|
||||
is_writable BOOL,
|
||||
is_signer BOOL,
|
||||
is_atl BOOL
|
||||
);",
|
||||
temp_table
|
||||
)
|
||||
.as_str(),
|
||||
r#"CREATE TEMP TABLE temp_new_amt_data_raw (
|
||||
account_key char(44),
|
||||
signature char(88),
|
||||
is_writable BOOL,
|
||||
is_signer BOOL,
|
||||
is_atl BOOL
|
||||
)"#,
|
||||
&[],
|
||||
)
|
||||
.await?;
|
||||
|
||||
let statement = format!(
|
||||
r#"
|
||||
COPY {}(
|
||||
let statement =
|
||||
r#"COPY temp_new_amt_data_raw(
|
||||
account_key, signature, is_writable, is_signer, is_atl
|
||||
) FROM STDIN BINARY
|
||||
"#,
|
||||
temp_table
|
||||
);
|
||||
) FROM STDIN BINARY"#;
|
||||
let started_at = Instant::now();
|
||||
let sink: CopyInSink<bytes::Bytes> = self.copy_in(statement.as_str()).await?;
|
||||
let sink: CopyInSink<bytes::Bytes> = self.copy_in(statement).await?;
|
||||
let writer = BinaryCopyInWriter::new(
|
||||
sink,
|
||||
&[Type::TEXT, Type::TEXT, Type::BOOL, Type::BOOL, Type::BOOL],
|
||||
|
@ -461,23 +431,33 @@ impl PostgresSession {
|
|||
started_at.elapsed().as_millis()
|
||||
);
|
||||
|
||||
// merge data from temp table into accounts_map_transaction
|
||||
let statement = format!(
|
||||
r#"
|
||||
INSERT INTO banking_stage_results_2.accounts_map_transaction(acc_id, transaction_id, is_writable, is_signer, is_atl)
|
||||
SELECT
|
||||
let started_at = Instant::now();
|
||||
let statement = r#"
|
||||
CREATE TEMP TABLE temp_new_amt_data AS
|
||||
SELECT
|
||||
( select acc_id from banking_stage_results_2.accounts where account_key = new_amt_data.account_key ),
|
||||
( select transaction_id from banking_stage_results_2.transactions where signature = new_amt_data.signature ),
|
||||
new_amt_data.is_writable,
|
||||
new_amt_data.is_signer,
|
||||
new_amt_data.is_atl
|
||||
FROM {} AS new_amt_data
|
||||
ON CONFLICT DO NOTHING
|
||||
"#,
|
||||
temp_table
|
||||
FROM temp_new_amt_data_raw AS new_amt_data
|
||||
"#;
|
||||
let rows = self.client.execute(statement, &[]).await?;
|
||||
debug!(
|
||||
"resolve {} account_keys+signatures into new_amt_data in {}ms",
|
||||
rows,
|
||||
started_at.elapsed().as_millis()
|
||||
);
|
||||
|
||||
// merge data from temp table into accounts_map_transaction
|
||||
let statement =
|
||||
r#"
|
||||
INSERT INTO banking_stage_results_2.accounts_map_transaction(acc_id, transaction_id, is_writable, is_signer, is_atl)
|
||||
SELECT * FROM temp_new_amt_data
|
||||
ON CONFLICT DO NOTHING
|
||||
"#;
|
||||
let started_at = Instant::now();
|
||||
let rows = self.client.execute(statement.as_str(), &[]).await?;
|
||||
let rows = self.client.execute(statement, &[]).await?;
|
||||
debug!(
|
||||
"inserted {} accounts into accounts_map_transaction in {}ms",
|
||||
rows,
|
||||
|
@ -486,31 +466,20 @@ impl PostgresSession {
|
|||
TIME_TO_STORE_TX_ACCOUNT_OLD.set(instant.elapsed().as_millis() as i64);
|
||||
|
||||
let instant = Instant::now();
|
||||
// merge data from temp table into accounts_map_transaction_latest
|
||||
// note: query uses the array_dedup_append postgres function to deduplicate and limit the array size
|
||||
// example: array_dedup_append('{8,3,2,1}', '{5,3}', 4) -> {2,1,5,3}
|
||||
let temp_table_latest_agged = self.get_new_temp_table();
|
||||
let statement = format!(
|
||||
r#"
|
||||
CREATE TEMP TABLE {temp_table_name} AS
|
||||
WITH amt_new AS (
|
||||
SELECT
|
||||
acc_id, array_agg(transactions.transaction_id) AS tx_agged
|
||||
FROM {temp_table_newdata} AS newdata
|
||||
inner join banking_stage_results_2.accounts on accounts.account_key=newdata.account_key
|
||||
inner join banking_stage_results_2.transactions on transactions.signature=newdata.signature
|
||||
GROUP BY acc_id
|
||||
)
|
||||
SELECT
|
||||
INSERT INTO banking_stage_results_2.accounts_map_transaction_latest(acc_id, tx_ids)
|
||||
SELECT
|
||||
acc_id,
|
||||
banking_stage_results_2.array_prepend_and_truncate(
|
||||
(SELECT tx_ids FROM banking_stage_results_2.accounts_map_transaction_latest WHERE acc_id=amt_new.acc_id),
|
||||
amt_new.tx_agged,
|
||||
{limit}) AS tx_ids_agg
|
||||
FROM amt_new
|
||||
(SELECT tx_ids FROM banking_stage_results_2.accounts_map_transaction_latest prev_latest WHERE prev_latest.acc_id=newdata.acc_id),
|
||||
array_agg(newdata.transaction_id),
|
||||
{limit}
|
||||
) AS tx_ids
|
||||
FROM temp_new_amt_data as newdata
|
||||
GROUP BY acc_id
|
||||
ON CONFLICT (acc_id) DO UPDATE SET tx_ids = EXCLUDED.tx_ids
|
||||
"#,
|
||||
temp_table_newdata = temp_table,
|
||||
temp_table_name = temp_table_latest_agged,
|
||||
limit = LIMIT_LATEST_TXS_PER_ACCOUNT
|
||||
);
|
||||
let started_at = Instant::now();
|
||||
|
@ -520,25 +489,7 @@ impl PostgresSession {
|
|||
num_rows,
|
||||
started_at.elapsed().as_millis()
|
||||
);
|
||||
|
||||
let statement = format!(
|
||||
r#"
|
||||
INSERT INTO banking_stage_results_2.accounts_map_transaction_latest(acc_id, tx_ids)
|
||||
SELECT acc_id, tx_ids_agg FROM {temp_table_name}
|
||||
ON CONFLICT (acc_id) DO UPDATE SET tx_ids = EXCLUDED.tx_ids
|
||||
"#,
|
||||
temp_table_name = temp_table_latest_agged
|
||||
);
|
||||
let started_at = Instant::now();
|
||||
let num_rows = self.client.execute(statement.as_str(), &[]).await?;
|
||||
debug!(
|
||||
"upserted {} merged transaction arrays into accounts_map_transaction_latest in {}ms",
|
||||
num_rows,
|
||||
started_at.elapsed().as_millis()
|
||||
);
|
||||
TIME_TO_STORE_TX_ACCOUNT_NEW.set(instant.elapsed().as_millis() as i64);
|
||||
self.drop_temp_table(temp_table_latest_agged).await?;
|
||||
self.drop_temp_table(temp_table).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -547,11 +498,7 @@ impl PostgresSession {
|
|||
transactions: &Vec<BlockTransactionInfo>,
|
||||
slot: i64,
|
||||
) -> anyhow::Result<()> {
|
||||
let temp_table = self.get_new_temp_table();
|
||||
self.client
|
||||
.execute(
|
||||
format!(
|
||||
"CREATE TEMP TABLE {}(
|
||||
let statement = "CREATE TEMP TABLE temp_new_txblock_data(
|
||||
signature char(88),
|
||||
processed_slot BIGINT,
|
||||
is_successful BOOL,
|
||||
|
@ -559,17 +506,11 @@ impl PostgresSession {
|
|||
cu_consumed BIGINT,
|
||||
prioritization_fees BIGINT,
|
||||
supp_infos text
|
||||
)",
|
||||
temp_table
|
||||
)
|
||||
.as_str(),
|
||||
&[],
|
||||
)
|
||||
.await?;
|
||||
)";
|
||||
self.client.execute(statement, &[]).await?;
|
||||
|
||||
let statement = format!(
|
||||
r#"
|
||||
COPY {}(
|
||||
let statement = r#"
|
||||
COPY temp_new_txblock_data(
|
||||
signature,
|
||||
processed_slot,
|
||||
is_successful,
|
||||
|
@ -578,11 +519,9 @@ impl PostgresSession {
|
|||
prioritization_fees,
|
||||
supp_infos
|
||||
) FROM STDIN BINARY
|
||||
"#,
|
||||
temp_table
|
||||
);
|
||||
"#;
|
||||
let started_at = Instant::now();
|
||||
let sink: CopyInSink<bytes::Bytes> = self.copy_in(statement.as_str()).await?;
|
||||
let sink: CopyInSink<bytes::Bytes> = self.copy_in(statement).await?;
|
||||
let writer = BinaryCopyInWriter::new(
|
||||
sink,
|
||||
&[
|
||||
|
@ -614,11 +553,10 @@ impl PostgresSession {
|
|||
started_at.elapsed().as_millis()
|
||||
);
|
||||
|
||||
let statement = format!(
|
||||
r#"
|
||||
let statement = r#"
|
||||
INSERT INTO banking_stage_results_2.transaction_infos
|
||||
(transaction_id, processed_slot, is_successful, cu_requested, cu_consumed, prioritization_fees, supp_infos)
|
||||
SELECT
|
||||
SELECT
|
||||
( select transaction_id from banking_stage_results_2.transactions where signature = t.signature ),
|
||||
t.processed_slot,
|
||||
t.is_successful,
|
||||
|
@ -626,46 +564,38 @@ impl PostgresSession {
|
|||
t.cu_consumed,
|
||||
t.prioritization_fees,
|
||||
t.supp_infos
|
||||
FROM {} AS t
|
||||
FROM temp_new_txblock_data AS t
|
||||
ON CONFLICT DO NOTHING
|
||||
"#,
|
||||
temp_table
|
||||
);
|
||||
"#;
|
||||
let started_at = Instant::now();
|
||||
let num_rows = self.client.execute(statement.as_str(), &[]).await?;
|
||||
let num_rows = self.client.execute(statement, &[]).await?;
|
||||
debug!(
|
||||
"inserted {} transactions for block into transaction_infos table in {}ms",
|
||||
num_rows,
|
||||
started_at.elapsed().as_millis()
|
||||
);
|
||||
|
||||
self.drop_temp_table(temp_table).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn save_account_usage_in_block(&self, block_info: &BlockInfo) -> anyhow::Result<()> {
|
||||
let temp_table = self.get_new_temp_table();
|
||||
self.client
|
||||
.execute(
|
||||
format!(
|
||||
"CREATE TEMP TABLE {}(
|
||||
"CREATE TEMP TABLE temp_new_account_usage_raw(
|
||||
account_key char(44),
|
||||
slot BIGINT,
|
||||
is_write_locked BOOL,
|
||||
total_cu_requested BIGINT,
|
||||
total_cu_consumed BIGINT,
|
||||
prioritization_fees_info text
|
||||
)",
|
||||
temp_table
|
||||
)
|
||||
)".to_string()
|
||||
.as_str(),
|
||||
&[],
|
||||
)
|
||||
.await?;
|
||||
|
||||
let statement = format!(
|
||||
r#"
|
||||
COPY {}(
|
||||
let statement = r#"
|
||||
COPY temp_new_account_usage_raw(
|
||||
account_key,
|
||||
slot,
|
||||
is_write_locked,
|
||||
|
@ -673,9 +603,7 @@ impl PostgresSession {
|
|||
total_cu_consumed,
|
||||
prioritization_fees_info
|
||||
) FROM STDIN BINARY
|
||||
"#,
|
||||
temp_table
|
||||
);
|
||||
"#.to_string();
|
||||
let started_at = Instant::now();
|
||||
let sink: CopyInSink<bytes::Bytes> = self.copy_in(statement.as_str()).await?;
|
||||
let writer = BinaryCopyInWriter::new(
|
||||
|
@ -726,8 +654,7 @@ impl PostgresSession {
|
|||
started_at.elapsed().as_millis()
|
||||
);
|
||||
|
||||
let statement = format!(
|
||||
r#"
|
||||
let statement = r#"
|
||||
INSERT INTO banking_stage_results_2.accounts_map_blocks
|
||||
( acc_id,
|
||||
slot,
|
||||
|
@ -736,7 +663,7 @@ impl PostgresSession {
|
|||
total_cu_consumed,
|
||||
prioritization_fees_info
|
||||
)
|
||||
SELECT
|
||||
SELECT
|
||||
( select acc_id from banking_stage_results_2.accounts where account_key = t.account_key ),
|
||||
t.slot,
|
||||
t.is_write_locked,
|
||||
|
@ -749,7 +676,7 @@ impl PostgresSession {
|
|||
is_write_locked,
|
||||
total_cu_requested,
|
||||
total_cu_consumed,
|
||||
prioritization_fees_info from {}
|
||||
prioritization_fees_info from temp_new_account_usage_raw
|
||||
)
|
||||
as t (account_key,
|
||||
slot,
|
||||
|
@ -759,9 +686,7 @@ impl PostgresSession {
|
|||
prioritization_fees_info
|
||||
)
|
||||
ON CONFLICT DO NOTHING
|
||||
"#,
|
||||
temp_table
|
||||
);
|
||||
"#.to_string();
|
||||
let started_at = Instant::now();
|
||||
let num_rows = self.client.execute(statement.as_str(), &[]).await?;
|
||||
debug!(
|
||||
|
@ -770,7 +695,6 @@ impl PostgresSession {
|
|||
started_at.elapsed().as_millis()
|
||||
);
|
||||
|
||||
self.drop_temp_table(temp_table).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -1279,6 +1203,7 @@ impl Postgres {
|
|||
Some(block) => {
|
||||
let slot = block.slot;
|
||||
let instant = Instant::now();
|
||||
session.clear_session().await;
|
||||
match session.save_block(block).await {
|
||||
Ok(_) => {
|
||||
info!(
|
||||
|
@ -1317,6 +1242,7 @@ impl Postgres {
|
|||
}
|
||||
|
||||
if !txs_to_store.is_empty() {
|
||||
session.clear_session().await;
|
||||
debug!("saving transaction infos for {}", txs_to_store.len());
|
||||
let batches = txs_to_store
|
||||
.iter()
|
||||
|
|
Loading…
Reference in New Issue