Move sig verification stage into its own module

This commit is contained in:
Greg Fitzgerald 2018-05-11 21:51:37 -06:00
parent 3c11a91f77
commit 19607886f7
3 changed files with 106 additions and 82 deletions

View File

@ -16,6 +16,7 @@ pub mod packet;
pub mod plan; pub mod plan;
pub mod recorder; pub mod recorder;
pub mod result; pub mod result;
pub mod sig_verify_stage;
pub mod signature; pub mod signature;
pub mod streamer; pub mod streamer;
pub mod thin_client; pub mod thin_client;

96
src/sig_verify_stage.rs Normal file
View File

@ -0,0 +1,96 @@
//! The `sig_verify_stage` implements the signature verification stage of the TPU.
use ecdsa;
use packet::SharedPackets;
use rand::{thread_rng, Rng};
use result::Result;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{spawn, JoinHandle};
use std::time::Instant;
use streamer;
use timing;
pub struct SigVerifyStage {
pub output: Receiver<Vec<(SharedPackets, Vec<u8>)>>,
pub thread_hdls: Vec<JoinHandle<()>>,
}
impl SigVerifyStage {
pub fn new(exit: Arc<AtomicBool>, packets_receiver: Receiver<SharedPackets>) -> Self {
let (verified_sender, output) = channel();
let thread_hdls = Self::verifier_services(exit, packets_receiver, verified_sender);
SigVerifyStage {
thread_hdls,
output,
}
}
fn verify_batch(batch: Vec<SharedPackets>) -> Vec<(SharedPackets, Vec<u8>)> {
let r = ecdsa::ed25519_verify(&batch);
batch.into_iter().zip(r).collect()
}
fn verifier(
recvr: &Arc<Mutex<streamer::PacketReceiver>>,
sendr: &Arc<Mutex<Sender<Vec<(SharedPackets, Vec<u8>)>>>>,
) -> Result<()> {
let (batch, len) =
streamer::recv_batch(&recvr.lock().expect("'recvr' lock in fn verifier"))?;
let now = Instant::now();
let batch_len = batch.len();
let rand_id = thread_rng().gen_range(0, 100);
info!(
"@{:?} verifier: verifying: {} id: {}",
timing::timestamp(),
batch.len(),
rand_id
);
let verified_batch = Self::verify_batch(batch);
sendr
.lock()
.expect("lock in fn verify_batch in tpu")
.send(verified_batch)?;
let total_time_ms = timing::duration_as_ms(&now.elapsed());
let total_time_s = timing::duration_as_s(&now.elapsed());
info!(
"@{:?} verifier: done. batches: {} total verify time: {:?} id: {} verified: {} v/s {}",
timing::timestamp(),
batch_len,
total_time_ms,
rand_id,
len,
(len as f32 / total_time_s)
);
Ok(())
}
fn verifier_service(
exit: Arc<AtomicBool>,
packets_receiver: Arc<Mutex<streamer::PacketReceiver>>,
verified_sender: Arc<Mutex<Sender<Vec<(SharedPackets, Vec<u8>)>>>>,
) -> JoinHandle<()> {
spawn(move || loop {
let e = Self::verifier(&packets_receiver.clone(), &verified_sender.clone());
if e.is_err() && exit.load(Ordering::Relaxed) {
break;
}
})
}
fn verifier_services(
exit: Arc<AtomicBool>,
packets_receiver: streamer::PacketReceiver,
verified_sender: Sender<Vec<(SharedPackets, Vec<u8>)>>,
) -> Vec<JoinHandle<()>> {
let sender = Arc::new(Mutex::new(verified_sender));
let receiver = Arc::new(Mutex::new(packets_receiver));
(0..4)
.map(|_| Self::verifier_service(exit.clone(), receiver.clone(), sender.clone()))
.collect()
}
}

View File

