Add datetime to EntryStream message

This commit is contained in:
Tyera Eulberg 2019-01-30 11:12:31 -07:00 committed by Grimes
parent 32f19c5c19
commit 03e6a56b3c
2 changed files with 22 additions and 6 deletions

View File

@ -4,6 +4,7 @@
use crate::entry::Entry;
use crate::result::Result;
use chrono::Utc;
use std::io::prelude::*;
use std::net::Shutdown;
use std::os::unix::net::UnixStream;
@ -27,8 +28,9 @@ impl EntryStreamHandler for EntryStream {
fn stream_entries(&mut self, entries: &[Entry]) -> Result<()> {
let mut socket = UnixStream::connect(Path::new(&self.socket))?;
for entry in entries {
let result = serde_json::to_string(&entry)?;
socket.write_all(result.as_bytes())?;
let json = serde_json::to_string(&entry)?;
let payload = format!(r#"{{"dt":"{}","entry":{}}}"#, Utc::now().to_rfc3339(), json);
socket.write_all(payload.as_bytes())?;
}
socket.shutdown(Shutdown::Write)?;
Ok(())
@ -49,8 +51,9 @@ impl MockEntryStream {
impl EntryStreamHandler for MockEntryStream {
fn stream_entries(&mut self, entries: &[Entry]) -> Result<()> {
for entry in entries {
let result = serde_json::to_string(&entry)?;
self.socket.push(result);
let json = serde_json::to_string(&entry)?;
let payload = format!(r#"{{"dt":"{}","entry":{}}}"#, Utc::now().to_rfc3339(), json);
self.socket.push(payload);
}
Ok(())
}

View File

@ -294,6 +294,8 @@ mod test {
use crate::service::Service;
use crate::tvu::TvuReturnType;
use crate::voting_keypair::VotingKeypair;
use chrono::{DateTime, FixedOffset};
use serde_json::Value;
use solana_sdk::hash::Hash;
use solana_sdk::signature::{Keypair, KeypairUtil};
use std::fs::remove_dir_all;
@ -740,7 +742,7 @@ mod test {
for _ in 0..5 {
let entry = Entry::new(&mut last_id, 0, 1, vec![]); //just ticks
last_id = entry.id;
expected_entries.push(serde_json::to_string(&entry).unwrap());
expected_entries.push(entry.clone());
entries.push(entry);
}
entry_sender
@ -763,6 +765,17 @@ mod test {
.unwrap();
assert_eq!(entry_stream.socket.len(), 5);
assert_eq!(entry_stream.socket, expected_entries);
for (i, item) in entry_stream.socket.iter().enumerate() {
let json: Value = serde_json::from_str(&item).unwrap();
let dt_str = json["dt"].as_str().unwrap();
// Ensure `ts` field parses as valid DateTime
let _dt: DateTime<FixedOffset> = DateTime::parse_from_rfc3339(dt_str).unwrap();
let entry_obj = json["entry"].clone();
let entry: Entry = serde_json::from_value(entry_obj).unwrap();
assert_eq!(entry, expected_entries[i]);
}
}
}