2021-11-03 04:17:49 -07:00
|
|
|
use anyhow::Context;
|
2021-11-02 00:55:39 -07:00
|
|
|
use log::*;
|
2021-11-13 00:32:32 -08:00
|
|
|
use native_tls::TlsConnector;
|
|
|
|
use postgres_native_tls::MakeTlsConnector;
|
2021-11-03 09:22:52 -07:00
|
|
|
use postgres_query::{query, query_dyn};
|
2021-11-03 04:17:49 -07:00
|
|
|
use std::{collections::HashMap, time::Duration};
|
2021-11-02 00:55:39 -07:00
|
|
|
|
2021-11-08 11:15:46 -08:00
|
|
|
use crate::{metrics, AccountTables, AccountWrite, PostgresConfig, SlotStatus, SlotUpdate};
|
2021-11-02 00:55:39 -07:00
|
|
|
|
2021-11-03 04:17:49 -07:00
|
|
|
/// Spawn a background task that maintains a postgres connection and
/// publishes it through a channel.
///
/// The returned receiver yields `Some(client)` whenever a fresh
/// connection is established and `None` when the current connection
/// dies, so consumers (see `update_postgres_client`) can always hold
/// the most recent client. The first connection is made eagerly and
/// its failure is returned as an error; later reconnects are retried
/// forever with `retry_connection_sleep_secs` between attempts.
///
/// `metric_retries` counts connection retries, `metric_live` tracks the
/// number of currently alive connections.
async fn postgres_connection(
    config: &PostgresConfig,
    metric_retries: metrics::MetricU64,
    metric_live: metrics::MetricU64,
) -> anyhow::Result<async_channel::Receiver<Option<tokio_postgres::Client>>> {
    let (tx, rx) = async_channel::unbounded();

    // TLS setup; accepting invalid certs is an explicit config opt-in.
    let tls = MakeTlsConnector::new(
        TlsConnector::builder()
            .danger_accept_invalid_certs(config.allow_invalid_certs)
            .build()?,
    );

    let config = config.clone();
    // Connect once up-front so init fails fast on a bad connection string.
    let mut initial = Some(tokio_postgres::connect(&config.connection_string, tls.clone()).await?);
    let mut metric_retries = metric_retries;
    let mut metric_live = metric_live;
    tokio::spawn(async move {
        loop {
            // Use the eagerly-made connection on the first iteration,
            // reconnect (with retry/sleep) on all later ones.
            let (client, connection) = match initial.take() {
                Some(v) => v,
                None => {
                    let result =
                        tokio_postgres::connect(&config.connection_string, tls.clone()).await;
                    match result {
                        Ok(v) => v,
                        Err(err) => {
                            warn!("could not connect to postgres: {:?}", err);
                            tokio::time::sleep(Duration::from_secs(
                                config.retry_connection_sleep_secs,
                            ))
                            .await;
                            continue;
                        }
                    }
                }
            };

            // Hand the live client to consumers.
            tx.send(Some(client)).await.expect("send success");
            metric_live.increment();

            // tokio_postgres requires driving the connection future;
            // it resolves when the connection terminates.
            let result = connection.await;

            metric_retries.increment();
            metric_live.decrement();

            // Signal consumers that the client they hold is dead.
            tx.send(None).await.expect("send success");
            warn!("postgres connection error: {:?}", result);
            tokio::time::sleep(Duration::from_secs(config.retry_connection_sleep_secs)).await;
        }
    });

    Ok(rx)
}
|
2021-11-02 00:55:39 -07:00
|
|
|
|
2021-11-03 04:17:49 -07:00
|
|
|
/// Refresh `client` from the connection channel produced by
/// `postgres_connection` and return a reference to a usable client.
///
/// Drains the channel so the caller always works with the newest
/// connection state; if the current client is gone (`None` received)
/// this waits for a replacement. If no replacement arrives within
/// `fatal_connection_timeout_secs` the whole process exits — the
/// service cannot make progress without postgres.
async fn update_postgres_client<'a>(
    client: &'a mut Option<postgres_query::Caching<tokio_postgres::Client>>,
    rx: &async_channel::Receiver<Option<tokio_postgres::Client>>,
    config: &PostgresConfig,
) -> &'a postgres_query::Caching<tokio_postgres::Client> {
    // get the most recent client, waiting if there's a disconnect
    while !rx.is_empty() || client.is_none() {
        tokio::select! {
            client_raw_opt = rx.recv() => {
                // Some(raw) -> wrap in the query cache; None -> connection lost.
                *client = client_raw_opt.expect("not closed").map(postgres_query::Caching::new);
            },
            // NOTE(review): the timeout future is re-created on every loop
            // iteration, so the deadline restarts after each received message.
            _ = tokio::time::sleep(Duration::from_secs(config.fatal_connection_timeout_secs)) => {
                error!("waited too long for new postgres client");
                std::process::exit(1);
            },
        }
    }
    // Loop only exits with a populated client (or the process exited above).
    client.as_ref().expect("must contain value")
}
|
2021-11-02 00:55:39 -07:00
|
|
|
|
2021-11-03 04:17:49 -07:00
|
|
|
async fn process_account_write(
|
|
|
|
client: &postgres_query::Caching<tokio_postgres::Client>,
|
|
|
|
write: &AccountWrite,
|
2021-11-03 09:22:52 -07:00
|
|
|
account_tables: &AccountTables,
|
2021-11-09 05:27:09 -08:00
|
|
|
) -> anyhow::Result<()> {
|
2022-01-04 01:15:22 -08:00
|
|
|
futures::future::try_join_all(
|
|
|
|
account_tables
|
|
|
|
.iter()
|
|
|
|
.map(|table| table.insert_account_write(client, write)),
|
|
|
|
)
|
|
|
|
.await?;
|
2021-11-03 04:17:49 -07:00
|
|
|
Ok(())
|
|
|
|
}
|
2021-11-02 00:55:39 -07:00
|
|
|
|
2021-11-15 06:25:04 -08:00
|
|
|
/// In-memory bookkeeping of recently seen slots, used to preprocess
/// incoming `SlotUpdate`s before they are sent to postgres.
struct Slots {
    // non-rooted only: slots at or below the newest rooted slot are
    // pruned in Slots::add()
    slots: HashMap<i64, SlotUpdate>,
    // Highest slot number seen so far (any status), if any.
    newest_processed_slot: Option<i64>,
    // Highest slot number observed with Rooted status, if any.
    newest_rooted_slot: Option<i64>,
}
|
|
|
|
|
|
|
|
/// Outcome flags computed by `Slots::add()` for one incoming update:
/// whether to discard it, and which postgres maintenance steps the
/// slot-processing worker should run.
#[derive(Default)]
struct SlotPreprocessing {
    // The slot was already known with identical status and parent.
    discard_duplicate: bool,
    // The slot is not newer than the newest rooted slot; drop it.
    discard_old: bool,
    // This update advanced the newest processed (seen) slot.
    new_processed_head: bool,
    // This update advanced the newest rooted slot.
    new_rooted_head: bool,
    // A parent was newly set/changed on an already-known slot.
    parent_update: bool,
}
|
|
|
|
|
|
|
|
impl Slots {
|
|
|
|
fn new() -> Self {
|
|
|
|
Self {
|
|
|
|
slots: HashMap::new(),
|
|
|
|
newest_processed_slot: None,
|
|
|
|
newest_rooted_slot: None,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fn add(&mut self, update: &SlotUpdate) -> SlotPreprocessing {
|
|
|
|
let mut result = SlotPreprocessing::default();
|
|
|
|
|
|
|
|
if let Some(previous) = self.slots.get_mut(&update.slot) {
|
|
|
|
if previous.status == update.status && previous.parent == update.parent {
|
|
|
|
result.discard_duplicate = true;
|
|
|
|
}
|
|
|
|
|
|
|
|
previous.status = update.status;
|
|
|
|
if update.parent.is_some() && previous.parent != update.parent {
|
|
|
|
previous.parent = update.parent;
|
|
|
|
result.parent_update = true;
|
|
|
|
}
|
|
|
|
} else if update.slot > self.newest_rooted_slot.unwrap_or(-1) {
|
|
|
|
self.slots.insert(update.slot, update.clone());
|
|
|
|
} else {
|
|
|
|
result.discard_old = true;
|
|
|
|
}
|
|
|
|
|
|
|
|
if update.status == SlotStatus::Rooted {
|
|
|
|
let old_slots: Vec<i64> = self
|
|
|
|
.slots
|
|
|
|
.keys()
|
|
|
|
.filter(|s| **s <= update.slot)
|
|
|
|
.copied()
|
|
|
|
.collect();
|
|
|
|
for old_slot in old_slots {
|
|
|
|
self.slots.remove(&old_slot);
|
|
|
|
}
|
|
|
|
if self.newest_rooted_slot.unwrap_or(-1) < update.slot {
|
|
|
|
self.newest_rooted_slot = Some(update.slot);
|
|
|
|
result.new_rooted_head = true;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if self.newest_processed_slot.unwrap_or(-1) < update.slot {
|
|
|
|
self.newest_processed_slot = Some(update.slot);
|
|
|
|
result.new_processed_head = true;
|
|
|
|
}
|
|
|
|
|
|
|
|
result
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Cloneable holder of the cleanup SQL that slot-update workers run
/// after a new slot becomes rooted.
#[derive(Clone)]
struct SlotsProcessing {
    // One DELETE per account table plus one for old slot rows; empty
    // when old-data deletion is disabled in the config.
    cleanup_table_sql: Vec<String>,
}
|
|
|
|
|
|
|
|
impl SlotsProcessing {
|
2022-01-03 01:33:26 -08:00
|
|
|
fn new(tables: &Vec<String>, delete_old_data: bool) -> Self {
|
2022-01-05 07:02:34 -08:00
|
|
|
let mut cleanup_table_sql = Vec::<String>::new();
|
2022-01-03 01:33:26 -08:00
|
|
|
|
|
|
|
if delete_old_data {
|
|
|
|
// Delete:
|
|
|
|
// 1. account writes that came before the newest rooted write
|
|
|
|
// 2. account writes that came after the newest rooted write but before
|
|
|
|
// the newest rooted slot (like processed writes that never confirmed)
|
2022-01-05 07:02:34 -08:00
|
|
|
cleanup_table_sql = tables
|
2022-01-03 01:33:26 -08:00
|
|
|
.iter()
|
|
|
|
.map(|table_name| {
|
|
|
|
format!(
|
|
|
|
"DELETE FROM {table} AS data
|
|
|
|
USING (
|
|
|
|
SELECT DISTINCT ON(pubkey_id) pubkey_id, slot, write_version
|
|
|
|
FROM {table}
|
|
|
|
LEFT JOIN slot USING(slot)
|
|
|
|
WHERE slot <= $newest_final_slot AND (status = 'Rooted' OR status is NULL)
|
|
|
|
ORDER BY pubkey_id, slot DESC, write_version DESC
|
|
|
|
) latest_write
|
|
|
|
WHERE data.pubkey_id = latest_write.pubkey_id
|
|
|
|
AND (
|
|
|
|
(data.slot < latest_write.slot
|
2021-11-25 01:24:06 -08:00
|
|
|
OR (data.slot = latest_write.slot
|
2022-01-03 01:33:26 -08:00
|
|
|
AND data.write_version < latest_write.write_version
|
|
|
|
)
|
|
|
|
)
|
|
|
|
OR
|
|
|
|
(
|
|
|
|
data.slot < $newest_final_slot
|
|
|
|
AND
|
|
|
|
(data.slot > latest_write.slot
|
|
|
|
OR (data.slot = latest_write.slot
|
|
|
|
AND data.write_version > latest_write.write_version
|
|
|
|
)
|
2021-11-25 01:24:06 -08:00
|
|
|
)
|
|
|
|
)
|
2022-01-03 01:33:26 -08:00
|
|
|
)",
|
|
|
|
table = table_name
|
|
|
|
)
|
|
|
|
})
|
|
|
|
.collect();
|
2021-11-08 03:09:14 -08:00
|
|
|
|
2022-01-03 01:33:26 -08:00
|
|
|
// Delete old slots
|
|
|
|
cleanup_table_sql
|
|
|
|
.push("DELETE FROM slot WHERE slot + 100000 < $newest_final_slot".into());
|
|
|
|
}
|
2021-11-15 06:25:04 -08:00
|
|
|
|
|
|
|
Self { cleanup_table_sql }
|
2021-11-03 09:22:52 -07:00
|
|
|
}
|
|
|
|
|
2021-11-03 04:17:49 -07:00
|
|
|
async fn process(
|
2021-11-15 06:25:04 -08:00
|
|
|
&self,
|
2021-11-03 04:17:49 -07:00
|
|
|
client: &postgres_query::Caching<tokio_postgres::Client>,
|
|
|
|
update: &SlotUpdate,
|
2021-11-15 06:25:04 -08:00
|
|
|
meta: &SlotPreprocessing,
|
2021-11-09 05:27:09 -08:00
|
|
|
) -> anyhow::Result<()> {
|
2021-11-03 04:17:49 -07:00
|
|
|
if let Some(parent) = update.parent {
|
|
|
|
let query = query!(
|
2021-11-08 02:58:18 -08:00
|
|
|
"INSERT INTO slot
|
|
|
|
(slot, parent, status, uncle)
|
|
|
|
VALUES
|
|
|
|
($slot, $parent, $status, FALSE)
|
|
|
|
ON CONFLICT (slot) DO UPDATE SET
|
2021-11-03 04:17:49 -07:00
|
|
|
parent=$parent, status=$status",
|
|
|
|
slot = update.slot,
|
|
|
|
parent = parent,
|
|
|
|
status = update.status,
|
2021-11-02 00:55:39 -07:00
|
|
|
);
|
2021-11-03 04:17:49 -07:00
|
|
|
let _ = query.execute(client).await.context("updating slot row")?;
|
|
|
|
} else {
|
|
|
|
let query = query!(
|
2021-11-08 02:58:18 -08:00
|
|
|
"INSERT INTO slot
|
|
|
|
(slot, parent, status, uncle)
|
|
|
|
VALUES
|
|
|
|
($slot, NULL, $status, FALSE)
|
|
|
|
ON CONFLICT (slot) DO UPDATE SET
|
2021-11-03 04:17:49 -07:00
|
|
|
status=$status",
|
|
|
|
slot = update.slot,
|
|
|
|
status = update.status,
|
|
|
|
);
|
|
|
|
let _ = query.execute(client).await.context("updating slot row")?;
|
|
|
|
}
|
2021-11-02 00:55:39 -07:00
|
|
|
|
2021-11-15 06:25:04 -08:00
|
|
|
if meta.new_rooted_head {
|
2021-11-26 02:59:23 -08:00
|
|
|
// Mark preceeding non-uncle slots as rooted
|
|
|
|
let query = query!(
|
|
|
|
"UPDATE slot SET status = 'Rooted'
|
|
|
|
WHERE slot < $newest_final_slot
|
|
|
|
AND (NOT uncle)
|
|
|
|
AND status != 'Rooted'",
|
|
|
|
newest_final_slot = update.slot
|
|
|
|
);
|
|
|
|
let _ = query
|
|
|
|
.execute(client)
|
|
|
|
.await
|
|
|
|
.context("updating preceding non-rooted slots")?;
|
|
|
|
|
2021-11-15 06:25:04 -08:00
|
|
|
// Keep only the newest rooted account write and also
|
2022-01-03 01:33:26 -08:00
|
|
|
// wipe old slots (if configured)
|
2021-11-15 06:25:04 -08:00
|
|
|
for cleanup_sql in &self.cleanup_table_sql {
|
|
|
|
let query = query_dyn!(cleanup_sql, newest_final_slot = update.slot)?;
|
2021-11-03 04:17:49 -07:00
|
|
|
let _ = query
|
|
|
|
.execute(client)
|
|
|
|
.await
|
2021-11-15 06:25:04 -08:00
|
|
|
.context("deleting old account writes")?;
|
2021-11-02 00:55:39 -07:00
|
|
|
}
|
2021-11-15 06:25:04 -08:00
|
|
|
}
|
2021-11-02 00:55:39 -07:00
|
|
|
|
2021-11-15 06:25:04 -08:00
|
|
|
if meta.new_processed_head || meta.parent_update {
|
|
|
|
// update the uncle column for the chain of slots from the
|
|
|
|
// newest down the the first rooted slot
|
|
|
|
let query = query!(
|
|
|
|
"WITH RECURSIVE
|
|
|
|
liveslots AS (
|
|
|
|
SELECT slot.*, 0 AS depth FROM slot
|
|
|
|
WHERE slot = (SELECT max(slot) FROM slot)
|
|
|
|
UNION ALL
|
|
|
|
SELECT s.*, depth + 1 FROM slot s
|
|
|
|
INNER JOIN liveslots l ON s.slot = l.parent
|
|
|
|
WHERE l.status != 'Rooted' AND depth < 1000
|
|
|
|
),
|
|
|
|
min_slot AS (SELECT min(slot) AS min_slot FROM liveslots)
|
|
|
|
UPDATE slot SET
|
|
|
|
uncle = NOT EXISTS (SELECT 1 FROM liveslots WHERE liveslots.slot = slot.slot)
|
|
|
|
FROM min_slot
|
|
|
|
WHERE slot >= min_slot;"
|
|
|
|
);
|
|
|
|
let _ = query
|
|
|
|
.execute(client)
|
|
|
|
.await
|
|
|
|
.context("recomputing slot uncle status")?;
|
2021-11-03 04:17:49 -07:00
|
|
|
}
|
2021-11-02 00:55:39 -07:00
|
|
|
|
2021-11-08 11:15:46 -08:00
|
|
|
trace!("slot update done {}", update.slot);
|
2021-11-03 04:17:49 -07:00
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
}
|
2021-11-02 00:55:39 -07:00
|
|
|
|
2021-11-03 04:17:49 -07:00
|
|
|
/// Set up the postgres sink: spawn account-write worker connections, a
/// single slot-preprocessing task, and slot-update worker connections.
///
/// Returns the senders callers use to push `AccountWrite`s and
/// `SlotUpdate`s into the pipeline. Workers retry failed queries up to
/// `retry_query_max_count` times and then terminate the process.
pub async fn init(
    config: &PostgresConfig,
    account_tables: AccountTables,
    metrics_sender: metrics::Metrics,
) -> anyhow::Result<(
    async_channel::Sender<AccountWrite>,
    async_channel::Sender<SlotUpdate>,
)> {
    // The actual message may want to also contain a retry count, if it self-reinserts on failure?
    let (account_write_queue_sender, account_write_queue_receiver) =
        async_channel::unbounded::<AccountWrite>();

    // Slot updates flowing from the outside into the single processing thread. From
    // there they'll flow into the postgres sending thread.
    let (slot_queue_sender, slot_queue_receiver) = async_channel::unbounded::<SlotUpdate>();
    let (slot_inserter_sender, slot_inserter_receiver) =
        async_channel::unbounded::<(SlotUpdate, SlotPreprocessing)>();

    let metric_con_retries = metrics_sender.register_u64("postgres_connection_retries".into());
    let metric_con_live = metrics_sender.register_u64("postgres_connections_alive".into());

    // postgres account write sending worker threads
    for _ in 0..config.account_write_connection_count {
        let postgres_account_writes =
            postgres_connection(config, metric_con_retries.clone(), metric_con_live.clone())
                .await?;
        let account_write_queue_receiver_c = account_write_queue_receiver.clone();
        let account_tables_c = account_tables.clone();
        let config = config.clone();
        let mut metric_retries =
            metrics_sender.register_u64("postgres_account_write_retries".into());
        tokio::spawn(async move {
            let mut client_opt = None;
            loop {
                // Retrieve up to batch_size account writes:
                // block for the first one, then drain what's already queued.
                let mut write_batch = Vec::new();
                write_batch.push(
                    account_write_queue_receiver_c
                        .recv()
                        .await
                        .expect("sender must stay alive"),
                );
                while write_batch.len() < config.account_write_max_batch_size {
                    match account_write_queue_receiver_c.try_recv() {
                        Ok(write) => write_batch.push(write),
                        Err(async_channel::TryRecvError::Empty) => break,
                        Err(async_channel::TryRecvError::Closed) => {
                            panic!("sender must stay alive")
                        }
                    };
                }

                trace!(
                    "account write, batch {}, channel size {}",
                    write_batch.len(),
                    account_write_queue_receiver_c.len(),
                );

                let mut error_count = 0;
                loop {
                    let client =
                        update_postgres_client(&mut client_opt, &postgres_account_writes, &config)
                            .await;
                    // Run all writes of the batch concurrently, collecting
                    // every result (not short-circuiting on first error).
                    let mut results = futures::future::join_all(
                        write_batch
                            .iter()
                            .map(|write| process_account_write(client, &write, &account_tables_c)),
                    )
                    .await;
                    // Keep only the writes whose insert failed; they are
                    // retried on the next loop iteration.
                    let mut iter = results.iter();
                    write_batch.retain(|_| iter.next().unwrap().is_err());
                    if write_batch.len() > 0 {
                        metric_retries.add(write_batch.len() as u64);
                        error_count += 1;
                        if error_count - 1 < config.retry_query_max_count {
                            results.retain(|r| r.is_err());
                            warn!("failed to process account write, retrying: {:?}", results);
                            tokio::time::sleep(Duration::from_secs(config.retry_query_sleep_secs))
                                .await;
                            continue;
                        } else {
                            error!("failed to process account write, exiting");
                            std::process::exit(1);
                        }
                    };
                    break;
                }
            }
        });
    }

    // slot update handling thread
    let mut metric_slot_queue = metrics_sender.register_u64("slot_insert_queue".into());
    tokio::spawn(async move {
        let mut slots = Slots::new();

        loop {
            let update = slot_queue_receiver
                .recv()
                .await
                .expect("sender must stay alive");
            trace!(
                "slot update {}, channel size {}",
                update.slot,
                slot_queue_receiver.len()
            );

            // Check if we already know about the slot, or it is outdated
            let slot_preprocessing = slots.add(&update);
            if slot_preprocessing.discard_duplicate || slot_preprocessing.discard_old {
                continue;
            }

            slot_inserter_sender
                .send((update, slot_preprocessing))
                .await
                .expect("sending must succeed");
            metric_slot_queue.set(slot_inserter_sender.len() as u64);
        }
    });

    // postgres slot update worker threads
    let table_names: Vec<String> = account_tables
        .iter()
        .map(|table| table.table_name().to_string())
        .collect();
    let slots_processing = SlotsProcessing::new(&table_names, config.delete_old_data);
    for _ in 0..config.slot_update_connection_count {
        let postgres_slot =
            postgres_connection(config, metric_con_retries.clone(), metric_con_live.clone())
                .await?;
        let receiver_c = slot_inserter_receiver.clone();
        let config = config.clone();
        let mut metric_retries = metrics_sender.register_u64("postgres_slot_update_retries".into());
        let slots_processing = slots_processing.clone();
        tokio::spawn(async move {
            let mut client_opt = None;
            loop {
                let (update, preprocessing) =
                    receiver_c.recv().await.expect("sender must stay alive");
                trace!("slot insertion, slot {}", update.slot);

                // Retry the whole slot update on failure, up to the
                // configured maximum, then give up and exit.
                let mut error_count = 0;
                loop {
                    let client =
                        update_postgres_client(&mut client_opt, &postgres_slot, &config).await;
                    if let Err(err) = slots_processing
                        .process(client, &update, &preprocessing)
                        .await
                    {
                        metric_retries.increment();
                        error_count += 1;
                        if error_count - 1 < config.retry_query_max_count {
                            warn!("failed to process slot update, retrying: {:?}", err);
                            tokio::time::sleep(Duration::from_secs(config.retry_query_sleep_secs))
                                .await;
                            continue;
                        } else {
                            error!("failed to process slot update, exiting");
                            std::process::exit(1);
                        }
                    };
                    break;
                }
            }
        });
    }

    Ok((account_write_queue_sender, slot_queue_sender))
}
|