diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index 3d8e4f212..175717834 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -116,7 +116,6 @@ fn main() { eprintln!("creating networking stack..."); let exit = Arc::new(AtomicBool::new(false)); - let rpu = Rpu::new(bank, last_id, Some(Duration::from_millis(1000))); let serve_sock = UdpSocket::bind(&serve_addr).unwrap(); serve_sock .set_read_timeout(Some(Duration::new(1, 0))) @@ -139,7 +138,10 @@ fn main() { let respond_socket = UdpSocket::bind(local.clone()).unwrap(); eprintln!("starting server..."); - let threads = rpu.serve( + let rpu = Rpu::new1( + bank, + last_id, + Some(Duration::from_millis(1000)), d, serve_sock, broadcast_socket, @@ -149,7 +151,7 @@ fn main() { stdout(), ); eprintln!("Ready. Listening on {}", serve_addr); - for t in threads { + for t in rpu.thread_hdls { t.join().expect("join"); } } diff --git a/src/rpu.rs b/src/rpu.rs index c6ee2d6f5..fa85f6b97 100644 --- a/src/rpu.rs +++ b/src/rpu.rs @@ -23,6 +23,7 @@ pub struct Rpu { bank: Arc, start_hash: Hash, tick_duration: Option, + pub thread_hdls: Vec>, } impl Rpu { @@ -32,9 +33,36 @@ impl Rpu { bank: Arc::new(bank), start_hash, tick_duration, + thread_hdls: vec![], } } + pub fn new1( + bank: Bank, + start_hash: Hash, + tick_duration: Option, + me: ReplicatedData, + requests_socket: UdpSocket, + broadcast_socket: UdpSocket, + respond_socket: UdpSocket, + gossip: UdpSocket, + exit: Arc, + writer: W, + ) -> Self { + let mut rpu = Self::new(bank, start_hash, tick_duration); + let thread_hdls = rpu.serve( + me, + requests_socket, + broadcast_socket, + respond_socket, + gossip, + exit, + writer, + ); + rpu.thread_hdls.extend(thread_hdls); + rpu + } + /// Create a UDP microservice that forwards messages the given Rpu. /// This service is the network leader /// Set `exit` to shutdown its threads. diff --git a/src/thin_client.rs b/src/thin_client.rs index 36a446438..3e3b8dadf 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -193,14 +193,16 @@ mod tests { let bank = Bank::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let rpu = Rpu::new(bank, alice.last_id(), Some(Duration::from_millis(30))); let mut local = serve.local_addr().unwrap(); local.set_port(0); let broadcast_socket = UdpSocket::bind(local).unwrap(); let respond_socket = UdpSocket::bind(local.clone()).unwrap(); - let threads = rpu.serve( + let rpu = Rpu::new1( + bank, + alice.last_id(), + Some(Duration::from_millis(30)), d, serve, broadcast_socket, @@ -232,7 +234,7 @@ mod tests { } assert_eq!(balance.unwrap(), 500); exit.store(true, Ordering::Relaxed); - for t in threads { + for t in rpu.thread_hdls { t.join().unwrap(); } } @@ -245,7 +247,6 @@ mod tests { let bank = Bank::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let rpu = Rpu::new(bank, alice.last_id(), Some(Duration::from_millis(30))); let serve_addr = leader_serve.local_addr().unwrap(); let mut local = leader_serve.local_addr().unwrap(); @@ -253,7 +254,10 @@ mod tests { let broadcast_socket = UdpSocket::bind(local).unwrap(); let respond_socket = UdpSocket::bind(local.clone()).unwrap(); - let threads = rpu.serve( + let rpu = Rpu::new1( + bank, + alice.last_id(), + Some(Duration::from_millis(30)), leader_data, leader_serve, broadcast_socket, @@ -289,7 +293,7 @@ mod tests { trace!("exiting"); exit.store(true, Ordering::Relaxed); trace!("joining threads"); - for t in threads { + for t in rpu.thread_hdls { t.join().unwrap(); } } @@ -394,17 +398,17 @@ mod tests { let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let leader_bank = { - let bank = Bank::new(&alice); - Rpu::new(bank, alice.last_id(), None) - }; + let leader_bank = Bank::new(&alice); let mut local = leader.2.local_addr().unwrap(); local.set_port(0); let broadcast_socket = UdpSocket::bind(local).unwrap(); let respond_socket = UdpSocket::bind(local.clone()).unwrap(); - let mut threads = leader_bank.serve( + let rpu = Rpu::new1( + leader_bank, + alice.last_id(), + None, leader.0.clone(), leader.2, broadcast_socket, @@ -414,6 +418,7 @@ mod tests { sink(), ); + let mut threads = rpu.thread_hdls; for _ in 0..N { replicant(&leader.0, exit.clone(), &alice, &mut threads); } diff --git a/src/tpu.rs b/src/tpu.rs index 6b4720735..39f6da055 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -7,7 +7,6 @@ use crdt::{Crdt, ReplicatedData}; use hash::Hash; use packet; use record_stage::RecordStage; -use result::Result; use sig_verify_stage::SigVerifyStage; use std::io::Write; use std::net::UdpSocket; @@ -23,6 +22,7 @@ pub struct Tpu { bank: Arc, start_hash: Hash, tick_duration: Option, + pub thread_hdls: Vec>, } impl Tpu { @@ -32,9 +32,27 @@ impl Tpu { bank: Arc::new(bank), start_hash, tick_duration, + thread_hdls: vec![], } } + pub fn new1( + bank: Bank, + start_hash: Hash, + tick_duration: Option, + me: ReplicatedData, + requests_socket: UdpSocket, + broadcast_socket: UdpSocket, + gossip: UdpSocket, + exit: Arc, + writer: W, + ) -> Self { + let mut tpu = Tpu::new(bank, start_hash, tick_duration); + let thread_hdls = tpu.serve(me, requests_socket, broadcast_socket, gossip, exit, writer); + tpu.thread_hdls.extend(thread_hdls); + tpu + } + /// Create a UDP microservice that forwards messages the given Tpu. /// This service is the network leader /// Set `exit` to shutdown its threads.