Refactoring the code, splitting it into multiple files

This commit is contained in:
Godmode Galactus 2023-02-15 17:08:55 +01:00
parent 0d7913977b
commit 12f46d93ac
No known key found for this signature in database
GPG Key ID: A04142C71ABB0DEA
7 changed files with 1105 additions and 1040 deletions

View File

@ -18,7 +18,6 @@ pub struct Config {
pub mango_keys: String,
pub transaction_save_file: String,
pub block_data_save_file: String,
pub airdrop_accounts: bool,
pub mango_cluster: String,
pub txs_batch_size: Option<usize>,
}
@ -36,7 +35,6 @@ impl Default for Config {
mango_keys: String::new(),
transaction_save_file: String::new(),
block_data_save_file: String::new(),
airdrop_accounts: false,
mango_cluster: "testnet.0".to_string(),
txs_batch_size: None,
}
@ -154,14 +152,6 @@ pub fn build_args<'a, 'b>(version: &'b str) -> App<'a, 'b> {
.required(false)
.help("To save details of all block containing mm transactions"),
)
.arg(
Arg::with_name("airdrop-accounts")
.long("airdrop-accounts")
.value_name("BOOL")
.takes_value(false)
.required(false)
.help("Airdrop all MM accounts before stating"),
)
.arg(
Arg::with_name("mango-cluster")
.short("c")
@ -246,7 +236,6 @@ pub fn extract_args(matches: &ArgMatches) -> Config {
None => String::new(),
};
args.airdrop_accounts = matches.is_present("airdrop-accounts");
args.mango_cluster = match matches.value_of("mango-cluster") {
Some(x) => x.to_string(),
None => "testnet.0".to_string(),

View File

@ -0,0 +1,427 @@
use std::{
collections::HashMap,
ops::Div,
str::FromStr,
sync::{
atomic::{AtomicU64, Ordering},
Arc, RwLock,
},
thread::{sleep, Builder},
time::Duration,
};
use chrono::Utc;
use crossbeam_channel::{Receiver, TryRecvError};
use log::{debug, error, info, trace};
use solana_client::{rpc_client::RpcClient, rpc_config::RpcBlockConfig};
use solana_program::pubkey::Pubkey;
use solana_sdk::{
commitment_config::{CommitmentConfig, CommitmentLevel},
signature::Signature,
};
use solana_transaction_status::RewardType;
use crate::{
helpers::seconds_since,
rotating_queue::RotatingQueue,
states::{BlockData, TransactionConfirmRecord, TransactionSendRecord},
};
/// Queries the RPC node once for the status of every signature in `batch` and
/// routes each transaction record into one of three shared buckets:
/// - `confirmed`: a status with a confirmation level was returned;
/// - `timeouts`: no status and the record is older than `timeout` seconds;
/// - `not_confirmed`: everything else, to be retried by the caller.
///
/// On an RPC error the whole batch is pushed back into `not_confirmed` and the
/// thread backs off for 500 ms.
///
/// Panics if any of the `RwLock`s is poisoned.
pub fn process_signature_confirmation_batch(
    rpc_client: &RpcClient,
    batch: &Vec<TransactionSendRecord>,
    not_confirmed: &Arc<RwLock<Vec<TransactionSendRecord>>>,
    confirmed: &Arc<RwLock<Vec<TransactionConfirmRecord>>>,
    timeouts: Arc<RwLock<Vec<TransactionSendRecord>>>,
    timeout: u64,
) {
    match rpc_client.get_signature_statuses(&batch.iter().map(|t| t.signature).collect::<Vec<_>>())
    {
        Ok(statuses) => {
            trace!("batch result {:?}", statuses);
            // Statuses come back in the same order as the queried signatures,
            // so index `i` pairs each status with its send record.
            for (i, s) in statuses.value.iter().enumerate() {
                let tx_record = &batch[i];
                match s {
                    Some(s) => {
                        if s.confirmation_status.is_none() {
                            // Seen by the node but not yet at any confirmation
                            // level: requeue for another round.
                            not_confirmed.write().unwrap().push(tx_record.clone());
                        } else {
                            let mut lock = confirmed.write().unwrap();
                            (*lock).push(TransactionConfirmRecord {
                                signature: tx_record.signature.to_string(),
                                sent_slot: tx_record.sent_slot,
                                sent_at: tx_record.sent_at.to_string(),
                                confirmed_at: Utc::now().to_string(),
                                confirmed_slot: s.slot,
                                successful: s.err.is_none(),
                                error: match &s.err {
                                    Some(e) => e.to_string(),
                                    None => "".to_string(),
                                },
                                // This strategy never sees the block itself, so
                                // block hash and leader are placeholder defaults.
                                block_hash: Pubkey::default().to_string(),
                                slot_leader: Pubkey::default().to_string(),
                                market: tx_record.market.to_string(),
                                market_maker: tx_record.market_maker.to_string(),
                                slot_processed: tx_record.sent_slot,
                                timed_out: false,
                            });
                            debug!(
                                "confirmed sig={} duration={:?}",
                                tx_record.signature,
                                seconds_since(tx_record.sent_at)
                            );
                        }
                    }
                    None => {
                        // No status at all: either drop as timed out or retry.
                        if seconds_since(tx_record.sent_at) > timeout as i64 {
                            debug!(
                                "could not confirm tx {} within {} seconds, dropping it",
                                tx_record.signature, timeout
                            );
                            let mut lock = timeouts.write().unwrap();
                            (*lock).push(tx_record.clone())
                        } else {
                            not_confirmed.write().unwrap().push(tx_record.clone());
                        }
                    }
                }
            }
        }
        Err(err) => {
            error!("could not confirm signatures err={}", err);
            // RPC call itself failed: requeue the whole batch and back off.
            not_confirmed.write().unwrap().extend_from_slice(batch);
            sleep(Duration::from_millis(500));
        }
    }
}
/// Confirmation strategy that polls `getSignatureStatuses` over RPC.
///
/// Alternates between two phases until `recv_limit` records have been received
/// AND the pending set is empty:
/// 1. drain the pending (`not_confirmed`) set and confirm it in batches of 256
///    via [`process_signature_confirmation_batch`];
/// 2. pull newly sent transactions from `tx_record_rx` into the pending set.
///
/// Confirmed records accumulate in `tx_confirm_records`; records that exceed
/// the 30 s `TIMEOUT` end up in `tx_timeout_records`.
///
/// Panics if a lock is poisoned.
pub fn confirmation_by_querying_rpc(
    recv_limit: usize,
    rpc_client: Arc<RpcClient>,
    tx_record_rx: &Receiver<TransactionSendRecord>,
    tx_confirm_records: Arc<RwLock<Vec<TransactionConfirmRecord>>>,
    tx_timeout_records: Arc<RwLock<Vec<TransactionSendRecord>>>,
) {
    // Seconds before an unconfirmed transaction is considered timed out.
    const TIMEOUT: u64 = 30;
    let mut recv_until_confirm = recv_limit;
    let not_confirmed: Arc<RwLock<Vec<TransactionSendRecord>>> = Arc::new(RwLock::new(Vec::new()));
    loop {
        let has_signatures_to_confirm = { not_confirmed.read().unwrap().len() > 0 };
        if has_signatures_to_confirm {
            // collect all not confirmed records in a new buffer
            const BATCH_SIZE: usize = 256;
            let to_confirm = {
                // Swap the pending set out under the lock so batch processing
                // can refill it without holding the write lock.
                let mut lock = not_confirmed.write().unwrap();
                let to_confirm = (*lock).clone();
                (*lock).clear();
                to_confirm
            };
            info!(
                "break from reading channel, try to confirm {} in {} batches",
                to_confirm.len(),
                // Ceiling division: count a partial trailing batch.
                (to_confirm.len() / BATCH_SIZE)
                    + if to_confirm.len() % BATCH_SIZE > 0 {
                        1
                    } else {
                        0
                    }
            );
            let confirmed = tx_confirm_records.clone();
            let timeouts = tx_timeout_records.clone();
            for batch in to_confirm.rchunks(BATCH_SIZE).map(|x| x.to_vec()) {
                process_signature_confirmation_batch(
                    &rpc_client,
                    &batch,
                    &not_confirmed,
                    &confirmed,
                    timeouts.clone(),
                    TIMEOUT,
                );
            }
            // multi threaded implementation of confirming batches
            // let mut confirmation_handles = Vec::new();
            // for batch in to_confirm.rchunks(BATCH_SIZE).map(|x| x.to_vec()) {
            //     let rpc_client = rpc_client.clone();
            //     let not_confirmed = not_confirmed.clone();
            //     let confirmed = tx_confirm_records.clone();
            //     let join_handle = Builder::new().name("solana-transaction-confirmation".to_string()).spawn(move || {
            //         process_signature_confirmation_batch(&rpc_client, &batch, &not_confirmed, &confirmed, TIMEOUT)
            //     }).unwrap();
            //     confirmation_handles.push(join_handle);
            // };
            // for confirmation_handle in confirmation_handles {
            //     let (errors, timeouts) = confirmation_handle.join().unwrap();
            //     error_count += errors;
            //     timeout_count += timeouts;
            // }
            // sleep(Duration::from_millis(100)); // so the confirmation thread does not spam a lot the rpc node
        }
        {
            // Done only when the channel quota is exhausted and nothing is pending.
            if recv_until_confirm == 0 && not_confirmed.read().unwrap().len() == 0 {
                break;
            }
        }
        // context for writing all the not_confirmed_transactions
        if recv_until_confirm > 0 {
            let mut lock = not_confirmed.write().unwrap();
            loop {
                match tx_record_rx.try_recv() {
                    Ok(tx_record) => {
                        debug!(
                            "add to queue len={} sig={}",
                            (*lock).len() + 1,
                            tx_record.signature
                        );
                        (*lock).push(tx_record);
                        recv_until_confirm -= 1;
                    }
                    Err(TryRecvError::Empty) => {
                        debug!("channel emptied");
                        sleep(Duration::from_millis(100));
                        break; // still confirm remaining transctions
                    }
                    Err(TryRecvError::Disconnected) => {
                        {
                            info!("channel disconnected {}", recv_until_confirm);
                        }
                        debug!("channel disconnected");
                        break; // still confirm remaining transctions
                    }
                }
            }
        }
    }
}
/// Confirmation strategy that scans confirmed blocks for sent signatures.
///
/// First drains up to `recv_limit` send records from `tx_record_rx` into a
/// signature -> record map, waits 30 s for the cluster to confirm them, then
/// fetches every confirmed block since `current_slot` and searches each
/// block's transactions for known signatures across up to 16 worker threads.
/// Matches are recorded in `tx_confirm_records`, per-block statistics in
/// `tx_block_data`, and anything left unmatched in `tx_timeout_records`.
///
/// Panics if a lock is poisoned or the initial `get_blocks_with_commitment`
/// RPC call fails.
pub fn confirmations_by_blocks(
    clients: RotatingQueue<Arc<RpcClient>>,
    current_slot: &AtomicU64,
    recv_limit: usize,
    tx_record_rx: Receiver<TransactionSendRecord>,
    tx_confirm_records: Arc<RwLock<Vec<TransactionConfirmRecord>>>,
    tx_timeout_records: Arc<RwLock<Vec<TransactionSendRecord>>>,
    tx_block_data: Arc<RwLock<Vec<BlockData>>>,
) {
    let mut recv_until_confirm = recv_limit;
    let transaction_map = Arc::new(RwLock::new(
        HashMap::<Signature, TransactionSendRecord>::new(),
    ));
    // Blocks are scanned starting from the slot observed before draining.
    let last_slot = current_slot.load(Ordering::Acquire);
    while recv_until_confirm != 0 {
        match tx_record_rx.try_recv() {
            Ok(tx_record) => {
                let mut transaction_map = transaction_map.write().unwrap();
                debug!(
                    "add to queue len={} sig={}",
                    transaction_map.len() + 1,
                    tx_record.signature
                );
                transaction_map.insert(tx_record.signature, tx_record);
                recv_until_confirm -= 1;
            }
            Err(TryRecvError::Empty) => {
                debug!("channel emptied");
                sleep(Duration::from_millis(100));
            }
            Err(TryRecvError::Disconnected) => {
                {
                    info!("channel disconnected {}", recv_until_confirm);
                }
                debug!("channel disconnected");
                break; // still confirm remaining transctions
            }
        }
    }
    println!("finished mapping all the trasactions");
    // Give the cluster time to confirm the last sent transactions before
    // asking for the block list.
    sleep(Duration::from_secs(30));
    let commitment_confirmation = CommitmentConfig {
        commitment: CommitmentLevel::Confirmed,
    };
    let block_res = clients
        .get()
        .get_blocks_with_commitment(last_slot, None, commitment_confirmation)
        .unwrap();
    let nb_blocks = block_res.len();
    let nb_thread: usize = 16;
    println!("processing {} blocks", nb_blocks);
    let mut join_handles = Vec::new();
    // Split the slot list into roughly nb_thread chunks, one worker per chunk.
    for slot_batch in block_res
        .chunks(if nb_blocks > nb_thread {
            nb_blocks.div(nb_thread)
        } else {
            nb_blocks
        })
        .map(|x| x.to_vec())
    {
        let map = transaction_map.clone();
        let client = clients.get().clone();
        let tx_confirm_records = tx_confirm_records.clone();
        let tx_block_data = tx_block_data.clone();
        let joinble = Builder::new()
            .name("getting blocks and searching transactions".to_string())
            .spawn(move || {
                for slot in slot_batch {
                    // retry search for block 10 times
                    let mut block = None;
                    for _i in 0..=10 {
                        let block_res = client
                            .get_block_with_config(
                                slot,
                                RpcBlockConfig {
                                    encoding: None,
                                    transaction_details: None,
                                    rewards: None,
                                    commitment: Some(commitment_confirmation),
                                    max_supported_transaction_version: None,
                                },
                            );
                        match block_res {
                            Ok(x) => {
                                block = Some(x);
                                break;
                            },
                            _=>{
                                // do nothing
                            }
                        }
                    }
                    let block = match block {
                        Some(x) => x,
                        None => continue, // block never became available; skip the slot
                    };
                    let mut mm_transaction_count: u64 = 0;
                    let rewards = &block.rewards.unwrap();
                    // The fee reward identifies the slot leader.
                    let slot_leader = match rewards
                        .iter()
                        .find(|r| r.reward_type == Some(RewardType::Fee))
                    {
                        Some(x) => x.pubkey.clone(),
                        None=> "".to_string(),
                    };
                    if let Some(transactions) = block.transactions {
                        let nb_transactions = transactions.len();
                        let mut cu_consumed : u64 = 0;
                        for solana_transaction_status::EncodedTransactionWithStatusMeta {
                            transaction,
                            meta,
                            ..
                        } in transactions
                        {
                            if let solana_transaction_status::EncodedTransaction::Json(
                                transaction,
                            ) = transaction
                            {
                                for signature in transaction.signatures {
                                    let signature = Signature::from_str(&signature).unwrap();
                                    // Look up this signature among our sent
                                    // transactions; clone out to drop the read
                                    // lock before writing below.
                                    let transaction_record_op = {
                                        let map = map.read().unwrap();
                                        let rec = map.get(&signature);
                                        match rec {
                                            Some(x) => Some(x.clone()),
                                            None => None,
                                        }
                                    };
                                    // add CU in counter
                                    if let Some(meta) = &meta {
                                        match meta.compute_units_consumed {
                                            solana_transaction_status::option_serializer::OptionSerializer::Some(x) => {
                                                cu_consumed = cu_consumed.saturating_add(x);
                                            },
                                            _ => {},
                                        }
                                    }
                                    if let Some(transaction_record) = transaction_record_op {
                                        let mut lock = tx_confirm_records.write().unwrap();
                                        mm_transaction_count += 1;
                                        (*lock).push(TransactionConfirmRecord {
                                            signature: transaction_record.signature.to_string(),
                                            confirmed_slot: slot, // TODO: should be changed to correct slot
                                            confirmed_at: Utc::now().to_string(),
                                            sent_at: transaction_record.sent_at.to_string(),
                                            sent_slot: transaction_record.sent_slot,
                                            successful: if let Some(meta) = &meta {
                                                meta.status.is_ok()
                                            } else {
                                                false
                                            },
                                            error: if let Some(meta) = &meta {
                                                match &meta.err {
                                                    Some(x) => x.to_string(),
                                                    None => "".to_string(),
                                                }
                                            } else {
                                                "".to_string()
                                            },
                                            block_hash: block.blockhash.clone(),
                                            market: transaction_record.market.to_string(),
                                            market_maker: transaction_record.market_maker.to_string(),
                                            slot_processed: slot,
                                            slot_leader: slot_leader.clone(),
                                            timed_out: false,
                                        })
                                    }
                                    // Matched or not, stop tracking this signature.
                                    map.write().unwrap().remove(&signature);
                                }
                            }
                        }
                        // push block data
                        {
                            let mut blockstats_writer = tx_block_data.write().unwrap();
                            blockstats_writer.push(BlockData {
                                block_hash: block.blockhash,
                                block_leader: slot_leader,
                                block_slot: slot,
                                block_time: if let Some(time) = block.block_time {
                                    time as u64
                                } else {
                                    0
                                },
                                number_of_mm_transactions: mm_transaction_count,
                                total_transactions: nb_transactions as u64,
                                cu_consumed: cu_consumed,
                            })
                        }
                    }
                }
            })
            .unwrap();
        join_handles.push(joinble);
    }
    for handle in join_handles {
        handle.join().unwrap();
    }
    // Whatever is still in the map was never seen in any block: record as timeout.
    let mut timeout_writer = tx_timeout_records.write().unwrap();
    for x in transaction_map.read().unwrap().iter() {
        timeout_writer.push(x.1.clone())
    }
    // sort all blocks by slot and print info
    {
        let mut blockstats_writer = tx_block_data.write().unwrap();
        blockstats_writer.sort_by(|a, b| a.block_slot.partial_cmp(&b.block_slot).unwrap());
        for block_stat in blockstats_writer.iter() {
            info!(
                "block {} at slot {} contains {} transactions and consumerd {} CUs",
                block_stat.block_hash,
                block_stat.block_slot,
                block_stat.total_transactions,
                block_stat.cu_consumed,
            );
        }
    }
}

259
src/helpers.rs Normal file
View File

@ -0,0 +1,259 @@
use std::{
ops::{Div, Mul},
str::FromStr,
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc, RwLock,
},
thread::{sleep, Builder, JoinHandle},
time::{Duration, Instant},
};
use chrono::{DateTime, Utc};
use fixed::types::I80F48;
use log::{debug, info};
use mango::state::{MangoCache, MangoGroup, PerpMarket};
use mango_common::Loadable;
use solana_client::rpc_client::RpcClient;
use solana_program::{clock::DEFAULT_MS_PER_SLOT, pubkey::Pubkey};
use solana_sdk::hash::Hash;
use crate::{
mango::GroupConfig,
states::{BlockData, PerpMarketCache, TransactionConfirmRecord, TransactionSendRecord},
};
// as there are similar modules solana_sdk and solana_program
// solana internals use solana_sdk but external dependencies like mango use solana_program
// that is why we have some helper methods
/// Converts a `solana_program` pubkey into its `solana_sdk` equivalent.
pub fn to_sdk_pk(pubkey: &Pubkey) -> solana_sdk::pubkey::Pubkey {
    let raw_bytes = pubkey.to_bytes();
    solana_sdk::pubkey::Pubkey::from(raw_bytes)
}
/// Converts a `solana_sdk` pubkey into its `solana_program` equivalent.
pub fn to_sp_pk(pubkey: &solana_sdk::pubkey::Pubkey) -> Pubkey {
    let raw_bytes = pubkey.to_bytes();
    Pubkey::new_from_array(raw_bytes)
}
/// Converts a list of `solana_program` account metas into `solana_sdk` ones,
/// preserving order and the signer/writable flags.
pub fn to_sdk_accountmetas(
    vec: Vec<solana_program::instruction::AccountMeta>,
) -> Vec<solana_sdk::instruction::AccountMeta> {
    let mut converted = Vec::with_capacity(vec.len());
    for meta in vec.iter() {
        converted.push(solana_sdk::instruction::AccountMeta {
            pubkey: to_sdk_pk(&meta.pubkey),
            is_signer: meta.is_signer,
            is_writable: meta.is_writable,
        });
    }
    converted
}
/// Converts a `solana_program` instruction (program id, accounts, data) into
/// the equivalent `solana_sdk` instruction.
pub fn to_sdk_instruction(
    instruction: solana_program::instruction::Instruction,
) -> solana_sdk::instruction::Instruction {
    let program_id = to_sdk_pk(&instruction.program_id);
    let accounts = to_sdk_accountmetas(instruction.accounts);
    solana_sdk::instruction::Instruction {
        program_id,
        accounts,
        data: instruction.data,
    }
}
/// Fetches the account at `pk` over RPC and deserializes its data as `T`.
///
/// # Panics
/// Panics if the account cannot be fetched or its data does not parse as `T`.
pub fn load_from_rpc<T: Loadable>(rpc_client: &RpcClient, pk: &Pubkey) -> T {
    let acc = rpc_client.get_account(&to_sdk_pk(pk)).unwrap();
    // `load_from_bytes` returns a reference into `acc`'s buffer, so the value
    // must be cloned out before the buffer is dropped.
    T::load_from_bytes(acc.data.as_slice()).unwrap().clone()
}
/// Blocks until the RPC node returns a latest blockhash, retrying once per
/// second after each failure.
pub fn get_latest_blockhash(rpc_client: &RpcClient) -> Hash {
    loop {
        match rpc_client.get_latest_blockhash() {
            Ok(hash) => break hash,
            Err(err) => {
                info!("Couldn't get last blockhash: {:?}", err);
                sleep(Duration::from_secs(1));
            }
        }
    }
}
/// Polls for up to five seconds for a blockhash different from `blockhash`,
/// retrying roughly twice per slot. Returns `None` if no new hash appears
/// within the window.
pub fn get_new_latest_blockhash(client: Arc<RpcClient>, blockhash: &Hash) -> Option<Hash> {
    let started_at = Instant::now();
    while started_at.elapsed().as_secs() < 5 {
        match client.get_latest_blockhash() {
            Ok(fresh) if fresh != *blockhash => {
                debug!("Got new blockhash ({:?})", blockhash);
                return Some(fresh);
            }
            _ => {}
        }
        debug!("Got same blockhash ({:?}), will retry...", blockhash);
        // Retry ~twice during a slot
        sleep(Duration::from_millis(DEFAULT_MS_PER_SLOT / 2));
    }
    None
}
pub fn poll_blockhash_and_slot(
exit_signal: Arc<AtomicBool>,
blockhash: Arc<RwLock<Hash>>,
slot: &AtomicU64,
client: Arc<RpcClient>,
) {
let mut blockhash_last_updated = Instant::now();
//let mut last_error_log = Instant::now();
loop {
let client = client.clone();
let old_blockhash = *blockhash.read().unwrap();
if exit_signal.load(Ordering::Relaxed) {
break;
}
let new_slot = client.get_slot().unwrap();
{
slot.store(new_slot, Ordering::Release);
}
if let Some(new_blockhash) = get_new_latest_blockhash(client, &old_blockhash) {
{
*blockhash.write().unwrap() = new_blockhash;
}
blockhash_last_updated = Instant::now();
} else {
if blockhash_last_updated.elapsed().as_secs() > 120 {
break;
}
}
sleep(Duration::from_millis(100));
}
}
/// Whole seconds elapsed since `dt` (negative if `dt` lies in the future).
pub fn seconds_since(dt: DateTime<Utc>) -> i64 {
    let elapsed = Utc::now().signed_duration_since(dt);
    elapsed.num_seconds()
}
/// Writes all confirmed transaction records, followed by one synthetic
/// "Timeout" row per timed-out transaction, into `transaction_save_file`
/// as CSV. Does nothing when the path is empty.
///
/// Panics if the file cannot be created or a record fails to serialize.
pub fn write_transaction_data_into_csv(
    transaction_save_file: String,
    tx_confirm_records: Arc<RwLock<Vec<TransactionConfirmRecord>>>,
    tx_timeout_records: Arc<RwLock<Vec<TransactionSendRecord>>>,
) {
    if transaction_save_file.is_empty() {
        return;
    }
    let mut writer = csv::Writer::from_path(transaction_save_file).unwrap();
    {
        let rd_lock = tx_confirm_records.read().unwrap();
        for confirm_record in rd_lock.iter() {
            writer.serialize(confirm_record).unwrap();
        }
        let timeout_lk = tx_timeout_records.read().unwrap();
        for timeout_record in timeout_lk.iter() {
            // Timed-out sends have no confirmation data: emit a placeholder
            // row with error "Timeout" and timed_out=true so both kinds of
            // records share one CSV schema.
            writer
                .serialize(TransactionConfirmRecord {
                    block_hash: "".to_string(),
                    confirmed_at: "".to_string(),
                    confirmed_slot: 0,
                    error: "Timeout".to_string(),
                    market: timeout_record.market.to_string(),
                    market_maker: timeout_record.market_maker.to_string(),
                    sent_at: timeout_record.sent_at.to_string(),
                    sent_slot: timeout_record.sent_slot,
                    signature: timeout_record.signature.to_string(),
                    slot_leader: "".to_string(),
                    slot_processed: 0,
                    successful: false,
                    timed_out: true,
                })
                .unwrap();
        }
    }
    writer.flush().unwrap();
}
/// Serializes per-block statistics into `block_data_csv` as CSV, keeping only
/// blocks that contained at least one market-maker transaction. Does nothing
/// when the path is empty.
///
/// Panics if the file cannot be created or a row fails to serialize.
pub fn write_block_data_into_csv(
    block_data_csv: String,
    tx_block_data: Arc<RwLock<Vec<BlockData>>>,
) {
    if block_data_csv.is_empty() {
        return;
    }
    let mut writer = csv::Writer::from_path(block_data_csv).unwrap();
    let data = tx_block_data.read().unwrap();
    data.iter()
        .filter(|x| x.number_of_mm_transactions > 0)
        .for_each(|row| writer.serialize(row).unwrap());
    writer.flush().unwrap();
}
/// Spawns the background thread that keeps `blockhash` and `current_slot`
/// up to date via [`poll_blockhash_and_slot`]; returns its join handle.
pub fn start_blockhash_polling_service(
    exit_signal: Arc<AtomicBool>,
    blockhash: Arc<RwLock<Hash>>,
    current_slot: Arc<AtomicU64>,
    client: Arc<RpcClient>,
) -> JoinHandle<()> {
    let poller = move || {
        poll_blockhash_and_slot(exit_signal, blockhash.clone(), current_slot.as_ref(), client);
    };
    Builder::new()
        .name("solana-blockhash-poller".to_string())
        .spawn(poller)
        .unwrap()
}
/// Loads the mango group and cache over RPC and builds a [`PerpMarketCache`]
/// for every perp market in `mango_group_config`, precomputing the lot-size
/// adjusted price and order size used when placing orders.
///
/// Panics on any malformed pubkey in the config or failed RPC account load.
pub fn get_mango_market_perps_cache(
    rpc_client: Arc<RpcClient>,
    mango_group_config: &GroupConfig,
) -> Vec<PerpMarketCache> {
    // fetch group
    let mango_group_pk = Pubkey::from_str(mango_group_config.public_key.as_str()).unwrap();
    let mango_group = load_from_rpc::<MangoGroup>(&rpc_client, &mango_group_pk);
    let mango_program_pk = Pubkey::from_str(mango_group_config.mango_program_id.as_str()).unwrap();
    // NOTE(review): pubkey round-trips through its string form, presumably to
    // convert between solana_program/solana_sdk pubkey types.
    let mango_cache_pk = Pubkey::from_str(mango_group.mango_cache.to_string().as_str()).unwrap();
    let mango_cache = load_from_rpc::<MangoCache>(&rpc_client, &mango_cache_pk);
    mango_group_config
        .perp_markets
        .iter()
        .enumerate()
        .map(|(market_index, perp_maket_config)| {
            let perp_market_pk = Pubkey::from_str(perp_maket_config.public_key.as_str()).unwrap();
            let perp_market = load_from_rpc::<PerpMarket>(&rpc_client, &perp_market_pk);
            // fetch price
            // token index 0 is assumed to be the quote token.
            let base_decimals = mango_group_config.tokens[market_index].decimals;
            let quote_decimals = mango_group_config.tokens[0].decimals;
            let base_unit = I80F48::from_num(10u64.pow(base_decimals as u32));
            let quote_unit = I80F48::from_num(10u64.pow(quote_decimals as u32));
            let price = mango_cache.price_cache[market_index].price;
            println!(
                "market index {} price of : {}",
                market_index, mango_cache.price_cache[market_index].price
            );
            // Convert the oracle price into quote lots per base lot using the
            // market's lot sizes and the token decimal units.
            let price_quote_lots: i64 = price
                .mul(quote_unit)
                .mul(I80F48::from_num(perp_market.base_lot_size))
                .div(I80F48::from_num(perp_market.quote_lot_size))
                .div(base_unit)
                .to_num();
            // One whole base token expressed in base lots.
            let order_base_lots: i64 = base_unit
                .div(I80F48::from_num(perp_market.base_lot_size))
                .to_num();
            PerpMarketCache {
                order_base_lots,
                price,
                price_quote_lots,
                mango_program_pk,
                mango_group_pk,
                mango_cache_pk,
                perp_market_pk,
                perp_market,
            }
        })
        .collect()
}

View File

@ -1,2 +1,7 @@
pub mod cli; // command-line argument parsing and Config
pub mod confirmation_strategies; // RPC- and block-based transaction confirmation
pub mod helpers; // pubkey conversions, blockhash polling, CSV output
pub mod mango; // mango group/account config types
pub mod market_markers; // market-making transaction creation and sending
pub mod rotating_queue; // round-robin client pool
pub mod states; // shared record structs (send/confirm/block data)

File diff suppressed because it is too large Load Diff

298
src/market_markers.rs Normal file
View File

@ -0,0 +1,298 @@
use std::{
str::FromStr,
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc, RwLock,
},
thread::{sleep, Builder, JoinHandle},
time::{Duration, Instant},
};
use chrono::Utc;
use crossbeam_channel::Sender;
use log::{debug, error, info, warn};
use mango::{
instruction::{cancel_all_perp_orders, place_perp_order2},
matching::Side,
};
use solana_client::tpu_client::TpuClient;
use solana_program::pubkey::Pubkey;
use solana_sdk::{
hash::Hash, instruction::Instruction, message::Message, signature::Keypair, signer::Signer,
transaction::Transaction,
};
use crate::{
helpers::{to_sdk_instruction, to_sp_pk},
mango::AccountKeys,
rotating_queue::RotatingQueue,
states::{PerpMarketCache, TransactionSendRecord},
};
/// Builds one unsigned transaction that (1) cancels up to 10 existing perp
/// orders, then (2) places a bid and an ask around the cached oracle price
/// with a random per-call offset and spread.
///
/// The returned transaction still needs a blockhash and signatures.
pub fn create_ask_bid_transaction(
    c: &PerpMarketCache,
    mango_account_pk: Pubkey,
    mango_account_signer: &Keypair,
) -> Transaction {
    let mango_account_signer_pk = to_sp_pk(&mango_account_signer.pubkey());
    // Randomize quotes: offset shifts both sides, spread widens bid/ask.
    let offset = rand::random::<i8>() as i64;
    let spread = rand::random::<u8>() as i64;
    debug!(
        "price:{:?} price_quote_lots:{:?} order_base_lots:{:?} offset:{:?} spread:{:?}",
        c.price, c.price_quote_lots, c.order_base_lots, offset, spread
    );
    // Cancel previous orders first so stale quotes don't accumulate.
    let cancel_ix: Instruction = to_sdk_instruction(
        cancel_all_perp_orders(
            &c.mango_program_pk,
            &c.mango_group_pk,
            &mango_account_pk,
            &mango_account_signer_pk,
            &c.perp_market_pk,
            &c.perp_market.bids,
            &c.perp_market.asks,
            10, // limit: max orders cancelled in one instruction
        )
        .unwrap(),
    );
    // Bid below the adjusted price (client order id 1).
    let place_bid_ix: Instruction = to_sdk_instruction(
        place_perp_order2(
            &c.mango_program_pk,
            &c.mango_group_pk,
            &mango_account_pk,
            &mango_account_signer_pk,
            &c.mango_cache_pk,
            &c.perp_market_pk,
            &c.perp_market.bids,
            &c.perp_market.asks,
            &c.perp_market.event_queue,
            None,
            &[],
            Side::Bid,
            c.price_quote_lots + offset - spread,
            c.order_base_lots,
            i64::MAX,
            1,
            mango::matching::OrderType::Limit,
            false,
            None,
            64,
            mango::matching::ExpiryType::Absolute,
        )
        .unwrap(),
    );
    // Ask above the adjusted price (client order id 2).
    let place_ask_ix: Instruction = to_sdk_instruction(
        place_perp_order2(
            &c.mango_program_pk,
            &c.mango_group_pk,
            &mango_account_pk,
            &mango_account_signer_pk,
            &c.mango_cache_pk,
            &c.perp_market_pk,
            &c.perp_market.bids,
            &c.perp_market.asks,
            &c.perp_market.event_queue,
            None,
            &[],
            Side::Ask,
            c.price_quote_lots + offset + spread,
            c.order_base_lots,
            i64::MAX,
            2,
            mango::matching::OrderType::Limit,
            false,
            None,
            64,
            mango::matching::ExpiryType::Absolute,
        )
        .unwrap(),
    );
    Transaction::new_unsigned(Message::new(
        &[cancel_ix, place_bid_ix, place_ask_ix],
        Some(&mango_account_signer.pubkey()),
    ))
}
pub fn send_mm_transactions(
quotes_per_second: u64,
perp_market_caches: &Vec<PerpMarketCache>,
tx_record_sx: &Sender<TransactionSendRecord>,
tpu_client_pool: Arc<RotatingQueue<Arc<TpuClient>>>,
mango_account_pk: Pubkey,
mango_account_signer: &Keypair,
blockhash: Arc<RwLock<Hash>>,
slot: &AtomicU64,
) {
let mango_account_signer_pk = to_sp_pk(&mango_account_signer.pubkey());
// update quotes 2x per second
for _ in 0..quotes_per_second {
for c in perp_market_caches.iter() {
let mut tx = create_ask_bid_transaction(c, mango_account_pk, &mango_account_signer);
if let Ok(recent_blockhash) = blockhash.read() {
tx.sign(&[mango_account_signer], *recent_blockhash);
}
let tpu_client = tpu_client_pool.get();
tpu_client.send_transaction(&tx);
let sent = tx_record_sx.send(TransactionSendRecord {
signature: tx.signatures[0],
sent_at: Utc::now(),
sent_slot: slot.load(Ordering::Acquire),
market_maker: mango_account_signer_pk,
market: c.perp_market_pk,
});
if sent.is_err() {
println!(
"sending error on channel : {}",
sent.err().unwrap().to_string()
);
}
}
}
}
/// Batched variant of [`send_mm_transactions`]: builds `txs_batch_size`
/// ask/bid transactions per market, signs them against the current blockhash
/// and submits them in one TPU batch. Every successfully submitted batch is
/// reported transaction-by-transaction on `tx_record_sx`; a failed batch is
/// logged and skipped entirely.
pub fn send_mm_transactions_batched(
    txs_batch_size: usize,
    quotes_per_second: u64,
    perp_market_caches: &Vec<PerpMarketCache>,
    tx_record_sx: &Sender<TransactionSendRecord>,
    tpu_client_pool: Arc<RotatingQueue<Arc<TpuClient>>>,
    mango_account_pk: Pubkey,
    mango_account_signer: &Keypair,
    blockhash: Arc<RwLock<Hash>>,
    slot: &AtomicU64,
) {
    // Buffer is reused across markets/iterations and cleared at the end of
    // each batch to avoid reallocating.
    let mut transactions = Vec::<_>::with_capacity(txs_batch_size);
    let mango_account_signer_pk = to_sp_pk(&mango_account_signer.pubkey());
    // update quotes 2x per second
    for _ in 0..quotes_per_second {
        for c in perp_market_caches.iter() {
            for _ in 0..txs_batch_size {
                transactions.push(create_ask_bid_transaction(
                    c,
                    mango_account_pk,
                    &mango_account_signer,
                ));
            }
            if let Ok(recent_blockhash) = blockhash.read() {
                for tx in &mut transactions {
                    tx.sign(&[mango_account_signer], *recent_blockhash);
                }
            }
            let tpu_client = tpu_client_pool.get();
            if tpu_client
                .try_send_transaction_batch(&transactions)
                .is_err()
            {
                // Note: the buffer is NOT cleared here, so these transactions
                // are retried as part of the next batch.
                error!("Sending batch failed");
                continue;
            }
            for tx in &transactions {
                let sent = tx_record_sx.send(TransactionSendRecord {
                    signature: tx.signatures[0],
                    sent_at: Utc::now(),
                    sent_slot: slot.load(Ordering::Acquire),
                    market_maker: mango_account_signer_pk,
                    market: c.perp_market_pk,
                });
                if sent.is_err() {
                    error!(
                        "sending error on channel : {}",
                        sent.err().unwrap().to_string()
                    );
                }
            }
            transactions.clear();
        }
    }
}
/// Spawns one market-making thread per entry in `account_keys_parsed`.
///
/// Each thread sends quotes once per second for `duration`, choosing the
/// batched sender when `txs_batch_size` is set. Returns the join handles.
///
/// # Panics
/// Panics if a configured pubkey or secret key fails to parse, or a thread
/// cannot be spawned.
pub fn start_market_making_threads(
    account_keys_parsed: Vec<AccountKeys>,
    perp_market_caches: Vec<PerpMarketCache>,
    tx_record_sx: Sender<TransactionSendRecord>,
    exit_signal: Arc<AtomicBool>,
    blockhash: Arc<RwLock<Hash>>,
    current_slot: Arc<AtomicU64>,
    tpu_client_pool: Arc<RotatingQueue<Arc<TpuClient>>>,
    duration: &Duration,
    quotes_per_second: u64,
    txs_batch_size: Option<usize>,
) -> Vec<JoinHandle<()>> {
    account_keys_parsed
        .iter()
        .map(|account_keys| {
            let _exit_signal = exit_signal.clone();
            // having a tpu client for each MM
            let tpu_client_pool = tpu_client_pool.clone();
            let blockhash = blockhash.clone();
            let current_slot = current_slot.clone();
            // Duration is Copy: dereference instead of clone() (clippy clone_on_copy).
            let duration = *duration;
            let perp_market_caches = perp_market_caches.clone();
            let mango_account_pk =
                Pubkey::from_str(account_keys.mango_account_pks[0].as_str()).unwrap();
            let mango_account_signer =
                Keypair::from_bytes(account_keys.secret_key.as_slice()).unwrap();
            info!(
                "wallet:{:?} https://testnet.mango.markets/account?pubkey={:?}",
                mango_account_signer.pubkey(),
                mango_account_pk
            );
            let tx_record_sx = tx_record_sx.clone();
            Builder::new()
                .name("solana-client-sender".to_string())
                .spawn(move || {
                    // One iteration per second of the configured duration.
                    for _i in 0..duration.as_secs() {
                        let start = Instant::now();
                        // send market maker transactions
                        // Option<usize> is Copy: no clone() needed.
                        if let Some(txs_batch_size) = txs_batch_size {
                            send_mm_transactions_batched(
                                txs_batch_size,
                                quotes_per_second,
                                &perp_market_caches,
                                &tx_record_sx,
                                tpu_client_pool.clone(),
                                mango_account_pk,
                                &mango_account_signer,
                                blockhash.clone(),
                                current_slot.as_ref(),
                            );
                        } else {
                            send_mm_transactions(
                                quotes_per_second,
                                &perp_market_caches,
                                &tx_record_sx,
                                tpu_client_pool.clone(),
                                mango_account_pk,
                                &mango_account_signer,
                                blockhash.clone(),
                                current_slot.as_ref(),
                            );
                        }
                        let elapsed_millis: u64 = start.elapsed().as_millis() as u64;
                        if elapsed_millis < 950 {
                            // 50 ms is reserved for other stuff
                            sleep(Duration::from_millis(950 - elapsed_millis));
                        } else {
                            warn!(
                                "time taken to send transactions is greater than 1000ms {}",
                                elapsed_millis
                            );
                        }
                    }
                })
                .unwrap()
        })
        .collect()
}

