From d2f95d531966fe36030218ec171eb4fff5d79765 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Fri, 11 May 2018 23:37:44 -0600 Subject: [PATCH] Move thin client service thread into thin_client_service.rs --- src/thin_client_service.rs | 39 ++++++++++++++++++++++++++--- src/tpu.rs | 51 +++++++++++--------------------------- 2 files changed, 49 insertions(+), 41 deletions(-) diff --git a/src/thin_client_service.rs b/src/thin_client_service.rs index e6eda8133..059d44c79 100644 --- a/src/thin_client_service.rs +++ b/src/thin_client_service.rs @@ -14,13 +14,12 @@ use result::Result; use signature::PublicKey; use std::collections::VecDeque; use std::net::{SocketAddr, UdpSocket}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, Mutex}; +use std::thread::{spawn, JoinHandle}; use transaction::Transaction; -//use std::io::{Cursor, Write}; -//use std::sync::atomic::{AtomicBool, Ordering}; //use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::mpsc::Receiver; -use std::sync::{Arc, Mutex}; -//use std::thread::{spawn, JoinHandle}; use std::time::Duration; use std::time::Instant; use streamer; @@ -277,6 +276,38 @@ impl RequestProcessor { } } +pub struct ThinClientService { + pub thread_hdl: JoinHandle<()>, +} + +impl ThinClientService { + pub fn new( + request_processor: Arc, + accounting_stage: Arc, + exit: Arc, + verified_receiver: Receiver)>>, + responder_sender: streamer::BlobSender, + packet_recycler: packet::PacketRecycler, + blob_recycler: packet::BlobRecycler, + ) -> Self { + let thread_hdl = spawn(move || loop { + let e = request_processor.process_request_packets( + &accounting_stage, + &verified_receiver, + &responder_sender, + &packet_recycler, + &blob_recycler, + ); + if e.is_err() { + if exit.load(Ordering::Relaxed) { + break; + } + } + }); + ThinClientService { thread_hdl } + } +} + #[cfg(test)] pub fn to_request_packets(r: &packet::PacketRecycler, reqs: Vec) -> Vec { let mut out = vec![]; diff --git a/src/tpu.rs b/src/tpu.rs index dec9116e0..f51214896 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -6,22 +6,21 @@ use crdt::{Crdt, ReplicatedData}; use entry_writer::EntryWriter; use ledger; use packet; -use packet::SharedPackets; use result::Result; use sig_verify_stage::SigVerifyStage; use std::io::Write; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{channel, Receiver}; +use std::sync::mpsc::channel; use std::sync::{Arc, Mutex, RwLock}; use std::thread::{spawn, JoinHandle}; use std::time::Duration; use streamer; -use thin_client_service::RequestProcessor; +use thin_client_service::{RequestProcessor, ThinClientService}; pub struct Tpu { - accounting_stage: AccountingStage, - request_processor: RequestProcessor, + accounting_stage: Arc, + request_processor: Arc, } type SharedTpu = Arc; @@ -31,8 +30,8 @@ impl Tpu { pub fn new(accounting_stage: AccountingStage) -> Self { let request_processor = RequestProcessor::new(accounting_stage.accountant.clone()); Tpu { - accounting_stage, - request_processor, + accounting_stage: Arc::new(accounting_stage), + request_processor: Arc::new(request_processor), } } @@ -66,30 +65,6 @@ impl Tpu { }) } - fn thin_client_service( - obj: SharedTpu, - exit: Arc, - verified_receiver: Receiver)>>, - responder_sender: streamer::BlobSender, - packet_recycler: packet::PacketRecycler, - blob_recycler: packet::BlobRecycler, - ) -> JoinHandle<()> { - spawn(move || loop { - let e = obj.request_processor.process_request_packets( - &obj.accounting_stage, - &verified_receiver, - &responder_sender, - &packet_recycler, - &blob_recycler, - ); - if e.is_err() { - if exit.load(Ordering::Relaxed) { - break; - } - } - }) - } - /// Create a UDP microservice that forwards messages the given Tpu. /// This service is the network leader /// Set `exit` to shutdown its threads. @@ -123,8 +98,9 @@ impl Tpu { let blob_recycler = packet::BlobRecycler::default(); let (responder_sender, responder_receiver) = channel(); - let t_thin_client = Self::thin_client_service( - obj.clone(), + let thin_client_service = ThinClientService::new( + obj.request_processor.clone(), + obj.accounting_stage.clone(), exit.clone(), sig_verify_stage.output, responder_sender, @@ -161,7 +137,7 @@ impl Tpu { let mut threads = vec![ t_receiver, t_responder, - t_thin_client, + thin_client_service.thread_hdl, t_write, t_gossip, t_listen, @@ -301,8 +277,9 @@ impl Tpu { let t_write = Self::drain_service(obj.clone(), exit.clone()); - let t_thin_client = Self::thin_client_service( - obj.clone(), + let thin_client_service = ThinClientService::new( + obj.request_processor.clone(), + obj.accounting_stage.clone(), exit.clone(), sig_verify_stage.output, responder_sender, @@ -321,7 +298,7 @@ impl Tpu { //serve threads t_packet_receiver, t_responder, - t_thin_client, + thin_client_service.thread_hdl, t_write, ]; threads.extend(sig_verify_stage.thread_hdls.into_iter());