Wrap the RPU with new object Server

This commit is contained in:
Greg Fitzgerald 2018-05-15 11:00:01 -06:00
parent e9ee020b5f
commit cfe8b3fc55
4 changed files with 58 additions and 10 deletions

View File

@ -10,7 +10,7 @@ use solana::bank::Bank;
use solana::crdt::ReplicatedData;
use solana::entry::Entry;
use solana::event::Event;
use solana::rpu::Rpu;
use solana::server::Server;
use solana::signature::{KeyPair, KeyPairUtil};
use std::env;
use std::io::{stdin, stdout, Read};
@ -138,7 +138,7 @@ fn main() {
let respond_socket = UdpSocket::bind(local.clone()).unwrap();
eprintln!("starting server...");
let rpu = Rpu::new(
let server = Server::new(
bank,
last_id,
Some(Duration::from_millis(1000)),
@ -151,7 +151,7 @@ fn main() {
stdout(),
);
eprintln!("Ready. Listening on {}", serve_addr);
for t in rpu.thread_hdls {
for t in server.thread_hdls {
t.join().expect("join");
}
}

View File

@ -21,6 +21,7 @@ pub mod request_processor;
pub mod request_stage;
pub mod result;
pub mod rpu;
pub mod server;
pub mod sig_verify_stage;
pub mod signature;
pub mod streamer;

47
src/server.rs Normal file
View File

@ -0,0 +1,47 @@
//! The `server` module hosts all the server microservices.
use bank::Bank;
use crdt::ReplicatedData;
use hash::Hash;
use rpu::Rpu;
use std::io::Write;
use std::net::UdpSocket;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::thread::JoinHandle;
use std::time::Duration;
pub struct Server {
pub thread_hdls: Vec<JoinHandle<()>>,
}
impl Server {
pub fn new<W: Write + Send + 'static>(
bank: Bank,
start_hash: Hash,
tick_duration: Option<Duration>,
me: ReplicatedData,
requests_socket: UdpSocket,
broadcast_socket: UdpSocket,
respond_socket: UdpSocket,
gossip: UdpSocket,
exit: Arc<AtomicBool>,
writer: W,
) -> Self {
let rpu = Rpu::new(
bank,
start_hash,
tick_duration,
me,
requests_socket,
broadcast_socket,
respond_socket,
gossip,
exit,
writer,
);
Server {
thread_hdls: rpu.thread_hdls,
}
}
}

View File

@ -160,7 +160,7 @@ mod tests {
use logger;
use mint::Mint;
use plan::Plan;
use rpu::Rpu;
use server::Server;
use signature::{KeyPair, KeyPairUtil};
use std::io::sink;
use std::sync::atomic::{AtomicBool, Ordering};
@ -199,7 +199,7 @@ mod tests {
let broadcast_socket = UdpSocket::bind(local).unwrap();
let respond_socket = UdpSocket::bind(local.clone()).unwrap();
let rpu = Rpu::new(
let server = Server::new(
bank,
alice.last_id(),
Some(Duration::from_millis(30)),
@ -234,7 +234,7 @@ mod tests {
}
assert_eq!(balance.unwrap(), 500);
exit.store(true, Ordering::Relaxed);
for t in rpu.thread_hdls {
for t in server.thread_hdls {
t.join().unwrap();
}
}
@ -254,7 +254,7 @@ mod tests {
let broadcast_socket = UdpSocket::bind(local).unwrap();
let respond_socket = UdpSocket::bind(local.clone()).unwrap();
let rpu = Rpu::new(
let server = Server::new(
bank,
alice.last_id(),
Some(Duration::from_millis(30)),
@ -293,7 +293,7 @@ mod tests {
trace!("exiting");
exit.store(true, Ordering::Relaxed);
trace!("joining threads");
for t in rpu.thread_hdls {
for t in server.thread_hdls {
t.join().unwrap();
}
}
@ -405,7 +405,7 @@ mod tests {
let broadcast_socket = UdpSocket::bind(local).unwrap();
let respond_socket = UdpSocket::bind(local.clone()).unwrap();
let rpu = Rpu::new(
let server = Server::new(
leader_bank,
alice.last_id(),
None,
@ -418,7 +418,7 @@ mod tests {
sink(),
);
let mut threads = rpu.thread_hdls;
let mut threads = server.thread_hdls;
for _ in 0..N {
replicant(&leader.0, exit.clone(), &alice, &mut threads);
}