Changing schema and adding accounts data (#10)

* changing schema

fixing the schema

updating postgres code

Adding transaction slots into postgres

saving account used by transactions

saving block data

* Saving all the block info

completing block save

Optimizing postgres saving of data

fixing schema after groovies review

Schema changes after groovies review

Fixing that is_writable is always true

* adding a new index on slot
This commit is contained in:
galactus 2023-12-12 09:58:56 +01:00 committed by GitHub
parent d7969a8087
commit 49f519df0b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 813 additions and 590 deletions

View File

@ -1,41 +1,117 @@
CREATE SCHEMA banking_stage_results;
CREATE SCHEMA banking_stage_results_2;
CREATE TABLE banking_stage_results.transaction_infos (
signature CHAR(88),
first_notification_slot BIGINT NOT NULL,
errors text,
is_executed BOOL,
is_confirmed BOOL,
cu_requested BIGINT,
prioritization_fees BIGINT,
utc_timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
accounts_used text,
processed_slot BIGINT,
supp_infos text,
Primary key (signature, first_notification_slot)
-- Interns a transaction signature (base58, 88 chars) to a compact numeric id;
-- every other table references transactions via transaction_id so the wide
-- signature string is stored only once.
CREATE TABLE banking_stage_results_2.transactions(
signature char(88) primary key,
transaction_id bigserial,
UNIQUE(transaction_id)
);
CREATE TABLE banking_stage_results.blocks (
-- Execution outcome for a transaction in the slot where it was processed.
-- supp_infos holds free-form JSON produced by the sidecar.
CREATE TABLE banking_stage_results_2.transaction_infos (
transaction_id BIGINT PRIMARY KEY,
processed_slot BIGINT,
is_successful BOOL,
cu_requested BIGINT,
cu_consumed BIGINT,
prioritization_fees BIGINT,
supp_infos text
);
-- Lookup table mapping banking-stage error codes to their textual names;
-- populated once by the INSERT statement further down in this migration.
CREATE TABLE banking_stage_results_2.errors (
error_code int primary key,
error_text text
);
-- Counts how often a transaction hit a given banking-stage error in a slot.
CREATE TABLE banking_stage_results_2.transaction_slot (
transaction_id BIGINT,
slot BIGINT,
-- FIX: the referenced table lives in the banking_stage_results_2 schema.
-- An unqualified "errors" only resolves if that schema happens to be on
-- the search_path, so this CREATE TABLE would otherwise fail with
-- "relation errors does not exist". Qualify the reference explicitly.
error_code INT REFERENCES banking_stage_results_2.errors(error_code),
count INT,
utc_timestamp TIMESTAMP NOT NULL,
PRIMARY KEY (transaction_id, slot, error_code)
);
-- Supports time-range queries and the CLUSTER run later in this migration.
CREATE INDEX idx_transaction_slot_timestamp ON banking_stage_results_2.transaction_slot(utc_timestamp);
-- One row per processed block with aggregate counters; per-account and
-- per-transaction detail lives in the side tables. The *_accounts text
-- columns carry serialized JSON.
CREATE TABLE banking_stage_results_2.blocks (
slot BIGINT PRIMARY KEY,
block_hash CHAR(44),
leader_identity CHAR(44),
successful_transactions BIGINT,
banking_stage_errors BIGINT,
processed_transactions BIGINT,
total_cu_used BIGINT,
total_cu_requested BIGINT,
heavily_writelocked_accounts text,
heavily_readlocked_accounts text,
supp_infos text
);
CREATE INDEX idx_blocks_slot ON banking_stage_results.blocks(slot);
-- optional
CLUSTER banking_stage_results.blocks using idx_blocks_slot;
VACUUM FULL banking_stage_results.blocks;
CREATE INDEX idx_blocks_slot_errors ON banking_stage_results.blocks(slot) WHERE banking_stage_errors > 0;
-- Interns account keys to compact numeric ids, mirroring the transactions
-- table; the UNIQUE constraint lets bulk inserts use ON CONFLICT DO NOTHING.
CREATE TABLE banking_stage_results_2.accounts(
acc_id bigserial primary key,
account_key text,
UNIQUE (account_key)
);
CREATE INDEX idx_transaction_infos_timestamp ON banking_stage_results.transaction_infos(utc_timestamp);
-- Which accounts each transaction used, and how (write-locked / signer).
CREATE TABLE banking_stage_results_2.accounts_map_transaction(
acc_id BIGINT,
transaction_id BIGINT,
is_writable BOOL,
is_signer BOOL,
PRIMARY KEY (acc_id, transaction_id)
);
-- NOTE(review): the acc_id index duplicates the leading column of the primary
-- key and is likely redundant; the transaction_id index is the one that adds
-- lookup power for per-transaction queries. Confirm before dropping.
CREATE INDEX accounts_map_transaction_acc_id ON banking_stage_results_2.accounts_map_transaction(acc_id);
CREATE INDEX accounts_map_transaction_transaction_id ON banking_stage_results_2.accounts_map_transaction(transaction_id);
-- Per-slot, per-account aggregates (the "heavily locked accounts" data);
-- is_write_locked is part of the key so the same account can have one row
-- for write-locked usage and one for read-locked usage in the same slot.
CREATE TABLE banking_stage_results_2.accounts_map_blocks (
acc_id BIGINT,
slot BIGINT,
is_write_locked BOOL,
total_cu_consumed BIGINT,
total_cu_requested BIGINT,
prioritization_fees_info text,
supp_infos text,
PRIMARY KEY (acc_id, slot, is_write_locked)
);
CREATE INDEX idx_accounts_map_blocks_slot ON banking_stage_results_2.accounts_map_blocks(slot);
-- Seed the error lookup table. Codes 0..35 mirror the variants of Solana's
-- TransactionError enum (see error.to_int() in the Rust sidecar); the codes
-- are referenced by transaction_slot.error_code.
insert into banking_stage_results_2.errors (error_text, error_code) VALUES
('AccountBorrowOutstanding', 0),
('AccountInUse', 1),
('AccountLoadedTwice', 2),
('AccountNotFound', 3),
('AddressLookupTableNotFound', 4),
('AlreadyProcessed', 5),
('BlockhashNotFound', 6),
('CallChainTooDeep', 7),
('ClusterMaintenance', 8),
('DuplicateInstruction', 9),
('InstructionError', 10),
('InsufficientFundsForFee', 11),
('InsufficientFundsForRent', 12),
('InvalidAccountForFee', 13),
('InvalidAccountIndex', 14),
('InvalidAddressLookupTableData', 15),
('InvalidAddressLookupTableIndex', 16),
('InvalidAddressLookupTableOwner', 17),
('InvalidLoadedAccountsDataSizeLimit', 18),
('InvalidProgramForExecution', 19),
('InvalidRentPayingAccount', 20),
('InvalidWritableAccount', 21),
('MaxLoadedAccountsDataSizeExceeded', 22),
('MissingSignatureForFee', 23),
('ProgramAccountNotFound', 24),
('ResanitizationNeeded', 25),
('SanitizeFailure', 26),
('SignatureFailure', 27),
('TooManyAccountLocks', 28),
('UnbalancedTransaction', 29),
('UnsupportedVersion', 30),
('WouldExceedAccountDataBlockLimit', 31),
('WouldExceedAccountDataTotalLimit', 32),
('WouldExceedMaxAccountCostLimit', 33),
('WouldExceedMaxBlockCostLimit', 34),
('WouldExceedMaxVoteCostLimit', 35);
CLUSTER banking_stage_results_2.blocks using blocks_pkey;
VACUUM FULL banking_stage_results_2.blocks;
-- optional
CLUSTER banking_stage_results.transaction_infos using idx_transaction_infos_timestamp;
VACUUM FULL banking_stage_results.transaction_infos;
CLUSTER banking_stage_results_2.transaction_slot using idx_transaction_slot_timestamp;
VACUUM FULL banking_stage_results_2.transaction_slot;

View File

@ -9,19 +9,43 @@ use solana_sdk::{
MessageHeader, VersionedMessage,
},
pubkey::Pubkey,
signature::Signature,
slot_history::Slot,
};
use solana_transaction_status::{RewardType, UiConfirmedBlock};
use std::collections::HashMap;
/// Summary statistics (max/min, percentiles, median) over the prioritization
/// fees observed for one account or one block; serialized into the
/// supp-info JSON columns.
#[derive(Serialize, Debug, Clone)]
pub struct PrioFeeData {
    pub max: Option<u64>,
    pub min: Option<u64>,
    pub p75: Option<u64>,
    pub p90: Option<u64>,
    pub p95: Option<u64>,
    pub med: Option<u64>,
}

impl PrioFeeData {
    /// Builds the fee summary from an unordered list of fee samples.
    ///
    /// max/min are populated for any non-empty input; the percentile fields
    /// are only populated when there is more than one sample (preserving the
    /// original guard).
    pub fn new(pf_vec: &Vec<u64>) -> Self {
        let mut vec = pf_vec.clone();
        // Sort ascending so positional indexing yields order statistics.
        vec.sort_unstable();
        Self {
            max: vec.last().cloned(),
            min: vec.first().cloned(),
            // BUG FIX: the percentiles and median must index the *sorted*
            // copy. The original indexed the unsorted input `pf_vec`, so
            // p75/p90/p95/med were arbitrary samples, not order statistics.
            p75: (vec.len() > 1).then(|| vec[vec.len() * 75 / 100]),
            p90: (vec.len() > 1).then(|| vec[vec.len() * 90 / 100]),
            p95: (vec.len() > 1).then(|| vec[vec.len() * 95 / 100]),
            med: (vec.len() > 1).then(|| vec[vec.len() / 2]),
        }
    }
}
#[derive(Serialize, Debug, Clone)]
pub struct AccountUsage {
pub key: String,
pub cu_requested: u64,
pub cu_consumed: u64,
pub max_pf: u64,
pub min_pf: u64,
pub median_pf: u64,
pub is_write_locked: bool,
pub cu_requested: i64,
pub cu_consumed: i64,
pub prioritization_fee_data: PrioFeeData,
}
pub struct AccountData {
@ -31,24 +55,21 @@ pub struct AccountData {
pub vec_pf: Vec<u64>,
}
impl From<&AccountData> for AccountUsage {
fn from(value: &AccountData) -> Self {
let mut median = value.vec_pf.clone();
median.sort();
let mid = median.len() / 2;
impl From<(&AccountData, bool)> for AccountUsage {
fn from(value: (&AccountData, bool)) -> Self {
let (account_data, is_write_locked) = value;
AccountUsage {
key: value.key.clone(),
cu_requested: value.cu_requested,
cu_consumed: value.cu_consumed,
max_pf: value.vec_pf.iter().max().cloned().unwrap_or_default(),
min_pf: value.vec_pf.iter().min().cloned().unwrap_or_default(),
median_pf: median[mid],
key: account_data.key.clone(),
cu_requested: account_data.cu_requested as i64,
cu_consumed: account_data.cu_consumed as i64,
is_write_locked,
prioritization_fee_data: PrioFeeData::new(&account_data.vec_pf),
}
}
}
#[derive(Serialize, Debug)]
pub struct BlockSupplimentaryInfo {
pub struct PrioritizationFeesInfo {
pub p_min: u64,
pub p_median: u64,
pub p_75: u64,
@ -56,22 +77,40 @@ pub struct BlockSupplimentaryInfo {
pub p_max: u64,
}
/// One account referenced by a transaction, with its lock/signer flags as
/// derived from the versioned message (key is produced via
/// Pubkey::to_string, presumably base58 — confirm against schema's char(44)).
pub struct TransactionAccount {
pub key: String,
pub is_writable: bool,
pub is_signer: bool,
}
/// Flattened per-transaction record extracted from a block, shaped for the
/// postgres writer: the i64 fields mirror the BIGINT columns of
/// banking_stage_results_2.transaction_infos.
pub struct BlockTransactionInfo {
pub signature: String,
pub processed_slot: i64,
pub is_successful: bool,
pub cu_requested: i64,
pub cu_consumed: i64,
pub prioritization_fees: i64,
// Free-form JSON destined for the supp_infos text column.
pub supp_infos: String,
// Accounts the transaction referenced, used to fill accounts_map_transaction.
pub accounts: Vec<TransactionAccount>,
}
pub struct BlockInfo {
pub block_hash: String,
pub slot: i64,
pub leader_identity: Option<String>,
pub successful_transactions: i64,
pub banking_stage_errors: Option<i64>,
pub processed_transactions: i64,
pub total_cu_used: i64,
pub total_cu_requested: i64,
pub heavily_writelocked_accounts: Vec<AccountUsage>,
pub heavily_readlocked_accounts: Vec<AccountUsage>,
pub sup_info: Option<BlockSupplimentaryInfo>,
pub heavily_locked_accounts: Vec<AccountUsage>,
pub sup_info: Option<PrioritizationFeesInfo>,
pub transactions: Vec<BlockTransactionInfo>,
}
impl BlockInfo {
pub fn process_versioned_message(
signature: String,
slot: Slot,
message: &VersionedMessage,
prio_fees_in_block: &mut Vec<u64>,
writelocked_accounts: &mut HashMap<Pubkey, AccountData>,
@ -79,7 +118,8 @@ impl BlockInfo {
cu_consumed: u64,
total_cu_requested: &mut u64,
is_vote: bool,
) {
is_successful: bool,
) -> Option<BlockTransactionInfo> {
let (cu_requested, prio_fees, nb_ix_except_cb) = {
let mut cu_request: Option<u64> = None;
let mut prio_fees: Option<u64> = None;
@ -133,7 +173,13 @@ impl BlockInfo {
.static_account_keys()
.iter()
.enumerate()
.map(|(index, account)| (message.is_maybe_writable(index), *account))
.map(|(index, account)| {
(
message.is_maybe_writable(index),
*account,
message.is_signer(index),
)
})
.collect_vec();
for writable_account in accounts.iter().filter(|x| x.0).map(|x| x.1) {
match writelocked_accounts.get_mut(&writable_account) {
@ -176,36 +222,54 @@ impl BlockInfo {
}
}
}
Some(BlockTransactionInfo {
signature,
processed_slot: slot as i64,
is_successful,
cu_requested: cu_requested as i64,
cu_consumed: cu_consumed as i64,
prioritization_fees: prioritization_fees as i64,
supp_infos: String::new(),
accounts: accounts
.iter()
.map(|(is_writable, key, is_signer)| TransactionAccount {
key: key.to_string(),
is_signer: *is_signer,
is_writable: *is_writable,
})
.collect(),
})
} else {
None
}
}
pub fn calculate_account_usage(
writelocked_accounts: &HashMap<Pubkey, AccountData>,
readlocked_accounts: &HashMap<Pubkey, AccountData>,
) -> (Vec<AccountUsage>, Vec<AccountUsage>) {
let mut heavily_writelocked_accounts = writelocked_accounts
) -> Vec<AccountUsage> {
let mut accounts = writelocked_accounts
.iter()
.map(|(_, data)| AccountUsage::from(data))
.map(|(_, data)| AccountUsage::from((data, true)))
.collect_vec();
heavily_writelocked_accounts.sort_by(|lhs, rhs| rhs.cu_consumed.cmp(&lhs.cu_consumed));
let mut heavily_readlocked_accounts: Vec<_> = readlocked_accounts
let mut heavily_readlocked_accounts = readlocked_accounts
.iter()
.map(|(_, data)| AccountUsage::from(data))
.collect();
heavily_readlocked_accounts.sort_by(|lhs, rhs| rhs.cu_consumed.cmp(&lhs.cu_consumed));
(heavily_writelocked_accounts, heavily_readlocked_accounts)
.map(|(_, data)| AccountUsage::from((data, false)))
.collect_vec();
accounts.append(&mut heavily_readlocked_accounts);
accounts
}
pub fn calculate_supp_info(
prio_fees_in_block: &mut Vec<u64>,
) -> Option<BlockSupplimentaryInfo> {
) -> Option<PrioritizationFeesInfo> {
if !prio_fees_in_block.is_empty() {
prio_fees_in_block.sort();
let median_index = prio_fees_in_block.len() / 2;
let p75_index = prio_fees_in_block.len() * 75 / 100;
let p90_index = prio_fees_in_block.len() * 90 / 100;
Some(BlockSupplimentaryInfo {
Some(PrioritizationFeesInfo {
p_min: prio_fees_in_block[0],
p_median: prio_fees_in_block[median_index],
p_75: prio_fees_in_block[p75_index],
@ -219,7 +283,6 @@ impl BlockInfo {
pub fn new(
block: &yellowstone_grpc_proto_original::prelude::SubscribeUpdateBlock,
banking_stage_errors: Option<i64>,
) -> BlockInfo {
let block_hash = block.blockhash.clone();
let slot = block.slot;
@ -255,6 +318,7 @@ impl BlockInfo {
let mut readlocked_accounts: HashMap<Pubkey, AccountData> = HashMap::new();
let mut total_cu_requested: u64 = 0;
let mut prio_fees_in_block = vec![];
let mut block_transactions = vec![];
for transaction in &block.transactions {
let Some(tx) = &transaction.transaction else {
continue;
@ -271,6 +335,9 @@ impl BlockInfo {
let Some(meta) = &transaction.meta else {
continue;
};
let signature = Signature::try_from(&tx.signatures[0][0..64])
.unwrap()
.to_string();
let message = VersionedMessage::V0(v0::Message {
header: MessageHeader {
@ -317,7 +384,9 @@ impl BlockInfo {
.collect(),
});
Self::process_versioned_message(
let transaction = Self::process_versioned_message(
signature,
slot,
&message,
&mut prio_fees_in_block,
&mut writelocked_accounts,
@ -325,10 +394,14 @@ impl BlockInfo {
meta.compute_units_consumed.unwrap_or(0),
&mut total_cu_requested,
transaction.is_vote,
meta.err.is_none(),
);
if let Some(transaction) = transaction {
block_transactions.push(transaction);
}
}
let (heavily_writelocked_accounts, heavily_readlocked_accounts) =
let heavily_locked_accounts =
Self::calculate_account_usage(&writelocked_accounts, &readlocked_accounts);
let sup_info = Self::calculate_supp_info(&mut prio_fees_in_block);
@ -339,105 +412,11 @@ impl BlockInfo {
leader_identity,
successful_transactions: successful_transactions as i64,
processed_transactions: processed_transactions as i64,
banking_stage_errors,
total_cu_used,
total_cu_requested: total_cu_requested as i64,
heavily_writelocked_accounts,
heavily_readlocked_accounts,
heavily_locked_accounts,
sup_info,
transactions: block_transactions,
}
}
pub fn _new_from_rpc_block(
slot: Slot,
block: &UiConfirmedBlock,
banking_stage_errors_count: i64,
) -> Option<Self> {
let block_hash = block.blockhash.clone();
let leader_identity = block
.rewards
.as_ref()
.map(|rewards| {
rewards
.iter()
.find(|x| x.reward_type == Some(RewardType::Fee))
.map(|x| x.pubkey.clone())
})
.unwrap_or(None);
let transactions = if let Some(transactions) = &block.transactions {
transactions
} else {
return None;
};
let successful_transactions = transactions
.iter()
.filter(|x| x.meta.as_ref().map(|x| x.err.is_none()).unwrap_or(false))
.count() as u64;
let processed_transactions = transactions.len() as u64;
let total_cu_used = transactions
.iter()
.map(|x| {
x.meta
.as_ref()
.map(|x| match x.compute_units_consumed {
solana_transaction_status::option_serializer::OptionSerializer::Some(x) => {
x
}
solana_transaction_status::option_serializer::OptionSerializer::Skip => 0,
solana_transaction_status::option_serializer::OptionSerializer::None => 0,
})
.unwrap_or(0)
})
.sum::<u64>() as i64;
let mut writelocked_accounts: HashMap<Pubkey, AccountData> = HashMap::new();
let mut readlocked_accounts: HashMap<Pubkey, AccountData> = HashMap::new();
let mut total_cu_requested: u64 = 0;
let mut prio_fees_in_block = vec![];
for transaction in transactions {
let Some(tx) = transaction.transaction.decode() else {
continue;
};
let message = &tx.message;
let Some(meta) = &transaction.meta else {
continue;
};
let is_vote = false;
Self::process_versioned_message(
message,
&mut prio_fees_in_block,
&mut writelocked_accounts,
&mut readlocked_accounts,
match meta.compute_units_consumed {
solana_transaction_status::option_serializer::OptionSerializer::None => 0,
solana_transaction_status::option_serializer::OptionSerializer::Skip => 0,
solana_transaction_status::option_serializer::OptionSerializer::Some(x) => x,
},
&mut total_cu_requested,
is_vote,
);
}
let (heavily_writelocked_accounts, heavily_readlocked_accounts) =
Self::calculate_account_usage(&writelocked_accounts, &readlocked_accounts);
let sup_info = Self::calculate_supp_info(&mut prio_fees_in_block);
Some(BlockInfo {
block_hash,
slot: slot as i64,
leader_identity,
successful_transactions: successful_transactions as i64,
processed_transactions: processed_transactions as i64,
banking_stage_errors: Some(banking_stage_errors_count),
total_cu_used,
total_cu_requested: total_cu_requested as i64,
heavily_writelocked_accounts,
heavily_readlocked_accounts,
sup_info,
})
}
}

View File

@ -14,7 +14,6 @@ use dashmap::DashMap;
use futures::StreamExt;
use log::{debug, error};
use prometheus::{opts, register_int_counter, register_int_gauge, IntCounter, IntGauge};
use solana_sdk::signature::Signature;
use transaction_info::TransactionInfo;
mod block_info;
@ -60,6 +59,7 @@ pub async fn start_tracking_banking_stage_errors(
String,
yellowstone_grpc_proto::geyser::SubscribeRequestFilterSlots,
> = if subscribe_to_slots {
log::info!("subscribing to slots on grpc banking errors");
let mut slot_sub = HashMap::new();
slot_sub.insert(
"slot_sub".to_string(),
@ -93,12 +93,13 @@ pub async fn start_tracking_banking_stage_errors(
let Some(update) = message.update_oneof else {
continue;
};
log::trace!("got banking stage notification");
match update{
yellowstone_grpc_proto::prelude::subscribe_update::UpdateOneof::BankingTransactionErrors(transaction) => {
if transaction.error.is_none() {
continue;
}
// if transaction.error.is_none() {
// continue;
// }
BANKING_STAGE_ERROR_EVENT_COUNT.inc();
let sig = transaction.signature.to_string();
match slot_by_errors.get_mut(&transaction.slot) {
@ -132,6 +133,7 @@ pub async fn start_tracking_banking_stage_errors(
let load_slot = slot.load(std::sync::atomic::Ordering::Relaxed);
if load_slot < s.slot {
// update slot to process updates
// updated slot
slot.store(s.slot, std::sync::atomic::Ordering::Relaxed);
}
},
@ -149,8 +151,6 @@ async fn start_tracking_blocks(
grpc_x_token: Option<String>,
postgres: postgres::Postgres,
slot: Arc<AtomicU64>,
slot_by_errors: Arc<DashMap<u64, u64>>,
map_of_infos: Arc<DashMap<String, BTreeMap<u64, TransactionInfo>>>,
) {
let mut client = yellowstone_grpc_client_original::GeyserGrpcClient::connect(
grpc_block_addr,
@ -227,27 +227,8 @@ async fn start_tracking_blocks(
BANKING_STAGE_BLOCKS_TASK.inc();
let postgres = postgres.clone();
let slot = slot.clone();
let map_of_infos = map_of_infos.clone();
let slot_by_errors = slot_by_errors.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(30)).await;
for transaction in &block.transactions {
let Some(tx) = &transaction.transaction else {
continue;
};
let signature = Signature::try_from(tx.signatures[0].clone())
.unwrap()
.to_string();
if let Some(mut tree) = map_of_infos.get_mut(&signature) {
for (_, tx_info) in tree.iter_mut() {
tx_info.add_transaction(transaction, block.slot);
}
}
}
let banking_stage_error_count =
slot_by_errors.get(&block.slot).map(|x| *x.value() as i64);
let block_info = BlockInfo::new(&block, banking_stage_error_count);
let block_info = BlockInfo::new(&block);
TXERROR_COUNT.add(
block_info.processed_transactions - block_info.successful_transactions,
@ -256,7 +237,6 @@ async fn start_tracking_blocks(
panic!("Error saving block {}", e);
}
slot.store(block.slot, std::sync::atomic::Ordering::Relaxed);
slot_by_errors.remove(&block.slot);
BANKING_STAGE_BLOCKS_TASK.dec();
});
// delay queue so that we get all the banking stage errors before processing block
@ -264,7 +244,7 @@ async fn start_tracking_blocks(
_ => {}
};
}
log::error!("stopping the sidecar, geyser block stream is broken");
log::error!("geyser block stream is broken, retrying");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
}
@ -306,15 +286,7 @@ async fn main() {
})
.collect_vec();
if let Some(gprc_block_addr) = grpc_block_addr {
start_tracking_blocks(
gprc_block_addr,
args.grpc_x_token,
postgres,
slot,
slot_by_errors,
map_of_infos,
)
.await;
start_tracking_blocks(gprc_block_addr, args.grpc_x_token, postgres, slot).await;
}
futures::future::join_all(jhs).await;
}

View File

@ -6,7 +6,6 @@ use std::{
use anyhow::Context;
use base64::Engine;
use chrono::{DateTime, Utc};
use dashmap::DashMap;
use futures::pin_mut;
use itertools::Itertools;
@ -23,10 +22,34 @@ use tokio_postgres::{
Client, CopyInSink, NoTls, Socket,
};
use crate::{block_info::BlockInfo, transaction_info::TransactionInfo};
use crate::{
block_info::{BlockInfo, BlockTransactionInfo},
transaction_info::TransactionInfo,
};
/// Hands out unique staging-table names ("temp_table_1", "temp_table_2", ...)
/// for one postgres session. The counter is atomic, so names stay unique
/// even when concurrent tasks share the session.
pub struct TempTableTracker {
    count: AtomicU64,
}

impl TempTableTracker {
    /// Numbering starts at 1, so the first name issued is `temp_table_1`.
    pub fn new() -> Self {
        Self { count: AtomicU64::new(1) }
    }

    /// Returns the next unused temp-table name and advances the counter.
    pub fn get_new_temp_table(&self) -> String {
        let id = self
            .count
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        format!("temp_table_{}", id)
    }
}
/// One postgres connection plus the per-session counter used to mint
/// unique temp-table names for the bulk-COPY staging pattern below.
pub struct PostgresSession {
client: Client,
temp_table_tracker: TempTableTracker,
}
impl PostgresSession {
@ -62,7 +85,10 @@ impl PostgresSession {
Self::spawn_connection(pg_config, MakeTlsConnector::new(connector)).await?
};
Ok(Self { client })
Ok(Self {
client,
temp_table_tracker: TempTableTracker::new(),
})
}
async fn spawn_connection<T>(
@ -92,29 +118,479 @@ impl PostgresSession {
Ok(client)
}
pub fn _multiline_query(query: &mut String, args: usize, rows: usize, types: &[&str]) {
let mut arg_index = 1usize;
for row in 0..rows {
query.push('(');
/// Removes a staging temp table after a bulk insert has consumed it.
/// "if exists" makes the drop idempotent, so a retried cleanup cannot fail.
pub async fn drop_temp_table(&self, table: String) -> anyhow::Result<()> {
self.client
.execute(format!("drop table if exists {};", table).as_str(), &[])
.await?;
Ok(())
}
for i in 0..args {
if row == 0 && !types.is_empty() {
query.push_str(&format!("(${arg_index})::{}", types[i]));
} else {
query.push_str(&format!("${arg_index}"));
}
arg_index += 1;
if i != (args - 1) {
query.push(',');
}
}
pub async fn create_transaction_ids(&self, signatures: Vec<String>) -> anyhow::Result<()> {
// create temp table
let temp_table = self.temp_table_tracker.get_new_temp_table();
query.push(')');
self.client
.execute(
format!(
r#"
CREATE TEMP TABLE {}(
signature char(88)
);
"#,
temp_table
)
.as_str(),
&[],
)
.await?;
if row != (rows - 1) {
query.push(',');
let statement = format!(
r#"
COPY {}(
signature
) FROM STDIN BINARY
"#,
temp_table
);
let sink: CopyInSink<bytes::Bytes> = self.copy_in(statement.as_str()).await?;
let writer = BinaryCopyInWriter::new(sink, &[Type::TEXT]);
pin_mut!(writer);
for signature in signatures {
writer.as_mut().write(&[&signature]).await?;
}
writer.finish().await?;
let statement = format!(
r#"
INSERT INTO banking_stage_results_2.transactions(signature) SELECT signature from {}
ON CONFLICT DO NOTHING;
"#,
temp_table
);
self.client.execute(statement.as_str(), &[]).await?;
self.drop_temp_table(temp_table).await?;
Ok(())
}
/// Interns account keys into banking_stage_results_2.accounts, assigning
/// acc_ids to keys not seen before. Pattern: stage keys into a fresh temp
/// table via binary COPY, then INSERT ... ON CONFLICT DO NOTHING so
/// already-known keys are skipped.
pub async fn create_accounts_for_transaction(
&self,
accounts: Vec<String>,
) -> anyhow::Result<()> {
// create temp table
let temp_table = self.temp_table_tracker.get_new_temp_table();
self.client
.execute(
format!(
"CREATE TEMP TABLE {}(
key char(44)
);",
temp_table
)
.as_str(),
&[],
)
.await?;
let statement = format!(
r#"
COPY {}(
key
) FROM STDIN BINARY
"#,
temp_table
);
// Stream the keys in via the binary COPY protocol (bulk-load fast path).
let sink: CopyInSink<bytes::Bytes> = self.copy_in(statement.as_str()).await?;
let writer = BinaryCopyInWriter::new(sink, &[Type::TEXT]);
pin_mut!(writer);
for account in accounts {
writer.as_mut().write(&[&account]).await?;
}
writer.finish().await?;
// Merge staged keys into the interning table; duplicates are ignored.
let statement = format!(
r#"
INSERT INTO banking_stage_results_2.accounts(account_key) SELECT key from {}
ON CONFLICT DO NOTHING;
"#,
temp_table
);
self.client.execute(statement.as_str(), &[]).await?;
self.drop_temp_table(temp_table).await?;
Ok(())
}
/// Records per-slot banking-stage error counts for each transaction.
/// Rows are staged (keyed by signature) via binary COPY, then the
/// INSERT...SELECT resolves each signature to its transaction_id through a
/// subquery against the transactions interning table.
pub async fn insert_transaction_in_txslot_table(
&self,
txs: &[TransactionInfo],
) -> anyhow::Result<()> {
let temp_table = self.temp_table_tracker.get_new_temp_table();
self.client
.execute(
format!(
"CREATE TEMP TABLE {}(
sig char(88),
slot BIGINT,
error_code INT,
count INT,
utc_timestamp TIMESTAMP
);",
temp_table
)
.as_str(),
&[],
)
.await?;
let statement = format!(
r#"
COPY {}(
sig, slot, error_code, count, utc_timestamp
) FROM STDIN BINARY
"#,
temp_table
);
let sink: CopyInSink<bytes::Bytes> = self.copy_in(statement.as_str()).await?;
// Field types must line up positionally with the COPY column list above.
let writer = BinaryCopyInWriter::new(
sink,
&[
Type::TEXT,
Type::INT8,
Type::INT4,
Type::INT4,
Type::TIMESTAMP,
],
);
pin_mut!(writer);
// One staged row per (transaction, error) pair.
for tx in txs {
let slot: i64 = tx.slot as i64;
for (error, count) in &tx.errors {
let error_code = error.to_int();
let count = *count as i32;
let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(5);
// NOTE(review): naive_local() on a UTC timestamp — for DateTime<Utc>
// this should equal naive_utc(), but confirm tx.utc_timestamp's type.
let timestamp = tx.utc_timestamp.naive_local();
args.push(&tx.signature);
args.push(&slot);
args.push(&error_code);
args.push(&count);
args.push(&timestamp);
writer.as_mut().write(&args).await?;
}
}
writer.finish().await?;
// Resolve signatures to transaction_ids and merge; duplicates ignored.
let statement = format!(
r#"
INSERT INTO banking_stage_results_2.transaction_slot(transaction_id, slot, error_code, count, utc_timestamp)
SELECT ( select transaction_id from banking_stage_results_2.transactions where signature = t.sig ), t.slot, t.error_code, t.count, t.utc_timestamp
FROM (
SELECT sig, slot, error_code, count, utc_timestamp from {}
)
as t (sig, slot, error_code, count, utc_timestamp) ON CONFLICT DO NOTHING;
"#,
temp_table
);
self.client.execute(statement.as_str(), &[]).await?;
self.drop_temp_table(temp_table).await?;
Ok(())
}
/// Links accounts to the transactions that used them (with writable/signer
/// flags). Rows are staged by (account_key, signature) via binary COPY; the
/// INSERT...SELECT resolves both to acc_id / transaction_id via subqueries
/// against the two interning tables, ignoring duplicates.
pub async fn insert_accounts_for_transaction(
&self,
accounts_for_transaction: Vec<AccountsForTransaction>,
) -> anyhow::Result<()> {
let temp_table = self.temp_table_tracker.get_new_temp_table();
self.client
.execute(
format!(
"CREATE TEMP TABLE {}(
account_key char(44),
signature char(88),
is_writable BOOL,
is_signer BOOL
);",
temp_table
)
.as_str(),
&[],
)
.await?;
let statement = format!(
r#"
COPY {}(
account_key, signature, is_writable, is_signer
) FROM STDIN BINARY
"#,
temp_table
);
let sink: CopyInSink<bytes::Bytes> = self.copy_in(statement.as_str()).await?;
let writer =
BinaryCopyInWriter::new(sink, &[Type::TEXT, Type::TEXT, Type::BOOL, Type::BOOL]);
pin_mut!(writer);
// One staged row per (transaction, account) pair.
for acc_tx in accounts_for_transaction {
for acc in &acc_tx.accounts {
let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(4);
args.push(&acc.key);
args.push(&acc_tx.signature);
args.push(&acc.writable);
args.push(&acc.is_signer);
writer.as_mut().write(&args).await?;
}
}
writer.finish().await?;
let statement = format!(
r#"
INSERT INTO banking_stage_results_2.accounts_map_transaction(acc_id, transaction_id, is_writable, is_signer)
SELECT
( select acc_id from banking_stage_results_2.accounts where account_key = t.account_key ),
( select transaction_id from banking_stage_results_2.transactions where signature = t.signature ),
t.is_writable,
t.is_signer
FROM (
SELECT account_key, signature, is_writable, is_signer from {}
)
as t (account_key, signature, is_writable, is_signer)
ON CONFLICT DO NOTHING;
"#,
temp_table
);
self.client.execute(statement.as_str(), &[]).await?;
self.drop_temp_table(temp_table).await?;
Ok(())
}
/// Bulk-saves per-transaction execution results for one block into
/// transaction_infos. All rows share the caller-supplied `slot` (stored as
/// processed_slot); signatures are staged via binary COPY and resolved to
/// transaction_ids by the INSERT...SELECT subquery.
pub async fn insert_transactions_for_block(
&self,
transactions: &Vec<BlockTransactionInfo>,
slot: i64,
) -> anyhow::Result<()> {
let temp_table = self.temp_table_tracker.get_new_temp_table();
self.client
.execute(
format!(
"CREATE TEMP TABLE {}(
signature char(88),
processed_slot BIGINT,
is_successful BOOL,
cu_requested BIGINT,
cu_consumed BIGINT,
prioritization_fees BIGINT,
supp_infos text
);",
temp_table
)
.as_str(),
&[],
)
.await?;
let statement = format!(
r#"
COPY {}(
signature,
processed_slot,
is_successful,
cu_requested,
cu_consumed,
prioritization_fees,
supp_infos
) FROM STDIN BINARY
"#,
temp_table
);
let sink: CopyInSink<bytes::Bytes> = self.copy_in(statement.as_str()).await?;
// Field types must line up positionally with the COPY column list above.
let writer = BinaryCopyInWriter::new(
sink,
&[
Type::TEXT,
Type::INT8,
Type::BOOL,
Type::INT8,
Type::INT8,
Type::INT8,
Type::TEXT,
],
);
pin_mut!(writer);
for transaction in transactions {
let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(7);
args.push(&transaction.signature);
// Note: the block's slot is used, not transaction.processed_slot.
args.push(&slot);
args.push(&transaction.is_successful);
args.push(&transaction.cu_requested);
args.push(&transaction.cu_consumed);
args.push(&transaction.prioritization_fees);
args.push(&transaction.supp_infos);
writer.as_mut().write(&args).await?;
}
writer.finish().await?;
// Resolve signatures to transaction_ids and merge; duplicates ignored.
let statement = format!(
r#"
INSERT INTO banking_stage_results_2.transaction_infos
(transaction_id, processed_slot, is_successful, cu_requested, cu_consumed, prioritization_fees, supp_infos)
SELECT
( select transaction_id from banking_stage_results_2.transactions where signature = t.signature ),
t.processed_slot,
t.is_successful,
t.cu_requested,
t.cu_consumed,
t.prioritization_fees,
t.supp_infos
FROM (
SELECT signature, processed_slot, is_successful, cu_requested, cu_consumed, prioritization_fees, supp_infos from {}
)
as t (signature, processed_slot, is_successful, cu_requested, cu_consumed, prioritization_fees, supp_infos)
ON CONFLICT DO NOTHING;
"#,
temp_table
);
self.client.execute(statement.as_str(), &[]).await?;
self.drop_temp_table(temp_table).await?;
Ok(())
}
/// Persists the per-account usage aggregates of one block
/// (block_info.heavily_locked_accounts) into accounts_map_blocks. Rows are
/// staged by account_key via binary COPY; the INSERT...SELECT resolves keys
/// to acc_ids through the accounts interning table.
pub async fn save_account_usage_in_block(&self, block_info: &BlockInfo) -> anyhow::Result<()> {
let temp_table = self.temp_table_tracker.get_new_temp_table();
self.client
.execute(
format!(
"CREATE TEMP TABLE {}(
account_key CHAR(44),
slot BIGINT,
is_write_locked BOOL,
total_cu_requested BIGINT,
total_cu_consumed BIGINT,
prioritization_fees_info text
);",
temp_table
)
.as_str(),
&[],
)
.await?;
let statement = format!(
r#"
COPY {}(
account_key,
slot,
is_write_locked,
total_cu_requested,
total_cu_consumed,
prioritization_fees_info
) FROM STDIN BINARY
"#,
temp_table
);
let sink: CopyInSink<bytes::Bytes> = self.copy_in(statement.as_str()).await?;
// Field types must line up positionally with the COPY column list above.
let writer = BinaryCopyInWriter::new(
sink,
&[
Type::TEXT,
Type::INT8,
Type::BOOL,
Type::INT8,
Type::INT8,
Type::TEXT,
],
);
pin_mut!(writer);
for account_usage in block_info.heavily_locked_accounts.iter() {
let is_writable = account_usage.is_write_locked;
let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(6);
// Fee statistics are stored as a JSON string in a text column.
let pf_json = serde_json::to_string(&account_usage.prioritization_fee_data)?;
args.push(&account_usage.key);
args.push(&block_info.slot);
args.push(&is_writable);
args.push(&account_usage.cu_requested);
args.push(&account_usage.cu_consumed);
args.push(&pf_json);
writer.as_mut().write(&args).await?;
}
writer.finish().await?;
// Resolve account keys to acc_ids and merge; duplicates ignored.
let statement = format!(
r#"
INSERT INTO banking_stage_results_2.accounts_map_blocks
( acc_id,
slot,
is_write_locked,
total_cu_requested,
total_cu_consumed,
prioritization_fees_info
)
SELECT
( select acc_id from banking_stage_results_2.accounts where account_key = t.account_key ),
t.slot,
t.is_write_locked,
t.total_cu_requested,
t.total_cu_consumed,
t.prioritization_fees_info
FROM (
SELECT account_key,
slot,
is_write_locked,
total_cu_requested,
total_cu_consumed,
prioritization_fees_info from {}
)
as t (account_key,
slot,
is_write_locked,
total_cu_requested,
total_cu_consumed,
prioritization_fees_info
)
ON CONFLICT DO NOTHING;
"#,
temp_table
);
self.client.execute(statement.as_str(), &[]).await?;
self.drop_temp_table(temp_table).await?;
Ok(())
}
/// Inserts the aggregate row for one block into banking_stage_results_2.blocks.
/// Only the aggregate columns are written here; the schema's remaining
/// nullable columns (banking_stage_errors, heavily_*_accounts) are left NULL.
pub async fn save_block_info(&self, block_info: &BlockInfo) -> anyhow::Result<()> {
let statement = r#"
INSERT INTO banking_stage_results_2.blocks (
slot,
block_hash,
leader_identity,
successful_transactions,
processed_transactions,
total_cu_used,
total_cu_requested,
supp_infos
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8);
"#;
self.client
.execute(
statement,
&[
&block_info.slot,
&block_info.block_hash,
// Missing leader becomes an empty string rather than NULL.
&block_info.leader_identity.clone().unwrap_or_default(),
&block_info.successful_transactions,
&block_info.processed_transactions,
&block_info.total_cu_used,
&block_info.total_cu_requested,
// Prioritization-fee summary serialized as JSON text.
&serde_json::to_string(&block_info.sup_info)?,
],
)
.await?;
Ok(())
}
/// Persist a batch of banking-stage transaction outcomes.
pub async fn save_banking_transaction_results(
// NOTE(review): the body below interleaves two generations of this method
// from a diff hunk — the removed binary-COPY path into
// `banking_stage_results.transaction_infos` and the added normalized path
// into the `banking_stage_results_2` tables appear back to back. Only one
// can be live in the real file; confirm against the repository.
@ -124,49 +600,40 @@ impl PostgresSession {
    // Nothing to persist for an empty batch.
    if txs.is_empty() {
        return Ok(());
    }
    const NUMBER_OF_ARGS: usize = 10;
    // Flatten the in-memory records into Postgres-ready row structs.
    let txs: Vec<PostgresTransactionInfo> =
        txs.iter().map(PostgresTransactionInfo::from).collect();
    // Old path: stream denormalized rows via the binary COPY protocol.
    let statement = r#"
COPY banking_stage_results.transaction_infos(
signature, errors, is_executed, is_confirmed, first_notification_slot, cu_requested, prioritization_fees, utc_timestamp, accounts_used, processed_slot
) FROM STDIN BINARY
"#;
    let sink: CopyInSink<bytes::Bytes> = self.copy_in(statement).await?;
    // Column types must match the COPY column list above, in order.
    let writer = BinaryCopyInWriter::new(
        sink,
        &[
            Type::TEXT,
            Type::TEXT,
            Type::BOOL,
            Type::BOOL,
            Type::INT8,
            Type::INT8,
            Type::INT8,
            Type::TIMESTAMPTZ,
            Type::TEXT,
            Type::INT8,
        ],
    );
    pin_mut!(writer);
    // One binary row per transaction, pushed in column order.
    for tx in txs.iter() {
        let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(NUMBER_OF_ARGS);
        args.push(&tx.signature);
        args.push(&tx.errors);
        args.push(&tx.is_executed);
        args.push(&tx.is_confirmed);
        args.push(&tx.first_notification_slot);
        args.push(&tx.cu_requested);
        args.push(&tx.prioritization_fees);
        args.push(&tx.utc_timestamp);
        args.push(&tx.accounts_used);
        args.push(&tx.processed_slot);
        writer.as_mut().write(&args).await?;
    }
    writer.finish().await?;
    // New path: resolve/create surrogate ids, then insert relational rows.
    // create transaction ids
    let signatures = txs
        .iter()
        .map(|transaction| transaction.signature.clone())
        .collect_vec();
    self.create_transaction_ids(signatures).await?;
    // create account ids
    let accounts = txs
        .iter()
        .map(|transaction| transaction.account_used.clone())
        .flatten()
        .map(|(acc, _)| acc)
        .collect_vec();
    self.create_accounts_for_transaction(accounts).await?;
    // add transaction in tx slot table
    self.insert_transaction_in_txslot_table(&txs.as_slice())
        .await?;
    // Pair each signature with its account list; the writable flag comes
    // from the stream, the signer flag is unknown here so always false.
    let txs_accounts = txs
        .iter()
        .map(|tx| AccountsForTransaction {
            signature: tx.signature.clone(),
            accounts: tx
                .account_used
                .iter()
                .map(|(key, is_writable)| AccountUsed {
                    key: key.clone(),
                    writable: *is_writable,
                    is_signer: false,
                })
                .collect(),
        })
        .collect_vec();
    // insert accounts for transaction
    self.insert_accounts_for_transaction(txs_accounts).await?;
    Ok(())
}
@ -180,51 +647,46 @@ impl PostgresSession {
}
/// Persist a full block: surrogate ids, per-transaction rows, account
/// usage, and the per-block info row.
pub async fn save_block(&self, block_info: BlockInfo) -> anyhow::Result<()> {
// NOTE(review): the body below interleaves the removed (manual arg
// building + binary COPY into `banking_stage_results.blocks`) and added
// (helper-based saves into `banking_stage_results_2`) generations of this
// method from a diff — confirm against the repository which is live.
    const NUMBER_OF_ARGS: usize = 11;
    let mut args: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(NUMBER_OF_ARGS);
    args.push(&block_info.block_hash);
    args.push(&block_info.slot);
    args.push(&block_info.leader_identity);
    args.push(&block_info.successful_transactions);
    args.push(&block_info.banking_stage_errors);
    args.push(&block_info.processed_transactions);
    args.push(&block_info.total_cu_used);
    args.push(&block_info.total_cu_requested);
    // Serialization failures degrade to empty strings rather than erroring.
    let heavily_writelocked_accounts =
        serde_json::to_string(&block_info.heavily_writelocked_accounts).unwrap_or_default();
    let heavily_readlocked_accounts =
        serde_json::to_string(&block_info.heavily_readlocked_accounts).unwrap_or_default();
    args.push(&heavily_writelocked_accounts);
    args.push(&heavily_readlocked_accounts);
    // create transaction ids
    let signatures = block_info
        .transactions
        .iter()
        .map(|transaction| transaction.signature.clone())
        .collect_vec();
    self.create_transaction_ids(signatures).await?;
    // create account ids
    let accounts = block_info
        .heavily_locked_accounts
        .iter()
        .map(|acc| acc.key.clone())
        .collect_vec();
    self.create_accounts_for_transaction(accounts).await?;
    let supp_infos = serde_json::to_string(&block_info.sup_info).unwrap_or_default();
    args.push(&supp_infos);
    // Account usage per transaction, including signer/writable flags.
    let txs_accounts = block_info
        .transactions
        .iter()
        .map(|tx| AccountsForTransaction {
            signature: tx.signature.clone(),
            accounts: tx
                .accounts
                .iter()
                .map(|acc| AccountUsed {
                    key: acc.key.clone(),
                    writable: acc.is_writable,
                    is_signer: acc.is_signer,
                })
                .collect(),
        })
        .collect_vec();
    self.insert_accounts_for_transaction(txs_accounts).await?;
    // Old path: binary COPY into the denormalized blocks table.
    let statement = r#"
COPY banking_stage_results.blocks(
block_hash, slot, leader_identity, successful_transactions, banking_stage_errors, processed_transactions, total_cu_used, total_cu_requested, heavily_writelocked_accounts, heavily_readlocked_accounts, supp_infos
) FROM STDIN BINARY
"#;
    // NOTE(review): `.unwrap()` here (unlike `?` elsewhere) panics on a
    // failed COPY setup.
    let sink: CopyInSink<bytes::Bytes> = self.copy_in(statement).await.unwrap();
    // Column types must match the COPY column list above, in order.
    let writer = BinaryCopyInWriter::new(
        sink,
        &[
            Type::TEXT,
            Type::INT8,
            Type::TEXT,
            Type::INT8,
            Type::INT8,
            Type::INT8,
            Type::INT8,
            Type::INT8,
            Type::TEXT,
            Type::TEXT,
            Type::TEXT,
        ],
    );
    pin_mut!(writer);
    writer.as_mut().write(&args).await?;
    writer.finish().await?;
    // save transactions in block
    self.insert_transactions_for_block(&block_info.transactions, block_info.slot)
        .await?;
    // save account usage in blocks
    self.save_account_usage_in_block(&block_info).await?;
    self.save_block_info(&block_info).await?;
    Ok(())
}
@ -269,13 +731,13 @@ impl Postgres {
.map(|(_, tree)| tree.iter().map(|(_, info)| info).cloned().collect_vec())
.flatten()
.collect_vec();
let batches = data.chunks(32).collect_vec();
let batches = data.chunks(1024).collect_vec();
for batch in batches {
if let Err(e) = session
if let Err(err) = session
.save_banking_transaction_results(batch.to_vec())
.await
{
panic!("saving transaction infos failed {}", e);
panic!("Error saving transaction infos {}", err);
}
}
}
@ -288,62 +750,19 @@ impl Postgres {
}
}
/// Flattened, Postgres-ready projection of a `TransactionInfo`
/// (denormalized schema: one row per signature).
pub struct PostgresTransactionInfo {
    // Base-58 transaction signature.
    pub signature: String,
    // JSON-serialized list of `TransactionErrorData`.
    pub errors: String,
    pub is_executed: bool,
    pub is_confirmed: bool,
    // Slot of the first banking-stage notification, widened to signed BIGINT.
    pub first_notification_slot: i64,
    pub cu_requested: Option<i64>,
    pub prioritization_fees: Option<i64>,
    pub utc_timestamp: DateTime<Utc>,
    // JSON-serialized list of `AccountUsed`.
    pub accounts_used: String,
    // Slot the transaction was processed in, if known.
    pub processed_slot: Option<i64>,
}
/// Serializable (error, slot, count) triple produced when flattening the
/// per-transaction error map to JSON.
#[derive(Serialize, Clone)]
pub struct TransactionErrorData {
    error: TransactionError,
    slot: u64,
    // Number of times this error was observed.
    count: usize,
}
/// One account referenced by a transaction, with its lock and signer flags;
/// serialized into the per-transaction accounts payload.
#[derive(Serialize, Clone)]
pub struct AccountUsed {
    key: String,
    writable: bool,
    is_signer: bool,
}
impl From<&TransactionInfo> for PostgresTransactionInfo {
    /// Flatten an in-memory record into the JSON-string columns of the
    /// denormalized Postgres row.
    fn from(value: &TransactionInfo) -> Self {
        // One serializable entry per distinct (error, slot) key.
        let errors = value
            .errors
            .iter()
            .map(|(key, count)| TransactionErrorData {
                error: key.error.clone(),
                slot: key.slot,
                count: *count,
            })
            .collect_vec();
        // NOTE(review): this literal omits `is_signer`, which the
        // `AccountUsed` struct above declares — this span appears to be the
        // pre-change side of a diff; confirm against the final file.
        let accounts_used = value
            .account_used
            .iter()
            .map(|(key, writable)| AccountUsed {
                key: key.to_string(),
                writable: *writable,
            })
            .collect_vec();
        Self {
            signature: value.signature.clone(),
            // JSON-encoding failures degrade to empty strings.
            errors: serde_json::to_string(&errors).unwrap_or_default(),
            is_executed: value.is_executed,
            is_confirmed: value.is_confirmed,
            cu_requested: value.cu_requested.map(|x| x as i64),
            first_notification_slot: value.first_notification_slot as i64,
            prioritization_fees: value.prioritization_fees.map(|x| x as i64),
            utc_timestamp: value.utc_timestamp,
            accounts_used: serde_json::to_string(&accounts_used).unwrap_or_default(),
            processed_slot: value.processed_slot.map(|x| x as i64),
        }
    }
    // NOTE(review): the closing brace of this `impl` block is elided in this
    // diff view.
/// A signature paired with the full set of accounts it referenced; the input
/// shape consumed by `insert_accounts_for_transaction`.
pub struct AccountsForTransaction {
    pub signature: String,
    pub accounts: Vec<AccountUsed>,
}

View File

@ -2,18 +2,7 @@ use std::{collections::HashMap, hash::Hash};
use chrono::{DateTime, Utc};
use itertools::Itertools;
use solana_sdk::{
borsh0_10::try_from_slice_unchecked,
compute_budget::{self, ComputeBudgetInstruction},
instruction::CompiledInstruction,
message::{
v0::{self, MessageAddressTableLookup},
MessageHeader, VersionedMessage,
},
pubkey::Pubkey,
slot_history::Slot,
transaction::{TransactionError, VersionedTransaction},
};
use solana_sdk::transaction::TransactionError;
use yellowstone_grpc_proto::prelude::SubscribeUpdateBankingTransactionResults;
fn convert_transaction_error_into_int(error: &TransactionError) -> u8 {
@ -60,20 +49,24 @@ fn convert_transaction_error_into_int(error: &TransactionError) -> u8 {
/// Key used to aggregate identical transaction errors in the error map.
#[derive(Clone, PartialEq)]
pub struct ErrorKey {
    pub error: TransactionError,
    // NOTE(review): `slot` appears here as a removed diff line — the
    // Hash/ToString impls below reference it inconsistently; confirm which
    // generation of this struct is live.
    pub slot: Slot,
}
impl ToString for ErrorKey {
    fn to_string(&self) -> String {
        // NOTE(review): both the old ("error-slot") and new ("error only")
        // return expressions appear back to back — diff residue; only one
        // can be live in the real file.
        self.error.to_string() + "-" + self.slot.to_string().as_str()
        self.error.to_string()
    }
}
impl Hash for ErrorKey {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        // Hash the stable integer error code rather than the error value.
        let tmp = convert_transaction_error_into_int(&self.error);
        // NOTE(review): the old (code + slot) and new (code only) hashing
        // statements both appear below — diff residue.
        tmp.hash(state);
        self.slot.hash(state);
        tmp.hash(state)
    }
}
impl ErrorKey {
pub fn to_int(&self) -> i32 {
convert_transaction_error_into_int(&self.error) as i32
}
}
@ -83,30 +76,21 @@ impl Eq for ErrorKey {}
/// In-memory aggregate of everything observed about one transaction across
/// banking-stage notifications and block data.
pub struct TransactionInfo {
    pub signature: String,
    // Occurrence count per distinct error key.
    pub errors: HashMap<ErrorKey, usize>,
    // NOTE(review): old-generation fields (is_executed, is_confirmed,
    // first_notification_slot, cu_requested, prioritization_fees,
    // processed_slot) and the new `slot` field all appear here — this span
    // mixes both sides of a diff; confirm the live field set.
    pub is_executed: bool,
    pub is_confirmed: bool,
    pub first_notification_slot: u64,
    pub cu_requested: Option<u64>,
    pub prioritization_fees: Option<u64>,
    pub slot: u64,
    pub utc_timestamp: DateTime<Utc>,
    // (pubkey, is_writable) pairs for accounts referenced by the tx.
    pub account_used: Vec<(String, bool)>,
    pub processed_slot: Option<Slot>,
}
impl TransactionInfo {
    /// Build a fresh record from the first banking-stage notification for a
    /// signature, seeding the error map if the notification carried an error.
    pub fn new(notification: &SubscribeUpdateBankingTransactionResults) -> Self {
        let mut errors = HashMap::new();
        // No error in the notification means the transaction executed.
        let is_executed = notification.error.is_none();
        // Get time
        let utc_timestamp = Utc::now();
        match &notification.error {
            Some(e) => {
                // Panics if the error payload fails bincode decoding.
                let error: TransactionError = bincode::deserialize(&e.err).unwrap();
                // NOTE(review): both the old (with slot) and new `ErrorKey`
                // constructions appear back to back — diff residue.
                let key = ErrorKey {
                    error,
                    slot: notification.slot,
                };
                let key = ErrorKey { error };
                errors.insert(key, 1);
            }
            None => {}
@ -120,23 +104,17 @@ impl TransactionInfo {
        // NOTE(review): the construction of `account_used` (and the close of
        // the match above) is elided by the diff hunk marker.
        Self {
            signature: notification.signature.clone(),
            errors,
            is_executed,
            is_confirmed: false,
            first_notification_slot: notification.slot,
            cu_requested: None,
            prioritization_fees: None,
            slot: notification.slot,
            utc_timestamp,
            account_used,
            processed_slot: None,
        }
    }
/// Fold a subsequent banking-stage notification into this record,
/// incrementing the count for a repeated error.
pub fn add_notification(&mut self, notification: &SubscribeUpdateBankingTransactionResults) {
    match &notification.error {
        Some(error) => {
            let slot = notification.slot;
            // Panics if the error payload fails bincode decoding.
            let error: TransactionError = bincode::deserialize(&error.err).unwrap();
            // NOTE(review): old (with slot) and new key constructions both
            // appear back to back — diff residue.
            let key = ErrorKey { error, slot };
            let key = ErrorKey { error };
            match self.errors.get_mut(&key) {
                Some(x) => {
                    *x += 1;
@ -149,205 +127,4 @@ impl TransactionInfo {
            // NOTE(review): the insert-if-absent arm is elided by the diff
            // hunk marker above.
        None => {}
    }
}
/// Enrich this record from a confirmed RPC transaction: extract the
/// requested compute units and prioritization fee from any compute-budget
/// instructions, then mark the transaction executed/confirmed at `slot`.
pub fn add_rpc_transaction(&mut self, slot: u64, transaction: &VersionedTransaction) {
    let message = &transaction.message;
    // Legacy `RequestUnitsDeprecated` carries a unit limit plus an
    // additional fee; derive a per-unit price from the pair.
    let legacy_compute_budget: Option<(u32, Option<u64>)> =
        message.instructions().iter().find_map(|i| {
            if i.program_id(message.static_account_keys())
                .eq(&compute_budget::id())
            {
                if let Ok(ComputeBudgetInstruction::RequestUnitsDeprecated {
                    units,
                    additional_fee,
                }) = try_from_slice_unchecked(i.data.as_slice())
                {
                    if additional_fee > 0 {
                        // Widen to u64 BEFORE multiplying: `units * 1000`
                        // was previously computed in u32 and could overflow
                        // (panic in debug, wrap in release) for large unit
                        // requests.
                        return Some((
                            units,
                            Some((units as u64 * 1000) / additional_fee as u64),
                        ));
                    } else {
                        return Some((units, None));
                    }
                }
            }
            None
        });
    let legacy_cu_requested = legacy_compute_budget.map(|x| x.0);
    // `and_then` flattens the nested Option (was `map(..).unwrap_or(None)`).
    let legacy_prioritization_fees = legacy_compute_budget.and_then(|x| x.1);
    // Modern `SetComputeUnitLimit` takes precedence over the deprecated form.
    let cu_requested = message
        .instructions()
        .iter()
        .find_map(|i| {
            if i.program_id(message.static_account_keys())
                .eq(&compute_budget::id())
            {
                if let Ok(ComputeBudgetInstruction::SetComputeUnitLimit(limit)) =
                    try_from_slice_unchecked(i.data.as_slice())
                {
                    return Some(limit);
                }
            }
            None
        })
        .or(legacy_cu_requested);
    // Modern `SetComputeUnitPrice` takes precedence over the derived price.
    let prioritization_fees = message
        .instructions()
        .iter()
        .find_map(|i| {
            if i.program_id(message.static_account_keys())
                .eq(&compute_budget::id())
            {
                if let Ok(ComputeBudgetInstruction::SetComputeUnitPrice(price)) =
                    try_from_slice_unchecked(i.data.as_slice())
                {
                    return Some(price);
                }
            }
            None
        })
        .or(legacy_prioritization_fees);
    // Only overwrite when the transaction actually carried the values.
    if let Some(cu_requested) = cu_requested {
        self.cu_requested = Some(cu_requested as u64);
    }
    if let Some(prioritization_fees) = prioritization_fees {
        self.prioritization_fees = Some(prioritization_fees);
    }
    // Seen via RPC => it executed and was confirmed in this slot.
    self.is_confirmed = true;
    self.is_executed = true;
    self.processed_slot = Some(slot);
}
/// Enrich this record from a gRPC block-stream transaction: rebuild a
/// `VersionedMessage` from the protobuf payload, extract compute-budget
/// data, and mark the transaction processed at `slot`.
pub fn add_transaction(
    &mut self,
    transaction: &yellowstone_grpc_proto_original::prelude::SubscribeUpdateTransactionInfo,
    slot: Slot,
) {
    // Bail out quietly if any layer of the protobuf payload is missing.
    let Some(transaction) = &transaction.transaction else {
        return;
    };
    let Some(message) = &transaction.message else {
        return;
    };
    let Some(header) = &message.header else {
        return;
    };
    // Reconstruct a v0 message from raw protobuf bytes so the
    // compute-budget instructions can be parsed with solana-sdk types.
    let message = VersionedMessage::V0(v0::Message {
        header: MessageHeader {
            num_required_signatures: header.num_required_signatures as u8,
            num_readonly_signed_accounts: header.num_readonly_signed_accounts as u8,
            num_readonly_unsigned_accounts: header.num_readonly_unsigned_accounts as u8,
        },
        account_keys: message
            .account_keys
            .clone()
            .into_iter()
            .map(|key| {
                // Malformed keys degrade to the default pubkey instead of
                // panicking.
                let bytes: [u8; 32] = key.try_into().unwrap_or(Pubkey::default().to_bytes());
                Pubkey::new_from_array(bytes)
            })
            .collect(),
        // NOTE(review): `Hash::new` panics if the blockhash is not exactly
        // 32 bytes — assumed well-formed from the stream.
        recent_blockhash: solana_sdk::hash::Hash::new(&message.recent_blockhash),
        instructions: message
            .instructions
            .clone()
            .into_iter()
            .map(|ix| CompiledInstruction {
                program_id_index: ix.program_id_index as u8,
                accounts: ix.accounts,
                data: ix.data,
            })
            .collect(),
        address_table_lookups: message
            .address_table_lookups
            .clone()
            .into_iter()
            .map(|table| {
                let bytes: [u8; 32] = table
                    .account_key
                    .try_into()
                    .unwrap_or(Pubkey::default().to_bytes());
                MessageAddressTableLookup {
                    account_key: Pubkey::new_from_array(bytes),
                    writable_indexes: table.writable_indexes,
                    readonly_indexes: table.readonly_indexes,
                }
            })
            .collect(),
    });
    // Legacy `RequestUnitsDeprecated` carries a unit limit plus an
    // additional fee; derive a per-unit price from the pair.
    let legacy_compute_budget: Option<(u32, Option<u64>)> =
        message.instructions().iter().find_map(|i| {
            if i.program_id(message.static_account_keys())
                .eq(&compute_budget::id())
            {
                if let Ok(ComputeBudgetInstruction::RequestUnitsDeprecated {
                    units,
                    additional_fee,
                }) = try_from_slice_unchecked(i.data.as_slice())
                {
                    if additional_fee > 0 {
                        // Widen to u64 BEFORE multiplying: `units * 1000`
                        // was previously computed in u32 and could overflow
                        // (panic in debug, wrap in release) for large unit
                        // requests.
                        return Some((
                            units,
                            Some((units as u64 * 1000) / additional_fee as u64),
                        ));
                    } else {
                        return Some((units, None));
                    }
                }
            }
            None
        });
    let legacy_cu_requested = legacy_compute_budget.map(|x| x.0);
    // `and_then` flattens the nested Option (was `map(..).unwrap_or(None)`).
    let legacy_prioritization_fees = legacy_compute_budget.and_then(|x| x.1);
    // Modern `SetComputeUnitLimit` takes precedence over the deprecated form.
    let cu_requested = message
        .instructions()
        .iter()
        .find_map(|i| {
            if i.program_id(message.static_account_keys())
                .eq(&compute_budget::id())
            {
                if let Ok(ComputeBudgetInstruction::SetComputeUnitLimit(limit)) =
                    try_from_slice_unchecked(i.data.as_slice())
                {
                    return Some(limit);
                }
            }
            None
        })
        .or(legacy_cu_requested);
    // Modern `SetComputeUnitPrice` takes precedence over the derived price.
    let prioritization_fees = message
        .instructions()
        .iter()
        .find_map(|i| {
            if i.program_id(message.static_account_keys())
                .eq(&compute_budget::id())
            {
                if let Ok(ComputeBudgetInstruction::SetComputeUnitPrice(price)) =
                    try_from_slice_unchecked(i.data.as_slice())
                {
                    return Some(price);
                }
            }
            None
        })
        .or(legacy_prioritization_fees);
    // Only overwrite when the transaction actually carried the values.
    if let Some(cu_requested) = cu_requested {
        self.cu_requested = Some(cu_requested as u64);
    }
    if let Some(prioritization_fees) = prioritization_fees {
        self.prioritization_fees = Some(prioritization_fees);
    }
    // Present in a confirmed block => executed and confirmed in this slot.
    self.is_confirmed = true;
    self.is_executed = true;
    self.processed_slot = Some(slot);
}
}