diff --git a/Cargo.lock b/Cargo.lock index b12fdd2..53a6cc7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -327,6 +327,19 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9" +[[package]] +name = "async-channel" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ca33f4bc4ed1babef42cad36cc1f51fa88be00420404e5b1e80ab1b18f7678c" +dependencies = [ + "concurrent-queue", + "event-listener", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + [[package]] name = "async-compression" version = "0.4.4" @@ -805,6 +818,15 @@ dependencies = [ "unreachable", ] +[[package]] +name = "concurrent-queue" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d16048cd947b08fa32c24458a22f5dc5e835264f689f4f5653210c69fd107363" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "console" version = "0.15.7" @@ -1186,6 +1208,27 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "event-listener" +version = "4.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "218a870470cce1469024e9fb66b901aa983929d81304a1cdb299f28118e550d5" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "958e4d70b6d5e81971bebec42271ec641e7ff4e170a6fa605f2b8a8b65cb97d3" +dependencies = [ + "event-listener", + "pin-project-lite", +] + [[package]] name = "fallible-iterator" version = "0.2.0" @@ -1414,6 +1457,7 @@ name = "grpc_banking_transactions_notifications" version = "0.1.0" dependencies = [ "anyhow", + "async-channel", "base64 0.21.5", "bincode", "bs58", @@ -2257,6 +2301,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" +[[package]] +name = "parking" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae" + [[package]] name = "parking_lot" version = "0.12.1" diff --git a/Cargo.toml b/Cargo.toml index aa50c90..2cb468e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 +33,7 @@ prometheus = "0.13.3" lazy_static = "1.4.0" tokio-postgres = { version = "0.7.10", features = ["with-chrono-0_4"] } rustls = { version = "=0.20.8", default-features = false } +async-channel = "2.1.1" tokio = { version = "1.32.0", features = ["rt-multi-thread", "macros", "time"] } yellowstone-grpc-client = {git = "https://github.com/blockworks-foundation/yellowstone-grpc.git", branch = "tag-v1.16-mango"} diff --git a/src/alt_store.rs b/src/alt_store.rs index 936bf89..bb9b7e3 100644 --- a/src/alt_store.rs +++ b/src/alt_store.rs @@ -4,7 +4,8 @@ use prometheus::{opts, register_int_gauge, IntGauge}; use solana_address_lookup_table_program::state::AddressLookupTable; use solana_rpc_client::nonblocking::rpc_client::RpcClient; use solana_sdk::{account::ReadableAccount, commitment_config::CommitmentConfig, pubkey::Pubkey}; -use std::{str::FromStr, sync::Arc, time::Duration}; +use std::sync::Arc; +use tokio::sync::RwLock; use crate::block_info::TransactionAccount; lazy_static::lazy_static! { @@ -16,6 +17,7 @@ lazy_static::lazy_static! { pub struct ALTStore { rpc_client: Arc, pub map: Arc>>, + is_loading: Arc>>>, } impl ALTStore { @@ -23,18 +25,30 @@ impl ALTStore { Self { rpc_client, map: Arc::new(DashMap::new()), + is_loading: Arc::new(DashMap::new()), } } - pub async fn load_all_alts(&self, alts_list: Vec) { + pub async fn load_all_alts(&self, alts_list: Vec) { let alts_list = alts_list .iter() - .map(|x| x.trim()) - .filter(|x| x.len() > 0) - .map(|x| Pubkey::from_str(&x).unwrap()) + .filter(|x| self.map.contains_key(&x) || self.is_loading.contains_key(&x)) + .cloned() .collect_vec(); + + if alts_list.is_empty() { + return; + } + log::info!("Preloading {} ALTs", alts_list.len()); + for batches in alts_list.chunks(1000).map(|x| x.to_vec()) { + let lock = Arc::new(RwLock::new(())); + // add in loading list + batches.iter().for_each(|x| { + self.is_loading.insert(x.clone(), lock.clone()); + }); + let _context = lock.write().await; let tasks = batches.chunks(10).map(|batch| { let batch = batch.to_vec(); let rpc_client = self.rpc_client.clone(); @@ -72,13 +86,10 @@ impl ALTStore { drop(lookup_table); } - pub async fn load_alt_from_rpc(&self, alt: &Pubkey) { - if !self.map.contains_key(&alt) { - self.reload_alt_from_rpc(&alt).await; - } - } - pub async fn reload_alt_from_rpc(&self, alt: &Pubkey) { + let lock = Arc::new(RwLock::new(())); + let _ = lock.write().await; + self.is_loading.insert(alt.clone(), lock.clone()); let response = self .rpc_client .get_account_with_commitment(alt, CommitmentConfig::processed()) @@ -146,7 +157,17 @@ impl ALTStore { write_accounts: &Vec, read_account: &Vec, ) -> Vec { - self.load_alt_from_rpc(&alt).await; + match self.is_loading.get(&alt) { + Some(x) => { + // if there is a lock we wait until it is fullfilled + let x = x.value().clone(); + log::debug!("waiting for alt {}", alt.to_string()); + let _ = x.read().await; + } + None => { + // not loading + } + } match self.load_accounts(alt, write_accounts, read_account).await { Some(x) => x, None => { @@ -155,7 +176,6 @@ impl ALTStore { match self.load_accounts(alt, write_accounts, read_account).await { Some(x) => x, None => { - tokio::time::sleep(Duration::from_millis(500)).await; // reloading did not work log::error!("cannot load alt even after"); vec![] diff --git a/src/block_info.rs b/src/block_info.rs index aa0148e..5a91805 100644 --- a/src/block_info.rs +++ b/src/block_info.rs @@ -13,7 +13,10 @@ use solana_sdk::{ signature::Signature, slot_history::Slot, }; -use std::{collections::HashMap, sync::Arc}; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; #[derive(Serialize, Debug, Clone)] pub struct PrioFeeData { @@ -76,6 +79,11 @@ pub struct PrioritizationFeesInfo { pub p_75: u64, pub p_90: u64, pub p_max: u64, + + pub med_cu: Option, + pub p75_cu: Option, + pub p90_cu: Option, + pub p95_cu: Option, } #[derive(Clone)] @@ -116,7 +124,7 @@ impl BlockInfo { signature: String, slot: Slot, message: &VersionedMessage, - prio_fees_in_block: &mut Vec, + prio_fees_in_block: &mut Vec<(u64, u64)>, writelocked_accounts: &mut HashMap, readlocked_accounts: &mut HashMap, cu_consumed: u64, @@ -168,7 +176,7 @@ impl BlockInfo { (cu_request, prio_fees, nb_ix_except_cb) }; let prioritization_fees = prio_fees.unwrap_or_default(); - prio_fees_in_block.push(prioritization_fees); + prio_fees_in_block.push((prioritization_fees, cu_consumed)); let cu_requested = std::cmp::min(1_400_000, cu_requested.unwrap_or(200000 * nb_ix_except_cb)); *total_cu_requested += cu_requested; @@ -281,19 +289,51 @@ impl BlockInfo { } pub fn calculate_supp_info( - prio_fees_in_block: &mut Vec, + prio_fees_in_block: &mut Vec<(u64, u64)>, ) -> Option { if !prio_fees_in_block.is_empty() { - prio_fees_in_block.sort(); + // get stats by transaction + prio_fees_in_block.sort_by(|a, b| a.0.cmp(&b.0)); let median_index = prio_fees_in_block.len() / 2; let p75_index = prio_fees_in_block.len() * 75 / 100; let p90_index = prio_fees_in_block.len() * 90 / 100; + let p_min = prio_fees_in_block[0].0; + let p_median = prio_fees_in_block[median_index].0; + let p_75 = prio_fees_in_block[p75_index].0; + let p_90 = prio_fees_in_block[p90_index].0; + let p_max = prio_fees_in_block.last().map(|x| x.0).unwrap_or_default(); + + let mut med_cu = None; + let mut p75_cu = None; + let mut p90_cu = None; + let mut p95_cu = None; + + // get stats by CU + let cu_sum: u64 = prio_fees_in_block.iter().map(|x| x.1).sum(); + let mut agg: u64 = 0; + for (prio, cu) in prio_fees_in_block { + agg = agg + *cu; + if med_cu.is_none() && agg > (cu_sum as f64 * 0.5) as u64 { + med_cu = Some(*prio); + } else if p75_cu.is_none() && agg > (cu_sum as f64 * 0.75) as u64 { + p75_cu = Some(*prio) + } else if p90_cu.is_none() && agg > (cu_sum as f64 * 0.9) as u64 { + p90_cu = Some(*prio); + } else if p95_cu.is_none() && agg > (cu_sum as f64 * 0.95) as u64 { + p95_cu = Some(*prio) + } + } + Some(PrioritizationFeesInfo { - p_min: prio_fees_in_block[0], - p_median: prio_fees_in_block[median_index], - p_75: prio_fees_in_block[p75_index], - p_90: prio_fees_in_block[p90_index], - p_max: prio_fees_in_block.last().cloned().unwrap_or_default(), + p_min, + p_median, + p_75, + p_90, + p_max, + med_cu, + p75_cu, + p90_cu, + p95_cu, }) } else { None @@ -339,6 +379,7 @@ impl BlockInfo { let mut total_cu_requested: u64 = 0; let mut prio_fees_in_block = vec![]; let mut block_transactions = vec![]; + let mut lookup_tables = HashSet::new(); for transaction in &block.transactions { let Some(tx) = &transaction.transaction else { continue; @@ -395,8 +436,10 @@ impl BlockInfo { .account_key .try_into() .unwrap_or(Pubkey::default().to_bytes()); + let account_key = Pubkey::new_from_array(bytes); + lookup_tables.insert(account_key.clone()); MessageAddressTableLookup { - account_key: Pubkey::new_from_array(bytes), + account_key, writable_indexes: table.writable_indexes, readonly_indexes: table.readonly_indexes, } @@ -404,6 +447,9 @@ impl BlockInfo { .collect(), }); let atl_store = atl_store.clone(); + atl_store + .load_all_alts(lookup_tables.iter().cloned().collect_vec()) + .await; let transaction = Self::process_versioned_message( atl_store, diff --git a/src/main.rs b/src/main.rs index 7f7d3d8..7e1df03 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,7 +4,11 @@ use solana_rpc_client::nonblocking::rpc_client::{self, RpcClient}; use solana_sdk::pubkey::Pubkey; use std::{ collections::HashMap, - sync::{atomic::AtomicU64, Arc}, + str::FromStr, + sync::{ + atomic::{AtomicBool, AtomicU64}, + Arc, + }, time::Duration, }; use tokio::io::AsyncReadExt; @@ -136,8 +140,30 @@ async fn start_tracking_blocks( grpc_x_token: Option, postgres: postgres::Postgres, slot: Arc, - alts_list: Vec, + alts_list: Vec, ) { + let block_counter = Arc::new(AtomicU64::new(0)); + let restart_block_subscription = Arc::new(AtomicBool::new(false)); + let _block_counter_checker = { + let block_counter = block_counter.clone(); + let restart_block_subscription = restart_block_subscription.clone(); + tokio::spawn(async move { + let mut old_count = block_counter.load(std::sync::atomic::Ordering::Relaxed); + loop { + tokio::time::sleep(Duration::from_secs(20)).await; + let new_count = block_counter.load(std::sync::atomic::Ordering::Relaxed); + if old_count > 0 && old_count == new_count { + log::error!( + "Did not recieve any block for 20 s, restarting block subscription" + ); + restart_block_subscription.store(true, std::sync::atomic::Ordering::Relaxed); + tokio::time::sleep(Duration::from_secs(10)).await; + } + old_count = new_count; + } + }) + }; + let mut client = yellowstone_grpc_client_original::GeyserGrpcClient::connect( grpc_block_addr, grpc_x_token, @@ -146,6 +172,7 @@ async fn start_tracking_blocks( .unwrap(); let atl_store = Arc::new(alt_store::ALTStore::new(rpc_client)); atl_store.load_all_alts(alts_list).await; + loop { let mut blocks_subs = HashMap::new(); blocks_subs.insert( @@ -193,6 +220,10 @@ async fn start_tracking_blocks( while let Ok(Some(message)) = tokio::time::timeout(Duration::from_secs(30), geyser_stream.next()).await { + if restart_block_subscription.load(std::sync::atomic::Ordering::Relaxed) { + break; + } + let Ok(message) = message else { continue; }; @@ -200,7 +231,7 @@ async fn start_tracking_blocks( let Some(update) = message.update_oneof else { continue; }; - + block_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed); match update { yellowstone_grpc_proto_original::prelude::subscribe_update::UpdateOneof::Block( block, @@ -238,6 +269,7 @@ async fn start_tracking_blocks( _ => {} }; } + restart_block_subscription.store(false, std::sync::atomic::Ordering::Relaxed); log::error!("geyser block stream is broken, retrying"); tokio::time::sleep(std::time::Duration::from_secs(1)).await; } @@ -266,7 +298,9 @@ async fn main() -> anyhow::Result<()> { alts_file.read_to_string(&mut alts_string).await?; let alts_list = alts_string .split("\r\n") - .map(|x| x.to_string()) + .map(|x| x.trim().to_string()) + .filter(|x| x.len() > 0) + .map(|x| Pubkey::from_str(&x).unwrap()) .collect_vec(); postgres.spawn_transaction_infos_saver(map_of_infos.clone(), slot.clone());