diff --git a/benches/bank.rs b/benches/bank.rs index 41952288ef..2f41e8072e 100644 --- a/benches/bank.rs +++ b/benches/bank.rs @@ -42,7 +42,7 @@ fn bench_process_transaction(bencher: &mut Bencher) { bencher.iter(|| { // Since benchmarker runs this multiple times, we need to clear the signatures. bank.clear_signatures(); - let results = bank.process_transactions(transactions.clone()); + let results = bank.process_transactions(&transactions); assert!(results.iter().all(Result::is_ok)); }) } diff --git a/benches/banking_stage.rs b/benches/banking_stage.rs index d6a5d48904..9fd648e60e 100644 --- a/benches/banking_stage.rs +++ b/benches/banking_stage.rs @@ -1,17 +1,18 @@ #![feature(test)] extern crate bincode; +extern crate rand; extern crate rayon; extern crate solana; extern crate test; +use rand::{thread_rng, Rng}; use rayon::prelude::*; use solana::bank::Bank; -use solana::banking_stage::BankingStage; +use solana::banking_stage::{BankingStage, NUM_THREADS}; use solana::entry::Entry; use solana::mint::Mint; use solana::packet::{to_packets_chunked, PacketRecycler}; -use solana::poh_service::PohService; -use solana::signature::{Keypair, KeypairUtil}; +use solana::signature::{KeypairUtil, Pubkey, Signature}; use solana::transaction::Transaction; use std::iter; use std::sync::mpsc::{channel, Receiver}; @@ -19,68 +20,6 @@ use std::sync::Arc; use std::time::Duration; use test::Bencher; -// use self::test::Bencher; -// use bank::{Bank, MAX_ENTRY_IDS}; -// use bincode::serialize; -// use hash::hash; -// use mint::Mint; -// use rayon::prelude::*; -// use signature::{Keypair, KeypairUtil}; -// use std::collections::HashSet; -// use std::time::Instant; -// use transaction::Transaction; -// -// fn bench_process_transactions(_bencher: &mut Bencher) { -// let mint = Mint::new(100_000_000); -// let bank = Bank::new(&mint); -// // Create transactions between unrelated parties. -// let txs = 100_000; -// let last_ids: Mutex> = Mutex::new(HashSet::new()); -// let transactions: Vec<_> = (0..txs) -// .into_par_iter() -// .map(|i| { -// // Seed the 'to' account and a cell for its signature. -// let dummy_id = i % (MAX_ENTRY_IDS as i32); -// let last_id = hash(&serialize(&dummy_id).unwrap()); // Semi-unique hash -// { -// let mut last_ids = last_ids.lock().unwrap(); -// if !last_ids.contains(&last_id) { -// last_ids.insert(last_id); -// bank.register_entry_id(&last_id); -// } -// } -// -// // Seed the 'from' account. -// let rando0 = Keypair::new(); -// let tx = Transaction::new(&mint.keypair(), rando0.pubkey(), 1_000, last_id); -// bank.process_transaction(&tx).unwrap(); -// -// let rando1 = Keypair::new(); -// let tx = Transaction::new(&rando0, rando1.pubkey(), 2, last_id); -// bank.process_transaction(&tx).unwrap(); -// -// // Finally, return a transaction that's unique -// Transaction::new(&rando0, rando1.pubkey(), 1, last_id) -// }) -// .collect(); -// -// let banking_stage = EventProcessor::new(bank, &mint.last_id(), None); -// -// let now = Instant::now(); -// assert!(banking_stage.process_transactions(transactions).is_ok()); -// let duration = now.elapsed(); -// let sec = duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0; -// let tps = txs as f64 / sec; -// -// // Ensure that all transactions were successfully logged. -// drop(banking_stage.historian_input); -// let entries: Vec = banking_stage.output.lock().unwrap().iter().collect(); -// assert_eq!(entries.len(), 1); -// assert_eq!(entries[0].transactions.len(), txs as usize); -// -// println!("{} tps", tps); -// } - fn check_txs(receiver: &Receiver>, ref_tx_count: usize) { let mut total = 0; loop { @@ -101,132 +40,63 @@ fn check_txs(receiver: &Receiver>, ref_tx_count: usize) { #[bench] fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) { - let tx = 10_000_usize; + let txes = 1000 * NUM_THREADS; let mint_total = 1_000_000_000_000; let mint = Mint::new(mint_total); - let num_dst_accounts = 8 * 1024; - let num_src_accounts = 8 * 1024; - - let srckeys: Vec<_> = (0..num_src_accounts).map(|_| Keypair::new()).collect(); - let dstkeys: Vec<_> = (0..num_dst_accounts) - .map(|_| Keypair::new().pubkey()) - .collect(); - - let transactions: Vec<_> = (0..tx) - .map(|i| { - Transaction::new( - &srckeys[i % num_src_accounts], - dstkeys[i % num_dst_accounts], - i as i64, - mint.last_id(), - ) - }).collect(); let (verified_sender, verified_receiver) = channel(); - let (entry_sender, entry_receiver) = channel(); let packet_recycler = PacketRecycler::default(); - - let setup_transactions: Vec<_> = (0..num_src_accounts) - .map(|i| { - Transaction::new( - &mint.keypair(), - srckeys[i].pubkey(), - mint_total / num_src_accounts as i64, - mint.last_id(), - ) - }).collect(); - - bencher.iter(move || { - let bank = Arc::new(Bank::new(&mint)); - - let (hash_sender, hash_receiver) = channel(); - let (_poh_service, poh_receiver) = PohService::new(bank.last_id(), hash_receiver, None); - - let verified_setup: Vec<_> = - to_packets_chunked(&packet_recycler, &setup_transactions.clone(), tx) - .into_iter() - .map(|x| { - let len = (x).read().packets.len(); - (x, iter::repeat(1).take(len).collect()) - }).collect(); - - verified_sender.send(verified_setup).unwrap(); - BankingStage::process_packets( - &bank, - &hash_sender, - &poh_receiver, - &verified_receiver, - &entry_sender, - ).unwrap(); - - check_txs(&entry_receiver, num_src_accounts); - - let verified: Vec<_> = to_packets_chunked(&packet_recycler, &transactions.clone(), 192) - .into_iter() - .map(|x| { - let len = (x).read().packets.len(); - (x, iter::repeat(1).take(len).collect()) - }).collect(); - - verified_sender.send(verified).unwrap(); - BankingStage::process_packets( - &bank, - &hash_sender, - &poh_receiver, - &verified_receiver, - &entry_sender, - ).unwrap(); - - check_txs(&entry_receiver, tx); - }); -} - -#[bench] -fn bench_banking_stage_single_from(bencher: &mut Bencher) { - let tx = 10_000_usize; - let mint = Mint::new(1_000_000_000_000); - let mut pubkeys = Vec::new(); - let num_keys = 8; - for _ in 0..num_keys { - pubkeys.push(Keypair::new().pubkey()); - } - - let transactions: Vec<_> = (0..tx) + let bank = Arc::new(Bank::new(&mint)); + let dummy = Transaction::new(&mint.keypair(), mint.keypair().pubkey(), 1, mint.last_id()); + let transactions: Vec<_> = (0..txes) .into_par_iter() - .map(|i| { - Transaction::new( - &mint.keypair(), - pubkeys[i % num_keys], - i as i64, - mint.last_id(), - ) + .map(|_| { + let mut new = dummy.clone(); + let from: Vec = (0..64).map(|_| thread_rng().gen()).collect(); + let to: Vec = (0..64).map(|_| thread_rng().gen()).collect(); + let sig: Vec = (0..64).map(|_| thread_rng().gen()).collect(); + new.keys[0] = Pubkey::new(&from[0..32]); + new.keys[1] = Pubkey::new(&to[0..32]); + new.signature = Signature::new(&sig[0..64]); + new }).collect(); - - let (verified_sender, verified_receiver) = channel(); - let (entry_sender, entry_receiver) = channel(); - let packet_recycler = PacketRecycler::default(); - + // fund all the accounts + transactions.iter().for_each(|tx| { + let fund = Transaction::new( + &mint.keypair(), + tx.keys[0], + mint_total / txes as i64, + mint.last_id(), + ); + assert!(bank.process_transaction(&fund).is_ok()); + }); + //sanity check, make sure all the transactions can execute sequentially + transactions.iter().for_each(|tx| { + let res = bank.process_transaction(&tx); + assert!(res.is_ok(), "sanity test transactions"); + }); + bank.clear_signatures(); + //sanity check, make sure all the transactions can execute in parallel + let res = bank.process_transactions(&transactions); + for r in res { + assert!(r.is_ok(), "sanity parallel execution"); + } + bank.clear_signatures(); + let verified: Vec<_> = to_packets_chunked(&packet_recycler, &transactions.clone(), 192) + .into_iter() + .map(|x| { + let len = x.read().packets.len(); + (x, iter::repeat(1).take(len).collect()) + }).collect(); + let (_stage, signal_receiver) = + BankingStage::new(bank.clone(), verified_receiver, Default::default()); bencher.iter(move || { - let bank = Arc::new(Bank::new(&mint)); - - let (hash_sender, hash_receiver) = channel(); - let (_poh_service, poh_receiver) = PohService::new(bank.last_id(), hash_receiver, None); - - let verified: Vec<_> = to_packets_chunked(&packet_recycler, &transactions.clone(), tx) - .into_iter() - .map(|x| { - let len = (x).read().packets.len(); - (x, iter::repeat(1).take(len).collect()) - }).collect(); - verified_sender.send(verified).unwrap(); - BankingStage::process_packets( - &bank, - &hash_sender, - &poh_receiver, - &verified_receiver, - &entry_sender, - ).unwrap(); - - check_txs(&entry_receiver, tx); + for v in verified.chunks(verified.len() / NUM_THREADS) { + verified_sender.send(v.to_vec()).unwrap(); + } + check_txs(&signal_receiver, txes); + bank.clear_signatures(); + // make sure the tx last id is still registered + bank.register_entry_id(&mint.last_id()); }); } diff --git a/src/banking_stage.rs b/src/banking_stage.rs index 3eacd95407..4b1ee0f64c 100644 --- a/src/banking_stage.rs +++ b/src/banking_stage.rs @@ -10,26 +10,44 @@ use entry::Entry; use hash::Hasher; use log::Level; use packet::{Packets, SharedPackets}; -use poh_service::PohService; +use poh_recorder::PohRecorder; use rayon::prelude::*; use result::{Error, Result}; use service::Service; use std::net::SocketAddr; -use std::sync::atomic::AtomicUsize; -use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender}; -use std::sync::Arc; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::mpsc::{channel, Receiver, RecvTimeoutError}; +use std::sync::{Arc, Mutex}; +use std::thread::sleep; use std::thread::{self, Builder, JoinHandle}; use std::time::Duration; use std::time::Instant; use timing; use transaction::Transaction; +// number of threads is 1 until mt bank is ready +pub const NUM_THREADS: usize = 1; + /// Stores the stage's thread handle and output receiver. pub struct BankingStage { /// Handle to the stage's thread. - thread_hdl: JoinHandle<()>, + thread_hdls: Vec>, } +pub enum Config { + /// * `Tick` - Run full PoH thread. Tick is a rough estimate of how many hashes to roll before transmitting a new entry. + Tick(usize), + /// * `Sleep`- Low power mode. Sleep is a rough estimate of how long to sleep before rolling 1 poh once and producing 1 + /// tick. + Sleep(Duration), +} + +impl Default for Config { + fn default() -> Config { + // TODO: Change this to Tick to enable PoH + Config::Sleep(Duration::from_millis(500)) + } +} impl BankingStage { /// Create the stage using `bank`. Exit when `verified_receiver` is dropped. /// Discard input packets using `packet_recycler` to minimize memory @@ -37,48 +55,74 @@ impl BankingStage { pub fn new( bank: Arc, verified_receiver: Receiver)>>, - tick_duration: Option, + config: Config, ) -> (Self, Receiver>) { let (entry_sender, entry_receiver) = channel(); - let thread_hdl = Builder::new() - .name("solana-banking-stage".to_string()) + let shared_verified_receiver = Arc::new(Mutex::new(verified_receiver)); + let poh = PohRecorder::new(bank.clone(), entry_sender); + let tick_poh = poh.clone(); + // Tick producer is a headless producer, so when it exits it should notify the banking stage. + // Since channel are not used to talk between these threads an AtomicBool is used as a + // signal. + let poh_exit = Arc::new(AtomicBool::new(false)); + let banking_exit = poh_exit.clone(); + // Single thread to generate entries from many banks. + // This thread talks to poh_service and broadcasts the entries once they have been recorded. + // Once an entry has been recorded, its last_id is registered with the bank. + let tick_producer = Builder::new() + .name("solana-banking-stage-tick_producer".to_string()) .spawn(move || { - let poh_service = PohService::new(bank.last_id(), tick_duration.is_some()); + if let Err(e) = Self::tick_producer(tick_poh, config, &poh_exit) { + match e { + Error::SendError => (), + _ => error!( + "solana-banking-stage-tick_producer unexpected error {:?}", + e + ), + } + } + debug!("tick producer exiting"); + poh_exit.store(true, Ordering::Relaxed); + }).unwrap(); - let mut last_tick = Instant::now(); - - loop { - let timeout = - tick_duration.map(|duration| duration - (Instant::now() - last_tick)); - - if let Err(e) = Self::process_packets( - timeout, - &bank, - &poh_service, - &verified_receiver, - &entry_sender, - ) { - match e { - Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, - Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), - Error::RecvError(_) => break, - Error::SendError => break, - _ => { - error!("process_packets() {:?}", e); + // Many banks that process transactions in parallel. + let mut thread_hdls: Vec> = (0..NUM_THREADS) + .into_iter() + .map(|_| { + let thread_bank = bank.clone(); + let thread_verified_receiver = shared_verified_receiver.clone(); + let thread_poh = poh.clone(); + let thread_banking_exit = banking_exit.clone(); + Builder::new() + .name("solana-banking-stage-tx".to_string()) + .spawn(move || { + loop { + if let Err(e) = Self::process_packets( + &thread_bank, + &thread_verified_receiver, + &thread_poh, + ) { + debug!("got error {:?}", e); + match e { + Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), + Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => { + break + } + Error::RecvError(_) => break, + Error::SendError => break, + _ => error!("solana-banking-stage-tx {:?}", e), + } + } + if thread_banking_exit.load(Ordering::Relaxed) { + debug!("tick service exited"); break; } } - } - if tick_duration.is_some() && last_tick.elapsed() > tick_duration.unwrap() { - if let Err(e) = Self::tick(&poh_service, &bank, &entry_sender) { - error!("tick() {:?}", e); - } - last_tick = Instant::now(); - } - } - poh_service.join().unwrap(); - }).unwrap(); - (BankingStage { thread_hdl }, entry_receiver) + thread_banking_exit.store(true, Ordering::Relaxed); + }).unwrap() + }).collect(); + thread_hdls.push(tick_producer); + (BankingStage { thread_hdls }, entry_receiver) } /// Convert the transactions from a blob of binary data to a vector of transactions and @@ -93,32 +137,32 @@ impl BankingStage { }).collect() } - fn tick( - poh_service: &PohService, - bank: &Arc, - entry_sender: &Sender>, - ) -> Result<()> { - let poh = poh_service.tick(); - bank.register_entry_id(&poh.id); - - let entry = Entry { - num_hashes: poh.num_hashes, - id: poh.id, - transactions: vec![], - }; - entry_sender.send(vec![entry])?; - Ok(()) + fn tick_producer(poh: PohRecorder, config: Config, poh_exit: &AtomicBool) -> Result<()> { + loop { + match config { + Config::Tick(num) => { + for _ in 0..num { + poh.hash(); + } + } + Config::Sleep(duration) => { + sleep(duration); + } + } + poh.tick()?; + if poh_exit.load(Ordering::Relaxed) { + debug!("tick service exited"); + return Ok(()); + } + } } fn process_transactions( bank: &Arc, - transactions: &[Transaction], - poh_service: &PohService, - ) -> Result> { - let mut entries = Vec::new(); - - debug!("processing: {}", transactions.len()); - + transactions: Vec, + poh: &PohRecorder, + ) -> Result<()> { + debug!("transactions: {}", transactions.len()); let mut chunk_start = 0; while chunk_start != transactions.len() { let chunk_end = chunk_start + Entry::num_will_fit(&transactions[chunk_start..]); @@ -140,46 +184,29 @@ impl BankingStage { } }).collect(); - debug!("processed: {}", processed_transactions.len()); - - chunk_start = chunk_end; - - let hash = hasher.result(); - if !processed_transactions.is_empty() { - let poh = poh_service.record(hash); - - bank.register_entry_id(&poh.id); - entries.push(Entry { - num_hashes: poh.num_hashes, - id: poh.id, - transactions: processed_transactions, - }); + let hash = hasher.result(); + debug!("processed ok: {} {}", processed_transactions.len(), hash); + poh.record(hash, processed_transactions)?; } + chunk_start = chunk_end; } - - debug!("done process_transactions, {} entries", entries.len()); - - Ok(entries) + debug!("done process_transactions"); + Ok(()) } /// Process the incoming packets and send output `Signal` messages to `signal_sender`. /// Discard packets via `packet_recycler`. pub fn process_packets( - timeout: Option, bank: &Arc, - poh_service: &PohService, - verified_receiver: &Receiver)>>, - entry_sender: &Sender>, + verified_receiver: &Arc)>>>>, + poh: &PohRecorder, ) -> Result<()> { let recv_start = Instant::now(); - // TODO pass deadline to recv_deadline() when/if it becomes available? - let mms = if let Some(timeout) = timeout { - verified_receiver.recv_timeout(timeout)? - } else { - verified_receiver.recv()? - }; - let now = Instant::now(); + let mms = verified_receiver + .lock() + .unwrap() + .recv_timeout(Duration::from_millis(100))?; let mut reqs_len = 0; let mms_len = mms.len(); info!( @@ -210,14 +237,12 @@ impl BankingStage { }, }).collect(); debug!("verified transactions {}", transactions.len()); - - let entries = Self::process_transactions(bank, &transactions, poh_service)?; - entry_sender.send(entries)?; + Self::process_transactions(bank, transactions, poh)?; } inc_new_counter_info!( "banking_stage-time_ms", - timing::duration_as_ms(&now.elapsed()) as usize + timing::duration_as_ms(&proc_start.elapsed()) as usize ); let total_time_s = timing::duration_as_s(&proc_start.elapsed()); let total_time_ms = timing::duration_as_ms(&proc_start.elapsed()); @@ -242,7 +267,10 @@ impl Service for BankingStage { type JoinReturnType = (); fn join(self) -> thread::Result<()> { - self.thread_hdl.join() + for thread_hdl in self.thread_hdls { + thread_hdl.join()?; + } + Ok(()) } } @@ -259,15 +287,25 @@ mod tests { use transaction::Transaction; #[test] - fn test_banking_stage_shutdown() { + fn test_banking_stage_shutdown1() { let bank = Bank::new(&Mint::new(2)); let (verified_sender, verified_receiver) = channel(); let (banking_stage, _entry_receiver) = - BankingStage::new(Arc::new(bank), verified_receiver, None); + BankingStage::new(Arc::new(bank), verified_receiver, Default::default()); drop(verified_sender); assert_eq!(banking_stage.join().unwrap(), ()); } + #[test] + fn test_banking_stage_shutdown2() { + let bank = Bank::new(&Mint::new(2)); + let (_verified_sender, verified_receiver) = channel(); + let (banking_stage, entry_receiver) = + BankingStage::new(Arc::new(bank), verified_receiver, Default::default()); + drop(entry_receiver); + assert_eq!(banking_stage.join().unwrap(), ()); + } + #[test] fn test_banking_stage_tick() { let bank = Arc::new(Bank::new(&Mint::new(2))); @@ -276,9 +314,9 @@ mod tests { let (banking_stage, entry_receiver) = BankingStage::new( bank.clone(), verified_receiver, - Some(Duration::from_millis(1)), + Config::Sleep(Duration::from_millis(1)), ); - sleep(Duration::from_millis(50)); + sleep(Duration::from_millis(500)); drop(verified_sender); let entries: Vec<_> = entry_receiver.iter().flat_map(|x| x).collect(); @@ -288,19 +326,6 @@ mod tests { assert_eq!(banking_stage.join().unwrap(), ()); } - #[test] - fn test_banking_stage_no_tick() { - let bank = Arc::new(Bank::new(&Mint::new(2))); - let (verified_sender, verified_receiver) = channel(); - let (banking_stage, entry_receiver) = BankingStage::new(bank, verified_receiver, None); - sleep(Duration::from_millis(1000)); - drop(verified_sender); - - let entries: Vec<_> = entry_receiver.iter().flat_map(|x| x).collect(); - assert!(entries.len() == 0); - assert_eq!(banking_stage.join().unwrap(), ()); - } - #[test] fn test_banking_stage_entries_only() { let mint = Mint::new(2); @@ -308,7 +333,7 @@ mod tests { let start_hash = bank.last_id(); let (verified_sender, verified_receiver) = channel(); let (banking_stage, entry_receiver) = - BankingStage::new(bank.clone(), verified_receiver, None); + BankingStage::new(bank, verified_receiver, Default::default()); // good tx let keypair = mint.keypair(); @@ -333,59 +358,19 @@ mod tests { drop(verified_sender); - let entries: Vec<_> = entry_receiver.iter().flat_map(|x| x).collect(); - assert_eq!(entries.len(), 1); - assert!(entries.verify(&start_hash)); - assert_eq!(entries[entries.len() - 1].id, bank.last_id()); + //receive entries + ticks + let entries: Vec<_> = entry_receiver.iter().map(|x| x).collect(); + assert!(entries.len() >= 1); + + let mut last_id = start_hash; + entries.iter().for_each(|entries| { + assert_eq!(entries.len(), 1); + assert!(entries.verify(&last_id)); + last_id = entries.last().unwrap().id; + }); + drop(entry_receiver); assert_eq!(banking_stage.join().unwrap(), ()); } - - #[test] - fn test_banking_stage() { - let mint = Mint::new(2); - let bank = Arc::new(Bank::new(&mint)); - let start_hash = bank.last_id(); - let (verified_sender, verified_receiver) = channel(); - let (banking_stage, entry_receiver) = BankingStage::new( - bank.clone(), - verified_receiver, - Some(Duration::from_millis(1)), - ); - - // wait for some ticks - sleep(Duration::from_millis(50)); - - // good tx - let keypair = mint.keypair(); - let tx = Transaction::system_new(&keypair, keypair.pubkey(), 1, start_hash); - - // good tx, but no verify - let tx_no_ver = Transaction::system_new(&keypair, keypair.pubkey(), 1, start_hash); - - // bad tx, AccountNotFound - let keypair = Keypair::new(); - let tx_anf = Transaction::system_new(&keypair, keypair.pubkey(), 1, start_hash); - - // send 'em over - let recycler = PacketRecycler::default(); - let packets = to_packets(&recycler, &[tx, tx_no_ver, tx_anf]); - - // glad they all fit - assert_eq!(packets.len(), 1); - verified_sender // tx, no_ver, anf - .send(vec![(packets[0].clone(), vec![1u8, 0u8, 1u8])]) - .unwrap(); - - drop(verified_sender); - - let entries: Vec<_> = entry_receiver.iter().flat_map(|x| x).collect(); - assert!(entries.len() > 1); - assert!(entries.verify(&start_hash)); - assert_eq!(entries[entries.len() - 1].id, bank.last_id()); - - assert_eq!(banking_stage.join().unwrap(), ()); - } - #[test] fn test_banking_stage_entryfication() { // In this attack we'll demonstrate that a verifier can interpret the ledger @@ -396,7 +381,7 @@ mod tests { let recycler = PacketRecycler::default(); let (verified_sender, verified_receiver) = channel(); let (banking_stage, entry_receiver) = - BankingStage::new(bank.clone(), verified_receiver, None); + BankingStage::new(bank.clone(), verified_receiver, Default::default()); // Process a batch that includes a transaction that receives two tokens. let alice = Keypair::new(); @@ -419,7 +404,7 @@ mod tests { // Collect the ledger and feed it to a new bank. let entries: Vec<_> = entry_receiver.iter().flat_map(|x| x).collect(); // same assertion as running through the bank, really... - assert_eq!(entries.len(), 2); + assert!(entries.len() >= 2); // Assert the user holds one token, not two. If the stage only outputs one // entry, then the second transaction will be rejected, because it drives @@ -434,5 +419,4 @@ mod tests { } assert_eq!(bank.get_balance(&alice.pubkey()), 1); } - } diff --git a/src/fullnode.rs b/src/fullnode.rs index 3c204e371b..dd883b5cfe 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -316,14 +316,11 @@ impl Fullnode { } None => { // Start in leader mode. - let tick_duration = None; - // TODO: To light up PoH, uncomment the following line: - //let tick_duration = Some(Duration::from_millis(1000)); let (tpu, entry_receiver, tpu_exit) = Tpu::new( keypair.clone(), &bank, &crdt, - tick_duration, + Default::default(), node.sockets .transaction .iter() @@ -430,14 +427,11 @@ impl Fullnode { fn validator_to_leader(&mut self, entry_height: u64) { self.crdt.write().unwrap().set_leader(self.keypair.pubkey()); - let tick_duration = None; - // TODO: To light up PoH, uncomment the following line: - //let tick_duration = Some(Duration::from_millis(1000)); let (tpu, blob_receiver, tpu_exit) = Tpu::new( self.keypair.clone(), &self.bank, &self.crdt, - tick_duration, + Default::default(), self.transaction_sockets .iter() .map(|s| s.try_clone().expect("Failed to clone transaction sockets")) diff --git a/src/lib.rs b/src/lib.rs index 1ffb243aa6..b2f4265c81 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -39,7 +39,7 @@ pub mod netutil; pub mod packet; pub mod payment_plan; pub mod poh; -pub mod poh_service; +pub mod poh_recorder; pub mod recvmmsg; pub mod recycler; pub mod replicate_stage; diff --git a/src/poh_recorder.rs b/src/poh_recorder.rs new file mode 100644 index 0000000000..9f84f582cd --- /dev/null +++ b/src/poh_recorder.rs @@ -0,0 +1,95 @@ +//! The `poh_recorder` module provides an object for synchronizing with Proof of History. +//! It synchronizes PoH, bank's register_entry_id and the ledger +//! +use bank::Bank; +use entry::Entry; +use hash::Hash; +use poh::Poh; +use result::Result; +use std::sync::mpsc::Sender; +use std::sync::{Arc, Mutex}; +use transaction::Transaction; + +#[derive(Clone)] +pub struct PohRecorder { + poh: Arc>, + bank: Arc, + sender: Sender>, +} + +impl PohRecorder { + /// A recorder to synchronize PoH with the following data structures + /// * bank - the LastId's queue is updated on `tick` and `record` events + /// * sender - the Entry channel that outputs to the ledger + pub fn new(bank: Arc, sender: Sender>) -> Self { + let poh = Arc::new(Mutex::new(Poh::new(bank.last_id()))); + PohRecorder { poh, bank, sender } + } + + pub fn hash(&self) { + // TODO: amortize the cost of this lock by doing the loop in here for + // some min amount of hashes + let mut poh = self.poh.lock().unwrap(); + poh.hash() + } + + pub fn tick(&self) -> Result<()> { + // Register and send the entry out while holding the lock. + // This guarantees PoH order and Entry production and banks LastId queue is the same + let mut poh = self.poh.lock().unwrap(); + let tick = poh.tick(); + self.bank.register_entry_id(&tick.id); + let entry = Entry { + num_hashes: tick.num_hashes, + id: tick.id, + transactions: vec![], + }; + self.sender.send(vec![entry])?; + Ok(()) + } + + pub fn record(&self, mixin: Hash, txs: Vec) -> Result<()> { + // Register and send the entry out while holding the lock. + // This guarantees PoH order and Entry production and banks LastId queue is the same. + let mut poh = self.poh.lock().unwrap(); + let tick = poh.record(mixin); + self.bank.register_entry_id(&tick.id); + let entry = Entry { + num_hashes: tick.num_hashes, + id: tick.id, + transactions: txs, + }; + self.sender.send(vec![entry])?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use hash::hash; + use mint::Mint; + use std::sync::mpsc::channel; + use std::sync::Arc; + + #[test] + fn test_poh() { + let mint = Mint::new(1); + let bank = Arc::new(Bank::new(&mint)); + let (entry_sender, entry_receiver) = channel(); + let poh_recorder = PohRecorder::new(bank, entry_sender); + + //send some data + let h1 = hash(b"hello world!"); + assert!(poh_recorder.record(h1, vec![]).is_ok()); + assert!(poh_recorder.tick().is_ok()); + + //get some events + let _ = entry_receiver.recv().unwrap(); + let _ = entry_receiver.recv().unwrap(); + + //make sure it handles channel close correctly + drop(entry_receiver); + assert!(poh_recorder.tick().is_err()); + } +} diff --git a/src/poh_service.rs b/src/poh_service.rs deleted file mode 100644 index f9e8f6217d..0000000000 --- a/src/poh_service.rs +++ /dev/null @@ -1,148 +0,0 @@ -//! The `poh_service` module provides an object for generating a Proof of History. -//! It records Hashes items on behalf of its users. It continuously generates -//! new Hashes, only stopping to check if it has been sent a Hash to mix in -//! to the Poh. -//! -//! The returned Entry includes the mix-in request, the latest Poh Hash, and the -//! number of Hashes generated in the service since the last mix-in request. -//! -//! The resulting stream of Hashes represents ordered events in time. -//! -use hash::Hash; -use poh::{Poh, PohEntry}; -use service::Service; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, Mutex}; -use std::thread::{self, Builder, JoinHandle}; - -pub struct PohService { - poh: Arc>, - thread_hdl: JoinHandle<()>, - run_poh: Arc, -} - -impl PohService { - /// A background thread that will continue tagging received Transaction messages and - /// sending back Entry messages until either the receiver or sender channel is closed. - /// if tick_duration is some, service will automatically produce entries every - /// `tick_duration`. - pub fn new(start_hash: Hash, run_poh: bool) -> Self { - let poh = Arc::new(Mutex::new(Poh::new(start_hash))); - let run_poh = Arc::new(AtomicBool::new(run_poh)); - - let thread_poh = poh.clone(); - let thread_run_poh = run_poh.clone(); - let thread_hdl = Builder::new() - .name("solana-record-service".to_string()) - .spawn(move || { - while thread_run_poh.load(Ordering::Relaxed) { - thread_poh.lock().unwrap().hash(); - } - }).unwrap(); - - PohService { - poh, - run_poh, - thread_hdl, - } - } - - pub fn tick(&self) -> PohEntry { - let mut poh = self.poh.lock().unwrap(); - poh.tick() - } - - pub fn record(&self, mixin: Hash) -> PohEntry { - let mut poh = self.poh.lock().unwrap(); - poh.record(mixin) - } - - // fn process_hash( - // mixin: Option, - // poh: &mut Poh, - // sender: &Sender, - // ) -> Result<(), ()> { - // let resp = match mixin { - // Some(mixin) => poh.record(mixin), - // None => poh.tick(), - // }; - // sender.send(resp).or(Err(()))?; - // Ok(()) - // } - // - // fn process_hashes( - // poh: &mut Poh, - // receiver: &Receiver>, - // sender: &Sender, - // ) -> Result<(), ()> { - // loop { - // match receiver.recv() { - // Ok(hash) => Self::process_hash(hash, poh, sender)?, - // Err(RecvError) => return Err(()), - // } - // } - // } - // - // fn try_process_hashes( - // poh: &mut Poh, - // receiver: &Receiver>, - // sender: &Sender, - // ) -> Result<(), ()> { - // loop { - // match receiver.try_recv() { - // Ok(hash) => Self::process_hash(hash, poh, sender)?, - // Err(TryRecvError::Empty) => return Ok(()), - // Err(TryRecvError::Disconnected) => return Err(()), - // }; - // } - // } -} - -impl Service for PohService { - type JoinReturnType = (); - - fn join(self) -> thread::Result<()> { - self.run_poh.store(false, Ordering::Relaxed); - self.thread_hdl.join() - } -} - -#[cfg(test)] -mod tests { - use super::*; - use poh::verify; - use std::thread::sleep; - use std::time::Duration; - - #[test] - fn test_poh() { - let poh_service = PohService::new(Hash::default(), false); - - let entry0 = poh_service.record(Hash::default()); - sleep(Duration::from_millis(1)); - let entry1 = poh_service.record(Hash::default()); - sleep(Duration::from_millis(1)); - let entry2 = poh_service.record(Hash::default()); - - assert_eq!(entry0.num_hashes, 1); - assert_eq!(entry1.num_hashes, 1); - assert_eq!(entry2.num_hashes, 1); - - assert_eq!(poh_service.join().unwrap(), ()); - - assert!(verify(Hash::default(), &[entry0, entry1, entry2])); - } - - #[test] - fn test_do_poh() { - let poh_service = PohService::new(Hash::default(), true); - - sleep(Duration::from_millis(50)); - let entry = poh_service.tick(); - assert!(entry.num_hashes > 1); - - assert_eq!(poh_service.join().unwrap(), ()); - - assert!(verify(Hash::default(), &vec![entry])); - } -} diff --git a/src/tpu.rs b/src/tpu.rs index 312bd68304..c9a38d4e41 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -26,7 +26,7 @@ //! ``` use bank::Bank; -use banking_stage::BankingStage; +use banking_stage::{BankingStage, Config}; use crdt::Crdt; use entry::Entry; use fetch_stage::FetchStage; @@ -38,7 +38,6 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::Receiver; use std::sync::{Arc, RwLock}; use std::thread; -use std::time::Duration; use write_stage::{WriteStage, WriteStageReturnType}; pub enum TpuReturnType { @@ -58,7 +57,7 @@ impl Tpu { keypair: Arc, bank: &Arc, crdt: &Arc>, - tick_duration: Option, + tick_duration: Config, transactions_sockets: Vec, ledger_path: &str, sigverify_disabled: bool,