get banking blocks from rpc

godmodegalactus 2023-11-24 11:41:19 +01:00
parent 2e994bd2dc
commit c1a61ab0a6
7 changed files with 386 additions and 57 deletions


@ -9,4 +9,11 @@ grant all privileges on database mangolana to galactus;
psql -d mangolana < migration.sql
export PG_CONFIG="host=localhost dbname=mangolana user=username password=password sslmode=disable"
export PG_CONFIG="host=localhost dbname=mangolana user=galactus password=test sslmode=disable"
### Give rights to the user
GRANT ALL PRIVILEGES ON DATABASE mangolana TO galactus;
GRANT ALL PRIVILEGES ON SCHEMA banking_stage_results TO galactus;
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA banking_stage_results TO galactus;
ALTER DEFAULT PRIVILEGES IN SCHEMA banking_stage_results GRANT ALL PRIVILEGES ON TABLES TO galactus;
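For reference, a minimal sketch of consuming the PG_CONFIG connection string from Rust, assuming the tokio-postgres client (the exact client setup is not shown in this hunk, so treat it as an assumption):

// Sketch only: connect using the PG_CONFIG string exported above.
use tokio_postgres::NoTls;

#[tokio::main]
async fn main() -> Result<(), tokio_postgres::Error> {
    // e.g. "host=localhost dbname=mangolana user=galactus password=test sslmode=disable"
    let pg_config = std::env::var("PG_CONFIG").expect("PG_CONFIG must be set");
    let (client, connection) = tokio_postgres::connect(&pg_config, NoTls).await?;
    // The connection object drives the actual I/O and must run on its own task.
    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!("postgres connection error: {e}");
        }
    });
    let row = client.query_one("SELECT 1::BIGINT", &[]).await?;
    let one: i64 = row.get(0);
    println!("connected, got {one}");
    Ok(())
}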


@ -22,7 +22,7 @@ CREATE TABLE banking_stage_results.blocks (
processed_transactions BIGINT,
total_cu_used BIGINT,
total_cu_requested BIGINT,
heavily_writelocked_accounts text
heavily_writelocked_accounts text,
heavily_readlocked_accounts text
);
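The commit does not show how these text columns are populated; purely as an illustration, the AccountUsage values (which derive Serialize) could be flattened to a JSON string before insertion, assuming serde_json is available:

// Illustration only (assumption): one way to flatten per-account usage into the
// heavily_writelocked_accounts / heavily_readlocked_accounts text columns.
use serde::Serialize;

#[derive(Serialize, Debug, Clone)]
pub struct AccountUsage {
    pub key: String,
    pub cu_requested: u64,
    pub cu_consumed: u64,
}

fn accounts_as_text(accounts: &[AccountUsage]) -> String {
    // serde_json is assumed here; the commit may encode the column differently.
    serde_json::to_string(accounts).unwrap_or_default()
}

fn main() {
    let usage = vec![AccountUsage {
        key: "SysvarC1ock11111111111111111111111111111111".to_string(),
        cu_requested: 1_400_000,
        cu_consumed: 1_200_000,
    }];
    println!("{}", accounts_as_text(&usage));
}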


@ -9,13 +9,15 @@ use solana_sdk::{
MessageHeader, VersionedMessage,
},
pubkey::Pubkey,
slot_history::Slot,
};
use solana_transaction_status::{RewardType, UiConfirmedBlock};
use std::collections::HashMap;
use yellowstone_grpc_proto::prelude::SubscribeUpdateBlock;
#[derive(Serialize, Debug, Clone)]
pub struct AccountUsage {
pub key: Pubkey,
pub key: String,
pub cu_requested: u64,
pub cu_consumed: u64,
}
@ -34,7 +36,7 @@ pub struct BlockInfo {
}
impl BlockInfo {
pub fn new(block: &SubscribeUpdateBlock, banking_stage_error_count: Option<i64>) -> BlockInfo {
pub fn new(block: &SubscribeUpdateBlock) -> BlockInfo {
let block_hash = block.blockhash.clone();
let slot = block.slot;
let leader_identity = block
@ -177,15 +179,15 @@ impl BlockInfo {
.or(legacy_cu_requested);
let cu_requested = cu_requested.unwrap_or(200000) as u64;
let cu_consumed = meta.compute_units_consumed.unwrap_or(0);
total_cu_requested = total_cu_requested + cu_requested;
total_cu_requested += cu_requested;
let accounts = message
.static_account_keys()
.iter()
.enumerate()
.map(|(index, account)| (message.is_maybe_writable(index), account.clone()))
.map(|(index, account)| (message.is_maybe_writable(index), *account))
.collect_vec();
for writable_account in accounts.iter().filter(|x| x.0 == true).map(|x| x.1) {
for writable_account in accounts.iter().filter(|x| x.0).map(|x| x.1) {
match writelocked_accounts.get_mut(&writable_account) {
Some(x) => {
x.cu_requested += cu_requested;
@ -195,7 +197,7 @@ impl BlockInfo {
writelocked_accounts.insert(
writable_account,
AccountUsage {
key: writable_account,
key: writable_account.to_string(),
cu_consumed,
cu_requested,
},
@ -204,7 +206,7 @@ impl BlockInfo {
}
}
for readable_account in accounts.iter().filter(|x| x.0 == false).map(|x| x.1) {
for readable_account in accounts.iter().filter(|x| !x.0).map(|x| x.1) {
match readlocked_accounts.get_mut(&readable_account) {
Some(x) => {
x.cu_requested += cu_requested;
@ -214,7 +216,7 @@ impl BlockInfo {
readlocked_accounts.insert(
readable_account,
AccountUsage {
key: readable_account,
key: readable_account.to_string(),
cu_consumed,
cu_requested,
},
@ -243,11 +245,193 @@ impl BlockInfo {
leader_identity,
successful_transactions: successful_transactions as i64,
processed_transactions: processed_transactions as i64,
banking_stage_errors: banking_stage_error_count,
banking_stage_errors: None,
total_cu_used,
total_cu_requested: total_cu_requested as i64,
heavily_writelocked_accounts,
heavily_readlocked_accounts,
}
}
pub fn new_from_rpc_block(
slot: Slot,
block: &UiConfirmedBlock,
banking_stage_errors_count: i64,
) -> Option<Self> {
let block_hash = block.blockhash.clone();
let leader_identity = block
.rewards
.as_ref()
.map(|rewards| {
rewards
.iter()
.find(|x| x.reward_type == Some(RewardType::Fee))
.map(|x| x.pubkey.clone())
})
.unwrap_or(None);
let transactions = if let Some(transactions) = &block.transactions {
transactions
} else {
return None;
};
let successful_transactions = transactions
.iter()
.filter(|x| x.meta.as_ref().map(|x| x.err.is_none()).unwrap_or(false))
.count() as u64;
let processed_transactions = transactions.len() as u64;
let total_cu_used = transactions
.iter()
.map(|x| {
x.meta
.as_ref()
.map(|x| match x.compute_units_consumed {
solana_transaction_status::option_serializer::OptionSerializer::Some(x) => {
x
}
solana_transaction_status::option_serializer::OptionSerializer::Skip => 0,
solana_transaction_status::option_serializer::OptionSerializer::None => 0,
})
.unwrap_or(0)
})
.sum::<u64>() as i64;
let mut writelocked_accounts: HashMap<Pubkey, AccountUsage> = HashMap::new();
let mut readlocked_accounts: HashMap<Pubkey, AccountUsage> = HashMap::new();
let mut total_cu_requested: u64 = 0;
for transaction in transactions {
let Some(tx) = transaction.transaction.decode() else {
continue;
};
let message = &tx.message;
let Some(meta) = &transaction.meta else {
continue;
};
let legacy_compute_budget: Option<(u32, Option<u64>)> =
message.instructions().iter().find_map(|i| {
if i.program_id(message.static_account_keys())
.eq(&compute_budget::id())
{
if let Ok(ComputeBudgetInstruction::RequestUnitsDeprecated {
units,
additional_fee,
}) = try_from_slice_unchecked(i.data.as_slice())
{
if additional_fee > 0 {
return Some((
units,
Some(
((units.saturating_mul(1000))
.saturating_div(additional_fee))
as u64,
),
));
} else {
return Some((units, None));
}
}
}
None
});
let legacy_cu_requested = legacy_compute_budget.map(|x| x.0);
let cu_requested = message
.instructions()
.iter()
.find_map(|i| {
if i.program_id(message.static_account_keys())
.eq(&compute_budget::id())
{
if let Ok(ComputeBudgetInstruction::SetComputeUnitLimit(limit)) =
try_from_slice_unchecked(i.data.as_slice())
{
return Some(limit);
}
}
None
})
.or(legacy_cu_requested);
let cu_requested = cu_requested.unwrap_or(200000) as u64;
let cu_consumed = match meta.compute_units_consumed {
solana_transaction_status::option_serializer::OptionSerializer::Some(x) => x,
solana_transaction_status::option_serializer::OptionSerializer::Skip => 0,
solana_transaction_status::option_serializer::OptionSerializer::None => 0,
};
total_cu_requested += cu_requested;
let accounts = message
.static_account_keys()
.iter()
.enumerate()
.map(|(index, account)| (message.is_maybe_writable(index), *account))
.collect_vec();
for writable_account in accounts.iter().filter(|x| x.0).map(|x| x.1) {
match writelocked_accounts.get_mut(&writable_account) {
Some(x) => {
x.cu_requested += cu_requested;
x.cu_consumed += cu_consumed;
}
None => {
writelocked_accounts.insert(
writable_account,
AccountUsage {
key: writable_account.to_string(),
cu_consumed,
cu_requested,
},
);
}
}
}
for readable_account in accounts.iter().filter(|x| !x.0).map(|x| x.1) {
match readlocked_accounts.get_mut(&readable_account) {
Some(x) => {
x.cu_requested += cu_requested;
x.cu_consumed += cu_consumed;
}
None => {
readlocked_accounts.insert(
readable_account,
AccountUsage {
key: readable_account.to_string(),
cu_consumed,
cu_requested,
},
);
}
}
}
}
let mut heavily_writelocked_accounts = writelocked_accounts
.iter()
.filter(|(_, account)| account.cu_consumed > 1000000)
.map(|x| x.1.clone())
.collect_vec();
heavily_writelocked_accounts.sort_by(|lhs, rhs| rhs.cu_consumed.cmp(&lhs.cu_consumed));
let mut heavily_readlocked_accounts: Vec<_> = readlocked_accounts
.iter()
.filter(|(_, acc)| acc.cu_consumed > 1000000)
.map(|x| x.1.clone())
.collect();
heavily_readlocked_accounts.sort_by(|lhs, rhs| rhs.cu_consumed.cmp(&lhs.cu_consumed));
Some(BlockInfo {
block_hash,
slot: slot as i64,
leader_identity,
successful_transactions: successful_transactions as i64,
processed_transactions: processed_transactions as i64,
banking_stage_errors: Some(banking_stage_errors_count),
total_cu_used,
total_cu_requested: total_cu_requested as i64,
heavily_writelocked_accounts,
heavily_readlocked_accounts,
})
}
}
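The new constructor is driven from main.rs further down; a condensed sketch of that call path, mirroring the RpcBlockConfig used there (channels and error counting omitted):

// BlockInfo is the struct defined above in this file.
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_rpc_client_api::config::RpcBlockConfig;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_transaction_status::{TransactionDetails, UiTransactionEncoding};

async fn fetch_and_build(rpc_url: String, slot: u64, banking_stage_errors: i64) -> Option<BlockInfo> {
    let rpc_client = RpcClient::new(rpc_url);
    let block = rpc_client
        .get_block_with_config(
            slot,
            RpcBlockConfig {
                encoding: Some(UiTransactionEncoding::Base64),
                transaction_details: Some(TransactionDetails::Full),
                rewards: Some(true),
                commitment: Some(CommitmentConfig::confirmed()),
                max_supported_transaction_version: Some(0),
            },
        )
        .await
        .ok()?;
    // Returns None when the RPC response carries no transactions.
    BlockInfo::new_from_rpc_block(slot, &block, banking_stage_errors)
}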


@ -9,6 +9,9 @@ pub struct Args {
#[arg(long)]
pub grpc_x_token: Option<String>,
#[arg(short, long, default_value_t = String::from("http://127.0.0.1:8899"))]
pub rpc_url: String,
/// enable metrics to prometheus at addr
#[arg(short = 'm', long, default_value_t = String::from("[::]:9091"))]
pub prometheus_addr: String,


@ -1,4 +1,5 @@
use clap::Parser;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use std::{
collections::HashMap,
sync::{atomic::AtomicU64, Arc},
@ -11,9 +12,9 @@ use block_info::BlockInfo;
use cli::Args;
use dashmap::DashMap;
use futures::StreamExt;
use log::{debug, error, info};
use prometheus::{IntCounter, IntGauge, opts, register_int_counter, register_int_gauge};
use solana_sdk::signature::Signature;
use log::{debug, error};
use prometheus::{opts, register_int_counter, register_int_gauge, IntCounter, IntGauge};
use solana_sdk::{commitment_config::CommitmentConfig, signature::Signature};
use transaction_info::TransactionInfo;
use yellowstone_grpc_client::GeyserGrpcClient;
use yellowstone_grpc_proto::prelude::{
@ -45,6 +46,7 @@ async fn main() {
tracing_subscriber::fmt::init();
let args = Args::parse();
let rpc_url = args.rpc_url;
let _prometheus_jh = PrometheusSync::sync(args.prometheus_addr.clone());
@ -86,43 +88,106 @@ async fn main() {
postgres.spawn_transaction_infos_saver(map_of_infos.clone(), slot.clone());
let (send_block, mut recv_block) =
tokio::sync::mpsc::unbounded_channel::<(Instant, SubscribeUpdateBlock)>();
tokio::sync::mpsc::unbounded_channel::<SubscribeUpdateBlock>();
let slot_by_error_task = slot_by_errors.clone();
let map_of_infos_task = map_of_infos.clone();
// process blocks with 2 mins delay so that we process all the banking stage errors before processing blocks
tokio::spawn(async move {
while let Some((wait_until, block)) = recv_block.recv().await {
if wait_until > Instant::now() + Duration::from_secs(5) {
info!(
"wait until {:?} to collect errors for block {}",
wait_until, block.slot
);
let jh = {
let postgres = postgres.clone();
tokio::spawn(async move {
while let Some(block) = recv_block.recv().await {
for transaction in &block.transactions {
let Some(tx) = &transaction.transaction else {
continue;
};
let signature = Signature::try_from(tx.signatures[0].clone()).unwrap();
if let Some(mut info) = map_of_infos_task.get_mut(&signature.to_string()) {
info.add_transaction(transaction, block.slot);
}
}
let block_info = BlockInfo::new(&block);
TXERROR_COUNT
.add(block_info.processed_transactions - block_info.successful_transactions);
if let Err(e) = postgres.save_block_info(block_info).await {
error!("Error saving block {}", e);
}
slot.store(block.slot, std::sync::atomic::Ordering::Relaxed);
slot_by_error_task.remove(&block.slot);
}
tokio::time::sleep_until(wait_until).await;
for transaction in &block.transactions {
let Some(tx) = &transaction.transaction else {
})
};
// get blocks from rpc server
// because validator does not send banking blocks
let (rpc_blocks_sender, rpc_blocks_reciever) =
tokio::sync::mpsc::unbounded_channel::<(Instant, u64)>();
let jh2 = {
let map_of_infos = map_of_infos.clone();
let postgres = postgres.clone();
let slot_by_errors = slot_by_errors.clone();
tokio::spawn(async move {
let mut rpc_blocks_reciever = rpc_blocks_reciever;
let rpc_client = RpcClient::new(rpc_url);
while let Some((wait_until, slot)) = rpc_blocks_reciever.recv().await {
tokio::time::sleep_until(wait_until).await;
let block = if let Ok(block) = rpc_client
.get_block_with_config(
slot,
solana_rpc_client_api::config::RpcBlockConfig {
encoding: Some(
solana_transaction_status::UiTransactionEncoding::Base64,
),
transaction_details: Some(
solana_transaction_status::TransactionDetails::Full,
),
rewards: Some(true),
commitment: Some(CommitmentConfig::confirmed()),
max_supported_transaction_version: Some(0),
},
)
.await
{
block
} else {
continue;
};
let signature = Signature::try_from(tx.signatures[0].clone()).unwrap();
if let Some(mut info) = map_of_infos_task.get_mut(&signature.to_string()) {
info.add_transaction(&transaction, block.slot);
let Some(transactions) = &block.transactions else {
continue;
};
for transaction in transactions {
let Some(transaction) = &transaction.transaction.decode() else {
continue;
};
let signature = transaction.signatures[0].to_string();
if let Some(mut info) = map_of_infos.get_mut(&signature) {
info.add_rpc_transaction(slot, transaction);
}
}
let banking_stage_error_count = slot_by_errors
.get(&slot)
.map(|x| *x.value() as i64)
.unwrap_or_default();
let block_info =
BlockInfo::new_from_rpc_block(slot, &block, banking_stage_error_count);
if let Some(block_info) = block_info {
BANKING_STAGE_ERROR_COUNT.add(banking_stage_error_count);
TXERROR_COUNT.add(
block_info.processed_transactions - block_info.successful_transactions,
);
if let Err(e) = postgres.save_block_info(block_info).await {
error!("Error saving block {}", e);
}
slot_by_errors.remove(&slot);
}
}
let banking_stage_error_count = slot_by_error_task
.get(&block.slot)
.map(|x| *x.value() as i64);
let block_info = BlockInfo::new(&block, banking_stage_error_count);
BANKING_STAGE_ERROR_COUNT.add(block_info.banking_stage_errors.unwrap_or(0));
TXERROR_COUNT
.add(block_info.processed_transactions - block_info.successful_transactions);
if let Err(e) = postgres.save_block_info(block_info).await {
error!("Error saving block {}", e);
}
slot.store(block.slot, std::sync::atomic::Ordering::Relaxed);
slot_by_error_task.remove(&block.slot);
}
});
})
};
while let Some(message) = geyser_stream.next().await {
let Ok(message) = message else {
@ -142,10 +207,13 @@ async fn main() {
let sig = transaction.signature.to_string();
match slot_by_errors.get_mut(&transaction.slot) {
Some(mut value) => {
*value = *value + 1;
*value += 1;
}
None => {
slot_by_errors.insert(transaction.slot, 1);
rpc_blocks_sender
.send((Instant::now() + Duration::from_secs(30), transaction.slot))
.expect("should works");
}
}
match map_of_infos.get_mut(&sig) {
@ -163,12 +231,12 @@ async fn main() {
debug!("got block {}", block.slot);
BLOCK_TXS.set(block.transactions.len() as i64);
BANKING_STAGE_BLOCKS_COUNTER.inc();
send_block
.send((Instant::now() + Duration::from_secs(30), block))
.expect("should works");
send_block.send(block).expect("should works");
// delay queue so that we get all the banking stage errors before processing block
}
_ => {}
};
}
jh.await.unwrap();
jh2.await.unwrap();
}
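A distilled, self-contained sketch of the delayed-fetch pattern used above (not part of the commit): each slot that produced a banking-stage error is queued with a deadline, and the block is only requested from RPC once the deadline passes, so the errors for that slot are collected first.

use std::time::Duration;
use tokio::time::Instant;

#[tokio::main]
async fn main() {
    let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel::<(Instant, u64)>();

    // Producer side: schedule slot 1234 to be fetched 30 seconds from now.
    sender
        .send((Instant::now() + Duration::from_secs(30), 1234))
        .expect("receiver alive");
    drop(sender);

    // Consumer side: wait until the deadline, then fetch/process the block.
    while let Some((wait_until, slot)) = receiver.recv().await {
        tokio::time::sleep_until(wait_until).await;
        println!("would fetch block for slot {slot} from RPC now");
    }
}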


@ -115,13 +115,11 @@ impl PostgresSession {
if txs.is_empty() {
return Ok(());
}
const NUMBER_OF_ARGS: usize = 11;
const NUMBER_OF_ARGS: usize = 10;
let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(NUMBER_OF_ARGS * txs.len());
let txs: Vec<PostgresTransactionInfo> = txs
.iter()
.map(|x| PostgresTransactionInfo::from(x))
.collect();
let txs: Vec<PostgresTransactionInfo> =
txs.iter().map(PostgresTransactionInfo::from).collect();
for tx in txs.iter() {
args.push(&tx.signature);
args.push(&tx.errors);
@ -150,7 +148,7 @@ impl PostgresSession {
}
pub async fn save_block(&self, block_info: BlockInfo) -> anyhow::Result<()> {
const NUMBER_OF_ARGS: usize = 9;
const NUMBER_OF_ARGS: usize = 10;
let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(NUMBER_OF_ARGS);
args.push(&block_info.block_hash);
args.push(&block_info.slot);
@ -182,6 +180,7 @@ impl PostgresSession {
}
}
#[derive(Clone)]
pub struct Postgres {
session: Arc<PostgresSession>,
}


@ -11,7 +11,7 @@ use solana_sdk::{
},
pubkey::Pubkey,
slot_history::Slot,
transaction::TransactionError,
transaction::{TransactionError, VersionedTransaction},
};
use yellowstone_grpc_proto::prelude::{
SubscribeUpdateBankingTransactionResults, SubscribeUpdateTransactionInfo,
@ -83,7 +83,6 @@ impl Eq for ErrorKey {}
#[derive(Clone)]
pub struct TransactionInfo {
pub signature: String,
pub transaction_message: Option<VersionedMessage>,
pub errors: HashMap<ErrorKey, usize>,
pub is_executed: bool,
pub is_confirmed: bool,
@ -126,7 +125,6 @@ impl TransactionInfo {
.collect();
Self {
signature: notification.signature.clone(),
transaction_message: None,
errors,
is_executed,
is_confirmed: false,
@ -147,7 +145,7 @@ impl TransactionInfo {
let key = ErrorKey { error, slot };
match self.errors.get_mut(&key) {
Some(x) => {
*x = *x + 1;
*x += 1;
}
None => {
self.errors.insert(key, 1);
@ -158,6 +156,77 @@ impl TransactionInfo {
}
}
pub fn add_rpc_transaction(&mut self, slot: u64, transaction: &VersionedTransaction) {
let message = &transaction.message;
let legacy_compute_budget: Option<(u32, Option<u64>)> =
message.instructions().iter().find_map(|i| {
if i.program_id(message.static_account_keys())
.eq(&compute_budget::id())
{
if let Ok(ComputeBudgetInstruction::RequestUnitsDeprecated {
units,
additional_fee,
}) = try_from_slice_unchecked(i.data.as_slice())
{
if additional_fee > 0 {
return Some((units, Some(((units * 1000) / additional_fee) as u64)));
} else {
return Some((units, None));
}
}
}
None
});
let legacy_cu_requested = legacy_compute_budget.map(|x| x.0);
let legacy_prioritization_fees = legacy_compute_budget.map(|x| x.1).unwrap_or(None);
let cu_requested = message
.instructions()
.iter()
.find_map(|i| {
if i.program_id(message.static_account_keys())
.eq(&compute_budget::id())
{
if let Ok(ComputeBudgetInstruction::SetComputeUnitLimit(limit)) =
try_from_slice_unchecked(i.data.as_slice())
{
return Some(limit);
}
}
None
})
.or(legacy_cu_requested);
let prioritization_fees = message
.instructions()
.iter()
.find_map(|i| {
if i.program_id(message.static_account_keys())
.eq(&compute_budget::id())
{
if let Ok(ComputeBudgetInstruction::SetComputeUnitPrice(price)) =
try_from_slice_unchecked(i.data.as_slice())
{
return Some(price);
}
}
None
})
.or(legacy_prioritization_fees);
if let Some(cu_requested) = cu_requested {
self.cu_requested = Some(cu_requested as u64);
}
if let Some(prioritization_fees) = prioritization_fees {
self.prioritization_fees = Some(prioritization_fees);
}
self.is_confirmed = true;
self.is_executed = true;
self.processed_slot = Some(slot);
}
pub fn add_transaction(&mut self, transaction: &SubscribeUpdateTransactionInfo, slot: Slot) {
let Some(transaction) = &transaction.transaction else {
return;
@ -280,7 +349,6 @@ impl TransactionInfo {
self.prioritization_fees = Some(prioritization_fees);
}
self.is_confirmed = true;
self.transaction_message = Some(message);
self.is_executed = true;
self.processed_slot = Some(slot);
}