resubscribe banking stage errors every 10 mins incase of inactivity

This commit is contained in:
godmodegalactus 2024-01-09 15:35:15 +01:00
parent 5c3edc836f
commit 527fe545c1
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
2 changed files with 20 additions and 10 deletions

View File

@ -11,7 +11,7 @@ use std::{
},
time::Duration,
};
use tokio::io::AsyncReadExt;
use tokio::{io::AsyncReadExt, time::Instant};
use crate::prometheus_sync::PrometheusSync;
use block_info::BlockInfo;
@ -88,10 +88,16 @@ pub async fn start_tracking_banking_stage_errors(
)
.await
.unwrap();
let mut instance = Instant::now();
log::info!("started geyser banking stage subscription");
while let Ok(Some(message)) =
tokio::time::timeout(Duration::from_secs(30), geyser_stream.next()).await
{
if instance.elapsed() > Duration::from_secs(600) {
// reestablish geyser connection
break;
}
let Ok(message) = message else {
continue;
};
@ -107,6 +113,8 @@ pub async fn start_tracking_banking_stage_errors(
// continue;
// }
BANKING_STAGE_ERROR_EVENT_COUNT.inc();
instance = Instant::now();
let sig = transaction.signature.to_string();
match map_of_infos.get_mut(&(sig.clone(), transaction.slot)) {
Some(mut x) => {

View File

@ -351,7 +351,6 @@ impl PostgresSession {
}
writer.finish().await?;
// merge data from temp table into accounts_map_transaction
let statement = format!(
r#"
@ -373,7 +372,6 @@ impl PostgresSession {
let rows = self.client.execute(statement.as_str(), &[]).await?;
debug!("inserted into accounts_map_transaction: {}", rows);
// merge data from temp table into accounts_map_transaction_latest
// note: query uses the array_dedup_append postgres function to deduplicate and limit the array size
// example: array_dedup_append('{8,3,2,1}', '{5,3}', 4) -> {2,1,5,3}
@ -415,7 +413,6 @@ impl PostgresSession {
let rows = self.client.execute(statement.as_str(), &[]).await?;
info!("upserted in accounts_map_transaction_latest: {}", rows);
self.drop_temp_table(temp_table_latest_agged).await?;
self.drop_temp_table(temp_table).await?;
Ok(())
@ -868,15 +865,20 @@ impl PostgresSession {
);
{
let txs_to_delete = self.client.query_one(
&format!(
r"
let txs_to_delete = self
.client
.query_one(
&format!(
r"
SELECT count(*) as cnt_tx FROM banking_stage_results_2.transactions txs
WHERE txs.transaction_id <= {cutoff_transaction}
",
cutoff_transaction = cutoff_transaction_incl
),
&[]).await.unwrap();
cutoff_transaction = cutoff_transaction_incl
),
&[],
)
.await
.unwrap();
let txs_to_delete: i64 = txs_to_delete.get("cnt_tx");