serum crank implemented

This commit is contained in:
Maximilian Schneider 2023-02-04 01:23:18 +09:00
parent 0a09ad81ae
commit dc74c89626
2 changed files with 154 additions and 126 deletions

View File

@ -7,7 +7,7 @@ use anchor_client::{
Cluster, Cluster,
}; };
use anchor_lang::prelude::Pubkey; use anchor_lang::prelude::Pubkey;
use bytemuck::cast_slice; use bytemuck::{bytes_of, cast_slice};
use client::{Client, MangoGroupContext}; use client::{Client, MangoGroupContext};
use futures_channel::mpsc::{unbounded, UnboundedSender}; use futures_channel::mpsc::{unbounded, UnboundedSender};
use futures_util::{ use futures_util::{
@ -110,22 +110,17 @@ async fn main() -> anyhow::Result<()> {
Some(rpc_timeout), Some(rpc_timeout),
); );
let group_pk = Pubkey::from_str(&config.mango_group).unwrap(); let group_pk = Pubkey::from_str(&config.mango_group).unwrap();
let group_context = Arc::new( let group_context =
MangoGroupContext::new_from_rpc( Arc::new(MangoGroupContext::new_from_rpc(&client.rpc_async(), group_pk).await?);
&client.rpc_async(),
group_pk
)
.await?,
);
let perp_queue_pks: Vec<(Pubkey, Pubkey)> = group_context let perp_queue_pks: Vec<_> = group_context
.perp_markets .perp_markets
.iter() .iter()
.map(|(_, context)| (context.address, context.market.event_queue)) .map(|(_, context)| (context.address, context.market.event_queue))
.collect(); .collect();
// fetch all serum/openbook markets to find their event queues // fetch all serum/openbook markets to find their event queues
let serum_market_pks: Vec<Pubkey> = group_context let serum_market_pks: Vec<_> = group_context
.serum3_markets .serum3_markets
.iter() .iter()
.map(|(_, context)| context.market.serum_market_external) .map(|(_, context)| context.market.serum_market_external)
@ -136,7 +131,7 @@ async fn main() -> anyhow::Result<()> {
.get_multiple_accounts(serum_market_pks.as_slice()) .get_multiple_accounts(serum_market_pks.as_slice())
.await?; .await?;
let serum_market_ais: Vec<&Account> = serum_market_ais let serum_market_ais: Vec<_> = serum_market_ais
.iter() .iter()
.filter_map(|maybe_ai| match maybe_ai { .filter_map(|maybe_ai| match maybe_ai {
Some(ai) => Some(ai), Some(ai) => Some(ai),
@ -144,17 +139,15 @@ async fn main() -> anyhow::Result<()> {
}) })
.collect(); .collect();
let serum_queue_pks: Vec<(Pubkey, Pubkey)> = serum_market_ais let serum_queue_pks: Vec<_> = serum_market_ais
.iter() .iter()
.enumerate() .enumerate()
.map(|pair| { .map(|pair| {
let market_state: serum_dex::state::MarketState = *bytemuck::from_bytes( let market_state: serum_dex::state::MarketState = *bytemuck::from_bytes(
&pair.1.data[5..5 + std::mem::size_of::<serum_dex::state::MarketState>()], &pair.1.data[5..5 + std::mem::size_of::<serum_dex::state::MarketState>()],
); );
( let event_q = market_state.event_q;
serum_market_pks[pair.0], (serum_market_pks[pair.0], Pubkey::new(bytes_of(&event_q)))
Pubkey::new(cast_slice(&identity(market_state.event_q) as &[_])),
)
}) })
.collect(); .collect();
@ -179,11 +172,15 @@ async fn main() -> anyhow::Result<()> {
.collect::<String>() .collect::<String>()
); );
let use_geyser = true; let use_geyser = true;
let all_queue_pks = [perp_queue_pks.clone(), serum_queue_pks.clone()].concat(); let all_queue_pks: HashSet<Pubkey> = perp_queue_pks
let relevant_pubkeys = all_queue_pks.iter().map(|m| m.1.to_string()).collect(); .iter()
.chain(serum_queue_pks.iter())
.map(|mkt| mkt.1)
.collect();
let filter_config = FilterConfig { let filter_config = FilterConfig {
program_ids: vec![], program_ids: vec![],
account_ids: relevant_pubkeys, account_ids: all_queue_pks.iter().map(|pk| pk.to_string()).collect(),
}; };
if use_geyser { if use_geyser {
grpc_plugin_source::process_events( grpc_plugin_source::process_events(

View File

@ -1,5 +1,5 @@
use crate::Pubkey;
use bytemuck::cast_ref; use bytemuck::cast_ref;
use serum_dex::{instruction::MarketInstruction, state::EventView};
use solana_geyser_connector_lib::{ use solana_geyser_connector_lib::{
chain_data::{AccountData, ChainData, SlotData}, chain_data::{AccountData, ChainData, SlotData},
metrics::Metrics, metrics::Metrics,
@ -7,17 +7,19 @@ use solana_geyser_connector_lib::{
AccountWrite, SlotUpdate, AccountWrite, SlotUpdate,
}; };
use anchor_lang::AccountDeserialize; use anchor_lang::{solana_program::pubkey, AccountDeserialize};
use log::*; use log::*;
use solana_sdk::{ use solana_sdk::{
account::{ReadableAccount, WritableAccount}, account::{ReadableAccount, WritableAccount},
instruction::{Instruction, AccountMeta}, instruction::{AccountMeta, Instruction},
pubkey::Pubkey,
stake_history::Epoch, stake_history::Epoch,
}; };
use std::{ use std::{
borrow::BorrowMut, borrow::BorrowMut,
collections::{HashMap, HashSet}, iter::{once, empty}, collections::{HashMap, HashSet},
convert::TryFrom convert::TryFrom,
str::FromStr,
}; };
pub fn init( pub fn init(
@ -44,26 +46,20 @@ pub fn init(
let (instruction_sender, instruction_receiver) = async_channel::unbounded::<Vec<Instruction>>(); let (instruction_sender, instruction_receiver) = async_channel::unbounded::<Vec<Instruction>>();
let mut chain_cache = ChainData::new(metrics_sender); let mut chain_cache = ChainData::new(metrics_sender);
let mut perp_events_cache = HashMap::<
String,
[mango_v4::state::AnyEvent; mango_v4::state::MAX_NUM_EVENTS as usize],
>::new();
let mut serum_events_cache = HashMap::<String, Vec<serum_dex::state::Event>>::new();
let mut seq_num_cache = HashMap::<String, u64>::new();
let mut last_evq_versions = HashMap::<String, (u64, u64)>::new(); let mut last_evq_versions = HashMap::<String, (u64, u64)>::new();
let all_queue_pks = [perp_queue_pks.clone(), serum_queue_pks.clone()].concat(); let all_queue_pks: HashSet<Pubkey> = perp_queue_pks
let relevant_pubkeys = all_queue_pks
.iter() .iter()
.map(|m| m.1) .chain(serum_queue_pks.iter())
.collect::<HashSet<Pubkey>>(); .map(|mkt| mkt.1)
.collect();
// update handling thread, reads both sloths and account updates // update handling thread, reads both sloths and account updates
tokio::spawn(async move { tokio::spawn(async move {
loop { loop {
tokio::select! { tokio::select! {
Ok(account_write) = account_write_queue_receiver.recv() => { Ok(account_write) = account_write_queue_receiver.recv() => {
if !relevant_pubkeys.contains(&account_write.pubkey) { if !all_queue_pks.contains(&account_write.pubkey) {
continue; continue;
} }
@ -93,120 +89,155 @@ pub fn init(
} }
} }
for mkt in all_queue_pks.iter() { for (mkt_pk, evq_pk) in perp_queue_pks.iter() {
let last_evq_version = last_evq_versions.get(&mkt.1.to_string()).unwrap_or(&(0, 0)); let evq_b58 = evq_pk.to_string();
let mkt_pk = mkt.0; let last_evq_version = last_evq_versions.get(&evq_b58).unwrap_or(&(0, 0));
let evq_pk = mkt.1;
match chain_cache.account(&evq_pk) { match chain_cache.account(&evq_pk) {
Ok(account_info) => { Ok(account_info) => {
// only process if the account state changed // only process if the account state changed
let evq_version = (account_info.slot, account_info.write_version); let evq_version = (account_info.slot, account_info.write_version);
let evq_pk_string = evq_pk.to_string(); trace!("evq={evq_b58} write_version={:?}", evq_version);
trace!("evq {} write_version {:?}", evq_pk_string, evq_version);
if evq_version == *last_evq_version { if evq_version == *last_evq_version {
continue; continue;
} }
last_evq_versions.insert(evq_pk_string.clone(), evq_version); last_evq_versions.insert(evq_b58.clone(), evq_version);
let account = &account_info.account; let account = &account_info.account;
let is_perp = mango_v4::check_id(account.owner());
if is_perp { let event_queue: mango_v4::state::EventQueue =
let event_queue: mango_v4::state::EventQueue = mango_v4::state::EventQueue::try_deserialize( mango_v4::state::EventQueue::try_deserialize(
account.data().borrow_mut(), account.data().borrow_mut(),
) )
.unwrap(); .unwrap();
trace!( trace!("evq={evq_b58} seq_num={}", event_queue.header.seq_num);
"evq {} seq_num {}",
evq_pk_string,
event_queue.header.seq_num
);
if !event_queue.empty() { if !event_queue.empty() {
let mango_accounts: HashSet<_> = event_queue.iter().take(10).flat_map(|e| match mango_v4::state::EventType::try_from(e.event_type).expect("mango v4 event") { let mango_accounts: HashSet<_> = event_queue
mango_v4::state::EventType::Fill => { .iter()
let fill: &mango_v4::state::FillEvent = cast_ref(e); .take(10)
vec![fill.maker, fill.taker] .flat_map(|e| {
match mango_v4::state::EventType::try_from(e.event_type)
.expect("mango v4 event")
{
mango_v4::state::EventType::Fill => {
let fill: &mango_v4::state::FillEvent = cast_ref(e);
vec![fill.maker, fill.taker]
}
mango_v4::state::EventType::Out => {
let out: &mango_v4::state::OutEvent = cast_ref(e);
vec![out.owner]
}
mango_v4::state::EventType::Liquidate => vec![],
} }
mango_v4::state::EventType::Out => {
let out: &mango_v4::state::OutEvent = cast_ref(e);
vec![out.owner]
}
mango_v4::state::EventType::Liquidate => vec![]
}) })
.collect(); .collect();
let mut ams: Vec<_> = anchor_lang::ToAccountMetas::to_account_metas( let mut ams: Vec<_> = anchor_lang::ToAccountMetas::to_account_metas(
&mango_v4::accounts::PerpConsumeEvents { &mango_v4::accounts::PerpConsumeEvents {
group: group_pk, group: group_pk,
perp_market: mkt_pk, perp_market: *mkt_pk,
event_queue: evq_pk, event_queue: *evq_pk,
}, },
None, None,
); );
ams.append(&mut mango_accounts ams.append(
&mut mango_accounts
.iter() .iter()
.map(|pk| AccountMeta { pubkey: *pk, is_signer: false, is_writable: true }) .map(|pk| AccountMeta::new(*pk, false))
.collect()); .collect(),
);
let ix = Instruction { let ix = Instruction {
program_id: mango_v4::id(), program_id: mango_v4::id(),
accounts: ams, accounts: ams,
data: anchor_lang::InstructionData::data(&mango_v4::instruction::PerpConsumeEvents { data: anchor_lang::InstructionData::data(
limit: 10, &mango_v4::instruction::PerpConsumeEvents { limit: 10 },
}), ),
}; };
instruction_sender.send(vec![ix]).await; instruction_sender.send(vec![ix]).await;
}
match seq_num_cache.get(&evq_pk_string) {
Some(old_seq_num) => match perp_events_cache.get(&evq_pk_string) {
Some(old_events) => {}
_ => {
info!("perp_events_cache could not find {}", evq_pk_string)
}
},
_ => info!("seq_num_cache could not find {}", evq_pk_string),
}
seq_num_cache
.insert(evq_pk_string.clone(), event_queue.header.seq_num.clone());
perp_events_cache
.insert(evq_pk_string.clone(), event_queue.buf.clone());
} else {
let inner_data = &account.data()[5..&account.data().len() - 7];
let header_span = std::mem::size_of::<SerumEventQueueHeader>();
let header: SerumEventQueueHeader =
*bytemuck::from_bytes(&inner_data[..header_span]);
let seq_num = header.seq_num;
let count = header.count;
let rest = &inner_data[header_span..];
let slop = rest.len() % std::mem::size_of::<serum_dex::state::Event>();
let new_len = rest.len() - slop;
let events = &rest[..new_len];
debug!("evq {} header_span {} header_seq_num {} header_count {} inner_len {} events_len {} sizeof Event {}", evq_pk_string, header_span, seq_num, count, inner_data.len(), events.len(), std::mem::size_of::<serum_dex::state::Event>());
let events: &[serum_dex::state::Event] = bytemuck::cast_slice(&events);
match seq_num_cache.get(&evq_pk_string) {
Some(old_seq_num) => match serum_events_cache.get(&evq_pk_string) {
Some(old_events) => {}
_ => {
info!("serum_events_cache could not find {}", evq_pk_string)
}
},
_ => info!("seq_num_cache could not find {}", evq_pk_string),
}
seq_num_cache.insert(evq_pk_string.clone(), seq_num.clone());
serum_events_cache
.insert(evq_pk_string.clone(), events.clone().to_vec());
} }
} }
Err(_) => info!("chain_cache could not find {}", mkt.1), Err(_) => info!("chain_cache could not find {evq_b58}"),
}
}
for (mkt_pk, evq_pk) in serum_queue_pks.iter() {
let evq_b58 = evq_pk.to_string();
let last_evq_version = last_evq_versions.get(&evq_b58).unwrap_or(&(0, 0));
match chain_cache.account(&evq_pk) {
Ok(account_info) => {
// only process if the account state changed
let evq_version = (account_info.slot, account_info.write_version);
trace!("evq={evq_b58} write_version={:?}", evq_version);
if evq_version == *last_evq_version {
continue;
}
last_evq_versions.insert(evq_b58, evq_version);
let account = &account_info.account;
let inner_data = &account.data()[5..&account.data().len() - 7];
let header_span = std::mem::size_of::<SerumEventQueueHeader>();
let header: SerumEventQueueHeader =
*bytemuck::from_bytes(&inner_data[..header_span]);
let count = header.count;
if count > 0 {
let rest = &inner_data[header_span..];
let event_size = std::mem::size_of::<serum_dex::state::Event>();
let slop = rest.len() % event_size;
let end = rest.len() - slop;
let events =
bytemuck::cast_slice::<u8, serum_dex::state::Event>(&rest[..end]);
let seq_num = header.seq_num;
let oo_pks: HashSet<_> = (0..count)
.map(|i| {
let offset = (seq_num - count + i) % events.len() as u64;
let event: serum_dex::state::Event = events[offset as usize];
let oo_pk = match event.as_view().unwrap() {
EventView::Fill { owner, .. }
| EventView::Out { owner, .. } => {
bytemuck::cast_slice::<u64, Pubkey>(&owner)[0]
}
};
oo_pk
})
.collect();
let mut ams: Vec<_> = oo_pks
.iter()
.map(|pk| AccountMeta::new(*pk, false))
.collect();
// pass two times evq_pk instead of deprecated fee receivers to reduce encoded tx size
ams.append(
&mut [
mkt_pk, evq_pk, evq_pk, /*coin_pk*/
evq_pk, /*pc_pk*/
]
.iter()
.map(|pk| AccountMeta::new(**pk, false))
.collect(),
);
let ix = Instruction {
program_id: Pubkey::from_str(
"srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX",
)
.unwrap(),
accounts: ams,
data: MarketInstruction::ConsumeEvents(count as u16).pack(),
};
instruction_sender.send(vec![ix]).await;
}
}
Err(_) => info!("chain_cache could not find {evq_b58}"),
} }
} }
} }