diff --git a/src/ncp.rs b/src/ncp.rs index bbf7bd6ad9..216d280fe1 100644 --- a/src/ncp.rs +++ b/src/ncp.rs @@ -37,12 +37,8 @@ impl Ncp { request_sender, )?; let (response_sender, response_receiver) = channel(); - let t_responder = streamer::responder( - gossip_send_socket, - exit.clone(), - blob_recycler.clone(), - response_receiver, - ); + let t_responder = + streamer::responder(gossip_send_socket, blob_recycler.clone(), response_receiver); let t_listen = Crdt::listen( crdt.clone(), window, diff --git a/src/rpu.rs b/src/rpu.rs index 4447bbe8fb..e218934299 100644 --- a/src/rpu.rs +++ b/src/rpu.rs @@ -65,12 +65,7 @@ impl Rpu { blob_recycler.clone(), ); - let t_responder = streamer::responder( - respond_socket, - exit.clone(), - blob_recycler.clone(), - blob_receiver, - ); + let t_responder = streamer::responder(respond_socket, blob_recycler.clone(), blob_receiver); let mut thread_hdls = vec![t_receiver, t_responder]; thread_hdls.extend(request_stage.thread_hdls().into_iter()); diff --git a/src/streamer.rs b/src/streamer.rs index 7bb5f76447..ea8eb8aef4 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -101,17 +101,16 @@ pub fn recv_batch(recvr: &PacketReceiver) -> Result<(Vec, usize)> Ok((batch, len)) } -pub fn responder( - sock: UdpSocket, - exit: Arc, - recycler: BlobRecycler, - r: BlobReceiver, -) -> JoinHandle<()> { +pub fn responder(sock: UdpSocket, recycler: BlobRecycler, r: BlobReceiver) -> JoinHandle<()> { Builder::new() .name("solana-responder".to_string()) .spawn(move || loop { - if recv_send(&sock, &recycler, &r).is_err() && exit.load(Ordering::Relaxed) { - break; + if let Err(e) = recv_send(&sock, &recycler, &r) { + match e { + Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, + Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), + _ => error!("{:?}", e), + } } }) .unwrap() @@ -844,20 +843,24 @@ mod test { let resp_recycler = BlobRecycler::default(); let (s_reader, r_reader) = channel(); let t_receiver = receiver(read, exit.clone(), pack_recycler.clone(), s_reader); - let (s_responder, r_responder) = channel(); - let t_responder = responder(send, exit.clone(), resp_recycler.clone(), r_responder); - let mut msgs = VecDeque::new(); - for i in 0..10 { - let b = resp_recycler.allocate(); - { - let mut w = b.write().unwrap(); - w.data[0] = i as u8; - w.meta.size = PACKET_DATA_SIZE; - w.meta.set_addr(&addr); + let t_responder = { + let (s_responder, r_responder) = channel(); + let t_responder = responder(send, resp_recycler.clone(), r_responder); + let mut msgs = VecDeque::new(); + for i in 0..10 { + let b = resp_recycler.allocate(); + { + let mut w = b.write().unwrap(); + w.data[0] = i as u8; + w.meta.size = PACKET_DATA_SIZE; + w.meta.set_addr(&addr); + } + msgs.push_back(b); } - msgs.push_back(b); - } - s_responder.send(msgs).expect("send"); + s_responder.send(msgs).expect("send"); + t_responder + }; + let mut num = 0; get_msgs(r_reader, &mut num); assert_eq!(num, 10); @@ -914,28 +917,28 @@ mod test { s_window, s_retransmit, ); - let (s_responder, r_responder) = channel(); - let t_responder = responder( - tn.sockets.replicate, - exit.clone(), - resp_recycler.clone(), - r_responder, - ); - let mut msgs = VecDeque::new(); - for v in 0..10 { - let i = 9 - v; - let b = resp_recycler.allocate(); - { - let mut w = b.write().unwrap(); - w.set_index(i).unwrap(); - w.set_id(me_id).unwrap(); - assert_eq!(i, w.get_index().unwrap()); - w.meta.size = PACKET_DATA_SIZE; - w.meta.set_addr(&tn.data.gossip_addr); + + let t_responder = { + let (s_responder, r_responder) = channel(); + let t_responder = responder(tn.sockets.replicate, resp_recycler.clone(), r_responder); + let mut msgs = VecDeque::new(); + for v in 0..10 { + let i = 9 - v; + let b = resp_recycler.allocate(); + { + let mut w = b.write().unwrap(); + w.set_index(i).unwrap(); + w.set_id(me_id).unwrap(); + assert_eq!(i, w.get_index().unwrap()); + w.meta.size = PACKET_DATA_SIZE; + w.meta.set_addr(&tn.data.gossip_addr); + } + msgs.push_back(b); } - msgs.push_back(b); - } - s_responder.send(msgs).expect("send"); + s_responder.send(msgs).expect("send"); + t_responder + }; + let mut num = 0; get_blobs(r_window, &mut num); assert_eq!(num, 10); diff --git a/src/tvu.rs b/src/tvu.rs index feeddd685d..17612fb0e2 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -193,12 +193,8 @@ pub mod tests { // simulate leader sending messages let (s_responder, r_responder) = channel(); - let t_responder = streamer::responder( - leader.sockets.requests, - exit.clone(), - resp_recycler.clone(), - r_responder, - ); + let t_responder = + streamer::responder(leader.sockets.requests, resp_recycler.clone(), r_responder); let starting_balance = 10_000; let mint = Mint::new(starting_balance); @@ -269,6 +265,7 @@ pub mod tests { // send the blobs into the socket s_responder.send(msgs).expect("send"); + drop(s_responder); // receive retransmitted messages let timer = Duration::new(1, 0);