2022-03-28 10:58:14 -07:00
|
|
|
use crate::{
|
|
|
|
chain_data::{AccountData, ChainData, SlotData},
|
2022-10-07 03:46:19 -07:00
|
|
|
metrics::{MetricType, Metrics},
|
2023-02-03 03:39:53 -08:00
|
|
|
orderbook_filter::{base_lots_to_ui_perp, price_lots_to_ui_perp, MarketConfig, OrderbookSide},
|
|
|
|
AccountWrite, SlotUpdate,
|
2022-03-28 10:58:14 -07:00
|
|
|
};
|
2023-02-03 03:39:53 -08:00
|
|
|
use bytemuck::{cast_slice, Pod, Zeroable};
|
|
|
|
use chrono::{Utc, TimeZone};
|
2022-03-28 10:58:14 -07:00
|
|
|
use log::*;
|
2022-12-16 03:03:21 -08:00
|
|
|
use serde::{ser::SerializeStruct, Serialize, Serializer};
|
2023-01-20 08:50:19 -08:00
|
|
|
use serum_dex::state::EventView as SpotEvent;
|
2022-03-28 10:58:14 -07:00
|
|
|
use solana_sdk::{
|
|
|
|
account::{ReadableAccount, WritableAccount},
|
|
|
|
clock::Epoch,
|
|
|
|
pubkey::Pubkey,
|
|
|
|
};
|
|
|
|
use std::{
|
2022-11-16 06:57:50 -08:00
|
|
|
borrow::BorrowMut,
|
2022-03-28 10:58:14 -07:00
|
|
|
cmp::max,
|
2023-02-03 03:39:53 -08:00
|
|
|
collections::{HashMap, HashSet},
|
|
|
|
convert::identity,
|
|
|
|
time::SystemTime,
|
2022-03-28 10:58:14 -07:00
|
|
|
};
|
|
|
|
|
2022-10-05 15:53:20 -07:00
|
|
|
use crate::metrics::MetricU64;
|
2022-11-16 06:57:50 -08:00
|
|
|
use anchor_lang::AccountDeserialize;
|
|
|
|
use mango_v4::state::{
|
2023-01-20 08:50:19 -08:00
|
|
|
AnyEvent, EventQueue, EventQueueHeader, EventType, FillEvent as PerpFillEvent, Side,
|
|
|
|
MAX_NUM_EVENTS,
|
2022-11-16 06:57:50 -08:00
|
|
|
};
|
2022-03-28 10:58:14 -07:00
|
|
|
|
2023-02-03 03:39:53 -08:00
|
|
|
/// Lifecycle status of a fill update sent to downstream consumers:
/// a fill is first published as `New`, and later `Revoke`d if the event
/// queue slot it came from changes or shrinks (e.g. on a fork).
#[derive(Clone, Copy, Debug)]
pub enum FillUpdateStatus {
    /// A newly observed fill.
    New,
    /// A previously published fill that is no longer valid and must be undone.
    Revoke,
}
|
|
|
|
|
2023-02-03 03:39:53 -08:00
|
|
|
impl Serialize for FillUpdateStatus {
|
|
|
|
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
|
|
|
where
|
|
|
|
S: Serializer,
|
|
|
|
{
|
|
|
|
match *self {
|
|
|
|
FillUpdateStatus::New => {
|
|
|
|
serializer.serialize_unit_variant("FillUpdateStatus", 0, "new")
|
|
|
|
}
|
|
|
|
FillUpdateStatus::Revoke => {
|
|
|
|
serializer.serialize_unit_variant("FillUpdateStatus", 1, "revoke")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Which kind of market a fill originated from.
#[derive(Clone, Copy, Debug)]
pub enum FillEventType {
    /// A serum (openbook) spot-market fill.
    Spot,
    /// A mango-v4 perp-market fill.
    Perp,
}
|
|
|
|
|
2023-02-03 03:39:53 -08:00
|
|
|
impl Serialize for FillEventType {
|
|
|
|
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
|
|
|
where
|
|
|
|
S: Serializer,
|
|
|
|
{
|
|
|
|
match *self {
|
|
|
|
FillEventType::Spot => serializer.serialize_unit_variant("FillEventType", 0, "spot"),
|
|
|
|
FillEventType::Perp => serializer.serialize_unit_variant("FillEventType", 1, "perp"),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-03-28 10:58:14 -07:00
|
|
|
/// A normalized fill, built either from a mango-v4 perp `FillEvent`
/// (see `new_from_perp`) or a serum spot `EventView::Fill` (see
/// `new_from_spot`), with lot/native amounts already converted to UI units.
#[derive(Clone, Debug)]
pub struct FillEvent {
    // Spot vs. perp origin of this fill.
    pub event_type: FillEventType,
    // True when this event describes the maker side of the trade.
    pub maker: bool,
    // Side of the book this party's order was on.
    pub side: OrderbookSide,
    // Unix seconds. For perp fills this is the on-chain event timestamp;
    // for spot fills it is the scrape time supplied by the caller.
    pub timestamp: u64,
    // Event-queue sequence number the fill was read from.
    pub seq_num: u64,
    // Base58 pubkey string of the account that owns the order.
    pub owner: String,
    // Order id; 0-padded u128 for spot, maker/taker order id for perp.
    pub order_id: u128,
    // Client-assigned order id; 0 when absent (spot) or for perp makers.
    pub client_order_id: u64,
    // Fee (or rebate) in UI quote units.
    pub fee: f32,
    // Fill price in UI units.
    pub price: f64,
    // Fill quantity in UI base units.
    pub quantity: f64,
}
|
|
|
|
|
|
|
|
impl Serialize for FillEvent {
|
|
|
|
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
|
|
|
where
|
|
|
|
S: Serializer,
|
|
|
|
{
|
|
|
|
let mut state = serializer.serialize_struct("FillEvent", 12)?;
|
|
|
|
state.serialize_field("eventType", &self.event_type)?;
|
|
|
|
state.serialize_field("maker", &self.maker)?;
|
|
|
|
state.serialize_field("side", &self.side)?;
|
2023-02-03 03:39:53 -08:00
|
|
|
state.serialize_field("timestamp", &Utc.timestamp_opt(self.timestamp as i64, 0).unwrap().to_rfc3339())?;
|
2023-01-20 08:50:19 -08:00
|
|
|
state.serialize_field("seqNum", &self.seq_num)?;
|
|
|
|
state.serialize_field("owner", &self.owner)?;
|
|
|
|
state.serialize_field("orderId", &self.order_id)?;
|
|
|
|
state.serialize_field("clientOrderId", &self.client_order_id)?;
|
|
|
|
state.serialize_field("fee", &self.fee)?;
|
|
|
|
state.serialize_field("price", &self.price)?;
|
|
|
|
state.serialize_field("quantity", &self.quantity)?;
|
|
|
|
state.end()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl FillEvent {
    /// Builds the two `FillEvent`s (maker first, then taker) described by a
    /// single mango-v4 perp fill, converting lot prices/quantities into UI
    /// units via the market config.
    pub fn new_from_perp(event: PerpFillEvent, config: &MarketConfig) -> [Self; 2] {
        let taker_side = match event.taker_side() {
            Side::Ask => OrderbookSide::Ask,
            Side::Bid => OrderbookSide::Bid,
        };
        // The maker is on the opposite side of the taker.
        let maker_side = match event.taker_side() {
            Side::Ask => OrderbookSide::Bid,
            Side::Bid => OrderbookSide::Ask,
        };
        // Price and quantity are shared between the maker and taker events.
        let price = price_lots_to_ui_perp(
            event.price,
            config.base_decimals,
            config.quote_decimals,
            config.base_lot_size,
            config.quote_lot_size,
        );
        let quantity =
            base_lots_to_ui_perp(event.quantity, config.base_decimals, config.quote_decimals);
        [
            // Maker leg. Perp fills carry no maker client order id, so it is 0.
            FillEvent {
                event_type: FillEventType::Perp,
                maker: true,
                side: maker_side,
                timestamp: event.timestamp,
                seq_num: event.seq_num,
                owner: event.maker.to_string(),
                order_id: event.maker_order_id,
                client_order_id: 0u64,
                fee: event.maker_fee.to_num(),
                price: price,
                quantity: quantity,
            },
            // Taker leg.
            FillEvent {
                event_type: FillEventType::Perp,
                maker: false,
                side: taker_side,
                timestamp: event.timestamp,
                seq_num: event.seq_num,
                owner: event.taker.to_string(),
                order_id: event.taker_order_id,
                client_order_id: event.taker_client_order_id,
                fee: event.taker_fee.to_num(),
                price: price,
                quantity: quantity,
            },
        ]
    }

    /// Builds a `FillEvent` from a serum spot fill.
    ///
    /// `timestamp` and `seq_num` are supplied by the caller because serum
    /// events do not carry them. Price is reconstructed from the native
    /// quantities paid/received, undoing the fee/rebate depending on which
    /// side this party was on.
    ///
    /// Panics on `SpotEvent::Out` and on an out-of-range `side` byte.
    pub fn new_from_spot(
        event: SpotEvent,
        timestamp: u64,
        seq_num: u64,
        config: &MarketConfig,
    ) -> Self {
        match event {
            SpotEvent::Fill {
                side,
                maker,
                native_qty_paid,
                native_qty_received,
                native_fee_or_rebate,
                order_id,
                owner,
                client_order_id,
                ..
            } => {
                // serum encodes side as a raw byte: 0 = bid, 1 = ask.
                let side = match side as u8 {
                    0 => OrderbookSide::Bid,
                    1 => OrderbookSide::Ask,
                    _ => panic!("invalid side"),
                };
                let client_order_id: u64 = match client_order_id {
                    Some(id) => id.into(),
                    None => 0u64,
                };

                // Scale factors from native units to UI units.
                let base_multiplier = 10u64.pow(config.base_decimals.into()) as u64;
                let quote_multiplier = 10u64.pow(config.quote_decimals.into()) as u64;

                // Reconstruct the pre-fee price. Makers receive a rebate
                // (add it back on the paid side / subtract on the received
                // side); takers pay a fee (the inverse).
                // NOTE(review): divides by native_qty_received / native_qty_paid
                // — assumes serum never emits a zero-quantity fill; confirm.
                let (price, quantity) = match side {
                    OrderbookSide::Bid => {
                        let price_before_fees = if maker {
                            native_qty_paid + native_fee_or_rebate
                        } else {
                            native_qty_paid - native_fee_or_rebate
                        };

                        let top = price_before_fees * base_multiplier;
                        let bottom = quote_multiplier * native_qty_received;
                        let price = top as f64 / bottom as f64;
                        let quantity = native_qty_received as f64 / base_multiplier as f64;
                        (price, quantity)
                    }
                    OrderbookSide::Ask => {
                        let price_before_fees = if maker {
                            native_qty_received - native_fee_or_rebate
                        } else {
                            native_qty_received + native_fee_or_rebate
                        };

                        let top = price_before_fees * base_multiplier;
                        let bottom = quote_multiplier * native_qty_paid;
                        let price = top as f64 / bottom as f64;
                        let quantity = native_qty_paid as f64 / base_multiplier as f64;
                        (price, quantity)
                    }
                };

                // Fee is always denominated in native quote units.
                let fee =
                    native_fee_or_rebate as f32 / quote_multiplier as f32;

                FillEvent {
                    event_type: FillEventType::Spot,
                    maker: maker,
                    side,
                    timestamp,
                    seq_num,
                    // serum stores the owner as a [u64; 4]; reinterpret the
                    // bytes as a Pubkey for display.
                    owner: Pubkey::new(cast_slice(&identity(owner) as &[_])).to_string(),
                    order_id: order_id,
                    client_order_id: client_order_id,
                    fee,
                    price,
                    quantity,
                }
            }
            SpotEvent::Out { .. } => {
                panic!("Can't build FillEvent from SpotEvent::Out")
            }
        }
    }
}
|
|
|
|
|
2022-12-24 06:43:43 -08:00
|
|
|
/// A single fill state change (publish or revoke) for one market, tagged
/// with the chain position it was observed at.
#[derive(Clone, Debug)]
pub struct FillUpdate {
    // The fill being published or revoked.
    pub event: FillEvent,
    // New vs. Revoke.
    pub status: FillUpdateStatus,
    // Base58 market account pubkey string.
    pub market_key: String,
    // Human-readable market name from the market config.
    pub market_name: String,
    // Slot of the account write this update was derived from.
    pub slot: u64,
    // Write version of the account write this update was derived from.
    pub write_version: u64,
}
|
|
|
|
|
|
|
|
/// Raw on-chain layout of a serum event-queue account header, used to read
/// `count` and `seq_num` straight out of account data via `cast_slice`.
/// `#[repr(packed)]` matches serum's byte layout exactly.
#[derive(Copy, Clone, Debug)]
#[repr(packed)]
pub struct SerumEventQueueHeader {
    _account_flags: u64, // Initialized, EventQueue
    _head: u64,
    // Number of events currently in the queue.
    count: u64,
    // Monotonic sequence number of the next event to be written.
    seq_num: u64,
}
// SAFETY: every field is a u64 (all bit patterns valid) and the struct is
// #[repr(packed)], so it has no padding bytes — the Pod/Zeroable contracts
// (any byte sequence of the right length is a valid value) hold.
unsafe impl Zeroable for SerumEventQueueHeader {}
unsafe impl Pod for SerumEventQueueHeader {}
|
|
|
|
|
2022-03-28 10:58:14 -07:00
|
|
|
impl Serialize for FillUpdate {
|
|
|
|
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
|
|
|
where
|
|
|
|
S: Serializer,
|
|
|
|
{
|
2023-02-03 03:39:53 -08:00
|
|
|
let mut state = serializer.serialize_struct("FillUpdate", 6)?;
|
2023-01-20 08:50:19 -08:00
|
|
|
state.serialize_field("event", &self.event)?;
|
2023-02-03 03:39:53 -08:00
|
|
|
state.serialize_field("marketKey", &self.market_key)?;
|
|
|
|
state.serialize_field("marketName", &self.market_name)?;
|
2022-12-24 06:43:43 -08:00
|
|
|
state.serialize_field("status", &self.status)?;
|
|
|
|
state.serialize_field("slot", &self.slot)?;
|
2023-02-03 03:39:53 -08:00
|
|
|
state.serialize_field("writeVersion", &self.write_version)?;
|
2022-12-24 06:43:43 -08:00
|
|
|
|
|
|
|
state.end()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-03-28 10:58:14 -07:00
|
|
|
/// Full snapshot of all currently-valid fills for one market's event queue,
/// sent so that newly connected clients can catch up without replaying
/// individual updates.
#[derive(Clone, Debug)]
pub struct FillCheckpoint {
    // Base58 market account pubkey string.
    pub market: String,
    // Base58 event-queue account pubkey string.
    pub queue: String,
    // All fills still present in the queue at this point.
    pub events: Vec<FillEvent>,
    // Slot of the account write this snapshot was derived from.
    pub slot: u64,
    // Write version of the account write this snapshot was derived from.
    pub write_version: u64,
}
|
|
|
|
|
|
|
|
impl Serialize for FillCheckpoint {
|
|
|
|
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
|
|
|
where
|
|
|
|
S: Serializer,
|
|
|
|
{
|
2022-12-24 06:43:43 -08:00
|
|
|
let mut state = serializer.serialize_struct("FillCheckpoint", 3)?;
|
2023-01-20 08:50:19 -08:00
|
|
|
state.serialize_field("events", &self.events)?;
|
2022-03-28 10:58:14 -07:00
|
|
|
state.serialize_field("market", &self.market)?;
|
|
|
|
state.serialize_field("queue", &self.queue)?;
|
2022-04-07 20:14:39 -07:00
|
|
|
state.serialize_field("slot", &self.slot)?;
|
|
|
|
state.serialize_field("write_version", &self.write_version)?;
|
|
|
|
|
2022-03-28 10:58:14 -07:00
|
|
|
state.end()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Messages emitted by the fill filter to downstream consumers: incremental
/// updates plus periodic full checkpoints.
pub enum FillEventFilterMessage {
    /// A single fill published or revoked.
    Update(FillUpdate),
    /// A full snapshot of a market's currently-valid fills.
    Checkpoint(FillCheckpoint),
}
|
|
|
|
|
|
|
|
// couldn't compile the correct struct size / math on m1, fixed sizes resolve this issue
/// Fixed-size buffer holding one `AnyEvent` per perp event-queue slot.
type EventQueueEvents = [AnyEvent; MAX_NUM_EVENTS as usize];
|
2022-03-28 10:58:14 -07:00
|
|
|
|
2022-12-24 06:43:43 -08:00
|
|
|
/// Diffs the new perp event-queue state against the previously seen state and
/// publishes the resulting fill updates plus a full checkpoint.
///
/// For every queue slot between the window start and the new `seq_num`:
/// brand-new events are published as `New`; slots whose contents changed get
/// a `Revoke` for the old fill followed by a `New` for the replacement;
/// unchanged fills are only recorded into the checkpoint. If the sequence
/// number went backwards (fork), all fills past the new head are revoked.
/// Every send uses `try_send(..).unwrap()`, so a full/closed channel panics.
fn publish_changes_perp(
    slot: u64,
    write_version: u64,
    // (market pubkey, market config) for the queue being processed.
    mkt: &(Pubkey, MarketConfig),
    header: &EventQueueHeader,
    events: &EventQueueEvents,
    // Sequence number observed on the previous pass.
    old_seq_num: u64,
    // Queue contents observed on the previous pass.
    old_events: &EventQueueEvents,
    fill_update_sender: &async_channel::Sender<FillEventFilterMessage>,
    metric_events_new: &mut MetricU64,
    metric_events_change: &mut MetricU64,
    metric_events_drop: &mut MetricU64,
) {
    // seq_num = N means that events (N-QUEUE_LEN) until N-1 are available
    let start_seq_num = max(old_seq_num, header.seq_num)
        .checked_sub(MAX_NUM_EVENTS as u64)
        .unwrap_or(0);
    let mut checkpoint = Vec::new();
    let mkt_pk_string = mkt.0.to_string();
    let evq_pk_string = mkt.1.event_queue.to_string();
    for seq_num in start_seq_num..header.seq_num {
        // The queue is a ring buffer; map the sequence number to its slot.
        let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize;

        // there are three possible cases:
        // 1) the event is past the old seq num, hence guaranteed new event
        // 2) the event is not matching the old event queue
        // 3) all other events are matching the old event queue
        // the order of these checks is important so they are exhaustive
        if seq_num >= old_seq_num {
            debug!(
                "found new event {} idx {} type {} slot {} write_version {}",
                mkt_pk_string, idx, events[idx].event_type as u32, slot, write_version
            );

            metric_events_new.increment();

            // new fills are published and recorded in checkpoint
            if events[idx].event_type == EventType::Fill as u8 {
                let fill: PerpFillEvent = bytemuck::cast(events[idx]);
                let fills = FillEvent::new_from_perp(fill, &mkt.1);
                // send event for both maker and taker
                for fill in fills {
                    fill_update_sender
                        .try_send(FillEventFilterMessage::Update(FillUpdate {
                            slot,
                            write_version,
                            event: fill.clone(),
                            status: FillUpdateStatus::New,
                            market_key: mkt_pk_string.clone(),
                            market_name: mkt.1.name.clone(),
                        }))
                        .unwrap(); // TODO: use anyhow to bubble up error
                    checkpoint.push(fill);
                }
            }
        } else if old_events[idx].event_type != events[idx].event_type
            || old_events[idx].padding != events[idx].padding
        {
            // Slot content changed in place: compare type and raw payload
            // bytes (padding) to detect the overwrite.
            debug!(
                "found changed event {} idx {} seq_num {} header seq num {} old seq num {}",
                mkt_pk_string, idx, seq_num, header.seq_num, old_seq_num
            );

            metric_events_change.increment();

            // first revoke old event if a fill
            if old_events[idx].event_type == EventType::Fill as u8 {
                let fill: PerpFillEvent = bytemuck::cast(old_events[idx]);
                let fills = FillEvent::new_from_perp(fill, &mkt.1);
                for fill in fills {
                    fill_update_sender
                        .try_send(FillEventFilterMessage::Update(FillUpdate {
                            slot,
                            write_version,
                            event: fill,
                            status: FillUpdateStatus::Revoke,
                            market_key: mkt_pk_string.clone(),
                            market_name: mkt.1.name.clone(),
                        }))
                        .unwrap(); // TODO: use anyhow to bubble up error
                }
            }

            // then publish new if its a fill and record in checkpoint
            if events[idx].event_type == EventType::Fill as u8 {
                let fill: PerpFillEvent = bytemuck::cast(events[idx]);
                let fills = FillEvent::new_from_perp(fill, &mkt.1);
                for fill in fills {
                    fill_update_sender
                        .try_send(FillEventFilterMessage::Update(FillUpdate {
                            slot,
                            write_version,
                            event: fill.clone(),
                            status: FillUpdateStatus::New,
                            market_key: mkt_pk_string.clone(),
                            market_name: mkt.1.name.clone(),
                        }))
                        .unwrap(); // TODO: use anyhow to bubble up error
                    checkpoint.push(fill);
                }
            }
        } else {
            // every already published event is recorded in checkpoint if a fill
            if events[idx].event_type == EventType::Fill as u8 {
                let fill: PerpFillEvent = bytemuck::cast(events[idx]);
                let fills = FillEvent::new_from_perp(fill, &mkt.1);
                for fill in fills {
                    checkpoint.push(fill);
                }
            }
        }
    }

    // in case queue size shrunk due to a fork we need revoke all previous fills
    for seq_num in header.seq_num..old_seq_num {
        let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize;
        debug!(
            "found dropped event {} idx {} seq_num {} header seq num {} old seq num {} slot {} write_version {}",
            mkt_pk_string, idx, seq_num, header.seq_num, old_seq_num, slot, write_version
        );

        metric_events_drop.increment();

        if old_events[idx].event_type == EventType::Fill as u8 {
            let fill: PerpFillEvent = bytemuck::cast(old_events[idx]);
            let fills = FillEvent::new_from_perp(fill, &mkt.1);
            for fill in fills {
                fill_update_sender
                    .try_send(FillEventFilterMessage::Update(FillUpdate {
                        slot,
                        event: fill,
                        write_version,
                        status: FillUpdateStatus::Revoke,
                        market_key: mkt_pk_string.clone(),
                        market_name: mkt.1.name.clone(),
                    }))
                    .unwrap(); // TODO: use anyhow to bubble up error
            }
        }
    }

    // Finally emit a full checkpoint of everything still valid in the queue.
    fill_update_sender
        .try_send(FillEventFilterMessage::Checkpoint(FillCheckpoint {
            slot,
            write_version,
            events: checkpoint,
            market: mkt_pk_string,
            queue: evq_pk_string,
        }))
        .unwrap()
}
|
|
|
|
|
2022-12-24 06:43:43 -08:00
|
|
|
/// Serum-spot counterpart of `publish_changes_perp`: diffs the new serum
/// event-queue state against the previously seen state and publishes fill
/// updates plus a full checkpoint.
///
/// Differences from the perp path: events are decoded via `as_view()`,
/// non-fill events (`SpotEvent::Out`) are skipped entirely, changed slots are
/// detected by comparing order ids (or a Fill replacing an Out), and the fill
/// timestamp is the scrape time since serum events carry none.
/// Every send uses `try_send(..).unwrap()`, so a full/closed channel panics.
// NOTE(review): the ring-buffer math reuses mango's MAX_NUM_EVENTS for the
// serum queue — assumes both queues have the same capacity; confirm.
fn publish_changes_serum(
    slot: u64,
    write_version: u64,
    // (market pubkey, market config) for the queue being processed.
    mkt: &(Pubkey, MarketConfig),
    header: &SerumEventQueueHeader,
    events: &[serum_dex::state::Event],
    // Sequence number observed on the previous pass.
    old_seq_num: u64,
    // Queue contents observed on the previous pass.
    old_events: &[serum_dex::state::Event],
    fill_update_sender: &async_channel::Sender<FillEventFilterMessage>,
    metric_events_new: &mut MetricU64,
    metric_events_change: &mut MetricU64,
    metric_events_drop: &mut MetricU64,
) {
    // seq_num = N means that events (N-QUEUE_LEN) until N-1 are available
    let start_seq_num = max(old_seq_num, header.seq_num)
        .checked_sub(MAX_NUM_EVENTS as u64)
        .unwrap_or(0);
    let mut checkpoint = Vec::new();
    let mkt_pk_string = mkt.0.to_string();
    let evq_pk_string = mkt.1.event_queue.to_string();
    // Copy out of the packed header before use (avoids unaligned reference).
    let header_seq_num = header.seq_num;
    debug!("start seq {} header seq {}", start_seq_num, header_seq_num);

    // Timestamp for spot events is time scraped
    let timestamp = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .unwrap()
        .as_secs();
    for seq_num in start_seq_num..header_seq_num {
        // The queue is a ring buffer; map the sequence number to its slot.
        let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize;
        let event_view = events[idx].as_view().unwrap();
        let old_event_view = old_events[idx].as_view().unwrap();

        match event_view {
            SpotEvent::Fill { .. } => {
                // there are three possible cases:
                // 1) the event is past the old seq num, hence guaranteed new event
                // 2) the event is not matching the old event queue
                // 3) all other events are matching the old event queue
                // the order of these checks is important so they are exhaustive
                let fill = FillEvent::new_from_spot(event_view, timestamp, seq_num, &mkt.1);
                if seq_num >= old_seq_num {
                    debug!("found new serum fill {} idx {}", mkt_pk_string, idx,);

                    metric_events_new.increment();
                    fill_update_sender
                        .try_send(FillEventFilterMessage::Update(FillUpdate {
                            slot,
                            write_version,
                            event: fill.clone(),
                            status: FillUpdateStatus::New,
                            market_key: mkt_pk_string.clone(),
                            market_name: mkt.1.name.clone(),
                        }))
                        .unwrap(); // TODO: use anyhow to bubble up error
                    checkpoint.push(fill);
                    continue;
                }

                // Slot within the already-seen window: check for in-place change.
                match old_event_view {
                    SpotEvent::Fill { order_id, .. } => {
                        // Same slot, different order id means it was overwritten.
                        if order_id != fill.order_id {
                            debug!(
                                "found changed id event {} idx {} seq_num {} header seq num {} old seq num {}",
                                mkt_pk_string, idx, seq_num, header_seq_num, old_seq_num
                            );

                            metric_events_change.increment();

                            let old_fill = FillEvent::new_from_spot(
                                old_event_view,
                                timestamp,
                                seq_num,
                                &mkt.1,
                            );
                            // first revoke old event
                            fill_update_sender
                                .try_send(FillEventFilterMessage::Update(FillUpdate {
                                    slot,
                                    write_version,
                                    event: old_fill,
                                    status: FillUpdateStatus::Revoke,
                                    market_key: mkt_pk_string.clone(),
                                    market_name: mkt.1.name.clone(),
                                }))
                                .unwrap(); // TODO: use anyhow to bubble up error

                            // then publish new
                            fill_update_sender
                                .try_send(FillEventFilterMessage::Update(FillUpdate {
                                    slot,
                                    write_version,
                                    event: fill.clone(),
                                    status: FillUpdateStatus::New,
                                    market_key: mkt_pk_string.clone(),
                                    market_name: mkt.1.name.clone(),
                                }))
                                .unwrap(); // TODO: use anyhow to bubble up error
                        }

                        // record new event in checkpoint
                        checkpoint.push(fill);
                    }
                    SpotEvent::Out { .. } => {
                        // An Out was replaced by a Fill: publish the fill.
                        debug!(
                            "found changed type event {} idx {} seq_num {} header seq num {} old seq num {}",
                            mkt_pk_string, idx, seq_num, header_seq_num, old_seq_num
                        );

                        metric_events_change.increment();

                        // publish new fill and record in checkpoint
                        fill_update_sender
                            .try_send(FillEventFilterMessage::Update(FillUpdate {
                                slot,
                                write_version,
                                event: fill.clone(),
                                status: FillUpdateStatus::New,
                                market_key: mkt_pk_string.clone(),
                                market_name: mkt.1.name.clone(),
                            }))
                            .unwrap(); // TODO: use anyhow to bubble up error
                        checkpoint.push(fill);
                    }
                }
            }
            // Non-fill events carry nothing to publish.
            _ => continue,
        }
    }

    // in case queue size shrunk due to a fork we need revoke all previous fills
    for seq_num in header_seq_num..old_seq_num {
        let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize;
        let old_event_view = old_events[idx].as_view().unwrap();
        debug!(
            "found dropped event {} idx {} seq_num {} header seq num {} old seq num {}",
            mkt_pk_string, idx, seq_num, header_seq_num, old_seq_num
        );

        metric_events_drop.increment();

        match old_event_view {
            SpotEvent::Fill { .. } => {
                let old_fill = FillEvent::new_from_spot(old_event_view, timestamp, seq_num, &mkt.1);
                fill_update_sender
                    .try_send(FillEventFilterMessage::Update(FillUpdate {
                        slot,
                        event: old_fill,
                        write_version,
                        status: FillUpdateStatus::Revoke,
                        market_key: mkt_pk_string.clone(),
                        market_name: mkt.1.name.clone(),
                    }))
                    .unwrap(); // TODO: use anyhow to bubble up error
            }
            SpotEvent::Out { .. } => continue,
        }
    }

    // Finally emit a full checkpoint of everything still valid in the queue.
    fill_update_sender
        .try_send(FillEventFilterMessage::Checkpoint(FillCheckpoint {
            slot,
            write_version,
            events: checkpoint,
            market: mkt_pk_string,
            queue: evq_pk_string,
        }))
        .unwrap()
}
|
|
|
|
|
2022-03-28 10:58:14 -07:00
|
|
|
pub async fn init(
|
2023-02-03 03:39:53 -08:00
|
|
|
perp_market_configs: Vec<(Pubkey, MarketConfig)>,
|
|
|
|
spot_market_configs: Vec<(Pubkey, MarketConfig)>,
|
2022-10-07 03:44:53 -07:00
|
|
|
metrics_sender: Metrics,
|
2022-03-28 10:58:14 -07:00
|
|
|
) -> anyhow::Result<(
|
|
|
|
async_channel::Sender<AccountWrite>,
|
|
|
|
async_channel::Sender<SlotUpdate>,
|
|
|
|
async_channel::Receiver<FillEventFilterMessage>,
|
|
|
|
)> {
|
2022-09-21 07:45:55 -07:00
|
|
|
let metrics_sender = metrics_sender.clone();
|
|
|
|
|
2022-10-07 03:46:19 -07:00
|
|
|
let mut metric_events_new =
|
2022-10-08 04:57:47 -07:00
|
|
|
metrics_sender.register_u64("fills_feed_events_new".into(), MetricType::Counter);
|
2023-01-18 11:39:53 -08:00
|
|
|
let mut metric_events_new_serum =
|
|
|
|
metrics_sender.register_u64("fills_feed_events_new_serum".into(), MetricType::Counter);
|
2022-10-07 03:46:19 -07:00
|
|
|
let mut metric_events_change =
|
2022-10-08 04:57:47 -07:00
|
|
|
metrics_sender.register_u64("fills_feed_events_change".into(), MetricType::Counter);
|
2023-02-03 03:39:53 -08:00
|
|
|
let mut metric_events_change_serum =
|
|
|
|
metrics_sender.register_u64("fills_feed_events_change_serum".into(), MetricType::Counter);
|
2022-10-07 03:46:19 -07:00
|
|
|
let mut metrics_events_drop =
|
2022-10-08 04:57:47 -07:00
|
|
|
metrics_sender.register_u64("fills_feed_events_drop".into(), MetricType::Counter);
|
2023-02-03 03:39:53 -08:00
|
|
|
let mut metrics_events_drop_serum =
|
|
|
|
metrics_sender.register_u64("fills_feed_events_drop_serum".into(), MetricType::Counter);
|
2022-09-21 07:45:55 -07:00
|
|
|
|
2022-03-28 10:58:14 -07:00
|
|
|
// The actual message may want to also contain a retry count, if it self-reinserts on failure?
|
|
|
|
let (account_write_queue_sender, account_write_queue_receiver) =
|
|
|
|
async_channel::unbounded::<AccountWrite>();
|
|
|
|
|
|
|
|
// Slot updates flowing from the outside into the single processing thread. From
|
|
|
|
// there they'll flow into the postgres sending thread.
|
|
|
|
let (slot_queue_sender, slot_queue_receiver) = async_channel::unbounded::<SlotUpdate>();
|
|
|
|
|
|
|
|
// Fill updates can be consumed by client connections, they contain all fills for all markets
|
|
|
|
let (fill_update_sender, fill_update_receiver) =
|
|
|
|
async_channel::unbounded::<FillEventFilterMessage>();
|
|
|
|
|
|
|
|
let account_write_queue_receiver_c = account_write_queue_receiver.clone();
|
|
|
|
|
2023-01-20 06:52:01 -08:00
|
|
|
let mut chain_cache = ChainData::new(metrics_sender);
|
2022-12-24 06:43:43 -08:00
|
|
|
let mut perp_events_cache: HashMap<String, EventQueueEvents> = HashMap::new();
|
|
|
|
let mut serum_events_cache: HashMap<String, Vec<serum_dex::state::Event>> = HashMap::new();
|
2022-03-28 10:58:14 -07:00
|
|
|
let mut seq_num_cache = HashMap::new();
|
2022-12-16 03:03:21 -08:00
|
|
|
let mut last_evq_versions = HashMap::<String, (u64, u64)>::new();
|
2022-03-28 10:58:14 -07:00
|
|
|
|
2023-02-03 03:39:53 -08:00
|
|
|
let all_market_configs = [perp_market_configs.clone(), spot_market_configs.clone()].concat();
|
|
|
|
let perp_queue_pks: Vec<Pubkey> = perp_market_configs
|
2022-03-28 10:58:14 -07:00
|
|
|
.iter()
|
2023-02-03 03:39:53 -08:00
|
|
|
.map(|x| x.1.event_queue)
|
|
|
|
.collect();
|
|
|
|
let spot_queue_pks: Vec<Pubkey> = spot_market_configs
|
|
|
|
.iter()
|
|
|
|
.map(|x| x.1.event_queue)
|
|
|
|
.collect();
|
|
|
|
let all_queue_pks: HashSet<Pubkey> =
|
|
|
|
HashSet::from_iter([perp_queue_pks.clone(), spot_queue_pks.clone()].concat());
|
2022-03-28 10:58:14 -07:00
|
|
|
|
|
|
|
// update handling thread, reads both sloths and account updates
|
|
|
|
tokio::spawn(async move {
|
|
|
|
loop {
|
|
|
|
tokio::select! {
|
|
|
|
Ok(account_write) = account_write_queue_receiver_c.recv() => {
|
2023-02-03 03:39:53 -08:00
|
|
|
if !all_queue_pks.contains(&account_write.pubkey) {
|
2022-03-28 10:58:14 -07:00
|
|
|
continue;
|
|
|
|
}
|
|
|
|
|
|
|
|
chain_cache.update_account(
|
|
|
|
account_write.pubkey,
|
|
|
|
AccountData {
|
|
|
|
slot: account_write.slot,
|
|
|
|
write_version: account_write.write_version,
|
|
|
|
account: WritableAccount::create(
|
|
|
|
account_write.lamports,
|
|
|
|
account_write.data.clone(),
|
|
|
|
account_write.owner,
|
|
|
|
account_write.executable,
|
|
|
|
account_write.rent_epoch as Epoch,
|
|
|
|
),
|
|
|
|
},
|
|
|
|
);
|
|
|
|
}
|
|
|
|
Ok(slot_update) = slot_queue_receiver.recv() => {
|
|
|
|
chain_cache.update_slot(SlotData {
|
|
|
|
slot: slot_update.slot,
|
|
|
|
parent: slot_update.parent,
|
|
|
|
status: slot_update.status,
|
|
|
|
chain: 0,
|
|
|
|
});
|
|
|
|
|
|
|
|
}
|
2023-02-03 03:39:53 -08:00
|
|
|
Err(e) = slot_queue_receiver.recv() => {
|
|
|
|
warn!("slot update channel err {:?}", e);
|
|
|
|
}
|
|
|
|
Err(e) = account_write_queue_receiver_c.recv() => {
|
|
|
|
warn!("write update channel err {:?}", e);
|
|
|
|
}
|
2022-03-28 10:58:14 -07:00
|
|
|
}
|
|
|
|
|
2023-02-03 03:39:53 -08:00
|
|
|
for mkt in all_market_configs.iter() {
|
|
|
|
let evq_pk = mkt.1.event_queue;
|
|
|
|
let evq_pk_string = evq_pk.to_string();
|
|
|
|
let last_evq_version = last_evq_versions
|
|
|
|
.get(&mkt.1.event_queue.to_string())
|
|
|
|
.unwrap_or(&(0, 0));
|
2022-03-28 10:58:14 -07:00
|
|
|
|
2023-02-03 03:39:53 -08:00
|
|
|
match chain_cache.account(&evq_pk) {
|
2022-03-28 10:58:14 -07:00
|
|
|
Ok(account_info) => {
|
|
|
|
// only process if the account state changed
|
2022-12-16 03:03:21 -08:00
|
|
|
let evq_version = (account_info.slot, account_info.write_version);
|
|
|
|
trace!("evq {} write_version {:?}", evq_pk_string, evq_version);
|
|
|
|
if evq_version == *last_evq_version {
|
2022-03-28 10:58:14 -07:00
|
|
|
continue;
|
|
|
|
}
|
2023-02-03 03:39:53 -08:00
|
|
|
if evq_version.0 < last_evq_version.0 {
|
|
|
|
debug!("evq version slot was old");
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
if evq_version.0 == last_evq_version.0 && evq_version.1 < last_evq_version.1
|
|
|
|
{
|
|
|
|
info!("evq version slot was same and write version was old");
|
|
|
|
continue;
|
|
|
|
}
|
2022-12-16 03:03:21 -08:00
|
|
|
last_evq_versions.insert(evq_pk_string.clone(), evq_version);
|
2022-03-28 10:58:14 -07:00
|
|
|
|
|
|
|
let account = &account_info.account;
|
2022-12-24 06:43:43 -08:00
|
|
|
let is_perp = mango_v4::check_id(account.owner());
|
|
|
|
if is_perp {
|
|
|
|
let event_queue =
|
|
|
|
EventQueue::try_deserialize(account.data().borrow_mut()).unwrap();
|
2023-02-03 03:39:53 -08:00
|
|
|
info!(
|
|
|
|
"evq {} seq_num {} version {:?}",
|
|
|
|
evq_pk_string, event_queue.header.seq_num, evq_version,
|
2022-12-24 06:43:43 -08:00
|
|
|
);
|
|
|
|
match seq_num_cache.get(&evq_pk_string) {
|
|
|
|
Some(old_seq_num) => match perp_events_cache.get(&evq_pk_string) {
|
|
|
|
Some(old_events) => publish_changes_perp(
|
|
|
|
account_info.slot,
|
|
|
|
account_info.write_version,
|
2023-02-03 03:39:53 -08:00
|
|
|
&mkt,
|
2022-12-24 06:43:43 -08:00
|
|
|
&event_queue.header,
|
|
|
|
&event_queue.buf,
|
|
|
|
*old_seq_num,
|
|
|
|
old_events,
|
|
|
|
&fill_update_sender,
|
|
|
|
&mut metric_events_new,
|
|
|
|
&mut metric_events_change,
|
|
|
|
&mut metrics_events_drop,
|
|
|
|
),
|
|
|
|
_ => {
|
|
|
|
info!("perp_events_cache could not find {}", evq_pk_string)
|
|
|
|
}
|
|
|
|
},
|
|
|
|
_ => info!("seq_num_cache could not find {}", evq_pk_string),
|
|
|
|
}
|
|
|
|
|
|
|
|
seq_num_cache
|
|
|
|
.insert(evq_pk_string.clone(), event_queue.header.seq_num.clone());
|
|
|
|
perp_events_cache
|
|
|
|
.insert(evq_pk_string.clone(), event_queue.buf.clone());
|
|
|
|
} else {
|
|
|
|
let inner_data = &account.data()[5..&account.data().len() - 7];
|
|
|
|
let header_span = std::mem::size_of::<SerumEventQueueHeader>();
|
|
|
|
let header: SerumEventQueueHeader =
|
|
|
|
*bytemuck::from_bytes(&inner_data[..header_span]);
|
|
|
|
let seq_num = header.seq_num;
|
|
|
|
let count = header.count;
|
|
|
|
let rest = &inner_data[header_span..];
|
|
|
|
let slop = rest.len() % std::mem::size_of::<serum_dex::state::Event>();
|
|
|
|
let new_len = rest.len() - slop;
|
|
|
|
let events = &rest[..new_len];
|
|
|
|
debug!("evq {} header_span {} header_seq_num {} header_count {} inner_len {} events_len {} sizeof Event {}", evq_pk_string, header_span, seq_num, count, inner_data.len(), events.len(), std::mem::size_of::<serum_dex::state::Event>());
|
|
|
|
let events: &[serum_dex::state::Event] = bytemuck::cast_slice(&events);
|
|
|
|
|
|
|
|
match seq_num_cache.get(&evq_pk_string) {
|
|
|
|
Some(old_seq_num) => match serum_events_cache.get(&evq_pk_string) {
|
|
|
|
Some(old_events) => publish_changes_serum(
|
|
|
|
account_info.slot,
|
|
|
|
account_info.write_version,
|
|
|
|
mkt,
|
|
|
|
&header,
|
|
|
|
&events,
|
|
|
|
*old_seq_num,
|
|
|
|
old_events,
|
|
|
|
&fill_update_sender,
|
2023-01-18 11:39:53 -08:00
|
|
|
&mut metric_events_new_serum,
|
2023-02-03 03:39:53 -08:00
|
|
|
&mut metric_events_change_serum,
|
|
|
|
&mut metrics_events_drop_serum,
|
2022-12-24 06:43:43 -08:00
|
|
|
),
|
|
|
|
_ => {
|
2023-02-03 03:39:53 -08:00
|
|
|
debug!(
|
|
|
|
"serum_events_cache could not find {}",
|
|
|
|
evq_pk_string
|
|
|
|
)
|
2022-12-24 06:43:43 -08:00
|
|
|
}
|
|
|
|
},
|
2023-02-03 03:39:53 -08:00
|
|
|
_ => debug!("seq_num_cache could not find {}", evq_pk_string),
|
2022-12-24 06:43:43 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
seq_num_cache.insert(evq_pk_string.clone(), seq_num.clone());
|
|
|
|
serum_events_cache
|
|
|
|
.insert(evq_pk_string.clone(), events.clone().to_vec());
|
2022-03-28 10:58:14 -07:00
|
|
|
}
|
|
|
|
}
|
2023-02-03 03:39:53 -08:00
|
|
|
Err(_) => debug!("chain_cache could not find {}", mkt.1.event_queue),
|
2022-03-28 10:58:14 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
});
|
|
|
|
|
|
|
|
Ok((
|
|
|
|
account_write_queue_sender,
|
|
|
|
slot_queue_sender,
|
|
|
|
fill_update_receiver,
|
|
|
|
))
|
|
|
|
}
|