From 8d6b6f5f38ac9b9b630712a76b25951ac87c4577 Mon Sep 17 00:00:00 2001 From: Riordan Panayides Date: Fri, 20 Jan 2023 16:50:39 +0000 Subject: [PATCH] Fix serum event change detection --- lib/src/fill_event_filter.rs | 56 +++++++++++++++++++++++++----------- 1 file changed, 39 insertions(+), 17 deletions(-) diff --git a/lib/src/fill_event_filter.rs b/lib/src/fill_event_filter.rs index 5371ad6..2af48b7 100644 --- a/lib/src/fill_event_filter.rs +++ b/lib/src/fill_event_filter.rs @@ -424,32 +424,54 @@ fn publish_changes_serum( } match old_event_view { - SpotEvent::Fill { .. } => { - // every already published event is recorded in checkpoint + SpotEvent::Fill { order_id, .. } => { + if order_id != fill.order_id { + debug!( + "found changed id event {} idx {} seq_num {} header seq num {} old seq num {}", + mkt_pk_string, idx, seq_num, header_seq_num, old_seq_num + ); + + metric_events_change.increment(); + + + let old_fill = FillEvent::new_from_spot(old_event_view, timestamp, seq_num); + // first revoke old event + fill_update_sender + .try_send(FillEventFilterMessage::Update(FillUpdate { + slot, + write_version, + event: old_fill, + status: FillUpdateStatus::Revoke, + market: mkt_pk_string.clone(), + queue: evq_pk_string.clone(), + })) + .unwrap(); // TODO: use anyhow to bubble up error + + // then publish new + fill_update_sender + .try_send(FillEventFilterMessage::Update(FillUpdate { + slot, + write_version, + event: fill.clone(), + status: FillUpdateStatus::New, + market: mkt_pk_string.clone(), + queue: evq_pk_string.clone(), + })) + .unwrap(); // TODO: use anyhow to bubble up error + } + + // record new event in checkpoint checkpoint.push(fill); } SpotEvent::Out { .. } => { debug!( - "found changed event {} idx {} seq_num {} header seq num {} old seq num {}", + "found changed type event {} idx {} seq_num {} header seq num {} old seq num {}", mkt_pk_string, idx, seq_num, header_seq_num, old_seq_num ); metric_events_change.increment(); - let old_fill = FillEvent::new_from_spot(old_event_view, timestamp, seq_num); - // first revoke old event - fill_update_sender - .try_send(FillEventFilterMessage::Update(FillUpdate { - slot, - write_version, - event: old_fill, - status: FillUpdateStatus::Revoke, - market: mkt_pk_string.clone(), - queue: evq_pk_string.clone(), - })) - .unwrap(); // TODO: use anyhow to bubble up error - - // then publish new if its a fill and record in checkpoint + // publish new fill and record in checkpoint fill_update_sender .try_send(FillEventFilterMessage::Update(FillUpdate { slot,