clippy+fmt
This commit is contained in:
parent
c74d9ab3e5
commit
875cf7c722
|
@ -1,12 +1,14 @@
|
|||
use clap::Parser;
|
||||
use grpc_banking_transactions_notifications::postgres;
|
||||
use grpc_banking_transactions_notifications::postgres::PostgresSession;
|
||||
|
||||
#[derive(Parser, Debug, Clone)]
|
||||
#[command(author, version, about, long_about = None)]
|
||||
pub struct Args {
|
||||
|
||||
#[arg(short, long, help = "num of slots to keep relative to highest slot in blocks table")]
|
||||
#[arg(
|
||||
short,
|
||||
long,
|
||||
help = "num of slots to keep relative to highest slot in blocks table"
|
||||
)]
|
||||
pub num_slots_to_keep: i64,
|
||||
|
||||
#[arg(short, long, default_value_t = false)]
|
||||
|
@ -18,11 +20,11 @@ async fn main() {
|
|||
tracing_subscriber::fmt::init();
|
||||
|
||||
let Args {
|
||||
num_slots_to_keep, dry_run
|
||||
num_slots_to_keep,
|
||||
dry_run,
|
||||
} = Args::parse();
|
||||
|
||||
let session = PostgresSession::new().await.unwrap();
|
||||
|
||||
session.cleanup_old_data(num_slots_to_keep, dry_run).await;
|
||||
|
||||
}
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
use crate::alt_store::ALTStore;
|
||||
use itertools::Itertools;
|
||||
use serde::Serialize;
|
||||
use solana_sdk::{
|
||||
|
@ -14,8 +15,6 @@ use solana_sdk::{
|
|||
};
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
|
||||
use crate::alt_store::ALTStore;
|
||||
|
||||
#[derive(Serialize, Debug, Clone)]
|
||||
pub struct PrioFeeData {
|
||||
pub max: Option<u64>,
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
pub mod postgres;
|
||||
mod alt_store;
|
||||
pub mod block_info;
|
||||
pub mod postgres;
|
||||
pub mod transaction_info;
|
||||
|
||||
|
||||
|
|
147
src/postgres.rs
147
src/postgres.rs
|
@ -1,16 +1,15 @@
|
|||
use std::time::Instant;
|
||||
use std::{
|
||||
sync::{atomic::AtomicU64, Arc},
|
||||
time::Duration,
|
||||
};
|
||||
use std::cmp::min;
|
||||
use std::time::Instant;
|
||||
|
||||
use anyhow::Context;
|
||||
use base64::Engine;
|
||||
use dashmap::DashMap;
|
||||
use futures::pin_mut;
|
||||
use itertools::Itertools;
|
||||
use log::{debug, error, info, Level, log, warn};
|
||||
use log::{debug, error, info, log, warn, Level};
|
||||
use native_tls::{Certificate, Identity, TlsConnector};
|
||||
use postgres_native_tls::MakeTlsConnector;
|
||||
use serde::Serialize;
|
||||
|
@ -615,13 +614,12 @@ impl PostgresSession {
|
|||
// create account ids
|
||||
let accounts = txs
|
||||
.iter()
|
||||
.map(|transaction| transaction.account_used.clone())
|
||||
.flatten()
|
||||
.flat_map(|transaction| transaction.account_used.clone())
|
||||
.map(|(acc, _)| acc)
|
||||
.collect_vec();
|
||||
self.create_accounts_for_transaction(accounts).await?;
|
||||
// add transaction in tx slot table
|
||||
self.insert_transaction_in_txslot_table(&txs.as_slice())
|
||||
self.insert_transaction_in_txslot_table(txs.as_slice())
|
||||
.await?;
|
||||
let txs_accounts = txs
|
||||
.iter()
|
||||
|
@ -700,7 +698,6 @@ impl PostgresSession {
|
|||
}
|
||||
|
||||
impl PostgresSession {
|
||||
|
||||
// "errors" -> lookup data
|
||||
// "blocks" -> keep, it its small
|
||||
// "accounts" -> keep, table is used for pkey lookup
|
||||
|
@ -711,12 +708,22 @@ impl PostgresSession {
|
|||
// "transaction_slot" -> delete transaction with slot before X
|
||||
pub async fn cleanup_old_data(&self, slots_to_keep: i64, dry_run: bool) {
|
||||
// keep 1mio slots (apprx 4 days)
|
||||
info!("{}Running cleanup job with slots_to_keep={}",
|
||||
info!(
|
||||
"{}Running cleanup job with slots_to_keep={}",
|
||||
if dry_run { "DRY-RUN: " } else { "" },
|
||||
slots_to_keep);
|
||||
slots_to_keep
|
||||
);
|
||||
|
||||
self.client.execute("SET work_mem TO '256MB'", &[]).await.unwrap();
|
||||
let work_mem: String = self.client.query_one("show work_mem", &[]).await.unwrap().get("work_mem");
|
||||
self.client
|
||||
.execute("SET work_mem TO '256MB'", &[])
|
||||
.await
|
||||
.unwrap();
|
||||
let work_mem: String = self
|
||||
.client
|
||||
.query_one("show work_mem", &[])
|
||||
.await
|
||||
.unwrap()
|
||||
.get("work_mem");
|
||||
info!("Configured work_mem={}", work_mem);
|
||||
|
||||
{
|
||||
|
@ -725,21 +732,30 @@ impl PostgresSession {
|
|||
self.log_rowcount(Level::Info, "accounts").await;
|
||||
self.log_rowcount(Level::Info, "transactions").await;
|
||||
self.log_rowcount(Level::Info, "accounts_map_blocks").await;
|
||||
self.log_rowcount(Level::Info, "accounts_map_transaction").await;
|
||||
self.log_rowcount(Level::Info, "accounts_map_transaction")
|
||||
.await;
|
||||
self.log_rowcount(Level::Info, "transaction_infos").await;
|
||||
self.log_rowcount(Level::Info, "transaction_slot").await;
|
||||
}
|
||||
|
||||
// max slot from blocks table
|
||||
let latest_slot = self.client.query_one(
|
||||
"SELECT max(slot) as latest_slot FROM banking_stage_results_2.blocks", &[])
|
||||
.await.unwrap();
|
||||
let latest_slot = self
|
||||
.client
|
||||
.query_one(
|
||||
"SELECT max(slot) as latest_slot FROM banking_stage_results_2.blocks",
|
||||
&[],
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
// assume not null
|
||||
let latest_slot: i64 = latest_slot.get("latest_slot");
|
||||
|
||||
// do not delete cutoff_slot
|
||||
let cutoff_slot_excl = latest_slot - slots_to_keep;
|
||||
info!("latest_slot={} from blocks table; keeping {} slots - i.e. slot {}", latest_slot, slots_to_keep, cutoff_slot_excl);
|
||||
info!(
|
||||
"latest_slot={} from blocks table; keeping {} slots - i.e. slot {}",
|
||||
latest_slot, slots_to_keep, cutoff_slot_excl
|
||||
);
|
||||
|
||||
let cutoff_transaction_incl: i64 = {
|
||||
let cutoff_transaction_from_txi_incl = self.client.query_one(
|
||||
|
@ -751,7 +767,8 @@ impl PostgresSession {
|
|||
cutoff_slot = cutoff_slot_excl
|
||||
),
|
||||
&[]).await.unwrap();
|
||||
let cutoff_transaction_from_txi_incl: Option<i64> = cutoff_transaction_from_txi_incl.get("transaction_id");
|
||||
let cutoff_transaction_from_txi_incl: Option<i64> =
|
||||
cutoff_transaction_from_txi_incl.get("transaction_id");
|
||||
|
||||
let cutoff_transaction_from_txslot_incl = self.client.query_one(
|
||||
&format!(
|
||||
|
@ -762,16 +779,25 @@ impl PostgresSession {
|
|||
cutoff_slot = cutoff_slot_excl
|
||||
),
|
||||
&[]).await.unwrap();
|
||||
let cutoff_transaction_from_txslot_incl: Option<i64> = cutoff_transaction_from_txslot_incl.get("transaction_id");
|
||||
let cutoff_transaction_from_txslot_incl: Option<i64> =
|
||||
cutoff_transaction_from_txslot_incl.get("transaction_id");
|
||||
|
||||
debug!("cutoff_transaction_from_txi_incl: {:?}", cutoff_transaction_from_txi_incl);
|
||||
debug!("cutoff_transaction_from_txslot_incl: {:?}", cutoff_transaction_from_txslot_incl);
|
||||
debug!(
|
||||
"cutoff_transaction_from_txi_incl: {:?}",
|
||||
cutoff_transaction_from_txi_incl
|
||||
);
|
||||
debug!(
|
||||
"cutoff_transaction_from_txslot_incl: {:?}",
|
||||
cutoff_transaction_from_txslot_incl
|
||||
);
|
||||
|
||||
let min_transaction_id: Option<i64> =
|
||||
vec![cutoff_transaction_from_txi_incl, cutoff_transaction_from_txslot_incl]
|
||||
.into_iter()
|
||||
.filter_map(|x| x)
|
||||
.min();
|
||||
let min_transaction_id: Option<i64> = vec![
|
||||
cutoff_transaction_from_txi_incl,
|
||||
cutoff_transaction_from_txslot_incl,
|
||||
]
|
||||
.into_iter()
|
||||
.flatten()
|
||||
.min();
|
||||
|
||||
if min_transaction_id.is_none() {
|
||||
info!("nothing to delete - abort");
|
||||
|
@ -781,8 +807,14 @@ impl PostgresSession {
|
|||
min_transaction_id.unwrap()
|
||||
};
|
||||
|
||||
info!("delete slots but keep slots including and after {}", cutoff_slot_excl);
|
||||
info!("should delete transactions with id =< {}", cutoff_transaction_incl);
|
||||
info!(
|
||||
"delete slots but keep slots including and after {}",
|
||||
cutoff_slot_excl
|
||||
);
|
||||
info!(
|
||||
"should delete transactions with id =< {}",
|
||||
cutoff_transaction_incl
|
||||
);
|
||||
|
||||
// delete accounts_map_transaction
|
||||
{
|
||||
|
@ -862,7 +894,11 @@ impl PostgresSession {
|
|||
DELETE FROM banking_stage_results_2.transactions WHERE transaction_id <= {transaction_id}
|
||||
", transaction_id = cutoff_transaction_incl
|
||||
), &[]).await.unwrap();
|
||||
info!("Deleted {} rows from transactions in {:.2}ms", deleted_rows, started.elapsed().as_secs_f32());
|
||||
info!(
|
||||
"Deleted {} rows from transactions in {:.2}ms",
|
||||
deleted_rows,
|
||||
started.elapsed().as_secs_f32()
|
||||
);
|
||||
}
|
||||
{
|
||||
let started = Instant::now();
|
||||
|
@ -872,7 +908,11 @@ impl PostgresSession {
|
|||
DELETE FROM banking_stage_results_2.accounts_map_transaction WHERE transaction_id <= {transaction_id}
|
||||
", transaction_id = cutoff_transaction_incl
|
||||
), &[]).await.unwrap();
|
||||
info!("Deleted {} rows from accounts_map_transaction in {:.2}ms", deleted_rows, started.elapsed().as_secs_f32());
|
||||
info!(
|
||||
"Deleted {} rows from accounts_map_transaction in {:.2}ms",
|
||||
deleted_rows,
|
||||
started.elapsed().as_secs_f32()
|
||||
);
|
||||
}
|
||||
{
|
||||
let started = Instant::now();
|
||||
|
@ -882,17 +922,32 @@ impl PostgresSession {
|
|||
DELETE FROM banking_stage_results_2.transaction_infos WHERE processed_slot < {cutoff_slot}
|
||||
", cutoff_slot = cutoff_slot_excl
|
||||
), &[]).await.unwrap();
|
||||
info!("Deleted {} rows from transaction_infos in {:.2}ms", deleted_rows, started.elapsed().as_secs_f32());
|
||||
info!(
|
||||
"Deleted {} rows from transaction_infos in {:.2}ms",
|
||||
deleted_rows,
|
||||
started.elapsed().as_secs_f32()
|
||||
);
|
||||
}
|
||||
{
|
||||
let started = Instant::now();
|
||||
let deleted_rows = self.client.execute(
|
||||
&format!(
|
||||
r"
|
||||
let deleted_rows = self
|
||||
.client
|
||||
.execute(
|
||||
&format!(
|
||||
r"
|
||||
DELETE FROM banking_stage_results_2.transaction_slot WHERE slot < {cutoff_slot}
|
||||
", cutoff_slot = cutoff_slot_excl
|
||||
), &[]).await.unwrap();
|
||||
info!("Deleted {} rows from transaction_slot in {:.2}ms", deleted_rows, started.elapsed().as_secs_f32());
|
||||
",
|
||||
cutoff_slot = cutoff_slot_excl
|
||||
),
|
||||
&[],
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
info!(
|
||||
"Deleted {} rows from transaction_slot in {:.2}ms",
|
||||
deleted_rows,
|
||||
started.elapsed().as_secs_f32()
|
||||
);
|
||||
}
|
||||
|
||||
{
|
||||
|
@ -901,22 +956,30 @@ impl PostgresSession {
|
|||
self.log_rowcount(Level::Info, "accounts").await;
|
||||
self.log_rowcount(Level::Info, "transactions").await;
|
||||
self.log_rowcount(Level::Info, "accounts_map_blocks").await;
|
||||
self.log_rowcount(Level::Info, "accounts_map_transaction").await;
|
||||
self.log_rowcount(Level::Info, "accounts_map_transaction")
|
||||
.await;
|
||||
self.log_rowcount(Level::Info, "transaction_infos").await;
|
||||
self.log_rowcount(Level::Info, "transaction_slot").await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn log_rowcount(&self, level: Level, table: &str) {
|
||||
let count: i64 = self.client.query_one(
|
||||
&format!("SELECT count(*) as cnt FROM banking_stage_results_2.{tablename}", tablename = table), &[])
|
||||
.await.unwrap()
|
||||
async fn log_rowcount(&self, level: Level, table: &str) {
|
||||
let count: i64 = self
|
||||
.client
|
||||
.query_one(
|
||||
&format!(
|
||||
"SELECT count(*) as cnt FROM banking_stage_results_2.{tablename}",
|
||||
tablename = table
|
||||
),
|
||||
&[],
|
||||
)
|
||||
.await
|
||||
.unwrap()
|
||||
.get("cnt");
|
||||
log!(level, "- rows count in table <{}>: {}", table, count);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Postgres {
|
||||
session: Arc<PostgresSession>,
|
||||
|
|
Loading…
Reference in New Issue