diff --git a/README.md b/README.md index 23469d4..e974d16 100644 --- a/README.md +++ b/README.md @@ -9,4 +9,11 @@ grant all privileges on database mangolana to galactus; psql -d mangolana < migration.sql -export PG_CONFIG="host=localhost dbname=mangolana user=username password=password sslmode=disable" +export PG_CONFIG="host=localhost dbname=mangolana user=galactus password=test sslmode=disable" + +### give rights to user + +GRANT ALL PRIVILEGES ON DATABASE mangolana TO galactus; +GRANT ALL PRIVILEGES ON SCHEMA banking_stage_results TO galactus; +GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA banking_stage_results TO galactus; +ALTER DEFAULT PRIVILEGES IN SCHEMA banking_stage_results GRANT ALL PRIVILEGES ON TABLES TO galactus; \ No newline at end of file diff --git a/migration.sql b/migration.sql index e8f925b..04aaca5 100644 --- a/migration.sql +++ b/migration.sql @@ -22,7 +22,7 @@ CREATE TABLE banking_stage_results.blocks ( processed_transactions BIGINT, total_cu_used BIGINT, total_cu_requested BIGINT, - heavily_writelocked_accounts text + heavily_writelocked_accounts text, heavily_readlocked_accounts text ); diff --git a/src/block_info.rs b/src/block_info.rs index 3b8b459..5d1cc48 100644 --- a/src/block_info.rs +++ b/src/block_info.rs @@ -9,13 +9,15 @@ use solana_sdk::{ MessageHeader, VersionedMessage, }, pubkey::Pubkey, + slot_history::Slot, }; +use solana_transaction_status::{RewardType, UiConfirmedBlock}; use std::collections::HashMap; use yellowstone_grpc_proto::prelude::SubscribeUpdateBlock; #[derive(Serialize, Debug, Clone)] pub struct AccountUsage { - pub key: Pubkey, + pub key: String, pub cu_requested: u64, pub cu_consumed: u64, } @@ -34,7 +36,7 @@ pub struct BlockInfo { } impl BlockInfo { - pub fn new(block: &SubscribeUpdateBlock, banking_stage_error_count: Option) -> BlockInfo { + pub fn new(block: &SubscribeUpdateBlock) -> BlockInfo { let block_hash = block.blockhash.clone(); let slot = block.slot; let leader_identity = block @@ -177,15 +179,15 @@ impl BlockInfo { .or(legacy_cu_requested); let cu_requested = cu_requested.unwrap_or(200000) as u64; let cu_consumed = meta.compute_units_consumed.unwrap_or(0); - total_cu_requested = total_cu_requested + cu_requested; + total_cu_requested += cu_requested; let accounts = message .static_account_keys() .iter() .enumerate() - .map(|(index, account)| (message.is_maybe_writable(index), account.clone())) + .map(|(index, account)| (message.is_maybe_writable(index), *account)) .collect_vec(); - for writable_account in accounts.iter().filter(|x| x.0 == true).map(|x| x.1) { + for writable_account in accounts.iter().filter(|x| x.0).map(|x| x.1) { match writelocked_accounts.get_mut(&writable_account) { Some(x) => { x.cu_requested += cu_requested; @@ -195,7 +197,7 @@ impl BlockInfo { writelocked_accounts.insert( writable_account, AccountUsage { - key: writable_account, + key: writable_account.to_string(), cu_consumed, cu_requested, }, @@ -204,7 +206,7 @@ impl BlockInfo { } } - for readable_account in accounts.iter().filter(|x| x.0 == false).map(|x| x.1) { + for readable_account in accounts.iter().filter(|x| !x.0).map(|x| x.1) { match readlocked_accounts.get_mut(&readable_account) { Some(x) => { x.cu_requested += cu_requested; @@ -214,7 +216,7 @@ impl BlockInfo { readlocked_accounts.insert( readable_account, AccountUsage { - key: readable_account, + key: readable_account.to_string(), cu_consumed, cu_requested, }, @@ -243,11 +245,193 @@ impl BlockInfo { leader_identity, successful_transactions: successful_transactions as i64, processed_transactions: processed_transactions as i64, - banking_stage_errors: banking_stage_error_count, + banking_stage_errors: None, total_cu_used, total_cu_requested: total_cu_requested as i64, heavily_writelocked_accounts, heavily_readlocked_accounts, } } + + pub fn new_from_rpc_block( + slot: Slot, + block: &UiConfirmedBlock, + banking_stage_errors_count: i64, + ) -> Option { + let block_hash = block.blockhash.clone(); + let leader_identity = block + .rewards + .as_ref() + .map(|rewards| { + rewards + .iter() + .find(|x| x.reward_type == Some(RewardType::Fee)) + .map(|x| x.pubkey.clone()) + }) + .unwrap_or(None); + let transactions = if let Some(transactions) = &block.transactions { + transactions + } else { + return None; + }; + + let successful_transactions = transactions + .iter() + .filter(|x| x.meta.as_ref().map(|x| x.err.is_none()).unwrap_or(false)) + .count() as u64; + let processed_transactions = transactions.len() as u64; + + let total_cu_used = transactions + .iter() + .map(|x| { + x.meta + .as_ref() + .map(|x| match x.compute_units_consumed { + solana_transaction_status::option_serializer::OptionSerializer::Some(x) => { + x + } + solana_transaction_status::option_serializer::OptionSerializer::Skip => 0, + solana_transaction_status::option_serializer::OptionSerializer::None => 0, + }) + .unwrap_or(0) + }) + .sum::() as i64; + let mut writelocked_accounts: HashMap = HashMap::new(); + let mut readlocked_accounts: HashMap = HashMap::new(); + let mut total_cu_requested: u64 = 0; + for transaction in transactions { + let Some(tx) = transaction.transaction.decode() else { + continue; + }; + + let message = &tx.message; + + let Some(meta) = &transaction.meta else { + continue; + }; + + let legacy_compute_budget: Option<(u32, Option)> = + message.instructions().iter().find_map(|i| { + if i.program_id(message.static_account_keys()) + .eq(&compute_budget::id()) + { + if let Ok(ComputeBudgetInstruction::RequestUnitsDeprecated { + units, + additional_fee, + }) = try_from_slice_unchecked(i.data.as_slice()) + { + if additional_fee > 0 { + return Some(( + units, + Some( + ((units.saturating_mul(1000)) + .saturating_div(additional_fee)) + as u64, + ), + )); + } else { + return Some((units, None)); + } + } + } + None + }); + + let legacy_cu_requested = legacy_compute_budget.map(|x| x.0); + + let cu_requested = message + .instructions() + .iter() + .find_map(|i| { + if i.program_id(message.static_account_keys()) + .eq(&compute_budget::id()) + { + if let Ok(ComputeBudgetInstruction::SetComputeUnitLimit(limit)) = + try_from_slice_unchecked(i.data.as_slice()) + { + return Some(limit); + } + } + None + }) + .or(legacy_cu_requested); + let cu_requested = cu_requested.unwrap_or(200000) as u64; + let cu_consumed = match meta.compute_units_consumed { + solana_transaction_status::option_serializer::OptionSerializer::Some(x) => x, + solana_transaction_status::option_serializer::OptionSerializer::Skip => 0, + solana_transaction_status::option_serializer::OptionSerializer::None => 0, + }; + total_cu_requested += cu_requested; + + let accounts = message + .static_account_keys() + .iter() + .enumerate() + .map(|(index, account)| (message.is_maybe_writable(index), *account)) + .collect_vec(); + for writable_account in accounts.iter().filter(|x| x.0).map(|x| x.1) { + match writelocked_accounts.get_mut(&writable_account) { + Some(x) => { + x.cu_requested += cu_requested; + x.cu_consumed += cu_consumed; + } + None => { + writelocked_accounts.insert( + writable_account, + AccountUsage { + key: writable_account.to_string(), + cu_consumed, + cu_requested, + }, + ); + } + } + } + + for readable_account in accounts.iter().filter(|x| !x.0).map(|x| x.1) { + match readlocked_accounts.get_mut(&readable_account) { + Some(x) => { + x.cu_requested += cu_requested; + x.cu_consumed += cu_consumed; + } + None => { + readlocked_accounts.insert( + readable_account, + AccountUsage { + key: readable_account.to_string(), + cu_consumed, + cu_requested, + }, + ); + } + } + } + } + + let mut heavily_writelocked_accounts = writelocked_accounts + .iter() + .filter(|(_, account)| account.cu_consumed > 1000000) + .map(|x| x.1.clone()) + .collect_vec(); + heavily_writelocked_accounts.sort_by(|lhs, rhs| rhs.cu_consumed.cmp(&lhs.cu_consumed)); + + let mut heavily_readlocked_accounts: Vec<_> = readlocked_accounts + .iter() + .filter(|(_, acc)| acc.cu_consumed > 1000000) + .map(|x| x.1.clone()) + .collect(); + heavily_readlocked_accounts.sort_by(|lhs, rhs| rhs.cu_consumed.cmp(&lhs.cu_consumed)); + Some(BlockInfo { + block_hash, + slot: slot as i64, + leader_identity, + successful_transactions: successful_transactions as i64, + processed_transactions: processed_transactions as i64, + banking_stage_errors: Some(banking_stage_errors_count), + total_cu_used, + total_cu_requested: total_cu_requested as i64, + heavily_writelocked_accounts, + heavily_readlocked_accounts, + }) + } } diff --git a/src/cli.rs b/src/cli.rs index 94495ca..e4e65a3 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -9,6 +9,9 @@ pub struct Args { #[arg(long)] pub grpc_x_token: Option, + #[arg(short, long, default_value_t = String::from("http://127.0.0.1:8899"))] + pub rpc_url: String, + /// enable metrics to prometheus at addr #[arg(short = 'm', long, default_value_t = String::from("[::]:9091"))] pub prometheus_addr: String, diff --git a/src/main.rs b/src/main.rs index 632c6d4..83f2998 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,5 @@ use clap::Parser; +use solana_rpc_client::nonblocking::rpc_client::RpcClient; use std::{ collections::HashMap, sync::{atomic::AtomicU64, Arc}, @@ -11,9 +12,9 @@ use block_info::BlockInfo; use cli::Args; use dashmap::DashMap; use futures::StreamExt; -use log::{debug, error, info}; -use prometheus::{IntCounter, IntGauge, opts, register_int_counter, register_int_gauge}; -use solana_sdk::signature::Signature; +use log::{debug, error}; +use prometheus::{opts, register_int_counter, register_int_gauge, IntCounter, IntGauge}; +use solana_sdk::{commitment_config::CommitmentConfig, signature::Signature}; use transaction_info::TransactionInfo; use yellowstone_grpc_client::GeyserGrpcClient; use yellowstone_grpc_proto::prelude::{ @@ -45,6 +46,7 @@ async fn main() { tracing_subscriber::fmt::init(); let args = Args::parse(); + let rpc_url = args.rpc_url; let _prometheus_jh = PrometheusSync::sync(args.prometheus_addr.clone()); @@ -86,43 +88,106 @@ async fn main() { postgres.spawn_transaction_infos_saver(map_of_infos.clone(), slot.clone()); let (send_block, mut recv_block) = - tokio::sync::mpsc::unbounded_channel::<(Instant, SubscribeUpdateBlock)>(); + tokio::sync::mpsc::unbounded_channel::(); let slot_by_error_task = slot_by_errors.clone(); let map_of_infos_task = map_of_infos.clone(); // process blocks with 2 mins delay so that we process all the banking stage errors before processing blocks - tokio::spawn(async move { - while let Some((wait_until, block)) = recv_block.recv().await { - if wait_until > Instant::now() + Duration::from_secs(5) { - info!( - "wait until {:?} to collect errors for block {}", - wait_until, block.slot - ); + let jh = { + let postgres = postgres.clone(); + tokio::spawn(async move { + while let Some(block) = recv_block.recv().await { + for transaction in &block.transactions { + let Some(tx) = &transaction.transaction else { + continue; + }; + let signature = Signature::try_from(tx.signatures[0].clone()).unwrap(); + if let Some(mut info) = map_of_infos_task.get_mut(&signature.to_string()) { + info.add_transaction(transaction, block.slot); + } + } + + let block_info = BlockInfo::new(&block); + + TXERROR_COUNT + .add(block_info.processed_transactions - block_info.successful_transactions); + if let Err(e) = postgres.save_block_info(block_info).await { + error!("Error saving block {}", e); + } + slot.store(block.slot, std::sync::atomic::Ordering::Relaxed); + slot_by_error_task.remove(&block.slot); } - tokio::time::sleep_until(wait_until).await; - for transaction in &block.transactions { - let Some(tx) = &transaction.transaction else { + }) + }; + + // get blocks from rpc server + // because validator does not send banking blocks + let (rpc_blocks_sender, rpc_blocks_reciever) = + tokio::sync::mpsc::unbounded_channel::<(Instant, u64)>(); + let jh2 = { + let map_of_infos = map_of_infos.clone(); + let postgres = postgres.clone(); + let slot_by_errors = slot_by_errors.clone(); + tokio::spawn(async move { + let mut rpc_blocks_reciever = rpc_blocks_reciever; + let rpc_client = RpcClient::new(rpc_url); + while let Some((wait_until, slot)) = rpc_blocks_reciever.recv().await { + tokio::time::sleep_until(wait_until).await; + let block = if let Ok(block) = rpc_client + .get_block_with_config( + slot, + solana_rpc_client_api::config::RpcBlockConfig { + encoding: Some( + solana_transaction_status::UiTransactionEncoding::Base64, + ), + transaction_details: Some( + solana_transaction_status::TransactionDetails::Full, + ), + rewards: Some(true), + commitment: Some(CommitmentConfig::confirmed()), + max_supported_transaction_version: Some(0), + }, + ) + .await + { + block + } else { continue; }; - let signature = Signature::try_from(tx.signatures[0].clone()).unwrap(); - if let Some(mut info) = map_of_infos_task.get_mut(&signature.to_string()) { - info.add_transaction(&transaction, block.slot); + + let Some(transactions) = &block.transactions else { + continue; + }; + + for transaction in transactions { + let Some(transaction) = &transaction.transaction.decode() else { + continue; + }; + let signature = transaction.signatures[0].to_string(); + if let Some(mut info) = map_of_infos.get_mut(&signature) { + info.add_rpc_transaction(slot, transaction); + } + } + + let banking_stage_error_count = slot_by_errors + .get(&slot) + .map(|x| *x.value() as i64) + .unwrap_or_default(); + let block_info = + BlockInfo::new_from_rpc_block(slot, &block, banking_stage_error_count); + if let Some(block_info) = block_info { + BANKING_STAGE_ERROR_COUNT.add(banking_stage_error_count); + TXERROR_COUNT.add( + block_info.processed_transactions - block_info.successful_transactions, + ); + if let Err(e) = postgres.save_block_info(block_info).await { + error!("Error saving block {}", e); + } + slot_by_errors.remove(&slot); } } - let banking_stage_error_count = slot_by_error_task - .get(&block.slot) - .map(|x| *x.value() as i64); - let block_info = BlockInfo::new(&block, banking_stage_error_count); - BANKING_STAGE_ERROR_COUNT.add(block_info.banking_stage_errors.unwrap_or(0)); - TXERROR_COUNT - .add(block_info.processed_transactions - block_info.successful_transactions); - if let Err(e) = postgres.save_block_info(block_info).await { - error!("Error saving block {}", e); - } - slot.store(block.slot, std::sync::atomic::Ordering::Relaxed); - slot_by_error_task.remove(&block.slot); - } - }); + }) + }; while let Some(message) = geyser_stream.next().await { let Ok(message) = message else { @@ -142,10 +207,13 @@ async fn main() { let sig = transaction.signature.to_string(); match slot_by_errors.get_mut(&transaction.slot) { Some(mut value) => { - *value = *value + 1; + *value += 1; } None => { slot_by_errors.insert(transaction.slot, 1); + rpc_blocks_sender + .send((Instant::now() + Duration::from_secs(30), transaction.slot)) + .expect("should works"); } } match map_of_infos.get_mut(&sig) { @@ -163,12 +231,12 @@ async fn main() { debug!("got block {}", block.slot); BLOCK_TXS.set(block.transactions.len() as i64); BANKING_STAGE_BLOCKS_COUNTER.inc(); - send_block - .send((Instant::now() + Duration::from_secs(30), block)) - .expect("should works"); + send_block.send(block).expect("should works"); // delay queue so that we get all the banking stage errors before processing block } _ => {} }; } + jh.await.unwrap(); + jh2.await.unwrap(); } diff --git a/src/postgres.rs b/src/postgres.rs index 0d5984a..9accaba 100644 --- a/src/postgres.rs +++ b/src/postgres.rs @@ -115,13 +115,11 @@ impl PostgresSession { if txs.is_empty() { return Ok(()); } - const NUMBER_OF_ARGS: usize = 11; + const NUMBER_OF_ARGS: usize = 10; let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(NUMBER_OF_ARGS * txs.len()); - let txs: Vec = txs - .iter() - .map(|x| PostgresTransactionInfo::from(x)) - .collect(); + let txs: Vec = + txs.iter().map(PostgresTransactionInfo::from).collect(); for tx in txs.iter() { args.push(&tx.signature); args.push(&tx.errors); @@ -150,7 +148,7 @@ impl PostgresSession { } pub async fn save_block(&self, block_info: BlockInfo) -> anyhow::Result<()> { - const NUMBER_OF_ARGS: usize = 9; + const NUMBER_OF_ARGS: usize = 10; let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(NUMBER_OF_ARGS); args.push(&block_info.block_hash); args.push(&block_info.slot); @@ -182,6 +180,7 @@ impl PostgresSession { } } +#[derive(Clone)] pub struct Postgres { session: Arc, } diff --git a/src/transaction_info.rs b/src/transaction_info.rs index 42c183b..9c6e74d 100644 --- a/src/transaction_info.rs +++ b/src/transaction_info.rs @@ -11,7 +11,7 @@ use solana_sdk::{ }, pubkey::Pubkey, slot_history::Slot, - transaction::TransactionError, + transaction::{TransactionError, VersionedTransaction}, }; use yellowstone_grpc_proto::prelude::{ SubscribeUpdateBankingTransactionResults, SubscribeUpdateTransactionInfo, @@ -83,7 +83,6 @@ impl Eq for ErrorKey {} #[derive(Clone)] pub struct TransactionInfo { pub signature: String, - pub transaction_message: Option, pub errors: HashMap, pub is_executed: bool, pub is_confirmed: bool, @@ -126,7 +125,6 @@ impl TransactionInfo { .collect(); Self { signature: notification.signature.clone(), - transaction_message: None, errors, is_executed, is_confirmed: false, @@ -147,7 +145,7 @@ impl TransactionInfo { let key = ErrorKey { error, slot }; match self.errors.get_mut(&key) { Some(x) => { - *x = *x + 1; + *x += 1; } None => { self.errors.insert(key, 1); @@ -158,6 +156,77 @@ impl TransactionInfo { } } + pub fn add_rpc_transaction(&mut self, slot: u64, transaction: &VersionedTransaction) { + let message = &transaction.message; + let legacy_compute_budget: Option<(u32, Option)> = + message.instructions().iter().find_map(|i| { + if i.program_id(message.static_account_keys()) + .eq(&compute_budget::id()) + { + if let Ok(ComputeBudgetInstruction::RequestUnitsDeprecated { + units, + additional_fee, + }) = try_from_slice_unchecked(i.data.as_slice()) + { + if additional_fee > 0 { + return Some((units, Some(((units * 1000) / additional_fee) as u64))); + } else { + return Some((units, None)); + } + } + } + None + }); + + let legacy_cu_requested = legacy_compute_budget.map(|x| x.0); + let legacy_prioritization_fees = legacy_compute_budget.map(|x| x.1).unwrap_or(None); + + let cu_requested = message + .instructions() + .iter() + .find_map(|i| { + if i.program_id(message.static_account_keys()) + .eq(&compute_budget::id()) + { + if let Ok(ComputeBudgetInstruction::SetComputeUnitLimit(limit)) = + try_from_slice_unchecked(i.data.as_slice()) + { + return Some(limit); + } + } + None + }) + .or(legacy_cu_requested); + + let prioritization_fees = message + .instructions() + .iter() + .find_map(|i| { + if i.program_id(message.static_account_keys()) + .eq(&compute_budget::id()) + { + if let Ok(ComputeBudgetInstruction::SetComputeUnitPrice(price)) = + try_from_slice_unchecked(i.data.as_slice()) + { + return Some(price); + } + } + + None + }) + .or(legacy_prioritization_fees); + if let Some(cu_requested) = cu_requested { + self.cu_requested = Some(cu_requested as u64); + } + + if let Some(prioritization_fees) = prioritization_fees { + self.prioritization_fees = Some(prioritization_fees); + } + self.is_confirmed = true; + self.is_executed = true; + self.processed_slot = Some(slot); + } + pub fn add_transaction(&mut self, transaction: &SubscribeUpdateTransactionInfo, slot: Slot) { let Some(transaction) = &transaction.transaction else { return; @@ -280,7 +349,6 @@ impl TransactionInfo { self.prioritization_fees = Some(prioritization_fees); } self.is_confirmed = true; - self.transaction_message = Some(message); self.is_executed = true; self.processed_slot = Some(slot); }