diff --git a/.gitignore b/.gitignore index 585079d..0ab4a88 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ /target **/*.pem **/config.toml +.DS_Store +.idea/ +*.pem diff --git a/lib/src/fill_event_filter.rs b/lib/src/fill_event_filter.rs index 7c0174d..091a49f 100644 --- a/lib/src/fill_event_filter.rs +++ b/lib/src/fill_event_filter.rs @@ -1,6 +1,6 @@ use crate::{ chain_data::{AccountData, ChainData, SlotData}, - AccountWrite, SlotUpdate, + AccountWrite, SlotUpdate, metrics }; use log::*; use serde::{ser::SerializeStruct, Deserialize, Serialize, Serializer}; @@ -18,6 +18,7 @@ use std::{ use arrayref::array_ref; use mango::queue::{AnyEvent, EventQueueHeader, EventType, FillEvent}; +use crate::metrics::MetricU64; #[derive(Clone, Debug, Deserialize)] pub struct MarketConfig { @@ -109,6 +110,9 @@ fn publish_changes( old_seq_num: usize, old_events: &EventQueueEvents, fill_update_sender: &async_channel::Sender, + metric_events_new: &mut MetricU64, + metric_events_change: &mut MetricU64, + metric_events_drop: &mut MetricU64 ) { // seq_num = N means that events (N-QUEUE_LEN) until N-1 are available let start_seq_num = max(old_seq_num, header.seq_num) - QUEUE_LEN; @@ -123,11 +127,14 @@ fn publish_changes( // 3) all other events are matching the old event queue // the order of these checks is important so they are exhaustive if seq_num >= old_seq_num { - info!( + debug!( "found new event {} idx {} type {}", mkt.name, idx, events[idx].event_type as u32 ); + metric_events_new.increment(); + + // new fills are published and recorded in checkpoint if events[idx].event_type == EventType::Fill as u8 { let fill: FillEvent = bytemuck::cast(events[idx]); @@ -146,11 +153,13 @@ fn publish_changes( } else if old_events[idx].event_type != events[idx].event_type || old_events[idx].padding != events[idx].padding { - info!( + debug!( "found changed event {} idx {} seq_num {} header seq num {} old seq num {}", mkt.name, idx, seq_num, header.seq_num, old_seq_num ); + metric_events_change.increment(); + // first revoke old event if a fill if old_events[idx].event_type == EventType::Fill as u8 { let fill: FillEvent = bytemuck::cast(old_events[idx]); @@ -193,11 +202,14 @@ fn publish_changes( // in case queue size shrunk due to a fork we need revoke all previous fills for seq_num in header.seq_num..old_seq_num { let idx = seq_num % QUEUE_LEN; - info!( + + debug!( "found dropped event {} idx {} seq_num {} header seq num {} old seq num {}", mkt.name, idx, seq_num, header.seq_num, old_seq_num ); + metric_events_drop.increment(); + if old_events[idx].event_type == EventType::Fill as u8 { let fill: FillEvent = bytemuck::cast(old_events[idx]); fill_update_sender @@ -226,11 +238,18 @@ fn publish_changes( pub async fn init( markets: Vec, + metrics_sender: metrics::Metrics, ) -> anyhow::Result<( async_channel::Sender, async_channel::Sender, async_channel::Receiver, )> { + let metrics_sender = metrics_sender.clone(); + + let mut metric_events_new = metrics_sender.register_u64("fills_feed_events_new".into()); + let mut metric_events_change = metrics_sender.register_u64("fills_feed_events_change".into()); + let mut metrics_events_drop = metrics_sender.register_u64("fills_feed_events_drop".into()); + // The actual message may want to also contain a retry count, if it self-reinserts on failure? let (account_write_queue_sender, account_write_queue_receiver) = async_channel::unbounded::(); @@ -326,6 +345,9 @@ pub async fn init( *old_seq_num, old_events, &fill_update_sender, + &mut metric_events_new, + &mut metric_events_change, + &mut metrics_events_drop ), _ => info!("events_cache could not find {}", mkt.name), }, diff --git a/service-mango-fills/src/main.rs b/service-mango-fills/src/main.rs index e8315f4..5aac373 100644 --- a/service-mango-fills/src/main.rs +++ b/service-mango-fills/src/main.rs @@ -95,7 +95,10 @@ async fn main() -> anyhow::Result<()> { let metrics_tx = metrics::start(); let (account_write_queue_sender, slot_queue_sender, fill_receiver) = - fill_event_filter::init(config.markets.clone()).await?; + fill_event_filter::init( + config.markets.clone(), + metrics_tx.clone() + ).await?; let checkpoints = CheckpointMap::new(Mutex::new(HashMap::new())); let peers = PeerMap::new(Mutex::new(HashMap::new())); @@ -164,7 +167,7 @@ async fn main() -> anyhow::Result<()> { &config.source, account_write_queue_sender, slot_queue_sender, - metrics_tx, + metrics_tx.clone(), ) .await; } else {