Move thin client service thread into thin_client_service.rs

This commit is contained in:
Greg Fitzgerald 2018-05-11 23:37:44 -06:00
parent cd96843699
commit d2f95d5319
2 changed files with 49 additions and 41 deletions

View File

@ -14,13 +14,12 @@ use result::Result;
use signature::PublicKey; use signature::PublicKey;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::net::{SocketAddr, UdpSocket}; use std::net::{SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread::{spawn, JoinHandle};
use transaction::Transaction; use transaction::Transaction;
//use std::io::{Cursor, Write};
//use std::sync::atomic::{AtomicBool, Ordering};
//use std::sync::mpsc::{channel, Receiver, Sender}; //use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::mpsc::Receiver; use std::sync::mpsc::Receiver;
use std::sync::{Arc, Mutex};
//use std::thread::{spawn, JoinHandle};
use std::time::Duration; use std::time::Duration;
use std::time::Instant; use std::time::Instant;
use streamer; use streamer;
@ -277,6 +276,38 @@ impl RequestProcessor {
} }
} }
pub struct ThinClientService {
pub thread_hdl: JoinHandle<()>,
}
impl ThinClientService {
pub fn new(
request_processor: Arc<RequestProcessor>,
accounting_stage: Arc<AccountingStage>,
exit: Arc<AtomicBool>,
verified_receiver: Receiver<Vec<(SharedPackets, Vec<u8>)>>,
responder_sender: streamer::BlobSender,
packet_recycler: packet::PacketRecycler,
blob_recycler: packet::BlobRecycler,
) -> Self {
let thread_hdl = spawn(move || loop {
let e = request_processor.process_request_packets(
&accounting_stage,
&verified_receiver,
&responder_sender,
&packet_recycler,
&blob_recycler,
);
if e.is_err() {
if exit.load(Ordering::Relaxed) {
break;
}
}
});
ThinClientService { thread_hdl }
}
}
#[cfg(test)] #[cfg(test)]
pub fn to_request_packets(r: &packet::PacketRecycler, reqs: Vec<Request>) -> Vec<SharedPackets> { pub fn to_request_packets(r: &packet::PacketRecycler, reqs: Vec<Request>) -> Vec<SharedPackets> {
let mut out = vec![]; let mut out = vec![];

View File

@ -6,22 +6,21 @@ use crdt::{Crdt, ReplicatedData};
use entry_writer::EntryWriter; use entry_writer::EntryWriter;
use ledger; use ledger;
use packet; use packet;
use packet::SharedPackets;
use result::Result; use result::Result;
use sig_verify_stage::SigVerifyStage; use sig_verify_stage::SigVerifyStage;
use std::io::Write; use std::io::Write;
use std::net::UdpSocket; use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver}; use std::sync::mpsc::channel;
use std::sync::{Arc, Mutex, RwLock}; use std::sync::{Arc, Mutex, RwLock};
use std::thread::{spawn, JoinHandle}; use std::thread::{spawn, JoinHandle};
use std::time::Duration; use std::time::Duration;
use streamer; use streamer;
use thin_client_service::RequestProcessor; use thin_client_service::{RequestProcessor, ThinClientService};
pub struct Tpu { pub struct Tpu {
accounting_stage: AccountingStage, accounting_stage: Arc<AccountingStage>,
request_processor: RequestProcessor, request_processor: Arc<RequestProcessor>,
} }
type SharedTpu = Arc<Tpu>; type SharedTpu = Arc<Tpu>;
@ -31,8 +30,8 @@ impl Tpu {
pub fn new(accounting_stage: AccountingStage) -> Self { pub fn new(accounting_stage: AccountingStage) -> Self {
let request_processor = RequestProcessor::new(accounting_stage.accountant.clone()); let request_processor = RequestProcessor::new(accounting_stage.accountant.clone());
Tpu { Tpu {
accounting_stage, accounting_stage: Arc::new(accounting_stage),
request_processor, request_processor: Arc::new(request_processor),
} }
} }
@ -66,30 +65,6 @@ impl Tpu {
}) })
} }
fn thin_client_service(
obj: SharedTpu,
exit: Arc<AtomicBool>,
verified_receiver: Receiver<Vec<(SharedPackets, Vec<u8>)>>,
responder_sender: streamer::BlobSender,
packet_recycler: packet::PacketRecycler,
blob_recycler: packet::BlobRecycler,
) -> JoinHandle<()> {
spawn(move || loop {
let e = obj.request_processor.process_request_packets(
&obj.accounting_stage,
&verified_receiver,
&responder_sender,
&packet_recycler,
&blob_recycler,
);
if e.is_err() {
if exit.load(Ordering::Relaxed) {
break;
}
}
})
}
/// Create a UDP microservice that forwards messages the given Tpu. /// Create a UDP microservice that forwards messages the given Tpu.
/// This service is the network leader /// This service is the network leader
/// Set `exit` to shutdown its threads. /// Set `exit` to shutdown its threads.
@ -123,8 +98,9 @@ impl Tpu {
let blob_recycler = packet::BlobRecycler::default(); let blob_recycler = packet::BlobRecycler::default();
let (responder_sender, responder_receiver) = channel(); let (responder_sender, responder_receiver) = channel();
let t_thin_client = Self::thin_client_service( let thin_client_service = ThinClientService::new(
obj.clone(), obj.request_processor.clone(),
obj.accounting_stage.clone(),
exit.clone(), exit.clone(),
sig_verify_stage.output, sig_verify_stage.output,
responder_sender, responder_sender,
@ -161,7 +137,7 @@ impl Tpu {
let mut threads = vec![ let mut threads = vec![
t_receiver, t_receiver,
t_responder, t_responder,
t_thin_client, thin_client_service.thread_hdl,
t_write, t_write,
t_gossip, t_gossip,
t_listen, t_listen,
@ -301,8 +277,9 @@ impl Tpu {
let t_write = Self::drain_service(obj.clone(), exit.clone()); let t_write = Self::drain_service(obj.clone(), exit.clone());
let t_thin_client = Self::thin_client_service( let thin_client_service = ThinClientService::new(
obj.clone(), obj.request_processor.clone(),
obj.accounting_stage.clone(),
exit.clone(), exit.clone(),
sig_verify_stage.output, sig_verify_stage.output,
responder_sender, responder_sender,
@ -321,7 +298,7 @@ impl Tpu {
//serve threads //serve threads
t_packet_receiver, t_packet_receiver,
t_responder, t_responder,
t_thin_client, thin_client_service.thread_hdl,
t_write, t_write,
]; ];
threads.extend(sig_verify_stage.thread_hdls.into_iter()); threads.extend(sig_verify_stage.thread_hdls.into_iter());