Extract sig verify functions

This commit is contained in:
Greg Fitzgerald 2018-05-11 19:59:40 -06:00
parent f0be595e4c
commit 0488d0a82f
1 changed files with 30 additions and 31 deletions

View File

@ -220,6 +220,31 @@ impl Tpu {
})
}
fn verifier_service(
exit: Arc<AtomicBool>,
packets_receiver: Arc<Mutex<streamer::PacketReceiver>>,
verified_sender: Arc<Mutex<Sender<Vec<(SharedPackets, Vec<u8>)>>>>,
) -> JoinHandle<()> {
spawn(move || loop {
let e = Self::verifier(&packets_receiver.clone(), &verified_sender.clone());
if e.is_err() && exit.load(Ordering::Relaxed) {
break;
}
})
}
fn verifier_services(
exit: Arc<AtomicBool>,
packets_receiver: streamer::PacketReceiver,
verified_sender: Sender<Vec<(SharedPackets, Vec<u8>)>>,
) -> Vec<JoinHandle<()>> {
let sender = Arc::new(Mutex::new(verified_sender));
let receiver = Arc::new(Mutex::new(packets_receiver));
(0..4)
.map(|_| Self::verifier_service(exit.clone(), receiver.clone(), sender.clone()))
.collect()
}
/// Create a UDP microservice that forwards messages the given Tpu.
/// This service is the network leader
/// Set `exit` to shutdown its threads.
@ -257,23 +282,10 @@ impl Tpu {
blob_recycler.clone(),
responder_receiver,
);
let (verified_sender, verified_receiver) = channel();
let mut verify_threads = Vec::new();
let shared_verified_sender = Arc::new(Mutex::new(verified_sender));
let shared_packet_receiver = Arc::new(Mutex::new(packet_receiver));
for _ in 0..4 {
let exit_ = exit.clone();
let recv = shared_packet_receiver.clone();
let sender = shared_verified_sender.clone();
let thread = spawn(move || loop {
let e = Self::verifier(&recv, &sender);
if e.is_err() && exit_.load(Ordering::Relaxed) {
break;
}
});
verify_threads.push(thread);
}
let (verified_sender, verified_receiver) = channel();
let verify_threads: Vec<_> =
Self::verifier_services(exit.clone(), packet_receiver, verified_sender);
let (broadcast_sender, broadcast_receiver) = channel();
@ -422,22 +434,9 @@ impl Tpu {
responder_receiver,
);
let (verified_sender, verified_receiver) = channel();
let verify_threads: Vec<_> =
Self::verifier_services(exit.clone(), packet_receiver, verified_sender);
let mut verify_threads = Vec::new();
let shared_verified_sender = Arc::new(Mutex::new(verified_sender));
let shared_packet_receiver = Arc::new(Mutex::new(packet_receiver));
for _ in 0..4 {
let exit_ = exit.clone();
let recv = shared_packet_receiver.clone();
let sender = shared_verified_sender.clone();
let thread = spawn(move || loop {
let e = Self::verifier(&recv, &sender);
if e.is_err() && exit_.load(Ordering::Relaxed) {
break;
}
});
verify_threads.push(thread);
}
let t_sync = Self::sync_no_broadcast_service(obj.clone(), exit.clone());
let t_thin_client = Self::thin_client_service(