diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 4ed3640bf..2d8b699a3 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -149,6 +149,35 @@ impl AccountantSkel { .collect() } + fn process_packets( + obj: &Arc>>, + reqs: Vec>, + vers: Vec, + blob_recycler: &packet::BlobRecycler, + ) -> Result> { + let mut rsps = VecDeque::new(); + for (data, v) in reqs.into_iter().zip(vers.into_iter()) { + if let Some((req, rsp_addr)) = data { + if !req.verify() { + continue; + } + if let Some(resp) = obj.lock().unwrap().log_verified_request(req, v) { + let blob = blob_recycler.allocate(); + { + let mut b = blob.write().unwrap(); + let v = serialize(&resp)?; + let len = v.len(); + b.data[..len].copy_from_slice(&v); + b.meta.size = len; + b.meta.set_addr(&rsp_addr); + } + rsps.push_back(blob); + } + } + } + Ok(rsps) + } + fn process( obj: &Arc>>, verified_receiver: &Receiver<(Vec, Vec>)>, @@ -159,29 +188,8 @@ impl AccountantSkel { let timer = Duration::new(1, 0); let (mms, vvs) = verified_receiver.recv_timeout(timer)?; for (msgs, vers) in mms.into_iter().zip(vvs.into_iter()) { - let mut rsps = VecDeque::new(); - { - let reqs = Self::deserialize_packets(&((*msgs).read().unwrap())); - for (data, v) in reqs.into_iter().zip(vers.into_iter()) { - if let Some((req, rsp_addr)) = data { - if !req.verify() { - continue; - } - if let Some(resp) = obj.lock().unwrap().log_verified_request(req, v) { - let blob = blob_recycler.allocate(); - { - let mut b = blob.write().unwrap(); - let v = serialize(&resp)?; - let len = v.len(); - b.data[..len].copy_from_slice(&v); - b.meta.size = len; - b.meta.set_addr(&rsp_addr); - } - rsps.push_back(blob); - } - } - } - } + let reqs = Self::deserialize_packets(&msgs.read().unwrap()); + let rsps = Self::process_packets(obj, reqs, vers, blob_recycler)?; if !rsps.is_empty() { //don't wake up the other side if there is nothing blob_sender.send(rsps)?;