Spin up threads from Rpu/Tpu constructors

This commit is contained in:
Greg Fitzgerald 2018-05-15 10:27:18 -06:00
parent 0aaa500f7c
commit 99dc4ea4a9
4 changed files with 68 additions and 15 deletions

View File

@ -116,7 +116,6 @@ fn main() {
eprintln!("creating networking stack...");
let exit = Arc::new(AtomicBool::new(false));
let rpu = Rpu::new(bank, last_id, Some(Duration::from_millis(1000)));
let serve_sock = UdpSocket::bind(&serve_addr).unwrap();
serve_sock
.set_read_timeout(Some(Duration::new(1, 0)))
@ -139,7 +138,10 @@ fn main() {
let respond_socket = UdpSocket::bind(local.clone()).unwrap();
eprintln!("starting server...");
let threads = rpu.serve(
let rpu = Rpu::new1(
bank,
last_id,
Some(Duration::from_millis(1000)),
d,
serve_sock,
broadcast_socket,
@ -149,7 +151,7 @@ fn main() {
stdout(),
);
eprintln!("Ready. Listening on {}", serve_addr);
for t in threads {
for t in rpu.thread_hdls {
t.join().expect("join");
}
}

View File

@ -23,6 +23,7 @@ pub struct Rpu {
bank: Arc<Bank>,
start_hash: Hash,
tick_duration: Option<Duration>,
pub thread_hdls: Vec<JoinHandle<()>>,
}
impl Rpu {
@ -32,9 +33,36 @@ impl Rpu {
bank: Arc::new(bank),
start_hash,
tick_duration,
thread_hdls: vec![],
}
}
pub fn new1<W: Write + Send + 'static>(
bank: Bank,
start_hash: Hash,
tick_duration: Option<Duration>,
me: ReplicatedData,
requests_socket: UdpSocket,
broadcast_socket: UdpSocket,
respond_socket: UdpSocket,
gossip: UdpSocket,
exit: Arc<AtomicBool>,
writer: W,
) -> Self {
let mut rpu = Self::new(bank, start_hash, tick_duration);
let thread_hdls = rpu.serve(
me,
requests_socket,
broadcast_socket,
respond_socket,
gossip,
exit,
writer,
);
rpu.thread_hdls.extend(thread_hdls);
rpu
}
/// Create a UDP microservice that forwards messages the given Rpu.
/// This service is the network leader
/// Set `exit` to shutdown its threads.

View File

@ -193,14 +193,16 @@ mod tests {
let bank = Bank::new(&alice);
let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false));
let rpu = Rpu::new(bank, alice.last_id(), Some(Duration::from_millis(30)));
let mut local = serve.local_addr().unwrap();
local.set_port(0);
let broadcast_socket = UdpSocket::bind(local).unwrap();
let respond_socket = UdpSocket::bind(local.clone()).unwrap();
let threads = rpu.serve(
let rpu = Rpu::new1(
bank,
alice.last_id(),
Some(Duration::from_millis(30)),
d,
serve,
broadcast_socket,
@ -232,7 +234,7 @@ mod tests {
}
assert_eq!(balance.unwrap(), 500);
exit.store(true, Ordering::Relaxed);
for t in threads {
for t in rpu.thread_hdls {
t.join().unwrap();
}
}
@ -245,7 +247,6 @@ mod tests {
let bank = Bank::new(&alice);
let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false));
let rpu = Rpu::new(bank, alice.last_id(), Some(Duration::from_millis(30)));
let serve_addr = leader_serve.local_addr().unwrap();
let mut local = leader_serve.local_addr().unwrap();
@ -253,7 +254,10 @@ mod tests {
let broadcast_socket = UdpSocket::bind(local).unwrap();
let respond_socket = UdpSocket::bind(local.clone()).unwrap();
let threads = rpu.serve(
let rpu = Rpu::new1(
bank,
alice.last_id(),
Some(Duration::from_millis(30)),
leader_data,
leader_serve,
broadcast_socket,
@ -289,7 +293,7 @@ mod tests {
trace!("exiting");
exit.store(true, Ordering::Relaxed);
trace!("joining threads");
for t in threads {
for t in rpu.thread_hdls {
t.join().unwrap();
}
}
@ -394,17 +398,17 @@ mod tests {
let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false));
let leader_bank = {
let bank = Bank::new(&alice);
Rpu::new(bank, alice.last_id(), None)
};
let leader_bank = Bank::new(&alice);
let mut local = leader.2.local_addr().unwrap();
local.set_port(0);
let broadcast_socket = UdpSocket::bind(local).unwrap();
let respond_socket = UdpSocket::bind(local.clone()).unwrap();
let mut threads = leader_bank.serve(
let rpu = Rpu::new1(
leader_bank,
alice.last_id(),
None,
leader.0.clone(),
leader.2,
broadcast_socket,
@ -414,6 +418,7 @@ mod tests {
sink(),
);
let mut threads = rpu.thread_hdls;
for _ in 0..N {
replicant(&leader.0, exit.clone(), &alice, &mut threads);
}

View File

@ -7,7 +7,6 @@ use crdt::{Crdt, ReplicatedData};
use hash::Hash;
use packet;
use record_stage::RecordStage;
use result::Result;
use sig_verify_stage::SigVerifyStage;
use std::io::Write;
use std::net::UdpSocket;
@ -23,6 +22,7 @@ pub struct Tpu {
bank: Arc<Bank>,
start_hash: Hash,
tick_duration: Option<Duration>,
pub thread_hdls: Vec<JoinHandle<()>>,
}
impl Tpu {
@ -32,9 +32,27 @@ impl Tpu {
bank: Arc::new(bank),
start_hash,
tick_duration,
thread_hdls: vec![],
}
}
pub fn new1<W: Write + Send + 'static>(
bank: Bank,
start_hash: Hash,
tick_duration: Option<Duration>,
me: ReplicatedData,
requests_socket: UdpSocket,
broadcast_socket: UdpSocket,
gossip: UdpSocket,
exit: Arc<AtomicBool>,
writer: W,
) -> Self {
let mut tpu = Tpu::new(bank, start_hash, tick_duration);
let thread_hdls = tpu.serve(me, requests_socket, broadcast_socket, gossip, exit, writer);
tpu.thread_hdls.extend(thread_hdls);
tpu
}
/// Create a UDP microservice that forwards messages the given Tpu.
/// This service is the network leader
/// Set `exit` to shutdown its threads.