parent
c07f700c53
commit
d1948b5a00
|
@ -114,7 +114,7 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
|
||||||
|
|
||||||
fn verifier(
|
fn verifier(
|
||||||
recvr: &streamer::PacketReceiver,
|
recvr: &streamer::PacketReceiver,
|
||||||
sendr: &Sender<(Vec<SharedPackets>, Vec<Vec<u8>>)>,
|
sendr: &Sender<Vec<(SharedPackets, Vec<u8>)>>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let timer = Duration::new(1, 0);
|
let timer = Duration::new(1, 0);
|
||||||
let msgs = recvr.recv_timeout(timer)?;
|
let msgs = recvr.recv_timeout(timer)?;
|
||||||
|
@ -133,7 +133,8 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
|
||||||
.map(|x| ecdsa::ed25519_verify(&x.to_vec()))
|
.map(|x| ecdsa::ed25519_verify(&x.to_vec()))
|
||||||
.collect();
|
.collect();
|
||||||
for (v, r) in v.chunks(chunk).zip(rvs) {
|
for (v, r) in v.chunks(chunk).zip(rvs) {
|
||||||
sendr.send((v.to_vec(), r))?;
|
let xs = v.to_vec().into_iter().zip(r).collect();
|
||||||
|
sendr.send(xs)?;
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -155,7 +156,7 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
|
||||||
vers: Vec<u8>,
|
vers: Vec<u8>,
|
||||||
) -> Vec<(Response, SocketAddr)> {
|
) -> Vec<(Response, SocketAddr)> {
|
||||||
let mut rsps = Vec::new();
|
let mut rsps = Vec::new();
|
||||||
for (data, v) in reqs.into_iter().zip(vers.into_iter()) {
|
for (data, v) in reqs.into_iter().zip(vers) {
|
||||||
if let Some((req, rsp_addr)) = data {
|
if let Some((req, rsp_addr)) = data {
|
||||||
if !req.verify() {
|
if !req.verify() {
|
||||||
continue;
|
continue;
|
||||||
|
@ -198,14 +199,14 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
|
||||||
|
|
||||||
fn process(
|
fn process(
|
||||||
obj: &Arc<Mutex<AccountantSkel<W>>>,
|
obj: &Arc<Mutex<AccountantSkel<W>>>,
|
||||||
verified_receiver: &Receiver<(Vec<SharedPackets>, Vec<Vec<u8>>)>,
|
verified_receiver: &Receiver<Vec<(SharedPackets, Vec<u8>)>>,
|
||||||
blob_sender: &streamer::BlobSender,
|
blob_sender: &streamer::BlobSender,
|
||||||
packet_recycler: &packet::PacketRecycler,
|
packet_recycler: &packet::PacketRecycler,
|
||||||
blob_recycler: &packet::BlobRecycler,
|
blob_recycler: &packet::BlobRecycler,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let timer = Duration::new(1, 0);
|
let timer = Duration::new(1, 0);
|
||||||
let (mms, vvs) = verified_receiver.recv_timeout(timer)?;
|
let mms = verified_receiver.recv_timeout(timer)?;
|
||||||
for (msgs, vers) in mms.into_iter().zip(vvs.into_iter()) {
|
for (msgs, vers) in mms {
|
||||||
let reqs = Self::deserialize_packets(&msgs.read().unwrap());
|
let reqs = Self::deserialize_packets(&msgs.read().unwrap());
|
||||||
let rsps = Self::process_packets(obj, reqs, vers);
|
let rsps = Self::process_packets(obj, reqs, vers);
|
||||||
let blobs = Self::serialize_responses(rsps, blob_recycler)?;
|
let blobs = Self::serialize_responses(rsps, blob_recycler)?;
|
||||||
|
|
Loading…
Reference in New Issue