From 4f629dd982b18e16caa30289f1c9aca3cadeb2d6 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Thu, 10 May 2018 10:26:18 -0600 Subject: [PATCH] Add events socket instead of modifying the existing socket --- src/bin/testnode.rs | 6 +++--- src/thin_client.rs | 21 ++++++++++++++------- src/tpu.rs | 32 +++----------------------------- 3 files changed, 20 insertions(+), 39 deletions(-) diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index a4de9e45c0..0eb38e195e 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -54,7 +54,7 @@ fn main() { let serve_addr = format!("0.0.0.0:{}", port); let gossip_addr = format!("0.0.0.0:{}", port + 1); let replicate_addr = format!("0.0.0.0:{}", port + 2); - let skinny_addr = format!("0.0.0.0:{}", port + 3); + let events_addr = format!("0.0.0.0:{}", port + 3); if stdin_isatty() { eprintln!("nothing found on stdin, expected a log file"); @@ -121,7 +121,7 @@ fn main() { let serve_sock = UdpSocket::bind(&serve_addr).unwrap(); let gossip_sock = UdpSocket::bind(&gossip_addr).unwrap(); let replicate_sock = UdpSocket::bind(&replicate_addr).unwrap(); - let skinny_sock = UdpSocket::bind(&skinny_addr).unwrap(); + let events_sock = UdpSocket::bind(&events_addr).unwrap(); let pubkey = KeyPair::new().pubkey(); let d = ReplicatedData::new( pubkey, @@ -134,7 +134,7 @@ fn main() { &tpu, d, serve_sock, - skinny_sock, + events_sock, gossip_sock, exit.clone(), stdout(), diff --git a/src/thin_client.rs b/src/thin_client.rs index 4db2056c12..274ced7e8a 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -168,7 +168,7 @@ mod tests { logger::setup(); let gossip = UdpSocket::bind("0.0.0.0:0").unwrap(); let serve = UdpSocket::bind("0.0.0.0:0").unwrap(); - let skinny = UdpSocket::bind("0.0.0.0:0").unwrap(); + let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let addr = serve.local_addr().unwrap(); let pubkey = KeyPair::new().pubkey(); let d = ReplicatedData::new( @@ -184,8 +184,15 @@ mod tests { let exit = Arc::new(AtomicBool::new(false)); let accounting_stage = AccountingStage::new(accountant, &alice.last_id(), Some(30)); let accountant = Arc::new(Tpu::new(accounting_stage)); - let threads = - Tpu::serve(&accountant, d, serve, skinny, gossip, exit.clone(), sink()).unwrap(); + let threads = Tpu::serve( + &accountant, + d, + serve, + events_socket, + gossip, + exit.clone(), + sink(), + ).unwrap(); sleep(Duration::from_millis(300)); let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); @@ -215,7 +222,7 @@ mod tests { #[test] fn test_bad_sig() { - let (leader_data, leader_gossip, _, leader_serve, leader_skinny) = tpu::test_node(); + let (leader_data, leader_gossip, _, leader_serve, leader_events) = tpu::test_node(); let alice = Mint::new(10_000); let accountant = Accountant::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); @@ -227,7 +234,7 @@ mod tests { &tpu, leader_data, leader_serve, - leader_skinny, + leader_events, leader_gossip, exit.clone(), sink(), @@ -264,7 +271,7 @@ mod tests { fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocket) { let gossip = UdpSocket::bind("0.0.0.0:0").unwrap(); let serve = UdpSocket::bind("0.0.0.0:0").unwrap(); - let skinny = UdpSocket::bind("0.0.0.0:0").unwrap(); + let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let replicate = UdpSocket::bind("0.0.0.0:0").unwrap(); let pubkey = KeyPair::new().pubkey(); let leader = ReplicatedData::new( @@ -273,7 +280,7 @@ mod tests { replicate.local_addr().unwrap(), serve.local_addr().unwrap(), ); - (leader, gossip, serve, replicate, skinny) + (leader, gossip, serve, replicate, events_socket) } #[test] diff --git a/src/tpu.rs b/src/tpu.rs index f8ce290177..97dd51063c 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -1,7 +1,6 @@ //! The `tpu` module implements the Transaction Processing Unit, a //! 5-stage transaction processing pipeline in software. -use accountant::Accountant; use accounting_stage::AccountingStage; use bincode::{deserialize, serialize}; use crdt::{Crdt, ReplicatedData}; @@ -109,24 +108,6 @@ impl Tpu { }) } - fn process_thin_client_requests(_acc: &Arc, _socket: &UdpSocket) -> Result<()> { - Ok(()) - } - - fn thin_client_service( - accountant: Arc, - exit: Arc, - socket: UdpSocket, - ) -> JoinHandle<()> { - spawn(move || loop { - let _ = Self::process_thin_client_requests(&accountant, &socket); - if exit.load(Ordering::Relaxed) { - info!("sync_service exiting"); - break; - } - }) - } - /// Process any Entry items that have been published by the Historian. /// continuosly broadcast blobs of entries out fn run_sync_no_broadcast(obj: SharedTpu) -> Result<()> { @@ -367,7 +348,7 @@ impl Tpu { obj: &SharedTpu, me: ReplicatedData, serve: UdpSocket, - skinny: UdpSocket, + _events_socket: UdpSocket, gossip: UdpSocket, exit: Arc, writer: W, @@ -430,12 +411,6 @@ impl Tpu { Mutex::new(writer), ); - let t_skinny = Self::thin_client_service( - obj.accounting_stage.accountant.clone(), - exit.clone(), - skinny, - ); - let tpu = obj.clone(); let t_server = spawn(move || loop { let e = Self::process_request_packets( @@ -457,7 +432,6 @@ impl Tpu { t_responder, t_server, t_sync, - t_skinny, t_gossip, t_listen, t_broadcast, @@ -642,7 +616,7 @@ pub fn to_request_packets(r: &packet::PacketRecycler, reqs: Vec) -> Vec pub fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocket) { use signature::{KeyPair, KeyPairUtil}; - let skinny = UdpSocket::bind("127.0.0.1:0").unwrap(); + let events_socket = UdpSocket::bind("127.0.0.1:0").unwrap(); let gossip = UdpSocket::bind("127.0.0.1:0").unwrap(); let replicate = UdpSocket::bind("127.0.0.1:0").unwrap(); let serve = UdpSocket::bind("127.0.0.1:0").unwrap(); @@ -653,7 +627,7 @@ pub fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocke replicate.local_addr().unwrap(), serve.local_addr().unwrap(), ); - (d, gossip, replicate, serve, skinny) + (d, gossip, replicate, serve, events_socket) } #[cfg(test)]