Move EntryStream into its own Tvu stage

This commit is contained in:
Tyera Eulberg 2019-02-11 11:36:59 -07:00 committed by Grimes
parent aac1a58651
commit f977327c7b
5 changed files with 192 additions and 85 deletions

View File

@ -3,12 +3,14 @@
//! real-time access to entries.
use crate::entry::Entry;
use crate::leader_scheduler::LeaderScheduler;
use crate::result::Result;
use chrono::Utc;
use chrono::{SecondsFormat, Utc};
use std::io::prelude::*;
use std::net::Shutdown;
use std::os::unix::net::UnixStream;
use std::path::Path;
use std::sync::{Arc, RwLock};
pub trait EntryStreamHandler {
fn stream_entries(&mut self, entries: &[Entry]) -> Result<()>;
@ -16,11 +18,15 @@ pub trait EntryStreamHandler {
pub struct EntryStream {
pub socket: String,
leader_scheduler: Arc<RwLock<LeaderScheduler>>,
}
impl EntryStream {
pub fn new(socket: String) -> Self {
EntryStream { socket }
pub fn new(socket: String, leader_scheduler: Arc<RwLock<LeaderScheduler>>) -> Self {
EntryStream {
socket,
leader_scheduler,
}
}
}
@ -29,7 +35,18 @@ impl EntryStreamHandler for EntryStream {
let mut socket = UnixStream::connect(Path::new(&self.socket))?;
for entry in entries {
let json = serde_json::to_string(&entry)?;
let payload = format!(r#"{{"dt":"{}","entry":{}}}"#, Utc::now().to_rfc3339(), json);
let (slot, slot_leader) = {
let leader_scheduler = self.leader_scheduler.read().unwrap();
let slot = leader_scheduler.tick_height_to_slot(entry.tick_height);
(slot, leader_scheduler.get_leader_for_slot(slot))
};
let payload = format!(
r#"{{"dt":"{}","t":"entry","s":{},"leader_id":"{:?}","entry":{}}}"#,
Utc::now().to_rfc3339_opts(SecondsFormat::Nanos, true),
slot,
slot_leader,
json
);
socket.write_all(payload.as_bytes())?;
}
socket.shutdown(Shutdown::Write)?;
@ -39,12 +56,16 @@ impl EntryStreamHandler for EntryStream {
pub struct MockEntryStream {
pub socket: Vec<String>,
leader_scheduler: Arc<RwLock<LeaderScheduler>>,
}
impl MockEntryStream {
#[allow(clippy::needless_pass_by_value)]
pub fn new(_socket: String) -> Self {
MockEntryStream { socket: Vec::new() }
pub fn new(_socket: String, leader_scheduler: Arc<RwLock<LeaderScheduler>>) -> Self {
MockEntryStream {
socket: Vec::new(),
leader_scheduler,
}
}
}
@ -52,7 +73,18 @@ impl EntryStreamHandler for MockEntryStream {
fn stream_entries(&mut self, entries: &[Entry]) -> Result<()> {
for entry in entries {
let json = serde_json::to_string(&entry)?;
let payload = format!(r#"{{"dt":"{}","entry":{}}}"#, Utc::now().to_rfc3339(), json);
let (slot, slot_leader) = {
let leader_scheduler = self.leader_scheduler.read().unwrap();
let slot = leader_scheduler.tick_height_to_slot(entry.tick_height);
(slot, leader_scheduler.get_leader_for_slot(slot))
};
let payload = format!(
r#"{{"dt":"{}","t":"entry","s":{},"leader_id":"{:?}","entry":{}}}"#,
Utc::now().to_rfc3339_opts(SecondsFormat::Nanos, true),
slot,
slot_leader,
json
);
self.socket.push(payload);
}
Ok(())

140
src/entry_stream_stage.rs Normal file
View File

@ -0,0 +1,140 @@
//! The `entry_stream_stage` implements optional streaming of entries using the
//! `entry_stream` module, providing client services such as a block explorer with
//! real-time access to entries.
use crate::entry::{EntryReceiver, EntrySender};
#[cfg(not(test))]
use crate::entry_stream::EntryStream;
use crate::entry_stream::EntryStreamHandler;
#[cfg(test)]
use crate::entry_stream::MockEntryStream as EntryStream;
use crate::leader_scheduler::LeaderScheduler;
use crate::result::{Error, Result};
use crate::service::Service;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, RecvTimeoutError};
use std::sync::{Arc, RwLock};
use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
pub struct EntryStreamStage {
t_entry_stream: JoinHandle<()>,
}
impl EntryStreamStage {
#[allow(clippy::new_ret_no_self)]
pub fn new(
ledger_entry_receiver: EntryReceiver,
entry_stream: Option<&String>,
leader_scheduler: Arc<RwLock<LeaderScheduler>>,
exit: Arc<AtomicBool>,
) -> (Self, EntryReceiver) {
let (entry_stream_sender, entry_stream_receiver) = channel();
let mut entry_stream = entry_stream
.cloned()
.map(|socket| EntryStream::new(socket, leader_scheduler));
let t_entry_stream = Builder::new()
.name("solana-entry-stream".to_string())
.spawn(move || loop {
if exit.load(Ordering::Relaxed) {
break;
}
if let Err(e) = Self::process_entries(
&ledger_entry_receiver,
&entry_stream_sender,
entry_stream.as_mut(),
) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
_ => info!("Error from process_entries: {:?}", e),
}
}
})
.unwrap();
(Self { t_entry_stream }, entry_stream_receiver)
}
fn process_entries(
ledger_entry_receiver: &EntryReceiver,
entry_stream_sender: &EntrySender,
entry_stream: Option<&mut EntryStream>,
) -> Result<()> {
let timeout = Duration::new(1, 0);
let entries = ledger_entry_receiver.recv_timeout(timeout)?;
if let Some(stream) = entry_stream {
stream.stream_entries(&entries).unwrap_or_else(|e| {
error!("Entry Stream error: {:?}, {:?}", e, stream.socket);
});
}
entry_stream_sender.send(entries)?;
Ok(())
}
}
impl Service for EntryStreamStage {
type JoinReturnType = ();
fn join(self) -> thread::Result<()> {
self.t_entry_stream.join()
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::bank::Bank;
use crate::entry::Entry;
use crate::genesis_block::GenesisBlock;
use crate::leader_scheduler::LeaderSchedulerConfig;
use chrono::{DateTime, FixedOffset};
use serde_json::Value;
use solana_sdk::hash::Hash;
#[test]
fn test_entry_stream_stage_process_entries() {
// Set up bank and leader_scheduler
let leader_scheduler_config = LeaderSchedulerConfig::new(5, 2, 10);
let (genesis_block, _mint_keypair) = GenesisBlock::new(1_000_000);
let bank = Bank::new_with_leader_scheduler_config(&genesis_block, &leader_scheduler_config);
// Set up entry stream
let mut entry_stream =
EntryStream::new("test_stream".to_string(), bank.leader_scheduler.clone());
// Set up dummy channels to host an EntryStreamStage
let (ledger_entry_sender, ledger_entry_receiver) = channel();
let (entry_stream_sender, entry_stream_receiver) = channel();
let mut last_id = Hash::default();
let mut entries = Vec::new();
let mut expected_entries = Vec::new();
for x in 0..5 {
let entry = Entry::new(&mut last_id, x, 1, vec![]); //just ticks
last_id = entry.id;
expected_entries.push(entry.clone());
entries.push(entry);
}
ledger_entry_sender.send(entries).unwrap();
EntryStreamStage::process_entries(
&ledger_entry_receiver,
&entry_stream_sender,
Some(&mut entry_stream),
)
.unwrap();
assert_eq!(entry_stream.socket.len(), 5);
for (i, item) in entry_stream.socket.iter().enumerate() {
let json: Value = serde_json::from_str(&item).unwrap();
let dt_str = json["dt"].as_str().unwrap();
// Ensure `ts` field parses as valid DateTime
let _dt: DateTime<FixedOffset> = DateTime::parse_from_rfc3339(dt_str).unwrap();
let entry_obj = json["entry"].clone();
let entry: Entry = serde_json::from_value(entry_obj).unwrap();
assert_eq!(entry, expected_entries[i]);
}
// Ensure entries pass through stage unadulterated
let recv_entries = entry_stream_receiver.recv().unwrap();
assert_eq!(expected_entries, recv_entries);
}
}

View File

@ -35,6 +35,7 @@ pub mod compute_leader_confirmation_service;
pub mod db_window;
pub mod entry;
pub mod entry_stream;
pub mod entry_stream_stage;
#[cfg(feature = "erasure")]
pub mod erasure;
pub mod fetch_stage;

View File

@ -5,11 +5,6 @@ use crate::blocktree::Blocktree;
use crate::cluster_info::ClusterInfo;
use crate::counter::Counter;
use crate::entry::{Entry, EntryReceiver, EntrySender, EntrySlice};
#[cfg(not(test))]
use crate::entry_stream::EntryStream;
use crate::entry_stream::EntryStreamHandler;
#[cfg(test)]
use crate::entry_stream::MockEntryStream as EntryStream;
use crate::packet::BlobError;
use crate::result::{Error, Result};
use crate::service::Service;
@ -70,13 +65,7 @@ impl ReplayStage {
ledger_entry_sender: &EntrySender,
current_blob_index: &mut u64,
last_entry_id: &Arc<RwLock<Hash>>,
entry_stream: Option<&mut EntryStream>,
) -> Result<()> {
if let Some(stream) = entry_stream {
stream.stream_entries(&entries).unwrap_or_else(|e| {
error!("Entry Stream error: {:?}, {:?}", e, stream.socket);
});
}
// Coalesce all the available entries into a single vote
submit(
influxdb::Point::new("replicate-stage")
@ -190,12 +179,10 @@ impl ReplayStage {
mut current_blob_index: u64,
last_entry_id: Arc<RwLock<Hash>>,
to_leader_sender: &TvuRotationSender,
entry_stream: Option<&String>,
ledger_signal_sender: SyncSender<bool>,
ledger_signal_receiver: Receiver<bool>,
) -> (Self, EntryReceiver) {
let (ledger_entry_sender, ledger_entry_receiver) = channel();
let mut entry_stream = entry_stream.cloned().map(EntryStream::new);
#[cfg(test)]
let (pause, pause_) = {
let pause = Arc::new(AtomicBool::new(false));
@ -279,7 +266,6 @@ impl ReplayStage {
&ledger_entry_sender,
&mut current_blob_index,
&last_entry_id,
entry_stream.as_mut(),
) {
error!("process_entries failed: {:?}", e);
}
@ -383,8 +369,6 @@ mod test {
use crate::leader_scheduler::{make_active_set_entries, LeaderSchedulerConfig};
use crate::replay_stage::ReplayStage;
use crate::voting_keypair::VotingKeypair;
use chrono::{DateTime, FixedOffset};
use serde_json::Value;
use solana_sdk::hash::Hash;
use solana_sdk::signature::{Keypair, KeypairUtil};
use std::fs::remove_dir_all;
@ -468,7 +452,6 @@ mod test {
meta.consumed,
Arc::new(RwLock::new(last_entry_id)),
&rotation_sender,
None,
l_sender,
l_receiver,
);
@ -562,7 +545,6 @@ mod test {
entry_height,
Arc::new(RwLock::new(last_entry_id)),
&to_leader_sender,
None,
l_sender,
l_receiver,
);
@ -676,7 +658,6 @@ mod test {
meta.consumed,
Arc::new(RwLock::new(last_entry_id)),
&rotation_tx,
None,
l_sender,
l_receiver,
);
@ -759,7 +740,6 @@ mod test {
&ledger_entry_sender,
&mut current_blob_index,
&Arc::new(RwLock::new(last_entry_id)),
None,
);
match res {
@ -781,7 +761,6 @@ mod test {
&ledger_entry_sender,
&mut current_blob_index,
&Arc::new(RwLock::new(last_entry_id)),
None,
);
match res {
@ -794,59 +773,4 @@ mod test {
),
}
}
#[test]
fn test_replay_stage_stream_entries() {
// Set up entry stream
let mut entry_stream = EntryStream::new("test_stream".to_string());
// Set up dummy node to host a ReplayStage
let my_keypair = Keypair::new();
let my_id = my_keypair.pubkey();
let my_node = Node::new_localhost_with_pubkey(my_id);
// Set up the cluster info
let cluster_info_me = Arc::new(RwLock::new(ClusterInfo::new(my_node.info.clone())));
let (ledger_entry_sender, _ledger_entry_receiver) = channel();
let last_entry_id = Hash::default();
let mut entry_height = 0;
let mut last_id = Hash::default();
let mut entries = Vec::new();
let mut expected_entries = Vec::new();
for _ in 0..5 {
let entry = Entry::new(&mut last_id, 0, 1, vec![]); //just ticks
last_id = entry.id;
expected_entries.push(entry.clone());
entries.push(entry);
}
let my_keypair = Arc::new(my_keypair);
let voting_keypair = Arc::new(VotingKeypair::new_local(&my_keypair));
let bank = Bank::new(&GenesisBlock::new(123).0);
ReplayStage::process_entries(
entries.clone(),
&Arc::new(bank),
&cluster_info_me,
Some(&voting_keypair),
&ledger_entry_sender,
&mut entry_height,
&Arc::new(RwLock::new(last_entry_id)),
Some(&mut entry_stream),
)
.unwrap();
assert_eq!(entry_stream.socket.len(), 5);
for (i, item) in entry_stream.socket.iter().enumerate() {
let json: Value = serde_json::from_str(&item).unwrap();
let dt_str = json["dt"].as_str().unwrap();
// Ensure `ts` field parses as valid DateTime
let _dt: DateTime<FixedOffset> = DateTime::parse_from_rfc3339(dt_str).unwrap();
let entry_obj = json["entry"].clone();
let entry: Entry = serde_json::from_value(entry_obj).unwrap();
assert_eq!(entry, expected_entries[i]);
}
}
}

View File

@ -16,6 +16,7 @@ use crate::bank::Bank;
use crate::blob_fetch_stage::BlobFetchStage;
use crate::blocktree::Blocktree;
use crate::cluster_info::ClusterInfo;
use crate::entry_stream_stage::EntryStreamStage;
use crate::replay_stage::ReplayStage;
use crate::retransmit_stage::RetransmitStage;
use crate::service::Service;
@ -39,6 +40,7 @@ pub struct Tvu {
fetch_stage: BlobFetchStage,
retransmit_stage: RetransmitStage,
replay_stage: ReplayStage,
entry_stream_stage: EntryStreamStage,
storage_stage: StorageStage,
exit: Arc<AtomicBool>,
last_entry_id: Arc<RwLock<Hash>>,
@ -126,14 +128,20 @@ impl Tvu {
blob_index,
l_last_entry_id.clone(),
to_leader_sender,
entry_stream,
ledger_signal_sender,
ledger_signal_receiver,
);
let (entry_stream_stage, entry_stream_receiver) = EntryStreamStage::new(
ledger_entry_receiver,
entry_stream,
bank.leader_scheduler.clone(),
exit.clone(),
);
let storage_stage = StorageStage::new(
storage_state,
ledger_entry_receiver,
entry_stream_receiver,
Some(blocktree),
&keypair,
&exit.clone(),
@ -147,6 +155,7 @@ impl Tvu {
fetch_stage,
retransmit_stage,
replay_stage,
entry_stream_stage,
storage_stage,
exit,
last_entry_id: l_last_entry_id,
@ -188,6 +197,7 @@ impl Service for Tvu {
self.retransmit_stage.join()?;
self.fetch_stage.join()?;
self.storage_stage.join()?;
self.entry_stream_stage.join()?;
self.replay_stage.join()?;
Ok(())
}