From 0a73bb7efde92e673c2ee49c79dedaf11cc6b659 Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Wed, 20 Feb 2019 10:08:23 -0700 Subject: [PATCH] Add tick-height field to entry event payload --- src/entry_stream.rs | 27 ++++++++++++++++++++------- src/entry_stream_stage.rs | 4 ++-- 2 files changed, 22 insertions(+), 9 deletions(-) diff --git a/src/entry_stream.rs b/src/entry_stream.rs index 4e415ce3e..ca7b9f65b 100644 --- a/src/entry_stream.rs +++ b/src/entry_stream.rs @@ -60,12 +60,18 @@ impl EntryWriter for EntrySocket { } pub trait EntryStreamHandler { - fn emit_entry_event(&self, slot: u64, leader_id: &str, entries: &Entry) -> Result<()>; + fn emit_entry_event( + &self, + slot: u64, + tick_height: u64, + leader_id: &str, + entries: &Entry, + ) -> Result<()>; fn emit_block_event( &self, slot: u64, - leader_id: &str, tick_height: u64, + leader_id: &str, last_id: Hash, ) -> Result<()>; } @@ -81,12 +87,19 @@ impl EntryStreamHandler for EntryStream where T: EntryWriter, { - fn emit_entry_event(&self, slot: u64, leader_id: &str, entry: &Entry) -> Result<()> { + fn emit_entry_event( + &self, + slot: u64, + tick_height: u64, + leader_id: &str, + entry: &Entry, + ) -> Result<()> { let json_entry = serde_json::to_string(&entry)?; let payload = format!( - r#"{{"dt":"{}","t":"entry","s":{},"l":{:?},"entry":{}}}"#, + r#"{{"dt":"{}","t":"entry","s":{},"h":{},"l":{:?},"entry":{}}}"#, Utc::now().to_rfc3339_opts(SecondsFormat::Nanos, true), slot, + tick_height, leader_id, json_entry, ); @@ -97,8 +110,8 @@ where fn emit_block_event( &self, slot: u64, - leader_id: &str, tick_height: u64, + leader_id: &str, last_id: Hash, ) -> Result<()> { let payload = format!( @@ -203,14 +216,14 @@ mod test { .tick_height_to_slot(tick_height); if curr_slot != previous_slot { entry_stream - .emit_block_event(previous_slot, &leader_id, tick_height - 1, last_id) + .emit_block_event(previous_slot, tick_height - 1, &leader_id, last_id) .unwrap(); } let entry = Entry::new(&mut last_id, 1, vec![]); // just ticks last_id = entry.id; previous_slot = curr_slot; entry_stream - .emit_entry_event(curr_slot, &leader_id, &entry) + .emit_entry_event(curr_slot, tick_height, &leader_id, &entry) .unwrap(); expected_entries.push(entry.clone()); entries.push(entry); diff --git a/src/entry_stream_stage.rs b/src/entry_stream_stage.rs index b896c0958..866c8f135 100644 --- a/src/entry_stream_stage.rs +++ b/src/entry_stream_stage.rs @@ -80,14 +80,14 @@ impl EntryStreamStage { let block_tick_height = queued_block.unwrap().tick_height; let block_id = queued_block.unwrap().id; entry_stream - .emit_block_event(block_slot, &leader_id, block_tick_height, block_id) + .emit_block_event(block_slot, block_tick_height, &leader_id, block_id) .unwrap_or_else(|e| { debug!("Entry Stream error: {:?}, {:?}", e, entry_stream.output); }); entry_stream.queued_block = None; } entry_stream - .emit_entry_event(slot, &leader_id, &entry) + .emit_entry_event(slot, *tick_height, &leader_id, &entry) .unwrap_or_else(|e| { debug!("Entry Stream error: {:?}, {:?}", e, entry_stream.output); });