From e58f08b60f4e4262be8897e5c9c49e324197ab8d Mon Sep 17 00:00:00 2001 From: Sunny Gleason Date: Tue, 12 Feb 2019 16:59:02 -0700 Subject: [PATCH] Refactor EntryStream Co-authored-by: Sunny Gleason Co-authored-by: Tyera Eulberg --- src/entry_stream.rs | 263 ++++++++++++++++++++++++++++++-------- src/entry_stream_stage.rs | 16 ++- 2 files changed, 218 insertions(+), 61 deletions(-) diff --git a/src/entry_stream.rs b/src/entry_stream.rs index ac77a32c7a..1004275b87 100644 --- a/src/entry_stream.rs +++ b/src/entry_stream.rs @@ -6,87 +6,242 @@ use crate::entry::Entry; use crate::leader_scheduler::LeaderScheduler; use crate::result::Result; use chrono::{SecondsFormat, Utc}; +use solana_sdk::hash::Hash; +use std::cell::RefCell; use std::io::prelude::*; use std::net::Shutdown; use std::os::unix::net::UnixStream; use std::path::Path; use std::sync::{Arc, RwLock}; -pub trait EntryStreamHandler { - fn stream_entries(&mut self, entries: &[Entry]) -> Result<()>; +pub trait Output: std::fmt::Debug { + fn write(&self, payload: String) -> Result<()>; } -pub struct EntryStream { - pub socket: String, - leader_scheduler: Arc>, +#[derive(Debug, Default)] +pub struct VecOutput { + values: RefCell>, } -impl EntryStream { - pub fn new(socket: String, leader_scheduler: Arc>) -> Self { - EntryStream { - socket, - leader_scheduler, - } +impl Output for VecOutput { + fn write(&self, payload: String) -> Result<()> { + self.values.borrow_mut().push(payload); + Ok(()) } } -impl EntryStreamHandler for EntryStream { - fn stream_entries(&mut self, entries: &[Entry]) -> Result<()> { - let mut socket = UnixStream::connect(Path::new(&self.socket))?; - for entry in entries { - let json = serde_json::to_string(&entry)?; - let (slot, slot_leader) = { - let leader_scheduler = self.leader_scheduler.read().unwrap(); - let slot = leader_scheduler.tick_height_to_slot(entry.tick_height); - (slot, leader_scheduler.get_leader_for_slot(slot)) - }; - let payload = format!( - r#"{{"dt":"{}","t":"entry","s":{},"leader_id":"{:?}","entry":{}}}"#, - Utc::now().to_rfc3339_opts(SecondsFormat::Nanos, true), - slot, - slot_leader, - json - ); - socket.write_all(payload.as_bytes())?; +impl VecOutput { + pub fn new() -> Self { + VecOutput { + values: RefCell::new(Vec::new()), } + } + + pub fn entries(&self) -> Vec { + self.values.borrow().clone() + } +} + +#[derive(Debug)] +pub struct SocketOutput { + socket: String, +} + +impl Output for SocketOutput { + fn write(&self, payload: String) -> Result<()> { + let mut socket = UnixStream::connect(Path::new(&self.socket))?; + socket.write_all(payload.as_bytes())?; socket.shutdown(Shutdown::Write)?; Ok(()) } } -pub struct MockEntryStream { - pub socket: Vec, +pub trait EntryStreamHandler { + fn emit_entry_events(&self, entries: &[Entry]) -> Result<()>; + fn emit_block_event(&self, tick_height: u64, last_id: Hash) -> Result<()>; +} + +#[derive(Debug)] +pub struct EntryStream { + pub output: T, leader_scheduler: Arc>, } -impl MockEntryStream { - #[allow(clippy::needless_pass_by_value)] - pub fn new(_socket: String, leader_scheduler: Arc>) -> Self { - MockEntryStream { - socket: Vec::new(), +impl EntryStreamHandler for EntryStream +where + T: Output, +{ + fn emit_entry_events(&self, entries: &[Entry]) -> Result<()> { + let leader_scheduler = self.leader_scheduler.read().unwrap(); + for entry in entries { + let slot = leader_scheduler.tick_height_to_slot(entry.tick_height); + let leader_id = leader_scheduler + .get_leader_for_slot(slot) + .map(|leader| leader.to_string()) + .unwrap_or_else(|| "None".to_string()); + + let json_entry = serde_json::to_string(&entry)?; + let payload = format!( + r#"{{"dt":"{}","t":"entry","s":{},"l":{:?},"entry":{}}}"#, + Utc::now().to_rfc3339_opts(SecondsFormat::Nanos, true), + slot, + leader_id, + json_entry, + ); + self.output.write(payload)?; + } + Ok(()) + } + + fn emit_block_event(&self, tick_height: u64, last_id: Hash) -> Result<()> { + let leader_scheduler = self.leader_scheduler.read().unwrap(); + let slot = leader_scheduler.tick_height_to_slot(tick_height); + let leader_id = leader_scheduler + .get_leader_for_slot(slot) + .map(|leader| leader.to_string()) + .unwrap_or_else(|| "None".to_string()); + let payload = format!( + r#"{{"dt":"{}","t":"block","s":{},"h":{},"l":{:?},"id":"{:?}"}}"#, + Utc::now().to_rfc3339_opts(SecondsFormat::Nanos, true), + slot, + tick_height, + leader_id, + last_id, + ); + self.output.write(payload)?; + Ok(()) + } +} + +pub type SocketEntryStream = EntryStream; + +impl SocketEntryStream { + pub fn new(socket: String, leader_scheduler: Arc>) -> Self { + EntryStream { + output: SocketOutput { socket }, leader_scheduler, } } } -impl EntryStreamHandler for MockEntryStream { - fn stream_entries(&mut self, entries: &[Entry]) -> Result<()> { - for entry in entries { - let json = serde_json::to_string(&entry)?; - let (slot, slot_leader) = { - let leader_scheduler = self.leader_scheduler.read().unwrap(); - let slot = leader_scheduler.tick_height_to_slot(entry.tick_height); - (slot, leader_scheduler.get_leader_for_slot(slot)) - }; - let payload = format!( - r#"{{"dt":"{}","t":"entry","s":{},"leader_id":"{:?}","entry":{}}}"#, - Utc::now().to_rfc3339_opts(SecondsFormat::Nanos, true), - slot, - slot_leader, - json - ); - self.socket.push(payload); +pub type MockEntryStream = EntryStream; + +impl MockEntryStream { + pub fn new(_: String, leader_scheduler: Arc>) -> Self { + EntryStream { + output: VecOutput::new(), + leader_scheduler, } - Ok(()) + } + + pub fn entries(&self) -> Vec { + self.output.entries() + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::bank::Bank; + use crate::entry::Entry; + use crate::genesis_block::GenesisBlock; + use crate::leader_scheduler::LeaderSchedulerConfig; + use chrono::{DateTime, FixedOffset}; + use serde_json::Value; + use solana_sdk::hash::Hash; + use std::collections::HashSet; + + #[test] + fn test_entry_stream() -> () { + // Set up bank and leader_scheduler + let leader_scheduler_config = LeaderSchedulerConfig::new(5, 2, 10); + let (genesis_block, _mint_keypair) = GenesisBlock::new(1_000_000); + let bank = Bank::new_with_leader_scheduler_config(&genesis_block, &leader_scheduler_config); + // Set up entry stream + let entry_stream = + MockEntryStream::new("test_stream".to_string(), bank.leader_scheduler.clone()); + let ticks_per_slot = bank.leader_scheduler.read().unwrap().ticks_per_slot; + + let mut last_id = Hash::default(); + let mut entries = Vec::new(); + let mut expected_entries = Vec::new(); + + let tick_height_initial = 0; + let tick_height_final = tick_height_initial + ticks_per_slot + 2; + let mut previous_slot = bank + .leader_scheduler + .read() + .unwrap() + .tick_height_to_slot(tick_height_initial); + + for tick_height in tick_height_initial..=tick_height_final { + bank.leader_scheduler + .write() + .unwrap() + .update_tick_height(tick_height, &bank); + let curr_slot = bank + .leader_scheduler + .read() + .unwrap() + .tick_height_to_slot(tick_height); + if curr_slot != previous_slot { + entry_stream + .emit_block_event(tick_height - 1, last_id) + .unwrap(); + } + let entry = Entry::new(&mut last_id, tick_height, 1, vec![]); //just ticks + last_id = entry.id; + previous_slot = curr_slot; + expected_entries.push(entry.clone()); + entries.push(entry); + } + + entry_stream.emit_entry_events(&entries).unwrap(); + + assert_eq!( + entry_stream.entries().len() as u64, + // one entry per tick (0..=N+2) is +3, plus one block + ticks_per_slot + 3 + 1 + ); + + let mut j = 0; + let mut matched_entries = 0; + let mut matched_slots = HashSet::new(); + let mut matched_blocks = HashSet::new(); + + for item in entry_stream.entries() { + let json: Value = serde_json::from_str(&item).unwrap(); + let dt_str = json["dt"].as_str().unwrap(); + + // Ensure `ts` field parses as valid DateTime + let _dt: DateTime = DateTime::parse_from_rfc3339(dt_str).unwrap(); + + let item_type = json["t"].as_str().unwrap(); + match item_type { + "block" => { + let id = json["id"].to_string(); + matched_blocks.insert(id); + } + + "entry" => { + let slot = json["s"].as_u64().unwrap(); + matched_slots.insert(slot); + let entry_obj = json["entry"].clone(); + let entry: Entry = serde_json::from_value(entry_obj).unwrap(); + + assert_eq!(entry, expected_entries[j]); + matched_entries += 1; + j += 1; + } + + _ => { + assert!(false, "unknown item type {}", item); + } + } + } + + assert_eq!(matched_entries, expected_entries.len()); + assert_eq!(matched_slots.len(), 2); + assert_eq!(matched_blocks.len(), 1); } } diff --git a/src/entry_stream_stage.rs b/src/entry_stream_stage.rs index 3a31a1a0a3..13b0261c54 100644 --- a/src/entry_stream_stage.rs +++ b/src/entry_stream_stage.rs @@ -3,11 +3,11 @@ //! real-time access to entries. use crate::entry::{EntryReceiver, EntrySender}; -#[cfg(not(test))] -use crate::entry_stream::EntryStream; use crate::entry_stream::EntryStreamHandler; #[cfg(test)] use crate::entry_stream::MockEntryStream as EntryStream; +#[cfg(not(test))] +use crate::entry_stream::SocketEntryStream as EntryStream; use crate::leader_scheduler::LeaderScheduler; use crate::result::{Error, Result}; use crate::service::Service; @@ -59,9 +59,11 @@ impl EntryStreamStage { ) -> Result<()> { let timeout = Duration::new(1, 0); let entries = ledger_entry_receiver.recv_timeout(timeout)?; - entry_stream.stream_entries(&entries).unwrap_or_else(|e| { - error!("Entry Stream error: {:?}, {:?}", e, entry_stream.socket); - }); + entry_stream + .emit_entry_events(&entries) + .unwrap_or_else(|e| { + error!("Entry Stream error: {:?}, {:?}", e, entry_stream.output); + }); entry_stream_sender.send(entries)?; Ok(()) } @@ -116,9 +118,9 @@ mod test { &mut entry_stream, ) .unwrap(); - assert_eq!(entry_stream.socket.len(), 5); + assert_eq!(entry_stream.entries().len(), 5); - for (i, item) in entry_stream.socket.iter().enumerate() { + for (i, item) in entry_stream.entries().iter().enumerate() { let json: Value = serde_json::from_str(&item).unwrap(); let dt_str = json["dt"].as_str().unwrap();