diff --git a/src/entry_stream.rs b/src/entry_stream.rs
index 1004275b87..887d7f3394 100644
--- a/src/entry_stream.rs
+++ b/src/entry_stream.rs
@@ -57,39 +57,38 @@ impl Output for SocketOutput {
 }

 pub trait EntryStreamHandler {
-    fn emit_entry_events(&self, entries: &[Entry]) -> Result<()>;
+    fn emit_entry_event(&self, entries: &Entry) -> Result<()>;
     fn emit_block_event(&self, tick_height: u64, last_id: Hash) -> Result<()>;
 }

 #[derive(Debug)]
 pub struct EntryStream<T: Output> {
     pub output: T,
-    leader_scheduler: Arc<RwLock<LeaderScheduler>>,
+    pub leader_scheduler: Arc<RwLock<LeaderScheduler>>,
+    pub queued_block: Option<EntryStreamBlock>,
 }

 impl<T> EntryStreamHandler for EntryStream<T>
 where
     T: Output,
 {
-    fn emit_entry_events(&self, entries: &[Entry]) -> Result<()> {
+    fn emit_entry_event(&self, entry: &Entry) -> Result<()> {
         let leader_scheduler = self.leader_scheduler.read().unwrap();
-        for entry in entries {
-            let slot = leader_scheduler.tick_height_to_slot(entry.tick_height);
-            let leader_id = leader_scheduler
-                .get_leader_for_slot(slot)
-                .map(|leader| leader.to_string())
-                .unwrap_or_else(|| "None".to_string());
+        let slot = leader_scheduler.tick_height_to_slot(entry.tick_height);
+        let leader_id = leader_scheduler
+            .get_leader_for_slot(slot)
+            .map(|leader| leader.to_string())
+            .unwrap_or_else(|| "None".to_string());

-            let json_entry = serde_json::to_string(&entry)?;
-            let payload = format!(
-                r#"{{"dt":"{}","t":"entry","s":{},"l":{:?},"entry":{}}}"#,
-                Utc::now().to_rfc3339_opts(SecondsFormat::Nanos, true),
-                slot,
-                leader_id,
-                json_entry,
-            );
-            self.output.write(payload)?;
-        }
+        let json_entry = serde_json::to_string(&entry)?;
+        let payload = format!(
+            r#"{{"dt":"{}","t":"entry","s":{},"l":{:?},"entry":{}}}"#,
+            Utc::now().to_rfc3339_opts(SecondsFormat::Nanos, true),
+            slot,
+            leader_id,
+            json_entry,
+        );
+        self.output.write(payload)?;
         Ok(())
     }

@@ -120,6 +119,7 @@ impl SocketEntryStream {
         EntryStream {
             output: SocketOutput { socket },
             leader_scheduler,
+            queued_block: None,
         }
     }
 }
@@ -131,6 +131,7 @@ impl MockEntryStream {
         EntryStream {
             output: VecOutput::new(),
             leader_scheduler,
+            queued_block: None,
         }
     }

@@ -139,6 +140,12 @@ impl MockEntryStream {
     }
 }

