diff --git a/.github/workflows/build_test.yml b/.github/workflows/build_test.yml
new file mode 100644
index 0000000..738a24b
--- /dev/null
+++ b/.github/workflows/build_test.yml
@@ -0,0 +1,59 @@
+name: Rust Build and Clippy Check
+
+on:
+  pull_request:
+    branches:
+      - main
+  push:
+    branches:
+      - main
+
+env:
+  SCCACHE_GHA_ENABLED: true
+  RUSTC_WRAPPER: sccache
+  SCCACHE_CACHE_SIZE: "1G"
+
+jobs:
+  build_all:
+    name: Rust project
+    runs-on: ubuntu-22.04
+    steps:
+      - name: Install Linux Packages
+        run: |
+          sudo apt-get update -y
+          sudo apt-get install libssl-dev openssl -y
+
+      - uses: actions/checkout@v4
+
+      # The toolchain action should definitely be run before the cache action
+      - uses: actions-rust-lang/setup-rust-toolchain@v1
+        with:
+          # use toolchain version from rust-toolchain.toml
+          components: rustfmt, clippy
+          cache: true
+          # avoid the default "-D warnings" which thrashes cache
+          rustflags: ""
+
+
+      - name: Run sccache-cache
+        uses: mozilla-actions/sccache-action@v0.0.3
+
+
+      # https://github.com/actions/cache/blob/main/examples.md#rust---cargo
+      # https://blog.arriven.wtf/posts/rust-ci-cache/
+      - uses: Swatinem/rust-cache@v2
+        with:
+          # will be covered by sccache
+          cache-targets: false
+          key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}
+
+
+      - name: Early Build
+        run: |
+          cargo build --locked --workspace --tests
+
+      - name: Run fmt+clippy
+        run: |
+          cargo fmt --all --check
+          cargo clippy --locked --workspace --all-targets
+
diff --git a/src/cli.rs b/src/cli.rs
index 8e6b352..b5dbcb9 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -50,7 +50,7 @@ impl Default for Config {
 }
 
 /// Defines and builds the CLI args for a run of the benchmark
