From 893011c3bae9e8e56acd35cd4cd00b5e3ef5a116 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Mon, 7 May 2018 14:51:08 -0600 Subject: [PATCH 1/2] Process events instead of processing only transactions Prep work to allow clients to send any type that can end up in the ledger. --- src/accountant.rs | 20 +++++++++++++------- src/accountant_skel.rs | 35 +++++++++++++++++++---------------- src/bin/testnode.rs | 8 +++++++- 3 files changed, 39 insertions(+), 24 deletions(-) diff --git a/src/accountant.rs b/src/accountant.rs index a94629837..a214aa419 100644 --- a/src/accountant.rs +++ b/src/accountant.rs @@ -218,13 +218,18 @@ impl Accountant { (trs, rest) } - pub fn process_verified_events(&self, events: Vec) -> Result<()> { + pub fn process_verified_events(&self, events: Vec) -> Vec> { let (trs, rest) = Self::partition_events(events); - self.process_verified_transactions(trs); + let mut results: Vec<_> = self.process_verified_transactions(trs) + .into_iter() + .map(|x| x.map(Event::Transaction)) + .collect(); + for event in rest { - self.process_verified_event(&event)?; + results.push(self.process_verified_event(event)); } - Ok(()) + + results } /// Process a Witness Signature that has already been verified. @@ -278,12 +283,13 @@ impl Accountant { } /// Process an Transaction or Witness that has already been verified. - pub fn process_verified_event(&self, event: &Event) -> Result<()> { - match *event { + pub fn process_verified_event(&self, event: Event) -> Result { + match event { Event::Transaction(ref tr) => self.process_verified_transaction(tr), Event::Signature { from, tx_sig, .. } => self.process_verified_sig(from, tx_sig), Event::Timestamp { from, dt, .. } => self.process_verified_timestamp(from, dt), - } + }?; + Ok(event) } /// Create, sign, and process a Transaction from `keypair` to `to` of diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index a2a8d21d4..e33466568 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -308,20 +308,20 @@ impl AccountantSkel { /// Split Request list into verified transactions and the rest fn partition_requests( req_vers: Vec<(Request, SocketAddr, u8)>, - ) -> (Vec, Vec<(Request, SocketAddr)>) { - let mut trs = vec![]; + ) -> (Vec, Vec<(Request, SocketAddr)>) { + let mut events = vec![]; let mut reqs = vec![]; for (msg, rsp_addr, verify) in req_vers { match msg { Request::Transaction(tr) => { if verify != 0 { - trs.push(tr); + events.push(Event::Transaction(tr)); } } _ => reqs.push((msg, rsp_addr)), } } - (trs, reqs) + (events, reqs) } fn process_packets( @@ -329,16 +329,16 @@ impl AccountantSkel { req_vers: Vec<(Request, SocketAddr, u8)>, ) -> Result> { debug!("partitioning"); - let (trs, reqs) = Self::partition_requests(req_vers); - debug!("trs: {} reqs: {}", trs.len(), reqs.len()); + let (events, reqs) = Self::partition_requests(req_vers); + debug!("events: {} reqs: {}", events.len(), reqs.len()); // Process the transactions in parallel and then log the successful ones. - for result in self.acc.lock().unwrap().process_verified_transactions(trs) { - if let Ok(tr) = result { + for result in self.acc.lock().unwrap().process_verified_events(events) { + if let Ok(event) = result { self.historian_input .lock() .unwrap() - .send(Signal::Event(Event::Transaction(tr)))?; + .send(Signal::Event(event))?; } } @@ -436,13 +436,12 @@ impl AccountantSkel { for msgs in &blobs { let blob = msgs.read().unwrap(); let entries: Vec = deserialize(&blob.data()[..blob.meta.size]).unwrap(); + let acc = obj.acc.lock().unwrap(); for entry in entries { - obj.acc.lock().unwrap().register_entry_id(&entry.id); - - obj.acc - .lock() - .unwrap() - .process_verified_events(entry.events)?; + acc.register_entry_id(&entry.id); + for result in acc.process_verified_events(entry.events) { + result?; + } } //TODO respond back to leader with hash of the state } @@ -805,7 +804,11 @@ mod tests { // the account balance below zero before the credit is added. let acc = Accountant::new(&mint); for entry in entries { - acc.process_verified_events(entry.events).unwrap(); + assert!( + acc.process_verified_events(entry.events) + .into_iter() + .all(|x| x.is_ok()) + ); } assert_eq!(acc.get_balance(&alice.pubkey()), Some(1)); } diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index e9318bbb8..8d1a4e1ea 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -103,7 +103,13 @@ fn main() { let mut last_id = entry1.id; for entry in entries { last_id = entry.id; - acc.process_verified_events(entry.events).unwrap(); + let results = acc.process_verified_events(entry.events); + for result in results { + if let Err(e) = result { + eprintln!("failed to process event {:?}", e); + exit(1); + } + } acc.register_entry_id(&last_id); } From 62bb78f58d3cff60afe4c291e9f551da7026ecd9 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Mon, 7 May 2018 15:09:08 -0600 Subject: [PATCH 2/2] Prepwork to hoist processing requests --- src/accountant_skel.rs | 54 +++++++++++++++++++++--------------------- 1 file changed, 27 insertions(+), 27 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index e33466568..18ce6cb6b 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -324,15 +324,8 @@ impl AccountantSkel { (events, reqs) } - fn process_packets( - &self, - req_vers: Vec<(Request, SocketAddr, u8)>, - ) -> Result> { - debug!("partitioning"); - let (events, reqs) = Self::partition_requests(req_vers); - debug!("events: {} reqs: {}", events.len(), reqs.len()); - - // Process the transactions in parallel and then log the successful ones. + /// Process the transactions in parallel and then log the successful ones. + fn process_events(&self, events: Vec) -> Result<()> { for result in self.acc.lock().unwrap().process_verified_events(events) { if let Ok(event) = result { self.historian_input @@ -347,17 +340,15 @@ impl AccountantSkel { // Let validators know they should not attempt to process additional // transactions in parallel. self.historian_input.lock().unwrap().send(Signal::Tick)?; - debug!("after historian_input"); - // Process the remaining requests serially. - let rsps = reqs.into_iter() + Ok(()) + } + + fn process_requests(&self, reqs: Vec<(Request, SocketAddr)>) -> Vec<(Response, SocketAddr)> { + reqs.into_iter() .filter_map(|(req, rsp_addr)| self.process_request(req, rsp_addr)) - .collect(); - - debug!("returning rsps"); - - Ok(rsps) + .collect() } fn serialize_response( @@ -409,9 +400,19 @@ impl AccountantSkel { v }) .collect(); - debug!("process_packets"); - let rsps = obj.process_packets(req_vers)?; - debug!("done process_packets"); + + debug!("partitioning"); + let (events, reqs) = Self::partition_requests(req_vers); + debug!("events: {} reqs: {}", events.len(), reqs.len()); + + debug!("process_events"); + obj.process_events(events)?; + debug!("done process_events"); + + debug!("process_requests"); + let rsps = obj.process_requests(reqs); + debug!("done process_requests"); + let blobs = Self::serialize_responses(rsps, blob_recycler)?; if !blobs.is_empty() { info!("process: sending blobs: {}", blobs.len()); @@ -731,7 +732,7 @@ mod tests { use signature::{KeyPair, KeyPairUtil}; use std::collections::VecDeque; use std::io::sink; - use std::net::{SocketAddr, UdpSocket}; + use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::channel; use std::sync::mpsc::sync_channel; @@ -774,7 +775,6 @@ mod tests { // Entry OR if the verifier tries to parallelize across multiple Entries. let mint = Mint::new(2); let acc = Accountant::new(&mint); - let rsp_addr: SocketAddr = "0.0.0.0:0".parse().expect("socket address"); let (input, event_receiver) = sync_channel(10); let historian = Historian::new(event_receiver, &mint.last_id(), None); let skel = AccountantSkel::new(acc, input, historian); @@ -782,13 +782,13 @@ mod tests { // Process a batch that includes a transaction that receives two tokens. let alice = KeyPair::new(); let tr = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id()); - let req_vers = vec![(Request::Transaction(tr), rsp_addr, 1_u8)]; - assert!(skel.process_packets(req_vers).is_ok()); + let events = vec![Event::Transaction(tr)]; + assert!(skel.process_events(events).is_ok()); // Process a second batch that spends one of those tokens. let tr = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id()); - let req_vers = vec![(Request::Transaction(tr), rsp_addr, 1_u8)]; - assert!(skel.process_packets(req_vers).is_ok()); + let events = vec![Event::Transaction(tr)]; + assert!(skel.process_events(events).is_ok()); // Collect the ledger and feed it to a new accountant. skel.historian_input @@ -1102,7 +1102,7 @@ mod bench { let skel = AccountantSkel::new(acc, input, historian); let now = Instant::now(); - assert!(skel.process_packets(req_vers).is_ok()); + assert!(skel.process_events(req_vers).is_ok()); let duration = now.elapsed(); let sec = duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0; let tps = txs as f64 / sec;