Loading all ALTS in bulk, saving stats of priority fees by cus (#34)
* Loading all ALTS in bulk, saving stats of priority fees by cus * cargo fmt
This commit is contained in:
parent
3a783316b3
commit
6b0e2d269f
|
@ -327,6 +327,19 @@ version = "1.5.0"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9"
|
||||
|
||||
[[package]]
|
||||
name = "async-channel"
|
||||
version = "2.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1ca33f4bc4ed1babef42cad36cc1f51fa88be00420404e5b1e80ab1b18f7678c"
|
||||
dependencies = [
|
||||
"concurrent-queue",
|
||||
"event-listener",
|
||||
"event-listener-strategy",
|
||||
"futures-core",
|
||||
"pin-project-lite",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-compression"
|
||||
version = "0.4.4"
|
||||
|
@ -805,6 +818,15 @@ dependencies = [
|
|||
"unreachable",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "concurrent-queue"
|
||||
version = "2.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d16048cd947b08fa32c24458a22f5dc5e835264f689f4f5653210c69fd107363"
|
||||
dependencies = [
|
||||
"crossbeam-utils",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "console"
|
||||
version = "0.15.7"
|
||||
|
@ -1186,6 +1208,27 @@ dependencies = [
|
|||
"windows-sys 0.48.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "event-listener"
|
||||
version = "4.0.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "218a870470cce1469024e9fb66b901aa983929d81304a1cdb299f28118e550d5"
|
||||
dependencies = [
|
||||
"concurrent-queue",
|
||||
"parking",
|
||||
"pin-project-lite",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "event-listener-strategy"
|
||||
version = "0.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "958e4d70b6d5e81971bebec42271ec641e7ff4e170a6fa605f2b8a8b65cb97d3"
|
||||
dependencies = [
|
||||
"event-listener",
|
||||
"pin-project-lite",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "fallible-iterator"
|
||||
version = "0.2.0"
|
||||
|
@ -1414,6 +1457,7 @@ name = "grpc_banking_transactions_notifications"
|
|||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-channel",
|
||||
"base64 0.21.5",
|
||||
"bincode",
|
||||
"bs58",
|
||||
|
@ -2257,6 +2301,12 @@ version = "0.1.1"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
|
||||
|
||||
[[package]]
|
||||
name = "parking"
|
||||
version = "2.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae"
|
||||
|
||||
[[package]]
|
||||
name = "parking_lot"
|
||||
version = "0.12.1"
|
||||
|
|
|
@ -33,6 +33,7 @@ prometheus = "0.13.3"
|
|||
lazy_static = "1.4.0"
|
||||
tokio-postgres = { version = "0.7.10", features = ["with-chrono-0_4"] }
|
||||
rustls = { version = "=0.20.8", default-features = false }
|
||||
async-channel = "2.1.1"
|
||||
|
||||
tokio = { version = "1.32.0", features = ["rt-multi-thread", "macros", "time"] }
|
||||
yellowstone-grpc-client = {git = "https://github.com/blockworks-foundation/yellowstone-grpc.git", branch = "tag-v1.16-mango"}
|
||||
|
|
|
@ -4,7 +4,8 @@ use prometheus::{opts, register_int_gauge, IntGauge};
|
|||
use solana_address_lookup_table_program::state::AddressLookupTable;
|
||||
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
|
||||
use solana_sdk::{account::ReadableAccount, commitment_config::CommitmentConfig, pubkey::Pubkey};
|
||||
use std::{str::FromStr, sync::Arc, time::Duration};
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use crate::block_info::TransactionAccount;
|
||||
lazy_static::lazy_static! {
|
||||
|
@ -16,6 +17,7 @@ lazy_static::lazy_static! {
|
|||
pub struct ALTStore {
|
||||
rpc_client: Arc<RpcClient>,
|
||||
pub map: Arc<DashMap<Pubkey, Vec<Pubkey>>>,
|
||||
is_loading: Arc<DashMap<Pubkey, Arc<tokio::sync::RwLock<()>>>>,
|
||||
}
|
||||
|
||||
impl ALTStore {
|
||||
|
@ -23,18 +25,30 @@ impl ALTStore {
|
|||
Self {
|
||||
rpc_client,
|
||||
map: Arc::new(DashMap::new()),
|
||||
is_loading: Arc::new(DashMap::new()),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn load_all_alts(&self, alts_list: Vec<String>) {
|
||||
pub async fn load_all_alts(&self, alts_list: Vec<Pubkey>) {
|
||||
let alts_list = alts_list
|
||||
.iter()
|
||||
.map(|x| x.trim())
|
||||
.filter(|x| x.len() > 0)
|
||||
.map(|x| Pubkey::from_str(&x).unwrap())
|
||||
.filter(|x| self.map.contains_key(&x) || self.is_loading.contains_key(&x))
|
||||
.cloned()
|
||||
.collect_vec();
|
||||
|
||||
if alts_list.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
log::info!("Preloading {} ALTs", alts_list.len());
|
||||
|
||||
for batches in alts_list.chunks(1000).map(|x| x.to_vec()) {
|
||||
let lock = Arc::new(RwLock::new(()));
|
||||
// add in loading list
|
||||
batches.iter().for_each(|x| {
|
||||
self.is_loading.insert(x.clone(), lock.clone());
|
||||
});
|
||||
let _context = lock.write().await;
|
||||
let tasks = batches.chunks(10).map(|batch| {
|
||||
let batch = batch.to_vec();
|
||||
let rpc_client = self.rpc_client.clone();
|
||||
|
@ -72,13 +86,10 @@ impl ALTStore {
|
|||
drop(lookup_table);
|
||||
}
|
||||
|
||||
pub async fn load_alt_from_rpc(&self, alt: &Pubkey) {
|
||||
if !self.map.contains_key(&alt) {
|
||||
self.reload_alt_from_rpc(&alt).await;
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn reload_alt_from_rpc(&self, alt: &Pubkey) {
|
||||
let lock = Arc::new(RwLock::new(()));
|
||||
let _ = lock.write().await;
|
||||
self.is_loading.insert(alt.clone(), lock.clone());
|
||||
let response = self
|
||||
.rpc_client
|
||||
.get_account_with_commitment(alt, CommitmentConfig::processed())
|
||||
|
@ -146,7 +157,17 @@ impl ALTStore {
|
|||
write_accounts: &Vec<u8>,
|
||||
read_account: &Vec<u8>,
|
||||
) -> Vec<TransactionAccount> {
|
||||
self.load_alt_from_rpc(&alt).await;
|
||||
match self.is_loading.get(&alt) {
|
||||
Some(x) => {
|
||||
// if there is a lock we wait until it is fullfilled
|
||||
let x = x.value().clone();
|
||||
log::debug!("waiting for alt {}", alt.to_string());
|
||||
let _ = x.read().await;
|
||||
}
|
||||
None => {
|
||||
// not loading
|
||||
}
|
||||
}
|
||||
match self.load_accounts(alt, write_accounts, read_account).await {
|
||||
Some(x) => x,
|
||||
None => {
|
||||
|
@ -155,7 +176,6 @@ impl ALTStore {
|
|||
match self.load_accounts(alt, write_accounts, read_account).await {
|
||||
Some(x) => x,
|
||||
None => {
|
||||
tokio::time::sleep(Duration::from_millis(500)).await;
|
||||
// reloading did not work
|
||||
log::error!("cannot load alt even after");
|
||||
vec![]
|
||||
|
|
|
@ -13,7 +13,10 @@ use solana_sdk::{
|
|||
signature::Signature,
|
||||
slot_history::Slot,
|
||||
};
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
#[derive(Serialize, Debug, Clone)]
|
||||
pub struct PrioFeeData {
|
||||
|
@ -76,6 +79,11 @@ pub struct PrioritizationFeesInfo {
|
|||
pub p_75: u64,
|
||||
pub p_90: u64,
|
||||
pub p_max: u64,
|
||||
|
||||
pub med_cu: Option<u64>,
|
||||
pub p75_cu: Option<u64>,
|
||||
pub p90_cu: Option<u64>,
|
||||
pub p95_cu: Option<u64>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
|
@ -116,7 +124,7 @@ impl BlockInfo {
|
|||
signature: String,
|
||||
slot: Slot,
|
||||
message: &VersionedMessage,
|
||||
prio_fees_in_block: &mut Vec<u64>,
|
||||
prio_fees_in_block: &mut Vec<(u64, u64)>,
|
||||
writelocked_accounts: &mut HashMap<Pubkey, AccountData>,
|
||||
readlocked_accounts: &mut HashMap<Pubkey, AccountData>,
|
||||
cu_consumed: u64,
|
||||
|
@ -168,7 +176,7 @@ impl BlockInfo {
|
|||
(cu_request, prio_fees, nb_ix_except_cb)
|
||||
};
|
||||
let prioritization_fees = prio_fees.unwrap_or_default();
|
||||
prio_fees_in_block.push(prioritization_fees);
|
||||
prio_fees_in_block.push((prioritization_fees, cu_consumed));
|
||||
let cu_requested =
|
||||
std::cmp::min(1_400_000, cu_requested.unwrap_or(200000 * nb_ix_except_cb));
|
||||
*total_cu_requested += cu_requested;
|
||||
|
@ -281,19 +289,51 @@ impl BlockInfo {
|
|||
}
|
||||
|
||||
pub fn calculate_supp_info(
|
||||
prio_fees_in_block: &mut Vec<u64>,
|
||||
prio_fees_in_block: &mut Vec<(u64, u64)>,
|
||||
) -> Option<PrioritizationFeesInfo> {
|
||||
if !prio_fees_in_block.is_empty() {
|
||||
prio_fees_in_block.sort();
|
||||
// get stats by transaction
|
||||
prio_fees_in_block.sort_by(|a, b| a.0.cmp(&b.0));
|
||||
let median_index = prio_fees_in_block.len() / 2;
|
||||
let p75_index = prio_fees_in_block.len() * 75 / 100;
|
||||
let p90_index = prio_fees_in_block.len() * 90 / 100;
|
||||
let p_min = prio_fees_in_block[0].0;
|
||||
let p_median = prio_fees_in_block[median_index].0;
|
||||
let p_75 = prio_fees_in_block[p75_index].0;
|
||||
let p_90 = prio_fees_in_block[p90_index].0;
|
||||
let p_max = prio_fees_in_block.last().map(|x| x.0).unwrap_or_default();
|
||||
|
||||
let mut med_cu = None;
|
||||
let mut p75_cu = None;
|
||||
let mut p90_cu = None;
|
||||
let mut p95_cu = None;
|
||||
|
||||
// get stats by CU
|
||||
let cu_sum: u64 = prio_fees_in_block.iter().map(|x| x.1).sum();
|
||||
let mut agg: u64 = 0;
|
||||
for (prio, cu) in prio_fees_in_block {
|
||||
agg = agg + *cu;
|
||||
if med_cu.is_none() && agg > (cu_sum as f64 * 0.5) as u64 {
|
||||
med_cu = Some(*prio);
|
||||
} else if p75_cu.is_none() && agg > (cu_sum as f64 * 0.75) as u64 {
|
||||
p75_cu = Some(*prio)
|
||||
} else if p90_cu.is_none() && agg > (cu_sum as f64 * 0.9) as u64 {
|
||||
p90_cu = Some(*prio);
|
||||
} else if p95_cu.is_none() && agg > (cu_sum as f64 * 0.95) as u64 {
|
||||
p95_cu = Some(*prio)
|
||||
}
|
||||
}
|
||||
|
||||
Some(PrioritizationFeesInfo {
|
||||
p_min: prio_fees_in_block[0],
|
||||
p_median: prio_fees_in_block[median_index],
|
||||
p_75: prio_fees_in_block[p75_index],
|
||||
p_90: prio_fees_in_block[p90_index],
|
||||
p_max: prio_fees_in_block.last().cloned().unwrap_or_default(),
|
||||
p_min,
|
||||
p_median,
|
||||
p_75,
|
||||
p_90,
|
||||
p_max,
|
||||
med_cu,
|
||||
p75_cu,
|
||||
p90_cu,
|
||||
p95_cu,
|
||||
})
|
||||
} else {
|
||||
None
|
||||
|
@ -339,6 +379,7 @@ impl BlockInfo {
|
|||
let mut total_cu_requested: u64 = 0;
|
||||
let mut prio_fees_in_block = vec![];
|
||||
let mut block_transactions = vec![];
|
||||
let mut lookup_tables = HashSet::new();
|
||||
for transaction in &block.transactions {
|
||||
let Some(tx) = &transaction.transaction else {
|
||||
continue;
|
||||
|
@ -395,8 +436,10 @@ impl BlockInfo {
|
|||
.account_key
|
||||
.try_into()
|
||||
.unwrap_or(Pubkey::default().to_bytes());
|
||||
let account_key = Pubkey::new_from_array(bytes);
|
||||
lookup_tables.insert(account_key.clone());
|
||||
MessageAddressTableLookup {
|
||||
account_key: Pubkey::new_from_array(bytes),
|
||||
account_key,
|
||||
writable_indexes: table.writable_indexes,
|
||||
readonly_indexes: table.readonly_indexes,
|
||||
}
|
||||
|
@ -404,6 +447,9 @@ impl BlockInfo {
|
|||
.collect(),
|
||||
});
|
||||
let atl_store = atl_store.clone();
|
||||
atl_store
|
||||
.load_all_alts(lookup_tables.iter().cloned().collect_vec())
|
||||
.await;
|
||||
|
||||
let transaction = Self::process_versioned_message(
|
||||
atl_store,
|
||||
|
|
42
src/main.rs
42
src/main.rs
|
@ -4,7 +4,11 @@ use solana_rpc_client::nonblocking::rpc_client::{self, RpcClient};
|
|||
use solana_sdk::pubkey::Pubkey;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
sync::{atomic::AtomicU64, Arc},
|
||||
str::FromStr,
|
||||
sync::{
|
||||
atomic::{AtomicBool, AtomicU64},
|
||||
Arc,
|
||||
},
|
||||
time::Duration,
|
||||
};
|
||||
use tokio::io::AsyncReadExt;
|
||||
|
@ -136,8 +140,30 @@ async fn start_tracking_blocks(
|
|||
grpc_x_token: Option<String>,
|
||||
postgres: postgres::Postgres,
|
||||
slot: Arc<AtomicU64>,
|
||||
alts_list: Vec<String>,
|
||||
alts_list: Vec<Pubkey>,
|
||||
) {
|
||||
let block_counter = Arc::new(AtomicU64::new(0));
|
||||
let restart_block_subscription = Arc::new(AtomicBool::new(false));
|
||||
let _block_counter_checker = {
|
||||
let block_counter = block_counter.clone();
|
||||
let restart_block_subscription = restart_block_subscription.clone();
|
||||
tokio::spawn(async move {
|
||||
let mut old_count = block_counter.load(std::sync::atomic::Ordering::Relaxed);
|
||||
loop {
|
||||
tokio::time::sleep(Duration::from_secs(20)).await;
|
||||
let new_count = block_counter.load(std::sync::atomic::Ordering::Relaxed);
|
||||
if old_count > 0 && old_count == new_count {
|
||||
log::error!(
|
||||
"Did not recieve any block for 20 s, restarting block subscription"
|
||||
);
|
||||
restart_block_subscription.store(true, std::sync::atomic::Ordering::Relaxed);
|
||||
tokio::time::sleep(Duration::from_secs(10)).await;
|
||||
}
|
||||
old_count = new_count;
|
||||
}
|
||||
})
|
||||
};
|
||||
|
||||
let mut client = yellowstone_grpc_client_original::GeyserGrpcClient::connect(
|
||||
grpc_block_addr,
|
||||
grpc_x_token,
|
||||
|
@ -146,6 +172,7 @@ async fn start_tracking_blocks(
|
|||
.unwrap();
|
||||
let atl_store = Arc::new(alt_store::ALTStore::new(rpc_client));
|
||||
atl_store.load_all_alts(alts_list).await;
|
||||
|
||||
loop {
|
||||
let mut blocks_subs = HashMap::new();
|
||||
blocks_subs.insert(
|
||||
|
@ -193,6 +220,10 @@ async fn start_tracking_blocks(
|
|||
while let Ok(Some(message)) =
|
||||
tokio::time::timeout(Duration::from_secs(30), geyser_stream.next()).await
|
||||
{
|
||||
if restart_block_subscription.load(std::sync::atomic::Ordering::Relaxed) {
|
||||
break;
|
||||
}
|
||||
|
||||
let Ok(message) = message else {
|
||||
continue;
|
||||
};
|
||||
|
@ -200,7 +231,7 @@ async fn start_tracking_blocks(
|
|||
let Some(update) = message.update_oneof else {
|
||||
continue;
|
||||
};
|
||||
|
||||
block_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
match update {
|
||||
yellowstone_grpc_proto_original::prelude::subscribe_update::UpdateOneof::Block(
|
||||
block,
|
||||
|
@ -238,6 +269,7 @@ async fn start_tracking_blocks(
|
|||
_ => {}
|
||||
};
|
||||
}
|
||||
restart_block_subscription.store(false, std::sync::atomic::Ordering::Relaxed);
|
||||
log::error!("geyser block stream is broken, retrying");
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
}
|
||||
|
@ -266,7 +298,9 @@ async fn main() -> anyhow::Result<()> {
|
|||
alts_file.read_to_string(&mut alts_string).await?;
|
||||
let alts_list = alts_string
|
||||
.split("\r\n")
|
||||
.map(|x| x.to_string())
|
||||
.map(|x| x.trim().to_string())
|
||||
.filter(|x| x.len() > 0)
|
||||
.map(|x| Pubkey::from_str(&x).unwrap())
|
||||
.collect_vec();
|
||||
|
||||
postgres.spawn_transaction_infos_saver(map_of_infos.clone(), slot.clone());
|
||||
|
|
Loading…
Reference in New Issue