add some basic throttling

This commit is contained in:
Maximilian Schneider 2023-02-05 17:04:47 +09:00
parent a0af15cb4a
commit c872d32943
2 changed files with 163 additions and 110 deletions

View File

@ -113,7 +113,6 @@ async fn main() -> anyhow::Result<()> {
) )
.expect("init transaction builder"); .expect("init transaction builder");
// TODO: throttle cranking, currently runs very fast
transaction_sender::init( transaction_sender::init(
instruction_receiver, instruction_receiver,
blockhash, blockhash,

View File

@ -1,4 +1,5 @@
use bytemuck::cast_ref; use bytemuck::cast_ref;
use mango_v4::state::FillEvent;
use serum_dex::{instruction::MarketInstruction, state::EventView}; use serum_dex::{instruction::MarketInstruction, state::EventView};
use solana_geyser_connector_lib::{ use solana_geyser_connector_lib::{
chain_data::{AccountData, ChainData, SlotData}, chain_data::{AccountData, ChainData, SlotData},
@ -20,8 +21,12 @@ use std::{
collections::{BTreeSet, HashMap, HashSet}, collections::{BTreeSet, HashMap, HashSet},
convert::TryFrom, convert::TryFrom,
str::FromStr, str::FromStr,
time::{Duration, Instant},
}; };
const MAX_BACKLOG: usize = 2;
const TIMEOUT_INTERVAL: Duration = Duration::from_millis(400);
pub fn init( pub fn init(
perp_queue_pks: Vec<(Pubkey, Pubkey)>, perp_queue_pks: Vec<(Pubkey, Pubkey)>,
serum_queue_pks: Vec<(Pubkey, Pubkey)>, serum_queue_pks: Vec<(Pubkey, Pubkey)>,
@ -46,7 +51,7 @@ pub fn init(
let (instruction_sender, instruction_receiver) = async_channel::unbounded::<Vec<Instruction>>(); let (instruction_sender, instruction_receiver) = async_channel::unbounded::<Vec<Instruction>>();
let mut chain_cache = ChainData::new(metrics_sender); let mut chain_cache = ChainData::new(metrics_sender);
let mut last_evq_versions = HashMap::<String, (u64, u64)>::new(); let mut last_cranked_evq_versions = HashMap::<String, (u64, u64, Instant)>::new();
let all_queue_pks: HashSet<Pubkey> = perp_queue_pks let all_queue_pks: HashSet<Pubkey> = perp_queue_pks
.iter() .iter()
@ -91,27 +96,42 @@ pub fn init(
for (mkt_pk, evq_pk) in perp_queue_pks.iter() { for (mkt_pk, evq_pk) in perp_queue_pks.iter() {
let evq_b58 = evq_pk.to_string(); let evq_b58 = evq_pk.to_string();
let last_evq_version = last_evq_versions.get(&evq_b58).unwrap_or(&(0, 0)); let last_evq_version = last_cranked_evq_versions.get(&evq_b58);
match chain_cache.account(&evq_pk) { match chain_cache.account(&evq_pk) {
Ok(account_info) => { Ok(account_info) => {
// only process if the account state changed // only process if the account state changed
let evq_version = (account_info.slot, account_info.write_version); let (is_unchanged, is_on_timeout) = match last_evq_version {
trace!("mango perp evq={evq_b58} write_version={:?}", evq_version); Some((slot, write_version, timestamp)) => (
if evq_version == *last_evq_version { account_info.slot == *slot
&& account_info.write_version == *write_version,
timestamp.elapsed() < TIMEOUT_INTERVAL,
),
None => Default::default(),
};
trace!("mango perp evq={evq_b58} slot={} v={} is_unchanged={is_unchanged} is_on_timeout={is_on_timeout}", account_info.slot, account_info.write_version);
if is_unchanged || is_on_timeout {
continue; continue;
} }
last_evq_versions.insert(evq_b58.clone(), evq_version);
let account = &account_info.account; let account = &account_info.account;
let event_queue: mango_v4::state::EventQueue = let event_queue: mango_v4::state::EventQueue =
mango_v4::state::EventQueue::try_deserialize( mango_v4::state::EventQueue::try_deserialize(
account.data().borrow_mut(), account.data().borrow_mut(),
) )
.unwrap(); .unwrap();
if !event_queue.is_empty() { // only crank if at least 1 fill or a sufficient events of other categories are buffered
let contains_fill_events = event_queue
.iter()
.find(|e| e.event_type == mango_v4::state::EventType::Fill as u8)
.is_some();
let has_backlog = event_queue.iter().count() > MAX_BACKLOG;
trace!("mango perp evq={evq_b58} slot={} v={} contains_fill_events={contains_fill_events} has_backlog={has_backlog}", account_info.slot, account_info.write_version);
if !contains_fill_events && !has_backlog {
continue;
}
let mango_accounts: BTreeSet<_> = event_queue let mango_accounts: BTreeSet<_> = event_queue
.iter() .iter()
.take(10) .take(10)
@ -160,7 +180,15 @@ pub fn init(
event_queue.iter().count() event_queue.iter().count()
); );
instruction_sender.send(vec![ix]).await; instruction_sender.send(vec![ix]).await;
}
last_cranked_evq_versions.insert(
evq_b58.clone(),
(
account_info.slot,
account_info.write_version,
Instant::now(),
),
);
} }
Err(_) => info!("chain_cache could not find {evq_b58}"), Err(_) => info!("chain_cache could not find {evq_b58}"),
} }
@ -168,17 +196,23 @@ pub fn init(
for (mkt_pk, evq_pk) in serum_queue_pks.iter() { for (mkt_pk, evq_pk) in serum_queue_pks.iter() {
let evq_b58 = evq_pk.to_string(); let evq_b58 = evq_pk.to_string();
let last_evq_version = last_evq_versions.get(&evq_b58).unwrap_or(&(0, 0)); let last_evq_version = last_cranked_evq_versions.get(&evq_b58);
match chain_cache.account(&evq_pk) { match chain_cache.account(&evq_pk) {
Ok(account_info) => { Ok(account_info) => {
// only process if the account state changed // only process if the account state changed
let evq_version = (account_info.slot, account_info.write_version); let (is_unchanged, is_on_timeout) = match last_evq_version {
trace!("serum evq={evq_b58} write_version={:?}", evq_version); Some((slot, write_version, timestamp)) => (
if evq_version == *last_evq_version { account_info.slot == *slot
&& account_info.write_version == *write_version,
timestamp.elapsed() < TIMEOUT_INTERVAL,
),
None => Default::default(),
};
trace!("openbook evq={evq_b58} slot={} v={} is_unchanged={is_unchanged} is_on_timeout={is_on_timeout}", account_info.slot, account_info.write_version);
if is_unchanged || is_on_timeout {
continue; continue;
} }
last_evq_versions.insert(evq_b58.clone(), evq_version);
let account = &account_info.account; let account = &account_info.account;
@ -188,7 +222,6 @@ pub fn init(
*bytemuck::from_bytes(&inner_data[..header_span]); *bytemuck::from_bytes(&inner_data[..header_span]);
let count = header.count; let count = header.count;
if count > 0 {
let rest = &inner_data[header_span..]; let rest = &inner_data[header_span..];
let event_size = std::mem::size_of::<serum_dex::state::Event>(); let event_size = std::mem::size_of::<serum_dex::state::Event>();
let slop = rest.len() % event_size; let slop = rest.len() % event_size;
@ -197,17 +230,30 @@ pub fn init(
bytemuck::cast_slice::<u8, serum_dex::state::Event>(&rest[..end]); bytemuck::cast_slice::<u8, serum_dex::state::Event>(&rest[..end]);
let seq_num = header.seq_num; let seq_num = header.seq_num;
let oo_pks: BTreeSet<_> = (0..count) let events: Vec<_> = (0..count)
.map(|i| { .map(|i| {
let offset = (seq_num - count + i) % events.len() as u64; let offset = (seq_num - count + i) % events.len() as u64;
let event: serum_dex::state::Event = events[offset as usize]; let event: serum_dex::state::Event = events[offset as usize];
let oo_pk = match event.as_view().unwrap() { event.as_view().unwrap()
EventView::Fill { owner, .. } })
| EventView::Out { owner, .. } => { .collect();
bytemuck::cast_slice::<u64, Pubkey>(&owner)[0]
// only crank if at least 1 fill or a sufficient events of other categories are buffered
let contains_fill_events = events
.iter()
.find(|e| matches!(e, serum_dex::state::EventView::Fill { .. }))
.is_some();
let has_backlog = events.len() > MAX_BACKLOG;
if !contains_fill_events && !has_backlog {
continue;
}
let oo_pks: BTreeSet<_> = events
.iter()
.map(|e| match e {
EventView::Fill { owner, .. } | EventView::Out { owner, .. } => {
bytemuck::cast_slice::<u64, Pubkey>(owner)[0]
} }
};
oo_pk
}) })
.collect(); .collect();
@ -236,9 +282,17 @@ pub fn init(
data: MarketInstruction::ConsumeEvents(count as u16).pack(), data: MarketInstruction::ConsumeEvents(count as u16).pack(),
}; };
info!("serum evq={evq_b58} count={count}"); info!("openbook evq={evq_b58} count={count}");
instruction_sender.send(vec![ix]).await; instruction_sender.send(vec![ix]).await;
}
last_cranked_evq_versions.insert(
evq_b58.clone(),
(
account_info.slot,
account_info.write_version,
Instant::now(),
),
);
} }
Err(_) => info!("chain_cache could not find {evq_b58}"), Err(_) => info!("chain_cache could not find {evq_b58}"),
} }