diff --git a/Cargo.lock b/Cargo.lock index 27f9d97..8dfda2e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1439,6 +1439,7 @@ dependencies = [ "native-tls", "postgres-native-tls", "prometheus", + "rustls 0.20.8", "serde", "serde_json", "solana-rpc-client", @@ -1447,6 +1448,7 @@ dependencies = [ "solana-transaction-status", "thiserror", "tokio", + "tokio-postgres", "yellowstone-grpc-client", "yellowstone-grpc-proto", ] @@ -1642,7 +1644,7 @@ dependencies = [ "futures-util", "http", "hyper", - "rustls", + "rustls 0.21.7", "tokio", "tokio-rustls", ] @@ -2415,6 +2417,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8d2234cdee9408b523530a9b6d2d6b373d1db34f6a8e51dc03ded1828d7fb67c" dependencies = [ "bytes", + "chrono", "fallible-iterator", "postgres-protocol", ] @@ -2727,7 +2730,7 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", - "rustls", + "rustls 0.21.7", "rustls-pemfile", "serde", "serde_json", @@ -2793,6 +2796,17 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "rustls" +version = "0.20.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fff78fc74d175294f4e83b28343315ffcfb114b156f0185e9741cb5570f50e2f" +dependencies = [ + "ring", + "sct", + "webpki", +] + [[package]] name = "rustls" version = "0.21.7" @@ -3829,7 +3843,7 @@ version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" dependencies = [ - "rustls", + "rustls 0.21.7", "tokio", ] @@ -4213,6 +4227,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webpki" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0e74f82d49d545ad128049b7e88f6576df2da6b02e9ce565c6f533be576957e" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "webpki-roots" version = "0.25.2" diff --git a/Cargo.toml b/Cargo.toml index e92968d..9afec13 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,8 @@ native-tls = "0.2.11" postgres-native-tls = "0.5.0" prometheus = "0.13.3" lazy_static = "1.4.0" +tokio-postgres = { version = "0.7.10", features = ["with-chrono-0_4"] } +rustls = { version = "=0.20.8", default-features = false } tokio = { version = "1.32.0", features = ["rt-multi-thread", "macros", "time"] } yellowstone-grpc-client = { path = "../yellowstone-grpc/yellowstone-grpc-client" } diff --git a/README.md b/README.md new file mode 100644 index 0000000..ba48956 --- /dev/null +++ b/README.md @@ -0,0 +1,3 @@ +# TO INSTALL POSTGRES SCHEMA AND DATABASE + +psql -d mangolana < migration.sql \ No newline at end of file diff --git a/migration.sql b/migration.sql new file mode 100644 index 0000000..a6dc8f4 --- /dev/null +++ b/migration.sql @@ -0,0 +1,12 @@ +CREATE SCHEMA banking_stage_results; + +CREATE TABLE banking_stage_results.transaction_infos ( + signature CHAR(88) NOT NULL, + message CHAR(1280), + errors CHAR(10000) NOT NULL, + is_executed BOOL, + is_confirmed BOOL, + first_notification_slot BIGINT NOT NULL, + cu_requested BIGINT, + prioritization_fees BIGINT +); diff --git a/src/main.rs b/src/main.rs index 7fc08e0..976d742 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,11 +2,15 @@ use std::{collections::HashMap, sync::Arc}; use dashmap::DashMap; use futures::StreamExt; +use solana_sdk::signature::Signature; use transaction_info::TransactionInfo; use yellowstone_grpc_client::GeyserGrpcClient; -use yellowstone_grpc_proto::prelude::{SubscribeRequestFilterBlocks, CommitmentLevel, subscribe_update::UpdateOneof}; +use yellowstone_grpc_proto::prelude::{ + subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequestFilterBlocks, +}; mod transaction_info; +mod postgres; #[tokio::main()] async fn main() { @@ -27,41 +31,57 @@ async fn main() { let commitment_level = CommitmentLevel::Processed; let mut stream = client - .subscribe_once( - HashMap::new(), - Default::default(), - HashMap::new(), - Default::default(), - blocks_subs, - Default::default(), - Some(commitment_level), - Default::default(), - true - ).await.unwrap(); + .subscribe_once( + HashMap::new(), + Default::default(), + HashMap::new(), + Default::default(), + blocks_subs, + Default::default(), + Some(commitment_level), + Default::default(), + true, + ) + .await + .unwrap(); while let Some(message) = stream.next().await { - let message = message.unwrap(); - - let Some(update) = message.update_oneof else { + let message = message.unwrap(); + + let Some(update) = message.update_oneof else { continue; }; - - match update { - UpdateOneof::BankingTransactionErrors(transaction) => { - let sig = transaction.signature.to_string(); - match map_of_infos.get_mut(&sig) { - Some(mut x) => { - x.add_notification(&transaction); - }, - None => { - map_of_infos.insert(sig, TransactionInfo::new(transaction.signature, transaction.slot)); - } - } - }, - UpdateOneof::Block(block) => { - }, - _ => { - continue; + + match update { + UpdateOneof::BankingTransactionErrors(transaction) => { + if transaction.error.is_none() { + continue; + } + let sig = transaction.signature.to_string(); + match map_of_infos.get_mut(&sig) { + Some(mut x) => { + x.add_notification(&transaction); } - }; - } + None => { + map_of_infos.insert( + sig, + TransactionInfo::new(transaction.signature, transaction.slot), + ); + } + } + } + UpdateOneof::Block(block) => { + for transaction in block.transactions { + let Some(tx) = &transaction.transaction else { + continue; + }; + let signature = Signature::try_from(tx.signatures[0].clone()).unwrap(); + if let Some(mut info) = map_of_infos.get_mut(&signature.to_string()) { + info.add_transaction(&transaction); + } + } + } + _ => { + } + }; + } } diff --git a/src/postgres.rs b/src/postgres.rs new file mode 100644 index 0000000..223da45 --- /dev/null +++ b/src/postgres.rs @@ -0,0 +1,99 @@ +use std::sync::Arc; + +use anyhow::Context; +use tokio::sync::RwLock; +use tokio_postgres::{Client, config::SslMode, NoTls, tls::MakeTlsConnect, Socket, types::ToSql}; + +use crate::transaction_info::TransactionInfo; + + +pub struct PostgresSession { + client: Client, +} + +impl PostgresSession { + pub async fn new() -> anyhow::Result { + let pg_config = std::env::var("PG_CONFIG").context("env PG_CONFIG not found")?; + let pg_config = pg_config.parse::()?; + + let client = + Self::spawn_connection(pg_config, NoTls).await?; + + Ok(Self { client }) + } + + async fn spawn_connection( + pg_config: tokio_postgres::Config, + connector: T, + ) -> anyhow::Result + where + T: MakeTlsConnect + Send + 'static, + >::Stream: Send, + { + let (client, connection) = pg_config + .connect(connector) + .await + .context("Connecting to Postgres failed")?; + + tokio::spawn(async move { + log::info!("Connecting to Postgres"); + + if let Err(err) = connection.await { + log::error!("Connection to Postgres broke {err:?}"); + return; + } + unreachable!("Postgres thread returned") + }); + + Ok(client) + } + + pub fn multiline_query(query: &mut String, args: usize, rows: usize, types: &[&str]) { + let mut arg_index = 1usize; + for row in 0..rows { + query.push('('); + + for i in 0..args { + if row == 0 && !types.is_empty() { + query.push_str(&format!("(${arg_index})::{}", types[i])); + } else { + query.push_str(&format!("${arg_index}")); + } + arg_index += 1; + if i != (args - 1) { + query.push(','); + } + } + + query.push(')'); + + if row != (rows - 1) { + query.push(','); + } + } + } + + pub async fn save_banking_transaction_results(&self, txs: &[TransactionInfo]) -> anyhow::Result<()> { + if txs.is_empty() { + return Ok(()); + } + let NUMBER_OF_ARGS = 8; + + let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(NUMBER_OF_ARGS * txs.len()); + for tx in txs.iter() { + args.push(&tx.signature); + args.push(&tx.transaction_message); + args.push(&tx.errors); + args.push(&tx.is_executed); + args.push(&tx.is_confirmed); + args.push(&tx.first_notification_slot); + args.push(&tx.cu_requested); + args.push(&tx.prioritization_fees); + } + Ok(()) + } +} + +pub struct Postgres { + session: Arc>, +} \ No newline at end of file diff --git a/src/transaction_info.rs b/src/transaction_info.rs index 49d8f9f..6af8893 100644 --- a/src/transaction_info.rs +++ b/src/transaction_info.rs @@ -1,52 +1,52 @@ use std::{collections::HashMap, hash::Hash}; -use solana_sdk::{slot_history::Slot, transaction::Transaction, transaction::TransactionError}; -use yellowstone_grpc_proto::prelude::SubscribeUpdateBankingTransactionResults; +use solana_sdk::{slot_history::Slot, transaction::TransactionError, message::{VersionedMessage, v0::{self, MessageAddressTableLookup}, MessageHeader}, pubkey::Pubkey, instruction::CompiledInstruction, compute_budget::{self, ComputeBudgetInstruction}, borsh0_10::try_from_slice_unchecked}; +use yellowstone_grpc_proto::prelude::{SubscribeUpdateBankingTransactionResults, SubscribeUpdateTransactionInfo}; -fn convert_transaction_error_into_int(error: &TransactionError)-> u8 { +fn convert_transaction_error_into_int(error: &TransactionError) -> u8 { match error { - TransactionError::AccountBorrowOutstanding=>0, - TransactionError::AccountInUse=>1, - TransactionError::AccountLoadedTwice=>2, - TransactionError::AccountNotFound=>3, - TransactionError::AddressLookupTableNotFound=>4, - TransactionError::AlreadyProcessed=>5, - TransactionError::BlockhashNotFound=>6, - TransactionError::CallChainTooDeep=>7, - TransactionError::ClusterMaintenance=>8, - TransactionError::DuplicateInstruction(_)=>9, + TransactionError::AccountBorrowOutstanding => 0, + TransactionError::AccountInUse => 1, + TransactionError::AccountLoadedTwice => 2, + TransactionError::AccountNotFound => 3, + TransactionError::AddressLookupTableNotFound => 4, + TransactionError::AlreadyProcessed => 5, + TransactionError::BlockhashNotFound => 6, + TransactionError::CallChainTooDeep => 7, + TransactionError::ClusterMaintenance => 8, + TransactionError::DuplicateInstruction(_) => 9, TransactionError::InstructionError(_, _) => 10, - TransactionError::InsufficientFundsForFee=>11, + TransactionError::InsufficientFundsForFee => 11, TransactionError::InsufficientFundsForRent { .. } => 12, TransactionError::InvalidAccountForFee => 13, TransactionError::InvalidAccountIndex => 14, - TransactionError::InvalidAddressLookupTableData=>15, - TransactionError::InvalidAddressLookupTableIndex=>16, - TransactionError::InvalidAddressLookupTableOwner=>17, - TransactionError::InvalidLoadedAccountsDataSizeLimit=>18, - TransactionError::InvalidProgramForExecution=>19, - TransactionError::InvalidRentPayingAccount=>20, - TransactionError::InvalidWritableAccount=>21, - TransactionError::MaxLoadedAccountsDataSizeExceeded=>22, - TransactionError::MissingSignatureForFee=>23, - TransactionError::ProgramAccountNotFound=>24, - TransactionError::ResanitizationNeeded=>25, - TransactionError::SanitizeFailure=>26, - TransactionError::SignatureFailure=>27, - TransactionError::TooManyAccountLocks=>28, - TransactionError::UnbalancedTransaction=>29, - TransactionError::UnsupportedVersion=>30, - TransactionError::WouldExceedAccountDataBlockLimit=>31, - TransactionError::WouldExceedAccountDataTotalLimit=>32, - TransactionError::WouldExceedMaxAccountCostLimit=>33, - TransactionError::WouldExceedMaxBlockCostLimit=>34, - TransactionError::WouldExceedMaxVoteCostLimit=>35, + TransactionError::InvalidAddressLookupTableData => 15, + TransactionError::InvalidAddressLookupTableIndex => 16, + TransactionError::InvalidAddressLookupTableOwner => 17, + TransactionError::InvalidLoadedAccountsDataSizeLimit => 18, + TransactionError::InvalidProgramForExecution => 19, + TransactionError::InvalidRentPayingAccount => 20, + TransactionError::InvalidWritableAccount => 21, + TransactionError::MaxLoadedAccountsDataSizeExceeded => 22, + TransactionError::MissingSignatureForFee => 23, + TransactionError::ProgramAccountNotFound => 24, + TransactionError::ResanitizationNeeded => 25, + TransactionError::SanitizeFailure => 26, + TransactionError::SignatureFailure => 27, + TransactionError::TooManyAccountLocks => 28, + TransactionError::UnbalancedTransaction => 29, + TransactionError::UnsupportedVersion => 30, + TransactionError::WouldExceedAccountDataBlockLimit => 31, + TransactionError::WouldExceedAccountDataTotalLimit => 32, + TransactionError::WouldExceedMaxAccountCostLimit => 33, + TransactionError::WouldExceedMaxBlockCostLimit => 34, + TransactionError::WouldExceedMaxVoteCostLimit => 35, } } #[derive(Clone, PartialEq)] pub struct ErrorKey { - error : TransactionError, + error: TransactionError, slot: Slot, } @@ -58,30 +58,32 @@ impl Hash for ErrorKey { } } -impl Eq for ErrorKey { - -} +impl Eq for ErrorKey {} pub struct TransactionInfo { - pub signature : String, - pub transaction_message: Option, + pub signature: String, + pub transaction_message: Option, pub errors: HashMap, pub is_executed: bool, - pub is_confirmed : bool, - pub block_height : Option, + pub is_confirmed: bool, + pub block_height: Option, pub first_notification_slot: u64, + pub cu_requested: Option, + pub prioritization_fees: Option, } impl TransactionInfo { pub fn new(signature: String, first_notification_slot: Slot) -> Self { - Self { - signature, + Self { + signature, transaction_message: None, errors: HashMap::new(), is_executed: false, is_confirmed: false, first_notification_slot, - block_height: None, + block_height: Some(first_notification_slot + 300), + cu_requested: None, + prioritization_fees: None, } } @@ -90,22 +92,148 @@ impl TransactionInfo { Some(error) => { let slot = notification.slot; let error: TransactionError = bincode::deserialize(&error.err).unwrap(); - let key = ErrorKey { - error, - slot, - }; + let key = ErrorKey { error, slot }; match self.errors.get_mut(&key) { Some(x) => { *x = *x + 1; - }, + } None => { self.errors.insert(key, 1); } } - }, + } None => { self.is_executed = true; } } } -} \ No newline at end of file + + pub fn add_transaction(&mut self, transaction: &SubscribeUpdateTransactionInfo) { + + let Some(transaction) = &transaction.transaction else { + return; + }; + + let Some(message) = &transaction.message else { + return; + }; + + let Some(header) = &message.header else { + return; + }; + + let message = VersionedMessage::V0(v0::Message { + header: MessageHeader { + num_required_signatures: header.num_required_signatures as u8, + num_readonly_signed_accounts: header.num_readonly_signed_accounts as u8, + num_readonly_unsigned_accounts: header.num_readonly_unsigned_accounts as u8, + }, + account_keys: message + .account_keys.clone() + .into_iter() + .map(|key| { + let bytes: [u8; 32] = + key.try_into().unwrap_or(Pubkey::default().to_bytes()); + Pubkey::new_from_array(bytes) + }) + .collect(), + recent_blockhash: solana_sdk::hash::Hash::new(&message.recent_blockhash), + instructions: message + .instructions.clone() + .into_iter() + .map(|ix| CompiledInstruction { + program_id_index: ix.program_id_index as u8, + accounts: ix.accounts, + data: ix.data, + }) + .collect(), + address_table_lookups: message + .address_table_lookups.clone() + .into_iter() + .map(|table| { + let bytes: [u8; 32] = table + .account_key + .try_into() + .unwrap_or(Pubkey::default().to_bytes()); + MessageAddressTableLookup { + account_key: Pubkey::new_from_array(bytes), + writable_indexes: table.writable_indexes, + readonly_indexes: table.readonly_indexes, + } + }) + .collect(), + }); + + let legacy_compute_budget: Option<(u32, Option)> = + message.instructions().iter().find_map(|i| { + if i.program_id(message.static_account_keys()) + .eq(&compute_budget::id()) + { + if let Ok(ComputeBudgetInstruction::RequestUnitsDeprecated { + units, + additional_fee, + }) = try_from_slice_unchecked(i.data.as_slice()) + { + if additional_fee > 0 { + return Some(( + units, + Some(((units * 1000) / additional_fee) as u64), + )); + } else { + return Some((units, None)); + } + } + } + None + }); + + let legacy_cu_requested = legacy_compute_budget.map(|x| x.0); + let legacy_prioritization_fees = legacy_compute_budget.map(|x| x.1).unwrap_or(None); + + let cu_requested = message + .instructions() + .iter() + .find_map(|i| { + if i.program_id(message.static_account_keys()) + .eq(&compute_budget::id()) + { + if let Ok(ComputeBudgetInstruction::SetComputeUnitLimit(limit)) = + try_from_slice_unchecked(i.data.as_slice()) + { + return Some(limit); + } + } + None + }) + .or(legacy_cu_requested); + + let prioritization_fees = message + .instructions() + .iter() + .find_map(|i| { + if i.program_id(message.static_account_keys()) + .eq(&compute_budget::id()) + { + if let Ok(ComputeBudgetInstruction::SetComputeUnitPrice(price)) = + try_from_slice_unchecked(i.data.as_slice()) + { + return Some(price); + } + } + + None + }) + .or(legacy_prioritization_fees); + + if let Some(cu_requested) = cu_requested { + self.cu_requested = Some(cu_requested as u64); + } + + if let Some(prioritization_fees) = prioritization_fees { + self.prioritization_fees = Some(prioritization_fees); + } + self.is_confirmed = true; + self.transaction_message = Some(message); + self.is_executed = true; + } +}