Feature/cleanup job (#28)

cleanup job for database
This commit is contained in:
Groovie | Mango 2023-12-21 14:39:18 +01:00 committed by GitHub
parent 1076e22523
commit c74d9ab3e5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 255 additions and 3 deletions

View File

@ -2,8 +2,7 @@
name = "grpc_banking_transactions_notifications"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
default-run = "grpc_banking_transactions_notifications"
[dependencies]
solana-sdk = "~1.16.17"

28
src/bin/cleanupdb.rs Normal file
View File

@ -0,0 +1,28 @@
use clap::Parser;
use grpc_banking_transactions_notifications::postgres;
use grpc_banking_transactions_notifications::postgres::PostgresSession;
#[derive(Parser, Debug, Clone)]
#[command(author, version, about, long_about = None)]
pub struct Args {
#[arg(short, long, help = "num of slots to keep relative to highest slot in blocks table")]
pub num_slots_to_keep: i64,
#[arg(short, long, default_value_t = false)]
pub dry_run: bool,
}
#[tokio::main()]
async fn main() {
tracing_subscriber::fmt::init();
let Args {
num_slots_to_keep, dry_run
} = Args::parse();
let session = PostgresSession::new().await.unwrap();
session.cleanup_old_data(num_slots_to_keep, dry_run).await;
}

5
src/lib.rs Normal file
View File

@ -0,0 +1,5 @@
pub mod postgres;
pub mod block_info;
pub mod transaction_info;

View File

@ -2,13 +2,15 @@ use std::{
sync::{atomic::AtomicU64, Arc},
time::Duration,
};
use std::cmp::min;
use std::time::Instant;
use anyhow::Context;
use base64::Engine;
use dashmap::DashMap;
use futures::pin_mut;
use itertools::Itertools;
use log::{debug, error, info};
use log::{debug, error, info, Level, log, warn};
use native_tls::{Certificate, Identity, TlsConnector};
use postgres_native_tls::MakeTlsConnector;
use serde::Serialize;
@ -697,6 +699,224 @@ impl PostgresSession {
}
}
impl PostgresSession {
// "errors" -> lookup data
// "blocks" -> keep, it its small
// "accounts" -> keep, table is used for pkey lookup
// "transactions" -> keep, table is used for pkey lookup
// "accounts_map_blocks" -> delete rows slot before X
// "accounts_map_transaction" -> delete rows with transaction_id before X
// "transaction_infos" -> delete rows processed_slot before X
// "transaction_slot" -> delete transaction with slot before X
pub async fn cleanup_old_data(&self, slots_to_keep: i64, dry_run: bool) {
// keep 1mio slots (apprx 4 days)
info!("{}Running cleanup job with slots_to_keep={}",
if dry_run { "DRY-RUN: " } else { "" },
slots_to_keep);
self.client.execute("SET work_mem TO '256MB'", &[]).await.unwrap();
let work_mem: String = self.client.query_one("show work_mem", &[]).await.unwrap().get("work_mem");
info!("Configured work_mem={}", work_mem);
{
info!("Rows before cleanup:");
self.log_rowcount(Level::Info, "blocks").await;
self.log_rowcount(Level::Info, "accounts").await;
self.log_rowcount(Level::Info, "transactions").await;
self.log_rowcount(Level::Info, "accounts_map_blocks").await;
self.log_rowcount(Level::Info, "accounts_map_transaction").await;
self.log_rowcount(Level::Info, "transaction_infos").await;
self.log_rowcount(Level::Info, "transaction_slot").await;
}
// max slot from blocks table
let latest_slot = self.client.query_one(
"SELECT max(slot) as latest_slot FROM banking_stage_results_2.blocks", &[])
.await.unwrap();
// assume not null
let latest_slot: i64 = latest_slot.get("latest_slot");
// do not delete cutoff_slot
let cutoff_slot_excl = latest_slot - slots_to_keep;
info!("latest_slot={} from blocks table; keeping {} slots - i.e. slot {}", latest_slot, slots_to_keep, cutoff_slot_excl);
let cutoff_transaction_incl: i64 = {
let cutoff_transaction_from_txi_incl = self.client.query_one(
&format!(
r"
SELECT max(transaction_id) as transaction_id FROM banking_stage_results_2.transaction_infos
WHERE processed_slot < {cutoff_slot}
",
cutoff_slot = cutoff_slot_excl
),
&[]).await.unwrap();
let cutoff_transaction_from_txi_incl: Option<i64> = cutoff_transaction_from_txi_incl.get("transaction_id");
let cutoff_transaction_from_txslot_incl = self.client.query_one(
&format!(
r"
SELECT max(transaction_id) as transaction_id FROM banking_stage_results_2.transaction_slot
WHERE slot < {cutoff_slot}
",
cutoff_slot = cutoff_slot_excl
),
&[]).await.unwrap();
let cutoff_transaction_from_txslot_incl: Option<i64> = cutoff_transaction_from_txslot_incl.get("transaction_id");
debug!("cutoff_transaction_from_txi_incl: {:?}", cutoff_transaction_from_txi_incl);
debug!("cutoff_transaction_from_txslot_incl: {:?}", cutoff_transaction_from_txslot_incl);
let min_transaction_id: Option<i64> =
vec![cutoff_transaction_from_txi_incl, cutoff_transaction_from_txslot_incl]
.into_iter()
.filter_map(|x| x)
.min();
if min_transaction_id.is_none() {
info!("nothing to delete - abort");
return;
}
min_transaction_id.unwrap()
};
info!("delete slots but keep slots including and after {}", cutoff_slot_excl);
info!("should delete transactions with id =< {}", cutoff_transaction_incl);
// delete accounts_map_transaction
{
let tx_to_delete = self.client.query_one(
&format!(
r"
SELECT count(*) as cnt_tx FROM banking_stage_results_2.accounts_map_transaction amt
WHERE amt.transaction_id <= {cutoff_transaction}
",
cutoff_transaction = cutoff_transaction_incl
),
&[]).await.unwrap();
let tx_to_delete: i64 = tx_to_delete.get("cnt_tx");
info!("would delete transactions: {}", tx_to_delete);
}
{
let amb_to_delete = self.client.query_one(
&format!(
r"
SELECT count(*) as cnt_ambs FROM banking_stage_results_2.accounts_map_blocks amb
WHERE amb.slot < {cutoff_slot}
",
cutoff_slot = cutoff_slot_excl
),
&[]).await.unwrap();
let amb_to_delete: i64 = amb_to_delete.get("cnt_ambs");
info!("would delete from accounts_map_blocks: {}", amb_to_delete);
}
{
let txi_to_delete = self.client.query_one(
&format!(
r"
SELECT count(*) as cnt_txis FROM banking_stage_results_2.transaction_infos txi
WHERE txi.processed_slot < {cutoff_slot}
",
cutoff_slot = cutoff_slot_excl
),
&[]).await.unwrap();
let txi_to_delete: i64 = txi_to_delete.get("cnt_txis");
info!("would delete from transaction_infos: {}", txi_to_delete);
}
{
let txslot_to_delete = self.client.query_one(
&format!(
r"
SELECT count(*) as cnt_txslots FROM banking_stage_results_2.transaction_slot tx_slot
WHERE tx_slot.slot < {cutoff_slot}
",
cutoff_slot = cutoff_slot_excl
),
&[]).await.unwrap();
let txslot_to_delete: i64 = txslot_to_delete.get("cnt_txslots");
info!("would delete from transaction_slot: {}", txslot_to_delete);
}
if dry_run {
warn!("dry-run: stop now without changing anything");
return;
}
{
let started = Instant::now();
let deleted_rows = self.client.execute(
&format!(
r"
DELETE FROM banking_stage_results_2.transactions WHERE transaction_id <= {transaction_id}
", transaction_id = cutoff_transaction_incl
), &[]).await.unwrap();
info!("Deleted {} rows from transactions in {:.2}ms", deleted_rows, started.elapsed().as_secs_f32());
}
{
let started = Instant::now();
let deleted_rows = self.client.execute(
&format!(
r"
DELETE FROM banking_stage_results_2.accounts_map_transaction WHERE transaction_id <= {transaction_id}
", transaction_id = cutoff_transaction_incl
), &[]).await.unwrap();
info!("Deleted {} rows from accounts_map_transaction in {:.2}ms", deleted_rows, started.elapsed().as_secs_f32());
}
{
let started = Instant::now();
let deleted_rows = self.client.execute(
&format!(
r"
DELETE FROM banking_stage_results_2.transaction_infos WHERE processed_slot < {cutoff_slot}
", cutoff_slot = cutoff_slot_excl
), &[]).await.unwrap();
info!("Deleted {} rows from transaction_infos in {:.2}ms", deleted_rows, started.elapsed().as_secs_f32());
}
{
let started = Instant::now();
let deleted_rows = self.client.execute(
&format!(
r"
DELETE FROM banking_stage_results_2.transaction_slot WHERE slot < {cutoff_slot}
", cutoff_slot = cutoff_slot_excl
), &[]).await.unwrap();
info!("Deleted {} rows from transaction_slot in {:.2}ms", deleted_rows, started.elapsed().as_secs_f32());
}
{
info!("Rows after cleanup:");
self.log_rowcount(Level::Info, "blocks").await;
self.log_rowcount(Level::Info, "accounts").await;
self.log_rowcount(Level::Info, "transactions").await;
self.log_rowcount(Level::Info, "accounts_map_blocks").await;
self.log_rowcount(Level::Info, "accounts_map_transaction").await;
self.log_rowcount(Level::Info, "transaction_infos").await;
self.log_rowcount(Level::Info, "transaction_slot").await;
}
}
async fn log_rowcount(&self, level: Level, table: &str) {
let count: i64 = self.client.query_one(
&format!("SELECT count(*) as cnt FROM banking_stage_results_2.{tablename}", tablename = table), &[])
.await.unwrap()
.get("cnt");
log!(level, "- rows count in table <{}>: {}", table, count);
}
}
#[derive(Clone)]
pub struct Postgres {
session: Arc<PostgresSession>,