adding prioritization fees for keeper instructions (#8)

* adding prioritization fees for keeper instructions

* add top 5 errors encountered in stats

---------

Co-authored-by: Maximilian Schneider <max@mango.markets>
This commit is contained in:
galactus 2023-04-15 12:39:26 +02:00 committed by GitHub
parent 1ffd1e344a
commit be148301bb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 85 additions and 27 deletions

9
Cargo.lock generated
View File

@ -3258,6 +3258,7 @@ dependencies = [
"multiqueue",
"rand 0.8.5",
"rayon",
"regex",
"serde",
"serde_derive",
"serde_json",
@ -4712,9 +4713,9 @@ dependencies = [
[[package]]
name = "regex"
version = "1.7.1"
version = "1.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "48aaa5748ba571fb95cd2c85c09f629215d3a6ece942baa100950af03a34f733"
checksum = "8b1f693b24f6ac912f4893ef08244d70b6067480d2f1a46e950c9691e6749d1d"
dependencies = [
"aho-corasick",
"memchr",
@ -4729,9 +4730,9 @@ checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
[[package]]
name = "regex-syntax"
version = "0.6.28"
version = "0.6.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "456c603be3e8d448b072f410900c09faf164fbce2d480456f50eea6e25f9c848"
checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1"
[[package]]
name = "rend"

View File

@ -34,6 +34,7 @@ serde_json = "1.0.79"
serde_yaml = "0.8.23"
thiserror = "1.0"
tokio = { version = "1", features = ["full"] }
regex = "1.7.3"
solana-client = { git = "https://github.com/solana-labs/solana.git", tag="v1.15.2" }
solana-core = { git = "https://github.com/solana-labs/solana.git", tag="v1.15.2" }
@ -50,6 +51,7 @@ solana-transaction-status = { git = "https://github.com/solana-labs/solana.git",
solana-quic-client = { git = "https://github.com/solana-labs/solana.git", tag="v1.15.2" }
solana-account-decoder = { git = "https://github.com/solana-labs/solana.git", tag="v1.15.2" }
# pin program to mango-v3 version of solana sdk
# now we can use sdk for recent version and program for legacy
# we have a bunch of helpers to convert between the two explicitly

View File

@ -21,6 +21,7 @@ pub struct Config {
pub mango_cluster: String,
pub txs_batch_size: Option<usize>,
pub priority_fees_proba: u8,
pub keeper_prioritization: u64,
pub keeper_authority: Option<Keypair>,
pub number_of_markers_per_mm: u8,
}
@ -43,6 +44,7 @@ impl Default for Config {
priority_fees_proba: 0,
keeper_authority: None,
number_of_markers_per_mm: 5,
keeper_prioritization: 1000,
}
}
}
@ -204,6 +206,15 @@ pub fn build_args<'a, 'b>(version: &'b str) -> App<'a, 'b> {
.required(false)
.help("Number of markets a market maker will trade on at a time"),
)
.arg(
Arg::with_name("keeper-prioritization-fees")
.long("keeper-prioritization-fees")
.value_name("UINT")
.min_values(0)
.takes_value(true)
.required(false)
.help("Prioritization fees set for all keeper instructions (1000 by default)")
)
}
/// Parses a clap `ArgMatches` structure into a `Config`
@ -299,5 +310,10 @@ pub fn extract_args(matches: &ArgMatches) -> Config {
.expect("can't parse number of markets per market maker"),
None => 5,
};
args.keeper_prioritization = match matches.value_of("keeper-prioritization-fees") {
Some(x) => x.parse().expect("can't parse keeper prioritization fees"),
None => 1000,
};
args
}

View File

@ -75,6 +75,7 @@ pub fn start(
"crank-tx-sender signing with keypair pk={:?}",
identity.pubkey()
);
loop {
if exit_signal.load(Ordering::Acquire) {
break;

View File

@ -54,9 +54,11 @@ pub async fn main() -> anyhow::Result<()> {
priority_fees_proba,
keeper_authority,
number_of_markers_per_mm,
keeper_prioritization,
..
} = &cli_config;
let number_of_markers_per_mm = *number_of_markers_per_mm;
let keeper_prioritization = *keeper_prioritization;
let transaction_save_file = transaction_save_file.clone();
let block_data_save_file = block_data_save_file.clone();
@ -110,7 +112,7 @@ pub async fn main() -> anyhow::Result<()> {
info!(
"accounts:{:?} markets:{:?} quotes_per_second:{:?} expected_tps:{:?} duration:{:?}",
account_keys_parsed.len(),
mango_group_config.perp_markets.len(),
number_of_markers_per_mm,
quotes_per_second,
account_keys_parsed.len()
* number_of_markers_per_mm as usize
@ -167,7 +169,7 @@ pub async fn main() -> anyhow::Result<()> {
keeper_authority,
quote_root_bank,
quote_node_banks,
0,
keeper_prioritization,
);
Some(jl)
} else {
@ -188,7 +190,7 @@ pub async fn main() -> anyhow::Result<()> {
tpu_manager.clone(),
mango_group_config,
id,
0,
keeper_prioritization,
);
let warmup_duration = Duration::from_secs(20);
@ -213,20 +215,20 @@ pub async fn main() -> anyhow::Result<()> {
let mut tasks = vec![];
tasks.push(blockhash_thread);
let (tx_status_sx, tx_status_rx) = tokio::sync::broadcast::channel(1024);
let (block_status_sx, block_status_rx) = tokio::sync::broadcast::channel(1024);
let (tx_status_sx, tx_status_rx) = tokio::sync::broadcast::channel(1000000);
let (block_status_sx, block_status_rx) = tokio::sync::broadcast::channel(1000000);
let stats_handle = mango_sim_stats.update_from_tx_status_stream(tx_status_rx);
tasks.push(stats_handle);
let mut writers_jh = initialize_result_writers(
transaction_save_file,
block_data_save_file,
tx_status_rx,
tx_status_sx.subscribe(),
block_status_rx,
);
tasks.append(&mut writers_jh);
let stats_handle = mango_sim_stats.update_from_tx_status_stream(tx_status_sx.subscribe());
tasks.push(stats_handle);
let mut confirmation_threads = confirmations_by_blocks(
nb_rpc_client,
tx_record_rx,
@ -250,7 +252,7 @@ pub async fn main() -> anyhow::Result<()> {
break;
}
tokio::time::sleep(Duration::from_secs(60)).await;
mango_sim_stats.report(false, METRICS_NAME);
mango_sim_stats.report(false, METRICS_NAME).await;
}
});
tasks.push(reporting_thread);
@ -266,6 +268,6 @@ pub async fn main() -> anyhow::Result<()> {
exit_signal.store(true, Ordering::Relaxed);
futures::future::join_all(tasks).await;
mango_sim_stats.report(true, METRICS_NAME);
mango_sim_stats.report(true, METRICS_NAME).await;
Ok(())
}

View File

@ -4,15 +4,16 @@ use std::{
atomic::{AtomicU64, Ordering},
Arc,
},
time::Instant,
time::Instant, collections::HashMap,
};
use crate::states::{KeeperInstruction, TransactionConfirmRecord};
use iter_tools::Itertools;
use solana_metrics::datapoint_info;
use tokio::task::JoinHandle;
use tokio::{task::JoinHandle, sync::RwLock};
// Non atomic version of counters
#[derive(Clone, Default, Debug, Copy)]
#[derive(Clone, Default, Debug)]
struct NACounters {
num_confirmed_txs: u64,
num_error_txs: u64,
@ -39,10 +40,21 @@ struct NACounters {
succ_cache_root_banks_txs: u64,
succ_update_perp_cache_txs: u64,
succ_update_funding_txs: u64,
// errors section
errors: HashMap<String, u64>,
}
impl NACounters {
pub fn diff(&self, other: &NACounters) -> NACounters {
let mut new_error_count = HashMap::new();
for (error, count) in &self.errors {
if let Some(v) = other.errors.get( error ) {
new_error_count.insert(error.clone(), *count - *v);
} else {
new_error_count.insert(error.clone(), *count);
}
}
NACounters {
num_confirmed_txs: self.num_confirmed_txs - other.num_confirmed_txs,
num_error_txs: self.num_error_txs - other.num_error_txs,
@ -73,6 +85,7 @@ impl NACounters {
succ_update_perp_cache_txs: self.succ_update_perp_cache_txs
- other.succ_update_perp_cache_txs,
succ_update_funding_txs: self.succ_update_funding_txs - other.succ_update_funding_txs,
errors: new_error_count,
}
}
}
@ -104,10 +117,13 @@ struct Counters {
succ_cache_root_banks_txs: Arc<AtomicU64>,
succ_update_perp_cache_txs: Arc<AtomicU64>,
succ_update_funding_txs: Arc<AtomicU64>,
// Errors
errors: Arc<RwLock<HashMap<String, u64>>>,
}
impl Counters {
pub fn to_na_counters(&self) -> NACounters {
pub async fn to_na_counters(&self) -> NACounters {
NACounters {
num_confirmed_txs: self.num_confirmed_txs.load(Ordering::Relaxed),
num_error_txs: self.num_error_txs.load(Ordering::Relaxed),
@ -138,6 +154,7 @@ impl Counters {
succ_cache_root_banks_txs: self.succ_cache_root_banks_txs.load(Ordering::Relaxed),
succ_update_perp_cache_txs: self.succ_update_perp_cache_txs.load(Ordering::Relaxed),
succ_update_funding_txs: self.succ_update_funding_txs.load(Ordering::Relaxed),
errors: self.errors.read().await.clone(),
}
}
}
@ -170,14 +187,22 @@ impl MangoSimulationStats {
tx_confirm_record_reciever: tokio::sync::broadcast::Receiver<TransactionConfirmRecord>,
) -> JoinHandle<()> {
let counters = self.counters.clone();
let regex = regex::Regex::new(r"Error processing Instruction \d+: ").unwrap();
tokio::spawn(async move {
let mut tx_confirm_record_reciever = tx_confirm_record_reciever;
loop {
if let Ok(tx_data) = tx_confirm_record_reciever.recv().await {
if let Some(_) = tx_data.confirmed_at {
counters.num_confirmed_txs.fetch_add(1, Ordering::Relaxed);
if let Some(_) = tx_data.error {
if let Some(error) = tx_data.error {
let error = regex.replace_all(&error, "").to_string();
counters.num_error_txs.fetch_add(1, Ordering::Relaxed);
let mut lock = counters.errors.write().await;
if let Some(value) = lock.get_mut(&error) {
*value += 1;
} else {
lock.insert(error, 1);
}
} else {
counters.num_successful.fetch_add(1, Ordering::Relaxed);
@ -262,13 +287,13 @@ impl MangoSimulationStats {
}
}
pub fn report(&mut self, is_final: bool, name: &'static str) {
pub async fn report(&mut self, is_final: bool, name: &'static str) {
let time_diff = std::time::Instant::now() - self.instant;
let counters = self.counters.to_na_counters();
let counters = self.counters.to_na_counters().await;
let diff = {
let mut prev_counter_lock = self.previous_counters.lock().unwrap();
let diff = counters.diff(&prev_counter_lock);
*prev_counter_lock = counters;
*prev_counter_lock = counters.clone();
diff
};
@ -341,24 +366,30 @@ impl MangoSimulationStats {
);
println!(
"Transactions confirmed {}%",
"Transactions confirmed : {}%",
(counters.num_confirmed_txs * 100)
.checked_div(counters.num_sent)
.unwrap_or(0)
);
println!(
"Transactions successful {}%",
"Transactions successful : {}%",
(counters.num_successful * 100)
.checked_div(counters.num_sent)
.unwrap_or(0)
);
println!(
"Transactions timed out {}%",
"Transactions timed out : {}%",
(counters.num_timeout_txs * 100)
.checked_div(counters.num_sent)
.unwrap_or(0)
);
println!("\n\n");
let top_5_errors = counters.errors.iter().sorted_by(|x,y| {(*y.1).cmp(x.1)}).take(5).enumerate().collect_vec();
let mut errors_to_print: String = String::new();
for (idx, (error, count)) in top_5_errors {
println!("Error #{idx} : {error} ({count})");
errors_to_print += format!("{error}({count}),").as_str();
}
println!("\n");
if !is_final {
datapoint_info!(
@ -458,6 +489,11 @@ impl MangoSimulationStats {
diff.succ_update_funding_txs,
i64
),
(
"top_5_errors",
errors_to_print,
String
)
);
}
}