From d8646f65d6e2723f42844231b0e1b515759661aa Mon Sep 17 00:00:00 2001 From: godmodegalactus Date: Thu, 23 Nov 2023 11:20:30 +0100 Subject: [PATCH] Adding postgres certificates and grpc token --- migration.sql | 1 - src/block_info.rs | 6 +++++- src/cli.rs | 4 ++++ src/main.rs | 19 ++++++++++++------- src/postgres.rs | 41 ++++++++++++++++++++++++++++++++--------- 5 files changed, 53 insertions(+), 18 deletions(-) diff --git a/migration.sql b/migration.sql index f3b6d6a..15d0265 100644 --- a/migration.sql +++ b/migration.sql @@ -2,7 +2,6 @@ CREATE SCHEMA banking_stage_results; CREATE TABLE banking_stage_results.transaction_infos ( signature CHAR(88) PRIMARY KEY, - message text, errors text [], is_executed BOOL, is_confirmed BOOL, diff --git a/src/block_info.rs b/src/block_info.rs index 45ded4d..a7e8693 100644 --- a/src/block_info.rs +++ b/src/block_info.rs @@ -138,7 +138,11 @@ impl BlockInfo { if additional_fee > 0 { return Some(( units, - Some(((units.saturating_mul(1000)).saturating_div(additional_fee)) as u64), + Some( + ((units.saturating_mul(1000)) + .saturating_div(additional_fee)) + as u64, + ), )); } else { return Some((units, None)); diff --git a/src/cli.rs b/src/cli.rs index 01ab73f..94495ca 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -5,6 +5,10 @@ use clap::Parser; pub struct Args { #[arg(short, long, default_value_t = String::from("http://127.0.0.1:10000"))] pub grpc_address: String, + + #[arg(long)] + pub grpc_x_token: Option, + /// enable metrics to prometheus at addr #[arg(short = 'm', long, default_value_t = String::from("[::]:9091"))] pub prometheus_addr: String, diff --git a/src/main.rs b/src/main.rs index 5977ea7..ffa47b3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,9 +1,10 @@ use clap::Parser; -use tokio::time::Instant; use std::{ collections::HashMap, - sync::{atomic::AtomicU64, Arc}, time::Duration, + sync::{atomic::AtomicU64, Arc}, + time::Duration, }; +use tokio::time::Instant; use block_info::BlockInfo; use cli::Args; @@ -15,7 +16,8 @@ use solana_sdk::signature::Signature; use transaction_info::TransactionInfo; use yellowstone_grpc_client::GeyserGrpcClient; use yellowstone_grpc_proto::prelude::{ - subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequestFilterBlocks, SubscribeUpdateBlock, + subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequestFilterBlocks, + SubscribeUpdateBlock, }; use crate::prometheus_sync::PrometheusSync; @@ -43,7 +45,7 @@ async fn main() { let _prometheus_jh = PrometheusSync::sync(args.prometheus_addr.clone()); let grpc_addr = args.grpc_address; - let mut client = GeyserGrpcClient::connect(grpc_addr, None::<&'static str>, None).unwrap(); + let mut client = GeyserGrpcClient::connect(grpc_addr, args.grpc_x_token, None).unwrap(); let map_of_infos = Arc::new(DashMap::::new()); let slot_by_errors = Arc::new(DashMap::::new()); @@ -79,7 +81,8 @@ async fn main() { postgres.spawn_transaction_infos_saver(map_of_infos.clone(), slot.clone()); - let (send_block, mut recv_block) = tokio::sync::mpsc::unbounded_channel::<(Instant, SubscribeUpdateBlock)>(); + let (send_block, mut recv_block) = + tokio::sync::mpsc::unbounded_channel::<(Instant, SubscribeUpdateBlock)>(); let slot_by_error_task = slot_by_errors.clone(); let map_of_infos_task = map_of_infos.clone(); @@ -110,7 +113,6 @@ async fn main() { } }); - while let Some(message) = stream.next().await { let Ok(message) = message else { continue; @@ -150,7 +152,10 @@ async fn main() { log::debug!("got block {}", block.slot); BLOCK_TXS.set(block.transactions.len() as i64); slot.store(block.slot, std::sync::atomic::Ordering::Relaxed); - send_block.send(( Instant::now() + Duration::from_secs(30), block)).expect("should works"); + + send_block + .send((Instant::now() + Duration::from_secs(30), block)) + .expect("should works"); // delay queue so that we get all the banking stage errors before processing block } _ => {} diff --git a/src/postgres.rs b/src/postgres.rs index 98f0815..f8da998 100644 --- a/src/postgres.rs +++ b/src/postgres.rs @@ -4,11 +4,14 @@ use std::{ }; use anyhow::Context; +use base64::Engine; use chrono::{DateTime, Utc}; use dashmap::DashMap; use itertools::Itertools; use log::debug; -use tokio_postgres::{tls::MakeTlsConnect, types::ToSql, Client, NoTls, Socket}; +use native_tls::{Certificate, Identity, TlsConnector}; +use postgres_native_tls::MakeTlsConnector; +use tokio_postgres::{config::SslMode, tls::MakeTlsConnect, types::ToSql, Client, NoTls, Socket}; use crate::{block_info::BlockInfo, transaction_info::TransactionInfo}; @@ -21,7 +24,33 @@ impl PostgresSession { let pg_config = std::env::var("PG_CONFIG").context("env PG_CONFIG not found")?; let pg_config = pg_config.parse::()?; - let client = Self::spawn_connection(pg_config, NoTls).await?; + let client = if let SslMode::Disable = pg_config.get_ssl_mode() { + Self::spawn_connection(pg_config, NoTls).await? + } else { + let ca_pem_b64 = std::env::var("CA_PEM_B64").context("env CA_PEM_B64 not found")?; + let client_pks_b64 = + std::env::var("CLIENT_PKS_B64").context("env CLIENT_PKS_B64 not found")?; + let client_pks_password = + std::env::var("CLIENT_PKS_PASS").context("env CLIENT_PKS_PASS not found")?; + + let ca_pem = base64::engine::general_purpose::STANDARD + .decode(ca_pem_b64) + .context("ca pem decode")?; + let client_pks = base64::engine::general_purpose::STANDARD + .decode(client_pks_b64) + .context("client pks decode")?; + + let connector = TlsConnector::builder() + .add_root_certificate(Certificate::from_pem(&ca_pem)?) + .identity( + Identity::from_pkcs12(&client_pks, &client_pks_password).context("Identity")?, + ) + .danger_accept_invalid_hostnames(true) + .danger_accept_invalid_certs(true) + .build()?; + + Self::spawn_connection(pg_config, MakeTlsConnector::new(connector)).await? + }; Ok(Self { client }) } @@ -93,7 +122,6 @@ impl PostgresSession { .collect(); for tx in txs.iter() { args.push(&tx.signature); - args.push(&tx.transaction_message); args.push(&tx.errors); args.push(&tx.is_executed); args.push(&tx.is_confirmed); @@ -108,7 +136,7 @@ impl PostgresSession { let mut query = String::from( r#" INSERT INTO banking_stage_results.transaction_infos - (signature, message, errors, is_executed, is_confirmed, first_notification_slot, cu_requested, prioritization_fees, utc_timestamp, accounts_used, processed_slot) + (signature, errors, is_executed, is_confirmed, first_notification_slot, cu_requested, prioritization_fees, utc_timestamp, accounts_used, processed_slot) VALUES "#, ); @@ -200,7 +228,6 @@ impl Postgres { pub struct PostgresTransactionInfo { pub signature: String, - pub transaction_message: Option, pub errors: Vec, pub is_executed: bool, pub is_confirmed: bool, @@ -226,10 +253,6 @@ impl From<&TransactionInfo> for PostgresTransactionInfo { .collect(); Self { signature: value.signature.clone(), - transaction_message: value - .transaction_message - .as_ref() - .map(|x| base64::encode(bincode::serialize(&x).unwrap())), errors, is_executed: value.is_executed, is_confirmed: value.is_confirmed,