diff --git a/lib/src/fill_event_filter.rs b/lib/src/fill_event_filter.rs index 8aaec2e..9508fdc 100644 --- a/lib/src/fill_event_filter.rs +++ b/lib/src/fill_event_filter.rs @@ -31,7 +31,6 @@ pub enum FillUpdateStatus { } #[derive(Clone, Debug)] - pub struct FillUpdate { pub event: FillEvent, pub status: FillUpdateStatus, @@ -187,7 +186,7 @@ fn publish_changes_perp( .unwrap_or(0); let mut checkpoint = Vec::new(); let mkt_pk_string = mkt.0.to_string(); - let evq_pk_string = mkt.0.to_string(); + let evq_pk_string = mkt.1.to_string(); for seq_num in start_seq_num..header.seq_num { let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize; @@ -271,7 +270,6 @@ fn publish_changes_perp( // in case queue size shrunk due to a fork we need revoke all previous fills for seq_num in header.seq_num..old_seq_num { let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize; - debug!( "found dropped event {} idx {} seq_num {} header seq num {} old seq num {}", mkt_pk_string, idx, seq_num, header.seq_num, old_seq_num @@ -357,77 +355,77 @@ fn publish_changes_serum( continue; } - match old_event_view { - EventView::Fill { .. } => { - // every already published event is recorded in checkpoint - checkpoint.push(events[idx]); - } - EventView::Out { .. } => { - debug!( - "found changed event {} idx {} seq_num {} header seq num {} old seq num {}", - mkt_pk_string, idx, seq_num, header_seq_num, old_seq_num - ); + // match old_event_view { + // EventView::Fill { .. } => { + // // every already published event is recorded in checkpoint + // checkpoint.push(events[idx]); + // } + // EventView::Out { .. } => { + // debug!( + // "found changed event {} idx {} seq_num {} header seq num {} old seq num {}", + // mkt_pk_string, idx, seq_num, header_seq_num, old_seq_num + // ); - metric_events_change.increment(); + // metric_events_change.increment(); - // first revoke old event - fill_update_sender - .try_send(FillEventFilterMessage::SerumUpdate(SerumFillUpdate { - slot, - write_version, - event: old_events[idx], - status: FillUpdateStatus::Revoke, - market: mkt_pk_string.clone(), - queue: evq_pk_string.clone(), - })) - .unwrap(); // TODO: use anyhow to bubble up error + // // first revoke old event + // fill_update_sender + // .try_send(FillEventFilterMessage::SerumUpdate(SerumFillUpdate { + // slot, + // write_version, + // event: old_events[idx], + // status: FillUpdateStatus::Revoke, + // market: mkt_pk_string.clone(), + // queue: evq_pk_string.clone(), + // })) + // .unwrap(); // TODO: use anyhow to bubble up error - // then publish new if its a fill and record in checkpoint - fill_update_sender - .try_send(FillEventFilterMessage::SerumUpdate(SerumFillUpdate { - slot, - write_version, - event: events[idx], - status: FillUpdateStatus::New, - market: mkt_pk_string.clone(), - queue: evq_pk_string.clone(), - })) - .unwrap(); // TODO: use anyhow to bubble up error - checkpoint.push(events[idx]); - } - } + // // then publish new if its a fill and record in checkpoint + // fill_update_sender + // .try_send(FillEventFilterMessage::SerumUpdate(SerumFillUpdate { + // slot, + // write_version, + // event: events[idx], + // status: FillUpdateStatus::New, + // market: mkt_pk_string.clone(), + // queue: evq_pk_string.clone(), + // })) + // .unwrap(); // TODO: use anyhow to bubble up error + // checkpoint.push(events[idx]); + // } + // } } _ => continue, } } - // in case queue size shrunk due to a fork we need revoke all previous fills - for seq_num in header_seq_num..old_seq_num { - let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize; - let old_event_view = old_events[idx].as_view().unwrap(); - debug!( - "found dropped event {} idx {} seq_num {} header seq num {} old seq num {}", - mkt_pk_string, idx, seq_num, header_seq_num, old_seq_num - ); + // // in case queue size shrunk due to a fork we need revoke all previous fills + // for seq_num in header_seq_num..old_seq_num { + // let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize; + // let old_event_view = old_events[idx].as_view().unwrap(); + // debug!( + // "found dropped event {} idx {} seq_num {} header seq num {} old seq num {}", + // mkt_pk_string, idx, seq_num, header_seq_num, old_seq_num + // ); - metric_events_drop.increment(); + // metric_events_drop.increment(); - match old_event_view { - EventView::Fill { .. } => { - fill_update_sender - .try_send(FillEventFilterMessage::SerumUpdate(SerumFillUpdate { - slot, - event: old_events[idx], - write_version, - status: FillUpdateStatus::Revoke, - market: mkt_pk_string.clone(), - queue: evq_pk_string.clone(), - })) - .unwrap(); // TODO: use anyhow to bubble up error - } - EventView::Out { .. } => { continue } - } - } + // match old_event_view { + // EventView::Fill { .. } => { + // fill_update_sender + // .try_send(FillEventFilterMessage::SerumUpdate(SerumFillUpdate { + // slot, + // event: old_events[idx], + // write_version, + // status: FillUpdateStatus::Revoke, + // market: mkt_pk_string.clone(), + // queue: evq_pk_string.clone(), + // })) + // .unwrap(); // TODO: use anyhow to bubble up error + // } + // EventView::Out { .. } => { continue } + // } + // } fill_update_sender .try_send(FillEventFilterMessage::SerumCheckpoint( @@ -455,6 +453,8 @@ pub async fn init( let mut metric_events_new = metrics_sender.register_u64("fills_feed_events_new".into(), MetricType::Counter); + let mut metric_events_new_serum = + metrics_sender.register_u64("fills_feed_events_new_serum".into(), MetricType::Counter); let mut metric_events_change = metrics_sender.register_u64("fills_feed_events_change".into(), MetricType::Counter); let mut metrics_events_drop = @@ -597,7 +597,7 @@ pub async fn init( *old_seq_num, old_events, &fill_update_sender, - &mut metric_events_new, + &mut metric_events_new_serum, &mut metric_events_change, &mut metrics_events_drop, ), diff --git a/lib/src/grpc_plugin_source.rs b/lib/src/grpc_plugin_source.rs index 7d32947..2642dad 100644 --- a/lib/src/grpc_plugin_source.rs +++ b/lib/src/grpc_plugin_source.rs @@ -395,11 +395,11 @@ pub async fn process_events( let mut metric_account_writes = metrics_sender.register_u64("grpc_account_writes".into(), MetricType::Counter); let mut metric_account_queue = - metrics_sender.register_u64("account_write_queue".into(), MetricType::Gauge); + metrics_sender.register_u64("grpc_account_write_queue".into(), MetricType::Gauge); let mut metric_dedup_queue = - metrics_sender.register_u64("dedup_queue".into(), MetricType::Gauge); + metrics_sender.register_u64("grpc_dedup_queue".into(), MetricType::Gauge); let mut metric_slot_queue = - metrics_sender.register_u64("slot_update_queue".into(), MetricType::Gauge); + metrics_sender.register_u64("grpc_slot_update_queue".into(), MetricType::Gauge); let mut metric_slot_updates = metrics_sender.register_u64("grpc_slot_updates".into(), MetricType::Counter); let mut metric_snapshots =