Add tick-height field to entry event payload

This commit is contained in:
Tyera Eulberg 2019-02-20 10:08:23 -07:00 committed by Tyera Eulberg
parent 2cf00021d9
commit 0a73bb7efd
2 changed files with 22 additions and 9 deletions

View File

@ -60,12 +60,18 @@ impl EntryWriter for EntrySocket {
}
pub trait EntryStreamHandler {
fn emit_entry_event(&self, slot: u64, leader_id: &str, entries: &Entry) -> Result<()>;
fn emit_entry_event(
&self,
slot: u64,
tick_height: u64,
leader_id: &str,
entries: &Entry,
) -> Result<()>;
fn emit_block_event(
&self,
slot: u64,
leader_id: &str,
tick_height: u64,
leader_id: &str,
last_id: Hash,
) -> Result<()>;
}
@ -81,12 +87,19 @@ impl<T> EntryStreamHandler for EntryStream<T>
where
T: EntryWriter,
{
fn emit_entry_event(&self, slot: u64, leader_id: &str, entry: &Entry) -> Result<()> {
fn emit_entry_event(
&self,
slot: u64,
tick_height: u64,
leader_id: &str,
entry: &Entry,
) -> Result<()> {
let json_entry = serde_json::to_string(&entry)?;
let payload = format!(
r#"{{"dt":"{}","t":"entry","s":{},"l":{:?},"entry":{}}}"#,
r#"{{"dt":"{}","t":"entry","s":{},"h":{},"l":{:?},"entry":{}}}"#,
Utc::now().to_rfc3339_opts(SecondsFormat::Nanos, true),
slot,
tick_height,
leader_id,
json_entry,
);
@ -97,8 +110,8 @@ where
fn emit_block_event(
&self,
slot: u64,
leader_id: &str,
tick_height: u64,
leader_id: &str,
last_id: Hash,
) -> Result<()> {
let payload = format!(
@ -203,14 +216,14 @@ mod test {
.tick_height_to_slot(tick_height);
if curr_slot != previous_slot {
entry_stream
.emit_block_event(previous_slot, &leader_id, tick_height - 1, last_id)
.emit_block_event(previous_slot, tick_height - 1, &leader_id, last_id)
.unwrap();
}
let entry = Entry::new(&mut last_id, 1, vec![]); // just ticks
last_id = entry.id;
previous_slot = curr_slot;
entry_stream
.emit_entry_event(curr_slot, &leader_id, &entry)
.emit_entry_event(curr_slot, tick_height, &leader_id, &entry)
.unwrap();
expected_entries.push(entry.clone());
entries.push(entry);

View File

@ -80,14 +80,14 @@ impl EntryStreamStage {
let block_tick_height = queued_block.unwrap().tick_height;
let block_id = queued_block.unwrap().id;
entry_stream
.emit_block_event(block_slot, &leader_id, block_tick_height, block_id)
.emit_block_event(block_slot, block_tick_height, &leader_id, block_id)
.unwrap_or_else(|e| {
debug!("Entry Stream error: {:?}, {:?}", e, entry_stream.output);
});
entry_stream.queued_block = None;
}
entry_stream
.emit_entry_event(slot, &leader_id, &entry)
.emit_entry_event(slot, *tick_height, &leader_id, &entry)
.unwrap_or_else(|e| {
debug!("Entry Stream error: {:?}, {:?}", e, entry_stream.output);
});