Prepwork to hoist processing requests

This commit is contained in:
Greg Fitzgerald 2018-05-07 15:09:08 -06:00
parent 893011c3ba
commit 62bb78f58d
1 changed files with 27 additions and 27 deletions

View File

@ -324,15 +324,8 @@ impl AccountantSkel {
(events, reqs) (events, reqs)
} }
fn process_packets( /// Process the transactions in parallel and then log the successful ones.
&self, fn process_events(&self, events: Vec<Event>) -> Result<()> {
req_vers: Vec<(Request, SocketAddr, u8)>,
) -> Result<Vec<(Response, SocketAddr)>> {
debug!("partitioning");
let (events, reqs) = Self::partition_requests(req_vers);
debug!("events: {} reqs: {}", events.len(), reqs.len());
// Process the transactions in parallel and then log the successful ones.
for result in self.acc.lock().unwrap().process_verified_events(events) { for result in self.acc.lock().unwrap().process_verified_events(events) {
if let Ok(event) = result { if let Ok(event) = result {
self.historian_input self.historian_input
@ -347,17 +340,15 @@ impl AccountantSkel {
// Let validators know they should not attempt to process additional // Let validators know they should not attempt to process additional
// transactions in parallel. // transactions in parallel.
self.historian_input.lock().unwrap().send(Signal::Tick)?; self.historian_input.lock().unwrap().send(Signal::Tick)?;
debug!("after historian_input"); debug!("after historian_input");
// Process the remaining requests serially. Ok(())
let rsps = reqs.into_iter() }
fn process_requests(&self, reqs: Vec<(Request, SocketAddr)>) -> Vec<(Response, SocketAddr)> {
reqs.into_iter()
.filter_map(|(req, rsp_addr)| self.process_request(req, rsp_addr)) .filter_map(|(req, rsp_addr)| self.process_request(req, rsp_addr))
.collect(); .collect()
debug!("returning rsps");
Ok(rsps)
} }
fn serialize_response( fn serialize_response(
@ -409,9 +400,19 @@ impl AccountantSkel {
v v
}) })
.collect(); .collect();
debug!("process_packets");
let rsps = obj.process_packets(req_vers)?; debug!("partitioning");
debug!("done process_packets"); let (events, reqs) = Self::partition_requests(req_vers);
debug!("events: {} reqs: {}", events.len(), reqs.len());
debug!("process_events");
obj.process_events(events)?;
debug!("done process_events");
debug!("process_requests");
let rsps = obj.process_requests(reqs);
debug!("done process_requests");
let blobs = Self::serialize_responses(rsps, blob_recycler)?; let blobs = Self::serialize_responses(rsps, blob_recycler)?;
if !blobs.is_empty() { if !blobs.is_empty() {
info!("process: sending blobs: {}", blobs.len()); info!("process: sending blobs: {}", blobs.len());
@ -731,7 +732,7 @@ mod tests {
use signature::{KeyPair, KeyPairUtil}; use signature::{KeyPair, KeyPairUtil};
use std::collections::VecDeque; use std::collections::VecDeque;
use std::io::sink; use std::io::sink;
use std::net::{SocketAddr, UdpSocket}; use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel; use std::sync::mpsc::channel;
use std::sync::mpsc::sync_channel; use std::sync::mpsc::sync_channel;
@ -774,7 +775,6 @@ mod tests {
// Entry OR if the verifier tries to parallelize across multiple Entries. // Entry OR if the verifier tries to parallelize across multiple Entries.
let mint = Mint::new(2); let mint = Mint::new(2);
let acc = Accountant::new(&mint); let acc = Accountant::new(&mint);
let rsp_addr: SocketAddr = "0.0.0.0:0".parse().expect("socket address");
let (input, event_receiver) = sync_channel(10); let (input, event_receiver) = sync_channel(10);
let historian = Historian::new(event_receiver, &mint.last_id(), None); let historian = Historian::new(event_receiver, &mint.last_id(), None);
let skel = AccountantSkel::new(acc, input, historian); let skel = AccountantSkel::new(acc, input, historian);
@ -782,13 +782,13 @@ mod tests {
// Process a batch that includes a transaction that receives two tokens. // Process a batch that includes a transaction that receives two tokens.
let alice = KeyPair::new(); let alice = KeyPair::new();
let tr = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id()); let tr = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id());
let req_vers = vec![(Request::Transaction(tr), rsp_addr, 1_u8)]; let events = vec![Event::Transaction(tr)];
assert!(skel.process_packets(req_vers).is_ok()); assert!(skel.process_events(events).is_ok());
// Process a second batch that spends one of those tokens. // Process a second batch that spends one of those tokens.
let tr = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id()); let tr = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id());
let req_vers = vec![(Request::Transaction(tr), rsp_addr, 1_u8)]; let events = vec![Event::Transaction(tr)];
assert!(skel.process_packets(req_vers).is_ok()); assert!(skel.process_events(events).is_ok());
// Collect the ledger and feed it to a new accountant. // Collect the ledger and feed it to a new accountant.
skel.historian_input skel.historian_input
@ -1102,7 +1102,7 @@ mod bench {
let skel = AccountantSkel::new(acc, input, historian); let skel = AccountantSkel::new(acc, input, historian);
let now = Instant::now(); let now = Instant::now();
assert!(skel.process_packets(req_vers).is_ok()); assert!(skel.process_events(req_vers).is_ok());
let duration = now.elapsed(); let duration = now.elapsed();
let sec = duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0; let sec = duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0;
let tps = txs as f64 / sec; let tps = txs as f64 / sec;