diff --git a/migration.sql b/migration.sql index a7e3924..b61cfdd 100644 --- a/migration.sql +++ b/migration.sql @@ -151,3 +151,20 @@ BEGIN RETURN tmplist[(len + 1 - n_limit):]; END $$ LANGUAGE plpgsql IMMUTABLE CALLED ON NULL INPUT; + + +DROP FUNCTION array_dedup_append; + +-- select banking_stage_results_2.array_prepend_and_truncate('{8,3,2,1}', '{5,3}', 3); -- 5,3,8 +CREATE OR REPLACE FUNCTION banking_stage_results_2.array_prepend_and_truncate(base bigint[], append bigint[], n_limit int) + RETURNS bigint[] +AS $$ +DECLARE + tmplist bigint[]; + len int; +BEGIN + tmplist := append || base; + len := CARDINALITY(tmplist); + RETURN tmplist[:n_limit]; +END +$$ LANGUAGE plpgsql IMMUTABLE CALLED ON NULL INPUT; diff --git a/src/postgres.rs b/src/postgres.rs index c76a093..bd7a8f9 100644 --- a/src/postgres.rs +++ b/src/postgres.rs @@ -32,7 +32,8 @@ use crate::{ }; const BLOCK_WRITE_BUFFER_SIZE: usize = 5; -const LIMIT_LATEST_TXS_PER_ACCOUNT: i64 = 100; +// note: need to add some a bit more transactions in case of duplicates +const LIMIT_LATEST_TXS_PER_ACCOUNT: i64 = 1000 + 100; lazy_static::lazy_static! { static ref ACCOUNTS_SAVING_QUEUE: IntGauge = @@ -441,7 +442,8 @@ impl PostgresSession { &[Type::TEXT, Type::TEXT, Type::BOOL, Type::BOOL, Type::BOOL], ); pin_mut!(writer); - for acc_tx in accounts_for_transaction { + // note: latest transaction must land first in the array + for acc_tx in accounts_for_transaction.iter().rev() { for acc in &acc_tx.accounts { let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(4); args.push(&acc.key); @@ -501,7 +503,7 @@ impl PostgresSession { ) SELECT acc_id, - array_dedup_append( + banking_stage_results_2.array_prepend_and_truncate( (SELECT tx_ids FROM banking_stage_results_2.accounts_map_transaction_latest WHERE acc_id=amt_new.acc_id), amt_new.tx_agged, {limit}) AS tx_ids_agg