Rpu/Tpu serve() functions now only spin up threads

This commit is contained in:
Greg Fitzgerald 2018-05-15 10:09:54 -06:00
parent 5f5be83a17
commit 0aaa500f7c
4 changed files with 17 additions and 24 deletions

View File

@ -147,7 +147,7 @@ fn main() {
gossip_sock, gossip_sock,
exit.clone(), exit.clone(),
stdout(), stdout(),
).unwrap(); );
eprintln!("Ready. Listening on {}", serve_addr); eprintln!("Ready. Listening on {}", serve_addr);
for t in threads { for t in threads {
t.join().expect("join"); t.join().expect("join");

View File

@ -8,7 +8,6 @@ use packet;
use record_stage::RecordStage; use record_stage::RecordStage;
use request_processor::RequestProcessor; use request_processor::RequestProcessor;
use request_stage::RequestStage; use request_stage::RequestStage;
use result::Result;
use sig_verify_stage::SigVerifyStage; use sig_verify_stage::SigVerifyStage;
use std::io::Write; use std::io::Write;
use std::net::UdpSocket; use std::net::UdpSocket;
@ -48,7 +47,7 @@ impl Rpu {
gossip: UdpSocket, gossip: UdpSocket,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
writer: W, writer: W,
) -> Result<Vec<JoinHandle<()>>> { ) -> Vec<JoinHandle<()>> {
let packet_recycler = packet::PacketRecycler::default(); let packet_recycler = packet::PacketRecycler::default();
let (packet_sender, packet_receiver) = channel(); let (packet_sender, packet_receiver) = channel();
let t_receiver = streamer::receiver( let t_receiver = streamer::receiver(
@ -115,6 +114,6 @@ impl Rpu {
t_broadcast, t_broadcast,
]; ];
threads.extend(sig_verify_stage.thread_hdls.into_iter()); threads.extend(sig_verify_stage.thread_hdls.into_iter());
Ok(threads) threads
} }
} }

View File

@ -208,7 +208,7 @@ mod tests {
gossip, gossip,
exit.clone(), exit.clone(),
sink(), sink(),
).unwrap(); );
sleep(Duration::from_millis(900)); sleep(Duration::from_millis(900));
let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
@ -261,7 +261,7 @@ mod tests {
leader_gossip, leader_gossip,
exit.clone(), exit.clone(),
sink(), sink(),
).unwrap(); );
sleep(Duration::from_millis(300)); sleep(Duration::from_millis(300));
let requests_socket = UdpSocket::bind("127.0.0.1:0").unwrap(); let requests_socket = UdpSocket::bind("127.0.0.1:0").unwrap();
@ -404,8 +404,7 @@ mod tests {
let broadcast_socket = UdpSocket::bind(local).unwrap(); let broadcast_socket = UdpSocket::bind(local).unwrap();
let respond_socket = UdpSocket::bind(local.clone()).unwrap(); let respond_socket = UdpSocket::bind(local.clone()).unwrap();
let mut threads = leader_bank let mut threads = leader_bank.serve(
.serve(
leader.0.clone(), leader.0.clone(),
leader.2, leader.2,
broadcast_socket, broadcast_socket,
@ -413,8 +412,7 @@ mod tests {
leader.1, leader.1,
exit.clone(), exit.clone(),
sink(), sink(),
) );
.unwrap();
for _ in 0..N { for _ in 0..N {
replicant(&leader.0, exit.clone(), &alice, &mut threads); replicant(&leader.0, exit.clone(), &alice, &mut threads);

View File

@ -42,15 +42,11 @@ impl Tpu {
&self, &self,
me: ReplicatedData, me: ReplicatedData,
requests_socket: UdpSocket, requests_socket: UdpSocket,
broadcast_socket: UdpSocket,
gossip: UdpSocket, gossip: UdpSocket,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
writer: W, writer: W,
) -> Result<Vec<JoinHandle<()>>> { ) -> Vec<JoinHandle<()>> {
// make sure we are on the same interface
let mut local = requests_socket.local_addr()?;
local.set_port(0);
let broadcast_socket = UdpSocket::bind(local)?;
let packet_recycler = packet::PacketRecycler::default(); let packet_recycler = packet::PacketRecycler::default();
let (packet_sender, packet_receiver) = channel(); let (packet_sender, packet_receiver) = channel();
let t_receiver = streamer::receiver( let t_receiver = streamer::receiver(
@ -107,6 +103,6 @@ impl Tpu {
t_broadcast, t_broadcast,
]; ];
threads.extend(sig_verify_stage.thread_hdls.into_iter()); threads.extend(sig_verify_stage.thread_hdls.into_iter());
Ok(threads) threads
} }
} }