From da7011100be75392c0fcbd235b612f4d466d6e71 Mon Sep 17 00:00:00 2001 From: aniketfuryrocks Date: Tue, 31 Jan 2023 19:00:52 +0530 Subject: [PATCH] postgres env --- README.md | 14 ++++++++++++ src/bridge.rs | 6 ++--- src/cli.rs | 2 +- src/main.rs | 4 ++-- src/workers/block_listenser.rs | 40 ++++++++++++++++++---------------- src/workers/postgres.rs | 37 ++++++++++++++++++------------- 6 files changed, 63 insertions(+), 40 deletions(-) diff --git a/README.md b/README.md index 10f77141..2cdec18e 100644 --- a/README.md +++ b/README.md @@ -48,6 +48,20 @@ $ cd bench and cargo run --release Find a new file named `metrics.csv` in the project root. +## Metrics and Postgres + +LiteRpc implements a postgres service that can write to a postgres database tables as defined +in `./migrations` + +### env variables + +| env | purpose | +| --------- | ------ | +| `CA_PEM_B64` | Base64 encoded `ca.pem` | +| `CLIENT_PKS_B64` | Base64 encoded `client.pks` | +| `CLIENT_PKS_PASS` | Password to `client.pks` | +| `PG_CONFIG` | Postgres Connection Config | + ## License & Copyright Copyright (c) 2022 Blockworks Foundation diff --git a/src/bridge.rs b/src/bridge.rs index 79895b01..05faf062 100644 --- a/src/bridge.rs +++ b/src/bridge.rs @@ -99,11 +99,11 @@ impl LiteBridge { tx_batch_size: usize, tx_send_interval: Duration, clean_interval: Duration, - postgres_config: Option, + enable_postgres: bool, ) -> anyhow::Result>>> { - let (postgres, postgres_send) = if let Some(postgres_config) = postgres_config { + let (postgres, postgres_send) = if enable_postgres { let (postgres_send, postgres_recv) = mpsc::unbounded_channel(); - let (postgres_connection, postgres) = Postgres::new(postgres_config).await?; + let (postgres_connection, postgres) = Postgres::new().await?; let postgres = postgres.start(postgres_recv); diff --git a/src/cli.rs b/src/cli.rs index 18542ca2..ac8bcd42 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -29,5 +29,5 @@ pub struct Args { pub clean_interval_ms: u64, /// addr to postgres #[arg(short = 'p', long)] - pub postgres_config: Option, + pub enable_postgres: bool } diff --git a/src/main.rs b/src/main.rs index 709f0dd1..de2af6f5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -17,7 +17,7 @@ pub async fn main() -> anyhow::Result<()> { tx_batch_interval_ms, clean_interval_ms, fanout_size, - postgres_config, + enable_postgres, } = Args::parse(); let tx_batch_interval_ms = Duration::from_millis(tx_batch_interval_ms); @@ -32,7 +32,7 @@ pub async fn main() -> anyhow::Result<()> { tx_batch_size, tx_batch_interval_ms, clean_interval_ms, - postgres_config, + enable_postgres, ) .await?; diff --git a/src/workers/block_listenser.rs b/src/workers/block_listenser.rs index e98fe94a..cb70d4e6 100644 --- a/src/workers/block_listenser.rs +++ b/src/workers/block_listenser.rs @@ -206,6 +206,27 @@ impl BlockListener { err: err.clone(), confirmation_status: Some(comfirmation_status.clone()), }); + + // + // Write to postgres + // + if let Some(postgres) = &postgres { + let cu_consumed = match compute_units_consumed { + OptionSerializer::Some(cu_consumed) => Some(cu_consumed as i64), + _ => None, + }; + + postgres + .send(PostgresMsg::PostgresUpdateTx( + PostgresUpdateTx { + processed_slot: slot as i64, + cu_consumed, + cu_requested: None, //TODO: cu requested + }, + sig.clone(), + )) + .unwrap(); + } }; // subscribers @@ -219,25 +240,6 @@ impl BlockListener { value: serde_json::json!({ "err": err }), })?; } - - let cu_consumed = match compute_units_consumed { - OptionSerializer::Some(cu_consumed) => Some(cu_consumed as i64), - _ => None, - }; - - // write to postgres - if let Some(postgres) = &postgres { - postgres - .send(PostgresMsg::PostgresUpdateTx( - PostgresUpdateTx { - processed_slot: slot as i64, - cu_consumed, - cu_requested: None, //TODO: cu requested - }, - sig, - )) - .unwrap(); - } } } diff --git a/src/workers/postgres.rs b/src/workers/postgres.rs index 6fcdc964..1810aed8 100644 --- a/src/workers/postgres.rs +++ b/src/workers/postgres.rs @@ -5,7 +5,6 @@ use log::{info, warn}; use postgres_native_tls::MakeTlsConnector; use tokio::{ - fs, sync::{ mpsc::{UnboundedReceiver, UnboundedSender}, RwLock, @@ -16,6 +15,8 @@ use tokio_postgres::Client; use native_tls::{Certificate, Identity, TlsConnector}; +use crate::encoding::BinaryEncoding; + pub struct Postgres { client: Arc>, } @@ -67,20 +68,25 @@ impl Postgres { /// (connection join handle, Self) /// /// returned join handle is required to be polled - pub async fn new( - porstgres_config: String, - ) -> anyhow::Result<(JoinHandle>, Self)> { - let ca_pem = fs::read("ca.pem").await?; - // let ca_pem = BinaryEncoding::Base64 - // .decode(ca_pem_b64) - // .context("ca pem decode")?; + pub async fn new() -> anyhow::Result<(JoinHandle>, Self)> { + let ca_pem_b64 = std::env::var("CA_PEM_B64").context("env CA_PEM_B64 not found")?; + let client_pks_b64 = + std::env::var("CLIENT_PKS_B64").context("env CLIENT_PKS_B64 not found")?; + let client_pks_password = + std::env::var("CLIENT_PKS_PASS").context("env CLIENT_PKS_PASS not found")?; + let pg_config = std::env::var("PG_CONFIG").context("env PG_CONFIG not found")?; - let client_pks = fs::read("client.pks").await?; - // let client_pks = BinaryEncoding::Base64.decode(client_pks_b64).context("client pks decode")?; + let ca_pem = BinaryEncoding::Base64 + .decode(ca_pem_b64) + .context("ca pem decode")?; + + let client_pks = BinaryEncoding::Base64 + .decode(client_pks_b64) + .context("client pks decode")?; let connector = TlsConnector::builder() .add_root_certificate(Certificate::from_pem(&ca_pem)?) - .identity(Identity::from_pkcs12(&client_pks, "p").context("Identity")?) + .identity(Identity::from_pkcs12(&client_pks, &client_pks_password).context("Identity")?) .danger_accept_invalid_hostnames(true) .danger_accept_invalid_certs(true) .build()?; @@ -88,8 +94,7 @@ impl Postgres { info!("making tls config"); let connector = MakeTlsConnector::new(connector); - let (client, connection) = - tokio_postgres::connect(&porstgres_config, connector.clone()).await?; + let (client, connection) = tokio_postgres::connect(&pg_config, connector.clone()).await?; let client = Arc::new(RwLock::new(client)); let connection = { @@ -104,7 +109,7 @@ impl Postgres { warn!("Connection to postgres broke {err:?}") }; - let f = tokio_postgres::connect(&porstgres_config, connector.clone()).await?; + let f = tokio_postgres::connect(&pg_config, connector.clone()).await?; *client.write().await = f.0; connection = f.1; @@ -166,6 +171,8 @@ impl Postgres { } pub async fn update_tx(&self, tx: PostgresUpdateTx, signature: &str) -> anyhow::Result<()> { + warn!("updating {signature} with {tx:?}"); + let PostgresUpdateTx { processed_slot, cu_consumed, @@ -177,7 +184,7 @@ impl Postgres { .await .execute( r#" - UPDATE lite_rpc.Txs + UPDATE lite_rpc.txs SET processed_slot = $1, cu_consumed = $2, cu_requested = $3 WHERE signature = $4 "#,