From f88ec6a53ce084c3a7e1b687fa4f377fcd88f1ca Mon Sep 17 00:00:00 2001 From: Riordan Panayides Date: Fri, 20 Jan 2023 16:50:19 +0000 Subject: [PATCH] Add new common event schema --- Cargo.lock | 1 - lib/src/fill_event_filter.rs | 348 +++++++++++++++++++++-------------- 2 files changed, 209 insertions(+), 140 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c7bc001..0df8624 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5771,7 +5771,6 @@ dependencies = [ "futures-core", "futures-util", "itertools 0.10.5", - "jemallocator", "jsonrpc-core 18.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-core-client", "log 0.4.17", diff --git a/lib/src/fill_event_filter.rs b/lib/src/fill_event_filter.rs index b108166..5371ad6 100644 --- a/lib/src/fill_event_filter.rs +++ b/lib/src/fill_event_filter.rs @@ -1,12 +1,12 @@ use crate::{ chain_data::{AccountData, ChainData, SlotData}, metrics::{MetricType, Metrics}, - AccountWrite, SlotUpdate, + AccountWrite, SlotUpdate, orderbook_filter::OrderbookSide, }; -use bytemuck::{Pod, Zeroable}; +use bytemuck::{Pod, Zeroable, cast_slice}; use log::*; use serde::{ser::SerializeStruct, Serialize, Serializer}; -use serum_dex::state::EventView; +use serum_dex::state::EventView as SpotEvent; use solana_sdk::{ account::{ReadableAccount, WritableAccount}, clock::Epoch, @@ -15,13 +15,14 @@ use solana_sdk::{ use std::{ borrow::BorrowMut, cmp::max, - collections::{HashMap, HashSet}, + collections::{HashMap, HashSet}, convert::identity, time::SystemTime, }; use crate::metrics::MetricU64; use anchor_lang::AccountDeserialize; use mango_v4::state::{ - AnyEvent, EventQueue, EventQueueHeader, EventType, FillEvent, MAX_NUM_EVENTS, + AnyEvent, EventQueue, EventQueueHeader, EventType, FillEvent as PerpFillEvent, Side, + MAX_NUM_EVENTS, }; #[derive(Clone, Copy, Debug, Serialize)] @@ -30,19 +31,122 @@ pub enum FillUpdateStatus { Revoke, } -#[derive(Clone, Debug)] -pub struct FillUpdate { - pub event: FillEvent, - pub status: FillUpdateStatus, - pub market: String, - pub queue: String, - pub slot: u64, - pub write_version: u64, +#[derive(Clone, Copy, Debug, Serialize)] +pub enum FillEventType { + Spot, + Perp, } #[derive(Clone, Debug)] -pub struct SerumFillUpdate { - pub event: serum_dex::state::Event, +pub struct FillEvent { + pub event_type: FillEventType, + pub maker: bool, + pub side: OrderbookSide, + pub timestamp: u64, + pub seq_num: u64, + pub owner: String, + pub order_id: u128, + pub client_order_id: u64, + pub fee: f32, + pub price: i64, + pub quantity: i64, +} + +impl Serialize for FillEvent { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let mut state = serializer.serialize_struct("FillEvent", 12)?; + state.serialize_field("eventType", &self.event_type)?; + state.serialize_field("maker", &self.maker)?; + state.serialize_field("side", &self.side)?; + state.serialize_field("timestamp", &self.timestamp)?; + state.serialize_field("seqNum", &self.seq_num)?; + state.serialize_field("owner", &self.owner)?; + state.serialize_field("orderId", &self.order_id)?; + state.serialize_field("clientOrderId", &self.client_order_id)?; + state.serialize_field("fee", &self.fee)?; + state.serialize_field("price", &self.price)?; + state.serialize_field("quantity", &self.quantity)?; + state.end() + } +} + +impl FillEvent { + pub fn new_from_perp(event: PerpFillEvent) -> [Self; 2] { + let taker_side = match event.taker_side() { + Side::Ask => OrderbookSide::Ask, + Side::Bid => OrderbookSide::Bid, + }; + let maker_side = match event.taker_side() { + Side::Ask => OrderbookSide::Bid, + Side::Bid => OrderbookSide::Ask, + }; + [FillEvent { + event_type: FillEventType::Perp, + maker: true, + side: maker_side, + timestamp: event.timestamp, + seq_num: event.seq_num, + owner: event.maker.to_string(), + order_id: event.maker_order_id, + client_order_id: 0u64, + fee: event.maker_fee.to_num(), + price: event.price, + quantity: event.quantity, + }, + FillEvent { + event_type: FillEventType::Perp, + maker: false, + side: taker_side, + timestamp: event.timestamp, + seq_num: event.seq_num, + owner: event.taker.to_string(), + order_id: event.taker_order_id, + client_order_id: event.taker_client_order_id, + fee: event.taker_fee.to_num(), + price: event.price, + quantity: event.quantity, + + }] + } + pub fn new_from_spot(event: SpotEvent, timestamp: u64, seq_num: u64) -> Self { + match event { + SpotEvent::Fill { side, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, order_id, owner, client_order_id, .. } => { + let side = match side as u8 { + 0 => OrderbookSide::Bid, + 1 => OrderbookSide::Ask, + _ => panic!("invalid side"), + }; + let client_order_id: u64 = match client_order_id { + Some(id) => id.into(), + None => 0u64, + }; + // TODO: native to ui + let price = (native_qty_paid / native_qty_received) as i64; + FillEvent { + event_type: FillEventType::Spot, + maker: maker, + side, + timestamp, + seq_num, + owner: Pubkey::new(cast_slice(&identity(owner) as &[_])).to_string(), + order_id: order_id, + client_order_id: client_order_id, + fee: native_fee_or_rebate as f32, + price, + quantity: native_qty_received as i64, + } + } + SpotEvent::Out { .. } => { panic!("Can't build FillEvent from SpotEvent::Out")} + } + } +} + +#[derive(Clone, Debug)] +pub struct FillUpdate { + pub event: FillEvent, pub status: FillUpdateStatus, pub market: String, pub queue: String, @@ -66,27 +170,8 @@ impl Serialize for FillUpdate { where S: Serializer, { - let event = base64::encode(bytemuck::bytes_of(&self.event)); let mut state = serializer.serialize_struct("FillUpdate", 4)?; - state.serialize_field("event", &event)?; - state.serialize_field("market", &self.market)?; - state.serialize_field("queue", &self.queue)?; - state.serialize_field("status", &self.status)?; - state.serialize_field("slot", &self.slot)?; - state.serialize_field("write_version", &self.write_version)?; - - state.end() - } -} - -impl Serialize for SerumFillUpdate { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - let event = base64::encode(bytemuck::bytes_of(&self.event)); - let mut state = serializer.serialize_struct("SerumFillUpdate", 4)?; - state.serialize_field("event", &event)?; + state.serialize_field("event", &self.event)?; state.serialize_field("market", &self.market)?; state.serialize_field("queue", &self.queue)?; state.serialize_field("status", &self.status)?; @@ -106,48 +191,13 @@ pub struct FillCheckpoint { pub write_version: u64, } -#[derive(Clone, Debug)] -pub struct SerumFillCheckpoint { - pub market: String, - pub queue: String, - pub events: Vec, - pub slot: u64, - pub write_version: u64, -} - impl Serialize for FillCheckpoint { fn serialize(&self, serializer: S) -> Result where S: Serializer, { - let events: Vec = self - .events - .iter() - .map(|e| base64::encode(bytemuck::bytes_of(e))) - .collect(); let mut state = serializer.serialize_struct("FillCheckpoint", 3)?; - state.serialize_field("events", &events)?; - state.serialize_field("market", &self.market)?; - state.serialize_field("queue", &self.queue)?; - state.serialize_field("slot", &self.slot)?; - state.serialize_field("write_version", &self.write_version)?; - - state.end() - } -} - -impl Serialize for SerumFillCheckpoint { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - let events: Vec = self - .events - .iter() - .map(|e| base64::encode(bytemuck::bytes_of(e))) - .collect(); - let mut state = serializer.serialize_struct("SerumFillCheckpoint", 3)?; - state.serialize_field("events", &events)?; + state.serialize_field("events", &self.events)?; state.serialize_field("market", &self.market)?; state.serialize_field("queue", &self.queue)?; state.serialize_field("slot", &self.slot)?; @@ -159,9 +209,7 @@ impl Serialize for SerumFillCheckpoint { pub enum FillEventFilterMessage { Update(FillUpdate), - SerumUpdate(SerumFillUpdate), Checkpoint(FillCheckpoint), - SerumCheckpoint(SerumFillCheckpoint), } // couldn't compile the correct struct size / math on m1, fixed sizes resolve this issue @@ -205,18 +253,22 @@ fn publish_changes_perp( // new fills are published and recorded in checkpoint if events[idx].event_type == EventType::Fill as u8 { - let fill: FillEvent = bytemuck::cast(events[idx]); - fill_update_sender - .try_send(FillEventFilterMessage::Update(FillUpdate { - slot, - write_version, - event: fill, - status: FillUpdateStatus::New, - market: mkt_pk_string.clone(), - queue: evq_pk_string.clone(), - })) - .unwrap(); // TODO: use anyhow to bubble up error - checkpoint.push(fill); + let fill: PerpFillEvent = bytemuck::cast(events[idx]); + let fills = FillEvent::new_from_perp(fill); + // send event for both maker and taker + for fill in fills { + fill_update_sender + .try_send(FillEventFilterMessage::Update(FillUpdate { + slot, + write_version, + event: fill.clone(), + status: FillUpdateStatus::New, + market: mkt_pk_string.clone(), + queue: evq_pk_string.clone(), + })) + .unwrap(); // TODO: use anyhow to bubble up error + checkpoint.push(fill); + } } } else if old_events[idx].event_type != events[idx].event_type || old_events[idx].padding != events[idx].padding @@ -230,39 +282,48 @@ fn publish_changes_perp( // first revoke old event if a fill if old_events[idx].event_type == EventType::Fill as u8 { - let fill: FillEvent = bytemuck::cast(old_events[idx]); - fill_update_sender - .try_send(FillEventFilterMessage::Update(FillUpdate { - slot, - write_version, - event: fill, - status: FillUpdateStatus::Revoke, - market: mkt_pk_string.clone(), - queue: evq_pk_string.clone(), - })) - .unwrap(); // TODO: use anyhow to bubble up error + let fill: PerpFillEvent = bytemuck::cast(old_events[idx]); + let fills = FillEvent::new_from_perp(fill); + for fill in fills { + fill_update_sender + .try_send(FillEventFilterMessage::Update(FillUpdate { + slot, + write_version, + event: fill, + status: FillUpdateStatus::Revoke, + market: mkt_pk_string.clone(), + queue: evq_pk_string.clone(), + })) + .unwrap(); // TODO: use anyhow to bubble up error + } } // then publish new if its a fill and record in checkpoint if events[idx].event_type == EventType::Fill as u8 { - let fill: FillEvent = bytemuck::cast(events[idx]); - fill_update_sender - .try_send(FillEventFilterMessage::Update(FillUpdate { - slot, - write_version, - event: fill, - status: FillUpdateStatus::New, - market: mkt_pk_string.clone(), - queue: evq_pk_string.clone(), - })) - .unwrap(); // TODO: use anyhow to bubble up error - checkpoint.push(fill); + let fill: PerpFillEvent = bytemuck::cast(events[idx]); + let fills = FillEvent::new_from_perp(fill); + for fill in fills { + fill_update_sender + .try_send(FillEventFilterMessage::Update(FillUpdate { + slot, + write_version, + event: fill.clone(), + status: FillUpdateStatus::New, + market: mkt_pk_string.clone(), + queue: evq_pk_string.clone(), + })) + .unwrap(); // TODO: use anyhow to bubble up error + checkpoint.push(fill); + } } } else { // every already published event is recorded in checkpoint if a fill if events[idx].event_type == EventType::Fill as u8 { - let fill: FillEvent = bytemuck::cast(events[idx]); - checkpoint.push(fill); + let fill: PerpFillEvent = bytemuck::cast(events[idx]); + let fills = FillEvent::new_from_perp(fill); + for fill in fills { + checkpoint.push(fill); + } } } } @@ -278,17 +339,20 @@ fn publish_changes_perp( metric_events_drop.increment(); if old_events[idx].event_type == EventType::Fill as u8 { - let fill: FillEvent = bytemuck::cast(old_events[idx]); - fill_update_sender - .try_send(FillEventFilterMessage::Update(FillUpdate { - slot, - event: fill, - write_version, - status: FillUpdateStatus::Revoke, - market: mkt_pk_string.clone(), - queue: evq_pk_string.clone(), - })) - .unwrap(); // TODO: use anyhow to bubble up error + let fill: PerpFillEvent = bytemuck::cast(old_events[idx]); + let fills = FillEvent::new_from_perp(fill); + for fill in fills { + fill_update_sender + .try_send(FillEventFilterMessage::Update(FillUpdate { + slot, + event: fill, + write_version, + status: FillUpdateStatus::Revoke, + market: mkt_pk_string.clone(), + queue: evq_pk_string.clone(), + })) + .unwrap(); // TODO: use anyhow to bubble up error + } } } @@ -325,42 +389,46 @@ fn publish_changes_serum( let evq_pk_string = mkt.1.to_string(); let header_seq_num = header.seq_num; debug!("start seq {} header seq {}", start_seq_num, header_seq_num); + + // Timestamp for spot events is time scraped + let timestamp = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs(); for seq_num in start_seq_num..header_seq_num { let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize; let event_view = events[idx].as_view().unwrap(); let old_event_view = old_events[idx].as_view().unwrap(); match event_view { - EventView::Fill { .. } => { + SpotEvent::Fill { .. } => { // there are three possible cases: // 1) the event is past the old seq num, hence guaranteed new event // 2) the event is not matching the old event queue // 3) all other events are matching the old event queue // the order of these checks is important so they are exhaustive + let fill = FillEvent::new_from_spot(event_view, timestamp, seq_num); if seq_num >= old_seq_num { debug!("found new serum fill {} idx {}", mkt_pk_string, idx,); metric_events_new.increment(); fill_update_sender - .try_send(FillEventFilterMessage::SerumUpdate(SerumFillUpdate { + .try_send(FillEventFilterMessage::Update(FillUpdate { slot, write_version, - event: events[idx], + event: fill.clone(), status: FillUpdateStatus::New, market: mkt_pk_string.clone(), queue: evq_pk_string.clone(), })) .unwrap(); // TODO: use anyhow to bubble up error - checkpoint.push(events[idx]); + checkpoint.push(fill); continue; } match old_event_view { - EventView::Fill { .. } => { + SpotEvent::Fill { .. } => { // every already published event is recorded in checkpoint - checkpoint.push(events[idx]); + checkpoint.push(fill); } - EventView::Out { .. } => { + SpotEvent::Out { .. } => { debug!( "found changed event {} idx {} seq_num {} header seq num {} old seq num {}", mkt_pk_string, idx, seq_num, header_seq_num, old_seq_num @@ -368,12 +436,13 @@ fn publish_changes_serum( metric_events_change.increment(); + let old_fill = FillEvent::new_from_spot(old_event_view, timestamp, seq_num); // first revoke old event fill_update_sender - .try_send(FillEventFilterMessage::SerumUpdate(SerumFillUpdate { + .try_send(FillEventFilterMessage::Update(FillUpdate { slot, write_version, - event: old_events[idx], + event: old_fill, status: FillUpdateStatus::Revoke, market: mkt_pk_string.clone(), queue: evq_pk_string.clone(), @@ -382,16 +451,16 @@ fn publish_changes_serum( // then publish new if its a fill and record in checkpoint fill_update_sender - .try_send(FillEventFilterMessage::SerumUpdate(SerumFillUpdate { + .try_send(FillEventFilterMessage::Update(FillUpdate { slot, write_version, - event: events[idx], + event: fill.clone(), status: FillUpdateStatus::New, market: mkt_pk_string.clone(), queue: evq_pk_string.clone(), })) .unwrap(); // TODO: use anyhow to bubble up error - checkpoint.push(events[idx]); + checkpoint.push(fill); } } } @@ -411,11 +480,12 @@ fn publish_changes_serum( metric_events_drop.increment(); match old_event_view { - EventView::Fill { .. } => { + SpotEvent::Fill { .. } => { + let old_fill = FillEvent::new_from_spot(old_event_view, timestamp, seq_num); fill_update_sender - .try_send(FillEventFilterMessage::SerumUpdate(SerumFillUpdate { + .try_send(FillEventFilterMessage::Update(FillUpdate { slot, - event: old_events[idx], + event: old_fill, write_version, status: FillUpdateStatus::Revoke, market: mkt_pk_string.clone(), @@ -423,13 +493,13 @@ fn publish_changes_serum( })) .unwrap(); // TODO: use anyhow to bubble up error } - EventView::Out { .. } => continue, + SpotEvent::Out { .. } => continue, } } fill_update_sender - .try_send(FillEventFilterMessage::SerumCheckpoint( - SerumFillCheckpoint { + .try_send(FillEventFilterMessage::Checkpoint( + FillCheckpoint { slot, write_version, events: checkpoint,