Compare commits

...

3 Commits

Author SHA1 Message Date
Riordan Panayides 1233cc58f3 Update chaindata metric names, finish serum events unified schema 2023-01-20 17:29:02 +00:00
Riordan Panayides 8d6b6f5f38 Fix serum event change detection 2023-01-20 16:50:39 +00:00
Riordan Panayides f88ec6a53c Add new common event schema 2023-01-20 16:50:19 +00:00
4 changed files with 250 additions and 200 deletions

Cargo.lock (generated)

@@ -5771,7 +5771,6 @@ dependencies = [
  "futures-core",
  "futures-util",
  "itertools 0.10.5",
- "jemallocator",
  "jsonrpc-core 18.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "jsonrpc-core-client",
  "log 0.4.17",


@@ -56,15 +56,15 @@ impl ChainData {
             account_versions_stored: 0,
             account_bytes_stored: 0,
             metric_accounts_stored: metrics_sender.register_u64(
-                "fills_feed_chaindata_accounts_stored".into(),
+                "chaindata_accounts_stored".into(),
                 MetricType::Gauge,
             ),
             metric_account_versions_stored: metrics_sender.register_u64(
-                "fills_feed_chaindata_account_versions_stored".into(),
+                "chaindata_account_versions_stored".into(),
                 MetricType::Gauge,
             ),
             metric_account_bytes_stored: metrics_sender.register_u64(
-                "fills_feed_chaindata_account_bytes_stored".into(),
+                "chaindata_account_bytes_stored".into(),
                 MetricType::Gauge,
             ),
         }


@@ -1,12 +1,12 @@
 use crate::{
     chain_data::{AccountData, ChainData, SlotData},
     metrics::{MetricType, Metrics},
-    AccountWrite, SlotUpdate,
+    AccountWrite, SlotUpdate, orderbook_filter::OrderbookSide,
 };
-use bytemuck::{Pod, Zeroable};
+use bytemuck::{Pod, Zeroable, cast_slice};
 use log::*;
 use serde::{ser::SerializeStruct, Serialize, Serializer};
-use serum_dex::state::EventView;
+use serum_dex::state::EventView as SpotEvent;
 use solana_sdk::{
     account::{ReadableAccount, WritableAccount},
     clock::Epoch,
@@ -15,13 +15,14 @@ use solana_sdk::{
 use std::{
     borrow::BorrowMut,
     cmp::max,
-    collections::{HashMap, HashSet},
+    collections::{HashMap, HashSet}, convert::identity, time::SystemTime,
 };

 use crate::metrics::MetricU64;
 use anchor_lang::AccountDeserialize;
 use mango_v4::state::{
-    AnyEvent, EventQueue, EventQueueHeader, EventType, FillEvent, MAX_NUM_EVENTS,
+    AnyEvent, EventQueue, EventQueueHeader, EventType, FillEvent as PerpFillEvent, Side,
+    MAX_NUM_EVENTS,
 };

 #[derive(Clone, Copy, Debug, Serialize)]
@@ -30,19 +31,122 @@ pub enum FillUpdateStatus {
     Revoke,
 }

-#[derive(Clone, Debug)]
-pub struct FillUpdate {
-    pub event: FillEvent,
-    pub status: FillUpdateStatus,
-    pub market: String,
-    pub queue: String,
-    pub slot: u64,
-    pub write_version: u64,
+#[derive(Clone, Copy, Debug, Serialize)]
+pub enum FillEventType {
+    Spot,
+    Perp,
 }

 #[derive(Clone, Debug)]
-pub struct SerumFillUpdate {
-    pub event: serum_dex::state::Event,
+pub struct FillEvent {
+    pub event_type: FillEventType,
+    pub maker: bool,
+    pub side: OrderbookSide,
+    pub timestamp: u64,
+    pub seq_num: u64,
+    pub owner: String,
+    pub order_id: u128,
+    pub client_order_id: u64,
+    pub fee: f32,
+    pub price: i64,
+    pub quantity: i64,
+}
+
+impl Serialize for FillEvent {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: Serializer,
+    {
+        let mut state = serializer.serialize_struct("FillEvent", 12)?;
+        state.serialize_field("eventType", &self.event_type)?;
+        state.serialize_field("maker", &self.maker)?;
+        state.serialize_field("side", &self.side)?;
+        state.serialize_field("timestamp", &self.timestamp)?;
+        state.serialize_field("seqNum", &self.seq_num)?;
+        state.serialize_field("owner", &self.owner)?;
+        state.serialize_field("orderId", &self.order_id)?;
+        state.serialize_field("clientOrderId", &self.client_order_id)?;
+        state.serialize_field("fee", &self.fee)?;
+        state.serialize_field("price", &self.price)?;
+        state.serialize_field("quantity", &self.quantity)?;
+        state.end()
+    }
+}
+
+impl FillEvent {
+    pub fn new_from_perp(event: PerpFillEvent) -> [Self; 2] {
+        let taker_side = match event.taker_side() {
+            Side::Ask => OrderbookSide::Ask,
+            Side::Bid => OrderbookSide::Bid,
+        };
+        let maker_side = match event.taker_side() {
+            Side::Ask => OrderbookSide::Bid,
+            Side::Bid => OrderbookSide::Ask,
+        };
+        [
+            FillEvent {
+                event_type: FillEventType::Perp,
+                maker: true,
+                side: maker_side,
+                timestamp: event.timestamp,
+                seq_num: event.seq_num,
+                owner: event.maker.to_string(),
+                order_id: event.maker_order_id,
+                client_order_id: 0u64,
+                fee: event.maker_fee.to_num(),
+                price: event.price,
+                quantity: event.quantity,
+            },
+            FillEvent {
+                event_type: FillEventType::Perp,
+                maker: false,
+                side: taker_side,
+                timestamp: event.timestamp,
+                seq_num: event.seq_num,
+                owner: event.taker.to_string(),
+                order_id: event.taker_order_id,
+                client_order_id: event.taker_client_order_id,
+                fee: event.taker_fee.to_num(),
+                price: event.price,
+                quantity: event.quantity,
+            },
+        ]
+    }
+
+    pub fn new_from_spot(event: SpotEvent, timestamp: u64, seq_num: u64) -> Self {
+        match event {
+            SpotEvent::Fill { side, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, order_id, owner, client_order_id, .. } => {
+                let side = match side as u8 {
+                    0 => OrderbookSide::Bid,
+                    1 => OrderbookSide::Ask,
+                    _ => panic!("invalid side"),
+                };
+                let client_order_id: u64 = match client_order_id {
+                    Some(id) => id.into(),
+                    None => 0u64,
+                };
+                // TODO: native to ui
+                let price = (native_qty_paid / native_qty_received) as i64;
+                FillEvent {
+                    event_type: FillEventType::Spot,
+                    maker: maker,
+                    side,
+                    timestamp,
+                    seq_num,
+                    owner: Pubkey::new(cast_slice(&identity(owner) as &[_])).to_string(),
+                    order_id: order_id,
+                    client_order_id: client_order_id,
+                    fee: native_fee_or_rebate as f32,
+                    price,
+                    quantity: native_qty_received as i64,
+                }
+            }
+            SpotEvent::Out { .. } => { panic!("Can't build FillEvent from SpotEvent::Out") }
+        }
+    }
+}
+
+#[derive(Clone, Debug)]
+pub struct FillUpdate {
+    pub event: FillEvent,
     pub status: FillUpdateStatus,
     pub market: String,
     pub queue: String,
@@ -66,27 +170,8 @@ impl Serialize for FillUpdate {
     where
         S: Serializer,
     {
-        let event = base64::encode(bytemuck::bytes_of(&self.event));
         let mut state = serializer.serialize_struct("FillUpdate", 4)?;
-        state.serialize_field("event", &event)?;
-        state.serialize_field("market", &self.market)?;
-        state.serialize_field("queue", &self.queue)?;
-        state.serialize_field("status", &self.status)?;
-        state.serialize_field("slot", &self.slot)?;
-        state.serialize_field("write_version", &self.write_version)?;
-        state.end()
-    }
-}
-
-impl Serialize for SerumFillUpdate {
-    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
-    where
-        S: Serializer,
-    {
-        let event = base64::encode(bytemuck::bytes_of(&self.event));
-        let mut state = serializer.serialize_struct("SerumFillUpdate", 4)?;
-        state.serialize_field("event", &event)?;
+        state.serialize_field("event", &self.event)?;
         state.serialize_field("market", &self.market)?;
         state.serialize_field("queue", &self.queue)?;
         state.serialize_field("status", &self.status)?;
@@ -106,48 +191,13 @@ pub struct FillCheckpoint {
     pub write_version: u64,
 }

-#[derive(Clone, Debug)]
-pub struct SerumFillCheckpoint {
-    pub market: String,
-    pub queue: String,
-    pub events: Vec<serum_dex::state::Event>,
-    pub slot: u64,
-    pub write_version: u64,
-}
-
 impl Serialize for FillCheckpoint {
     fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
     where
         S: Serializer,
     {
-        let events: Vec<String> = self
-            .events
-            .iter()
-            .map(|e| base64::encode(bytemuck::bytes_of(e)))
-            .collect();
         let mut state = serializer.serialize_struct("FillCheckpoint", 3)?;
-        state.serialize_field("events", &events)?;
-        state.serialize_field("market", &self.market)?;
-        state.serialize_field("queue", &self.queue)?;
-        state.serialize_field("slot", &self.slot)?;
-        state.serialize_field("write_version", &self.write_version)?;
-        state.end()
-    }
-}
-
-impl Serialize for SerumFillCheckpoint {
-    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
-    where
-        S: Serializer,
-    {
-        let events: Vec<String> = self
-            .events
-            .iter()
-            .map(|e| base64::encode(bytemuck::bytes_of(e)))
-            .collect();
-        let mut state = serializer.serialize_struct("SerumFillCheckpoint", 3)?;
-        state.serialize_field("events", &events)?;
+        state.serialize_field("events", &self.events)?;
         state.serialize_field("market", &self.market)?;
         state.serialize_field("queue", &self.queue)?;
         state.serialize_field("slot", &self.slot)?;
@@ -159,9 +209,7 @@ impl Serialize for SerumFillCheckpoint {
 pub enum FillEventFilterMessage {
     Update(FillUpdate),
-    SerumUpdate(SerumFillUpdate),
     Checkpoint(FillCheckpoint),
-    SerumCheckpoint(SerumFillCheckpoint),
 }

 // couldn't compile the correct struct size / math on m1, fixed sizes resolve this issue
@@ -205,18 +253,22 @@ fn publish_changes_perp(
             // new fills are published and recorded in checkpoint
             if events[idx].event_type == EventType::Fill as u8 {
-                let fill: FillEvent = bytemuck::cast(events[idx]);
-                fill_update_sender
-                    .try_send(FillEventFilterMessage::Update(FillUpdate {
-                        slot,
-                        write_version,
-                        event: fill,
-                        status: FillUpdateStatus::New,
-                        market: mkt_pk_string.clone(),
-                        queue: evq_pk_string.clone(),
-                    }))
-                    .unwrap(); // TODO: use anyhow to bubble up error
-                checkpoint.push(fill);
+                let fill: PerpFillEvent = bytemuck::cast(events[idx]);
+                let fills = FillEvent::new_from_perp(fill);
+                // send event for both maker and taker
+                for fill in fills {
+                    fill_update_sender
+                        .try_send(FillEventFilterMessage::Update(FillUpdate {
+                            slot,
+                            write_version,
+                            event: fill.clone(),
+                            status: FillUpdateStatus::New,
+                            market: mkt_pk_string.clone(),
+                            queue: evq_pk_string.clone(),
+                        }))
+                        .unwrap(); // TODO: use anyhow to bubble up error
+                    checkpoint.push(fill);
+                }
             }
         } else if old_events[idx].event_type != events[idx].event_type
             || old_events[idx].padding != events[idx].padding
@@ -230,39 +282,48 @@ fn publish_changes_perp(
             // first revoke old event if a fill
             if old_events[idx].event_type == EventType::Fill as u8 {
-                let fill: FillEvent = bytemuck::cast(old_events[idx]);
-                fill_update_sender
-                    .try_send(FillEventFilterMessage::Update(FillUpdate {
-                        slot,
-                        write_version,
-                        event: fill,
-                        status: FillUpdateStatus::Revoke,
-                        market: mkt_pk_string.clone(),
-                        queue: evq_pk_string.clone(),
-                    }))
-                    .unwrap(); // TODO: use anyhow to bubble up error
+                let fill: PerpFillEvent = bytemuck::cast(old_events[idx]);
+                let fills = FillEvent::new_from_perp(fill);
+                for fill in fills {
+                    fill_update_sender
+                        .try_send(FillEventFilterMessage::Update(FillUpdate {
+                            slot,
+                            write_version,
+                            event: fill,
+                            status: FillUpdateStatus::Revoke,
+                            market: mkt_pk_string.clone(),
+                            queue: evq_pk_string.clone(),
+                        }))
+                        .unwrap(); // TODO: use anyhow to bubble up error
+                }
             }

             // then publish new if its a fill and record in checkpoint
             if events[idx].event_type == EventType::Fill as u8 {
-                let fill: FillEvent = bytemuck::cast(events[idx]);
-                fill_update_sender
-                    .try_send(FillEventFilterMessage::Update(FillUpdate {
-                        slot,
-                        write_version,
-                        event: fill,
-                        status: FillUpdateStatus::New,
-                        market: mkt_pk_string.clone(),
-                        queue: evq_pk_string.clone(),
-                    }))
-                    .unwrap(); // TODO: use anyhow to bubble up error
-                checkpoint.push(fill);
+                let fill: PerpFillEvent = bytemuck::cast(events[idx]);
+                let fills = FillEvent::new_from_perp(fill);
+                for fill in fills {
+                    fill_update_sender
+                        .try_send(FillEventFilterMessage::Update(FillUpdate {
+                            slot,
+                            write_version,
+                            event: fill.clone(),
+                            status: FillUpdateStatus::New,
+                            market: mkt_pk_string.clone(),
+                            queue: evq_pk_string.clone(),
+                        }))
+                        .unwrap(); // TODO: use anyhow to bubble up error
+                    checkpoint.push(fill);
+                }
             }
         } else {
             // every already published event is recorded in checkpoint if a fill
             if events[idx].event_type == EventType::Fill as u8 {
-                let fill: FillEvent = bytemuck::cast(events[idx]);
-                checkpoint.push(fill);
+                let fill: PerpFillEvent = bytemuck::cast(events[idx]);
+                let fills = FillEvent::new_from_perp(fill);
+                for fill in fills {
+                    checkpoint.push(fill);
+                }
             }
         }
     }
@@ -278,17 +339,20 @@ fn publish_changes_perp(
         metric_events_drop.increment();

         if old_events[idx].event_type == EventType::Fill as u8 {
-            let fill: FillEvent = bytemuck::cast(old_events[idx]);
-            fill_update_sender
-                .try_send(FillEventFilterMessage::Update(FillUpdate {
-                    slot,
-                    event: fill,
-                    write_version,
-                    status: FillUpdateStatus::Revoke,
-                    market: mkt_pk_string.clone(),
-                    queue: evq_pk_string.clone(),
-                }))
-                .unwrap(); // TODO: use anyhow to bubble up error
+            let fill: PerpFillEvent = bytemuck::cast(old_events[idx]);
+            let fills = FillEvent::new_from_perp(fill);
+            for fill in fills {
+                fill_update_sender
+                    .try_send(FillEventFilterMessage::Update(FillUpdate {
+                        slot,
+                        event: fill,
+                        write_version,
+                        status: FillUpdateStatus::Revoke,
+                        market: mkt_pk_string.clone(),
+                        queue: evq_pk_string.clone(),
+                    }))
+                    .unwrap(); // TODO: use anyhow to bubble up error
+            }
         }
     }
@@ -325,73 +389,100 @@ fn publish_changes_serum(
     let evq_pk_string = mkt.1.to_string();
     let header_seq_num = header.seq_num;
     debug!("start seq {} header seq {}", start_seq_num, header_seq_num);
+
+    // Timestamp for spot events is time scraped
+    let timestamp = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();

     for seq_num in start_seq_num..header_seq_num {
         let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize;
         let event_view = events[idx].as_view().unwrap();
         let old_event_view = old_events[idx].as_view().unwrap();

         match event_view {
-            EventView::Fill { .. } => {
+            SpotEvent::Fill { .. } => {
                 // there are three possible cases:
                 // 1) the event is past the old seq num, hence guaranteed new event
                 // 2) the event is not matching the old event queue
                 // 3) all other events are matching the old event queue
                 // the order of these checks is important so they are exhaustive
+                let fill = FillEvent::new_from_spot(event_view, timestamp, seq_num);
+
                 if seq_num >= old_seq_num {
                     debug!("found new serum fill {} idx {}", mkt_pk_string, idx,);

                     metric_events_new.increment();
                     fill_update_sender
-                        .try_send(FillEventFilterMessage::SerumUpdate(SerumFillUpdate {
+                        .try_send(FillEventFilterMessage::Update(FillUpdate {
                             slot,
                             write_version,
-                            event: events[idx],
+                            event: fill.clone(),
                             status: FillUpdateStatus::New,
                             market: mkt_pk_string.clone(),
                             queue: evq_pk_string.clone(),
                         }))
                         .unwrap(); // TODO: use anyhow to bubble up error
-                    checkpoint.push(events[idx]);
+                    checkpoint.push(fill);
                     continue;
                 }

                 match old_event_view {
-                    EventView::Fill { .. } => {
-                        // every already published event is recorded in checkpoint
-                        checkpoint.push(events[idx]);
+                    SpotEvent::Fill { order_id, .. } => {
+                        if order_id != fill.order_id {
+                            debug!(
+                                "found changed id event {} idx {} seq_num {} header seq num {} old seq num {}",
+                                mkt_pk_string, idx, seq_num, header_seq_num, old_seq_num
+                            );
+                            metric_events_change.increment();
+
+                            let old_fill = FillEvent::new_from_spot(old_event_view, timestamp, seq_num);
+                            // first revoke old event
+                            fill_update_sender
+                                .try_send(FillEventFilterMessage::Update(FillUpdate {
+                                    slot,
+                                    write_version,
+                                    event: old_fill,
+                                    status: FillUpdateStatus::Revoke,
+                                    market: mkt_pk_string.clone(),
+                                    queue: evq_pk_string.clone(),
+                                }))
+                                .unwrap(); // TODO: use anyhow to bubble up error
+
+                            // then publish new
+                            fill_update_sender
+                                .try_send(FillEventFilterMessage::Update(FillUpdate {
+                                    slot,
+                                    write_version,
+                                    event: fill.clone(),
+                                    status: FillUpdateStatus::New,
+                                    market: mkt_pk_string.clone(),
+                                    queue: evq_pk_string.clone(),
+                                }))
+                                .unwrap(); // TODO: use anyhow to bubble up error
+                        }
+
+                        // record new event in checkpoint
+                        checkpoint.push(fill);
                     }
-                    EventView::Out { .. } => {
+                    SpotEvent::Out { .. } => {
                         debug!(
-                            "found changed event {} idx {} seq_num {} header seq num {} old seq num {}",
+                            "found changed type event {} idx {} seq_num {} header seq num {} old seq num {}",
                             mkt_pk_string, idx, seq_num, header_seq_num, old_seq_num
                         );
                         metric_events_change.increment();
-                        // first revoke old event
+                        // publish new fill and record in checkpoint
                         fill_update_sender
-                            .try_send(FillEventFilterMessage::SerumUpdate(SerumFillUpdate {
+                            .try_send(FillEventFilterMessage::Update(FillUpdate {
                                 slot,
                                 write_version,
-                                event: old_events[idx],
-                                status: FillUpdateStatus::Revoke,
-                                market: mkt_pk_string.clone(),
-                                queue: evq_pk_string.clone(),
-                            }))
-                            .unwrap(); // TODO: use anyhow to bubble up error
-                        // then publish new if its a fill and record in checkpoint
-                        fill_update_sender
-                            .try_send(FillEventFilterMessage::SerumUpdate(SerumFillUpdate {
-                                slot,
-                                write_version,
-                                event: events[idx],
+                                event: fill.clone(),
                                 status: FillUpdateStatus::New,
                                 market: mkt_pk_string.clone(),
                                 queue: evq_pk_string.clone(),
                             }))
                             .unwrap(); // TODO: use anyhow to bubble up error
-                        checkpoint.push(events[idx]);
+                        checkpoint.push(fill);
                     }
                 }
             }
@@ -411,11 +502,12 @@ fn publish_changes_serum(
             metric_events_drop.increment();

             match old_event_view {
-                EventView::Fill { .. } => {
+                SpotEvent::Fill { .. } => {
+                    let old_fill = FillEvent::new_from_spot(old_event_view, timestamp, seq_num);
                     fill_update_sender
-                        .try_send(FillEventFilterMessage::SerumUpdate(SerumFillUpdate {
+                        .try_send(FillEventFilterMessage::Update(FillUpdate {
                             slot,
-                            event: old_events[idx],
+                            event: old_fill,
                             write_version,
                             status: FillUpdateStatus::Revoke,
                             market: mkt_pk_string.clone(),
@@ -423,13 +515,13 @@ fn publish_changes_serum(
                         }))
                         .unwrap(); // TODO: use anyhow to bubble up error
                 }
-                EventView::Out { .. } => continue,
+                SpotEvent::Out { .. } => continue,
            }
        }

    fill_update_sender
-        .try_send(FillEventFilterMessage::SerumCheckpoint(
-            SerumFillCheckpoint {
+        .try_send(FillEventFilterMessage::Checkpoint(
+            FillCheckpoint {
                 slot,
                 write_version,
                 events: checkpoint,

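For consumers of the websocket feed, the unified schema means spot and perp fills now arrive as the same FillUpdate shape, and each perp fill is published twice, once for the maker and once for the taker. Below is a minimal sketch of what a single update payload could look like, reconstructed from the Serialize impls above; the concrete values, the placeholder pubkey strings, and the string encodings of the side and status enums are illustrative assumptions, not taken from this change.

// Illustration only: approximate wire format of one FillUpdate message under
// the unified schema. Field names follow the Serialize impls in this diff;
// the values and the exact enum encodings are assumptions.
use serde_json::json;

fn main() {
    let update = json!({
        "event": {
            "eventType": "Spot",            // or "Perp"
            "maker": true,
            "side": "Ask",                  // assumed serialization of OrderbookSide
            "timestamp": 1674233342u64,     // spot: scrape time, perp: event timestamp
            "seqNum": 42u64,
            "owner": "<owner pubkey, base58>",
            "orderId": 0u64,                // u128 in the struct
            "clientOrderId": 0u64,
            "fee": 0.0f32,
            "price": 0i64,
            "quantity": 0i64
        },
        "market": "<market pubkey>",
        "queue": "<event queue pubkey>",
        "status": "New",                    // or "Revoke"
        "slot": 0u64,
        "write_version": 0u64
    });
    println!("{}", serde_json::to_string_pretty(&update).unwrap());
}

Note that the nested event uses camelCase keys, while the outer FillUpdate keeps slot and write_version in snake_case.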

@@ -30,7 +30,6 @@ use tokio_tungstenite::tungstenite::{protocol::Message, Error};

 use serde::Deserialize;
 use solana_geyser_connector_lib::{
-    fill_event_filter::SerumFillCheckpoint,
     metrics::{MetricType, MetricU64},
     FilterConfig, StatusResponse,
 };
@@ -40,7 +39,6 @@ use solana_geyser_connector_lib::{
 };

 type CheckpointMap = Arc<Mutex<HashMap<String, FillCheckpoint>>>;
-type SerumCheckpointMap = Arc<Mutex<HashMap<String, SerumFillCheckpoint>>>;
 type PeerMap = Arc<Mutex<HashMap<SocketAddr, Peer>>>;

 // jemalloc seems to be better at keeping the memory footprint reasonable over
@@ -79,7 +77,6 @@ pub struct Peer {

 async fn handle_connection_error(
     checkpoint_map: CheckpointMap,
-    serum_checkpoint_map: SerumCheckpointMap,
     peer_map: PeerMap,
     market_ids: HashMap<String, String>,
     raw_stream: TcpStream,
@@ -91,7 +88,6 @@ async fn handle_connection_error(
     let result = handle_connection(
         checkpoint_map,
-        serum_checkpoint_map,
         peer_map.clone(),
         market_ids,
         raw_stream,
@@ -109,7 +105,6 @@ async fn handle_connection_error(
 async fn handle_connection(
     checkpoint_map: CheckpointMap,
-    serum_checkpoint_map: SerumCheckpointMap,
     peer_map: PeerMap,
     market_ids: HashMap<String, String>,
     raw_stream: TcpStream,
@@ -138,7 +133,6 @@ async fn handle_connection(
             msg,
             peer_map.clone(),
             checkpoint_map.clone(),
-            serum_checkpoint_map.clone(),
             market_ids.clone(),
         ),
         Message::Ping(_) => {
@@ -167,7 +161,6 @@ fn handle_commands(
     msg: Message,
     peer_map: PeerMap,
     checkpoint_map: CheckpointMap,
-    serum_checkpoint_map: SerumCheckpointMap,
     market_ids: HashMap<String, String>,
 ) -> Ready<Result<(), Error>> {
     let msg_str = msg.clone().into_text().unwrap();
@@ -209,7 +202,6 @@ fn handle_commands(
             if subscribed {
                 let checkpoint_map = checkpoint_map.lock().unwrap();
-                let serum_checkpoint_map = serum_checkpoint_map.lock().unwrap();
                 let checkpoint = checkpoint_map.get(&market_id);
                 match checkpoint {
                     Some(checkpoint) => {
@@ -219,17 +211,8 @@ fn handle_commands(
                             ))
                             .unwrap();
                     }
-                    None => match serum_checkpoint_map.get(&market_id) {
-                        Some(checkpoint) => {
-                            peer.sender
-                                .unbounded_send(Message::Text(
-                                    serde_json::to_string(&checkpoint).unwrap(),
-                                ))
-                                .unwrap();
-                        }
-                        None => info!("no checkpoint available on client subscription"),
-                    },
-                }
+                    None => info!("no checkpoint available on client subscription"),
+                };
             }
         }
         Ok(Command::Unsubscribe(cmd)) => {
@@ -393,11 +376,9 @@ async fn main() -> anyhow::Result<()> {
         .await?;

     let checkpoints = CheckpointMap::new(Mutex::new(HashMap::new()));
-    let serum_checkpoints = SerumCheckpointMap::new(Mutex::new(HashMap::new()));
     let peers = PeerMap::new(Mutex::new(HashMap::new()));

     let checkpoints_ref_thread = checkpoints.clone();
-    let serum_checkpoints_ref_thread = serum_checkpoints.clone();
     let peers_ref_thread = peers.clone();
     let peers_ref_thread1 = peers.clone();
@@ -408,7 +389,7 @@ async fn main() -> anyhow::Result<()> {
             let message = fill_receiver.recv().await.unwrap();
             match message {
                 FillEventFilterMessage::Update(update) => {
-                    debug!("ws update {} {:?} fill", update.market, update.status);
+                    debug!("ws update {} {:?} {:?} fill", update.market, update.status, update.event.event_type);
                     let mut peer_copy = peers_ref_thread.lock().unwrap().clone();
                     for (addr, peer) in peer_copy.iter_mut() {
                         let json = serde_json::to_string(&update).unwrap();
@@ -428,27 +409,6 @@ async fn main() -> anyhow::Result<()> {
                         .unwrap()
                         .insert(checkpoint.queue.clone(), checkpoint);
                 }
-                FillEventFilterMessage::SerumUpdate(update) => {
-                    debug!("ws update {} {:?} serum fill", update.market, update.status);
-                    let mut peers_copy = peers_ref_thread.lock().unwrap().clone();
-                    for (addr, peer) in peers_copy.iter_mut() {
-                        let json = serde_json::to_string(&update).unwrap();
-
-                        // only send updates if the peer is subscribed
-                        if peer.subscriptions.contains(&update.market) {
-                            let result = peer.sender.send(Message::Text(json)).await;
-                            if result.is_err() {
-                                error!("ws update {} fill could not reach {}", update.market, addr);
-                            }
-                        }
-                    }
-                }
-                FillEventFilterMessage::SerumCheckpoint(checkpoint) => {
-                    serum_checkpoints_ref_thread
-                        .lock()
-                        .unwrap()
-                        .insert(checkpoint.queue.clone(), checkpoint);
-                }
             }
         }
     });
@@ -462,7 +422,6 @@ async fn main() -> anyhow::Result<()> {
     while let Ok((stream, addr)) = listener.accept().await {
         tokio::spawn(handle_connection_error(
             checkpoints.clone(),
-            serum_checkpoints.clone(),
             peers.clone(),
             market_pubkey_strings.clone(),
             stream,