diff --git a/src/bridge.rs b/src/bridge.rs index 97c0d776..33cd0e81 100644 --- a/src/bridge.rs +++ b/src/bridge.rs @@ -99,7 +99,7 @@ impl LiteBridge { tx_batch_size: usize, tx_send_interval: Duration, clean_interval: Duration, - postgres_config: &str, + postgres_config: String, ) -> anyhow::Result<[JoinHandle>; 9]> { let (postgres_send, postgres_recv) = mpsc::unbounded_channel(); let (postgres_connection, postgres) = Postgres::new(postgres_config).await?; diff --git a/src/main.rs b/src/main.rs index 00490f7e..709f0dd1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -32,7 +32,7 @@ pub async fn main() -> anyhow::Result<()> { tx_batch_size, tx_batch_interval_ms, clean_interval_ms, - &postgres_config, + postgres_config, ) .await?; diff --git a/src/workers/postgres.rs b/src/workers/postgres.rs index 8db95faa..abb459ae 100644 --- a/src/workers/postgres.rs +++ b/src/workers/postgres.rs @@ -6,16 +6,18 @@ use postgres_native_tls::MakeTlsConnector; use tokio::{ fs, - sync::mpsc::{UnboundedReceiver, UnboundedSender}, + sync::{ + mpsc::{UnboundedReceiver, UnboundedSender}, + RwLock, + }, task::JoinHandle, }; use tokio_postgres::Client; use native_tls::{Certificate, Identity, TlsConnector}; -#[derive(Clone)] pub struct Postgres { - client: Arc, + client: Arc>, } #[derive(Debug)] @@ -58,7 +60,7 @@ impl Postgres { /// /// returned join handle is required to be polled pub async fn new( - porstgres_config: &str, + porstgres_config: String, ) -> anyhow::Result<(JoinHandle>, Self)> { let connector = TlsConnector::builder() .add_root_certificate(Certificate::from_pem(&fs::read("ca.pem").await?)?) @@ -72,13 +74,33 @@ impl Postgres { info!("making tls config"); let connector = MakeTlsConnector::new(connector); - let (client, connection) = tokio_postgres::connect(porstgres_config, connector).await?; - let client = Arc::new(client); + let (client, connection) = + tokio_postgres::connect(&porstgres_config, connector.clone()).await?; + let client = Arc::new(RwLock::new(client)); - Ok(( - tokio::spawn(async move { Ok(connection.await?) }), - Self { client }, - )) + let connection = { + let client = client.clone(); + + #[allow(unreachable_code)] + tokio::spawn(async move { + let mut connection = connection; + + loop { + if let Err(err) = connection.await { + warn!("Connection to postgres broke {err:?}") + }; + + let f = tokio_postgres::connect(&porstgres_config, connector.clone()).await?; + + *client.write().await = f.0; + connection = f.1; + } + + bail!("Potsgres revival loop failed") + }) + }; + + Ok((connection, Self { client })) } pub async fn send_block(&self, block: PostgresBlock) -> anyhow::Result<()> { @@ -89,6 +111,8 @@ impl Postgres { } = block; self.client + .read() + .await .execute( r#" INSERT INTO lite_rpc.Blocks @@ -114,9 +138,7 @@ impl Postgres { quic_response, } = tx; - warn!("{}", signature.len()); - - self.client.execute( + self.client.read().await.execute( r#" INSERT INTO lite_rpc.Txs (signature, recent_slot, forwarded_slot, processed_slot, cu_consumed, cu_requested, quic_response) diff --git a/src/workers/tx_sender.rs b/src/workers/tx_sender.rs index 05950b79..4c3ad69c 100644 --- a/src/workers/tx_sender.rs +++ b/src/workers/tx_sender.rs @@ -127,7 +127,6 @@ impl TxSender { while (prev_inst.elapsed() < tx_send_interval) || txs.len() == tx_batch_size { match recv.try_recv() { Ok((sig, tx, slot)) => { - log::warn!("recv"); sigs_and_slots.push((sig, slot)); txs.push(tx); }