Adding postgres

This commit is contained in:
Godmode Galactus 2023-10-02 10:51:38 +02:00
parent a3dca97d4d
commit 8c029c19f3
No known key found for this signature in database
GPG Key ID: A04142C71ABB0DEA
7 changed files with 378 additions and 90 deletions

30
Cargo.lock generated
View File

@ -1439,6 +1439,7 @@ dependencies = [
"native-tls",
"postgres-native-tls",
"prometheus",
"rustls 0.20.8",
"serde",
"serde_json",
"solana-rpc-client",
@ -1447,6 +1448,7 @@ dependencies = [
"solana-transaction-status",
"thiserror",
"tokio",
"tokio-postgres",
"yellowstone-grpc-client",
"yellowstone-grpc-proto",
]
@ -1642,7 +1644,7 @@ dependencies = [
"futures-util",
"http",
"hyper",
"rustls",
"rustls 0.21.7",
"tokio",
"tokio-rustls",
]
@ -2415,6 +2417,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d2234cdee9408b523530a9b6d2d6b373d1db34f6a8e51dc03ded1828d7fb67c"
dependencies = [
"bytes",
"chrono",
"fallible-iterator",
"postgres-protocol",
]
@ -2727,7 +2730,7 @@ dependencies = [
"once_cell",
"percent-encoding",
"pin-project-lite",
"rustls",
"rustls 0.21.7",
"rustls-pemfile",
"serde",
"serde_json",
@ -2793,6 +2796,17 @@ dependencies = [
"windows-sys 0.48.0",
]
[[package]]
name = "rustls"
version = "0.20.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fff78fc74d175294f4e83b28343315ffcfb114b156f0185e9741cb5570f50e2f"
dependencies = [
"ring",
"sct",
"webpki",
]
[[package]]
name = "rustls"
version = "0.21.7"
@ -3829,7 +3843,7 @@ version = "0.24.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081"
dependencies = [
"rustls",
"rustls 0.21.7",
"tokio",
]
@ -4213,6 +4227,16 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "webpki"
version = "0.22.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0e74f82d49d545ad128049b7e88f6576df2da6b02e9ce565c6f533be576957e"
dependencies = [
"ring",
"untrusted",
]
[[package]]
name = "webpki-roots"
version = "0.25.2"

View File

@ -29,6 +29,8 @@ native-tls = "0.2.11"
postgres-native-tls = "0.5.0"
prometheus = "0.13.3"
lazy_static = "1.4.0"
tokio-postgres = { version = "0.7.10", features = ["with-chrono-0_4"] }
rustls = { version = "=0.20.8", default-features = false }
tokio = { version = "1.32.0", features = ["rt-multi-thread", "macros", "time"] }
yellowstone-grpc-client = { path = "../yellowstone-grpc/yellowstone-grpc-client" }

3
README.md Normal file
View File

@ -0,0 +1,3 @@
# TO INSTALL POSTGRES SCHEMA AND DATABASE
psql -d mangolana < migration.sql

12
migration.sql Normal file
View File

@ -0,0 +1,12 @@
CREATE SCHEMA banking_stage_results;
CREATE TABLE banking_stage_results.transaction_infos (
signature CHAR(88) NOT NULL,
message CHAR(1280),
errors CHAR(10000) NOT NULL,
is_executed BOOL,
is_confirmed BOOL,
first_notification_slot BIGINT NOT NULL,
cu_requested BIGINT,
prioritization_fees BIGINT
);

View File

@ -2,11 +2,15 @@ use std::{collections::HashMap, sync::Arc};
use dashmap::DashMap;
use futures::StreamExt;
use solana_sdk::signature::Signature;
use transaction_info::TransactionInfo;
use yellowstone_grpc_client::GeyserGrpcClient;
use yellowstone_grpc_proto::prelude::{SubscribeRequestFilterBlocks, CommitmentLevel, subscribe_update::UpdateOneof};
use yellowstone_grpc_proto::prelude::{
subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequestFilterBlocks,
};
mod transaction_info;
mod postgres;
#[tokio::main()]
async fn main() {
@ -27,41 +31,57 @@ async fn main() {
let commitment_level = CommitmentLevel::Processed;
let mut stream = client
.subscribe_once(
HashMap::new(),
Default::default(),
HashMap::new(),
Default::default(),
blocks_subs,
Default::default(),
Some(commitment_level),
Default::default(),
true
).await.unwrap();
.subscribe_once(
HashMap::new(),
Default::default(),
HashMap::new(),
Default::default(),
blocks_subs,
Default::default(),
Some(commitment_level),
Default::default(),
true,
)
.await
.unwrap();
while let Some(message) = stream.next().await {
let message = message.unwrap();
let Some(update) = message.update_oneof else {
let message = message.unwrap();
let Some(update) = message.update_oneof else {
continue;
};
match update {
UpdateOneof::BankingTransactionErrors(transaction) => {
let sig = transaction.signature.to_string();
match map_of_infos.get_mut(&sig) {
Some(mut x) => {
x.add_notification(&transaction);
},
None => {
map_of_infos.insert(sig, TransactionInfo::new(transaction.signature, transaction.slot));
}
}
},
UpdateOneof::Block(block) => {
},
_ => {
continue;
match update {
UpdateOneof::BankingTransactionErrors(transaction) => {
if transaction.error.is_none() {
continue;
}
let sig = transaction.signature.to_string();
match map_of_infos.get_mut(&sig) {
Some(mut x) => {
x.add_notification(&transaction);
}
};
}
None => {
map_of_infos.insert(
sig,
TransactionInfo::new(transaction.signature, transaction.slot),
);
}
}
}
UpdateOneof::Block(block) => {
for transaction in block.transactions {
let Some(tx) = &transaction.transaction else {
continue;
};
let signature = Signature::try_from(tx.signatures[0].clone()).unwrap();
if let Some(mut info) = map_of_infos.get_mut(&signature.to_string()) {
info.add_transaction(&transaction);
}
}
}
_ => {
}
};
}
}

99
src/postgres.rs Normal file
View File

@ -0,0 +1,99 @@
use std::sync::Arc;
use anyhow::Context;
use tokio::sync::RwLock;
use tokio_postgres::{Client, config::SslMode, NoTls, tls::MakeTlsConnect, Socket, types::ToSql};
use crate::transaction_info::TransactionInfo;
pub struct PostgresSession {
client: Client,
}
impl PostgresSession {
pub async fn new() -> anyhow::Result<Self> {
let pg_config = std::env::var("PG_CONFIG").context("env PG_CONFIG not found")?;
let pg_config = pg_config.parse::<tokio_postgres::Config>()?;
let client =
Self::spawn_connection(pg_config, NoTls).await?;
Ok(Self { client })
}
async fn spawn_connection<T>(
pg_config: tokio_postgres::Config,
connector: T,
) -> anyhow::Result<Client>
where
T: MakeTlsConnect<Socket> + Send + 'static,
<T as MakeTlsConnect<Socket>>::Stream: Send,
{
let (client, connection) = pg_config
.connect(connector)
.await
.context("Connecting to Postgres failed")?;
tokio::spawn(async move {
log::info!("Connecting to Postgres");
if let Err(err) = connection.await {
log::error!("Connection to Postgres broke {err:?}");
return;
}
unreachable!("Postgres thread returned")
});
Ok(client)
}
pub fn multiline_query(query: &mut String, args: usize, rows: usize, types: &[&str]) {
let mut arg_index = 1usize;
for row in 0..rows {
query.push('(');
for i in 0..args {
if row == 0 && !types.is_empty() {
query.push_str(&format!("(${arg_index})::{}", types[i]));
} else {
query.push_str(&format!("${arg_index}"));
}
arg_index += 1;
if i != (args - 1) {
query.push(',');
}
}
query.push(')');
if row != (rows - 1) {
query.push(',');
}
}
}
pub async fn save_banking_transaction_results(&self, txs: &[TransactionInfo]) -> anyhow::Result<()> {
if txs.is_empty() {
return Ok(());
}
let NUMBER_OF_ARGS = 8;
let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(NUMBER_OF_ARGS * txs.len());
for tx in txs.iter() {
args.push(&tx.signature);
args.push(&tx.transaction_message);
args.push(&tx.errors);
args.push(&tx.is_executed);
args.push(&tx.is_confirmed);
args.push(&tx.first_notification_slot);
args.push(&tx.cu_requested);
args.push(&tx.prioritization_fees);
}
Ok(())
}
}
pub struct Postgres {
session: Arc<RwLock<PostgresSession>>,
}

View File

@ -1,52 +1,52 @@
use std::{collections::HashMap, hash::Hash};
use solana_sdk::{slot_history::Slot, transaction::Transaction, transaction::TransactionError};
use yellowstone_grpc_proto::prelude::SubscribeUpdateBankingTransactionResults;
use solana_sdk::{slot_history::Slot, transaction::TransactionError, message::{VersionedMessage, v0::{self, MessageAddressTableLookup}, MessageHeader}, pubkey::Pubkey, instruction::CompiledInstruction, compute_budget::{self, ComputeBudgetInstruction}, borsh0_10::try_from_slice_unchecked};
use yellowstone_grpc_proto::prelude::{SubscribeUpdateBankingTransactionResults, SubscribeUpdateTransactionInfo};
fn convert_transaction_error_into_int(error: &TransactionError)-> u8 {
fn convert_transaction_error_into_int(error: &TransactionError) -> u8 {
match error {
TransactionError::AccountBorrowOutstanding=>0,
TransactionError::AccountInUse=>1,
TransactionError::AccountLoadedTwice=>2,
TransactionError::AccountNotFound=>3,
TransactionError::AddressLookupTableNotFound=>4,
TransactionError::AlreadyProcessed=>5,
TransactionError::BlockhashNotFound=>6,
TransactionError::CallChainTooDeep=>7,
TransactionError::ClusterMaintenance=>8,
TransactionError::DuplicateInstruction(_)=>9,
TransactionError::AccountBorrowOutstanding => 0,
TransactionError::AccountInUse => 1,
TransactionError::AccountLoadedTwice => 2,
TransactionError::AccountNotFound => 3,
TransactionError::AddressLookupTableNotFound => 4,
TransactionError::AlreadyProcessed => 5,
TransactionError::BlockhashNotFound => 6,
TransactionError::CallChainTooDeep => 7,
TransactionError::ClusterMaintenance => 8,
TransactionError::DuplicateInstruction(_) => 9,
TransactionError::InstructionError(_, _) => 10,
TransactionError::InsufficientFundsForFee=>11,
TransactionError::InsufficientFundsForFee => 11,
TransactionError::InsufficientFundsForRent { .. } => 12,
TransactionError::InvalidAccountForFee => 13,
TransactionError::InvalidAccountIndex => 14,
TransactionError::InvalidAddressLookupTableData=>15,
TransactionError::InvalidAddressLookupTableIndex=>16,
TransactionError::InvalidAddressLookupTableOwner=>17,
TransactionError::InvalidLoadedAccountsDataSizeLimit=>18,
TransactionError::InvalidProgramForExecution=>19,
TransactionError::InvalidRentPayingAccount=>20,
TransactionError::InvalidWritableAccount=>21,
TransactionError::MaxLoadedAccountsDataSizeExceeded=>22,
TransactionError::MissingSignatureForFee=>23,
TransactionError::ProgramAccountNotFound=>24,
TransactionError::ResanitizationNeeded=>25,
TransactionError::SanitizeFailure=>26,
TransactionError::SignatureFailure=>27,
TransactionError::TooManyAccountLocks=>28,
TransactionError::UnbalancedTransaction=>29,
TransactionError::UnsupportedVersion=>30,
TransactionError::WouldExceedAccountDataBlockLimit=>31,
TransactionError::WouldExceedAccountDataTotalLimit=>32,
TransactionError::WouldExceedMaxAccountCostLimit=>33,
TransactionError::WouldExceedMaxBlockCostLimit=>34,
TransactionError::WouldExceedMaxVoteCostLimit=>35,
TransactionError::InvalidAddressLookupTableData => 15,
TransactionError::InvalidAddressLookupTableIndex => 16,
TransactionError::InvalidAddressLookupTableOwner => 17,
TransactionError::InvalidLoadedAccountsDataSizeLimit => 18,
TransactionError::InvalidProgramForExecution => 19,
TransactionError::InvalidRentPayingAccount => 20,
TransactionError::InvalidWritableAccount => 21,
TransactionError::MaxLoadedAccountsDataSizeExceeded => 22,
TransactionError::MissingSignatureForFee => 23,
TransactionError::ProgramAccountNotFound => 24,
TransactionError::ResanitizationNeeded => 25,
TransactionError::SanitizeFailure => 26,
TransactionError::SignatureFailure => 27,
TransactionError::TooManyAccountLocks => 28,
TransactionError::UnbalancedTransaction => 29,
TransactionError::UnsupportedVersion => 30,
TransactionError::WouldExceedAccountDataBlockLimit => 31,
TransactionError::WouldExceedAccountDataTotalLimit => 32,
TransactionError::WouldExceedMaxAccountCostLimit => 33,
TransactionError::WouldExceedMaxBlockCostLimit => 34,
TransactionError::WouldExceedMaxVoteCostLimit => 35,
}
}
#[derive(Clone, PartialEq)]
pub struct ErrorKey {
error : TransactionError,
error: TransactionError,
slot: Slot,
}
@ -58,30 +58,32 @@ impl Hash for ErrorKey {
}
}
impl Eq for ErrorKey {
}
impl Eq for ErrorKey {}
pub struct TransactionInfo {
pub signature : String,
pub transaction_message: Option<Transaction>,
pub signature: String,
pub transaction_message: Option<VersionedMessage>,
pub errors: HashMap<ErrorKey, usize>,
pub is_executed: bool,
pub is_confirmed : bool,
pub block_height : Option<u64>,
pub is_confirmed: bool,
pub block_height: Option<u64>,
pub first_notification_slot: u64,
pub cu_requested: Option<u64>,
pub prioritization_fees: Option<u64>,
}
impl TransactionInfo {
pub fn new(signature: String, first_notification_slot: Slot) -> Self {
Self {
signature,
Self {
signature,
transaction_message: None,
errors: HashMap::new(),
is_executed: false,
is_confirmed: false,
first_notification_slot,
block_height: None,
block_height: Some(first_notification_slot + 300),
cu_requested: None,
prioritization_fees: None,
}
}
@ -90,22 +92,148 @@ impl TransactionInfo {
Some(error) => {
let slot = notification.slot;
let error: TransactionError = bincode::deserialize(&error.err).unwrap();
let key = ErrorKey {
error,
slot,
};
let key = ErrorKey { error, slot };
match self.errors.get_mut(&key) {
Some(x) => {
*x = *x + 1;
},
}
None => {
self.errors.insert(key, 1);
}
}
},
}
None => {
self.is_executed = true;
}
}
}
}
pub fn add_transaction(&mut self, transaction: &SubscribeUpdateTransactionInfo) {
let Some(transaction) = &transaction.transaction else {
return;
};
let Some(message) = &transaction.message else {
return;
};
let Some(header) = &message.header else {
return;
};
let message = VersionedMessage::V0(v0::Message {
header: MessageHeader {
num_required_signatures: header.num_required_signatures as u8,
num_readonly_signed_accounts: header.num_readonly_signed_accounts as u8,
num_readonly_unsigned_accounts: header.num_readonly_unsigned_accounts as u8,
},
account_keys: message
.account_keys.clone()
.into_iter()
.map(|key| {
let bytes: [u8; 32] =
key.try_into().unwrap_or(Pubkey::default().to_bytes());
Pubkey::new_from_array(bytes)
})
.collect(),
recent_blockhash: solana_sdk::hash::Hash::new(&message.recent_blockhash),
instructions: message
.instructions.clone()
.into_iter()
.map(|ix| CompiledInstruction {
program_id_index: ix.program_id_index as u8,
accounts: ix.accounts,
data: ix.data,
})
.collect(),
address_table_lookups: message
.address_table_lookups.clone()
.into_iter()
.map(|table| {
let bytes: [u8; 32] = table
.account_key
.try_into()
.unwrap_or(Pubkey::default().to_bytes());
MessageAddressTableLookup {
account_key: Pubkey::new_from_array(bytes),
writable_indexes: table.writable_indexes,
readonly_indexes: table.readonly_indexes,
}
})
.collect(),
});
let legacy_compute_budget: Option<(u32, Option<u64>)> =
message.instructions().iter().find_map(|i| {
if i.program_id(message.static_account_keys())
.eq(&compute_budget::id())
{
if let Ok(ComputeBudgetInstruction::RequestUnitsDeprecated {
units,
additional_fee,
}) = try_from_slice_unchecked(i.data.as_slice())
{
if additional_fee > 0 {
return Some((
units,
Some(((units * 1000) / additional_fee) as u64),
));
} else {
return Some((units, None));
}
}
}
None
});
let legacy_cu_requested = legacy_compute_budget.map(|x| x.0);
let legacy_prioritization_fees = legacy_compute_budget.map(|x| x.1).unwrap_or(None);
let cu_requested = message
.instructions()
.iter()
.find_map(|i| {
if i.program_id(message.static_account_keys())
.eq(&compute_budget::id())
{
if let Ok(ComputeBudgetInstruction::SetComputeUnitLimit(limit)) =
try_from_slice_unchecked(i.data.as_slice())
{
return Some(limit);
}
}
None
})
.or(legacy_cu_requested);
let prioritization_fees = message
.instructions()
.iter()
.find_map(|i| {
if i.program_id(message.static_account_keys())
.eq(&compute_budget::id())
{
if let Ok(ComputeBudgetInstruction::SetComputeUnitPrice(price)) =
try_from_slice_unchecked(i.data.as_slice())
{
return Some(price);
}
}
None
})
.or(legacy_prioritization_fees);
if let Some(cu_requested) = cu_requested {
self.cu_requested = Some(cu_requested as u64);
}
if let Some(prioritization_fees) = prioritization_fees {
self.prioritization_fees = Some(prioritization_fees);
}
self.is_confirmed = true;
self.transaction_message = Some(message);
self.is_executed = true;
}
}