diff --git a/lib/src/fill_event_filter.rs b/lib/src/fill_event_filter.rs index 2af48b7..6d5c9bb 100644 --- a/lib/src/fill_event_filter.rs +++ b/lib/src/fill_event_filter.rs @@ -1,9 +1,11 @@ use crate::{ chain_data::{AccountData, ChainData, SlotData}, metrics::{MetricType, Metrics}, - AccountWrite, SlotUpdate, orderbook_filter::OrderbookSide, + orderbook_filter::{base_lots_to_ui_perp, price_lots_to_ui_perp, MarketConfig, OrderbookSide}, + AccountWrite, SlotUpdate, }; -use bytemuck::{Pod, Zeroable, cast_slice}; +use bytemuck::{cast_slice, Pod, Zeroable}; +use chrono::{Utc, TimeZone}; use log::*; use serde::{ser::SerializeStruct, Serialize, Serializer}; use serum_dex::state::EventView as SpotEvent; @@ -15,7 +17,9 @@ use solana_sdk::{ use std::{ borrow::BorrowMut, cmp::max, - collections::{HashMap, HashSet}, convert::identity, time::SystemTime, + collections::{HashMap, HashSet}, + convert::identity, + time::SystemTime, }; use crate::metrics::MetricU64; @@ -25,18 +29,46 @@ use mango_v4::state::{ MAX_NUM_EVENTS, }; -#[derive(Clone, Copy, Debug, Serialize)] +#[derive(Clone, Copy, Debug)] pub enum FillUpdateStatus { New, Revoke, } -#[derive(Clone, Copy, Debug, Serialize)] +impl Serialize for FillUpdateStatus { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + match *self { + FillUpdateStatus::New => { + serializer.serialize_unit_variant("FillUpdateStatus", 0, "new") + } + FillUpdateStatus::Revoke => { + serializer.serialize_unit_variant("FillUpdateStatus", 1, "revoke") + } + } + } +} + +#[derive(Clone, Copy, Debug)] pub enum FillEventType { Spot, Perp, } +impl Serialize for FillEventType { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + match *self { + FillEventType::Spot => serializer.serialize_unit_variant("FillEventType", 0, "spot"), + FillEventType::Perp => serializer.serialize_unit_variant("FillEventType", 1, "perp"), + } + } +} + #[derive(Clone, Debug)] pub struct FillEvent { pub event_type: FillEventType, @@ -48,8 +80,8 @@ pub struct FillEvent { pub order_id: u128, pub client_order_id: u64, pub fee: f32, - pub price: i64, - pub quantity: i64, + pub price: f64, + pub quantity: f64, } impl Serialize for FillEvent { @@ -61,7 +93,7 @@ impl Serialize for FillEvent { state.serialize_field("eventType", &self.event_type)?; state.serialize_field("maker", &self.maker)?; state.serialize_field("side", &self.side)?; - state.serialize_field("timestamp", &self.timestamp)?; + state.serialize_field("timestamp", &Utc.timestamp_opt(self.timestamp as i64, 0).unwrap().to_rfc3339())?; state.serialize_field("seqNum", &self.seq_num)?; state.serialize_field("owner", &self.owner)?; state.serialize_field("orderId", &self.order_id)?; @@ -74,7 +106,7 @@ impl Serialize for FillEvent { } impl FillEvent { - pub fn new_from_perp(event: PerpFillEvent) -> [Self; 2] { + pub fn new_from_perp(event: PerpFillEvent, config: &MarketConfig) -> [Self; 2] { let taker_side = match event.taker_side() { Side::Ask => OrderbookSide::Ask, Side::Bid => OrderbookSide::Bid, @@ -83,37 +115,62 @@ impl FillEvent { Side::Ask => OrderbookSide::Bid, Side::Bid => OrderbookSide::Ask, }; - [FillEvent { - event_type: FillEventType::Perp, - maker: true, - side: maker_side, - timestamp: event.timestamp, - seq_num: event.seq_num, - owner: event.maker.to_string(), - order_id: event.maker_order_id, - client_order_id: 0u64, - fee: event.maker_fee.to_num(), - price: event.price, - quantity: event.quantity, - }, - FillEvent { - event_type: FillEventType::Perp, - maker: false, - side: taker_side, - timestamp: event.timestamp, - seq_num: event.seq_num, - owner: event.taker.to_string(), - order_id: event.taker_order_id, - client_order_id: event.taker_client_order_id, - fee: event.taker_fee.to_num(), - price: event.price, - quantity: event.quantity, - - }] + let price = price_lots_to_ui_perp( + event.price, + config.base_decimals, + config.quote_decimals, + config.base_lot_size, + config.quote_lot_size, + ); + let quantity = + base_lots_to_ui_perp(event.quantity, config.base_decimals, config.quote_decimals); + [ + FillEvent { + event_type: FillEventType::Perp, + maker: true, + side: maker_side, + timestamp: event.timestamp, + seq_num: event.seq_num, + owner: event.maker.to_string(), + order_id: event.maker_order_id, + client_order_id: 0u64, + fee: event.maker_fee.to_num(), + price: price, + quantity: quantity, + }, + FillEvent { + event_type: FillEventType::Perp, + maker: false, + side: taker_side, + timestamp: event.timestamp, + seq_num: event.seq_num, + owner: event.taker.to_string(), + order_id: event.taker_order_id, + client_order_id: event.taker_client_order_id, + fee: event.taker_fee.to_num(), + price: price, + quantity: quantity, + }, + ] } - pub fn new_from_spot(event: SpotEvent, timestamp: u64, seq_num: u64) -> Self { + pub fn new_from_spot( + event: SpotEvent, + timestamp: u64, + seq_num: u64, + config: &MarketConfig, + ) -> Self { match event { - SpotEvent::Fill { side, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, order_id, owner, client_order_id, .. } => { + SpotEvent::Fill { + side, + maker, + native_qty_paid, + native_qty_received, + native_fee_or_rebate, + order_id, + owner, + client_order_id, + .. + } => { let side = match side as u8 { 0 => OrderbookSide::Bid, 1 => OrderbookSide::Ask, @@ -123,8 +180,42 @@ impl FillEvent { Some(id) => id.into(), None => 0u64, }; - // TODO: native to ui - let price = (native_qty_paid / native_qty_received) as i64; + + let base_multiplier = 10u64.pow(config.base_decimals.into()) as u64; + let quote_multiplier = 10u64.pow(config.quote_decimals.into()) as u64; + + let (price, quantity) = match side { + OrderbookSide::Bid => { + let price_before_fees = if maker { + native_qty_paid + native_fee_or_rebate + } else { + native_qty_paid - native_fee_or_rebate + }; + + let top = price_before_fees * base_multiplier; + let bottom = quote_multiplier * native_qty_received; + let price = top as f64 / bottom as f64; + let quantity = native_qty_received as f64 / base_multiplier as f64; + (price, quantity) + } + OrderbookSide::Ask => { + let price_before_fees = if maker { + native_qty_received - native_fee_or_rebate + } else { + native_qty_received + native_fee_or_rebate + }; + + let top = price_before_fees * base_multiplier; + let bottom = quote_multiplier * native_qty_paid; + let price = top as f64 / bottom as f64; + let quantity = native_qty_paid as f64 / base_multiplier as f64; + (price, quantity) + } + }; + + let fee = + native_fee_or_rebate as f32 / quote_multiplier as f32; + FillEvent { event_type: FillEventType::Spot, maker: maker, @@ -134,12 +225,14 @@ impl FillEvent { owner: Pubkey::new(cast_slice(&identity(owner) as &[_])).to_string(), order_id: order_id, client_order_id: client_order_id, - fee: native_fee_or_rebate as f32, + fee, price, - quantity: native_qty_received as i64, + quantity, } } - SpotEvent::Out { .. } => { panic!("Can't build FillEvent from SpotEvent::Out")} + SpotEvent::Out { .. } => { + panic!("Can't build FillEvent from SpotEvent::Out") + } } } } @@ -148,8 +241,8 @@ impl FillEvent { pub struct FillUpdate { pub event: FillEvent, pub status: FillUpdateStatus, - pub market: String, - pub queue: String, + pub market_key: String, + pub market_name: String, pub slot: u64, pub write_version: u64, } @@ -170,13 +263,13 @@ impl Serialize for FillUpdate { where S: Serializer, { - let mut state = serializer.serialize_struct("FillUpdate", 4)?; + let mut state = serializer.serialize_struct("FillUpdate", 6)?; state.serialize_field("event", &self.event)?; - state.serialize_field("market", &self.market)?; - state.serialize_field("queue", &self.queue)?; + state.serialize_field("marketKey", &self.market_key)?; + state.serialize_field("marketName", &self.market_name)?; state.serialize_field("status", &self.status)?; state.serialize_field("slot", &self.slot)?; - state.serialize_field("write_version", &self.write_version)?; + state.serialize_field("writeVersion", &self.write_version)?; state.end() } @@ -218,7 +311,7 @@ type EventQueueEvents = [AnyEvent; MAX_NUM_EVENTS as usize]; fn publish_changes_perp( slot: u64, write_version: u64, - mkt: &(Pubkey, Pubkey), + mkt: &(Pubkey, MarketConfig), header: &EventQueueHeader, events: &EventQueueEvents, old_seq_num: u64, @@ -234,7 +327,7 @@ fn publish_changes_perp( .unwrap_or(0); let mut checkpoint = Vec::new(); let mkt_pk_string = mkt.0.to_string(); - let evq_pk_string = mkt.1.to_string(); + let evq_pk_string = mkt.1.event_queue.to_string(); for seq_num in start_seq_num..header.seq_num { let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize; @@ -245,8 +338,8 @@ fn publish_changes_perp( // the order of these checks is important so they are exhaustive if seq_num >= old_seq_num { debug!( - "found new event {} idx {} type {}", - mkt_pk_string, idx, events[idx].event_type as u32 + "found new event {} idx {} type {} slot {} write_version {}", + mkt_pk_string, idx, events[idx].event_type as u32, slot, write_version ); metric_events_new.increment(); @@ -254,7 +347,7 @@ fn publish_changes_perp( // new fills are published and recorded in checkpoint if events[idx].event_type == EventType::Fill as u8 { let fill: PerpFillEvent = bytemuck::cast(events[idx]); - let fills = FillEvent::new_from_perp(fill); + let fills = FillEvent::new_from_perp(fill, &mkt.1); // send event for both maker and taker for fill in fills { fill_update_sender @@ -263,8 +356,8 @@ fn publish_changes_perp( write_version, event: fill.clone(), status: FillUpdateStatus::New, - market: mkt_pk_string.clone(), - queue: evq_pk_string.clone(), + market_key: mkt_pk_string.clone(), + market_name: mkt.1.name.clone(), })) .unwrap(); // TODO: use anyhow to bubble up error checkpoint.push(fill); @@ -283,7 +376,7 @@ fn publish_changes_perp( // first revoke old event if a fill if old_events[idx].event_type == EventType::Fill as u8 { let fill: PerpFillEvent = bytemuck::cast(old_events[idx]); - let fills = FillEvent::new_from_perp(fill); + let fills = FillEvent::new_from_perp(fill, &mkt.1); for fill in fills { fill_update_sender .try_send(FillEventFilterMessage::Update(FillUpdate { @@ -291,8 +384,8 @@ fn publish_changes_perp( write_version, event: fill, status: FillUpdateStatus::Revoke, - market: mkt_pk_string.clone(), - queue: evq_pk_string.clone(), + market_key: mkt_pk_string.clone(), + market_name: mkt.1.name.clone(), })) .unwrap(); // TODO: use anyhow to bubble up error } @@ -301,7 +394,7 @@ fn publish_changes_perp( // then publish new if its a fill and record in checkpoint if events[idx].event_type == EventType::Fill as u8 { let fill: PerpFillEvent = bytemuck::cast(events[idx]); - let fills = FillEvent::new_from_perp(fill); + let fills = FillEvent::new_from_perp(fill, &mkt.1); for fill in fills { fill_update_sender .try_send(FillEventFilterMessage::Update(FillUpdate { @@ -309,8 +402,8 @@ fn publish_changes_perp( write_version, event: fill.clone(), status: FillUpdateStatus::New, - market: mkt_pk_string.clone(), - queue: evq_pk_string.clone(), + market_key: mkt_pk_string.clone(), + market_name: mkt.1.name.clone(), })) .unwrap(); // TODO: use anyhow to bubble up error checkpoint.push(fill); @@ -320,7 +413,7 @@ fn publish_changes_perp( // every already published event is recorded in checkpoint if a fill if events[idx].event_type == EventType::Fill as u8 { let fill: PerpFillEvent = bytemuck::cast(events[idx]); - let fills = FillEvent::new_from_perp(fill); + let fills = FillEvent::new_from_perp(fill, &mkt.1); for fill in fills { checkpoint.push(fill); } @@ -332,15 +425,15 @@ fn publish_changes_perp( for seq_num in header.seq_num..old_seq_num { let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize; debug!( - "found dropped event {} idx {} seq_num {} header seq num {} old seq num {}", - mkt_pk_string, idx, seq_num, header.seq_num, old_seq_num + "found dropped event {} idx {} seq_num {} header seq num {} old seq num {} slot {} write_version {}", + mkt_pk_string, idx, seq_num, header.seq_num, old_seq_num, slot, write_version ); metric_events_drop.increment(); if old_events[idx].event_type == EventType::Fill as u8 { let fill: PerpFillEvent = bytemuck::cast(old_events[idx]); - let fills = FillEvent::new_from_perp(fill); + let fills = FillEvent::new_from_perp(fill, &mkt.1); for fill in fills { fill_update_sender .try_send(FillEventFilterMessage::Update(FillUpdate { @@ -348,8 +441,8 @@ fn publish_changes_perp( event: fill, write_version, status: FillUpdateStatus::Revoke, - market: mkt_pk_string.clone(), - queue: evq_pk_string.clone(), + market_key: mkt_pk_string.clone(), + market_name: mkt.1.name.clone(), })) .unwrap(); // TODO: use anyhow to bubble up error } @@ -370,7 +463,7 @@ fn publish_changes_perp( fn publish_changes_serum( slot: u64, write_version: u64, - mkt: &(Pubkey, Pubkey), + mkt: &(Pubkey, MarketConfig), header: &SerumEventQueueHeader, events: &[serum_dex::state::Event], old_seq_num: u64, @@ -386,12 +479,15 @@ fn publish_changes_serum( .unwrap_or(0); let mut checkpoint = Vec::new(); let mkt_pk_string = mkt.0.to_string(); - let evq_pk_string = mkt.1.to_string(); + let evq_pk_string = mkt.1.event_queue.to_string(); let header_seq_num = header.seq_num; debug!("start seq {} header seq {}", start_seq_num, header_seq_num); // Timestamp for spot events is time scraped - let timestamp = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs(); + let timestamp = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs(); for seq_num in start_seq_num..header_seq_num { let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize; let event_view = events[idx].as_view().unwrap(); @@ -404,7 +500,7 @@ fn publish_changes_serum( // 2) the event is not matching the old event queue // 3) all other events are matching the old event queue // the order of these checks is important so they are exhaustive - let fill = FillEvent::new_from_spot(event_view, timestamp, seq_num); + let fill = FillEvent::new_from_spot(event_view, timestamp, seq_num, &mkt.1); if seq_num >= old_seq_num { debug!("found new serum fill {} idx {}", mkt_pk_string, idx,); @@ -415,8 +511,8 @@ fn publish_changes_serum( write_version, event: fill.clone(), status: FillUpdateStatus::New, - market: mkt_pk_string.clone(), - queue: evq_pk_string.clone(), + market_key: mkt_pk_string.clone(), + market_name: mkt.1.name.clone(), })) .unwrap(); // TODO: use anyhow to bubble up error checkpoint.push(fill); @@ -430,11 +526,15 @@ fn publish_changes_serum( "found changed id event {} idx {} seq_num {} header seq num {} old seq num {}", mkt_pk_string, idx, seq_num, header_seq_num, old_seq_num ); - + metric_events_change.increment(); - - - let old_fill = FillEvent::new_from_spot(old_event_view, timestamp, seq_num); + + let old_fill = FillEvent::new_from_spot( + old_event_view, + timestamp, + seq_num, + &mkt.1, + ); // first revoke old event fill_update_sender .try_send(FillEventFilterMessage::Update(FillUpdate { @@ -442,11 +542,11 @@ fn publish_changes_serum( write_version, event: old_fill, status: FillUpdateStatus::Revoke, - market: mkt_pk_string.clone(), - queue: evq_pk_string.clone(), + market_key: mkt_pk_string.clone(), + market_name: mkt.1.name.clone(), })) .unwrap(); // TODO: use anyhow to bubble up error - + // then publish new fill_update_sender .try_send(FillEventFilterMessage::Update(FillUpdate { @@ -454,8 +554,8 @@ fn publish_changes_serum( write_version, event: fill.clone(), status: FillUpdateStatus::New, - market: mkt_pk_string.clone(), - queue: evq_pk_string.clone(), + market_key: mkt_pk_string.clone(), + market_name: mkt.1.name.clone(), })) .unwrap(); // TODO: use anyhow to bubble up error } @@ -478,8 +578,8 @@ fn publish_changes_serum( write_version, event: fill.clone(), status: FillUpdateStatus::New, - market: mkt_pk_string.clone(), - queue: evq_pk_string.clone(), + market_key: mkt_pk_string.clone(), + market_name: mkt.1.name.clone(), })) .unwrap(); // TODO: use anyhow to bubble up error checkpoint.push(fill); @@ -503,15 +603,15 @@ fn publish_changes_serum( match old_event_view { SpotEvent::Fill { .. } => { - let old_fill = FillEvent::new_from_spot(old_event_view, timestamp, seq_num); + let old_fill = FillEvent::new_from_spot(old_event_view, timestamp, seq_num, &mkt.1); fill_update_sender .try_send(FillEventFilterMessage::Update(FillUpdate { slot, event: old_fill, write_version, status: FillUpdateStatus::Revoke, - market: mkt_pk_string.clone(), - queue: evq_pk_string.clone(), + market_key: mkt_pk_string.clone(), + market_name: mkt.1.name.clone(), })) .unwrap(); // TODO: use anyhow to bubble up error } @@ -520,21 +620,19 @@ fn publish_changes_serum( } fill_update_sender - .try_send(FillEventFilterMessage::Checkpoint( - FillCheckpoint { - slot, - write_version, - events: checkpoint, - market: mkt_pk_string, - queue: evq_pk_string, - }, - )) + .try_send(FillEventFilterMessage::Checkpoint(FillCheckpoint { + slot, + write_version, + events: checkpoint, + market: mkt_pk_string, + queue: evq_pk_string, + })) .unwrap() } pub async fn init( - perp_queue_pks: Vec<(Pubkey, Pubkey)>, - serum_queue_pks: Vec<(Pubkey, Pubkey)>, + perp_market_configs: Vec<(Pubkey, MarketConfig)>, + spot_market_configs: Vec<(Pubkey, MarketConfig)>, metrics_sender: Metrics, ) -> anyhow::Result<( async_channel::Sender, @@ -549,8 +647,12 @@ pub async fn init( metrics_sender.register_u64("fills_feed_events_new_serum".into(), MetricType::Counter); let mut metric_events_change = metrics_sender.register_u64("fills_feed_events_change".into(), MetricType::Counter); + let mut metric_events_change_serum = + metrics_sender.register_u64("fills_feed_events_change_serum".into(), MetricType::Counter); let mut metrics_events_drop = metrics_sender.register_u64("fills_feed_events_drop".into(), MetricType::Counter); + let mut metrics_events_drop_serum = + metrics_sender.register_u64("fills_feed_events_drop_serum".into(), MetricType::Counter); // The actual message may want to also contain a retry count, if it self-reinserts on failure? let (account_write_queue_sender, account_write_queue_receiver) = @@ -572,18 +674,24 @@ pub async fn init( let mut seq_num_cache = HashMap::new(); let mut last_evq_versions = HashMap::::new(); - let all_queue_pks = [perp_queue_pks.clone(), serum_queue_pks.clone()].concat(); - let relevant_pubkeys = all_queue_pks + let all_market_configs = [perp_market_configs.clone(), spot_market_configs.clone()].concat(); + let perp_queue_pks: Vec = perp_market_configs .iter() - .map(|m| m.1) - .collect::>(); + .map(|x| x.1.event_queue) + .collect(); + let spot_queue_pks: Vec = spot_market_configs + .iter() + .map(|x| x.1.event_queue) + .collect(); + let all_queue_pks: HashSet = + HashSet::from_iter([perp_queue_pks.clone(), spot_queue_pks.clone()].concat()); // update handling thread, reads both sloths and account updates tokio::spawn(async move { loop { tokio::select! { Ok(account_write) = account_write_queue_receiver_c.recv() => { - if !relevant_pubkeys.contains(&account_write.pubkey) { + if !all_queue_pks.contains(&account_write.pubkey) { continue; } @@ -611,21 +719,38 @@ pub async fn init( }); } + Err(e) = slot_queue_receiver.recv() => { + warn!("slot update channel err {:?}", e); + } + Err(e) = account_write_queue_receiver_c.recv() => { + warn!("write update channel err {:?}", e); + } } - for mkt in all_queue_pks.iter() { - let last_evq_version = last_evq_versions.get(&mkt.1.to_string()).unwrap_or(&(0, 0)); - let mkt_pk = mkt.1; + for mkt in all_market_configs.iter() { + let evq_pk = mkt.1.event_queue; + let evq_pk_string = evq_pk.to_string(); + let last_evq_version = last_evq_versions + .get(&mkt.1.event_queue.to_string()) + .unwrap_or(&(0, 0)); - match chain_cache.account(&mkt_pk) { + match chain_cache.account(&evq_pk) { Ok(account_info) => { // only process if the account state changed let evq_version = (account_info.slot, account_info.write_version); - let evq_pk_string = mkt.1.to_string(); trace!("evq {} write_version {:?}", evq_pk_string, evq_version); if evq_version == *last_evq_version { continue; } + if evq_version.0 < last_evq_version.0 { + debug!("evq version slot was old"); + continue; + } + if evq_version.0 == last_evq_version.0 && evq_version.1 < last_evq_version.1 + { + info!("evq version slot was same and write version was old"); + continue; + } last_evq_versions.insert(evq_pk_string.clone(), evq_version); let account = &account_info.account; @@ -633,17 +758,16 @@ pub async fn init( if is_perp { let event_queue = EventQueue::try_deserialize(account.data().borrow_mut()).unwrap(); - trace!( - "evq {} seq_num {}", - evq_pk_string, - event_queue.header.seq_num + info!( + "evq {} seq_num {} version {:?}", + evq_pk_string, event_queue.header.seq_num, evq_version, ); match seq_num_cache.get(&evq_pk_string) { Some(old_seq_num) => match perp_events_cache.get(&evq_pk_string) { Some(old_events) => publish_changes_perp( account_info.slot, account_info.write_version, - mkt, + &mkt, &event_queue.header, &event_queue.buf, *old_seq_num, @@ -690,14 +814,17 @@ pub async fn init( old_events, &fill_update_sender, &mut metric_events_new_serum, - &mut metric_events_change, - &mut metrics_events_drop, + &mut metric_events_change_serum, + &mut metrics_events_drop_serum, ), _ => { - info!("serum_events_cache could not find {}", evq_pk_string) + debug!( + "serum_events_cache could not find {}", + evq_pk_string + ) } }, - _ => info!("seq_num_cache could not find {}", evq_pk_string), + _ => debug!("seq_num_cache could not find {}", evq_pk_string), } seq_num_cache.insert(evq_pk_string.clone(), seq_num.clone()); @@ -705,7 +832,7 @@ pub async fn init( .insert(evq_pk_string.clone(), events.clone().to_vec()); } } - Err(_) => info!("chain_cache could not find {}", mkt.1), + Err(_) => debug!("chain_cache could not find {}", mkt.1.event_queue), } } } diff --git a/lib/src/fill_event_postgres_target.rs b/lib/src/fill_event_postgres_target.rs new file mode 100644 index 0000000..3af9163 --- /dev/null +++ b/lib/src/fill_event_postgres_target.rs @@ -0,0 +1,237 @@ +use chrono::{TimeZone, Utc}; +use log::*; +use native_tls::{Certificate, Identity, TlsConnector}; +use postgres_native_tls::MakeTlsConnector; +use postgres_query::Caching; +use std::{env, fs, time::Duration}; +use tokio_postgres::Client; + +use crate::{fill_event_filter::FillUpdate, metrics::*, PostgresConfig}; + +async fn postgres_connection( + config: &PostgresConfig, + metric_retries: MetricU64, + metric_live: MetricU64, +) -> anyhow::Result>> { + let (tx, rx) = async_channel::unbounded(); + + // openssl pkcs12 -export -in client.cer -inkey client-key.cer -out client.pks + // base64 -i ca.cer -o ca.cer.b64 && base64 -i client.pks -o client.pks.b64 + // fly secrets set PG_CA_CERT=- < ./ca.cer.b64 -a mango-fills + // fly secrets set PG_CLIENT_KEY=- < ./client.pks.b64 -a mango-fills + info!("making tls config"); + let tls = match &config.tls { + Some(tls) => { + use base64::{engine::general_purpose, Engine as _}; + let ca_cert = match &tls.ca_cert_path.chars().next().unwrap() { + '$' => general_purpose::STANDARD + .decode( + env::var(&tls.ca_cert_path[1..]) + .expect("reading client cert from env") + .into_bytes(), + ) + .expect("decoding client cert"), + _ => fs::read(&tls.client_key_path).expect("reading client key from file"), + }; + let client_key = match &tls.client_key_path.chars().next().unwrap() { + '$' => general_purpose::STANDARD + .decode( + env::var(&tls.client_key_path[1..]) + .expect("reading client key from env") + .into_bytes(), + ) + .expect("decoding client key"), + _ => fs::read(&tls.client_key_path).expect("reading client key from file"), + }; + MakeTlsConnector::new( + TlsConnector::builder() + .add_root_certificate(Certificate::from_pem(&ca_cert)?) + .identity(Identity::from_pkcs12(&client_key, "pass")?) + .danger_accept_invalid_certs(config.allow_invalid_certs) + .build()?, + ) + } + None => MakeTlsConnector::new( + TlsConnector::builder() + .danger_accept_invalid_certs(config.allow_invalid_certs) + .build()?, + ), + }; + + let config = config.clone(); + let mut initial = Some(tokio_postgres::connect(&config.connection_string, tls.clone()).await?); + let mut metric_retries = metric_retries; + let mut metric_live = metric_live; + tokio::spawn(async move { + loop { + let (client, connection) = match initial.take() { + Some(v) => v, + None => { + let result = + tokio_postgres::connect(&config.connection_string, tls.clone()).await; + match result { + Ok(v) => v, + Err(err) => { + warn!("could not connect to postgres: {:?}", err); + tokio::time::sleep(Duration::from_secs( + config.retry_connection_sleep_secs, + )) + .await; + continue; + } + } + } + }; + + tx.send(Some(client)).await.expect("send success"); + metric_live.increment(); + + let result = connection.await; + + metric_retries.increment(); + metric_live.decrement(); + + tx.send(None).await.expect("send success"); + warn!("postgres connection error: {:?}", result); + tokio::time::sleep(Duration::from_secs(config.retry_connection_sleep_secs)).await; + } + }); + + Ok(rx) +} + +async fn update_postgres_client<'a>( + client: &'a mut Option>, + rx: &async_channel::Receiver>, + config: &PostgresConfig, +) -> &'a postgres_query::Caching { + // get the most recent client, waiting if there's a disconnect + while !rx.is_empty() || client.is_none() { + tokio::select! { + client_raw_opt = rx.recv() => { + *client = client_raw_opt.expect("not closed").map(postgres_query::Caching::new); + }, + _ = tokio::time::sleep(Duration::from_secs(config.fatal_connection_timeout_secs)) => { + error!("waited too long for new postgres client"); + std::process::exit(1); + }, + } + } + client.as_ref().expect("must contain value") +} + +async fn process_update(client: &Caching, update: &FillUpdate) -> anyhow::Result<()> { + let market = &update.market_key; + let seq_num = update.event.seq_num as i64; + let fill_timestamp = Utc.timestamp_opt(update.event.timestamp as i64, 0).unwrap(); + let price = update.event.price as f64; + let quantity = update.event.quantity as f64; + let slot = update.slot as i64; + let write_version = update.write_version as i64; + + let query = postgres_query::query!( + "INSERT INTO transactions_v4.perp_fills_feed_events + (market, seq_num, fill_timestamp, price, + quantity, slot, write_version) + VALUES + ($market, $seq_num, $fill_timestamp, $price, + $quantity, $slot, $write_version) + ON CONFLICT (market, seq_num) DO NOTHING", + market, + seq_num, + fill_timestamp, + price, + quantity, + slot, + write_version, + ); + let _ = query.execute(&client).await?; + + Ok(()) +} + +pub async fn init( + config: &PostgresConfig, + metrics_sender: Metrics, +) -> anyhow::Result> { + // The actual message may want to also contain a retry count, if it self-reinserts on failure? + let (fill_update_queue_sender, fill_update_queue_receiver) = + async_channel::bounded::(config.max_queue_size); + + let metric_con_retries = metrics_sender.register_u64( + "fills_postgres_connection_retries".into(), + MetricType::Counter, + ); + let metric_con_live = + metrics_sender.register_u64("fills_postgres_connections_alive".into(), MetricType::Gauge); + + // postgres fill update sending worker threads + for _ in 0..config.connection_count { + let postgres_account_writes = + postgres_connection(config, metric_con_retries.clone(), metric_con_live.clone()) + .await?; + let fill_update_queue_receiver_c = fill_update_queue_receiver.clone(); + let config = config.clone(); + let mut metric_retries = + metrics_sender.register_u64("fills_postgres_retries".into(), MetricType::Counter); + + tokio::spawn(async move { + let mut client_opt = None; + loop { + // Retrieve up to batch_size updates + let mut batch = Vec::new(); + batch.push( + fill_update_queue_receiver_c + .recv() + .await + .expect("sender must stay alive"), + ); + while batch.len() < config.max_batch_size { + match fill_update_queue_receiver_c.try_recv() { + Ok(update) => batch.push(update), + Err(async_channel::TryRecvError::Empty) => break, + Err(async_channel::TryRecvError::Closed) => { + panic!("sender must stay alive") + } + }; + } + + info!( + "updates, batch {}, channel size {}", + batch.len(), + fill_update_queue_receiver_c.len(), + ); + + let mut error_count = 0; + loop { + let client = + update_postgres_client(&mut client_opt, &postgres_account_writes, &config) + .await; + let mut results = futures::future::join_all( + batch.iter().map(|update| process_update(client, update)), + ) + .await; + let mut iter = results.iter(); + batch.retain(|_| iter.next().unwrap().is_err()); + if batch.len() > 0 { + metric_retries.add(batch.len() as u64); + error_count += 1; + if error_count - 1 < config.retry_query_max_count { + results.retain(|r| r.is_err()); + warn!("failed to process fill update, retrying: {:?}", results); + tokio::time::sleep(Duration::from_secs(config.retry_query_sleep_secs)) + .await; + continue; + } else { + error!("failed to process account write, exiting"); + std::process::exit(1); + } + }; + break; + } + } + }); + } + + Ok(fill_update_queue_sender) +} diff --git a/lib/src/lib.rs b/lib/src/lib.rs index df30ca3..f978ac0 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -1,21 +1,20 @@ pub mod chain_data; pub mod fill_event_filter; -pub mod orderbook_filter; +pub mod fill_event_postgres_target; pub mod grpc_plugin_source; pub mod memory_target; pub mod metrics; +pub mod orderbook_filter; pub mod postgres_target; pub mod postgres_types_numeric; pub mod websocket_source; pub use chain_data::SlotStatus; -use serde::{Serialize, Serializer, ser::SerializeStruct}; +use serde::{ser::SerializeStruct, Serialize, Serializer}; use { - async_trait::async_trait, serde_derive::Deserialize, solana_sdk::{account::Account, pubkey::Pubkey}, - std::sync::Arc, }; trait AnyhowWrap { @@ -69,14 +68,12 @@ pub struct SlotUpdate { #[derive(Clone, Debug, Deserialize)] pub struct PostgresConfig { pub connection_string: String, - /// Number of parallel postgres connections used for account write insertions - pub account_write_connection_count: u64, - /// Maximum batch size for account write inserts over one connection - pub account_write_max_batch_size: usize, - /// Max size of account write queues - pub account_write_max_queue_size: usize, - /// Number of parallel postgres connections used for slot insertions - pub slot_update_connection_count: u64, + /// Number of parallel postgres connections used for insertions + pub connection_count: u64, + /// Maximum batch size for inserts over one connection + pub max_batch_size: usize, + /// Max size of queues + pub max_queue_size: usize, /// Number of queries retries before fatal error pub retry_query_max_count: u64, /// Seconds to sleep between query retries @@ -87,12 +84,15 @@ pub struct PostgresConfig { pub fatal_connection_timeout_secs: u64, /// Allow invalid TLS certificates, passed to native_tls danger_accept_invalid_certs pub allow_invalid_certs: bool, - /// Name key to use in the monitoring table - pub monitoring_name: String, - /// Time between updates to the monitoring table - pub monitoring_update_interval_secs: u64, - /// Time between cleanup jobs (0 to disable) - pub cleanup_interval_secs: u64, + pub tls: Option, +} + +#[derive(Clone, Debug, Deserialize)] +pub struct PostgresTlsConfig { + /// CA Cert file or env var + pub ca_cert_path: String, + /// PKCS12 client cert path + pub client_key_path: String, } #[derive(Clone, Debug, Deserialize)] @@ -164,63 +164,3 @@ pub struct Config { pub source: SourceConfig, pub metrics: MetricsConfig, } - -#[async_trait] -pub trait AccountTable: Sync + Send { - fn table_name(&self) -> &str; - async fn insert_account_write( - &self, - client: &postgres_query::Caching, - account_write: &AccountWrite, - ) -> anyhow::Result<()>; -} - -pub type AccountTables = Vec>; - -pub struct RawAccountTable {} - -pub fn encode_address(addr: &Pubkey) -> String { - bs58::encode(&addr.to_bytes()).into_string() -} - -#[async_trait] -impl AccountTable for RawAccountTable { - fn table_name(&self) -> &str { - "account_write" - } - - async fn insert_account_write( - &self, - client: &postgres_query::Caching, - account_write: &AccountWrite, - ) -> anyhow::Result<()> { - let pubkey = encode_address(&account_write.pubkey); - let owner = encode_address(&account_write.owner); - let slot = account_write.slot as i64; - let write_version = account_write.write_version as i64; - let lamports = account_write.lamports as i64; - let rent_epoch = account_write.rent_epoch as i64; - - // TODO: should update for same write_version to work with websocket input - let query = postgres_query::query!( - "INSERT INTO account_write - (pubkey_id, slot, write_version, is_selected, - owner_id, lamports, executable, rent_epoch, data) - VALUES - (map_pubkey($pubkey), $slot, $write_version, $is_selected, - map_pubkey($owner), $lamports, $executable, $rent_epoch, $data) - ON CONFLICT (pubkey_id, slot, write_version) DO NOTHING", - pubkey, - slot, - write_version, - is_selected = account_write.is_selected, - owner, - lamports, - executable = account_write.executable, - rent_epoch, - data = account_write.data, - ); - let _ = query.execute(client).await?; - Ok(()) - } -} diff --git a/lib/src/postgres_target.rs b/lib/src/postgres_target.rs deleted file mode 100644 index 3c98f7c..0000000 --- a/lib/src/postgres_target.rs +++ /dev/null @@ -1,637 +0,0 @@ -use anyhow::Context; -use log::*; -use native_tls::TlsConnector; -use postgres_native_tls::MakeTlsConnector; -use postgres_query::{query, query_dyn}; -use std::{collections::HashMap, convert::TryFrom, time::Duration}; - -use crate::{metrics::*, AccountTables, AccountWrite, PostgresConfig, SlotStatus, SlotUpdate}; - -mod pg { - #[derive(Clone, Copy, Debug, PartialEq, postgres_types::ToSql)] - pub enum SlotStatus { - Rooted, - Confirmed, - Processed, - } - - impl From for SlotStatus { - fn from(status: super::SlotStatus) -> SlotStatus { - match status { - super::SlotStatus::Rooted => SlotStatus::Rooted, - super::SlotStatus::Confirmed => SlotStatus::Confirmed, - super::SlotStatus::Processed => SlotStatus::Processed, - } - } - } -} - -async fn postgres_connection( - config: &PostgresConfig, - metric_retries: MetricU64, - metric_live: MetricU64, -) -> anyhow::Result>> { - let (tx, rx) = async_channel::unbounded(); - - let tls = MakeTlsConnector::new( - TlsConnector::builder() - .danger_accept_invalid_certs(config.allow_invalid_certs) - .build()?, - ); - - let config = config.clone(); - let mut initial = Some(tokio_postgres::connect(&config.connection_string, tls.clone()).await?); - let mut metric_retries = metric_retries; - let mut metric_live = metric_live; - tokio::spawn(async move { - loop { - let (client, connection) = match initial.take() { - Some(v) => v, - None => { - let result = - tokio_postgres::connect(&config.connection_string, tls.clone()).await; - match result { - Ok(v) => v, - Err(err) => { - warn!("could not connect to postgres: {:?}", err); - tokio::time::sleep(Duration::from_secs( - config.retry_connection_sleep_secs, - )) - .await; - continue; - } - } - } - }; - - tx.send(Some(client)).await.expect("send success"); - metric_live.increment(); - - let result = connection.await; - - metric_retries.increment(); - metric_live.decrement(); - - tx.send(None).await.expect("send success"); - warn!("postgres connection error: {:?}", result); - tokio::time::sleep(Duration::from_secs(config.retry_connection_sleep_secs)).await; - } - }); - - Ok(rx) -} - -async fn update_postgres_client<'a>( - client: &'a mut Option>, - rx: &async_channel::Receiver>, - config: &PostgresConfig, -) -> &'a postgres_query::Caching { - // get the most recent client, waiting if there's a disconnect - while !rx.is_empty() || client.is_none() { - tokio::select! { - client_raw_opt = rx.recv() => { - *client = client_raw_opt.expect("not closed").map(postgres_query::Caching::new); - }, - _ = tokio::time::sleep(Duration::from_secs(config.fatal_connection_timeout_secs)) => { - error!("waited too long for new postgres client"); - std::process::exit(1); - }, - } - } - client.as_ref().expect("must contain value") -} - -async fn process_account_write( - client: &postgres_query::Caching, - write: &AccountWrite, - account_tables: &AccountTables, -) -> anyhow::Result<()> { - futures::future::try_join_all( - account_tables - .iter() - .map(|table| table.insert_account_write(client, write)), - ) - .await?; - Ok(()) -} - -struct Slots { - // non-rooted only - slots: HashMap, - newest_processed_slot: Option, - newest_rooted_slot: Option, -} - -#[derive(Default)] -struct SlotPreprocessing { - discard_duplicate: bool, - discard_old: bool, - new_processed_head: bool, - new_rooted_head: bool, - parent_update: bool, -} - -impl Slots { - fn new() -> Self { - Self { - slots: HashMap::new(), - newest_processed_slot: None, - newest_rooted_slot: None, - } - } - - fn add(&mut self, update: &SlotUpdate) -> SlotPreprocessing { - let mut result = SlotPreprocessing::default(); - - if let Some(previous) = self.slots.get_mut(&update.slot) { - if previous.status == update.status && previous.parent == update.parent { - result.discard_duplicate = true; - } - - previous.status = update.status; - if update.parent.is_some() && previous.parent != update.parent { - previous.parent = update.parent; - result.parent_update = true; - } - } else if self.newest_rooted_slot.is_none() - || update.slot > self.newest_rooted_slot.unwrap() - { - self.slots.insert(update.slot, update.clone()); - } else { - result.discard_old = true; - } - - if update.status == SlotStatus::Rooted { - let old_slots: Vec = self - .slots - .keys() - .filter(|s| **s <= update.slot) - .copied() - .collect(); - for old_slot in old_slots { - self.slots.remove(&old_slot); - } - if self.newest_rooted_slot.is_none() || self.newest_rooted_slot.unwrap() < update.slot { - self.newest_rooted_slot = Some(update.slot); - result.new_rooted_head = true; - } - } - - if self.newest_processed_slot.is_none() || self.newest_processed_slot.unwrap() < update.slot - { - self.newest_processed_slot = Some(update.slot); - result.new_processed_head = true; - } - - result - } -} - -fn make_cleanup_steps(tables: &Vec) -> HashMap { - let mut steps = HashMap::::new(); - - // Delete all account writes that came before the newest rooted slot except - // for the newest rooted write for each pubkey. - // This could be older rooted writes or writes in uncled slots that came - // before the newest rooted slot. - // - // Also delete _all_ writes from before the newest snapshot, because these may - // be for deleted accounts where the deletion event was missed. Snapshots - // provide a new state for all live accounts, but don't tell us about deleted - // accounts. - // - // The way this is done, by taking the newest snapshot that's at least - // min_snapshot_age behind the newest rooted slot is a workaround: we don't know - // how long it'll take to insert snapshot data, but assume it'll be done by that - // time. - let min_snapshot_age = 300; - steps.extend( - tables - .iter() - .map(|table_name| { - let sql = format!( - "WITH - newest_rooted AS ( - SELECT max(slot) AS newest_rooted_slot FROM slot WHERE status = 'Rooted'), - newest_snapshot AS ( - SELECT max(slot) AS newest_snapshot_slot FROM account_write, newest_rooted - WHERE write_version = 0 AND slot + {min_snapshot_age} < newest_rooted_slot) - DELETE FROM {table} AS data - USING - newest_rooted, - newest_snapshot, - (SELECT DISTINCT ON(pubkey_id) pubkey_id, slot, write_version - FROM {table} - LEFT JOIN slot USING(slot) - CROSS JOIN newest_rooted - WHERE slot <= newest_rooted_slot AND (status = 'Rooted' OR status is NULL) - ORDER BY pubkey_id, slot DESC, write_version DESC - ) newest_rooted_write - WHERE - data.pubkey_id = newest_rooted_write.pubkey_id AND ( - data.slot < newest_snapshot_slot OR ( - data.slot <= newest_rooted_slot - AND (data.slot != newest_rooted_write.slot OR data.write_version != newest_rooted_write.write_version) - ) - )", - table = table_name, - min_snapshot_age = min_snapshot_age, - ); - (format!("delete old writes in {}", table_name), sql) - }) - .collect::>(), - ); - - // Delete information about older slots - steps.insert( - "delete old slots".into(), - "DELETE FROM slot - USING (SELECT max(slot) as newest_rooted_slot FROM slot WHERE status = 'Rooted') s - WHERE slot + 1000 < newest_rooted_slot" - .into(), - ); - - steps -} - -#[derive(Clone)] -struct SlotsProcessing {} - -impl SlotsProcessing { - fn new() -> Self { - Self {} - } - - async fn process( - &self, - client: &postgres_query::Caching, - update: &SlotUpdate, - meta: &SlotPreprocessing, - ) -> anyhow::Result<()> { - let slot = update.slot as i64; - let status: pg::SlotStatus = update.status.into(); - if let Some(parent) = update.parent { - let parent = parent as i64; - let query = query!( - "INSERT INTO slot - (slot, parent, status, uncle) - VALUES - ($slot, $parent, $status, FALSE) - ON CONFLICT (slot) DO UPDATE SET - parent=$parent, status=$status", - slot, - parent, - status, - ); - let _ = query.execute(client).await.context("updating slot row")?; - } else { - let query = query!( - "INSERT INTO slot - (slot, parent, status, uncle) - VALUES - ($slot, NULL, $status, FALSE) - ON CONFLICT (slot) DO UPDATE SET - status=$status", - slot, - status, - ); - let _ = query.execute(client).await.context("updating slot row")?; - } - - if meta.new_rooted_head { - let slot = update.slot as i64; - // Mark preceeding non-uncle slots as rooted - let query = query!( - "UPDATE slot SET status = 'Rooted' - WHERE slot < $newest_final_slot - AND (NOT uncle) - AND status != 'Rooted'", - newest_final_slot = slot - ); - let _ = query - .execute(client) - .await - .context("updating preceding non-rooted slots")?; - } - - if meta.new_processed_head || meta.parent_update { - // update the uncle column for the chain of slots from the - // newest down the the first rooted slot - let query = query!( - "WITH RECURSIVE - liveslots AS ( - SELECT slot.*, 0 AS depth FROM slot - WHERE slot = (SELECT max(slot) FROM slot) - UNION ALL - SELECT s.*, depth + 1 FROM slot s - INNER JOIN liveslots l ON s.slot = l.parent - WHERE l.status != 'Rooted' AND depth < 1000 - ), - min_slot AS (SELECT min(slot) AS min_slot FROM liveslots) - UPDATE slot SET - uncle = NOT EXISTS (SELECT 1 FROM liveslots WHERE liveslots.slot = slot.slot) - FROM min_slot - WHERE slot >= min_slot;" - ); - let _ = query - .execute(client) - .await - .context("recomputing slot uncle status")?; - } - - trace!("slot update done {}", update.slot); - Ok(()) - } -} - -fn secs_since_epoch() -> u64 { - std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .expect("Time went backwards") - .as_secs() -} -fn epoch_secs_to_time(secs: u64) -> std::time::SystemTime { - std::time::UNIX_EPOCH + std::time::Duration::from_secs(secs) -} - -pub async fn init( - config: &PostgresConfig, - account_tables: AccountTables, - metrics_sender: Metrics, -) -> anyhow::Result<( - async_channel::Sender, - async_channel::Sender, -)> { - // The actual message may want to also contain a retry count, if it self-reinserts on failure? - let (account_write_queue_sender, account_write_queue_receiver) = - async_channel::bounded::(config.account_write_max_queue_size); - - // Slot updates flowing from the outside into the single processing thread. From - // there they'll flow into the postgres sending thread. - let (slot_queue_sender, slot_queue_receiver) = async_channel::unbounded::(); - let (slot_inserter_sender, slot_inserter_receiver) = - async_channel::unbounded::<(SlotUpdate, SlotPreprocessing)>(); - - let metric_con_retries = - metrics_sender.register_u64("postgres_connection_retries".into(), MetricType::Counter); - let metric_con_live = - metrics_sender.register_u64("postgres_connections_alive".into(), MetricType::Gauge); - - // postgres account write sending worker threads - for _ in 0..config.account_write_connection_count { - let postgres_account_writes = - postgres_connection(config, metric_con_retries.clone(), metric_con_live.clone()) - .await?; - let account_write_queue_receiver_c = account_write_queue_receiver.clone(); - let account_tables_c = account_tables.clone(); - let config = config.clone(); - let mut metric_retries = metrics_sender - .register_u64("postgres_account_write_retries".into(), MetricType::Counter); - let mut metric_last_write = metrics_sender.register_u64( - "postgres_account_write_last_write_timestamp".into(), - MetricType::Gauge, - ); - tokio::spawn(async move { - let mut client_opt = None; - loop { - // Retrieve up to batch_size account writes - let mut write_batch = Vec::new(); - write_batch.push( - account_write_queue_receiver_c - .recv() - .await - .expect("sender must stay alive"), - ); - while write_batch.len() < config.account_write_max_batch_size { - match account_write_queue_receiver_c.try_recv() { - Ok(write) => write_batch.push(write), - Err(async_channel::TryRecvError::Empty) => break, - Err(async_channel::TryRecvError::Closed) => { - panic!("sender must stay alive") - } - }; - } - - trace!( - "account write, batch {}, channel size {}", - write_batch.len(), - account_write_queue_receiver_c.len(), - ); - - let mut error_count = 0; - loop { - let client = - update_postgres_client(&mut client_opt, &postgres_account_writes, &config) - .await; - let mut results = futures::future::join_all( - write_batch - .iter() - .map(|write| process_account_write(client, &write, &account_tables_c)), - ) - .await; - let mut iter = results.iter(); - write_batch.retain(|_| iter.next().unwrap().is_err()); - if write_batch.len() > 0 { - metric_retries.add(write_batch.len() as u64); - error_count += 1; - if error_count - 1 < config.retry_query_max_count { - results.retain(|r| r.is_err()); - warn!("failed to process account write, retrying: {:?}", results); - tokio::time::sleep(Duration::from_secs(config.retry_query_sleep_secs)) - .await; - continue; - } else { - error!("failed to process account write, exiting"); - std::process::exit(1); - } - }; - break; - } - metric_last_write.set_max(secs_since_epoch()); - } - }); - } - - // slot update handling thread - let mut metric_slot_queue = - metrics_sender.register_u64("slot_insert_queue".into(), MetricType::Gauge); - tokio::spawn(async move { - let mut slots = Slots::new(); - - loop { - let update = slot_queue_receiver - .recv() - .await - .expect("sender must stay alive"); - trace!( - "slot update {}, channel size {}", - update.slot, - slot_queue_receiver.len() - ); - - // Check if we already know about the slot, or it is outdated - let slot_preprocessing = slots.add(&update); - if slot_preprocessing.discard_duplicate || slot_preprocessing.discard_old { - continue; - } - - slot_inserter_sender - .send((update, slot_preprocessing)) - .await - .expect("sending must succeed"); - metric_slot_queue.set(slot_inserter_sender.len() as u64); - } - }); - - // postgres slot update worker threads - let slots_processing = SlotsProcessing::new(); - for _ in 0..config.slot_update_connection_count { - let postgres_slot = - postgres_connection(config, metric_con_retries.clone(), metric_con_live.clone()) - .await?; - let receiver_c = slot_inserter_receiver.clone(); - let config = config.clone(); - let mut metric_retries = - metrics_sender.register_u64("postgres_slot_update_retries".into(), MetricType::Counter); - let mut metric_last_write = metrics_sender.register_u64( - "postgres_slot_last_write_timestamp".into(), - MetricType::Gauge, - ); - let slots_processing = slots_processing.clone(); - tokio::spawn(async move { - let mut client_opt = None; - loop { - let (update, preprocessing) = - receiver_c.recv().await.expect("sender must stay alive"); - trace!("slot insertion, slot {}", update.slot); - - let mut error_count = 0; - loop { - let client = - update_postgres_client(&mut client_opt, &postgres_slot, &config).await; - if let Err(err) = slots_processing - .process(client, &update, &preprocessing) - .await - { - metric_retries.increment(); - error_count += 1; - if error_count - 1 < config.retry_query_max_count { - warn!("failed to process slot update, retrying: {:?}", err); - tokio::time::sleep(Duration::from_secs(config.retry_query_sleep_secs)) - .await; - continue; - } else { - error!("failed to process slot update, exiting"); - std::process::exit(1); - } - }; - break; - } - metric_last_write.set_max(secs_since_epoch()); - } - }); - } - - // postgres cleanup thread - if config.cleanup_interval_secs > 0 { - let table_names: Vec = account_tables - .iter() - .map(|table| table.table_name().to_string()) - .collect(); - let cleanup_steps = make_cleanup_steps(&table_names); - - let postgres_con = - postgres_connection(config, metric_con_retries.clone(), metric_con_live.clone()) - .await?; - let mut metric_last_cleanup = metrics_sender.register_u64( - "postgres_cleanup_last_success_timestamp".into(), - MetricType::Gauge, - ); - let mut metric_cleanup_errors = - metrics_sender.register_u64("postgres_cleanup_errors".into(), MetricType::Counter); - let config = config.clone(); - tokio::spawn(async move { - let mut client_opt = None; - loop { - tokio::time::sleep(Duration::from_secs(config.cleanup_interval_secs)).await; - let client = update_postgres_client(&mut client_opt, &postgres_con, &config).await; - - let mut all_successful = true; - for (name, cleanup_sql) in &cleanup_steps { - let query = query_dyn!(&cleanup_sql).unwrap(); - if let Err(err) = query.execute(client).await { - warn!("failed to process cleanup step {}: {:?}", name, err); - metric_cleanup_errors.increment(); - all_successful = false; - } - } - if all_successful { - metric_last_cleanup.set_max(secs_since_epoch()); - } - } - }); - } - - // postgres metrics/monitoring thread - { - let postgres_con = - postgres_connection(config, metric_con_retries.clone(), metric_con_live.clone()) - .await?; - let metric_slot_last_write = metrics_sender.register_u64( - "postgres_slot_last_write_timestamp".into(), - MetricType::Gauge, - ); - let metric_account_write_last_write = metrics_sender.register_u64( - "postgres_account_write_last_write_timestamp".into(), - MetricType::Gauge, - ); - let metric_account_queue = - metrics_sender.register_u64("account_write_queue".into(), MetricType::Gauge); - let metric_slot_queue = - metrics_sender.register_u64("slot_insert_queue".into(), MetricType::Gauge); - let config = config.clone(); - tokio::spawn(async move { - let mut client_opt = None; - loop { - tokio::time::sleep(Duration::from_secs(config.monitoring_update_interval_secs)) - .await; - - let client = update_postgres_client(&mut client_opt, &postgres_con, &config).await; - let last_update = std::time::SystemTime::now(); - let last_slot_write = epoch_secs_to_time(metric_slot_last_write.value()); - let last_account_write_write = - epoch_secs_to_time(metric_account_write_last_write.value()); - let slot_queue = i64::try_from(metric_slot_queue.value()).unwrap(); - let account_write_queue = i64::try_from(metric_account_queue.value()).unwrap(); - let query = query!( - "INSERT INTO monitoring - (name, last_update, last_slot_write, last_account_write_write, slot_queue, account_write_queue) - VALUES - ($name, $last_update, $last_slot_write, $last_account_write_write, $slot_queue, $account_write_queue) - ON CONFLICT (name) DO UPDATE SET - last_update=$last_update, - last_slot_write=$last_slot_write, - last_account_write_write=$last_account_write_write, - slot_queue=$slot_queue, - account_write_queue=$account_write_queue - ", - name = config.monitoring_name, - last_update, - last_slot_write, - last_account_write_write, - slot_queue, - account_write_queue, - ); - if let Err(err) = query - .execute(client) - .await - .context("updating monitoring table") - { - warn!("failed to process monitoring update: {:?}", err); - }; - } - }); - } - - Ok((account_write_queue_sender, slot_queue_sender)) -} diff --git a/service-mango-fills/src/main.rs b/service-mango-fills/src/main.rs index 158c618..049b566 100644 --- a/service-mango-fills/src/main.rs +++ b/service-mango-fills/src/main.rs @@ -1,9 +1,8 @@ use anchor_client::{ - solana_sdk::{account::Account, commitment_config::CommitmentConfig, signature::Keypair}, + solana_sdk::{commitment_config::CommitmentConfig, signature::Keypair}, Cluster, }; use anchor_lang::prelude::Pubkey; -use bytemuck::cast_slice; use client::{Client, MangoGroupContext}; use futures_channel::mpsc::{unbounded, UnboundedSender}; use futures_util::{ @@ -13,7 +12,6 @@ use futures_util::{ use log::*; use std::{ collections::{HashMap, HashSet}, - convert::identity, fs::File, io::Read, net::SocketAddr, @@ -31,7 +29,7 @@ use tokio_tungstenite::tungstenite::{protocol::Message, Error}; use serde::Deserialize; use solana_geyser_connector_lib::{ metrics::{MetricType, MetricU64}, - FilterConfig, StatusResponse, + FilterConfig, StatusResponse, fill_event_postgres_target, PostgresConfig, fill_event_filter::FillEventType, orderbook_filter::MarketConfig, PostgresTlsConfig, }; use solana_geyser_connector_lib::{ fill_event_filter::{self, FillCheckpoint, FillEventFilterMessage}, @@ -308,44 +306,67 @@ async fn main() -> anyhow::Result<()> { .await?, ); + // todo: reload markets at intervals + let perp_market_configs: Vec<(Pubkey, MarketConfig)> = group_context + .perp_markets + .iter() + .map(|(_, context)| { + let quote_decimals = match group_context.tokens.get(&context.market.settle_token_index) + { + Some(token) => token.decimals, + None => panic!("token not found for market"), // todo: default to 6 for usdc? + }; + ( + context.address, + MarketConfig { + name: context.market.name().to_owned(), + bids: context.market.bids, + asks: context.market.asks, + event_queue: context.market.event_queue, + base_decimals: context.market.base_decimals, + quote_decimals, + base_lot_size: context.market.base_lot_size, + quote_lot_size: context.market.quote_lot_size, + }, + ) + }) + .collect(); + + let spot_market_configs: Vec<(Pubkey, MarketConfig)> = group_context + .serum3_markets + .iter() + .map(|(_, context)| { + let base_decimals = match group_context.tokens.get(&context.market.base_token_index) { + Some(token) => token.decimals, + None => panic!("token not found for market"), // todo: default? + }; + let quote_decimals = match group_context.tokens.get(&context.market.quote_token_index) { + Some(token) => token.decimals, + None => panic!("token not found for market"), // todo: default to 6 for usdc? + }; + ( + context.market.serum_market_external, + MarketConfig { + name: context.market.name().to_owned(), + bids: context.bids, + asks: context.asks, + event_queue: context.event_q, + base_decimals, + quote_decimals, + base_lot_size: context.pc_lot_size as i64, + quote_lot_size: context.coin_lot_size as i64, + }, + ) + }) + .collect(); + let perp_queue_pks: Vec<(Pubkey, Pubkey)> = group_context .perp_markets .iter() .map(|(_, context)| (context.address, context.market.event_queue)) .collect(); - let serum_market_pks: Vec = group_context - .serum3_markets - .iter() - .map(|(_, context)| context.market.serum_market_external) - .collect(); - - let serum_market_ais = client - .rpc_async() - .get_multiple_accounts(serum_market_pks.as_slice()) - .await?; - let serum_market_ais: Vec<&Account> = serum_market_ais - .iter() - .filter_map(|maybe_ai| match maybe_ai { - Some(ai) => Some(ai), - None => None, - }) - .collect(); - - let serum_queue_pks: Vec<(Pubkey, Pubkey)> = serum_market_ais - .iter() - .enumerate() - .map(|pair| { - let market_state: serum_dex::state::MarketState = *bytemuck::from_bytes( - &pair.1.data[5..5 + std::mem::size_of::()], - ); - ( - serum_market_pks[pair.0], - Pubkey::new(cast_slice(&identity(market_state.event_q) as &[_])), - ) - }) - .collect(); - + let spot_queue_pks: Vec<(Pubkey, Pubkey)> = spot_market_configs.iter().map(|x| (x.0, x.1.event_queue)).collect(); let a: Vec<(String, String)> = group_context .serum3_markets .iter() @@ -368,9 +389,27 @@ async fn main() -> anyhow::Result<()> { .collect(); let market_pubkey_strings: HashMap = [a, b].concat().into_iter().collect(); + // TODO: read all this from config + let pgconf = PostgresConfig { + connection_string: "$PG_CONNECTION_STRING".to_owned(), + connection_count: 1, + max_batch_size: 1, + max_queue_size: 50_000, + retry_query_max_count: 10, + retry_query_sleep_secs: 2, + retry_connection_sleep_secs: 10, + fatal_connection_timeout_secs: 120, + allow_invalid_certs: true, + tls: Some(PostgresTlsConfig { + ca_cert_path: "$PG_CA_CERT".to_owned(), + client_key_path: "$PG_CLIENT_KEY".to_owned(), + }) + }; + let postgres_update_sender = fill_event_postgres_target::init(&pgconf, metrics_tx.clone()).await?; + let (account_write_queue_sender, slot_queue_sender, fill_receiver) = fill_event_filter::init( - perp_queue_pks.clone(), - serum_queue_pks.clone(), + perp_market_configs.clone(), + spot_market_configs.clone(), metrics_tx.clone(), ) .await?; @@ -389,19 +428,30 @@ async fn main() -> anyhow::Result<()> { let message = fill_receiver.recv().await.unwrap(); match message { FillEventFilterMessage::Update(update) => { - debug!("ws update {} {:?} {:?} fill", update.market, update.status, update.event.event_type); + debug!("ws update {} {:?} {:?} fill", update.market_name, update.status, update.event.event_type); let mut peer_copy = peers_ref_thread.lock().unwrap().clone(); for (addr, peer) in peer_copy.iter_mut() { - let json = serde_json::to_string(&update).unwrap(); + let json = serde_json::to_string(&update.clone()).unwrap(); // only send updates if the peer is subscribed - if peer.subscriptions.contains(&update.market) { + if peer.subscriptions.contains(&update.market_key) { let result = peer.sender.send(Message::Text(json)).await; if result.is_err() { - error!("ws update {} fill could not reach {}", update.market, addr); + error!("ws update {} fill could not reach {}", update.market_name, addr); } } } + // send taker fills to db + let update_c = update.clone(); + match update_c.event.event_type { + FillEventType::Perp => { + if !update_c.event.maker { + debug!("{:?}", update_c); + postgres_update_sender.send(update_c).await.unwrap(); + } + } + _ => warn!("failed to write spot event to db") + } } FillEventFilterMessage::Checkpoint(checkpoint) => { checkpoints_ref_thread @@ -461,7 +511,7 @@ async fn main() -> anyhow::Result<()> { .collect::() ); let use_geyser = true; - let all_queue_pks = [perp_queue_pks.clone(), serum_queue_pks.clone()].concat(); + let all_queue_pks = [perp_queue_pks.clone()].concat(); let relevant_pubkeys = all_queue_pks.iter().map(|m| m.1.to_string()).collect(); let filter_config = FilterConfig { program_ids: vec![],