@ -3,27 +3,24 @@
use accounting_stage::AccountingStage; use accounting_stage::AccountingStage;
use crdt::{Crdt, ReplicatedData}; use crdt::{Crdt, ReplicatedData};
use ecdsa;
use entry::Entry; use entry::Entry;
use ledger; use ledger;
use packet; use packet;
use packet::SharedPackets; use packet::SharedPackets;
use rand::{thread_rng, Rng};
use result::Result; use result::Result;
use serde_json; use serde_json;
use sig_verify_stage::SigVerifyStage;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::io::Write; use std::io::Write;
use std::io::sink; use std::io::sink;
use std::net::UdpSocket; use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, Mutex, RwLock}; use std::sync::{Arc, Mutex, RwLock};
use std::thread::{spawn, JoinHandle}; use std::thread::{spawn, JoinHandle};
use std::time::Duration; use std::time::Duration;
use std::time::Instant;
use streamer; use streamer;
use thin_client_service::ThinClientService; use thin_client_service::ThinClientService;
use timing;
pub struct Tpu { pub struct Tpu {
accounting_stage: AccountingStage, accounting_stage: AccountingStage,
@ -129,73 +126,6 @@ impl Tpu {
}) })
} }
fn verify_batch(batch: Vec<SharedPackets>) -> Vec<(SharedPackets, Vec<u8>)> {
let r = ecdsa::ed25519_verify(&batch);
batch.into_iter().zip(r).collect()
}
fn verifier(
recvr: &Arc<Mutex<streamer::PacketReceiver>>,
sendr: &Arc<Mutex<Sender<Vec<(SharedPackets, Vec<u8>)>>>>,
) -> Result<()> {
let (batch, len) =
streamer::recv_batch(&recvr.lock().expect("'recvr' lock in fn verifier"))?;
let now = Instant::now();
let batch_len = batch.len();
let rand_id = thread_rng().gen_range(0, 100);
info!(
"@{:?} verifier: verifying: {} id: {}",
timing::timestamp(),
batch.len(),
rand_id
);
let verified_batch = Self::verify_batch(batch);
sendr
.lock()
.expect("lock in fn verify_batch in tpu")
.send(verified_batch)?;
let total_time_ms = timing::duration_as_ms(&now.elapsed());
let total_time_s = timing::duration_as_s(&now.elapsed());
info!(
"@{:?} verifier: done. batches: {} total verify time: {:?} id: {} verified: {} v/s {}",
timing::timestamp(),
batch_len,
total_time_ms,
rand_id,
len,
(len as f32 / total_time_s)
);
Ok(())
}
fn verifier_service(
exit: Arc<AtomicBool>,
packets_receiver: Arc<Mutex<streamer::PacketReceiver>>,
verified_sender: Arc<Mutex<Sender<Vec<(SharedPackets, Vec<u8>)>>>>,
) -> JoinHandle<()> {
spawn(move || loop {
let e = Self::verifier(&packets_receiver.clone(), &verified_sender.clone());
if e.is_err() && exit.load(Ordering::Relaxed) {
break;
}
})
}
fn verifier_services(
exit: Arc<AtomicBool>,
packets_receiver: streamer::PacketReceiver,
verified_sender: Sender<Vec<(SharedPackets, Vec<u8>)>>,
) -> Vec<JoinHandle<()>> {
let sender = Arc::new(Mutex::new(verified_sender));
let receiver = Arc::new(Mutex::new(packets_receiver));
(0..4)
.map(|_| Self::verifier_service(exit.clone(), receiver.clone(), sender.clone()))
.collect()
}
fn thin_client_service( fn thin_client_service(
obj: SharedTpu, obj: SharedTpu,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
@ -249,16 +179,14 @@ impl Tpu {
packet_sender, packet_sender,
)?; )?;
let (verified_sender, verified_receiver) = channel(); let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver);
let verify_threads: Vec<_> =
Self::verifier_services(exit.clone(), packet_receiver, verified_sender);
let blob_recycler = packet::BlobRecycler::default(); let blob_recycler = packet::BlobRecycler::default();
let (responder_sender, responder_receiver) = channel(); let (responder_sender, responder_receiver) = channel();
let t_thin_client = Self::thin_client_service( let t_thin_client = Self::thin_client_service(
obj.clone(), obj.clone(),
exit.clone(), exit.clone(),
verified_receiver, sig_verify_stage.output,
responder_sender, responder_sender,
packet_recycler.clone(), packet_recycler.clone(),
blob_recycler.clone(), blob_recycler.clone(),
@ -299,7 +227,7 @@ impl Tpu {
t_listen, t_listen,
t_broadcast, t_broadcast,
]; ];
threads.extend(verify_threads.into_iter()); threads.extend(sig_verify_stage.thread_hdls.into_iter());
Ok(threads) Ok(threads)
} }
@ -428,16 +356,15 @@ impl Tpu {
blob_recycler.clone(), blob_recycler.clone(),
responder_receiver, responder_receiver,
); );
let (verified_sender, verified_receiver) = channel();
let verify_threads: Vec<_> = let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver);
Self::verifier_services(exit.clone(), packet_receiver, verified_sender);
let t_write = Self::drain_service(obj.clone(), exit.clone()); let t_write = Self::drain_service(obj.clone(), exit.clone());
let t_thin_client = Self::thin_client_service( let t_thin_client = Self::thin_client_service(
obj.clone(), obj.clone(),
exit.clone(), exit.clone(),
verified_receiver, sig_verify_stage.output,
responder_sender, responder_sender,
packet_recycler.clone(), packet_recycler.clone(),
blob_recycler.clone(), blob_recycler.clone(),
@ -457,7 +384,7 @@ impl Tpu {
t_thin_client, t_thin_client,
t_write, t_write,
]; ];
threads.extend(verify_threads.into_iter()); threads.extend(sig_verify_stage.thread_hdls.into_iter());
Ok(threads) Ok(threads)
} }
} }