Merge remote-tracking branch 'origin/main' into keeper_prioritization

This commit is contained in:
Maximilian Schneider 2023-04-15 10:36:48 +00:00
commit 8ddb52d9cf
10 changed files with 189 additions and 72 deletions

2
Cargo.lock generated
View File

@ -3189,7 +3189,7 @@ dependencies = [
[[package]]
name = "mango-feeds-connector"
version = "0.1.1"
source = "git+https://github.com/blockworks-foundation/mango-feeds.git?branch=ckamm/solana-versions2#4cc67d34df40d12070ba0c311ca68f8c00c3ab1f"
source = "git+https://github.com/blockworks-foundation/mango-feeds.git?branch=ckamm/solana-versions2#613b2b2f799d2fab31134ec1366b270c0eacdbd2"
dependencies = [
"anyhow",
"async-channel",

View File

@ -137,13 +137,14 @@ async fn get_blocks_with_retry(
) -> Result<Vec<Slot>, ()> {
const N_TRY_REQUEST_BLOCKS: u64 = 4;
for _ in 0..N_TRY_REQUEST_BLOCKS {
let block_slots = client.get_blocks_with_commitment(start_block, None, commitment_confirmation)
let block_slots = client
.get_blocks_with_commitment(start_block, None, commitment_confirmation)
.await;
match block_slots {
Ok(slots) => {
return Ok(slots);
},
}
Err(error) => {
warn!("Failed to download blocks: {}, retry", error);
}
@ -268,7 +269,9 @@ pub fn confirmations_by_blocks(
}
start_instant = tokio::time::Instant::now();
let block_slots = get_blocks_with_retry(client.clone(), start_block, commitment_confirmation).await;
let block_slots =
get_blocks_with_retry(client.clone(), start_block, commitment_confirmation)
.await;
if block_slots.is_err() {
break;
}
@ -277,7 +280,7 @@ pub fn confirmations_by_blocks(
if block_slots.is_empty() {
continue;
}
start_block = *block_slots.last().unwrap();
start_block = *block_slots.last().unwrap() + 1;
let blocks = block_slots.iter().map(|slot| {
client.get_block_with_config(

View File

@ -2,6 +2,7 @@ use crate::{
helpers::to_sp_pk,
mango::GroupConfig,
mango_v3_perp_crank_sink::MangoV3PerpCrankSink,
noop,
states::{KeeperInstruction, TransactionSendRecord},
tpu_manager::TpuManager,
};
@ -15,8 +16,8 @@ use async_channel::unbounded;
use chrono::Utc;
use log::*;
use solana_sdk::{
hash::Hash, instruction::Instruction, pubkey::Pubkey, signature::Keypair, signer::Signer,
transaction::Transaction,
compute_budget::ComputeBudgetInstruction, hash::Hash, instruction::Instruction, pubkey::Pubkey,
signature::Keypair, signer::Signer, transaction::Transaction,
};
use std::{
str::FromStr,
@ -58,7 +59,7 @@ pub fn start(
let group_pk = Pubkey::from_str(&group.public_key).unwrap();
let cache_pk = Pubkey::from_str(&group.cache_key).unwrap();
let mango_program_id = Pubkey::from_str(&group.mango_program_id).unwrap();
let _filter_config = FilterConfig {
let filter_config = FilterConfig {
program_ids: vec![group.mango_program_id.clone()],
account_ids: group
.perp_markets
@ -74,10 +75,6 @@ pub fn start(
"crank-tx-sender signing with keypair pk={:?}",
identity.pubkey()
);
let prioritization_fee_ix =
solana_sdk::compute_budget::ComputeBudgetInstruction::set_compute_unit_price(
prioritization_fee,
);
loop {
if exit_signal.load(Ordering::Acquire) {
@ -85,7 +82,12 @@ pub fn start(
}
if let Ok((market, mut ixs)) = instruction_receiver.recv().await {
ixs.insert(0, prioritization_fee_ix.clone());
// add priority fees
ixs.push(ComputeBudgetInstruction::set_compute_unit_price(
prioritization_fee,
));
// add timestamp to guarantee unique transactions
ixs.push(noop::timestamp());
let tx = Transaction::new_signed_with_payer(
&ixs,
@ -168,6 +170,7 @@ pub fn start(
},
rpc_ws_url: config.websocket_url,
},
&filter_config,
account_write_queue_sender,
slot_queue_sender,
)

View File

@ -1,6 +1,9 @@
use solana_sdk::compute_budget::ComputeBudgetInstruction;
use {
crate::{
helpers::to_sdk_instruction,
noop,
states::{KeeperInstruction, PerpMarketCache, TransactionSendRecord},
tpu_manager::TpuManager,
},
@ -102,14 +105,20 @@ fn create_cache_perp_markets_instructions(perp_markets: &[PerpMarketCache]) -> I
pub async fn send_transaction(
tpu_manager: TpuManager,
ixs: &[Instruction],
mut ixs: Vec<Instruction>,
blockhash: Arc<RwLock<Hash>>,
current_slot: Arc<AtomicU64>,
payer: &Keypair,
prioritization_fee: u64,
keeper_instruction: KeeperInstruction,
) {
let mut tx = Transaction::new_unsigned(Message::new(ixs, Some(&payer.pubkey())));
// add a noop with a current timestamp to ensure unique txs
ixs.push(noop::timestamp());
// add priority fees
ixs.push(ComputeBudgetInstruction::set_compute_unit_price(
prioritization_fee,
));
let mut tx = Transaction::new_unsigned(Message::new(&ixs, Some(&payer.pubkey())));
let recent_blockhash = blockhash.read().await;
tx.sign(&[payer], *recent_blockhash);
@ -167,33 +176,20 @@ pub fn start_keepers(
tokio::spawn(async move {
let current_slot = current_slot.clone();
let prioritization_fee_ix =
solana_sdk::compute_budget::ComputeBudgetInstruction::set_compute_unit_price(
prioritization_fee,
);
let mut root_update_ixs = create_root_bank_update_instructions(&perp_markets);
let mut cache_prices = vec![create_update_price_cache_instructions(&perp_markets)];
let mut update_perp_cache = vec![create_cache_perp_markets_instructions(&perp_markets)];
let mut cache_root_bank_ix = vec![create_cache_root_bank_instruction(&perp_markets)];
let mut update_funding_ix = create_update_fundings_instructions(&perp_markets);
let mut quote_root_bank_ix =
let root_update_ixs = create_root_bank_update_instructions(&perp_markets);
let cache_prices = vec![create_update_price_cache_instructions(&perp_markets)];
let update_perp_cache = vec![create_cache_perp_markets_instructions(&perp_markets)];
let cache_root_bank_ix = vec![create_cache_root_bank_instruction(&perp_markets)];
let update_funding_ix = create_update_fundings_instructions(&perp_markets);
let quote_root_bank_ix =
create_update_and_cache_quote_banks(&perp_markets, quote_root_bank, quote_node_banks);
if prioritization_fee > 0 {
root_update_ixs.insert(0, prioritization_fee_ix.clone());
cache_prices.insert(0, prioritization_fee_ix.clone());
update_perp_cache.insert(0, prioritization_fee_ix.clone());
cache_root_bank_ix.insert(0, prioritization_fee_ix.clone());
update_funding_ix.insert(0, prioritization_fee_ix.clone());
quote_root_bank_ix.insert(0, prioritization_fee_ix.clone());
}
let blockhash = blockhash.clone();
while !exit_signal.load(Ordering::Relaxed) {
send_transaction(
tpu_manager.clone(),
cache_prices.as_slice(),
cache_prices.clone(),
blockhash.clone(),
current_slot.clone(),
&authority,
@ -204,7 +200,7 @@ pub fn start_keepers(
send_transaction(
tpu_manager.clone(),
quote_root_bank_ix.as_slice(),
quote_root_bank_ix.clone(),
blockhash.clone(),
current_slot.clone(),
&authority,
@ -216,7 +212,7 @@ pub fn start_keepers(
for updates in update_funding_ix.chunks(3) {
send_transaction(
tpu_manager.clone(),
updates,
updates.to_vec(),
blockhash.clone(),
current_slot.clone(),
&authority,
@ -228,7 +224,7 @@ pub fn start_keepers(
send_transaction(
tpu_manager.clone(),
root_update_ixs.as_slice(),
root_update_ixs.clone(),
blockhash.clone(),
current_slot.clone(),
&authority,
@ -239,7 +235,7 @@ pub fn start_keepers(
send_transaction(
tpu_manager.clone(),
update_perp_cache.as_slice(),
update_perp_cache.clone(),
blockhash.clone(),
current_slot.clone(),
&authority,
@ -250,7 +246,7 @@ pub fn start_keepers(
send_transaction(
tpu_manager.clone(),
cache_root_bank_ix.as_slice(),
cache_root_bank_ix.clone(),
blockhash.clone(),
current_slot.clone(),
&authority,

View File

@ -6,6 +6,7 @@ pub mod keeper;
pub mod mango;
pub mod mango_v3_perp_crank_sink;
pub mod market_markers;
pub mod noop;
pub mod result_writer;
pub mod rotating_queue;
pub mod states;

View File

@ -10,7 +10,7 @@ use {
},
keeper::start_keepers,
mango::{AccountKeys, MangoConfig},
market_markers::start_market_making_threads,
market_markers::{clean_market_makers, start_market_making_threads},
result_writer::initialize_result_writers,
states::PerpMarketCache,
stats::MangoSimulationStats,
@ -149,6 +149,15 @@ pub async fn main() -> anyhow::Result<()> {
.iter()
.map(|x| Pubkey::from_str(x.as_str()).unwrap())
.collect();
clean_market_makers(
nb_rpc_client.clone(),
&account_keys_parsed,
&perp_market_caches,
blockhash.clone(),
)
.await;
// start keeper if keeper authority is present
let keepers_jl = if let Some(keeper_authority) = keeper_authority {
let jl = start_keepers(
@ -184,7 +193,7 @@ pub async fn main() -> anyhow::Result<()> {
keeper_prioritization,
);
let warmup_duration = Duration::from_secs(10);
let warmup_duration = Duration::from_secs(20);
info!("waiting for keepers to warmup for {warmup_duration:?}");
tokio::time::sleep(warmup_duration).await;

View File

@ -1,4 +1,4 @@
use std::{cell::RefCell, collections::BTreeMap, convert::TryFrom, mem::size_of};
use std::{cell::RefCell, collections::{BTreeMap, HashSet}, convert::TryFrom, mem::size_of};
use arrayref::array_ref;
use async_channel::Sender;
@ -18,7 +18,8 @@ use mango_feeds_connector::{account_write_filter::AccountWriteSink, chain_data::
use crate::helpers::{to_sdk_instruction, to_sp_pk};
const MAX_BACKLOG: usize = 2;
const MAX_EVENTS_PER_TX: usize = 10;
const MAX_ACCS_PER_TX: usize = 24;
const MAX_EVENTS_PER_TX: usize = 50;
pub struct MangoV3PerpCrankSink {
mkt_pks_by_evq_pks: BTreeMap<Pubkey, Pubkey>,
@ -93,23 +94,28 @@ impl AccountWriteSink for MangoV3PerpCrankSink {
trace!("evq {pk:?} seq_num={seq_num} len={len} contains_fill_events={contains_fill_events} has_backlog={has_backlog}");
let mut mango_accounts: Vec<_> = event_queue
let mut mango_accounts = HashSet::new();
event_queue
.iter()
.take(MAX_EVENTS_PER_TX)
.flat_map(
|e| match EventType::try_from(e.event_type).expect("mango v4 event") {
EventType::Fill => {
let fill: &FillEvent = cast_ref(e);
vec![fill.maker, fill.taker]
.for_each(|e|
if mango_accounts.len() < MAX_ACCS_PER_TX {
match EventType::try_from(e.event_type).expect("mango v4 event") {
EventType::Fill => {
let fill: &FillEvent = cast_ref(e);
mango_accounts.insert(fill.maker);
mango_accounts.insert(fill.taker);
}
EventType::Out => {
let out: &OutEvent = cast_ref(e);
mango_accounts.insert(out.owner);
}
EventType::Liquidate => {
}
}
EventType::Out => {
let out: &OutEvent = cast_ref(e);
vec![out.owner]
}
EventType::Liquidate => vec![],
},
)
.collect();
}
);
let pk = solana_sdk::pubkey::Pubkey::new_from_array(pk.to_bytes());
let mkt_pk = self
@ -124,7 +130,7 @@ impl AccountWriteSink for MangoV3PerpCrankSink {
&to_sp_pk(&self.cache_pk),
&to_sp_pk(mkt_pk),
&to_sp_pk(&pk),
&mut mango_accounts,
&mut mango_accounts.iter().map(|pk| pk.clone()).collect::<Vec<_>>(),
MAX_EVENTS_PER_TX,
)
.unwrap(),

View File

@ -15,6 +15,7 @@ use mango::{
matching::Side,
};
use rand::{distributions::Uniform, prelude::Distribution, seq::SliceRandom};
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_program::pubkey::Pubkey;
use solana_sdk::{
compute_budget, hash::Hash, instruction::Instruction, message::Message, signature::Keypair,
@ -81,7 +82,7 @@ pub fn create_ask_bid_transaction(
c.price_quote_lots + offset - spread,
c.order_base_lots,
i64::MAX,
1,
Utc::now().timestamp_micros() as u64,
mango::matching::OrderType::Limit,
false,
None,
@ -109,7 +110,7 @@ pub fn create_ask_bid_transaction(
c.price_quote_lots + offset + spread,
c.order_base_lots,
i64::MAX,
2,
Utc::now().timestamp_micros() as u64,
mango::matching::OrderType::Limit,
false,
None,
@ -269,3 +270,87 @@ pub fn start_market_making_threads(
})
.collect()
}
fn create_cancel_all_orders(
perp_market: &PerpMarketCache,
mango_account_pk: Pubkey,
mango_account_signer: &Keypair,
) -> Transaction {
let mango_account_signer_pk = to_sp_pk(&mango_account_signer.pubkey());
let cb_instruction = compute_budget::ComputeBudgetInstruction::set_compute_unit_limit(1000000);
let pf_instruction = compute_budget::ComputeBudgetInstruction::set_compute_unit_price(1000);
let instruction: Instruction = to_sdk_instruction(
cancel_all_perp_orders(
&perp_market.mango_program_pk,
&perp_market.mango_group_pk,
&mango_account_pk,
&mango_account_signer_pk,
&perp_market.perp_market_pk,
&perp_market.bids,
&perp_market.asks,
255,
)
.unwrap(),
);
Transaction::new_unsigned(Message::new(
&[cb_instruction, pf_instruction, instruction],
Some(&mango_account_signer.pubkey()),
))
}
pub async fn clean_market_makers(
rpc_client: Arc<RpcClient>,
account_keys_parsed: &Vec<AccountKeys>,
perp_market_caches: &Vec<PerpMarketCache>,
blockhash: Arc<RwLock<Hash>>,
) {
info!("Cleaning previous transactions by market makers");
let mut tasks = vec![];
for market_maker in account_keys_parsed {
let mango_account_pk =
Pubkey::from_str(market_maker.mango_account_pks[0].as_str()).unwrap();
for perp_market in perp_market_caches {
let market_maker = market_maker.clone();
let perp_market = perp_market.clone();
let rpc_client = rpc_client.clone();
let mango_account_pk = mango_account_pk.clone();
let blockhash = blockhash.clone();
let task = tokio::spawn(async move {
let mango_account_signer =
Keypair::from_bytes(market_maker.secret_key.as_slice()).unwrap();
for _ in 0..10 {
let mut tx = create_cancel_all_orders(
&perp_market,
mango_account_pk,
&mango_account_signer,
);
let recent_blockhash = *blockhash.read().await;
tx.sign(&[&mango_account_signer], recent_blockhash);
// send and confirm the transaction with an RPC
if let Ok(res) = tokio::time::timeout(
Duration::from_secs(10),
rpc_client.send_and_confirm_transaction(&tx),
)
.await
{
match res {
Ok(_) => break,
Err(e) => info!("Error occured while doing cancel all for ma : {} and perp market : {} error : {}", mango_account_pk, perp_market.perp_market_pk, e),
}
}
}
});
tasks.push(task);
}
}
futures::future::join_all(tasks).await;
info!("finished cleaning market makers");
}

15
src/noop.rs Normal file
View File

@ -0,0 +1,15 @@
use chrono::Utc;
use std::str::FromStr;
use solana_sdk::{instruction::Instruction, pubkey::Pubkey};
pub fn instruction(data: Vec<u8>) -> Instruction {
Instruction {
program_id: Pubkey::from_str("noopb9bkMVfRPU8AsbpTUg8AQkHtKwMYZiFUjNRtMmV").unwrap(),
accounts: vec![],
data,
}
}
pub fn timestamp() -> Instruction {
instruction(Utc::now().timestamp_micros().to_le_bytes().into())
}

View File

@ -366,31 +366,30 @@ impl MangoSimulationStats {
);
println!(
"Transactions confirmed {}%",
"Transactions confirmed : {}%",
(counters.num_confirmed_txs * 100)
.checked_div(counters.num_sent)
.unwrap_or(0)
);
println!(
"Transactions successful {}%",
"Transactions successful : {}%",
(counters.num_successful * 100)
.checked_div(counters.num_sent)
.unwrap_or(0)
);
println!(
"Transactions timed out {}%",
"Transactions timed out : {}%",
(counters.num_timeout_txs * 100)
.checked_div(counters.num_sent)
.unwrap_or(0)
);
let top_5_errors = diff.errors.iter().sorted_by(|x,y| {(*y.1).cmp(x.1)}).take(5).collect_vec();
let top_5_errors = counters.errors.iter().sorted_by(|x,y| {(*y.1).cmp(x.1)}).take(5).enumerate().collect_vec();
let mut errors_to_print: String = String::new();
println!(" Top 5 errors (diff)");
for (error, count) in top_5_errors {
println!("{}({})", error, count);
errors_to_print += format!("{}({}),", error, count).as_str();
for (idx, (error, count)) in top_5_errors {
println!("Error #{idx} : {error} ({count})");
errors_to_print += format!("{error}({count}),").as_str();
}
println!("\n\n");
println!("\n");
if !is_final {
datapoint_info!(
@ -491,7 +490,7 @@ impl MangoSimulationStats {
i64
),
(
"top 5 errors",
"top_5_errors",
errors_to_print,
String
)