Make EntryStreamStage optional

This commit is contained in:
Tyera Eulberg 2019-02-11 14:51:14 -07:00 committed by Grimes
parent f977327c7b
commit d41dec9395
2 changed files with 26 additions and 22 deletions

View File

@ -25,14 +25,12 @@ impl EntryStreamStage {
#[allow(clippy::new_ret_no_self)]
pub fn new(
ledger_entry_receiver: EntryReceiver,
entry_stream: Option<&String>,
entry_stream_socket: String,
leader_scheduler: Arc<RwLock<LeaderScheduler>>,
exit: Arc<AtomicBool>,
) -> (Self, EntryReceiver) {
let (entry_stream_sender, entry_stream_receiver) = channel();
let mut entry_stream = entry_stream
.cloned()
.map(|socket| EntryStream::new(socket, leader_scheduler));
let mut entry_stream = EntryStream::new(entry_stream_socket, leader_scheduler);
let t_entry_stream = Builder::new()
.name("solana-entry-stream".to_string())
.spawn(move || loop {
@ -42,7 +40,7 @@ impl EntryStreamStage {
if let Err(e) = Self::process_entries(
&ledger_entry_receiver,
&entry_stream_sender,
entry_stream.as_mut(),
&mut entry_stream,
) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
@ -57,15 +55,13 @@ impl EntryStreamStage {
fn process_entries(
ledger_entry_receiver: &EntryReceiver,
entry_stream_sender: &EntrySender,
entry_stream: Option<&mut EntryStream>,
entry_stream: &mut EntryStream,
) -> Result<()> {
let timeout = Duration::new(1, 0);
let entries = ledger_entry_receiver.recv_timeout(timeout)?;
if let Some(stream) = entry_stream {
stream.stream_entries(&entries).unwrap_or_else(|e| {
error!("Entry Stream error: {:?}, {:?}", e, stream.socket);
});
}
entry_stream.stream_entries(&entries).unwrap_or_else(|e| {
error!("Entry Stream error: {:?}, {:?}", e, entry_stream.socket);
});
entry_stream_sender.send(entries)?;
Ok(())
}
@ -117,7 +113,7 @@ mod test {
EntryStreamStage::process_entries(
&ledger_entry_receiver,
&entry_stream_sender,
Some(&mut entry_stream),
&mut entry_stream,
)
.unwrap();
assert_eq!(entry_stream.socket.len(), 5);

View File

@ -40,7 +40,7 @@ pub struct Tvu {
fetch_stage: BlobFetchStage,
retransmit_stage: RetransmitStage,
replay_stage: ReplayStage,
entry_stream_stage: EntryStreamStage,
entry_stream_stage: Option<EntryStreamStage>,
storage_stage: StorageStage,
exit: Arc<AtomicBool>,
last_entry_id: Arc<RwLock<Hash>>,
@ -118,7 +118,7 @@ impl Tvu {
let l_last_entry_id = Arc::new(RwLock::new(last_entry_id));
let (replay_stage, ledger_entry_receiver) = ReplayStage::new(
let (replay_stage, mut previous_receiver) = ReplayStage::new(
keypair.pubkey(),
voting_keypair,
blocktree.clone(),
@ -132,16 +132,22 @@ impl Tvu {
ledger_signal_receiver,
);
let (entry_stream_stage, entry_stream_receiver) = EntryStreamStage::new(
ledger_entry_receiver,
entry_stream,
bank.leader_scheduler.clone(),
exit.clone(),
);
let entry_stream_stage = if entry_stream.is_some() {
let (entry_stream_stage, entry_stream_receiver) = EntryStreamStage::new(
previous_receiver,
entry_stream.unwrap().to_string(),
bank.leader_scheduler.clone(),
exit.clone(),
);
previous_receiver = entry_stream_receiver;
Some(entry_stream_stage)
} else {
None
};
let storage_stage = StorageStage::new(
storage_state,
entry_stream_receiver,
previous_receiver,
Some(blocktree),
&keypair,
&exit.clone(),
@ -197,7 +203,9 @@ impl Service for Tvu {
self.retransmit_stage.join()?;
self.fetch_stage.join()?;
self.storage_stage.join()?;
self.entry_stream_stage.join()?;
if self.entry_stream_stage.is_some() {
self.entry_stream_stage.unwrap().join()?;
}
self.replay_stage.join()?;
Ok(())
}