Remove exit variable from RequestStage

This commit is contained in:
Greg Fitzgerald 2018-07-05 16:43:48 -06:00 committed by Greg Fitzgerald
parent c4fa841aa9
commit 3ed9567f96
2 changed files with 9 additions and 11 deletions

View File

@ -5,11 +5,10 @@ use packet::{to_blobs, BlobRecycler, PacketRecycler, Packets, SharedPackets};
use rayon::prelude::*; use rayon::prelude::*;
use request::Request; use request::Request;
use request_processor::RequestProcessor; use request_processor::RequestProcessor;
use result::Result; use result::{Error, Result};
use service::Service; use service::Service;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
use std::sync::mpsc::{channel, Receiver};
use std::sync::Arc; use std::sync::Arc;
use std::thread::{self, Builder, JoinHandle}; use std::thread::{self, Builder, JoinHandle};
use std::time::Instant; use std::time::Instant;
@ -81,7 +80,6 @@ impl RequestStage {
} }
pub fn new( pub fn new(
request_processor: RequestProcessor, request_processor: RequestProcessor,
exit: Arc<AtomicBool>,
packet_receiver: Receiver<SharedPackets>, packet_receiver: Receiver<SharedPackets>,
packet_recycler: PacketRecycler, packet_recycler: PacketRecycler,
blob_recycler: BlobRecycler, blob_recycler: BlobRecycler,
@ -92,16 +90,17 @@ impl RequestStage {
let thread_hdl = Builder::new() let thread_hdl = Builder::new()
.name("solana-request-stage".to_string()) .name("solana-request-stage".to_string())
.spawn(move || loop { .spawn(move || loop {
let e = Self::process_request_packets( if let Err(e) = Self::process_request_packets(
&request_processor_, &request_processor_,
&packet_receiver, &packet_receiver,
&blob_sender, &blob_sender,
&packet_recycler, &packet_recycler,
&blob_recycler, &blob_recycler,
); ) {
if e.is_err() { match e {
if exit.load(Ordering::Relaxed) { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
break; Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
_ => error!("{:?}", e),
} }
} }
}) })

View File

@ -50,7 +50,7 @@ impl Rpu {
let (packet_sender, packet_receiver) = channel(); let (packet_sender, packet_receiver) = channel();
let t_receiver = streamer::receiver( let t_receiver = streamer::receiver(
requests_socket, requests_socket,
exit.clone(), exit,
packet_recycler.clone(), packet_recycler.clone(),
packet_sender, packet_sender,
); );
@ -59,7 +59,6 @@ impl Rpu {
let request_processor = RequestProcessor::new(bank.clone()); let request_processor = RequestProcessor::new(bank.clone());
let (request_stage, blob_receiver) = RequestStage::new( let (request_stage, blob_receiver) = RequestStage::new(
request_processor, request_processor,
exit.clone(),
packet_receiver, packet_receiver,
packet_recycler.clone(), packet_recycler.clone(),
blob_recycler.clone(), blob_recycler.clone(),