This commit is contained in:
Greg Fitzgerald 2018-04-11 09:02:33 -06:00
parent e838a8c28a
commit 50ccecdff5
1 changed files with 28 additions and 18 deletions

View File

@ -107,28 +107,38 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
}
}
fn recv_batch(recvr: &streamer::PacketReceiver) -> Result<Vec<SharedPackets>> {
let timer = Duration::new(1, 0);
let msgs = recvr.recv_timeout(timer)?;
trace!("got msgs");
let mut batch = vec![msgs];
while let Ok(more) = recvr.try_recv() {
trace!("got more msgs");
batch.push(more);
}
info!("batch len {}", batch.len());
Ok(batch)
}
fn verify_batch(batch: Vec<SharedPackets>) -> Vec<Vec<(SharedPackets, Vec<u8>)>> {
let chunk_size = max(1, (batch.len() + 3) / 4);
let batches: Vec<_> = batch.chunks(chunk_size).map(|x| x.to_vec()).collect();
batches
.into_par_iter()
.map(|batch| {
let r = ecdsa::ed25519_verify(&batch);
batch.into_iter().zip(r).collect()
})
.collect()
}
fn verifier(
recvr: &streamer::PacketReceiver,
sendr: &Sender<Vec<(SharedPackets, Vec<u8>)>>,
) -> Result<()> {
let timer = Duration::new(1, 0);
let msgs = recvr.recv_timeout(timer)?;
trace!("got msgs");
let mut v = Vec::new();
v.push(msgs);
while let Ok(more) = recvr.try_recv() {
trace!("got more msgs");
v.push(more);
}
info!("batch {}", v.len());
let chunk = max(1, (v.len() + 3) / 4);
let chunks: Vec<_> = v.chunks(chunk).collect();
let rvs: Vec<_> = chunks
.into_par_iter()
.map(|x| ecdsa::ed25519_verify(&x.to_vec()))
.collect();
for (v, r) in v.chunks(chunk).zip(rvs) {
let xs = v.to_vec().into_iter().zip(r).collect();
let batch = Self::recv_batch(recvr)?;
let verified_batches = Self::verify_batch(batch);
for xs in verified_batches {
sendr.send(xs)?;
}
Ok(())