Merge remote-tracking branch 'origin/main' into deploy/fly-cleanupjob

This commit is contained in:
GroovieGermanikus 2024-01-05 11:12:18 +01:00
commit 009dd4ea03
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
11 changed files with 5505 additions and 106 deletions

50
Cargo.lock generated
View File

@ -327,6 +327,19 @@ version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9" checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9"
[[package]]
name = "async-channel"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ca33f4bc4ed1babef42cad36cc1f51fa88be00420404e5b1e80ab1b18f7678c"
dependencies = [
"concurrent-queue",
"event-listener",
"event-listener-strategy",
"futures-core",
"pin-project-lite",
]
[[package]] [[package]]
name = "async-compression" name = "async-compression"
version = "0.4.4" version = "0.4.4"
@ -805,6 +818,15 @@ dependencies = [
"unreachable", "unreachable",
] ]
[[package]]
name = "concurrent-queue"
version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d16048cd947b08fa32c24458a22f5dc5e835264f689f4f5653210c69fd107363"
dependencies = [
"crossbeam-utils",
]
[[package]] [[package]]
name = "console" name = "console"
version = "0.15.7" version = "0.15.7"
@ -1186,6 +1208,27 @@ dependencies = [
"windows-sys 0.48.0", "windows-sys 0.48.0",
] ]
[[package]]
name = "event-listener"
version = "4.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "218a870470cce1469024e9fb66b901aa983929d81304a1cdb299f28118e550d5"
dependencies = [
"concurrent-queue",
"parking",
"pin-project-lite",
]
[[package]]
name = "event-listener-strategy"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "958e4d70b6d5e81971bebec42271ec641e7ff4e170a6fa605f2b8a8b65cb97d3"
dependencies = [
"event-listener",
"pin-project-lite",
]
[[package]] [[package]]
name = "fallible-iterator" name = "fallible-iterator"
version = "0.2.0" version = "0.2.0"
@ -1414,6 +1457,7 @@ name = "grpc_banking_transactions_notifications"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-channel",
"base64 0.21.5", "base64 0.21.5",
"bincode", "bincode",
"bs58", "bs58",
@ -2257,6 +2301,12 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]]
name = "parking"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae"
[[package]] [[package]]
name = "parking_lot" name = "parking_lot"
version = "0.12.1" version = "0.12.1"

View File

@ -33,6 +33,7 @@ prometheus = "0.13.3"
lazy_static = "1.4.0" lazy_static = "1.4.0"
tokio-postgres = { version = "0.7.10", features = ["with-chrono-0_4"] } tokio-postgres = { version = "0.7.10", features = ["with-chrono-0_4"] }
rustls = { version = "=0.20.8", default-features = false } rustls = { version = "=0.20.8", default-features = false }
async-channel = "2.1.1"
tokio = { version = "1.32.0", features = ["rt-multi-thread", "macros", "time"] } tokio = { version = "1.32.0", features = ["rt-multi-thread", "macros", "time"] }
yellowstone-grpc-client = {git = "https://github.com/blockworks-foundation/yellowstone-grpc.git", branch = "tag-v1.16-mango"} yellowstone-grpc-client = {git = "https://github.com/blockworks-foundation/yellowstone-grpc.git", branch = "tag-v1.16-mango"}

View File

@ -20,5 +20,6 @@ FROM debian:bullseye-slim as run
RUN apt-get update && apt-get -y install ca-certificates libc6 RUN apt-get update && apt-get -y install ca-certificates libc6
COPY --from=build /app/target/release/grpc_banking_transactions_notifications /usr/local/bin/ COPY --from=build /app/target/release/grpc_banking_transactions_notifications /usr/local/bin/
COPY --from=build /app/target/release/cleanupdb /usr/local/bin/ COPY --from=build /app/target/release/cleanupdb /usr/local/bin/
COPY --from=build /app/alts.txt /usr/local/bin/
CMD run-service-and-cleanup.sh CMD run-service-and-cleanup.sh

5156
alts.txt Normal file

File diff suppressed because it is too large Load Diff

View File

