Remove exit variable from respond [stage]

And drop the sender that feeds input to the responder.
This commit is contained in:
Greg Fitzgerald 2018-07-05 16:41:03 -06:00 committed by Greg Fitzgerald
parent f284af1c3d
commit c4fa841aa9
4 changed files with 51 additions and 60 deletions

View File

@ -37,12 +37,8 @@ impl Ncp {
request_sender, request_sender,
)?; )?;
let (response_sender, response_receiver) = channel(); let (response_sender, response_receiver) = channel();
let t_responder = streamer::responder( let t_responder =
gossip_send_socket, streamer::responder(gossip_send_socket, blob_recycler.clone(), response_receiver);
exit.clone(),
blob_recycler.clone(),
response_receiver,
);
let t_listen = Crdt::listen( let t_listen = Crdt::listen(
crdt.clone(), crdt.clone(),
window, window,

View File

@ -65,12 +65,7 @@ impl Rpu {
blob_recycler.clone(), blob_recycler.clone(),
); );
let t_responder = streamer::responder( let t_responder = streamer::responder(respond_socket, blob_recycler.clone(), blob_receiver);
respond_socket,
exit.clone(),
blob_recycler.clone(),
blob_receiver,
);
let mut thread_hdls = vec![t_receiver, t_responder]; let mut thread_hdls = vec![t_receiver, t_responder];
thread_hdls.extend(request_stage.thread_hdls().into_iter()); thread_hdls.extend(request_stage.thread_hdls().into_iter());

View File

@ -101,17 +101,16 @@ pub fn recv_batch(recvr: &PacketReceiver) -> Result<(Vec<SharedPackets>, usize)>
Ok((batch, len)) Ok((batch, len))
} }
pub fn responder( pub fn responder(sock: UdpSocket, recycler: BlobRecycler, r: BlobReceiver) -> JoinHandle<()> {
sock: UdpSocket,
exit: Arc<AtomicBool>,
recycler: BlobRecycler,
r: BlobReceiver,
) -> JoinHandle<()> {
Builder::new() Builder::new()
.name("solana-responder".to_string()) .name("solana-responder".to_string())
.spawn(move || loop { .spawn(move || loop {
if recv_send(&sock, &recycler, &r).is_err() && exit.load(Ordering::Relaxed) { if let Err(e) = recv_send(&sock, &recycler, &r) {
break; match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
_ => error!("{:?}", e),
}
} }
}) })
.unwrap() .unwrap()
@ -844,20 +843,24 @@ mod test {
let resp_recycler = BlobRecycler::default(); let resp_recycler = BlobRecycler::default();
let (s_reader, r_reader) = channel(); let (s_reader, r_reader) = channel();
let t_receiver = receiver(read, exit.clone(), pack_recycler.clone(), s_reader); let t_receiver = receiver(read, exit.clone(), pack_recycler.clone(), s_reader);
let (s_responder, r_responder) = channel(); let t_responder = {
let t_responder = responder(send, exit.clone(), resp_recycler.clone(), r_responder); let (s_responder, r_responder) = channel();
let mut msgs = VecDeque::new(); let t_responder = responder(send, resp_recycler.clone(), r_responder);
for i in 0..10 { let mut msgs = VecDeque::new();
let b = resp_recycler.allocate(); for i in 0..10 {
{ let b = resp_recycler.allocate();
let mut w = b.write().unwrap(); {
w.data[0] = i as u8; let mut w = b.write().unwrap();
w.meta.size = PACKET_DATA_SIZE; w.data[0] = i as u8;
w.meta.set_addr(&addr); w.meta.size = PACKET_DATA_SIZE;
w.meta.set_addr(&addr);
}
msgs.push_back(b);
} }
msgs.push_back(b); s_responder.send(msgs).expect("send");
} t_responder
s_responder.send(msgs).expect("send"); };
let mut num = 0; let mut num = 0;
get_msgs(r_reader, &mut num); get_msgs(r_reader, &mut num);
assert_eq!(num, 10); assert_eq!(num, 10);
@ -914,28 +917,28 @@ mod test {
s_window, s_window,
s_retransmit, s_retransmit,
); );
let (s_responder, r_responder) = channel();
let t_responder = responder( let t_responder = {
tn.sockets.replicate, let (s_responder, r_responder) = channel();
exit.clone(), let t_responder = responder(tn.sockets.replicate, resp_recycler.clone(), r_responder);
resp_recycler.clone(), let mut msgs = VecDeque::new();
r_responder, for v in 0..10 {
); let i = 9 - v;
let mut msgs = VecDeque::new(); let b = resp_recycler.allocate();
for v in 0..10 { {
let i = 9 - v; let mut w = b.write().unwrap();
let b = resp_recycler.allocate(); w.set_index(i).unwrap();
{ w.set_id(me_id).unwrap();
let mut w = b.write().unwrap(); assert_eq!(i, w.get_index().unwrap());
w.set_index(i).unwrap(); w.meta.size = PACKET_DATA_SIZE;
w.set_id(me_id).unwrap(); w.meta.set_addr(&tn.data.gossip_addr);
assert_eq!(i, w.get_index().unwrap()); }
w.meta.size = PACKET_DATA_SIZE; msgs.push_back(b);
w.meta.set_addr(&tn.data.gossip_addr);
} }
msgs.push_back(b); s_responder.send(msgs).expect("send");
} t_responder
s_responder.send(msgs).expect("send"); };
let mut num = 0; let mut num = 0;
get_blobs(r_window, &mut num); get_blobs(r_window, &mut num);
assert_eq!(num, 10); assert_eq!(num, 10);

View File

@ -193,12 +193,8 @@ pub mod tests {
// simulate leader sending messages // simulate leader sending messages
let (s_responder, r_responder) = channel(); let (s_responder, r_responder) = channel();
let t_responder = streamer::responder( let t_responder =
leader.sockets.requests, streamer::responder(leader.sockets.requests, resp_recycler.clone(), r_responder);
exit.clone(),
resp_recycler.clone(),
r_responder,
);
let starting_balance = 10_000; let starting_balance = 10_000;
let mint = Mint::new(starting_balance); let mint = Mint::new(starting_balance);
@ -269,6 +265,7 @@ pub mod tests {
// send the blobs into the socket // send the blobs into the socket
s_responder.send(msgs).expect("send"); s_responder.send(msgs).expect("send");
drop(s_responder);
// receive retransmitted messages // receive retransmitted messages
let timer = Duration::new(1, 0); let timer = Duration::new(1, 0);