68
src/states.rs Normal file
View File

@ -0,0 +1,68 @@
use chrono::{DateTime, Utc};
use fixed::types::I80F48;
use mango::state::PerpMarket;
use serde::Serialize;
use solana_program::{pubkey::Pubkey, slot_history::Slot};
use solana_sdk::signature::Signature;
/// Record captured at send time; later matched against confirmations by signature.
#[derive(Clone, Serialize)]
pub struct TransactionSendRecord {
    pub signature: Signature,
    // Wall-clock send time, used for timeout accounting.
    pub sent_at: DateTime<Utc>,
    // Slot observed when the transaction was sent.
    pub sent_slot: Slot,
    // Pubkey of the market-maker account that signed the transaction.
    pub market_maker: Pubkey,
    // Perp market the quotes were placed on.
    pub market: Pubkey,
}
/// Flattened, CSV-serializable outcome of one tracked transaction.
/// Timed-out sends are also written in this shape with `timed_out: true`.
#[derive(Clone, Serialize)]
pub struct TransactionConfirmRecord {
    pub signature: String,
    pub sent_slot: Slot,
    pub sent_at: String,
    pub confirmed_slot: Slot,
    pub confirmed_at: String,
    // True when the transaction executed without an error.
    pub successful: bool,
    // Leader of the confirming slot; empty when unknown.
    pub slot_leader: String,
    // Error string; empty on success, "Timeout" for timed-out sends.
    pub error: String,
    pub market_maker: String,
    pub market: String,
    // Hash of the confirming block; default/empty when unknown.
    pub block_hash: String,
    pub slot_processed: Slot,
    // True when the transaction was never confirmed within the timeout.
    pub timed_out: bool,
}
/// Precomputed per-market data used when building ask/bid transactions,
/// populated once from the mango group/cache accounts.
#[derive(Clone)]
pub struct PerpMarketCache {
    // Order size: one whole base token expressed in base lots.
    pub order_base_lots: i64,
    // Oracle price from the mango cache.
    pub price: I80F48,
    // Oracle price converted to quote lots per base lot.
    pub price_quote_lots: i64,
    pub mango_program_pk: Pubkey,
    pub mango_group_pk: Pubkey,
    pub mango_cache_pk: Pubkey,
    pub perp_market_pk: Pubkey,
    pub perp_market: PerpMarket,
}
// NOTE(review): currently unused (underscore-prefixed); appears to be a
// planned unified replacement for the send/confirm record pair.
pub struct _TransactionInfo {
    pub signature: Signature,
    pub transaction_send_time: DateTime<Utc>,
    pub send_slot: Slot,
    pub confirmation_retries: u32,
    pub error: String,
    pub confirmation_blockhash: Pubkey,
    pub leader_confirming_transaction: Pubkey,
    pub timeout: bool,
    pub market_maker: Pubkey,
    pub market: Pubkey,
}
/// Per-block statistics gathered by the block-scanning confirmation strategy;
/// CSV-serializable.
#[derive(Clone, Serialize)]
pub struct BlockData {
    pub block_hash: String,
    pub block_slot: Slot,
    // Slot leader derived from the block's fee reward; empty when unknown.
    pub block_leader: String,
    pub total_transactions: u64,
    // How many of this tool's market-maker transactions landed in the block.
    pub number_of_mm_transactions: u64,
    // Unix block time; 0 when the block carries no timestamp.
    pub block_time: u64,
    // Sum of compute units consumed, when reported by transaction metas.
    pub cu_consumed: u64,
}