diff --git a/Cargo.lock b/Cargo.lock index 8505507..e38451c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1525,6 +1525,19 @@ dependencies = [ "rayon", ] +[[package]] +name = "dashmap" +version = "5.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "907076dfda823b0b36d2a1bb5f90c96660a5bbcd7729e10727f07858f22c4edc" +dependencies = [ + "cfg-if 1.0.0", + "hashbrown 0.12.3", + "lock_api 0.4.9", + "once_cell", + "parking_lot_core 0.9.7", +] + [[package]] name = "data-encoding" version = "2.3.3" @@ -5397,8 +5410,8 @@ dependencies = [ "cargo-lock", "chrono", "clap 2.34.0", - "crossbeam-channel", "csv", + "dashmap 5.4.0", "fixed", "fixed-macro", "futures 0.3.26", @@ -5735,7 +5748,7 @@ dependencies = [ "bs58 0.4.0", "chrono", "crossbeam-channel", - "dashmap", + "dashmap 4.0.2", "eager", "etcd-client", "fs_extra", @@ -6013,7 +6026,7 @@ dependencies = [ "chrono", "chrono-humanize", "crossbeam-channel", - "dashmap", + "dashmap 4.0.2", "fs_extra", "futures 0.3.26", "itertools 0.10.5", @@ -6399,7 +6412,7 @@ dependencies = [ "bincode", "bs58 0.4.0", "crossbeam-channel", - "dashmap", + "dashmap 4.0.2", "itertools 0.10.5", "jsonrpc-core 18.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-core-client", @@ -6515,7 +6528,7 @@ dependencies = [ "byteorder", "bzip2", "crossbeam-channel", - "dashmap", + "dashmap 4.0.2", "dir-diff", "flate2 1.0.25", "fnv", diff --git a/Cargo.toml b/Cargo.toml index b2f804d..3774285 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,7 +14,6 @@ rust-version = "1.66.1" borsh = "0.9.3" chrono = "0.4.19" clap = "2.33.1" -crossbeam-channel = "0.5" fixed = { version = ">=1.11.0, <1.12.0", features = ["serde"] } fixed-macro = "^1.1.1" multiqueue = "^0.3.2" @@ -26,6 +25,7 @@ serde_derive = "1.0.103" serde_json = "1.0.79" serde_yaml = "0.8.23" iter_tools = "0.1.4" +dashmap = "5.4.0" mango = { git = "https://github.com/blockworks-foundation/mango-v3.git", tag = "v3.6.0", default-features = false } mango-common = { git = "https://github.com/blockworks-foundation/mango-v3.git", tag = "v3.6.0" } diff --git a/src/confirmation_strategies.rs b/src/confirmation_strategies.rs index 46ac60d..542beec 100644 --- a/src/confirmation_strategies.rs +++ b/src/confirmation_strategies.rs @@ -1,422 +1,261 @@ use std::{ - collections::HashMap, - ops::Div, str::FromStr, - sync::{Arc, RwLock}, - thread::{sleep, Builder}, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, time::Duration, }; use chrono::Utc; -use crossbeam_channel::{Receiver, TryRecvError}; -use log::{debug, error, info, trace}; -use solana_client::{rpc_client::RpcClient, rpc_config::RpcBlockConfig}; +use dashmap::DashMap; +use log::debug; +use solana_client::{nonblocking::rpc_client::RpcClient, rpc_config::RpcBlockConfig}; use solana_sdk::{ commitment_config::{CommitmentConfig, CommitmentLevel}, - pubkey::Pubkey, signature::Signature, }; -use solana_transaction_status::RewardType; +use solana_transaction_status::{RewardType, UiConfirmedBlock}; -use crate::{ - helpers::seconds_since, - states::{BlockData, TransactionConfirmRecord, TransactionSendRecord}, -}; +use crate::states::{BlockData, TransactionConfirmRecord, TransactionSendRecord}; -pub fn process_signature_confirmation_batch( - rpc_client: &RpcClient, - batch: &Vec, - not_confirmed: &Arc>>, - confirmed: &Arc>>, - timeouts: Arc>>, - timeout: u64, +use async_channel::Sender; +use tokio::{sync::mpsc::UnboundedReceiver, task::JoinHandle, time::Instant}; + +pub async fn process_blocks( + block: UiConfirmedBlock, + tx_confirm_records: Sender, + tx_block_data: Sender, + transaction_map: Arc>, + slot: u64, ) { - match rpc_client.get_signature_statuses(&batch.iter().map(|t| t.signature).collect::>()) + let mut mm_transaction_count: u64 = 0; + let rewards = &block.rewards.unwrap(); + let slot_leader = match rewards + .iter() + .find(|r| r.reward_type == Some(RewardType::Fee)) { - Ok(statuses) => { - trace!("batch result {:?}", statuses); - for (i, s) in statuses.value.iter().enumerate() { - let tx_record = &batch[i]; - match s { - Some(s) => { - if s.confirmation_status.is_none() { - not_confirmed.write().unwrap().push(tx_record.clone()); - } else { - let mut lock = confirmed.write().unwrap(); - (*lock).push(TransactionConfirmRecord { - signature: tx_record.signature.to_string(), - sent_slot: tx_record.sent_slot, - sent_at: tx_record.sent_at.to_string(), - confirmed_at: Utc::now().to_string(), - confirmed_slot: s.slot, - successful: s.err.is_none(), - error: match &s.err { - Some(e) => e.to_string(), - None => "".to_string(), - }, - block_hash: Pubkey::default().to_string(), - slot_leader: Pubkey::default().to_string(), - market: tx_record.market.to_string(), - market_maker: tx_record.market_maker.to_string(), - slot_processed: tx_record.sent_slot, - timed_out: false, - priority_fees: tx_record.priority_fees, - }); + Some(x) => x.pubkey.clone(), + None => "".to_string(), + }; - debug!( - "confirmed sig={} duration={:?}", - tx_record.signature, - seconds_since(tx_record.sent_at) - ); - } - } - None => { - if seconds_since(tx_record.sent_at) > timeout as i64 { - debug!( - "could not confirm tx {} within {} seconds, dropping it", - tx_record.signature, timeout - ); - let mut lock = timeouts.write().unwrap(); - (*lock).push(tx_record.clone()) - } else { - not_confirmed.write().unwrap().push(tx_record.clone()); - } - } - } - } - } - Err(err) => { - error!("could not confirm signatures err={}", err); - not_confirmed.write().unwrap().extend_from_slice(batch); - sleep(Duration::from_millis(500)); - } - } -} - -pub fn confirmation_by_querying_rpc( - recv_limit: usize, - rpc_client: Arc, - tx_record_rx: &Receiver, - tx_confirm_records: Arc>>, - tx_timeout_records: Arc>>, -) { - const TIMEOUT: u64 = 30; - let mut recv_until_confirm = recv_limit; - let not_confirmed: Arc>> = Arc::new(RwLock::new(Vec::new())); - loop { - let has_signatures_to_confirm = { not_confirmed.read().unwrap().len() > 0 }; - if has_signatures_to_confirm { - // collect all not confirmed records in a new buffer - - const BATCH_SIZE: usize = 256; - let to_confirm = { - let mut lock = not_confirmed.write().unwrap(); - let to_confirm = (*lock).clone(); - (*lock).clear(); - to_confirm - }; - - info!( - "break from reading channel, try to confirm {} in {} batches", - to_confirm.len(), - (to_confirm.len() / BATCH_SIZE) - + if to_confirm.len() % BATCH_SIZE > 0 { - 1 - } else { - 0 - } - ); - - let confirmed = tx_confirm_records.clone(); - let timeouts = tx_timeout_records.clone(); - for batch in to_confirm.rchunks(BATCH_SIZE).map(|x| x.to_vec()) { - process_signature_confirmation_batch( - &rpc_client, - &batch, - ¬_confirmed, - &confirmed, - timeouts.clone(), - TIMEOUT, - ); - } - // multi threaded implementation of confirming batches - // let mut confirmation_handles = Vec::new(); - // for batch in to_confirm.rchunks(BATCH_SIZE).map(|x| x.to_vec()) { - // let rpc_client = rpc_client.clone(); - // let not_confirmed = not_confirmed.clone(); - // let confirmed = tx_confirm_records.clone(); - - // let join_handle = Builder::new().name("solana-transaction-confirmation".to_string()).spawn(move || { - // process_signature_confirmation_batch(&rpc_client, &batch, ¬_confirmed, &confirmed, TIMEOUT) - // }).unwrap(); - // confirmation_handles.push(join_handle); - // }; - // for confirmation_handle in confirmation_handles { - // let (errors, timeouts) = confirmation_handle.join().unwrap(); - // error_count += errors; - // timeout_count += timeouts; - // } - // sleep(Duration::from_millis(100)); // so the confirmation thread does not spam a lot the rpc node - } + if let Some(transactions) = block.transactions { + let nb_transactions = transactions.len(); + let mut cu_consumed: u64 = 0; + for solana_transaction_status::EncodedTransactionWithStatusMeta { + transaction, meta, .. + } in transactions { - if recv_until_confirm == 0 && not_confirmed.read().unwrap().len() == 0 { - break; - } - } - // context for writing all the not_confirmed_transactions - if recv_until_confirm > 0 { - let mut lock = not_confirmed.write().unwrap(); - loop { - match tx_record_rx.try_recv() { - Ok(tx_record) => { - debug!( - "add to queue len={} sig={}", - (*lock).len() + 1, - tx_record.signature - ); - (*lock).push(tx_record); + if let solana_transaction_status::EncodedTransaction::Json(transaction) = transaction { + for signature in transaction.signatures { + let signature = Signature::from_str(&signature).unwrap(); - recv_until_confirm -= 1; - } - Err(TryRecvError::Empty) => { - debug!("channel emptied"); - sleep(Duration::from_millis(100)); - break; // still confirm remaining transctions - } - Err(TryRecvError::Disconnected) => { - { - info!("channel disconnected {}", recv_until_confirm); + let transaction_record_op = { + let rec = transaction_map.get(&signature); + match rec { + Some(x) => Some(x.clone()), + None => None, + } + }; + // add CU in counter + if let Some(meta) = &meta { + match meta.compute_units_consumed { + solana_transaction_status::option_serializer::OptionSerializer::Some(x) => { + cu_consumed = cu_consumed.saturating_add(x); + }, + _ => {}, } - debug!("channel disconnected"); - break; // still confirm remaining transctions } + if let Some(transaction_record) = transaction_record_op { + let transaction_record = transaction_record.0; + mm_transaction_count += 1; + + let _ = tx_confirm_records.send(TransactionConfirmRecord { + signature: transaction_record.signature.to_string(), + confirmed_slot: Some(slot), + confirmed_at: Some(Utc::now().to_string()), + sent_at: transaction_record.sent_at.to_string(), + sent_slot: transaction_record.sent_slot, + successful: if let Some(meta) = &meta { + meta.status.is_ok() + } else { + false + }, + error: if let Some(meta) = &meta { + meta.err.as_ref().map(|x| x.to_string()) + } else { + None + }, + block_hash: Some(block.blockhash.clone()), + market: transaction_record.market.to_string(), + market_maker: transaction_record.market_maker.to_string(), + slot_processed: Some(slot), + slot_leader: Some(slot_leader.clone()), + timed_out: false, + priority_fees: transaction_record.priority_fees, + }); + } + + transaction_map.remove(&signature); } } } + // push block data + { + let _ = tx_block_data.send(BlockData { + block_hash: block.blockhash, + block_leader: slot_leader, + block_slot: slot, + block_time: if let Some(time) = block.block_time { + time as u64 + } else { + 0 + }, + number_of_mm_transactions: mm_transaction_count, + total_transactions: nb_transactions as u64, + cu_consumed: cu_consumed, + }); + } } } pub fn confirmations_by_blocks( client: Arc, - recv_limit: usize, - tx_record_rx: Receiver, - tx_confirm_records: Arc>>, - tx_timeout_records: Arc>>, - tx_block_data: Arc>>, + mut tx_record_rx: UnboundedReceiver, + tx_confirm_records: Sender, + tx_block_data: Sender, from_slot: u64, -) { - let mut recv_until_confirm = recv_limit; - let transaction_map = Arc::new(RwLock::new( - HashMap::::new(), - )); - while recv_until_confirm != 0 { - match tx_record_rx.try_recv() { - Ok(tx_record) => { - let mut transaction_map = transaction_map.write().unwrap(); - debug!( - "add to queue len={} sig={}", - transaction_map.len() + 1, - tx_record.signature - ); - transaction_map.insert(tx_record.signature, tx_record); - recv_until_confirm -= 1; - } - Err(TryRecvError::Empty) => { - debug!("channel emptied"); - sleep(Duration::from_millis(100)); - } - Err(TryRecvError::Disconnected) => { - { - info!("channel disconnected {}", recv_until_confirm); - } - debug!("channel disconnected"); - break; // still confirm remaining transctions - } - } - } - println!("finished mapping all the trasactions"); - sleep(Duration::from_secs(30)); - let commitment_confirmation = CommitmentConfig { - commitment: CommitmentLevel::Confirmed, - }; - let block_res = client - .get_blocks_with_commitment(from_slot, None, commitment_confirmation) - .unwrap(); +) -> Vec> { + let transaction_map = Arc::new(DashMap::new()); + let do_exit = Arc::new(AtomicBool::new(false)); - let nb_blocks = block_res.len(); - let nb_thread: usize = 16; - println!("processing {} blocks", nb_blocks); - - let mut join_handles = Vec::new(); - for slot_batch in block_res - .chunks(if nb_blocks > nb_thread { - nb_blocks.div(nb_thread) - } else { - nb_blocks - }) - .map(|x| x.to_vec()) - { - let map = transaction_map.clone(); - let client = client.clone(); - let tx_confirm_records = tx_confirm_records.clone(); - let tx_block_data = tx_block_data.clone(); - let joinble = Builder::new() - .name("getting blocks and searching transactions".to_string()) - .spawn(move || { - for slot in slot_batch { - // retry search for block 10 times - let mut block = None; - for _i in 0..=10 { - let block_res = client - .get_block_with_config( - slot, - RpcBlockConfig { - encoding: None, - transaction_details: None, - rewards: None, - commitment: Some(commitment_confirmation), - max_supported_transaction_version: None, - }, + let map_filler_jh = { + let transaction_map = transaction_map.clone(); + let do_exit = do_exit.clone(); + tokio::spawn(async move { + loop { + match tx_record_rx.recv().await { + Some(tx_record) => { + debug!( + "add to queue len={} sig={}", + transaction_map.len() + 1, + tx_record.signature ); - - match block_res { - Ok(x) => { - block = Some(x); - break; - }, - _=>{ - // do nothing - } - } + transaction_map.insert(tx_record.signature, (tx_record, Instant::now())); } - let block = match block { - Some(x) => x, - None => continue, - }; - let mut mm_transaction_count: u64 = 0; - let rewards = &block.rewards.unwrap(); - let slot_leader = match rewards - .iter() - .find(|r| r.reward_type == Some(RewardType::Fee)) - { - Some(x) => x.pubkey.clone(), - None=> "".to_string(), - }; - - if let Some(transactions) = block.transactions { - let nb_transactions = transactions.len(); - let mut cu_consumed : u64 = 0; - for solana_transaction_status::EncodedTransactionWithStatusMeta { - transaction, - meta, - .. - } in transactions - { - if let solana_transaction_status::EncodedTransaction::Json( - transaction, - ) = transaction - { - for signature in transaction.signatures { - let signature = Signature::from_str(&signature).unwrap(); - - let transaction_record_op = { - let map = map.read().unwrap(); - let rec = map.get(&signature); - match rec { - Some(x) => Some(x.clone()), - None => None, - } - }; - // add CU in counter - if let Some(meta) = &meta { - match meta.compute_units_consumed { - solana_transaction_status::option_serializer::OptionSerializer::Some(x) => { - cu_consumed = cu_consumed.saturating_add(x); - }, - _ => {}, - } - } - if let Some(transaction_record) = transaction_record_op { - let mut lock = tx_confirm_records.write().unwrap(); - mm_transaction_count += 1; - - (*lock).push(TransactionConfirmRecord { - signature: transaction_record.signature.to_string(), - confirmed_slot: slot, // TODO: should be changed to correct slot - confirmed_at: Utc::now().to_string(), - sent_at: transaction_record.sent_at.to_string(), - sent_slot: transaction_record.sent_slot, - successful: if let Some(meta) = &meta { - meta.status.is_ok() - } else { - false - }, - error: if let Some(meta) = &meta { - match &meta.err { - Some(x) => x.to_string(), - None => "".to_string(), - } - } else { - "".to_string() - }, - block_hash: block.blockhash.clone(), - market: transaction_record.market.to_string(), - market_maker: transaction_record.market_maker.to_string(), - slot_processed: slot, - slot_leader: slot_leader.clone(), - timed_out: false, - priority_fees: transaction_record.priority_fees, - }) - } - - map.write().unwrap().remove(&signature); - } - } - } - // push block data - { - let mut blockstats_writer = tx_block_data.write().unwrap(); - blockstats_writer.push(BlockData { - block_hash: block.blockhash, - block_leader: slot_leader, - block_slot: slot, - block_time: if let Some(time) = block.block_time { - time as u64 - } else { - 0 - }, - number_of_mm_transactions: mm_transaction_count, - total_transactions: nb_transactions as u64, - cu_consumed: cu_consumed, - }) - } + None => { + do_exit.store(true, Ordering::Relaxed); + break; } } - }) - .unwrap(); - join_handles.push(joinble); - } - for handle in join_handles { - handle.join().unwrap(); - } + } + }) + }; - let mut timeout_writer = tx_timeout_records.write().unwrap(); - for x in transaction_map.read().unwrap().iter() { - timeout_writer.push(x.1.clone()) - } + let cleaner_jh = { + let transaction_map = transaction_map.clone(); + let do_exit = do_exit.clone(); + let tx_confirm_records = tx_confirm_records.clone(); + tokio::spawn(async move { + loop { + tokio::time::sleep(Duration::from_secs(60)).await; + { + transaction_map.retain(|signature, (sent_record, instant)| { + let retain = instant.elapsed() > Duration::from_secs(120); - // sort all blocks by slot and print info - { - let mut blockstats_writer = tx_block_data.write().unwrap(); - blockstats_writer.sort_by(|a, b| a.block_slot.partial_cmp(&b.block_slot).unwrap()); - for block_stat in blockstats_writer.iter() { - info!( - "block {} at slot {} contains {} transactions and consumerd {} CUs", - block_stat.block_hash, - block_stat.block_slot, - block_stat.total_transactions, - block_stat.cu_consumed, - ); - } - } + // add to timeout if not retaining + if !retain { + let _ = tx_confirm_records.send(TransactionConfirmRecord { + signature: signature.to_string(), + confirmed_slot: None, + confirmed_at: None, + sent_at: sent_record.sent_at.to_string(), + sent_slot: sent_record.sent_slot, + successful: false, + error: Some("timeout".to_string()), + block_hash: None, + market: sent_record.market.to_string(), + market_maker: sent_record.market_maker.to_string(), + slot_processed: None, + slot_leader: None, + timed_out: true, + priority_fees: sent_record.priority_fees, + }); + } + + retain + }); + + // if exit and all the transactions are processed + if do_exit.load(Ordering::Relaxed) && transaction_map.len() == 0 { + break; + } + } + } + }) + }; + + let block_confirmation_jh = { + let do_exit = do_exit.clone(); + tokio::spawn(async move { + let mut start_block = from_slot; + let mut start_instant = tokio::time::Instant::now(); + let refresh_in = Duration::from_secs(10); + let commitment_confirmation = CommitmentConfig { + commitment: CommitmentLevel::Confirmed, + }; + loop { + if do_exit.load(Ordering::Relaxed) && transaction_map.len() == 0 { + break; + } + + let wait_duration = tokio::time::Instant::now() - start_instant; + if wait_duration < refresh_in { + tokio::time::sleep(refresh_in - wait_duration).await; + } + start_instant = tokio::time::Instant::now(); + + let block_slots = client + .get_blocks_with_commitment(start_block, None, commitment_confirmation) + .await + .unwrap(); + if block_slots.is_empty() { + continue; + } + start_block = *block_slots.last().unwrap(); + + let blocks = block_slots.iter().map(|slot| { + client.get_block_with_config( + *slot, + RpcBlockConfig { + encoding: None, + transaction_details: None, + rewards: None, + commitment: Some(commitment_confirmation), + max_supported_transaction_version: None, + }, + ) + }); + let blocks = futures::future::join_all(blocks).await; + for block_slot in blocks.iter().zip(block_slots) { + let block = match block_slot.0 { + Ok(x) => x, + Err(_) => continue, + }; + let tx_confirm_records = tx_confirm_records.clone(); + let tx_block_data = tx_block_data.clone(); + let transaction_map = transaction_map.clone(); + process_blocks( + block.clone(), + tx_confirm_records, + tx_block_data, + transaction_map, + block_slot.1, + ) + .await; + } + } + }) + }; + vec![map_filler_jh, cleaner_jh, block_confirmation_jh] } diff --git a/src/crank.rs b/src/crank.rs index 0686da4..a4de601 100644 --- a/src/crank.rs +++ b/src/crank.rs @@ -1,14 +1,3 @@ -use std::{ - str::FromStr, - sync::{ - atomic::{AtomicBool, AtomicU64, Ordering}, - Arc, RwLock, - }, - thread::Builder, - time::Duration, -}; - -// use solana_client::rpc_client::RpcClient; use crate::{ account_write_filter::{self, AccountWriteRoute}, grpc_plugin_source::FilterConfig, @@ -18,7 +7,7 @@ use crate::{ states::TransactionSendRecord, websocket_source::{self, KeeperConfig}, }; -use crossbeam_channel::{unbounded, Sender}; +use async_channel::unbounded; use log::*; use solana_client::tpu_client::TpuClient; use solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool}; @@ -26,13 +15,20 @@ use solana_sdk::{ hash::Hash, instruction::Instruction, pubkey::Pubkey, signature::Keypair, signer::Signer, transaction::Transaction, }; +use std::{ + str::FromStr, + sync::{ + atomic::{AtomicBool, AtomicU64, Ordering}, + Arc, + }, + time::Duration, +}; +use tokio::sync::{mpsc::UnboundedSender, RwLock}; pub fn start( config: KeeperConfig, - _tx_record_sx: Sender, exit_signal: Arc, blockhash: Arc>, - _current_slot: Arc, tpu_client: Arc>, group: &GroupConfig, identity: &Keypair, @@ -61,41 +57,38 @@ pub fn start( let (instruction_sender, instruction_receiver) = unbounded::>(); let identity = Keypair::from_bytes(identity.to_bytes().as_slice()).unwrap(); - Builder::new() - .name("crank-tx-sender".into()) - .spawn(move || { - info!( - "crank-tx-sender signing with keypair pk={:?}", - identity.pubkey() - ); - loop { - if exit_signal.load(Ordering::Acquire) { - break; - } - - if let Ok(ixs) = instruction_receiver.recv() { - // TODO add priority fee - - let tx = Transaction::new_signed_with_payer( - &ixs, - Some(&identity.pubkey()), - &[&identity], - *blockhash.read().unwrap(), - ); - // TODO: find perp market pk and resolve import issue between solana program versions - // tx_record_sx.send(TransactionSendRecord { - // signature: tx.signatures[0], - // sent_at: Utc::now(), - // sent_slot: current_slot.load(Ordering::Acquire), - // market_maker: identity.pubkey(), - // market: c.perp_market_pk, - // }); - let ok = tpu_client.send_transaction(&tx); - trace!("send tx={:?} ok={ok}", tx.signatures[0]); - } + tokio::spawn(async move { + info!( + "crank-tx-sender signing with keypair pk={:?}", + identity.pubkey() + ); + loop { + if exit_signal.load(Ordering::Acquire) { + break; } - }) - .unwrap(); + + if let Ok(ixs) = instruction_receiver.recv().await { + // TODO add priority fee + + let tx = Transaction::new_signed_with_payer( + &ixs, + Some(&identity.pubkey()), + &[&identity], + *blockhash.read().await, + ); + // TODO: find perp market pk and resolve import issue between solana program versions + // tx_record_sx.send(TransactionSendRecord { + // signature: tx.signatures[0], + // sent_at: Utc::now(), + // sent_slot: current_slot.load(Ordering::Acquire), + // market_maker: identity.pubkey(), + // market: c.perp_market_pk, + // }); + let ok = tpu_client.send_transaction(&tx); + trace!("send tx={:?} ok={ok}", tx.signatures[0]); + } + } + }); tokio::spawn(async move { let metrics_tx = metrics::start( diff --git a/src/helpers.rs b/src/helpers.rs index 2b1c76b..7cd4053 100644 --- a/src/helpers.rs +++ b/src/helpers.rs @@ -3,9 +3,8 @@ use std::{ str::FromStr, sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, - Arc, RwLock, + Arc, }, - thread::{sleep, Builder, JoinHandle}, time::{Duration, Instant}, }; @@ -17,11 +16,9 @@ use mango_common::Loadable; use solana_client::rpc_client::RpcClient; use solana_program::{clock::DEFAULT_MS_PER_SLOT, pubkey::Pubkey}; use solana_sdk::hash::Hash; +use tokio::{sync::RwLock, task::JoinHandle}; -use crate::{ - mango::GroupConfig, - states::{BlockData, PerpMarketCache, TransactionConfirmRecord, TransactionSendRecord}, -}; +use crate::{mango::GroupConfig, states::PerpMarketCache}; // as there are similar modules solana_sdk and solana_program // solana internals use solana_sdk but external dependancies like mango use solana program @@ -61,19 +58,19 @@ pub fn load_from_rpc(rpc_client: &RpcClient, pk: &Pubkey) -> T { return T::load_from_bytes(acc.data.as_slice()).unwrap().clone(); } -pub fn get_latest_blockhash(rpc_client: &RpcClient) -> Hash { +pub async fn get_latest_blockhash(rpc_client: &RpcClient) -> Hash { loop { match rpc_client.get_latest_blockhash() { Ok(blockhash) => return blockhash, Err(err) => { info!("Couldn't get last blockhash: {:?}", err); - sleep(Duration::from_secs(1)); + tokio::time::sleep(Duration::from_secs(1)).await; } }; } } -pub fn get_new_latest_blockhash(client: Arc, blockhash: &Hash) -> Option { +pub async fn get_new_latest_blockhash(client: Arc, blockhash: &Hash) -> Option { let start = Instant::now(); while start.elapsed().as_secs() < 5 { if let Ok(new_blockhash) = client.get_latest_blockhash() { @@ -85,12 +82,12 @@ pub fn get_new_latest_blockhash(client: Arc, blockhash: &Hash) -> Opt debug!("Got same blockhash ({:?}), will retry...", blockhash); // Retry ~twice during a slot - sleep(Duration::from_millis(DEFAULT_MS_PER_SLOT / 2)); + tokio::time::sleep(Duration::from_millis(DEFAULT_MS_PER_SLOT / 2)).await; } None } -pub fn poll_blockhash_and_slot( +pub async fn poll_blockhash_and_slot( exit_signal: Arc, blockhash: Arc>, slot: &AtomicU64, @@ -100,8 +97,9 @@ pub fn poll_blockhash_and_slot( //let mut last_error_log = Instant::now(); loop { let client = client.clone(); - let old_blockhash = *blockhash.read().unwrap(); + let old_blockhash = *blockhash.read().await; if exit_signal.load(Ordering::Relaxed) { + info!("Stopping blockhash thread"); break; } @@ -110,9 +108,9 @@ pub fn poll_blockhash_and_slot( slot.store(new_slot, Ordering::Release); } - if let Some(new_blockhash) = get_new_latest_blockhash(client, &old_blockhash) { + if let Some(new_blockhash) = get_new_latest_blockhash(client, &old_blockhash).await { { - *blockhash.write().unwrap() = new_blockhash; + *blockhash.write().await = new_blockhash; } blockhash_last_updated = Instant::now(); } else { @@ -121,7 +119,7 @@ pub fn poll_blockhash_and_slot( } } - sleep(Duration::from_millis(100)); + tokio::time::sleep(Duration::from_millis(100)).await; } } @@ -129,79 +127,21 @@ pub fn seconds_since(dt: DateTime) -> i64 { Utc::now().signed_duration_since(dt).num_seconds() } -pub fn write_transaction_data_into_csv( - transaction_save_file: String, - tx_confirm_records: Arc>>, - tx_timeout_records: Arc>>, -) { - if transaction_save_file.is_empty() { - return; - } - let mut writer = csv::Writer::from_path(transaction_save_file).unwrap(); - { - let rd_lock = tx_confirm_records.read().unwrap(); - for confirm_record in rd_lock.iter() { - writer.serialize(confirm_record).unwrap(); - } - - let timeout_lk = tx_timeout_records.read().unwrap(); - for timeout_record in timeout_lk.iter() { - writer - .serialize(TransactionConfirmRecord { - block_hash: "".to_string(), - confirmed_at: "".to_string(), - confirmed_slot: 0, - error: "Timeout".to_string(), - market: timeout_record.market.to_string(), - market_maker: timeout_record.market_maker.to_string(), - sent_at: timeout_record.sent_at.to_string(), - sent_slot: timeout_record.sent_slot, - signature: timeout_record.signature.to_string(), - slot_leader: "".to_string(), - slot_processed: 0, - successful: false, - timed_out: true, - priority_fees: timeout_record.priority_fees, - }) - .unwrap(); - } - } - writer.flush().unwrap(); -} - -pub fn write_block_data_into_csv( - block_data_csv: String, - tx_block_data: Arc>>, -) { - if block_data_csv.is_empty() { - return; - } - let mut writer = csv::Writer::from_path(block_data_csv).unwrap(); - let data = tx_block_data.read().unwrap(); - - for d in data.iter().filter(|x| x.number_of_mm_transactions > 0) { - writer.serialize(d).unwrap(); - } - writer.flush().unwrap(); -} - pub fn start_blockhash_polling_service( exit_signal: Arc, blockhash: Arc>, current_slot: Arc, client: Arc, ) -> JoinHandle<()> { - Builder::new() - .name("solana-blockhash-poller".to_string()) - .spawn(move || { - poll_blockhash_and_slot( - exit_signal, - blockhash.clone(), - current_slot.as_ref(), - client, - ); - }) - .unwrap() + tokio::spawn(async move { + poll_blockhash_and_slot( + exit_signal, + blockhash.clone(), + current_slot.as_ref(), + client, + ) + .await; + }) } pub fn get_mango_market_perps_cache( diff --git a/src/keeper.rs b/src/keeper.rs index 02094fb..d61cdf5 100644 --- a/src/keeper.rs +++ b/src/keeper.rs @@ -8,13 +8,11 @@ use { hash::Hash, instruction::Instruction, message::Message, signature::Keypair, signer::Signer, transaction::Transaction, }, - std::{ - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, RwLock, - }, - thread::{Builder, JoinHandle}, + std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, }, + tokio::{sync::RwLock, task::JoinHandle}, }; fn create_root_bank_update_instructions(perp_markets: &[PerpMarketCache]) -> Vec { @@ -99,14 +97,14 @@ fn create_cache_perp_markets_instructions(perp_markets: &[PerpMarketCache]) -> I to_sdk_instruction(ix) } -pub fn send_transaction( +pub async fn send_transaction( tpu_client: Arc>, ixs: &[Instruction], blockhash: Arc>, payer: &Keypair, ) { let mut tx = Transaction::new_unsigned(Message::new(ixs, Some(&payer.pubkey()))); - let recent_blockhash = blockhash.read().unwrap(); + let recent_blockhash = blockhash.read().await; tx.sign(&[payer], *recent_blockhash); tpu_client.send_transaction(&tx); } @@ -148,67 +146,66 @@ pub fn start_keepers( quote_node_banks: Vec, ) -> JoinHandle<()> { let authority = Keypair::from_bytes(&authority.to_bytes()).unwrap(); - Builder::new() - .name("updating root bank keeper".to_string()) - .spawn(move || { - let root_update_ixs = create_root_bank_update_instructions(&perp_markets); - let cache_prices = create_update_price_cache_instructions(&perp_markets); - let update_perp_cache = create_cache_perp_markets_instructions(&perp_markets); - let cache_root_bank_ix = create_cache_root_bank_instruction(&perp_markets); - let update_funding_ix = create_update_fundings_instructions(&perp_markets); - let quote_root_bank_ix = create_update_and_cache_quote_banks( - &perp_markets, - quote_root_bank, - quote_node_banks, - ); + tokio::spawn(async move { + let root_update_ixs = create_root_bank_update_instructions(&perp_markets); + let cache_prices = create_update_price_cache_instructions(&perp_markets); + let update_perp_cache = create_cache_perp_markets_instructions(&perp_markets); + let cache_root_bank_ix = create_cache_root_bank_instruction(&perp_markets); + let update_funding_ix = create_update_fundings_instructions(&perp_markets); + let quote_root_bank_ix = + create_update_and_cache_quote_banks(&perp_markets, quote_root_bank, quote_node_banks); - let blockhash = blockhash.clone(); + let blockhash = blockhash.clone(); - // add prioritization instruction - //let prioritization_ix = ComputeBudgetInstruction::set_compute_unit_price(10000); - //root_update_ixs.insert(0, prioritization_ix.clone()); + // add prioritization instruction + //let prioritization_ix = ComputeBudgetInstruction::set_compute_unit_price(10000); + //root_update_ixs.insert(0, prioritization_ix.clone()); - while !exit_signal.load(Ordering::Relaxed) { - send_transaction( - tpu_client.clone(), - &[cache_prices.clone()], - blockhash.clone(), - &authority, - ); + while !exit_signal.load(Ordering::Relaxed) { + send_transaction( + tpu_client.clone(), + &[cache_prices.clone()], + blockhash.clone(), + &authority, + ) + .await; - send_transaction( - tpu_client.clone(), - quote_root_bank_ix.as_slice(), - blockhash.clone(), - &authority, - ); + send_transaction( + tpu_client.clone(), + quote_root_bank_ix.as_slice(), + blockhash.clone(), + &authority, + ) + .await; - for updates in update_funding_ix.chunks(3) { - send_transaction(tpu_client.clone(), updates, blockhash.clone(), &authority); - } - - send_transaction( - tpu_client.clone(), - root_update_ixs.as_slice(), - blockhash.clone(), - &authority, - ); - - send_transaction( - tpu_client.clone(), - &[update_perp_cache.clone()], - blockhash.clone(), - &authority, - ); - - send_transaction( - tpu_client.clone(), - &[cache_root_bank_ix.clone()], - blockhash.clone(), - &authority, - ); - std::thread::sleep(std::time::Duration::from_secs(1)); + for updates in update_funding_ix.chunks(3) { + send_transaction(tpu_client.clone(), updates, blockhash.clone(), &authority).await; } - }) - .unwrap() + + send_transaction( + tpu_client.clone(), + root_update_ixs.as_slice(), + blockhash.clone(), + &authority, + ) + .await; + + send_transaction( + tpu_client.clone(), + &[update_perp_cache.clone()], + blockhash.clone(), + &authority, + ) + .await; + + send_transaction( + tpu_client.clone(), + &[cache_root_bank_ix.clone()], + blockhash.clone(), + &authority, + ) + .await; + std::thread::sleep(std::time::Duration::from_secs(1)); + } + }) } diff --git a/src/lib.rs b/src/lib.rs index b3e30e3..c31b76c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,8 +10,11 @@ pub mod mango; pub mod mango_v3_perp_crank_sink; pub mod market_markers; pub mod metrics; +pub mod result_writer; pub mod rotating_queue; pub mod states; +pub mod stats; +pub mod tpu_manager; pub mod websocket_source; trait AnyhowWrap { diff --git a/src/main.rs b/src/main.rs index 1f99837..8c39014 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,5 @@ use { - log::{error, info}, + log::info, serde_json, simulate_mango_v3::{ cli, @@ -7,71 +7,35 @@ use { crank, helpers::{ get_latest_blockhash, get_mango_market_perps_cache, start_blockhash_polling_service, - to_sdk_pk, write_block_data_into_csv, write_transaction_data_into_csv, + to_sdk_pk, }, keeper::start_keepers, mango::{AccountKeys, MangoConfig}, market_markers::start_market_making_threads, - states::{BlockData, PerpMarketCache, TransactionConfirmRecord, TransactionSendRecord}, + result_writer::initialize_result_writers, + states::PerpMarketCache, + stats::MangoSimulationStats, websocket_source::KeeperConfig, }, solana_client::{ - connection_cache::ConnectionCache, rpc_client::RpcClient, tpu_client::TpuClient, + connection_cache::ConnectionCache, nonblocking::rpc_client::RpcClient as NbRpcClient, + rpc_client::RpcClient, tpu_client::TpuClient, }, - solana_metrics::datapoint_info, solana_program::pubkey::Pubkey, solana_sdk::commitment_config::CommitmentConfig, std::{ fs, net::{IpAddr, Ipv4Addr}, str::FromStr, - sync::{ - atomic::{AtomicBool, AtomicU64, Ordering}, - Arc, RwLock, - }, - thread::sleep, - thread::{Builder, JoinHandle}, + sync::atomic::{AtomicBool, AtomicU64, Ordering}, + sync::Arc, time::Duration, }, + tokio::{sync::RwLock, task::JoinHandle}, }; -#[derive(Default)] -struct MangoBencherStats { - recv_limit: usize, - num_confirmed_txs: usize, - num_error_txs: usize, - num_timeout_txs: usize, -} - -impl MangoBencherStats { - fn report(&self, name: &'static str) { - datapoint_info!( - name, - ("recv_limit", self.recv_limit, i64), - ("num_confirmed_txs", self.num_confirmed_txs, i64), - ("num_error_txs", self.num_error_txs, i64), - ("num_timeout_txs", self.num_timeout_txs, i64), - ( - "percent_confirmed_txs", - (self.num_confirmed_txs * 100) / self.recv_limit, - i64 - ), - ( - "percent_error_txs", - (self.num_error_txs * 100) / self.recv_limit, - i64 - ), - ( - "percent_timeout_txs", - (self.num_timeout_txs * 100) / self.recv_limit, - i64 - ), - ); - } -} - -#[tokio::main] -async fn main() { +#[tokio::main(flavor = "multi_thread", worker_threads = 10)] +pub async fn main() -> anyhow::Result<()> { solana_logger::setup_with_default("solana=info"); solana_metrics::set_panic_hook("bench-mango", /*version:*/ None); @@ -125,6 +89,11 @@ async fn main() { CommitmentConfig::confirmed(), )); + let nb_rpc_client = Arc::new(NbRpcClient::new_with_commitment( + json_rpc_url.to_string(), + CommitmentConfig::confirmed(), + )); + let connection_cache = ConnectionCache::new_with_client_options( 4, None, @@ -153,18 +122,28 @@ async fn main() { mango_group_config.perp_markets.len(), quotes_per_second, account_keys_parsed.len() - * mango_group_config.perp_markets.len() + * number_of_markers_per_mm as usize * quotes_per_second.clone() as usize, duration ); + let nb_users = account_keys_parsed.len(); + + let mango_sim_stats = MangoSimulationStats::new( + nb_users, + *quotes_per_second as usize, + number_of_markers_per_mm as usize, + duration.as_secs() as usize, + ); + // continuosly fetch blockhash let rpc_client = Arc::new(RpcClient::new_with_commitment( json_rpc_url.to_string(), CommitmentConfig::confirmed(), )); let exit_signal = Arc::new(AtomicBool::new(false)); - let blockhash = Arc::new(RwLock::new(get_latest_blockhash(&rpc_client.clone()))); + let latest_blockhash = get_latest_blockhash(&rpc_client.clone()).await; + let blockhash = Arc::new(RwLock::new(latest_blockhash)); let current_slot = Arc::new(AtomicU64::new(0)); let blockhash_thread = start_blockhash_polling_service( exit_signal.clone(), @@ -202,7 +181,7 @@ async fn main() { None }; - let (tx_record_sx, tx_record_rx) = crossbeam_channel::unbounded(); + let (tx_record_sx, tx_record_rx) = tokio::sync::mpsc::unbounded_channel(); let from_slot = current_slot.load(Ordering::Relaxed); let keeper_config = KeeperConfig { program_id: to_sdk_pk(&mango_program_pk), @@ -212,16 +191,18 @@ async fn main() { crank::start( keeper_config, - tx_record_sx.clone(), exit_signal.clone(), blockhash.clone(), - current_slot.clone(), tpu_client.clone(), mango_group_config, id, ); - let mm_threads: Vec> = start_market_making_threads( + let warmup_duration = Duration::from_secs(10); + info!("waiting for keepers to warmup for {warmup_duration:?}"); + tokio::time::sleep(warmup_duration).await; + + let mut tasks: Vec> = start_market_making_threads( account_keys_parsed.clone(), perp_market_caches.clone(), tx_record_sx.clone(), @@ -231,125 +212,48 @@ async fn main() { tpu_client.clone(), &duration, *quotes_per_second, - *txs_batch_size, *priority_fees_proba, number_of_markers_per_mm, ); - let duration = duration.clone(); - let quotes_per_second = quotes_per_second.clone(); - let account_keys_parsed = account_keys_parsed.clone(); - let tx_confirm_records: Arc>> = - Arc::new(RwLock::new(Vec::new())); - let tx_timeout_records: Arc>> = - Arc::new(RwLock::new(Vec::new())); - - let tx_block_data = Arc::new(RwLock::new(Vec::::new())); - - let confirmation_thread = Builder::new() - .name("solana-client-sender".to_string()) - .spawn(move || { - let mut stats = MangoBencherStats::default(); - - stats.recv_limit = account_keys_parsed.len() - * number_of_markers_per_mm as usize - * duration.as_secs() as usize - * quotes_per_second as usize; - - //confirmation_by_querying_rpc(recv_limit, rpc_client.clone(), &tx_record_rx, tx_confirm_records.clone(), tx_timeout_records.clone()); - confirmations_by_blocks( - rpc_client.clone(), - stats.recv_limit, - tx_record_rx, - tx_confirm_records.clone(), - tx_timeout_records.clone(), - tx_block_data.clone(), - from_slot, - ); - - let confirmed: Vec = { - let lock = tx_confirm_records.write().unwrap(); - (*lock).clone() - }; - stats.num_confirmed_txs = confirmed.len(); - - info!( - "confirmed {} signatures of {} rate {}%", - stats.num_confirmed_txs, - stats.recv_limit, - (stats.num_confirmed_txs * 100) / stats.recv_limit - ); - stats.num_error_txs = confirmed.iter().filter(|tx| !tx.error.is_empty()).count(); - info!( - "errors counted {} rate {}%", - stats.num_error_txs, - (stats.num_error_txs as usize * 100) / stats.recv_limit - ); - let timeouts: Vec = { - let timeouts = tx_timeout_records.clone(); - let lock = timeouts.write().unwrap(); - (*lock).clone() - }; - stats.num_timeout_txs = timeouts.len(); - info!( - "timeouts counted {} rate {}%", - stats.num_timeout_txs, - (stats.num_timeout_txs * 100) / stats.recv_limit - ); - stats.report("mango-bencher"); - // metrics are submitted every 10s, - // it is necessary only because we do it once before the end of the execution - sleep(Duration::from_secs(10)); - - // let mut confirmation_times = confirmed - // .iter() - // .map(|r| { - // r.confirmed_at - // .signed_duration_since(r.sent_at) - // .num_milliseconds() - // }) - // .collect::>(); - // confirmation_times.sort(); - // info!( - // "confirmation times min={} max={} median={}", - // confirmation_times.first().unwrap(), - // confirmation_times.last().unwrap(), - // confirmation_times[confirmation_times.len() / 2] - // ); - - write_transaction_data_into_csv( - transaction_save_file, - tx_confirm_records, - tx_timeout_records, - ); - - write_block_data_into_csv(block_data_save_file, tx_block_data); - }) - .unwrap(); - - for t in mm_threads { - if let Err(err) = t.join() { - error!("mm join failed with: {:?}", err); - } - } - - info!("joined all mm_threads"); - - if let Err(err) = confirmation_thread.join() { - error!("confirmation join fialed with: {:?}", err); - } - - info!("joined confirmation thread"); - - exit_signal.store(true, Ordering::Relaxed); - - if let Err(err) = blockhash_thread.join() { - error!("blockhash join failed with: {:?}", err); - } + info!("Number of MM threads {}", tasks.len()); + drop(tx_record_sx); + tasks.push(blockhash_thread); if let Some(keepers_jl) = keepers_jl { - if let Err(err) = keepers_jl.join() { - error!("keeper join failed with: {:?}", err); - } + tasks.push(keepers_jl); } + + let (tx_status_sx, tx_status_rx) = async_channel::unbounded(); + let (block_status_sx, block_status_rx) = async_channel::unbounded(); + + let mut writers_jh = initialize_result_writers( + transaction_save_file, + block_data_save_file, + tx_status_rx.clone(), + block_status_rx.clone(), + ); + tasks.append(&mut writers_jh); + + let stats_handle = + mango_sim_stats.update_from_tx_status_stream(tx_status_rx.clone(), exit_signal.clone()); + tasks.push(stats_handle); + + let mut confirmation_threads = confirmations_by_blocks( + nb_rpc_client, + tx_record_rx, + tx_status_sx, + block_status_sx, + from_slot, + ); + tasks.append(&mut confirmation_threads); + + let nb_tasks = tasks.len(); + for i in 0..nb_tasks { + println!("waiting for {}", i); + let task = tasks.remove(0); + let _ = task.await; + } + mango_sim_stats.report("Mango simulation stats"); + Ok(()) } diff --git a/src/mango_v3_perp_crank_sink.rs b/src/mango_v3_perp_crank_sink.rs index 81e0183..347bff9 100644 --- a/src/mango_v3_perp_crank_sink.rs +++ b/src/mango_v3_perp_crank_sink.rs @@ -1,8 +1,8 @@ use std::{cell::RefCell, collections::BTreeMap, convert::TryFrom, mem::size_of}; use arrayref::array_ref; +use async_channel::Sender; use async_trait::async_trait; -use crossbeam_channel::Sender; use log::*; use mango::{ instruction::consume_events, @@ -135,7 +135,7 @@ impl AccountWriteSink for MangoV3PerpCrankSink { // event_queue.iter().count() // ); - if let Err(e) = self.instruction_sender.send(vec![ix?]) { + if let Err(e) = self.instruction_sender.send(vec![ix?]).await { return Err(e.to_string()); } diff --git a/src/market_markers.rs b/src/market_markers.rs index bda457d..251a45c 100644 --- a/src/market_markers.rs +++ b/src/market_markers.rs @@ -2,16 +2,14 @@ use std::{ str::FromStr, sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, - Arc, RwLock, + Arc, }, - thread::{sleep, Builder, JoinHandle}, time::{Duration, Instant}, }; use chrono::Utc; -use crossbeam_channel::Sender; use iter_tools::Itertools; -use log::{debug, error, info, warn}; +use log::{debug, info, warn}; use mango::{ instruction::{cancel_all_perp_orders, place_perp_order2}, matching::Side, @@ -24,6 +22,7 @@ use solana_sdk::{ compute_budget, hash::Hash, instruction::Instruction, message::Message, signature::Keypair, signer::Signer, transaction::Transaction, }; +use tokio::{sync::mpsc::UnboundedSender, sync::RwLock, task::JoinHandle}; use crate::{ helpers::{to_sdk_instruction, to_sp_pk}, @@ -152,10 +151,10 @@ fn generate_random_fees( .collect() } -pub fn send_mm_transactions( +pub async fn send_mm_transactions( quotes_per_second: u64, perp_market_caches: &Vec, - tx_record_sx: &Sender, + tx_record_sx: &UnboundedSender, tpu_client: Arc>, mango_account_pk: Pubkey, mango_account_signer: &Keypair, @@ -181,9 +180,8 @@ pub fn send_mm_transactions( prioritization_fee, ); - if let Ok(recent_blockhash) = blockhash.read() { - tx.sign(&[mango_account_signer], *recent_blockhash); - } + let recent_blockhash = *blockhash.read().await; + tx.sign(&[mango_account_signer], recent_blockhash); tpu_client.send_transaction(&tx); let sent = tx_record_sx.send(TransactionSendRecord { @@ -204,103 +202,24 @@ pub fn send_mm_transactions( } } -pub fn send_mm_transactions_batched( - txs_batch_size: usize, - quotes_per_second: u64, - perp_market_caches: &Vec, - tx_record_sx: &Sender, - tpu_client: Arc>, - mango_account_pk: Pubkey, - mango_account_signer: &Keypair, - blockhash: Arc>, - slot: &AtomicU64, - prioritization_fee_proba: u8, -) { - let mut transactions = Vec::<_>::with_capacity(txs_batch_size); - - let mango_account_signer_pk = to_sp_pk(&mango_account_signer.pubkey()); - // update quotes 2x per second - for _ in 0..quotes_per_second { - for c in perp_market_caches.iter() { - let prioritization_fee_for_tx = - generate_random_fees(prioritization_fee_proba, txs_batch_size, 100, 1000); - for i in 0..txs_batch_size { - let prioritization_fee = prioritization_fee_for_tx[i]; - transactions.push(( - create_ask_bid_transaction( - c, - mango_account_pk, - &mango_account_signer, - prioritization_fee, - ), - prioritization_fee, - )); - } - - if let Ok(recent_blockhash) = blockhash.read() { - for tx in &mut transactions { - tx.0.sign(&[mango_account_signer], *recent_blockhash); - } - } - - if tpu_client - .try_send_transaction_batch( - &transactions - .iter() - .map(|x| x.0.clone()) - .collect_vec() - .as_slice(), - ) - .is_err() - { - error!("Sending batch failed"); - continue; - } - - for tx in &transactions { - let sent = tx_record_sx.send(TransactionSendRecord { - signature: tx.0.signatures[0], - sent_at: Utc::now(), - sent_slot: slot.load(Ordering::Acquire), - market_maker: mango_account_signer_pk, - market: c.perp_market_pk, - priority_fees: tx.1 as u64, - }); - if sent.is_err() { - error!( - "sending error on channel : {}", - sent.err().unwrap().to_string() - ); - } - } - transactions.clear(); - } - } -} - pub fn start_market_making_threads( account_keys_parsed: Vec, perp_market_caches: Vec, - tx_record_sx: Sender, + tx_record_sx: UnboundedSender, exit_signal: Arc, blockhash: Arc>, current_slot: Arc, tpu_client: Arc>, duration: &Duration, quotes_per_second: u64, - txs_batch_size: Option, prioritization_fee_proba: u8, number_of_markers_per_mm: u8, ) -> Vec> { - let warmup_duration = Duration::from_secs(10); - info!("waiting for keepers to warmup for {warmup_duration:?}"); - sleep(warmup_duration); - let mut rng = rand::thread_rng(); account_keys_parsed .iter() .map(|account_keys| { - let _exit_signal = exit_signal.clone(); + let exit_signal = exit_signal.clone(); let blockhash = blockhash.clone(); let current_slot = current_slot.clone(); let duration = duration.clone(); @@ -322,53 +241,40 @@ pub fn start_market_making_threads( .map(|x| x.clone()) .collect_vec(); - Builder::new() - .name("solana-client-sender".to_string()) - .spawn(move || { - for _i in 0..duration.as_secs() { - let start = Instant::now(); - - // send market maker transactions - if let Some(txs_batch_size) = txs_batch_size.clone() { - send_mm_transactions_batched( - txs_batch_size, - quotes_per_second, - &perp_market_caches, - &tx_record_sx, - tpu_client.clone(), - mango_account_pk, - &mango_account_signer, - blockhash.clone(), - current_slot.as_ref(), - prioritization_fee_proba, - ); - } else { - send_mm_transactions( - quotes_per_second, - &perp_market_caches, - &tx_record_sx, - tpu_client.clone(), - mango_account_pk, - &mango_account_signer, - blockhash.clone(), - current_slot.as_ref(), - prioritization_fee_proba, - ); - } - - let elapsed_millis: u64 = start.elapsed().as_millis() as u64; - if elapsed_millis < 950 { - // 50 ms is reserved for other stuff - sleep(Duration::from_millis(950 - elapsed_millis)); - } else { - warn!( - "time taken to send transactions is greater than 1000ms {}", - elapsed_millis - ); - } + tokio::spawn(async move { + for _i in 0..duration.as_secs() { + if exit_signal.load(Ordering::Relaxed) { + break; } - }) - .unwrap() + + let start = Instant::now(); + + // send market maker transactions + send_mm_transactions( + quotes_per_second, + &perp_market_caches, + &tx_record_sx, + tpu_client.clone(), + mango_account_pk, + &mango_account_signer, + blockhash.clone(), + current_slot.as_ref(), + prioritization_fee_proba, + ) + .await; + + let elapsed_millis: u64 = start.elapsed().as_millis() as u64; + if elapsed_millis < 1000 { + tokio::time::sleep(Duration::from_millis(1000 - elapsed_millis)).await; + } else { + warn!( + "time taken to send transactions is greater than 1000ms {}", + elapsed_millis + ); + } + } + println!("stopping mm thread"); + }) }) .collect() } diff --git a/src/result_writer.rs b/src/result_writer.rs new file mode 100644 index 0000000..6bad889 --- /dev/null +++ b/src/result_writer.rs @@ -0,0 +1,44 @@ +use async_channel::Receiver; +use tokio::task::JoinHandle; + +use crate::states::{BlockData, TransactionConfirmRecord}; + +pub fn initialize_result_writers( + transaction_save_file: String, + block_data_save_file: String, + tx_data: Receiver, + block_data: Receiver, +) -> Vec> { + let mut tasks = vec![]; + + if !transaction_save_file.is_empty() { + let tx_data_jh = tokio::spawn(async move { + let mut writer = csv::Writer::from_path(transaction_save_file).unwrap(); + loop { + if let Ok(record) = tx_data.recv().await { + writer.serialize(record).unwrap(); + } else { + break; + } + } + writer.flush().unwrap(); + }); + tasks.push(tx_data_jh); + } + + if !block_data_save_file.is_empty() { + let block_data_jh = tokio::spawn(async move { + let mut writer = csv::Writer::from_path(block_data_save_file).unwrap(); + loop { + if let Ok(record) = block_data.recv().await { + writer.serialize(record).unwrap(); + } else { + break; + } + } + writer.flush().unwrap(); + }); + tasks.push(block_data_jh); + } + tasks +} diff --git a/src/states.rs b/src/states.rs index a252a78..dee5da8 100644 --- a/src/states.rs +++ b/src/states.rs @@ -20,15 +20,15 @@ pub struct TransactionConfirmRecord { pub signature: String, pub sent_slot: Slot, pub sent_at: String, - pub confirmed_slot: Slot, - pub confirmed_at: String, + pub confirmed_slot: Option, + pub confirmed_at: Option, pub successful: bool, - pub slot_leader: String, - pub error: String, + pub slot_leader: Option, + pub error: Option, pub market_maker: String, pub market: String, - pub block_hash: String, - pub slot_processed: Slot, + pub block_hash: Option, + pub slot_processed: Option, pub timed_out: bool, pub priority_fees: u64, } diff --git a/src/stats.rs b/src/stats.rs new file mode 100644 index 0000000..309f6a8 --- /dev/null +++ b/src/stats.rs @@ -0,0 +1,103 @@ +use std::sync::{ + atomic::{AtomicBool, AtomicU64, Ordering}, + Arc, +}; + +use crate::states::TransactionConfirmRecord; +use solana_metrics::datapoint_info; +use tokio::task::JoinHandle; + +#[derive(Default)] +pub struct MangoSimulationStats { + recv_limit: usize, + num_confirmed_txs: Arc, + num_error_txs: Arc, + num_timeout_txs: Arc, + num_successful: Arc, + num_sent: Arc, +} + +impl MangoSimulationStats { + pub fn new( + nb_market_makers: usize, + quotes_per_second: usize, + nb_markets_per_mm: usize, + duration_in_sec: usize, + ) -> Self { + Self { + recv_limit: nb_market_makers * quotes_per_second * nb_markets_per_mm * duration_in_sec, + num_confirmed_txs: Arc::new(AtomicU64::new(0)), + num_error_txs: Arc::new(AtomicU64::new(0)), + num_timeout_txs: Arc::new(AtomicU64::new(0)), + num_successful: Arc::new(AtomicU64::new(0)), + num_sent: Arc::new(AtomicU64::new(0)), + } + } + + pub fn update_from_tx_status_stream( + &self, + tx_confirm_record_reciever: async_channel::Receiver, + do_exit: Arc, + ) -> JoinHandle<()> { + let num_confirmed_txs = self.num_confirmed_txs.clone(); + let num_error_txs = self.num_error_txs.clone(); + let num_successful = self.num_successful.clone(); + let num_timeout_txs = self.num_timeout_txs.clone(); + tokio::spawn(async move { + loop { + if do_exit.load(Ordering::Relaxed) { + break; + } + + if let Ok(tx_data) = tx_confirm_record_reciever.recv().await { + if let Some(_) = tx_data.confirmed_at { + num_confirmed_txs.fetch_add(1, Ordering::Relaxed); + if let Some(_) = tx_data.error { + num_error_txs.fetch_add(1, Ordering::Relaxed); + } else { + num_successful.fetch_add(1, Ordering::Relaxed); + } + } else { + num_timeout_txs.fetch_add(1, Ordering::Relaxed); + } + } else { + break; + } + } + }) + } + + pub fn report(&self, name: &'static str) { + let num_sent = self.num_sent.load(Ordering::Relaxed); + let num_confirmed_txs = self.num_confirmed_txs.load(Ordering::Relaxed); + let num_successful = self.num_successful.load(Ordering::Relaxed); + let num_error_txs = self.num_error_txs.load(Ordering::Relaxed); + let num_timeout_txs = self.num_timeout_txs.load(Ordering::Relaxed); + + datapoint_info!( + name, + ("recv_limit", self.recv_limit, i64), + ("num_txs_sent", num_sent, i64), + ("num_confirmed_txs", num_confirmed_txs, i64), + ("num_successful_txs", num_successful, i64), + ("num_error_txs", num_error_txs, i64), + ("num_timeout_txs", num_timeout_txs, i64), + ( + "percent_confirmed_txs", + (num_confirmed_txs * 100) / num_sent, + f64 + ), + ( + "percent_successful_txs", + (num_confirmed_txs * 100) / num_sent, + f64 + ), + ("percent_error_txs", (num_error_txs * 100) / num_sent, f64), + ( + "percent_timeout_txs", + (num_timeout_txs * 100) / num_sent, + f64 + ), + ); + } +} diff --git a/src/tpu_manager.rs b/src/tpu_manager.rs new file mode 100644 index 0000000..b586000 --- /dev/null +++ b/src/tpu_manager.rs @@ -0,0 +1,135 @@ +use log::info; +use solana_client::nonblocking::rpc_client::RpcClient; +use solana_client::{connection_cache::ConnectionCache, nonblocking::tpu_client::TpuClient}; +use solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool}; +use solana_sdk::signature::Keypair; +use std::{ + net::{IpAddr, Ipv4Addr}, + sync::{ + atomic::{AtomicU32, Ordering}, + Arc, + }, +}; +use tokio::sync::RwLock; + +pub type QuicTpuClient = TpuClient; +pub type QuicConnectionCache = ConnectionCache; + +const TPU_CONNECTION_CACHE_SIZE: usize = 8; + +#[derive(Clone)] +pub struct TpuManager { + error_count: Arc, + rpc_client: Arc, + // why arc twice / one is so that we clone rwlock and other so that we can clone tpu client + tpu_client: Arc>>, + pub ws_addr: String, + fanout_slots: u64, + identity: Arc, +} + +impl TpuManager { + pub async fn new( + rpc_client: Arc, + ws_addr: String, + fanout_slots: u64, + identity: Keypair, + ) -> anyhow::Result { + let connection_cache = ConnectionCache::new_with_client_options( + 4, + None, + Some((&identity, IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)))), + None, + ); + let quic_connection_cache = + if let ConnectionCache::Quic(connection_cache) = connection_cache { + Some(connection_cache) + } else { + None + }; + + let tpu_client = Arc::new( + TpuClient::new_with_connection_cache( + rpc_client.clone(), + &ws_addr, + solana_client::tpu_client::TpuClientConfig { fanout_slots }, + quic_connection_cache.unwrap(), + ) + .await + .unwrap(), + ); + + Ok(Self { + rpc_client, + tpu_client: Arc::new(RwLock::new(tpu_client)), + ws_addr, + fanout_slots, + error_count: Default::default(), + identity: Arc::new(identity), + }) + } + + pub async fn reset_tpu_client(&self) -> anyhow::Result<()> { + let identity = Keypair::from_bytes(&self.identity.to_bytes()).unwrap(); + let connection_cache = ConnectionCache::new_with_client_options( + 4, + None, + Some((&identity, IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)))), + None, + ); + let quic_connection_cache = + if let ConnectionCache::Quic(connection_cache) = connection_cache { + Some(connection_cache) + } else { + None + }; + + let tpu_client = Arc::new( + TpuClient::new_with_connection_cache( + self.rpc_client.clone(), + &self.ws_addr, + solana_client::tpu_client::TpuClientConfig { + fanout_slots: self.fanout_slots, + }, + quic_connection_cache.unwrap(), + ) + .await + .unwrap(), + ); + self.error_count.store(0, Ordering::Relaxed); + *self.tpu_client.write().await = tpu_client; + Ok(()) + } + + pub async fn reset(&self) -> anyhow::Result<()> { + self.error_count.fetch_add(1, Ordering::Relaxed); + + if self.error_count.load(Ordering::Relaxed) > 5 { + self.reset_tpu_client().await?; + info!("TPU Reset after 5 errors"); + } + + Ok(()) + } + + async fn get_tpu_client(&self) -> Arc { + self.tpu_client.read().await.clone() + } + + pub async fn try_send_wire_transaction_batch( + &self, + wire_transactions: Vec>, + ) -> anyhow::Result<()> { + let tpu_client = self.get_tpu_client().await; + match tpu_client + .try_send_wire_transaction_batch(wire_transactions) + .await + { + Ok(_) => Ok(()), + Err(err) => { + self.reset().await?; + Err(err.into()) + } + } + } +}