diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 3ade017385..47632cd2ed 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -107,28 +107,38 @@ impl AccountantSkel { } } + fn recv_batch(recvr: &streamer::PacketReceiver) -> Result> { + let timer = Duration::new(1, 0); + let msgs = recvr.recv_timeout(timer)?; + trace!("got msgs"); + let mut batch = vec![msgs]; + while let Ok(more) = recvr.try_recv() { + trace!("got more msgs"); + batch.push(more); + } + info!("batch len {}", batch.len()); + Ok(batch) + } + + fn verify_batch(batch: Vec) -> Vec)>> { + let chunk_size = max(1, (batch.len() + 3) / 4); + let batches: Vec<_> = batch.chunks(chunk_size).map(|x| x.to_vec()).collect(); + batches + .into_par_iter() + .map(|batch| { + let r = ecdsa::ed25519_verify(&batch); + batch.into_iter().zip(r).collect() + }) + .collect() + } + fn verifier( recvr: &streamer::PacketReceiver, sendr: &Sender)>>, ) -> Result<()> { - let timer = Duration::new(1, 0); - let msgs = recvr.recv_timeout(timer)?; - trace!("got msgs"); - let mut v = Vec::new(); - v.push(msgs); - while let Ok(more) = recvr.try_recv() { - trace!("got more msgs"); - v.push(more); - } - info!("batch {}", v.len()); - let chunk = max(1, (v.len() + 3) / 4); - let chunks: Vec<_> = v.chunks(chunk).collect(); - let rvs: Vec<_> = chunks - .into_par_iter() - .map(|x| ecdsa::ed25519_verify(&x.to_vec())) - .collect(); - for (v, r) in v.chunks(chunk).zip(rvs) { - let xs = v.to_vec().into_iter().zip(r).collect(); + let batch = Self::recv_batch(recvr)?; + let verified_batches = Self::verify_batch(batch); + for xs in verified_batches { sendr.send(xs)?; } Ok(())