@ -1,7 +1,7 @@
CREATE SCHEMA banking_stage_results_2; CREATE SCHEMA banking_stage_results_2;
CREATE TABLE banking_stage_results_2.transactions( CREATE TABLE banking_stage_results_2.transactions(
signature char(88) primary key, signature text primary key,
transaction_id bigserial, transaction_id bigserial,
UNIQUE(transaction_id) UNIQUE(transaction_id)
); );
@ -35,8 +35,8 @@ CREATE INDEX idx_transaction_slot_slot ON banking_stage_results_2.transaction_sl
CREATE TABLE banking_stage_results_2.blocks ( CREATE TABLE banking_stage_results_2.blocks (
slot BIGINT PRIMARY KEY, slot BIGINT PRIMARY KEY,
block_hash CHAR(44), block_hash text,
leader_identity CHAR(44), leader_identity text,
successful_transactions BIGINT, successful_transactions BIGINT,
processed_transactions BIGINT, processed_transactions BIGINT,
total_cu_used BIGINT, total_cu_used BIGINT,
@ -46,7 +46,7 @@ CREATE TABLE banking_stage_results_2.blocks (
CREATE TABLE banking_stage_results_2.accounts( CREATE TABLE banking_stage_results_2.accounts(
acc_id bigserial primary key, acc_id bigserial primary key,
account_key text, account_key char(44),
UNIQUE (account_key) UNIQUE (account_key)
); );
@ -55,7 +55,8 @@ CREATE TABLE banking_stage_results_2.accounts_map_transaction(
transaction_id BIGINT, transaction_id BIGINT,
is_writable BOOL, is_writable BOOL,
is_signer BOOL, is_signer BOOL,
PRIMARY KEY (acc_id, transaction_id) is_atl BOOL,
PRIMARY KEY (transaction_id, acc_id)
); );
ALTER TABLE banking_stage_results_2.accounts_map_transaction ADD COLUMN is_atl BOOL; ALTER TABLE banking_stage_results_2.accounts_map_transaction ADD COLUMN is_atl BOOL;
@ -120,3 +121,12 @@ VACUUM FULL banking_stage_results_2.blocks;
-- optional -- optional
CLUSTER banking_stage_results_2.transaction_slot using idx_transaction_slot_timestamp; CLUSTER banking_stage_results_2.transaction_slot using idx_transaction_slot_timestamp;
VACUUM FULL banking_stage_results_2.transaction_slot; VACUUM FULL banking_stage_results_2.transaction_slot;
ALTER TABLE banking_stage_results_2.transactions ALTER COLUMN signature TYPE TEXT;
CLUSTER banking_stage_results_2.accounts_map_transaction using accounts_map_transaction_pkey;
CLUSTER banking_stage_results_2.transactions using transactions_pkey;
CLUSTER banking_stage_results_2.accounts using accounts_pkey;

View File

@ -16,4 +16,4 @@ while true; do
sleep 18000; sleep 18000;
done & done &
/usr/local/bin/grpc_banking_transactions_notifications --rpc-url "$RPC_URL" --grpc-address-to-fetch-blocks "$GEYSER_GRPC_ADDRESS" --grpc-x-token "$GEYSER_GRPC_X_TOKEN" --banking-grpc-addresses "$LIST_OF_BANKING_STAGE_GRPCS" /usr/local/bin/grpc_banking_transactions_notifications --rpc-url "$RPC_URL" --grpc-address-to-fetch-blocks "$GEYSER_GRPC_ADDRESS" --grpc-x-token "$GEYSER_GRPC_X_TOKEN" --banking-grpc-addresses "$LIST_OF_BANKING_STAGE_GRPCS" -a /usr/local/bin/alts.txt

View File

@ -1,18 +1,23 @@
use dashmap::DashMap; use dashmap::DashMap;
use itertools::Itertools; use itertools::Itertools;
use prometheus::{opts, register_int_gauge, IntGauge};
use solana_address_lookup_table_program::state::AddressLookupTable; use solana_address_lookup_table_program::state::AddressLookupTable;
use solana_rpc_client::nonblocking::rpc_client::RpcClient; use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::{ use solana_sdk::{account::ReadableAccount, commitment_config::CommitmentConfig, pubkey::Pubkey};
commitment_config::CommitmentConfig, pubkey::Pubkey, slot_hashes::SlotHashes,
slot_history::Slot,
};
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::RwLock;
use crate::block_info::TransactionAccount; use crate::block_info::TransactionAccount;
lazy_static::lazy_static! {
static ref ALTS_IN_STORE: IntGauge =
register_int_gauge!(opts!("banking_stage_sidecar_alts_stored", "Alts stored in sidecar")).unwrap();
}
#[derive(Clone)]
pub struct ALTStore { pub struct ALTStore {
rpc_client: Arc<RpcClient>, rpc_client: Arc<RpcClient>,
pub map: Arc<DashMap<Pubkey, Vec<u8>>>, pub map: Arc<DashMap<Pubkey, Vec<Pubkey>>>,
is_loading: Arc<DashMap<Pubkey, Arc<tokio::sync::RwLock<()>>>>,
} }
impl ALTStore { impl ALTStore {
@ -20,56 +25,112 @@ impl ALTStore {
Self { Self {
rpc_client, rpc_client,
map: Arc::new(DashMap::new()), map: Arc::new(DashMap::new()),
is_loading: Arc::new(DashMap::new()),
} }
} }
pub async fn load_alt_from_rpc(&self, alt: &Pubkey) { pub async fn load_all_alts(&self, alts_list: Vec<Pubkey>) {
if !self.map.contains_key(&alt) { let alts_list = alts_list
self.reload_alt_from_rpc(&alt).await; .iter()
.filter(|x| self.map.contains_key(&x) || self.is_loading.contains_key(&x))
.cloned()
.collect_vec();
if alts_list.is_empty() {
return;
} }
log::info!("Preloading {} ALTs", alts_list.len());
for batches in alts_list.chunks(1000).map(|x| x.to_vec()) {
let lock = Arc::new(RwLock::new(()));
// add in loading list
batches.iter().for_each(|x| {
self.is_loading.insert(x.clone(), lock.clone());
});
let _context = lock.write().await;
let tasks = batches.chunks(10).map(|batch| {
let batch = batch.to_vec();
let rpc_client = self.rpc_client.clone();
let this = self.clone();
tokio::spawn(async move {
if let Ok(multiple_accounts) = rpc_client
.get_multiple_accounts_with_commitment(
&batch,
CommitmentConfig::processed(),
)
.await
{
for (index, acc) in multiple_accounts.value.iter().enumerate() {
if let Some(acc) = acc {
this.save_account(&batch[index], &acc.data);
}
}
}
})
});
futures::future::join_all(tasks).await;
}
log::info!("Finished Loading {} ALTs", alts_list.len());
}
pub fn save_account(&self, address: &Pubkey, data: &[u8]) {
let lookup_table = AddressLookupTable::deserialize(&data).unwrap();
if self
.map
.insert(address.clone(), lookup_table.addresses.to_vec())
.is_none()
{
ALTS_IN_STORE.inc();
}
drop(lookup_table);
} }
pub async fn reload_alt_from_rpc(&self, alt: &Pubkey) { pub async fn reload_alt_from_rpc(&self, alt: &Pubkey) {
let lock = Arc::new(RwLock::new(()));
let _ = lock.write().await;
self.is_loading.insert(alt.clone(), lock.clone());
let response = self let response = self
.rpc_client .rpc_client
.get_account_with_commitment(alt, CommitmentConfig::processed()) .get_account_with_commitment(alt, CommitmentConfig::processed())
.await; .await;
if let Ok(account_res) = response { if let Ok(account_res) = response {
if let Some(account) = account_res.value { if let Some(account) = account_res.value {
self.map.insert(*alt, account.data); self.save_account(alt, account.data());
} }
} }
} }
pub async fn load_accounts( pub async fn load_accounts(
&self, &self,
slot: Slot,
alt: &Pubkey, alt: &Pubkey,
write_accounts: &Vec<u8>, write_accounts: &Vec<u8>,
read_account: &Vec<u8>, read_account: &Vec<u8>,
) -> Option<Vec<TransactionAccount>> { ) -> Option<Vec<TransactionAccount>> {
match self.map.get(&alt) { match self.map.get(&alt) {
Some(account) => { Some(lookup_table) => {
let lookup_table = AddressLookupTable::deserialize(&account.value()).unwrap(); if write_accounts
let write_accounts = .iter()
lookup_table.lookup(slot, write_accounts, &SlotHashes::default()); .any(|x| *x as usize >= lookup_table.len())
let read_account = lookup_table.lookup(slot, read_account, &SlotHashes::default()); || read_account
.iter()
let write_accounts = if let Ok(write_accounts) = write_accounts { .any(|x| *x as usize >= lookup_table.len())
write_accounts {
} else {
return None; return None;
}; }
let read_account = if let Ok(read_account) = read_account { let write_accounts = write_accounts
read_account .iter()
} else { .map(|i| lookup_table[*i as usize])
return None; .collect_vec();
}; let read_account = read_account
.iter()
.map(|i| lookup_table[*i as usize])
.collect_vec();
let wa = write_accounts let wa = write_accounts
.iter() .iter()
.map(|key| TransactionAccount { .map(|key| TransactionAccount {
key: key.to_string(), key: key.clone(),
is_writable: true, is_writable: true,
is_signer: false, is_signer: false,
is_alt: true, is_alt: true,
@ -78,7 +139,7 @@ impl ALTStore {
let ra = read_account let ra = read_account
.iter() .iter()
.map(|key| TransactionAccount { .map(|key| TransactionAccount {
key: key.to_string(), key: key.clone(),
is_writable: false, is_writable: false,
is_signer: false, is_signer: false,
is_alt: true, is_alt: true,
@ -92,24 +153,27 @@ impl ALTStore {
pub async fn get_accounts( pub async fn get_accounts(
&self, &self,
current_slot: Slot,
alt: &Pubkey, alt: &Pubkey,
write_accounts: &Vec<u8>, write_accounts: &Vec<u8>,
read_account: &Vec<u8>, read_account: &Vec<u8>,
) -> Vec<TransactionAccount> { ) -> Vec<TransactionAccount> {
self.load_alt_from_rpc(&alt).await; match self.is_loading.get(&alt) {
match self Some(x) => {
.load_accounts(current_slot, alt, write_accounts, read_account) // if there is a lock we wait until it is fullfilled
.await let x = x.value().clone();
{ log::debug!("waiting for alt {}", alt.to_string());
let _ = x.read().await;
}
None => {
// not loading
}
}
match self.load_accounts(alt, write_accounts, read_account).await {
Some(x) => x, Some(x) => x,
None => { None => {
//load alt //load alt
self.reload_alt_from_rpc(&alt).await; self.reload_alt_from_rpc(&alt).await;
match self match self.load_accounts(alt, write_accounts, read_account).await {
.load_accounts(current_slot, alt, write_accounts, read_account)
.await
{
Some(x) => x, Some(x) => x,
None => { None => {
// reloading did not work // reloading did not work

View File

@ -13,7 +13,10 @@ use solana_sdk::{
signature::Signature, signature::Signature,
slot_history::Slot, slot_history::Slot,
}; };
use std::{collections::HashMap, sync::Arc}; use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
#[derive(Serialize, Debug, Clone)] #[derive(Serialize, Debug, Clone)]
pub struct PrioFeeData { pub struct PrioFeeData {
@ -76,11 +79,16 @@ pub struct PrioritizationFeesInfo {
pub p_75: u64, pub p_75: u64,
pub p_90: u64, pub p_90: u64,
pub p_max: u64, pub p_max: u64,
pub med_cu: Option<u64>,
pub p75_cu: Option<u64>,
pub p90_cu: Option<u64>,
pub p95_cu: Option<u64>,
} }
#[derive(Clone)] #[derive(Clone)]
pub struct TransactionAccount { pub struct TransactionAccount {
pub key: String, pub key: Pubkey,
pub is_writable: bool, pub is_writable: bool,
pub is_signer: bool, pub is_signer: bool,
pub is_alt: bool, pub is_alt: bool,
@ -116,9 +124,9 @@ impl BlockInfo {
signature: String, signature: String,
slot: Slot, slot: Slot,
message: &VersionedMessage, message: &VersionedMessage,
prio_fees_in_block: &mut Vec<u64>, prio_fees_in_block: &mut Vec<(u64, u64)>,
writelocked_accounts: &mut HashMap<String, AccountData>, writelocked_accounts: &mut HashMap<Pubkey, AccountData>,
readlocked_accounts: &mut HashMap<String, AccountData>, readlocked_accounts: &mut HashMap<Pubkey, AccountData>,
cu_consumed: u64, cu_consumed: u64,
total_cu_requested: &mut u64, total_cu_requested: &mut u64,
is_vote: bool, is_vote: bool,
@ -168,7 +176,7 @@ impl BlockInfo {
(cu_request, prio_fees, nb_ix_except_cb) (cu_request, prio_fees, nb_ix_except_cb)
}; };
let prioritization_fees = prio_fees.unwrap_or_default(); let prioritization_fees = prio_fees.unwrap_or_default();
prio_fees_in_block.push(prioritization_fees); prio_fees_in_block.push((prioritization_fees, cu_consumed));
let cu_requested = let cu_requested =
std::cmp::min(1_400_000, cu_requested.unwrap_or(200000 * nb_ix_except_cb)); std::cmp::min(1_400_000, cu_requested.unwrap_or(200000 * nb_ix_except_cb));
*total_cu_requested += cu_requested; *total_cu_requested += cu_requested;
@ -178,7 +186,7 @@ impl BlockInfo {
.iter() .iter()
.enumerate() .enumerate()
.map(|(index, account)| TransactionAccount { .map(|(index, account)| TransactionAccount {
key: account.to_string(), key: account.clone(),
is_writable: message.is_maybe_writable(index), is_writable: message.is_maybe_writable(index),
is_signer: message.is_signer(index), is_signer: message.is_signer(index),
is_alt: false, is_alt: false,
@ -189,7 +197,6 @@ impl BlockInfo {
let atl_acc = atl_message.account_key; let atl_acc = atl_message.account_key;
let mut atl_accs = atl_store let mut atl_accs = atl_store
.get_accounts( .get_accounts(
slot,
&atl_acc, &atl_acc,
&atl_message.writable_indexes, &atl_message.writable_indexes,
&atl_message.readonly_indexes, &atl_message.readonly_indexes,
@ -214,7 +221,7 @@ impl BlockInfo {
writelocked_accounts.insert( writelocked_accounts.insert(
writable_account.clone(), writable_account.clone(),
AccountData { AccountData {
key: writable_account, key: writable_account.to_string(),
cu_consumed, cu_consumed,
cu_requested, cu_requested,
vec_pf: vec![prioritization_fees], vec_pf: vec![prioritization_fees],
@ -239,7 +246,7 @@ impl BlockInfo {
readlocked_accounts.insert( readlocked_accounts.insert(
readable_account.clone(), readable_account.clone(),
AccountData { AccountData {
key: readable_account, key: readable_account.to_string(),
cu_consumed, cu_consumed,
cu_requested, cu_requested,
vec_pf: vec![prioritization_fees], vec_pf: vec![prioritization_fees],
@ -265,8 +272,8 @@ impl BlockInfo {
} }
pub fn calculate_account_usage( pub fn calculate_account_usage(
writelocked_accounts: &HashMap<String, AccountData>, writelocked_accounts: &HashMap<Pubkey, AccountData>,
readlocked_accounts: &HashMap<String, AccountData>, readlocked_accounts: &HashMap<Pubkey, AccountData>,
) -> Vec<AccountUsage> { ) -> Vec<AccountUsage> {
let mut accounts = writelocked_accounts let mut accounts = writelocked_accounts
.iter() .iter()
@ -282,19 +289,51 @@ impl BlockInfo {
} }
pub fn calculate_supp_info( pub fn calculate_supp_info(
prio_fees_in_block: &mut Vec<u64>, prio_fees_in_block: &mut Vec<(u64, u64)>,
) -> Option<PrioritizationFeesInfo> { ) -> Option<PrioritizationFeesInfo> {
if !prio_fees_in_block.is_empty() { if !prio_fees_in_block.is_empty() {
prio_fees_in_block.sort(); // get stats by transaction
prio_fees_in_block.sort_by(|a, b| a.0.cmp(&b.0));
let median_index = prio_fees_in_block.len() / 2; let median_index = prio_fees_in_block.len() / 2;
let p75_index = prio_fees_in_block.len() * 75 / 100; let p75_index = prio_fees_in_block.len() * 75 / 100;
let p90_index = prio_fees_in_block.len() * 90 / 100; let p90_index = prio_fees_in_block.len() * 90 / 100;
let p_min = prio_fees_in_block[0].0;
let p_median = prio_fees_in_block[median_index].0;
let p_75 = prio_fees_in_block[p75_index].0;
let p_90 = prio_fees_in_block[p90_index].0;
let p_max = prio_fees_in_block.last().map(|x| x.0).unwrap_or_default();
let mut med_cu = None;
let mut p75_cu = None;
let mut p90_cu = None;
let mut p95_cu = None;
// get stats by CU
let cu_sum: u64 = prio_fees_in_block.iter().map(|x| x.1).sum();
let mut agg: u64 = 0;
for (prio, cu) in prio_fees_in_block {
agg = agg + *cu;
if med_cu.is_none() && agg > (cu_sum as f64 * 0.5) as u64 {
med_cu = Some(*prio);
} else if p75_cu.is_none() && agg > (cu_sum as f64 * 0.75) as u64 {
p75_cu = Some(*prio)
} else if p90_cu.is_none() && agg > (cu_sum as f64 * 0.9) as u64 {
p90_cu = Some(*prio);
} else if p95_cu.is_none() && agg > (cu_sum as f64 * 0.95) as u64 {
p95_cu = Some(*prio)
}
}
Some(PrioritizationFeesInfo { Some(PrioritizationFeesInfo {
p_min: prio_fees_in_block[0], p_min,
p_median: prio_fees_in_block[median_index], p_median,
p_75: prio_fees_in_block[p75_index], p_75,
p_90: prio_fees_in_block[p90_index], p_90,
p_max: prio_fees_in_block.last().cloned().unwrap_or_default(), p_max,
med_cu,
p75_cu,
p90_cu,
p95_cu,
}) })
} else { } else {
None None
@ -335,11 +374,12 @@ impl BlockInfo {
.unwrap_or(0) .unwrap_or(0)
}) })
.sum::<u64>() as i64; .sum::<u64>() as i64;
let mut writelocked_accounts: HashMap<String, AccountData> = HashMap::new(); let mut writelocked_accounts: HashMap<Pubkey, AccountData> = HashMap::new();
let mut readlocked_accounts: HashMap<String, AccountData> = HashMap::new(); let mut readlocked_accounts: HashMap<Pubkey, AccountData> = HashMap::new();
let mut total_cu_requested: u64 = 0; let mut total_cu_requested: u64 = 0;
let mut prio_fees_in_block = vec![]; let mut prio_fees_in_block = vec![];
let mut block_transactions = vec![]; let mut block_transactions = vec![];
let mut lookup_tables = HashSet::new();
for transaction in &block.transactions { for transaction in &block.transactions {
let Some(tx) = &transaction.transaction else { let Some(tx) = &transaction.transaction else {
continue; continue;
@ -396,8 +436,10 @@ impl BlockInfo {
.account_key .account_key
.try_into() .try_into()
.unwrap_or(Pubkey::default().to_bytes()); .unwrap_or(Pubkey::default().to_bytes());
let account_key = Pubkey::new_from_array(bytes);
lookup_tables.insert(account_key.clone());
MessageAddressTableLookup { MessageAddressTableLookup {
account_key: Pubkey::new_from_array(bytes), account_key,
writable_indexes: table.writable_indexes, writable_indexes: table.writable_indexes,
readonly_indexes: table.readonly_indexes, readonly_indexes: table.readonly_indexes,
} }
@ -405,6 +447,9 @@ impl BlockInfo {
.collect(), .collect(),
}); });
let atl_store = atl_store.clone(); let atl_store = atl_store.clone();
atl_store
.load_all_alts(lookup_tables.iter().cloned().collect_vec())
.await;
let transaction = Self::process_versioned_message( let transaction = Self::process_versioned_message(
atl_store, atl_store,

View File

@ -18,4 +18,7 @@ pub struct Args {
/// enable metrics to prometheus at addr /// enable metrics to prometheus at addr
#[arg(short = 'm', long, default_value_t = String::from("[::]:9091"))] #[arg(short = 'm', long, default_value_t = String::from("[::]:9091"))]
pub prometheus_addr: String, pub prometheus_addr: String,
#[arg(short = 'a', long, default_value_t = String::from("alts.txt"))]
pub alts: String,
} }

View File

@ -4,9 +4,14 @@ use solana_rpc_client::nonblocking::rpc_client::{self, RpcClient};
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use std::{ use std::{
collections::HashMap, collections::HashMap,
sync::{atomic::AtomicU64, Arc}, str::FromStr,
sync::{
atomic::{AtomicBool, AtomicU64},
Arc,
},
time::Duration, time::Duration,
}; };
use tokio::io::AsyncReadExt;
use crate::prometheus_sync::PrometheusSync; use crate::prometheus_sync::PrometheusSync;
use block_info::BlockInfo; use block_info::BlockInfo;
@ -44,7 +49,6 @@ lazy_static::lazy_static! {
pub async fn start_tracking_banking_stage_errors( pub async fn start_tracking_banking_stage_errors(
grpc_address: String, grpc_address: String,
map_of_infos: Arc<DashMap<(String, u64), TransactionInfo>>, map_of_infos: Arc<DashMap<(String, u64), TransactionInfo>>,
slot_by_errors: Arc<DashMap<u64, u64>>,
slot: Arc<AtomicU64>, slot: Arc<AtomicU64>,
_subscribe_to_slots: bool, _subscribe_to_slots: bool,
) { ) {
@ -104,14 +108,6 @@ pub async fn start_tracking_banking_stage_errors(
// } // }
BANKING_STAGE_ERROR_EVENT_COUNT.inc(); BANKING_STAGE_ERROR_EVENT_COUNT.inc();
let sig = transaction.signature.to_string(); let sig = transaction.signature.to_string();
match slot_by_errors.get_mut(&transaction.slot) {
Some(mut value) => {
*value += 1;
}
None => {
slot_by_errors.insert(transaction.slot, 1);
}
}
match map_of_infos.get_mut(&(sig.clone(), transaction.slot)) { match map_of_infos.get_mut(&(sig.clone(), transaction.slot)) {
Some(mut x) => { Some(mut x) => {
let tx_info = x.value_mut(); let tx_info = x.value_mut();
@ -144,7 +140,30 @@ async fn start_tracking_blocks(
grpc_x_token: Option<String>, grpc_x_token: Option<String>,
postgres: postgres::Postgres, postgres: postgres::Postgres,
slot: Arc<AtomicU64>, slot: Arc<AtomicU64>,
alts_list: Vec<Pubkey>,
) { ) {
let block_counter = Arc::new(AtomicU64::new(0));
let restart_block_subscription = Arc::new(AtomicBool::new(false));
let _block_counter_checker = {
let block_counter = block_counter.clone();
let restart_block_subscription = restart_block_subscription.clone();
tokio::spawn(async move {
let mut old_count = block_counter.load(std::sync::atomic::Ordering::Relaxed);
loop {
tokio::time::sleep(Duration::from_secs(20)).await;
let new_count = block_counter.load(std::sync::atomic::Ordering::Relaxed);
if old_count > 0 && old_count == new_count {
log::error!(
"Did not recieve any block for 20 s, restarting block subscription"
);
restart_block_subscription.store(true, std::sync::atomic::Ordering::Relaxed);
tokio::time::sleep(Duration::from_secs(10)).await;
}
old_count = new_count;
}
})
};
let mut client = yellowstone_grpc_client_original::GeyserGrpcClient::connect( let mut client = yellowstone_grpc_client_original::GeyserGrpcClient::connect(
grpc_block_addr, grpc_block_addr,
grpc_x_token, grpc_x_token,
@ -152,6 +171,7 @@ async fn start_tracking_blocks(
) )
.unwrap(); .unwrap();
let atl_store = Arc::new(alt_store::ALTStore::new(rpc_client)); let atl_store = Arc::new(alt_store::ALTStore::new(rpc_client));
atl_store.load_all_alts(alts_list).await;
loop { loop {
let mut blocks_subs = HashMap::new(); let mut blocks_subs = HashMap::new();
@ -200,6 +220,10 @@ async fn start_tracking_blocks(
while let Ok(Some(message)) = while let Ok(Some(message)) =
tokio::time::timeout(Duration::from_secs(30), geyser_stream.next()).await tokio::time::timeout(Duration::from_secs(30), geyser_stream.next()).await
{ {
if restart_block_subscription.load(std::sync::atomic::Ordering::Relaxed) {
break;
}
let Ok(message) = message else { let Ok(message) = message else {
continue; continue;
}; };
@ -207,7 +231,7 @@ async fn start_tracking_blocks(
let Some(update) = message.update_oneof else { let Some(update) = message.update_oneof else {
continue; continue;
}; };
block_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
match update { match update {
yellowstone_grpc_proto_original::prelude::subscribe_update::UpdateOneof::Block( yellowstone_grpc_proto_original::prelude::subscribe_update::UpdateOneof::Block(
block, block,
@ -239,19 +263,20 @@ async fn start_tracking_blocks(
if let Some(account) = account_update.account { if let Some(account) = account_update.account {
let bytes: [u8; 32] = account.pubkey.try_into().unwrap_or(Pubkey::default().to_bytes()); let bytes: [u8; 32] = account.pubkey.try_into().unwrap_or(Pubkey::default().to_bytes());
let pubkey = Pubkey::new_from_array(bytes); let pubkey = Pubkey::new_from_array(bytes);
atl_store.map.insert( pubkey, account.data); atl_store.save_account(&pubkey, &account.data);
} }
}, },
_ => {} _ => {}
}; };
} }
restart_block_subscription.store(false, std::sync::atomic::Ordering::Relaxed);
log::error!("geyser block stream is broken, retrying"); log::error!("geyser block stream is broken, retrying");
tokio::time::sleep(std::time::Duration::from_secs(1)).await; tokio::time::sleep(std::time::Duration::from_secs(1)).await;
} }
} }
#[tokio::main()] #[tokio::main()]
async fn main() { async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init(); tracing_subscriber::fmt::init();
let args = Args::parse(); let args = Args::parse();
@ -261,11 +286,23 @@ async fn main() {
let grpc_block_addr = args.grpc_address_to_fetch_blocks; let grpc_block_addr = args.grpc_address_to_fetch_blocks;
let map_of_infos = Arc::new(DashMap::<(String, u64), TransactionInfo>::new()); let map_of_infos = Arc::new(DashMap::<(String, u64), TransactionInfo>::new());
let slot_by_errors = Arc::new(DashMap::<u64, u64>::new());
let postgres = postgres::Postgres::new().await; let postgres = postgres::Postgres::new().await;
let slot = Arc::new(AtomicU64::new(0)); let slot = Arc::new(AtomicU64::new(0));
let no_block_subscription = grpc_block_addr.is_none(); let no_block_subscription = grpc_block_addr.is_none();
let alts = args.alts;
//load alts from the file
let mut alts_string = String::new();
let mut alts_file = tokio::fs::File::open(alts).await?;
alts_file.read_to_string(&mut alts_string).await?;
let alts_list = alts_string
.split("\r\n")
.map(|x| x.trim().to_string())
.filter(|x| x.len() > 0)
.map(|x| Pubkey::from_str(&x).unwrap())
.collect_vec();
postgres.spawn_transaction_infos_saver(map_of_infos.clone(), slot.clone()); postgres.spawn_transaction_infos_saver(map_of_infos.clone(), slot.clone());
let jhs = args let jhs = args
.banking_grpc_addresses .banking_grpc_addresses
@ -273,13 +310,11 @@ async fn main() {
.map(|address| { .map(|address| {
let address = address.clone(); let address = address.clone();
let map_of_infos = map_of_infos.clone(); let map_of_infos = map_of_infos.clone();
let slot_by_errors = slot_by_errors.clone();
let slot = slot.clone(); let slot = slot.clone();
tokio::spawn(async move { tokio::spawn(async move {
start_tracking_banking_stage_errors( start_tracking_banking_stage_errors(
address, address,
map_of_infos, map_of_infos,
slot_by_errors,
slot, slot,
no_block_subscription, no_block_subscription,
) )
@ -294,8 +329,10 @@ async fn main() {
args.grpc_x_token, args.grpc_x_token,
postgres, postgres,
slot, slot,
alts_list,
) )
.await; .await;
} }
futures::future::join_all(jhs).await; futures::future::join_all(jhs).await;
Ok(())
} }

View File

@ -134,7 +134,7 @@ impl PostgresSession {
format!( format!(
r#" r#"
CREATE TEMP TABLE {}( CREATE TEMP TABLE {}(
signature char(88) signature text
); );
"#, "#,
temp_table temp_table
@ -184,7 +184,7 @@ impl PostgresSession {
.execute( .execute(
format!( format!(
"CREATE TEMP TABLE {}( "CREATE TEMP TABLE {}(
key char(44) key TEXT
);", );",
temp_table temp_table
) )
@ -231,7 +231,7 @@ impl PostgresSession {
.execute( .execute(
format!( format!(
"CREATE TEMP TABLE {}( "CREATE TEMP TABLE {}(
sig char(88), sig TEXT,
slot BIGINT, slot BIGINT,
error_code INT, error_code INT,
count INT, count INT,
@ -307,8 +307,8 @@ impl PostgresSession {
.execute( .execute(
format!( format!(
"CREATE TEMP TABLE {}( "CREATE TEMP TABLE {}(
account_key char(44), account_key text,
signature char(88), signature text,
is_writable BOOL, is_writable BOOL,
is_signer BOOL, is_signer BOOL,
is_atl BOOL is_atl BOOL
@ -381,7 +381,7 @@ impl PostgresSession {
.execute( .execute(
format!( format!(
"CREATE TEMP TABLE {}( "CREATE TEMP TABLE {}(
signature char(88), signature text,
processed_slot BIGINT, processed_slot BIGINT,
is_successful BOOL, is_successful BOOL,
cu_requested BIGINT, cu_requested BIGINT,
@ -470,7 +470,7 @@ impl PostgresSession {
.execute( .execute(
format!( format!(
"CREATE TEMP TABLE {}( "CREATE TEMP TABLE {}(
account_key CHAR(44), account_key TEXT,
slot BIGINT, slot BIGINT,
is_write_locked BOOL, is_write_locked BOOL,
total_cu_requested BIGINT, total_cu_requested BIGINT,
@ -676,7 +676,7 @@ impl PostgresSession {
.accounts .accounts
.iter() .iter()
.map(|acc| AccountUsed { .map(|acc| AccountUsed {
key: acc.key.clone(), key: acc.key.to_string(),
writable: acc.is_writable, writable: acc.is_writable,
is_signer: acc.is_signer, is_signer: acc.is_signer,
is_atl: acc.is_alt, is_atl: acc.is_alt,
@ -693,6 +693,7 @@ impl PostgresSession {
// save account usage in blocks // save account usage in blocks
self.save_account_usage_in_block(&block_info).await?; self.save_account_usage_in_block(&block_info).await?;
self.save_block_info(&block_info).await?; self.save_block_info(&block_info).await?;
info!("block saved");
Ok(()) Ok(())
} }
} }
@ -816,35 +817,50 @@ impl PostgresSession {
cutoff_transaction_incl cutoff_transaction_incl
); );
// delete accounts_map_transaction
{ {
let tx_to_delete = self.client.query_one( let txs_to_delete = self.client.query_one(
&format!( &format!(
r" r"
SELECT count(*) as cnt_tx FROM banking_stage_results_2.accounts_map_transaction amt SELECT count(*) as cnt_tx FROM banking_stage_results_2.transactions txs
WHERE txs.transaction_id <= {cutoff_transaction}
",
cutoff_transaction = cutoff_transaction_incl
),
&[]).await.unwrap();
let txs_to_delete: i64 = txs_to_delete.get("cnt_tx");
info!("would delete transactions: {}", txs_to_delete);
}
{
let amt_to_delete = self.client.query_one(
&format!(
r"
SELECT count(*) as cnt_amt FROM banking_stage_results_2.accounts_map_transaction amt
WHERE amt.transaction_id <= {cutoff_transaction} WHERE amt.transaction_id <= {cutoff_transaction}
", ",
cutoff_transaction = cutoff_transaction_incl cutoff_transaction = cutoff_transaction_incl
), ),
&[]).await.unwrap(); &[]).await.unwrap();
let tx_to_delete: i64 = tx_to_delete.get("cnt_tx"); let amt_to_delete: i64 = amt_to_delete.get("cnt_amt");
info!("would delete transactions: {}", tx_to_delete); info!("would delete accounts_map_transaction: {}", amt_to_delete);
} }
{ {
let amb_to_delete = self.client.query_one( let amb_to_delete = self.client.query_one(
&format!( &format!(
r" r"
SELECT count(*) as cnt_ambs FROM banking_stage_results_2.accounts_map_blocks amb SELECT count(*) as cnt_amb FROM banking_stage_results_2.accounts_map_blocks amb
WHERE amb.slot < {cutoff_slot} WHERE amb.slot < {cutoff_slot}
", ",
cutoff_slot = cutoff_slot_excl cutoff_slot = cutoff_slot_excl
), ),
&[]).await.unwrap(); &[]).await.unwrap();
let amb_to_delete: i64 = amb_to_delete.get("cnt_ambs"); let amb_to_delete: i64 = amb_to_delete.get("cnt_amb");
info!("would delete from accounts_map_blocks: {}", amb_to_delete); info!("would delete from accounts_map_blocks: {}", amb_to_delete);
} }
@ -895,7 +911,7 @@ impl PostgresSession {
", transaction_id = cutoff_transaction_incl ", transaction_id = cutoff_transaction_incl
), &[]).await.unwrap(); ), &[]).await.unwrap();
info!( info!(
"Deleted {} rows from transactions in {:.2}ms", "Deleted {} rows from transactions in {:.3}s",
deleted_rows, deleted_rows,
started.elapsed().as_secs_f32() started.elapsed().as_secs_f32()
); );
@ -909,7 +925,21 @@ impl PostgresSession {
", transaction_id = cutoff_transaction_incl ", transaction_id = cutoff_transaction_incl
), &[]).await.unwrap(); ), &[]).await.unwrap();
info!( info!(
"Deleted {} rows from accounts_map_transaction in {:.2}ms", "Deleted {} rows from accounts_map_transaction in {:.3}s",
deleted_rows,
started.elapsed().as_secs_f32()
);
}
{
let started = Instant::now();
let deleted_rows = self.client.execute(
&format!(
r"
DELETE FROM banking_stage_results_2.accounts_map_blocks WHERE slot <= {cutoff_slot}
", cutoff_slot = cutoff_slot_excl
), &[]).await.unwrap();
info!(
"Deleted {} rows from accounts_map_blocks in {:.3}s",
deleted_rows, deleted_rows,
started.elapsed().as_secs_f32() started.elapsed().as_secs_f32()
); );
@ -923,7 +953,7 @@ impl PostgresSession {
", cutoff_slot = cutoff_slot_excl ", cutoff_slot = cutoff_slot_excl
), &[]).await.unwrap(); ), &[]).await.unwrap();
info!( info!(
"Deleted {} rows from transaction_infos in {:.2}ms", "Deleted {} rows from transaction_infos in {:.3}s",
deleted_rows, deleted_rows,
started.elapsed().as_secs_f32() started.elapsed().as_secs_f32()
); );
@ -944,7 +974,7 @@ impl PostgresSession {
.await .await
.unwrap(); .unwrap();
info!( info!(
"Deleted {} rows from transaction_slot in {:.2}ms", "Deleted {} rows from transaction_slot in {:.3}s",
deleted_rows, deleted_rows,
started.elapsed().as_secs_f32() started.elapsed().as_secs_f32()
); );
@ -961,6 +991,8 @@ impl PostgresSession {
self.log_rowcount(Level::Info, "transaction_infos").await; self.log_rowcount(Level::Info, "transaction_infos").await;
self.log_rowcount(Level::Info, "transaction_slot").await; self.log_rowcount(Level::Info, "transaction_slot").await;
} }
info!("Cleanup job completed.");
} }
async fn log_rowcount(&self, level: Level, table: &str) { async fn log_rowcount(&self, level: Level, table: &str) {