saving some more transaction data and block_info

This commit is contained in:
Godmode Galactus 2023-10-17 12:11:54 +02:00
parent 29616a19c9
commit 3cb4a874b9
No known key found for this signature in database
GPG Key ID: A04142C71ABB0DEA
5 changed files with 241 additions and 9 deletions

View File

@ -8,5 +8,19 @@ CREATE TABLE banking_stage_results.transaction_infos (
is_confirmed BOOL,
first_notification_slot BIGINT NOT NULL,
cu_requested BIGINT,
prioritization_fees BIGINT
prioritization_fees BIGINT,
timestamp TIMESTAMP NOT NULL,
accounts_used text[],
);
CREATE TABLE banking_stage_results.blocks (
block_hash CHAR(44) NOT NULL,
slot BIGINT,
leader_identity CHAR(44),
successful_transactions BIGINT,
banking_stage_errors BIGINT,
processed_transactions BIGINT,
total_cu_used BIGINT,
total_cu_requested BIGINT,
heavily_writelocked_accounts text[],
);

162
src/block_info.rs Normal file
View File

@ -0,0 +1,162 @@
use std::{sync::Arc, collections::HashMap};
use dashmap::DashMap;
use itertools::Itertools;
use solana_sdk::{pubkey::Pubkey, message::{VersionedMessage, MessageHeader, v0::{self, MessageAddressTableLookup}}, instruction::CompiledInstruction, compute_budget::{self, ComputeBudgetInstruction}, borsh0_10::try_from_slice_unchecked};
use yellowstone_grpc_proto::prelude::SubscribeUpdateBlock;
use crate::transaction_info::TransactionInfo;
pub struct BlockInfo {
pub block_hash: String,
pub slot: i64,
pub leader_identity: Option<String>,
pub successful_transactions: i64,
pub banking_stage_errors: i64,
pub processed_transactions: i64,
pub total_cu_used: i64,
pub total_cu_requested: i64,
pub heavily_writelocked_accounts: Vec<String>,
}
impl BlockInfo {
pub fn new(block: &SubscribeUpdateBlock, map_of_infos: Arc<DashMap<String, TransactionInfo>>) -> BlockInfo {
let block_hash = block.blockhash.clone();
let slot = block.slot;
let leader_identity = block.rewards.as_ref().map(|rewards| rewards.rewards.iter().find(|x| x.reward_type == 1).map(|x| x.pubkey.clone())).unwrap_or(None);
let successful_transactions = block.transactions.iter().filter(|x| x.meta.as_ref().map(|x| x.err.is_none()).unwrap_or(false)).count() as u64;
let processed_transactions = block.transactions.len() as u64;
let banking_stage_errors = map_of_infos.iter()
.filter(|x| x.first_notification_slot == slot)
.map(|x| x.errors.iter().filter(|(x, _)| x.slot == slot).map(|(_,x)| *x).sum::<usize>()).sum::<usize>() as u64;
let total_cu_used = block.transactions.iter().map(|x| x.meta.as_ref().map(|x| x.compute_units_consumed.unwrap_or(0)).unwrap_or(0)).sum::<u64>() as i64;
let mut writelocked_accounts = HashMap::new();
let mut total_cu_requested: u64 = 0;
for transaction in &block.transactions {
let Some(tx) = &transaction.transaction else {
continue;
};
let Some(message) = &tx.message else {
continue;
};
let Some(header) = &message.header else {
continue;
};
let message = VersionedMessage::V0(v0::Message {
header: MessageHeader {
num_required_signatures: header.num_required_signatures as u8,
num_readonly_signed_accounts: header.num_readonly_signed_accounts as u8,
num_readonly_unsigned_accounts: header.num_readonly_unsigned_accounts as u8,
},
account_keys: message
.account_keys.clone()
.into_iter()
.map(|key| {
let bytes: [u8; 32] =
key.try_into().unwrap_or(Pubkey::default().to_bytes());
Pubkey::new_from_array(bytes)
})
.collect(),
recent_blockhash: solana_sdk::hash::Hash::new(&message.recent_blockhash),
instructions: message
.instructions.clone()
.into_iter()
.map(|ix| CompiledInstruction {
program_id_index: ix.program_id_index as u8,
accounts: ix.accounts,
data: ix.data,
})
.collect(),
address_table_lookups: message
.address_table_lookups.clone()
.into_iter()
.map(|table| {
let bytes: [u8; 32] = table
.account_key
.try_into()
.unwrap_or(Pubkey::default().to_bytes());
MessageAddressTableLookup {
account_key: Pubkey::new_from_array(bytes),
writable_indexes: table.writable_indexes,
readonly_indexes: table.readonly_indexes,
}
})
.collect(),
});
let legacy_compute_budget: Option<(u32, Option<u64>)> =
message.instructions().iter().find_map(|i| {
if i.program_id(message.static_account_keys())
.eq(&compute_budget::id())
{
if let Ok(ComputeBudgetInstruction::RequestUnitsDeprecated {
units,
additional_fee,
}) = try_from_slice_unchecked(i.data.as_slice())
{
if additional_fee > 0 {
return Some((
units,
Some(((units * 1000) / additional_fee) as u64),
));
} else {
return Some((units, None));
}
}
}
None
});
let legacy_cu_requested = legacy_compute_budget.map(|x| x.0);
let cu_requested = message
.instructions()
.iter()
.find_map(|i| {
if i.program_id(message.static_account_keys())
.eq(&compute_budget::id())
{
if let Ok(ComputeBudgetInstruction::SetComputeUnitLimit(limit)) =
try_from_slice_unchecked(i.data.as_slice())
{
return Some(limit);
}
}
None
})
.or(legacy_cu_requested);
let cu_requested = cu_requested.unwrap_or(200000) as u64;
total_cu_requested = total_cu_requested + cu_requested;
let writable_accounts = message.static_account_keys().iter().enumerate().filter(|(index,_)| message.is_maybe_writable(*index)).map(|x| x.1.clone()).collect_vec();
for writable_account in writable_accounts {
match writelocked_accounts.get_mut(&writable_account) {
Some(x) => {
*x += cu_requested;
},
None => {
writelocked_accounts.insert(writable_account, cu_requested);
}
}
}
}
let mut heavily_writelocked_accounts = writelocked_accounts.iter().filter(|x| *x.1 > 1000000).collect_vec();
heavily_writelocked_accounts.sort_by(|lhs, rhs| (*rhs.1).cmp(lhs.1));
let heavily_writelocked_accounts = heavily_writelocked_accounts.iter().map(|(pubkey, cu)| format!("{}:{}", **pubkey, **cu)).collect_vec();
BlockInfo {
block_hash,
slot: slot as i64,
leader_identity,
successful_transactions: successful_transactions as i64,
processed_transactions: processed_transactions as i64,
banking_stage_errors: banking_stage_errors as i64,
total_cu_used,
total_cu_requested: total_cu_requested as i64,
heavily_writelocked_accounts,
}
}
}

