diff --git a/src/postgres.rs b/src/postgres.rs index f615111..bd7a8f9 100644 --- a/src/postgres.rs +++ b/src/postgres.rs @@ -170,22 +170,6 @@ impl PostgresSession { Ok(client) } - pub async fn clear_session(&self) { - // see https://www.postgresql.org/docs/current/sql-discard.html - // CLOSE ALL -> drop potental cursors - // RESET ALL -> we do not want (would reset work_mem) - // DEALLOCATE -> would drop prepared statements which we do not use ATM - // DISCARD PLANS -> we want to keep the plans - // DISCARD SEQUENCES -> we want to keep the sequences - self.client - .batch_execute(r#" - DISCARD TEMP; - CLOSE ALL;"#) - .await - .unwrap(); - debug!("Clear postgres session"); - } - pub async fn configure_work_mem(&self) { self.client .execute("SET work_mem TO '256MB'", &[]) @@ -208,17 +192,32 @@ impl PostgresSession { } pub async fn create_transaction_ids(&self, signatures: HashSet) -> anyhow::Result<()> { - let statement = r#" - CREATE TEMP TABLE temp_new_transactions_raw( - signature char(88) - )"#; - self.client.execute(statement, &[]).await?; + // create temp table + let temp_table = self.get_new_temp_table(); - let statement = r#" - COPY temp_new_transactions_raw( + self.client + .execute( + format!( + r#" + CREATE TEMP TABLE {}( + signature char(88) + ); + "#, + temp_table + ) + .as_str(), + &[], + ) + .await?; + + let statement = format!( + r#" + COPY {}( signature ) FROM STDIN BINARY - "#.to_string(); + "#, + temp_table + ); let started_at = Instant::now(); let sink: CopyInSink = self.copy_in(statement.as_str()).await?; let writer = BinaryCopyInWriter::new(sink, &[Type::TEXT]); @@ -233,18 +232,22 @@ impl PostgresSession { started_at.elapsed().as_millis() ); - let statement = r#" - INSERT INTO banking_stage_results_2.transactions(signature) SELECT signature FROM temp_new_transactions_raw - ON CONFLICT DO NOTHING - "#; + let statement = format!( + r#" + INSERT INTO banking_stage_results_2.transactions(signature) SELECT signature from {} + ON CONFLICT DO NOTHING + "#, + temp_table + ); let started_at = Instant::now(); - let num_rows = self.client.execute(statement, &[]).await?; + let num_rows = self.client.execute(statement.as_str(), &[]).await?; debug!( "inserted {} signatures in transactions table in {}ms", num_rows, started_at.elapsed().as_millis() ); + self.drop_temp_table(temp_table).await?; Ok(()) } @@ -252,16 +255,30 @@ impl PostgresSession { &self, accounts: HashSet, ) -> anyhow::Result<()> { - let statement = "CREATE TEMP TABLE temp_new_accounts_raw( - key TEXT - )"; - self.client.execute(statement, &[]).await?; + // create temp table + let temp_table = self.get_new_temp_table(); - let statement = r#" - COPY temp_new_accounts_raw( + self.client + .execute( + format!( + "CREATE TEMP TABLE {}( + key TEXT + );", + temp_table + ) + .as_str(), + &[], + ) + .await?; + + let statement = format!( + r#" + COPY {}( key ) FROM STDIN BINARY - "#.to_string(); + "#, + temp_table + ); let started_at = Instant::now(); let sink: CopyInSink = self.copy_in(statement.as_str()).await?; let writer = BinaryCopyInWriter::new(sink, &[Type::TEXT]); @@ -276,18 +293,22 @@ impl PostgresSession { started_at.elapsed().as_millis() ); - let statement = r#" - INSERT INTO banking_stage_results_2.accounts(account_key) SELECT key FROM temp_new_accounts_raw - ON CONFLICT DO NOTHING - "#; + let statement = format!( + r#" + INSERT INTO banking_stage_results_2.accounts(account_key) SELECT key from {} + ON CONFLICT DO NOTHING + "#, + temp_table + ); let started_at = Instant::now(); - self.client.execute(statement, &[]).await?; + self.client.execute(statement.as_str(), &[]).await?; debug!( "inserted {} account keys into accounts table in {}ms", num_rows, started_at.elapsed().as_millis() ); + self.drop_temp_table(temp_table).await?; Ok(()) } @@ -388,25 +409,34 @@ impl PostgresSession { ) -> anyhow::Result<()> { let instant = Instant::now(); + let temp_table = self.get_new_temp_table(); self.client .execute( - r#"CREATE TEMP TABLE temp_new_amt_data_raw ( - account_key char(44), - signature char(88), - is_writable BOOL, - is_signer BOOL, - is_atl BOOL - )"#, + format!( + "CREATE TEMP TABLE {}( + account_key char(44), + signature char(88), + is_writable BOOL, + is_signer BOOL, + is_atl BOOL + );", + temp_table + ) + .as_str(), &[], ) .await?; - let statement = - r#"COPY temp_new_amt_data_raw( + let statement = format!( + r#" + COPY {}( account_key, signature, is_writable, is_signer, is_atl - ) FROM STDIN BINARY"#; + ) FROM STDIN BINARY + "#, + temp_table + ); let started_at = Instant::now(); - let sink: CopyInSink = self.copy_in(statement).await?; + let sink: CopyInSink = self.copy_in(statement.as_str()).await?; let writer = BinaryCopyInWriter::new( sink, &[Type::TEXT, Type::TEXT, Type::BOOL, Type::BOOL, Type::BOOL], @@ -431,33 +461,23 @@ impl PostgresSession { started_at.elapsed().as_millis() ); - let started_at = Instant::now(); - let statement = r#" - CREATE TEMP TABLE temp_new_amt_data AS - SELECT + // merge data from temp table into accounts_map_transaction + let statement = format!( + r#" + INSERT INTO banking_stage_results_2.accounts_map_transaction(acc_id, transaction_id, is_writable, is_signer, is_atl) + SELECT ( select acc_id from banking_stage_results_2.accounts where account_key = new_amt_data.account_key ), ( select transaction_id from banking_stage_results_2.transactions where signature = new_amt_data.signature ), new_amt_data.is_writable, new_amt_data.is_signer, new_amt_data.is_atl - FROM temp_new_amt_data_raw AS new_amt_data - "#; - let rows = self.client.execute(statement, &[]).await?; - debug!( - "resolve {} account_keys+signatures into new_amt_data in {}ms", - rows, - started_at.elapsed().as_millis() - ); - - // merge data from temp table into accounts_map_transaction - let statement = - r#" - INSERT INTO banking_stage_results_2.accounts_map_transaction(acc_id, transaction_id, is_writable, is_signer, is_atl) - SELECT * FROM temp_new_amt_data + FROM {} AS new_amt_data ON CONFLICT DO NOTHING - "#; + "#, + temp_table + ); let started_at = Instant::now(); - let rows = self.client.execute(statement, &[]).await?; + let rows = self.client.execute(statement.as_str(), &[]).await?; debug!( "inserted {} accounts into accounts_map_transaction in {}ms", rows, @@ -466,20 +486,31 @@ impl PostgresSession { TIME_TO_STORE_TX_ACCOUNT_OLD.set(instant.elapsed().as_millis() as i64); let instant = Instant::now(); + // merge data from temp table into accounts_map_transaction_latest + // note: query uses the array_dedup_append postgres function to deduplicate and limit the array size + // example: array_dedup_append('{8,3,2,1}', '{5,3}', 4) -> {2,1,5,3} + let temp_table_latest_agged = self.get_new_temp_table(); let statement = format!( r#" - INSERT INTO banking_stage_results_2.accounts_map_transaction_latest(acc_id, tx_ids) - SELECT + CREATE TEMP TABLE {temp_table_name} AS + WITH amt_new AS ( + SELECT + acc_id, array_agg(transactions.transaction_id) AS tx_agged + FROM {temp_table_newdata} AS newdata + inner join banking_stage_results_2.accounts on accounts.account_key=newdata.account_key + inner join banking_stage_results_2.transactions on transactions.signature=newdata.signature + GROUP BY acc_id + ) + SELECT acc_id, banking_stage_results_2.array_prepend_and_truncate( - (SELECT tx_ids FROM banking_stage_results_2.accounts_map_transaction_latest prev_latest WHERE prev_latest.acc_id=newdata.acc_id), - array_agg(newdata.transaction_id), - {limit} - ) AS tx_ids - FROM temp_new_amt_data as newdata - GROUP BY acc_id - ON CONFLICT (acc_id) DO UPDATE SET tx_ids = EXCLUDED.tx_ids + (SELECT tx_ids FROM banking_stage_results_2.accounts_map_transaction_latest WHERE acc_id=amt_new.acc_id), + amt_new.tx_agged, + {limit}) AS tx_ids_agg + FROM amt_new "#, + temp_table_newdata = temp_table, + temp_table_name = temp_table_latest_agged, limit = LIMIT_LATEST_TXS_PER_ACCOUNT ); let started_at = Instant::now(); @@ -489,7 +520,25 @@ impl PostgresSession { num_rows, started_at.elapsed().as_millis() ); + + let statement = format!( + r#" + INSERT INTO banking_stage_results_2.accounts_map_transaction_latest(acc_id, tx_ids) + SELECT acc_id, tx_ids_agg FROM {temp_table_name} + ON CONFLICT (acc_id) DO UPDATE SET tx_ids = EXCLUDED.tx_ids + "#, + temp_table_name = temp_table_latest_agged + ); + let started_at = Instant::now(); + let num_rows = self.client.execute(statement.as_str(), &[]).await?; + debug!( + "upserted {} merged transaction arrays into accounts_map_transaction_latest in {}ms", + num_rows, + started_at.elapsed().as_millis() + ); TIME_TO_STORE_TX_ACCOUNT_NEW.set(instant.elapsed().as_millis() as i64); + self.drop_temp_table(temp_table_latest_agged).await?; + self.drop_temp_table(temp_table).await?; Ok(()) } @@ -498,7 +547,11 @@ impl PostgresSession { transactions: &Vec, slot: i64, ) -> anyhow::Result<()> { - let statement = "CREATE TEMP TABLE temp_new_txblock_data( + let temp_table = self.get_new_temp_table(); + self.client + .execute( + format!( + "CREATE TEMP TABLE {}( signature char(88), processed_slot BIGINT, is_successful BOOL, @@ -506,11 +559,17 @@ impl PostgresSession { cu_consumed BIGINT, prioritization_fees BIGINT, supp_infos text - )"; - self.client.execute(statement, &[]).await?; + )", + temp_table + ) + .as_str(), + &[], + ) + .await?; - let statement = r#" - COPY temp_new_txblock_data( + let statement = format!( + r#" + COPY {}( signature, processed_slot, is_successful, @@ -519,9 +578,11 @@ impl PostgresSession { prioritization_fees, supp_infos ) FROM STDIN BINARY - "#; + "#, + temp_table + ); let started_at = Instant::now(); - let sink: CopyInSink = self.copy_in(statement).await?; + let sink: CopyInSink = self.copy_in(statement.as_str()).await?; let writer = BinaryCopyInWriter::new( sink, &[ @@ -553,10 +614,11 @@ impl PostgresSession { started_at.elapsed().as_millis() ); - let statement = r#" + let statement = format!( + r#" INSERT INTO banking_stage_results_2.transaction_infos (transaction_id, processed_slot, is_successful, cu_requested, cu_consumed, prioritization_fees, supp_infos) - SELECT + SELECT ( select transaction_id from banking_stage_results_2.transactions where signature = t.signature ), t.processed_slot, t.is_successful, @@ -564,38 +626,46 @@ impl PostgresSession { t.cu_consumed, t.prioritization_fees, t.supp_infos - FROM temp_new_txblock_data AS t + FROM {} AS t ON CONFLICT DO NOTHING - "#; + "#, + temp_table + ); let started_at = Instant::now(); - let num_rows = self.client.execute(statement, &[]).await?; + let num_rows = self.client.execute(statement.as_str(), &[]).await?; debug!( "inserted {} transactions for block into transaction_infos table in {}ms", num_rows, started_at.elapsed().as_millis() ); + self.drop_temp_table(temp_table).await?; Ok(()) } pub async fn save_account_usage_in_block(&self, block_info: &BlockInfo) -> anyhow::Result<()> { + let temp_table = self.get_new_temp_table(); self.client .execute( - "CREATE TEMP TABLE temp_new_account_usage_raw( + format!( + "CREATE TEMP TABLE {}( account_key char(44), slot BIGINT, is_write_locked BOOL, total_cu_requested BIGINT, total_cu_consumed BIGINT, prioritization_fees_info text - )".to_string() + )", + temp_table + ) .as_str(), &[], ) .await?; - let statement = r#" - COPY temp_new_account_usage_raw( + let statement = format!( + r#" + COPY {}( account_key, slot, is_write_locked, @@ -603,7 +673,9 @@ impl PostgresSession { total_cu_consumed, prioritization_fees_info ) FROM STDIN BINARY - "#.to_string(); + "#, + temp_table + ); let started_at = Instant::now(); let sink: CopyInSink = self.copy_in(statement.as_str()).await?; let writer = BinaryCopyInWriter::new( @@ -654,7 +726,8 @@ impl PostgresSession { started_at.elapsed().as_millis() ); - let statement = r#" + let statement = format!( + r#" INSERT INTO banking_stage_results_2.accounts_map_blocks ( acc_id, slot, @@ -663,7 +736,7 @@ impl PostgresSession { total_cu_consumed, prioritization_fees_info ) - SELECT + SELECT ( select acc_id from banking_stage_results_2.accounts where account_key = t.account_key ), t.slot, t.is_write_locked, @@ -676,7 +749,7 @@ impl PostgresSession { is_write_locked, total_cu_requested, total_cu_consumed, - prioritization_fees_info from temp_new_account_usage_raw + prioritization_fees_info from {} ) as t (account_key, slot, @@ -686,7 +759,9 @@ impl PostgresSession { prioritization_fees_info ) ON CONFLICT DO NOTHING - "#.to_string(); + "#, + temp_table + ); let started_at = Instant::now(); let num_rows = self.client.execute(statement.as_str(), &[]).await?; debug!( @@ -695,6 +770,7 @@ impl PostgresSession { started_at.elapsed().as_millis() ); + self.drop_temp_table(temp_table).await?; Ok(()) } @@ -1203,7 +1279,6 @@ impl Postgres { Some(block) => { let slot = block.slot; let instant = Instant::now(); - session.clear_session().await; match session.save_block(block).await { Ok(_) => { info!( @@ -1242,7 +1317,6 @@ impl Postgres { } if !txs_to_store.is_empty() { - session.clear_session().await; debug!("saving transaction infos for {}", txs_to_store.len()); let batches = txs_to_store .iter()