build amt_latest tx array with latest at leftmost position (#51)
This commit is contained in:
parent
65b27ce930
commit
a981ff4b2b
|
@ -151,3 +151,20 @@ BEGIN
|
||||||
RETURN tmplist[(len + 1 - n_limit):];
|
RETURN tmplist[(len + 1 - n_limit):];
|
||||||
END
|
END
|
||||||
$$ LANGUAGE plpgsql IMMUTABLE CALLED ON NULL INPUT;
|
$$ LANGUAGE plpgsql IMMUTABLE CALLED ON NULL INPUT;
|
||||||
|
|
||||||
|
|
||||||
|
DROP FUNCTION array_dedup_append;
|
||||||
|
|
||||||
|
-- select banking_stage_results_2.array_prepend_and_truncate('{8,3,2,1}', '{5,3}', 3); -- 5,3,8
|
||||||
|
CREATE OR REPLACE FUNCTION banking_stage_results_2.array_prepend_and_truncate(base bigint[], append bigint[], n_limit int)
|
||||||
|
RETURNS bigint[]
|
||||||
|
AS $$
|
||||||
|
DECLARE
|
||||||
|
tmplist bigint[];
|
||||||
|
len int;
|
||||||
|
BEGIN
|
||||||
|
tmplist := append || base;
|
||||||
|
len := CARDINALITY(tmplist);
|
||||||
|
RETURN tmplist[:n_limit];
|
||||||
|
END
|
||||||
|
$$ LANGUAGE plpgsql IMMUTABLE CALLED ON NULL INPUT;
|
||||||
|
|
|
@ -32,7 +32,8 @@ use crate::{
|
||||||
};
|
};
|
||||||
|
|
||||||
const BLOCK_WRITE_BUFFER_SIZE: usize = 5;
|
const BLOCK_WRITE_BUFFER_SIZE: usize = 5;
|
||||||
const LIMIT_LATEST_TXS_PER_ACCOUNT: i64 = 100;
|
// note: need to add some a bit more transactions in case of duplicates
|
||||||
|
const LIMIT_LATEST_TXS_PER_ACCOUNT: i64 = 1000 + 100;
|
||||||
|
|
||||||
lazy_static::lazy_static! {
|
lazy_static::lazy_static! {
|
||||||
static ref ACCOUNTS_SAVING_QUEUE: IntGauge =
|
static ref ACCOUNTS_SAVING_QUEUE: IntGauge =
|
||||||
|
@ -441,7 +442,8 @@ impl PostgresSession {
|
||||||
&[Type::TEXT, Type::TEXT, Type::BOOL, Type::BOOL, Type::BOOL],
|
&[Type::TEXT, Type::TEXT, Type::BOOL, Type::BOOL, Type::BOOL],
|
||||||
);
|
);
|
||||||
pin_mut!(writer);
|
pin_mut!(writer);
|
||||||
for acc_tx in accounts_for_transaction {
|
// note: latest transaction must land first in the array
|
||||||
|
for acc_tx in accounts_for_transaction.iter().rev() {
|
||||||
for acc in &acc_tx.accounts {
|
for acc in &acc_tx.accounts {
|
||||||
let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(4);
|
let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(4);
|
||||||
args.push(&acc.key);
|
args.push(&acc.key);
|
||||||
|
@ -501,7 +503,7 @@ impl PostgresSession {
|
||||||
)
|
)
|
||||||
SELECT
|
SELECT
|
||||||
acc_id,
|
acc_id,
|
||||||
array_dedup_append(
|
banking_stage_results_2.array_prepend_and_truncate(
|
||||||
(SELECT tx_ids FROM banking_stage_results_2.accounts_map_transaction_latest WHERE acc_id=amt_new.acc_id),
|
(SELECT tx_ids FROM banking_stage_results_2.accounts_map_transaction_latest WHERE acc_id=amt_new.acc_id),
|
||||||
amt_new.tx_agged,
|
amt_new.tx_agged,
|
||||||
{limit}) AS tx_ids_agg
|
{limit}) AS tx_ids_agg
|
||||||
|
|
Loading…
Reference in New Issue