diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index e334665687..18ce6cb6b9 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -324,15 +324,8 @@ impl AccountantSkel { (events, reqs) } - fn process_packets( - &self, - req_vers: Vec<(Request, SocketAddr, u8)>, - ) -> Result> { - debug!("partitioning"); - let (events, reqs) = Self::partition_requests(req_vers); - debug!("events: {} reqs: {}", events.len(), reqs.len()); - - // Process the transactions in parallel and then log the successful ones. + /// Process the transactions in parallel and then log the successful ones. + fn process_events(&self, events: Vec) -> Result<()> { for result in self.acc.lock().unwrap().process_verified_events(events) { if let Ok(event) = result { self.historian_input @@ -347,17 +340,15 @@ impl AccountantSkel { // Let validators know they should not attempt to process additional // transactions in parallel. self.historian_input.lock().unwrap().send(Signal::Tick)?; - debug!("after historian_input"); - // Process the remaining requests serially. - let rsps = reqs.into_iter() + Ok(()) + } + + fn process_requests(&self, reqs: Vec<(Request, SocketAddr)>) -> Vec<(Response, SocketAddr)> { + reqs.into_iter() .filter_map(|(req, rsp_addr)| self.process_request(req, rsp_addr)) - .collect(); - - debug!("returning rsps"); - - Ok(rsps) + .collect() } fn serialize_response( @@ -409,9 +400,19 @@ impl AccountantSkel { v }) .collect(); - debug!("process_packets"); - let rsps = obj.process_packets(req_vers)?; - debug!("done process_packets"); + + debug!("partitioning"); + let (events, reqs) = Self::partition_requests(req_vers); + debug!("events: {} reqs: {}", events.len(), reqs.len()); + + debug!("process_events"); + obj.process_events(events)?; + debug!("done process_events"); + + debug!("process_requests"); + let rsps = obj.process_requests(reqs); + debug!("done process_requests"); + let blobs = Self::serialize_responses(rsps, blob_recycler)?; if !blobs.is_empty() { info!("process: sending blobs: {}", blobs.len()); @@ -731,7 +732,7 @@ mod tests { use signature::{KeyPair, KeyPairUtil}; use std::collections::VecDeque; use std::io::sink; - use std::net::{SocketAddr, UdpSocket}; + use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::channel; use std::sync::mpsc::sync_channel; @@ -774,7 +775,6 @@ mod tests { // Entry OR if the verifier tries to parallelize across multiple Entries. let mint = Mint::new(2); let acc = Accountant::new(&mint); - let rsp_addr: SocketAddr = "0.0.0.0:0".parse().expect("socket address"); let (input, event_receiver) = sync_channel(10); let historian = Historian::new(event_receiver, &mint.last_id(), None); let skel = AccountantSkel::new(acc, input, historian); @@ -782,13 +782,13 @@ mod tests { // Process a batch that includes a transaction that receives two tokens. let alice = KeyPair::new(); let tr = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id()); - let req_vers = vec![(Request::Transaction(tr), rsp_addr, 1_u8)]; - assert!(skel.process_packets(req_vers).is_ok()); + let events = vec![Event::Transaction(tr)]; + assert!(skel.process_events(events).is_ok()); // Process a second batch that spends one of those tokens. let tr = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id()); - let req_vers = vec![(Request::Transaction(tr), rsp_addr, 1_u8)]; - assert!(skel.process_packets(req_vers).is_ok()); + let events = vec![Event::Transaction(tr)]; + assert!(skel.process_events(events).is_ok()); // Collect the ledger and feed it to a new accountant. skel.historian_input @@ -1102,7 +1102,7 @@ mod bench { let skel = AccountantSkel::new(acc, input, historian); let now = Instant::now(); - assert!(skel.process_packets(req_vers).is_ok()); + assert!(skel.process_events(req_vers).is_ok()); let duration = now.elapsed(); let sec = duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0; let tps = txs as f64 / sec;