Compare commits

...

3 Commits

Author SHA1 Message Date
Riordan Panayides 1233cc58f3 Update chaindata metric names, finish serum events unified schema 2023-01-20 17:29:02 +00:00
Riordan Panayides 8d6b6f5f38 Fix serum event change detection 2023-01-20 16:50:39 +00:00
Riordan Panayides f88ec6a53c Add new common event schema 2023-01-20 16:50:19 +00:00
4 changed files with 250 additions and 200 deletions

1
Cargo.lock generated
View File

@ -5771,7 +5771,6 @@ dependencies = [
"futures-core",
"futures-util",
"itertools 0.10.5",
"jemallocator",
"jsonrpc-core 18.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"jsonrpc-core-client",
"log 0.4.17",

View File

@ -56,15 +56,15 @@ impl ChainData {
account_versions_stored: 0,
account_bytes_stored: 0,
metric_accounts_stored: metrics_sender.register_u64(
"fills_feed_chaindata_accounts_stored".into(),
"chaindata_accounts_stored".into(),
MetricType::Gauge,
),
metric_account_versions_stored: metrics_sender.register_u64(
"fills_feed_chaindata_account_versions_stored".into(),
"chaindata_account_versions_stored".into(),
MetricType::Gauge,
),
metric_account_bytes_stored: metrics_sender.register_u64(
"fills_feed_chaindata_account_bytes_stored".into(),
"chaindata_account_bytes_stored".into(),
MetricType::Gauge,
),
}

View File

@ -1,12 +1,12 @@
use crate::{
chain_data::{AccountData, ChainData, SlotData},
metrics::{MetricType, Metrics},
AccountWrite, SlotUpdate,
AccountWrite, SlotUpdate, orderbook_filter::OrderbookSide,
};
use bytemuck::{Pod, Zeroable};
use bytemuck::{Pod, Zeroable, cast_slice};
use log::*;
use serde::{ser::SerializeStruct, Serialize, Serializer};
use serum_dex::state::EventView;
use serum_dex::state::EventView as SpotEvent;
use solana_sdk::{
account::{ReadableAccount, WritableAccount},
clock::Epoch,
@ -15,13 +15,14 @@ use solana_sdk::{
use std::{
borrow::BorrowMut,
cmp::max,
collections::{HashMap, HashSet},
collections::{HashMap, HashSet}, convert::identity, time::SystemTime,
};
use crate::metrics::MetricU64;
use anchor_lang::AccountDeserialize;
use mango_v4::state::{
AnyEvent, EventQueue, EventQueueHeader, EventType, FillEvent, MAX_NUM_EVENTS,
AnyEvent, EventQueue, EventQueueHeader, EventType, FillEvent as PerpFillEvent, Side,
MAX_NUM_EVENTS,
};
#[derive(Clone, Copy, Debug, Serialize)]
@ -30,19 +31,122 @@ pub enum FillUpdateStatus {
Revoke,
}
#[derive(Clone, Debug)]
pub struct FillUpdate {
pub event: FillEvent,
pub status: FillUpdateStatus,
pub market: String,
pub queue: String,
pub slot: u64,
pub write_version: u64,
#[derive(Clone, Copy, Debug, Serialize)]
pub enum FillEventType {
Spot,
Perp,
}
#[derive(Clone, Debug)]
pub struct SerumFillUpdate {
pub event: serum_dex::state::Event,
pub struct FillEvent {
pub event_type: FillEventType,
pub maker: bool,
pub side: OrderbookSide,
pub timestamp: u64,
pub seq_num: u64,
pub owner: String,
pub order_id: u128,
pub client_order_id: u64,
pub fee: f32,
pub price: i64,
pub quantity: i64,
}
impl Serialize for FillEvent {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut state = serializer.serialize_struct("FillEvent", 12)?;
state.serialize_field("eventType", &self.event_type)?;
state.serialize_field("maker", &self.maker)?;
state.serialize_field("side", &self.side)?;
state.serialize_field("timestamp", &self.timestamp)?;
state.serialize_field("seqNum", &self.seq_num)?;
state.serialize_field("owner", &self.owner)?;
state.serialize_field("orderId", &self.order_id)?;
state.serialize_field("clientOrderId", &self.client_order_id)?;
state.serialize_field("fee", &self.fee)?;
state.serialize_field("price", &self.price)?;
state.serialize_field("quantity", &self.quantity)?;
state.end()
}
}
impl FillEvent {
pub fn new_from_perp(event: PerpFillEvent) -> [Self; 2] {
let taker_side = match event.taker_side() {
Side::Ask => OrderbookSide::Ask,
Side::Bid => OrderbookSide::Bid,
};
let maker_side = match event.taker_side() {
Side::Ask => OrderbookSide::Bid,
Side::Bid => OrderbookSide::Ask,
};
[FillEvent {
event_type: FillEventType::Perp,
maker: true,
side: maker_side,
timestamp: event.timestamp,
seq_num: event.seq_num,
owner: event.maker.to_string(),
order_id: event.maker_order_id,
client_order_id: 0u64,
fee: event.maker_fee.to_num(),
price: event.price,
quantity: event.quantity,
},
FillEvent {
event_type: FillEventType::Perp,
maker: false,
side: taker_side,
timestamp: event.timestamp,
seq_num: event.seq_num,
owner: event.taker.to_string(),
order_id: event.taker_order_id,
client_order_id: event.taker_client_order_id,
fee: event.taker_fee.to_num(),
price: event.price,
quantity: event.quantity,
}]
}
pub fn new_from_spot(event: SpotEvent, timestamp: u64, seq_num: u64) -> Self {
match event {
SpotEvent::Fill { side, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, order_id, owner, client_order_id, .. } => {
let side = match side as u8 {
0 => OrderbookSide::Bid,
1 => OrderbookSide::Ask,
_ => panic!("invalid side"),
};
let client_order_id: u64 = match client_order_id {
Some(id) => id.into(),
None => 0u64,
};
// TODO: native to ui
let price = (native_qty_paid / native_qty_received) as i64;
FillEvent {
event_type: FillEventType::Spot,
maker: maker,
side,
timestamp,
seq_num,
owner: Pubkey::new(cast_slice(&identity(owner) as &[_])).to_string(),
order_id: order_id,
client_order_id: client_order_id,
fee: native_fee_or_rebate as f32,
price,
quantity: native_qty_received as i64,
}
}
SpotEvent::Out { .. } => { panic!("Can't build FillEvent from SpotEvent::Out")}
}
}
}
#[derive(Clone, Debug)]
pub struct FillUpdate {
pub event: FillEvent,
pub status: FillUpdateStatus,
pub market: String,
pub queue: String,
@ -66,27 +170,8 @@ impl Serialize for FillUpdate {
where
S: Serializer,
{
let event = base64::encode(bytemuck::bytes_of(&self.event));
let mut state = serializer.serialize_struct("FillUpdate", 4)?;
state.serialize_field("event", &event)?;
state.serialize_field("market", &self.market)?;
state.serialize_field("queue", &self.queue)?;
state.serialize_field("status", &self.status)?;
state.serialize_field("slot", &self.slot)?;
state.serialize_field("write_version", &self.write_version)?;
state.end()
}
}
impl Serialize for SerumFillUpdate {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let event = base64::encode(bytemuck::bytes_of(&self.event));
let mut state = serializer.serialize_struct("SerumFillUpdate", 4)?;
state.serialize_field("event", &event)?;
state.serialize_field("event", &self.event)?;
state.serialize_field("market", &self.market)?;
state.serialize_field("queue", &self.queue)?;
state.serialize_field("status", &self.status)?;
@ -106,48 +191,13 @@ pub struct FillCheckpoint {
pub write_version: u64,
}
#[derive(Clone, Debug)]
pub struct SerumFillCheckpoint {
pub market: String,
pub queue: String,
pub events: Vec<serum_dex::state::Event>,
pub slot: u64,
pub write_version: u64,
}
impl Serialize for FillCheckpoint {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let events: Vec<String> = self
.events
.iter()
.map(|e| base64::encode(bytemuck::bytes_of(e)))
.collect();
let mut state = serializer.serialize_struct("FillCheckpoint", 3)?;
state.serialize_field("events", &events)?;
state.serialize_field("market", &self.market)?;
state.serialize_field("queue", &self.queue)?;
state.serialize_field("slot", &self.slot)?;
state.serialize_field("write_version", &self.write_version)?;
state.end()
}
}
impl Serialize for SerumFillCheckpoint {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let events: Vec<String> = self
.events
.iter()
.map(|e| base64::encode(bytemuck::bytes_of(e)))
.collect();
let mut state = serializer.serialize_struct("SerumFillCheckpoint", 3)?;
state.serialize_field("events", &events)?;
state.serialize_field("events", &self.events)?;
state.serialize_field("market", &self.market)?;
state.serialize_field("queue", &self.queue)?;
state.serialize_field("slot", &self.slot)?;
@ -159,9 +209,7 @@ impl Serialize for SerumFillCheckpoint {
pub enum FillEventFilterMessage {
Update(FillUpdate),
SerumUpdate(SerumFillUpdate),
Checkpoint(FillCheckpoint),
SerumCheckpoint(SerumFillCheckpoint),
}
// couldn't compile the correct struct size / math on m1, fixed sizes resolve this issue
@ -205,18 +253,22 @@ fn publish_changes_perp(
// new fills are published and recorded in checkpoint
if events[idx].event_type == EventType::Fill as u8 {
let fill: FillEvent = bytemuck::cast(events[idx]);
fill_update_sender
.try_send(FillEventFilterMessage::Update(FillUpdate {
slot,
write_version,
event: fill,
status: FillUpdateStatus::New,
market: mkt_pk_string.clone(),
queue: evq_pk_string.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
checkpoint.push(fill);
let fill: PerpFillEvent = bytemuck::cast(events[idx]);
let fills = FillEvent::new_from_perp(fill);
// send event for both maker and taker
for fill in fills {
fill_update_sender
.try_send(FillEventFilterMessage::Update(FillUpdate {
slot,
write_version,
event: fill.clone(),
status: FillUpdateStatus::New,
market: mkt_pk_string.clone(),
queue: evq_pk_string.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
checkpoint.push(fill);
}
}
} else if old_events[idx].event_type != events[idx].event_type
|| old_events[idx].padding != events[idx].padding
@ -230,39 +282,48 @@ fn publish_changes_perp(
// first revoke old event if a fill
if old_events[idx].event_type == EventType::Fill as u8 {
let fill: FillEvent = bytemuck::cast(old_events[idx]);
fill_update_sender
.try_send(FillEventFilterMessage::Update(FillUpdate {
slot,
write_version,
event: fill,
status: FillUpdateStatus::Revoke,
market: mkt_pk_string.clone(),
queue: evq_pk_string.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
let fill: PerpFillEvent = bytemuck::cast(old_events[idx]);
let fills = FillEvent::new_from_perp(fill);
for fill in fills {
fill_update_sender
.try_send(FillEventFilterMessage::Update(FillUpdate {
slot,
write_version,
event: fill,
status: FillUpdateStatus::Revoke,
market: mkt_pk_string.clone(),
queue: evq_pk_string.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
}
}
// then publish new if its a fill and record in checkpoint
if events[idx].event_type == EventType::Fill as u8 {
let fill: FillEvent = bytemuck::cast(events[idx]);
fill_update_sender
.try_send(FillEventFilterMessage::Update(FillUpdate {
slot,
write_version,
event: fill,
status: FillUpdateStatus::New,
market: mkt_pk_string.clone(),
queue: evq_pk_string.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
checkpoint.push(fill);
let fill: PerpFillEvent = bytemuck::cast(events[idx]);
let fills = FillEvent::new_from_perp(fill);
for fill in fills {
fill_update_sender
.try_send(FillEventFilterMessage::Update(FillUpdate {
slot,
write_version,
event: fill.clone(),
status: FillUpdateStatus::New,
market: mkt_pk_string.clone(),
queue: evq_pk_string.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
checkpoint.push(fill);
}
}
} else {
// every already published event is recorded in checkpoint if a fill
if events[idx].event_type == EventType::Fill as u8 {
let fill: FillEvent = bytemuck::cast(events[idx]);
checkpoint.push(fill);
let fill: PerpFillEvent = bytemuck::cast(events[idx]);
let fills = FillEvent::new_from_perp(fill);
for fill in fills {
checkpoint.push(fill);
}
}
}
}
@ -278,17 +339,20 @@ fn publish_changes_perp(
metric_events_drop.increment();
if old_events[idx].event_type == EventType::Fill as u8 {
let fill: FillEvent = bytemuck::cast(old_events[idx]);
fill_update_sender
.try_send(FillEventFilterMessage::Update(FillUpdate {
slot,
event: fill,
write_version,
status: FillUpdateStatus::Revoke,
market: mkt_pk_string.clone(),
queue: evq_pk_string.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
let fill: PerpFillEvent = bytemuck::cast(old_events[idx]);
let fills = FillEvent::new_from_perp(fill);
for fill in fills {
fill_update_sender
.try_send(FillEventFilterMessage::Update(FillUpdate {
slot,
event: fill,
write_version,
status: FillUpdateStatus::Revoke,
market: mkt_pk_string.clone(),
queue: evq_pk_string.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
}
}
}
@ -325,73 +389,100 @@ fn publish_changes_serum(
let evq_pk_string = mkt.1.to_string();
let header_seq_num = header.seq_num;
debug!("start seq {} header seq {}", start_seq_num, header_seq_num);
// Timestamp for spot events is time scraped
let timestamp = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
for seq_num in start_seq_num..header_seq_num {
let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize;
let event_view = events[idx].as_view().unwrap();
let old_event_view = old_events[idx].as_view().unwrap();
match event_view {
EventView::Fill { .. } => {
SpotEvent::Fill { .. } => {
// there are three possible cases:
// 1) the event is past the old seq num, hence guaranteed new event
// 2) the event is not matching the old event queue
// 3) all other events are matching the old event queue
// the order of these checks is important so they are exhaustive
let fill = FillEvent::new_from_spot(event_view, timestamp, seq_num);
if seq_num >= old_seq_num {
debug!("found new serum fill {} idx {}", mkt_pk_string, idx,);
metric_events_new.increment();
fill_update_sender
.try_send(FillEventFilterMessage::SerumUpdate(SerumFillUpdate {
.try_send(FillEventFilterMessage::Update(FillUpdate {
slot,
write_version,
event: events[idx],
event: fill.clone(),
status: FillUpdateStatus::New,
market: mkt_pk_string.clone(),
queue: evq_pk_string.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
checkpoint.push(events[idx]);
checkpoint.push(fill);
continue;
}
match old_event_view {
EventView::Fill { .. } => {
// every already published event is recorded in checkpoint
checkpoint.push(events[idx]);
SpotEvent::Fill { order_id, .. } => {
if order_id != fill.order_id {
debug!(
"found changed id event {} idx {} seq_num {} header seq num {} old seq num {}",
mkt_pk_string, idx, seq_num, header_seq_num, old_seq_num
);
metric_events_change.increment();
let old_fill = FillEvent::new_from_spot(old_event_view, timestamp, seq_num);
// first revoke old event
fill_update_sender
.try_send(FillEventFilterMessage::Update(FillUpdate {
slot,
write_version,
event: old_fill,
status: FillUpdateStatus::Revoke,
market: mkt_pk_string.clone(),
queue: evq_pk_string.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
// then publish new
fill_update_sender
.try_send(FillEventFilterMessage::Update(FillUpdate {
slot,
write_version,
event: fill.clone(),
status: FillUpdateStatus::New,
market: mkt_pk_string.clone(),
queue: evq_pk_string.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
}
// record new event in checkpoint
checkpoint.push(fill);
}
EventView::Out { .. } => {
SpotEvent::Out { .. } => {
debug!(
"found changed event {} idx {} seq_num {} header seq num {} old seq num {}",
"found changed type event {} idx {} seq_num {} header seq num {} old seq num {}",
mkt_pk_string, idx, seq_num, header_seq_num, old_seq_num
);
metric_events_change.increment();
// first revoke old event
// publish new fill and record in checkpoint
fill_update_sender
.try_send(FillEventFilterMessage::SerumUpdate(SerumFillUpdate {
.try_send(FillEventFilterMessage::Update(FillUpdate {
slot,
write_version,
event: old_events[idx],
status: FillUpdateStatus::Revoke,
market: mkt_pk_string.clone(),
queue: evq_pk_string.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
// then publish new if its a fill and record in checkpoint
fill_update_sender
.try_send(FillEventFilterMessage::SerumUpdate(SerumFillUpdate {
slot,
write_version,
event: events[idx],
event: fill.clone(),
status: FillUpdateStatus::New,
market: mkt_pk_string.clone(),
queue: evq_pk_string.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
checkpoint.push(events[idx]);
checkpoint.push(fill);
}
}
}
@ -411,11 +502,12 @@ fn publish_changes_serum(
metric_events_drop.increment();
match old_event_view {
EventView::Fill { .. } => {
SpotEvent::Fill { .. } => {
let old_fill = FillEvent::new_from_spot(old_event_view, timestamp, seq_num);
fill_update_sender
.try_send(FillEventFilterMessage::SerumUpdate(SerumFillUpdate {
.try_send(FillEventFilterMessage::Update(FillUpdate {
slot,
event: old_events[idx],
event: old_fill,
write_version,
status: FillUpdateStatus::Revoke,
market: mkt_pk_string.clone(),
@ -423,13 +515,13 @@ fn publish_changes_serum(
}))
.unwrap(); // TODO: use anyhow to bubble up error
}
EventView::Out { .. } => continue,
SpotEvent::Out { .. } => continue,
}
}
fill_update_sender
.try_send(FillEventFilterMessage::SerumCheckpoint(
SerumFillCheckpoint {
.try_send(FillEventFilterMessage::Checkpoint(
FillCheckpoint {
slot,
write_version,
events: checkpoint,

View File

@ -30,7 +30,6 @@ use tokio_tungstenite::tungstenite::{protocol::Message, Error};
use serde::Deserialize;
use solana_geyser_connector_lib::{
fill_event_filter::SerumFillCheckpoint,
metrics::{MetricType, MetricU64},
FilterConfig, StatusResponse,
};
@ -40,7 +39,6 @@ use solana_geyser_connector_lib::{
};
type CheckpointMap = Arc<Mutex<HashMap<String, FillCheckpoint>>>;
type SerumCheckpointMap = Arc<Mutex<HashMap<String, SerumFillCheckpoint>>>;
type PeerMap = Arc<Mutex<HashMap<SocketAddr, Peer>>>;
// jemalloc seems to be better at keeping the memory footprint reasonable over
@ -79,7 +77,6 @@ pub struct Peer {
async fn handle_connection_error(
checkpoint_map: CheckpointMap,
serum_checkpoint_map: SerumCheckpointMap,
peer_map: PeerMap,
market_ids: HashMap<String, String>,
raw_stream: TcpStream,
@ -91,7 +88,6 @@ async fn handle_connection_error(
let result = handle_connection(
checkpoint_map,
serum_checkpoint_map,
peer_map.clone(),
market_ids,
raw_stream,
@ -109,7 +105,6 @@ async fn handle_connection_error(
async fn handle_connection(
checkpoint_map: CheckpointMap,
serum_checkpoint_map: SerumCheckpointMap,
peer_map: PeerMap,
market_ids: HashMap<String, String>,
raw_stream: TcpStream,
@ -138,7 +133,6 @@ async fn handle_connection(
msg,
peer_map.clone(),
checkpoint_map.clone(),
serum_checkpoint_map.clone(),
market_ids.clone(),
),
Message::Ping(_) => {
@ -167,7 +161,6 @@ fn handle_commands(
msg: Message,
peer_map: PeerMap,
checkpoint_map: CheckpointMap,
serum_checkpoint_map: SerumCheckpointMap,
market_ids: HashMap<String, String>,
) -> Ready<Result<(), Error>> {
let msg_str = msg.clone().into_text().unwrap();
@ -209,7 +202,6 @@ fn handle_commands(
if subscribed {
let checkpoint_map = checkpoint_map.lock().unwrap();
let serum_checkpoint_map = serum_checkpoint_map.lock().unwrap();
let checkpoint = checkpoint_map.get(&market_id);
match checkpoint {
Some(checkpoint) => {
@ -219,17 +211,8 @@ fn handle_commands(
))
.unwrap();
}
None => match serum_checkpoint_map.get(&market_id) {
Some(checkpoint) => {
peer.sender
.unbounded_send(Message::Text(
serde_json::to_string(&checkpoint).unwrap(),
))
.unwrap();
}
None => info!("no checkpoint available on client subscription"),
},
}
None => info!("no checkpoint available on client subscription"),
};
}
}
Ok(Command::Unsubscribe(cmd)) => {
@ -393,11 +376,9 @@ async fn main() -> anyhow::Result<()> {
.await?;
let checkpoints = CheckpointMap::new(Mutex::new(HashMap::new()));
let serum_checkpoints = SerumCheckpointMap::new(Mutex::new(HashMap::new()));
let peers = PeerMap::new(Mutex::new(HashMap::new()));
let checkpoints_ref_thread = checkpoints.clone();
let serum_checkpoints_ref_thread = serum_checkpoints.clone();
let peers_ref_thread = peers.clone();
let peers_ref_thread1 = peers.clone();
@ -408,7 +389,7 @@ async fn main() -> anyhow::Result<()> {
let message = fill_receiver.recv().await.unwrap();
match message {
FillEventFilterMessage::Update(update) => {
debug!("ws update {} {:?} fill", update.market, update.status);
debug!("ws update {} {:?} {:?} fill", update.market, update.status, update.event.event_type);
let mut peer_copy = peers_ref_thread.lock().unwrap().clone();
for (addr, peer) in peer_copy.iter_mut() {
let json = serde_json::to_string(&update).unwrap();
@ -428,27 +409,6 @@ async fn main() -> anyhow::Result<()> {
.unwrap()
.insert(checkpoint.queue.clone(), checkpoint);
}
FillEventFilterMessage::SerumUpdate(update) => {
debug!("ws update {} {:?} serum fill", update.market, update.status);
let mut peers_copy = peers_ref_thread.lock().unwrap().clone();
for (addr, peer) in peers_copy.iter_mut() {
let json = serde_json::to_string(&update).unwrap();
// only send updates if the peer is subscribed
if peer.subscriptions.contains(&update.market) {
let result = peer.sender.send(Message::Text(json)).await;
if result.is_err() {
error!("ws update {} fill could not reach {}", update.market, addr);
}
}
}
}
FillEventFilterMessage::SerumCheckpoint(checkpoint) => {
serum_checkpoints_ref_thread
.lock()
.unwrap()
.insert(checkpoint.queue.clone(), checkpoint);
}
}
}
});
@ -462,7 +422,6 @@ async fn main() -> anyhow::Result<()> {
while let Ok((stream, addr)) = listener.accept().await {
tokio::spawn(handle_connection_error(
checkpoints.clone(),
serum_checkpoints.clone(),
peers.clone(),
market_pubkey_strings.clone(),
stream,