Fix serum event change detection

This commit is contained in:
Riordan Panayides 2023-01-20 16:50:39 +00:00
parent f88ec6a53c
commit 8d6b6f5f38
1 changed files with 39 additions and 17 deletions

View File

@ -424,32 +424,54 @@ fn publish_changes_serum(
}
match old_event_view {
SpotEvent::Fill { .. } => {
// every already published event is recorded in checkpoint
SpotEvent::Fill { order_id, .. } => {
if order_id != fill.order_id {
debug!(
"found changed id event {} idx {} seq_num {} header seq num {} old seq num {}",
mkt_pk_string, idx, seq_num, header_seq_num, old_seq_num
);
metric_events_change.increment();
let old_fill = FillEvent::new_from_spot(old_event_view, timestamp, seq_num);
// first revoke old event
fill_update_sender
.try_send(FillEventFilterMessage::Update(FillUpdate {
slot,
write_version,
event: old_fill,
status: FillUpdateStatus::Revoke,
market: mkt_pk_string.clone(),
queue: evq_pk_string.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
// then publish new
fill_update_sender
.try_send(FillEventFilterMessage::Update(FillUpdate {
slot,
write_version,
event: fill.clone(),
status: FillUpdateStatus::New,
market: mkt_pk_string.clone(),
queue: evq_pk_string.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
}
// record new event in checkpoint
checkpoint.push(fill);
}
SpotEvent::Out { .. } => {
debug!(
"found changed event {} idx {} seq_num {} header seq num {} old seq num {}",
"found changed type event {} idx {} seq_num {} header seq num {} old seq num {}",
mkt_pk_string, idx, seq_num, header_seq_num, old_seq_num
);
metric_events_change.increment();
let old_fill = FillEvent::new_from_spot(old_event_view, timestamp, seq_num);
// first revoke old event
fill_update_sender
.try_send(FillEventFilterMessage::Update(FillUpdate {
slot,
write_version,
event: old_fill,
status: FillUpdateStatus::Revoke,
market: mkt_pk_string.clone(),
queue: evq_pk_string.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
// then publish new if its a fill and record in checkpoint
// publish new fill and record in checkpoint
fill_update_sender
.try_send(FillEventFilterMessage::Update(FillUpdate {
slot,