From b6912202bd4b14db1c1588b1446dd2506073cdb1 Mon Sep 17 00:00:00 2001 From: Maximilian Schneider Date: Fri, 7 Apr 2023 17:27:54 +0200 Subject: [PATCH] Refactoring, Fills Feed changes (#1) lib: - Move fill/orderbook filters out of shared lib into the services - Add some common structs to shared lib - Add libraries to fills/orderbook services feeds: - Add graceful exit handling - Publish single perp fill event for both maker and taker - Disable openbook fills processing - Fix perp fill event quantity decimals - Handle revoked fills in postgres - Allow subscriptions to multiple and all markets, and accounts - Add event queue head updates --- Cargo.lock | 41 +- Dockerfile | 6 +- cd/fills.toml | 4 +- connector/src/grpc_plugin_source.rs | 9 + lib/Cargo.toml | 2 +- lib/src/fill_event_filter.rs | 843 ------------------ lib/src/lib.rs | 80 +- lib/src/serum.rs | 2 +- service-mango-crank/src/main.rs | 12 +- service-mango-fills/Cargo.toml | 15 +- service-mango-fills/README.md | 85 +- service-mango-fills/conf/example-config.toml | 35 + service-mango-fills/conf/template-config.toml | 35 + service-mango-fills/config-devnet.toml | 53 -- service-mango-fills/src/fill_event_filter.rs | 653 ++++++++++++++ .../src/fill_event_postgres_target.rs | 94 +- service-mango-fills/src/lib.rs | 334 +++++++ service-mango-fills/src/main.rs | 300 +++++-- service-mango-fills/template-config.toml | 68 -- service-mango-orderbook/Cargo.toml | 5 +- service-mango-orderbook/README.md | 2 +- .../conf}/example-config.toml | 0 .../conf/template-config.toml | 20 + service-mango-orderbook/src/lib.rs | 59 ++ service-mango-orderbook/src/main.rs | 13 +- .../src/orderbook_filter.rs | 146 +-- .../{ => conf}/example-config.toml | 0 .../{ => conf}/template-config.toml | 0 service-mango-pnl/src/main.rs | 5 +- 29 files changed, 1640 insertions(+), 1281 deletions(-) delete mode 100644 lib/src/fill_event_filter.rs create mode 100644 service-mango-fills/conf/example-config.toml create mode 100644 service-mango-fills/conf/template-config.toml delete mode 100644 service-mango-fills/config-devnet.toml create mode 100644 service-mango-fills/src/fill_event_filter.rs rename {lib => service-mango-fills}/src/fill_event_postgres_target.rs (78%) create mode 100644 service-mango-fills/src/lib.rs delete mode 100644 service-mango-fills/template-config.toml rename {service-mango-fills => service-mango-orderbook/conf}/example-config.toml (100%) create mode 100644 service-mango-orderbook/conf/template-config.toml create mode 100644 service-mango-orderbook/src/lib.rs rename {lib => service-mango-orderbook}/src/orderbook_filter.rs (81%) rename service-mango-pnl/{ => conf}/example-config.toml (100%) rename service-mango-pnl/{ => conf}/template-config.toml (100%) diff --git a/Cargo.lock b/Cargo.lock index a002fb7..973290c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5181,32 +5181,6 @@ dependencies = [ "without-alloc", ] -[[package]] -name = "serum_dex" -version = "0.5.10" -source = "git+https://github.com/openbook-dex/program?branch=master#c85e56deeaead43abbc33b7301058838b9c5136d" -dependencies = [ - "anchor-lang", - "arrayref", - "bincode", - "bytemuck", - "byteorder", - "default-env", - "enumflags2", - "field-offset", - "itertools 0.10.5", - "num-traits", - "num_enum", - "safe-transmute", - "serde", - "solana-program", - "solana-security-txt", - "spl-token", - "static_assertions", - "thiserror", - "without-alloc", -] - [[package]] name = "serum_dex" version = "0.5.10" @@ -5272,21 +5246,32 @@ dependencies = [ "anyhow", "async-channel", "async-trait", + "base64 0.21.0", "bs58 0.4.0", "bytemuck", + "chrono", "client", + "futures 0.3.26", "futures-channel", + "futures-core", "futures-util", "jemallocator", "log 0.4.17", "mango-feeds-lib", "mango-v4", + "native-tls", + "postgres-native-tls", + "postgres-types", + "postgres_query", "serde", "serde_derive", "serde_json", "serum_dex 0.5.10 (git+https://github.com/jup-ag/openbook-program?branch=feat/expose-things)", "solana-logger", + "solana-sdk", "tokio", + "tokio-postgres", + "tokio-postgres-rustls", "tokio-tungstenite", "toml 0.7.1", "ws", @@ -5306,14 +5291,16 @@ dependencies = [ "client", "futures-channel", "futures-util", + "itertools 0.10.5", "log 0.4.17", "mango-feeds-lib", "mango-v4", "serde", "serde_derive", "serde_json", - "serum_dex 0.5.10 (git+https://github.com/openbook-dex/program?branch=master)", + "serum_dex 0.5.10 (git+https://github.com/jup-ag/openbook-program?branch=feat/expose-things)", "solana-logger", + "solana-sdk", "tokio", "tokio-tungstenite", "toml 0.7.1", diff --git a/Dockerfile b/Dockerfile index 4fb782a..418f711 100644 --- a/Dockerfile +++ b/Dockerfile @@ -20,6 +20,6 @@ RUN cargo build --release --bin service-mango-fills --bin service-mango-pnl --bi FROM debian:bullseye-slim as run RUN apt-get update && apt-get -y install ca-certificates libc6 COPY --from=build /app/target/release/service-mango-* /usr/local/bin/ -COPY --from=build /app/service-mango-pnl/template-config.toml ./pnl-config.toml -COPY --from=build /app/service-mango-fills/template-config.toml ./fills-config.toml -COPY --from=build /app/service-mango-orderbook/template-config.toml ./orderbook-config.toml \ No newline at end of file +COPY --from=build /app/service-mango-pnl/conf/template-config.toml ./pnl-config.toml +COPY --from=build /app/service-mango-fills/conf/template-config.toml ./fills-config.toml +COPY --from=build /app/service-mango-orderbook/conf/template-config.toml ./orderbook-config.toml \ No newline at end of file diff --git a/cd/fills.toml b/cd/fills.toml index aa63a60..27c8e00 100644 --- a/cd/fills.toml +++ b/cd/fills.toml @@ -1,6 +1,6 @@ app = "mango-fills" -kill_signal = "SIGINT" -kill_timeout = 5 +kill_signal = "SIGTERM" +kill_timeout = 30 [build] dockerfile = "../Dockerfile" diff --git a/connector/src/grpc_plugin_source.rs b/connector/src/grpc_plugin_source.rs index 5ca2505..508b2b4 100644 --- a/connector/src/grpc_plugin_source.rs +++ b/connector/src/grpc_plugin_source.rs @@ -16,6 +16,8 @@ use yellowstone_grpc_proto::tonic::{ }; use log::*; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; use std::{collections::HashMap, env, str::FromStr, time::Duration}; use yellowstone_grpc_proto::prelude::{ @@ -197,6 +199,7 @@ async fn feed_data_geyser( // Highest slot that an account write came in for. let mut newest_write_slot: u64 = 0; + #[derive(Clone, Debug)] struct WriteVersion { // Write version seen on-chain global: u64, @@ -386,6 +389,7 @@ pub async fn process_events( account_write_queue_sender: async_channel::Sender, slot_queue_sender: async_channel::Sender, metrics_sender: Metrics, + exit: Arc, ) { // Subscribe to geyser let (msg_sender, msg_receiver) = async_channel::bounded::(config.dedup_queue_size); @@ -468,6 +472,11 @@ pub async fn process_events( metrics_sender.register_u64("grpc_snapshot_account_writes".into(), MetricType::Counter); loop { + if exit.load(Ordering::Relaxed) { + warn!("shutting down grpc_plugin_source..."); + break; + } + metric_dedup_queue.set(msg_receiver.len() as u64); let msg = msg_receiver.recv().await.expect("sender must not close"); match msg { diff --git a/lib/Cargo.toml b/lib/Cargo.toml index 03d8a6e..86d9221 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -38,7 +38,7 @@ serde_derive = "1.0.130" serde_json = "1.0.68" bs58 = "*" -base64 = "*" +base64 = "0.21.0" log = "0.4" rand = "0.7" anyhow = "1.0" diff --git a/lib/src/fill_event_filter.rs b/lib/src/fill_event_filter.rs deleted file mode 100644 index 2eab6d5..0000000 --- a/lib/src/fill_event_filter.rs +++ /dev/null @@ -1,843 +0,0 @@ -use crate::{ - chain_data::{AccountData, ChainData, ChainDataMetrics, SlotData}, - metrics::{MetricType, Metrics}, - orderbook_filter::{base_lots_to_ui_perp, price_lots_to_ui_perp, MarketConfig, OrderbookSide}, - serum::SerumEventQueueHeader, - AccountWrite, SlotUpdate, -}; -use bytemuck::cast_slice; -use chrono::{TimeZone, Utc}; -use log::*; -use serde::{ser::SerializeStruct, Serialize, Serializer}; -use serum_dex::state::EventView as SpotEvent; -use solana_sdk::{ - account::{ReadableAccount, WritableAccount}, - clock::Epoch, - pubkey::Pubkey, -}; -use std::{ - borrow::BorrowMut, - cmp::max, - collections::{HashMap, HashSet}, - convert::identity, - time::SystemTime, -}; - -use crate::metrics::MetricU64; -use anchor_lang::AccountDeserialize; -use mango_v4::state::{ - AnyEvent, EventQueue, EventQueueHeader, EventType, FillEvent as PerpFillEvent, Side, - MAX_NUM_EVENTS, -}; - -#[derive(Clone, Copy, Debug)] -pub enum FillUpdateStatus { - New, - Revoke, -} - -impl Serialize for FillUpdateStatus { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - match *self { - FillUpdateStatus::New => { - serializer.serialize_unit_variant("FillUpdateStatus", 0, "new") - } - FillUpdateStatus::Revoke => { - serializer.serialize_unit_variant("FillUpdateStatus", 1, "revoke") - } - } - } -} - -#[derive(Clone, Copy, Debug)] -pub enum FillEventType { - Spot, - Perp, -} - -impl Serialize for FillEventType { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - match *self { - FillEventType::Spot => serializer.serialize_unit_variant("FillEventType", 0, "spot"), - FillEventType::Perp => serializer.serialize_unit_variant("FillEventType", 1, "perp"), - } - } -} - -#[derive(Clone, Debug)] -pub struct FillEvent { - pub event_type: FillEventType, - pub maker: bool, - pub side: OrderbookSide, - pub timestamp: u64, - pub seq_num: u64, - pub owner: String, - pub client_order_id: u64, - pub fee: f32, - pub price: f64, - pub quantity: f64, -} - -impl Serialize for FillEvent { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - let mut state = serializer.serialize_struct("FillEvent", 12)?; - state.serialize_field("eventType", &self.event_type)?; - state.serialize_field("maker", &self.maker)?; - state.serialize_field("side", &self.side)?; - state.serialize_field( - "timestamp", - &Utc.timestamp_opt(self.timestamp as i64, 0) - .unwrap() - .to_rfc3339(), - )?; - state.serialize_field("seqNum", &self.seq_num)?; - state.serialize_field("owner", &self.owner)?; - state.serialize_field("clientOrderId", &self.client_order_id)?; - state.serialize_field("fee", &self.fee)?; - state.serialize_field("price", &self.price)?; - state.serialize_field("quantity", &self.quantity)?; - state.end() - } -} - -impl FillEvent { - pub fn new_from_perp(event: PerpFillEvent, config: &MarketConfig) -> [Self; 2] { - let taker_side = match event.taker_side() { - Side::Ask => OrderbookSide::Ask, - Side::Bid => OrderbookSide::Bid, - }; - let maker_side = match event.taker_side() { - Side::Ask => OrderbookSide::Bid, - Side::Bid => OrderbookSide::Ask, - }; - let price = price_lots_to_ui_perp( - event.price, - config.base_decimals, - config.quote_decimals, - config.base_lot_size, - config.quote_lot_size, - ); - let quantity = - base_lots_to_ui_perp(event.quantity, config.base_decimals, config.quote_decimals); - [ - FillEvent { - event_type: FillEventType::Perp, - maker: true, - side: maker_side, - timestamp: event.timestamp, - seq_num: event.seq_num, - owner: event.maker.to_string(), - client_order_id: event.maker_client_order_id, - fee: event.maker_fee, - price: price, - quantity: quantity, - }, - FillEvent { - event_type: FillEventType::Perp, - maker: false, - side: taker_side, - timestamp: event.timestamp, - seq_num: event.seq_num, - owner: event.taker.to_string(), - client_order_id: event.taker_client_order_id, - fee: event.taker_fee, - price: price, - quantity: quantity, - }, - ] - } - pub fn new_from_spot( - event: SpotEvent, - timestamp: u64, - seq_num: u64, - config: &MarketConfig, - ) -> Self { - match event { - SpotEvent::Fill { - side, - maker, - native_qty_paid, - native_qty_received, - native_fee_or_rebate, - owner, - client_order_id, - .. - } => { - let side = match side as u8 { - 0 => OrderbookSide::Bid, - 1 => OrderbookSide::Ask, - _ => panic!("invalid side"), - }; - let client_order_id: u64 = match client_order_id { - Some(id) => id.into(), - None => 0u64, - }; - - let base_multiplier = 10u64.pow(config.base_decimals.into()) as u64; - let quote_multiplier = 10u64.pow(config.quote_decimals.into()) as u64; - - let (price, quantity) = match side { - OrderbookSide::Bid => { - let price_before_fees = if maker { - native_qty_paid + native_fee_or_rebate - } else { - native_qty_paid - native_fee_or_rebate - }; - - let top = price_before_fees * base_multiplier; - let bottom = quote_multiplier * native_qty_received; - let price = top as f64 / bottom as f64; - let quantity = native_qty_received as f64 / base_multiplier as f64; - (price, quantity) - } - OrderbookSide::Ask => { - let price_before_fees = if maker { - native_qty_received - native_fee_or_rebate - } else { - native_qty_received + native_fee_or_rebate - }; - - let top = price_before_fees * base_multiplier; - let bottom = quote_multiplier * native_qty_paid; - let price = top as f64 / bottom as f64; - let quantity = native_qty_paid as f64 / base_multiplier as f64; - (price, quantity) - } - }; - - let fee = native_fee_or_rebate as f32 / quote_multiplier as f32; - - FillEvent { - event_type: FillEventType::Spot, - maker: maker, - side, - timestamp, - seq_num, - owner: Pubkey::new(cast_slice(&identity(owner) as &[_])).to_string(), - client_order_id: client_order_id, - fee, - price, - quantity, - } - } - SpotEvent::Out { .. } => { - panic!("Can't build FillEvent from SpotEvent::Out") - } - } - } -} - -#[derive(Clone, Debug)] -pub struct FillUpdate { - pub event: FillEvent, - pub status: FillUpdateStatus, - pub market_key: String, - pub market_name: String, - pub slot: u64, - pub write_version: u64, -} - -impl Serialize for FillUpdate { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - let mut state = serializer.serialize_struct("FillUpdate", 6)?; - state.serialize_field("event", &self.event)?; - state.serialize_field("marketKey", &self.market_key)?; - state.serialize_field("marketName", &self.market_name)?; - state.serialize_field("status", &self.status)?; - state.serialize_field("slot", &self.slot)?; - state.serialize_field("writeVersion", &self.write_version)?; - - state.end() - } -} - -#[derive(Clone, Debug)] -pub struct FillCheckpoint { - pub market: String, - pub queue: String, - pub events: Vec, - pub slot: u64, - pub write_version: u64, -} - -impl Serialize for FillCheckpoint { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - let mut state = serializer.serialize_struct("FillCheckpoint", 3)?; - state.serialize_field("events", &self.events)?; - state.serialize_field("market", &self.market)?; - state.serialize_field("queue", &self.queue)?; - state.serialize_field("slot", &self.slot)?; - state.serialize_field("write_version", &self.write_version)?; - - state.end() - } -} - -pub enum FillEventFilterMessage { - Update(FillUpdate), - Checkpoint(FillCheckpoint), -} - -// couldn't compile the correct struct size / math on m1, fixed sizes resolve this issue -type EventQueueEvents = [AnyEvent; MAX_NUM_EVENTS as usize]; - -fn publish_changes_perp( - slot: u64, - write_version: u64, - mkt: &(Pubkey, MarketConfig), - header: &EventQueueHeader, - events: &EventQueueEvents, - old_seq_num: u64, - old_events: &EventQueueEvents, - fill_update_sender: &async_channel::Sender, - metric_events_new: &mut MetricU64, - metric_events_change: &mut MetricU64, - metric_events_drop: &mut MetricU64, -) { - // seq_num = N means that events (N-QUEUE_LEN) until N-1 are available - let start_seq_num = max(old_seq_num, header.seq_num) - .checked_sub(MAX_NUM_EVENTS as u64) - .unwrap_or(0); - let mut checkpoint = Vec::new(); - let mkt_pk_string = mkt.0.to_string(); - let evq_pk_string = mkt.1.event_queue.to_string(); - for seq_num in start_seq_num..header.seq_num { - let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize; - - // there are three possible cases: - // 1) the event is past the old seq num, hence guaranteed new event - // 2) the event is not matching the old event queue - // 3) all other events are matching the old event queue - // the order of these checks is important so they are exhaustive - if seq_num >= old_seq_num { - debug!( - "found new event {} idx {} type {} slot {} write_version {}", - mkt_pk_string, idx, events[idx].event_type as u32, slot, write_version - ); - - metric_events_new.increment(); - - // new fills are published and recorded in checkpoint - if events[idx].event_type == EventType::Fill as u8 { - let fill: PerpFillEvent = bytemuck::cast(events[idx]); - let fills = FillEvent::new_from_perp(fill, &mkt.1); - // send event for both maker and taker - for fill in fills { - fill_update_sender - .try_send(FillEventFilterMessage::Update(FillUpdate { - slot, - write_version, - event: fill.clone(), - status: FillUpdateStatus::New, - market_key: mkt_pk_string.clone(), - market_name: mkt.1.name.clone(), - })) - .unwrap(); // TODO: use anyhow to bubble up error - checkpoint.push(fill); - } - } - } else if old_events[idx].event_type != events[idx].event_type - || old_events[idx].padding != events[idx].padding - { - debug!( - "found changed event {} idx {} seq_num {} header seq num {} old seq num {}", - mkt_pk_string, idx, seq_num, header.seq_num, old_seq_num - ); - - metric_events_change.increment(); - - // first revoke old event if a fill - if old_events[idx].event_type == EventType::Fill as u8 { - let fill: PerpFillEvent = bytemuck::cast(old_events[idx]); - let fills = FillEvent::new_from_perp(fill, &mkt.1); - for fill in fills { - fill_update_sender - .try_send(FillEventFilterMessage::Update(FillUpdate { - slot, - write_version, - event: fill, - status: FillUpdateStatus::Revoke, - market_key: mkt_pk_string.clone(), - market_name: mkt.1.name.clone(), - })) - .unwrap(); // TODO: use anyhow to bubble up error - } - } - - // then publish new if its a fill and record in checkpoint - if events[idx].event_type == EventType::Fill as u8 { - let fill: PerpFillEvent = bytemuck::cast(events[idx]); - let fills = FillEvent::new_from_perp(fill, &mkt.1); - for fill in fills { - fill_update_sender - .try_send(FillEventFilterMessage::Update(FillUpdate { - slot, - write_version, - event: fill.clone(), - status: FillUpdateStatus::New, - market_key: mkt_pk_string.clone(), - market_name: mkt.1.name.clone(), - })) - .unwrap(); // TODO: use anyhow to bubble up error - checkpoint.push(fill); - } - } - } else { - // every already published event is recorded in checkpoint if a fill - if events[idx].event_type == EventType::Fill as u8 { - let fill: PerpFillEvent = bytemuck::cast(events[idx]); - let fills = FillEvent::new_from_perp(fill, &mkt.1); - for fill in fills { - checkpoint.push(fill); - } - } - } - } - - // in case queue size shrunk due to a fork we need revoke all previous fills - for seq_num in header.seq_num..old_seq_num { - let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize; - debug!( - "found dropped event {} idx {} seq_num {} header seq num {} old seq num {} slot {} write_version {}", - mkt_pk_string, idx, seq_num, header.seq_num, old_seq_num, slot, write_version - ); - - metric_events_drop.increment(); - - if old_events[idx].event_type == EventType::Fill as u8 { - let fill: PerpFillEvent = bytemuck::cast(old_events[idx]); - let fills = FillEvent::new_from_perp(fill, &mkt.1); - for fill in fills { - fill_update_sender - .try_send(FillEventFilterMessage::Update(FillUpdate { - slot, - event: fill, - write_version, - status: FillUpdateStatus::Revoke, - market_key: mkt_pk_string.clone(), - market_name: mkt.1.name.clone(), - })) - .unwrap(); // TODO: use anyhow to bubble up error - } - } - } - - fill_update_sender - .try_send(FillEventFilterMessage::Checkpoint(FillCheckpoint { - slot, - write_version, - events: checkpoint, - market: mkt_pk_string, - queue: evq_pk_string, - })) - .unwrap() -} - -fn publish_changes_serum( - slot: u64, - write_version: u64, - mkt: &(Pubkey, MarketConfig), - header: &SerumEventQueueHeader, - events: &[serum_dex::state::Event], - old_seq_num: u64, - old_events: &[serum_dex::state::Event], - fill_update_sender: &async_channel::Sender, - metric_events_new: &mut MetricU64, - metric_events_change: &mut MetricU64, - metric_events_drop: &mut MetricU64, -) { - // seq_num = N means that events (N-QUEUE_LEN) until N-1 are available - let start_seq_num = max(old_seq_num, header.seq_num) - .checked_sub(MAX_NUM_EVENTS as u64) - .unwrap_or(0); - let mut checkpoint = Vec::new(); - let mkt_pk_string = mkt.0.to_string(); - let evq_pk_string = mkt.1.event_queue.to_string(); - let header_seq_num = header.seq_num; - debug!("start seq {} header seq {}", start_seq_num, header_seq_num); - - // Timestamp for spot events is time scraped - let timestamp = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_secs(); - for seq_num in start_seq_num..header_seq_num { - let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize; - let event_view = events[idx].as_view().unwrap(); - let old_event_view = old_events[idx].as_view().unwrap(); - - match event_view { - SpotEvent::Fill { .. } => { - // there are three possible cases: - // 1) the event is past the old seq num, hence guaranteed new event - // 2) the event is not matching the old event queue - // 3) all other events are matching the old event queue - // the order of these checks is important so they are exhaustive - let fill = FillEvent::new_from_spot(event_view, timestamp, seq_num, &mkt.1); - if seq_num >= old_seq_num { - debug!("found new serum fill {} idx {}", mkt_pk_string, idx,); - - metric_events_new.increment(); - fill_update_sender - .try_send(FillEventFilterMessage::Update(FillUpdate { - slot, - write_version, - event: fill.clone(), - status: FillUpdateStatus::New, - market_key: mkt_pk_string.clone(), - market_name: mkt.1.name.clone(), - })) - .unwrap(); // TODO: use anyhow to bubble up error - checkpoint.push(fill); - continue; - } - - match old_event_view { - SpotEvent::Fill { - client_order_id, .. - } => { - let client_order_id = match client_order_id { - Some(id) => id.into(), - None => 0u64, - }; - if client_order_id != fill.client_order_id { - debug!( - "found changed id event {} idx {} seq_num {} header seq num {} old seq num {}", - mkt_pk_string, idx, seq_num, header_seq_num, old_seq_num - ); - - metric_events_change.increment(); - - let old_fill = FillEvent::new_from_spot( - old_event_view, - timestamp, - seq_num, - &mkt.1, - ); - // first revoke old event - fill_update_sender - .try_send(FillEventFilterMessage::Update(FillUpdate { - slot, - write_version, - event: old_fill, - status: FillUpdateStatus::Revoke, - market_key: mkt_pk_string.clone(), - market_name: mkt.1.name.clone(), - })) - .unwrap(); // TODO: use anyhow to bubble up error - - // then publish new - fill_update_sender - .try_send(FillEventFilterMessage::Update(FillUpdate { - slot, - write_version, - event: fill.clone(), - status: FillUpdateStatus::New, - market_key: mkt_pk_string.clone(), - market_name: mkt.1.name.clone(), - })) - .unwrap(); // TODO: use anyhow to bubble up error - } - - // record new event in checkpoint - checkpoint.push(fill); - } - SpotEvent::Out { .. } => { - debug!( - "found changed type event {} idx {} seq_num {} header seq num {} old seq num {}", - mkt_pk_string, idx, seq_num, header_seq_num, old_seq_num - ); - - metric_events_change.increment(); - - // publish new fill and record in checkpoint - fill_update_sender - .try_send(FillEventFilterMessage::Update(FillUpdate { - slot, - write_version, - event: fill.clone(), - status: FillUpdateStatus::New, - market_key: mkt_pk_string.clone(), - market_name: mkt.1.name.clone(), - })) - .unwrap(); // TODO: use anyhow to bubble up error - checkpoint.push(fill); - } - } - } - _ => continue, - } - } - - // in case queue size shrunk due to a fork we need revoke all previous fills - for seq_num in header_seq_num..old_seq_num { - let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize; - let old_event_view = old_events[idx].as_view().unwrap(); - debug!( - "found dropped event {} idx {} seq_num {} header seq num {} old seq num {}", - mkt_pk_string, idx, seq_num, header_seq_num, old_seq_num - ); - - metric_events_drop.increment(); - - match old_event_view { - SpotEvent::Fill { .. } => { - let old_fill = FillEvent::new_from_spot(old_event_view, timestamp, seq_num, &mkt.1); - fill_update_sender - .try_send(FillEventFilterMessage::Update(FillUpdate { - slot, - event: old_fill, - write_version, - status: FillUpdateStatus::Revoke, - market_key: mkt_pk_string.clone(), - market_name: mkt.1.name.clone(), - })) - .unwrap(); // TODO: use anyhow to bubble up error - } - SpotEvent::Out { .. } => continue, - } - } - - fill_update_sender - .try_send(FillEventFilterMessage::Checkpoint(FillCheckpoint { - slot, - write_version, - events: checkpoint, - market: mkt_pk_string, - queue: evq_pk_string, - })) - .unwrap() -} - -pub async fn init( - perp_market_configs: Vec<(Pubkey, MarketConfig)>, - spot_market_configs: Vec<(Pubkey, MarketConfig)>, - metrics_sender: Metrics, -) -> anyhow::Result<( - async_channel::Sender, - async_channel::Sender, - async_channel::Receiver, -)> { - let metrics_sender = metrics_sender.clone(); - - let mut metric_events_new = - metrics_sender.register_u64("fills_feed_events_new".into(), MetricType::Counter); - let mut metric_events_new_serum = - metrics_sender.register_u64("fills_feed_events_new_serum".into(), MetricType::Counter); - let mut metric_events_change = - metrics_sender.register_u64("fills_feed_events_change".into(), MetricType::Counter); - let mut metric_events_change_serum = - metrics_sender.register_u64("fills_feed_events_change_serum".into(), MetricType::Counter); - let mut metrics_events_drop = - metrics_sender.register_u64("fills_feed_events_drop".into(), MetricType::Counter); - let mut metrics_events_drop_serum = - metrics_sender.register_u64("fills_feed_events_drop_serum".into(), MetricType::Counter); - - // The actual message may want to also contain a retry count, if it self-reinserts on failure? - let (account_write_queue_sender, account_write_queue_receiver) = - async_channel::unbounded::(); - - // Slot updates flowing from the outside into the single processing thread. From - // there they'll flow into the postgres sending thread. - let (slot_queue_sender, slot_queue_receiver) = async_channel::unbounded::(); - - // Fill updates can be consumed by client connections, they contain all fills for all markets - let (fill_update_sender, fill_update_receiver) = - async_channel::unbounded::(); - - let account_write_queue_receiver_c = account_write_queue_receiver.clone(); - - let mut chain_cache = ChainData::new(); - let mut chain_data_metrics = ChainDataMetrics::new(&metrics_sender); - let mut perp_events_cache: HashMap = HashMap::new(); - let mut serum_events_cache: HashMap> = HashMap::new(); - let mut seq_num_cache = HashMap::new(); - let mut last_evq_versions = HashMap::::new(); - - let all_market_configs = [perp_market_configs.clone(), spot_market_configs.clone()].concat(); - let perp_queue_pks: Vec = perp_market_configs - .iter() - .map(|x| x.1.event_queue) - .collect(); - let spot_queue_pks: Vec = spot_market_configs - .iter() - .map(|x| x.1.event_queue) - .collect(); - let all_queue_pks: HashSet = - HashSet::from_iter([perp_queue_pks.clone(), spot_queue_pks.clone()].concat()); - - // update handling thread, reads both sloths and account updates - tokio::spawn(async move { - loop { - tokio::select! { - Ok(account_write) = account_write_queue_receiver_c.recv() => { - if !all_queue_pks.contains(&account_write.pubkey) { - continue; - } - - chain_cache.update_account( - account_write.pubkey, - AccountData { - slot: account_write.slot, - write_version: account_write.write_version, - account: WritableAccount::create( - account_write.lamports, - account_write.data.clone(), - account_write.owner, - account_write.executable, - account_write.rent_epoch as Epoch, - ), - }, - ); - } - Ok(slot_update) = slot_queue_receiver.recv() => { - chain_cache.update_slot(SlotData { - slot: slot_update.slot, - parent: slot_update.parent, - status: slot_update.status, - chain: 0, - }); - - } - Err(e) = slot_queue_receiver.recv() => { - warn!("slot update channel err {:?}", e); - } - Err(e) = account_write_queue_receiver_c.recv() => { - warn!("write update channel err {:?}", e); - } - } - - chain_data_metrics.report(&chain_cache); - - for mkt in all_market_configs.iter() { - let evq_pk = mkt.1.event_queue; - let evq_pk_string = evq_pk.to_string(); - let last_evq_version = last_evq_versions - .get(&mkt.1.event_queue.to_string()) - .unwrap_or(&(0, 0)); - - match chain_cache.account(&evq_pk) { - Ok(account_info) => { - // only process if the account state changed - let evq_version = (account_info.slot, account_info.write_version); - trace!("evq {} write_version {:?}", evq_pk_string, evq_version); - if evq_version == *last_evq_version { - continue; - } - if evq_version.0 < last_evq_version.0 { - debug!("evq version slot was old"); - continue; - } - if evq_version.0 == last_evq_version.0 && evq_version.1 < last_evq_version.1 - { - info!("evq version slot was same and write version was old"); - continue; - } - last_evq_versions.insert(evq_pk_string.clone(), evq_version); - - let account = &account_info.account; - let is_perp = mango_v4::check_id(account.owner()); - if is_perp { - let event_queue = - EventQueue::try_deserialize(account.data().borrow_mut()).unwrap(); - info!( - "evq {} seq_num {} version {:?}", - evq_pk_string, event_queue.header.seq_num, evq_version, - ); - match seq_num_cache.get(&evq_pk_string) { - Some(old_seq_num) => match perp_events_cache.get(&evq_pk_string) { - Some(old_events) => publish_changes_perp( - account_info.slot, - account_info.write_version, - &mkt, - &event_queue.header, - &event_queue.buf, - *old_seq_num, - old_events, - &fill_update_sender, - &mut metric_events_new, - &mut metric_events_change, - &mut metrics_events_drop, - ), - _ => { - info!("perp_events_cache could not find {}", evq_pk_string) - } - }, - _ => info!("seq_num_cache could not find {}", evq_pk_string), - } - - seq_num_cache - .insert(evq_pk_string.clone(), event_queue.header.seq_num.clone()); - perp_events_cache - .insert(evq_pk_string.clone(), event_queue.buf.clone()); - } else { - let inner_data = &account.data()[5..&account.data().len() - 7]; - let header_span = std::mem::size_of::(); - let header: SerumEventQueueHeader = - *bytemuck::from_bytes(&inner_data[..header_span]); - let seq_num = header.seq_num; - let count = header.count; - let rest = &inner_data[header_span..]; - let slop = rest.len() % std::mem::size_of::(); - let new_len = rest.len() - slop; - let events = &rest[..new_len]; - debug!("evq {} header_span {} header_seq_num {} header_count {} inner_len {} events_len {} sizeof Event {}", evq_pk_string, header_span, seq_num, count, inner_data.len(), events.len(), std::mem::size_of::()); - let events: &[serum_dex::state::Event] = bytemuck::cast_slice(&events); - - match seq_num_cache.get(&evq_pk_string) { - Some(old_seq_num) => match serum_events_cache.get(&evq_pk_string) { - Some(old_events) => publish_changes_serum( - account_info.slot, - account_info.write_version, - mkt, - &header, - &events, - *old_seq_num, - old_events, - &fill_update_sender, - &mut metric_events_new_serum, - &mut metric_events_change_serum, - &mut metrics_events_drop_serum, - ), - _ => { - debug!( - "serum_events_cache could not find {}", - evq_pk_string - ) - } - }, - _ => debug!("seq_num_cache could not find {}", evq_pk_string), - } - - seq_num_cache.insert(evq_pk_string.clone(), seq_num.clone()); - serum_events_cache - .insert(evq_pk_string.clone(), events.clone().to_vec()); - } - } - Err(_) => debug!("chain_cache could not find {}", mkt.1.event_queue), - } - } - } - }); - - Ok(( - account_write_queue_sender, - slot_queue_sender, - fill_update_receiver, - )) -} diff --git a/lib/src/lib.rs b/lib/src/lib.rs index fc5d9b7..18382eb 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -1,10 +1,8 @@ -pub mod fill_event_filter; -pub mod fill_event_postgres_target; pub mod memory_target; -pub mod orderbook_filter; pub mod postgres_types_numeric; pub mod serum; +use anchor_lang::prelude::Pubkey; use serde::{ser::SerializeStruct, Serialize, Serializer}; use serde_derive::Deserialize; @@ -40,6 +38,13 @@ pub struct PostgresTlsConfig { pub client_key_path: String, } +#[derive(Clone, Debug, Deserialize)] +pub struct Config { + pub postgres_target: PostgresConfig, + pub source: SourceConfig, + pub metrics: MetricsConfig, +} + #[derive(Clone, Debug)] pub struct StatusResponse<'a> { pub success: bool, @@ -59,9 +64,68 @@ impl<'a> Serialize for StatusResponse<'a> { } } -#[derive(Clone, Debug, Deserialize)] -pub struct Config { - pub postgres_target: PostgresConfig, - pub source: SourceConfig, - pub metrics: MetricsConfig, +#[derive(Clone, Debug)] +pub enum OrderbookSide { + Bid = 0, + Ask = 1, +} + +impl Serialize for OrderbookSide { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + match *self { + OrderbookSide::Bid => serializer.serialize_unit_variant("Side", 0, "bid"), + OrderbookSide::Ask => serializer.serialize_unit_variant("Side", 1, "ask"), + } + } +} + +#[derive(Clone, Debug)] +pub struct MarketConfig { + pub name: String, + pub bids: Pubkey, + pub asks: Pubkey, + pub event_queue: Pubkey, + pub base_decimals: u8, + pub quote_decimals: u8, + pub base_lot_size: i64, + pub quote_lot_size: i64, +} + +pub fn base_lots_to_ui(native: i64, base_decimals: u8, base_lot_size: i64) -> f64 { + (native * base_lot_size) as f64 / 10i64.pow(base_decimals.into()) as f64 +} + +pub fn base_lots_to_ui_perp(native: i64, decimals: u8, base_lot_size: i64) -> f64 { + native as f64 * (base_lot_size as f64 / (10i64.pow(decimals.into()) as f64)) +} + +pub fn price_lots_to_ui(native: i64, base_decimals: u8, quote_decimals: u8) -> f64 { + let decimals = base_decimals - quote_decimals; + native as f64 / (10u64.pow(decimals.into())) as f64 +} + +pub fn spot_price_to_ui( + native: i64, + native_size: i64, + base_decimals: u8, + quote_decimals: u8, +) -> f64 { + // TODO: account for fees + ((native * 10i64.pow(base_decimals.into())) / (10i64.pow(quote_decimals.into()) * native_size)) + as f64 +} + +pub fn price_lots_to_ui_perp( + native: i64, + base_decimals: u8, + quote_decimals: u8, + base_lot_size: i64, + quote_lot_size: i64, +) -> f64 { + let decimals = base_decimals - quote_decimals; + let multiplier = 10u64.pow(decimals.into()) as f64; + native as f64 * ((multiplier * quote_lot_size as f64) / base_lot_size as f64) } diff --git a/lib/src/serum.rs b/lib/src/serum.rs index e26bd97..d290738 100644 --- a/lib/src/serum.rs +++ b/lib/src/serum.rs @@ -4,7 +4,7 @@ use bytemuck::{Pod, Zeroable}; #[repr(packed)] pub struct SerumEventQueueHeader { pub _account_flags: u64, // Initialized, EventQueue - pub _head: u64, + pub head: u64, pub count: u64, pub seq_num: u64, } diff --git a/service-mango-crank/src/main.rs b/service-mango-crank/src/main.rs index 8a9369f..7afa1b0 100644 --- a/service-mango-crank/src/main.rs +++ b/service-mango-crank/src/main.rs @@ -13,7 +13,14 @@ use bytemuck::bytes_of; use client::{Client, MangoGroupContext}; use log::*; use solana_client::nonblocking::rpc_client::RpcClient; -use std::{collections::HashSet, fs::File, io::Read, str::FromStr, sync::Arc, time::Duration}; +use std::{ + collections::HashSet, + fs::File, + io::Read, + str::FromStr, + sync::{atomic::AtomicBool, Arc}, + time::Duration, +}; use mango_feeds_lib::FilterConfig; use mango_feeds_lib::{grpc_plugin_source, metrics, websocket_source, MetricsConfig, SourceConfig}; @@ -32,6 +39,8 @@ pub struct Config { async fn main() -> anyhow::Result<()> { solana_logger::setup_with_default("info"); + let exit: Arc = Arc::new(AtomicBool::new(false)); + let args: Vec = std::env::args().collect(); if args.len() < 2 { error!("Please enter a config file path argument."); @@ -147,6 +156,7 @@ async fn main() -> anyhow::Result<()> { account_write_queue_sender, slot_queue_sender, metrics_tx.clone(), + exit.clone(), ) .await; } else { diff --git a/service-mango-fills/Cargo.toml b/service-mango-fills/Cargo.toml index 74078a7..0977b78 100644 --- a/service-mango-fills/Cargo.toml +++ b/service-mango-fills/Cargo.toml @@ -16,6 +16,8 @@ toml = "*" serde = "1.0" serde_derive = "1.0" serde_json = "1.0" +futures = "0.3.17" +futures-core = "0.3" futures-channel = "0.3" futures-util = "0.3" ws = "^0.9.2" @@ -25,9 +27,20 @@ tokio = { version = "1", features = ["full"] } tokio-tungstenite = "0.17" bytemuck = "1.7.2" jemallocator = "0.3.2" +chrono = "0.4.23" +solana-sdk = "~1.14.9" + +tokio-postgres = { version = "0.7", features = ["with-chrono-0_4"] } +tokio-postgres-rustls = "0.9.0" +postgres-types = { version = "0.2", features = ["array-impls", "derive", "with-chrono-0_4"] } +postgres-native-tls = "0.5" +native-tls = "0.2" +# postgres_query hasn't updated its crate in a while +postgres_query = { git = "https://github.com/nolanderc/rust-postgres-query", rev = "b4422051c8a31fbba4a35f88004c1cefb1878dd5" } +base64 = "0.21.0" mango-v4 = { git = "https://github.com/blockworks-foundation/mango-v4", branch = "dev" } client = { git = "https://github.com/blockworks-foundation/mango-v4", branch = "dev" } -serum_dex = { git = "https://github.com/jup-ag/openbook-program", branch = "feat/expose-things" } +serum_dex = { git = "https://github.com/jup-ag/openbook-program", branch = "feat/expose-things", features = ["no-entrypoint"] } anchor-lang = "0.25.0" anchor-client = "0.25.0" diff --git a/service-mango-fills/README.md b/service-mango-fills/README.md index 7d8968e..b1622c6 100644 --- a/service-mango-fills/README.md +++ b/service-mango-fills/README.md @@ -2,11 +2,84 @@ This module parses event queues and exposes individual fills on a websocket. +Public API: `https://api.mngo.cloud/fills/v1/` + +## API Reference +Get a list of markets +``` +{ + "command": "getMarkets" +} +``` +``` +{ + "ESdnpnNLgTkBCZRuTJkZLi5wKEZ2z47SG3PJrhundSQ2": "SOL-PERP", + "HwhVGkfsSQ9JSQeQYu2CbkRCLvsh3qRZxG6m4oMVwZpN": "BTC-PERP", + "Fgh9JSZ2qfSjCw9RPJ85W2xbihsp2muLvfRztzoVR7f1": "ETH-PERP", +} +``` + +Subscribe to markets +``` +{ + "command": "subscribe" + "marketIds": ["MARKET_PUBKEY"] +} +``` +``` +{ + "success": true, + "message": "subscribed to market MARKET_PUBKEY" +} +``` + +Subscribe to account +``` +{ + "command": "subscribe" + "account": ["MANGO_ACCOUNT_PUBKEY"] +} +``` +``` +{ + "success": true, + "message": "subscribed to account MANGO_ACCOUNT_PUBKEY" +} +``` + +Fill Event +``` +{ + "event": { + "eventType": "perp", + "maker": "MAKER_MANGO_ACCOUNT_PUBKEY", + "taker": "TAKER_MANGO_ACCOUNT_PUBKEY", + "takerSide": "bid", + "timestamp": "2023-04-06T13:00:00+00:00", + "seqNum": 132420, + "makerClientOrderId": 1680786677648, + "takerClientOrderId": 1680786688080, + "makerFee": -0.0003, + "takerFee": 0.0006, + "price": 20.72, + "quantity": 0.45 + }, + "marketKey": "ESdnpnNLgTkBCZRuTJkZLi5wKEZ2z47SG3PJrhundSQ2", + "marketName": "SOL-PERP", + "status": "new", + "slot": 186869253, + "writeVersion": 662992260539 +} +``` + +If the fill ocurred on a fork, an event will be sent with the 'status' field set to 'revoke'. + ## Setup +## Local 1. Prepare the connector configuration file. - [Here is an example](service-mango-fills/example-config.toml). + [Here is an example](service-mango-fills/conf/example-config.toml). - `bind_ws_addr` is the listen port for the websocket clients - `rpc_ws_url` is unused and can stay empty. @@ -14,7 +87,6 @@ This module parses event queues and exposes individual fills on a websocket. address configured for the plugin. - `rpc_http_url` must point to the JSON-RPC URL. - `program_id` must match what is configured for the gRPC plugin - - `markets` need to contain all observed perp markets 2. Start the service binary. @@ -27,9 +99,6 @@ This module parses event queues and exposes individual fills on a websocket. logs are very spammy changing the default log level is recommended when you dont want to analyze performance of the service. -## TODO -- [] startup logic, dont accept market subscriptions before first snapshot -- [] failover logic, kill all websockets when we receive a later snapshot, more - frequent when running on home connections -- [] track latency accountwrite -> websocket -- [] create new model for fills so snapshot maps can be combined per market +## fly.io + + diff --git a/service-mango-fills/conf/example-config.toml b/service-mango-fills/conf/example-config.toml new file mode 100644 index 0000000..dd053f4 --- /dev/null +++ b/service-mango-fills/conf/example-config.toml @@ -0,0 +1,35 @@ +bind_ws_addr = "0.0.0.0:8080" +rpc_http_url = "http://mango.rpcpool.com/" +mango_group = "78b8f4cGCwmZ9ysPFMWLaLTkkaYnUjwMJYStWe5RTSSX" + +[metrics] +output_stdout = true +output_http = true + +# [postgres] +# connection_string = "$PG_CONNECTION_STRING" +# connection_count = 1 +# max_batch_size = 1 +# max_queue_size = 50000 +# retry_query_max_count = 10 +# retry_query_sleep_secs = 2 +# retry_connection_sleep_secs = 10 +# fatal_connection_timeout_secs = 30 +# allow_invalid_certs = true + +# # [postgres.tls] +# # ca_cert_path = "$PG_CA_CERT" +# # client_key_path = "$PG_CLIENT_KEY" + +[source] +dedup_queue_size = 50000 +rpc_ws_url = "wss://mango.rpcpool.com/" + +[[source.grpc_sources]] +name = "accountsdb-client" +connection_string = "http://tyo64.rpcpool.com/" +retry_connection_sleep_secs = 30 + +[source.snapshot] +rpc_http_url = "http://mango.rpcpool.com/" +program_id = "4MangoMjqJ2firMokCjjGgoK8d4MXcrgL7XJaL3w6fVg" diff --git a/service-mango-fills/conf/template-config.toml b/service-mango-fills/conf/template-config.toml new file mode 100644 index 0000000..0ec5537 --- /dev/null +++ b/service-mango-fills/conf/template-config.toml @@ -0,0 +1,35 @@ +bind_ws_addr = "[::]:8080" +rpc_http_url = "$RPC_HTTP_URL" +mango_group = "78b8f4cGCwmZ9ysPFMWLaLTkkaYnUjwMJYStWe5RTSSX" + +[metrics] +output_stdout = true +output_http = true + +[postgres] +connection_string = "$PG_CONNECTION_STRING" +connection_count = 1 +max_batch_size = 1 +max_queue_size = 50000 +retry_query_max_count = 10 +retry_query_sleep_secs = 2 +retry_connection_sleep_secs = 10 +fatal_connection_timeout_secs = 30 +allow_invalid_certs = true + +[postgres.tls] +ca_cert_path = "$PG_CA_CERT" +client_key_path = "$PG_CLIENT_KEY" + +[source] +dedup_queue_size = 50000 +rpc_ws_url = "$RPC_WS_URL" + +[[source.grpc_sources]] +name = "accountsdb-client" +connection_string = "$GEYSER_CONNECTION_STRING" +retry_connection_sleep_secs = 30 + +[source.snapshot] +rpc_http_url = "$RPC_HTTP_URL" +program_id = "srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX" diff --git a/service-mango-fills/config-devnet.toml b/service-mango-fills/config-devnet.toml deleted file mode 100644 index 5e5140b..0000000 --- a/service-mango-fills/config-devnet.toml +++ /dev/null @@ -1,53 +0,0 @@ -bind_ws_addr = "0.0.0.0:2082" - -[source] -dedup_queue_size = 50000 -rpc_ws_url = "" - -[[source.grpc_sources]] -name = "accountsdb-client" -connection_string = "http://mango.devnet.rpcpool.com:10001" -retry_connection_sleep_secs = 30 - -[source.grpc_sources.tls] -ca_cert_path = "ca-devnet.pem" -client_cert_path = "client-devnet.pem" -client_key_path = "client-devnet.pem" -domain_name = "mango-accountsdb.rpcpool.com" - -[source.snapshot] -rpc_http_url = "http://mango.devnet.rpcpool.com/" -program_id = "4skJ85cdxQAFVKbcGgfun8iZPL7BadVYXG3kGEGkufqA" - -[[markets]] -name = "MNGO-PERP" -event_queue = "uaUCSQejWYrDeYSuvn4As4kaCwJ2rLnRQSsSjY3ogZk" - -[[markets]] -name = "ETH-PERP" -event_queue = "8WLv5fKLYkyZpFG74kRmp2RALHQFcNKmH7eJn8ebHC13" - -[[markets]] -name = "SOL-PERP" -event_queue = "CZ5MCRvkN38d5pnZDDEEyMiED3drgDUVpEUjkuJq31Kf" - -[[markets]] -name = "ADA-PERP" -event_queue = "5v5fz2cCSy2VvrgVf5Vu7PF23RiZjv6BL36bgg48bA1c" - -[[markets]] -name = "FTT-PERP" -event_queue = "7rswj7FVZcMYUKxcTLndZhWBmuVNc2GuxqjuXU8KcPWv" - -[[markets]] -name = "AVAX-PERP" -event_queue = "4b7NqjqWoQoQh9V3dubfjkLPQVNJijwAwr7D9q6vTqqd" - -[[markets]] -name = "BNB-PERP" -event_queue = "96Y87LTz5Mops7wdT9EJo1eM79XToKYJJmRZxNatV85d" - -[[markets]] -name = "MATIC-PERP" -event_queue = "77maU5zdfYayqhqjBi2ocosM4PXvPXxbps2Up7dxDsMR" - diff --git a/service-mango-fills/src/fill_event_filter.rs b/service-mango-fills/src/fill_event_filter.rs new file mode 100644 index 0000000..4136c22 --- /dev/null +++ b/service-mango-fills/src/fill_event_filter.rs @@ -0,0 +1,653 @@ +use log::*; +use mango_feeds_lib::{ + chain_data::{AccountData, ChainData, ChainDataMetrics, SlotData}, + metrics::{MetricType, Metrics}, + serum::SerumEventQueueHeader, + AccountWrite, MarketConfig, SlotUpdate, +}; +use solana_sdk::{ + account::{ReadableAccount, WritableAccount}, + clock::Epoch, + pubkey::Pubkey, +}; +use std::{ + borrow::BorrowMut, + cmp::max, + collections::{HashMap, HashSet}, + iter::FromIterator, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, +}; + +use crate::metrics::MetricU64; +use anchor_lang::AccountDeserialize; +use mango_v4::state::{ + AnyEvent, EventQueue, EventQueueHeader, EventType, FillEvent as PerpFillEvent, + OutEvent as PerpOutEvent, QueueHeader, MAX_NUM_EVENTS, +}; +use service_mango_fills::*; + +// couldn't compile the correct struct size / math on m1, fixed sizes resolve this issue +type EventQueueEvents = [AnyEvent; MAX_NUM_EVENTS as usize]; + +fn publish_changes_perp( + slot: u64, + write_version: u64, + mkt: &(Pubkey, MarketConfig), + header: &EventQueueHeader, + events: &EventQueueEvents, + prev_seq_num: u64, + prev_head: u64, + prev_events: &EventQueueEvents, + fill_update_sender: &async_channel::Sender, + metric_events_new: &mut MetricU64, + metric_events_change: &mut MetricU64, + metric_events_drop: &mut MetricU64, + metric_head_update: &mut MetricU64, + metric_head_revoke: &mut MetricU64, +) { + // seq_num = N means that events (N-QUEUE_LEN) until N-1 are available + let start_seq_num = max(prev_seq_num, header.seq_num) + .checked_sub(MAX_NUM_EVENTS as u64) + .unwrap_or(0); + let mut checkpoint = Vec::new(); + let mkt_pk_string = mkt.0.to_string(); + let evq_pk_string = mkt.1.event_queue.to_string(); + for seq_num in start_seq_num..header.seq_num { + let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize; + + // there are three possible cases: + // 1) the event is past the old seq num, hence guaranteed new event + // 2) the event is not matching the old event queue + // 3) all other events are matching the old event queue + // the order of these checks is important so they are exhaustive + if seq_num >= prev_seq_num { + debug!( + "found new event {} idx {} type {} slot {} write_version {}", + mkt_pk_string, idx, events[idx].event_type as u32, slot, write_version + ); + + metric_events_new.increment(); + + // new fills are published and recorded in checkpoint + if events[idx].event_type == EventType::Fill as u8 { + let fill: PerpFillEvent = bytemuck::cast(events[idx]); + let fill = FillEvent::new_from_perp(fill, &mkt.1); + + fill_update_sender + .try_send(FillEventFilterMessage::Update(FillUpdate { + slot, + write_version, + event: fill.clone(), + status: FillUpdateStatus::New, + market_key: mkt_pk_string.clone(), + market_name: mkt.1.name.clone(), + })) + .unwrap(); // TODO: use anyhow to bubble up error + checkpoint.push(fill); + } + } else if prev_events[idx].event_type != events[idx].event_type + || prev_events[idx].padding != events[idx].padding + { + debug!( + "found changed event {} idx {} seq_num {} header seq num {} old seq num {}", + mkt_pk_string, idx, seq_num, header.seq_num, prev_seq_num + ); + + metric_events_change.increment(); + + // first revoke old event if a fill + if prev_events[idx].event_type == EventType::Fill as u8 { + let fill: PerpFillEvent = bytemuck::cast(prev_events[idx]); + let fill = FillEvent::new_from_perp(fill, &mkt.1); + fill_update_sender + .try_send(FillEventFilterMessage::Update(FillUpdate { + slot, + write_version, + event: fill, + status: FillUpdateStatus::Revoke, + market_key: mkt_pk_string.clone(), + market_name: mkt.1.name.clone(), + })) + .unwrap(); // TODO: use anyhow to bubble up error + } + + // then publish new if its a fill and record in checkpoint + if events[idx].event_type == EventType::Fill as u8 { + let fill: PerpFillEvent = bytemuck::cast(events[idx]); + let fill = FillEvent::new_from_perp(fill, &mkt.1); + fill_update_sender + .try_send(FillEventFilterMessage::Update(FillUpdate { + slot, + write_version, + event: fill.clone(), + status: FillUpdateStatus::New, + market_key: mkt_pk_string.clone(), + market_name: mkt.1.name.clone(), + })) + .unwrap(); // TODO: use anyhow to bubble up error + checkpoint.push(fill); + } + } else { + // every already published event is recorded in checkpoint if a fill + if events[idx].event_type == EventType::Fill as u8 { + let fill: PerpFillEvent = bytemuck::cast(events[idx]); + let fill = FillEvent::new_from_perp(fill, &mkt.1); + checkpoint.push(fill); + } + } + } + + // in case queue size shrunk due to a fork we need revoke all previous fills + for seq_num in header.seq_num..prev_seq_num { + let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize; + debug!( + "found dropped event {} idx {} seq_num {} header seq num {} old seq num {} slot {} write_version {}", + mkt_pk_string, idx, seq_num, header.seq_num, prev_seq_num, slot, write_version + ); + + metric_events_drop.increment(); + + if prev_events[idx].event_type == EventType::Fill as u8 { + let fill: PerpFillEvent = bytemuck::cast(prev_events[idx]); + let fill = FillEvent::new_from_perp(fill, &mkt.1); + fill_update_sender + .try_send(FillEventFilterMessage::Update(FillUpdate { + slot, + event: fill, + write_version, + status: FillUpdateStatus::Revoke, + market_key: mkt_pk_string.clone(), + market_name: mkt.1.name.clone(), + })) + .unwrap(); // TODO: use anyhow to bubble up error + } + } + + let head_idx = header.head(); + let head = head_idx as u64; + + let head_seq_num = if events[head_idx].event_type == EventType::Fill as u8 { + let event: PerpFillEvent = bytemuck::cast(events[head_idx]); + event.seq_num + } else if events[head_idx].event_type == EventType::Out as u8 { + let event: PerpOutEvent = bytemuck::cast(events[head_idx]); + event.seq_num + } else { + 0 + }; + + let prev_head_idx = prev_head as usize; + let prev_head_seq_num = if prev_events[prev_head_idx].event_type == EventType::Fill as u8 { + let event: PerpFillEvent = bytemuck::cast(prev_events[prev_head_idx]); + event.seq_num + } else if prev_events[prev_head_idx].event_type == EventType::Out as u8 { + let event: PerpOutEvent = bytemuck::cast(prev_events[prev_head_idx]); + event.seq_num + } else { + 0 + }; + + // publish a head update event if the head increased (events were consumed) + if head > prev_head { + metric_head_update.increment(); + + fill_update_sender + .try_send(FillEventFilterMessage::HeadUpdate(HeadUpdate { + head, + prev_head, + head_seq_num, + prev_head_seq_num, + status: FillUpdateStatus::New, + market_key: mkt_pk_string.clone(), + market_name: mkt.1.name.clone(), + slot, + write_version, + })) + .unwrap(); // TODO: use anyhow to bubble up error + } + + // revoke head update event if it decreased (fork) + if head < prev_head { + metric_head_revoke.increment(); + + fill_update_sender + .try_send(FillEventFilterMessage::HeadUpdate(HeadUpdate { + head, + prev_head, + head_seq_num, + prev_head_seq_num, + status: FillUpdateStatus::Revoke, + market_key: mkt_pk_string.clone(), + market_name: mkt.1.name.clone(), + slot, + write_version, + })) + .unwrap(); // TODO: use anyhow to bubble up error + } + + fill_update_sender + .try_send(FillEventFilterMessage::Checkpoint(FillCheckpoint { + slot, + write_version, + events: checkpoint, + market: mkt_pk_string, + queue: evq_pk_string, + })) + .unwrap() +} + +fn publish_changes_serum( + _slot: u64, + _write_version: u64, + _mkt: &(Pubkey, MarketConfig), + _header: &SerumEventQueueHeader, + _events: &[serum_dex::state::Event], + _prev_seq_num: u64, + _prev_events: &[serum_dex::state::Event], + _fill_update_sender: &async_channel::Sender, + _metric_events_new: &mut MetricU64, + _metric_events_change: &mut MetricU64, + _metric_events_drop: &mut MetricU64, +) { + // // seq_num = N means that events (N-QUEUE_LEN) until N-1 are available + // let start_seq_num = max(prev_seq_num, header.seq_num) + // .checked_sub(MAX_NUM_EVENTS as u64) + // .unwrap_or(0); + // let mut checkpoint = Vec::new(); + // let mkt_pk_string = mkt.0.to_string(); + // let evq_pk_string = mkt.1.event_queue.to_string(); + // let header_seq_num = header.seq_num; + // debug!("start seq {} header seq {}", start_seq_num, header_seq_num); + + // // Timestamp for spot events is time scraped + // let timestamp = SystemTime::now() + // .duration_since(SystemTime::UNIX_EPOCH) + // .unwrap() + // .as_secs(); + // for seq_num in start_seq_num..header_seq_num { + // let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize; + // let event_view = events[idx].as_view().unwrap(); + // let old_event_view = prev_events[idx].as_view().unwrap(); + + // match event_view { + // SpotEvent::Fill { .. } => { + // // there are three possible cases: + // // 1) the event is past the old seq num, hence guaranteed new event + // // 2) the event is not matching the old event queue + // // 3) all other events are matching the old event queue + // // the order of these checks is important so they are exhaustive + // let fill = FillEvent::new_from_spot(event_view, timestamp, seq_num, &mkt.1); + // if seq_num >= prev_seq_num { + // debug!("found new serum fill {} idx {}", mkt_pk_string, idx,); + + // metric_events_new.increment(); + // fill_update_sender + // .try_send(FillEventFilterMessage::Update(FillUpdate { + // slot, + // write_version, + // event: fill.clone(), + // status: FillUpdateStatus::New, + // market_key: mkt_pk_string.clone(), + // market_name: mkt.1.name.clone(), + // })) + // .unwrap(); // TODO: use anyhow to bubble up error + // checkpoint.push(fill); + // continue; + // } + + // match old_event_view { + // SpotEvent::Fill { + // client_order_id, .. + // } => { + // let client_order_id = match client_order_id { + // Some(id) => id.into(), + // None => 0u64, + // }; + // if client_order_id != fill.client_order_id { + // debug!( + // "found changed id event {} idx {} seq_num {} header seq num {} old seq num {}", + // mkt_pk_string, idx, seq_num, header_seq_num, prev_seq_num + // ); + + // metric_events_change.increment(); + + // let old_fill = FillEvent::new_from_spot( + // old_event_view, + // timestamp, + // seq_num, + // &mkt.1, + // ); + // // first revoke old event + // fill_update_sender + // .try_send(FillEventFilterMessage::Update(FillUpdate { + // slot, + // write_version, + // event: old_fill, + // status: FillUpdateStatus::Revoke, + // market_key: mkt_pk_string.clone(), + // market_name: mkt.1.name.clone(), + // })) + // .unwrap(); // TODO: use anyhow to bubble up error + + // // then publish new + // fill_update_sender + // .try_send(FillEventFilterMessage::Update(FillUpdate { + // slot, + // write_version, + // event: fill.clone(), + // status: FillUpdateStatus::New, + // market_key: mkt_pk_string.clone(), + // market_name: mkt.1.name.clone(), + // })) + // .unwrap(); // TODO: use anyhow to bubble up error + // } + + // // record new event in checkpoint + // checkpoint.push(fill); + // } + // SpotEvent::Out { .. } => { + // debug!( + // "found changed type event {} idx {} seq_num {} header seq num {} old seq num {}", + // mkt_pk_string, idx, seq_num, header_seq_num, prev_seq_num + // ); + + // metric_events_change.increment(); + + // // publish new fill and record in checkpoint + // fill_update_sender + // .try_send(FillEventFilterMessage::Update(FillUpdate { + // slot, + // write_version, + // event: fill.clone(), + // status: FillUpdateStatus::New, + // market_key: mkt_pk_string.clone(), + // market_name: mkt.1.name.clone(), + // })) + // .unwrap(); // TODO: use anyhow to bubble up error + // checkpoint.push(fill); + // } + // } + // } + // _ => continue, + // } + // } + + // // in case queue size shrunk due to a fork we need revoke all previous fills + // for seq_num in header_seq_num..prev_seq_num { + // let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize; + // let old_event_view = prev_events[idx].as_view().unwrap(); + // debug!( + // "found dropped event {} idx {} seq_num {} header seq num {} old seq num {}", + // mkt_pk_string, idx, seq_num, header_seq_num, prev_seq_num + // ); + + // metric_events_drop.increment(); + + // match old_event_view { + // SpotEvent::Fill { .. } => { + // let old_fill = FillEvent::new_from_spot(old_event_view, timestamp, seq_num, &mkt.1); + // fill_update_sender + // .try_send(FillEventFilterMessage::Update(FillUpdate { + // slot, + // event: old_fill, + // write_version, + // status: FillUpdateStatus::Revoke, + // market_key: mkt_pk_string.clone(), + // market_name: mkt.1.name.clone(), + // })) + // .unwrap(); // TODO: use anyhow to bubble up error + // } + // SpotEvent::Out { .. } => continue, + // } + // } + + // fill_update_sender + // .try_send(FillEventFilterMessage::Checkpoint(FillCheckpoint { + // slot, + // write_version, + // events: checkpoint, + // market: mkt_pk_string, + // queue: evq_pk_string, + // })) + // .unwrap() +} + +pub async fn init( + perp_market_configs: Vec<(Pubkey, MarketConfig)>, + spot_market_configs: Vec<(Pubkey, MarketConfig)>, + metrics_sender: Metrics, + exit: Arc, +) -> anyhow::Result<( + async_channel::Sender, + async_channel::Sender, + async_channel::Receiver, +)> { + let metrics_sender = metrics_sender.clone(); + + let mut metric_events_new = + metrics_sender.register_u64("fills_feed_events_new".into(), MetricType::Counter); + let mut metric_events_new_serum = + metrics_sender.register_u64("fills_feed_events_new_serum".into(), MetricType::Counter); + let mut metric_events_change = + metrics_sender.register_u64("fills_feed_events_change".into(), MetricType::Counter); + let mut metric_events_change_serum = + metrics_sender.register_u64("fills_feed_events_change_serum".into(), MetricType::Counter); + let mut metrics_events_drop = + metrics_sender.register_u64("fills_feed_events_drop".into(), MetricType::Counter); + let mut metrics_events_drop_serum = + metrics_sender.register_u64("fills_feed_events_drop_serum".into(), MetricType::Counter); + let mut metrics_head_update = + metrics_sender.register_u64("fills_feed_head_update".into(), MetricType::Counter); + let mut metrics_head_revoke = + metrics_sender.register_u64("fills_feed_head_revoke".into(), MetricType::Counter); + + // The actual message may want to also contain a retry count, if it self-reinserts on failure? + let (account_write_queue_sender, account_write_queue_receiver) = + async_channel::unbounded::(); + + // Slot updates flowing from the outside into the single processing thread. From + // there they'll flow into the postgres sending thread. + let (slot_queue_sender, slot_queue_receiver) = async_channel::unbounded::(); + + // Fill updates can be consumed by client connections, they contain all fills for all markets + let (fill_update_sender, fill_update_receiver) = + async_channel::unbounded::(); + + let account_write_queue_receiver_c = account_write_queue_receiver.clone(); + + let mut chain_cache = ChainData::new(); + let mut chain_data_metrics = ChainDataMetrics::new(&metrics_sender); + let mut perp_events_cache: HashMap = HashMap::new(); + let mut serum_events_cache: HashMap> = HashMap::new(); + let mut seq_num_cache = HashMap::new(); + let mut head_cache = HashMap::new(); + let mut last_evq_versions = HashMap::::new(); + + let all_market_configs = [perp_market_configs.clone(), spot_market_configs.clone()].concat(); + let perp_queue_pks: Vec = perp_market_configs + .iter() + .map(|x| x.1.event_queue) + .collect(); + let spot_queue_pks: Vec = spot_market_configs + .iter() + .map(|x| x.1.event_queue) + .collect(); + let all_queue_pks: HashSet = + HashSet::from_iter([perp_queue_pks.clone(), spot_queue_pks.clone()].concat()); + + // update handling thread, reads both sloths and account updates + tokio::spawn(async move { + loop { + if exit.load(Ordering::Relaxed) { + warn!("shutting down fill_event_filter..."); + break; + } + tokio::select! { + Ok(account_write) = account_write_queue_receiver_c.recv() => { + if !all_queue_pks.contains(&account_write.pubkey) { + continue; + } + + chain_cache.update_account( + account_write.pubkey, + AccountData { + slot: account_write.slot, + write_version: account_write.write_version, + account: WritableAccount::create( + account_write.lamports, + account_write.data.clone(), + account_write.owner, + account_write.executable, + account_write.rent_epoch as Epoch, + ), + }, + ); + } + Ok(slot_update) = slot_queue_receiver.recv() => { + chain_cache.update_slot(SlotData { + slot: slot_update.slot, + parent: slot_update.parent, + status: slot_update.status, + chain: 0, + }); + + } + Err(e) = slot_queue_receiver.recv() => { + warn!("slot update channel err {:?}", e); + } + Err(e) = account_write_queue_receiver_c.recv() => { + warn!("write update channel err {:?}", e); + } + } + + chain_data_metrics.report(&chain_cache); + + for mkt in all_market_configs.iter() { + let evq_pk = mkt.1.event_queue; + let evq_pk_string = evq_pk.to_string(); + let last_evq_version = last_evq_versions + .get(&mkt.1.event_queue.to_string()) + .unwrap_or(&(0, 0)); + + match chain_cache.account(&evq_pk) { + Ok(account_info) => { + // only process if the account state changed + let evq_version = (account_info.slot, account_info.write_version); + if evq_version == *last_evq_version { + continue; + } + if evq_version.0 < last_evq_version.0 { + debug!("evq version slot was old"); + continue; + } + if evq_version.0 == last_evq_version.0 && evq_version.1 < last_evq_version.1 + { + info!("evq version slot was same and write version was old"); + continue; + } + last_evq_versions.insert(evq_pk_string.clone(), evq_version); + + let account = &account_info.account; + let is_perp = mango_v4::check_id(account.owner()); + if is_perp { + let event_queue = + EventQueue::try_deserialize(account.data().borrow_mut()).unwrap(); + + match ( + seq_num_cache.get(&evq_pk_string), + head_cache.get(&evq_pk_string), + ) { + (Some(prev_seq_num), Some(old_head)) => match perp_events_cache + .get(&evq_pk_string) + { + Some(prev_events) => publish_changes_perp( + account_info.slot, + account_info.write_version, + &mkt, + &event_queue.header, + &event_queue.buf, + *prev_seq_num, + *old_head, + prev_events, + &fill_update_sender, + &mut metric_events_new, + &mut metric_events_change, + &mut metrics_events_drop, + &mut metrics_head_update, + &mut metrics_head_revoke, + ), + _ => { + info!("perp_events_cache could not find {}", evq_pk_string) + } + }, + _ => info!("seq_num/head cache could not find {}", evq_pk_string), + } + + seq_num_cache + .insert(evq_pk_string.clone(), event_queue.header.seq_num.clone()); + head_cache + .insert(evq_pk_string.clone(), event_queue.header.head() as u64); + perp_events_cache + .insert(evq_pk_string.clone(), event_queue.buf.clone()); + } else { + let inner_data = &account.data()[5..&account.data().len() - 7]; + let header_span = std::mem::size_of::(); + let header: SerumEventQueueHeader = + *bytemuck::from_bytes(&inner_data[..header_span]); + let seq_num = header.seq_num; + let count = header.count; + let rest = &inner_data[header_span..]; + let slop = rest.len() % std::mem::size_of::(); + let new_len = rest.len() - slop; + let events = &rest[..new_len]; + debug!("evq {} header_span {} header_seq_num {} header_count {} inner_len {} events_len {} sizeof Event {}", evq_pk_string, header_span, seq_num, count, inner_data.len(), events.len(), std::mem::size_of::()); + let events: &[serum_dex::state::Event] = bytemuck::cast_slice(&events); + + match seq_num_cache.get(&evq_pk_string) { + Some(prev_seq_num) => { + match serum_events_cache.get(&evq_pk_string) { + Some(prev_events) => publish_changes_serum( + account_info.slot, + account_info.write_version, + mkt, + &header, + &events, + *prev_seq_num, + prev_events, + &fill_update_sender, + &mut metric_events_new_serum, + &mut metric_events_change_serum, + &mut metrics_events_drop_serum, + ), + _ => { + debug!( + "serum_events_cache could not find {}", + evq_pk_string + ) + } + } + } + _ => debug!("seq_num_cache could not find {}", evq_pk_string), + } + + seq_num_cache.insert(evq_pk_string.clone(), seq_num.clone()); + head_cache.insert(evq_pk_string.clone(), header.head); + serum_events_cache + .insert(evq_pk_string.clone(), events.clone().to_vec()); + } + } + Err(_) => debug!("chain_cache could not find {}", mkt.1.event_queue), + } + } + } + }); + + Ok(( + account_write_queue_sender, + slot_queue_sender, + fill_update_receiver, + )) +} diff --git a/lib/src/fill_event_postgres_target.rs b/service-mango-fills/src/fill_event_postgres_target.rs similarity index 78% rename from lib/src/fill_event_postgres_target.rs rename to service-mango-fills/src/fill_event_postgres_target.rs index ca351df..2cca666 100644 --- a/lib/src/fill_event_postgres_target.rs +++ b/service-mango-fills/src/fill_event_postgres_target.rs @@ -1,17 +1,28 @@ use chrono::{TimeZone, Utc}; use log::*; +use mango_feeds_lib::{ + metrics::{MetricType, MetricU64, Metrics}, + *, +}; use native_tls::{Certificate, Identity, TlsConnector}; use postgres_native_tls::MakeTlsConnector; use postgres_query::Caching; -use std::{env, fs, time::Duration}; +use service_mango_fills::*; +use std::{ + env, fs, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + time::Duration, +}; use tokio_postgres::Client; -use crate::{fill_event_filter::FillUpdate, metrics::*, PostgresConfig}; - async fn postgres_connection( config: &PostgresConfig, metric_retries: MetricU64, metric_live: MetricU64, + exit: Arc, ) -> anyhow::Result>> { let (tx, rx) = async_channel::unbounded(); @@ -19,7 +30,6 @@ async fn postgres_connection( // base64 -i ca.cer -o ca.cer.b64 && base64 -i client.pks -o client.pks.b64 // fly secrets set PG_CA_CERT=- < ./ca.cer.b64 -a mango-fills // fly secrets set PG_CLIENT_KEY=- < ./client.pks.b64 -a mango-fills - info!("making tls config"); let tls = match &config.tls { Some(tls) => { use base64::{engine::general_purpose, Engine as _}; @@ -31,7 +41,7 @@ async fn postgres_connection( .into_bytes(), ) .expect("decoding client cert"), - _ => fs::read(&tls.client_key_path).expect("reading client key from file"), + _ => fs::read(&tls.ca_cert_path).expect("reading client cert from file"), }; let client_key = match &tls.client_key_path.chars().next().unwrap() { '$' => general_purpose::STANDARD @@ -59,16 +69,26 @@ async fn postgres_connection( }; let config = config.clone(); - let mut initial = Some(tokio_postgres::connect(&config.connection_string, tls.clone()).await?); + let connection_string = match &config.connection_string.chars().next().unwrap() { + '$' => { + env::var(&config.connection_string[1..]).expect("reading connection string from env") + } + _ => config.connection_string.clone(), + }; + let mut initial = Some(tokio_postgres::connect(&connection_string, tls.clone()).await?); let mut metric_retries = metric_retries; let mut metric_live = metric_live; tokio::spawn(async move { loop { + // don't acquire a new connection if we're shutting down + if exit.load(Ordering::Relaxed) { + warn!("shutting down fill_event_postgres_target..."); + break; + } let (client, connection) = match initial.take() { Some(v) => v, None => { - let result = - tokio_postgres::connect(&config.connection_string, tls.clone()).await; + let result = tokio_postgres::connect(&connection_string, tls.clone()).await; match result { Ok(v) => v, Err(err) => { @@ -129,23 +149,36 @@ async fn process_update(client: &Caching, update: &FillUpdate) -> anyhow let slot = update.slot as i64; let write_version = update.write_version as i64; - let query = postgres_query::query!( - "INSERT INTO transactions_v4.perp_fills_feed_events - (market, seq_num, fill_timestamp, price, - quantity, slot, write_version) - VALUES - ($market, $seq_num, $fill_timestamp, $price, - $quantity, $slot, $write_version) - ON CONFLICT (market, seq_num) DO NOTHING", - market, - seq_num, - fill_timestamp, - price, - quantity, - slot, - write_version, - ); - let _ = query.execute(&client).await?; + if update.status == FillUpdateStatus::New { + // insert new events + let query = postgres_query::query!( + "INSERT INTO transactions_v4.perp_fills_feed_events + (market, seq_num, fill_timestamp, price, + quantity, slot, write_version) + VALUES + ($market, $seq_num, $fill_timestamp, $price, + $quantity, $slot, $write_version) + ON CONFLICT (market, seq_num) DO NOTHING", + market, + seq_num, + fill_timestamp, + price, + quantity, + slot, + write_version, + ); + let _ = query.execute(&client).await?; + } else { + // delete revoked events + let query = postgres_query::query!( + "DELETE FROM transactions_v4.perp_fills_feed_events + WHERE market=$market + AND seq_num=$seq_num", + market, + seq_num, + ); + let _ = query.execute(&client).await?; + } Ok(()) } @@ -153,6 +186,7 @@ async fn process_update(client: &Caching, update: &FillUpdate) -> anyhow pub async fn init( config: &PostgresConfig, metrics_sender: Metrics, + exit: Arc, ) -> anyhow::Result> { // The actual message may want to also contain a retry count, if it self-reinserts on failure? let (fill_update_queue_sender, fill_update_queue_receiver) = @@ -167,9 +201,13 @@ pub async fn init( // postgres fill update sending worker threads for _ in 0..config.connection_count { - let postgres_account_writes = - postgres_connection(config, metric_con_retries.clone(), metric_con_live.clone()) - .await?; + let postgres_account_writes = postgres_connection( + config, + metric_con_retries.clone(), + metric_con_live.clone(), + exit.clone(), + ) + .await?; let fill_update_queue_receiver_c = fill_update_queue_receiver.clone(); let config = config.clone(); let mut metric_retries = diff --git a/service-mango-fills/src/lib.rs b/service-mango-fills/src/lib.rs new file mode 100644 index 0000000..231334f --- /dev/null +++ b/service-mango-fills/src/lib.rs @@ -0,0 +1,334 @@ +use std::convert::identity; + +use anchor_lang::prelude::Pubkey; +use bytemuck::cast_slice; +use chrono::{TimeZone, Utc}; +use mango_feeds_lib::{base_lots_to_ui_perp, price_lots_to_ui_perp, MarketConfig, OrderbookSide}; +use mango_v4::state::{FillEvent as PerpFillEvent, Side}; +use serde::{ser::SerializeStruct, Deserialize, Serialize, Serializer}; +use serum_dex::state::EventView as SpotEvent; + +#[derive(Clone, Copy, Debug, PartialEq)] +pub enum FillUpdateStatus { + New, + Revoke, +} + +impl Serialize for FillUpdateStatus { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + match *self { + FillUpdateStatus::New => { + serializer.serialize_unit_variant("FillUpdateStatus", 0, "new") + } + FillUpdateStatus::Revoke => { + serializer.serialize_unit_variant("FillUpdateStatus", 1, "revoke") + } + } + } +} + +#[derive(Clone, Copy, Debug, PartialEq)] +pub enum FillEventType { + Spot, + Perp, +} + +impl Serialize for FillEventType { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + match *self { + FillEventType::Spot => serializer.serialize_unit_variant("FillEventType", 0, "spot"), + FillEventType::Perp => serializer.serialize_unit_variant("FillEventType", 1, "perp"), + } + } +} + +#[derive(Clone, Debug)] +pub struct FillEvent { + pub event_type: FillEventType, + pub maker: String, + pub taker: String, + pub taker_side: OrderbookSide, + pub timestamp: u64, // make all strings + pub seq_num: u64, + pub maker_client_order_id: u64, + pub taker_client_order_id: u64, + pub maker_fee: f32, + pub taker_fee: f32, + pub price: f64, + pub quantity: f64, +} + +impl Serialize for FillEvent { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let mut state = serializer.serialize_struct("FillEvent", 12)?; + state.serialize_field("eventType", &self.event_type)?; + state.serialize_field("maker", &self.maker)?; + state.serialize_field("taker", &self.taker)?; + state.serialize_field("takerSide", &self.taker_side)?; + state.serialize_field( + "timestamp", + &Utc.timestamp_opt(self.timestamp as i64, 0) + .unwrap() + .to_rfc3339(), + )?; + state.serialize_field("seqNum", &self.seq_num)?; + state.serialize_field("makerClientOrderId", &self.maker_client_order_id)?; + state.serialize_field("takerClientOrderId", &self.taker_client_order_id)?; // make string + state.serialize_field("makerFee", &self.maker_fee)?; + state.serialize_field("takerFee", &self.taker_fee)?; + state.serialize_field("price", &self.price)?; + state.serialize_field("quantity", &self.quantity)?; + state.end() + } +} + +impl FillEvent { + pub fn new_from_perp(event: PerpFillEvent, config: &MarketConfig) -> Self { + let taker_side = match event.taker_side() { + Side::Ask => OrderbookSide::Ask, + Side::Bid => OrderbookSide::Bid, + }; + let price = price_lots_to_ui_perp( + event.price, + config.base_decimals, + config.quote_decimals, + config.base_lot_size, + config.quote_lot_size, + ); + let quantity = + base_lots_to_ui_perp(event.quantity, config.base_decimals, config.base_lot_size); + FillEvent { + event_type: FillEventType::Perp, + maker: event.maker.to_string(), + taker: event.taker.to_string(), + taker_side: taker_side, + timestamp: event.timestamp, + seq_num: event.seq_num, + maker_client_order_id: event.maker_client_order_id, + taker_client_order_id: event.taker_client_order_id, + maker_fee: event.maker_fee, + taker_fee: event.taker_fee, + price: price, + quantity: quantity, + } + } + + pub fn new_from_spot( + maker_event: SpotEvent, + taker_event: SpotEvent, + timestamp: u64, + seq_num: u64, + config: &MarketConfig, + ) -> Self { + match (maker_event, taker_event) { + ( + SpotEvent::Fill { + side: maker_side, + client_order_id: maker_client_order_id, + native_qty_paid: maker_native_qty_paid, + native_fee_or_rebate: maker_native_fee_or_rebate, + native_qty_received: maker_native_qty_received, + owner: maker_owner, + .. + }, + SpotEvent::Fill { + side: taker_side, + client_order_id: taker_client_order_id, + native_fee_or_rebate: taker_native_fee_or_rebate, + owner: taker_owner, + .. + }, + ) => { + let maker_side = match maker_side as u8 { + 0 => OrderbookSide::Bid, + 1 => OrderbookSide::Ask, + _ => panic!("invalid side"), + }; + let taker_side = match taker_side as u8 { + 0 => OrderbookSide::Bid, + 1 => OrderbookSide::Ask, + _ => panic!("invalid side"), + }; + let maker_client_order_id: u64 = match maker_client_order_id { + Some(id) => id.into(), + None => 0u64, + }; + let taker_client_order_id: u64 = match taker_client_order_id { + Some(id) => id.into(), + None => 0u64, + }; + + let base_multiplier = 10u64.pow(config.base_decimals.into()) as u64; + let quote_multiplier = 10u64.pow(config.quote_decimals.into()) as u64; + + let (price, quantity) = match maker_side { + OrderbookSide::Bid => { + let price_before_fees = maker_native_qty_paid + maker_native_fee_or_rebate; + + let top = price_before_fees * base_multiplier; + let bottom = quote_multiplier * maker_native_qty_received; + let price = top as f64 / bottom as f64; + let quantity = maker_native_qty_received as f64 / base_multiplier as f64; + (price, quantity) + } + OrderbookSide::Ask => { + let price_before_fees = + maker_native_qty_received - maker_native_fee_or_rebate; + + let top = price_before_fees * base_multiplier; + let bottom = quote_multiplier * maker_native_qty_paid; + let price = top as f64 / bottom as f64; + let quantity = maker_native_qty_paid as f64 / base_multiplier as f64; + (price, quantity) + } + }; + + let maker_fee = maker_native_fee_or_rebate as f32 / quote_multiplier as f32; + let taker_fee = taker_native_fee_or_rebate as f32 / quote_multiplier as f32; + + FillEvent { + event_type: FillEventType::Spot, + maker: Pubkey::new(cast_slice(&identity(maker_owner) as &[_])).to_string(), + taker: Pubkey::new(cast_slice(&identity(taker_owner) as &[_])).to_string(), + taker_side: taker_side, + timestamp, + seq_num, + maker_client_order_id, + taker_client_order_id, + taker_fee, + maker_fee, + price, + quantity, + } + } + (_, _) => { + panic!("Can't build FillEvent from SpotEvent::Out") + } + } + } +} + +#[derive(Clone, Debug)] +pub struct FillUpdate { + pub event: FillEvent, + pub status: FillUpdateStatus, + pub market_key: String, + pub market_name: String, + pub slot: u64, + pub write_version: u64, +} + +impl Serialize for FillUpdate { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let mut state = serializer.serialize_struct("FillUpdate", 6)?; + state.serialize_field("event", &self.event)?; + state.serialize_field("marketKey", &self.market_key)?; + state.serialize_field("marketName", &self.market_name)?; + state.serialize_field("status", &self.status)?; + state.serialize_field("slot", &self.slot)?; + state.serialize_field("writeVersion", &self.write_version)?; + + state.end() + } +} + +#[derive(Clone, Debug)] +pub struct HeadUpdate { + pub head: u64, + pub prev_head: u64, + pub head_seq_num: u64, + pub prev_head_seq_num: u64, + pub status: FillUpdateStatus, + pub market_key: String, + pub market_name: String, + pub slot: u64, + pub write_version: u64, +} +impl Serialize for HeadUpdate { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let mut state = serializer.serialize_struct("HeadUpdate", 6)?; + state.serialize_field("head", &self.head)?; + state.serialize_field("previousHead", &self.prev_head)?; + state.serialize_field("headSeqNum", &self.head_seq_num)?; + state.serialize_field("previousHeadSeqNum", &self.prev_head_seq_num)?; + state.serialize_field("marketKey", &self.market_key)?; + state.serialize_field("marketName", &self.market_name)?; + state.serialize_field("status", &self.status)?; + state.serialize_field("slot", &self.slot)?; + state.serialize_field("writeVersion", &self.write_version)?; + + state.end() + } +} + +#[derive(Clone, Debug)] +pub struct FillCheckpoint { + pub market: String, + pub queue: String, + pub events: Vec, + pub slot: u64, + pub write_version: u64, +} + +impl Serialize for FillCheckpoint { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let mut state = serializer.serialize_struct("FillCheckpoint", 3)?; + state.serialize_field("events", &self.events)?; + state.serialize_field("market", &self.market)?; + state.serialize_field("queue", &self.queue)?; + state.serialize_field("slot", &self.slot)?; + state.serialize_field("write_version", &self.write_version)?; + + state.end() + } +} + +pub enum FillEventFilterMessage { + Update(FillUpdate), + HeadUpdate(HeadUpdate), + Checkpoint(FillCheckpoint), +} + +#[derive(Clone, Debug, Deserialize)] +#[serde(tag = "command")] +pub enum Command { + #[serde(rename = "subscribe")] + Subscribe(SubscribeCommand), + #[serde(rename = "unsubscribe")] + Unsubscribe(UnsubscribeCommand), + #[serde(rename = "getMarkets")] + GetMarkets, +} + +#[derive(Clone, Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct SubscribeCommand { + pub market_id: Option, + pub market_ids: Option>, + pub account_ids: Option>, + pub head_updates: Option, +} + +#[derive(Clone, Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct UnsubscribeCommand { + pub market_id: String, +} diff --git a/service-mango-fills/src/main.rs b/service-mango-fills/src/main.rs index f16cc2d..f9963d7 100644 --- a/service-mango-fills/src/main.rs +++ b/service-mango-fills/src/main.rs @@ -1,3 +1,6 @@ +mod fill_event_filter; +mod fill_event_postgres_target; + use anchor_client::{ solana_sdk::{commitment_config::CommitmentConfig, signature::Keypair}, Cluster, @@ -10,6 +13,13 @@ use futures_util::{ pin_mut, SinkExt, StreamExt, TryStreamExt, }; use log::*; +use mango_feeds_lib::{ + grpc_plugin_source, metrics, + metrics::{MetricType, MetricU64}, + websocket_source, FilterConfig, MarketConfig, MetricsConfig, PostgresConfig, SourceConfig, + StatusResponse, +}; +use service_mango_fills::{Command, FillCheckpoint, FillEventFilterMessage, FillEventType}; use std::{ collections::{HashMap, HashSet}, fs::File, @@ -17,7 +27,10 @@ use std::{ net::SocketAddr, str::FromStr, sync::Arc, - sync::Mutex, + sync::{ + atomic::{AtomicBool, Ordering}, + Mutex, + }, time::Duration, }; use tokio::{ @@ -26,17 +39,6 @@ use tokio::{ }; use tokio_tungstenite::tungstenite::{protocol::Message, Error}; -use mango_feeds_lib::{ - fill_event_filter::FillEventType, - fill_event_postgres_target, - metrics::{MetricType, MetricU64}, - orderbook_filter::MarketConfig, - FilterConfig, PostgresConfig, PostgresTlsConfig, StatusResponse, -}; -use mango_feeds_lib::{ - fill_event_filter::{self, FillCheckpoint, FillEventFilterMessage}, - grpc_plugin_source, metrics, websocket_source, MetricsConfig, SourceConfig, -}; use serde::Deserialize; type CheckpointMap = Arc>>; @@ -47,33 +49,12 @@ type PeerMap = Arc>>; #[global_allocator] static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc; -#[derive(Clone, Debug, Deserialize)] -#[serde(tag = "command")] -pub enum Command { - #[serde(rename = "subscribe")] - Subscribe(SubscribeCommand), - #[serde(rename = "unsubscribe")] - Unsubscribe(UnsubscribeCommand), - #[serde(rename = "getMarkets")] - GetMarkets, -} - -#[derive(Clone, Debug, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct SubscribeCommand { - pub market_id: String, -} - -#[derive(Clone, Debug, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct UnsubscribeCommand { - pub market_id: String, -} - #[derive(Clone, Debug)] pub struct Peer { pub sender: UnboundedSender, - pub subscriptions: HashSet, + pub market_subscriptions: HashSet, + pub account_subscriptions: HashSet, + pub head_updates: bool, } async fn handle_connection_error( @@ -123,7 +104,9 @@ async fn handle_connection( addr, Peer { sender: chan_tx, - subscriptions: HashSet::::new(), + market_subscriptions: HashSet::::new(), + account_subscriptions: HashSet::::new(), + head_updates: false, }, ); } @@ -168,57 +151,151 @@ fn handle_commands( let command: Result = serde_json::from_str(&msg_str); let mut peers = peer_map.lock().unwrap(); let peer = peers.get_mut(&addr).expect("peer should be in map"); + match command { Ok(Command::Subscribe(cmd)) => { - let market_id = cmd.clone().market_id; - match market_ids.get(&market_id) { - None => { - let res = StatusResponse { - success: false, - message: "market not found", + let mut wildcard = true; + // DEPRECATED + match cmd.market_id { + Some(market_id) => { + wildcard = false; + match market_ids.get(&market_id) { + None => { + let res = StatusResponse { + success: false, + message: "market not found", + }; + peer.sender + .unbounded_send(Message::Text(serde_json::to_string(&res).unwrap())) + .unwrap(); + return future::ok(()); + } + _ => {} + } + let subscribed = peer.market_subscriptions.insert(market_id.clone()); + + let res = if subscribed { + StatusResponse { + success: true, + message: "subscribed", + } + } else { + StatusResponse { + success: false, + message: "already subscribed", + } }; peer.sender .unbounded_send(Message::Text(serde_json::to_string(&res).unwrap())) .unwrap(); - return future::ok(()); + + if subscribed { + let checkpoint_map = checkpoint_map.lock().unwrap(); + let checkpoint = checkpoint_map.get(&market_id); + match checkpoint { + Some(checkpoint) => { + peer.sender + .unbounded_send(Message::Text( + serde_json::to_string(&checkpoint).unwrap(), + )) + .unwrap(); + } + None => info!( + "no checkpoint available on client subscription for market {}", + &market_id + ), + }; + } } - _ => {} + None => {} } - let subscribed = peer.subscriptions.insert(market_id.clone()); + match cmd.market_ids { + Some(cmd_market_ids) => { + wildcard = false; + for market_id in cmd_market_ids { + match market_ids.get(&market_id) { + None => { + let res = StatusResponse { + success: false, + message: &format!("market {} not found", &market_id), + }; + peer.sender + .unbounded_send(Message::Text( + serde_json::to_string(&res).unwrap(), + )) + .unwrap(); + return future::ok(()); + } + _ => {} + } + if peer.market_subscriptions.insert(market_id.clone()) { + let checkpoint_map = checkpoint_map.lock().unwrap(); + let checkpoint = checkpoint_map.get(&market_id); + let res = StatusResponse { + success: true, + message: &format!("subscribed to market {}", &market_id), + }; - let res = if subscribed { - StatusResponse { - success: true, - message: "subscribed", + peer.sender + .unbounded_send(Message::Text(serde_json::to_string(&res).unwrap())) + .unwrap(); + match checkpoint { + Some(checkpoint) => { + peer.sender + .unbounded_send(Message::Text( + serde_json::to_string(&checkpoint).unwrap(), + )) + .unwrap(); + } + None => info!( + "no checkpoint available on client subscription for market {}", + &market_id + ), + }; + } + } } - } else { - StatusResponse { - success: false, - message: "already subscribed", - } - }; - peer.sender - .unbounded_send(Message::Text(serde_json::to_string(&res).unwrap())) - .unwrap(); + None => {} + } + match cmd.account_ids { + Some(account_ids) => { + wildcard = false; + for account_id in account_ids { + if peer.account_subscriptions.insert(account_id.clone()) { + let res = StatusResponse { + success: true, + message: &format!("subscribed to account {}", &account_id), + }; + + peer.sender + .unbounded_send(Message::Text(serde_json::to_string(&res).unwrap())) + .unwrap(); + } + } + } + None => {} + } + if wildcard { + for (market_id, market_name) in market_ids { + if peer.market_subscriptions.insert(market_id.clone()) { + let res = StatusResponse { + success: true, + message: &format!("subscribed to market {}", &market_name), + }; - if subscribed { - let checkpoint_map = checkpoint_map.lock().unwrap(); - let checkpoint = checkpoint_map.get(&market_id); - match checkpoint { - Some(checkpoint) => { peer.sender - .unbounded_send(Message::Text( - serde_json::to_string(&checkpoint).unwrap(), - )) + .unbounded_send(Message::Text(serde_json::to_string(&res).unwrap())) .unwrap(); } - None => info!("no checkpoint available on client subscription"), - }; + } + } + if let Some(head_updates) = cmd.head_updates { + peer.head_updates = head_updates; } } Ok(Command::Unsubscribe(cmd)) => { info!("unsubscribe {}", cmd.market_id); - let unsubscribed = peer.subscriptions.remove(&cmd.market_id); + let unsubscribed = peer.market_subscriptions.remove(&cmd.market_id); let res = if unsubscribed { StatusResponse { success: true, @@ -259,6 +336,7 @@ fn handle_commands( pub struct Config { pub source: SourceConfig, pub metrics: MetricsConfig, + pub postgres: Option, pub bind_ws_addr: String, pub rpc_http_url: String, pub mango_group: String, @@ -267,6 +345,7 @@ pub struct Config { #[tokio::main] async fn main() -> anyhow::Result<()> { let args: Vec = std::env::args().collect(); + let exit: Arc = Arc::new(AtomicBool::new(false)); if args.len() < 2 { eprintln!("Please enter a config file path argument."); @@ -370,7 +449,7 @@ async fn main() -> anyhow::Result<()> { .map(|(_, context)| (context.address, context.market.event_queue)) .collect(); - let a: Vec<(String, String)> = group_context + let _a: Vec<(String, String)> = group_context .serum3_markets .iter() .map(|(_, context)| { @@ -390,31 +469,21 @@ async fn main() -> anyhow::Result<()> { ) }) .collect(); - let market_pubkey_strings: HashMap = [a, b].concat().into_iter().collect(); + let market_pubkey_strings: HashMap = [b].concat().into_iter().collect(); - // TODO: read all this from config - let pgconf = PostgresConfig { - connection_string: "$PG_CONNECTION_STRING".to_owned(), - connection_count: 1, - max_batch_size: 1, - max_queue_size: 50_000, - retry_query_max_count: 10, - retry_query_sleep_secs: 2, - retry_connection_sleep_secs: 10, - fatal_connection_timeout_secs: 120, - allow_invalid_certs: true, - tls: Some(PostgresTlsConfig { - ca_cert_path: "$PG_CA_CERT".to_owned(), - client_key_path: "$PG_CLIENT_KEY".to_owned(), - }), + let postgres_update_sender = match config.postgres { + Some(postgres_config) => Some( + fill_event_postgres_target::init(&postgres_config, metrics_tx.clone(), exit.clone()) + .await?, + ), + None => None, }; - let postgres_update_sender = - fill_event_postgres_target::init(&pgconf, metrics_tx.clone()).await?; let (account_write_queue_sender, slot_queue_sender, fill_receiver) = fill_event_filter::init( perp_market_configs.clone(), spot_market_configs.clone(), metrics_tx.clone(), + exit.clone(), ) .await?; @@ -439,9 +508,11 @@ async fn main() -> anyhow::Result<()> { let mut peer_copy = peers_ref_thread.lock().unwrap().clone(); for (addr, peer) in peer_copy.iter_mut() { let json = serde_json::to_string(&update.clone()).unwrap(); - + let is_subscribed = peer.market_subscriptions.contains(&update.market_key) + || peer.account_subscriptions.contains(&update.event.taker) + || peer.account_subscriptions.contains(&update.event.maker); // only send updates if the peer is subscribed - if peer.subscriptions.contains(&update.market_key) { + if is_subscribed { let result = peer.sender.send(Message::Text(json)).await; if result.is_err() { error!( @@ -451,16 +522,13 @@ async fn main() -> anyhow::Result<()> { } } } - // send taker fills to db + // send fills to db let update_c = update.clone(); - match update_c.event.event_type { - FillEventType::Perp => { - if !update_c.event.maker { - debug!("{:?}", update_c); - postgres_update_sender.send(update_c).await.unwrap(); - } + match (postgres_update_sender.clone(), update_c.event.event_type) { + (Some(sender), FillEventType::Perp) => { + sender.send(update_c).await.unwrap(); } - _ => warn!("failed to write spot event to db"), + _ => {} } } FillEventFilterMessage::Checkpoint(checkpoint) => { @@ -469,10 +537,32 @@ async fn main() -> anyhow::Result<()> { .unwrap() .insert(checkpoint.queue.clone(), checkpoint); } + FillEventFilterMessage::HeadUpdate(update) => { + debug!( + "ws update {} {:?} {} {} head", + update.market_name, update.status, update.head, update.prev_head + ); + let mut peer_copy = peers_ref_thread.lock().unwrap().clone(); + for (addr, peer) in peer_copy.iter_mut() { + let json = serde_json::to_string(&update.clone()).unwrap(); + let is_subscribed = peer.market_subscriptions.contains(&update.market_key); + // only send updates if the peer is subscribed + if peer.head_updates && is_subscribed { + let result = peer.sender.send(Message::Text(json)).await; + if result.is_err() { + error!( + "ws update {} head could not reach {}", + update.market_name, addr + ); + } + } + } + } } } }); + // websocket listener info!("ws listen: {}", config.bind_ws_addr); let try_socket = TcpListener::bind(&config.bind_ws_addr).await; let listener = try_socket.expect("Failed to bind"); @@ -511,6 +601,17 @@ async fn main() -> anyhow::Result<()> { } }); } + + // handle sigint + { + let exit = exit.clone(); + tokio::spawn(async move { + tokio::signal::ctrl_c().await.unwrap(); + info!("Received SIGINT, shutting down..."); + exit.store(true, Ordering::Relaxed); + }); + } + info!( "rpc connect: {}", config @@ -534,6 +635,7 @@ async fn main() -> anyhow::Result<()> { account_write_queue_sender, slot_queue_sender, metrics_tx.clone(), + exit.clone(), ) .await; } else { diff --git a/service-mango-fills/template-config.toml b/service-mango-fills/template-config.toml deleted file mode 100644 index 6bda961..0000000 --- a/service-mango-fills/template-config.toml +++ /dev/null @@ -1,68 +0,0 @@ -bind_ws_addr = "0.0.0.0:8080" - -[metrics] -output_stdout = true -output_http = true - -[source] -dedup_queue_size = 50000 -rpc_ws_url = "" - -[[source.grpc_sources]] -name = "accountsdb-client" -connection_string = "$GEYSER_CONNECTION_STRING" -retry_connection_sleep_secs = 30 - -[source.grpc_sources.tls] -ca_cert_path = "$GEYSER_CA_CERT" -client_cert_path = "$GEYSER_CLIENT_CERT" -client_key_path = "$GEYSER_CLIENT_CERT" -domain_name = "$GEYSER_CERT_DOMAIN" - -[source.snapshot] -rpc_http_url = "$RPC_HTTP_URL" -program_id = "mv3ekLzLbnVPNxjSKvqBpU3ZeZXPQdEC3bp5MDEBG68" - -[[markets]] -name = "BTC-PERP" -event_queue = "7t5Me8RieYKsFpfLEV8jnpqcqswNpyWD95ZqgUXuLV8Z" - -[[markets]] -name = "ETH-PERP" -event_queue = "9vDfKNPJkCvQv9bzR4JNTGciQC2RVHPVNMMHiVDgT1mw" - -[[markets]] -name = "SOL-PERP" -event_queue = "31cKs646dt1YkA3zPyxZ7rUAkxTBz279w4XEobFXcAKP" - -[[markets]] -name = "MNGO-PERP" -event_queue = "7orixrhZpjvofZGWZyyLFxSEt2tfFiost5kHEzd7jdet" - -[[markets]] -name = "SRM-PERP" -event_queue = "BXSPmdHWP6fMqsCsT6kG8UN9uugAJxdDkQWy87njUQnL" - -[[markets]] -name = "RAY-PERP" -event_queue = "Css2MQhEvXMTKjp9REVZR9ZyUAYAZAPrnDvRoPxrQkeN" - -[[markets]] -name = "FTT-PERP" -event_queue = "5pHAhyEphQRVvLqvYF7dziofR52yZWuq8DThQFJvJ7r5" - -[[markets]] -name = "ADA-PERP" -event_queue = "G6Dsw9KnP4G38hePtedTH6gDfDQmPJGJw8zipBJvKc12" - -[[markets]] -name = "BNB-PERP" -event_queue = "GmX4qXMpXvs1DuUXNB4eqL1rfF8LeYEjkKgpFeYsm55n" - -[[markets]] -name = "AVAX-PERP" -event_queue = "5Grgo9kLu692SUcJ6S7jtbi1WkdwiyRWgThAfN1PcvbL" - -[[markets]] -name = "GMT-PERP" -event_queue = "J2WYiw67VeGkPvmM3fi65H9KxDgCf79fNwspcD3ycubK" \ No newline at end of file diff --git a/service-mango-orderbook/Cargo.toml b/service-mango-orderbook/Cargo.toml index 79d73c3..1eb10f9 100644 --- a/service-mango-orderbook/Cargo.toml +++ b/service-mango-orderbook/Cargo.toml @@ -24,9 +24,12 @@ async-trait = "0.1" tokio = { version = "1", features = ["full"] } tokio-tungstenite = "0.17" bytemuck = "1.7.2" +itertools = "0.10.5" + +solana-sdk = "~1.14.9" mango-v4 = { git = "https://github.com/blockworks-foundation/mango-v4", branch = "dev" } client = { git = "https://github.com/blockworks-foundation/mango-v4", branch = "dev" } -serum_dex = { git = "https://github.com/openbook-dex/program", branch = "master" } +serum_dex = { git = "https://github.com/jup-ag/openbook-program", branch = "feat/expose-things", features = ["no-entrypoint"] } anchor-lang = "0.25.0" anchor-client = "0.25.0" diff --git a/service-mango-orderbook/README.md b/service-mango-orderbook/README.md index cb668a9..fe0f4d3 100644 --- a/service-mango-orderbook/README.md +++ b/service-mango-orderbook/README.md @@ -6,7 +6,7 @@ This module parses bookside accounts and exposes L2 data and updates on a websoc 1. Prepare the connector configuration file. - [Here is an example](service-mango-fills/example-config.toml). + [Here is an example](service-mango-orderbook/conf/example-config.toml). - `bind_ws_addr` is the listen port for the websocket clients - `rpc_ws_url` is unused and can stay empty. diff --git a/service-mango-fills/example-config.toml b/service-mango-orderbook/conf/example-config.toml similarity index 100% rename from service-mango-fills/example-config.toml rename to service-mango-orderbook/conf/example-config.toml diff --git a/service-mango-orderbook/conf/template-config.toml b/service-mango-orderbook/conf/template-config.toml new file mode 100644 index 0000000..4dc02f8 --- /dev/null +++ b/service-mango-orderbook/conf/template-config.toml @@ -0,0 +1,20 @@ +bind_ws_addr = "[::]:8080" +rpc_http_url = "$RPC_HTTP_URL" +mango_group = "78b8f4cGCwmZ9ysPFMWLaLTkkaYnUjwMJYStWe5RTSSX" + +[metrics] +output_stdout = true +output_http = true + +[source] +dedup_queue_size = 50000 +rpc_ws_url = "$RPC_WS_URL" + +[[source.grpc_sources]] +name = "accountsdb-client" +connection_string = "$GEYSER_CONNECTION_STRING" +retry_connection_sleep_secs = 30 + +[source.snapshot] +rpc_http_url = "$RPC_HTTP_URL" +program_id = "srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX" diff --git a/service-mango-orderbook/src/lib.rs b/service-mango-orderbook/src/lib.rs new file mode 100644 index 0000000..f9070f2 --- /dev/null +++ b/service-mango-orderbook/src/lib.rs @@ -0,0 +1,59 @@ +use mango_feeds_lib::OrderbookSide; +use serde::{ser::SerializeStruct, Serialize, Serializer}; + +pub type OrderbookLevel = [f64; 2]; + +#[derive(Clone, Debug)] +pub struct OrderbookUpdate { + pub market: String, + pub side: OrderbookSide, + pub update: Vec, + pub slot: u64, + pub write_version: u64, +} + +impl Serialize for OrderbookUpdate { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let mut state = serializer.serialize_struct("OrderbookUpdate", 5)?; + state.serialize_field("market", &self.market)?; + state.serialize_field("side", &self.side)?; + state.serialize_field("update", &self.update)?; + state.serialize_field("slot", &self.slot)?; + state.serialize_field("write_version", &self.write_version)?; + + state.end() + } +} + +#[derive(Clone, Debug)] +pub struct OrderbookCheckpoint { + pub market: String, + pub bids: Vec, + pub asks: Vec, + pub slot: u64, + pub write_version: u64, +} + +impl Serialize for OrderbookCheckpoint { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let mut state = serializer.serialize_struct("OrderbookCheckpoint", 3)?; + state.serialize_field("market", &self.market)?; + state.serialize_field("bids", &self.bids)?; + state.serialize_field("asks", &self.asks)?; + state.serialize_field("slot", &self.slot)?; + state.serialize_field("write_version", &self.write_version)?; + + state.end() + } +} + +pub enum OrderbookFilterMessage { + Update(OrderbookUpdate), + Checkpoint(OrderbookCheckpoint), +} diff --git a/service-mango-orderbook/src/main.rs b/service-mango-orderbook/src/main.rs index b24b427..78c2d08 100644 --- a/service-mango-orderbook/src/main.rs +++ b/service-mango-orderbook/src/main.rs @@ -1,3 +1,5 @@ +mod orderbook_filter; + use anchor_client::{ solana_sdk::{commitment_config::CommitmentConfig, signature::Keypair}, Cluster, @@ -17,7 +19,7 @@ use std::{ net::SocketAddr, str::FromStr, sync::Arc, - sync::Mutex, + sync::{atomic::AtomicBool, Mutex}, time::Duration, }; use tokio::{ @@ -26,14 +28,17 @@ use tokio::{ }; use tokio_tungstenite::tungstenite::{protocol::Message, Error}; -use mango_feeds_lib::{grpc_plugin_source, metrics, websocket_source, MetricsConfig, SourceConfig}; +use mango_feeds_lib::{ + grpc_plugin_source, metrics, websocket_source, MarketConfig, MetricsConfig, SourceConfig, +}; use mango_feeds_lib::{ metrics::{MetricType, MetricU64}, - orderbook_filter::{self, MarketConfig, OrderbookCheckpoint, OrderbookFilterMessage}, FilterConfig, StatusResponse, }; use serde::Deserialize; +use service_mango_orderbook::{OrderbookCheckpoint, OrderbookFilterMessage}; + type CheckpointMap = Arc>>; type PeerMap = Arc>>; @@ -246,6 +251,7 @@ fn handle_commands( #[tokio::main] async fn main() -> anyhow::Result<()> { let args: Vec = std::env::args().collect(); + let exit: Arc = Arc::new(AtomicBool::new(false)); if args.len() < 2 { eprintln!("Please enter a config file path argument"); @@ -443,6 +449,7 @@ async fn main() -> anyhow::Result<()> { account_write_queue_sender, slot_queue_sender, metrics_tx.clone(), + exit.clone(), ) .await; } else { diff --git a/lib/src/orderbook_filter.rs b/service-mango-orderbook/src/orderbook_filter.rs similarity index 81% rename from lib/src/orderbook_filter.rs rename to service-mango-orderbook/src/orderbook_filter.rs index 00af473..9ad8e15 100644 --- a/lib/src/orderbook_filter.rs +++ b/service-mango-orderbook/src/orderbook_filter.rs @@ -1,154 +1,36 @@ -use crate::metrics::MetricU64; -use crate::{ +use anchor_lang::AccountDeserialize; +use itertools::Itertools; +use log::*; +use mango_feeds_lib::metrics::MetricU64; +use mango_feeds_lib::{ + base_lots_to_ui, base_lots_to_ui_perp, price_lots_to_ui, price_lots_to_ui_perp, MarketConfig, + OrderbookSide, +}; +use mango_feeds_lib::{ chain_data::{AccountData, ChainData, ChainDataMetrics, SlotData}, metrics::{MetricType, Metrics}, AccountWrite, SlotUpdate, }; -use anchor_lang::AccountDeserialize; -use itertools::Itertools; -use log::*; use mango_v4::{ serum3_cpi::OrderBookStateHeader, state::{BookSide, OrderTreeType}, }; -use serde::{ser::SerializeStruct, Serialize, Serializer}; use serum_dex::critbit::Slab; +use service_mango_orderbook::{ + OrderbookCheckpoint, OrderbookFilterMessage, OrderbookLevel, OrderbookUpdate, +}; use solana_sdk::{ account::{ReadableAccount, WritableAccount}, clock::Epoch, pubkey::Pubkey, }; +use std::borrow::BorrowMut; use std::{ - borrow::BorrowMut, collections::{HashMap, HashSet}, mem::size_of, time::{SystemTime, UNIX_EPOCH}, }; -#[derive(Clone, Debug)] -pub enum OrderbookSide { - Bid = 0, - Ask = 1, -} - -impl Serialize for OrderbookSide { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - match *self { - OrderbookSide::Bid => serializer.serialize_unit_variant("Side", 0, "bid"), - OrderbookSide::Ask => serializer.serialize_unit_variant("Side", 1, "ask"), - } - } -} - -pub type OrderbookLevel = [f64; 2]; - -#[derive(Clone, Debug)] -pub struct OrderbookUpdate { - pub market: String, - pub side: OrderbookSide, - pub update: Vec, - pub slot: u64, - pub write_version: u64, -} - -impl Serialize for OrderbookUpdate { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - let mut state = serializer.serialize_struct("OrderbookUpdate", 5)?; - state.serialize_field("market", &self.market)?; - state.serialize_field("side", &self.side)?; - state.serialize_field("update", &self.update)?; - state.serialize_field("slot", &self.slot)?; - state.serialize_field("write_version", &self.write_version)?; - - state.end() - } -} - -#[derive(Clone, Debug)] -pub struct OrderbookCheckpoint { - pub market: String, - pub bids: Vec, - pub asks: Vec, - pub slot: u64, - pub write_version: u64, -} - -impl Serialize for OrderbookCheckpoint { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - let mut state = serializer.serialize_struct("OrderbookCheckpoint", 3)?; - state.serialize_field("market", &self.market)?; - state.serialize_field("bids", &self.bids)?; - state.serialize_field("asks", &self.asks)?; - state.serialize_field("slot", &self.slot)?; - state.serialize_field("write_version", &self.write_version)?; - - state.end() - } -} - -pub enum OrderbookFilterMessage { - Update(OrderbookUpdate), - Checkpoint(OrderbookCheckpoint), -} - -#[derive(Clone, Debug)] -pub struct MarketConfig { - pub name: String, - pub bids: Pubkey, - pub asks: Pubkey, - pub event_queue: Pubkey, - pub base_decimals: u8, - pub quote_decimals: u8, - pub base_lot_size: i64, - pub quote_lot_size: i64, -} - -pub fn base_lots_to_ui(native: i64, base_decimals: u8, base_lot_size: i64) -> f64 { - (native * base_lot_size) as f64 / 10i64.pow(base_decimals.into()) as f64 -} - -pub fn base_lots_to_ui_perp(native: i64, base_decimals: u8, quote_decimals: u8) -> f64 { - let decimals = base_decimals - quote_decimals; - native as f64 / (10i64.pow(decimals.into()) as f64) -} - -pub fn price_lots_to_ui(native: i64, base_decimals: u8, quote_decimals: u8) -> f64 { - let decimals = base_decimals - quote_decimals; - native as f64 / (10u64.pow(decimals.into())) as f64 -} - -pub fn spot_price_to_ui( - native: i64, - native_size: i64, - base_decimals: u8, - quote_decimals: u8, -) -> f64 { - // TODO: account for fees - ((native * 10i64.pow(base_decimals.into())) / (10i64.pow(quote_decimals.into()) * native_size)) - as f64 -} - -pub fn price_lots_to_ui_perp( - native: i64, - base_decimals: u8, - quote_decimals: u8, - base_lot_size: i64, - quote_lot_size: i64, -) -> f64 { - let decimals = base_decimals - quote_decimals; - let multiplier = 10u64.pow(decimals.into()) as f64; - native as f64 * ((multiplier * quote_lot_size as f64) / base_lot_size as f64) -} - fn publish_changes( slot: u64, write_version: u64, @@ -378,7 +260,7 @@ pub async fn init( .map(|(_, quantity)| quantity) .fold(0, |acc, x| acc + x), mkt.1.base_decimals, - mkt.1.quote_decimals, + mkt.1.base_lot_size, ), ] }) diff --git a/service-mango-pnl/example-config.toml b/service-mango-pnl/conf/example-config.toml similarity index 100% rename from service-mango-pnl/example-config.toml rename to service-mango-pnl/conf/example-config.toml diff --git a/service-mango-pnl/template-config.toml b/service-mango-pnl/conf/template-config.toml similarity index 100% rename from service-mango-pnl/template-config.toml rename to service-mango-pnl/conf/template-config.toml diff --git a/service-mango-pnl/src/main.rs b/service-mango-pnl/src/main.rs index aa1c3bc..e841e52 100644 --- a/service-mango-pnl/src/main.rs +++ b/service-mango-pnl/src/main.rs @@ -9,7 +9,7 @@ use { fs::File, io::Read, mem::size_of, - sync::{Arc, RwLock}, + sync::{atomic::AtomicBool, Arc, RwLock}, time::Duration, }, }; @@ -222,6 +222,8 @@ fn start_jsonrpc_server( #[tokio::main] async fn main() -> anyhow::Result<()> { + let exit: Arc = Arc::new(AtomicBool::new(false)); + let args: Vec = std::env::args().collect(); if args.len() < 2 { println!("requires a config file argument"); @@ -304,6 +306,7 @@ async fn main() -> anyhow::Result<()> { account_write_queue_sender, slot_queue_sender, metrics_tx.clone(), + exit.clone(), ) .await;