diff --git a/src/postgres.rs b/src/postgres.rs index bd7a8f9..933b538 100644 --- a/src/postgres.rs +++ b/src/postgres.rs @@ -170,6 +170,14 @@ impl PostgresSession { Ok(client) } + pub async fn clear_session(&self) { + self.client + .execute("DISCARD ALL", &[]) + .await + .unwrap(); + debug!("Clear postgres session"); + } + pub async fn configure_work_mem(&self) { self.client .execute("SET work_mem TO '256MB'", &[]) @@ -409,34 +417,25 @@ impl PostgresSession { ) -> anyhow::Result<()> { let instant = Instant::now(); - let temp_table = self.get_new_temp_table(); self.client .execute( - format!( - "CREATE TEMP TABLE {}( - account_key char(44), - signature char(88), - is_writable BOOL, - is_signer BOOL, - is_atl BOOL - );", - temp_table - ) - .as_str(), + r#"CREATE TEMP TABLE temp_new_amt_data_raw ( + account_key char(44), + signature char(88), + is_writable BOOL, + is_signer BOOL, + is_atl BOOL + )"#, &[], ) .await?; - let statement = format!( - r#" - COPY {}( + let statement = + r#"COPY temp_new_amt_data_raw( account_key, signature, is_writable, is_signer, is_atl - ) FROM STDIN BINARY - "#, - temp_table - ); + ) FROM STDIN BINARY"#; let started_at = Instant::now(); - let sink: CopyInSink = self.copy_in(statement.as_str()).await?; + let sink: CopyInSink = self.copy_in(statement).await?; let writer = BinaryCopyInWriter::new( sink, &[Type::TEXT, Type::TEXT, Type::BOOL, Type::BOOL, Type::BOOL], @@ -461,23 +460,33 @@ impl PostgresSession { started_at.elapsed().as_millis() ); - // merge data from temp table into accounts_map_transaction - let statement = format!( - r#" - INSERT INTO banking_stage_results_2.accounts_map_transaction(acc_id, transaction_id, is_writable, is_signer, is_atl) - SELECT + let started_at = Instant::now(); + let statement = r#" + CREATE TEMP TABLE temp_new_amt_data AS + SELECT ( select acc_id from banking_stage_results_2.accounts where account_key = new_amt_data.account_key ), ( select transaction_id from banking_stage_results_2.transactions where signature = new_amt_data.signature ), new_amt_data.is_writable, new_amt_data.is_signer, new_amt_data.is_atl - FROM {} AS new_amt_data - ON CONFLICT DO NOTHING - "#, - temp_table + FROM temp_new_amt_data_raw AS new_amt_data + "#; + let rows = self.client.execute(statement, &[]).await?; + debug!( + "resolve {} account_keys+signatures into new_amt_data in {}ms", + rows, + started_at.elapsed().as_millis() ); + + // merge data from temp table into accounts_map_transaction + let statement = + r#" + INSERT INTO banking_stage_results_2.accounts_map_transaction(acc_id, transaction_id, is_writable, is_signer, is_atl) + SELECT * FROM temp_new_amt_data + ON CONFLICT DO NOTHING + "#; let started_at = Instant::now(); - let rows = self.client.execute(statement.as_str(), &[]).await?; + let rows = self.client.execute(statement, &[]).await?; debug!( "inserted {} accounts into accounts_map_transaction in {}ms", rows, @@ -486,31 +495,20 @@ impl PostgresSession { TIME_TO_STORE_TX_ACCOUNT_OLD.set(instant.elapsed().as_millis() as i64); let instant = Instant::now(); - // merge data from temp table into accounts_map_transaction_latest - // note: query uses the array_dedup_append postgres function to deduplicate and limit the array size - // example: array_dedup_append('{8,3,2,1}', '{5,3}', 4) -> {2,1,5,3} - let temp_table_latest_agged = self.get_new_temp_table(); let statement = format!( r#" - CREATE TEMP TABLE {temp_table_name} AS - WITH amt_new AS ( - SELECT - acc_id, array_agg(transactions.transaction_id) AS tx_agged - FROM {temp_table_newdata} AS newdata - inner join banking_stage_results_2.accounts on accounts.account_key=newdata.account_key - inner join banking_stage_results_2.transactions on transactions.signature=newdata.signature - GROUP BY acc_id - ) - SELECT + INSERT INTO banking_stage_results_2.accounts_map_transaction_latest(acc_id, tx_ids) + SELECT acc_id, banking_stage_results_2.array_prepend_and_truncate( - (SELECT tx_ids FROM banking_stage_results_2.accounts_map_transaction_latest WHERE acc_id=amt_new.acc_id), - amt_new.tx_agged, - {limit}) AS tx_ids_agg - FROM amt_new + (SELECT tx_ids FROM banking_stage_results_2.accounts_map_transaction_latest prev_latest WHERE prev_latest.acc_id=newdata.acc_id), + array_agg(newdata.transaction_id), + {limit} + ) AS tx_ids + FROM temp_new_amt_data as newdata + GROUP BY acc_id + ON CONFLICT (acc_id) DO UPDATE SET tx_ids = EXCLUDED.tx_ids "#, - temp_table_newdata = temp_table, - temp_table_name = temp_table_latest_agged, limit = LIMIT_LATEST_TXS_PER_ACCOUNT ); let started_at = Instant::now(); @@ -520,25 +518,7 @@ impl PostgresSession { num_rows, started_at.elapsed().as_millis() ); - - let statement = format!( - r#" - INSERT INTO banking_stage_results_2.accounts_map_transaction_latest(acc_id, tx_ids) - SELECT acc_id, tx_ids_agg FROM {temp_table_name} - ON CONFLICT (acc_id) DO UPDATE SET tx_ids = EXCLUDED.tx_ids - "#, - temp_table_name = temp_table_latest_agged - ); - let started_at = Instant::now(); - let num_rows = self.client.execute(statement.as_str(), &[]).await?; - debug!( - "upserted {} merged transaction arrays into accounts_map_transaction_latest in {}ms", - num_rows, - started_at.elapsed().as_millis() - ); TIME_TO_STORE_TX_ACCOUNT_NEW.set(instant.elapsed().as_millis() as i64); - self.drop_temp_table(temp_table_latest_agged).await?; - self.drop_temp_table(temp_table).await?; Ok(()) } @@ -1279,6 +1259,7 @@ impl Postgres { Some(block) => { let slot = block.slot; let instant = Instant::now(); + session.clear_session().await; match session.save_block(block).await { Ok(_) => { info!( @@ -1317,6 +1298,7 @@ impl Postgres { } if !txs_to_store.is_empty() { + session.clear_session().await; debug!("saving transaction infos for {}", txs_to_store.len()); let batches = txs_to_store .iter()