From 4fbef582e367f3a86e8e69f5b0ea82101eb3224e Mon Sep 17 00:00:00 2001 From: aniketfuryrocks Date: Sat, 28 Jan 2023 02:05:29 +0530 Subject: [PATCH] WIP: --- bench/metrics.csv | 3 - migrations/create.sql | 22 +++++++ migrations/rm.sql | 3 + src/bridge.rs | 86 ++++++++++++------------ src/lib.rs | 1 + src/postgres.rs | 112 ++++++++++++++++++++++++++++++++ src/rpc.rs | 8 +-- src/workers/block_listenser.rs | 39 +++++++++-- src/workers/metrics_capture.rs | 13 ++-- src/workers/metrics_postgres.rs | 94 --------------------------- src/workers/mod.rs | 2 - tests/workers.rs | 2 +- 12 files changed, 226 insertions(+), 159 deletions(-) delete mode 100644 bench/metrics.csv create mode 100644 migrations/create.sql create mode 100644 migrations/rm.sql create mode 100644 src/postgres.rs delete mode 100644 src/workers/metrics_postgres.rs diff --git a/bench/metrics.csv b/bench/metrics.csv deleted file mode 100644 index f346f1f9..00000000 --- a/bench/metrics.csv +++ /dev/null @@ -1,3 +0,0 @@ -total_time_elapsed_sec,txs_sent,time_to_send_txs,txs_confirmed,txs_un_confirmed,tps -8.96211512,20000,6.054988221,14321,5679,1597.9486770975554 -8.96211512,20000,6.054988221,14321,5679,1597.9486770975554 diff --git a/migrations/create.sql b/migrations/create.sql new file mode 100644 index 00000000..f355dfd0 --- /dev/null +++ b/migrations/create.sql @@ -0,0 +1,22 @@ +CREATE TABLE lite_rpc.Txs ( + id SERIAL NOT NULL PRIMARY KEY, + signature BINARY(64) NOT NULL, + recent_slot BIGINT NOT NULL, + forwarded_slot BIGINT NOT NULL, + processed_slot BIGINT, + cu_consumed BIGINT, + cu_requested BIGINT, + quic_response CHAR +); + + +CREATE TABLE lite_rpc.Blocks ( + slot BIGINT NOT NULL PRIMARY KEY, + leader_id BIGINT NOT NULL, + parent_slot BIGINT NOT NULL +); + +CREATE TABLE lite_rpc.AccountAddrs ( + id SERIAL PRIMARY KEY, + addr VARCHAR(45) NOT NULL +); diff --git a/migrations/rm.sql b/migrations/rm.sql new file mode 100644 index 00000000..a70b34a8 --- /dev/null +++ b/migrations/rm.sql @@ -0,0 +1,3 @@ +DROP TABLE lite_rpc.Txs; +DROP TABLE lite_rpc.Blocks; +DROP TABLE lite_rpc.AccountAddrs; diff --git a/src/bridge.rs b/src/bridge.rs index 456b5834..d2f3942e 100644 --- a/src/bridge.rs +++ b/src/bridge.rs @@ -1,9 +1,10 @@ use crate::{ configs::{IsBlockHashValidConfig, SendTransactionConfig}, encoding::BinaryEncoding, + postgres::Postgres, rpc::LiteRpcServer, tpu_manager::TpuManager, - workers::{BlockListener, Cleaner, Metrics, MetricsCapture, MetricsPostgres, TxSender}, + workers::{BlockListener, Cleaner, MetricsCapture, TxSender}, }; use std::{ops::Deref, str::FromStr, sync::Arc, time::Duration}; @@ -29,14 +30,12 @@ use solana_transaction_status::TransactionStatus; use tokio::{net::ToSocketAddrs, task::JoinHandle}; /// A bridge between clients and tpu -#[derive(Clone)] pub struct LiteBridge { pub rpc_client: Arc, pub tpu_manager: Arc, pub tx_sender: TxSender, pub finalized_block_listenser: BlockListener, pub confirmed_block_listenser: BlockListener, - pub metrics_capture: MetricsCapture, } impl LiteBridge { @@ -66,15 +65,12 @@ impl LiteBridge { ) .await?; - let metrics_capture = MetricsCapture::new(tx_sender.clone()); - Ok(Self { rpc_client, tpu_manager, tx_sender, finalized_block_listenser, confirmed_block_listenser, - metrics_capture, }) } @@ -96,43 +92,23 @@ impl LiteBridge { clean_interval: Duration, postgres_config: &str, ) -> anyhow::Result<[JoinHandle>; 8]> { - let finalized_block_listenser = self.finalized_block_listenser.clone().listen(); - - let confirmed_block_listenser = self.confirmed_block_listenser.clone().listen(); - let tx_sender = self .tx_sender .clone() .execute(tx_batch_size, tx_send_interval); - let ws_server_handle = ServerBuilder::default() - .ws_only() - .build(ws_addr.clone()) - .await? - .start(self.clone().into_rpc())?; + let metrics_capture = MetricsCapture::new(self.tx_sender.clone()); + let (postgres_connection, postgres) = Postgres::new(postgres_config).await?; - let http_server_handle = ServerBuilder::default() - .http_only() - .build(http_addr.clone()) - .await? - .start(self.clone().into_rpc())?; + let finalized_block_listenser = self + .finalized_block_listenser + .clone() + .listen(Some(postgres.clone())); - let ws_server = tokio::spawn(async move { - info!("Websocket Server started at {ws_addr:?}"); - ws_server_handle.stopped().await; - bail!("Websocket server stopped"); - }); - - let http_server = tokio::spawn(async move { - info!("HTTP Server started at {http_addr:?}"); - http_server_handle.stopped().await; - bail!("HTTP server stopped"); - }); - - let metrics_capture = self.metrics_capture.clone().capture(); - let metrics_postgres = MetricsPostgres::new(self.metrics_capture, postgres_config) - .await? - .sync(); + let confirmed_block_listenser = self + .confirmed_block_listenser + .clone() + .listen(Some(postgres.clone())); let cleaner = Cleaner::new( self.tx_sender.clone(), @@ -143,14 +119,44 @@ impl LiteBridge { ) .start(clean_interval); + let rpc = self.into_rpc(); + + let (ws_server, http_server) = { + let ws_server_handle = ServerBuilder::default() + .ws_only() + .build(ws_addr.clone()) + .await? + .start(rpc.clone())?; + + let http_server_handle = ServerBuilder::default() + .http_only() + .build(http_addr.clone()) + .await? + .start(rpc)?; + + let ws_server = tokio::spawn(async move { + info!("Websocket Server started at {ws_addr:?}"); + ws_server_handle.stopped().await; + bail!("Websocket server stopped"); + }); + + let http_server = tokio::spawn(async move { + info!("HTTP Server started at {http_addr:?}"); + http_server_handle.stopped().await; + bail!("HTTP server stopped"); + }); + + (ws_server, http_server) + }; + Ok([ ws_server, http_server, tx_sender, finalized_block_listenser, confirmed_block_listenser, - metrics_capture, - metrics_postgres, + postgres_connection, + metrics_capture.capture(Some(postgres)), cleaner, ]) } @@ -317,10 +323,6 @@ impl LiteRpcServer for LiteBridge { Ok(airdrop_sig) } - async fn get_metrics(&self) -> crate::rpc::Result { - return Ok(self.metrics_capture.get_metrics().await); - } - fn signature_subscribe( &self, mut sink: SubscriptionSink, diff --git a/src/lib.rs b/src/lib.rs index 4bb5e821..71cbb6ee 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,6 +6,7 @@ pub mod cli; pub mod configs; pub mod encoding; pub mod errors; +pub mod postgres; pub mod rpc; pub mod tpu_manager; pub mod workers; diff --git a/src/postgres.rs b/src/postgres.rs new file mode 100644 index 00000000..e17045bc --- /dev/null +++ b/src/postgres.rs @@ -0,0 +1,112 @@ +use std::sync::Arc; + +use anyhow::Context; +use log::info; +use postgres_native_tls::MakeTlsConnector; +use tokio::fs; +use tokio::task::JoinHandle; +use tokio_postgres::Client; + +use native_tls::{Certificate, Identity, TlsConnector}; + +#[derive(Clone)] +pub struct Postgres { + client: Arc, +} + +pub struct PostgresTx<'a> { + pub signature: &'a [u8], + pub recent_slot: i64, + pub forwarded_slot: i64, + pub processed_slot: Option, + pub cu_consumed: Option, + pub cu_requested: Option, + pub quic_response: u32, +} + +pub struct PostgresBlock { + pub slot: i64, + pub leader_id: i64, + pub parent_slot: i64, +} + +pub struct PostgreAccountAddr { + pub id: u32, + pub addr: String, +} + +impl Postgres { + /// # Return + /// (connection join handle, Self) + /// + /// returned join handle is required to be polled + pub async fn new( + porstgres_config: &str, + ) -> anyhow::Result<(JoinHandle>, Self)> { + let connector = TlsConnector::builder() + .add_root_certificate(Certificate::from_pem(&fs::read("ca.pem").await?)?) + .identity( + Identity::from_pkcs12(&fs::read("client.pks").await?, "p").context("Identity")?, + ) + .danger_accept_invalid_hostnames(true) + .danger_accept_invalid_certs(true) + .build()?; + + info!("making tls config"); + + let connector = MakeTlsConnector::new(connector); + let (client, connection) = tokio_postgres::connect(porstgres_config, connector).await?; + let client = Arc::new(client); + + Ok(( + tokio::spawn(async move { Ok(connection.await?) }), + Self { client }, + )) + } + + pub async fn send_block(&self, block: PostgresBlock) -> anyhow::Result<()> { + let PostgresBlock { + slot, + leader_id, + parent_slot, + } = block; + + self.client + .execute( + r#" + INSERT INTO lite_rpc.Blocks + (slot, leader_id, parent_slot) + VALUES + ($1, $2, $3) + "#, + &[&slot, &leader_id, &parent_slot], + ) + .await?; + + Ok(()) + } + + pub async fn send_tx<'a>(&self, tx: PostgresTx<'a>) -> anyhow::Result<()> { + let PostgresTx { + signature, + recent_slot, + forwarded_slot, + processed_slot, + cu_consumed, + cu_requested, + quic_response, + } = tx; + + self.client.execute( + r#" + INSERT INTO lite_rpc.Txs + (signature, recent_slot, forwarded_slot, processed_slot, cu_consumed, cu_requested, quic_response) + VALUES + ($1, $2, $3, $4, $5, $6) + "#, + &[&signature, &recent_slot, &forwarded_slot, &processed_slot, &cu_consumed, &cu_requested, &quic_response], + ).await?; + + Ok(()) + } +} diff --git a/src/rpc.rs b/src/rpc.rs index 3265d4a8..36430d16 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -6,10 +6,7 @@ use solana_client::rpc_response::{Response as RpcResponse, RpcBlockhash, RpcVers use solana_sdk::commitment_config::CommitmentConfig; use solana_transaction_status::TransactionStatus; -use crate::{ - configs::{IsBlockHashValidConfig, SendTransactionConfig}, - workers::Metrics, -}; +use crate::configs::{IsBlockHashValidConfig, SendTransactionConfig}; pub type Result = std::result::Result; @@ -53,9 +50,6 @@ pub trait LiteRpc { config: Option, ) -> Result; - #[method(name = "getMetrics")] - async fn get_metrics(&self) -> Result; - #[subscription(name = "signatureSubscribe" => "signatureNotification", unsubscribe="signatureUnsubscribe", item=RpcResponse)] fn signature_subscribe(&self, signature: String, commitment_config: CommitmentConfig); } diff --git a/src/workers/block_listenser.rs b/src/workers/block_listenser.rs index fc299f3f..8471f5d9 100644 --- a/src/workers/block_listenser.rs +++ b/src/workers/block_listenser.rs @@ -14,11 +14,16 @@ use solana_client::{ use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel}; use solana_transaction_status::{ - TransactionConfirmationStatus, TransactionStatus, UiTransactionStatusMeta, + TransactionConfirmationStatus, TransactionStatus, UiConfirmedBlock, UiTransactionStatusMeta, +}; +use tokio::{ + sync::{mpsc::Sender, RwLock}, + task::JoinHandle, }; -use tokio::{sync::RwLock, task::JoinHandle}; -use super::TxSender; +use crate::postgres::{Postgres, PostgresBlock, PostgresTx}; + +use super::{TxProps, TxSender}; /// Background worker which listen's to new blocks /// and keeps a track of confirmed txs @@ -37,6 +42,11 @@ struct BlockInformation { pub block_height: u64, } +pub struct BlockListnerNotificatons { + pub block: Sender, + pub tx: Sender, +} + impl BlockListener { pub async fn new( pub_sub_client: Arc, @@ -89,7 +99,7 @@ impl BlockListener { self.signature_subscribers.remove(&signature); } - pub fn listen(self) -> JoinHandle> { + pub fn listen(self, postgres: Option) -> JoinHandle> { tokio::spawn(async move { info!("Subscribing to blocks"); @@ -142,6 +152,14 @@ impl BlockListener { block_height, }; + if let Some(postgres) = &postgres { + postgres.send_block(PostgresBlock { + slot: slot as i64, + leader_id: 0, //FIX: + parent_slot: 0, //FIX: + }).await?; + } + for tx in transactions { let Some(UiTransactionStatusMeta { err, status, .. }) = tx.meta else { info!("tx with no meta"); @@ -156,6 +174,18 @@ impl BlockListener { let sig = tx.get_signature().to_string(); if let Some(mut tx_status) = self.tx_sender.txs_sent.get_mut(&sig) { + if let Some(postgres) = &postgres { + postgres.send_tx(PostgresTx { + signature: tx.get_signature().as_ref(), + recent_slot: slot as i64, + forwarded_slot: 0, + processed_slot: None, + cu_consumed: None, + cu_requested: None, + quic_response: 0, + }).await?; + } + tx_status.value_mut().status = Some(TransactionStatus { slot, confirmations: None, //TODO: talk about this @@ -167,7 +197,6 @@ impl BlockListener { // subscribers if let Some((_sig, mut sink)) = self.signature_subscribers.remove(&sig) { - // info!("notification {}", sig); // none if transaction succeeded sink.send(&RpcResponse { context: RpcResponseContext { diff --git a/src/workers/metrics_capture.rs b/src/workers/metrics_capture.rs index 342ca8c4..403da3f5 100644 --- a/src/workers/metrics_capture.rs +++ b/src/workers/metrics_capture.rs @@ -1,17 +1,16 @@ -use std::sync::Arc; - use log::{info, warn}; use solana_transaction_status::TransactionConfirmationStatus; use tokio::{sync::RwLock, task::JoinHandle}; +use crate::postgres::Postgres; + use super::TxSender; use serde::{Deserialize, Serialize}; /// Background worker which captures metrics -#[derive(Clone)] pub struct MetricsCapture { tx_sender: TxSender, - metrics: Arc>, + metrics: RwLock, } #[derive(Clone, Default, Debug, Serialize, Deserialize)] @@ -37,7 +36,7 @@ impl MetricsCapture { self.metrics.read().await.to_owned() } - pub fn capture(self) -> JoinHandle> { + pub fn capture(self, postgres: Option) -> JoinHandle> { let mut one_second = tokio::time::interval(std::time::Duration::from_secs(1)); tokio::spawn(async move { @@ -84,6 +83,10 @@ impl MetricsCapture { None } }; + + if let Some(_postgres) = &postgres { + // postgres.send_metrics(metrics.clone()).await?; + } } }) } diff --git a/src/workers/metrics_postgres.rs b/src/workers/metrics_postgres.rs deleted file mode 100644 index 8b3cc12f..00000000 --- a/src/workers/metrics_postgres.rs +++ /dev/null @@ -1,94 +0,0 @@ -use anyhow::{bail, Context}; -use log::info; -use postgres_native_tls::MakeTlsConnector; -use tokio::fs; -use tokio::task::JoinHandle; -use tokio_postgres::Client; - -use native_tls::{Certificate, Identity, TlsConnector}; - -use super::{Metrics, MetricsCapture}; - -pub struct MetricsPostgres { - connection: JoinHandle>, - client: Client, - metrics_capture: MetricsCapture, -} - -impl MetricsPostgres { - pub async fn new( - metrics_capture: MetricsCapture, - porstgres_config: &str, - ) -> anyhow::Result { - let connector = TlsConnector::builder() - .add_root_certificate(Certificate::from_pem(&fs::read("ca.pem").await?)?) - .identity( - Identity::from_pkcs12(&fs::read("client.pks").await?, "p").context("Identity")?, - ) - .danger_accept_invalid_hostnames(true) - .danger_accept_invalid_certs(true) - .build()?; - - info!("making tls config"); - - let connector = MakeTlsConnector::new(connector); - let (client, connection) = tokio_postgres::connect(porstgres_config, connector).await?; - - Ok(Self { - connection: tokio::spawn(connection), - client, - metrics_capture, - }) - } - - pub fn sync(self) -> JoinHandle> { - let mut one_second = tokio::time::interval(std::time::Duration::from_secs(1)); - let Self { - connection, - client, - metrics_capture, - } = self; - - let metrics_send: JoinHandle> = tokio::spawn(async move { - info!("Sending Metrics To Postgres"); - - loop { - let Metrics { - txs_sent, - txs_confirmed, - txs_finalized, - txs_ps, - txs_confirmed_ps, - txs_finalized_ps, - mem_used, - } = metrics_capture.get_metrics().await; - - client.execute( - r#"INSERT INTO LiteRpcMetrics - (txs_sent, txs_confirmed, txs_finalized, transactions_per_second, confirmations_per_second, finalized_per_second, memory_used) - VALUES - ($1, $2, $3, $4, $5, $6, $7) - "#, - &[&(txs_sent as i64), &(txs_confirmed as i64), &(txs_finalized as i64), &(txs_ps as i64), &(txs_confirmed_ps as i64), &(txs_finalized_ps as i64), &(mem_used.unwrap_or_default() as i64)], - ) - .await.unwrap(); - - one_second.tick().await; - } - }); - - #[allow(unreachable_code)] - tokio::spawn(async move { - tokio::select! { - r = metrics_send => { - bail!("Postgres metrics send thread stopped {r:?}") - } - r = connection => { - bail!("Postgres connection poll stopped {r:?}") - } - } - - Ok(()) - }) - } -} diff --git a/src/workers/mod.rs b/src/workers/mod.rs index b17f5536..6b6d9535 100644 --- a/src/workers/mod.rs +++ b/src/workers/mod.rs @@ -1,11 +1,9 @@ mod block_listenser; mod cleaner; mod metrics_capture; -mod metrics_postgres; mod tx_sender; pub use block_listenser::*; pub use cleaner::*; pub use metrics_capture::*; -pub use metrics_postgres::*; pub use tx_sender::*; diff --git a/tests/workers.rs b/tests/workers.rs index d95432c7..b2cfb626 100644 --- a/tests/workers.rs +++ b/tests/workers.rs @@ -44,7 +44,7 @@ async fn send_and_confirm_txs() { .unwrap(); let services = try_join_all(vec![ - block_listener.clone().listen(), + block_listener.clone().listen(None), tx_sender.clone().execute( DEFAULT_TX_BATCH_SIZE, Duration::from_millis(DEFAULT_TX_BATCH_INTERVAL_MS),