cargo fmt and minor changes

This commit is contained in:
godmodegalactus 2024-01-26 11:46:39 +01:00
parent 2b16651f2b
commit edcdfa85d9
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
3 changed files with 111 additions and 104 deletions

View File

@ -1,3 +1,4 @@
use crate::block_info::TransactionAccount;
use dashmap::DashMap;
use itertools::Itertools;
use prometheus::{opts, register_int_gauge, IntGauge};
@ -6,7 +7,6 @@ use solana_address_lookup_table_program::state::AddressLookupTable;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::{account::ReadableAccount, commitment_config::CommitmentConfig, pubkey::Pubkey};
use std::{collections::HashSet, sync::Arc, time::Duration};
use crate::block_info::TransactionAccount;
use tokio::sync::RwLock;
lazy_static::lazy_static! {
@ -34,10 +34,10 @@ impl ALTStore {
let alts_list = {
let lk = self.under_loading.read().await;
alts_list
.iter()
.filter(|x| !self.map.contains_key(x) && !lk.contains(x))
.cloned()
.collect_vec()
.iter()
.filter(|x| !self.map.contains_key(x) && !lk.contains(x))
.cloned()
.collect_vec()
};
if alts_list.is_empty() {
@ -54,12 +54,14 @@ impl ALTStore {
let rpc_client = self.rpc_client.clone();
let this = self.clone();
tokio::spawn(async move {
if let Ok(Ok(multiple_accounts)) = tokio::time::timeout( Duration::from_secs(30), rpc_client
.get_multiple_accounts_with_commitment(
if let Ok(Ok(multiple_accounts)) = tokio::time::timeout(
Duration::from_secs(30),
rpc_client.get_multiple_accounts_with_commitment(
&batch,
CommitmentConfig::processed(),
))
.await
),
)
.await
{
for (index, acc) in multiple_accounts.value.iter().enumerate() {
if let Some(acc) = acc {
@ -79,7 +81,6 @@ impl ALTStore {
self.finished_loading(&alts_list).await;
ALTS_IN_STORE.set(alts_list.len() as i64);
}
pub fn save_account(&self, address: &Pubkey, data: &[u8]) {
@ -164,9 +165,8 @@ impl ALTStore {
write_accounts: &Vec<u8>,
read_account: &Vec<u8>,
) -> Vec<TransactionAccount> {
let mut times = 0;
const MAX_TIMES_RETRY : usize = 100;
const MAX_TIMES_RETRY: usize = 100;
while self.is_loading_contains(alt).await {
if times > MAX_TIMES_RETRY {
break;
@ -203,14 +203,14 @@ impl ALTStore {
}
}
async fn is_loading(&self, alts_list: &Vec<Pubkey>) {
async fn is_loading(&self, alts_list: &Vec<Pubkey>) {
let mut write = self.under_loading.write().await;
for alt in alts_list {
write.insert(alt.clone());
}
}
async fn finished_loading(&self, alts_list: &Vec<Pubkey>) {
async fn finished_loading(&self, alts_list: &Vec<Pubkey>) {
let mut write = self.under_loading.write().await;
for alt in alts_list {
write.remove(alt);
@ -225,14 +225,15 @@ impl ALTStore {
#[derive(Serialize, Deserialize)]
pub struct BinaryALTData {
pub data: Vec<(Pubkey, Vec<Pubkey>)>
pub data: Vec<(Pubkey, Vec<Pubkey>)>,
}
impl BinaryALTData {
pub fn new(map: &Arc<DashMap<Pubkey, Vec<Pubkey>>>) -> Self {
let data = map.iter().map(|x| (x.key().clone(), x.value().clone())).collect_vec();
Self {
data
}
let data = map
.iter()
.map(|x| (x.key().clone(), x.value().clone()))
.collect_vec();
Self { data }
}
}
}

View File

@ -118,8 +118,6 @@ pub struct BlockInfo {
pub transactions: Vec<BlockTransactionInfo>,
}
impl BlockInfo {
pub async fn process_versioned_message(
atl_store: &Arc<ALTStore>,
@ -381,97 +379,103 @@ impl BlockInfo {
let mut total_cu_requested: u64 = 0;
let mut prio_fees_in_block = vec![];
let mut lookup_tables = HashSet::new();
let sigs_and_messages = block.transactions.iter().filter_map( |transaction| {
let Some(tx) = &transaction.transaction else {
return None;
};
let sigs_and_messages = block
.transactions
.iter()
.filter_map(|transaction| {
let Some(tx) = &transaction.transaction else {
return None;
};
let Some(message) = &tx.message else {
return None;
};
let Some(message) = &tx.message else {
return None;
};
let Some(header) = &message.header else {
return None;
};
let Some(header) = &message.header else {
return None;
};
let Some(meta) = &transaction.meta else {
return None;
};
let signature = Signature::try_from(&tx.signatures[0][0..64])
.unwrap()
.to_string();
let Some(meta) = &transaction.meta else {
return None;
};
let signature = Signature::try_from(&tx.signatures[0][0..64])
.unwrap()
.to_string();
let message = VersionedMessage::V0(v0::Message {
header: MessageHeader {
num_required_signatures: header.num_required_signatures as u8,
num_readonly_signed_accounts: header.num_readonly_signed_accounts as u8,
num_readonly_unsigned_accounts: header.num_readonly_unsigned_accounts as u8,
},
account_keys: message
.account_keys
.clone()
.into_iter()
.map(|key| {
let bytes: [u8; 32] =
key.try_into().unwrap_or(Pubkey::default().to_bytes());
Pubkey::new_from_array(bytes)
})
.collect(),
recent_blockhash: solana_sdk::hash::Hash::new(&message.recent_blockhash),
instructions: message
.instructions
.clone()
.into_iter()
.map(|ix| CompiledInstruction {
program_id_index: ix.program_id_index as u8,
accounts: ix.accounts,
data: ix.data,
})
.collect(),
address_table_lookups: message
.address_table_lookups
.clone()
.into_iter()
.map(|table| {
let bytes: [u8; 32] = table
.account_key
.try_into()
.unwrap_or(Pubkey::default().to_bytes());
let account_key = Pubkey::new_from_array(bytes);
lookup_tables.insert(account_key.clone());
MessageAddressTableLookup {
account_key,
writable_indexes: table.writable_indexes,
readonly_indexes: table.readonly_indexes,
}
})
.collect(),
});
Some((signature, message, meta, transaction.is_vote))
}).collect_vec();
let message = VersionedMessage::V0(v0::Message {
header: MessageHeader {
num_required_signatures: header.num_required_signatures as u8,
num_readonly_signed_accounts: header.num_readonly_signed_accounts as u8,
num_readonly_unsigned_accounts: header.num_readonly_unsigned_accounts as u8,
},
account_keys: message
.account_keys
.clone()
.into_iter()
.map(|key| {
let bytes: [u8; 32] =
key.try_into().unwrap_or(Pubkey::default().to_bytes());
Pubkey::new_from_array(bytes)
})
.collect(),
recent_blockhash: solana_sdk::hash::Hash::new(&message.recent_blockhash),
instructions: message
.instructions
.clone()
.into_iter()
.map(|ix| CompiledInstruction {
program_id_index: ix.program_id_index as u8,
accounts: ix.accounts,
data: ix.data,
})
.collect(),
address_table_lookups: message
.address_table_lookups
.clone()
.into_iter()
.map(|table| {
let bytes: [u8; 32] = table
.account_key
.try_into()
.unwrap_or(Pubkey::default().to_bytes());
let account_key = Pubkey::new_from_array(bytes);
lookup_tables.insert(account_key.clone());
MessageAddressTableLookup {
account_key,
writable_indexes: table.writable_indexes,
readonly_indexes: table.readonly_indexes,
}
})
.collect(),
});
Some((signature, message, meta, transaction.is_vote))
})
.collect_vec();
atl_store.load_all_alts(lookup_tables.iter().cloned().collect_vec()).await;
atl_store
.load_all_alts(lookup_tables.iter().cloned().collect_vec())
.await;
let mut block_transactions = vec![];
for (signature, message, meta, is_vote) in sigs_and_messages {
let tx = Self::process_versioned_message(
&atl_store,
&signature,
slot,
&message,
&mut prio_fees_in_block,
&mut writelocked_accounts,
&mut readlocked_accounts,
meta.compute_units_consumed.unwrap_or(0),
&mut total_cu_requested,
is_vote,
meta.err.is_none(),
)
.await;
&atl_store,
&signature,
slot,
&message,
&mut prio_fees_in_block,
&mut writelocked_accounts,
&mut readlocked_accounts,
meta.compute_units_consumed.unwrap_or(0),
&mut total_cu_requested,
is_vote,
meta.err.is_none(),
)
.await;
if let Some(tx) = tx {
block_transactions.push(tx);
}
};
}
let heavily_locked_accounts =
Self::calculate_account_usage(&writelocked_accounts, &readlocked_accounts);

View File

@ -13,7 +13,10 @@ use std::{
},
time::Duration,
};
use tokio::{io::{AsyncReadExt, AsyncWriteExt}, time::Instant};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
time::Instant,
};
use crate::prometheus_sync::PrometheusSync;
use block_info::BlockInfo;
@ -207,7 +210,7 @@ async fn start_tracking_blocks(
// .collect_vec();
// ALT store from binary
// let atl_store = {
// let atl_store = {
// let alt_store = Arc::new(alt_store::ALTStore::new(rpc_client));
// let mut alts_file = tokio::fs::File::open(alts_list).await.unwrap();
// let mut buf = vec![];
@ -325,7 +328,6 @@ async fn start_tracking_blocks(
}
}
#[tokio::main(worker_threads=1)]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();