postgres connection revival
This commit is contained in:
parent
fa655bef33
commit
115ab1c8ad
|
@ -99,7 +99,7 @@ impl LiteBridge {
|
||||||
tx_batch_size: usize,
|
tx_batch_size: usize,
|
||||||
tx_send_interval: Duration,
|
tx_send_interval: Duration,
|
||||||
clean_interval: Duration,
|
clean_interval: Duration,
|
||||||
postgres_config: &str,
|
postgres_config: String,
|
||||||
) -> anyhow::Result<[JoinHandle<anyhow::Result<()>>; 9]> {
|
) -> anyhow::Result<[JoinHandle<anyhow::Result<()>>; 9]> {
|
||||||
let (postgres_send, postgres_recv) = mpsc::unbounded_channel();
|
let (postgres_send, postgres_recv) = mpsc::unbounded_channel();
|
||||||
let (postgres_connection, postgres) = Postgres::new(postgres_config).await?;
|
let (postgres_connection, postgres) = Postgres::new(postgres_config).await?;
|
||||||
|
|
|
@ -32,7 +32,7 @@ pub async fn main() -> anyhow::Result<()> {
|
||||||
tx_batch_size,
|
tx_batch_size,
|
||||||
tx_batch_interval_ms,
|
tx_batch_interval_ms,
|
||||||
clean_interval_ms,
|
clean_interval_ms,
|
||||||
&postgres_config,
|
postgres_config,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
|
|
@ -6,16 +6,18 @@ use postgres_native_tls::MakeTlsConnector;
|
||||||
|
|
||||||
use tokio::{
|
use tokio::{
|
||||||
fs,
|
fs,
|
||||||
sync::mpsc::{UnboundedReceiver, UnboundedSender},
|
sync::{
|
||||||
|
mpsc::{UnboundedReceiver, UnboundedSender},
|
||||||
|
RwLock,
|
||||||
|
},
|
||||||
task::JoinHandle,
|
task::JoinHandle,
|
||||||
};
|
};
|
||||||
use tokio_postgres::Client;
|
use tokio_postgres::Client;
|
||||||
|
|
||||||
use native_tls::{Certificate, Identity, TlsConnector};
|
use native_tls::{Certificate, Identity, TlsConnector};
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct Postgres {
|
pub struct Postgres {
|
||||||
client: Arc<Client>,
|
client: Arc<RwLock<Client>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
|
@ -58,7 +60,7 @@ impl Postgres {
|
||||||
///
|
///
|
||||||
/// returned join handle is required to be polled
|
/// returned join handle is required to be polled
|
||||||
pub async fn new(
|
pub async fn new(
|
||||||
porstgres_config: &str,
|
porstgres_config: String,
|
||||||
) -> anyhow::Result<(JoinHandle<anyhow::Result<()>>, Self)> {
|
) -> anyhow::Result<(JoinHandle<anyhow::Result<()>>, Self)> {
|
||||||
let connector = TlsConnector::builder()
|
let connector = TlsConnector::builder()
|
||||||
.add_root_certificate(Certificate::from_pem(&fs::read("ca.pem").await?)?)
|
.add_root_certificate(Certificate::from_pem(&fs::read("ca.pem").await?)?)
|
||||||
|
@ -72,13 +74,33 @@ impl Postgres {
|
||||||
info!("making tls config");
|
info!("making tls config");
|
||||||
|
|
||||||
let connector = MakeTlsConnector::new(connector);
|
let connector = MakeTlsConnector::new(connector);
|
||||||
let (client, connection) = tokio_postgres::connect(porstgres_config, connector).await?;
|
let (client, connection) =
|
||||||
let client = Arc::new(client);
|
tokio_postgres::connect(&porstgres_config, connector.clone()).await?;
|
||||||
|
let client = Arc::new(RwLock::new(client));
|
||||||
|
|
||||||
Ok((
|
let connection = {
|
||||||
tokio::spawn(async move { Ok(connection.await?) }),
|
let client = client.clone();
|
||||||
Self { client },
|
|
||||||
))
|
#[allow(unreachable_code)]
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let mut connection = connection;
|
||||||
|
|
||||||
|
loop {
|
||||||
|
if let Err(err) = connection.await {
|
||||||
|
warn!("Connection to postgres broke {err:?}")
|
||||||
|
};
|
||||||
|
|
||||||
|
let f = tokio_postgres::connect(&porstgres_config, connector.clone()).await?;
|
||||||
|
|
||||||
|
*client.write().await = f.0;
|
||||||
|
connection = f.1;
|
||||||
|
}
|
||||||
|
|
||||||
|
bail!("Potsgres revival loop failed")
|
||||||
|
})
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok((connection, Self { client }))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn send_block(&self, block: PostgresBlock) -> anyhow::Result<()> {
|
pub async fn send_block(&self, block: PostgresBlock) -> anyhow::Result<()> {
|
||||||
|
@ -89,6 +111,8 @@ impl Postgres {
|
||||||
} = block;
|
} = block;
|
||||||
|
|
||||||
self.client
|
self.client
|
||||||
|
.read()
|
||||||
|
.await
|
||||||
.execute(
|
.execute(
|
||||||
r#"
|
r#"
|
||||||
INSERT INTO lite_rpc.Blocks
|
INSERT INTO lite_rpc.Blocks
|
||||||
|
@ -114,9 +138,7 @@ impl Postgres {
|
||||||
quic_response,
|
quic_response,
|
||||||
} = tx;
|
} = tx;
|
||||||
|
|
||||||
warn!("{}", signature.len());
|
self.client.read().await.execute(
|
||||||
|
|
||||||
self.client.execute(
|
|
||||||
r#"
|
r#"
|
||||||
INSERT INTO lite_rpc.Txs
|
INSERT INTO lite_rpc.Txs
|
||||||
(signature, recent_slot, forwarded_slot, processed_slot, cu_consumed, cu_requested, quic_response)
|
(signature, recent_slot, forwarded_slot, processed_slot, cu_consumed, cu_requested, quic_response)
|
||||||
|
|
|
@ -127,7 +127,6 @@ impl TxSender {
|
||||||
while (prev_inst.elapsed() < tx_send_interval) || txs.len() == tx_batch_size {
|
while (prev_inst.elapsed() < tx_send_interval) || txs.len() == tx_batch_size {
|
||||||
match recv.try_recv() {
|
match recv.try_recv() {
|
||||||
Ok((sig, tx, slot)) => {
|
Ok((sig, tx, slot)) => {
|
||||||
log::warn!("recv");
|
|
||||||
sigs_and_slots.push((sig, slot));
|
sigs_and_slots.push((sig, slot));
|
||||||
txs.push(tx);
|
txs.push(tx);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue