postgres env

This commit is contained in:
aniketfuryrocks 2023-01-31 19:00:52 +05:30
parent 0bbb30b0db
commit a2a91dcaf7
6 changed files with 63 additions and 40 deletions

View File

@ -48,6 +48,20 @@ $ cd bench and cargo run --release
Find a new file named `metrics.csv` in the project root.
## Metrics and Postgres
LiteRpc implements a postgres service that can write to a postgres database tables as defined
in `./migrations`
### env variables
| env | purpose |
| --------- | ------ |
| `CA_PEM_B64` | Base64 encoded `ca.pem` |
| `CLIENT_PKS_B64` | Base64 encoded `client.pks` |
| `CLIENT_PKS_PASS` | Password to `client.pks` |
| `PG_CONFIG` | Postgres Connection Config |
## License & Copyright
Copyright (c) 2022 Blockworks Foundation

View File

@ -99,11 +99,11 @@ impl LiteBridge {
tx_batch_size: usize,
tx_send_interval: Duration,
clean_interval: Duration,
postgres_config: Option<String>,
enable_postgres: bool,
) -> anyhow::Result<Vec<JoinHandle<anyhow::Result<()>>>> {
let (postgres, postgres_send) = if let Some(postgres_config) = postgres_config {
let (postgres, postgres_send) = if enable_postgres {
let (postgres_send, postgres_recv) = mpsc::unbounded_channel();
let (postgres_connection, postgres) = Postgres::new(postgres_config).await?;
let (postgres_connection, postgres) = Postgres::new().await?;
let postgres = postgres.start(postgres_recv);

View File

@ -29,5 +29,5 @@ pub struct Args {
pub clean_interval_ms: u64,
/// addr to postgres
#[arg(short = 'p', long)]
pub postgres_config: Option<String>,
pub enable_postgres: bool
}

View File

@ -17,7 +17,7 @@ pub async fn main() -> anyhow::Result<()> {
tx_batch_interval_ms,
clean_interval_ms,
fanout_size,
postgres_config,
enable_postgres,
} = Args::parse();
let tx_batch_interval_ms = Duration::from_millis(tx_batch_interval_ms);
@ -32,7 +32,7 @@ pub async fn main() -> anyhow::Result<()> {
tx_batch_size,
tx_batch_interval_ms,
clean_interval_ms,
postgres_config,
enable_postgres,
)
.await?;

View File

@ -207,6 +207,27 @@ impl BlockListener {
err: err.clone(),
confirmation_status: Some(comfirmation_status.clone()),
});
//
// Write to postgres
//
if let Some(postgres) = &postgres {
let cu_consumed = match compute_units_consumed {
OptionSerializer::Some(cu_consumed) => Some(cu_consumed as i64),
_ => None,
};
postgres
.send(PostgresMsg::PostgresUpdateTx(
PostgresUpdateTx {
processed_slot: slot as i64,
cu_consumed,
cu_requested: None, //TODO: cu requested
},
sig.clone(),
))
.unwrap();
}
};
// subscribers
@ -220,25 +241,6 @@ impl BlockListener {
value: serde_json::json!({ "err": err }),
})?;
}
let cu_consumed = match compute_units_consumed {
OptionSerializer::Some(cu_consumed) => Some(cu_consumed as i64),
_ => None,
};
// write to postgres
if let Some(postgres) = &postgres {
postgres
.send(PostgresMsg::PostgresUpdateTx(
PostgresUpdateTx {
processed_slot: slot as i64,
cu_consumed,
cu_requested: None, //TODO: cu requested
},
sig,
))
.unwrap();
}
}
}

View File

@ -5,7 +5,6 @@ use log::{info, warn};
use postgres_native_tls::MakeTlsConnector;
use tokio::{
fs,
sync::{
mpsc::{UnboundedReceiver, UnboundedSender},
RwLock,
@ -16,6 +15,8 @@ use tokio_postgres::Client;
use native_tls::{Certificate, Identity, TlsConnector};
use crate::encoding::BinaryEncoding;
pub struct Postgres {
client: Arc<RwLock<Client>>,
}
@ -67,20 +68,25 @@ impl Postgres {
/// (connection join handle, Self)
///
/// returned join handle is required to be polled
pub async fn new(
porstgres_config: String,
) -> anyhow::Result<(JoinHandle<anyhow::Result<()>>, Self)> {
let ca_pem = fs::read("ca.pem").await?;
// let ca_pem = BinaryEncoding::Base64
// .decode(ca_pem_b64)
// .context("ca pem decode")?;
pub async fn new() -> anyhow::Result<(JoinHandle<anyhow::Result<()>>, Self)> {
let ca_pem_b64 = std::env::var("CA_PEM_B64").context("env CA_PEM_B64 not found")?;
let client_pks_b64 =
std::env::var("CLIENT_PKS_B64").context("env CLIENT_PKS_B64 not found")?;
let client_pks_password =
std::env::var("CLIENT_PKS_PASS").context("env CLIENT_PKS_PASS not found")?;
let pg_config = std::env::var("PG_CONFIG").context("env PG_CONFIG not found")?;
let client_pks = fs::read("client.pks").await?;
// let client_pks = BinaryEncoding::Base64.decode(client_pks_b64).context("client pks decode")?;
let ca_pem = BinaryEncoding::Base64
.decode(ca_pem_b64)
.context("ca pem decode")?;
let client_pks = BinaryEncoding::Base64
.decode(client_pks_b64)
.context("client pks decode")?;
let connector = TlsConnector::builder()
.add_root_certificate(Certificate::from_pem(&ca_pem)?)
.identity(Identity::from_pkcs12(&client_pks, "p").context("Identity")?)
.identity(Identity::from_pkcs12(&client_pks, &client_pks_password).context("Identity")?)
.danger_accept_invalid_hostnames(true)
.danger_accept_invalid_certs(true)
.build()?;
@ -88,8 +94,7 @@ impl Postgres {
info!("making tls config");
let connector = MakeTlsConnector::new(connector);
let (client, connection) =
tokio_postgres::connect(&porstgres_config, connector.clone()).await?;
let (client, connection) = tokio_postgres::connect(&pg_config, connector.clone()).await?;
let client = Arc::new(RwLock::new(client));
let connection = {
@ -104,7 +109,7 @@ impl Postgres {
warn!("Connection to postgres broke {err:?}")
};
let f = tokio_postgres::connect(&porstgres_config, connector.clone()).await?;
let f = tokio_postgres::connect(&pg_config, connector.clone()).await?;
*client.write().await = f.0;
connection = f.1;
@ -166,6 +171,8 @@ impl Postgres {
}
pub async fn update_tx(&self, tx: PostgresUpdateTx, signature: &str) -> anyhow::Result<()> {
warn!("updating {signature} with {tx:?}");
let PostgresUpdateTx {
processed_slot,
cu_consumed,
@ -177,7 +184,7 @@ impl Postgres {
.await
.execute(
r#"
UPDATE lite_rpc.Txs
UPDATE lite_rpc.txs
SET processed_slot = $1, cu_consumed = $2, cu_requested = $3
WHERE signature = $4
"#,