From 8049323ca82cc7d55221993741772c0cf73db7af Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Wed, 23 May 2018 11:06:18 -0700 Subject: [PATCH] @garious review --- src/bin/testnode.rs | 2 +- src/server.rs | 4 ++-- src/thin_client.rs | 48 ++++++++++++++++++++++----------------------- src/tvu.rs | 37 +++++++++++++++++++--------------- 4 files changed, 48 insertions(+), 43 deletions(-) diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index 4b82899a95..1663e1e60d 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -140,7 +140,7 @@ fn main() { let respond_socket = UdpSocket::bind(local.clone()).unwrap(); eprintln!("starting server..."); - let server = Server::leader( + let server = Server::new_leader( bank, last_id, Some(Duration::from_millis(1000)), diff --git a/src/server.rs b/src/server.rs index 94e3ffbc11..12828128eb 100644 --- a/src/server.rs +++ b/src/server.rs @@ -18,7 +18,7 @@ pub struct Server { } impl Server { - pub fn leader( + pub fn new_leader( bank: Bank, start_hash: Hash, tick_duration: Option, @@ -49,7 +49,7 @@ impl Server { thread_hdls.extend(tpu.thread_hdls); Server { thread_hdls } } - pub fn validator( + pub fn new_validator( bank: Bank, me: ReplicatedData, requests_socket: UdpSocket, diff --git a/src/thin_client.rs b/src/thin_client.rs index b564afdc24..7cfacf82cb 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -204,16 +204,16 @@ mod tests { let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let server = Server::leader( + let server = Server::new_leader( bank, alice.last_id(), Some(Duration::from_millis(30)), leader.data.clone(), - leader.requests, - leader.event, - leader.broadcast, - leader.respond, - leader.gossip, + leader.sockets.requests, + leader.sockets.event, + leader.sockets.broadcast, + leader.sockets.respond, + leader.sockets.gossip, exit.clone(), sink(), ); @@ -249,16 +249,16 @@ mod tests { let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let server = Server::leader( + let server = Server::new_leader( bank, alice.last_id(), Some(Duration::from_millis(30)), leader.data.clone(), - leader.requests, - leader.event, - leader.broadcast, - leader.respond, - leader.gossip, + leader.sockets.requests, + leader.sockets.event, + leader.sockets.broadcast, + leader.sockets.respond, + leader.sockets.gossip, exit.clone(), sink(), ); @@ -303,13 +303,13 @@ mod tests { ) { let replicant = TestNode::new(); let replicant_bank = Bank::new(&alice); - let mut ts = Server::validator( + let mut ts = Server::new_validator( replicant_bank, replicant.data.clone(), - replicant.requests, - replicant.respond, - replicant.replicate, - replicant.gossip, + replicant.sockets.requests, + replicant.sockets.respond, + replicant.sockets.replicate, + replicant.sockets.gossip, leader.clone(), exit.clone(), ); @@ -334,7 +334,7 @@ mod tests { let spy_ref = Arc::new(RwLock::new(spy_crdt)); let spy_window = default_window(); - let t_spy_listen = Crdt::listen(spy_ref.clone(), spy_window, spy.gossip, exit.clone()); + let t_spy_listen = Crdt::listen(spy_ref.clone(), spy_window, spy.sockets.gossip, exit.clone()); let t_spy_gossip = Crdt::gossip(spy_ref.clone(), exit.clone()); //wait for the network to converge let mut converged = false; @@ -373,16 +373,16 @@ mod tests { let leader_bank = Bank::new(&alice); let events_addr = leader.data.events_addr; - let server = Server::leader( + let server = Server::new_leader( leader_bank, alice.last_id(), None, leader.data.clone(), - leader.requests, - leader.event, - leader.broadcast, - leader.respond, - leader.gossip, + leader.sockets.requests, + leader.sockets.event, + leader.sockets.broadcast, + leader.sockets.respond, + leader.sockets.gossip, exit.clone(), sink(), ); diff --git a/src/tvu.rs b/src/tvu.rs index a75bbe59d8..c4fd85229d 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -11,7 +11,7 @@ //! peers in the crdt. Peers is everyone who is not me or the leader that has a known replicate //! address. //! 3.b window -//! - Verified blobs are placed into a window, indexed by the counter set by the leader. This could +//! - Verified blobs are placed into a window, indexed by the counter set by the leader.sockets. This could //! be the PoH counter if its monitonically increasing in each blob. Easure coding is used to //! recover any missing packets, and requests are made at random to peers and parents to retransmit //! a missing packet. @@ -189,7 +189,7 @@ pub mod tests { let cref_l = Arc::new(RwLock::new(crdt_l)); let t_l_gossip = Crdt::gossip(cref_l.clone(), exit.clone()); let window1 = streamer::default_window(); - let t_l_listen = Crdt::listen(cref_l, window1, leader.gossip, exit.clone()); + let t_l_listen = Crdt::listen(cref_l, window1, leader.sockets.gossip, exit.clone()); //start crdt2 let mut crdt2 = Crdt::new(target2.data.clone()); @@ -199,7 +199,7 @@ pub mod tests { let cref2 = Arc::new(RwLock::new(crdt2)); let t2_gossip = Crdt::gossip(cref2.clone(), exit.clone()); let window2 = streamer::default_window(); - let t2_listen = Crdt::listen(cref2, window2, target2.gossip, exit.clone()); + let t2_listen = Crdt::listen(cref2, window2, target2.sockets.gossip, exit.clone()); // setup some blob services to send blobs into the socket // to simulate the source peer and get blobs out of the socket to @@ -210,14 +210,14 @@ pub mod tests { let t_receiver = streamer::blob_receiver( exit.clone(), recv_recycler.clone(), - target2.replicate, + target2.sockets.replicate, s_reader, ).unwrap(); // simulate leader sending messages let (s_responder, r_responder) = channel(); let t_responder = streamer::responder( - leader.requests, + leader.sockets.requests, exit.clone(), resp_recycler.clone(), r_responder, @@ -230,8 +230,8 @@ pub mod tests { let tvu = Tvu::new( bank.clone(), target1.data, - target1.gossip, - target1.replicate, + target1.sockets.gossip, + target1.sockets.replicate, leader.data, exit.clone(), ); @@ -305,8 +305,7 @@ pub mod tests { t_l_gossip.join().expect("join"); t_l_listen.join().expect("join"); } - pub struct TestNode { - pub data: ReplicatedData, + pub struct Sockets { pub gossip: UdpSocket, pub requests: UdpSocket, pub replicate: UdpSocket, @@ -314,6 +313,10 @@ pub mod tests { pub respond: UdpSocket, pub broadcast: UdpSocket, } + pub struct TestNode { + pub data: ReplicatedData, + pub sockets: Sockets, + } impl TestNode { pub fn new() -> TestNode { let gossip = UdpSocket::bind("0.0.0.0:0").unwrap(); @@ -331,13 +334,15 @@ pub mod tests { event.local_addr().unwrap(), ); TestNode { - data, - gossip, - requests, - replicate, - event, - respond, - broadcast, + data: data, + sockets: Sockets { + gossip, + requests, + replicate, + event, + respond, + broadcast, + } } } }