diff --git a/Cargo.lock b/Cargo.lock index 1a4ab89..b7e3ff6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3258,6 +3258,7 @@ dependencies = [ "multiqueue", "rand 0.8.5", "rayon", + "regex", "serde", "serde_derive", "serde_json", @@ -4712,9 +4713,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.7.1" +version = "1.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "48aaa5748ba571fb95cd2c85c09f629215d3a6ece942baa100950af03a34f733" +checksum = "8b1f693b24f6ac912f4893ef08244d70b6067480d2f1a46e950c9691e6749d1d" dependencies = [ "aho-corasick", "memchr", @@ -4729,9 +4730,9 @@ checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" [[package]] name = "regex-syntax" -version = "0.6.28" +version = "0.6.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "456c603be3e8d448b072f410900c09faf164fbce2d480456f50eea6e25f9c848" +checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" [[package]] name = "rend" diff --git a/Cargo.toml b/Cargo.toml index 61edf3e..a016cf7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,6 +34,7 @@ serde_json = "1.0.79" serde_yaml = "0.8.23" thiserror = "1.0" tokio = { version = "1", features = ["full"] } +regex = "1.7.3" solana-client = { git = "https://github.com/solana-labs/solana.git", tag="v1.15.2" } solana-core = { git = "https://github.com/solana-labs/solana.git", tag="v1.15.2" } @@ -50,6 +51,7 @@ solana-transaction-status = { git = "https://github.com/solana-labs/solana.git", solana-quic-client = { git = "https://github.com/solana-labs/solana.git", tag="v1.15.2" } solana-account-decoder = { git = "https://github.com/solana-labs/solana.git", tag="v1.15.2" } + # pin program to mango-v3 version of solana sdk # now we can use sdk for recent version and program for legacy # we have a bunch of helpers to convert between the two explicitly diff --git a/src/cli.rs b/src/cli.rs index 8b4a6fd..e91e49c 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -21,6 +21,7 @@ pub struct Config { pub mango_cluster: String, pub txs_batch_size: Option, pub priority_fees_proba: u8, + pub keeper_prioritization: u64, pub keeper_authority: Option, pub number_of_markers_per_mm: u8, } @@ -43,6 +44,7 @@ impl Default for Config { priority_fees_proba: 0, keeper_authority: None, number_of_markers_per_mm: 5, + keeper_prioritization: 1000, } } } @@ -204,6 +206,15 @@ pub fn build_args<'a, 'b>(version: &'b str) -> App<'a, 'b> { .required(false) .help("Number of markets a market maker will trade on at a time"), ) + .arg( + Arg::with_name("keeper-prioritization-fees") + .long("keeper-prioritization-fees") + .value_name("UINT") + .min_values(0) + .takes_value(true) + .required(false) + .help("Prioritization fees set for all keeper instructions (1000 by default)") + ) } /// Parses a clap `ArgMatches` structure into a `Config` @@ -299,5 +310,10 @@ pub fn extract_args(matches: &ArgMatches) -> Config { .expect("can't parse number of markets per market maker"), None => 5, }; + + args.keeper_prioritization = match matches.value_of("keeper-prioritization-fees") { + Some(x) => x.parse().expect("can't parse keeper prioritization fees"), + None => 1000, + }; args } diff --git a/src/crank.rs b/src/crank.rs index 8e526a9..c80b1a2 100644 --- a/src/crank.rs +++ b/src/crank.rs @@ -75,6 +75,7 @@ pub fn start( "crank-tx-sender signing with keypair pk={:?}", identity.pubkey() ); + loop { if exit_signal.load(Ordering::Acquire) { break; diff --git a/src/main.rs b/src/main.rs index 6b7488d..046397b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -54,9 +54,11 @@ pub async fn main() -> anyhow::Result<()> { priority_fees_proba, keeper_authority, number_of_markers_per_mm, + keeper_prioritization, .. } = &cli_config; let number_of_markers_per_mm = *number_of_markers_per_mm; + let keeper_prioritization = *keeper_prioritization; let transaction_save_file = transaction_save_file.clone(); let block_data_save_file = block_data_save_file.clone(); @@ -110,7 +112,7 @@ pub async fn main() -> anyhow::Result<()> { info!( "accounts:{:?} markets:{:?} quotes_per_second:{:?} expected_tps:{:?} duration:{:?}", account_keys_parsed.len(), - mango_group_config.perp_markets.len(), + number_of_markers_per_mm, quotes_per_second, account_keys_parsed.len() * number_of_markers_per_mm as usize @@ -167,7 +169,7 @@ pub async fn main() -> anyhow::Result<()> { keeper_authority, quote_root_bank, quote_node_banks, - 0, + keeper_prioritization, ); Some(jl) } else { @@ -188,7 +190,7 @@ pub async fn main() -> anyhow::Result<()> { tpu_manager.clone(), mango_group_config, id, - 0, + keeper_prioritization, ); let warmup_duration = Duration::from_secs(20); @@ -213,20 +215,20 @@ pub async fn main() -> anyhow::Result<()> { let mut tasks = vec![]; tasks.push(blockhash_thread); - let (tx_status_sx, tx_status_rx) = tokio::sync::broadcast::channel(1024); - let (block_status_sx, block_status_rx) = tokio::sync::broadcast::channel(1024); + let (tx_status_sx, tx_status_rx) = tokio::sync::broadcast::channel(1000000); + let (block_status_sx, block_status_rx) = tokio::sync::broadcast::channel(1000000); + + let stats_handle = mango_sim_stats.update_from_tx_status_stream(tx_status_rx); + tasks.push(stats_handle); let mut writers_jh = initialize_result_writers( transaction_save_file, block_data_save_file, - tx_status_rx, + tx_status_sx.subscribe(), block_status_rx, ); tasks.append(&mut writers_jh); - let stats_handle = mango_sim_stats.update_from_tx_status_stream(tx_status_sx.subscribe()); - tasks.push(stats_handle); - let mut confirmation_threads = confirmations_by_blocks( nb_rpc_client, tx_record_rx, @@ -250,7 +252,7 @@ pub async fn main() -> anyhow::Result<()> { break; } tokio::time::sleep(Duration::from_secs(60)).await; - mango_sim_stats.report(false, METRICS_NAME); + mango_sim_stats.report(false, METRICS_NAME).await; } }); tasks.push(reporting_thread); @@ -266,6 +268,6 @@ pub async fn main() -> anyhow::Result<()> { exit_signal.store(true, Ordering::Relaxed); futures::future::join_all(tasks).await; - mango_sim_stats.report(true, METRICS_NAME); + mango_sim_stats.report(true, METRICS_NAME).await; Ok(()) } diff --git a/src/stats.rs b/src/stats.rs index 0f36848..47b15ef 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -4,15 +4,16 @@ use std::{ atomic::{AtomicU64, Ordering}, Arc, }, - time::Instant, + time::Instant, collections::HashMap, }; use crate::states::{KeeperInstruction, TransactionConfirmRecord}; +use iter_tools::Itertools; use solana_metrics::datapoint_info; -use tokio::task::JoinHandle; +use tokio::{task::JoinHandle, sync::RwLock}; // Non atomic version of counters -#[derive(Clone, Default, Debug, Copy)] +#[derive(Clone, Default, Debug)] struct NACounters { num_confirmed_txs: u64, num_error_txs: u64, @@ -39,10 +40,21 @@ struct NACounters { succ_cache_root_banks_txs: u64, succ_update_perp_cache_txs: u64, succ_update_funding_txs: u64, + + // errors section + errors: HashMap, } impl NACounters { pub fn diff(&self, other: &NACounters) -> NACounters { + let mut new_error_count = HashMap::new(); + for (error, count) in &self.errors { + if let Some(v) = other.errors.get( error ) { + new_error_count.insert(error.clone(), *count - *v); + } else { + new_error_count.insert(error.clone(), *count); + } + } NACounters { num_confirmed_txs: self.num_confirmed_txs - other.num_confirmed_txs, num_error_txs: self.num_error_txs - other.num_error_txs, @@ -73,6 +85,7 @@ impl NACounters { succ_update_perp_cache_txs: self.succ_update_perp_cache_txs - other.succ_update_perp_cache_txs, succ_update_funding_txs: self.succ_update_funding_txs - other.succ_update_funding_txs, + errors: new_error_count, } } } @@ -104,10 +117,13 @@ struct Counters { succ_cache_root_banks_txs: Arc, succ_update_perp_cache_txs: Arc, succ_update_funding_txs: Arc, + + // Errors + errors: Arc>>, } impl Counters { - pub fn to_na_counters(&self) -> NACounters { + pub async fn to_na_counters(&self) -> NACounters { NACounters { num_confirmed_txs: self.num_confirmed_txs.load(Ordering::Relaxed), num_error_txs: self.num_error_txs.load(Ordering::Relaxed), @@ -138,6 +154,7 @@ impl Counters { succ_cache_root_banks_txs: self.succ_cache_root_banks_txs.load(Ordering::Relaxed), succ_update_perp_cache_txs: self.succ_update_perp_cache_txs.load(Ordering::Relaxed), succ_update_funding_txs: self.succ_update_funding_txs.load(Ordering::Relaxed), + errors: self.errors.read().await.clone(), } } } @@ -170,14 +187,22 @@ impl MangoSimulationStats { tx_confirm_record_reciever: tokio::sync::broadcast::Receiver, ) -> JoinHandle<()> { let counters = self.counters.clone(); + let regex = regex::Regex::new(r"Error processing Instruction \d+: ").unwrap(); tokio::spawn(async move { let mut tx_confirm_record_reciever = tx_confirm_record_reciever; loop { if let Ok(tx_data) = tx_confirm_record_reciever.recv().await { if let Some(_) = tx_data.confirmed_at { counters.num_confirmed_txs.fetch_add(1, Ordering::Relaxed); - if let Some(_) = tx_data.error { + if let Some(error) = tx_data.error { + let error = regex.replace_all(&error, "").to_string(); counters.num_error_txs.fetch_add(1, Ordering::Relaxed); + let mut lock = counters.errors.write().await; + if let Some(value) = lock.get_mut(&error) { + *value += 1; + } else { + lock.insert(error, 1); + } } else { counters.num_successful.fetch_add(1, Ordering::Relaxed); @@ -262,13 +287,13 @@ impl MangoSimulationStats { } } - pub fn report(&mut self, is_final: bool, name: &'static str) { + pub async fn report(&mut self, is_final: bool, name: &'static str) { let time_diff = std::time::Instant::now() - self.instant; - let counters = self.counters.to_na_counters(); + let counters = self.counters.to_na_counters().await; let diff = { let mut prev_counter_lock = self.previous_counters.lock().unwrap(); let diff = counters.diff(&prev_counter_lock); - *prev_counter_lock = counters; + *prev_counter_lock = counters.clone(); diff }; @@ -341,24 +366,30 @@ impl MangoSimulationStats { ); println!( - "Transactions confirmed {}%", + "Transactions confirmed : {}%", (counters.num_confirmed_txs * 100) .checked_div(counters.num_sent) .unwrap_or(0) ); println!( - "Transactions successful {}%", + "Transactions successful : {}%", (counters.num_successful * 100) .checked_div(counters.num_sent) .unwrap_or(0) ); println!( - "Transactions timed out {}%", + "Transactions timed out : {}%", (counters.num_timeout_txs * 100) .checked_div(counters.num_sent) .unwrap_or(0) ); - println!("\n\n"); + let top_5_errors = counters.errors.iter().sorted_by(|x,y| {(*y.1).cmp(x.1)}).take(5).enumerate().collect_vec(); + let mut errors_to_print: String = String::new(); + for (idx, (error, count)) in top_5_errors { + println!("Error #{idx} : {error} ({count})"); + errors_to_print += format!("{error}({count}),").as_str(); + } + println!("\n"); if !is_final { datapoint_info!( @@ -458,6 +489,11 @@ impl MangoSimulationStats { diff.succ_update_funding_txs, i64 ), + ( + "top_5_errors", + errors_to_print, + String + ) ); } }