-pub fn build_args<'a, 'b>(version: &'b str) -> App<'a, 'b> {
+pub fn build_args(version: &str) -> App<'_, '_> {
     App::new(crate_name!())
         .about(crate_description!())
         .version(version)
@@ -302,7 +302,7 @@ pub fn extract_args(matches: &ArgMatches) -> Config {
         &config.keypair_path,
     );
 
-    args.keeper_authority = read_keypair_file(kp_auth_path.clone()).ok();
+    args.keeper_authority = read_keypair_file(kp_auth_path).ok();
 
     args.number_of_markers_per_mm = match matches.value_of("markets-per-mm") {
         Some(x) => x
diff --git a/src/confirmation_strategies.rs b/src/confirmation_strategies.rs
index 1bd1b4a..b48c070 100644
--- a/src/confirmation_strategies.rs
+++ b/src/confirmation_strategies.rs
@@ -59,19 +59,15 @@ pub async fn process_blocks(
         };
         for signature in &transaction.signatures {
             let transaction_record_op = {
-                let rec = transaction_map.get(&signature);
-                match rec {
-                    Some(x) => Some(x.clone()),
-                    None => None,
-                }
+                let rec = transaction_map.get(signature);
+                rec.map(|x| x.clone())
             };
             // add CU in counter
             if let Some(meta) = &meta {
-                match meta.compute_units_consumed {
-                    solana_transaction_status::option_serializer::OptionSerializer::Some(x) => {
-                        cu_consumed = cu_consumed.saturating_add(x);
-                    }
-                    _ => {}
+                if let solana_transaction_status::option_serializer::OptionSerializer::Some(x) =
+                    meta.compute_units_consumed
+                {
+                    cu_consumed = cu_consumed.saturating_add(x);
                 }
             }
             if let Some(transaction_record) = transaction_record_op {
@@ -110,7 +106,7 @@ pub async fn process_blocks(
                 }
             }
 
-            transaction_map.remove(&signature);
+            transaction_map.remove(signature);
         }
     }
     // push block data
@@ -210,7 +206,7 @@ pub fn confirmation_by_lite_rpc_notification_stream(
                     if tx_notification.commitment != CommitmentLevel::Finalized {
                         continue;
                     }
-                    
+
                     if let Some(value) = transaction_map.get(&tx_notification.signature) {
                         let (tx_sent_record, _) = value.clone();
                         let error = match &tx_notification.transaction_status {
@@ -258,9 +254,9 @@ pub fn confirmation_by_lite_rpc_notification_stream(
     };
 
     let cleaner_jh = {
-        let transaction_map = transaction_map.clone();
-        let exit_signal = exit_signal.clone();
-        let tx_confirm_records = tx_confirm_records.clone();
+        let transaction_map = transaction_map;
+        let exit_signal = exit_signal;
+        let tx_confirm_records = tx_confirm_records;
         tokio::spawn(async move {
             loop {
                 tokio::time::sleep(Duration::from_secs(60)).await;
@@ -391,7 +387,7 @@ pub fn confirmations_by_blocks(
                                 timed_out: true,
                                 priority_fees: sent_record.priority_fees,
                             });
-                            to_remove.push(signature.clone());
+                            to_remove.push(*signature);
                         }
                     }
 
@@ -409,7 +405,7 @@ pub fn confirmations_by_blocks(
     };
 
     let block_confirmation_jh = {
-        let exit_signal = exit_signal.clone();
+        let exit_signal = exit_signal;
         tokio::spawn(async move {
             let mut start_block = from_slot;
             let mut start_instant = tokio::time::Instant::now();
diff --git a/src/crank.rs b/src/crank.rs
index 8113124..af0b860 100644
--- a/src/crank.rs
+++ b/src/crank.rs
@@ -36,6 +36,7 @@ pub struct KeeperConfig {
     pub websocket_url: String,
 }
 
+#[allow(clippy::too_many_arguments)]
 pub fn start(
     config: KeeperConfig,
     exit_signal: Arc<AtomicBool>,
diff --git a/src/helpers.rs b/src/helpers.rs
index 41b58f2..bcfe393 100644
--- a/src/helpers.rs
+++ b/src/helpers.rs
@@ -55,7 +55,7 @@ pub fn to_sdk_instruction(
 
 pub async fn load_from_rpc<T: Loadable>(rpc_client: &RpcClient, pk: &Pubkey) -> T {
     let acc = rpc_client.get_account(&to_sdk_pk(pk)).await.unwrap();
-    return T::load_from_bytes(acc.data.as_slice()).unwrap().clone();
+    *T::load_from_bytes(acc.data.as_slice()).unwrap()
 }
 
 pub async fn get_latest_blockhash(rpc_client: &RpcClient) -> Hash {
@@ -116,10 +116,8 @@ pub async fn poll_blockhash_and_slot(
                 *blockhash.write().await = new_blockhash;
             }
             blockhash_last_updated = Instant::now();
-        } else {
-            if blockhash_last_updated.elapsed().as_secs() > 120 {
-                break;
-            }
+        } else if blockhash_last_updated.elapsed().as_secs() > 120 {
+            break;
         }
 
         tokio::time::sleep(Duration::from_millis(100)).await;
@@ -198,7 +196,7 @@ pub async fn get_mango_market_perps_cache(
             order_base_lots,
             price,
             price_quote_lots,
-            mango_program_pk: mango_program_pk.clone(),
+            mango_program_pk: *mango_program_pk,
             mango_group_pk,
             mango_cache_pk,
             perp_market_pk,
diff --git a/src/keeper.rs b/src/keeper.rs
index 2017255..80fd690 100644
--- a/src/keeper.rs
+++ b/src/keeper.rs
@@ -131,7 +131,7 @@ pub fn prepare_transaction(
         priority_fees: prioritization_fee,
         keeper_instruction: Some(keeper_instruction),
     };
-    return (tx, tx_send_record);
+    (tx, tx_send_record)
 }
 
 pub fn create_update_and_cache_quote_banks(
@@ -161,6 +161,7 @@ pub fn create_update_and_cache_quote_banks(
     vec![to_sdk_instruction(ix_update), to_sdk_instruction(ix_cache)]
 }
 
+#[allow(clippy::too_many_arguments)]
 pub fn start_keepers(
     exit_signal: Arc<AtomicBool>,
     tpu_manager: TpuManager,
diff --git a/src/main.rs b/src/main.rs
index 51531c7..0126682 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -16,7 +16,6 @@ use {
         stats::MangoSimulationStats,
         tpu_manager::TpuManager,
     },
-    serde_json,
     solana_client::nonblocking::rpc_client::RpcClient as NbRpcClient,
     solana_lite_rpc_core::{
         block_store::BlockStore,
@@ -105,7 +104,8 @@ pub async fn main() -> anyhow::Result<()> {
     solana_logger::setup_with_default("info");
     solana_metrics::set_panic_hook("bench-mango", /*version:*/ None);
 
-    let matches = cli::build_args(solana_version::version!()).get_matches();
+    let version = solana_version::version!();
+    let matches = cli::build_args(version).get_matches();
     let cli_config = cli::extract_args(&matches);
 
     let cli::Config {
@@ -206,9 +206,7 @@ pub async fn main() -> anyhow::Result<()> {
         account_keys_parsed.len(),
         number_of_markers_per_mm,
         quotes_per_second,
-        account_keys_parsed.len()
-            * number_of_markers_per_mm as usize
-            * quotes_per_second.clone() as usize,
+        account_keys_parsed.len() * number_of_markers_per_mm as usize * *quotes_per_second as usize,
         duration
     );
 
@@ -221,7 +219,6 @@ pub async fn main() -> anyhow::Result<()> {
     let quote_root_bank =
         Pubkey::from_str(mango_group_config.tokens.last().unwrap().root_key.as_str())
             .expect("Quote root bank should be able to convert into pubkey");
-    ();
     let quote_node_banks = mango_group_config
         .tokens
         .last()
@@ -287,7 +284,7 @@ pub async fn main() -> anyhow::Result<()> {
         blockhash.clone(),
         current_slot.clone(),
         tpu_manager.clone(),
-        &duration,
+        duration,
         *quotes_per_second,
         *priority_fees_proba,
         number_of_markers_per_mm,
diff --git a/src/mango_v3_perp_crank_sink.rs b/src/mango_v3_perp_crank_sink.rs
index 41e10f0..e0c43d8 100644
--- a/src/mango_v3_perp_crank_sink.rs
+++ b/src/mango_v3_perp_crank_sink.rs
@@ -45,7 +45,7 @@ impl MangoV3PerpCrankSink {
         Self {
             mkt_pks_by_evq_pks: pks
                 .iter()
-                .map(|(mkt_pk, evq_pk)| (evq_pk.clone(), mkt_pk.clone()))
+                .map(|(mkt_pk, evq_pk)| (*evq_pk, *mkt_pk))
                 .collect(),
             group_pk,
             cache_pk,
@@ -73,7 +73,7 @@ impl AccountWriteSink for MangoV3PerpCrankSink {
         const HEADER_SIZE: usize = size_of::<EventQueueHeader>();
         let header_data = array_ref![account.data(), 0, HEADER_SIZE];
         let header = RefCell::<EventQueueHeader>::new(*bytemuck::from_bytes(header_data));
-        let seq_num = header.clone().into_inner().seq_num.clone();
+        let seq_num = header.clone().into_inner().seq_num;
         // trace!("evq {} seq_num {}", mkt.name, header.seq_num);
 
         const QUEUE_SIZE: usize = EVENT_SIZE * QUEUE_LEN;
@@ -87,8 +87,7 @@ impl AccountWriteSink for MangoV3PerpCrankSink {
         // only crank if at least 1 fill or a sufficient events of other categories are buffered
         let contains_fill_events = event_queue
             .iter()
-            .find(|e| e.event_type == EventType::Fill as u8)
-            .is_some();
+            .any(|e| e.event_type == EventType::Fill as u8);
         let len = event_queue.iter().count();
         let has_backlog = len > MAX_BACKLOG;
         debug!("evq {pk:?} seq_num={seq_num} len={len} contains_fill_events={contains_fill_events} has_backlog={has_backlog}");
@@ -121,7 +120,7 @@ impl AccountWriteSink for MangoV3PerpCrankSink {
             let mkt_pk = self
                 .mkt_pks_by_evq_pks
                 .get(&pk)
-                .expect(&format!("{pk:?} is a known public key"));
+                .unwrap_or_else(|| panic!("{pk:?} is a known public key"));
 
             let ix = to_sdk_instruction(
                 consume_events(
@@ -130,16 +129,13 @@ impl AccountWriteSink for MangoV3PerpCrankSink {
                     &to_sp_pk(&self.cache_pk),
                     &to_sp_pk(mkt_pk),
                     &to_sp_pk(&pk),
-                    &mut mango_accounts
-                        .iter()
-                        .map(|pk| pk.clone())
-                        .collect::<Vec<_>>(),
+                    &mut mango_accounts.iter().copied().collect::<Vec<_>>(),
                     MAX_EVENTS_PER_TX,
                 )
                 .unwrap(),
             );
 
-            (Ok(ix), mkt_pk.clone())
+            (Ok(ix), *mkt_pk)
         };
 
         // info!(
diff --git a/src/market_markers.rs b/src/market_markers.rs
index 4605a91..47c4d96 100644
--- a/src/market_markers.rs
+++ b/src/market_markers.rs
@@ -143,20 +143,19 @@ fn generate_random_fees(
         .map(|_| {
             if prioritization_fee_proba == 0 {
                 0
+            } else if range_probability.sample(&mut rng) <= prioritization_fee_proba {
+                range.sample(&mut rng)
             } else {
-                if range_probability.sample(&mut rng) <= prioritization_fee_proba {
-                    range.sample(&mut rng) as u64
-                } else {
-                    0
-                }
+                0
             }
         })
         .collect()
 }
 
+#[allow(clippy::too_many_arguments)]
 pub async fn send_mm_transactions(
     quotes_per_second: u64,
-    perp_market_caches: &Vec<PerpMarketCache>,
+    perp_market_caches: &[PerpMarketCache],
     tpu_manager: TpuManager,
     mango_account_pk: Pubkey,
     mango_account_signer: &Keypair,
@@ -179,7 +178,7 @@ pub async fn send_mm_transactions(
         let mut tx = create_ask_bid_transaction(
             c,
             mango_account_pk,
-            &mango_account_signer,
+            mango_account_signer,
             prioritization_fee,
         );
 
@@ -207,6 +206,7 @@ pub async fn send_mm_transactions(
     }
 }
 
+#[allow(clippy::too_many_arguments)]
 pub fn start_market_making_threads(
     account_keys_parsed: Vec<AccountKeys>,
     perp_market_caches: Vec<PerpMarketCache>,
@@ -226,7 +226,7 @@ pub fn start_market_making_threads(
         let exit_signal = exit_signal.clone();
         let blockhash = blockhash.clone();
         let current_slot = current_slot.clone();
-        let duration = duration.clone();
+        let duration = *duration;
         let perp_market_caches = perp_market_caches.clone();
         let mango_account_pk =
             Pubkey::from_str(account_keys.mango_account_pks[0].as_str()).unwrap();
@@ -241,7 +241,7 @@ pub fn start_market_making_threads(
         );
         let perp_market_caches = perp_market_caches
             .choose_multiple(&mut rng, number_of_markers_per_mm as usize)
-            .map(|x| x.clone())
+            .cloned()
            .collect_vec();
 
         tokio::spawn(async move {
@@ -312,7 +312,7 @@ fn create_cancel_all_orders(
 
 pub async fn clean_market_makers(
     rpc_client: Arc<RpcClient>,
-    account_keys_parsed: &Vec<AccountKeys>,
+    account_keys_parsed: &[AccountKeys],
     perp_market_caches: &Vec<PerpMarketCache>,
     blockhash: Arc<RwLock<Hash>>,
 ) {
@@ -323,11 +323,11 @@ pub async fn clean_market_makers(
     for market_maker in account_keys_parsed {
         let mango_account_pk =
             Pubkey::from_str(market_maker.mango_account_pks[0].as_str()).unwrap();
+
         for perp_market in perp_market_caches {
             let market_maker = market_maker.clone();
             let perp_market = perp_market.clone();
             let rpc_client = rpc_client.clone();
-            let mango_account_pk = mango_account_pk.clone();
             let blockhash = blockhash.clone();
 
             let task = tokio::spawn(async move {
diff --git a/src/result_writer.rs b/src/result_writer.rs
index 7a7d793..712b05a 100644
--- a/src/result_writer.rs
+++ b/src/result_writer.rs
@@ -16,12 +16,8 @@ pub fn initialize_result_writers(
                 File::create(transaction_save_file).await.unwrap(),
             );
             let mut tx_data = tx_data;
-            loop {
-                if let Ok(record) = tx_data.recv().await {
-                    writer.serialize(record).await.unwrap();
-                } else {
-                    break;
-                }
+            while let Ok(record) = tx_data.recv().await {
+                writer.serialize(record).await.unwrap();
             }
             writer.flush().await.unwrap();
         });
@@ -34,12 +30,8 @@ pub fn initialize_result_writers(
                 File::create(block_data_save_file).await.unwrap(),
             );
             let mut block_data = block_data;
-            loop {
-                if let Ok(record) = block_data.recv().await {
-                    writer.serialize(record).await.unwrap();
-                } else {
-                    break;
-                }
+            while let Ok(record) = block_data.recv().await {
+                writer.serialize(record).await.unwrap();
             }
             writer.flush().await.unwrap();
         });
diff --git a/src/states.rs b/src/states.rs
index a64a0c4..04d82d9 100644
--- a/src/states.rs
+++ b/src/states.rs
@@ -3,7 +3,7 @@ use fixed::types::I80F48;
 use mango::state::PerpMarket;
 use serde::Serialize;
 use solana_program::{pubkey::Pubkey, slot_history::Slot};
-use solana_sdk::{signature::Signature, commitment_config::CommitmentLevel};
+use solana_sdk::{commitment_config::CommitmentLevel, signature::Signature};
 use std::fmt;
 
 #[derive(Clone, Debug, Serialize)]
diff --git a/src/stats.rs b/src/stats.rs
index af2136c..4865556 100644
--- a/src/stats.rs
+++ b/src/stats.rs
@@ -191,57 +191,53 @@ impl MangoSimulationStats {
         let regex = regex::Regex::new(r"Error processing Instruction \d+: ").unwrap();
         tokio::spawn(async move {
             let mut tx_confirm_record_reciever = tx_confirm_record_reciever;
-            loop {
-                if let Ok(tx_data) = tx_confirm_record_reciever.recv().await {
-                    if let Some(_) = tx_data.confirmed_at {
-                        counters.num_confirmed_txs.fetch_add(1, Ordering::Relaxed);
-                        if let Some(error) = tx_data.error {
-                            let error = regex.replace_all(&error, "").to_string();
-                            counters.num_error_txs.fetch_add(1, Ordering::Relaxed);
-                            let mut lock = counters.errors.write().await;
-                            if let Some(value) = lock.get_mut(&error) {
-                                *value += 1;
-                            } else {
-                                lock.insert(error, 1);
-                            }
+            while let Ok(tx_data) = tx_confirm_record_reciever.recv().await {
+                if tx_data.confirmed_at.is_some() {
+                    counters.num_confirmed_txs.fetch_add(1, Ordering::Relaxed);
+                    if let Some(error) = tx_data.error {
+                        let error = regex.replace_all(&error, "").to_string();
+                        counters.num_error_txs.fetch_add(1, Ordering::Relaxed);
+                        let mut lock = counters.errors.write().await;
+                        if let Some(value) = lock.get_mut(&error) {
+                            *value += 1;
                         } else {
-                            counters.num_successful.fetch_add(1, Ordering::Relaxed);
-
-                            if let Some(keeper_instruction) = tx_data.keeper_instruction {
-                                match keeper_instruction {
-                                    KeeperInstruction::CachePrice => counters
-                                        .succ_cache_price_txs
-                                        .fetch_add(1, Ordering::Relaxed),
-                                    KeeperInstruction::CacheRootBanks => counters
-                                        .succ_cache_root_banks_txs
-                                        .fetch_add(1, Ordering::Relaxed),
-                                    KeeperInstruction::ConsumeEvents => counters
-                                        .succ_consume_events_txs
-                                        .fetch_add(1, Ordering::Relaxed),
-                                    KeeperInstruction::UpdateAndCacheQuoteRootBank => counters
-                                        .succ_update_and_cache_quote_bank_txs
-                                        .fetch_add(1, Ordering::Relaxed),
-                                    KeeperInstruction::UpdateFunding => counters
-                                        .succ_update_funding_txs
-                                        .fetch_add(1, Ordering::Relaxed),
-                                    KeeperInstruction::UpdatePerpCache => counters
-                                        .succ_update_perp_cache_txs
-                                        .fetch_add(1, Ordering::Relaxed),
-                                    KeeperInstruction::UpdateRootBanks => counters
-                                        .succ_update_root_banks_txs
-                                        .fetch_add(1, Ordering::Relaxed),
-                                };
-                            } else {
-                                counters
-                                    .succ_market_makers_txs
-                                    .fetch_add(1, Ordering::Relaxed);
-                            }
+                            lock.insert(error, 1);
                         }
                     } else {
-                        counters.num_timeout_txs.fetch_add(1, Ordering::Relaxed);
+                        counters.num_successful.fetch_add(1, Ordering::Relaxed);
+
+                        if let Some(keeper_instruction) = tx_data.keeper_instruction {
+                            match keeper_instruction {
+                                KeeperInstruction::CachePrice => counters
+                                    .succ_cache_price_txs
+                                    .fetch_add(1, Ordering::Relaxed),
+                                KeeperInstruction::CacheRootBanks => counters
+                                    .succ_cache_root_banks_txs
+                                    .fetch_add(1, Ordering::Relaxed),
+                                KeeperInstruction::ConsumeEvents => counters
+                                    .succ_consume_events_txs
+                                    .fetch_add(1, Ordering::Relaxed),
+                                KeeperInstruction::UpdateAndCacheQuoteRootBank => counters
+                                    .succ_update_and_cache_quote_bank_txs
+                                    .fetch_add(1, Ordering::Relaxed),
+                                KeeperInstruction::UpdateFunding => counters
+                                    .succ_update_funding_txs
+                                    .fetch_add(1, Ordering::Relaxed),
+                                KeeperInstruction::UpdatePerpCache => counters
+                                    .succ_update_perp_cache_txs
+                                    .fetch_add(1, Ordering::Relaxed),
+                                KeeperInstruction::UpdateRootBanks => counters
+                                    .succ_update_root_banks_txs
+                                    .fetch_add(1, Ordering::Relaxed),
+                            };
+                        } else {
+                            counters
+                                .succ_market_makers_txs
+                                .fetch_add(1, Ordering::Relaxed);
+                        }
                     }
                 } else {
-                    break;
+                    counters.num_timeout_txs.fetch_add(1, Ordering::Relaxed);
                 }
             }
         })
diff --git a/src/tpu_manager.rs b/src/tpu_manager.rs
index 63927ff..26ac097 100644
--- a/src/tpu_manager.rs
+++ b/src/tpu_manager.rs
@@ -47,15 +47,15 @@ impl TpuManager {
         }
 
         let transaction = bincode::serialize(transaction).unwrap();
-        let res = self.transaction_service
+        let res = self
+            .transaction_service
             .send_transaction(transaction, None)
             .await;
-        if let Err(e) = &res{
+        if let Err(e) = &res {
             print!("error sending txs on custom tpu {e:?}");
         }
         res.is_ok()
-
     }
 
     pub async fn send_transaction_batch(