@garious review

This commit is contained in:
Anatoly Yakovenko 2018-05-23 11:06:18 -07:00 committed by Greg Fitzgerald
parent b38c7ea2ff
commit 8049323ca8
4 changed files with 48 additions and 43 deletions

View File

@ -140,7 +140,7 @@ fn main() {
let respond_socket = UdpSocket::bind(local.clone()).unwrap(); let respond_socket = UdpSocket::bind(local.clone()).unwrap();
eprintln!("starting server..."); eprintln!("starting server...");
let server = Server::leader( let server = Server::new_leader(
bank, bank,
last_id, last_id,
Some(Duration::from_millis(1000)), Some(Duration::from_millis(1000)),

View File

@ -18,7 +18,7 @@ pub struct Server {
} }
impl Server { impl Server {
pub fn leader<W: Write + Send + 'static>( pub fn new_leader<W: Write + Send + 'static>(
bank: Bank, bank: Bank,
start_hash: Hash, start_hash: Hash,
tick_duration: Option<Duration>, tick_duration: Option<Duration>,
@ -49,7 +49,7 @@ impl Server {
thread_hdls.extend(tpu.thread_hdls); thread_hdls.extend(tpu.thread_hdls);
Server { thread_hdls } Server { thread_hdls }
} }
pub fn validator( pub fn new_validator(
bank: Bank, bank: Bank,
me: ReplicatedData, me: ReplicatedData,
requests_socket: UdpSocket, requests_socket: UdpSocket,

View File

@ -204,16 +204,16 @@ mod tests {
let bob_pubkey = KeyPair::new().pubkey(); let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let server = Server::leader( let server = Server::new_leader(
bank, bank,
alice.last_id(), alice.last_id(),
Some(Duration::from_millis(30)), Some(Duration::from_millis(30)),
leader.data.clone(), leader.data.clone(),
leader.requests, leader.sockets.requests,
leader.event, leader.sockets.event,
leader.broadcast, leader.sockets.broadcast,
leader.respond, leader.sockets.respond,
leader.gossip, leader.sockets.gossip,
exit.clone(), exit.clone(),
sink(), sink(),
); );
@ -249,16 +249,16 @@ mod tests {
let bob_pubkey = KeyPair::new().pubkey(); let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let server = Server::leader( let server = Server::new_leader(
bank, bank,
alice.last_id(), alice.last_id(),
Some(Duration::from_millis(30)), Some(Duration::from_millis(30)),
leader.data.clone(), leader.data.clone(),
leader.requests, leader.sockets.requests,
leader.event, leader.sockets.event,
leader.broadcast, leader.sockets.broadcast,
leader.respond, leader.sockets.respond,
leader.gossip, leader.sockets.gossip,
exit.clone(), exit.clone(),
sink(), sink(),
); );
@ -303,13 +303,13 @@ mod tests {
) { ) {
let replicant = TestNode::new(); let replicant = TestNode::new();
let replicant_bank = Bank::new(&alice); let replicant_bank = Bank::new(&alice);
let mut ts = Server::validator( let mut ts = Server::new_validator(
replicant_bank, replicant_bank,
replicant.data.clone(), replicant.data.clone(),
replicant.requests, replicant.sockets.requests,
replicant.respond, replicant.sockets.respond,
replicant.replicate, replicant.sockets.replicate,
replicant.gossip, replicant.sockets.gossip,
leader.clone(), leader.clone(),
exit.clone(), exit.clone(),
); );
@ -334,7 +334,7 @@ mod tests {
let spy_ref = Arc::new(RwLock::new(spy_crdt)); let spy_ref = Arc::new(RwLock::new(spy_crdt));
let spy_window = default_window(); let spy_window = default_window();
let t_spy_listen = Crdt::listen(spy_ref.clone(), spy_window, spy.gossip, exit.clone()); let t_spy_listen = Crdt::listen(spy_ref.clone(), spy_window, spy.sockets.gossip, exit.clone());
let t_spy_gossip = Crdt::gossip(spy_ref.clone(), exit.clone()); let t_spy_gossip = Crdt::gossip(spy_ref.clone(), exit.clone());
//wait for the network to converge //wait for the network to converge
let mut converged = false; let mut converged = false;
@ -373,16 +373,16 @@ mod tests {
let leader_bank = Bank::new(&alice); let leader_bank = Bank::new(&alice);
let events_addr = leader.data.events_addr; let events_addr = leader.data.events_addr;
let server = Server::leader( let server = Server::new_leader(
leader_bank, leader_bank,
alice.last_id(), alice.last_id(),
None, None,
leader.data.clone(), leader.data.clone(),
leader.requests, leader.sockets.requests,
leader.event, leader.sockets.event,
leader.broadcast, leader.sockets.broadcast,
leader.respond, leader.sockets.respond,
leader.gossip, leader.sockets.gossip,
exit.clone(), exit.clone(),
sink(), sink(),
); );

View File

@ -11,7 +11,7 @@
//! peers in the crdt. Peers is everyone who is not me or the leader that has a known replicate //! peers in the crdt. Peers is everyone who is not me or the leader that has a known replicate
//! address. //! address.
//! 3.b window //! 3.b window
//! - Verified blobs are placed into a window, indexed by the counter set by the leader. This could //! - Verified blobs are placed into a window, indexed by the counter set by the leader.sockets. This could
//! be the PoH counter if its monitonically increasing in each blob. Easure coding is used to //! be the PoH counter if its monitonically increasing in each blob. Easure coding is used to
//! recover any missing packets, and requests are made at random to peers and parents to retransmit //! recover any missing packets, and requests are made at random to peers and parents to retransmit
//! a missing packet. //! a missing packet.
@ -189,7 +189,7 @@ pub mod tests {
let cref_l = Arc::new(RwLock::new(crdt_l)); let cref_l = Arc::new(RwLock::new(crdt_l));
let t_l_gossip = Crdt::gossip(cref_l.clone(), exit.clone()); let t_l_gossip = Crdt::gossip(cref_l.clone(), exit.clone());
let window1 = streamer::default_window(); let window1 = streamer::default_window();
let t_l_listen = Crdt::listen(cref_l, window1, leader.gossip, exit.clone()); let t_l_listen = Crdt::listen(cref_l, window1, leader.sockets.gossip, exit.clone());
//start crdt2 //start crdt2
let mut crdt2 = Crdt::new(target2.data.clone()); let mut crdt2 = Crdt::new(target2.data.clone());
@ -199,7 +199,7 @@ pub mod tests {
let cref2 = Arc::new(RwLock::new(crdt2)); let cref2 = Arc::new(RwLock::new(crdt2));
let t2_gossip = Crdt::gossip(cref2.clone(), exit.clone()); let t2_gossip = Crdt::gossip(cref2.clone(), exit.clone());
let window2 = streamer::default_window(); let window2 = streamer::default_window();
let t2_listen = Crdt::listen(cref2, window2, target2.gossip, exit.clone()); let t2_listen = Crdt::listen(cref2, window2, target2.sockets.gossip, exit.clone());
// setup some blob services to send blobs into the socket // setup some blob services to send blobs into the socket
// to simulate the source peer and get blobs out of the socket to // to simulate the source peer and get blobs out of the socket to
@ -210,14 +210,14 @@ pub mod tests {
let t_receiver = streamer::blob_receiver( let t_receiver = streamer::blob_receiver(
exit.clone(), exit.clone(),
recv_recycler.clone(), recv_recycler.clone(),
target2.replicate, target2.sockets.replicate,
s_reader, s_reader,
).unwrap(); ).unwrap();
// simulate leader sending messages // simulate leader sending messages
let (s_responder, r_responder) = channel(); let (s_responder, r_responder) = channel();
let t_responder = streamer::responder( let t_responder = streamer::responder(
leader.requests, leader.sockets.requests,
exit.clone(), exit.clone(),
resp_recycler.clone(), resp_recycler.clone(),
r_responder, r_responder,
@ -230,8 +230,8 @@ pub mod tests {
let tvu = Tvu::new( let tvu = Tvu::new(
bank.clone(), bank.clone(),
target1.data, target1.data,
target1.gossip, target1.sockets.gossip,
target1.replicate, target1.sockets.replicate,
leader.data, leader.data,
exit.clone(), exit.clone(),
); );
@ -305,8 +305,7 @@ pub mod tests {
t_l_gossip.join().expect("join"); t_l_gossip.join().expect("join");
t_l_listen.join().expect("join"); t_l_listen.join().expect("join");
} }
pub struct TestNode { pub struct Sockets {
pub data: ReplicatedData,
pub gossip: UdpSocket, pub gossip: UdpSocket,
pub requests: UdpSocket, pub requests: UdpSocket,
pub replicate: UdpSocket, pub replicate: UdpSocket,
@ -314,6 +313,10 @@ pub mod tests {
pub respond: UdpSocket, pub respond: UdpSocket,
pub broadcast: UdpSocket, pub broadcast: UdpSocket,
} }
pub struct TestNode {
pub data: ReplicatedData,
pub sockets: Sockets,
}
impl TestNode { impl TestNode {
pub fn new() -> TestNode { pub fn new() -> TestNode {
let gossip = UdpSocket::bind("0.0.0.0:0").unwrap(); let gossip = UdpSocket::bind("0.0.0.0:0").unwrap();
@ -331,13 +334,15 @@ pub mod tests {
event.local_addr().unwrap(), event.local_addr().unwrap(),
); );
TestNode { TestNode {
data, data: data,
gossip, sockets: Sockets {
requests, gossip,
replicate, requests,
event, replicate,
respond, event,
broadcast, respond,
broadcast,
}
} }
} }
} }