Merge branch 'main' of github.com:blockworks-foundation/BankingStageErrorsTrackingSidecar

This commit is contained in:
GroovieGermanikus 2023-11-23 14:28:51 +01:00
commit 4f21a76966
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
5 changed files with 53 additions and 18 deletions

View File

@ -2,7 +2,6 @@ CREATE SCHEMA banking_stage_results;
CREATE TABLE banking_stage_results.transaction_infos (
signature CHAR(88) PRIMARY KEY,
message text,
errors text [],
is_executed BOOL,
is_confirmed BOOL,

View File

@ -138,7 +138,11 @@ impl BlockInfo {
if additional_fee > 0 {
return Some((
units,
Some(((units.saturating_mul(1000)).saturating_div(additional_fee)) as u64),
Some(
((units.saturating_mul(1000))
.saturating_div(additional_fee))
as u64,
),
));
} else {
return Some((units, None));

View File

@ -5,6 +5,10 @@ use clap::Parser;
pub struct Args {
#[arg(short, long, default_value_t = String::from("http://127.0.0.1:10000"))]
pub grpc_address: String,
#[arg(long)]
pub grpc_x_token: Option<String>,
/// enable metrics to prometheus at addr
#[arg(short = 'm', long, default_value_t = String::from("[::]:9091"))]
pub prometheus_addr: String,

View File

@ -1,9 +1,10 @@
use clap::Parser;
use tokio::time::Instant;
use std::{
collections::HashMap,
sync::{atomic::AtomicU64, Arc}, time::Duration,
sync::{atomic::AtomicU64, Arc},
time::Duration,
};
use tokio::time::Instant;
use block_info::BlockInfo;
use cli::Args;
@ -15,7 +16,8 @@ use solana_sdk::signature::Signature;
use transaction_info::TransactionInfo;
use yellowstone_grpc_client::GeyserGrpcClient;
use yellowstone_grpc_proto::prelude::{
subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequestFilterBlocks, SubscribeUpdateBlock,
subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequestFilterBlocks,
SubscribeUpdateBlock,
};
use crate::prometheus_sync::PrometheusSync;
@ -43,7 +45,7 @@ async fn main() {
let _prometheus_jh = PrometheusSync::sync(args.prometheus_addr.clone());
let grpc_addr = args.grpc_address;
let mut client = GeyserGrpcClient::connect(grpc_addr, None::<&'static str>, None).unwrap();
let mut client = GeyserGrpcClient::connect(grpc_addr, args.grpc_x_token, None).unwrap();
let map_of_infos = Arc::new(DashMap::<String, TransactionInfo>::new());
let slot_by_errors = Arc::new(DashMap::<u64, u64>::new());
@ -79,7 +81,8 @@ async fn main() {
postgres.spawn_transaction_infos_saver(map_of_infos.clone(), slot.clone());
let (send_block, mut recv_block) = tokio::sync::mpsc::unbounded_channel::<(Instant, SubscribeUpdateBlock)>();
let (send_block, mut recv_block) =
tokio::sync::mpsc::unbounded_channel::<(Instant, SubscribeUpdateBlock)>();
let slot_by_error_task = slot_by_errors.clone();
let map_of_infos_task = map_of_infos.clone();
@ -110,7 +113,6 @@ async fn main() {
}
});
while let Some(message) = geyser_stream.next().await {
let Ok(message) = message else {
continue;
@ -150,7 +152,10 @@ async fn main() {
debug!("got block {}", block.slot);
BLOCK_TXS.set(block.transactions.len() as i64);
slot.store(block.slot, std::sync::atomic::Ordering::Relaxed);
send_block.send(( Instant::now() + Duration::from_secs(30), block)).expect("should works");
send_block
.send((Instant::now() + Duration::from_secs(30), block))
.expect("should works");
// delay queue so that we get all the banking stage errors before processing block
}
_ => {}

View File

@ -4,11 +4,14 @@ use std::{
};
use anyhow::Context;
use base64::Engine;
use chrono::{DateTime, Utc};
use dashmap::DashMap;
use itertools::Itertools;
use log::{debug, error, info};
use tokio_postgres::{tls::MakeTlsConnect, types::ToSql, Client, NoTls, Socket};
use native_tls::{Certificate, Identity, TlsConnector};
use postgres_native_tls::MakeTlsConnector;
use tokio_postgres::{config::SslMode, tls::MakeTlsConnect, types::ToSql, Client, NoTls, Socket};
use crate::{block_info::BlockInfo, transaction_info::TransactionInfo};
@ -21,7 +24,33 @@ impl PostgresSession {
let pg_config = std::env::var("PG_CONFIG").context("env PG_CONFIG not found")?;
let pg_config = pg_config.parse::<tokio_postgres::Config>()?;
let client = Self::spawn_connection(pg_config, NoTls).await?;
let client = if let SslMode::Disable = pg_config.get_ssl_mode() {
Self::spawn_connection(pg_config, NoTls).await?
} else {
let ca_pem_b64 = std::env::var("CA_PEM_B64").context("env CA_PEM_B64 not found")?;
let client_pks_b64 =
std::env::var("CLIENT_PKS_B64").context("env CLIENT_PKS_B64 not found")?;
let client_pks_password =
std::env::var("CLIENT_PKS_PASS").context("env CLIENT_PKS_PASS not found")?;
let ca_pem = base64::engine::general_purpose::STANDARD
.decode(ca_pem_b64)
.context("ca pem decode")?;
let client_pks = base64::engine::general_purpose::STANDARD
.decode(client_pks_b64)
.context("client pks decode")?;
let connector = TlsConnector::builder()
.add_root_certificate(Certificate::from_pem(&ca_pem)?)
.identity(
Identity::from_pkcs12(&client_pks, &client_pks_password).context("Identity")?,
)
.danger_accept_invalid_hostnames(true)
.danger_accept_invalid_certs(true)
.build()?;
Self::spawn_connection(pg_config, MakeTlsConnector::new(connector)).await?
};
Ok(Self { client })
}
@ -93,7 +122,6 @@ impl PostgresSession {
.collect();
for tx in txs.iter() {
args.push(&tx.signature);
args.push(&tx.transaction_message);
args.push(&tx.errors);
args.push(&tx.is_executed);
args.push(&tx.is_confirmed);
@ -108,7 +136,7 @@ impl PostgresSession {
let mut query = String::from(
r#"
INSERT INTO banking_stage_results.transaction_infos
(signature, message, errors, is_executed, is_confirmed, first_notification_slot, cu_requested, prioritization_fees, utc_timestamp, accounts_used, processed_slot)
(signature, errors, is_executed, is_confirmed, first_notification_slot, cu_requested, prioritization_fees, utc_timestamp, accounts_used, processed_slot)
VALUES
"#,
);
@ -200,7 +228,6 @@ impl Postgres {
pub struct PostgresTransactionInfo {
pub signature: String,
pub transaction_message: Option<String>,
pub errors: Vec<String>,
pub is_executed: bool,
pub is_confirmed: bool,
@ -226,10 +253,6 @@ impl From<&TransactionInfo> for PostgresTransactionInfo {
.collect();
Self {
signature: value.signature.clone(),
transaction_message: value
.transaction_message
.as_ref()
.map(|x| base64::encode(bincode::serialize(&x).unwrap())),
errors,
is_executed: value.is_executed,
is_confirmed: value.is_confirmed,