Refactoring, Fills Feed changes (#1)

lib:
- Move fill/orderbook filters out of shared lib into the services
- Add some common structs to shared lib
- Add libraries to fills/orderbook services

feeds:
- Add graceful exit handling (see the sketch after this list)
- Publish single perp fill event for both maker and taker
- Disable openbook fills processing
- Fix perp fill event quantity decimals
- Handle revoked fills in postgres
- Allow subscriptions to multiple or all markets, as well as to accounts
- Add event queue head updates
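The graceful-exit changes thread an `Arc<AtomicBool>` through the long-running loops. A minimal sketch of the pattern (the signal wiring shown here is illustrative, assuming a tokio runtime; it is not the exact main.rs code):

```
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

#[tokio::main]
async fn main() {
    let exit = Arc::new(AtomicBool::new(false));

    // Flip the flag once ctrl-c / SIGINT arrives; worker loops poll it.
    let exit_c = exit.clone();
    tokio::spawn(async move {
        tokio::signal::ctrl_c()
            .await
            .expect("failed to listen for ctrl-c");
        exit_c.store(true, Ordering::Relaxed);
    });

    // Each long-running loop checks the flag and breaks out cleanly,
    // mirroring the `exit.load(Ordering::Relaxed)` checks in the diffs below.
    while !exit.load(Ordering::Relaxed) {
        // ... receive account writes, publish fill events ...
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    }
    // Cleanup (flush channels, close sockets) would go here.
}
```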
Maximilian Schneider 2023-04-07 17:27:54 +02:00 committed by GitHub
parent d4a0ff7602
commit b6912202bd
29 changed files with 1640 additions and 1281 deletions

Cargo.lock (generated)

@@ -5181,32 +5181,6 @@ dependencies = [
  "without-alloc",
 ]
-[[package]]
-name = "serum_dex"
-version = "0.5.10"
-source = "git+https://github.com/openbook-dex/program?branch=master#c85e56deeaead43abbc33b7301058838b9c5136d"
-dependencies = [
- "anchor-lang",
- "arrayref",
- "bincode",
- "bytemuck",
- "byteorder",
- "default-env",
- "enumflags2",
- "field-offset",
- "itertools 0.10.5",
- "num-traits",
- "num_enum",
- "safe-transmute",
- "serde",
- "solana-program",
- "solana-security-txt",
- "spl-token",
- "static_assertions",
- "thiserror",
- "without-alloc",
-]
 [[package]]
 name = "serum_dex"
 version = "0.5.10"
@@ -5272,21 +5246,32 @@ dependencies = [
  "anyhow",
  "async-channel",
  "async-trait",
+ "base64 0.21.0",
  "bs58 0.4.0",
  "bytemuck",
+ "chrono",
  "client",
+ "futures 0.3.26",
  "futures-channel",
+ "futures-core",
  "futures-util",
  "jemallocator",
  "log 0.4.17",
  "mango-feeds-lib",
  "mango-v4",
+ "native-tls",
+ "postgres-native-tls",
+ "postgres-types",
+ "postgres_query",
  "serde",
  "serde_derive",
  "serde_json",
  "serum_dex 0.5.10 (git+https://github.com/jup-ag/openbook-program?branch=feat/expose-things)",
  "solana-logger",
+ "solana-sdk",
  "tokio",
+ "tokio-postgres",
+ "tokio-postgres-rustls",
  "tokio-tungstenite",
  "toml 0.7.1",
  "ws",
@@ -5306,14 +5291,16 @@ dependencies = [
  "client",
  "futures-channel",
  "futures-util",
+ "itertools 0.10.5",
  "log 0.4.17",
  "mango-feeds-lib",
  "mango-v4",
  "serde",
  "serde_derive",
  "serde_json",
- "serum_dex 0.5.10 (git+https://github.com/openbook-dex/program?branch=master)",
+ "serum_dex 0.5.10 (git+https://github.com/jup-ag/openbook-program?branch=feat/expose-things)",
  "solana-logger",
+ "solana-sdk",
  "tokio",
  "tokio-tungstenite",
  "toml 0.7.1",


@@ -20,6 +20,6 @@ RUN cargo build --release --bin service-mango-fills --bin service-mango-pnl --bi
 FROM debian:bullseye-slim as run
 RUN apt-get update && apt-get -y install ca-certificates libc6
 COPY --from=build /app/target/release/service-mango-* /usr/local/bin/
-COPY --from=build /app/service-mango-pnl/template-config.toml ./pnl-config.toml
-COPY --from=build /app/service-mango-fills/template-config.toml ./fills-config.toml
-COPY --from=build /app/service-mango-orderbook/template-config.toml ./orderbook-config.toml
+COPY --from=build /app/service-mango-pnl/conf/template-config.toml ./pnl-config.toml
+COPY --from=build /app/service-mango-fills/conf/template-config.toml ./fills-config.toml
+COPY --from=build /app/service-mango-orderbook/conf/template-config.toml ./orderbook-config.toml


@@ -1,6 +1,6 @@
 app = "mango-fills"
-kill_signal = "SIGINT"
-kill_timeout = 5
+kill_signal = "SIGTERM"
+kill_timeout = 30
 [build]
 dockerfile = "../Dockerfile"


@@ -16,6 +16,8 @@ use yellowstone_grpc_proto::tonic::{
 };
 use log::*;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
 use std::{collections::HashMap, env, str::FromStr, time::Duration};
 use yellowstone_grpc_proto::prelude::{
@@ -197,6 +199,7 @@ async fn feed_data_geyser(
     // Highest slot that an account write came in for.
     let mut newest_write_slot: u64 = 0;
+    #[derive(Clone, Debug)]
     struct WriteVersion {
         // Write version seen on-chain
         global: u64,
@@ -386,6 +389,7 @@ pub async fn process_events(
     account_write_queue_sender: async_channel::Sender<AccountWrite>,
     slot_queue_sender: async_channel::Sender<SlotUpdate>,
     metrics_sender: Metrics,
+    exit: Arc<AtomicBool>,
 ) {
     // Subscribe to geyser
     let (msg_sender, msg_receiver) = async_channel::bounded::<Message>(config.dedup_queue_size);
@@ -468,6 +472,11 @@ pub async fn process_events(
     metrics_sender.register_u64("grpc_snapshot_account_writes".into(), MetricType::Counter);
     loop {
+        if exit.load(Ordering::Relaxed) {
+            warn!("shutting down grpc_plugin_source...");
+            break;
+        }
         metric_dedup_queue.set(msg_receiver.len() as u64);
         let msg = msg_receiver.recv().await.expect("sender must not close");
         match msg {


@@ -38,7 +38,7 @@ serde_derive = "1.0.130"
 serde_json = "1.0.68"
 bs58 = "*"
-base64 = "*"
+base64 = "0.21.0"
 log = "0.4"
 rand = "0.7"
 anyhow = "1.0"


@@ -1,843 +0,0 @@
use crate::{
chain_data::{AccountData, ChainData, ChainDataMetrics, SlotData},
metrics::{MetricType, Metrics},
orderbook_filter::{base_lots_to_ui_perp, price_lots_to_ui_perp, MarketConfig, OrderbookSide},
serum::SerumEventQueueHeader,
AccountWrite, SlotUpdate,
};
use bytemuck::cast_slice;
use chrono::{TimeZone, Utc};
use log::*;
use serde::{ser::SerializeStruct, Serialize, Serializer};
use serum_dex::state::EventView as SpotEvent;
use solana_sdk::{
account::{ReadableAccount, WritableAccount},
clock::Epoch,
pubkey::Pubkey,
};
use std::{
borrow::BorrowMut,
cmp::max,
collections::{HashMap, HashSet},
convert::identity,
time::SystemTime,
};
use crate::metrics::MetricU64;
use anchor_lang::AccountDeserialize;
use mango_v4::state::{
AnyEvent, EventQueue, EventQueueHeader, EventType, FillEvent as PerpFillEvent, Side,
MAX_NUM_EVENTS,
};
#[derive(Clone, Copy, Debug)]
pub enum FillUpdateStatus {
New,
Revoke,
}
impl Serialize for FillUpdateStatus {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match *self {
FillUpdateStatus::New => {
serializer.serialize_unit_variant("FillUpdateStatus", 0, "new")
}
FillUpdateStatus::Revoke => {
serializer.serialize_unit_variant("FillUpdateStatus", 1, "revoke")
}
}
}
}
#[derive(Clone, Copy, Debug)]
pub enum FillEventType {
Spot,
Perp,
}
impl Serialize for FillEventType {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match *self {
FillEventType::Spot => serializer.serialize_unit_variant("FillEventType", 0, "spot"),
FillEventType::Perp => serializer.serialize_unit_variant("FillEventType", 1, "perp"),
}
}
}
#[derive(Clone, Debug)]
pub struct FillEvent {
pub event_type: FillEventType,
pub maker: bool,
pub side: OrderbookSide,
pub timestamp: u64,
pub seq_num: u64,
pub owner: String,
pub client_order_id: u64,
pub fee: f32,
pub price: f64,
pub quantity: f64,
}
impl Serialize for FillEvent {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut state = serializer.serialize_struct("FillEvent", 12)?;
state.serialize_field("eventType", &self.event_type)?;
state.serialize_field("maker", &self.maker)?;
state.serialize_field("side", &self.side)?;
state.serialize_field(
"timestamp",
&Utc.timestamp_opt(self.timestamp as i64, 0)
.unwrap()
.to_rfc3339(),
)?;
state.serialize_field("seqNum", &self.seq_num)?;
state.serialize_field("owner", &self.owner)?;
state.serialize_field("clientOrderId", &self.client_order_id)?;
state.serialize_field("fee", &self.fee)?;
state.serialize_field("price", &self.price)?;
state.serialize_field("quantity", &self.quantity)?;
state.end()
}
}
impl FillEvent {
pub fn new_from_perp(event: PerpFillEvent, config: &MarketConfig) -> [Self; 2] {
let taker_side = match event.taker_side() {
Side::Ask => OrderbookSide::Ask,
Side::Bid => OrderbookSide::Bid,
};
let maker_side = match event.taker_side() {
Side::Ask => OrderbookSide::Bid,
Side::Bid => OrderbookSide::Ask,
};
let price = price_lots_to_ui_perp(
event.price,
config.base_decimals,
config.quote_decimals,
config.base_lot_size,
config.quote_lot_size,
);
let quantity =
base_lots_to_ui_perp(event.quantity, config.base_decimals, config.quote_decimals);
[
FillEvent {
event_type: FillEventType::Perp,
maker: true,
side: maker_side,
timestamp: event.timestamp,
seq_num: event.seq_num,
owner: event.maker.to_string(),
client_order_id: event.maker_client_order_id,
fee: event.maker_fee,
price: price,
quantity: quantity,
},
FillEvent {
event_type: FillEventType::Perp,
maker: false,
side: taker_side,
timestamp: event.timestamp,
seq_num: event.seq_num,
owner: event.taker.to_string(),
client_order_id: event.taker_client_order_id,
fee: event.taker_fee,
price: price,
quantity: quantity,
},
]
}
pub fn new_from_spot(
event: SpotEvent,
timestamp: u64,
seq_num: u64,
config: &MarketConfig,
) -> Self {
match event {
SpotEvent::Fill {
side,
maker,
native_qty_paid,
native_qty_received,
native_fee_or_rebate,
owner,
client_order_id,
..
} => {
let side = match side as u8 {
0 => OrderbookSide::Bid,
1 => OrderbookSide::Ask,
_ => panic!("invalid side"),
};
let client_order_id: u64 = match client_order_id {
Some(id) => id.into(),
None => 0u64,
};
let base_multiplier = 10u64.pow(config.base_decimals.into()) as u64;
let quote_multiplier = 10u64.pow(config.quote_decimals.into()) as u64;
let (price, quantity) = match side {
OrderbookSide::Bid => {
let price_before_fees = if maker {
native_qty_paid + native_fee_or_rebate
} else {
native_qty_paid - native_fee_or_rebate
};
let top = price_before_fees * base_multiplier;
let bottom = quote_multiplier * native_qty_received;
let price = top as f64 / bottom as f64;
let quantity = native_qty_received as f64 / base_multiplier as f64;
(price, quantity)
}
OrderbookSide::Ask => {
let price_before_fees = if maker {
native_qty_received - native_fee_or_rebate
} else {
native_qty_received + native_fee_or_rebate
};
let top = price_before_fees * base_multiplier;
let bottom = quote_multiplier * native_qty_paid;
let price = top as f64 / bottom as f64;
let quantity = native_qty_paid as f64 / base_multiplier as f64;
(price, quantity)
}
};
let fee = native_fee_or_rebate as f32 / quote_multiplier as f32;
FillEvent {
event_type: FillEventType::Spot,
maker: maker,
side,
timestamp,
seq_num,
owner: Pubkey::new(cast_slice(&identity(owner) as &[_])).to_string(),
client_order_id: client_order_id,
fee,
price,
quantity,
}
}
SpotEvent::Out { .. } => {
panic!("Can't build FillEvent from SpotEvent::Out")
}
}
}
}
#[derive(Clone, Debug)]
pub struct FillUpdate {
pub event: FillEvent,
pub status: FillUpdateStatus,
pub market_key: String,
pub market_name: String,
pub slot: u64,
pub write_version: u64,
}
impl Serialize for FillUpdate {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut state = serializer.serialize_struct("FillUpdate", 6)?;
state.serialize_field("event", &self.event)?;
state.serialize_field("marketKey", &self.market_key)?;
state.serialize_field("marketName", &self.market_name)?;
state.serialize_field("status", &self.status)?;
state.serialize_field("slot", &self.slot)?;
state.serialize_field("writeVersion", &self.write_version)?;
state.end()
}
}
#[derive(Clone, Debug)]
pub struct FillCheckpoint {
pub market: String,
pub queue: String,
pub events: Vec<FillEvent>,
pub slot: u64,
pub write_version: u64,
}
impl Serialize for FillCheckpoint {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut state = serializer.serialize_struct("FillCheckpoint", 3)?;
state.serialize_field("events", &self.events)?;
state.serialize_field("market", &self.market)?;
state.serialize_field("queue", &self.queue)?;
state.serialize_field("slot", &self.slot)?;
state.serialize_field("write_version", &self.write_version)?;
state.end()
}
}
pub enum FillEventFilterMessage {
Update(FillUpdate),
Checkpoint(FillCheckpoint),
}
// couldn't compile the correct struct size / math on m1, fixed sizes resolve this issue
type EventQueueEvents = [AnyEvent; MAX_NUM_EVENTS as usize];
fn publish_changes_perp(
slot: u64,
write_version: u64,
mkt: &(Pubkey, MarketConfig),
header: &EventQueueHeader,
events: &EventQueueEvents,
old_seq_num: u64,
old_events: &EventQueueEvents,
fill_update_sender: &async_channel::Sender<FillEventFilterMessage>,
metric_events_new: &mut MetricU64,
metric_events_change: &mut MetricU64,
metric_events_drop: &mut MetricU64,
) {
// seq_num = N means that events (N-QUEUE_LEN) until N-1 are available
let start_seq_num = max(old_seq_num, header.seq_num)
.checked_sub(MAX_NUM_EVENTS as u64)
.unwrap_or(0);
let mut checkpoint = Vec::new();
let mkt_pk_string = mkt.0.to_string();
let evq_pk_string = mkt.1.event_queue.to_string();
for seq_num in start_seq_num..header.seq_num {
let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize;
// there are three possible cases:
// 1) the event is past the old seq num, hence guaranteed new event
// 2) the event is not matching the old event queue
// 3) all other events are matching the old event queue
// the order of these checks is important so they are exhaustive
if seq_num >= old_seq_num {
debug!(
"found new event {} idx {} type {} slot {} write_version {}",
mkt_pk_string, idx, events[idx].event_type as u32, slot, write_version
);
metric_events_new.increment();
// new fills are published and recorded in checkpoint
if events[idx].event_type == EventType::Fill as u8 {
let fill: PerpFillEvent = bytemuck::cast(events[idx]);
let fills = FillEvent::new_from_perp(fill, &mkt.1);
// send event for both maker and taker
for fill in fills {
fill_update_sender
.try_send(FillEventFilterMessage::Update(FillUpdate {
slot,
write_version,
event: fill.clone(),
status: FillUpdateStatus::New,
market_key: mkt_pk_string.clone(),
market_name: mkt.1.name.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
checkpoint.push(fill);
}
}
} else if old_events[idx].event_type != events[idx].event_type
|| old_events[idx].padding != events[idx].padding
{
debug!(
"found changed event {} idx {} seq_num {} header seq num {} old seq num {}",
mkt_pk_string, idx, seq_num, header.seq_num, old_seq_num
);
metric_events_change.increment();
// first revoke old event if a fill
if old_events[idx].event_type == EventType::Fill as u8 {
let fill: PerpFillEvent = bytemuck::cast(old_events[idx]);
let fills = FillEvent::new_from_perp(fill, &mkt.1);
for fill in fills {
fill_update_sender
.try_send(FillEventFilterMessage::Update(FillUpdate {
slot,
write_version,
event: fill,
status: FillUpdateStatus::Revoke,
market_key: mkt_pk_string.clone(),
market_name: mkt.1.name.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
}
}
// then publish new if its a fill and record in checkpoint
if events[idx].event_type == EventType::Fill as u8 {
let fill: PerpFillEvent = bytemuck::cast(events[idx]);
let fills = FillEvent::new_from_perp(fill, &mkt.1);
for fill in fills {
fill_update_sender
.try_send(FillEventFilterMessage::Update(FillUpdate {
slot,
write_version,
event: fill.clone(),
status: FillUpdateStatus::New,
market_key: mkt_pk_string.clone(),
market_name: mkt.1.name.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
checkpoint.push(fill);
}
}
} else {
// every already published event is recorded in checkpoint if a fill
if events[idx].event_type == EventType::Fill as u8 {
let fill: PerpFillEvent = bytemuck::cast(events[idx]);
let fills = FillEvent::new_from_perp(fill, &mkt.1);
for fill in fills {
checkpoint.push(fill);
}
}
}
}
// in case queue size shrunk due to a fork we need revoke all previous fills
for seq_num in header.seq_num..old_seq_num {
let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize;
debug!(
"found dropped event {} idx {} seq_num {} header seq num {} old seq num {} slot {} write_version {}",
mkt_pk_string, idx, seq_num, header.seq_num, old_seq_num, slot, write_version
);
metric_events_drop.increment();
if old_events[idx].event_type == EventType::Fill as u8 {
let fill: PerpFillEvent = bytemuck::cast(old_events[idx]);
let fills = FillEvent::new_from_perp(fill, &mkt.1);
for fill in fills {
fill_update_sender
.try_send(FillEventFilterMessage::Update(FillUpdate {
slot,
event: fill,
write_version,
status: FillUpdateStatus::Revoke,
market_key: mkt_pk_string.clone(),
market_name: mkt.1.name.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
}
}
}
fill_update_sender
.try_send(FillEventFilterMessage::Checkpoint(FillCheckpoint {
slot,
write_version,
events: checkpoint,
market: mkt_pk_string,
queue: evq_pk_string,
}))
.unwrap()
}
fn publish_changes_serum(
slot: u64,
write_version: u64,
mkt: &(Pubkey, MarketConfig),
header: &SerumEventQueueHeader,
events: &[serum_dex::state::Event],
old_seq_num: u64,
old_events: &[serum_dex::state::Event],
fill_update_sender: &async_channel::Sender<FillEventFilterMessage>,
metric_events_new: &mut MetricU64,
metric_events_change: &mut MetricU64,
metric_events_drop: &mut MetricU64,
) {
// seq_num = N means that events (N-QUEUE_LEN) until N-1 are available
let start_seq_num = max(old_seq_num, header.seq_num)
.checked_sub(MAX_NUM_EVENTS as u64)
.unwrap_or(0);
let mut checkpoint = Vec::new();
let mkt_pk_string = mkt.0.to_string();
let evq_pk_string = mkt.1.event_queue.to_string();
let header_seq_num = header.seq_num;
debug!("start seq {} header seq {}", start_seq_num, header_seq_num);
// Timestamp for spot events is time scraped
let timestamp = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs();
for seq_num in start_seq_num..header_seq_num {
let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize;
let event_view = events[idx].as_view().unwrap();
let old_event_view = old_events[idx].as_view().unwrap();
match event_view {
SpotEvent::Fill { .. } => {
// there are three possible cases:
// 1) the event is past the old seq num, hence guaranteed new event
// 2) the event is not matching the old event queue
// 3) all other events are matching the old event queue
// the order of these checks is important so they are exhaustive
let fill = FillEvent::new_from_spot(event_view, timestamp, seq_num, &mkt.1);
if seq_num >= old_seq_num {
debug!("found new serum fill {} idx {}", mkt_pk_string, idx,);
metric_events_new.increment();
fill_update_sender
.try_send(FillEventFilterMessage::Update(FillUpdate {
slot,
write_version,
event: fill.clone(),
status: FillUpdateStatus::New,
market_key: mkt_pk_string.clone(),
market_name: mkt.1.name.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
checkpoint.push(fill);
continue;
}
match old_event_view {
SpotEvent::Fill {
client_order_id, ..
} => {
let client_order_id = match client_order_id {
Some(id) => id.into(),
None => 0u64,
};
if client_order_id != fill.client_order_id {
debug!(
"found changed id event {} idx {} seq_num {} header seq num {} old seq num {}",
mkt_pk_string, idx, seq_num, header_seq_num, old_seq_num
);
metric_events_change.increment();
let old_fill = FillEvent::new_from_spot(
old_event_view,
timestamp,
seq_num,
&mkt.1,
);
// first revoke old event
fill_update_sender
.try_send(FillEventFilterMessage::Update(FillUpdate {
slot,
write_version,
event: old_fill,
status: FillUpdateStatus::Revoke,
market_key: mkt_pk_string.clone(),
market_name: mkt.1.name.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
// then publish new
fill_update_sender
.try_send(FillEventFilterMessage::Update(FillUpdate {
slot,
write_version,
event: fill.clone(),
status: FillUpdateStatus::New,
market_key: mkt_pk_string.clone(),
market_name: mkt.1.name.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
}
// record new event in checkpoint
checkpoint.push(fill);
}
SpotEvent::Out { .. } => {
debug!(
"found changed type event {} idx {} seq_num {} header seq num {} old seq num {}",
mkt_pk_string, idx, seq_num, header_seq_num, old_seq_num
);
metric_events_change.increment();
// publish new fill and record in checkpoint
fill_update_sender
.try_send(FillEventFilterMessage::Update(FillUpdate {
slot,
write_version,
event: fill.clone(),
status: FillUpdateStatus::New,
market_key: mkt_pk_string.clone(),
market_name: mkt.1.name.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
checkpoint.push(fill);
}
}
}
_ => continue,
}
}
// in case queue size shrunk due to a fork we need revoke all previous fills
for seq_num in header_seq_num..old_seq_num {
let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize;
let old_event_view = old_events[idx].as_view().unwrap();
debug!(
"found dropped event {} idx {} seq_num {} header seq num {} old seq num {}",
mkt_pk_string, idx, seq_num, header_seq_num, old_seq_num
);
metric_events_drop.increment();
match old_event_view {
SpotEvent::Fill { .. } => {
let old_fill = FillEvent::new_from_spot(old_event_view, timestamp, seq_num, &mkt.1);
fill_update_sender
.try_send(FillEventFilterMessage::Update(FillUpdate {
slot,
event: old_fill,
write_version,
status: FillUpdateStatus::Revoke,
market_key: mkt_pk_string.clone(),
market_name: mkt.1.name.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
}
SpotEvent::Out { .. } => continue,
}
}
fill_update_sender
.try_send(FillEventFilterMessage::Checkpoint(FillCheckpoint {
slot,
write_version,
events: checkpoint,
market: mkt_pk_string,
queue: evq_pk_string,
}))
.unwrap()
}
pub async fn init(
perp_market_configs: Vec<(Pubkey, MarketConfig)>,
spot_market_configs: Vec<(Pubkey, MarketConfig)>,
metrics_sender: Metrics,
) -> anyhow::Result<(
async_channel::Sender<AccountWrite>,
async_channel::Sender<SlotUpdate>,
async_channel::Receiver<FillEventFilterMessage>,
)> {
let metrics_sender = metrics_sender.clone();
let mut metric_events_new =
metrics_sender.register_u64("fills_feed_events_new".into(), MetricType::Counter);
let mut metric_events_new_serum =
metrics_sender.register_u64("fills_feed_events_new_serum".into(), MetricType::Counter);
let mut metric_events_change =
metrics_sender.register_u64("fills_feed_events_change".into(), MetricType::Counter);
let mut metric_events_change_serum =
metrics_sender.register_u64("fills_feed_events_change_serum".into(), MetricType::Counter);
let mut metrics_events_drop =
metrics_sender.register_u64("fills_feed_events_drop".into(), MetricType::Counter);
let mut metrics_events_drop_serum =
metrics_sender.register_u64("fills_feed_events_drop_serum".into(), MetricType::Counter);
// The actual message may want to also contain a retry count, if it self-reinserts on failure?
let (account_write_queue_sender, account_write_queue_receiver) =
async_channel::unbounded::<AccountWrite>();
// Slot updates flowing from the outside into the single processing thread. From
// there they'll flow into the postgres sending thread.
let (slot_queue_sender, slot_queue_receiver) = async_channel::unbounded::<SlotUpdate>();
// Fill updates can be consumed by client connections, they contain all fills for all markets
let (fill_update_sender, fill_update_receiver) =
async_channel::unbounded::<FillEventFilterMessage>();
let account_write_queue_receiver_c = account_write_queue_receiver.clone();
let mut chain_cache = ChainData::new();
let mut chain_data_metrics = ChainDataMetrics::new(&metrics_sender);
let mut perp_events_cache: HashMap<String, EventQueueEvents> = HashMap::new();
let mut serum_events_cache: HashMap<String, Vec<serum_dex::state::Event>> = HashMap::new();
let mut seq_num_cache = HashMap::new();
let mut last_evq_versions = HashMap::<String, (u64, u64)>::new();
let all_market_configs = [perp_market_configs.clone(), spot_market_configs.clone()].concat();
let perp_queue_pks: Vec<Pubkey> = perp_market_configs
.iter()
.map(|x| x.1.event_queue)
.collect();
let spot_queue_pks: Vec<Pubkey> = spot_market_configs
.iter()
.map(|x| x.1.event_queue)
.collect();
let all_queue_pks: HashSet<Pubkey> =
HashSet::from_iter([perp_queue_pks.clone(), spot_queue_pks.clone()].concat());
// update handling thread, reads both slot and account updates
tokio::spawn(async move {
loop {
tokio::select! {
Ok(account_write) = account_write_queue_receiver_c.recv() => {
if !all_queue_pks.contains(&account_write.pubkey) {
continue;
}
chain_cache.update_account(
account_write.pubkey,
AccountData {
slot: account_write.slot,
write_version: account_write.write_version,
account: WritableAccount::create(
account_write.lamports,
account_write.data.clone(),
account_write.owner,
account_write.executable,
account_write.rent_epoch as Epoch,
),
},
);
}
Ok(slot_update) = slot_queue_receiver.recv() => {
chain_cache.update_slot(SlotData {
slot: slot_update.slot,
parent: slot_update.parent,
status: slot_update.status,
chain: 0,
});
}
Err(e) = slot_queue_receiver.recv() => {
warn!("slot update channel err {:?}", e);
}
Err(e) = account_write_queue_receiver_c.recv() => {
warn!("write update channel err {:?}", e);
}
}
chain_data_metrics.report(&chain_cache);
for mkt in all_market_configs.iter() {
let evq_pk = mkt.1.event_queue;
let evq_pk_string = evq_pk.to_string();
let last_evq_version = last_evq_versions
.get(&mkt.1.event_queue.to_string())
.unwrap_or(&(0, 0));
match chain_cache.account(&evq_pk) {
Ok(account_info) => {
// only process if the account state changed
let evq_version = (account_info.slot, account_info.write_version);
trace!("evq {} write_version {:?}", evq_pk_string, evq_version);
if evq_version == *last_evq_version {
continue;
}
if evq_version.0 < last_evq_version.0 {
debug!("evq version slot was old");
continue;
}
if evq_version.0 == last_evq_version.0 && evq_version.1 < last_evq_version.1
{
info!("evq version slot was same and write version was old");
continue;
}
last_evq_versions.insert(evq_pk_string.clone(), evq_version);
let account = &account_info.account;
let is_perp = mango_v4::check_id(account.owner());
if is_perp {
let event_queue =
EventQueue::try_deserialize(account.data().borrow_mut()).unwrap();
info!(
"evq {} seq_num {} version {:?}",
evq_pk_string, event_queue.header.seq_num, evq_version,
);
match seq_num_cache.get(&evq_pk_string) {
Some(old_seq_num) => match perp_events_cache.get(&evq_pk_string) {
Some(old_events) => publish_changes_perp(
account_info.slot,
account_info.write_version,
&mkt,
&event_queue.header,
&event_queue.buf,
*old_seq_num,
old_events,
&fill_update_sender,
&mut metric_events_new,
&mut metric_events_change,
&mut metrics_events_drop,
),
_ => {
info!("perp_events_cache could not find {}", evq_pk_string)
}
},
_ => info!("seq_num_cache could not find {}", evq_pk_string),
}
seq_num_cache
.insert(evq_pk_string.clone(), event_queue.header.seq_num.clone());
perp_events_cache
.insert(evq_pk_string.clone(), event_queue.buf.clone());
} else {
let inner_data = &account.data()[5..&account.data().len() - 7];
let header_span = std::mem::size_of::<SerumEventQueueHeader>();
let header: SerumEventQueueHeader =
*bytemuck::from_bytes(&inner_data[..header_span]);
let seq_num = header.seq_num;
let count = header.count;
let rest = &inner_data[header_span..];
let slop = rest.len() % std::mem::size_of::<serum_dex::state::Event>();
let new_len = rest.len() - slop;
let events = &rest[..new_len];
debug!("evq {} header_span {} header_seq_num {} header_count {} inner_len {} events_len {} sizeof Event {}", evq_pk_string, header_span, seq_num, count, inner_data.len(), events.len(), std::mem::size_of::<serum_dex::state::Event>());
let events: &[serum_dex::state::Event] = bytemuck::cast_slice(&events);
match seq_num_cache.get(&evq_pk_string) {
Some(old_seq_num) => match serum_events_cache.get(&evq_pk_string) {
Some(old_events) => publish_changes_serum(
account_info.slot,
account_info.write_version,
mkt,
&header,
&events,
*old_seq_num,
old_events,
&fill_update_sender,
&mut metric_events_new_serum,
&mut metric_events_change_serum,
&mut metrics_events_drop_serum,
),
_ => {
debug!(
"serum_events_cache could not find {}",
evq_pk_string
)
}
},
_ => debug!("seq_num_cache could not find {}", evq_pk_string),
}
seq_num_cache.insert(evq_pk_string.clone(), seq_num.clone());
serum_events_cache
.insert(evq_pk_string.clone(), events.clone().to_vec());
}
}
Err(_) => debug!("chain_cache could not find {}", mkt.1.event_queue),
}
}
}
});
Ok((
account_write_queue_sender,
slot_queue_sender,
fill_update_receiver,
))
}


@@ -1,10 +1,8 @@
-pub mod fill_event_filter;
-pub mod fill_event_postgres_target;
 pub mod memory_target;
-pub mod orderbook_filter;
 pub mod postgres_types_numeric;
 pub mod serum;
+use anchor_lang::prelude::Pubkey;
 use serde::{ser::SerializeStruct, Serialize, Serializer};
 use serde_derive::Deserialize;
@@ -40,6 +38,13 @@ pub struct PostgresTlsConfig {
     pub client_key_path: String,
 }
+#[derive(Clone, Debug, Deserialize)]
+pub struct Config {
+    pub postgres_target: PostgresConfig,
+    pub source: SourceConfig,
+    pub metrics: MetricsConfig,
+}
 #[derive(Clone, Debug)]
 pub struct StatusResponse<'a> {
     pub success: bool,
@@ -59,9 +64,68 @@ impl<'a> Serialize for StatusResponse<'a> {
     }
 }
-#[derive(Clone, Debug, Deserialize)]
-pub struct Config {
-    pub postgres_target: PostgresConfig,
-    pub source: SourceConfig,
-    pub metrics: MetricsConfig,
-}
+#[derive(Clone, Debug)]
+pub enum OrderbookSide {
+    Bid = 0,
+    Ask = 1,
+}
+impl Serialize for OrderbookSide {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: Serializer,
+    {
+        match *self {
+            OrderbookSide::Bid => serializer.serialize_unit_variant("Side", 0, "bid"),
+            OrderbookSide::Ask => serializer.serialize_unit_variant("Side", 1, "ask"),
+        }
+    }
+}
+#[derive(Clone, Debug)]
+pub struct MarketConfig {
+    pub name: String,
+    pub bids: Pubkey,
+    pub asks: Pubkey,
+    pub event_queue: Pubkey,
+    pub base_decimals: u8,
+    pub quote_decimals: u8,
+    pub base_lot_size: i64,
+    pub quote_lot_size: i64,
+}
+pub fn base_lots_to_ui(native: i64, base_decimals: u8, base_lot_size: i64) -> f64 {
+    (native * base_lot_size) as f64 / 10i64.pow(base_decimals.into()) as f64
+}
+pub fn base_lots_to_ui_perp(native: i64, decimals: u8, base_lot_size: i64) -> f64 {
+    native as f64 * (base_lot_size as f64 / (10i64.pow(decimals.into()) as f64))
+}
+pub fn price_lots_to_ui(native: i64, base_decimals: u8, quote_decimals: u8) -> f64 {
+    let decimals = base_decimals - quote_decimals;
+    native as f64 / (10u64.pow(decimals.into())) as f64
+}
+pub fn spot_price_to_ui(
+    native: i64,
+    native_size: i64,
+    base_decimals: u8,
+    quote_decimals: u8,
+) -> f64 {
+    // TODO: account for fees
+    ((native * 10i64.pow(base_decimals.into())) / (10i64.pow(quote_decimals.into()) * native_size))
+        as f64
+}
+pub fn price_lots_to_ui_perp(
+    native: i64,
+    base_decimals: u8,
+    quote_decimals: u8,
+    base_lot_size: i64,
+    quote_lot_size: i64,
+) -> f64 {
+    let decimals = base_decimals - quote_decimals;
+    let multiplier = 10u64.pow(decimals.into()) as f64;
+    native as f64 * ((multiplier * quote_lot_size as f64) / base_lot_size as f64)
+}
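The conversion helpers above are plain lot-size arithmetic. A small worked check, using illustrative SOL-PERP-like parameters (these numbers are assumptions for the example, not on-chain values):

```
fn main() {
    // Assumed example parameters: base_decimals = 9, quote_decimals = 6,
    // base_lot_size = 10_000_000, quote_lot_size = 100.
    let (base_decimals, quote_decimals): (u8, u8) = (9, 6);
    let (base_lot_size, quote_lot_size): (i64, i64) = (10_000_000, 100);

    // price_lots_to_ui_perp: multiplier = 10^(9-6) = 1000, so a native
    // price of 2072 lots maps to 2072 * 1000 * 100 / 10_000_000 = 20.72.
    let multiplier = 10u64.pow((base_decimals - quote_decimals).into()) as f64;
    let ui_price = 2072f64 * ((multiplier * quote_lot_size as f64) / base_lot_size as f64);
    assert!((ui_price - 20.72).abs() < 1e-9);

    // base_lots_to_ui_perp: 45 base lots map to 45 * 10_000_000 / 10^9
    // = 0.45 of the base asset.
    let ui_quantity = 45f64 * (base_lot_size as f64 / 10i64.pow(base_decimals.into()) as f64);
    assert!((ui_quantity - 0.45).abs() < 1e-9);

    println!("price {ui_price} quantity {ui_quantity}");
}
```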


@@ -4,7 +4,7 @@ use bytemuck::{Pod, Zeroable};
 #[repr(packed)]
 pub struct SerumEventQueueHeader {
     pub _account_flags: u64, // Initialized, EventQueue
-    pub _head: u64,
+    pub head: u64,
     pub count: u64,
     pub seq_num: u64,
 }


@@ -13,7 +13,14 @@ use bytemuck::bytes_of;
 use client::{Client, MangoGroupContext};
 use log::*;
 use solana_client::nonblocking::rpc_client::RpcClient;
-use std::{collections::HashSet, fs::File, io::Read, str::FromStr, sync::Arc, time::Duration};
+use std::{
+    collections::HashSet,
+    fs::File,
+    io::Read,
+    str::FromStr,
+    sync::{atomic::AtomicBool, Arc},
+    time::Duration,
+};
 use mango_feeds_lib::FilterConfig;
 use mango_feeds_lib::{grpc_plugin_source, metrics, websocket_source, MetricsConfig, SourceConfig};
@@ -32,6 +39,8 @@ pub struct Config {
 async fn main() -> anyhow::Result<()> {
     solana_logger::setup_with_default("info");
+    let exit: Arc<AtomicBool> = Arc::new(AtomicBool::new(false));
     let args: Vec<String> = std::env::args().collect();
     if args.len() < 2 {
         error!("Please enter a config file path argument.");
@@ -147,6 +156,7 @@ async fn main() -> anyhow::Result<()> {
         account_write_queue_sender,
         slot_queue_sender,
         metrics_tx.clone(),
+        exit.clone(),
     )
     .await;
 } else {


@@ -16,6 +16,8 @@ toml = "*"
 serde = "1.0"
 serde_derive = "1.0"
 serde_json = "1.0"
+futures = "0.3.17"
+futures-core = "0.3"
 futures-channel = "0.3"
 futures-util = "0.3"
 ws = "^0.9.2"
@@ -25,9 +27,20 @@ tokio = { version = "1", features = ["full"] }
 tokio-tungstenite = "0.17"
 bytemuck = "1.7.2"
 jemallocator = "0.3.2"
+chrono = "0.4.23"
+solana-sdk = "~1.14.9"
+tokio-postgres = { version = "0.7", features = ["with-chrono-0_4"] }
+tokio-postgres-rustls = "0.9.0"
+postgres-types = { version = "0.2", features = ["array-impls", "derive", "with-chrono-0_4"] }
+postgres-native-tls = "0.5"
+native-tls = "0.2"
+# postgres_query hasn't updated its crate in a while
+postgres_query = { git = "https://github.com/nolanderc/rust-postgres-query", rev = "b4422051c8a31fbba4a35f88004c1cefb1878dd5" }
+base64 = "0.21.0"
 mango-v4 = { git = "https://github.com/blockworks-foundation/mango-v4", branch = "dev" }
 client = { git = "https://github.com/blockworks-foundation/mango-v4", branch = "dev" }
-serum_dex = { git = "https://github.com/jup-ag/openbook-program", branch = "feat/expose-things" }
+serum_dex = { git = "https://github.com/jup-ag/openbook-program", branch = "feat/expose-things", features = ["no-entrypoint"] }
 anchor-lang = "0.25.0"
 anchor-client = "0.25.0"


@@ -2,11 +2,84 @@
 This module parses event queues and exposes individual fills on a websocket.
Public API: `https://api.mngo.cloud/fills/v1/`
## API Reference
Get a list of markets
```
{
"command": "getMarkets"
}
```
```
{
"ESdnpnNLgTkBCZRuTJkZLi5wKEZ2z47SG3PJrhundSQ2": "SOL-PERP",
"HwhVGkfsSQ9JSQeQYu2CbkRCLvsh3qRZxG6m4oMVwZpN": "BTC-PERP",
"Fgh9JSZ2qfSjCw9RPJ85W2xbihsp2muLvfRztzoVR7f1": "ETH-PERP",
}
```
Subscribe to markets
```
{
"command": "subscribe"
"marketIds": ["MARKET_PUBKEY"]
}
```
```
{
"success": true,
"message": "subscribed to market MARKET_PUBKEY"
}
```
Subscribe to account
```
{
"command": "subscribe"
"account": ["MANGO_ACCOUNT_PUBKEY"]
}
```
```
{
"success": true,
"message": "subscribed to account MANGO_ACCOUNT_PUBKEY"
}
```
Fill Event
```
{
"event": {
"eventType": "perp",
"maker": "MAKER_MANGO_ACCOUNT_PUBKEY",
"taker": "TAKER_MANGO_ACCOUNT_PUBKEY",
"takerSide": "bid",
"timestamp": "2023-04-06T13:00:00+00:00",
"seqNum": 132420,
"makerClientOrderId": 1680786677648,
"takerClientOrderId": 1680786688080,
"makerFee": -0.0003,
"takerFee": 0.0006,
"price": 20.72,
"quantity": 0.45
},
"marketKey": "ESdnpnNLgTkBCZRuTJkZLi5wKEZ2z47SG3PJrhundSQ2",
"marketName": "SOL-PERP",
"status": "new",
"slot": 186869253,
"writeVersion": 662992260539
}
```
If the fill occurred on a fork, an event will be sent with the `status` field set to `revoke`.
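A minimal subscriber sketch in Rust, using the same `tokio-tungstenite` crate the service itself depends on (the endpoint and market key are the illustrative values from above; `anyhow` and `futures-util` are assumed as dependencies):

```
use futures_util::{SinkExt, StreamExt};
use tokio_tungstenite::{connect_async, tungstenite::Message};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let (mut ws, _) = connect_async("wss://api.mngo.cloud/fills/v1/").await?;
    // Subscribe to a single market by its pubkey.
    ws.send(Message::Text(
        r#"{"command": "subscribe", "marketIds": ["ESdnpnNLgTkBCZRuTJkZLi5wKEZ2z47SG3PJrhundSQ2"]}"#
            .to_string(),
    ))
    .await?;
    // Print fill updates, checkpoints and status responses as they arrive.
    while let Some(msg) = ws.next().await {
        if let Message::Text(text) = msg? {
            println!("{text}");
        }
    }
    Ok(())
}
```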
## Setup
## Local
1. Prepare the connector configuration file.
-   [Here is an example](service-mango-fills/example-config.toml).
+   [Here is an example](service-mango-fills/conf/example-config.toml).
   - `bind_ws_addr` is the listen port for the websocket clients
   - `rpc_ws_url` is unused and can stay empty.
@@ -14,7 +87,6 @@ This module parses event queues and exposes individual fills on a websocket.
   address configured for the plugin.
   - `rpc_http_url` must point to the JSON-RPC URL.
   - `program_id` must match what is configured for the gRPC plugin
-  - `markets` need to contain all observed perp markets
2. Start the service binary.
@@ -27,9 +99,6 @@ This module parses event queues and exposes individual fills on a websocket.
   logs are very spammy; changing the default log level is recommended when you
   don't want to analyze the performance of the service.
-## TODO
-- [] startup logic, dont accept market subscriptions before first snapshot
-- [] failover logic, kill all websockets when we receive a later snapshot, more
-  frequent when running on home connections
-- [] track latency accountwrite -> websocket
-- [] create new model for fills so snapshot maps can be combined per market
+## fly.io


@@ -0,0 +1,35 @@
bind_ws_addr = "0.0.0.0:8080"
rpc_http_url = "http://mango.rpcpool.com/<token>"
mango_group = "78b8f4cGCwmZ9ysPFMWLaLTkkaYnUjwMJYStWe5RTSSX"
[metrics]
output_stdout = true
output_http = true
# [postgres]
# connection_string = "$PG_CONNECTION_STRING"
# connection_count = 1
# max_batch_size = 1
# max_queue_size = 50000
# retry_query_max_count = 10
# retry_query_sleep_secs = 2
# retry_connection_sleep_secs = 10
# fatal_connection_timeout_secs = 30
# allow_invalid_certs = true
# # [postgres.tls]
# # ca_cert_path = "$PG_CA_CERT"
# # client_key_path = "$PG_CLIENT_KEY"
[source]
dedup_queue_size = 50000
rpc_ws_url = "wss://mango.rpcpool.com/<token>"
[[source.grpc_sources]]
name = "accountsdb-client"
connection_string = "http://tyo64.rpcpool.com/"
retry_connection_sleep_secs = 30
[source.snapshot]
rpc_http_url = "http://mango.rpcpool.com/<token>"
program_id = "4MangoMjqJ2firMokCjjGgoK8d4MXcrgL7XJaL3w6fVg"


@@ -0,0 +1,35 @@
bind_ws_addr = "[::]:8080"
rpc_http_url = "$RPC_HTTP_URL"
mango_group = "78b8f4cGCwmZ9ysPFMWLaLTkkaYnUjwMJYStWe5RTSSX"
[metrics]
output_stdout = true
output_http = true
[postgres]
connection_string = "$PG_CONNECTION_STRING"
connection_count = 1
max_batch_size = 1
max_queue_size = 50000
retry_query_max_count = 10
retry_query_sleep_secs = 2
retry_connection_sleep_secs = 10
fatal_connection_timeout_secs = 30
allow_invalid_certs = true
[postgres.tls]
ca_cert_path = "$PG_CA_CERT"
client_key_path = "$PG_CLIENT_KEY"
[source]
dedup_queue_size = 50000
rpc_ws_url = "$RPC_WS_URL"
[[source.grpc_sources]]
name = "accountsdb-client"
connection_string = "$GEYSER_CONNECTION_STRING"
retry_connection_sleep_secs = 30
[source.snapshot]
rpc_http_url = "$RPC_HTTP_URL"
program_id = "srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX"


@@ -1,53 +0,0 @@
bind_ws_addr = "0.0.0.0:2082"
[source]
dedup_queue_size = 50000
rpc_ws_url = ""
[[source.grpc_sources]]
name = "accountsdb-client"
connection_string = "http://mango.devnet.rpcpool.com:10001"
retry_connection_sleep_secs = 30
[source.grpc_sources.tls]
ca_cert_path = "ca-devnet.pem"
client_cert_path = "client-devnet.pem"
client_key_path = "client-devnet.pem"
domain_name = "mango-accountsdb.rpcpool.com"
[source.snapshot]
rpc_http_url = "http://mango.devnet.rpcpool.com/"
program_id = "4skJ85cdxQAFVKbcGgfun8iZPL7BadVYXG3kGEGkufqA"
[[markets]]
name = "MNGO-PERP"
event_queue = "uaUCSQejWYrDeYSuvn4As4kaCwJ2rLnRQSsSjY3ogZk"
[[markets]]
name = "ETH-PERP"
event_queue = "8WLv5fKLYkyZpFG74kRmp2RALHQFcNKmH7eJn8ebHC13"
[[markets]]
name = "SOL-PERP"
event_queue = "CZ5MCRvkN38d5pnZDDEEyMiED3drgDUVpEUjkuJq31Kf"
[[markets]]
name = "ADA-PERP"
event_queue = "5v5fz2cCSy2VvrgVf5Vu7PF23RiZjv6BL36bgg48bA1c"
[[markets]]
name = "FTT-PERP"
event_queue = "7rswj7FVZcMYUKxcTLndZhWBmuVNc2GuxqjuXU8KcPWv"
[[markets]]
name = "AVAX-PERP"
event_queue = "4b7NqjqWoQoQh9V3dubfjkLPQVNJijwAwr7D9q6vTqqd"
[[markets]]
name = "BNB-PERP"
event_queue = "96Y87LTz5Mops7wdT9EJo1eM79XToKYJJmRZxNatV85d"
[[markets]]
name = "MATIC-PERP"
event_queue = "77maU5zdfYayqhqjBi2ocosM4PXvPXxbps2Up7dxDsMR"


@@ -0,0 +1,653 @@
use log::*;
use mango_feeds_lib::{
chain_data::{AccountData, ChainData, ChainDataMetrics, SlotData},
metrics::{MetricType, Metrics},
serum::SerumEventQueueHeader,
AccountWrite, MarketConfig, SlotUpdate,
};
use solana_sdk::{
account::{ReadableAccount, WritableAccount},
clock::Epoch,
pubkey::Pubkey,
};
use std::{
borrow::BorrowMut,
cmp::max,
collections::{HashMap, HashSet},
iter::FromIterator,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use crate::metrics::MetricU64;
use anchor_lang::AccountDeserialize;
use mango_v4::state::{
AnyEvent, EventQueue, EventQueueHeader, EventType, FillEvent as PerpFillEvent,
OutEvent as PerpOutEvent, QueueHeader, MAX_NUM_EVENTS,
};
use service_mango_fills::*;
// couldn't compile the correct struct size / math on m1, fixed sizes resolve this issue
type EventQueueEvents = [AnyEvent; MAX_NUM_EVENTS as usize];
fn publish_changes_perp(
slot: u64,
write_version: u64,
mkt: &(Pubkey, MarketConfig),
header: &EventQueueHeader,
events: &EventQueueEvents,
prev_seq_num: u64,
prev_head: u64,
prev_events: &EventQueueEvents,
fill_update_sender: &async_channel::Sender<FillEventFilterMessage>,
metric_events_new: &mut MetricU64,
metric_events_change: &mut MetricU64,
metric_events_drop: &mut MetricU64,
metric_head_update: &mut MetricU64,
metric_head_revoke: &mut MetricU64,
) {
// seq_num = N means that events (N-QUEUE_LEN) until N-1 are available
let start_seq_num = max(prev_seq_num, header.seq_num)
.checked_sub(MAX_NUM_EVENTS as u64)
.unwrap_or(0);
let mut checkpoint = Vec::new();
let mkt_pk_string = mkt.0.to_string();
let evq_pk_string = mkt.1.event_queue.to_string();
for seq_num in start_seq_num..header.seq_num {
let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize;
// there are three possible cases:
// 1) the event is past the old seq num, hence guaranteed new event
// 2) the event is not matching the old event queue
// 3) all other events are matching the old event queue
// the order of these checks is important so they are exhaustive
if seq_num >= prev_seq_num {
debug!(
"found new event {} idx {} type {} slot {} write_version {}",
mkt_pk_string, idx, events[idx].event_type as u32, slot, write_version
);
metric_events_new.increment();
// new fills are published and recorded in checkpoint
if events[idx].event_type == EventType::Fill as u8 {
let fill: PerpFillEvent = bytemuck::cast(events[idx]);
let fill = FillEvent::new_from_perp(fill, &mkt.1);
fill_update_sender
.try_send(FillEventFilterMessage::Update(FillUpdate {
slot,
write_version,
event: fill.clone(),
status: FillUpdateStatus::New,
market_key: mkt_pk_string.clone(),
market_name: mkt.1.name.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
checkpoint.push(fill);
}
} else if prev_events[idx].event_type != events[idx].event_type
|| prev_events[idx].padding != events[idx].padding
{
debug!(
"found changed event {} idx {} seq_num {} header seq num {} old seq num {}",
mkt_pk_string, idx, seq_num, header.seq_num, prev_seq_num
);
metric_events_change.increment();
// first revoke old event if a fill
if prev_events[idx].event_type == EventType::Fill as u8 {
let fill: PerpFillEvent = bytemuck::cast(prev_events[idx]);
let fill = FillEvent::new_from_perp(fill, &mkt.1);
fill_update_sender
.try_send(FillEventFilterMessage::Update(FillUpdate {
slot,
write_version,
event: fill,
status: FillUpdateStatus::Revoke,
market_key: mkt_pk_string.clone(),
market_name: mkt.1.name.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
}
// then publish new if its a fill and record in checkpoint
if events[idx].event_type == EventType::Fill as u8 {
let fill: PerpFillEvent = bytemuck::cast(events[idx]);
let fill = FillEvent::new_from_perp(fill, &mkt.1);
fill_update_sender
.try_send(FillEventFilterMessage::Update(FillUpdate {
slot,
write_version,
event: fill.clone(),
status: FillUpdateStatus::New,
market_key: mkt_pk_string.clone(),
market_name: mkt.1.name.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
checkpoint.push(fill);
}
} else {
// every already published event is recorded in checkpoint if a fill
if events[idx].event_type == EventType::Fill as u8 {
let fill: PerpFillEvent = bytemuck::cast(events[idx]);
let fill = FillEvent::new_from_perp(fill, &mkt.1);
checkpoint.push(fill);
}
}
}
// in case queue size shrunk due to a fork we need revoke all previous fills
for seq_num in header.seq_num..prev_seq_num {
let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize;
debug!(
"found dropped event {} idx {} seq_num {} header seq num {} old seq num {} slot {} write_version {}",
mkt_pk_string, idx, seq_num, header.seq_num, prev_seq_num, slot, write_version
);
metric_events_drop.increment();
if prev_events[idx].event_type == EventType::Fill as u8 {
let fill: PerpFillEvent = bytemuck::cast(prev_events[idx]);
let fill = FillEvent::new_from_perp(fill, &mkt.1);
fill_update_sender
.try_send(FillEventFilterMessage::Update(FillUpdate {
slot,
event: fill,
write_version,
status: FillUpdateStatus::Revoke,
market_key: mkt_pk_string.clone(),
market_name: mkt.1.name.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
}
}
let head_idx = header.head();
let head = head_idx as u64;
let head_seq_num = if events[head_idx].event_type == EventType::Fill as u8 {
let event: PerpFillEvent = bytemuck::cast(events[head_idx]);
event.seq_num
} else if events[head_idx].event_type == EventType::Out as u8 {
let event: PerpOutEvent = bytemuck::cast(events[head_idx]);
event.seq_num
} else {
0
};
let prev_head_idx = prev_head as usize;
let prev_head_seq_num = if prev_events[prev_head_idx].event_type == EventType::Fill as u8 {
let event: PerpFillEvent = bytemuck::cast(prev_events[prev_head_idx]);
event.seq_num
} else if prev_events[prev_head_idx].event_type == EventType::Out as u8 {
let event: PerpOutEvent = bytemuck::cast(prev_events[prev_head_idx]);
event.seq_num
} else {
0
};
// publish a head update event if the head increased (events were consumed)
if head > prev_head {
metric_head_update.increment();
fill_update_sender
.try_send(FillEventFilterMessage::HeadUpdate(HeadUpdate {
head,
prev_head,
head_seq_num,
prev_head_seq_num,
status: FillUpdateStatus::New,
market_key: mkt_pk_string.clone(),
market_name: mkt.1.name.clone(),
slot,
write_version,
}))
.unwrap(); // TODO: use anyhow to bubble up error
}
// revoke head update event if it decreased (fork)
if head < prev_head {
metric_head_revoke.increment();
fill_update_sender
.try_send(FillEventFilterMessage::HeadUpdate(HeadUpdate {
head,
prev_head,
head_seq_num,
prev_head_seq_num,
status: FillUpdateStatus::Revoke,
market_key: mkt_pk_string.clone(),
market_name: mkt.1.name.clone(),
slot,
write_version,
}))
.unwrap(); // TODO: use anyhow to bubble up error
}
fill_update_sender
.try_send(FillEventFilterMessage::Checkpoint(FillCheckpoint {
slot,
write_version,
events: checkpoint,
market: mkt_pk_string,
queue: evq_pk_string,
}))
.unwrap()
}
fn publish_changes_serum(
_slot: u64,
_write_version: u64,
_mkt: &(Pubkey, MarketConfig),
_header: &SerumEventQueueHeader,
_events: &[serum_dex::state::Event],
_prev_seq_num: u64,
_prev_events: &[serum_dex::state::Event],
_fill_update_sender: &async_channel::Sender<FillEventFilterMessage>,
_metric_events_new: &mut MetricU64,
_metric_events_change: &mut MetricU64,
_metric_events_drop: &mut MetricU64,
) {
// // seq_num = N means that events (N-QUEUE_LEN) until N-1 are available
// let start_seq_num = max(prev_seq_num, header.seq_num)
// .checked_sub(MAX_NUM_EVENTS as u64)
// .unwrap_or(0);
// let mut checkpoint = Vec::new();
// let mkt_pk_string = mkt.0.to_string();
// let evq_pk_string = mkt.1.event_queue.to_string();
// let header_seq_num = header.seq_num;
// debug!("start seq {} header seq {}", start_seq_num, header_seq_num);
// // Timestamp for spot events is time scraped
// let timestamp = SystemTime::now()
// .duration_since(SystemTime::UNIX_EPOCH)
// .unwrap()
// .as_secs();
// for seq_num in start_seq_num..header_seq_num {
// let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize;
// let event_view = events[idx].as_view().unwrap();
// let old_event_view = prev_events[idx].as_view().unwrap();
// match event_view {
// SpotEvent::Fill { .. } => {
// // there are three possible cases:
// // 1) the event is past the old seq num, hence guaranteed new event
// // 2) the event is not matching the old event queue
// // 3) all other events are matching the old event queue
// // the order of these checks is important so they are exhaustive
// let fill = FillEvent::new_from_spot(event_view, timestamp, seq_num, &mkt.1);
// if seq_num >= prev_seq_num {
// debug!("found new serum fill {} idx {}", mkt_pk_string, idx,);
// metric_events_new.increment();
// fill_update_sender
// .try_send(FillEventFilterMessage::Update(FillUpdate {
// slot,
// write_version,
// event: fill.clone(),
// status: FillUpdateStatus::New,
// market_key: mkt_pk_string.clone(),
// market_name: mkt.1.name.clone(),
// }))
// .unwrap(); // TODO: use anyhow to bubble up error
// checkpoint.push(fill);
// continue;
// }
// match old_event_view {
// SpotEvent::Fill {
// client_order_id, ..
// } => {
// let client_order_id = match client_order_id {
// Some(id) => id.into(),
// None => 0u64,
// };
// if client_order_id != fill.client_order_id {
// debug!(
// "found changed id event {} idx {} seq_num {} header seq num {} old seq num {}",
// mkt_pk_string, idx, seq_num, header_seq_num, prev_seq_num
// );
// metric_events_change.increment();
// let old_fill = FillEvent::new_from_spot(
// old_event_view,
// timestamp,
// seq_num,
// &mkt.1,
// );
// // first revoke old event
// fill_update_sender
// .try_send(FillEventFilterMessage::Update(FillUpdate {
// slot,
// write_version,
// event: old_fill,
// status: FillUpdateStatus::Revoke,
// market_key: mkt_pk_string.clone(),
// market_name: mkt.1.name.clone(),
// }))
// .unwrap(); // TODO: use anyhow to bubble up error
// // then publish new
// fill_update_sender
// .try_send(FillEventFilterMessage::Update(FillUpdate {
// slot,
// write_version,
// event: fill.clone(),
// status: FillUpdateStatus::New,
// market_key: mkt_pk_string.clone(),
// market_name: mkt.1.name.clone(),
// }))
// .unwrap(); // TODO: use anyhow to bubble up error
// }
// // record new event in checkpoint
// checkpoint.push(fill);
// }
// SpotEvent::Out { .. } => {
// debug!(
// "found changed type event {} idx {} seq_num {} header seq num {} old seq num {}",
// mkt_pk_string, idx, seq_num, header_seq_num, prev_seq_num
// );
// metric_events_change.increment();
// // publish new fill and record in checkpoint
// fill_update_sender
// .try_send(FillEventFilterMessage::Update(FillUpdate {
// slot,
// write_version,
// event: fill.clone(),
// status: FillUpdateStatus::New,
// market_key: mkt_pk_string.clone(),
// market_name: mkt.1.name.clone(),
// }))
// .unwrap(); // TODO: use anyhow to bubble up error
// checkpoint.push(fill);
// }
// }
// }
// _ => continue,
// }
// }
// // in case queue size shrunk due to a fork we need revoke all previous fills
// for seq_num in header_seq_num..prev_seq_num {
// let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize;
// let old_event_view = prev_events[idx].as_view().unwrap();
// debug!(
// "found dropped event {} idx {} seq_num {} header seq num {} old seq num {}",
// mkt_pk_string, idx, seq_num, header_seq_num, prev_seq_num
// );
// metric_events_drop.increment();
// match old_event_view {
// SpotEvent::Fill { .. } => {
// let old_fill = FillEvent::new_from_spot(old_event_view, timestamp, seq_num, &mkt.1);
// fill_update_sender
// .try_send(FillEventFilterMessage::Update(FillUpdate {
// slot,
// event: old_fill,
// write_version,
// status: FillUpdateStatus::Revoke,
// market_key: mkt_pk_string.clone(),
// market_name: mkt.1.name.clone(),
// }))
// .unwrap(); // TODO: use anyhow to bubble up error
// }
// SpotEvent::Out { .. } => continue,
// }
// }
// fill_update_sender
// .try_send(FillEventFilterMessage::Checkpoint(FillCheckpoint {
// slot,
// write_version,
// events: checkpoint,
// market: mkt_pk_string,
// queue: evq_pk_string,
// }))
// .unwrap()
}
pub async fn init(
perp_market_configs: Vec<(Pubkey, MarketConfig)>,
spot_market_configs: Vec<(Pubkey, MarketConfig)>,
metrics_sender: Metrics,
exit: Arc<AtomicBool>,
) -> anyhow::Result<(
async_channel::Sender<AccountWrite>,
async_channel::Sender<SlotUpdate>,
async_channel::Receiver<FillEventFilterMessage>,
)> {
let metrics_sender = metrics_sender.clone();
let mut metric_events_new =
metrics_sender.register_u64("fills_feed_events_new".into(), MetricType::Counter);
let mut metric_events_new_serum =
metrics_sender.register_u64("fills_feed_events_new_serum".into(), MetricType::Counter);
let mut metric_events_change =
metrics_sender.register_u64("fills_feed_events_change".into(), MetricType::Counter);
let mut metric_events_change_serum =
metrics_sender.register_u64("fills_feed_events_change_serum".into(), MetricType::Counter);
let mut metrics_events_drop =
metrics_sender.register_u64("fills_feed_events_drop".into(), MetricType::Counter);
let mut metrics_events_drop_serum =
metrics_sender.register_u64("fills_feed_events_drop_serum".into(), MetricType::Counter);
let mut metrics_head_update =
metrics_sender.register_u64("fills_feed_head_update".into(), MetricType::Counter);
let mut metrics_head_revoke =
metrics_sender.register_u64("fills_feed_head_revoke".into(), MetricType::Counter);
// The actual message may want to also contain a retry count, if it self-reinserts on failure?
let (account_write_queue_sender, account_write_queue_receiver) =
async_channel::unbounded::<AccountWrite>();
// Slot updates flowing from the outside into the single processing thread. From
// there they'll flow into the postgres sending thread.
let (slot_queue_sender, slot_queue_receiver) = async_channel::unbounded::<SlotUpdate>();
// Fill updates can be consumed by client connections, they contain all fills for all markets
let (fill_update_sender, fill_update_receiver) =
async_channel::unbounded::<FillEventFilterMessage>();
let account_write_queue_receiver_c = account_write_queue_receiver.clone();
let mut chain_cache = ChainData::new();
let mut chain_data_metrics = ChainDataMetrics::new(&metrics_sender);
let mut perp_events_cache: HashMap<String, EventQueueEvents> = HashMap::new();
let mut serum_events_cache: HashMap<String, Vec<serum_dex::state::Event>> = HashMap::new();
let mut seq_num_cache = HashMap::new();
let mut head_cache = HashMap::new();
let mut last_evq_versions = HashMap::<String, (u64, u64)>::new();
let all_market_configs = [perp_market_configs.clone(), spot_market_configs.clone()].concat();
let perp_queue_pks: Vec<Pubkey> = perp_market_configs
.iter()
.map(|x| x.1.event_queue)
.collect();
let spot_queue_pks: Vec<Pubkey> = spot_market_configs
.iter()
.map(|x| x.1.event_queue)
.collect();
let all_queue_pks: HashSet<Pubkey> =
HashSet::from_iter([perp_queue_pks.clone(), spot_queue_pks.clone()].concat());
// update handling thread, reads both slot and account updates
tokio::spawn(async move {
loop {
if exit.load(Ordering::Relaxed) {
warn!("shutting down fill_event_filter...");
break;
}
tokio::select! {
Ok(account_write) = account_write_queue_receiver_c.recv() => {
if !all_queue_pks.contains(&account_write.pubkey) {
continue;
}
chain_cache.update_account(
account_write.pubkey,
AccountData {
slot: account_write.slot,
write_version: account_write.write_version,
account: WritableAccount::create(
account_write.lamports,
account_write.data.clone(),
account_write.owner,
account_write.executable,
account_write.rent_epoch as Epoch,
),
},
);
}
Ok(slot_update) = slot_queue_receiver.recv() => {
chain_cache.update_slot(SlotData {
slot: slot_update.slot,
parent: slot_update.parent,
status: slot_update.status,
chain: 0,
});
}
Err(e) = slot_queue_receiver.recv() => {
warn!("slot update channel err {:?}", e);
}
Err(e) = account_write_queue_receiver_c.recv() => {
warn!("write update channel err {:?}", e);
}
}
chain_data_metrics.report(&chain_cache);
for mkt in all_market_configs.iter() {
let evq_pk = mkt.1.event_queue;
let evq_pk_string = evq_pk.to_string();
let last_evq_version = last_evq_versions
.get(&mkt.1.event_queue.to_string())
.unwrap_or(&(0, 0));
match chain_cache.account(&evq_pk) {
Ok(account_info) => {
// only process if the account state changed
let evq_version = (account_info.slot, account_info.write_version);
if evq_version == *last_evq_version {
continue;
}
if evq_version.0 < last_evq_version.0 {
debug!("evq version slot was old");
continue;
}
if evq_version.0 == last_evq_version.0 && evq_version.1 < last_evq_version.1
{
info!("evq version slot was same and write version was old");
continue;
}
last_evq_versions.insert(evq_pk_string.clone(), evq_version);
let account = &account_info.account;
let is_perp = mango_v4::check_id(account.owner());
if is_perp {
let event_queue =
EventQueue::try_deserialize(account.data().borrow_mut()).unwrap();
match (
seq_num_cache.get(&evq_pk_string),
head_cache.get(&evq_pk_string),
) {
(Some(prev_seq_num), Some(old_head)) => match perp_events_cache
.get(&evq_pk_string)
{
Some(prev_events) => publish_changes_perp(
account_info.slot,
account_info.write_version,
&mkt,
&event_queue.header,
&event_queue.buf,
*prev_seq_num,
*old_head,
prev_events,
&fill_update_sender,
&mut metric_events_new,
&mut metric_events_change,
&mut metrics_events_drop,
&mut metrics_head_update,
&mut metrics_head_revoke,
),
_ => {
info!("perp_events_cache could not find {}", evq_pk_string)
}
},
_ => info!("seq_num/head cache could not find {}", evq_pk_string),
}
                        seq_num_cache
                            .insert(evq_pk_string.clone(), event_queue.header.seq_num);
head_cache
.insert(evq_pk_string.clone(), event_queue.header.head() as u64);
perp_events_cache
.insert(evq_pk_string.clone(), event_queue.buf.clone());
} else {
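                        // serum/openbook accounts are framed with a 5-byte "serum"
                        // prefix and a 7-byte "padding" suffix, hence the slice
                        // bounds below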
let inner_data = &account.data()[5..&account.data().len() - 7];
let header_span = std::mem::size_of::<SerumEventQueueHeader>();
let header: SerumEventQueueHeader =
*bytemuck::from_bytes(&inner_data[..header_span]);
let seq_num = header.seq_num;
let count = header.count;
let rest = &inner_data[header_span..];
let slop = rest.len() % std::mem::size_of::<serum_dex::state::Event>();
let new_len = rest.len() - slop;
let events = &rest[..new_len];
debug!("evq {} header_span {} header_seq_num {} header_count {} inner_len {} events_len {} sizeof Event {}", evq_pk_string, header_span, seq_num, count, inner_data.len(), events.len(), std::mem::size_of::<serum_dex::state::Event>());
let events: &[serum_dex::state::Event] = bytemuck::cast_slice(&events);
match seq_num_cache.get(&evq_pk_string) {
Some(prev_seq_num) => {
match serum_events_cache.get(&evq_pk_string) {
Some(prev_events) => publish_changes_serum(
account_info.slot,
account_info.write_version,
mkt,
&header,
&events,
*prev_seq_num,
prev_events,
&fill_update_sender,
&mut metric_events_new_serum,
&mut metric_events_change_serum,
&mut metrics_events_drop_serum,
),
_ => {
debug!(
"serum_events_cache could not find {}",
evq_pk_string
)
}
}
}
_ => debug!("seq_num_cache could not find {}", evq_pk_string),
}
                        seq_num_cache.insert(evq_pk_string.clone(), seq_num);
head_cache.insert(evq_pk_string.clone(), header.head);
                        serum_events_cache.insert(evq_pk_string.clone(), events.to_vec());
}
}
Err(_) => debug!("chain_cache could not find {}", mkt.1.event_queue),
}
}
}
});
Ok((
account_write_queue_sender,
slot_queue_sender,
fill_update_receiver,
))
}
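
For orientation, a hedged sketch of how a service binary wires this init up; perp_markets, spot_markets and metrics_tx are placeholder names, error handling is elided, and this is not code from the commit:

// hypothetical caller of fill_event_filter::init
let exit = Arc::new(AtomicBool::new(false));
let (_account_tx, _slot_tx, fill_rx) =
    fill_event_filter::init(perp_markets, spot_markets, metrics_tx.clone(), exit.clone()).await?;
tokio::spawn(async move {
    while let Ok(message) = fill_rx.recv().await {
        match message {
            FillEventFilterMessage::Update(update) => { /* fan out to subscribed ws peers */ }
            FillEventFilterMessage::HeadUpdate(head) => { /* forward only to peers that opted in */ }
            FillEventFilterMessage::Checkpoint(checkpoint) => { /* cache per event queue for new clients */ }
        }
    }
});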

View File

@@ -1,17 +1,28 @@
 use chrono::{TimeZone, Utc};
 use log::*;
+use mango_feeds_lib::{
+    metrics::{MetricType, MetricU64, Metrics},
+    *,
+};
 use native_tls::{Certificate, Identity, TlsConnector};
 use postgres_native_tls::MakeTlsConnector;
 use postgres_query::Caching;
-use std::{env, fs, time::Duration};
+use service_mango_fills::*;
+use std::{
+    env, fs,
+    sync::{
+        atomic::{AtomicBool, Ordering},
+        Arc,
+    },
+    time::Duration,
+};
 use tokio_postgres::Client;
-use crate::{fill_event_filter::FillUpdate, metrics::*, PostgresConfig};
 async fn postgres_connection(
     config: &PostgresConfig,
     metric_retries: MetricU64,
     metric_live: MetricU64,
+    exit: Arc<AtomicBool>,
 ) -> anyhow::Result<async_channel::Receiver<Option<tokio_postgres::Client>>> {
     let (tx, rx) = async_channel::unbounded();
@@ -19,7 +30,6 @@ async fn postgres_connection(
     // base64 -i ca.cer -o ca.cer.b64 && base64 -i client.pks -o client.pks.b64
     // fly secrets set PG_CA_CERT=- < ./ca.cer.b64 -a mango-fills
     // fly secrets set PG_CLIENT_KEY=- < ./client.pks.b64 -a mango-fills
-    info!("making tls config");
     let tls = match &config.tls {
         Some(tls) => {
             use base64::{engine::general_purpose, Engine as _};
@@ -31,7 +41,7 @@ async fn postgres_connection(
                     .into_bytes(),
                 )
                 .expect("decoding client cert"),
-                _ => fs::read(&tls.client_key_path).expect("reading client key from file"),
+                _ => fs::read(&tls.ca_cert_path).expect("reading client cert from file"),
             };
             let client_key = match &tls.client_key_path.chars().next().unwrap() {
                 '$' => general_purpose::STANDARD
@@ -59,16 +69,26 @@ async fn postgres_connection(
     };
     let config = config.clone();
-    let mut initial = Some(tokio_postgres::connect(&config.connection_string, tls.clone()).await?);
+    let connection_string = match &config.connection_string.chars().next().unwrap() {
+        '$' => {
+            env::var(&config.connection_string[1..]).expect("reading connection string from env")
+        }
+        _ => config.connection_string.clone(),
+    };
+    let mut initial = Some(tokio_postgres::connect(&connection_string, tls.clone()).await?);
     let mut metric_retries = metric_retries;
     let mut metric_live = metric_live;
     tokio::spawn(async move {
         loop {
+            // don't acquire a new connection if we're shutting down
+            if exit.load(Ordering::Relaxed) {
+                warn!("shutting down fill_event_postgres_target...");
+                break;
+            }
             let (client, connection) = match initial.take() {
                 Some(v) => v,
                 None => {
-                    let result =
-                        tokio_postgres::connect(&config.connection_string, tls.clone()).await;
+                    let result = tokio_postgres::connect(&connection_string, tls.clone()).await;
                     match result {
                         Ok(v) => v,
                         Err(err) => {
@@ -129,6 +149,8 @@ async fn process_update(client: &Caching<Client>, update: &FillUpdate) -> anyhow
     let slot = update.slot as i64;
     let write_version = update.write_version as i64;
+    if update.status == FillUpdateStatus::New {
+        // insert new events
     let query = postgres_query::query!(
         "INSERT INTO transactions_v4.perp_fills_feed_events
             (market, seq_num, fill_timestamp, price,
@@ -146,6 +168,17 @@ async fn process_update(client: &Caching<Client>, update: &FillUpdate) -> anyhow
         write_version,
     );
     let _ = query.execute(&client).await?;
+    } else {
+        // delete revoked events
+        let query = postgres_query::query!(
+            "DELETE FROM transactions_v4.perp_fills_feed_events
+            WHERE market=$market
+            AND seq_num=$seq_num",
+            market,
+            seq_num,
+        );
+        let _ = query.execute(&client).await?;
+    }
     Ok(())
 }
@@ -153,6 +186,7 @@ async fn process_update(client: &Caching<Client>, update: &FillUpdate) -> anyhow
 pub async fn init(
     config: &PostgresConfig,
     metrics_sender: Metrics,
+    exit: Arc<AtomicBool>,
 ) -> anyhow::Result<async_channel::Sender<FillUpdate>> {
     // The actual message may want to also contain a retry count, if it self-reinserts on failure?
     let (fill_update_queue_sender, fill_update_queue_receiver) =
@@ -167,8 +201,12 @@ pub async fn init(
     // postgres fill update sending worker threads
     for _ in 0..config.connection_count {
-        let postgres_account_writes =
-            postgres_connection(config, metric_con_retries.clone(), metric_con_live.clone())
+        let postgres_account_writes = postgres_connection(
+            config,
+            metric_con_retries.clone(),
+            metric_con_live.clone(),
+            exit.clone(),
+        )
             .await?;
         let fill_update_queue_receiver_c = fill_update_queue_receiver.clone();
         let config = config.clone();
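
The `$` prefix convention above (a config value starting with `$` names an environment variable to read instead of being used literally) now covers the certs, the client key, and the connection string. As a hedged sketch, the same idea distilled into a standalone helper; illustrative only, not part of this commit:

use std::env;

// "$NAME" resolves to the NAME environment variable,
// anything else passes through unchanged
fn resolve_config_value(raw: &str) -> String {
    match raw.strip_prefix('$') {
        Some(var_name) => env::var(var_name).expect("reading config value from env"),
        None => raw.to_string(),
    }
}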

View File

@@ -0,0 +1,334 @@
use std::convert::identity;
use anchor_lang::prelude::Pubkey;
use bytemuck::cast_slice;
use chrono::{TimeZone, Utc};
use mango_feeds_lib::{base_lots_to_ui_perp, price_lots_to_ui_perp, MarketConfig, OrderbookSide};
use mango_v4::state::{FillEvent as PerpFillEvent, Side};
use serde::{ser::SerializeStruct, Deserialize, Serialize, Serializer};
use serum_dex::state::EventView as SpotEvent;
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum FillUpdateStatus {
New,
Revoke,
}
impl Serialize for FillUpdateStatus {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match *self {
FillUpdateStatus::New => {
serializer.serialize_unit_variant("FillUpdateStatus", 0, "new")
}
FillUpdateStatus::Revoke => {
serializer.serialize_unit_variant("FillUpdateStatus", 1, "revoke")
}
}
}
}
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum FillEventType {
Spot,
Perp,
}
impl Serialize for FillEventType {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match *self {
FillEventType::Spot => serializer.serialize_unit_variant("FillEventType", 0, "spot"),
FillEventType::Perp => serializer.serialize_unit_variant("FillEventType", 1, "perp"),
}
}
}
#[derive(Clone, Debug)]
pub struct FillEvent {
pub event_type: FillEventType,
pub maker: String,
pub taker: String,
pub taker_side: OrderbookSide,
pub timestamp: u64, // make all strings
pub seq_num: u64,
pub maker_client_order_id: u64,
pub taker_client_order_id: u64,
pub maker_fee: f32,
pub taker_fee: f32,
pub price: f64,
pub quantity: f64,
}
impl Serialize for FillEvent {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut state = serializer.serialize_struct("FillEvent", 12)?;
state.serialize_field("eventType", &self.event_type)?;
state.serialize_field("maker", &self.maker)?;
state.serialize_field("taker", &self.taker)?;
state.serialize_field("takerSide", &self.taker_side)?;
state.serialize_field(
"timestamp",
&Utc.timestamp_opt(self.timestamp as i64, 0)
.unwrap()
.to_rfc3339(),
)?;
state.serialize_field("seqNum", &self.seq_num)?;
state.serialize_field("makerClientOrderId", &self.maker_client_order_id)?;
state.serialize_field("takerClientOrderId", &self.taker_client_order_id)?; // make string
state.serialize_field("makerFee", &self.maker_fee)?;
state.serialize_field("takerFee", &self.taker_fee)?;
state.serialize_field("price", &self.price)?;
state.serialize_field("quantity", &self.quantity)?;
state.end()
}
}
impl FillEvent {
pub fn new_from_perp(event: PerpFillEvent, config: &MarketConfig) -> Self {
let taker_side = match event.taker_side() {
Side::Ask => OrderbookSide::Ask,
Side::Bid => OrderbookSide::Bid,
};
let price = price_lots_to_ui_perp(
event.price,
config.base_decimals,
config.quote_decimals,
config.base_lot_size,
config.quote_lot_size,
);
let quantity =
base_lots_to_ui_perp(event.quantity, config.base_decimals, config.base_lot_size);
FillEvent {
event_type: FillEventType::Perp,
maker: event.maker.to_string(),
taker: event.taker.to_string(),
            taker_side,
timestamp: event.timestamp,
seq_num: event.seq_num,
maker_client_order_id: event.maker_client_order_id,
taker_client_order_id: event.taker_client_order_id,
maker_fee: event.maker_fee,
taker_fee: event.taker_fee,
            price,
            quantity,
}
}
pub fn new_from_spot(
maker_event: SpotEvent,
taker_event: SpotEvent,
timestamp: u64,
seq_num: u64,
config: &MarketConfig,
) -> Self {
match (maker_event, taker_event) {
(
SpotEvent::Fill {
side: maker_side,
client_order_id: maker_client_order_id,
native_qty_paid: maker_native_qty_paid,
native_fee_or_rebate: maker_native_fee_or_rebate,
native_qty_received: maker_native_qty_received,
owner: maker_owner,
..
},
SpotEvent::Fill {
side: taker_side,
client_order_id: taker_client_order_id,
native_fee_or_rebate: taker_native_fee_or_rebate,
owner: taker_owner,
..
},
) => {
let maker_side = match maker_side as u8 {
0 => OrderbookSide::Bid,
1 => OrderbookSide::Ask,
_ => panic!("invalid side"),
};
let taker_side = match taker_side as u8 {
0 => OrderbookSide::Bid,
1 => OrderbookSide::Ask,
_ => panic!("invalid side"),
};
let maker_client_order_id: u64 = match maker_client_order_id {
Some(id) => id.into(),
None => 0u64,
};
let taker_client_order_id: u64 = match taker_client_order_id {
Some(id) => id.into(),
None => 0u64,
};
                let base_multiplier = 10u64.pow(config.base_decimals.into());
                let quote_multiplier = 10u64.pow(config.quote_decimals.into());
let (price, quantity) = match maker_side {
OrderbookSide::Bid => {
let price_before_fees = maker_native_qty_paid + maker_native_fee_or_rebate;
let top = price_before_fees * base_multiplier;
let bottom = quote_multiplier * maker_native_qty_received;
let price = top as f64 / bottom as f64;
let quantity = maker_native_qty_received as f64 / base_multiplier as f64;
(price, quantity)
}
OrderbookSide::Ask => {
let price_before_fees =
maker_native_qty_received - maker_native_fee_or_rebate;
let top = price_before_fees * base_multiplier;
let bottom = quote_multiplier * maker_native_qty_paid;
let price = top as f64 / bottom as f64;
let quantity = maker_native_qty_paid as f64 / base_multiplier as f64;
(price, quantity)
}
};
let maker_fee = maker_native_fee_or_rebate as f32 / quote_multiplier as f32;
let taker_fee = taker_native_fee_or_rebate as f32 / quote_multiplier as f32;
FillEvent {
event_type: FillEventType::Spot,
maker: Pubkey::new(cast_slice(&identity(maker_owner) as &[_])).to_string(),
taker: Pubkey::new(cast_slice(&identity(taker_owner) as &[_])).to_string(),
                    taker_side,
timestamp,
seq_num,
maker_client_order_id,
taker_client_order_id,
taker_fee,
maker_fee,
price,
quantity,
}
}
(_, _) => {
panic!("Can't build FillEvent from SpotEvent::Out")
}
}
}
}
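// Worked example of the maker-bid branch above (hypothetical numbers,
// not from this codebase): with base_decimals = 9, quote_decimals = 6,
// maker_native_qty_paid = 20_000_000 (20 quote), fee_or_rebate = 0,
// maker_native_qty_received = 1_000_000_000 (1 base):
//   price    = (20_000_000 * 10^9) / (10^6 * 1_000_000_000) = 20.0
//   quantity = 1_000_000_000 / 10^9 = 1.0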
#[derive(Clone, Debug)]
pub struct FillUpdate {
pub event: FillEvent,
pub status: FillUpdateStatus,
pub market_key: String,
pub market_name: String,
pub slot: u64,
pub write_version: u64,
}
impl Serialize for FillUpdate {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut state = serializer.serialize_struct("FillUpdate", 6)?;
state.serialize_field("event", &self.event)?;
state.serialize_field("marketKey", &self.market_key)?;
state.serialize_field("marketName", &self.market_name)?;
state.serialize_field("status", &self.status)?;
state.serialize_field("slot", &self.slot)?;
state.serialize_field("writeVersion", &self.write_version)?;
state.end()
}
}
#[derive(Clone, Debug)]
pub struct HeadUpdate {
pub head: u64,
pub prev_head: u64,
pub head_seq_num: u64,
pub prev_head_seq_num: u64,
pub status: FillUpdateStatus,
pub market_key: String,
pub market_name: String,
pub slot: u64,
pub write_version: u64,
}
impl Serialize for HeadUpdate {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut state = serializer.serialize_struct("HeadUpdate", 6)?;
state.serialize_field("head", &self.head)?;
state.serialize_field("previousHead", &self.prev_head)?;
state.serialize_field("headSeqNum", &self.head_seq_num)?;
state.serialize_field("previousHeadSeqNum", &self.prev_head_seq_num)?;
state.serialize_field("marketKey", &self.market_key)?;
state.serialize_field("marketName", &self.market_name)?;
state.serialize_field("status", &self.status)?;
state.serialize_field("slot", &self.slot)?;
state.serialize_field("writeVersion", &self.write_version)?;
state.end()
}
}
#[derive(Clone, Debug)]
pub struct FillCheckpoint {
pub market: String,
pub queue: String,
pub events: Vec<FillEvent>,
pub slot: u64,
pub write_version: u64,
}
impl Serialize for FillCheckpoint {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut state = serializer.serialize_struct("FillCheckpoint", 3)?;
state.serialize_field("events", &self.events)?;
state.serialize_field("market", &self.market)?;
state.serialize_field("queue", &self.queue)?;
state.serialize_field("slot", &self.slot)?;
state.serialize_field("write_version", &self.write_version)?;
state.end()
}
}
pub enum FillEventFilterMessage {
Update(FillUpdate),
HeadUpdate(HeadUpdate),
Checkpoint(FillCheckpoint),
}
#[derive(Clone, Debug, Deserialize)]
#[serde(tag = "command")]
pub enum Command {
#[serde(rename = "subscribe")]
Subscribe(SubscribeCommand),
#[serde(rename = "unsubscribe")]
Unsubscribe(UnsubscribeCommand),
#[serde(rename = "getMarkets")]
GetMarkets,
}
#[derive(Clone, Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SubscribeCommand {
pub market_id: Option<String>,
pub market_ids: Option<Vec<String>>,
pub account_ids: Option<Vec<String>>,
pub head_updates: Option<bool>,
}
#[derive(Clone, Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct UnsubscribeCommand {
pub market_id: String,
}
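
With these definitions, one subscribe message can target several markets and accounts at once, and omitting all id fields subscribes to every market (the wildcard path in main.rs). A hedged client-side sketch of building such a message with serde_json; the ids are placeholders, not real pubkeys:

use serde_json::json;

fn main() {
    // field casing follows the serde attributes above
    let subscribe = json!({
        "command": "subscribe",
        "marketIds": ["<market-pubkey-1>", "<market-pubkey-2>"],
        "accountIds": ["<mango-account-pubkey>"],
        "headUpdates": true
    });
    println!("{}", subscribe);
}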

View File

@@ -1,3 +1,6 @@
+mod fill_event_filter;
+mod fill_event_postgres_target;
 use anchor_client::{
     solana_sdk::{commitment_config::CommitmentConfig, signature::Keypair},
     Cluster,
@@ -10,6 +13,13 @@ use futures_util::{
     pin_mut, SinkExt, StreamExt, TryStreamExt,
 };
 use log::*;
+use mango_feeds_lib::{
+    grpc_plugin_source, metrics,
+    metrics::{MetricType, MetricU64},
+    websocket_source, FilterConfig, MarketConfig, MetricsConfig, PostgresConfig, SourceConfig,
+    StatusResponse,
+};
+use service_mango_fills::{Command, FillCheckpoint, FillEventFilterMessage, FillEventType};
 use std::{
     collections::{HashMap, HashSet},
     fs::File,
@@ -17,7 +27,10 @@ use std::{
     net::SocketAddr,
     str::FromStr,
     sync::Arc,
-    sync::Mutex,
+    sync::{
+        atomic::{AtomicBool, Ordering},
+        Mutex,
+    },
     time::Duration,
 };
 use tokio::{
@@ -26,17 +39,6 @@ use tokio::{
 };
 use tokio_tungstenite::tungstenite::{protocol::Message, Error};
-use mango_feeds_lib::{
-    fill_event_filter::FillEventType,
-    fill_event_postgres_target,
-    metrics::{MetricType, MetricU64},
-    orderbook_filter::MarketConfig,
-    FilterConfig, PostgresConfig, PostgresTlsConfig, StatusResponse,
-};
-use mango_feeds_lib::{
-    fill_event_filter::{self, FillCheckpoint, FillEventFilterMessage},
-    grpc_plugin_source, metrics, websocket_source, MetricsConfig, SourceConfig,
-};
 use serde::Deserialize;
 type CheckpointMap = Arc<Mutex<HashMap<String, FillCheckpoint>>>;
@@ -47,33 +49,12 @@ type PeerMap = Arc<Mutex<HashMap<SocketAddr, Peer>>>;
 #[global_allocator]
 static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
-#[derive(Clone, Debug, Deserialize)]
-#[serde(tag = "command")]
-pub enum Command {
-    #[serde(rename = "subscribe")]
-    Subscribe(SubscribeCommand),
-    #[serde(rename = "unsubscribe")]
-    Unsubscribe(UnsubscribeCommand),
-    #[serde(rename = "getMarkets")]
-    GetMarkets,
-}
-#[derive(Clone, Debug, Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub struct SubscribeCommand {
-    pub market_id: String,
-}
-#[derive(Clone, Debug, Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub struct UnsubscribeCommand {
-    pub market_id: String,
-}
 #[derive(Clone, Debug)]
 pub struct Peer {
     pub sender: UnboundedSender<Message>,
-    pub subscriptions: HashSet<String>,
+    pub market_subscriptions: HashSet<String>,
+    pub account_subscriptions: HashSet<String>,
+    pub head_updates: bool,
 }
 async fn handle_connection_error(
@@ -123,7 +104,9 @@ async fn handle_connection(
         addr,
         Peer {
             sender: chan_tx,
-            subscriptions: HashSet::<String>::new(),
+            market_subscriptions: HashSet::<String>::new(),
+            account_subscriptions: HashSet::<String>::new(),
+            head_updates: false,
         },
     );
 }
@@ -168,9 +151,14 @@ fn handle_commands(
     let command: Result<Command, serde_json::Error> = serde_json::from_str(&msg_str);
     let mut peers = peer_map.lock().unwrap();
     let peer = peers.get_mut(&addr).expect("peer should be in map");
     match command {
         Ok(Command::Subscribe(cmd)) => {
-            let market_id = cmd.clone().market_id;
+            let mut wildcard = true;
+            // DEPRECATED
+            match cmd.market_id {
+                Some(market_id) => {
+                    wildcard = false;
             match market_ids.get(&market_id) {
                 None => {
                     let res = StatusResponse {
@@ -184,7 +172,7 @@
                 }
                 _ => {}
             }
-            let subscribed = peer.subscriptions.insert(market_id.clone());
+            let subscribed = peer.market_subscriptions.insert(market_id.clone());
             let res = if subscribed {
                 StatusResponse {
@@ -212,13 +200,102 @@
                         ))
                         .unwrap();
                 }
-                None => info!("no checkpoint available on client subscription"),
+                None => info!(
+                    "no checkpoint available on client subscription for market {}",
+                    &market_id
+                ),
             };
         }
     }
+                None => {}
+            }
+            match cmd.market_ids {
+                Some(cmd_market_ids) => {
+                    wildcard = false;
+                    for market_id in cmd_market_ids {
+                        match market_ids.get(&market_id) {
+                            None => {
+                                let res = StatusResponse {
+                                    success: false,
+                                    message: &format!("market {} not found", &market_id),
+                                };
+                                peer.sender
+                                    .unbounded_send(Message::Text(
+                                        serde_json::to_string(&res).unwrap(),
+                                    ))
+                                    .unwrap();
+                                return future::ok(());
+                            }
+                            _ => {}
+                        }
+                        if peer.market_subscriptions.insert(market_id.clone()) {
+                            let checkpoint_map = checkpoint_map.lock().unwrap();
+                            let checkpoint = checkpoint_map.get(&market_id);
+                            let res = StatusResponse {
+                                success: true,
+                                message: &format!("subscribed to market {}", &market_id),
+                            };
+                            peer.sender
+                                .unbounded_send(Message::Text(serde_json::to_string(&res).unwrap()))
+                                .unwrap();
+                            match checkpoint {
+                                Some(checkpoint) => {
+                                    peer.sender
+                                        .unbounded_send(Message::Text(
+                                            serde_json::to_string(&checkpoint).unwrap(),
+                                        ))
+                                        .unwrap();
+                                }
+                                None => info!(
+                                    "no checkpoint available on client subscription for market {}",
+                                    &market_id
+                                ),
+                            };
+                        }
+                    }
+                }
+                None => {}
+            }
+            match cmd.account_ids {
+                Some(account_ids) => {
+                    wildcard = false;
+                    for account_id in account_ids {
+                        if peer.account_subscriptions.insert(account_id.clone()) {
+                            let res = StatusResponse {
+                                success: true,
+                                message: &format!("subscribed to account {}", &account_id),
+                            };
+                            peer.sender
+                                .unbounded_send(Message::Text(serde_json::to_string(&res).unwrap()))
+                                .unwrap();
+                        }
+                    }
+                }
+                None => {}
+            }
+            if wildcard {
+                for (market_id, market_name) in market_ids {
+                    if peer.market_subscriptions.insert(market_id.clone()) {
+                        let res = StatusResponse {
+                            success: true,
+                            message: &format!("subscribed to market {}", &market_name),
+                        };
+                        peer.sender
+                            .unbounded_send(Message::Text(serde_json::to_string(&res).unwrap()))
+                            .unwrap();
+                    }
+                }
+            }
+            if let Some(head_updates) = cmd.head_updates {
+                peer.head_updates = head_updates;
+            }
+        }
         Ok(Command::Unsubscribe(cmd)) => {
             info!("unsubscribe {}", cmd.market_id);
-            let unsubscribed = peer.subscriptions.remove(&cmd.market_id);
+            let unsubscribed = peer.market_subscriptions.remove(&cmd.market_id);
             let res = if unsubscribed {
                 StatusResponse {
                     success: true,
@@ -259,6 +336,7 @@
 pub struct Config {
     pub source: SourceConfig,
     pub metrics: MetricsConfig,
+    pub postgres: Option<PostgresConfig>,
     pub bind_ws_addr: String,
     pub rpc_http_url: String,
     pub mango_group: String,
@@ -267,6 +345,7 @@ pub struct Config {
 #[tokio::main]
 async fn main() -> anyhow::Result<()> {
     let args: Vec<String> = std::env::args().collect();
+    let exit: Arc<AtomicBool> = Arc::new(AtomicBool::new(false));
     if args.len() < 2 {
         eprintln!("Please enter a config file path argument.");
@@ -370,7 +449,7 @@ async fn main() -> anyhow::Result<()> {
         .map(|(_, context)| (context.address, context.market.event_queue))
         .collect();
-    let a: Vec<(String, String)> = group_context
+    let _a: Vec<(String, String)> = group_context
         .serum3_markets
         .iter()
         .map(|(_, context)| {
@@ -390,31 +469,21 @@
             )
         })
         .collect();
-    let market_pubkey_strings: HashMap<String, String> = [a, b].concat().into_iter().collect();
+    let market_pubkey_strings: HashMap<String, String> = [b].concat().into_iter().collect();
-    // TODO: read all this from config
-    let pgconf = PostgresConfig {
-        connection_string: "$PG_CONNECTION_STRING".to_owned(),
-        connection_count: 1,
-        max_batch_size: 1,
-        max_queue_size: 50_000,
-        retry_query_max_count: 10,
-        retry_query_sleep_secs: 2,
-        retry_connection_sleep_secs: 10,
-        fatal_connection_timeout_secs: 120,
-        allow_invalid_certs: true,
-        tls: Some(PostgresTlsConfig {
-            ca_cert_path: "$PG_CA_CERT".to_owned(),
-            client_key_path: "$PG_CLIENT_KEY".to_owned(),
-        }),
-    };
-    let postgres_update_sender =
-        fill_event_postgres_target::init(&pgconf, metrics_tx.clone()).await?;
+    let postgres_update_sender = match config.postgres {
+        Some(postgres_config) => Some(
+            fill_event_postgres_target::init(&postgres_config, metrics_tx.clone(), exit.clone())
+                .await?,
+        ),
+        None => None,
+    };
     let (account_write_queue_sender, slot_queue_sender, fill_receiver) = fill_event_filter::init(
         perp_market_configs.clone(),
         spot_market_configs.clone(),
         metrics_tx.clone(),
+        exit.clone(),
     )
     .await?;
@@ -439,9 +508,11 @@
                     let mut peer_copy = peers_ref_thread.lock().unwrap().clone();
                     for (addr, peer) in peer_copy.iter_mut() {
                         let json = serde_json::to_string(&update.clone()).unwrap();
+                        let is_subscribed = peer.market_subscriptions.contains(&update.market_key)
+                            || peer.account_subscriptions.contains(&update.event.taker)
+                            || peer.account_subscriptions.contains(&update.event.maker);
                         // only send updates if the peer is subscribed
-                        if peer.subscriptions.contains(&update.market_key) {
+                        if is_subscribed {
                             let result = peer.sender.send(Message::Text(json)).await;
                             if result.is_err() {
                                 error!(
@@ -451,16 +522,13 @@
                             }
                         }
                     }
-                    // send taker fills to db
+                    // send fills to db
                    let update_c = update.clone();
-                    match update_c.event.event_type {
-                        FillEventType::Perp => {
-                            if !update_c.event.maker {
-                                debug!("{:?}", update_c);
-                                postgres_update_sender.send(update_c).await.unwrap();
-                            }
-                        }
-                        _ => warn!("failed to write spot event to db"),
+                    match (postgres_update_sender.clone(), update_c.event.event_type) {
+                        (Some(sender), FillEventType::Perp) => {
+                            sender.send(update_c).await.unwrap();
+                        }
+                        _ => {}
                     }
                 }
                 FillEventFilterMessage::Checkpoint(checkpoint) => {
@@ -469,10 +537,32 @@
                         .unwrap()
                         .insert(checkpoint.queue.clone(), checkpoint);
                 }
+                FillEventFilterMessage::HeadUpdate(update) => {
+                    debug!(
+                        "ws update {} {:?} {} {} head",
+                        update.market_name, update.status, update.head, update.prev_head
+                    );
+                    let mut peer_copy = peers_ref_thread.lock().unwrap().clone();
+                    for (addr, peer) in peer_copy.iter_mut() {
+                        let json = serde_json::to_string(&update.clone()).unwrap();
+                        let is_subscribed = peer.market_subscriptions.contains(&update.market_key);
+                        // only send updates if the peer is subscribed
+                        if peer.head_updates && is_subscribed {
+                            let result = peer.sender.send(Message::Text(json)).await;
+                            if result.is_err() {
+                                error!(
+                                    "ws update {} head could not reach {}",
+                                    update.market_name, addr
+                                );
+                            }
+                        }
+                    }
+                }
             }
         }
     });
+    // websocket listener
     info!("ws listen: {}", config.bind_ws_addr);
     let try_socket = TcpListener::bind(&config.bind_ws_addr).await;
     let listener = try_socket.expect("Failed to bind");
@@ -511,6 +601,17 @@
         }
     });
 }
+    // handle sigint
+    {
+        let exit = exit.clone();
+        tokio::spawn(async move {
+            tokio::signal::ctrl_c().await.unwrap();
+            info!("Received SIGINT, shutting down...");
+            exit.store(true, Ordering::Relaxed);
+        });
+    }
     info!(
         "rpc connect: {}",
         config
@@ -534,6 +635,7 @@
         account_write_queue_sender,
         slot_queue_sender,
         metrics_tx.clone(),
+        exit.clone(),
     )
     .await;
 } else {

View File

@@ -1,68 +0,0 @@
bind_ws_addr = "0.0.0.0:8080"
[metrics]
output_stdout = true
output_http = true
[source]
dedup_queue_size = 50000
rpc_ws_url = ""
[[source.grpc_sources]]
name = "accountsdb-client"
connection_string = "$GEYSER_CONNECTION_STRING"
retry_connection_sleep_secs = 30
[source.grpc_sources.tls]
ca_cert_path = "$GEYSER_CA_CERT"
client_cert_path = "$GEYSER_CLIENT_CERT"
client_key_path = "$GEYSER_CLIENT_CERT"
domain_name = "$GEYSER_CERT_DOMAIN"
[source.snapshot]
rpc_http_url = "$RPC_HTTP_URL"
program_id = "mv3ekLzLbnVPNxjSKvqBpU3ZeZXPQdEC3bp5MDEBG68"
[[markets]]
name = "BTC-PERP"
event_queue = "7t5Me8RieYKsFpfLEV8jnpqcqswNpyWD95ZqgUXuLV8Z"
[[markets]]
name = "ETH-PERP"
event_queue = "9vDfKNPJkCvQv9bzR4JNTGciQC2RVHPVNMMHiVDgT1mw"
[[markets]]
name = "SOL-PERP"
event_queue = "31cKs646dt1YkA3zPyxZ7rUAkxTBz279w4XEobFXcAKP"
[[markets]]
name = "MNGO-PERP"
event_queue = "7orixrhZpjvofZGWZyyLFxSEt2tfFiost5kHEzd7jdet"
[[markets]]
name = "SRM-PERP"
event_queue = "BXSPmdHWP6fMqsCsT6kG8UN9uugAJxdDkQWy87njUQnL"
[[markets]]
name = "RAY-PERP"
event_queue = "Css2MQhEvXMTKjp9REVZR9ZyUAYAZAPrnDvRoPxrQkeN"
[[markets]]
name = "FTT-PERP"
event_queue = "5pHAhyEphQRVvLqvYF7dziofR52yZWuq8DThQFJvJ7r5"
[[markets]]
name = "ADA-PERP"
event_queue = "G6Dsw9KnP4G38hePtedTH6gDfDQmPJGJw8zipBJvKc12"
[[markets]]
name = "BNB-PERP"
event_queue = "GmX4qXMpXvs1DuUXNB4eqL1rfF8LeYEjkKgpFeYsm55n"
[[markets]]
name = "AVAX-PERP"
event_queue = "5Grgo9kLu692SUcJ6S7jtbi1WkdwiyRWgThAfN1PcvbL"
[[markets]]
name = "GMT-PERP"
event_queue = "J2WYiw67VeGkPvmM3fi65H9KxDgCf79fNwspcD3ycubK"

View File

@@ -24,9 +24,12 @@ async-trait = "0.1"
 tokio = { version = "1", features = ["full"] }
 tokio-tungstenite = "0.17"
 bytemuck = "1.7.2"
+itertools = "0.10.5"
+solana-sdk = "~1.14.9"
 mango-v4 = { git = "https://github.com/blockworks-foundation/mango-v4", branch = "dev" }
 client = { git = "https://github.com/blockworks-foundation/mango-v4", branch = "dev" }
-serum_dex = { git = "https://github.com/openbook-dex/program", branch = "master" }
+serum_dex = { git = "https://github.com/jup-ag/openbook-program", branch = "feat/expose-things", features = ["no-entrypoint"] }
 anchor-lang = "0.25.0"
 anchor-client = "0.25.0"

View File

@@ -6,7 +6,7 @@ This module parses bookside accounts and exposes L2 data and updates on a websoc
 1. Prepare the connector configuration file.
-   [Here is an example](service-mango-fills/example-config.toml).
+   [Here is an example](service-mango-orderbook/conf/example-config.toml).
 - `bind_ws_addr` is the listen port for the websocket clients
 - `rpc_ws_url` is unused and can stay empty.

View File

@@ -0,0 +1,20 @@
bind_ws_addr = "[::]:8080"
rpc_http_url = "$RPC_HTTP_URL"
mango_group = "78b8f4cGCwmZ9ysPFMWLaLTkkaYnUjwMJYStWe5RTSSX"
[metrics]
output_stdout = true
output_http = true
[source]
dedup_queue_size = 50000
rpc_ws_url = "$RPC_WS_URL"
[[source.grpc_sources]]
name = "accountsdb-client"
connection_string = "$GEYSER_CONNECTION_STRING"
retry_connection_sleep_secs = 30
[source.snapshot]
rpc_http_url = "$RPC_HTTP_URL"
program_id = "srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX"

View File

@@ -0,0 +1,59 @@
use mango_feeds_lib::OrderbookSide;
use serde::{ser::SerializeStruct, Serialize, Serializer};
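// a single L2 level; as built by the orderbook filter this is
// presumably [price, quantity] in UI units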
pub type OrderbookLevel = [f64; 2];
#[derive(Clone, Debug)]
pub struct OrderbookUpdate {
pub market: String,
pub side: OrderbookSide,
pub update: Vec<OrderbookLevel>,
pub slot: u64,
pub write_version: u64,
}
impl Serialize for OrderbookUpdate {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut state = serializer.serialize_struct("OrderbookUpdate", 5)?;
state.serialize_field("market", &self.market)?;
state.serialize_field("side", &self.side)?;
state.serialize_field("update", &self.update)?;
state.serialize_field("slot", &self.slot)?;
state.serialize_field("write_version", &self.write_version)?;
state.end()
}
}
#[derive(Clone, Debug)]
pub struct OrderbookCheckpoint {
pub market: String,
pub bids: Vec<OrderbookLevel>,
pub asks: Vec<OrderbookLevel>,
pub slot: u64,
pub write_version: u64,
}
impl Serialize for OrderbookCheckpoint {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut state = serializer.serialize_struct("OrderbookCheckpoint", 3)?;
state.serialize_field("market", &self.market)?;
state.serialize_field("bids", &self.bids)?;
state.serialize_field("asks", &self.asks)?;
state.serialize_field("slot", &self.slot)?;
state.serialize_field("write_version", &self.write_version)?;
state.end()
}
}
pub enum OrderbookFilterMessage {
Update(OrderbookUpdate),
Checkpoint(OrderbookCheckpoint),
}
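
A hedged sketch of draining these messages on the websocket side, mirroring the fills service's forwarding loop; the channel wiring is an assumption, not code from this commit:

async fn forward_orderbook(receiver: async_channel::Receiver<OrderbookFilterMessage>) {
    while let Ok(message) = receiver.recv().await {
        match message {
            // incremental L2 delta for one side of one market
            OrderbookFilterMessage::Update(update) => {
                let _json = serde_json::to_string(&update).unwrap();
                // ...send to peers subscribed to update.market
            }
            // full book snapshot, initial state for new subscribers
            OrderbookFilterMessage::Checkpoint(checkpoint) => {
                let _json = serde_json::to_string(&checkpoint).unwrap();
                // ...cache per market for newly connecting clients
            }
        }
    }
}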

View File

@@ -1,3 +1,5 @@
+mod orderbook_filter;
 use anchor_client::{
     solana_sdk::{commitment_config::CommitmentConfig, signature::Keypair},
     Cluster,
@@ -17,7 +19,7 @@ use std::{
     net::SocketAddr,
     str::FromStr,
     sync::Arc,
-    sync::Mutex,
+    sync::{atomic::AtomicBool, Mutex},
     time::Duration,
 };
 use tokio::{
@@ -26,14 +28,17 @@ use tokio::{
 };
 use tokio_tungstenite::tungstenite::{protocol::Message, Error};
-use mango_feeds_lib::{grpc_plugin_source, metrics, websocket_source, MetricsConfig, SourceConfig};
+use mango_feeds_lib::{
+    grpc_plugin_source, metrics, websocket_source, MarketConfig, MetricsConfig, SourceConfig,
+};
 use mango_feeds_lib::{
     metrics::{MetricType, MetricU64},
-    orderbook_filter::{self, MarketConfig, OrderbookCheckpoint, OrderbookFilterMessage},
     FilterConfig, StatusResponse,
 };
 use serde::Deserialize;
+use service_mango_orderbook::{OrderbookCheckpoint, OrderbookFilterMessage};
 type CheckpointMap = Arc<Mutex<HashMap<String, OrderbookCheckpoint>>>;
 type PeerMap = Arc<Mutex<HashMap<SocketAddr, Peer>>>;
@@ -246,6 +251,7 @@ fn handle_commands(
 #[tokio::main]
 async fn main() -> anyhow::Result<()> {
     let args: Vec<String> = std::env::args().collect();
+    let exit: Arc<AtomicBool> = Arc::new(AtomicBool::new(false));
     if args.len() < 2 {
         eprintln!("Please enter a config file path argument");
@@ -443,6 +449,7 @@ async fn main() -> anyhow::Result<()> {
         account_write_queue_sender,
         slot_queue_sender,
         metrics_tx.clone(),
+        exit.clone(),
     )
     .await;
 } else {

View File

@@ -1,154 +1,36 @@
-use crate::metrics::MetricU64;
-use crate::{
+use anchor_lang::AccountDeserialize;
+use itertools::Itertools;
+use log::*;
+use mango_feeds_lib::metrics::MetricU64;
+use mango_feeds_lib::{
+    base_lots_to_ui, base_lots_to_ui_perp, price_lots_to_ui, price_lots_to_ui_perp, MarketConfig,
+    OrderbookSide,
+};
+use mango_feeds_lib::{
     chain_data::{AccountData, ChainData, ChainDataMetrics, SlotData},
     metrics::{MetricType, Metrics},
     AccountWrite, SlotUpdate,
 };
-use anchor_lang::AccountDeserialize;
-use itertools::Itertools;
-use log::*;
 use mango_v4::{
     serum3_cpi::OrderBookStateHeader,
     state::{BookSide, OrderTreeType},
 };
-use serde::{ser::SerializeStruct, Serialize, Serializer};
 use serum_dex::critbit::Slab;
+use service_mango_orderbook::{
+    OrderbookCheckpoint, OrderbookFilterMessage, OrderbookLevel, OrderbookUpdate,
+};
 use solana_sdk::{
     account::{ReadableAccount, WritableAccount},
     clock::Epoch,
     pubkey::Pubkey,
 };
+use std::borrow::BorrowMut;
 use std::{
-    borrow::BorrowMut,
     collections::{HashMap, HashSet},
     mem::size_of,
     time::{SystemTime, UNIX_EPOCH},
 };
-#[derive(Clone, Debug)]
-pub enum OrderbookSide {
-    Bid = 0,
-    Ask = 1,
-}
-impl Serialize for OrderbookSide {
-    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
-    where
-        S: Serializer,
-    {
-        match *self {
-            OrderbookSide::Bid => serializer.serialize_unit_variant("Side", 0, "bid"),
-            OrderbookSide::Ask => serializer.serialize_unit_variant("Side", 1, "ask"),
-        }
-    }
-}
-pub type OrderbookLevel = [f64; 2];
-#[derive(Clone, Debug)]
-pub struct OrderbookUpdate {
-    pub market: String,
-    pub side: OrderbookSide,
-    pub update: Vec<OrderbookLevel>,
-    pub slot: u64,
-    pub write_version: u64,
-}
-impl Serialize for OrderbookUpdate {
-    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
-    where
-        S: Serializer,
-    {
-        let mut state = serializer.serialize_struct("OrderbookUpdate", 5)?;
-        state.serialize_field("market", &self.market)?;
-        state.serialize_field("side", &self.side)?;
-        state.serialize_field("update", &self.update)?;
-        state.serialize_field("slot", &self.slot)?;
-        state.serialize_field("write_version", &self.write_version)?;
-        state.end()
-    }
-}
-#[derive(Clone, Debug)]
-pub struct OrderbookCheckpoint {
-    pub market: String,
-    pub bids: Vec<OrderbookLevel>,
-    pub asks: Vec<OrderbookLevel>,
-    pub slot: u64,
-    pub write_version: u64,
-}
-impl Serialize for OrderbookCheckpoint {
-    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
-    where
-        S: Serializer,
-    {
-        let mut state = serializer.serialize_struct("OrderbookCheckpoint", 3)?;
-        state.serialize_field("market", &self.market)?;
-        state.serialize_field("bids", &self.bids)?;
-        state.serialize_field("asks", &self.asks)?;
-        state.serialize_field("slot", &self.slot)?;
-        state.serialize_field("write_version", &self.write_version)?;
-        state.end()
-    }
-}
-pub enum OrderbookFilterMessage {
-    Update(OrderbookUpdate),
-    Checkpoint(OrderbookCheckpoint),
-}
-#[derive(Clone, Debug)]
-pub struct MarketConfig {
-    pub name: String,
-    pub bids: Pubkey,
-    pub asks: Pubkey,
-    pub event_queue: Pubkey,
-    pub base_decimals: u8,
-    pub quote_decimals: u8,
-    pub base_lot_size: i64,
-    pub quote_lot_size: i64,
-}
-pub fn base_lots_to_ui(native: i64, base_decimals: u8, base_lot_size: i64) -> f64 {
-    (native * base_lot_size) as f64 / 10i64.pow(base_decimals.into()) as f64
-}
-pub fn base_lots_to_ui_perp(native: i64, base_decimals: u8, quote_decimals: u8) -> f64 {
-    let decimals = base_decimals - quote_decimals;
-    native as f64 / (10i64.pow(decimals.into()) as f64)
-}
-pub fn price_lots_to_ui(native: i64, base_decimals: u8, quote_decimals: u8) -> f64 {
-    let decimals = base_decimals - quote_decimals;
-    native as f64 / (10u64.pow(decimals.into())) as f64
-}
-pub fn spot_price_to_ui(
-    native: i64,
-    native_size: i64,
-    base_decimals: u8,
-    quote_decimals: u8,
-) -> f64 {
-    // TODO: account for fees
-    ((native * 10i64.pow(base_decimals.into())) / (10i64.pow(quote_decimals.into()) * native_size))
-        as f64
-}
-pub fn price_lots_to_ui_perp(
-    native: i64,
-    base_decimals: u8,
-    quote_decimals: u8,
-    base_lot_size: i64,
-    quote_lot_size: i64,
-) -> f64 {
-    let decimals = base_decimals - quote_decimals;
-    let multiplier = 10u64.pow(decimals.into()) as f64;
-    native as f64 * ((multiplier * quote_lot_size as f64) / base_lot_size as f64)
-}
 fn publish_changes(
     slot: u64,
     write_version: u64,
@@ -378,7 +260,7 @@ pub async fn init(
                     .map(|(_, quantity)| quantity)
                     .fold(0, |acc, x| acc + x),
                 mkt.1.base_decimals,
-                mkt.1.quote_decimals,
+                mkt.1.base_lot_size,
             ),
         ]
     })

View File

@@ -9,7 +9,7 @@ use {
         fs::File,
         io::Read,
         mem::size_of,
-        sync::{Arc, RwLock},
+        sync::{atomic::AtomicBool, Arc, RwLock},
         time::Duration,
     },
 };
@@ -222,6 +222,8 @@ fn start_jsonrpc_server(
 #[tokio::main]
 async fn main() -> anyhow::Result<()> {
+    let exit: Arc<AtomicBool> = Arc::new(AtomicBool::new(false));
     let args: Vec<String> = std::env::args().collect();
     if args.len() < 2 {
         println!("requires a config file argument");
@@ -304,6 +306,7 @@ async fn main() -> anyhow::Result<()> {
         account_write_queue_sender,
         slot_queue_sender,
         metrics_tx.clone(),
+        exit.clone(),
     )
     .await;