From c872d3294392d2418ec7e85ebd1a6e37a374cb7d Mon Sep 17 00:00:00 2001 From: Maximilian Schneider Date: Sun, 5 Feb 2023 17:04:47 +0900 Subject: [PATCH] add some basic throttling --- service-mango-crank/src/main.rs | 1 - .../src/transaction_builder.rs | 272 +++++++++++------- 2 files changed, 163 insertions(+), 110 deletions(-) diff --git a/service-mango-crank/src/main.rs b/service-mango-crank/src/main.rs index 016a8e4..eb7eab4 100644 --- a/service-mango-crank/src/main.rs +++ b/service-mango-crank/src/main.rs @@ -113,7 +113,6 @@ async fn main() -> anyhow::Result<()> { ) .expect("init transaction builder"); - // TODO: throttle cranking, currently runs very fast transaction_sender::init( instruction_receiver, blockhash, diff --git a/service-mango-crank/src/transaction_builder.rs b/service-mango-crank/src/transaction_builder.rs index dd88e8c..ab8ec75 100644 --- a/service-mango-crank/src/transaction_builder.rs +++ b/service-mango-crank/src/transaction_builder.rs @@ -1,4 +1,5 @@ use bytemuck::cast_ref; +use mango_v4::state::FillEvent; use serum_dex::{instruction::MarketInstruction, state::EventView}; use solana_geyser_connector_lib::{ chain_data::{AccountData, ChainData, SlotData}, @@ -20,8 +21,12 @@ use std::{ collections::{BTreeSet, HashMap, HashSet}, convert::TryFrom, str::FromStr, + time::{Duration, Instant}, }; +const MAX_BACKLOG: usize = 2; +const TIMEOUT_INTERVAL: Duration = Duration::from_millis(400); + pub fn init( perp_queue_pks: Vec<(Pubkey, Pubkey)>, serum_queue_pks: Vec<(Pubkey, Pubkey)>, @@ -46,7 +51,7 @@ pub fn init( let (instruction_sender, instruction_receiver) = async_channel::unbounded::>(); let mut chain_cache = ChainData::new(metrics_sender); - let mut last_evq_versions = HashMap::::new(); + let mut last_cranked_evq_versions = HashMap::::new(); let all_queue_pks: HashSet = perp_queue_pks .iter() @@ -91,76 +96,99 @@ pub fn init( for (mkt_pk, evq_pk) in perp_queue_pks.iter() { let evq_b58 = evq_pk.to_string(); - let last_evq_version = last_evq_versions.get(&evq_b58).unwrap_or(&(0, 0)); + let last_evq_version = last_cranked_evq_versions.get(&evq_b58); match chain_cache.account(&evq_pk) { Ok(account_info) => { // only process if the account state changed - let evq_version = (account_info.slot, account_info.write_version); - trace!("mango perp evq={evq_b58} write_version={:?}", evq_version); - if evq_version == *last_evq_version { + let (is_unchanged, is_on_timeout) = match last_evq_version { + Some((slot, write_version, timestamp)) => ( + account_info.slot == *slot + && account_info.write_version == *write_version, + timestamp.elapsed() < TIMEOUT_INTERVAL, + ), + None => Default::default(), + }; + trace!("mango perp evq={evq_b58} slot={} v={} is_unchanged={is_unchanged} is_on_timeout={is_on_timeout}", account_info.slot, account_info.write_version); + if is_unchanged || is_on_timeout { continue; } - last_evq_versions.insert(evq_b58.clone(), evq_version); let account = &account_info.account; - let event_queue: mango_v4::state::EventQueue = mango_v4::state::EventQueue::try_deserialize( account.data().borrow_mut(), ) .unwrap(); - if !event_queue.is_empty() { - let mango_accounts: BTreeSet<_> = event_queue - .iter() - .take(10) - .flat_map(|e| { - match mango_v4::state::EventType::try_from(e.event_type) - .expect("mango v4 event") - { - mango_v4::state::EventType::Fill => { - let fill: &mango_v4::state::FillEvent = cast_ref(e); - vec![fill.maker, fill.taker] - } - mango_v4::state::EventType::Out => { - let out: &mango_v4::state::OutEvent = cast_ref(e); - vec![out.owner] - } - mango_v4::state::EventType::Liquidate => vec![], - } - }) - .collect(); - - let mut ams: Vec<_> = anchor_lang::ToAccountMetas::to_account_metas( - &mango_v4::accounts::PerpConsumeEvents { - group: group_pk, - perp_market: *mkt_pk, - event_queue: *evq_pk, - }, - None, - ); - ams.append( - &mut mango_accounts - .iter() - .map(|pk| AccountMeta::new(*pk, false)) - .collect(), - ); - - let ix = Instruction { - program_id: mango_v4::id(), - accounts: ams, - data: anchor_lang::InstructionData::data( - &mango_v4::instruction::PerpConsumeEvents { limit: 10 }, - ), - }; - - info!( - "mango perp evq={evq_b58} count={} limit=10", - event_queue.iter().count() - ); - instruction_sender.send(vec![ix]).await; + // only crank if at least 1 fill or a sufficient events of other categories are buffered + let contains_fill_events = event_queue + .iter() + .find(|e| e.event_type == mango_v4::state::EventType::Fill as u8) + .is_some(); + let has_backlog = event_queue.iter().count() > MAX_BACKLOG; + trace!("mango perp evq={evq_b58} slot={} v={} contains_fill_events={contains_fill_events} has_backlog={has_backlog}", account_info.slot, account_info.write_version); + if !contains_fill_events && !has_backlog { + continue; } + + let mango_accounts: BTreeSet<_> = event_queue + .iter() + .take(10) + .flat_map(|e| { + match mango_v4::state::EventType::try_from(e.event_type) + .expect("mango v4 event") + { + mango_v4::state::EventType::Fill => { + let fill: &mango_v4::state::FillEvent = cast_ref(e); + vec![fill.maker, fill.taker] + } + mango_v4::state::EventType::Out => { + let out: &mango_v4::state::OutEvent = cast_ref(e); + vec![out.owner] + } + mango_v4::state::EventType::Liquidate => vec![], + } + }) + .collect(); + + let mut ams: Vec<_> = anchor_lang::ToAccountMetas::to_account_metas( + &mango_v4::accounts::PerpConsumeEvents { + group: group_pk, + perp_market: *mkt_pk, + event_queue: *evq_pk, + }, + None, + ); + ams.append( + &mut mango_accounts + .iter() + .map(|pk| AccountMeta::new(*pk, false)) + .collect(), + ); + + let ix = Instruction { + program_id: mango_v4::id(), + accounts: ams, + data: anchor_lang::InstructionData::data( + &mango_v4::instruction::PerpConsumeEvents { limit: 10 }, + ), + }; + + info!( + "mango perp evq={evq_b58} count={} limit=10", + event_queue.iter().count() + ); + instruction_sender.send(vec![ix]).await; + + last_cranked_evq_versions.insert( + evq_b58.clone(), + ( + account_info.slot, + account_info.write_version, + Instant::now(), + ), + ); } Err(_) => info!("chain_cache could not find {evq_b58}"), } @@ -168,17 +196,23 @@ pub fn init( for (mkt_pk, evq_pk) in serum_queue_pks.iter() { let evq_b58 = evq_pk.to_string(); - let last_evq_version = last_evq_versions.get(&evq_b58).unwrap_or(&(0, 0)); + let last_evq_version = last_cranked_evq_versions.get(&evq_b58); match chain_cache.account(&evq_pk) { Ok(account_info) => { // only process if the account state changed - let evq_version = (account_info.slot, account_info.write_version); - trace!("serum evq={evq_b58} write_version={:?}", evq_version); - if evq_version == *last_evq_version { + let (is_unchanged, is_on_timeout) = match last_evq_version { + Some((slot, write_version, timestamp)) => ( + account_info.slot == *slot + && account_info.write_version == *write_version, + timestamp.elapsed() < TIMEOUT_INTERVAL, + ), + None => Default::default(), + }; + trace!("openbook evq={evq_b58} slot={} v={} is_unchanged={is_unchanged} is_on_timeout={is_on_timeout}", account_info.slot, account_info.write_version); + if is_unchanged || is_on_timeout { continue; } - last_evq_versions.insert(evq_b58.clone(), evq_version); let account = &account_info.account; @@ -188,57 +222,77 @@ pub fn init( *bytemuck::from_bytes(&inner_data[..header_span]); let count = header.count; - if count > 0 { - let rest = &inner_data[header_span..]; - let event_size = std::mem::size_of::(); - let slop = rest.len() % event_size; - let end = rest.len() - slop; - let events = - bytemuck::cast_slice::(&rest[..end]); - let seq_num = header.seq_num; + let rest = &inner_data[header_span..]; + let event_size = std::mem::size_of::(); + let slop = rest.len() % event_size; + let end = rest.len() - slop; + let events = + bytemuck::cast_slice::(&rest[..end]); + let seq_num = header.seq_num; - let oo_pks: BTreeSet<_> = (0..count) - .map(|i| { - let offset = (seq_num - count + i) % events.len() as u64; - let event: serum_dex::state::Event = events[offset as usize]; - let oo_pk = match event.as_view().unwrap() { - EventView::Fill { owner, .. } - | EventView::Out { owner, .. } => { - bytemuck::cast_slice::(&owner)[0] - } - }; - oo_pk - }) - .collect(); + let events: Vec<_> = (0..count) + .map(|i| { + let offset = (seq_num - count + i) % events.len() as u64; + let event: serum_dex::state::Event = events[offset as usize]; + event.as_view().unwrap() + }) + .collect(); - let mut ams: Vec<_> = oo_pks - .iter() - .map(|pk| AccountMeta::new(*pk, false)) - .collect(); - - // pass two times evq_pk instead of deprecated fee receivers to reduce encoded tx size - ams.append( - &mut [ - mkt_pk, evq_pk, evq_pk, /*coin_pk*/ - evq_pk, /*pc_pk*/ - ] - .iter() - .map(|pk| AccountMeta::new(**pk, false)) - .collect(), - ); - - let ix = Instruction { - program_id: Pubkey::from_str( - "srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX", - ) - .unwrap(), - accounts: ams, - data: MarketInstruction::ConsumeEvents(count as u16).pack(), - }; - - info!("serum evq={evq_b58} count={count}"); - instruction_sender.send(vec![ix]).await; + // only crank if at least 1 fill or a sufficient events of other categories are buffered + let contains_fill_events = events + .iter() + .find(|e| matches!(e, serum_dex::state::EventView::Fill { .. })) + .is_some(); + let has_backlog = events.len() > MAX_BACKLOG; + if !contains_fill_events && !has_backlog { + continue; } + + let oo_pks: BTreeSet<_> = events + .iter() + .map(|e| match e { + EventView::Fill { owner, .. } | EventView::Out { owner, .. } => { + bytemuck::cast_slice::(owner)[0] + } + }) + .collect(); + + let mut ams: Vec<_> = oo_pks + .iter() + .map(|pk| AccountMeta::new(*pk, false)) + .collect(); + + // pass two times evq_pk instead of deprecated fee receivers to reduce encoded tx size + ams.append( + &mut [ + mkt_pk, evq_pk, evq_pk, /*coin_pk*/ + evq_pk, /*pc_pk*/ + ] + .iter() + .map(|pk| AccountMeta::new(**pk, false)) + .collect(), + ); + + let ix = Instruction { + program_id: Pubkey::from_str( + "srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX", + ) + .unwrap(), + accounts: ams, + data: MarketInstruction::ConsumeEvents(count as u16).pack(), + }; + + info!("openbook evq={evq_b58} count={count}"); + instruction_sender.send(vec![ix]).await; + + last_cranked_evq_versions.insert( + evq_b58.clone(), + ( + account_info.slot, + account_info.write_version, + Instant::now(), + ), + ); } Err(_) => info!("chain_cache could not find {evq_b58}"), }