Merge branch 'lite_rpc_postgres' into postgres

This commit is contained in:
aniketfuryrocks 2023-01-22 15:00:36 +05:30
commit 36f2a7e9fa
No known key found for this signature in database
GPG Key ID: FA6BFCFAA7D4B764
7 changed files with 292 additions and 3 deletions

190
Cargo.lock generated
View File

@ -1179,6 +1179,12 @@ version = "2.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0"
[[package]]
name = "fallible-iterator"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7"
[[package]]
name = "fastrand"
version = "1.8.0"
@ -1210,6 +1216,21 @@ version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]]
name = "foreign-types"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1"
dependencies = [
"foreign-types-shared",
]
[[package]]
name = "foreign-types-shared"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b"
[[package]]
name = "form_urlencoded"
version = "1.1.0"
@ -2074,6 +2095,8 @@ dependencies = [
"futures",
"jsonrpsee",
"log",
"openssl",
"postgres-openssl",
"procinfo",
"serde",
"serde_json",
@ -2083,6 +2106,7 @@ dependencies = [
"solana-version",
"thiserror",
"tokio",
"tokio-postgres",
"tracing-subscriber",
]
@ -2105,6 +2129,15 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "md-5"
version = "0.10.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6365506850d44bff6e2fbcb5176cf63650e48bd45ef2fe2665ae1570e0f4b9ca"
dependencies = [
"digest 0.10.6",
]
[[package]]
name = "memchr"
version = "2.5.0"
@ -2378,12 +2411,51 @@ version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5"
[[package]]
name = "openssl"
version = "0.10.45"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b102428fd03bc5edf97f62620f7298614c45cedf287c271e7ed450bbaf83f2e1"
dependencies = [
"bitflags",
"cfg-if",
"foreign-types",
"libc",
"once_cell",
"openssl-macros",
"openssl-sys",
]
[[package]]
name = "openssl-macros"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b501e44f11665960c7e7fcf062c7d96a14ade4aa98116c004b2e37b5be7d736c"
dependencies = [
"proc-macro2 1.0.49",
"quote 1.0.23",
"syn 1.0.107",
]
[[package]]
name = "openssl-probe"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
[[package]]
name = "openssl-sys"
version = "0.9.80"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23bbbf7854cd45b83958ebe919f0e8e516793727652e27fda10a8384cfc790b7"
dependencies = [
"autocfg",
"cc",
"libc",
"pkg-config",
"vcpkg",
]
[[package]]
name = "os_str_bytes"
version = "6.4.1"
@ -2467,6 +2539,24 @@ dependencies = [
"num",
]
[[package]]
name = "phf"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "928c6535de93548188ef63bb7c4036bd415cd8f36ad25af44b9789b2ee72a48c"
dependencies = [
"phf_shared",
]
[[package]]
name = "phf_shared"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1fb5f6f826b772a8d4c0394209441e7d37cbbb967ae9c7e0e8134365c9ee676"
dependencies = [
"siphasher",
]
[[package]]
name = "pin-project"
version = "1.0.12"
@ -2528,6 +2618,48 @@ dependencies = [
"universal-hash",
]
[[package]]
name = "postgres-openssl"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1de0ea6504e07ca78355a6fb88ad0f36cafe9e696cbc6717f16a207f3a60be72"
dependencies = [
"futures",
"openssl",
"tokio",
"tokio-openssl",
"tokio-postgres",
]
[[package]]
name = "postgres-protocol"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "878c6cbf956e03af9aa8204b407b9cbf47c072164800aa918c516cd4b056c50c"
dependencies = [
"base64 0.13.1",
"byteorder",
"bytes",
"fallible-iterator",
"hmac 0.12.1",
"md-5",
"memchr",
"rand 0.8.5",
"sha2 0.10.6",
"stringprep",
]
[[package]]
name = "postgres-types"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73d946ec7d256b04dfadc4e6a3292324e6f417124750fc5c0950f981b703a0f1"
dependencies = [
"bytes",
"fallible-iterator",
"postgres-protocol",
]
[[package]]
name = "ppv-lite86"
version = "0.2.17"
@ -3268,6 +3400,12 @@ version = "1.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "74233d3b3b2f6d4b006dc19dee745e73e2a6bfb6f93607cd3b02bd5b00797d7c"
[[package]]
name = "siphasher"
version = "0.3.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de"
[[package]]
name = "sized-chunks"
version = "0.6.5"
@ -3990,6 +4128,16 @@ dependencies = [
"thiserror",
]
[[package]]
name = "stringprep"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ee348cb74b87454fff4b551cbf727025810a004f88aeacae7f85b87f4e9a1c1"
dependencies = [
"unicode-bidi",
"unicode-normalization",
]
[[package]]
name = "strsim"
version = "0.8.0"
@ -4212,6 +4360,42 @@ dependencies = [
"syn 1.0.107",
]
[[package]]
name = "tokio-openssl"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c08f9ffb7809f1b20c1b398d92acf4cc719874b3b2b2d9ea2f09b4a80350878a"
dependencies = [
"futures-util",
"openssl",
"openssl-sys",
"tokio",
]
[[package]]
name = "tokio-postgres"
version = "0.7.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "29a12c1b3e0704ae7dfc25562629798b29c72e6b1d0a681b6f29ab4ae5e7f7bf"
dependencies = [
"async-trait",
"byteorder",
"bytes",
"fallible-iterator",
"futures-channel",
"futures-util",
"log",
"parking_lot",
"percent-encoding",
"phf",
"pin-project-lite",
"postgres-protocol",
"postgres-types",
"socket2",
"tokio",
"tokio-util",
]
[[package]]
name = "tokio-rustls"
version = "0.23.4"
@ -4478,6 +4662,12 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"
[[package]]
name = "vcpkg"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
[[package]]
name = "vec_map"
version = "0.8.2"

View File

@ -34,3 +34,6 @@ const_env = "0.1.2"
jsonrpsee = { version = "0.16.2", features = ["macros", "full"] }
tracing-subscriber = "0.3.16"
procinfo = "0.4.2"
tokio-postgres = "0.7.7"
openssl = "0.10.45"
postgres-openssl = "0.5.0"

View File

@ -3,7 +3,7 @@ use crate::{
encoding::BinaryEncoding,
rpc::LiteRpcServer,
tpu_manager::TpuManager,
workers::{BlockListener, Cleaner, Metrics, MetricsCapture, TxSender},
workers::{BlockListener, Cleaner, Metrics, MetricsCapture, MetricsPostgres, TxSender},
};
use std::{ops::Deref, str::FromStr, sync::Arc, time::Duration};
@ -94,7 +94,8 @@ impl LiteBridge {
tx_batch_size: usize,
tx_send_interval: Duration,
clean_interval: Duration,
) -> anyhow::Result<[JoinHandle<anyhow::Result<()>>; 7]> {
postgres_config: &str,
) -> anyhow::Result<[JoinHandle<anyhow::Result<()>>; 8]> {
let finalized_block_listenser = self.finalized_block_listenser.clone().listen();
let confirmed_block_listenser = self.confirmed_block_listenser.clone().listen();
@ -128,7 +129,11 @@ impl LiteBridge {
bail!("HTTP server stopped");
});
let metrics_capture = self.metrics_capture.capture();
let metrics_capture = self.metrics_capture.clone().capture();
let metrics_postgres = MetricsPostgres::new(self.metrics_capture, postgres_config)
.await?
.sync();
let cleaner = Cleaner::new(
self.tx_sender.clone(),
[
@ -145,6 +150,7 @@ impl LiteBridge {
finalized_block_listenser,
confirmed_block_listenser,
metrics_capture,
metrics_postgres,
cleaner,
])
}

View File

@ -27,4 +27,7 @@ pub struct Args {
/// interval between clean
#[arg(short = 'c', long, default_value_t = DEFAULT_CLEAN_INTERVAL_MS)]
pub clean_interval_ms: u64,
/// addr to postgres
#[arg(short = 'p', long, default_value_t = String::from("host=localhost user=postgres"))]
pub postgres_config: String,
}

View File

@ -17,6 +17,7 @@ pub async fn main() -> anyhow::Result<()> {
tx_batch_interval_ms,
clean_interval_ms,
fanout_size,
postgres_config,
} = Args::parse();
let tx_batch_interval_ms = Duration::from_millis(tx_batch_interval_ms);
@ -31,6 +32,7 @@ pub async fn main() -> anyhow::Result<()> {
tx_batch_size,
tx_batch_interval_ms,
clean_interval_ms,
&postgres_config,
)
.await?;

View File

@ -0,0 +1,83 @@
use anyhow::bail;
use postgres_openssl::MakeTlsConnector;
use tokio::task::JoinHandle;
use tokio_postgres::Client;
use openssl::ssl::{SslConnector, SslMethod};
use super::{Metrics, MetricsCapture};
pub struct MetricsPostgres {
connection: JoinHandle<Result<(), tokio_postgres::Error>>,
client: Client,
metrics_capture: MetricsCapture,
}
impl MetricsPostgres {
pub async fn new(
metrics_capture: MetricsCapture,
porstgres_config: &str,
) -> anyhow::Result<Self> {
let mut builder = SslConnector::builder(SslMethod::tls()).unwrap();
builder.set_ca_file("ca.pem")?;
// builder.set_private_key("client-key.pem")?;
let connector = MakeTlsConnector::new(builder.build());
let (client, connection) = tokio_postgres::connect(porstgres_config, connector).await?;
Ok(Self {
connection: tokio::spawn(connection),
client,
metrics_capture,
})
}
pub fn sync(self) -> JoinHandle<anyhow::Result<()>> {
let mut one_second = tokio::time::interval(std::time::Duration::from_secs(1));
let Self {
connection,
client,
metrics_capture,
} = self;
let metrics_send: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
loop {
let Metrics {
txs_sent,
txs_confirmed,
txs_finalized,
txs_ps,
txs_confirmed_ps,
txs_finalized_ps,
mem_used,
} = metrics_capture.get_metrics().await;
client.execute(
r#"INSERT INTO Metrics
(txs_sent, txs_confirmed, txs_finalized, transactions_per_second,  confirmations_per_second,  finalized_per_second,  memory_used)
VALUES
($1, $2, $3, $4, $5, $6)
"#,
&[&(txs_sent as u32), &(txs_confirmed as u32), &(txs_finalized as u32), &(txs_ps as u32), &(txs_confirmed_ps as u32), &(txs_finalized_ps as u32), &(mem_used.unwrap_or_default() as u32)],
)
.await?;
one_second.tick().await;
}
});
#[allow(unreachable_code)]
tokio::spawn(async move {
tokio::select! {
r = metrics_send => {
bail!("Postgres metrics send thread stopped {r:?}")
}
r = connection => {
bail!("Postgres connection poll stopped {r:?}")
}
}
Ok(())
})
}
}

View File

@ -1,9 +1,11 @@
mod block_listenser;
mod cleaner;
mod metrics_capture;
mod metrics_postgres;
mod tx_sender;
pub use block_listenser::*;
pub use cleaner::*;
pub use metrics_capture::*;
pub use metrics_postgres::*;
pub use tx_sender::*;