Add config flag for deleting old data
Keeping it is very useful for testing.
This commit is contained in:
parent
f0c26bb8fb
commit
d5395a3b75
|
@ -24,3 +24,4 @@ retry_query_sleep_secs = 5
|
||||||
retry_connection_sleep_secs = 30
|
retry_connection_sleep_secs = 30
|
||||||
fatal_connection_timeout_secs = 600
|
fatal_connection_timeout_secs = 600
|
||||||
allow_invalid_certs = false
|
allow_invalid_certs = false
|
||||||
|
delete_old_data = true
|
||||||
|
|
|
@ -24,3 +24,4 @@ retry_query_sleep_secs = 5
|
||||||
retry_connection_sleep_secs = 30
|
retry_connection_sleep_secs = 30
|
||||||
fatal_connection_timeout_secs = 600
|
fatal_connection_timeout_secs = 600
|
||||||
allow_invalid_certs = false
|
allow_invalid_certs = false
|
||||||
|
delete_old_data = true
|
||||||
|
|
|
@ -82,6 +82,8 @@ pub struct PostgresConfig {
|
||||||
pub fatal_connection_timeout_secs: u64,
|
pub fatal_connection_timeout_secs: u64,
|
||||||
/// Allow invalid TLS certificates, passed to native_tls danger_accept_invalid_certs
|
/// Allow invalid TLS certificates, passed to native_tls danger_accept_invalid_certs
|
||||||
pub allow_invalid_certs: bool,
|
pub allow_invalid_certs: bool,
|
||||||
|
/// Delete old data automatically, keeping only the current data snapshot
|
||||||
|
pub delete_old_data: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug, Deserialize)]
|
#[derive(Clone, Debug, Deserialize)]
|
||||||
|
|
|
@ -170,48 +170,53 @@ struct SlotsProcessing {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SlotsProcessing {
|
impl SlotsProcessing {
|
||||||
fn new(tables: &Vec<String>) -> Self {
|
fn new(tables: &Vec<String>, delete_old_data: bool) -> Self {
|
||||||
// Delete:
|
let cleanup_table_sql = Vec::<String>::new();
|
||||||
// 1. account writes that came before the newest rooted write
|
|
||||||
// 2. account writes that came after the newest rooted write but before
|
if delete_old_data {
|
||||||
// the newest rooted slot (like processed writes that never confirmed)
|
// Delete:
|
||||||
let mut cleanup_table_sql: Vec<String> = tables
|
// 1. account writes that came before the newest rooted write
|
||||||
.iter()
|
// 2. account writes that came after the newest rooted write but before
|
||||||
.map(|table_name| {
|
// the newest rooted slot (like processed writes that never confirmed)
|
||||||
format!(
|
let mut cleanup_table_sql: Vec<String> = tables
|
||||||
"DELETE FROM {table} AS data
|
.iter()
|
||||||
USING (
|
.map(|table_name| {
|
||||||
SELECT DISTINCT ON(pubkey_id) pubkey_id, slot, write_version
|
format!(
|
||||||
FROM {table}
|
"DELETE FROM {table} AS data
|
||||||
LEFT JOIN slot USING(slot)
|
USING (
|
||||||
WHERE slot <= $newest_final_slot AND (status = 'Rooted' OR status is NULL)
|
SELECT DISTINCT ON(pubkey_id) pubkey_id, slot, write_version
|
||||||
ORDER BY pubkey_id, slot DESC, write_version DESC
|
FROM {table}
|
||||||
) latest_write
|
LEFT JOIN slot USING(slot)
|
||||||
WHERE data.pubkey_id = latest_write.pubkey_id
|
WHERE slot <= $newest_final_slot AND (status = 'Rooted' OR status is NULL)
|
||||||
AND (
|
ORDER BY pubkey_id, slot DESC, write_version DESC
|
||||||
(data.slot < latest_write.slot
|
) latest_write
|
||||||
OR (data.slot = latest_write.slot
|
WHERE data.pubkey_id = latest_write.pubkey_id
|
||||||
AND data.write_version < latest_write.write_version
|
AND (
|
||||||
)
|
(data.slot < latest_write.slot
|
||||||
)
|
|
||||||
OR
|
|
||||||
(
|
|
||||||
data.slot < $newest_final_slot
|
|
||||||
AND
|
|
||||||
(data.slot > latest_write.slot
|
|
||||||
OR (data.slot = latest_write.slot
|
OR (data.slot = latest_write.slot
|
||||||
AND data.write_version > latest_write.write_version
|
AND data.write_version < latest_write.write_version
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
)
|
OR
|
||||||
)",
|
(
|
||||||
table = table_name
|
data.slot < $newest_final_slot
|
||||||
)
|
AND
|
||||||
})
|
(data.slot > latest_write.slot
|
||||||
.collect();
|
OR (data.slot = latest_write.slot
|
||||||
|
AND data.write_version > latest_write.write_version
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)",
|
||||||
|
table = table_name
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
// Delete old slots
|
// Delete old slots
|
||||||
cleanup_table_sql.push("DELETE FROM slot WHERE slot + 100000 < $newest_final_slot".into());
|
cleanup_table_sql
|
||||||
|
.push("DELETE FROM slot WHERE slot + 100000 < $newest_final_slot".into());
|
||||||
|
}
|
||||||
|
|
||||||
Self { cleanup_table_sql }
|
Self { cleanup_table_sql }
|
||||||
}
|
}
|
||||||
|
@ -264,7 +269,7 @@ impl SlotsProcessing {
|
||||||
.context("updating preceding non-rooted slots")?;
|
.context("updating preceding non-rooted slots")?;
|
||||||
|
|
||||||
// Keep only the newest rooted account write and also
|
// Keep only the newest rooted account write and also
|
||||||
// wipe old slots
|
// wipe old slots (if configured)
|
||||||
for cleanup_sql in &self.cleanup_table_sql {
|
for cleanup_sql in &self.cleanup_table_sql {
|
||||||
let query = query_dyn!(cleanup_sql, newest_final_slot = update.slot)?;
|
let query = query_dyn!(cleanup_sql, newest_final_slot = update.slot)?;
|
||||||
let _ = query
|
let _ = query
|
||||||
|
@ -407,7 +412,7 @@ pub async fn init(
|
||||||
.iter()
|
.iter()
|
||||||
.map(|table| table.table_name().to_string())
|
.map(|table| table.table_name().to_string())
|
||||||
.collect();
|
.collect();
|
||||||
let slots_processing = SlotsProcessing::new(&table_names);
|
let slots_processing = SlotsProcessing::new(&table_names, config.delete_old_data);
|
||||||
for _ in 0..config.slot_update_connection_count {
|
for _ in 0..config.slot_update_connection_count {
|
||||||
let postgres_slot =
|
let postgres_slot =
|
||||||
postgres_connection(config, metric_con_retries.clone(), metric_con_live.clone())
|
postgres_connection(config, metric_con_retries.clone(), metric_con_live.clone())
|
||||||
|
|
Loading…
Reference in New Issue