diff --git a/Cargo.toml b/Cargo.toml index 589fd5b..a39cb23 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,8 @@ license = "Apache-2.0" homepage = "https://solana.com/" publish = false +rust-version = "1.66.1" + [dependencies] borsh = "0.9.3" chrono = "0.4.19" diff --git a/deps/mango-v3 b/deps/mango-v3 deleted file mode 160000 index 771cabe..0000000 --- a/deps/mango-v3 +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 771cabea1cb848439d282e43bf5f0e20ef7c9703 diff --git a/deps/solana b/deps/solana deleted file mode 160000 index 030eb5f..0000000 --- a/deps/solana +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 030eb5f2caadebeb5d656c1a0b2d2058ea4bd242 diff --git a/src/cli.rs b/src/cli.rs index af66fb8..8b4a6fd 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -1,8 +1,6 @@ -use solana_clap_utils::input_validators::is_valid_percentage; - use { clap::{crate_description, crate_name, App, Arg, ArgMatches}, - solana_clap_utils::input_validators::{is_url, is_url_or_moniker}, + solana_clap_utils::input_validators::{is_url, is_url_or_moniker, is_valid_percentage}, solana_cli_config::{ConfigInput, CONFIG_FILE}, solana_sdk::signature::{read_keypair_file, Keypair}, std::{net::SocketAddr, process::exit, time::Duration}, @@ -23,6 +21,8 @@ pub struct Config { pub mango_cluster: String, pub txs_batch_size: Option, pub priority_fees_proba: u8, + pub keeper_authority: Option, + pub number_of_markers_per_mm: u8, } impl Default for Config { @@ -41,6 +41,8 @@ impl Default for Config { mango_cluster: "testnet.0".to_string(), txs_batch_size: None, priority_fees_proba: 0, + keeper_authority: None, + number_of_markers_per_mm: 5, } } } @@ -51,7 +53,7 @@ pub fn build_args<'a, 'b>(version: &'b str) -> App<'a, 'b> { .about(crate_description!()) .version(version) .arg({ - let arg = Arg::with_name("config_file") + let arg = Arg::with_name("config-file") .short("C") .long("config") .value_name("FILEPATH") @@ -65,7 +67,7 @@ pub fn build_args<'a, 'b>(version: &'b str) -> App<'a, 'b> { } }) .arg( - Arg::with_name("json_rpc_url") + Arg::with_name("json-rpc-url") .short("u") .long("url") .value_name("URL_OR_MONIKER") @@ -78,7 +80,7 @@ pub fn build_args<'a, 'b>(version: &'b str) -> App<'a, 'b> { ), ) .arg( - Arg::with_name("websocket_url") + Arg::with_name("websocket-url") .long("ws") .value_name("URL") .takes_value(true) @@ -100,9 +102,10 @@ pub fn build_args<'a, 'b>(version: &'b str) -> App<'a, 'b> { Arg::with_name("identity") .short("i") .long("identity") - .value_name("PATH") + .value_name("FILEPATH") .takes_value(true) - .help("File containing a client identity (keypair)"), + .help("Identity used in the QUIC connection. Identity with a lot of stake has a \ + better chance to send transaction to the leader"), ) .arg( Arg::with_name("duration") @@ -113,15 +116,15 @@ pub fn build_args<'a, 'b>(version: &'b str) -> App<'a, 'b> { .help("Seconds to run benchmark, then exit; default is forever"), ) .arg( - Arg::with_name("qoutes_per_second") + Arg::with_name("quotes-per-second") .short("q") - .long("qoutes_per_second") + .long("quotes-per-second") .value_name("QPS") .takes_value(true) .help("Number of quotes per second"), ) .arg( - Arg::with_name("account_keys") + Arg::with_name("account-keys") .short("a") .long("accounts") .value_name("FILENAME") @@ -130,7 +133,7 @@ pub fn build_args<'a, 'b>(version: &'b str) -> App<'a, 'b> { .help("Read account keys from JSON file generated with mango-client-v3"), ) .arg( - Arg::with_name("mango_keys") + Arg::with_name("mango-keys") .short("m") .long("mango") .value_name("FILENAME") @@ -139,18 +142,18 @@ pub fn build_args<'a, 'b>(version: &'b str) -> App<'a, 'b> { .help("Read mango keys from JSON file generated with mango-client-v3"), ) .arg( - Arg::with_name("transaction_save_file") + Arg::with_name("transaction-save-file") .short("tsf") - .long("transaction_save_file") + .long("transaction-save-file") .value_name("FILENAME") .takes_value(true) .required(false) .help("To save details of all transactions during a run"), ) .arg( - Arg::with_name("block_data_save_file") + Arg::with_name("block-data-save-file") .short("bdsf") - .long("block_data_save_file") + .long("block-data-save-file") .value_name("FILENAME") .takes_value(true) .required(false) @@ -180,7 +183,26 @@ pub fn build_args<'a, 'b>(version: &'b str) -> App<'a, 'b> { .validator(is_valid_percentage) .takes_value(true) .required(false) - .help("Takes percentage of transaction we want to add random prioritization fees to, prioritization fees are random number between 100-1000"), + .help("Takes percentage of transaction we want to add random prioritization fees to, prioritization fees are random number between 100-1000") + ) + .arg( + Arg::with_name("keeper-authority") + .long("keeper-authority") + .short("ka") + .value_name("FILEPATH") + .takes_value(true) + .required(false) + .help( + "If specified, authority keypair would be used to pay for keeper transactions", + ), + ) + .arg( + Arg::with_name("markets-per-mm") + .long("markets-per-mm") + .value_name("UINT") + .takes_value(true) + .required(false) + .help("Number of markets a market maker will trade on at a time"), ) } @@ -192,21 +214,21 @@ pub fn build_args<'a, 'b>(version: &'b str) -> App<'a, 'b> { pub fn extract_args(matches: &ArgMatches) -> Config { let mut args = Config::default(); - let config = if let Some(config_file) = matches.value_of("config_file") { + let config = if let Some(config_file) = matches.value_of("config-file") { solana_cli_config::Config::load(config_file).unwrap_or_default() } else { solana_cli_config::Config::default() }; let (_, json_rpc_url) = ConfigInput::compute_json_rpc_url_setting( - matches.value_of("json_rpc_url").unwrap_or(""), + matches.value_of("json-rpc-url").unwrap_or(""), &config.json_rpc_url, ); args.json_rpc_url = json_rpc_url; let (_, websocket_url) = ConfigInput::compute_websocket_url_setting( - matches.value_of("websocket_url").unwrap_or(""), + matches.value_of("websocket-url").unwrap_or(""), &config.websocket_url, - matches.value_of("json_rpc_url").unwrap_or(""), + matches.value_of("json-rpc-url").unwrap_or(""), &config.json_rpc_url, ); args.websocket_url = websocket_url; @@ -235,17 +257,17 @@ pub fn extract_args(matches: &ArgMatches) -> Config { ); } - if let Some(qps) = matches.value_of("qoutes_per_second") { - args.quotes_per_second = qps.parse().expect("can't parse qoutes_per_second"); + if let Some(qps) = matches.value_of("quotes-per-second") { + args.quotes_per_second = qps.parse().expect("can't parse quotes-per-second"); } - args.account_keys = matches.value_of("account_keys").unwrap().to_string(); - args.mango_keys = matches.value_of("mango_keys").unwrap().to_string(); - args.transaction_save_file = match matches.value_of("transaction_save_file") { + args.account_keys = matches.value_of("account-keys").unwrap().to_string(); + args.mango_keys = matches.value_of("mango-keys").unwrap().to_string(); + args.transaction_save_file = match matches.value_of("transaction-save-file") { Some(x) => x.to_string(), None => String::new(), }; - args.block_data_save_file = match matches.value_of("block_data_save_file") { + args.block_data_save_file = match matches.value_of("block-data-save-file") { Some(x) => x.to_string(), None => String::new(), }; @@ -259,8 +281,23 @@ pub fn extract_args(matches: &ArgMatches) -> Config { .map(|batch_size_str| batch_size_str.parse().expect("can't parse batch-size")); args.priority_fees_proba = match matches.value_of("prioritization-fees") { - Some(x) => x.parse().expect("Percentage of transactions having prioritization fees"), + Some(x) => x + .parse() + .expect("Percentage of transactions having prioritization fees"), None => 0, }; + let (_, kp_auth_path) = ConfigInput::compute_keypair_path_setting( + matches.value_of("keeper-authority").unwrap_or(""), + &config.keypair_path, + ); + + args.keeper_authority = read_keypair_file(kp_auth_path.clone()).ok(); + + args.number_of_markers_per_mm = match matches.value_of("markets-per-mm") { + Some(x) => x + .parse() + .expect("can't parse number of markets per market maker"), + None => 5, + }; args } diff --git a/src/confirmation_strategies.rs b/src/confirmation_strategies.rs index ac35a68..cb3cde5 100644 --- a/src/confirmation_strategies.rs +++ b/src/confirmation_strategies.rs @@ -2,10 +2,7 @@ use std::{ collections::HashMap, ops::Div, str::FromStr, - sync::{ - atomic::{AtomicU64, Ordering}, - Arc, RwLock, - }, + sync::{Arc, RwLock}, thread::{sleep, Builder}, time::Duration, }; @@ -23,7 +20,6 @@ use solana_transaction_status::RewardType; use crate::{ helpers::seconds_since, - rotating_queue::RotatingQueue, states::{BlockData, TransactionConfirmRecord, TransactionSendRecord}, }; @@ -201,20 +197,18 @@ pub fn confirmation_by_querying_rpc( } pub fn confirmations_by_blocks( - clients: RotatingQueue>, - current_slot: &AtomicU64, + client: Arc, recv_limit: usize, tx_record_rx: Receiver, tx_confirm_records: Arc>>, tx_timeout_records: Arc>>, tx_block_data: Arc>>, + from_slot: u64, ) { let mut recv_until_confirm = recv_limit; let transaction_map = Arc::new(RwLock::new( HashMap::::new(), )); - let last_slot = current_slot.load(Ordering::Acquire); - while recv_until_confirm != 0 { match tx_record_rx.try_recv() { Ok(tx_record) => { @@ -245,9 +239,8 @@ pub fn confirmations_by_blocks( let commitment_confirmation = CommitmentConfig { commitment: CommitmentLevel::Confirmed, }; - let block_res = clients - .get() - .get_blocks_with_commitment(last_slot, None, commitment_confirmation) + let block_res = client + .get_blocks_with_commitment(from_slot, None, commitment_confirmation) .unwrap(); let nb_blocks = block_res.len(); @@ -264,7 +257,7 @@ pub fn confirmations_by_blocks( .map(|x| x.to_vec()) { let map = transaction_map.clone(); - let client = clients.get().clone(); + let client = client.clone(); let tx_confirm_records = tx_confirm_records.clone(); let tx_block_data = tx_block_data.clone(); let joinble = Builder::new() diff --git a/src/helpers.rs b/src/helpers.rs index 5163764..570d810 100644 --- a/src/helpers.rs +++ b/src/helpers.rs @@ -245,6 +245,16 @@ pub fn get_mango_market_perps_cache( .div(I80F48::from_num(perp_market.base_lot_size)) .to_num(); + let root_bank = &mango_group_config.tokens[market_index].root_key; + let root_bank = Pubkey::from_str(root_bank.as_str()).unwrap(); + let node_banks = mango_group_config.tokens[market_index] + .node_keys + .iter() + .map(|x| Pubkey::from_str(x.as_str()).unwrap()) + .collect(); + let price_oracle = + Pubkey::from_str(mango_group_config.oracles[market_index].public_key.as_str()) + .unwrap(); PerpMarketCache { order_base_lots, price, @@ -254,6 +264,11 @@ pub fn get_mango_market_perps_cache( mango_cache_pk, perp_market_pk, perp_market, + root_bank, + node_banks, + price_oracle, + bids: perp_market.bids, + asks: perp_market.asks, } }) .collect() diff --git a/src/keeper.rs b/src/keeper.rs new file mode 100644 index 0000000..02094fb --- /dev/null +++ b/src/keeper.rs @@ -0,0 +1,214 @@ +use { + crate::{helpers::to_sdk_instruction, states::PerpMarketCache}, + iter_tools::Itertools, + solana_client::tpu_client::TpuClient, + solana_program::pubkey::Pubkey, + solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool}, + solana_sdk::{ + hash::Hash, instruction::Instruction, message::Message, signature::Keypair, signer::Signer, + transaction::Transaction, + }, + std::{ + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, RwLock, + }, + thread::{Builder, JoinHandle}, + }, +}; + +fn create_root_bank_update_instructions(perp_markets: &[PerpMarketCache]) -> Vec { + perp_markets + .iter() + .map(|perp_market| { + let ix = mango::instruction::update_root_bank( + &perp_market.mango_program_pk, + &perp_market.mango_group_pk, + &perp_market.mango_cache_pk, + &perp_market.root_bank, + perp_market.node_banks.as_slice(), + ) + .unwrap(); + to_sdk_instruction(ix) + }) + .collect() +} + +fn create_update_fundings_instructions(perp_markets: &[PerpMarketCache]) -> Vec { + perp_markets + .iter() + .map(|perp_market| { + let ix = mango::instruction::update_funding( + &perp_market.mango_program_pk, + &perp_market.mango_group_pk, + &perp_market.mango_cache_pk, + &perp_market.perp_market_pk, + &perp_market.bids, + &perp_market.asks, + ) + .unwrap(); + to_sdk_instruction(ix) + }) + .collect() +} + +fn create_cache_root_bank_instruction(perp_markets: &[PerpMarketCache]) -> Instruction { + let mango_program_pk = perp_markets[0].mango_program_pk; + let mango_group_pk = perp_markets[0].mango_group_pk; + let mango_cache_pk = perp_markets[0].mango_cache_pk; + let root_banks = perp_markets.iter().map(|x| x.root_bank).collect_vec(); + + let ix = mango::instruction::cache_root_banks( + &mango_program_pk, + &mango_group_pk, + &mango_cache_pk, + root_banks.as_slice(), + ) + .unwrap(); + to_sdk_instruction(ix) +} + +fn create_update_price_cache_instructions(perp_markets: &[PerpMarketCache]) -> Instruction { + let mango_program_pk = perp_markets[0].mango_program_pk; + let mango_group_pk = perp_markets[0].mango_group_pk; + let mango_cache_pk = perp_markets[0].mango_cache_pk; + let price_oracles = perp_markets.iter().map(|x| x.price_oracle).collect_vec(); + + let ix = mango::instruction::cache_prices( + &mango_program_pk, + &mango_group_pk, + &mango_cache_pk, + price_oracles.as_slice(), + ) + .unwrap(); + to_sdk_instruction(ix) +} + +fn create_cache_perp_markets_instructions(perp_markets: &[PerpMarketCache]) -> Instruction { + let mango_program_pk = perp_markets[0].mango_program_pk; + let mango_group_pk = perp_markets[0].mango_group_pk; + let mango_cache_pk = perp_markets[0].mango_cache_pk; + let perp_market_pks = perp_markets.iter().map(|x| x.perp_market_pk).collect_vec(); + let ix = mango::instruction::cache_perp_markets( + &mango_program_pk, + &mango_group_pk, + &mango_cache_pk, + perp_market_pks.as_slice(), + ) + .unwrap(); + to_sdk_instruction(ix) +} + +pub fn send_transaction( + tpu_client: Arc>, + ixs: &[Instruction], + blockhash: Arc>, + payer: &Keypair, +) { + let mut tx = Transaction::new_unsigned(Message::new(ixs, Some(&payer.pubkey()))); + let recent_blockhash = blockhash.read().unwrap(); + tx.sign(&[payer], *recent_blockhash); + tpu_client.send_transaction(&tx); +} + +pub fn create_update_and_cache_quote_banks( + perp_markets: &[PerpMarketCache], + quote_root_bank: Pubkey, + quote_node_banks: Vec, +) -> Vec { + let mango_program_pk = perp_markets[0].mango_program_pk; + let mango_group_pk = perp_markets[0].mango_group_pk; + let mango_cache_pk = perp_markets[0].mango_cache_pk; + + let ix_update = mango::instruction::update_root_bank( + &mango_program_pk, + &mango_group_pk, + &mango_cache_pk, + "e_root_bank, + quote_node_banks.as_slice(), + ) + .unwrap(); + let ix_cache = mango::instruction::cache_root_banks( + &mango_program_pk, + &mango_group_pk, + &mango_cache_pk, + &[quote_root_bank], + ) + .unwrap(); + vec![to_sdk_instruction(ix_update), to_sdk_instruction(ix_cache)] +} + +pub fn start_keepers( + exit_signal: Arc, + tpu_client: Arc>, + perp_markets: Vec, + blockhash: Arc>, + authority: &Keypair, + quote_root_bank: Pubkey, + quote_node_banks: Vec, +) -> JoinHandle<()> { + let authority = Keypair::from_bytes(&authority.to_bytes()).unwrap(); + Builder::new() + .name("updating root bank keeper".to_string()) + .spawn(move || { + let root_update_ixs = create_root_bank_update_instructions(&perp_markets); + let cache_prices = create_update_price_cache_instructions(&perp_markets); + let update_perp_cache = create_cache_perp_markets_instructions(&perp_markets); + let cache_root_bank_ix = create_cache_root_bank_instruction(&perp_markets); + let update_funding_ix = create_update_fundings_instructions(&perp_markets); + let quote_root_bank_ix = create_update_and_cache_quote_banks( + &perp_markets, + quote_root_bank, + quote_node_banks, + ); + + let blockhash = blockhash.clone(); + + // add prioritization instruction + //let prioritization_ix = ComputeBudgetInstruction::set_compute_unit_price(10000); + //root_update_ixs.insert(0, prioritization_ix.clone()); + + while !exit_signal.load(Ordering::Relaxed) { + send_transaction( + tpu_client.clone(), + &[cache_prices.clone()], + blockhash.clone(), + &authority, + ); + + send_transaction( + tpu_client.clone(), + quote_root_bank_ix.as_slice(), + blockhash.clone(), + &authority, + ); + + for updates in update_funding_ix.chunks(3) { + send_transaction(tpu_client.clone(), updates, blockhash.clone(), &authority); + } + + send_transaction( + tpu_client.clone(), + root_update_ixs.as_slice(), + blockhash.clone(), + &authority, + ); + + send_transaction( + tpu_client.clone(), + &[update_perp_cache.clone()], + blockhash.clone(), + &authority, + ); + + send_transaction( + tpu_client.clone(), + &[cache_root_bank_ix.clone()], + blockhash.clone(), + &authority, + ); + std::thread::sleep(std::time::Duration::from_secs(1)); + } + }) + .unwrap() +} diff --git a/src/lib.rs b/src/lib.rs index bc83105..d370761 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,7 @@ pub mod cli; pub mod confirmation_strategies; pub mod helpers; +pub mod keeper; pub mod mango; pub mod market_markers; pub mod rotating_queue; diff --git a/src/main.rs b/src/main.rs index a6c22e6..d0054e0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,31 +1,33 @@ -use log::{error, info}; -use serde_json; -use solana_bench_mango::{ - cli, - confirmation_strategies::confirmations_by_blocks, - helpers::{ - get_latest_blockhash, get_mango_market_perps_cache, start_blockhash_polling_service, - write_block_data_into_csv, write_transaction_data_into_csv, +use { + log::{error, info}, + serde_json, + solana_bench_mango::{ + cli, + confirmation_strategies::confirmations_by_blocks, + helpers::{ + get_latest_blockhash, get_mango_market_perps_cache, start_blockhash_polling_service, + write_block_data_into_csv, write_transaction_data_into_csv, + }, + keeper::start_keepers, + mango::{AccountKeys, MangoConfig}, + market_markers::start_market_making_threads, + states::{BlockData, PerpMarketCache, TransactionConfirmRecord, TransactionSendRecord}, }, - mango::{AccountKeys, MangoConfig}, - market_markers::start_market_making_threads, - rotating_queue::RotatingQueue, - states::{BlockData, PerpMarketCache, TransactionConfirmRecord, TransactionSendRecord}, -}; -use solana_client::{ - connection_cache::ConnectionCache, rpc_client::RpcClient, tpu_client::TpuClient, -}; -use solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool}; -use solana_sdk::commitment_config::CommitmentConfig; - -use std::{ - fs, - net::{IpAddr, Ipv4Addr}, - sync::{ - atomic::{AtomicBool, AtomicU64, Ordering}, - Arc, RwLock, + solana_client::{ + connection_cache::ConnectionCache, rpc_client::RpcClient, tpu_client::TpuClient, + }, + solana_program::pubkey::Pubkey, + solana_sdk::commitment_config::CommitmentConfig, + std::{ + fs, + net::{IpAddr, Ipv4Addr}, + str::FromStr, + sync::{ + atomic::{AtomicBool, AtomicU64, Ordering}, + Arc, RwLock, + }, + thread::{Builder, JoinHandle}, }, - thread::{Builder, JoinHandle}, }; fn main() { @@ -48,13 +50,19 @@ fn main() { mango_cluster, txs_batch_size, priority_fees_proba, + keeper_authority, + number_of_markers_per_mm, .. } = &cli_config; + let number_of_markers_per_mm = *number_of_markers_per_mm; let transaction_save_file = transaction_save_file.clone(); let block_data_save_file = block_data_save_file.clone(); - info!("Connecting to the cluster"); + info!( + "Connecting to the cluster {}, {}", + json_rpc_url, websocket_url + ); let account_keys_json = fs::read_to_string(account_keys).expect("unable to read accounts file"); let account_keys_parsed: Vec = @@ -71,13 +79,10 @@ fn main() { .find(|g| g.name == *mango_group_id) .unwrap(); - let number_of_tpu_clients: usize = 1; - let rpc_clients = RotatingQueue::>::new(number_of_tpu_clients, || { - Arc::new(RpcClient::new_with_commitment( - json_rpc_url.to_string(), - CommitmentConfig::confirmed(), - )) - }); + let rpc_client = Arc::new(RpcClient::new_with_commitment( + json_rpc_url.to_string(), + CommitmentConfig::confirmed(), + )); let connection_cache = ConnectionCache::new_with_client_options( 4, @@ -91,20 +96,15 @@ fn main() { None }; - let tpu_client_pool = Arc::new(RotatingQueue::< - Arc>, - >::new(number_of_tpu_clients, || { - let quic_connection_cache = quic_connection_cache.clone(); - Arc::new( - TpuClient::new_with_connection_cache( - rpc_clients.get().clone(), - &websocket_url, - solana_client::tpu_client::TpuClientConfig::default(), - quic_connection_cache.unwrap(), - ) - .unwrap(), + let tpu_client = Arc::new( + TpuClient::new_with_connection_cache( + rpc_client.clone(), + &websocket_url, + solana_client::tpu_client::TpuClientConfig::default(), + quic_connection_cache.unwrap(), ) - })); + .unwrap(), + ); info!( "accounts:{:?} markets:{:?} quotes_per_second:{:?} expected_tps:{:?} duration:{:?}", @@ -135,7 +135,34 @@ fn main() { let perp_market_caches: Vec = get_mango_market_perps_cache(rpc_client.clone(), &mango_group_config); + let quote_root_bank = + Pubkey::from_str(mango_group_config.tokens.last().unwrap().root_key.as_str()).unwrap(); + let quote_node_banks = mango_group_config + .tokens + .last() + .unwrap() + .node_keys + .iter() + .map(|x| Pubkey::from_str(x.as_str()).unwrap()) + .collect(); + // start keeper if keeper authority is present + let keepers_jl = if let Some(keeper_authority) = keeper_authority { + let jl = start_keepers( + exit_signal.clone(), + tpu_client.clone(), + perp_market_caches.clone(), + blockhash.clone(), + keeper_authority, + quote_root_bank, + quote_node_banks, + ); + Some(jl) + } else { + None + }; + let (tx_record_sx, tx_record_rx) = crossbeam_channel::unbounded(); + let from_slot = current_slot.load(Ordering::Relaxed); let mm_threads: Vec> = start_market_making_threads( account_keys_parsed.clone(), @@ -144,11 +171,12 @@ fn main() { exit_signal.clone(), blockhash.clone(), current_slot.clone(), - tpu_client_pool.clone(), + tpu_client.clone(), &duration, *quotes_per_second, *txs_batch_size, *priority_fees_proba, + number_of_markers_per_mm, ); let duration = duration.clone(); let quotes_per_second = quotes_per_second.clone(); @@ -164,40 +192,37 @@ fn main() { .name("solana-client-sender".to_string()) .spawn(move || { let recv_limit = account_keys_parsed.len() - * perp_market_caches.len() + * number_of_markers_per_mm as usize * duration.as_secs() as usize * quotes_per_second as usize; //confirmation_by_querying_rpc(recv_limit, rpc_client.clone(), &tx_record_rx, tx_confirm_records.clone(), tx_timeout_records.clone()); confirmations_by_blocks( - rpc_clients, - ¤t_slot, + rpc_client.clone(), recv_limit, tx_record_rx, tx_confirm_records.clone(), tx_timeout_records.clone(), tx_block_data.clone(), + from_slot, ); let confirmed: Vec = { let lock = tx_confirm_records.write().unwrap(); (*lock).clone() }; - let total_signed = account_keys_parsed.len() - * perp_market_caches.len() - * duration.as_secs() as usize - * quotes_per_second as usize; + info!( "confirmed {} signatures of {} rate {}%", confirmed.len(), - total_signed, - (confirmed.len() * 100) / total_signed + recv_limit, + (confirmed.len() * 100) / recv_limit ); let error_count = confirmed.iter().filter(|tx| !tx.error.is_empty()).count(); info!( "errors counted {} rate {}%", error_count, - (error_count as usize * 100) / total_signed + (error_count as usize * 100) / recv_limit ); let timeouts: Vec = { let timeouts = tx_timeout_records.clone(); @@ -207,7 +232,7 @@ fn main() { info!( "timeouts counted {} rate {}%", timeouts.len(), - (timeouts.len() * 100) / total_signed + (timeouts.len() * 100) / recv_limit ); // let mut confirmation_times = confirmed @@ -255,4 +280,10 @@ fn main() { if let Err(err) = blockhash_thread.join() { error!("blockhash join failed with: {:?}", err); } + + if let Some(keepers_jl) = keepers_jl { + if let Err(err) = keepers_jl.join() { + error!("keeper join failed with: {:?}", err); + } + } } diff --git a/src/market_markers.rs b/src/market_markers.rs index e5569c8..debfc25 100644 --- a/src/market_markers.rs +++ b/src/market_markers.rs @@ -16,7 +16,7 @@ use mango::{ instruction::{cancel_all_perp_orders, place_perp_order2}, matching::Side, }; -use rand::{distributions::Uniform, prelude::Distribution}; +use rand::{distributions::Uniform, prelude::Distribution, seq::SliceRandom}; use solana_client::tpu_client::TpuClient; use solana_program::pubkey::Pubkey; use solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool}; @@ -28,7 +28,6 @@ use solana_sdk::{ use crate::{ helpers::{to_sdk_instruction, to_sp_pk}, mango::AccountKeys, - rotating_queue::RotatingQueue, states::{PerpMarketCache, TransactionSendRecord}, }; @@ -47,9 +46,8 @@ pub fn create_ask_bid_transaction( ); let mut instructions = vec![]; if prioritization_fee > 0 { - let pfees = compute_budget::ComputeBudgetInstruction::set_compute_unit_price( - prioritization_fee, - ); + let pfees = + compute_budget::ComputeBudgetInstruction::set_compute_unit_price(prioritization_fee); instructions.push(pfees); } @@ -130,7 +128,12 @@ pub fn create_ask_bid_transaction( )) } -fn generate_random_fees(prioritization_fee_proba : u8, n: usize, min_fee: u64, max_fee: u64) -> Vec { +fn generate_random_fees( + prioritization_fee_proba: u8, + n: usize, + min_fee: u64, + max_fee: u64, +) -> Vec { let mut rng = rand::thread_rng(); let range = Uniform::from(min_fee..max_fee); let range_probability = Uniform::from(1..100); @@ -147,15 +150,13 @@ fn generate_random_fees(prioritization_fee_proba : u8, n: usize, min_fee: u64, m } }) .collect() - } +} pub fn send_mm_transactions( quotes_per_second: u64, perp_market_caches: &Vec, tx_record_sx: &Sender, - tpu_client_pool: Arc< - RotatingQueue>>, - >, + tpu_client: Arc>, mango_account_pk: Pubkey, mango_account_signer: &Keypair, blockhash: Arc>, @@ -165,8 +166,13 @@ pub fn send_mm_transactions( let mango_account_signer_pk = to_sp_pk(&mango_account_signer.pubkey()); // update quotes 2x per second for _ in 0..quotes_per_second { - let prioritization_fee_by_market = generate_random_fees(prioritization_fee_proba, perp_market_caches.len(), 100, 1000); - for (i,c) in perp_market_caches.iter().enumerate() { + let prioritization_fee_by_market = generate_random_fees( + prioritization_fee_proba, + perp_market_caches.len(), + 100, + 1000, + ); + for (i, c) in perp_market_caches.iter().enumerate() { let prioritization_fee = prioritization_fee_by_market[i]; let mut tx = create_ask_bid_transaction( c, @@ -178,7 +184,7 @@ pub fn send_mm_transactions( if let Ok(recent_blockhash) = blockhash.read() { tx.sign(&[mango_account_signer], *recent_blockhash); } - let tpu_client = tpu_client_pool.get(); + tpu_client.send_transaction(&tx); let sent = tx_record_sx.send(TransactionSendRecord { signature: tx.signatures[0], @@ -203,9 +209,7 @@ pub fn send_mm_transactions_batched( quotes_per_second: u64, perp_market_caches: &Vec, tx_record_sx: &Sender, - tpu_client_pool: Arc< - RotatingQueue>>, - >, + tpu_client: Arc>, mango_account_pk: Pubkey, mango_account_signer: &Keypair, blockhash: Arc>, @@ -218,17 +222,19 @@ pub fn send_mm_transactions_batched( // update quotes 2x per second for _ in 0..quotes_per_second { for c in perp_market_caches.iter() { - let prioritization_fee_for_tx = generate_random_fees(prioritization_fee_proba, txs_batch_size, 100, 1000); + let prioritization_fee_for_tx = + generate_random_fees(prioritization_fee_proba, txs_batch_size, 100, 1000); for i in 0..txs_batch_size { let prioritization_fee = prioritization_fee_for_tx[i]; - transactions.push( - (create_ask_bid_transaction( + transactions.push(( + create_ask_bid_transaction( c, mango_account_pk, &mango_account_signer, prioritization_fee, - ),prioritization_fee) - ); + ), + prioritization_fee, + )); } if let Ok(recent_blockhash) = blockhash.read() { @@ -236,10 +242,14 @@ pub fn send_mm_transactions_batched( tx.0.sign(&[mango_account_signer], *recent_blockhash); } } - let tpu_client = tpu_client_pool.get(); + if tpu_client .try_send_transaction_batch( - &transactions.iter().map(|x| x.0.clone()).collect_vec().as_slice(), + &transactions + .iter() + .map(|x| x.0.clone()) + .collect_vec() + .as_slice(), ) .is_err() { @@ -275,21 +285,18 @@ pub fn start_market_making_threads( exit_signal: Arc, blockhash: Arc>, current_slot: Arc, - tpu_client_pool: Arc< - RotatingQueue>>, - >, + tpu_client: Arc>, duration: &Duration, quotes_per_second: u64, txs_batch_size: Option, prioritization_fee_proba: u8, + number_of_markers_per_mm: u8, ) -> Vec> { + let mut rng = rand::thread_rng(); account_keys_parsed .iter() .map(|account_keys| { let _exit_signal = exit_signal.clone(); - // having a tpu client for each MM - let tpu_client_pool = tpu_client_pool.clone(); - let blockhash = blockhash.clone(); let current_slot = current_slot.clone(); let duration = duration.clone(); @@ -298,6 +305,7 @@ pub fn start_market_making_threads( Pubkey::from_str(account_keys.mango_account_pks[0].as_str()).unwrap(); let mango_account_signer = Keypair::from_bytes(account_keys.secret_key.as_slice()).unwrap(); + let tpu_client = tpu_client.clone(); info!( "wallet:{:?} https://testnet.mango.markets/account?pubkey={:?}", @@ -306,6 +314,10 @@ pub fn start_market_making_threads( ); //sleep(Duration::from_secs(10)); let tx_record_sx = tx_record_sx.clone(); + let perp_market_caches = perp_market_caches + .choose_multiple(&mut rng, number_of_markers_per_mm as usize) + .map(|x| x.clone()) + .collect_vec(); Builder::new() .name("solana-client-sender".to_string()) @@ -320,7 +332,7 @@ pub fn start_market_making_threads( quotes_per_second, &perp_market_caches, &tx_record_sx, - tpu_client_pool.clone(), + tpu_client.clone(), mango_account_pk, &mango_account_signer, blockhash.clone(), @@ -332,7 +344,7 @@ pub fn start_market_making_threads( quotes_per_second, &perp_market_caches, &tx_record_sx, - tpu_client_pool.clone(), + tpu_client.clone(), mango_account_pk, &mango_account_signer, blockhash.clone(), diff --git a/src/states.rs b/src/states.rs index 10478a8..a252a78 100644 --- a/src/states.rs +++ b/src/states.rs @@ -43,6 +43,11 @@ pub struct PerpMarketCache { pub mango_cache_pk: Pubkey, pub perp_market_pk: Pubkey, pub perp_market: PerpMarket, + pub price_oracle: Pubkey, + pub root_bank: Pubkey, + pub node_banks: Vec, + pub bids: Pubkey, + pub asks: Pubkey, } pub struct _TransactionInfo {