diff --git a/src/entry_stream.rs b/src/entry_stream.rs index 57500dccc..ac77a32c7 100644 --- a/src/entry_stream.rs +++ b/src/entry_stream.rs @@ -3,12 +3,14 @@ //! real-time access to entries. use crate::entry::Entry; +use crate::leader_scheduler::LeaderScheduler; use crate::result::Result; -use chrono::Utc; +use chrono::{SecondsFormat, Utc}; use std::io::prelude::*; use std::net::Shutdown; use std::os::unix::net::UnixStream; use std::path::Path; +use std::sync::{Arc, RwLock}; pub trait EntryStreamHandler { fn stream_entries(&mut self, entries: &[Entry]) -> Result<()>; @@ -16,11 +18,15 @@ pub trait EntryStreamHandler { pub struct EntryStream { pub socket: String, + leader_scheduler: Arc<RwLock<LeaderScheduler>>, } impl EntryStream { - pub fn new(socket: String) -> Self { - EntryStream { socket } + pub fn new(socket: String, leader_scheduler: Arc<RwLock<LeaderScheduler>>) -> Self { + EntryStream { + socket, + leader_scheduler, + } } } @@ -29,7 +35,18 @@ impl EntryStreamHandler for EntryStream { let mut socket = UnixStream::connect(Path::new(&self.socket))?; for entry in entries { let json = serde_json::to_string(&entry)?; - let payload = format!(r#"{{"dt":"{}","entry":{}}}"#, Utc::now().to_rfc3339(), json); + let (slot, slot_leader) = { + let leader_scheduler = self.leader_scheduler.read().unwrap(); + let slot = leader_scheduler.tick_height_to_slot(entry.tick_height); + (slot, leader_scheduler.get_leader_for_slot(slot)) + }; + let payload = format!( + r#"{{"dt":"{}","t":"entry","s":{},"leader_id":"{:?}","entry":{}}}"#, + Utc::now().to_rfc3339_opts(SecondsFormat::Nanos, true), + slot, + slot_leader, + json + ); socket.write_all(payload.as_bytes())?; } socket.shutdown(Shutdown::Write)?; @@ -39,12 +56,16 @@ pub struct MockEntryStream { pub socket: Vec<String>, + leader_scheduler: Arc<RwLock<LeaderScheduler>>, } impl MockEntryStream { #[allow(clippy::needless_pass_by_value)] - pub fn new(_socket: String) -> Self { - MockEntryStream { socket: Vec::new() } + pub fn new(_socket: String, leader_scheduler: Arc<RwLock<LeaderScheduler>>) 
-> Self { + MockEntryStream { + socket: Vec::new(), + leader_scheduler, + } } } @@ -52,7 +73,18 @@ impl EntryStreamHandler for MockEntryStream { fn stream_entries(&mut self, entries: &[Entry]) -> Result<()> { for entry in entries { let json = serde_json::to_string(&entry)?; - let payload = format!(r#"{{"dt":"{}","entry":{}}}"#, Utc::now().to_rfc3339(), json); + let (slot, slot_leader) = { + let leader_scheduler = self.leader_scheduler.read().unwrap(); + let slot = leader_scheduler.tick_height_to_slot(entry.tick_height); + (slot, leader_scheduler.get_leader_for_slot(slot)) + }; + let payload = format!( + r#"{{"dt":"{}","t":"entry","s":{},"leader_id":"{:?}","entry":{}}}"#, + Utc::now().to_rfc3339_opts(SecondsFormat::Nanos, true), + slot, + slot_leader, + json + ); self.socket.push(payload); } Ok(()) diff --git a/src/entry_stream_stage.rs b/src/entry_stream_stage.rs new file mode 100644 index 000000000..943441436 --- /dev/null +++ b/src/entry_stream_stage.rs @@ -0,0 +1,140 @@ +//! The `entry_stream_stage` implements optional streaming of entries using the +//! `entry_stream` module, providing client services such as a block explorer with +//! real-time access to entries. 
+ +use crate::entry::{EntryReceiver, EntrySender}; +#[cfg(not(test))] +use crate::entry_stream::EntryStream; +use crate::entry_stream::EntryStreamHandler; +#[cfg(test)] +use crate::entry_stream::MockEntryStream as EntryStream; +use crate::leader_scheduler::LeaderScheduler; +use crate::result::{Error, Result}; +use crate::service::Service; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::{channel, RecvTimeoutError}; +use std::sync::{Arc, RwLock}; +use std::thread::{self, Builder, JoinHandle}; +use std::time::Duration; + +pub struct EntryStreamStage { + t_entry_stream: JoinHandle<()>, +} + +impl EntryStreamStage { + #[allow(clippy::new_ret_no_self)] + pub fn new( + ledger_entry_receiver: EntryReceiver, + entry_stream: Option<&String>, + leader_scheduler: Arc<RwLock<LeaderScheduler>>, + exit: Arc<AtomicBool>, + ) -> (Self, EntryReceiver) { + let (entry_stream_sender, entry_stream_receiver) = channel(); + let mut entry_stream = entry_stream + .cloned() + .map(|socket| EntryStream::new(socket, leader_scheduler)); + let t_entry_stream = Builder::new() + .name("solana-entry-stream".to_string()) + .spawn(move || loop { + if exit.load(Ordering::Relaxed) { + break; + } + if let Err(e) = Self::process_entries( + &ledger_entry_receiver, + &entry_stream_sender, + entry_stream.as_mut(), + ) { + match e { + Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, + Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), + _ => info!("Error from process_entries: {:?}", e), + } + } + }) + .unwrap(); + (Self { t_entry_stream }, entry_stream_receiver) + } + fn process_entries( + ledger_entry_receiver: &EntryReceiver, + entry_stream_sender: &EntrySender, + entry_stream: Option<&mut EntryStream>, + ) -> Result<()> { + let timeout = Duration::new(1, 0); + let entries = ledger_entry_receiver.recv_timeout(timeout)?; + if let Some(stream) = entry_stream { + stream.stream_entries(&entries).unwrap_or_else(|e| { + error!("Entry Stream error: {:?}, {:?}", e, stream.socket); + }); + } + 
entry_stream_sender.send(entries)?; + Ok(()) + } +} + +impl Service for EntryStreamStage { + type JoinReturnType = (); + + fn join(self) -> thread::Result<()> { + self.t_entry_stream.join() + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::bank::Bank; + use crate::entry::Entry; + use crate::genesis_block::GenesisBlock; + use crate::leader_scheduler::LeaderSchedulerConfig; + use chrono::{DateTime, FixedOffset}; + use serde_json::Value; + use solana_sdk::hash::Hash; + + #[test] + fn test_entry_stream_stage_process_entries() { + // Set up bank and leader_scheduler + let leader_scheduler_config = LeaderSchedulerConfig::new(5, 2, 10); + let (genesis_block, _mint_keypair) = GenesisBlock::new(1_000_000); + let bank = Bank::new_with_leader_scheduler_config(&genesis_block, &leader_scheduler_config); + // Set up entry stream + let mut entry_stream = + EntryStream::new("test_stream".to_string(), bank.leader_scheduler.clone()); + + // Set up dummy channels to host an EntryStreamStage + let (ledger_entry_sender, ledger_entry_receiver) = channel(); + let (entry_stream_sender, entry_stream_receiver) = channel(); + + let mut last_id = Hash::default(); + let mut entries = Vec::new(); + let mut expected_entries = Vec::new(); + for x in 0..5 { + let entry = Entry::new(&mut last_id, x, 1, vec![]); //just ticks + last_id = entry.id; + expected_entries.push(entry.clone()); + entries.push(entry); + } + ledger_entry_sender.send(entries).unwrap(); + EntryStreamStage::process_entries( + &ledger_entry_receiver, + &entry_stream_sender, + Some(&mut entry_stream), + ) + .unwrap(); + assert_eq!(entry_stream.socket.len(), 5); + + for (i, item) in entry_stream.socket.iter().enumerate() { + let json: Value = serde_json::from_str(&item).unwrap(); + let dt_str = json["dt"].as_str().unwrap(); + + // Ensure `ts` field parses as valid DateTime + let _dt: DateTime<FixedOffset> = DateTime::parse_from_rfc3339(dt_str).unwrap(); + + let entry_obj = json["entry"].clone(); + let entry: Entry = 
serde_json::from_value(entry_obj).unwrap(); + assert_eq!(entry, expected_entries[i]); + } + // Ensure entries pass through stage unadulterated + let recv_entries = entry_stream_receiver.recv().unwrap(); + assert_eq!(expected_entries, recv_entries); + } +} diff --git a/src/lib.rs b/src/lib.rs index ebf2b595a..8f58e9275 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -35,6 +35,7 @@ pub mod compute_leader_confirmation_service; pub mod db_window; pub mod entry; pub mod entry_stream; +pub mod entry_stream_stage; #[cfg(feature = "erasure")] pub mod erasure; pub mod fetch_stage; diff --git a/src/replay_stage.rs b/src/replay_stage.rs index 2019f4540..b5b99d3d6 100644 --- a/src/replay_stage.rs +++ b/src/replay_stage.rs @@ -5,11 +5,6 @@ use crate::blocktree::Blocktree; use crate::cluster_info::ClusterInfo; use crate::counter::Counter; use crate::entry::{Entry, EntryReceiver, EntrySender, EntrySlice}; -#[cfg(not(test))] -use crate::entry_stream::EntryStream; -use crate::entry_stream::EntryStreamHandler; -#[cfg(test)] -use crate::entry_stream::MockEntryStream as EntryStream; use crate::packet::BlobError; use crate::result::{Error, Result}; use crate::service::Service; @@ -70,13 +65,7 @@ impl ReplayStage { ledger_entry_sender: &EntrySender, current_blob_index: &mut u64, last_entry_id: &Arc<RwLock<Hash>>, - entry_stream: Option<&mut EntryStream>, ) -> Result<()> { - if let Some(stream) = entry_stream { - stream.stream_entries(&entries).unwrap_or_else(|e| { - error!("Entry Stream error: {:?}, {:?}", e, stream.socket); - }); - } // Coalesce all the available entries into a single vote submit( influxdb::Point::new("replicate-stage") @@ -190,12 +179,10 @@ impl ReplayStage { mut current_blob_index: u64, last_entry_id: Arc<RwLock<Hash>>, to_leader_sender: &TvuRotationSender, - entry_stream: Option<&String>, ledger_signal_sender: SyncSender<bool>, ledger_signal_receiver: Receiver<bool>, ) -> (Self, EntryReceiver) { let (ledger_entry_sender, ledger_entry_receiver) = channel(); - let mut entry_stream = 
entry_stream.cloned().map(EntryStream::new); #[cfg(test)] let (pause, pause_) = { let pause = Arc::new(AtomicBool::new(false)); @@ -279,7 +266,6 @@ impl ReplayStage { &ledger_entry_sender, &mut current_blob_index, &last_entry_id, - entry_stream.as_mut(), ) { error!("process_entries failed: {:?}", e); } @@ -383,8 +369,6 @@ mod test { use crate::leader_scheduler::{make_active_set_entries, LeaderSchedulerConfig}; use crate::replay_stage::ReplayStage; use crate::voting_keypair::VotingKeypair; - use chrono::{DateTime, FixedOffset}; - use serde_json::Value; use solana_sdk::hash::Hash; use solana_sdk::signature::{Keypair, KeypairUtil}; use std::fs::remove_dir_all; @@ -468,7 +452,6 @@ mod test { meta.consumed, Arc::new(RwLock::new(last_entry_id)), &rotation_sender, - None, l_sender, l_receiver, ); @@ -562,7 +545,6 @@ mod test { entry_height, Arc::new(RwLock::new(last_entry_id)), &to_leader_sender, - None, l_sender, l_receiver, ); @@ -676,7 +658,6 @@ mod test { meta.consumed, Arc::new(RwLock::new(last_entry_id)), &rotation_tx, - None, l_sender, l_receiver, ); @@ -759,7 +740,6 @@ mod test { &ledger_entry_sender, &mut current_blob_index, &Arc::new(RwLock::new(last_entry_id)), - None, ); match res { @@ -781,7 +761,6 @@ mod test { &ledger_entry_sender, &mut current_blob_index, &Arc::new(RwLock::new(last_entry_id)), - None, ); match res { @@ -794,59 +773,4 @@ mod test { ), } } - - #[test] - fn test_replay_stage_stream_entries() { - // Set up entry stream - let mut entry_stream = EntryStream::new("test_stream".to_string()); - - // Set up dummy node to host a ReplayStage - let my_keypair = Keypair::new(); - let my_id = my_keypair.pubkey(); - let my_node = Node::new_localhost_with_pubkey(my_id); - // Set up the cluster info - let cluster_info_me = Arc::new(RwLock::new(ClusterInfo::new(my_node.info.clone()))); - let (ledger_entry_sender, _ledger_entry_receiver) = channel(); - let last_entry_id = Hash::default(); - - let mut entry_height = 0; - let mut last_id = Hash::default(); - 
let mut entries = Vec::new(); - let mut expected_entries = Vec::new(); - for _ in 0..5 { - let entry = Entry::new(&mut last_id, 0, 1, vec![]); //just ticks - last_id = entry.id; - expected_entries.push(entry.clone()); - entries.push(entry); - } - - let my_keypair = Arc::new(my_keypair); - let voting_keypair = Arc::new(VotingKeypair::new_local(&my_keypair)); - let bank = Bank::new(&GenesisBlock::new(123).0); - ReplayStage::process_entries( - entries.clone(), - &Arc::new(bank), - &cluster_info_me, - Some(&voting_keypair), - &ledger_entry_sender, - &mut entry_height, - &Arc::new(RwLock::new(last_entry_id)), - Some(&mut entry_stream), - ) - .unwrap(); - - assert_eq!(entry_stream.socket.len(), 5); - - for (i, item) in entry_stream.socket.iter().enumerate() { - let json: Value = serde_json::from_str(&item).unwrap(); - let dt_str = json["dt"].as_str().unwrap(); - - // Ensure `ts` field parses as valid DateTime - let _dt: DateTime<FixedOffset> = DateTime::parse_from_rfc3339(dt_str).unwrap(); - - let entry_obj = json["entry"].clone(); - let entry: Entry = serde_json::from_value(entry_obj).unwrap(); - assert_eq!(entry, expected_entries[i]); - } - } } diff --git a/src/tvu.rs b/src/tvu.rs index 6c2f06582..8ea92ee42 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -16,6 +16,7 @@ use crate::bank::Bank; use crate::blob_fetch_stage::BlobFetchStage; use crate::blocktree::Blocktree; use crate::cluster_info::ClusterInfo; +use crate::entry_stream_stage::EntryStreamStage; use crate::replay_stage::ReplayStage; use crate::retransmit_stage::RetransmitStage; use crate::service::Service; @@ -39,6 +40,7 @@ pub struct Tvu { fetch_stage: BlobFetchStage, retransmit_stage: RetransmitStage, replay_stage: ReplayStage, + entry_stream_stage: EntryStreamStage, storage_stage: StorageStage, exit: Arc<AtomicBool>, last_entry_id: Arc<RwLock<Hash>>, @@ -126,14 +128,20 @@ impl Tvu { blob_index, l_last_entry_id.clone(), to_leader_sender, - entry_stream, ledger_signal_sender, ledger_signal_receiver, ); + let (entry_stream_stage, 
entry_stream_receiver) = EntryStreamStage::new( + ledger_entry_receiver, + entry_stream, + bank.leader_scheduler.clone(), + exit.clone(), + ); + let storage_stage = StorageStage::new( storage_state, - ledger_entry_receiver, + entry_stream_receiver, Some(blocktree), &keypair, &exit.clone(), @@ -147,6 +155,7 @@ impl Tvu { fetch_stage, retransmit_stage, replay_stage, + entry_stream_stage, storage_stage, exit, last_entry_id: l_last_entry_id, @@ -188,6 +197,7 @@ impl Service for Tvu { self.retransmit_stage.join()?; self.fetch_stage.join()?; self.storage_stage.join()?; + self.entry_stream_stage.join()?; self.replay_stage.join()?; Ok(()) }