service-fills: add slote and write version to fill update

This commit is contained in:
Maximilian Schneider 2022-04-08 05:14:39 +02:00
parent b41c094aba
commit 8dde08d377
1 changed files with 27 additions and 4 deletions

View File

@ -38,6 +38,8 @@ pub struct FillUpdate {
pub status: FillUpdateStatus,
pub market: String,
pub queue: String,
pub slot: u64,
pub write_version: u64,
}
impl Serialize for FillUpdate {
@ -47,10 +49,12 @@ impl Serialize for FillUpdate {
{
let event = base64::encode_config(bytemuck::bytes_of(&self.event), base64::STANDARD);
let mut state = serializer.serialize_struct("FillUpdate", 4)?;
state.serialize_field("event", &event)?;
state.serialize_field("market", &self.market)?;
state.serialize_field("queue", &self.queue)?;
state.serialize_field("status", &self.status)?;
state.serialize_field("event", &event)?;
state.serialize_field("slot", &self.slot)?;
state.serialize_field("write_version", &self.write_version)?;
state.end()
}
@ -61,6 +65,8 @@ pub struct FillCheckpoint {
pub market: String,
pub queue: String,
pub events: Vec<FillEvent>,
pub slot: u64,
pub write_version: u64,
}
impl Serialize for FillCheckpoint {
@ -74,9 +80,12 @@ impl Serialize for FillCheckpoint {
.map(|e| base64::encode_config(bytemuck::bytes_of(e), base64::STANDARD))
.collect();
let mut state = serializer.serialize_struct("FillUpdate", 3)?;
state.serialize_field("events", &events)?;
state.serialize_field("market", &self.market)?;
state.serialize_field("queue", &self.queue)?;
state.serialize_field("events", &events)?;
state.serialize_field("slot", &self.slot)?;
state.serialize_field("write_version", &self.write_version)?;
state.end()
}
}
@ -92,6 +101,8 @@ const QUEUE_LEN: usize = 256;
type EventQueueEvents = [AnyEvent; QUEUE_LEN];
fn publish_changes(
slot: u64,
write_version: u64,
mkt: &MarketConfig,
header: &EventQueueHeader,
events: &EventQueueEvents,
@ -122,6 +133,8 @@ fn publish_changes(
let fill: FillEvent = bytemuck::cast(events[idx]);
fill_update_sender
.try_send(FillEventFilterMessage::Update(FillUpdate {
slot,
write_version,
event: fill,
status: FillUpdateStatus::New,
market: mkt.name.clone(),
@ -143,6 +156,8 @@ fn publish_changes(
let fill: FillEvent = bytemuck::cast(old_events[idx]);
fill_update_sender
.try_send(FillEventFilterMessage::Update(FillUpdate {
slot,
write_version,
event: fill,
status: FillUpdateStatus::Revoke,
market: mkt.name.clone(),
@ -156,6 +171,8 @@ fn publish_changes(
let fill: FillEvent = bytemuck::cast(events[idx]);
fill_update_sender
.try_send(FillEventFilterMessage::Update(FillUpdate {
slot,
write_version,
event: fill,
status: FillUpdateStatus::New,
market: mkt.name.clone(),
@ -185,7 +202,9 @@ fn publish_changes(
let fill: FillEvent = bytemuck::cast(old_events[idx]);
fill_update_sender
.try_send(FillEventFilterMessage::Update(FillUpdate {
slot,
event: fill,
write_version,
status: FillUpdateStatus::Revoke,
market: mkt.name.clone(),
queue: mkt.event_queue.clone(),
@ -196,6 +215,8 @@ fn publish_changes(
fill_update_sender
.try_send(FillEventFilterMessage::Checkpoint(FillCheckpoint {
slot,
write_version,
events: checkpoint,
market: mkt.name.clone(),
queue: mkt.event_queue.clone(),
@ -270,7 +291,7 @@ pub async fn init(
}
for mkt in markets.iter() {
let last_ev_q_version = last_ev_q_versions.get(&mkt.event_queue);
let last_ev_q_version = last_ev_q_versions.get(&mkt.event_queue).unwrap_or(&(0, 0));
let mkt_pk = mkt.event_queue.parse::<Pubkey>().unwrap();
match chain_cache.account(&mkt_pk) {
@ -278,7 +299,7 @@ pub async fn init(
// only process if the account state changed
let ev_q_version = (account_info.slot, account_info.write_version);
trace!("evq {} write_version {:?}", mkt.name, ev_q_version);
if ev_q_version == *last_ev_q_version.unwrap_or(&(0, 0)) {
if ev_q_version == *last_ev_q_version {
continue;
}
last_ev_q_versions.insert(mkt.event_queue.clone(), ev_q_version);
@ -297,6 +318,8 @@ pub async fn init(
match seq_num_cache.get(&mkt.event_queue) {
Some(old_seq_num) => match events_cache.get(&mkt.event_queue) {
Some(old_events) => publish_changes(
account_info.slot,
account_info.write_version,
mkt,
header,
events,