adding accounts in banking stage notifications

This commit is contained in:
Godmode Galactus 2023-10-20 10:23:26 +02:00
parent ccc497bc9d
commit 77ff31a547
No known key found for this signature in database
GPG Key ID: A04142C71ABB0DEA
5 changed files with 583 additions and 294 deletions

638
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,8 +1,17 @@
use std::{sync::Arc, collections::HashMap};
use std::{collections::HashMap, sync::Arc};
use dashmap::DashMap;
use itertools::Itertools;
use solana_sdk::{pubkey::Pubkey, message::{VersionedMessage, MessageHeader, v0::{self, MessageAddressTableLookup}}, instruction::CompiledInstruction, compute_budget::{self, ComputeBudgetInstruction}, borsh0_10::try_from_slice_unchecked};
use solana_sdk::{
borsh0_10::try_from_slice_unchecked,
compute_budget::{self, ComputeBudgetInstruction},
instruction::CompiledInstruction,
message::{
v0::{self, MessageAddressTableLookup},
MessageHeader, VersionedMessage,
},
pubkey::Pubkey,
};
use yellowstone_grpc_proto::prelude::SubscribeUpdateBlock;
use crate::transaction_info::TransactionInfo;
@ -20,16 +29,50 @@ pub struct BlockInfo {
}
impl BlockInfo {
pub fn new(block: &SubscribeUpdateBlock, map_of_infos: Arc<DashMap<String, TransactionInfo>>) -> BlockInfo {
pub fn new(
block: &SubscribeUpdateBlock,
map_of_infos: Arc<DashMap<String, TransactionInfo>>,
) -> BlockInfo {
let block_hash = block.blockhash.clone();
let slot = block.slot;
let leader_identity = block.rewards.as_ref().map(|rewards| rewards.rewards.iter().find(|x| x.reward_type == 1).map(|x| x.pubkey.clone())).unwrap_or(None);
let successful_transactions = block.transactions.iter().filter(|x| x.meta.as_ref().map(|x| x.err.is_none()).unwrap_or(false)).count() as u64;
let leader_identity = block
.rewards
.as_ref()
.map(|rewards| {
rewards
.rewards
.iter()
.find(|x| x.reward_type == 1)
.map(|x| x.pubkey.clone())
})
.unwrap_or(None);
let successful_transactions = block
.transactions
.iter()
.filter(|x| x.meta.as_ref().map(|x| x.err.is_none()).unwrap_or(false))
.count() as u64;
let processed_transactions = block.transactions.len() as u64;
let banking_stage_errors = map_of_infos.iter()
.filter(|x| x.first_notification_slot == slot)
.map(|x| x.errors.iter().filter(|(x, _)| x.slot == slot).map(|(_,x)| *x).sum::<usize>()).sum::<usize>() as u64;
let total_cu_used = block.transactions.iter().map(|x| x.meta.as_ref().map(|x| x.compute_units_consumed.unwrap_or(0)).unwrap_or(0)).sum::<u64>() as i64;
let banking_stage_errors = map_of_infos
.iter()
.filter(|x| x.first_notification_slot == slot)
.map(|x| {
x.errors
.iter()
.filter(|(x, _)| x.slot == slot)
.map(|(_, x)| *x)
.sum::<usize>()
})
.sum::<usize>() as u64;
let total_cu_used = block
.transactions
.iter()
.map(|x| {
x.meta
.as_ref()
.map(|x| x.compute_units_consumed.unwrap_or(0))
.unwrap_or(0)
})
.sum::<u64>() as i64;
let mut writelocked_accounts = HashMap::new();
let mut total_cu_requested: u64 = 0;
for transaction in &block.transactions {
@ -52,7 +95,8 @@ impl BlockInfo {
num_readonly_unsigned_accounts: header.num_readonly_unsigned_accounts as u8,
},
account_keys: message
.account_keys.clone()
.account_keys
.clone()
.into_iter()
.map(|key| {
let bytes: [u8; 32] =
@ -62,7 +106,8 @@ impl BlockInfo {
.collect(),
recent_blockhash: solana_sdk::hash::Hash::new(&message.recent_blockhash),
instructions: message
.instructions.clone()
.instructions
.clone()
.into_iter()
.map(|ix| CompiledInstruction {
program_id_index: ix.program_id_index as u8,
@ -71,7 +116,8 @@ impl BlockInfo {
})
.collect(),
address_table_lookups: message
.address_table_lookups.clone()
.address_table_lookups
.clone()
.into_iter()
.map(|table| {
let bytes: [u8; 32] = table
@ -86,7 +132,7 @@ impl BlockInfo {
})
.collect(),
});
let legacy_compute_budget: Option<(u32, Option<u64>)> =
message.instructions().iter().find_map(|i| {
if i.program_id(message.static_account_keys())
@ -109,9 +155,9 @@ impl BlockInfo {
}
None
});
let legacy_cu_requested = legacy_compute_budget.map(|x| x.0);
let cu_requested = message
.instructions()
.iter()
@ -130,12 +176,18 @@ impl BlockInfo {
.or(legacy_cu_requested);
let cu_requested = cu_requested.unwrap_or(200000) as u64;
total_cu_requested = total_cu_requested + cu_requested;
let writable_accounts = message.static_account_keys().iter().enumerate().filter(|(index,_)| message.is_maybe_writable(*index)).map(|x| x.1.clone()).collect_vec();
let writable_accounts = message
.static_account_keys()
.iter()
.enumerate()
.filter(|(index, _)| message.is_maybe_writable(*index))
.map(|x| x.1.clone())
.collect_vec();
for writable_account in writable_accounts {
match writelocked_accounts.get_mut(&writable_account) {
Some(x) => {
*x += cu_requested;
},
}
None => {
writelocked_accounts.insert(writable_account, cu_requested);
}
@ -143,9 +195,15 @@ impl BlockInfo {
}
}
let mut heavily_writelocked_accounts = writelocked_accounts.iter().filter(|x| *x.1 > 1000000).collect_vec();
let mut heavily_writelocked_accounts = writelocked_accounts
.iter()
.filter(|x| *x.1 > 1000000)
.collect_vec();
heavily_writelocked_accounts.sort_by(|lhs, rhs| (*rhs.1).cmp(lhs.1));
let heavily_writelocked_accounts = heavily_writelocked_accounts.iter().map(|(pubkey, cu)| format!("{}:{}", **pubkey, **cu)).collect_vec();
let heavily_writelocked_accounts = heavily_writelocked_accounts
.iter()
.map(|(pubkey, cu)| format!("{}:{}", **pubkey, **cu))
.collect_vec();
BlockInfo {
block_hash,
slot: slot as i64,
@ -159,4 +217,3 @@ impl BlockInfo {
}
}
}

View File

@ -1,4 +1,7 @@
use std::{collections::HashMap, sync::{Arc, atomic::AtomicU64}};
use std::{
collections::HashMap,
sync::{atomic::AtomicU64, Arc},
};
use block_info::BlockInfo;
use dashmap::DashMap;
@ -10,9 +13,10 @@ use yellowstone_grpc_proto::prelude::{
subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequestFilterBlocks,
};
mod transaction_info;
mod postgres;
mod block_info;
mod postgres;
mod transaction_info;
mod cli;
#[tokio::main()]
async fn main() {
@ -72,10 +76,7 @@ async fn main() {
None => {
let mut x = TransactionInfo::new(&transaction);
x.add_notification(&transaction);
map_of_infos.insert(
sig,
x,
);
map_of_infos.insert(sig, x);
}
}
}
@ -96,8 +97,7 @@ async fn main() {
log::error!("Error saving block {}", e);
}
}
_ => {
}
_ => {}
};
}
}

View File

@ -1,13 +1,15 @@
use std::{sync::{Arc, atomic::AtomicU64}, time::Duration};
use std::{
sync::{atomic::AtomicU64, Arc},
time::Duration,
};
use anyhow::Context;
use chrono::{DateTime, Utc};
use dashmap::DashMap;
use itertools::Itertools;
use tokio_postgres::{Client, NoTls, tls::MakeTlsConnect, Socket, types::ToSql};
use crate::{transaction_info::TransactionInfo, block_info::BlockInfo};
use tokio_postgres::{tls::MakeTlsConnect, types::ToSql, Client, NoTls, Socket};
use crate::{block_info::BlockInfo, transaction_info::TransactionInfo};
pub struct PostgresSession {
client: Client,
@ -18,8 +20,7 @@ impl PostgresSession {
let pg_config = std::env::var("PG_CONFIG").context("env PG_CONFIG not found")?;
let pg_config = pg_config.parse::<tokio_postgres::Config>()?;
let client =
Self::spawn_connection(pg_config, NoTls).await?;
let client = Self::spawn_connection(pg_config, NoTls).await?;
Ok(Self { client })
}
@ -75,14 +76,20 @@ impl PostgresSession {
}
}
pub async fn save_banking_transaction_results(&self, txs: &[TransactionInfo]) -> anyhow::Result<()> {
pub async fn save_banking_transaction_results(
&self,
txs: &[TransactionInfo],
) -> anyhow::Result<()> {
if txs.is_empty() {
return Ok(());
}
const NUMBER_OF_ARGS : usize = 10;
const NUMBER_OF_ARGS: usize = 10;
let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(NUMBER_OF_ARGS * txs.len());
let txs: Vec<PostgresTransactionInfo> = txs.iter().map(|x| PostgresTransactionInfo::from(x)).collect();
let txs: Vec<PostgresTransactionInfo> = txs
.iter()
.map(|x| PostgresTransactionInfo::from(x))
.collect();
for tx in txs.iter() {
args.push(&tx.signature);
args.push(&tx.transaction_message);
@ -110,8 +117,8 @@ impl PostgresSession {
Ok(())
}
pub async fn save_block(&self, block_info: BlockInfo) -> anyhow::Result<()> {
const NUMBER_OF_ARGS : usize = 9;
pub async fn save_block(&self, block_info: BlockInfo) -> anyhow::Result<()> {
const NUMBER_OF_ARGS: usize = 9;
let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(NUMBER_OF_ARGS);
args.push(&block_info.block_hash);
args.push(&block_info.slot);
@ -145,10 +152,16 @@ pub struct Postgres {
impl Postgres {
pub async fn new() -> Self {
let session = PostgresSession::new().await.unwrap();
Self { session: Arc::new(session) }
Self {
session: Arc::new(session),
}
}
pub fn start_saving_transaction(&self, map_of_transaction : Arc<DashMap<String, TransactionInfo>>, slots : Arc<AtomicU64>) {
pub fn start_saving_transaction(
&self,
map_of_transaction: Arc<DashMap<String, TransactionInfo>>,
slots: Arc<AtomicU64>,
) {
let session = self.session.clone();
tokio::task::spawn(async move {
loop {
@ -168,7 +181,10 @@ impl Postgres {
}
let batches = txs_to_store.chunks(8).collect_vec();
for batch in batches {
session.save_banking_transaction_results(batch).await.unwrap();
session
.save_banking_transaction_results(batch)
.await
.unwrap();
}
}
}
@ -199,10 +215,17 @@ impl From<&TransactionInfo> for PostgresTransactionInfo {
let str = is + x.0.to_string().as_str() + ":" + x.1.to_string().as_str() + ";";
str
});
let accounts_used = value.account_used.iter().map(|x| format!("{}({})", x.0, x.1).to_string()).collect();
let accounts_used = value
.account_used
.iter()
.map(|x| format!("{}({})", x.0, x.1).to_string())
.collect();
Self {
signature: value.signature.clone(),
transaction_message: value.transaction_message.as_ref().map(|x| base64::encode(bincode::serialize(&x).unwrap())),
transaction_message: value
.transaction_message
.as_ref()
.map(|x| base64::encode(bincode::serialize(&x).unwrap())),
errors,
is_executed: value.is_executed,
is_confirmed: value.is_confirmed,
@ -213,4 +236,4 @@ impl From<&TransactionInfo> for PostgresTransactionInfo {
accounts_used,
}
}
}
}

View File

@ -1,8 +1,21 @@
use std::{collections::HashMap, hash::Hash};
use std::{collections::HashMap, hash::Hash, str::FromStr};
use chrono::{DateTime, Utc};
use solana_sdk::{slot_history::Slot, transaction::TransactionError, message::{VersionedMessage, v0::{self, MessageAddressTableLookup}, MessageHeader}, pubkey::Pubkey, instruction::CompiledInstruction, compute_budget::{self, ComputeBudgetInstruction}, borsh0_10::try_from_slice_unchecked};
use yellowstone_grpc_proto::prelude::{SubscribeUpdateBankingTransactionResults, SubscribeUpdateTransactionInfo};
use solana_sdk::{
borsh0_10::try_from_slice_unchecked,
compute_budget::{self, ComputeBudgetInstruction},
instruction::CompiledInstruction,
message::{
v0::{self, MessageAddressTableLookup},
MessageHeader, VersionedMessage,
},
pubkey::Pubkey,
slot_history::Slot,
transaction::TransactionError,
};
use yellowstone_grpc_proto::prelude::{
SubscribeUpdateBankingTransactionResults, SubscribeUpdateTransactionInfo,
};
fn convert_transaction_error_into_int(error: &TransactionError) -> u8 {
match error {
@ -87,16 +100,29 @@ impl TransactionInfo {
let is_executed = notification.error.is_none();
// Get time
let utc_timestamp = Utc::now();
match &notification.error {
Some(e) => {
let error: TransactionError = bincode::deserialize(&e.err).unwrap();
let key = ErrorKey { error, slot: notification.slot };
errors.insert( key, 1);
},
None => {
let key = ErrorKey {
error,
slot: notification.slot,
};
errors.insert(key, 1);
}
None => {}
};
let account_used = notification
.accounts
.iter()
.map(|x| {
(
Pubkey::from_str(&x.account).unwrap(),
if x.is_writable { 'w' } else { 'r' },
)
})
.collect();
Self {
signature: notification.signature.clone(),
transaction_message: None,
@ -107,8 +133,7 @@ impl TransactionInfo {
cu_requested: None,
prioritization_fees: None,
utc_timestamp,
account_used: HashMap::new(),
account_used,
}
}
@ -134,7 +159,6 @@ impl TransactionInfo {
}
pub fn add_transaction(&mut self, transaction: &SubscribeUpdateTransactionInfo) {
let Some(transaction) = &transaction.transaction else {
return;
};
@ -154,17 +178,18 @@ impl TransactionInfo {
num_readonly_unsigned_accounts: header.num_readonly_unsigned_accounts as u8,
},
account_keys: message
.account_keys.clone()
.account_keys
.clone()
.into_iter()
.map(|key| {
let bytes: [u8; 32] =
key.try_into().unwrap_or(Pubkey::default().to_bytes());
let bytes: [u8; 32] = key.try_into().unwrap_or(Pubkey::default().to_bytes());
Pubkey::new_from_array(bytes)
})
.collect(),
recent_blockhash: solana_sdk::hash::Hash::new(&message.recent_blockhash),
instructions: message
.instructions.clone()
.instructions
.clone()
.into_iter()
.map(|ix| CompiledInstruction {
program_id_index: ix.program_id_index as u8,
@ -173,7 +198,8 @@ impl TransactionInfo {
})
.collect(),
address_table_lookups: message
.address_table_lookups.clone()
.address_table_lookups
.clone()
.into_iter()
.map(|table| {
let bytes: [u8; 32] = table
@ -200,10 +226,7 @@ impl TransactionInfo {
}) = try_from_slice_unchecked(i.data.as_slice())
{
if additional_fee > 0 {
return Some((
units,
Some(((units * 1000) / additional_fee) as u64),
));
return Some((units, Some(((units * 1000) / additional_fee) as u64)));
} else {
return Some((units, None));
}
@ -249,9 +272,6 @@ impl TransactionInfo {
None
})
.or(legacy_prioritization_fees);
let account_used: HashMap<Pubkey, char> = message.static_account_keys().iter().enumerate().map(|(index, x)| (x.clone(), if message.is_maybe_writable(index) {'w'} else {'r'})).collect();
if let Some(cu_requested) = cu_requested {
self.cu_requested = Some(cu_requested as u64);
}
@ -262,6 +282,5 @@ impl TransactionInfo {
self.is_confirmed = true;
self.transaction_message = Some(message);
self.is_executed = true;
self.account_used = account_used;
}
}