WIP: postgres

This commit is contained in:
aniketfuryrocks 2023-03-06 19:35:51 +05:30
parent fee6daaf1a
commit 24dc5421ba
No known key found for this signature in database
GPG Key ID: FA6BFCFAA7D4B764
2 changed files with 99 additions and 67 deletions

View File

@ -106,11 +106,10 @@ impl LiteBridge {
) -> anyhow::Result<Vec<JoinHandle<anyhow::Result<()>>>> {
let (postgres, postgres_send) = if enable_postgres {
let (postgres_send, postgres_recv) = mpsc::unbounded_channel();
let (postgres_connection, postgres) = Postgres::new().await?;
let postgres = Postgres::new().await?;
let postgres = postgres.start(postgres_recv);
(Some((postgres, postgres_connection)), Some(postgres_send))
(Some(postgres), Some(postgres_send))
} else {
(None, None)
};
@ -187,8 +186,7 @@ impl LiteBridge {
cleaner,
];
if let Some((postgres, connection)) = postgres {
services.push(connection);
if let Some(postgres) = postgres {
services.push(postgres);
}

View File

@ -8,11 +8,11 @@ use prometheus::{core::GenericGauge, opts, register_int_gauge};
use tokio::{
sync::{
mpsc::{UnboundedReceiver, UnboundedSender},
RwLock,
RwLock, RwLockReadGuard,
},
task::JoinHandle,
};
use tokio_postgres::Client;
use tokio_postgres::{Client, Statement};
use native_tls::{Certificate, Identity, TlsConnector};
@ -22,10 +22,6 @@ lazy_static::lazy_static! {
pub static ref MESSAGES_IN_POSTGRES_CHANNEL: GenericGauge<prometheus::core::AtomicI64> = register_int_gauge!(opts!("literpc_messages_in_postgres", "Number of messages in postgres")).unwrap();
}
pub struct Postgres {
client: Arc<RwLock<Client>>,
}
#[derive(Debug)]
pub struct PostgresTx {
pub signature: String,
@ -68,12 +64,15 @@ pub enum PostgresMsg {
pub type PostgresMpscRecv = UnboundedReceiver<PostgresMsg>;
pub type PostgresMpscSend = UnboundedSender<PostgresMsg>;
impl Postgres {
/// # Return
/// (connection join handle, Self)
///
/// returned join handle is required to be polled
pub async fn new() -> anyhow::Result<(JoinHandle<anyhow::Result<()>>, Self)> {
pub struct PostgresSession {
client: Client,
insert_tx_statement: Statement,
update_tx_statement: Statement,
insert_block_statement: Statement,
}
impl PostgresSession {
pub async fn new() -> anyhow::Result<Self> {
let ca_pem_b64 = std::env::var("CA_PEM_B64").context("env CA_PEM_B64 not found")?;
let client_pks_b64 =
std::env::var("CLIENT_PKS_B64").context("env CLIENT_PKS_B64 not found")?;
@ -96,35 +95,51 @@ impl Postgres {
.danger_accept_invalid_certs(true)
.build()?;
info!("making tls config");
let connector = MakeTlsConnector::new(connector);
let (client, connection) = tokio_postgres::connect(&pg_config, connector.clone()).await?;
let client = Arc::new(RwLock::new(client));
let connection = {
let client = client.clone();
let insert_block_statement = client
.prepare(
r#"
INSERT INTO lite_rpc.Blocks
(slot, leader_id, parent_slot)
VALUES
($1, $2, $3)
"#,
)
.await?;
#[allow(unreachable_code)]
tokio::spawn(async move {
let mut connection = connection;
let insert_tx_statement = client.prepare(
r#"
INSERT INTO lite_rpc.Txs
(signature, recent_slot, forwarded_slot, processed_slot, cu_consumed, cu_requested, quic_response)
VALUES
($1, $2, $3, $4, $5, $6, $7)
"#,
).await?;
loop {
if let Err(err) = connection.await {
warn!("Connection to postgres broke {err:?}")
};
let update_tx_statement = client
.prepare(
r#"
UPDATE lite_rpc.txs
SET processed_slot = $1, cu_consumed = $2, cu_requested = $3
WHERE signature = $4
"#,
)
.await?;
let f = tokio_postgres::connect(&pg_config, connector.clone()).await?;
tokio::spawn(async move {
if let Err(err) = connection.await {
log::error!("Connection to Postgres broke {err:?}");
};
});
*client.write().await = f.0;
connection = f.1;
}
bail!("Potsgres revival loop failed")
})
};
Ok((connection, Self { client }))
Ok(Self {
client,
insert_tx_statement,
insert_block_statement,
update_tx_statement,
})
}
pub async fn send_block(&self, block: PostgresBlock) -> anyhow::Result<()> {
@ -135,15 +150,8 @@ impl Postgres {
} = block;
self.client
.read()
.await
.execute(
r#"
INSERT INTO lite_rpc.Blocks
(slot, leader_id, parent_slot)
VALUES
($1, $2, $3)
"#,
&self.insert_block_statement,
&[&slot, &leader_id, &parent_slot],
)
.await?;
@ -162,15 +170,20 @@ impl Postgres {
quic_response,
} = tx;
self.client.read().await.execute(
r#"
INSERT INTO lite_rpc.Txs
(signature, recent_slot, forwarded_slot, processed_slot, cu_consumed, cu_requested, quic_response)
VALUES
($1, $2, $3, $4, $5, $6, $7)
"#,
&[&signature, &recent_slot, &forwarded_slot, &processed_slot, &cu_consumed, &cu_requested, &quic_response],
).await?;
self.client
.execute(
&self.insert_tx_statement,
&[
&signature,
&recent_slot,
&forwarded_slot,
&processed_slot,
&cu_consumed,
&cu_requested,
&quic_response,
],
)
.await?;
Ok(())
}
@ -183,32 +196,53 @@ impl Postgres {
} = tx;
self.client
.read()
.await
.execute(
r#"
UPDATE lite_rpc.txs
SET processed_slot = $1, cu_consumed = $2, cu_requested = $3
WHERE signature = $4
"#,
&self.update_tx_statement,
&[&processed_slot, &cu_consumed, &cu_requested, &signature],
)
.await?;
Ok(())
}
}
pub struct Postgres {
session: Arc<RwLock<PostgresSession>>,
}
impl Postgres {
/// # Return
/// (connection join handle, Self)
///
/// returned join handle is required to be polled
pub async fn new() -> anyhow::Result<Self> {
let session = PostgresSession::new().await?;
let session = Arc::new(RwLock::new(session));
Ok(Self { session })
}
async fn get_session(&mut self) -> anyhow::Result<RwLockReadGuard<PostgresSession>> {
if self.session.read().await.client.is_closed() {
*self.session.write().await = PostgresSession::new().await?;
}
Ok(self.session.read().await)
}
pub fn start(self, mut recv: PostgresMpscRecv) -> JoinHandle<anyhow::Result<()>> {
tokio::spawn(async move {
info!("Writing to postgres");
while let Some(msg) = recv.recv().await {
MESSAGES_IN_POSTGRES_CHANNEL.dec();
MESSAGES_IN_POSTGRES_CHANNEL.inc();
let session = self.get_session().await;
let Err(err) = (
match msg {
PostgresMsg::PostgresTx(tx) => self.send_tx(tx).await,
PostgresMsg::PostgresUpdateTx(tx, sig) => self.update_tx(tx, &sig).await,
PostgresMsg::PostgresBlock(block) => self.send_block(block).await,
PostgresMsg::PostgresTx(tx) => session.send_tx(tx).await,
PostgresMsg::PostgresUpdateTx(tx, sig) => session.update_tx(tx, &sig).await,
PostgresMsg::PostgresBlock(block) => session.send_block(block).await,
PostgresMsg::PostgreAccountAddr(_) => todo!(),
} ) else {
continue;