From 7f0ddd3ac50d5812a53aa3ea852936a35f825e9b Mon Sep 17 00:00:00 2001 From: Riordan Panayides Date: Sat, 24 Dec 2022 14:43:43 +0000 Subject: [PATCH] Fix serum fills --- lib/src/fill_event_filter.rs | 321 +++++++++++++++++++++++++++++--- service-mango-fills/src/main.rs | 37 +++- 2 files changed, 324 insertions(+), 34 deletions(-) diff --git a/lib/src/fill_event_filter.rs b/lib/src/fill_event_filter.rs index b50a4dd..9ce6095 100644 --- a/lib/src/fill_event_filter.rs +++ b/lib/src/fill_event_filter.rs @@ -3,8 +3,10 @@ use crate::{ metrics::{MetricType, Metrics}, AccountWrite, SlotUpdate, }; +use bytemuck::{Pod, Zeroable}; use log::*; use serde::{ser::SerializeStruct, Serialize, Serializer}; +use serum_dex::state::EventView; use solana_sdk::{ account::{ReadableAccount, WritableAccount}, clock::Epoch, @@ -39,6 +41,27 @@ pub struct FillUpdate { pub write_version: u64, } +#[derive(Clone, Debug)] +pub struct SerumFillUpdate { + pub event: serum_dex::state::Event, + pub status: FillUpdateStatus, + pub market: String, + pub queue: String, + pub slot: u64, + pub write_version: u64, +} + +#[derive(Copy, Clone, Debug)] +#[repr(packed)] +pub struct SerumEventQueueHeader { + _account_flags: u64, // Initialized, EventQueue + _head: u64, + count: u64, + seq_num: u64, +} +unsafe impl Zeroable for SerumEventQueueHeader {} +unsafe impl Pod for SerumEventQueueHeader {} + impl Serialize for FillUpdate { fn serialize(&self, serializer: S) -> Result where @@ -57,6 +80,24 @@ impl Serialize for FillUpdate { } } +impl Serialize for SerumFillUpdate { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let event = base64::encode(bytemuck::bytes_of(&self.event)); + let mut state = serializer.serialize_struct("SerumFillUpdate", 4)?; + state.serialize_field("event", &event)?; + state.serialize_field("market", &self.market)?; + state.serialize_field("queue", &self.queue)?; + state.serialize_field("status", &self.status)?; + state.serialize_field("slot", &self.slot)?; + state.serialize_field("write_version", &self.write_version)?; + + state.end() + } +} + #[derive(Clone, Debug)] pub struct FillCheckpoint { pub market: String, @@ -66,6 +107,15 @@ pub struct FillCheckpoint { pub write_version: u64, } +#[derive(Clone, Debug)] +pub struct SerumFillCheckpoint { + pub market: String, + pub queue: String, + pub events: Vec, + pub slot: u64, + pub write_version: u64, +} + impl Serialize for FillCheckpoint { fn serialize(&self, serializer: S) -> Result where @@ -76,7 +126,28 @@ impl Serialize for FillCheckpoint { .iter() .map(|e| base64::encode(bytemuck::bytes_of(e))) .collect(); - let mut state = serializer.serialize_struct("FillUpdate", 3)?; + let mut state = serializer.serialize_struct("FillCheckpoint", 3)?; + state.serialize_field("events", &events)?; + state.serialize_field("market", &self.market)?; + state.serialize_field("queue", &self.queue)?; + state.serialize_field("slot", &self.slot)?; + state.serialize_field("write_version", &self.write_version)?; + + state.end() + } +} + +impl Serialize for SerumFillCheckpoint { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let events: Vec = self + .events + .iter() + .map(|e| base64::encode(bytemuck::bytes_of(e))) + .collect(); + let mut state = serializer.serialize_struct("SerumFillCheckpoint", 3)?; state.serialize_field("events", &events)?; state.serialize_field("market", &self.market)?; state.serialize_field("queue", &self.queue)?; @@ -89,13 +160,15 @@ impl Serialize for FillCheckpoint { pub enum FillEventFilterMessage { Update(FillUpdate), + SerumUpdate(SerumFillUpdate), Checkpoint(FillCheckpoint), + SerumCheckpoint(SerumFillCheckpoint), } // couldn't compile the correct struct size / math on m1, fixed sizes resolve this issue type EventQueueEvents = [AnyEvent; MAX_NUM_EVENTS as usize]; -fn publish_changes( +fn publish_changes_perp( slot: u64, write_version: u64, mkt: &(Pubkey, Pubkey), @@ -232,6 +305,144 @@ fn publish_changes( .unwrap() } +fn publish_changes_serum( + slot: u64, + write_version: u64, + mkt: &(Pubkey, Pubkey), + header: &SerumEventQueueHeader, + events: &[serum_dex::state::Event], + old_seq_num: u64, + old_events: &[serum_dex::state::Event], + fill_update_sender: &async_channel::Sender, + metric_events_new: &mut MetricU64, + metric_events_change: &mut MetricU64, + metric_events_drop: &mut MetricU64, +) { + // seq_num = N means that events (N-QUEUE_LEN) until N-1 are available + let start_seq_num = max(old_seq_num, header.seq_num) + .checked_sub(MAX_NUM_EVENTS as u64) + .unwrap_or(0); + let mut checkpoint = Vec::new(); + let mkt_pk_string = mkt.0.to_string(); + let evq_pk_string = mkt.1.to_string(); + let header_seq_num = header.seq_num; + info!("start seq {} header seq {}", start_seq_num, header_seq_num); + for seq_num in start_seq_num..header_seq_num { + let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize; + let event_view = events[idx].as_view().unwrap(); + let old_event_view = old_events[idx].as_view().unwrap(); + + match event_view { + EventView::Fill { .. } => { + // there are three possible cases: + // 1) the event is past the old seq num, hence guaranteed new event + // 2) the event is not matching the old event queue + // 3) all other events are matching the old event queue + // the order of these checks is important so they are exhaustive + if seq_num >= old_seq_num { + info!("found new serum fill {} idx {}", mkt_pk_string, idx,); + + metric_events_new.increment(); + fill_update_sender + .try_send(FillEventFilterMessage::SerumUpdate(SerumFillUpdate { + slot, + write_version, + event: events[idx], + status: FillUpdateStatus::New, + market: mkt_pk_string.clone(), + queue: evq_pk_string.clone(), + })) + .unwrap(); // TODO: use anyhow to bubble up error + checkpoint.push(events[idx]); + continue; + } + + match old_event_view { + EventView::Fill { .. } => { + info!("already got all fills???"); + // every already published event is recorded in checkpoint + checkpoint.push(events[idx]); + } + EventView::Out { .. } => { + info!( + "found changed event {} idx {} seq_num {} header seq num {} old seq num {}", + mkt_pk_string, idx, seq_num, header_seq_num, old_seq_num + ); + + metric_events_change.increment(); + + // first revoke old event + fill_update_sender + .try_send(FillEventFilterMessage::SerumUpdate(SerumFillUpdate { + slot, + write_version, + event: old_events[idx], + status: FillUpdateStatus::Revoke, + market: mkt_pk_string.clone(), + queue: evq_pk_string.clone(), + })) + .unwrap(); // TODO: use anyhow to bubble up error + + // then publish new if its a fill and record in checkpoint + fill_update_sender + .try_send(FillEventFilterMessage::SerumUpdate(SerumFillUpdate { + slot, + write_version, + event: events[idx], + status: FillUpdateStatus::New, + market: mkt_pk_string.clone(), + queue: evq_pk_string.clone(), + })) + .unwrap(); // TODO: use anyhow to bubble up error + checkpoint.push(events[idx]); + } + } + } + _ => continue, + } + } + + // in case queue size shrunk due to a fork we need revoke all previous fills + for seq_num in header_seq_num..old_seq_num { + let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize; + let old_event_view = old_events[idx].as_view().unwrap(); + debug!( + "found dropped event {} idx {} seq_num {} header seq num {} old seq num {}", + mkt_pk_string, idx, seq_num, header_seq_num, old_seq_num + ); + + metric_events_drop.increment(); + + match old_event_view { + EventView::Fill { .. } => { + fill_update_sender + .try_send(FillEventFilterMessage::SerumUpdate(SerumFillUpdate { + slot, + event: old_events[idx], + write_version, + status: FillUpdateStatus::Revoke, + market: mkt_pk_string.clone(), + queue: evq_pk_string.clone(), + })) + .unwrap(); // TODO: use anyhow to bubble up error + } + EventView::Out { .. } => { continue } + } + } + + fill_update_sender + .try_send(FillEventFilterMessage::SerumCheckpoint( + SerumFillCheckpoint { + slot, + write_version, + events: checkpoint, + market: mkt_pk_string, + queue: evq_pk_string, + }, + )) + .unwrap() +} + pub async fn init( perp_queue_pks: Vec<(Pubkey, Pubkey)>, serum_queue_pks: Vec<(Pubkey, Pubkey)>, @@ -265,12 +476,13 @@ pub async fn init( let account_write_queue_receiver_c = account_write_queue_receiver.clone(); let mut chain_cache = ChainData::new(); - let mut events_cache: HashMap = HashMap::new(); + let mut perp_events_cache: HashMap = HashMap::new(); + let mut serum_events_cache: HashMap> = HashMap::new(); let mut seq_num_cache = HashMap::new(); let mut last_evq_versions = HashMap::::new(); - let relevant_pubkeys = [perp_queue_pks.clone(), serum_queue_pks.clone()] - .concat() + let all_queue_pks = [perp_queue_pks.clone(), serum_queue_pks.clone()].concat(); + let relevant_pubkeys = all_queue_pks .iter() .map(|m| m.1) .collect::>(); @@ -310,7 +522,7 @@ pub async fn init( } } - for mkt in perp_queue_pks.iter() { + for mkt in all_queue_pks.iter() { let last_evq_version = last_evq_versions.get(&mkt.1.to_string()).unwrap_or(&(0, 0)); let mkt_pk = mkt.1; @@ -326,34 +538,81 @@ pub async fn init( last_evq_versions.insert(evq_pk_string.clone(), evq_version); let account = &account_info.account; + let is_perp = mango_v4::check_id(account.owner()); + if is_perp { + let event_queue = + EventQueue::try_deserialize(account.data().borrow_mut()).unwrap(); + trace!( + "evq {} seq_num {}", + evq_pk_string, + event_queue.header.seq_num + ); + match seq_num_cache.get(&evq_pk_string) { + Some(old_seq_num) => match perp_events_cache.get(&evq_pk_string) { + Some(old_events) => publish_changes_perp( + account_info.slot, + account_info.write_version, + mkt, + &event_queue.header, + &event_queue.buf, + *old_seq_num, + old_events, + &fill_update_sender, + &mut metric_events_new, + &mut metric_events_change, + &mut metrics_events_drop, + ), + _ => { + info!("perp_events_cache could not find {}", evq_pk_string) + } + }, + _ => info!("seq_num_cache could not find {}", evq_pk_string), + } - let event_queue = - EventQueue::try_deserialize(account.data().borrow_mut()).unwrap(); - trace!("evq {} seq_num {}", evq_pk_string, event_queue.header.seq_num); + seq_num_cache + .insert(evq_pk_string.clone(), event_queue.header.seq_num.clone()); + perp_events_cache + .insert(evq_pk_string.clone(), event_queue.buf.clone()); + } else { + let inner_data = &account.data()[5..&account.data().len() - 7]; + let header_span = std::mem::size_of::(); + let header: SerumEventQueueHeader = + *bytemuck::from_bytes(&inner_data[..header_span]); + let seq_num = header.seq_num; + let count = header.count; + let rest = &inner_data[header_span..]; + let slop = rest.len() % std::mem::size_of::(); + let new_len = rest.len() - slop; + let events = &rest[..new_len]; + debug!("evq {} header_span {} header_seq_num {} header_count {} inner_len {} events_len {} sizeof Event {}", evq_pk_string, header_span, seq_num, count, inner_data.len(), events.len(), std::mem::size_of::()); + let events: &[serum_dex::state::Event] = bytemuck::cast_slice(&events); - match seq_num_cache.get(&evq_pk_string) { - Some(old_seq_num) => match events_cache.get(&evq_pk_string) { - Some(old_events) => publish_changes( - account_info.slot, - account_info.write_version, - mkt, - &event_queue.header, - &event_queue.buf, - *old_seq_num, - old_events, - &fill_update_sender, - &mut metric_events_new, - &mut metric_events_change, - &mut metrics_events_drop, - ), - _ => info!("events_cache could not find {}", evq_pk_string), - }, - _ => info!("seq_num_cache could not find {}", evq_pk_string), + match seq_num_cache.get(&evq_pk_string) { + Some(old_seq_num) => match serum_events_cache.get(&evq_pk_string) { + Some(old_events) => publish_changes_serum( + account_info.slot, + account_info.write_version, + mkt, + &header, + &events, + *old_seq_num, + old_events, + &fill_update_sender, + &mut metric_events_new, + &mut metric_events_change, + &mut metrics_events_drop, + ), + _ => { + info!("serum_events_cache could not find {}", evq_pk_string) + } + }, + _ => info!("seq_num_cache could not find {}", evq_pk_string), + } + + seq_num_cache.insert(evq_pk_string.clone(), seq_num.clone()); + serum_events_cache + .insert(evq_pk_string.clone(), events.clone().to_vec()); } - - seq_num_cache - .insert(evq_pk_string.clone(), event_queue.header.seq_num.clone()); - events_cache.insert(evq_pk_string.clone(), event_queue.buf.clone()); } Err(_) => info!("chain_cache could not find {}", mkt.1), } diff --git a/service-mango-fills/src/main.rs b/service-mango-fills/src/main.rs index 2e956df..5c4d542 100644 --- a/service-mango-fills/src/main.rs +++ b/service-mango-fills/src/main.rs @@ -13,17 +13,19 @@ use tokio::{ use tokio_tungstenite::tungstenite::{protocol::Message, Error}; use serde::Deserialize; -use solana_geyser_connector_lib::{metrics::{MetricType, MetricU64}, FilterConfig}; +use solana_geyser_connector_lib::{metrics::{MetricType, MetricU64}, FilterConfig, fill_event_filter::SerumFillCheckpoint}; use solana_geyser_connector_lib::{ fill_event_filter::{self, FillCheckpoint, FillEventFilterMessage}, grpc_plugin_source, metrics, websocket_source, MetricsConfig, SourceConfig, }; type CheckpointMap = Arc>>; +type SerumCheckpointMap = Arc>>; type PeerMap = Arc>>>; async fn handle_connection_error( checkpoint_map: CheckpointMap, + serum_checkpoint_map: SerumCheckpointMap, peer_map: PeerMap, raw_stream: TcpStream, addr: SocketAddr, @@ -32,7 +34,7 @@ async fn handle_connection_error( ) { metrics_opened_connections.clone().increment(); - let result = handle_connection(checkpoint_map, peer_map.clone(), raw_stream, addr).await; + let result = handle_connection(checkpoint_map, serum_checkpoint_map, peer_map.clone(), raw_stream, addr).await; if result.is_err() { error!("connection {} error {}", addr, result.unwrap_err()); }; @@ -44,6 +46,7 @@ async fn handle_connection_error( async fn handle_connection( checkpoint_map: CheckpointMap, + serum_checkpoint_map: SerumCheckpointMap, peer_map: PeerMap, raw_stream: TcpStream, addr: SocketAddr, @@ -59,6 +62,7 @@ async fn handle_connection( info!("ws published: {}", addr); } + // todo: add subscribe logic // 2: send initial checkpoint { let checkpoint_map_copy = checkpoint_map.lock().unwrap().clone(); @@ -170,9 +174,11 @@ async fn main() -> anyhow::Result<()> { fill_event_filter::init(perp_queue_pks.clone(), serum_queue_pks.clone(), metrics_tx.clone()).await?; let checkpoints = CheckpointMap::new(Mutex::new(HashMap::new())); + let serum_checkpoints = SerumCheckpointMap::new(Mutex::new(HashMap::new())); let peers = PeerMap::new(Mutex::new(HashMap::new())); let checkpoints_ref_thread = checkpoints.clone(); + let serum_checkpoints_ref_thread = serum_checkpoints.clone(); let peers_ref_thread = peers.clone(); // filleventfilter websocket sink @@ -202,6 +208,27 @@ async fn main() -> anyhow::Result<()> { .unwrap() .insert(checkpoint.queue.clone(), checkpoint); } + FillEventFilterMessage::SerumUpdate(update) => { + debug!("ws update {} {:?} serum fill", update.market, update.status); + let mut peer_copy = peers_ref_thread.lock().unwrap().clone(); + for (k, v) in peer_copy.iter_mut() { + trace!(" > {}", k); + let json = serde_json::to_string(&update); + let result = v.send(Message::Text(json.unwrap())).await; + if result.is_err() { + error!( + "ws update {} {:?} serum fill could not reach {}", + update.market, update.status, k + ); + } + } + } + FillEventFilterMessage::SerumCheckpoint(checkpoint) => { + serum_checkpoints_ref_thread + .lock() + .unwrap() + .insert(checkpoint.queue.clone(), checkpoint); + } } } }); @@ -214,6 +241,7 @@ async fn main() -> anyhow::Result<()> { while let Ok((stream, addr)) = listener.accept().await { tokio::spawn(handle_connection_error( checkpoints.clone(), + serum_checkpoints.clone(), peers.clone(), stream, addr, @@ -234,7 +262,10 @@ async fn main() -> anyhow::Result<()> { ); let use_geyser = true; let filter_config = FilterConfig { - program_ids: vec!["abc123".into()] + program_ids: vec![ + "4MangoMjqJ2firMokCjjGgoK8d4MXcrgL7XJaL3w6fVg".into(), + "srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX".into(), + ], }; if use_geyser { grpc_plugin_source::process_events(