From cbd664ba4b79de4b37444390049c80b0b1d565c2 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Thu, 5 Jul 2018 15:41:53 -0600 Subject: [PATCH] Remove exit variable from BankingStage --- src/banking_stage.rs | 22 +++++++++++----------- src/tpu.rs | 8 ++------ src/write_stage.rs | 3 ++- 3 files changed, 15 insertions(+), 18 deletions(-) diff --git a/src/banking_stage.rs b/src/banking_stage.rs index 871df4843..94f79d78f 100644 --- a/src/banking_stage.rs +++ b/src/banking_stage.rs @@ -8,11 +8,11 @@ use counter::Counter; use packet::{PacketRecycler, Packets, SharedPackets}; use rayon::prelude::*; use record_stage::Signal; -use result::Result; +use result::{Error, Result}; use service::Service; use std::net::SocketAddr; -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; -use std::sync::mpsc::{channel, Receiver, Sender}; +use std::sync::atomic::AtomicUsize; +use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender}; use std::sync::Arc; use std::thread::{self, Builder, JoinHandle}; use std::time::Duration; @@ -27,13 +27,11 @@ pub struct BankingStage { } impl BankingStage { - /// Create the stage using `bank`. Exit when either `exit` is set or - /// when `verified_receiver` or the stage's output receiver is dropped. + /// Create the stage using `bank`. Exit when `verified_receiver` is dropped. /// Discard input packets using `packet_recycler` to minimize memory /// allocations in a previous stage such as the `fetch_stage`. pub fn new( bank: Arc, - exit: Arc, verified_receiver: Receiver)>>, packet_recycler: PacketRecycler, ) -> (Self, Receiver) { @@ -41,15 +39,17 @@ impl BankingStage { let thread_hdl = Builder::new() .name("solana-banking-stage".to_string()) .spawn(move || loop { - let e = Self::process_packets( + if let Err(e) = Self::process_packets( bank.clone(), &verified_receiver, &signal_sender, &packet_recycler, - ); - if e.is_err() { - if exit.load(Ordering::Relaxed) { - break; + ) { + match e { + Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, + Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), + Error::SendError => (), // Ignore when downstream stage exits prematurely. + _ => error!("{:?}", e), } } }) diff --git a/src/tpu.rs b/src/tpu.rs index 4136e2d65..e3cacad2b 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -66,12 +66,8 @@ impl Tpu { let (sigverify_stage, verified_receiver) = SigVerifyStage::new(exit.clone(), packet_receiver); - let (banking_stage, signal_receiver) = BankingStage::new( - bank.clone(), - exit.clone(), - verified_receiver, - packet_recycler.clone(), - ); + let (banking_stage, signal_receiver) = + BankingStage::new(bank.clone(), verified_receiver, packet_recycler.clone()); let (record_stage, entry_receiver) = match tick_duration { Some(tick_duration) => { diff --git a/src/write_stage.rs b/src/write_stage.rs index a892e4b0a..ba48b3ccb 100644 --- a/src/write_stage.rs +++ b/src/write_stage.rs @@ -63,7 +63,8 @@ impl WriteStage { ) { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, - Error::SendError => (), // Ignore when downstream stage exists prematurely. + Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), + Error::SendError => (), // Ignore when downstream stage exits prematurely. _ => error!("{:?}", e), } };