diff --git a/service-mango-fills/src/fill_event_filter.rs b/service-mango-fills/src/fill_event_filter.rs index 4136c22..187ff91 100644 --- a/service-mango-fills/src/fill_event_filter.rs +++ b/service-mango-fills/src/fill_event_filter.rs @@ -39,7 +39,7 @@ fn publish_changes_perp( header: &EventQueueHeader, events: &EventQueueEvents, prev_seq_num: u64, - prev_head: u64, + prev_head: usize, prev_events: &EventQueueEvents, fill_update_sender: &async_channel::Sender, metric_events_new: &mut MetricU64, @@ -166,34 +166,32 @@ fn publish_changes_perp( } } - let head_idx = header.head(); - let head = head_idx as u64; + let head = header.head(); - let head_seq_num = if events[head_idx].event_type == EventType::Fill as u8 { - let event: PerpFillEvent = bytemuck::cast(events[head_idx]); - event.seq_num - } else if events[head_idx].event_type == EventType::Out as u8 { - let event: PerpOutEvent = bytemuck::cast(events[head_idx]); - event.seq_num + let head_seq_num = if events[head - 1].event_type == EventType::Fill as u8 { + let event: PerpFillEvent = bytemuck::cast(events[head - 1]); + event.seq_num + 1 + } else if events[head - 1].event_type == EventType::Out as u8 { + let event: PerpOutEvent = bytemuck::cast(events[head - 1]); + event.seq_num + 1 } else { 0 }; - let prev_head_idx = prev_head as usize; - let prev_head_seq_num = if prev_events[prev_head_idx].event_type == EventType::Fill as u8 { - let event: PerpFillEvent = bytemuck::cast(prev_events[prev_head_idx]); - event.seq_num - } else if prev_events[prev_head_idx].event_type == EventType::Out as u8 { - let event: PerpOutEvent = bytemuck::cast(prev_events[prev_head_idx]); - event.seq_num + let prev_head_seq_num = if prev_events[prev_head - 1].event_type == EventType::Fill as u8 { + let event: PerpFillEvent = bytemuck::cast(prev_events[prev_head - 1]); + event.seq_num + 1 + } else if prev_events[prev_head - 1].event_type == EventType::Out as u8 { + let event: PerpOutEvent = bytemuck::cast(prev_events[prev_head - 1]); + event.seq_num + 1 } else { 0 }; - // publish a head update event if the head increased (events were consumed) - if head > prev_head { + // publish a head update event if the head changed (events were consumed) + if head != prev_head { metric_head_update.increment(); - + fill_update_sender .try_send(FillEventFilterMessage::HeadUpdate(HeadUpdate { head, @@ -209,25 +207,6 @@ fn publish_changes_perp( .unwrap(); // TODO: use anyhow to bubble up error } - // revoke head update event if it decreased (fork) - if head < prev_head { - metric_head_revoke.increment(); - - fill_update_sender - .try_send(FillEventFilterMessage::HeadUpdate(HeadUpdate { - head, - prev_head, - head_seq_num, - prev_head_seq_num, - status: FillUpdateStatus::Revoke, - market_key: mkt_pk_string.clone(), - market_name: mkt.1.name.clone(), - slot, - write_version, - })) - .unwrap(); // TODO: use anyhow to bubble up error - } - fill_update_sender .try_send(FillEventFilterMessage::Checkpoint(FillCheckpoint { slot, @@ -560,7 +539,7 @@ pub async fn init( seq_num_cache.get(&evq_pk_string), head_cache.get(&evq_pk_string), ) { - (Some(prev_seq_num), Some(old_head)) => match perp_events_cache + (Some(prev_seq_num), Some(prev_head)) => match perp_events_cache .get(&evq_pk_string) { Some(prev_events) => publish_changes_perp( @@ -570,7 +549,7 @@ pub async fn init( &event_queue.header, &event_queue.buf, *prev_seq_num, - *old_head, + *prev_head, prev_events, &fill_update_sender, &mut metric_events_new, @@ -589,7 +568,7 @@ pub async fn init( seq_num_cache .insert(evq_pk_string.clone(), event_queue.header.seq_num.clone()); head_cache - .insert(evq_pk_string.clone(), event_queue.header.head() as u64); + .insert(evq_pk_string.clone(), event_queue.header.head()); perp_events_cache .insert(evq_pk_string.clone(), event_queue.buf.clone()); } else { @@ -634,7 +613,7 @@ pub async fn init( } seq_num_cache.insert(evq_pk_string.clone(), seq_num.clone()); - head_cache.insert(evq_pk_string.clone(), header.head); + head_cache.insert(evq_pk_string.clone(), header.head as usize); serum_events_cache .insert(evq_pk_string.clone(), events.clone().to_vec()); } diff --git a/service-mango-fills/src/lib.rs b/service-mango-fills/src/lib.rs index 231334f..b05d7ad 100644 --- a/service-mango-fills/src/lib.rs +++ b/service-mango-fills/src/lib.rs @@ -246,8 +246,8 @@ impl Serialize for FillUpdate { #[derive(Clone, Debug)] pub struct HeadUpdate { - pub head: u64, - pub prev_head: u64, + pub head: usize, + pub prev_head: usize, pub head_seq_num: u64, pub prev_head_seq_num: u64, pub status: FillUpdateStatus,