Better symmetry

deserialize -> process -> serialize
This commit is contained in:
Greg Fitzgerald 2018-04-06 16:34:52 -06:00
parent a93ec03d2c
commit 584c8c07b8
1 changed files with 29 additions and 19 deletions

View File

@ -149,10 +149,29 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
.collect()
}
fn create_blob_from_response(
blob_recycler: &packet::BlobRecycler,
fn process_packets(
obj: &Arc<Mutex<AccountantSkel<W>>>,
reqs: Vec<Option<(Request, SocketAddr)>>,
vers: Vec<u8>,
) -> Vec<(Response, SocketAddr)> {
let mut rsps = Vec::new();
for (data, v) in reqs.into_iter().zip(vers.into_iter()) {
if let Some((req, rsp_addr)) = data {
if !req.verify() {
continue;
}
if let Some(resp) = obj.lock().unwrap().log_verified_request(req, v) {
rsps.push((resp, rsp_addr));
}
}
}
rsps
}
fn serialize_response(
resp: Response,
rsp_addr: SocketAddr,
blob_recycler: &packet::BlobRecycler,
) -> Result<packet::SharedBlob> {
let blob = blob_recycler.allocate();
{
@ -166,25 +185,15 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
Ok(blob)
}
fn process_packets(
obj: &Arc<Mutex<AccountantSkel<W>>>,
reqs: Vec<Option<(Request, SocketAddr)>>,
vers: Vec<u8>,
fn serialize_responses(
rsps: Vec<(Response, SocketAddr)>,
blob_recycler: &packet::BlobRecycler,
) -> Result<VecDeque<packet::SharedBlob>> {
let mut rsps = VecDeque::new();
for (data, v) in reqs.into_iter().zip(vers.into_iter()) {
if let Some((req, rsp_addr)) = data {
if !req.verify() {
continue;
}
if let Some(resp) = obj.lock().unwrap().log_verified_request(req, v) {
let blob = Self::create_blob_from_response(blob_recycler, resp, rsp_addr)?;
rsps.push_back(blob);
}
}
let mut blobs = VecDeque::new();
for (resp, rsp_addr) in rsps {
blobs.push_back(Self::serialize_response(resp, rsp_addr, blob_recycler)?);
}
Ok(rsps)
Ok(blobs)
}
fn process(
@ -198,7 +207,8 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
let (mms, vvs) = verified_receiver.recv_timeout(timer)?;
for (msgs, vers) in mms.into_iter().zip(vvs.into_iter()) {
let reqs = Self::deserialize_packets(&msgs.read().unwrap());
let blobs = Self::process_packets(obj, reqs, vers, blob_recycler)?;
let rsps = Self::process_packets(obj, reqs, vers);
let blobs = Self::serialize_responses(rsps, blob_recycler)?;
if !blobs.is_empty() {
//don't wake up the other side if there is nothing
blob_sender.send(blobs)?;