View File

@ -1,5 +1,6 @@
use std::{collections::HashMap, sync::{Arc, atomic::AtomicU64}};
use block_info::BlockInfo;
use dashmap::DashMap;
use futures::StreamExt;
use solana_sdk::signature::Signature;
@ -11,6 +12,7 @@ use yellowstone_grpc_proto::prelude::{
mod transaction_info;
mod postgres;
mod block_info;
#[tokio::main()]
async fn main() {
@ -79,7 +81,7 @@ async fn main() {
}
UpdateOneof::Block(block) => {
slot.store(block.slot, std::sync::atomic::Ordering::Relaxed);
for transaction in block.transactions {
for transaction in &block.transactions {
let Some(tx) = &transaction.transaction else {
continue;
};
@ -88,6 +90,11 @@ async fn main() {
info.add_transaction(&transaction);
}
}
let block_info = BlockInfo::new(&block, map_of_infos.clone());
if let Err(e) = postgres.save_block_info(block_info).await {
log::error!("Error saving block {}", e);
}
}
_ => {
}

View File

@ -1,11 +1,12 @@
use std::{sync::{Arc, atomic::AtomicU64}, time::Duration};
use anyhow::Context;
use chrono::{DateTime, Utc};
use dashmap::DashMap;
use itertools::Itertools;
use tokio_postgres::{Client, NoTls, tls::MakeTlsConnect, Socket, types::ToSql};
use crate::transaction_info::TransactionInfo;
use crate::{transaction_info::TransactionInfo, block_info::BlockInfo};
pub struct PostgresSession {
@ -78,7 +79,7 @@ impl PostgresSession {
if txs.is_empty() {
return Ok(());
}
const NUMBER_OF_ARGS : usize = 8;
const NUMBER_OF_ARGS : usize = 10;
let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(NUMBER_OF_ARGS * txs.len());
let txs: Vec<PostgresTransactionInfo> = txs.iter().map(|x| PostgresTransactionInfo::from(x)).collect();
@ -91,12 +92,14 @@ impl PostgresSession {
args.push(&tx.first_notification_slot);
args.push(&tx.cu_requested);
args.push(&tx.prioritization_fees);
args.push(&tx.utc_timestamp);
args.push(&tx.accounts_used);
}
let mut query = String::from(
r#"
INSERT INTO banking_stage_results.transaction_infos
(signature, message, errors, is_executed, is_confirmed, first_notification_slot, cu_requested, prioritization_fees)
(signature, message, errors, is_executed, is_confirmed, first_notification_slot, cu_requested, prioritization_fees, timestamp, accounts_used)
VALUES
"#,
);
@ -106,6 +109,33 @@ impl PostgresSession {
Ok(())
}
pub async fn save_block(&self, block_info: BlockInfo) -> anyhow::Result<()> {
const NUMBER_OF_ARGS : usize = 9;
let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(NUMBER_OF_ARGS);
args.push(&block_info.block_hash);
args.push(&block_info.slot);
args.push(&block_info.leader_identity);
args.push(&block_info.successful_transactions);
args.push(&block_info.banking_stage_errors);
args.push(&block_info.processed_transactions);
args.push(&block_info.total_cu_used);
args.push(&block_info.total_cu_requested);
args.push(&block_info.heavily_writelocked_accounts);
let mut query = String::from(
r#"
INSERT INTO banking_stage_results.blocks
(block_hash, slot, leader_identity, successful_transactions, banking_stage_errors, processed_transactions, total_cu_used, total_cu_requested, heavily_writelocked_accounts)
VALUES
"#,
);
Self::multiline_query(&mut query, NUMBER_OF_ARGS, 1, &[]);
self.client.execute(&query, &args).await?;
Ok(())
}
}
pub struct Postgres {
@ -123,7 +153,6 @@ impl Postgres {
tokio::task::spawn(async move {
loop {
tokio::time::sleep(Duration::from_secs(60)).await;
println!("postgres loop");
let slot = slots.load(std::sync::atomic::Ordering::Relaxed);
let mut txs_to_store = vec![];
for tx in map_of_transaction.iter() {
@ -145,6 +174,10 @@ impl Postgres {
}
});
}
pub async fn save_block_info(&self, block: BlockInfo) -> anyhow::Result<()> {
self.session.save_block(block).await
}
}
pub struct PostgresTransactionInfo {
@ -156,6 +189,8 @@ pub struct PostgresTransactionInfo {
pub first_notification_slot: i64,
pub cu_requested: Option<i64>,
pub prioritization_fees: Option<i64>,
pub utc_timestamp: DateTime<Utc>,
pub accounts_used: Vec<String>,
}
impl From<&TransactionInfo> for PostgresTransactionInfo {
@ -164,6 +199,7 @@ impl From<&TransactionInfo> for PostgresTransactionInfo {
let str = is + x.0.to_string().as_str() + ":" + x.1.to_string().as_str() + ";";
str
});
let accounts_used = value.account_used.iter().map(|x| format!("{}({})", x.0, x.1).to_string()).collect();
Self {
signature: value.signature.clone(),
transaction_message: value.transaction_message.as_ref().map(|x| base64::encode(bincode::serialize(&x).unwrap())),
@ -172,7 +208,9 @@ impl From<&TransactionInfo> for PostgresTransactionInfo {
is_confirmed: value.is_confirmed,
cu_requested: value.cu_requested.map(|x| x as i64),
first_notification_slot: value.first_notification_slot as i64,
prioritization_fees: value.prioritization_fees.map(|x| x as i64)
prioritization_fees: value.prioritization_fees.map(|x| x as i64),
utc_timestamp: value.utc_timestamp,
accounts_used,
}
}
}

View File

@ -1,5 +1,6 @@
use std::{collections::HashMap, hash::Hash};
use chrono::{DateTime, Utc};
use solana_sdk::{slot_history::Slot, transaction::TransactionError, message::{VersionedMessage, v0::{self, MessageAddressTableLookup}, MessageHeader}, pubkey::Pubkey, instruction::CompiledInstruction, compute_budget::{self, ComputeBudgetInstruction}, borsh0_10::try_from_slice_unchecked};
use yellowstone_grpc_proto::prelude::{SubscribeUpdateBankingTransactionResults, SubscribeUpdateTransactionInfo};
@ -46,8 +47,8 @@ fn convert_transaction_error_into_int(error: &TransactionError) -> u8 {
#[derive(Clone, PartialEq)]
pub struct ErrorKey {
error: TransactionError,
slot: Slot,
pub error: TransactionError,
pub slot: Slot,
}
impl ToString for ErrorKey {
@ -76,12 +77,16 @@ pub struct TransactionInfo {
pub first_notification_slot: u64,
pub cu_requested: Option<u64>,
pub prioritization_fees: Option<u64>,
pub utc_timestamp: DateTime<Utc>,
pub account_used: HashMap<Pubkey, char>,
}
impl TransactionInfo {
pub fn new(notification: &SubscribeUpdateBankingTransactionResults) -> Self {
let mut errors = HashMap::new();
let is_executed = notification.error.is_none();
// Get time
let utc_timestamp = Utc::now();
match &notification.error {
Some(e) => {
@ -101,6 +106,9 @@ impl TransactionInfo {
first_notification_slot: notification.slot,
cu_requested: None,
prioritization_fees: None,
utc_timestamp,
account_used: HashMap::new(),
}
}
@ -242,6 +250,8 @@ impl TransactionInfo {
})
.or(legacy_prioritization_fees);
let account_used: HashMap<Pubkey, char> = message.static_account_keys().iter().enumerate().map(|(index, x)| (x.clone(), if message.is_maybe_writable(index) {'w'} else {'r'})).collect();
if let Some(cu_requested) = cu_requested {
self.cu_requested = Some(cu_requested as u64);
}
@ -252,5 +262,6 @@ impl TransactionInfo {
self.is_confirmed = true;
self.transaction_message = Some(message);
self.is_executed = true;
self.account_used = account_used;
}
}