chaning schema
fixing the schema updating postgres code Adding transaction slots into postgres saving account used by transactions saving block data
This commit is contained in:
parent
d7969a8087
commit
51a0b93c44
126
migration.sql
126
migration.sql
|
@ -1,21 +1,33 @@
|
|||
CREATE SCHEMA banking_stage_results;
|
||||
CREATE SCHEMA banking_stage_results_2;
|
||||
|
||||
CREATE TABLE banking_stage_results.transaction_infos (
|
||||
signature CHAR(88),
|
||||
first_notification_slot BIGINT NOT NULL,
|
||||
errors text,
|
||||
is_executed BOOL,
|
||||
is_confirmed BOOL,
|
||||
cu_requested BIGINT,
|
||||
prioritization_fees BIGINT,
|
||||
utc_timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
|
||||
accounts_used text,
|
||||
processed_slot BIGINT,
|
||||
supp_infos text,
|
||||
Primary key (signature, first_notification_slot)
|
||||
CREATE TABLE banking_stage_results_2.transactions(
|
||||
signature char(88) primary key,
|
||||
transaction_id bigserial,
|
||||
UNIQUE(transaction_id)
|
||||
);
|
||||
|
||||
CREATE TABLE banking_stage_results.blocks (
|
||||
CREATE TABLE banking_stage_results_2.transaction_infos (
|
||||
transaction_id BIGINT PRIMARY KEY,
|
||||
processed_slot BIGINT,
|
||||
is_successful BOOL,
|
||||
cu_requested BIGINT,
|
||||
cu_consumed BIGINT,
|
||||
prioritization_fees BIGINT,
|
||||
supp_infos text
|
||||
);
|
||||
|
||||
CREATE TABLE banking_stage_results_2.transaction_slot (
|
||||
transaction_id BIGINT,
|
||||
slot BIGINT NOT NULL,
|
||||
error INT,
|
||||
count INT,
|
||||
utc_timestamp TIMESTAMP NOT NULL,
|
||||
PRIMARY KEY (transaction_id, slot, error)
|
||||
);
|
||||
|
||||
CREATE INDEX idx_transaction_slot_timestamp ON banking_stage_results_2.transaction_slot(utc_timestamp);
|
||||
|
||||
CREATE TABLE banking_stage_results_2.blocks (
|
||||
slot BIGINT PRIMARY KEY,
|
||||
block_hash CHAR(44),
|
||||
leader_identity CHAR(44),
|
||||
|
@ -24,18 +36,82 @@ CREATE TABLE banking_stage_results.blocks (
|
|||
processed_transactions BIGINT,
|
||||
total_cu_used BIGINT,
|
||||
total_cu_requested BIGINT,
|
||||
heavily_writelocked_accounts text,
|
||||
heavily_readlocked_accounts text,
|
||||
supp_infos text
|
||||
);
|
||||
|
||||
CREATE INDEX idx_blocks_slot ON banking_stage_results.blocks(slot);
|
||||
-- optional
|
||||
CLUSTER banking_stage_results.blocks using idx_blocks_slot;
|
||||
VACUUM FULL banking_stage_results.blocks;
|
||||
CREATE INDEX idx_blocks_slot_errors ON banking_stage_results.blocks(slot) WHERE banking_stage_errors > 0;
|
||||
CREATE TABLE banking_stage_results_2.accounts(
|
||||
acc_id bigserial primary key,
|
||||
account_key text,
|
||||
UNIQUE (account_key)
|
||||
);
|
||||
|
||||
CREATE INDEX idx_account_key ON banking_stage_results_2.accounts(account_key);
|
||||
|
||||
CREATE TABLE banking_stage_results_2.accounts_map_transaction(
|
||||
acc_id BIGINT,
|
||||
transaction_id BIGINT,
|
||||
is_writable BOOL,
|
||||
is_signer BOOL,
|
||||
PRIMARY KEY (acc_id, transaction_id)
|
||||
);
|
||||
|
||||
CREATE TABLE banking_stage_results_2.accounts_map_blocks (
|
||||
acc_id BIGINT,
|
||||
slot BIGINT,
|
||||
is_write_locked BOOL,
|
||||
total_cu_consumed BIGINT,
|
||||
total_cu_requested BIGINT,
|
||||
prioritization_fees_info text,
|
||||
supp_infos text,
|
||||
PRIMARY KEY (acc_id, slot, is_writable)
|
||||
);
|
||||
|
||||
CREATE TABLE banking_stage_results_2.errors (
|
||||
error_code int primary key,
|
||||
error text
|
||||
);
|
||||
|
||||
insert into banking_stage_results_2.errors (error, error_code) VALUES
|
||||
('TransactionError::AccountBorrowOutstanding', 0),
|
||||
('TransactionError::AccountInUse', 1),
|
||||
('TransactionError::AccountLoadedTwice', 2),
|
||||
('TransactionError::AccountNotFound', 3),
|
||||
('TransactionError::AddressLookupTableNotFound', 4),
|
||||
('TransactionError::AlreadyProcessed', 5),
|
||||
('TransactionError::BlockhashNotFound', 6),
|
||||
('TransactionError::CallChainTooDeep', 7),
|
||||
('TransactionError::ClusterMaintenance', 8),
|
||||
('TransactionError::DuplicateInstruction', 9),
|
||||
('TransactionError::InstructionError', 10),
|
||||
('TransactionError::InsufficientFundsForFee', 11),
|
||||
('TransactionError::InsufficientFundsForRent', 12),
|
||||
('TransactionError::InvalidAccountForFee', 13),
|
||||
('TransactionError::InvalidAccountIndex', 14),
|
||||
('TransactionError::InvalidAddressLookupTableData', 15),
|
||||
('TransactionError::InvalidAddressLookupTableIndex', 16),
|
||||
('TransactionError::InvalidAddressLookupTableOwner', 17),
|
||||
('TransactionError::InvalidLoadedAccountsDataSizeLimit', 18),
|
||||
('TransactionError::InvalidProgramForExecution', 19),
|
||||
('TransactionError::InvalidRentPayingAccount', 20),
|
||||
('TransactionError::InvalidWritableAccount', 21),
|
||||
('TransactionError::MaxLoadedAccountsDataSizeExceeded', 22),
|
||||
('TransactionError::MissingSignatureForFee', 23),
|
||||
('TransactionError::ProgramAccountNotFound', 24),
|
||||
('TransactionError::ResanitizationNeeded', 25),
|
||||
('TransactionError::SanitizeFailure', 26),
|
||||
('TransactionError::SignatureFailure', 27),
|
||||
('TransactionError::TooManyAccountLocks', 28),
|
||||
('TransactionError::UnbalancedTransaction', 29),
|
||||
('TransactionError::UnsupportedVersion', 30),
|
||||
('TransactionError::WouldExceedAccountDataBlockLimit', 31),
|
||||
('TransactionError::WouldExceedAccountDataTotalLimit', 32),
|
||||
('TransactionError::WouldExceedMaxAccountCostLimit', 33),
|
||||
('TransactionError::WouldExceedMaxBlockCostLimit', 34),
|
||||
('TransactionError::WouldExceedMaxVoteCostLimit', 35);
|
||||
|
||||
CREATE INDEX idx_transaction_infos_timestamp ON banking_stage_results.transaction_infos(utc_timestamp);
|
||||
-- optional
|
||||
CLUSTER banking_stage_results.transaction_infos using idx_transaction_infos_timestamp;
|
||||
VACUUM FULL banking_stage_results.transaction_infos;
|
||||
CLUSTER banking_stage_results_2.blocks using blocks_pkey;
|
||||
VACUUM FULL banking_stage_results_2.blocks;
|
||||
-- optional
|
||||
CLUSTER banking_stage_results_2.transaction_slot using idx_transaction_slot_timestamp;
|
||||
VACUUM FULL banking_stage_results_2.transaction_slot;
|
|
@ -9,19 +9,44 @@ use solana_sdk::{
|
|||
MessageHeader, VersionedMessage,
|
||||
},
|
||||
pubkey::Pubkey,
|
||||
signature::Signature,
|
||||
slot_history::Slot,
|
||||
};
|
||||
use solana_transaction_status::{RewardType, UiConfirmedBlock};
|
||||
use std::collections::HashMap;
|
||||
|
||||
#[derive(Serialize, Debug, Clone)]
|
||||
pub struct PrioFeeData {
|
||||
pub max: Option<u64>,
|
||||
pub min: Option<u64>,
|
||||
pub p75: Option<u64>,
|
||||
pub p90: Option<u64>,
|
||||
pub p95: Option<u64>,
|
||||
pub med: Option<u64>,
|
||||
}
|
||||
|
||||
impl PrioFeeData {
|
||||
pub fn new(pf_vec: &Vec<u64>) -> Self {
|
||||
let mut vec = pf_vec.clone();
|
||||
vec.sort();
|
||||
let mid = vec.len() / 2;
|
||||
Self {
|
||||
max: vec.last().cloned(),
|
||||
min: vec.first().cloned(),
|
||||
p75: (pf_vec.len() > 1).then(|| pf_vec[pf_vec.len() * 75 / 100]),
|
||||
p90: (pf_vec.len() > 1).then(|| pf_vec[pf_vec.len() * 90 / 100]),
|
||||
p95: (pf_vec.len() > 1).then(|| pf_vec[pf_vec.len() * 95 / 100]),
|
||||
med: (pf_vec.len() > 1).then(|| pf_vec[pf_vec.len() / 2]),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Debug, Clone)]
|
||||
pub struct AccountUsage {
|
||||
pub key: String,
|
||||
pub cu_requested: u64,
|
||||
pub cu_consumed: u64,
|
||||
pub max_pf: u64,
|
||||
pub min_pf: u64,
|
||||
pub median_pf: u64,
|
||||
pub is_write_locked: bool,
|
||||
pub cu_requested: i64,
|
||||
pub cu_consumed: i64,
|
||||
pub prioritization_fee_data: PrioFeeData,
|
||||
}
|
||||
|
||||
pub struct AccountData {
|
||||
|
@ -31,24 +56,21 @@ pub struct AccountData {
|
|||
pub vec_pf: Vec<u64>,
|
||||
}
|
||||
|
||||
impl From<&AccountData> for AccountUsage {
|
||||
fn from(value: &AccountData) -> Self {
|
||||
let mut median = value.vec_pf.clone();
|
||||
median.sort();
|
||||
let mid = median.len() / 2;
|
||||
impl From<(&AccountData, bool)> for AccountUsage {
|
||||
fn from(value: (&AccountData, bool)) -> Self {
|
||||
let (account_data, is_write_locked) = value;
|
||||
AccountUsage {
|
||||
key: value.key.clone(),
|
||||
cu_requested: value.cu_requested,
|
||||
cu_consumed: value.cu_consumed,
|
||||
max_pf: value.vec_pf.iter().max().cloned().unwrap_or_default(),
|
||||
min_pf: value.vec_pf.iter().min().cloned().unwrap_or_default(),
|
||||
median_pf: median[mid],
|
||||
key: account_data.key.clone(),
|
||||
cu_requested: account_data.cu_requested as i64,
|
||||
cu_consumed: account_data.cu_consumed as i64,
|
||||
is_write_locked,
|
||||
prioritization_fee_data: PrioFeeData::new(&account_data.vec_pf),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Debug)]
|
||||
pub struct BlockSupplimentaryInfo {
|
||||
pub struct PrioritizationFeesInfo {
|
||||
pub p_min: u64,
|
||||
pub p_median: u64,
|
||||
pub p_75: u64,
|
||||
|
@ -56,22 +78,40 @@ pub struct BlockSupplimentaryInfo {
|
|||
pub p_max: u64,
|
||||
}
|
||||
|
||||
pub struct TransactionAccount {
|
||||
pub key: String,
|
||||
pub is_writable: bool,
|
||||
pub is_signer: bool,
|
||||
}
|
||||
|
||||
pub struct BlockTransactionInfo {
|
||||
pub signature: String,
|
||||
pub processed_slot: i64,
|
||||
pub is_successful: bool,
|
||||
pub cu_requested: i64,
|
||||
pub cu_consumed: i64,
|
||||
pub prioritization_fees: i64,
|
||||
pub supp_infos: String,
|
||||
pub accounts: Vec<TransactionAccount>,
|
||||
}
|
||||
|
||||
pub struct BlockInfo {
|
||||
pub block_hash: String,
|
||||
pub slot: i64,
|
||||
pub leader_identity: Option<String>,
|
||||
pub successful_transactions: i64,
|
||||
pub banking_stage_errors: Option<i64>,
|
||||
pub processed_transactions: i64,
|
||||
pub total_cu_used: i64,
|
||||
pub total_cu_requested: i64,
|
||||
pub heavily_writelocked_accounts: Vec<AccountUsage>,
|
||||
pub heavily_readlocked_accounts: Vec<AccountUsage>,
|
||||
pub sup_info: Option<BlockSupplimentaryInfo>,
|
||||
pub heavily_locked_accounts: Vec<AccountUsage>,
|
||||
pub sup_info: Option<PrioritizationFeesInfo>,
|
||||
pub transactions: Vec<BlockTransactionInfo>,
|
||||
}
|
||||
|
||||
impl BlockInfo {
|
||||
pub fn process_versioned_message(
|
||||
signature: String,
|
||||
slot: Slot,
|
||||
message: &VersionedMessage,
|
||||
prio_fees_in_block: &mut Vec<u64>,
|
||||
writelocked_accounts: &mut HashMap<Pubkey, AccountData>,
|
||||
|
@ -79,7 +119,8 @@ impl BlockInfo {
|
|||
cu_consumed: u64,
|
||||
total_cu_requested: &mut u64,
|
||||
is_vote: bool,
|
||||
) {
|
||||
is_successful: bool,
|
||||
) -> Option<BlockTransactionInfo> {
|
||||
let (cu_requested, prio_fees, nb_ix_except_cb) = {
|
||||
let mut cu_request: Option<u64> = None;
|
||||
let mut prio_fees: Option<u64> = None;
|
||||
|
@ -133,7 +174,13 @@ impl BlockInfo {
|
|||
.static_account_keys()
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(index, account)| (message.is_maybe_writable(index), *account))
|
||||
.map(|(index, account)| {
|
||||
(
|
||||
message.is_maybe_writable(index),
|
||||
*account,
|
||||
message.is_signer(index),
|
||||
)
|
||||
})
|
||||
.collect_vec();
|
||||
for writable_account in accounts.iter().filter(|x| x.0).map(|x| x.1) {
|
||||
match writelocked_accounts.get_mut(&writable_account) {
|
||||
|
@ -176,36 +223,54 @@ impl BlockInfo {
|
|||
}
|
||||
}
|
||||
}
|
||||
Some(BlockTransactionInfo {
|
||||
signature,
|
||||
processed_slot: slot as i64,
|
||||
is_successful,
|
||||
cu_requested: cu_requested as i64,
|
||||
cu_consumed: cu_consumed as i64,
|
||||
prioritization_fees: prioritization_fees as i64,
|
||||
supp_infos: String::new(),
|
||||
accounts: accounts
|
||||
.iter()
|
||||
.map(|(is_writable, key, is_signer)| TransactionAccount {
|
||||
key: key.to_string(),
|
||||
is_signer: *is_signer,
|
||||
is_writable: *is_writable,
|
||||
})
|
||||
.collect(),
|
||||
})
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub fn calculate_account_usage(
|
||||
writelocked_accounts: &HashMap<Pubkey, AccountData>,
|
||||
readlocked_accounts: &HashMap<Pubkey, AccountData>,
|
||||
) -> (Vec<AccountUsage>, Vec<AccountUsage>) {
|
||||
let mut heavily_writelocked_accounts = writelocked_accounts
|
||||
) -> Vec<AccountUsage> {
|
||||
let mut accounts = writelocked_accounts
|
||||
.iter()
|
||||
.map(|(_, data)| AccountUsage::from(data))
|
||||
.map(|(_, data)| AccountUsage::from((data, true)))
|
||||
.collect_vec();
|
||||
heavily_writelocked_accounts.sort_by(|lhs, rhs| rhs.cu_consumed.cmp(&lhs.cu_consumed));
|
||||
|
||||
let mut heavily_readlocked_accounts: Vec<_> = readlocked_accounts
|
||||
let mut heavily_readlocked_accounts = readlocked_accounts
|
||||
.iter()
|
||||
.map(|(_, data)| AccountUsage::from(data))
|
||||
.collect();
|
||||
heavily_readlocked_accounts.sort_by(|lhs, rhs| rhs.cu_consumed.cmp(&lhs.cu_consumed));
|
||||
(heavily_writelocked_accounts, heavily_readlocked_accounts)
|
||||
.map(|(_, data)| AccountUsage::from((data, false)))
|
||||
.collect_vec();
|
||||
accounts.append(&mut heavily_readlocked_accounts);
|
||||
accounts
|
||||
}
|
||||
|
||||
pub fn calculate_supp_info(
|
||||
prio_fees_in_block: &mut Vec<u64>,
|
||||
) -> Option<BlockSupplimentaryInfo> {
|
||||
) -> Option<PrioritizationFeesInfo> {
|
||||
if !prio_fees_in_block.is_empty() {
|
||||
prio_fees_in_block.sort();
|
||||
let median_index = prio_fees_in_block.len() / 2;
|
||||
let p75_index = prio_fees_in_block.len() * 75 / 100;
|
||||
let p90_index = prio_fees_in_block.len() * 90 / 100;
|
||||
Some(BlockSupplimentaryInfo {
|
||||
Some(PrioritizationFeesInfo {
|
||||
p_min: prio_fees_in_block[0],
|
||||
p_median: prio_fees_in_block[median_index],
|
||||
p_75: prio_fees_in_block[p75_index],
|
||||
|
@ -219,7 +284,6 @@ impl BlockInfo {
|
|||
|
||||
pub fn new(
|
||||
block: &yellowstone_grpc_proto_original::prelude::SubscribeUpdateBlock,
|
||||
banking_stage_errors: Option<i64>,
|
||||
) -> BlockInfo {
|
||||
let block_hash = block.blockhash.clone();
|
||||
let slot = block.slot;
|
||||
|
@ -255,6 +319,7 @@ impl BlockInfo {
|
|||
let mut readlocked_accounts: HashMap<Pubkey, AccountData> = HashMap::new();
|
||||
let mut total_cu_requested: u64 = 0;
|
||||
let mut prio_fees_in_block = vec![];
|
||||
let mut block_transactions = vec![];
|
||||
for transaction in &block.transactions {
|
||||
let Some(tx) = &transaction.transaction else {
|
||||
continue;
|
||||
|
@ -271,6 +336,9 @@ impl BlockInfo {
|
|||
let Some(meta) = &transaction.meta else {
|
||||
continue;
|
||||
};
|
||||
let signature = Signature::try_from(&tx.signatures[0][0..64])
|
||||
.unwrap()
|
||||
.to_string();
|
||||
|
||||
let message = VersionedMessage::V0(v0::Message {
|
||||
header: MessageHeader {
|
||||
|
@ -317,7 +385,9 @@ impl BlockInfo {
|
|||
.collect(),
|
||||
});
|
||||
|
||||
Self::process_versioned_message(
|
||||
let transaction = Self::process_versioned_message(
|
||||
signature,
|
||||
slot,
|
||||
&message,
|
||||
&mut prio_fees_in_block,
|
||||
&mut writelocked_accounts,
|
||||
|
@ -325,10 +395,14 @@ impl BlockInfo {
|
|||
meta.compute_units_consumed.unwrap_or(0),
|
||||
&mut total_cu_requested,
|
||||
transaction.is_vote,
|
||||
meta.err.is_none(),
|
||||
);
|
||||
if let Some(transaction) = transaction {
|
||||
block_transactions.push(transaction);
|
||||
}
|
||||
}
|
||||
|
||||
let (heavily_writelocked_accounts, heavily_readlocked_accounts) =
|
||||
let heavily_locked_accounts =
|
||||
Self::calculate_account_usage(&writelocked_accounts, &readlocked_accounts);
|
||||
|
||||
let sup_info = Self::calculate_supp_info(&mut prio_fees_in_block);
|
||||
|
@ -339,105 +413,11 @@ impl BlockInfo {
|
|||
leader_identity,
|
||||
successful_transactions: successful_transactions as i64,
|
||||
processed_transactions: processed_transactions as i64,
|
||||
banking_stage_errors,
|
||||
total_cu_used,
|
||||
total_cu_requested: total_cu_requested as i64,
|
||||
heavily_writelocked_accounts,
|
||||
heavily_readlocked_accounts,
|
||||
heavily_locked_accounts,
|
||||
sup_info,
|
||||
transactions: block_transactions,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn _new_from_rpc_block(
|
||||
slot: Slot,
|
||||
block: &UiConfirmedBlock,
|
||||
banking_stage_errors_count: i64,
|
||||
) -> Option<Self> {
|
||||
let block_hash = block.blockhash.clone();
|
||||
let leader_identity = block
|
||||
.rewards
|
||||
.as_ref()
|
||||
.map(|rewards| {
|
||||
rewards
|
||||
.iter()
|
||||
.find(|x| x.reward_type == Some(RewardType::Fee))
|
||||
.map(|x| x.pubkey.clone())
|
||||
})
|
||||
.unwrap_or(None);
|
||||
let transactions = if let Some(transactions) = &block.transactions {
|
||||
transactions
|
||||
} else {
|
||||
return None;
|
||||
};
|
||||
|
||||
let successful_transactions = transactions
|
||||
.iter()
|
||||
.filter(|x| x.meta.as_ref().map(|x| x.err.is_none()).unwrap_or(false))
|
||||
.count() as u64;
|
||||
let processed_transactions = transactions.len() as u64;
|
||||
|
||||
let total_cu_used = transactions
|
||||
.iter()
|
||||
.map(|x| {
|
||||
x.meta
|
||||
.as_ref()
|
||||
.map(|x| match x.compute_units_consumed {
|
||||
solana_transaction_status::option_serializer::OptionSerializer::Some(x) => {
|
||||
x
|
||||
}
|
||||
solana_transaction_status::option_serializer::OptionSerializer::Skip => 0,
|
||||
solana_transaction_status::option_serializer::OptionSerializer::None => 0,
|
||||
})
|
||||
.unwrap_or(0)
|
||||
})
|
||||
.sum::<u64>() as i64;
|
||||
let mut writelocked_accounts: HashMap<Pubkey, AccountData> = HashMap::new();
|
||||
let mut readlocked_accounts: HashMap<Pubkey, AccountData> = HashMap::new();
|
||||
let mut total_cu_requested: u64 = 0;
|
||||
let mut prio_fees_in_block = vec![];
|
||||
for transaction in transactions {
|
||||
let Some(tx) = transaction.transaction.decode() else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let message = &tx.message;
|
||||
|
||||
let Some(meta) = &transaction.meta else {
|
||||
continue;
|
||||
};
|
||||
let is_vote = false;
|
||||
|
||||
Self::process_versioned_message(
|
||||
message,
|
||||
&mut prio_fees_in_block,
|
||||
&mut writelocked_accounts,
|
||||
&mut readlocked_accounts,
|
||||
match meta.compute_units_consumed {
|
||||
solana_transaction_status::option_serializer::OptionSerializer::None => 0,
|
||||
solana_transaction_status::option_serializer::OptionSerializer::Skip => 0,
|
||||
solana_transaction_status::option_serializer::OptionSerializer::Some(x) => x,
|
||||
},
|
||||
&mut total_cu_requested,
|
||||
is_vote,
|
||||
);
|
||||
}
|
||||
|
||||
let (heavily_writelocked_accounts, heavily_readlocked_accounts) =
|
||||
Self::calculate_account_usage(&writelocked_accounts, &readlocked_accounts);
|
||||
|
||||
let sup_info = Self::calculate_supp_info(&mut prio_fees_in_block);
|
||||
Some(BlockInfo {
|
||||
block_hash,
|
||||
slot: slot as i64,
|
||||
leader_identity,
|
||||
successful_transactions: successful_transactions as i64,
|
||||
processed_transactions: processed_transactions as i64,
|
||||
banking_stage_errors: Some(banking_stage_errors_count),
|
||||
total_cu_used,
|
||||
total_cu_requested: total_cu_requested as i64,
|
||||
heavily_writelocked_accounts,
|
||||
heavily_readlocked_accounts,
|
||||
sup_info,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
46
src/main.rs
46
src/main.rs
|
@ -14,7 +14,6 @@ use dashmap::DashMap;
|
|||
use futures::StreamExt;
|
||||
use log::{debug, error};
|
||||
use prometheus::{opts, register_int_counter, register_int_gauge, IntCounter, IntGauge};
|
||||
use solana_sdk::signature::Signature;
|
||||
use transaction_info::TransactionInfo;
|
||||
|
||||
mod block_info;
|
||||
|
@ -60,6 +59,7 @@ pub async fn start_tracking_banking_stage_errors(
|
|||
String,
|
||||
yellowstone_grpc_proto::geyser::SubscribeRequestFilterSlots,
|
||||
> = if subscribe_to_slots {
|
||||
log::info!("subscribing to slots on grpc banking errors");
|
||||
let mut slot_sub = HashMap::new();
|
||||
slot_sub.insert(
|
||||
"slot_sub".to_string(),
|
||||
|
@ -93,12 +93,13 @@ pub async fn start_tracking_banking_stage_errors(
|
|||
let Some(update) = message.update_oneof else {
|
||||
continue;
|
||||
};
|
||||
log::trace!("got banking stage notification");
|
||||
|
||||
match update{
|
||||
yellowstone_grpc_proto::prelude::subscribe_update::UpdateOneof::BankingTransactionErrors(transaction) => {
|
||||
if transaction.error.is_none() {
|
||||
continue;
|
||||
}
|
||||
// if transaction.error.is_none() {
|
||||
// continue;
|
||||
// }
|
||||
BANKING_STAGE_ERROR_EVENT_COUNT.inc();
|
||||
let sig = transaction.signature.to_string();
|
||||
match slot_by_errors.get_mut(&transaction.slot) {
|
||||
|
@ -132,6 +133,7 @@ pub async fn start_tracking_banking_stage_errors(
|
|||
let load_slot = slot.load(std::sync::atomic::Ordering::Relaxed);
|
||||
if load_slot < s.slot {
|
||||
// update slot to process updates
|
||||
// updated slot
|
||||
slot.store(s.slot, std::sync::atomic::Ordering::Relaxed);
|
||||
}
|
||||
},
|
||||
|
@ -149,8 +151,6 @@ async fn start_tracking_blocks(
|
|||
grpc_x_token: Option<String>,
|
||||
postgres: postgres::Postgres,
|
||||
slot: Arc<AtomicU64>,
|
||||
slot_by_errors: Arc<DashMap<u64, u64>>,
|
||||
map_of_infos: Arc<DashMap<String, BTreeMap<u64, TransactionInfo>>>,
|
||||
) {
|
||||
let mut client = yellowstone_grpc_client_original::GeyserGrpcClient::connect(
|
||||
grpc_block_addr,
|
||||
|
@ -227,36 +227,16 @@ async fn start_tracking_blocks(
|
|||
BANKING_STAGE_BLOCKS_TASK.inc();
|
||||
let postgres = postgres.clone();
|
||||
let slot = slot.clone();
|
||||
let map_of_infos = map_of_infos.clone();
|
||||
let slot_by_errors = slot_by_errors.clone();
|
||||
tokio::spawn(async move {
|
||||
tokio::time::sleep(Duration::from_secs(30)).await;
|
||||
for transaction in &block.transactions {
|
||||
let Some(tx) = &transaction.transaction else {
|
||||
continue;
|
||||
};
|
||||
let signature = Signature::try_from(tx.signatures[0].clone())
|
||||
.unwrap()
|
||||
.to_string();
|
||||
if let Some(mut tree) = map_of_infos.get_mut(&signature) {
|
||||
for (_, tx_info) in tree.iter_mut() {
|
||||
tx_info.add_transaction(transaction, block.slot);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let banking_stage_error_count =
|
||||
slot_by_errors.get(&block.slot).map(|x| *x.value() as i64);
|
||||
let block_info = BlockInfo::new(&block, banking_stage_error_count);
|
||||
let block_info = BlockInfo::new(&block);
|
||||
|
||||
TXERROR_COUNT.add(
|
||||
block_info.processed_transactions - block_info.successful_transactions,
|
||||
);
|
||||
if let Err(e) = postgres.save_block_info(block_info).await {
|
||||
panic!("Error saving block {}", e);
|
||||
error!("Error saving block {}", e);
|
||||
}
|
||||
slot.store(block.slot, std::sync::atomic::Ordering::Relaxed);
|
||||
slot_by_errors.remove(&block.slot);
|
||||
BANKING_STAGE_BLOCKS_TASK.dec();
|
||||
});
|
||||
// delay queue so that we get all the banking stage errors before processing block
|
||||
|
@ -306,15 +286,7 @@ async fn main() {
|
|||
})
|
||||
.collect_vec();
|
||||
if let Some(gprc_block_addr) = grpc_block_addr {
|
||||
start_tracking_blocks(
|
||||
gprc_block_addr,
|
||||
args.grpc_x_token,
|
||||
postgres,
|
||||
slot,
|
||||
slot_by_errors,
|
||||
map_of_infos,
|
||||
)
|
||||
.await;
|
||||
start_tracking_blocks(gprc_block_addr, args.grpc_x_token, postgres, slot).await;
|
||||
}
|
||||
futures::future::join_all(jhs).await;
|
||||
}
|
||||
|
|
698
src/postgres.rs
698
src/postgres.rs
|
@ -6,7 +6,6 @@ use std::{
|
|||
|
||||
use anyhow::Context;
|
||||
use base64::Engine;
|
||||
use chrono::{DateTime, Utc};
|
||||
use dashmap::DashMap;
|
||||
use futures::pin_mut;
|
||||
use itertools::Itertools;
|
||||
|
@ -23,10 +22,34 @@ use tokio_postgres::{
|
|||
Client, CopyInSink, NoTls, Socket,
|
||||
};
|
||||
|
||||
use crate::{block_info::BlockInfo, transaction_info::TransactionInfo};
|
||||
use crate::{
|
||||
block_info::{BlockInfo, BlockTransactionInfo},
|
||||
transaction_info::TransactionInfo,
|
||||
};
|
||||
|
||||
pub struct TempTableTracker {
|
||||
count: AtomicU64,
|
||||
}
|
||||
|
||||
impl TempTableTracker {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
count: AtomicU64::new(1),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_new_temp_table(&self) -> String {
|
||||
format!(
|
||||
"temp_table_{}",
|
||||
self.count
|
||||
.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct PostgresSession {
|
||||
client: Client,
|
||||
temp_table_tracker: TempTableTracker,
|
||||
}
|
||||
|
||||
impl PostgresSession {
|
||||
|
@ -62,7 +85,10 @@ impl PostgresSession {
|
|||
Self::spawn_connection(pg_config, MakeTlsConnector::new(connector)).await?
|
||||
};
|
||||
|
||||
Ok(Self { client })
|
||||
Ok(Self {
|
||||
client,
|
||||
temp_table_tracker: TempTableTracker::new(),
|
||||
})
|
||||
}
|
||||
|
||||
async fn spawn_connection<T>(
|
||||
|
@ -92,29 +118,448 @@ impl PostgresSession {
|
|||
Ok(client)
|
||||
}
|
||||
|
||||
pub fn _multiline_query(query: &mut String, args: usize, rows: usize, types: &[&str]) {
|
||||
let mut arg_index = 1usize;
|
||||
for row in 0..rows {
|
||||
query.push('(');
|
||||
pub async fn drop_temp_table(&self, table: String) -> anyhow::Result<()> {
|
||||
self.client
|
||||
.execute(format!("drop table if exists {};", table).as_str(), &[])
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
for i in 0..args {
|
||||
if row == 0 && !types.is_empty() {
|
||||
query.push_str(&format!("(${arg_index})::{}", types[i]));
|
||||
} else {
|
||||
query.push_str(&format!("${arg_index}"));
|
||||
}
|
||||
arg_index += 1;
|
||||
if i != (args - 1) {
|
||||
query.push(',');
|
||||
}
|
||||
}
|
||||
pub async fn create_transaction_ids(&self, signatures: Vec<String>) -> anyhow::Result<()> {
|
||||
// create temp table
|
||||
let temp_table = self.temp_table_tracker.get_new_temp_table();
|
||||
|
||||
query.push(')');
|
||||
self.client
|
||||
.execute(
|
||||
format!(
|
||||
r#"
|
||||
CREATE TEMP TABLE {}(
|
||||
signature char(88)
|
||||
);
|
||||
"#,
|
||||
temp_table
|
||||
)
|
||||
.as_str(),
|
||||
&[],
|
||||
)
|
||||
.await?;
|
||||
|
||||
if row != (rows - 1) {
|
||||
query.push(',');
|
||||
let statement = format!(
|
||||
r#"
|
||||
COPY {}(
|
||||
signature
|
||||
) FROM STDIN BINARY
|
||||
"#,
|
||||
temp_table
|
||||
);
|
||||
let sink: CopyInSink<bytes::Bytes> = self.copy_in(statement.as_str()).await?;
|
||||
let writer = BinaryCopyInWriter::new(sink, &[Type::TEXT]);
|
||||
pin_mut!(writer);
|
||||
for signature in signatures {
|
||||
writer.as_mut().write(&[&signature]).await?;
|
||||
}
|
||||
writer.finish().await?;
|
||||
|
||||
let statement = format!(
|
||||
r#"
|
||||
INSERT INTO banking_stage_results_2.transactions(signature) SELECT signature from {}
|
||||
ON CONFLICT DO NOTHING;
|
||||
"#,
|
||||
temp_table
|
||||
);
|
||||
self.client.execute(statement.as_str(), &[]).await?;
|
||||
|
||||
self.drop_temp_table(temp_table).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn create_accounts_for_transaction(
|
||||
&self,
|
||||
accounts: Vec<String>,
|
||||
) -> anyhow::Result<()> {
|
||||
// create temp table
|
||||
let temp_table = self.temp_table_tracker.get_new_temp_table();
|
||||
|
||||
self.client
|
||||
.execute(
|
||||
format!(
|
||||
"CREATE TEMP TABLE {}(
|
||||
key char(44)
|
||||
);",
|
||||
temp_table
|
||||
)
|
||||
.as_str(),
|
||||
&[],
|
||||
)
|
||||
.await?;
|
||||
|
||||
let statement = format!(
|
||||
r#"
|
||||
COPY {}(
|
||||
key
|
||||
) FROM STDIN BINARY
|
||||
"#,
|
||||
temp_table
|
||||
);
|
||||
let sink: CopyInSink<bytes::Bytes> = self.copy_in(statement.as_str()).await?;
|
||||
let writer = BinaryCopyInWriter::new(sink, &[Type::TEXT]);
|
||||
pin_mut!(writer);
|
||||
for account in accounts {
|
||||
writer.as_mut().write(&[&account]).await?;
|
||||
}
|
||||
writer.finish().await?;
|
||||
|
||||
let statement = format!(
|
||||
r#"
|
||||
INSERT INTO banking_stage_results_2.accounts(account_key) SELECT key from {}
|
||||
ON CONFLICT DO NOTHING;
|
||||
"#,
|
||||
temp_table
|
||||
);
|
||||
self.client.execute(statement.as_str(), &[]).await.unwrap();
|
||||
self.drop_temp_table(temp_table).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn insert_transaction_in_txslot_table(
|
||||
&self,
|
||||
txs: &[TransactionInfo],
|
||||
) -> anyhow::Result<()> {
|
||||
let temp_table = self.temp_table_tracker.get_new_temp_table();
|
||||
|
||||
self.client
|
||||
.execute(
|
||||
format!(
|
||||
"CREATE TEMP TABLE {}(
|
||||
sig char(88),
|
||||
slot BIGINT,
|
||||
error INT,
|
||||
count INT,
|
||||
utc_timestamp TIMESTAMP
|
||||
);",
|
||||
temp_table
|
||||
)
|
||||
.as_str(),
|
||||
&[],
|
||||
)
|
||||
.await?;
|
||||
|
||||
let statement = format!(
|
||||
r#"
|
||||
COPY {}(
|
||||
sig, slot, error, count, utc_timestamp
|
||||
) FROM STDIN BINARY
|
||||
"#,
|
||||
temp_table
|
||||
);
|
||||
let sink: CopyInSink<bytes::Bytes> = self.copy_in(statement.as_str()).await?;
|
||||
let writer = BinaryCopyInWriter::new(
|
||||
sink,
|
||||
&[
|
||||
Type::TEXT,
|
||||
Type::INT8,
|
||||
Type::INT4,
|
||||
Type::INT4,
|
||||
Type::TIMESTAMP,
|
||||
],
|
||||
);
|
||||
pin_mut!(writer);
|
||||
for tx in txs {
|
||||
let slot: i64 = tx.slot as i64;
|
||||
for (error, count) in &tx.errors {
|
||||
let error = error.to_int();
|
||||
let count = *count as i32;
|
||||
let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(5);
|
||||
let timestamp = tx.utc_timestamp.naive_local();
|
||||
args.push(&tx.signature);
|
||||
args.push(&slot);
|
||||
args.push(&error);
|
||||
args.push(&count);
|
||||
args.push(×tamp);
|
||||
|
||||
writer.as_mut().write(&args).await?;
|
||||
}
|
||||
}
|
||||
writer.finish().await?;
|
||||
|
||||
let statement = format!(
|
||||
r#"
|
||||
INSERT INTO banking_stage_results_2.transaction_slot(transaction_id, slot, error, count, utc_timestamp)
|
||||
SELECT ( select transaction_id from banking_stage_results_2.transactions where signature = t.sig ), t.slot, t.error, t.count, t.utc_timestamp
|
||||
FROM (
|
||||
SELECT sig, slot, error, count, utc_timestamp from {}
|
||||
)
|
||||
as t (sig, slot, error, count, utc_timestamp) ON CONFLICT DO NOTHING;
|
||||
"#,
|
||||
temp_table
|
||||
);
|
||||
self.client.execute(statement.as_str(), &[]).await?;
|
||||
self.drop_temp_table(temp_table).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn insert_accounts_for_transaction(
|
||||
&self,
|
||||
accounts_for_transaction: Vec<AccountsForTransaction>,
|
||||
) -> anyhow::Result<()> {
|
||||
let temp_table = self.temp_table_tracker.get_new_temp_table();
|
||||
self.client
|
||||
.execute(
|
||||
format!(
|
||||
"CREATE TEMP TABLE {}(
|
||||
account_key char(44),
|
||||
signature char(88),
|
||||
is_writable BOOL,
|
||||
is_signer BOOL
|
||||
);",
|
||||
temp_table
|
||||
)
|
||||
.as_str(),
|
||||
&[],
|
||||
)
|
||||
.await?;
|
||||
|
||||
let statement = format!(
|
||||
r#"
|
||||
COPY {}(
|
||||
account_key, signature, is_writable, is_signer
|
||||
) FROM STDIN BINARY
|
||||
"#,
|
||||
temp_table
|
||||
);
|
||||
|
||||
let sink: CopyInSink<bytes::Bytes> = self.copy_in(statement.as_str()).await?;
|
||||
let writer =
|
||||
BinaryCopyInWriter::new(sink, &[Type::TEXT, Type::TEXT, Type::BOOL, Type::BOOL]);
|
||||
pin_mut!(writer);
|
||||
for acc_tx in accounts_for_transaction {
|
||||
for acc in acc_tx.accounts {
|
||||
let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(4);
|
||||
args.push(&acc.key);
|
||||
args.push(&acc_tx.signature);
|
||||
args.push(&acc.writable);
|
||||
args.push(&acc.is_signer);
|
||||
writer.as_mut().write(&args).await?;
|
||||
}
|
||||
}
|
||||
writer.finish().await?;
|
||||
|
||||
let statement = format!(
|
||||
r#"
|
||||
INSERT INTO banking_stage_results_2.accounts_map_transaction(acc_id, transaction_id, is_writable, is_signer)
|
||||
SELECT
|
||||
( select acc_id from banking_stage_results_2.accounts where account_key == t.account_key ),
|
||||
( select transaction_id from banking_stage_results_2.transactions where signature = t.sig ),
|
||||
t.is_writable,
|
||||
t.is_signer
|
||||
FROM (
|
||||
SELECT account_key, signature, is_writable, is_signer from {}
|
||||
)
|
||||
as t (account_key, signature, is_writable, is_signer)
|
||||
ON CONFLICT DO NOTHING;
|
||||
"#,
|
||||
temp_table
|
||||
);
|
||||
self.client.execute(statement.as_str(), &[]).await?;
|
||||
|
||||
self.drop_temp_table(temp_table).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn insert_transactions_for_block(
|
||||
&self,
|
||||
transactions: &Vec<BlockTransactionInfo>,
|
||||
slot: i64,
|
||||
) -> anyhow::Result<()> {
|
||||
let temp_table = self.temp_table_tracker.get_new_temp_table();
|
||||
self.client
|
||||
.execute(
|
||||
format!(
|
||||
"CREATE TEMP TABLE {}(
|
||||
signature text,
|
||||
processed_slot BIGINT,
|
||||
is_successful BOOL,
|
||||
cu_requested BIGINT,
|
||||
cu_consumed BIGINT,
|
||||
prioritization_fees BIGINT,
|
||||
supp_infos text
|
||||
);",
|
||||
temp_table
|
||||
)
|
||||
.as_str(),
|
||||
&[],
|
||||
)
|
||||
.await?;
|
||||
|
||||
let statement = format!(
|
||||
r#"
|
||||
COPY {}(
|
||||
signature,
|
||||
processed_slot,
|
||||
is_successful,
|
||||
cu_requested,
|
||||
cu_consumed,
|
||||
prioritization_fees,
|
||||
supp_infos
|
||||
) FROM STDIN BINARY
|
||||
"#,
|
||||
temp_table
|
||||
);
|
||||
|
||||
let sink: CopyInSink<bytes::Bytes> = self.copy_in(statement.as_str()).await?;
|
||||
let writer = BinaryCopyInWriter::new(
|
||||
sink,
|
||||
&[
|
||||
Type::TEXT,
|
||||
Type::INT8,
|
||||
Type::BOOL,
|
||||
Type::INT8,
|
||||
Type::INT8,
|
||||
Type::INT8,
|
||||
Type::TEXT,
|
||||
],
|
||||
);
|
||||
pin_mut!(writer);
|
||||
for transaction in transactions {
|
||||
let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(7);
|
||||
args.push(&transaction.signature);
|
||||
args.push(&slot);
|
||||
args.push(&transaction.is_successful);
|
||||
args.push(&transaction.cu_requested);
|
||||
args.push(&transaction.cu_consumed);
|
||||
args.push(&transaction.prioritization_fees);
|
||||
args.push(&transaction.supp_infos);
|
||||
writer.as_mut().write(&args).await?;
|
||||
}
|
||||
writer.finish().await?;
|
||||
|
||||
let statement = format!(
|
||||
r#"
|
||||
INSERT INTO banking_stage_results_2.transaction_infos
|
||||
(transaction_id, processed_slot, is_successful, cu_requested, cu_consumed, prioritization_fees, supp_infos)
|
||||
SELECT
|
||||
( select transaction_id from banking_stage_results_2.transactions where signature = t.signature ),
|
||||
t.processed_slot,
|
||||
t.is_successful,
|
||||
t.cu_requested,
|
||||
t.cu_consumed,
|
||||
t.prioritization_fees,
|
||||
t.supp_infos
|
||||
FROM (
|
||||
SELECT signature, processed_slot, is_successful, cu_requested, cu_consumed, prioritization_fees, supp_infos from {}
|
||||
)
|
||||
as t (signature, processed_slot, is_successful, cu_requested, cu_consumed, prioritization_fees, supp_infos)
|
||||
ON CONFLICT DO NOTHING;
|
||||
"#,
|
||||
temp_table
|
||||
);
|
||||
self.client.execute(statement.as_str(), &[]).await?;
|
||||
|
||||
self.drop_temp_table(temp_table).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn save_account_usage_in_block(&self, block_info: BlockInfo) -> anyhow::Result<()> {
|
||||
let temp_table = self.temp_table_tracker.get_new_temp_table();
|
||||
self.client
|
||||
.execute(
|
||||
format!(
|
||||
"CREATE TEMP TABLE {}(
|
||||
account_key text,
|
||||
slot BIGINT,
|
||||
is_write_locked BOOL,
|
||||
total_cu_requested BIGINT,
|
||||
total_cu_consumed BIGINT,
|
||||
prioritization_fees_info text
|
||||
);",
|
||||
temp_table
|
||||
)
|
||||
.as_str(),
|
||||
&[],
|
||||
)
|
||||
.await?;
|
||||
|
||||
let statement = format!(
|
||||
r#"
|
||||
COPY {}(
|
||||
account_key,
|
||||
slot,
|
||||
is_write_locked,
|
||||
total_cu_requested,
|
||||
total_cu_consumed,
|
||||
prioritization_fees_info
|
||||
) FROM STDIN BINARY
|
||||
"#,
|
||||
temp_table
|
||||
);
|
||||
|
||||
let sink: CopyInSink<bytes::Bytes> = self.copy_in(statement.as_str()).await?;
|
||||
let writer = BinaryCopyInWriter::new(
|
||||
sink,
|
||||
&[
|
||||
Type::TEXT,
|
||||
Type::INT8,
|
||||
Type::BOOL,
|
||||
Type::INT8,
|
||||
Type::INT8,
|
||||
Type::TEXT,
|
||||
],
|
||||
);
|
||||
pin_mut!(writer);
|
||||
for account_usage in block_info.heavily_locked_accounts.iter() {
|
||||
let is_writable = true;
|
||||
let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(6);
|
||||
let pf_json = serde_json::to_string(&account_usage.prioritization_fee_data).unwrap();
|
||||
args.push(&account_usage.key);
|
||||
args.push(&block_info.slot);
|
||||
args.push(&is_writable);
|
||||
args.push(&account_usage.cu_requested);
|
||||
args.push(&account_usage.cu_consumed);
|
||||
args.push(&pf_json);
|
||||
writer.as_mut().write(&args).await?;
|
||||
}
|
||||
writer.finish().await?;
|
||||
|
||||
let statement = format!(
|
||||
r#"
|
||||
INSERT INTO banking_stage_results_2.accounts_map_blocks
|
||||
( acc_id,
|
||||
slot,
|
||||
is_write_locked,
|
||||
total_cu_requested,
|
||||
total_cu_consumed,
|
||||
prioritization_fees_info
|
||||
)
|
||||
SELECT
|
||||
( select acc_id from banking_stage_results_2.accounts where account_key = t.account_key ),
|
||||
t.slot,
|
||||
t.is_write_locked,
|
||||
t.total_cu_requested,
|
||||
t.total_cu_consumed,
|
||||
t.prioritization_fees_info
|
||||
FROM (
|
||||
SELECT account_key,
|
||||
slot,
|
||||
is_write_locked,
|
||||
total_cu_requested,
|
||||
total_cu_consumed,
|
||||
prioritization_fees_info from {}
|
||||
)
|
||||
as t (account_key,
|
||||
slot,
|
||||
is_write_locked,
|
||||
total_cu_requested,
|
||||
total_cu_consumed,
|
||||
prioritization_fees_info
|
||||
)
|
||||
ON CONFLICT DO NOTHING;
|
||||
"#,
|
||||
temp_table
|
||||
);
|
||||
self.client.execute(statement.as_str(), &[]).await?;
|
||||
|
||||
self.drop_temp_table(temp_table).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn save_banking_transaction_results(
|
||||
|
@ -124,49 +569,40 @@ impl PostgresSession {
|
|||
if txs.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
const NUMBER_OF_ARGS: usize = 10;
|
||||
|
||||
let txs: Vec<PostgresTransactionInfo> =
|
||||
txs.iter().map(PostgresTransactionInfo::from).collect();
|
||||
|
||||
let statement = r#"
|
||||
COPY banking_stage_results.transaction_infos(
|
||||
signature, errors, is_executed, is_confirmed, first_notification_slot, cu_requested, prioritization_fees, utc_timestamp, accounts_used, processed_slot
|
||||
) FROM STDIN BINARY
|
||||
"#;
|
||||
let sink: CopyInSink<bytes::Bytes> = self.copy_in(statement).await?;
|
||||
let writer = BinaryCopyInWriter::new(
|
||||
sink,
|
||||
&[
|
||||
Type::TEXT,
|
||||
Type::TEXT,
|
||||
Type::BOOL,
|
||||
Type::BOOL,
|
||||
Type::INT8,
|
||||
Type::INT8,
|
||||
Type::INT8,
|
||||
Type::TIMESTAMPTZ,
|
||||
Type::TEXT,
|
||||
Type::INT8,
|
||||
],
|
||||
);
|
||||
pin_mut!(writer);
|
||||
for tx in txs.iter() {
|
||||
let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(NUMBER_OF_ARGS);
|
||||
args.push(&tx.signature);
|
||||
args.push(&tx.errors);
|
||||
args.push(&tx.is_executed);
|
||||
args.push(&tx.is_confirmed);
|
||||
args.push(&tx.first_notification_slot);
|
||||
args.push(&tx.cu_requested);
|
||||
args.push(&tx.prioritization_fees);
|
||||
args.push(&tx.utc_timestamp);
|
||||
args.push(&tx.accounts_used);
|
||||
args.push(&tx.processed_slot);
|
||||
|
||||
writer.as_mut().write(&args).await?;
|
||||
}
|
||||
writer.finish().await?;
|
||||
// create transaction ids
|
||||
let signatures = txs
|
||||
.iter()
|
||||
.map(|transaction| transaction.signature.clone())
|
||||
.collect_vec();
|
||||
self.create_transaction_ids(signatures).await?;
|
||||
// create account ids
|
||||
let accounts = txs
|
||||
.iter()
|
||||
.map(|transaction| transaction.account_used.clone())
|
||||
.flatten()
|
||||
.map(|(acc, _)| acc)
|
||||
.collect_vec();
|
||||
self.create_accounts_for_transaction(accounts).await?;
|
||||
// add transaction in tx slot table
|
||||
self.insert_transaction_in_txslot_table(&txs.as_slice())
|
||||
.await?;
|
||||
let txs_accounts = txs
|
||||
.iter()
|
||||
.map(|tx| AccountsForTransaction {
|
||||
signature: tx.signature.clone(),
|
||||
accounts: tx
|
||||
.account_used
|
||||
.iter()
|
||||
.map(|(key, is_writable)| AccountUsed {
|
||||
key: key.clone(),
|
||||
writable: *is_writable,
|
||||
is_signer: false,
|
||||
})
|
||||
.collect(),
|
||||
})
|
||||
.collect();
|
||||
// insert accounts for transaction
|
||||
self.insert_accounts_for_transaction(txs_accounts).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -180,51 +616,45 @@ impl PostgresSession {
|
|||
}
|
||||
|
||||
pub async fn save_block(&self, block_info: BlockInfo) -> anyhow::Result<()> {
|
||||
const NUMBER_OF_ARGS: usize = 11;
|
||||
let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(NUMBER_OF_ARGS);
|
||||
args.push(&block_info.block_hash);
|
||||
args.push(&block_info.slot);
|
||||
args.push(&block_info.leader_identity);
|
||||
args.push(&block_info.successful_transactions);
|
||||
args.push(&block_info.banking_stage_errors);
|
||||
args.push(&block_info.processed_transactions);
|
||||
args.push(&block_info.total_cu_used);
|
||||
args.push(&block_info.total_cu_requested);
|
||||
let heavily_writelocked_accounts =
|
||||
serde_json::to_string(&block_info.heavily_writelocked_accounts).unwrap_or_default();
|
||||
let heavily_readlocked_accounts =
|
||||
serde_json::to_string(&block_info.heavily_readlocked_accounts).unwrap_or_default();
|
||||
args.push(&heavily_writelocked_accounts);
|
||||
args.push(&heavily_readlocked_accounts);
|
||||
// create transaction ids
|
||||
let signatures = block_info
|
||||
.transactions
|
||||
.iter()
|
||||
.map(|transaction| transaction.signature.clone())
|
||||
.collect_vec();
|
||||
self.create_transaction_ids(signatures).await?;
|
||||
// create account ids
|
||||
let accounts = block_info
|
||||
.heavily_locked_accounts
|
||||
.iter()
|
||||
.map(|acc| acc.key.clone())
|
||||
.collect_vec();
|
||||
self.create_accounts_for_transaction(accounts).await?;
|
||||
|
||||
let supp_infos = serde_json::to_string(&block_info.sup_info).unwrap_or_default();
|
||||
args.push(&supp_infos);
|
||||
let txs_accounts = block_info
|
||||
.transactions
|
||||
.iter()
|
||||
.map(|tx| AccountsForTransaction {
|
||||
signature: tx.signature.clone(),
|
||||
accounts: tx
|
||||
.accounts
|
||||
.iter()
|
||||
.map(|acc| AccountUsed {
|
||||
key: acc.key.clone(),
|
||||
writable: acc.is_writable,
|
||||
is_signer: acc.is_signer,
|
||||
})
|
||||
.collect(),
|
||||
})
|
||||
.collect();
|
||||
// insert accounts for transaction
|
||||
self.insert_accounts_for_transaction(txs_accounts).await?;
|
||||
|
||||
let statement = r#"
|
||||
COPY banking_stage_results.blocks(
|
||||
block_hash, slot, leader_identity, successful_transactions, banking_stage_errors, processed_transactions, total_cu_used, total_cu_requested, heavily_writelocked_accounts, heavily_readlocked_accounts, supp_infos
|
||||
) FROM STDIN BINARY
|
||||
"#;
|
||||
let sink: CopyInSink<bytes::Bytes> = self.copy_in(statement).await.unwrap();
|
||||
let writer = BinaryCopyInWriter::new(
|
||||
sink,
|
||||
&[
|
||||
Type::TEXT,
|
||||
Type::INT8,
|
||||
Type::TEXT,
|
||||
Type::INT8,
|
||||
Type::INT8,
|
||||
Type::INT8,
|
||||
Type::INT8,
|
||||
Type::INT8,
|
||||
Type::TEXT,
|
||||
Type::TEXT,
|
||||
Type::TEXT,
|
||||
],
|
||||
);
|
||||
pin_mut!(writer);
|
||||
writer.as_mut().write(&args).await?;
|
||||
writer.finish().await?;
|
||||
// save transactions in block
|
||||
self.insert_transactions_for_block(&block_info.transactions, block_info.slot)
|
||||
.await?;
|
||||
|
||||
// save account usage in blocks
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
@ -269,7 +699,8 @@ impl Postgres {
|
|||
.map(|(_, tree)| tree.iter().map(|(_, info)| info).cloned().collect_vec())
|
||||
.flatten()
|
||||
.collect_vec();
|
||||
let batches = data.chunks(32).collect_vec();
|
||||
println!("saving {} transaction infos", data.len());
|
||||
let batches = data.chunks(256).collect_vec();
|
||||
for batch in batches {
|
||||
if let Err(e) = session
|
||||
.save_banking_transaction_results(batch.to_vec())
|
||||
|
@ -288,62 +719,19 @@ impl Postgres {
|
|||
}
|
||||
}
|
||||
|
||||
pub struct PostgresTransactionInfo {
|
||||
pub signature: String,
|
||||
pub errors: String,
|
||||
pub is_executed: bool,
|
||||
pub is_confirmed: bool,
|
||||
pub first_notification_slot: i64,
|
||||
pub cu_requested: Option<i64>,
|
||||
pub prioritization_fees: Option<i64>,
|
||||
pub utc_timestamp: DateTime<Utc>,
|
||||
pub accounts_used: String,
|
||||
pub processed_slot: Option<i64>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Clone)]
|
||||
pub struct TransactionErrorData {
|
||||
error: TransactionError,
|
||||
slot: u64,
|
||||
count: usize,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Clone)]
|
||||
pub struct AccountUsed {
|
||||
key: String,
|
||||
writable: bool,
|
||||
is_signer: bool,
|
||||
}
|
||||
|
||||
impl From<&TransactionInfo> for PostgresTransactionInfo {
|
||||
fn from(value: &TransactionInfo) -> Self {
|
||||
let errors = value
|
||||
.errors
|
||||
.iter()
|
||||
.map(|(key, count)| TransactionErrorData {
|
||||
error: key.error.clone(),
|
||||
slot: key.slot,
|
||||
count: *count,
|
||||
})
|
||||
.collect_vec();
|
||||
let accounts_used = value
|
||||
.account_used
|
||||
.iter()
|
||||
.map(|(key, writable)| AccountUsed {
|
||||
key: key.to_string(),
|
||||
writable: *writable,
|
||||
})
|
||||
.collect_vec();
|
||||
Self {
|
||||
signature: value.signature.clone(),
|
||||
errors: serde_json::to_string(&errors).unwrap_or_default(),
|
||||
is_executed: value.is_executed,
|
||||
is_confirmed: value.is_confirmed,
|
||||
cu_requested: value.cu_requested.map(|x| x as i64),
|
||||
first_notification_slot: value.first_notification_slot as i64,
|
||||
prioritization_fees: value.prioritization_fees.map(|x| x as i64),
|
||||
utc_timestamp: value.utc_timestamp,
|
||||
accounts_used: serde_json::to_string(&accounts_used).unwrap_or_default(),
|
||||
processed_slot: value.processed_slot.map(|x| x as i64),
|
||||
}
|
||||
}
|
||||
pub struct AccountsForTransaction {
|
||||
pub signature: String,
|
||||
pub accounts: Vec<AccountUsed>,
|
||||
}
|
||||
|
|
|
@ -2,18 +2,7 @@ use std::{collections::HashMap, hash::Hash};
|
|||
|
||||
use chrono::{DateTime, Utc};
|
||||
use itertools::Itertools;
|
||||
use solana_sdk::{
|
||||
borsh0_10::try_from_slice_unchecked,
|
||||
compute_budget::{self, ComputeBudgetInstruction},
|
||||
instruction::CompiledInstruction,
|
||||
message::{
|
||||
v0::{self, MessageAddressTableLookup},
|
||||
MessageHeader, VersionedMessage,
|
||||
},
|
||||
pubkey::Pubkey,
|
||||
slot_history::Slot,
|
||||
transaction::{TransactionError, VersionedTransaction},
|
||||
};
|
||||
use solana_sdk::transaction::TransactionError;
|
||||
use yellowstone_grpc_proto::prelude::SubscribeUpdateBankingTransactionResults;
|
||||
|
||||
fn convert_transaction_error_into_int(error: &TransactionError) -> u8 {
|
||||
|
@ -60,20 +49,24 @@ fn convert_transaction_error_into_int(error: &TransactionError) -> u8 {
|
|||
#[derive(Clone, PartialEq)]
|
||||
pub struct ErrorKey {
|
||||
pub error: TransactionError,
|
||||
pub slot: Slot,
|
||||
}
|
||||
|
||||
impl ToString for ErrorKey {
|
||||
fn to_string(&self) -> String {
|
||||
self.error.to_string() + "-" + self.slot.to_string().as_str()
|
||||
self.error.to_string()
|
||||
}
|
||||
}
|
||||
|
||||
impl Hash for ErrorKey {
|
||||
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
|
||||
let tmp = convert_transaction_error_into_int(&self.error);
|
||||
tmp.hash(state);
|
||||
self.slot.hash(state);
|
||||
tmp.hash(state)
|
||||
}
|
||||
}
|
||||
|
||||
impl ErrorKey {
|
||||
pub fn to_int(&self) -> i32 {
|
||||
convert_transaction_error_into_int(&self.error) as i32
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -83,30 +76,21 @@ impl Eq for ErrorKey {}
|
|||
pub struct TransactionInfo {
|
||||
pub signature: String,
|
||||
pub errors: HashMap<ErrorKey, usize>,
|
||||
pub is_executed: bool,
|
||||
pub is_confirmed: bool,
|
||||
pub first_notification_slot: u64,
|
||||
pub cu_requested: Option<u64>,
|
||||
pub prioritization_fees: Option<u64>,
|
||||
pub slot: u64,
|
||||
pub utc_timestamp: DateTime<Utc>,
|
||||
pub account_used: Vec<(String, bool)>,
|
||||
pub processed_slot: Option<Slot>,
|
||||
}
|
||||
|
||||
impl TransactionInfo {
|
||||
pub fn new(notification: &SubscribeUpdateBankingTransactionResults) -> Self {
|
||||
let mut errors = HashMap::new();
|
||||
let is_executed = notification.error.is_none();
|
||||
// Get time
|
||||
let utc_timestamp = Utc::now();
|
||||
|
||||
match ¬ification.error {
|
||||
Some(e) => {
|
||||
let error: TransactionError = bincode::deserialize(&e.err).unwrap();
|
||||
let key = ErrorKey {
|
||||
error,
|
||||
slot: notification.slot,
|
||||
};
|
||||
let key = ErrorKey { error };
|
||||
errors.insert(key, 1);
|
||||
}
|
||||
None => {}
|
||||
|
@ -120,23 +104,17 @@ impl TransactionInfo {
|
|||
Self {
|
||||
signature: notification.signature.clone(),
|
||||
errors,
|
||||
is_executed,
|
||||
is_confirmed: false,
|
||||
first_notification_slot: notification.slot,
|
||||
cu_requested: None,
|
||||
prioritization_fees: None,
|
||||
slot: notification.slot,
|
||||
utc_timestamp,
|
||||
account_used,
|
||||
processed_slot: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add_notification(&mut self, notification: &SubscribeUpdateBankingTransactionResults) {
|
||||
match ¬ification.error {
|
||||
Some(error) => {
|
||||
let slot = notification.slot;
|
||||
let error: TransactionError = bincode::deserialize(&error.err).unwrap();
|
||||
let key = ErrorKey { error, slot };
|
||||
let key = ErrorKey { error };
|
||||
match self.errors.get_mut(&key) {
|
||||
Some(x) => {
|
||||
*x += 1;
|
||||
|
@ -149,205 +127,4 @@ impl TransactionInfo {
|
|||
None => {}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add_rpc_transaction(&mut self, slot: u64, transaction: &VersionedTransaction) {
|
||||
let message = &transaction.message;
|
||||
let legacy_compute_budget: Option<(u32, Option<u64>)> =
|
||||
message.instructions().iter().find_map(|i| {
|
||||
if i.program_id(message.static_account_keys())
|
||||
.eq(&compute_budget::id())
|
||||
{
|
||||
if let Ok(ComputeBudgetInstruction::RequestUnitsDeprecated {
|
||||
units,
|
||||
additional_fee,
|
||||
}) = try_from_slice_unchecked(i.data.as_slice())
|
||||
{
|
||||
if additional_fee > 0 {
|
||||
return Some((units, Some(((units * 1000) / additional_fee) as u64)));
|
||||
} else {
|
||||
return Some((units, None));
|
||||
}
|
||||
}
|
||||
}
|
||||
None
|
||||
});
|
||||
|
||||
let legacy_cu_requested = legacy_compute_budget.map(|x| x.0);
|
||||
let legacy_prioritization_fees = legacy_compute_budget.map(|x| x.1).unwrap_or(None);
|
||||
|
||||
let cu_requested = message
|
||||
.instructions()
|
||||
.iter()
|
||||
.find_map(|i| {
|
||||
if i.program_id(message.static_account_keys())
|
||||
.eq(&compute_budget::id())
|
||||
{
|
||||
if let Ok(ComputeBudgetInstruction::SetComputeUnitLimit(limit)) =
|
||||
try_from_slice_unchecked(i.data.as_slice())
|
||||
{
|
||||
return Some(limit);
|
||||
}
|
||||
}
|
||||
None
|
||||
})
|
||||
.or(legacy_cu_requested);
|
||||
|
||||
let prioritization_fees = message
|
||||
.instructions()
|
||||
.iter()
|
||||
.find_map(|i| {
|
||||
if i.program_id(message.static_account_keys())
|
||||
.eq(&compute_budget::id())
|
||||
{
|
||||
if let Ok(ComputeBudgetInstruction::SetComputeUnitPrice(price)) =
|
||||
try_from_slice_unchecked(i.data.as_slice())
|
||||
{
|
||||
return Some(price);
|
||||
}
|
||||
}
|
||||
|
||||
None
|
||||
})
|
||||
.or(legacy_prioritization_fees);
|
||||
if let Some(cu_requested) = cu_requested {
|
||||
self.cu_requested = Some(cu_requested as u64);
|
||||
}
|
||||
|
||||
if let Some(prioritization_fees) = prioritization_fees {
|
||||
self.prioritization_fees = Some(prioritization_fees);
|
||||
}
|
||||
self.is_confirmed = true;
|
||||
self.is_executed = true;
|
||||
self.processed_slot = Some(slot);
|
||||
}
|
||||
|
||||
pub fn add_transaction(
|
||||
&mut self,
|
||||
transaction: &yellowstone_grpc_proto_original::prelude::SubscribeUpdateTransactionInfo,
|
||||
slot: Slot,
|
||||
) {
|
||||
let Some(transaction) = &transaction.transaction else {
|
||||
return;
|
||||
};
|
||||
|
||||
let Some(message) = &transaction.message else {
|
||||
return;
|
||||
};
|
||||
|
||||
let Some(header) = &message.header else {
|
||||
return;
|
||||
};
|
||||
|
||||
let message = VersionedMessage::V0(v0::Message {
|
||||
header: MessageHeader {
|
||||
num_required_signatures: header.num_required_signatures as u8,
|
||||
num_readonly_signed_accounts: header.num_readonly_signed_accounts as u8,
|
||||
num_readonly_unsigned_accounts: header.num_readonly_unsigned_accounts as u8,
|
||||
},
|
||||
account_keys: message
|
||||
.account_keys
|
||||
.clone()
|
||||
.into_iter()
|
||||
.map(|key| {
|
||||
let bytes: [u8; 32] = key.try_into().unwrap_or(Pubkey::default().to_bytes());
|
||||
Pubkey::new_from_array(bytes)
|
||||
})
|
||||
.collect(),
|
||||
recent_blockhash: solana_sdk::hash::Hash::new(&message.recent_blockhash),
|
||||
instructions: message
|
||||
.instructions
|
||||
.clone()
|
||||
.into_iter()
|
||||
.map(|ix| CompiledInstruction {
|
||||
program_id_index: ix.program_id_index as u8,
|
||||
accounts: ix.accounts,
|
||||
data: ix.data,
|
||||
})
|
||||
.collect(),
|
||||
address_table_lookups: message
|
||||
.address_table_lookups
|
||||
.clone()
|
||||
.into_iter()
|
||||
.map(|table| {
|
||||
let bytes: [u8; 32] = table
|
||||
.account_key
|
||||
.try_into()
|
||||
.unwrap_or(Pubkey::default().to_bytes());
|
||||
MessageAddressTableLookup {
|
||||
account_key: Pubkey::new_from_array(bytes),
|
||||
writable_indexes: table.writable_indexes,
|
||||
readonly_indexes: table.readonly_indexes,
|
||||
}
|
||||
})
|
||||
.collect(),
|
||||
});
|
||||
|
||||
let legacy_compute_budget: Option<(u32, Option<u64>)> =
|
||||
message.instructions().iter().find_map(|i| {
|
||||
if i.program_id(message.static_account_keys())
|
||||
.eq(&compute_budget::id())
|
||||
{
|
||||
if let Ok(ComputeBudgetInstruction::RequestUnitsDeprecated {
|
||||
units,
|
||||
additional_fee,
|
||||
}) = try_from_slice_unchecked(i.data.as_slice())
|
||||
{
|
||||
if additional_fee > 0 {
|
||||
return Some((units, Some(((units * 1000) / additional_fee) as u64)));
|
||||
} else {
|
||||
return Some((units, None));
|
||||
}
|
||||
}
|
||||
}
|
||||
None
|
||||
});
|
||||
|
||||
let legacy_cu_requested = legacy_compute_budget.map(|x| x.0);
|
||||
let legacy_prioritization_fees = legacy_compute_budget.map(|x| x.1).unwrap_or(None);
|
||||
|
||||
let cu_requested = message
|
||||
.instructions()
|
||||
.iter()
|
||||
.find_map(|i| {
|
||||
if i.program_id(message.static_account_keys())
|
||||
.eq(&compute_budget::id())
|
||||
{
|
||||
if let Ok(ComputeBudgetInstruction::SetComputeUnitLimit(limit)) =
|
||||
try_from_slice_unchecked(i.data.as_slice())
|
||||
{
|
||||
return Some(limit);
|
||||
}
|
||||
}
|
||||
None
|
||||
})
|
||||
.or(legacy_cu_requested);
|
||||
|
||||
let prioritization_fees = message
|
||||
.instructions()
|
||||
.iter()
|
||||
.find_map(|i| {
|
||||
if i.program_id(message.static_account_keys())
|
||||
.eq(&compute_budget::id())
|
||||
{
|
||||
if let Ok(ComputeBudgetInstruction::SetComputeUnitPrice(price)) =
|
||||
try_from_slice_unchecked(i.data.as_slice())
|
||||
{
|
||||
return Some(price);
|
||||
}
|
||||
}
|
||||
|
||||
None
|
||||
})
|
||||
.or(legacy_prioritization_fees);
|
||||
if let Some(cu_requested) = cu_requested {
|
||||
self.cu_requested = Some(cu_requested as u64);
|
||||
}
|
||||
|
||||
if let Some(prioritization_fees) = prioritization_fees {
|
||||
self.prioritization_fees = Some(prioritization_fees);
|
||||
}
|
||||
self.is_confirmed = true;
|
||||
self.is_executed = true;
|
||||
self.processed_slot = Some(slot);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue