add table with latest 1000 txs per acc_id (#38)

* wip

* use fancy postgres method

* cleanup

* fix logs

* format

* simpler function

* Revert "simpler function"

This reverts commit 57120b9ce8.

* fix sql function which did not deduplicate

* fix initial tx array being null
This commit is contained in:
Groovie | Mango 2024-01-09 15:12:03 +01:00 committed by GitHub
parent 77ae2a38a2
commit 5c3edc836f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 77 additions and 1 deletions

View File

@ -126,3 +126,29 @@ CLUSTER banking_stage_results_2.transactions using transactions_pkey;
CLUSTER banking_stage_results_2.accounts using accounts_pkey;
CREATE TABLE banking_stage_results_2.accounts_map_transaction_latest(
acc_id BIGINT PRIMARY KEY,
-- sorted: oldest to latest, max 1000
tx_ids BIGINT[]
);
CREATE OR REPLACE FUNCTION __array_reverse(anyarray) RETURNS anyarray AS $$
SELECT ARRAY(
SELECT $1[i]
FROM generate_subscripts($1,1) AS s(i)
ORDER BY i DESC
);
$$ LANGUAGE SQL STRICT IMMUTABLE;
CREATE OR REPLACE FUNCTION array_dedup_append(base bigint[], append bigint[], n_limit int)
RETURNS bigint[]
AS $$
SELECT __array_reverse(array_agg(val)) FROM (
SELECT val FROM (
SELECT DISTINCT ON (val) pos, val FROM unnest(__array_reverse(array_cat(base, append))) WITH ORDINALITY as t(val, pos)
) AS deduped
ORDER BY pos
LIMIT n_limit
) AS result
$$ LANGUAGE SQL STRICT IMMUTABLE;

View File

@ -27,6 +27,8 @@ use crate::{
transaction_info::TransactionInfo,
};
const LIMIT_LATEST_TXS_PER_ACCOUNT: i64 = 1000;
pub struct TempTableTracker {
count: AtomicU64,
}
@ -298,6 +300,7 @@ impl PostgresSession {
Ok(())
}
/// add accounts for transaction to accounts_map_transaction table and update accounts_map_transaction_latest
pub async fn insert_accounts_for_transaction(
&self,
accounts_for_transaction: Vec<AccountsForTransaction>,
@ -348,6 +351,8 @@ impl PostgresSession {
}
writer.finish().await?;
// merge data from temp table into accounts_map_transaction
let statement = format!(
r#"
INSERT INTO banking_stage_results_2.accounts_map_transaction(acc_id, transaction_id, is_writable, is_signer, is_atl)
@ -365,8 +370,53 @@ impl PostgresSession {
"#,
temp_table
);
self.client.execute(statement.as_str(), &[]).await?;
let rows = self.client.execute(statement.as_str(), &[]).await?;
debug!("inserted into accounts_map_transaction: {}", rows);
// merge data from temp table into accounts_map_transaction_latest
// note: query uses the array_dedup_append postgres function to deduplicate and limit the array size
// example: array_dedup_append('{8,3,2,1}', '{5,3}', 4) -> {2,1,5,3}
let temp_table_latest_agged = self.temp_table_tracker.get_new_temp_table();
let statement = format!(
r#"
CREATE TEMP TABLE {temp_table_name} AS
WITH amt_new AS (
SELECT
acc_id, array_agg(transactions.transaction_id) AS tx_agged
FROM {temp_table_newdata} AS newdata
inner join banking_stage_results_2.accounts on accounts.account_key=newdata.account_key
inner join banking_stage_results_2.transactions on transactions.signature=newdata.signature
GROUP BY acc_id
)
SELECT
acc_id,
array_dedup_append(
COALESCE((SELECT tx_ids FROM banking_stage_results_2.accounts_map_transaction_latest WHERE acc_id=amt_new.acc_id ), array[]::bigint[]),
amt_new.tx_agged,
{limit}) AS tx_ids_agg
FROM amt_new
"#,
temp_table_newdata = temp_table,
temp_table_name = temp_table_latest_agged,
limit = LIMIT_LATEST_TXS_PER_ACCOUNT
);
let rows = self.client.execute(statement.as_str(), &[]).await?;
info!("inserted into {}: {}", temp_table_latest_agged, rows);
let statement = format!(
r#"
INSERT INTO banking_stage_results_2.accounts_map_transaction_latest(acc_id, tx_ids)
SELECT acc_id, tx_ids_agg FROM {temp_table_name}
ON CONFLICT (acc_id) DO UPDATE SET tx_ids = EXCLUDED.tx_ids;
"#,
temp_table_name = temp_table_latest_agged
);
let rows = self.client.execute(statement.as_str(), &[]).await?;
info!("upserted in accounts_map_transaction_latest: {}", rows);
self.drop_temp_table(temp_table_latest_agged).await?;
self.drop_temp_table(temp_table).await?;
Ok(())
}