From da7011100be75392c0fcbd235b612f4d466d6e71 Mon Sep 17 00:00:00 2001 From: aniketfuryrocks Date: Tue, 31 Jan 2023 19:00:52 +0530 Subject: [PATCH 1/9] postgres env --- README.md | 14 ++++++++++++ src/bridge.rs | 6 ++--- src/cli.rs | 2 +- src/main.rs | 4 ++-- src/workers/block_listenser.rs | 40 ++++++++++++++++++---------------- src/workers/postgres.rs | 37 ++++++++++++++++++------------- 6 files changed, 63 insertions(+), 40 deletions(-) diff --git a/README.md b/README.md index 10f77141..2cdec18e 100644 --- a/README.md +++ b/README.md @@ -48,6 +48,20 @@ $ cd bench and cargo run --release Find a new file named `metrics.csv` in the project root. +## Metrics and Postgres + +LiteRpc implements a postgres service that can write to a postgres database tables as defined +in `./migrations` + +### env variables + +| env | purpose | +| --------- | ------ | +| `CA_PEM_B64` | Base64 encoded `ca.pem` | +| `CLIENT_PKS_B64` | Base64 encoded `client.pks` | +| `CLIENT_PKS_PASS` | Password to `client.pks` | +| `PG_CONFIG` | Postgres Connection Config | + ## License & Copyright Copyright (c) 2022 Blockworks Foundation diff --git a/src/bridge.rs b/src/bridge.rs index 79895b01..05faf062 100644 --- a/src/bridge.rs +++ b/src/bridge.rs @@ -99,11 +99,11 @@ impl LiteBridge { tx_batch_size: usize, tx_send_interval: Duration, clean_interval: Duration, - postgres_config: Option, + enable_postgres: bool, ) -> anyhow::Result>>> { - let (postgres, postgres_send) = if let Some(postgres_config) = postgres_config { + let (postgres, postgres_send) = if enable_postgres { let (postgres_send, postgres_recv) = mpsc::unbounded_channel(); - let (postgres_connection, postgres) = Postgres::new(postgres_config).await?; + let (postgres_connection, postgres) = Postgres::new().await?; let postgres = postgres.start(postgres_recv); diff --git a/src/cli.rs b/src/cli.rs index 18542ca2..ac8bcd42 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -29,5 +29,5 @@ pub struct Args { pub clean_interval_ms: u64, /// addr to postgres #[arg(short = 'p', long)] - pub postgres_config: Option, + pub enable_postgres: bool } diff --git a/src/main.rs b/src/main.rs index 709f0dd1..de2af6f5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -17,7 +17,7 @@ pub async fn main() -> anyhow::Result<()> { tx_batch_interval_ms, clean_interval_ms, fanout_size, - postgres_config, + enable_postgres, } = Args::parse(); let tx_batch_interval_ms = Duration::from_millis(tx_batch_interval_ms); @@ -32,7 +32,7 @@ pub async fn main() -> anyhow::Result<()> { tx_batch_size, tx_batch_interval_ms, clean_interval_ms, - postgres_config, + enable_postgres, ) .await?; diff --git a/src/workers/block_listenser.rs b/src/workers/block_listenser.rs index e98fe94a..cb70d4e6 100644 --- a/src/workers/block_listenser.rs +++ b/src/workers/block_listenser.rs @@ -206,6 +206,27 @@ impl BlockListener { err: err.clone(), confirmation_status: Some(comfirmation_status.clone()), }); + + // + // Write to postgres + // + if let Some(postgres) = &postgres { + let cu_consumed = match compute_units_consumed { + OptionSerializer::Some(cu_consumed) => Some(cu_consumed as i64), + _ => None, + }; + + postgres + .send(PostgresMsg::PostgresUpdateTx( + PostgresUpdateTx { + processed_slot: slot as i64, + cu_consumed, + cu_requested: None, //TODO: cu requested + }, + sig.clone(), + )) + .unwrap(); + } }; // subscribers @@ -219,25 +240,6 @@ impl BlockListener { value: serde_json::json!({ "err": err }), })?; } - - let cu_consumed = match compute_units_consumed { - OptionSerializer::Some(cu_consumed) => Some(cu_consumed as i64), - _ => None, - }; - - // write to postgres - if let Some(postgres) = &postgres { - postgres - .send(PostgresMsg::PostgresUpdateTx( - PostgresUpdateTx { - processed_slot: slot as i64, - cu_consumed, - cu_requested: None, //TODO: cu requested - }, - sig, - )) - .unwrap(); - } } } diff --git a/src/workers/postgres.rs b/src/workers/postgres.rs index 6fcdc964..1810aed8 100644 --- a/src/workers/postgres.rs +++ b/src/workers/postgres.rs @@ -5,7 +5,6 @@ use log::{info, warn}; use postgres_native_tls::MakeTlsConnector; use tokio::{ - fs, sync::{ mpsc::{UnboundedReceiver, UnboundedSender}, RwLock, @@ -16,6 +15,8 @@ use tokio_postgres::Client; use native_tls::{Certificate, Identity, TlsConnector}; +use crate::encoding::BinaryEncoding; + pub struct Postgres { client: Arc>, } @@ -67,20 +68,25 @@ impl Postgres { /// (connection join handle, Self) /// /// returned join handle is required to be polled - pub async fn new( - porstgres_config: String, - ) -> anyhow::Result<(JoinHandle>, Self)> { - let ca_pem = fs::read("ca.pem").await?; - // let ca_pem = BinaryEncoding::Base64 - // .decode(ca_pem_b64) - // .context("ca pem decode")?; + pub async fn new() -> anyhow::Result<(JoinHandle>, Self)> { + let ca_pem_b64 = std::env::var("CA_PEM_B64").context("env CA_PEM_B64 not found")?; + let client_pks_b64 = + std::env::var("CLIENT_PKS_B64").context("env CLIENT_PKS_B64 not found")?; + let client_pks_password = + std::env::var("CLIENT_PKS_PASS").context("env CLIENT_PKS_PASS not found")?; + let pg_config = std::env::var("PG_CONFIG").context("env PG_CONFIG not found")?; - let client_pks = fs::read("client.pks").await?; - // let client_pks = BinaryEncoding::Base64.decode(client_pks_b64).context("client pks decode")?; + let ca_pem = BinaryEncoding::Base64 + .decode(ca_pem_b64) + .context("ca pem decode")?; + + let client_pks = BinaryEncoding::Base64 + .decode(client_pks_b64) + .context("client pks decode")?; let connector = TlsConnector::builder() .add_root_certificate(Certificate::from_pem(&ca_pem)?) - .identity(Identity::from_pkcs12(&client_pks, "p").context("Identity")?) + .identity(Identity::from_pkcs12(&client_pks, &client_pks_password).context("Identity")?) .danger_accept_invalid_hostnames(true) .danger_accept_invalid_certs(true) .build()?; @@ -88,8 +94,7 @@ impl Postgres { info!("making tls config"); let connector = MakeTlsConnector::new(connector); - let (client, connection) = - tokio_postgres::connect(&porstgres_config, connector.clone()).await?; + let (client, connection) = tokio_postgres::connect(&pg_config, connector.clone()).await?; let client = Arc::new(RwLock::new(client)); let connection = { @@ -104,7 +109,7 @@ impl Postgres { warn!("Connection to postgres broke {err:?}") }; - let f = tokio_postgres::connect(&porstgres_config, connector.clone()).await?; + let f = tokio_postgres::connect(&pg_config, connector.clone()).await?; *client.write().await = f.0; connection = f.1; @@ -166,6 +171,8 @@ impl Postgres { } pub async fn update_tx(&self, tx: PostgresUpdateTx, signature: &str) -> anyhow::Result<()> { + warn!("updating {signature} with {tx:?}"); + let PostgresUpdateTx { processed_slot, cu_consumed, @@ -177,7 +184,7 @@ impl Postgres { .await .execute( r#" - UPDATE lite_rpc.Txs + UPDATE lite_rpc.txs SET processed_slot = $1, cu_consumed = $2, cu_requested = $3 WHERE signature = $4 "#, From 0fb7fa04b6cc103d5282d80fd1706b136b603c69 Mon Sep 17 00:00:00 2001 From: aniketfuryrocks Date: Tue, 31 Jan 2023 19:01:48 +0530 Subject: [PATCH 2/9] rm log --- src/workers/postgres.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/workers/postgres.rs b/src/workers/postgres.rs index 1810aed8..2e0721bb 100644 --- a/src/workers/postgres.rs +++ b/src/workers/postgres.rs @@ -171,8 +171,6 @@ impl Postgres { } pub async fn update_tx(&self, tx: PostgresUpdateTx, signature: &str) -> anyhow::Result<()> { - warn!("updating {signature} with {tx:?}"); - let PostgresUpdateTx { processed_slot, cu_consumed, From c4ed60392f6277cf94bcd05ffc2f69d25ad7d820 Mon Sep 17 00:00:00 2001 From: aniketfuryrocks Date: Tue, 31 Jan 2023 19:02:23 +0530 Subject: [PATCH 3/9] ignore *post pks and pem --- .gitignore | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index ef061073..8eaa38f0 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,6 @@ target node_modules bench/metrics.csv -*.pem -*.pks +*.pem* +*.pks* .env From 2c392f52623b6abf1cb568b35fe9f3c49ca846d9 Mon Sep 17 00:00:00 2001 From: aniketfuryrocks Date: Tue, 31 Jan 2023 19:20:17 +0530 Subject: [PATCH 4/9] use rust 1.67 and enable postgres --- Dockerfile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Dockerfile b/Dockerfile index 3c423e1b..a4a0bd18 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,5 @@ # syntax = docker/dockerfile:1.2 -FROM rust:1.65.0 as base +FROM rust:1.67.0 as base RUN cargo install cargo-chef RUN rustup component add rustfmt RUN apt-get update && apt-get install -y clang cmake ssh @@ -20,4 +20,4 @@ FROM debian:bullseye-slim as run RUN apt-get update && apt-get -y install ca-certificates libc6 COPY --from=build /app/target/release/lite-rpc /usr/local/bin/ -CMD lite-rpc --rpc-addr "$RPC_URL" --ws-addr "$WS_URL" \ No newline at end of file +CMD lite-rpc --rpc-addr "$RPC_URL" --ws-addr "$WS_URL" -p From a64f70ed8ba6aa45626876047c8714cfdffde95a Mon Sep 17 00:00:00 2001 From: aniketfuryrocks Date: Tue, 31 Jan 2023 22:32:50 +0530 Subject: [PATCH 5/9] bail out when services quit --- src/main.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/main.rs b/src/main.rs index de2af6f5..c69d6674 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,6 @@ use std::time::Duration; -use anyhow::Context; +use anyhow::bail; use clap::Parser; use lite_rpc::{bridge::LiteBridge, cli::Args}; @@ -41,11 +41,11 @@ pub async fn main() -> anyhow::Result<()> { let ctrl_c_signal = tokio::signal::ctrl_c(); tokio::select! { - services = services => { - services.context("Some services exited unexpectedly")?; + _ = services => { + bail!("Serives quit unexpectedly"); + } + _ = ctrl_c_signal => { + Ok(()) } - _ = ctrl_c_signal => {} } - - Ok(()) } From e885751e278d0514237f128fd9e8540b35c4309d Mon Sep 17 00:00:00 2001 From: aniketfuryrocks Date: Tue, 31 Jan 2023 22:38:10 +0530 Subject: [PATCH 6/9] log ctrl c --- src/main.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main.rs b/src/main.rs index c69d6674..3f115e18 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,6 +3,7 @@ use std::time::Duration; use anyhow::bail; use clap::Parser; use lite_rpc::{bridge::LiteBridge, cli::Args}; +use log::info; #[tokio::main] pub async fn main() -> anyhow::Result<()> { @@ -45,6 +46,7 @@ pub async fn main() -> anyhow::Result<()> { bail!("Serives quit unexpectedly"); } _ = ctrl_c_signal => { + info!("Received ctrl+c signal"); Ok(()) } } From 297091c0428913bd01d07a9f7d3883b0eeded8dc Mon Sep 17 00:00:00 2001 From: aniketfuryrocks Date: Tue, 31 Jan 2023 22:38:28 +0530 Subject: [PATCH 7/9] fixed spell mistake --- src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main.rs b/src/main.rs index 3f115e18..680e50f0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -43,7 +43,7 @@ pub async fn main() -> anyhow::Result<()> { tokio::select! { _ = services => { - bail!("Serives quit unexpectedly"); + bail!("Services quit unexpectedly"); } _ = ctrl_c_signal => { info!("Received ctrl+c signal"); From 303543c3fbe47185a0268ec5c4f981f2f7c556fe Mon Sep 17 00:00:00 2001 From: aniketfuryrocks Date: Tue, 31 Jan 2023 23:54:50 +0530 Subject: [PATCH 8/9] tx_sender batching fix --- src/workers/tx_sender.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/workers/tx_sender.rs b/src/workers/tx_sender.rs index cc5dd372..3ddd851d 100644 --- a/src/workers/tx_sender.rs +++ b/src/workers/tx_sender.rs @@ -118,12 +118,10 @@ impl TxSender { ); loop { - let prev_inst = tokio::time::Instant::now(); - let mut sigs_and_slots = Vec::with_capacity(tx_batch_size); let mut txs = Vec::with_capacity(tx_batch_size); - while (prev_inst.elapsed() < tx_send_interval) || txs.len() == tx_batch_size { + while txs.len() <= tx_batch_size { match recv.try_recv() { Ok((sig, tx, slot)) => { sigs_and_slots.push((sig, slot)); @@ -132,12 +130,16 @@ impl TxSender { Err(TryRecvError::Disconnected) => { bail!("Channel Disconnected"); } - _ => {} + _ => { + break; + } } } self.forward_txs(sigs_and_slots, txs, postgres_send.clone()) .await; + + tokio::time::sleep(tx_send_interval).await; } }) } From 107e06a999d4bc3f09675a2abcc41c8fba40bb0f Mon Sep 17 00:00:00 2001 From: aniketfuryrocks Date: Wed, 1 Feb 2023 00:14:24 +0530 Subject: [PATCH 9/9] rm procinfo --- Cargo.lock | 75 ++++++++-------------------------- Cargo.toml | 1 - src/bridge.rs | 3 +- src/workers/metrics_capture.rs | 19 ++------- 4 files changed, 21 insertions(+), 77 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 503df6d0..1f5d21a6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -134,7 +134,7 @@ dependencies = [ "asn1-rs-derive", "asn1-rs-impl", "displaydoc", - "nom 7.1.3", + "nom", "num-traits", "rusticata-macros", "thiserror", @@ -926,7 +926,7 @@ checksum = "42d4bc9b0db0a0df9ae64634ac5bdefb7afcb534e182275ca0beadbe486701c1" dependencies = [ "asn1-rs", "displaydoc", - "nom 7.1.3", + "nom", "num-bigint 0.4.3", "num-traits", "rusticata-macros", @@ -2110,7 +2110,6 @@ dependencies = [ "log", "native-tls", "postgres-native-tls", - "procinfo", "serde", "serde_json", "serde_prometheus", @@ -2260,12 +2259,6 @@ dependencies = [ "memoffset 0.6.5", ] -[[package]] -name = "nom" -version = "2.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf51a729ecf40266a2368ad335a5fdde43471f545a967109cd62146ecf8b66ff" - [[package]] name = "nom" version = "7.1.3" @@ -2768,18 +2761,6 @@ dependencies = [ "unicode-ident", ] -[[package]] -name = "procinfo" -version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ab1427f3d2635891f842892dda177883dca0639e05fe66796a62c9d2f23b49c" -dependencies = [ - "byteorder", - "libc", - "nom 2.2.1", - "rustc_version 0.2.3", -] - [[package]] name = "qstring" version = "0.7.2" @@ -3100,22 +3081,13 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" -[[package]] -name = "rustc_version" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "138e3e0acb6c9fb258b19b67cb8abd63c00679d2851805ea151465464fe9030a" -dependencies = [ - "semver 0.9.0", -] - [[package]] name = "rustc_version" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" dependencies = [ - "semver 1.0.16", + "semver", ] [[package]] @@ -3124,7 +3096,7 @@ version = "4.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "faf0c4a6ece9950b9abdb62b1cfcf2a68b3b67a10ba445b3bb85be2a293d0632" dependencies = [ - "nom 7.1.3", + "nom", ] [[package]] @@ -3249,27 +3221,12 @@ dependencies = [ "libc", ] -[[package]] -name = "semver" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d7eb9ef2c18661902cc47e535f9bc51b78acd254da71d375c2f6720d9a40403" -dependencies = [ - "semver-parser", -] - [[package]] name = "semver" version = "1.0.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "58bc9567378fc7690d6b2addae4e60ac2eeea07becb2c64b9f218b53865cba2a" -[[package]] -name = "semver-parser" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" - [[package]] name = "send_wrapper" version = "0.4.0" @@ -3569,7 +3526,7 @@ dependencies = [ "log", "num-derive", "num-traits", - "rustc_version 0.4.0", + "rustc_version", "serde", "solana-frozen-abi", "solana-frozen-abi-macro", @@ -3643,7 +3600,7 @@ dependencies = [ "rayon", "reqwest", "rustls", - "semver 1.0.16", + "semver", "serde", "serde_derive", "serde_json", @@ -3728,7 +3685,7 @@ dependencies = [ "memmap2", "once_cell", "rand_core 0.6.4", - "rustc_version 0.4.0", + "rustc_version", "serde", "serde_bytes", "serde_derive", @@ -3747,7 +3704,7 @@ checksum = "be23cc7a382f54dfe1348edb94610e5cc146b8eb21563cdd04062a403c75ba62" dependencies = [ "proc-macro2 1.0.50", "quote 1.0.23", - "rustc_version 0.4.0", + "rustc_version", "syn 1.0.107", ] @@ -3867,7 +3824,7 @@ dependencies = [ "parking_lot", "rand 0.7.3", "rand_chacha 0.2.2", - "rustc_version 0.4.0", + "rustc_version", "rustversion", "serde", "serde_bytes", @@ -3901,7 +3858,7 @@ dependencies = [ "num-derive", "num-traits", "rand 0.7.3", - "rustc_version 0.4.0", + "rustc_version", "serde", "solana-frozen-abi", "solana-frozen-abi-macro", @@ -3934,7 +3891,7 @@ dependencies = [ "num-traits", "parking_lot", "qstring", - "semver 1.0.16", + "semver", "solana-sdk", "thiserror", "uriparse", @@ -3973,7 +3930,7 @@ dependencies = [ "qstring", "rand 0.7.3", "rand_chacha 0.2.2", - "rustc_version 0.4.0", + "rustc_version", "rustversion", "serde", "serde_bytes", @@ -4069,8 +4026,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a46c9ecb15ccd5388511cec0c5bfb989589425f8286ce432ff64b55dc7bf61e" dependencies = [ "log", - "rustc_version 0.4.0", - "semver 1.0.16", + "rustc_version", + "semver", "serde", "serde_derive", "solana-frozen-abi", @@ -4088,7 +4045,7 @@ dependencies = [ "log", "num-derive", "num-traits", - "rustc_version 0.4.0", + "rustc_version", "serde", "serde_derive", "solana-frozen-abi", @@ -5008,7 +4965,7 @@ dependencies = [ "data-encoding", "der-parser", "lazy_static", - "nom 7.1.3", + "nom", "oid-registry", "rusticata-macros", "thiserror", diff --git a/Cargo.toml b/Cargo.toml index 832004c0..2a2b5696 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,7 +33,6 @@ dashmap = "5.4.0" const_env = "0.1.2" jsonrpsee = { version = "0.16.2", features = ["macros", "full"] } tracing-subscriber = "0.3.16" -procinfo = "0.4.2" tokio-postgres = "0.7.7" native-tls = "0.2.11" postgres-native-tls = "0.5.0" diff --git a/src/bridge.rs b/src/bridge.rs index 05faf062..c0531929 100644 --- a/src/bridge.rs +++ b/src/bridge.rs @@ -124,6 +124,7 @@ impl LiteBridge { let metrics_capture = MetricsCapture::new(self.tx_sender.clone()); let prometheus_sync = PrometheusSync::new(metrics_capture.clone()).sync(); + let metrics_capture = metrics_capture.capture(); let finalized_block_listener = self .finalized_block_listener @@ -176,7 +177,7 @@ impl LiteBridge { tx_sender, finalized_block_listener, confirmed_block_listener, - metrics_capture.capture(postgres_send), + metrics_capture, prometheus_sync, cleaner, ]; diff --git a/src/workers/metrics_capture.rs b/src/workers/metrics_capture.rs index fdb2b965..0c66059a 100644 --- a/src/workers/metrics_capture.rs +++ b/src/workers/metrics_capture.rs @@ -1,10 +1,10 @@ use std::sync::Arc; -use log::{info, warn}; +use log::info; use solana_transaction_status::TransactionConfirmationStatus; use tokio::{sync::RwLock, task::JoinHandle}; -use super::{PostgresMpscSend, TxSender}; +use super::TxSender; use serde::{Deserialize, Serialize}; /// Background worker which captures metrics @@ -22,7 +22,6 @@ pub struct Metrics { pub txs_ps: usize, pub txs_confirmed_ps: usize, pub txs_finalized_ps: usize, - pub mem_used: Option, } impl MetricsCapture { @@ -37,7 +36,7 @@ impl MetricsCapture { self.metrics.read().await.to_owned() } - pub fn capture(self, postgres: Option) -> JoinHandle> { + pub fn capture(self) -> JoinHandle> { let mut one_second = tokio::time::interval(std::time::Duration::from_secs(1)); tokio::spawn(async move { @@ -76,18 +75,6 @@ impl MetricsCapture { metrics.txs_sent = txs_sent; metrics.txs_confirmed = txs_confirmed; metrics.txs_finalized = txs_finalized; - - metrics.mem_used = match procinfo::pid::statm_self() { - Ok(statm) => Some(statm.size), - Err(err) => { - warn!("Error capturing memory consumption {err}"); - None - } - }; - - if let Some(_postgres) = &postgres { - // postgres.send_metrics(metrics.clone()).await?; - } } }) }