From 8a7545197f3b7396a34f9a67015722b2f3e19fe7 Mon Sep 17 00:00:00 2001 From: Rob Walker Date: Tue, 25 Sep 2018 15:01:51 -0700 Subject: [PATCH] move tick generation back to banking_stage, add unit tests (#1332) * move tick generation back to banking_stage, add unit tests fixes #1217 * remove channel() stuff for synchronous comm; use a mutex --- src/banking_stage.rs | 320 +++++++++++++++++++++++++++---------------- src/entry.rs | 8 +- src/poh.rs | 37 ++--- src/poh_service.rs | 166 +++++++++++----------- 4 files changed, 303 insertions(+), 228 deletions(-) diff --git a/src/banking_stage.rs b/src/banking_stage.rs index e1594a3a3..b6a2a20dc 100644 --- a/src/banking_stage.rs +++ b/src/banking_stage.rs @@ -6,10 +6,9 @@ use bank::Bank; use bincode::deserialize; use counter::Counter; use entry::Entry; -use hash::{Hash, Hasher}; +use hash::Hasher; use log::Level; use packet::{Packets, SharedPackets}; -use poh::PohEntry; use poh_service::PohService; use rayon::prelude::*; use result::{Error, Result}; @@ -43,26 +42,39 @@ impl BankingStage { let thread_hdl = Builder::new() .name("solana-banking-stage".to_string()) .spawn(move || { - let (hash_sender, hash_receiver) = channel(); - let (poh_service, poh_receiver) = - PohService::new(bank.last_id(), hash_receiver, tick_duration); + let poh_service = PohService::new(bank.last_id(), tick_duration.is_some()); + + let mut last_tick = Instant::now(); + loop { + let timeout = + tick_duration.map(|duration| duration - (Instant::now() - last_tick)); + if let Err(e) = Self::process_packets( + timeout, &bank, - &hash_sender, - &poh_receiver, + &poh_service, &verified_receiver, &entry_sender, ) { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), + Error::RecvError(_) => break, Error::SendError => break, - _ => error!("{:?}", e), + _ => { + error!("process_packets() {:?}", e); + break; + } } } + if tick_duration.is_some() && last_tick.elapsed() > tick_duration.unwrap() { + if let Err(e) = Self::tick(&poh_service, &entry_sender) { + error!("tick() {:?}", e); + } + last_tick = Instant::now(); + } } - drop(hash_sender); poh_service.join().unwrap(); }).unwrap(); (BankingStage { thread_hdl }, entry_receiver) @@ -80,26 +92,33 @@ impl BankingStage { }).collect() } + fn tick(poh_service: &PohService, entry_sender: &Sender>) -> Result<()> { + let poh = poh_service.tick(); + let entry = Entry { + num_hashes: poh.num_hashes, + id: poh.id, + transactions: vec![], + }; + entry_sender.send(vec![entry])?; + Ok(()) + } + fn process_transactions( bank: &Arc, transactions: &[Transaction], - hash_sender: &Sender, - poh_receiver: &Receiver, - entry_sender: &Sender>, - ) -> Result<()> { + poh_service: &PohService, + ) -> Result> { let mut entries = Vec::new(); - debug!("transactions: {}", transactions.len()); + debug!("processing: {}", transactions.len()); let mut chunk_start = 0; while chunk_start != transactions.len() { let chunk_end = chunk_start + Entry::num_will_fit(&transactions[chunk_start..]); let results = bank.process_transactions(&transactions[chunk_start..chunk_end]); - debug!("results: {}", results.len()); let mut hasher = Hasher::default(); - let processed_transactions: Vec<_> = transactions[chunk_start..chunk_end] .into_iter() .enumerate() @@ -114,64 +133,45 @@ impl BankingStage { } }).collect(); - debug!("processed ok: {}", processed_transactions.len()); + debug!("processed: {}", processed_transactions.len()); chunk_start = chunk_end; let hash = hasher.result(); if !processed_transactions.is_empty() { - hash_sender.send(hash)?; + let poh = poh_service.record(hash); - let mut answered = false; - while !answered { - entries.extend(poh_receiver.try_iter().map(|poh| { - if let Some(mixin) = poh.mixin { - answered = true; - assert_eq!(mixin, hash); - bank.register_entry_id(&poh.id); - Entry { - num_hashes: poh.num_hashes, - id: poh.id, - transactions: processed_transactions.clone(), - } - } else { - Entry { - num_hashes: poh.num_hashes, - id: poh.id, - transactions: vec![], - } - } - })); - } - } else { - entries.extend(poh_receiver.try_iter().map(|poh| Entry { + bank.register_entry_id(&poh.id); + entries.push(Entry { num_hashes: poh.num_hashes, id: poh.id, - transactions: vec![], - })); + transactions: processed_transactions, + }); } } debug!("done process_transactions, {} entries", entries.len()); - entry_sender.send(entries)?; - Ok(()) + Ok(entries) } /// Process the incoming packets and send output `Signal` messages to `signal_sender`. /// Discard packets via `packet_recycler`. pub fn process_packets( + timeout: Option, bank: &Arc, - hash_sender: &Sender, - poh_receiver: &Receiver, + poh_service: &PohService, verified_receiver: &Receiver)>>, entry_sender: &Sender>, ) -> Result<()> { - let timer = Duration::new(1, 0); let recv_start = Instant::now(); - let mms = verified_receiver.recv_timeout(timer)?; - debug!("verified_recevier {:?}", verified_receiver); + // TODO pass deadline to recv_deadline() when/if it becomes available? + let mms = if let Some(timeout) = timeout { + verified_receiver.recv_timeout(timeout)? + } else { + verified_receiver.recv()? + }; let now = Instant::now(); let mut reqs_len = 0; let mms_len = mms.len(); @@ -189,6 +189,8 @@ impl BankingStage { let transactions = Self::deserialize_transactions(&msgs.read()); reqs_len += transactions.len(); + debug!("transactions received {}", transactions.len()); + let transactions: Vec<_> = transactions .into_iter() .zip(vers) @@ -200,14 +202,10 @@ impl BankingStage { None }, }).collect(); + debug!("verified transactions {}", transactions.len()); - Self::process_transactions( - bank, - &transactions, - hash_sender, - poh_receiver, - entry_sender, - )?; + let entries = Self::process_transactions(bank, &transactions, poh_service)?; + entry_sender.send(entries)?; } inc_new_counter_info!( @@ -241,63 +239,153 @@ impl Service for BankingStage { } } -// TODO: When banking is pulled out of RequestStage, add this test back in. +#[cfg(test)] +mod tests { + use super::*; + use bank::Bank; + use ledger::Block; + use mint::Mint; + use packet::{to_packets, PacketRecycler}; + use signature::{Keypair, KeypairUtil}; + use std::thread::sleep; + use transaction::Transaction; -//use bank::Bank; -//use entry::Entry; -//use hash::Hash; -//use record_stage::RecordStage; -//use record_stage::Signal; -//use result::Result; -//use std::sync::mpsc::{channel, Sender}; -//use std::sync::{Arc, Mutex}; -//use std::time::Duration; -//use transaction::Transaction; -// -//#[cfg(test)] -//mod tests { -// use bank::Bank; -// use mint::Mint; -// use signature::{KeyPair, KeyPairUtil}; -// use transaction::Transaction; -// -// #[test] -// // TODO: Move this test banking_stage. Calling process_transactions() directly -// // defeats the purpose of this test. -// fn test_banking_sequential_consistency() { -// // In this attack we'll demonstrate that a verifier can interpret the ledger -// // differently if either the server doesn't signal the ledger to add an -// // Entry OR if the verifier tries to parallelize across multiple Entries. -// let mint = Mint::new(2); -// let bank = Bank::new(&mint); -// let banking_stage = EventProcessor::new(bank, &mint.last_id(), None); -// -// // Process a batch that includes a transaction that receives two tokens. -// let alice = KeyPair::new(); -// let tx = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id()); -// let transactions = vec![tx]; -// let entry0 = banking_stage.process_transactions(transactions).unwrap(); -// -// // Process a second batch that spends one of those tokens. -// let tx = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id()); -// let transactions = vec![tx]; -// let entry1 = banking_stage.process_transactions(transactions).unwrap(); -// -// // Collect the ledger and feed it to a new bank. -// let entries = vec![entry0, entry1]; -// -// // Assert the user holds one token, not two. If the server only output one -// // entry, then the second transaction will be rejected, because it drives -// // the account balance below zero before the credit is added. -// let bank = Bank::new(&mint); -// for entry in entries { -// assert!( -// bank -// .process_transactions(entry.transactions) -// .into_iter() -// .all(|x| x.is_ok()) -// ); -// } -// assert_eq!(bank.get_balance(&alice.pubkey()), Some(1)); -// } -//} + #[test] + fn test_banking_stage_shutdown() { + let bank = Bank::new(&Mint::new(2)); + let (verified_sender, verified_receiver) = channel(); + let (banking_stage, _entry_receiver) = + BankingStage::new(Arc::new(bank), verified_receiver, None); + drop(verified_sender); + assert_eq!(banking_stage.join().unwrap(), ()); + } + + #[test] + fn test_banking_stage_tick() { + let bank = Bank::new(&Mint::new(2)); + let start_hash = bank.last_id(); + let (verified_sender, verified_receiver) = channel(); + let (banking_stage, entry_receiver) = BankingStage::new( + Arc::new(bank), + verified_receiver, + Some(Duration::from_millis(1)), + ); + sleep(Duration::from_millis(50)); + drop(verified_sender); + + let entries: Vec<_> = entry_receiver.iter().flat_map(|x| x).collect(); + assert!(entries.len() != 0); + assert!(entries.verify(&start_hash)); + assert_eq!(banking_stage.join().unwrap(), ()); + } + + #[test] + fn test_banking_stage_no_tick() { + let bank = Bank::new(&Mint::new(2)); + let (verified_sender, verified_receiver) = channel(); + let (banking_stage, entry_receiver) = + BankingStage::new(Arc::new(bank), verified_receiver, None); + sleep(Duration::from_millis(1000)); + drop(verified_sender); + + let entries: Vec<_> = entry_receiver.try_iter().map(|x| x).collect(); + assert!(entries.len() == 0); + assert_eq!(banking_stage.join().unwrap(), ()); + } + + #[test] + fn test_banking_stage() { + let mint = Mint::new(2); + let bank = Bank::new(&mint); + let start_hash = bank.last_id(); + let (verified_sender, verified_receiver) = channel(); + let (banking_stage, entry_receiver) = + BankingStage::new(Arc::new(bank), verified_receiver, None); + + // good tx + let keypair = mint.keypair(); + let tx = Transaction::new(&keypair, keypair.pubkey(), 1, start_hash); + + // good tx, but no verify + let tx_no_ver = Transaction::new(&keypair, keypair.pubkey(), 1, start_hash); + + // bad tx, AccountNotFound + let keypair = Keypair::new(); + let tx_anf = Transaction::new(&keypair, keypair.pubkey(), 1, start_hash); + + // send 'em over + let recycler = PacketRecycler::default(); + let packets = to_packets(&recycler, &[tx, tx_no_ver, tx_anf]); + + // glad they all fit + assert_eq!(packets.len(), 1); + verified_sender // tx, no_ver, anf + .send(vec![(packets[0].clone(), vec![1u8, 0u8, 1u8])]) + .unwrap(); + + drop(verified_sender); + + let entries: Vec<_> = entry_receiver.iter().map(|x| x).collect(); + assert_eq!(entries.len(), 1); + let mut last_id = start_hash; + entries.iter().for_each(|entries| { + assert_eq!(entries.len(), 1); + assert!(entries.verify(&last_id)); + last_id = entries.last().unwrap().id; + }); + assert_eq!(banking_stage.join().unwrap(), ()); + } + + #[test] + fn test_banking_stage_entryfication() { + // In this attack we'll demonstrate that a verifier can interpret the ledger + // differently if either the server doesn't signal the ledger to add an + // Entry OR if the verifier tries to parallelize across multiple Entries. + let mint = Mint::new(2); + let bank = Bank::new(&mint); + let recycler = PacketRecycler::default(); + let (verified_sender, verified_receiver) = channel(); + let (banking_stage, entry_receiver) = + BankingStage::new(Arc::new(bank), verified_receiver, None); + + // Process a batch that includes a transaction that receives two tokens. + let alice = Keypair::new(); + let tx = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id()); + + let packets = to_packets(&recycler, &[tx]); + verified_sender + .send(vec![(packets[0].clone(), vec![1u8])]) + .unwrap(); + + // Process a second batch that spends one of those tokens. + let tx = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id()); + let packets = to_packets(&recycler, &[tx]); + verified_sender + .send(vec![(packets[0].clone(), vec![1u8])]) + .unwrap(); + drop(verified_sender); + assert_eq!(banking_stage.join().unwrap(), ()); + + // Collect the ledger and feed it to a new bank. + let ventries: Vec<_> = entry_receiver.iter().collect(); + + // same assertion as below, really... + assert_eq!(ventries.len(), 2); + + // Assert the user holds one token, not two. If the stage only outputs one + // entry, then the second transaction will be rejected, because it drives + // the account balance below zero before the credit is added. + let bank = Bank::new(&mint); + for entries in ventries { + for entry in entries { + assert!( + bank.process_transactions(&entry.transactions) + .into_iter() + .all(|x| x.is_ok()) + ); + } + } + assert_eq!(bank.get_balance(&alice.pubkey()), 1); + } + +} diff --git a/src/entry.rs b/src/entry.rs index 8b87a5cb0..5b295e48d 100644 --- a/src/entry.rs +++ b/src/entry.rs @@ -200,13 +200,17 @@ fn next_hash(start_hash: &Hash, num_hashes: u64, transactions: &[Transaction]) - return *start_hash; } - let mut poh = Poh::new(*start_hash, None); + let mut poh = Poh::new(*start_hash); for _ in 1..num_hashes { poh.hash(); } - poh.record(Transaction::hash(transactions)).id + if transactions.is_empty() { + poh.tick().id + } else { + poh.record(Transaction::hash(transactions)).id + } } /// Creates the next Tick or Transaction Entry `num_hashes` after `start_hash`. diff --git a/src/poh.rs b/src/poh.rs index 7eecb4622..20b00d772 100644 --- a/src/poh.rs +++ b/src/poh.rs @@ -1,14 +1,10 @@ //! The `Poh` module provides an object for generating a Proof of History. //! It records Hashes items on behalf of its users. - use hash::{hash, hashv, Hash}; -use std::time::{Duration, Instant}; pub struct Poh { last_hash: Hash, num_hashes: u64, - last_tick: Instant, - tick_duration: Option, } #[derive(Debug)] @@ -19,13 +15,10 @@ pub struct PohEntry { } impl Poh { - pub fn new(last_hash: Hash, tick_duration: Option) -> Self { - let last_tick = Instant::now(); + pub fn new(last_hash: Hash) -> Self { Poh { last_hash, num_hashes: 0, - last_tick, - tick_duration, } } @@ -36,10 +29,10 @@ impl Poh { pub fn record(&mut self, mixin: Hash) -> PohEntry { let num_hashes = self.num_hashes + 1; - self.num_hashes = 0; - self.last_hash = hashv(&[&self.last_hash.as_ref(), &mixin.as_ref()]); + self.num_hashes = 0; + PohEntry { num_hashes, id: self.last_hash, @@ -49,23 +42,21 @@ impl Poh { // emissions of Ticks (i.e. PohEntries without a mixin) allows // validators to parallelize the work of catching up - pub fn tick(&mut self) -> Option { - if let Some(tick_duration) = self.tick_duration { - if self.last_tick.elapsed() >= tick_duration { - self.last_tick = Instant::now(); - let entry = PohEntry { - num_hashes: self.num_hashes, - id: self.last_hash, - mixin: None, - }; - self.num_hashes = 0; - return Some(entry); - } + pub fn tick(&mut self) -> PohEntry { + self.hash(); + + let num_hashes = self.num_hashes; + self.num_hashes = 0; + + PohEntry { + num_hashes, + id: self.last_hash, + mixin: None, } - None } } +#[cfg(test)] pub fn verify(initial: Hash, entries: &[PohEntry]) -> bool { let mut last_hash = initial; diff --git a/src/poh_service.rs b/src/poh_service.rs index ed07cbac5..f9e8f6217 100644 --- a/src/poh_service.rs +++ b/src/poh_service.rs @@ -11,12 +11,14 @@ use hash::Hash; use poh::{Poh, PohEntry}; use service::Service; -use std::sync::mpsc::{channel, Receiver, RecvError, Sender, TryRecvError}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, Mutex}; use std::thread::{self, Builder, JoinHandle}; -use std::time::Duration; pub struct PohService { + poh: Arc>, thread_hdl: JoinHandle<()>, + run_poh: Arc, } impl PohService { @@ -24,74 +26,83 @@ impl PohService { /// sending back Entry messages until either the receiver or sender channel is closed. /// if tick_duration is some, service will automatically produce entries every /// `tick_duration`. - pub fn new( - start_hash: Hash, - hash_receiver: Receiver, - tick_duration: Option, - ) -> (Self, Receiver) { - let (poh_sender, poh_receiver) = channel(); + pub fn new(start_hash: Hash, run_poh: bool) -> Self { + let poh = Arc::new(Mutex::new(Poh::new(start_hash))); + let run_poh = Arc::new(AtomicBool::new(run_poh)); + let thread_poh = poh.clone(); + let thread_run_poh = run_poh.clone(); let thread_hdl = Builder::new() .name("solana-record-service".to_string()) .spawn(move || { - let mut poh = Poh::new(start_hash, tick_duration); - if tick_duration.is_some() { - loop { - if Self::try_process_hashes(&mut poh, &hash_receiver, &poh_sender).is_err() - { - return; - } - poh.hash(); - } - } else { - let _ = Self::process_hashes(&mut poh, &hash_receiver, &poh_sender); + while thread_run_poh.load(Ordering::Relaxed) { + thread_poh.lock().unwrap().hash(); } }).unwrap(); - (PohService { thread_hdl }, poh_receiver) - } - - fn process_hash(hash: Hash, poh: &mut Poh, sender: &Sender) -> Result<(), ()> { - let resp = poh.record(hash); - sender.send(resp).or(Err(()))?; - Ok(()) - } - - fn process_hashes( - poh: &mut Poh, - receiver: &Receiver, - sender: &Sender, - ) -> Result<(), ()> { - loop { - match receiver.recv() { - Ok(hash) => Self::process_hash(hash, poh, sender)?, - Err(RecvError) => return Err(()), - } + PohService { + poh, + run_poh, + thread_hdl, } } - fn try_process_hashes( - poh: &mut Poh, - receiver: &Receiver, - sender: &Sender, - ) -> Result<(), ()> { - loop { - if let Some(resp) = poh.tick() { - sender.send(resp).or(Err(()))?; - } - match receiver.try_recv() { - Ok(hash) => Self::process_hash(hash, poh, sender)?, - Err(TryRecvError::Empty) => return Ok(()), - Err(TryRecvError::Disconnected) => return Err(()), - }; - } + pub fn tick(&self) -> PohEntry { + let mut poh = self.poh.lock().unwrap(); + poh.tick() } + + pub fn record(&self, mixin: Hash) -> PohEntry { + let mut poh = self.poh.lock().unwrap(); + poh.record(mixin) + } + + // fn process_hash( + // mixin: Option, + // poh: &mut Poh, + // sender: &Sender, + // ) -> Result<(), ()> { + // let resp = match mixin { + // Some(mixin) => poh.record(mixin), + // None => poh.tick(), + // }; + // sender.send(resp).or(Err(()))?; + // Ok(()) + // } + // + // fn process_hashes( + // poh: &mut Poh, + // receiver: &Receiver>, + // sender: &Sender, + // ) -> Result<(), ()> { + // loop { + // match receiver.recv() { + // Ok(hash) => Self::process_hash(hash, poh, sender)?, + // Err(RecvError) => return Err(()), + // } + // } + // } + // + // fn try_process_hashes( + // poh: &mut Poh, + // receiver: &Receiver>, + // sender: &Sender, + // ) -> Result<(), ()> { + // loop { + // match receiver.try_recv() { + // Ok(hash) => Self::process_hash(hash, poh, sender)?, + // Err(TryRecvError::Empty) => return Ok(()), + // Err(TryRecvError::Disconnected) => return Err(()), + // }; + // } + // } } impl Service for PohService { type JoinReturnType = (); fn join(self) -> thread::Result<()> { + self.run_poh.store(false, Ordering::Relaxed); self.thread_hdl.join() } } @@ -100,57 +111,38 @@ impl Service for PohService { mod tests { use super::*; use poh::verify; - use std::sync::mpsc::channel; use std::thread::sleep; + use std::time::Duration; #[test] fn test_poh() { - let (hash_sender, hash_receiver) = channel(); - let (poh_service, poh_receiver) = PohService::new(Hash::default(), hash_receiver, None); + let poh_service = PohService::new(Hash::default(), false); - hash_sender.send(Hash::default()).unwrap(); + let entry0 = poh_service.record(Hash::default()); sleep(Duration::from_millis(1)); - hash_sender.send(Hash::default()).unwrap(); + let entry1 = poh_service.record(Hash::default()); sleep(Duration::from_millis(1)); - hash_sender.send(Hash::default()).unwrap(); - - let entry0 = poh_receiver.recv().unwrap(); - let entry1 = poh_receiver.recv().unwrap(); - let entry2 = poh_receiver.recv().unwrap(); + let entry2 = poh_service.record(Hash::default()); assert_eq!(entry0.num_hashes, 1); - assert_eq!(entry0.num_hashes, 1); - assert_eq!(entry0.num_hashes, 1); + assert_eq!(entry1.num_hashes, 1); + assert_eq!(entry2.num_hashes, 1); - drop(hash_sender); - assert_eq!(poh_service.thread_hdl.join().unwrap(), ()); + assert_eq!(poh_service.join().unwrap(), ()); assert!(verify(Hash::default(), &[entry0, entry1, entry2])); } #[test] - fn test_poh_closed_sender() { - let (hash_sender, hash_receiver) = channel(); - let (poh_service, poh_receiver) = PohService::new(Hash::default(), hash_receiver, None); - drop(poh_receiver); - hash_sender.send(Hash::default()).unwrap(); - assert_eq!(poh_service.thread_hdl.join().unwrap(), ()); - } - - #[test] - fn test_poh_clock() { - let (hash_sender, hash_receiver) = channel(); - let (_poh_service, poh_receiver) = PohService::new( - Hash::default(), - hash_receiver, - Some(Duration::from_millis(1)), - ); + fn test_do_poh() { + let poh_service = PohService::new(Hash::default(), true); sleep(Duration::from_millis(50)); - drop(hash_sender); - let pohs: Vec<_> = poh_receiver.iter().map(|x| x).collect(); - assert!(pohs.len() > 1); + let entry = poh_service.tick(); + assert!(entry.num_hashes > 1); - assert!(verify(Hash::default(), &pohs)); + assert_eq!(poh_service.join().unwrap(), ()); + + assert!(verify(Hash::default(), &vec![entry])); } }