Compare commits
3 Commits
3dc7ae1246
...
1233cc58f3
Author | SHA1 | Date |
---|---|---|
Riordan Panayides | 1233cc58f3 | |
Riordan Panayides | 8d6b6f5f38 | |
Riordan Panayides | f88ec6a53c |
|
@ -5771,7 +5771,6 @@ dependencies = [
|
||||||
"futures-core",
|
"futures-core",
|
||||||
"futures-util",
|
"futures-util",
|
||||||
"itertools 0.10.5",
|
"itertools 0.10.5",
|
||||||
"jemallocator",
|
|
||||||
"jsonrpc-core 18.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
"jsonrpc-core 18.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"jsonrpc-core-client",
|
"jsonrpc-core-client",
|
||||||
"log 0.4.17",
|
"log 0.4.17",
|
||||||
|
|
|
@ -56,15 +56,15 @@ impl ChainData {
|
||||||
account_versions_stored: 0,
|
account_versions_stored: 0,
|
||||||
account_bytes_stored: 0,
|
account_bytes_stored: 0,
|
||||||
metric_accounts_stored: metrics_sender.register_u64(
|
metric_accounts_stored: metrics_sender.register_u64(
|
||||||
"fills_feed_chaindata_accounts_stored".into(),
|
"chaindata_accounts_stored".into(),
|
||||||
MetricType::Gauge,
|
MetricType::Gauge,
|
||||||
),
|
),
|
||||||
metric_account_versions_stored: metrics_sender.register_u64(
|
metric_account_versions_stored: metrics_sender.register_u64(
|
||||||
"fills_feed_chaindata_account_versions_stored".into(),
|
"chaindata_account_versions_stored".into(),
|
||||||
MetricType::Gauge,
|
MetricType::Gauge,
|
||||||
),
|
),
|
||||||
metric_account_bytes_stored: metrics_sender.register_u64(
|
metric_account_bytes_stored: metrics_sender.register_u64(
|
||||||
"fills_feed_chaindata_account_bytes_stored".into(),
|
"chaindata_account_bytes_stored".into(),
|
||||||
MetricType::Gauge,
|
MetricType::Gauge,
|
||||||
),
|
),
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,12 +1,12 @@
|
||||||
use crate::{
|
use crate::{
|
||||||
chain_data::{AccountData, ChainData, SlotData},
|
chain_data::{AccountData, ChainData, SlotData},
|
||||||
metrics::{MetricType, Metrics},
|
metrics::{MetricType, Metrics},
|
||||||
AccountWrite, SlotUpdate,
|
AccountWrite, SlotUpdate, orderbook_filter::OrderbookSide,
|
||||||
};
|
};
|
||||||
use bytemuck::{Pod, Zeroable};
|
use bytemuck::{Pod, Zeroable, cast_slice};
|
||||||
use log::*;
|
use log::*;
|
||||||
use serde::{ser::SerializeStruct, Serialize, Serializer};
|
use serde::{ser::SerializeStruct, Serialize, Serializer};
|
||||||
use serum_dex::state::EventView;
|
use serum_dex::state::EventView as SpotEvent;
|
||||||
use solana_sdk::{
|
use solana_sdk::{
|
||||||
account::{ReadableAccount, WritableAccount},
|
account::{ReadableAccount, WritableAccount},
|
||||||
clock::Epoch,
|
clock::Epoch,
|
||||||
|
@ -15,13 +15,14 @@ use solana_sdk::{
|
||||||
use std::{
|
use std::{
|
||||||
borrow::BorrowMut,
|
borrow::BorrowMut,
|
||||||
cmp::max,
|
cmp::max,
|
||||||
collections::{HashMap, HashSet},
|
collections::{HashMap, HashSet}, convert::identity, time::SystemTime,
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::metrics::MetricU64;
|
use crate::metrics::MetricU64;
|
||||||
use anchor_lang::AccountDeserialize;
|
use anchor_lang::AccountDeserialize;
|
||||||
use mango_v4::state::{
|
use mango_v4::state::{
|
||||||
AnyEvent, EventQueue, EventQueueHeader, EventType, FillEvent, MAX_NUM_EVENTS,
|
AnyEvent, EventQueue, EventQueueHeader, EventType, FillEvent as PerpFillEvent, Side,
|
||||||
|
MAX_NUM_EVENTS,
|
||||||
};
|
};
|
||||||
|
|
||||||
#[derive(Clone, Copy, Debug, Serialize)]
|
#[derive(Clone, Copy, Debug, Serialize)]
|
||||||
|
@ -30,19 +31,122 @@ pub enum FillUpdateStatus {
|
||||||
Revoke,
|
Revoke,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Copy, Debug, Serialize)]
|
||||||
pub struct FillUpdate {
|
pub enum FillEventType {
|
||||||
pub event: FillEvent,
|
Spot,
|
||||||
pub status: FillUpdateStatus,
|
Perp,
|
||||||
pub market: String,
|
|
||||||
pub queue: String,
|
|
||||||
pub slot: u64,
|
|
||||||
pub write_version: u64,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct SerumFillUpdate {
|
pub struct FillEvent {
|
||||||
pub event: serum_dex::state::Event,
|
pub event_type: FillEventType,
|
||||||
|
pub maker: bool,
|
||||||
|
pub side: OrderbookSide,
|
||||||
|
pub timestamp: u64,
|
||||||
|
pub seq_num: u64,
|
||||||
|
pub owner: String,
|
||||||
|
pub order_id: u128,
|
||||||
|
pub client_order_id: u64,
|
||||||
|
pub fee: f32,
|
||||||
|
pub price: i64,
|
||||||
|
pub quantity: i64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Serialize for FillEvent {
|
||||||
|
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||||
|
where
|
||||||
|
S: Serializer,
|
||||||
|
{
|
||||||
|
let mut state = serializer.serialize_struct("FillEvent", 12)?;
|
||||||
|
state.serialize_field("eventType", &self.event_type)?;
|
||||||
|
state.serialize_field("maker", &self.maker)?;
|
||||||
|
state.serialize_field("side", &self.side)?;
|
||||||
|
state.serialize_field("timestamp", &self.timestamp)?;
|
||||||
|
state.serialize_field("seqNum", &self.seq_num)?;
|
||||||
|
state.serialize_field("owner", &self.owner)?;
|
||||||
|
state.serialize_field("orderId", &self.order_id)?;
|
||||||
|
state.serialize_field("clientOrderId", &self.client_order_id)?;
|
||||||
|
state.serialize_field("fee", &self.fee)?;
|
||||||
|
state.serialize_field("price", &self.price)?;
|
||||||
|
state.serialize_field("quantity", &self.quantity)?;
|
||||||
|
state.end()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FillEvent {
|
||||||
|
pub fn new_from_perp(event: PerpFillEvent) -> [Self; 2] {
|
||||||
|
let taker_side = match event.taker_side() {
|
||||||
|
Side::Ask => OrderbookSide::Ask,
|
||||||
|
Side::Bid => OrderbookSide::Bid,
|
||||||
|
};
|
||||||
|
let maker_side = match event.taker_side() {
|
||||||
|
Side::Ask => OrderbookSide::Bid,
|
||||||
|
Side::Bid => OrderbookSide::Ask,
|
||||||
|
};
|
||||||
|
[FillEvent {
|
||||||
|
event_type: FillEventType::Perp,
|
||||||
|
maker: true,
|
||||||
|
side: maker_side,
|
||||||
|
timestamp: event.timestamp,
|
||||||
|
seq_num: event.seq_num,
|
||||||
|
owner: event.maker.to_string(),
|
||||||
|
order_id: event.maker_order_id,
|
||||||
|
client_order_id: 0u64,
|
||||||
|
fee: event.maker_fee.to_num(),
|
||||||
|
price: event.price,
|
||||||
|
quantity: event.quantity,
|
||||||
|
},
|
||||||
|
FillEvent {
|
||||||
|
event_type: FillEventType::Perp,
|
||||||
|
maker: false,
|
||||||
|
side: taker_side,
|
||||||
|
timestamp: event.timestamp,
|
||||||
|
seq_num: event.seq_num,
|
||||||
|
owner: event.taker.to_string(),
|
||||||
|
order_id: event.taker_order_id,
|
||||||
|
client_order_id: event.taker_client_order_id,
|
||||||
|
fee: event.taker_fee.to_num(),
|
||||||
|
price: event.price,
|
||||||
|
quantity: event.quantity,
|
||||||
|
|
||||||
|
}]
|
||||||
|
}
|
||||||
|
pub fn new_from_spot(event: SpotEvent, timestamp: u64, seq_num: u64) -> Self {
|
||||||
|
match event {
|
||||||
|
SpotEvent::Fill { side, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, order_id, owner, client_order_id, .. } => {
|
||||||
|
let side = match side as u8 {
|
||||||
|
0 => OrderbookSide::Bid,
|
||||||
|
1 => OrderbookSide::Ask,
|
||||||
|
_ => panic!("invalid side"),
|
||||||
|
};
|
||||||
|
let client_order_id: u64 = match client_order_id {
|
||||||
|
Some(id) => id.into(),
|
||||||
|
None => 0u64,
|
||||||
|
};
|
||||||
|
// TODO: native to ui
|
||||||
|
let price = (native_qty_paid / native_qty_received) as i64;
|
||||||
|
FillEvent {
|
||||||
|
event_type: FillEventType::Spot,
|
||||||
|
maker: maker,
|
||||||
|
side,
|
||||||
|
timestamp,
|
||||||
|
seq_num,
|
||||||
|
owner: Pubkey::new(cast_slice(&identity(owner) as &[_])).to_string(),
|
||||||
|
order_id: order_id,
|
||||||
|
client_order_id: client_order_id,
|
||||||
|
fee: native_fee_or_rebate as f32,
|
||||||
|
price,
|
||||||
|
quantity: native_qty_received as i64,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
SpotEvent::Out { .. } => { panic!("Can't build FillEvent from SpotEvent::Out")}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
|
pub struct FillUpdate {
|
||||||
|
pub event: FillEvent,
|
||||||
pub status: FillUpdateStatus,
|
pub status: FillUpdateStatus,
|
||||||
pub market: String,
|
pub market: String,
|
||||||
pub queue: String,
|
pub queue: String,
|
||||||
|
@ -66,27 +170,8 @@ impl Serialize for FillUpdate {
|
||||||
where
|
where
|
||||||
S: Serializer,
|
S: Serializer,
|
||||||
{
|
{
|
||||||
let event = base64::encode(bytemuck::bytes_of(&self.event));
|
|
||||||
let mut state = serializer.serialize_struct("FillUpdate", 4)?;
|
let mut state = serializer.serialize_struct("FillUpdate", 4)?;
|
||||||
state.serialize_field("event", &event)?;
|
state.serialize_field("event", &self.event)?;
|
||||||
state.serialize_field("market", &self.market)?;
|
|
||||||
state.serialize_field("queue", &self.queue)?;
|
|
||||||
state.serialize_field("status", &self.status)?;
|
|
||||||
state.serialize_field("slot", &self.slot)?;
|
|
||||||
state.serialize_field("write_version", &self.write_version)?;
|
|
||||||
|
|
||||||
state.end()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Serialize for SerumFillUpdate {
|
|
||||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
|
||||||
where
|
|
||||||
S: Serializer,
|
|
||||||
{
|
|
||||||
let event = base64::encode(bytemuck::bytes_of(&self.event));
|
|
||||||
let mut state = serializer.serialize_struct("SerumFillUpdate", 4)?;
|
|
||||||
state.serialize_field("event", &event)?;
|
|
||||||
state.serialize_field("market", &self.market)?;
|
state.serialize_field("market", &self.market)?;
|
||||||
state.serialize_field("queue", &self.queue)?;
|
state.serialize_field("queue", &self.queue)?;
|
||||||
state.serialize_field("status", &self.status)?;
|
state.serialize_field("status", &self.status)?;
|
||||||
|
@ -106,48 +191,13 @@ pub struct FillCheckpoint {
|
||||||
pub write_version: u64,
|
pub write_version: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
|
||||||
pub struct SerumFillCheckpoint {
|
|
||||||
pub market: String,
|
|
||||||
pub queue: String,
|
|
||||||
pub events: Vec<serum_dex::state::Event>,
|
|
||||||
pub slot: u64,
|
|
||||||
pub write_version: u64,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Serialize for FillCheckpoint {
|
impl Serialize for FillCheckpoint {
|
||||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||||
where
|
where
|
||||||
S: Serializer,
|
S: Serializer,
|
||||||
{
|
{
|
||||||
let events: Vec<String> = self
|
|
||||||
.events
|
|
||||||
.iter()
|
|
||||||
.map(|e| base64::encode(bytemuck::bytes_of(e)))
|
|
||||||
.collect();
|
|
||||||
let mut state = serializer.serialize_struct("FillCheckpoint", 3)?;
|
let mut state = serializer.serialize_struct("FillCheckpoint", 3)?;
|
||||||
state.serialize_field("events", &events)?;
|
state.serialize_field("events", &self.events)?;
|
||||||
state.serialize_field("market", &self.market)?;
|
|
||||||
state.serialize_field("queue", &self.queue)?;
|
|
||||||
state.serialize_field("slot", &self.slot)?;
|
|
||||||
state.serialize_field("write_version", &self.write_version)?;
|
|
||||||
|
|
||||||
state.end()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Serialize for SerumFillCheckpoint {
|
|
||||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
|
||||||
where
|
|
||||||
S: Serializer,
|
|
||||||
{
|
|
||||||
let events: Vec<String> = self
|
|
||||||
.events
|
|
||||||
.iter()
|
|
||||||
.map(|e| base64::encode(bytemuck::bytes_of(e)))
|
|
||||||
.collect();
|
|
||||||
let mut state = serializer.serialize_struct("SerumFillCheckpoint", 3)?;
|
|
||||||
state.serialize_field("events", &events)?;
|
|
||||||
state.serialize_field("market", &self.market)?;
|
state.serialize_field("market", &self.market)?;
|
||||||
state.serialize_field("queue", &self.queue)?;
|
state.serialize_field("queue", &self.queue)?;
|
||||||
state.serialize_field("slot", &self.slot)?;
|
state.serialize_field("slot", &self.slot)?;
|
||||||
|
@ -159,9 +209,7 @@ impl Serialize for SerumFillCheckpoint {
|
||||||
|
|
||||||
pub enum FillEventFilterMessage {
|
pub enum FillEventFilterMessage {
|
||||||
Update(FillUpdate),
|
Update(FillUpdate),
|
||||||
SerumUpdate(SerumFillUpdate),
|
|
||||||
Checkpoint(FillCheckpoint),
|
Checkpoint(FillCheckpoint),
|
||||||
SerumCheckpoint(SerumFillCheckpoint),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// couldn't compile the correct struct size / math on m1, fixed sizes resolve this issue
|
// couldn't compile the correct struct size / math on m1, fixed sizes resolve this issue
|
||||||
|
@ -205,12 +253,15 @@ fn publish_changes_perp(
|
||||||
|
|
||||||
// new fills are published and recorded in checkpoint
|
// new fills are published and recorded in checkpoint
|
||||||
if events[idx].event_type == EventType::Fill as u8 {
|
if events[idx].event_type == EventType::Fill as u8 {
|
||||||
let fill: FillEvent = bytemuck::cast(events[idx]);
|
let fill: PerpFillEvent = bytemuck::cast(events[idx]);
|
||||||
|
let fills = FillEvent::new_from_perp(fill);
|
||||||
|
// send event for both maker and taker
|
||||||
|
for fill in fills {
|
||||||
fill_update_sender
|
fill_update_sender
|
||||||
.try_send(FillEventFilterMessage::Update(FillUpdate {
|
.try_send(FillEventFilterMessage::Update(FillUpdate {
|
||||||
slot,
|
slot,
|
||||||
write_version,
|
write_version,
|
||||||
event: fill,
|
event: fill.clone(),
|
||||||
status: FillUpdateStatus::New,
|
status: FillUpdateStatus::New,
|
||||||
market: mkt_pk_string.clone(),
|
market: mkt_pk_string.clone(),
|
||||||
queue: evq_pk_string.clone(),
|
queue: evq_pk_string.clone(),
|
||||||
|
@ -218,6 +269,7 @@ fn publish_changes_perp(
|
||||||
.unwrap(); // TODO: use anyhow to bubble up error
|
.unwrap(); // TODO: use anyhow to bubble up error
|
||||||
checkpoint.push(fill);
|
checkpoint.push(fill);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
} else if old_events[idx].event_type != events[idx].event_type
|
} else if old_events[idx].event_type != events[idx].event_type
|
||||||
|| old_events[idx].padding != events[idx].padding
|
|| old_events[idx].padding != events[idx].padding
|
||||||
{
|
{
|
||||||
|
@ -230,7 +282,9 @@ fn publish_changes_perp(
|
||||||
|
|
||||||
// first revoke old event if a fill
|
// first revoke old event if a fill
|
||||||
if old_events[idx].event_type == EventType::Fill as u8 {
|
if old_events[idx].event_type == EventType::Fill as u8 {
|
||||||
let fill: FillEvent = bytemuck::cast(old_events[idx]);
|
let fill: PerpFillEvent = bytemuck::cast(old_events[idx]);
|
||||||
|
let fills = FillEvent::new_from_perp(fill);
|
||||||
|
for fill in fills {
|
||||||
fill_update_sender
|
fill_update_sender
|
||||||
.try_send(FillEventFilterMessage::Update(FillUpdate {
|
.try_send(FillEventFilterMessage::Update(FillUpdate {
|
||||||
slot,
|
slot,
|
||||||
|
@ -242,15 +296,18 @@ fn publish_changes_perp(
|
||||||
}))
|
}))
|
||||||
.unwrap(); // TODO: use anyhow to bubble up error
|
.unwrap(); // TODO: use anyhow to bubble up error
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// then publish new if its a fill and record in checkpoint
|
// then publish new if its a fill and record in checkpoint
|
||||||
if events[idx].event_type == EventType::Fill as u8 {
|
if events[idx].event_type == EventType::Fill as u8 {
|
||||||
let fill: FillEvent = bytemuck::cast(events[idx]);
|
let fill: PerpFillEvent = bytemuck::cast(events[idx]);
|
||||||
|
let fills = FillEvent::new_from_perp(fill);
|
||||||
|
for fill in fills {
|
||||||
fill_update_sender
|
fill_update_sender
|
||||||
.try_send(FillEventFilterMessage::Update(FillUpdate {
|
.try_send(FillEventFilterMessage::Update(FillUpdate {
|
||||||
slot,
|
slot,
|
||||||
write_version,
|
write_version,
|
||||||
event: fill,
|
event: fill.clone(),
|
||||||
status: FillUpdateStatus::New,
|
status: FillUpdateStatus::New,
|
||||||
market: mkt_pk_string.clone(),
|
market: mkt_pk_string.clone(),
|
||||||
queue: evq_pk_string.clone(),
|
queue: evq_pk_string.clone(),
|
||||||
|
@ -258,14 +315,18 @@ fn publish_changes_perp(
|
||||||
.unwrap(); // TODO: use anyhow to bubble up error
|
.unwrap(); // TODO: use anyhow to bubble up error
|
||||||
checkpoint.push(fill);
|
checkpoint.push(fill);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
// every already published event is recorded in checkpoint if a fill
|
// every already published event is recorded in checkpoint if a fill
|
||||||
if events[idx].event_type == EventType::Fill as u8 {
|
if events[idx].event_type == EventType::Fill as u8 {
|
||||||
let fill: FillEvent = bytemuck::cast(events[idx]);
|
let fill: PerpFillEvent = bytemuck::cast(events[idx]);
|
||||||
|
let fills = FillEvent::new_from_perp(fill);
|
||||||
|
for fill in fills {
|
||||||
checkpoint.push(fill);
|
checkpoint.push(fill);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// in case queue size shrunk due to a fork we need revoke all previous fills
|
// in case queue size shrunk due to a fork we need revoke all previous fills
|
||||||
for seq_num in header.seq_num..old_seq_num {
|
for seq_num in header.seq_num..old_seq_num {
|
||||||
|
@ -278,7 +339,9 @@ fn publish_changes_perp(
|
||||||
metric_events_drop.increment();
|
metric_events_drop.increment();
|
||||||
|
|
||||||
if old_events[idx].event_type == EventType::Fill as u8 {
|
if old_events[idx].event_type == EventType::Fill as u8 {
|
||||||
let fill: FillEvent = bytemuck::cast(old_events[idx]);
|
let fill: PerpFillEvent = bytemuck::cast(old_events[idx]);
|
||||||
|
let fills = FillEvent::new_from_perp(fill);
|
||||||
|
for fill in fills {
|
||||||
fill_update_sender
|
fill_update_sender
|
||||||
.try_send(FillEventFilterMessage::Update(FillUpdate {
|
.try_send(FillEventFilterMessage::Update(FillUpdate {
|
||||||
slot,
|
slot,
|
||||||
|
@ -291,6 +354,7 @@ fn publish_changes_perp(
|
||||||
.unwrap(); // TODO: use anyhow to bubble up error
|
.unwrap(); // TODO: use anyhow to bubble up error
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fill_update_sender
|
fill_update_sender
|
||||||
.try_send(FillEventFilterMessage::Checkpoint(FillCheckpoint {
|
.try_send(FillEventFilterMessage::Checkpoint(FillCheckpoint {
|
||||||
|
@ -325,73 +389,100 @@ fn publish_changes_serum(
|
||||||
let evq_pk_string = mkt.1.to_string();
|
let evq_pk_string = mkt.1.to_string();
|
||||||
let header_seq_num = header.seq_num;
|
let header_seq_num = header.seq_num;
|
||||||
debug!("start seq {} header seq {}", start_seq_num, header_seq_num);
|
debug!("start seq {} header seq {}", start_seq_num, header_seq_num);
|
||||||
|
|
||||||
|
// Timestamp for spot events is time scraped
|
||||||
|
let timestamp = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
|
||||||
for seq_num in start_seq_num..header_seq_num {
|
for seq_num in start_seq_num..header_seq_num {
|
||||||
let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize;
|
let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize;
|
||||||
let event_view = events[idx].as_view().unwrap();
|
let event_view = events[idx].as_view().unwrap();
|
||||||
let old_event_view = old_events[idx].as_view().unwrap();
|
let old_event_view = old_events[idx].as_view().unwrap();
|
||||||
|
|
||||||
match event_view {
|
match event_view {
|
||||||
EventView::Fill { .. } => {
|
SpotEvent::Fill { .. } => {
|
||||||
// there are three possible cases:
|
// there are three possible cases:
|
||||||
// 1) the event is past the old seq num, hence guaranteed new event
|
// 1) the event is past the old seq num, hence guaranteed new event
|
||||||
// 2) the event is not matching the old event queue
|
// 2) the event is not matching the old event queue
|
||||||
// 3) all other events are matching the old event queue
|
// 3) all other events are matching the old event queue
|
||||||
// the order of these checks is important so they are exhaustive
|
// the order of these checks is important so they are exhaustive
|
||||||
|
let fill = FillEvent::new_from_spot(event_view, timestamp, seq_num);
|
||||||
if seq_num >= old_seq_num {
|
if seq_num >= old_seq_num {
|
||||||
debug!("found new serum fill {} idx {}", mkt_pk_string, idx,);
|
debug!("found new serum fill {} idx {}", mkt_pk_string, idx,);
|
||||||
|
|
||||||
metric_events_new.increment();
|
metric_events_new.increment();
|
||||||
fill_update_sender
|
fill_update_sender
|
||||||
.try_send(FillEventFilterMessage::SerumUpdate(SerumFillUpdate {
|
.try_send(FillEventFilterMessage::Update(FillUpdate {
|
||||||
slot,
|
slot,
|
||||||
write_version,
|
write_version,
|
||||||
event: events[idx],
|
event: fill.clone(),
|
||||||
status: FillUpdateStatus::New,
|
status: FillUpdateStatus::New,
|
||||||
market: mkt_pk_string.clone(),
|
market: mkt_pk_string.clone(),
|
||||||
queue: evq_pk_string.clone(),
|
queue: evq_pk_string.clone(),
|
||||||
}))
|
}))
|
||||||
.unwrap(); // TODO: use anyhow to bubble up error
|
.unwrap(); // TODO: use anyhow to bubble up error
|
||||||
checkpoint.push(events[idx]);
|
checkpoint.push(fill);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
match old_event_view {
|
match old_event_view {
|
||||||
EventView::Fill { .. } => {
|
SpotEvent::Fill { order_id, .. } => {
|
||||||
// every already published event is recorded in checkpoint
|
if order_id != fill.order_id {
|
||||||
checkpoint.push(events[idx]);
|
|
||||||
}
|
|
||||||
EventView::Out { .. } => {
|
|
||||||
debug!(
|
debug!(
|
||||||
"found changed event {} idx {} seq_num {} header seq num {} old seq num {}",
|
"found changed id event {} idx {} seq_num {} header seq num {} old seq num {}",
|
||||||
mkt_pk_string, idx, seq_num, header_seq_num, old_seq_num
|
mkt_pk_string, idx, seq_num, header_seq_num, old_seq_num
|
||||||
);
|
);
|
||||||
|
|
||||||
metric_events_change.increment();
|
metric_events_change.increment();
|
||||||
|
|
||||||
|
|
||||||
|
let old_fill = FillEvent::new_from_spot(old_event_view, timestamp, seq_num);
|
||||||
// first revoke old event
|
// first revoke old event
|
||||||
fill_update_sender
|
fill_update_sender
|
||||||
.try_send(FillEventFilterMessage::SerumUpdate(SerumFillUpdate {
|
.try_send(FillEventFilterMessage::Update(FillUpdate {
|
||||||
slot,
|
slot,
|
||||||
write_version,
|
write_version,
|
||||||
event: old_events[idx],
|
event: old_fill,
|
||||||
status: FillUpdateStatus::Revoke,
|
status: FillUpdateStatus::Revoke,
|
||||||
market: mkt_pk_string.clone(),
|
market: mkt_pk_string.clone(),
|
||||||
queue: evq_pk_string.clone(),
|
queue: evq_pk_string.clone(),
|
||||||
}))
|
}))
|
||||||
.unwrap(); // TODO: use anyhow to bubble up error
|
.unwrap(); // TODO: use anyhow to bubble up error
|
||||||
|
|
||||||
// then publish new if its a fill and record in checkpoint
|
// then publish new
|
||||||
fill_update_sender
|
fill_update_sender
|
||||||
.try_send(FillEventFilterMessage::SerumUpdate(SerumFillUpdate {
|
.try_send(FillEventFilterMessage::Update(FillUpdate {
|
||||||
slot,
|
slot,
|
||||||
write_version,
|
write_version,
|
||||||
event: events[idx],
|
event: fill.clone(),
|
||||||
status: FillUpdateStatus::New,
|
status: FillUpdateStatus::New,
|
||||||
market: mkt_pk_string.clone(),
|
market: mkt_pk_string.clone(),
|
||||||
queue: evq_pk_string.clone(),
|
queue: evq_pk_string.clone(),
|
||||||
}))
|
}))
|
||||||
.unwrap(); // TODO: use anyhow to bubble up error
|
.unwrap(); // TODO: use anyhow to bubble up error
|
||||||
checkpoint.push(events[idx]);
|
}
|
||||||
|
|
||||||
|
// record new event in checkpoint
|
||||||
|
checkpoint.push(fill);
|
||||||
|
}
|
||||||
|
SpotEvent::Out { .. } => {
|
||||||
|
debug!(
|
||||||
|
"found changed type event {} idx {} seq_num {} header seq num {} old seq num {}",
|
||||||
|
mkt_pk_string, idx, seq_num, header_seq_num, old_seq_num
|
||||||
|
);
|
||||||
|
|
||||||
|
metric_events_change.increment();
|
||||||
|
|
||||||
|
// publish new fill and record in checkpoint
|
||||||
|
fill_update_sender
|
||||||
|
.try_send(FillEventFilterMessage::Update(FillUpdate {
|
||||||
|
slot,
|
||||||
|
write_version,
|
||||||
|
event: fill.clone(),
|
||||||
|
status: FillUpdateStatus::New,
|
||||||
|
market: mkt_pk_string.clone(),
|
||||||
|
queue: evq_pk_string.clone(),
|
||||||
|
}))
|
||||||
|
.unwrap(); // TODO: use anyhow to bubble up error
|
||||||
|
checkpoint.push(fill);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -411,11 +502,12 @@ fn publish_changes_serum(
|
||||||
metric_events_drop.increment();
|
metric_events_drop.increment();
|
||||||
|
|
||||||
match old_event_view {
|
match old_event_view {
|
||||||
EventView::Fill { .. } => {
|
SpotEvent::Fill { .. } => {
|
||||||
|
let old_fill = FillEvent::new_from_spot(old_event_view, timestamp, seq_num);
|
||||||
fill_update_sender
|
fill_update_sender
|
||||||
.try_send(FillEventFilterMessage::SerumUpdate(SerumFillUpdate {
|
.try_send(FillEventFilterMessage::Update(FillUpdate {
|
||||||
slot,
|
slot,
|
||||||
event: old_events[idx],
|
event: old_fill,
|
||||||
write_version,
|
write_version,
|
||||||
status: FillUpdateStatus::Revoke,
|
status: FillUpdateStatus::Revoke,
|
||||||
market: mkt_pk_string.clone(),
|
market: mkt_pk_string.clone(),
|
||||||
|
@ -423,13 +515,13 @@ fn publish_changes_serum(
|
||||||
}))
|
}))
|
||||||
.unwrap(); // TODO: use anyhow to bubble up error
|
.unwrap(); // TODO: use anyhow to bubble up error
|
||||||
}
|
}
|
||||||
EventView::Out { .. } => continue,
|
SpotEvent::Out { .. } => continue,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fill_update_sender
|
fill_update_sender
|
||||||
.try_send(FillEventFilterMessage::SerumCheckpoint(
|
.try_send(FillEventFilterMessage::Checkpoint(
|
||||||
SerumFillCheckpoint {
|
FillCheckpoint {
|
||||||
slot,
|
slot,
|
||||||
write_version,
|
write_version,
|
||||||
events: checkpoint,
|
events: checkpoint,
|
||||||
|
|
|
@ -30,7 +30,6 @@ use tokio_tungstenite::tungstenite::{protocol::Message, Error};
|
||||||
|
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use solana_geyser_connector_lib::{
|
use solana_geyser_connector_lib::{
|
||||||
fill_event_filter::SerumFillCheckpoint,
|
|
||||||
metrics::{MetricType, MetricU64},
|
metrics::{MetricType, MetricU64},
|
||||||
FilterConfig, StatusResponse,
|
FilterConfig, StatusResponse,
|
||||||
};
|
};
|
||||||
|
@ -40,7 +39,6 @@ use solana_geyser_connector_lib::{
|
||||||
};
|
};
|
||||||
|
|
||||||
type CheckpointMap = Arc<Mutex<HashMap<String, FillCheckpoint>>>;
|
type CheckpointMap = Arc<Mutex<HashMap<String, FillCheckpoint>>>;
|
||||||
type SerumCheckpointMap = Arc<Mutex<HashMap<String, SerumFillCheckpoint>>>;
|
|
||||||
type PeerMap = Arc<Mutex<HashMap<SocketAddr, Peer>>>;
|
type PeerMap = Arc<Mutex<HashMap<SocketAddr, Peer>>>;
|
||||||
|
|
||||||
// jemalloc seems to be better at keeping the memory footprint reasonable over
|
// jemalloc seems to be better at keeping the memory footprint reasonable over
|
||||||
|
@ -79,7 +77,6 @@ pub struct Peer {
|
||||||
|
|
||||||
async fn handle_connection_error(
|
async fn handle_connection_error(
|
||||||
checkpoint_map: CheckpointMap,
|
checkpoint_map: CheckpointMap,
|
||||||
serum_checkpoint_map: SerumCheckpointMap,
|
|
||||||
peer_map: PeerMap,
|
peer_map: PeerMap,
|
||||||
market_ids: HashMap<String, String>,
|
market_ids: HashMap<String, String>,
|
||||||
raw_stream: TcpStream,
|
raw_stream: TcpStream,
|
||||||
|
@ -91,7 +88,6 @@ async fn handle_connection_error(
|
||||||
|
|
||||||
let result = handle_connection(
|
let result = handle_connection(
|
||||||
checkpoint_map,
|
checkpoint_map,
|
||||||
serum_checkpoint_map,
|
|
||||||
peer_map.clone(),
|
peer_map.clone(),
|
||||||
market_ids,
|
market_ids,
|
||||||
raw_stream,
|
raw_stream,
|
||||||
|
@ -109,7 +105,6 @@ async fn handle_connection_error(
|
||||||
|
|
||||||
async fn handle_connection(
|
async fn handle_connection(
|
||||||
checkpoint_map: CheckpointMap,
|
checkpoint_map: CheckpointMap,
|
||||||
serum_checkpoint_map: SerumCheckpointMap,
|
|
||||||
peer_map: PeerMap,
|
peer_map: PeerMap,
|
||||||
market_ids: HashMap<String, String>,
|
market_ids: HashMap<String, String>,
|
||||||
raw_stream: TcpStream,
|
raw_stream: TcpStream,
|
||||||
|
@ -138,7 +133,6 @@ async fn handle_connection(
|
||||||
msg,
|
msg,
|
||||||
peer_map.clone(),
|
peer_map.clone(),
|
||||||
checkpoint_map.clone(),
|
checkpoint_map.clone(),
|
||||||
serum_checkpoint_map.clone(),
|
|
||||||
market_ids.clone(),
|
market_ids.clone(),
|
||||||
),
|
),
|
||||||
Message::Ping(_) => {
|
Message::Ping(_) => {
|
||||||
|
@ -167,7 +161,6 @@ fn handle_commands(
|
||||||
msg: Message,
|
msg: Message,
|
||||||
peer_map: PeerMap,
|
peer_map: PeerMap,
|
||||||
checkpoint_map: CheckpointMap,
|
checkpoint_map: CheckpointMap,
|
||||||
serum_checkpoint_map: SerumCheckpointMap,
|
|
||||||
market_ids: HashMap<String, String>,
|
market_ids: HashMap<String, String>,
|
||||||
) -> Ready<Result<(), Error>> {
|
) -> Ready<Result<(), Error>> {
|
||||||
let msg_str = msg.clone().into_text().unwrap();
|
let msg_str = msg.clone().into_text().unwrap();
|
||||||
|
@ -209,17 +202,8 @@ fn handle_commands(
|
||||||
|
|
||||||
if subscribed {
|
if subscribed {
|
||||||
let checkpoint_map = checkpoint_map.lock().unwrap();
|
let checkpoint_map = checkpoint_map.lock().unwrap();
|
||||||
let serum_checkpoint_map = serum_checkpoint_map.lock().unwrap();
|
|
||||||
let checkpoint = checkpoint_map.get(&market_id);
|
let checkpoint = checkpoint_map.get(&market_id);
|
||||||
match checkpoint {
|
match checkpoint {
|
||||||
Some(checkpoint) => {
|
|
||||||
peer.sender
|
|
||||||
.unbounded_send(Message::Text(
|
|
||||||
serde_json::to_string(&checkpoint).unwrap(),
|
|
||||||
))
|
|
||||||
.unwrap();
|
|
||||||
}
|
|
||||||
None => match serum_checkpoint_map.get(&market_id) {
|
|
||||||
Some(checkpoint) => {
|
Some(checkpoint) => {
|
||||||
peer.sender
|
peer.sender
|
||||||
.unbounded_send(Message::Text(
|
.unbounded_send(Message::Text(
|
||||||
|
@ -228,8 +212,7 @@ fn handle_commands(
|
||||||
.unwrap();
|
.unwrap();
|
||||||
}
|
}
|
||||||
None => info!("no checkpoint available on client subscription"),
|
None => info!("no checkpoint available on client subscription"),
|
||||||
},
|
};
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(Command::Unsubscribe(cmd)) => {
|
Ok(Command::Unsubscribe(cmd)) => {
|
||||||
|
@ -393,11 +376,9 @@ async fn main() -> anyhow::Result<()> {
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let checkpoints = CheckpointMap::new(Mutex::new(HashMap::new()));
|
let checkpoints = CheckpointMap::new(Mutex::new(HashMap::new()));
|
||||||
let serum_checkpoints = SerumCheckpointMap::new(Mutex::new(HashMap::new()));
|
|
||||||
let peers = PeerMap::new(Mutex::new(HashMap::new()));
|
let peers = PeerMap::new(Mutex::new(HashMap::new()));
|
||||||
|
|
||||||
let checkpoints_ref_thread = checkpoints.clone();
|
let checkpoints_ref_thread = checkpoints.clone();
|
||||||
let serum_checkpoints_ref_thread = serum_checkpoints.clone();
|
|
||||||
let peers_ref_thread = peers.clone();
|
let peers_ref_thread = peers.clone();
|
||||||
let peers_ref_thread1 = peers.clone();
|
let peers_ref_thread1 = peers.clone();
|
||||||
|
|
||||||
|
@ -408,7 +389,7 @@ async fn main() -> anyhow::Result<()> {
|
||||||
let message = fill_receiver.recv().await.unwrap();
|
let message = fill_receiver.recv().await.unwrap();
|
||||||
match message {
|
match message {
|
||||||
FillEventFilterMessage::Update(update) => {
|
FillEventFilterMessage::Update(update) => {
|
||||||
debug!("ws update {} {:?} fill", update.market, update.status);
|
debug!("ws update {} {:?} {:?} fill", update.market, update.status, update.event.event_type);
|
||||||
let mut peer_copy = peers_ref_thread.lock().unwrap().clone();
|
let mut peer_copy = peers_ref_thread.lock().unwrap().clone();
|
||||||
for (addr, peer) in peer_copy.iter_mut() {
|
for (addr, peer) in peer_copy.iter_mut() {
|
||||||
let json = serde_json::to_string(&update).unwrap();
|
let json = serde_json::to_string(&update).unwrap();
|
||||||
|
@ -428,27 +409,6 @@ async fn main() -> anyhow::Result<()> {
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.insert(checkpoint.queue.clone(), checkpoint);
|
.insert(checkpoint.queue.clone(), checkpoint);
|
||||||
}
|
}
|
||||||
FillEventFilterMessage::SerumUpdate(update) => {
|
|
||||||
debug!("ws update {} {:?} serum fill", update.market, update.status);
|
|
||||||
let mut peers_copy = peers_ref_thread.lock().unwrap().clone();
|
|
||||||
for (addr, peer) in peers_copy.iter_mut() {
|
|
||||||
let json = serde_json::to_string(&update).unwrap();
|
|
||||||
|
|
||||||
// only send updates if the peer is subscribed
|
|
||||||
if peer.subscriptions.contains(&update.market) {
|
|
||||||
let result = peer.sender.send(Message::Text(json)).await;
|
|
||||||
if result.is_err() {
|
|
||||||
error!("ws update {} fill could not reach {}", update.market, addr);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
FillEventFilterMessage::SerumCheckpoint(checkpoint) => {
|
|
||||||
serum_checkpoints_ref_thread
|
|
||||||
.lock()
|
|
||||||
.unwrap()
|
|
||||||
.insert(checkpoint.queue.clone(), checkpoint);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -462,7 +422,6 @@ async fn main() -> anyhow::Result<()> {
|
||||||
while let Ok((stream, addr)) = listener.accept().await {
|
while let Ok((stream, addr)) = listener.accept().await {
|
||||||
tokio::spawn(handle_connection_error(
|
tokio::spawn(handle_connection_error(
|
||||||
checkpoints.clone(),
|
checkpoints.clone(),
|
||||||
serum_checkpoints.clone(),
|
|
||||||
peers.clone(),
|
peers.clone(),
|
||||||
market_pubkey_strings.clone(),
|
market_pubkey_strings.clone(),
|
||||||
stream,
|
stream,
|
||||||
|
|
Loading…
Reference in New Issue