+#[derive(Debug)]
+pub struct EntryStreamBlock {
+    pub tick_height: u64,
+    pub id: Hash,
+}
+
 #[cfg(test)]
 mod test {
     use super::*;
@@ -192,12 +199,11 @@ mod test {
             let entry = Entry::new(&mut last_id, tick_height, 1, vec![]); //just ticks
             last_id = entry.id;
             previous_slot = curr_slot;
+            entry_stream.emit_entry_event(&entry).unwrap();
             expected_entries.push(entry.clone());
             entries.push(entry);
         }

-        entry_stream.emit_entry_events(&entries).unwrap();
-
         assert_eq!(
             entry_stream.entries().len() as u64,
             // one entry per tick (0..=N+2) is +3, plus one block
diff --git a/src/entry_stream_stage.rs b/src/entry_stream_stage.rs
index 13b0261c54..27e554a5aa 100644
--- a/src/entry_stream_stage.rs
+++ b/src/entry_stream_stage.rs
@@ -3,11 +3,11 @@
 //! real-time access to entries.

 use crate::entry::{EntryReceiver, EntrySender};
-use crate::entry_stream::EntryStreamHandler;
 #[cfg(test)]
 use crate::entry_stream::MockEntryStream as EntryStream;
 #[cfg(not(test))]
 use crate::entry_stream::SocketEntryStream as EntryStream;
+use crate::entry_stream::{EntryStreamBlock, EntryStreamHandler};
 use crate::leader_scheduler::LeaderScheduler;
 use crate::result::{Error, Result};
 use crate::service::Service;
@@ -59,11 +59,35 @@ impl EntryStreamStage {
     ) -> Result<()> {
         let timeout = Duration::new(1, 0);
         let entries = ledger_entry_receiver.recv_timeout(timeout)?;
-        entry_stream
-            .emit_entry_events(&entries)
-            .unwrap_or_else(|e| {
+
+        for entry in &entries {
+            if entry.is_tick() && entry_stream.queued_block.is_some() {
+                let queued_block = entry_stream.queued_block.as_ref();
+                let block_tick_height = queued_block.unwrap().tick_height;
+                let block_id = queued_block.unwrap().id;
+                entry_stream
+                    .emit_block_event(block_tick_height, block_id)
+                    .unwrap_or_else(|e| {
+                        error!("Entry Stream error: {:?}, {:?}", e, entry_stream.output);
+                    });
+                entry_stream.queued_block = None;
+            }
+            entry_stream.emit_entry_event(&entry).unwrap_or_else(|e| {
                 error!("Entry Stream error: {:?}, {:?}", e, entry_stream.output);
             });
+            if 0 == entry_stream
+                .leader_scheduler
+                .read()
+                .unwrap()
+                .num_ticks_left_in_slot(entry.tick_height)
+            {
+                entry_stream.queued_block = Some(EntryStreamBlock {
+                    tick_height: entry.tick_height,
+                    id: entry.id,
+                });
+            }
+        }
+
         entry_stream_sender.send(entries)?;
         Ok(())
     }
@@ -87,11 +111,14 @@ mod test {
     use chrono::{DateTime, FixedOffset};
     use serde_json::Value;
     use solana_sdk::hash::Hash;
+    use solana_sdk::signature::{Keypair, KeypairUtil};
+    use solana_sdk::system_transaction::SystemTransaction;

     #[test]
     fn test_entry_stream_stage_process_entries() {
         // Set up bank and leader_scheduler
-        let leader_scheduler_config = LeaderSchedulerConfig::new(5, 2, 10);
+        let ticks_per_slot = 5;
+        let leader_scheduler_config = LeaderSchedulerConfig::new(ticks_per_slot, 2, 10);
         let (genesis_block, _mint_keypair) = GenesisBlock::new(1_000_000);
         let bank = Bank::new_with_leader_scheduler_config(&genesis_block, &leader_scheduler_config);
         // Set up entry stream
@@ -105,12 +132,18 @@ mod test {
         let mut last_id = Hash::default();
         let mut entries = Vec::new();
         let mut expected_entries = Vec::new();
-        for x in 0..5 {
+        for x in 0..6 {
             let entry = Entry::new(&mut last_id, x, 1, vec![]); //just ticks
             last_id = entry.id;
             expected_entries.push(entry.clone());
             entries.push(entry);
         }
+        let keypair = Keypair::new();
+        let tx = SystemTransaction::new_account(&keypair, keypair.pubkey(), 1, Hash::default(), 0);
+        let entry = Entry::new(&mut last_id, ticks_per_slot - 1, 1, vec![tx]);
+        expected_entries.insert(ticks_per_slot as usize, entry.clone());
+        entries.insert(ticks_per_slot as usize, entry);
+
         ledger_entry_sender.send(entries).unwrap();
         EntryStreamStage::process_entries(
             &ledger_entry_receiver,
@@ -118,19 +151,38 @@ mod test {
             &mut entry_stream,
         )
         .unwrap();

-        assert_eq!(entry_stream.entries().len(), 5);
-
-        for (i, item) in entry_stream.entries().iter().enumerate() {
-            let json: Value = serde_json::from_str(&item).unwrap();
-            let dt_str = json["dt"].as_str().unwrap();
-
-            // Ensure `ts` field parses as valid DateTime
-            let _dt: DateTime<FixedOffset> = DateTime::parse_from_rfc3339(dt_str).unwrap();
+        assert_eq!(entry_stream.entries().len(), 8);
+        let (entry_events, block_events): (Vec<Value>, Vec<Value>) = entry_stream
+            .entries()
+            .iter()
+            .map(|item| {
+                let json: Value = serde_json::from_str(&item).unwrap();
+                let dt_str = json["dt"].as_str().unwrap();
+                // Ensure `ts` field parses as valid DateTime
+                let _dt: DateTime<FixedOffset> = DateTime::parse_from_rfc3339(dt_str).unwrap();
+                json
+            })
+            .partition(|json| {
+                let item_type = json["t"].as_str().unwrap();
+                item_type == "entry"
+            });
+        for (i, json) in entry_events.iter().enumerate() {
             let entry_obj = json["entry"].clone();
-            let entry: Entry = serde_json::from_value(entry_obj).unwrap();
-            assert_eq!(entry, expected_entries[i]);
+            let tx = entry_obj["transactions"].as_array().unwrap();
+            if tx.len() == 0 {
+                // TODO: There is a bug in Transaction deserialize methods such that
+                // `serde_json::from_str` does not work for populated Entries.
+                // Remove this `if` when fixed.
+                let entry: Entry = serde_json::from_value(entry_obj).unwrap();
+                assert_eq!(entry, expected_entries[i]);
+            }
         }
+        for json in block_events {
+            let height = json["h"].as_u64().unwrap();
+            assert_eq!(ticks_per_slot - 1, height);
+        }
+
         // Ensure entries pass through stage unadulterated
         let recv_entries = entry_stream_receiver.recv().unwrap();
        assert_eq!(expected_entries, recv_entries);