From 0a30bd74c1c517ea00042697e385a49427d6e22c Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Thu, 12 Apr 2018 20:53:34 -0600 Subject: [PATCH 1/2] Tell verifiers when not to parallelize accounting Without this patch, many batches of transactions could be tossed into a single entry, but the parallelized accountant can only guarentee the transactions in the batch can be processed in parallel. This patch signals the historian to generate a new Entry after each batch. Validators must maintain sequential consistency across Entries. --- src/accountant.rs | 38 ++++++++++++++++++++++++++++++++++- src/accountant_skel.rs | 45 +++++++++++++++++++++++++++++++++++++++++- src/bin/testnode.rs | 4 +--- 3 files changed, 82 insertions(+), 5 deletions(-) diff --git a/src/accountant.rs b/src/accountant.rs index 8fb576bf84..682fa7635b 100644 --- a/src/accountant.rs +++ b/src/accountant.rs @@ -155,8 +155,12 @@ impl Accountant { pub fn process_verified_transactions(&self, trs: Vec) -> Vec> { // Run all debits first to filter out any transactions that can't be processed // in parallel deterministically. - trs.into_par_iter() + let results: Vec<_> = trs.into_par_iter() .map(|tr| self.process_verified_transaction_debits(&tr).map(|_| tr)) + .collect(); // Calling collect() here forces all debits to complete before moving on. + + results + .into_par_iter() .map(|result| { result.map(|tr| { self.process_verified_transaction_credits(&tr); @@ -166,6 +170,27 @@ impl Accountant { .collect() } + fn partition_events(events: Vec) -> (Vec, Vec) { + let mut trs = vec![]; + let mut rest = vec![]; + for event in events { + match event { + Event::Transaction(tr) => trs.push(tr), + _ => rest.push(event), + } + } + (trs, rest) + } + + pub fn process_verified_events(&self, events: Vec) -> Result<()> { + let (trs, rest) = Self::partition_events(events); + self.process_verified_transactions(trs); + for event in rest { + self.process_verified_event(&event)?; + } + Ok(()) + } + /// Process a Witness Signature that has already been verified. fn process_verified_sig(&self, from: PublicKey, tx_sig: Signature) -> Result<()> { if let Occupied(mut e) = self.pending.write().unwrap().entry(tx_sig) { @@ -410,6 +435,17 @@ mod tests { // Assert we're no longer able to use the oldest entry ID. assert!(!acc.reserve_signature_with_last_id(&sig, &alice.last_id())); } + + #[test] + fn test_debits_before_credits() { + let mint = Mint::new(2); + let acc = Accountant::new(&mint); + let alice = KeyPair::new(); + let tr0 = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id()); + let tr1 = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id()); + let trs = vec![tr0, tr1]; + assert!(acc.process_verified_transactions(trs)[1].is_err()); + } } #[cfg(all(feature = "unstable", test))] diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index babe576f50..d548220114 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -179,6 +179,10 @@ impl AccountantSkel { } } + // Let validators know they should not attempt to process additional + // transactions in parallel. + self.historian.sender.send(Signal::Tick)?; + // Process the remaining requests serially. let rsps = reqs.into_iter() .filter_map(|(req, rsp_addr)| self.process_request(req, rsp_addr)) @@ -321,12 +325,14 @@ mod tests { use accountant::Accountant; use accountant_skel::AccountantSkel; use accountant_stub::AccountantStub; + use entry::Entry; use historian::Historian; use mint::Mint; use plan::Plan; + use recorder::Signal; use signature::{KeyPair, KeyPairUtil}; use std::io::sink; - use std::net::UdpSocket; + use std::net::{SocketAddr, UdpSocket}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; use std::thread::sleep; @@ -359,6 +365,43 @@ mod tests { assert_eq!(rv[1].read().unwrap().packets.len(), 1); } + #[test] + fn test_accounting_sequential_consistency() { + // In this attack we'll demonstrate that a verifier can interpret the ledger + // differently if either the server doesn't signal the ledger to add an + // Entry OR if the verifier tries to parallelize across multiple Entries. + let mint = Mint::new(2); + let acc = Accountant::new(&mint); + let rsp_addr: SocketAddr = "0.0.0.0:0".parse().expect("socket address"); + let historian = Historian::new(&mint.last_id(), None); + let mut skel = AccountantSkel::new(acc, mint.last_id(), sink(), historian); + + // Process a batch that includes a transaction that receives two tokens. + let alice = KeyPair::new(); + let tr = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id()); + let req_vers = vec![(Request::Transaction(tr), rsp_addr, 1_u8)]; + assert!(skel.process_packets(req_vers).is_ok()); + + // Process a second batch that spends one of those tokens. + let tr = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id()); + let req_vers = vec![(Request::Transaction(tr), rsp_addr, 1_u8)]; + assert!(skel.process_packets(req_vers).is_ok()); + + // Collect the ledger and feed it to a new accountant. + skel.historian.sender.send(Signal::Tick).unwrap(); + drop(skel.historian.sender); + let entries: Vec = skel.historian.receiver.iter().collect(); + + // Assert the user holds one token, not two. If the server only output one + // entry, then the second transaction will be rejected, because it drives + // the account balance below zero before the credit is added. + let acc = Accountant::new(&mint); + for entry in entries { + acc.process_verified_events(entry.events).unwrap(); + } + assert_eq!(acc.get_balance(&alice.pubkey()), Some(1)); + } + #[test] fn test_accountant_bad_sig() { let serve_port = 9002; diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index b79b99e758..3e0ac84266 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -53,9 +53,7 @@ fn main() { let mut last_id = entry1.id; for entry in entries { last_id = entry.id; - for event in entry.events { - acc.process_verified_event(&event).unwrap(); - } + acc.process_verified_events(entry.events).unwrap(); } let historian = Historian::new(&last_id, Some(1000)); From 7fc42de7581bdded1f251d672fdae7396b8effa7 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Fri, 13 Apr 2018 00:36:23 -0400 Subject: [PATCH 2/2] Fix bench --- src/accountant_skel.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index d548220114..41d8f90ede 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -511,7 +511,6 @@ mod bench { let tps = txs as f64 / sec; // Ensure that all transactions were successfully logged. - skel.historian.sender.send(Signal::Tick).unwrap(); drop(skel.historian.sender); let entries: Vec = skel.historian.receiver.iter().collect(); assert_eq!(entries.len(), 1);