From 1654199b23ec565298aec77a726f64795f2c6e84 Mon Sep 17 00:00:00 2001 From: anatoly yakovenko Date: Sun, 3 Mar 2019 16:44:06 -0800 Subject: [PATCH] Use PohRecorder to synchronize instead of rotate. (#3080) --- benches/banking_stage.rs | 45 +-- core/src/bank_forks.rs | 14 +- core/src/banking_stage.rs | 380 +++++++++------------- core/src/broadcast_stage.rs | 107 +++--- core/src/fullnode.rs | 416 +++--------------------- core/src/leader_confirmation_service.rs | 39 +-- core/src/local_cluster.rs | 21 +- core/src/poh_recorder.rs | 109 ++++--- core/src/poh_service.rs | 48 +-- core/src/replay_stage.rs | 97 ++++-- core/src/thin_client.rs | 12 +- core/src/tpu.rs | 196 ++--------- core/src/tvu.rs | 27 +- fullnode/src/main.rs | 13 +- tests/replicator.rs | 6 +- tests/rpc.rs | 3 +- tests/tvu.rs | 6 +- wallet/tests/deploy.rs | 3 +- wallet/tests/pay.rs | 9 +- wallet/tests/request_airdrop.rs | 3 +- 20 files changed, 486 insertions(+), 1068 deletions(-) diff --git a/benches/banking_stage.rs b/benches/banking_stage.rs index d021d25f7..e42f2d910 100644 --- a/benches/banking_stage.rs +++ b/benches/banking_stage.rs @@ -4,11 +4,11 @@ extern crate test; use rand::{thread_rng, Rng}; use rayon::prelude::*; -use solana::banking_stage::BankingStage; -use solana::entry::Entry; +use solana::banking_stage::{create_test_recorder, BankingStage}; +use solana::cluster_info::ClusterInfo; +use solana::cluster_info::Node; use solana::packet::to_packets_chunked; -use solana::poh_recorder::PohRecorder; -use solana::poh_service::{PohService, PohServiceConfig}; +use solana::poh_recorder::WorkingBankEntries; use solana_runtime::bank::Bank; use solana_sdk::genesis_block::GenesisBlock; use solana_sdk::hash::hash; @@ -17,17 +17,16 @@ use solana_sdk::signature::{KeypairUtil, Signature}; use solana_sdk::system_transaction::SystemTransaction; use solana_sdk::timing::{DEFAULT_TICKS_PER_SLOT, MAX_RECENT_BLOCKHASHES}; use std::iter; -use std::sync::atomic::AtomicBool; use std::sync::mpsc::{channel, Receiver}; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, RwLock}; use std::time::Duration; use test::Bencher; -fn check_txs(receiver: &Receiver>, ref_tx_count: usize) { +fn check_txs(receiver: &Receiver, ref_tx_count: usize) { let mut total = 0; loop { let entries = receiver.recv_timeout(Duration::new(1, 0)); - if let Ok(entries) = entries { + if let Ok((_, entries)) = entries { for (entry, _) in &entries { total += entry.transactions.len(); } @@ -41,20 +40,6 @@ fn check_txs(receiver: &Receiver>, ref_tx_count: usize) { assert_eq!(total, ref_tx_count); } -fn create_test_recorder(bank: &Arc) -> (Arc>, PohService) { - let exit = Arc::new(AtomicBool::new(false)); - let poh_recorder = Arc::new(Mutex::new(PohRecorder::new( - bank.tick_height(), - bank.last_blockhash(), - ))); - let poh_service = PohService::new( - poh_recorder.clone(), - &PohServiceConfig::default(), - exit.clone(), - ); - (poh_recorder, poh_service) -} - #[bench] #[ignore] fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) { @@ -117,9 +102,11 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) { (x, iter::repeat(1).take(len).collect()) }) .collect(); - let (poh_recorder, poh_service) = create_test_recorder(&bank); - let (_stage, signal_receiver) = - BankingStage::new(&bank, &poh_recorder, verified_receiver, std::u64::MAX); + let (poh_recorder, poh_service, signal_receiver) = create_test_recorder(&bank); + let cluster_info = ClusterInfo::new(Node::new_localhost().info); + let cluster_info = Arc::new(RwLock::new(cluster_info)); + let _banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver); + poh_recorder.lock().unwrap().set_bank(&bank); let mut id = genesis_block.hash(); for _ in 0..(MAX_RECENT_BLOCKHASHES * DEFAULT_TICKS_PER_SLOT as usize) { @@ -221,9 +208,11 @@ fn bench_banking_stage_multi_programs(bencher: &mut Bencher) { (x, iter::repeat(1).take(len).collect()) }) .collect(); - let (poh_recorder, poh_service) = create_test_recorder(&bank); - let (_stage, signal_receiver) = - BankingStage::new(&bank, &poh_recorder, verified_receiver, std::u64::MAX); + let (poh_recorder, poh_service, signal_receiver) = create_test_recorder(&bank); + let cluster_info = ClusterInfo::new(Node::new_localhost().info); + let cluster_info = Arc::new(RwLock::new(cluster_info)); + let _banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver); + poh_recorder.lock().unwrap().set_bank(&bank); let mut id = genesis_block.hash(); for _ in 0..(MAX_RECENT_BLOCKHASHES * DEFAULT_TICKS_PER_SLOT as usize) { diff --git a/core/src/bank_forks.rs b/core/src/bank_forks.rs index 9da51345f..f4f54b790 100644 --- a/core/src/bank_forks.rs +++ b/core/src/bank_forks.rs @@ -39,7 +39,11 @@ impl BankForks { frozen_banks.into_iter().map(|b| (b.slot(), b)).collect() } pub fn active_banks(&self) -> Vec { - self.banks.iter().map(|(k, _v)| *k).collect() + self.banks + .iter() + .filter(|(_, v)| !v.is_frozen()) + .map(|(k, _v)| *k) + .collect() } pub fn get(&self, bank_id: u64) -> Option<&Arc> { self.banks.get(&bank_id) @@ -60,7 +64,9 @@ impl BankForks { // TODO: use the bank's own ID instead of receiving a parameter? pub fn insert(&mut self, bank_id: u64, bank: Bank) { let mut bank = Arc::new(bank); - self.banks.insert(bank_id, bank.clone()); + assert_eq!(bank_id, bank.slot()); + let prev = self.banks.insert(bank_id, bank.clone()); + assert!(prev.is_none()); if bank_id > self.working_bank.slot() { self.working_bank = bank.clone() @@ -70,7 +76,9 @@ impl BankForks { // parent if we're always calling insert() // when we construct a child bank while let Some(parent) = bank.parent() { - self.banks.remove(&parent.slot()); + if let Some(prev) = self.banks.remove(&parent.slot()) { + assert!(Arc::ptr_eq(&prev, &parent)); + } bank = parent; } } diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 80caa9a04..e06d32004 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -2,11 +2,13 @@ //! to contruct a software pipeline. The stage uses all available CPU cores and //! can do its processing in parallel with signature verification on the GPU. +use crate::cluster_info::ClusterInfo; use crate::entry::Entry; use crate::leader_confirmation_service::LeaderConfirmationService; use crate::packet::Packets; use crate::packet::SharedPackets; -use crate::poh_recorder::{PohRecorder, PohRecorderError, WorkingBank}; +use crate::poh_recorder::{PohRecorder, PohRecorderError, WorkingBankEntries}; +use crate::poh_service::{PohService, PohServiceConfig}; use crate::result::{Error, Result}; use crate::service::Service; use crate::sigverify_stage::VerifiedPackets; @@ -15,9 +17,10 @@ use solana_metrics::counter::Counter; use solana_runtime::bank::{self, Bank, BankError}; use solana_sdk::timing::{self, duration_as_us, MAX_RECENT_BLOCKHASHES}; use solana_sdk::transaction::Transaction; +use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{channel, Receiver, RecvTimeoutError}; -use std::sync::{Arc, Mutex}; +use std::sync::mpsc::{Receiver, RecvTimeoutError}; +use std::sync::{Arc, Mutex, RwLock}; use std::thread::{self, Builder, JoinHandle}; use std::time::Duration; use std::time::Instant; @@ -30,37 +33,18 @@ pub const NUM_THREADS: u32 = 10; /// Stores the stage's thread handle and output receiver. pub struct BankingStage { - bank_thread_hdls: Vec>, - exit: Arc, - leader_confirmation_service: LeaderConfirmationService, + bank_thread_hdls: Vec>, } impl BankingStage { /// Create the stage using `bank`. Exit when `verified_receiver` is dropped. #[allow(clippy::new_ret_no_self)] pub fn new( - bank: &Arc, + cluster_info: &Arc>, poh_recorder: &Arc>, verified_receiver: Receiver, - max_tick_height: u64, - ) -> (Self, Receiver>) { - let (entry_sender, entry_receiver) = channel(); - let working_bank = WorkingBank { - bank: bank.clone(), - sender: entry_sender, - min_tick_height: bank.tick_height(), - max_tick_height, - }; - - info!( - "new working bank {} {} {}", - working_bank.min_tick_height, - working_bank.max_tick_height, - poh_recorder.lock().unwrap().poh.tick_height - ); - poh_recorder.lock().unwrap().set_working_bank(working_bank); - - let shared_verified_receiver = Arc::new(Mutex::new(verified_receiver)); + ) -> Self { + let verified_receiver = Arc::new(Mutex::new(verified_receiver)); // Single thread to generate entries from many banks. // This thread talks to poh_service and broadcasts the entries once they have been recorded. @@ -68,47 +52,60 @@ impl BankingStage { let exit = Arc::new(AtomicBool::new(false)); // Single thread to compute confirmation - let leader_confirmation_service = LeaderConfirmationService::new(&bank, exit.clone()); - + let lcs_handle = LeaderConfirmationService::start(&poh_recorder, exit.clone()); // Many banks that process transactions in parallel. - let bank_thread_hdls: Vec> = (0..Self::num_threads()) + let mut bank_thread_hdls: Vec> = (0..Self::num_threads()) .map(|_| { - let thread_verified_receiver = shared_verified_receiver.clone(); - let thread_poh_recorder = poh_recorder.clone(); - let thread_bank = bank.clone(); + let verified_receiver = verified_receiver.clone(); + let poh_recorder = poh_recorder.clone(); + let cluster_info = cluster_info.clone(); + let exit = exit.clone(); Builder::new() .name("solana-banking-stage-tx".to_string()) .spawn(move || { - let mut unprocessed_packets: UnprocessedPackets = vec![]; - loop { - match Self::process_packets( - &thread_bank, - &thread_verified_receiver, - &thread_poh_recorder, - ) { - Err(Error::RecvTimeoutError(RecvTimeoutError::Timeout)) => (), - Ok(more_unprocessed_packets) => { - unprocessed_packets.extend(more_unprocessed_packets); - } - Err(err) => { - debug!("solana-banking-stage-tx: exit due to {:?}", err); - break; - } - } - } - unprocessed_packets + Self::process_loop(&verified_receiver, &poh_recorder, &cluster_info); + exit.store(true, Ordering::Relaxed); }) .unwrap() }) .collect(); - ( - Self { - bank_thread_hdls, - exit, - leader_confirmation_service, - }, - entry_receiver, - ) + bank_thread_hdls.push(lcs_handle); + Self { bank_thread_hdls } + } + + fn forward_unprocessed_packets( + tpu: &std::net::SocketAddr, + unprocessed_packets: UnprocessedPackets, + ) -> std::io::Result<()> { + let socket = UdpSocket::bind("0.0.0.0:0")?; + for (packets, start_index) in unprocessed_packets { + let packets = packets.read().unwrap(); + for packet in packets.packets.iter().skip(start_index) { + socket.send_to(&packet.data[..packet.meta.size], tpu)?; + } + } + Ok(()) + } + + pub fn process_loop( + verified_receiver: &Arc>>, + poh_recorder: &Arc>, + cluster_info: &Arc>, + ) { + loop { + match Self::process_packets(&verified_receiver, &poh_recorder) { + Err(Error::RecvTimeoutError(RecvTimeoutError::Timeout)) => (), + Ok(unprocessed_packets) => { + if let Some(leader) = cluster_info.read().unwrap().leader_data() { + let _ = Self::forward_unprocessed_packets(&leader.tpu, unprocessed_packets); + } + } + Err(err) => { + debug!("solana-banking-stage-tx: exit due to {:?}", err); + break; + } + } + } } pub fn num_threads() -> u32 { @@ -190,7 +187,8 @@ impl BankingStage { bank.unlock_accounts(&txs, &results); let unlock_time = now.elapsed(); debug!( - "lock: {}us load_execute: {}us record: {}us commit: {}us unlock: {}us txs_len: {}", + "bank: {} lock: {}us load_execute: {}us record: {}us commit: {}us unlock: {}us txs_len: {}", + bank.slot(), duration_as_us(&lock_time), duration_as_us(&load_execute_time), duration_as_us(&record_time), @@ -221,7 +219,11 @@ impl BankingStage { ); trace!("process_transcations: {:?}", result); if let Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) = result { - info!("process transactions: max height reached"); + info!( + "process transactions: max height reached slot: {} height: {}", + bank.slot(), + bank.tick_height() + ); break; } result?; @@ -232,7 +234,6 @@ impl BankingStage { /// Process the incoming packets pub fn process_packets( - bank: &Bank, verified_receiver: &Arc>>, poh: &Arc>, ) -> Result { @@ -241,6 +242,7 @@ impl BankingStage { .lock() .unwrap() .recv_timeout(Duration::from_millis(100))?; + let mut reqs_len = 0; let mms_len = mms.len(); info!( @@ -262,10 +264,22 @@ impl BankingStage { continue; } + let bank = poh.lock().unwrap().bank(); + if bank.is_none() { + unprocessed_packets.push((msgs, 0)); + continue; + } + let bank = bank.unwrap(); + debug!("banking-stage-tx bank {}", bank.slot()); + let transactions = Self::deserialize_transactions(&msgs.read().unwrap()); reqs_len += transactions.len(); - debug!("transactions received {}", transactions.len()); + debug!( + "bank: {} transactions received {}", + bank.slot(), + transactions.len() + ); let (verified_transactions, verified_transaction_index): (Vec<_>, Vec<_>) = transactions .into_iter() @@ -283,9 +297,13 @@ impl BankingStage { }) .unzip(); - debug!("verified transactions {}", verified_transactions.len()); + debug!( + "bank: {} verified transactions {}", + bank.slot(), + verified_transactions.len() + ); - let processed = Self::process_transactions(bank, &verified_transactions, poh)?; + let processed = Self::process_transactions(&bank, &verified_transactions, poh)?; if processed < verified_transactions.len() { bank_shutdown = true; // Collect any unprocessed transactions in this batch for forwarding @@ -313,19 +331,6 @@ impl BankingStage { Ok(unprocessed_packets) } - - pub fn join_and_collect_unprocessed_packets(&mut self) -> UnprocessedPackets { - let mut unprocessed_packets: UnprocessedPackets = vec![]; - for bank_thread_hdl in self.bank_thread_hdls.drain(..) { - match bank_thread_hdl.join() { - Ok(more_unprocessed_packets) => { - unprocessed_packets.extend(more_unprocessed_packets) - } - err => warn!("bank_thread_hdl join failed: {:?}", err), - } - } - unprocessed_packets - } } impl Service for BankingStage { @@ -335,51 +340,52 @@ impl Service for BankingStage { for bank_thread_hdl in self.bank_thread_hdls { bank_thread_hdl.join()?; } - self.exit.store(true, Ordering::Relaxed); - self.leader_confirmation_service.join()?; Ok(()) } } +pub fn create_test_recorder( + bank: &Arc, +) -> ( + Arc>, + PohService, + Receiver, +) { + let exit = Arc::new(AtomicBool::new(false)); + let (poh_recorder, entry_receiver) = + PohRecorder::new(bank.tick_height(), bank.last_blockhash()); + let poh_recorder = Arc::new(Mutex::new(poh_recorder)); + let poh_service = PohService::new( + poh_recorder.clone(), + &PohServiceConfig::default(), + exit.clone(), + ); + (poh_recorder, poh_service, entry_receiver) +} + #[cfg(test)] mod tests { use super::*; + use crate::cluster_info::Node; use crate::entry::EntrySlice; use crate::packet::to_packets; - use crate::poh_service::{PohService, PohServiceConfig}; + use crate::poh_recorder::WorkingBank; use solana_sdk::genesis_block::GenesisBlock; use solana_sdk::native_program::ProgramError; use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::system_transaction::SystemTransaction; - use solana_sdk::timing::DEFAULT_TICKS_PER_SLOT; + use std::sync::mpsc::channel; use std::thread::sleep; - fn create_test_recorder(bank: &Arc) -> (Arc>, PohService) { - let exit = Arc::new(AtomicBool::new(false)); - let poh_recorder = Arc::new(Mutex::new(PohRecorder::new( - bank.tick_height(), - bank.last_blockhash(), - ))); - let poh_service = PohService::new( - poh_recorder.clone(), - &PohServiceConfig::default(), - exit.clone(), - ); - (poh_recorder, poh_service) - } - #[test] fn test_banking_stage_shutdown1() { let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let bank = Arc::new(Bank::new(&genesis_block)); let (verified_sender, verified_receiver) = channel(); - let (poh_recorder, poh_service) = create_test_recorder(&bank); - let (banking_stage, _entry_receiver) = BankingStage::new( - &bank, - &poh_recorder, - verified_receiver, - DEFAULT_TICKS_PER_SLOT, - ); + let (poh_recorder, poh_service, _entry_receiever) = create_test_recorder(&bank); + let cluster_info = ClusterInfo::new(Node::new_localhost().info); + let cluster_info = Arc::new(RwLock::new(cluster_info)); + let banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver); drop(verified_sender); banking_stage.join().unwrap(); poh_service.close().unwrap(); @@ -387,30 +393,33 @@ mod tests { #[test] fn test_banking_stage_tick() { + solana_logger::setup(); let (mut genesis_block, _mint_keypair) = GenesisBlock::new(2); genesis_block.ticks_per_slot = 4; let bank = Arc::new(Bank::new(&genesis_block)); let start_hash = bank.last_blockhash(); let (verified_sender, verified_receiver) = channel(); - let (poh_recorder, poh_service) = create_test_recorder(&bank); - let (banking_stage, entry_receiver) = BankingStage::new( - &bank, - &poh_recorder, - verified_receiver, - genesis_block.ticks_per_slot - 1, - ); + let (poh_recorder, poh_service, entry_receiver) = create_test_recorder(&bank); + let cluster_info = ClusterInfo::new(Node::new_localhost().info); + let cluster_info = Arc::new(RwLock::new(cluster_info)); + poh_recorder.lock().unwrap().set_bank(&bank); + let banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver); + trace!("sending bank"); sleep(Duration::from_millis(600)); drop(verified_sender); + poh_service.close().unwrap(); + drop(poh_recorder); + trace!("getting entries"); let entries: Vec<_> = entry_receiver .iter() - .flat_map(|x| x.into_iter().map(|e| e.0)) + .flat_map(|x| x.1.into_iter().map(|e| e.0)) .collect(); + trace!("done"); assert_eq!(entries.len(), genesis_block.ticks_per_slot as usize - 1); assert!(entries.verify(&start_hash)); assert_eq!(entries[entries.len() - 1].hash, bank.last_blockhash()); banking_stage.join().unwrap(); - poh_service.close().unwrap(); } #[test] @@ -419,13 +428,11 @@ mod tests { let bank = Arc::new(Bank::new(&genesis_block)); let start_hash = bank.last_blockhash(); let (verified_sender, verified_receiver) = channel(); - let (poh_recorder, poh_service) = create_test_recorder(&bank); - let (banking_stage, entry_receiver) = BankingStage::new( - &bank, - &poh_recorder, - verified_receiver, - DEFAULT_TICKS_PER_SLOT, - ); + let (poh_recorder, poh_service, entry_receiver) = create_test_recorder(&bank); + let cluster_info = ClusterInfo::new(Node::new_localhost().info); + let cluster_info = Arc::new(RwLock::new(cluster_info)); + poh_recorder.lock().unwrap().set_bank(&bank); + let banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver); // good tx let keypair = mint_keypair; @@ -449,11 +456,13 @@ mod tests { .unwrap(); drop(verified_sender); + poh_service.close().expect("close"); + drop(poh_recorder); //receive entries + ticks let entries: Vec> = entry_receiver .iter() - .map(|x| x.into_iter().map(|e| e.0).collect()) + .map(|x| x.1.into_iter().map(|e| e.0).collect()) .collect(); assert!(entries.len() >= 1); @@ -466,10 +475,9 @@ mod tests { }); drop(entry_receiver); banking_stage.join().unwrap(); - poh_service.close().unwrap(); } + #[test] - #[ignore] //flaky fn test_banking_stage_entryfication() { // In this attack we'll demonstrate that a verifier can interpret the ledger // differently if either the server doesn't signal the ledger to add an @@ -477,13 +485,11 @@ mod tests { let (genesis_block, mint_keypair) = GenesisBlock::new(2); let bank = Arc::new(Bank::new(&genesis_block)); let (verified_sender, verified_receiver) = channel(); - let (poh_recorder, poh_service) = create_test_recorder(&bank); - let (banking_stage, entry_receiver) = BankingStage::new( - &bank, - &poh_recorder, - verified_receiver, - DEFAULT_TICKS_PER_SLOT, - ); + let (poh_recorder, poh_service, entry_receiver) = create_test_recorder(&bank); + let cluster_info = ClusterInfo::new(Node::new_localhost().info); + let cluster_info = Arc::new(RwLock::new(cluster_info)); + poh_recorder.lock().unwrap().set_bank(&bank); + let _banking_stage = BankingStage::new(&cluster_info, &poh_recorder, verified_receiver); // Process a batch that includes a transaction that receives two tokens. let alice = Keypair::new(); @@ -512,13 +518,15 @@ mod tests { verified_sender .send(vec![(packets[0].clone(), vec![1u8])]) .unwrap(); + drop(verified_sender); - banking_stage.join().unwrap(); + poh_service.close().expect("close");; + drop(poh_recorder); // Collect the ledger and feed it to a new bank. let entries: Vec<_> = entry_receiver .iter() - .flat_map(|x| x.into_iter().map(|e| e.0)) + .flat_map(|x| x.1.into_iter().map(|e| e.0)) .collect(); // same assertion as running through the bank, really... assert!(entries.len() >= 2); @@ -533,94 +541,22 @@ mod tests { .for_each(|x| assert_eq!(*x, Ok(()))); } assert_eq!(bank.get_balance(&alice.pubkey()), 1); - poh_service.close().unwrap(); - } - - // Test that when the max_tick_height is reached, the banking stage exits - #[test] - fn test_max_tick_height_shutdown() { - solana_logger::setup(); - let (genesis_block, _mint_keypair) = GenesisBlock::new(2); - let bank = Arc::new(Bank::new(&genesis_block)); - let (verified_sender, verified_receiver) = channel(); - let max_tick_height = 10; - let (poh_recorder, poh_service) = create_test_recorder(&bank); - let (banking_stage, _entry_receiver) = - BankingStage::new(&bank, &poh_recorder, verified_receiver, max_tick_height); - - loop { - let bank_tick_height = bank.tick_height(); - if bank_tick_height >= max_tick_height { - break; - } - sleep(Duration::from_millis(10)); - } - - drop(verified_sender); - banking_stage.join().unwrap(); - poh_service.close().unwrap(); - } - - #[test] - fn test_returns_unprocessed_packet() { - solana_logger::setup(); - let (genesis_block, mint_keypair) = GenesisBlock::new(2); - let bank = Arc::new(Bank::new(&genesis_block)); - let ticks_per_slot = 1; - let (verified_sender, verified_receiver) = channel(); - let (poh_recorder, poh_service) = create_test_recorder(&bank); - let (mut banking_stage, _entry_receiver) = - BankingStage::new(&bank, &poh_recorder, verified_receiver, ticks_per_slot); - - // Wait for Poh recorder to hit max height - loop { - let bank_tick_height = bank.tick_height(); - if bank_tick_height >= ticks_per_slot { - break; - } - sleep(Duration::from_millis(10)); - } - - // Now send a transaction to the banking stage - let transaction = SystemTransaction::new_account( - &mint_keypair, - Keypair::new().pubkey(), - 2, - genesis_block.hash(), - 0, - ); - - let packets = to_packets(&[transaction]); - verified_sender - .send(vec![(packets[0].clone(), vec![1u8])]) - .unwrap(); - - // Shut down the banking stage, it should give back the transaction - drop(verified_sender); - let unprocessed_packets = banking_stage.join_and_collect_unprocessed_packets(); - assert_eq!(unprocessed_packets.len(), 1); - let (packets, start_index) = &unprocessed_packets[0]; - assert_eq!(packets.read().unwrap().packets.len(), 1); // TODO: maybe compare actual packet contents too - assert_eq!(*start_index, 0); - poh_service.close().unwrap(); } #[test] fn test_bank_record_transactions() { let (genesis_block, mint_keypair) = GenesisBlock::new(10_000); let bank = Arc::new(Bank::new(&genesis_block)); - let (entry_sender, entry_receiver) = channel(); let working_bank = WorkingBank { bank: bank.clone(), - sender: entry_sender, min_tick_height: bank.tick_height(), max_tick_height: std::u64::MAX, }; - let poh_recorder = Arc::new(Mutex::new(PohRecorder::new( - bank.tick_height(), - bank.last_blockhash(), - ))); + let (poh_recorder, entry_receiver) = + PohRecorder::new(bank.tick_height(), bank.last_blockhash()); + let poh_recorder = Arc::new(Mutex::new(poh_recorder)); + poh_recorder.lock().unwrap().set_working_bank(working_bank); let pubkey = Keypair::new().pubkey(); @@ -631,7 +567,7 @@ mod tests { let mut results = vec![Ok(()), Ok(())]; BankingStage::record_transactions(&transactions, &results, &poh_recorder).unwrap(); - let entries = entry_receiver.recv().unwrap(); + let (_, entries) = entry_receiver.recv().unwrap(); assert_eq!(entries[0].0.transactions.len(), transactions.len()); // ProgramErrors should still be recorded @@ -640,13 +576,13 @@ mod tests { ProgramError::ResultWithNegativeTokens, )); BankingStage::record_transactions(&transactions, &results, &poh_recorder).unwrap(); - let entries = entry_receiver.recv().unwrap(); + let (_, entries) = entry_receiver.recv().unwrap(); assert_eq!(entries[0].0.transactions.len(), transactions.len()); // Other BankErrors should not be recorded results[0] = Err(BankError::AccountNotFound); BankingStage::record_transactions(&transactions, &results, &poh_recorder).unwrap(); - let entries = entry_receiver.recv().unwrap(); + let (_, entries) = entry_receiver.recv().unwrap(); assert_eq!(entries[0].0.transactions.len(), transactions.len() - 1); } @@ -665,38 +601,38 @@ mod tests { 0, )]; - let (entry_sender, entry_receiver) = channel(); let working_bank = WorkingBank { bank: bank.clone(), - sender: entry_sender, min_tick_height: bank.tick_height(), max_tick_height: bank.tick_height() + 1, }; - let poh_recorder = Arc::new(Mutex::new(PohRecorder::new( - bank.tick_height(), - bank.last_blockhash(), - ))); + let (poh_recorder, entry_receiver) = + PohRecorder::new(bank.tick_height(), bank.last_blockhash()); + let poh_recorder = Arc::new(Mutex::new(poh_recorder)); + poh_recorder.lock().unwrap().set_working_bank(working_bank); BankingStage::process_and_record_transactions(&bank, &transactions, &poh_recorder).unwrap(); poh_recorder.lock().unwrap().tick(); - let mut need_tick = true; + let mut done = false; // read entries until I find mine, might be ticks... - while let Ok(entries) = entry_receiver.recv() { + while let Ok((_, entries)) = entry_receiver.recv() { for (entry, _) in entries { if !entry.is_tick() { trace!("got entry"); assert_eq!(entry.transactions.len(), transactions.len()); assert_eq!(bank.get_balance(&pubkey), 1); - need_tick = false; - } else { - break; + done = true; } } + if done { + break; + } } + trace!("done ticking"); - assert_eq!(need_tick, false); + assert_eq!(done, true); let transactions = vec![SystemTransaction::new_move( &mint_keypair, @@ -707,7 +643,7 @@ mod tests { )]; assert_matches!( - BankingStage::process_and_record_transactions(&bank, &transactions, &poh_recorder,), + BankingStage::process_and_record_transactions(&bank, &transactions, &poh_recorder), Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) ); diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index 44a50962d..8d837bf8d 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -1,19 +1,18 @@ //! A stage to broadcast data from a leader node to validators //! use crate::blocktree::Blocktree; -use crate::cluster_info::{ClusterInfo, ClusterInfoError, NodeInfo, DATA_PLANE_FANOUT}; -use crate::entry::Entry; +use crate::cluster_info::{ClusterInfo, ClusterInfoError, DATA_PLANE_FANOUT}; use crate::entry::EntrySlice; #[cfg(feature = "erasure")] use crate::erasure::CodingGenerator; use crate::packet::index_blobs; +use crate::poh_recorder::WorkingBankEntries; use crate::result::{Error, Result}; use crate::service::Service; use crate::staking_utils; use rayon::prelude::*; use solana_metrics::counter::Counter; use solana_metrics::{influxdb, submit}; -use solana_runtime::bank::Bank; use solana_sdk::pubkey::Pubkey; use solana_sdk::timing::duration_as_ms; use std::net::UdpSocket; @@ -32,7 +31,6 @@ pub enum BroadcastStageReturnType { struct Broadcast { id: Pubkey, - blob_index: u64, #[cfg(feature = "erasure")] coding_generator: CodingGenerator, @@ -41,25 +39,46 @@ struct Broadcast { impl Broadcast { fn run( &mut self, - slot_height: u64, - max_tick_height: u64, - broadcast_table: &[NodeInfo], - receiver: &Receiver>, + cluster_info: &Arc>, + receiver: &Receiver, sock: &UdpSocket, blocktree: &Arc, ) -> Result<()> { let timer = Duration::new(1, 0); - let entries = receiver.recv_timeout(timer)?; + let (bank, entries) = receiver.recv_timeout(timer)?; + let mut broadcast_table = cluster_info + .read() + .unwrap() + .sorted_tvu_peers(&staking_utils::node_stakes(&bank)); + // Layer 1, leader nodes are limited to the fanout size. + broadcast_table.truncate(DATA_PLANE_FANOUT); + inc_new_counter_info!("broadcast_service-num_peers", broadcast_table.len() + 1); + + let slot_height = bank.slot(); + let max_tick_height = (bank.slot() + 1) * bank.ticks_per_slot() - 1; + // TODO: Fix BankingStage/BroadcastStage to operate on `slot` directly instead of + // `max_tick_height` + let mut blob_index = blocktree + .meta(bank.slot()) + .expect("Database error") + .map(|meta| meta.consumed) + .unwrap_or(0); + let now = Instant::now(); let mut num_entries = entries.len(); let mut ventries = Vec::new(); let mut last_tick = entries.last().map(|v| v.1).unwrap_or(0); ventries.push(entries); - while let Ok(entries) = receiver.try_recv() { + while let Ok((same_bank, entries)) = receiver.try_recv() { num_entries += entries.len(); last_tick = entries.last().map(|v| v.1).unwrap_or(0); ventries.push(entries); + assert!(last_tick <= max_tick_height); + assert!(same_bank.slot() == bank.slot()); + if last_tick == max_tick_height { + break; + } } inc_new_counter_info!("broadcast_service-entries_received", num_entries); @@ -75,14 +94,8 @@ impl Broadcast { .collect(); // TODO: blob_index should be slot-relative... - index_blobs(&blobs, &self.id, &mut self.blob_index, slot_height); - let parent = { - if slot_height == 0 { - 0 - } else { - slot_height - 1 - } - }; + index_blobs(&blobs, &self.id, &mut blob_index, slot_height); + let parent = bank.parents().first().map(|bank| bank.slot()).unwrap_or(0); for b in blobs.iter() { b.write().unwrap().set_parent(parent); } @@ -129,7 +142,7 @@ impl Broadcast { influxdb::Point::new("broadcast-service") .add_field( "transmit-index", - influxdb::Value::Integer(self.blob_index as i64), + influxdb::Value::Integer(blob_index as i64), ) .to_owned(), ); @@ -163,12 +176,9 @@ pub struct BroadcastStage { impl BroadcastStage { #[allow(clippy::too_many_arguments)] fn run( - slot_height: u64, - bank: &Arc, sock: &UdpSocket, cluster_info: &Arc>, - blob_index: u64, - receiver: &Receiver>, + receiver: &Receiver, exit_signal: &Arc, blocktree: &Arc, ) -> BroadcastStageReturnType { @@ -176,32 +186,15 @@ impl BroadcastStage { let mut broadcast = Broadcast { id: me.id, - blob_index, #[cfg(feature = "erasure")] coding_generator: CodingGenerator::new(), }; - let max_tick_height = (slot_height + 1) * bank.ticks_per_slot() - 1; - loop { if exit_signal.load(Ordering::Relaxed) { return BroadcastStageReturnType::ExitSignal; } - let mut broadcast_table = cluster_info - .read() - .unwrap() - .sorted_tvu_peers(&staking_utils::node_stakes(&bank)); - // Layer 1, leader nodes are limited to the fanout size. - broadcast_table.truncate(DATA_PLANE_FANOUT); - inc_new_counter_info!("broadcast_service-num_peers", broadcast_table.len() + 1); - if let Err(e) = broadcast.run( - slot_height, - max_tick_height, - &broadcast_table, - receiver, - sock, - blocktree, - ) { + if let Err(e) = broadcast.run(&cluster_info, receiver, sock, blocktree) { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) | Error::SendError => { return BroadcastStageReturnType::ChannelDisconnected; @@ -234,32 +227,19 @@ impl BroadcastStage { /// completing the cycle. #[allow(clippy::too_many_arguments)] pub fn new( - slot_height: u64, - bank: &Arc, sock: UdpSocket, cluster_info: Arc>, - blob_index: u64, - receiver: Receiver>, + receiver: Receiver, exit_sender: Arc, blocktree: &Arc, ) -> Self { let exit_signal = Arc::new(AtomicBool::new(false)); let blocktree = blocktree.clone(); - let bank = bank.clone(); let thread_hdl = Builder::new() .name("solana-broadcaster".to_string()) .spawn(move || { let _exit = Finalizer::new(exit_sender); - Self::run( - slot_height, - &bank, - &sock, - &cluster_info, - blob_index, - &receiver, - &exit_signal, - &blocktree, - ) + Self::run(&sock, &cluster_info, &receiver, &exit_signal, &blocktree) }) .unwrap(); @@ -282,6 +262,7 @@ mod test { use crate::cluster_info::{ClusterInfo, Node}; use crate::entry::create_ticks; use crate::service::Service; + use solana_runtime::bank::Bank; use solana_sdk::hash::Hash; use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::timing::DEFAULT_TICKS_PER_SLOT; @@ -294,14 +275,13 @@ mod test { struct MockBroadcastStage { blocktree: Arc, broadcast_service: BroadcastStage, + bank: Arc, } fn setup_dummy_broadcast_service( - slot_height: u64, leader_pubkey: Pubkey, ledger_path: &str, - entry_receiver: Receiver>, - blob_index: u64, + entry_receiver: Receiver, ) -> MockBroadcastStage { // Make the database ledger let blocktree = Arc::new(Blocktree::open(ledger_path).unwrap()); @@ -323,11 +303,8 @@ mod test { // Start up the broadcast stage let broadcast_service = BroadcastStage::new( - slot_height, - &bank, leader_info.sockets.broadcast, cluster_info, - blob_index, entry_receiver, exit_sender, &blocktree, @@ -336,6 +313,7 @@ mod test { MockBroadcastStage { blocktree, broadcast_service, + bank, } } @@ -352,17 +330,16 @@ mod test { let (entry_sender, entry_receiver) = channel(); let broadcast_service = setup_dummy_broadcast_service( - 0, leader_keypair.pubkey(), &ledger_path, entry_receiver, - 0, ); + let bank = broadcast_service.bank.clone(); let ticks = create_ticks(max_tick_height - start_tick_height, Hash::default()); for (i, tick) in ticks.into_iter().enumerate() { entry_sender - .send(vec![(tick, i as u64 + 1)]) + .send((bank.clone(), vec![(tick, i as u64 + 1)])) .expect("Expect successful send to broadcast service"); } diff --git a/core/src/fullnode.rs b/core/src/fullnode.rs index a1c50a382..543782708 100644 --- a/core/src/fullnode.rs +++ b/core/src/fullnode.rs @@ -16,7 +16,7 @@ use crate::rpc_subscriptions::RpcSubscriptions; use crate::service::Service; use crate::storage_stage::StorageState; use crate::tpu::Tpu; -use crate::tvu::{Sockets, Tvu, TvuRotationInfo, TvuRotationReceiver}; +use crate::tvu::{Sockets, Tvu}; use crate::voting_keypair::VotingKeypair; use solana_metrics::counter::Counter; use solana_sdk::genesis_block::GenesisBlock; @@ -26,11 +26,11 @@ use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::system_transaction::SystemTransaction; use solana_sdk::timing::timestamp; use solana_vote_api::vote_transaction::VoteTransaction; -use std::net::UdpSocket; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender}; +use std::sync::mpsc::Receiver; use std::sync::{Arc, Mutex, RwLock}; +use std::thread::sleep; use std::thread::JoinHandle; use std::thread::{spawn, Result}; use std::time::Duration; @@ -84,20 +84,15 @@ impl Default for FullnodeConfig { } pub struct Fullnode { - id: Pubkey, + pub id: Pubkey, exit: Arc, rpc_service: Option, rpc_pubsub_service: Option, + rpc_working_bank_handle: JoinHandle<()>, gossip_service: GossipService, - sigverify_disabled: bool, - tpu_sockets: Vec, - broadcast_socket: UdpSocket, node_services: NodeServices, - rotation_receiver: TvuRotationReceiver, - blocktree: Arc, poh_service: PohService, poh_recorder: Arc>, - bank_forks: Arc>, } impl Fullnode { @@ -129,10 +124,9 @@ impl Fullnode { bank.tick_height(), bank.last_blockhash(), ); - let poh_recorder = Arc::new(Mutex::new(PohRecorder::new( - bank.tick_height(), - bank.last_blockhash(), - ))); + let (poh_recorder, entry_receiver) = + PohRecorder::new(bank.tick_height(), bank.last_blockhash()); + let poh_recorder = Arc::new(Mutex::new(poh_recorder)); let poh_service = PohService::new(poh_recorder.clone(), &config.tick_config, exit.clone()); info!("node info: {:?}", node.info); @@ -225,8 +219,6 @@ impl Fullnode { }; // Setup channel for rotation indications - let (rotation_sender, rotation_receiver) = channel(); - let tvu = Tvu::new( voting_keypair_option, &bank_forks, @@ -235,137 +227,55 @@ impl Fullnode { sockets, blocktree.clone(), config.storage_rotate_count, - &rotation_sender, &storage_state, config.blockstream.as_ref(), ledger_signal_receiver, &subscriptions, + &poh_recorder, ); - let tpu = Tpu::new(id, &cluster_info); + let tpu = Tpu::new( + id, + &cluster_info, + &poh_recorder, + entry_receiver, + node.sockets.tpu, + node.sockets.broadcast, + config.sigverify_disabled, + &blocktree, + ); + let exit_ = exit.clone(); + let bank_forks_ = bank_forks.clone(); + let rpc_service_rp = rpc_service.request_processor.clone(); + let rpc_working_bank_handle = spawn(move || loop { + if exit_.load(Ordering::Relaxed) { + break; + } + let bank = bank_forks_.read().unwrap().working_bank(); + trace!("rpc working bank {} {}", bank.slot(), bank.last_blockhash()); + rpc_service_rp + .write() + .unwrap() + .set_bank(&bank_forks_.read().unwrap().working_bank()); + let timer = Duration::from_millis(100); + sleep(timer); + }); inc_new_counter_info!("fullnode-new", 1); Self { id, - sigverify_disabled: config.sigverify_disabled, gossip_service, rpc_service: Some(rpc_service), rpc_pubsub_service: Some(rpc_pubsub_service), + rpc_working_bank_handle, node_services: NodeServices::new(tpu, tvu), exit, - tpu_sockets: node.sockets.tpu, - broadcast_socket: node.sockets.broadcast, - rotation_receiver, - blocktree, poh_service, poh_recorder, - bank_forks, - } - } - - fn rotate(&mut self, rotation_info: TvuRotationInfo) { - trace!( - "{:?}: rotate for slot={} to leader={:?}", - self.id, - rotation_info.slot, - rotation_info.leader_id, - ); - - if let Some(ref mut rpc_service) = self.rpc_service { - // TODO: This is not the correct bank. Instead TVU should pass along the - // frozen Bank for each completed block for RPC to use from it's notion of the "best" - // available fork (until we want to surface multiple forks to RPC) - rpc_service.set_bank(&self.bank_forks.read().unwrap().working_bank()); - } - - if rotation_info.leader_id == self.id { - debug!("{:?} rotating to leader role", self.id); - let tpu_bank = self - .bank_forks - .read() - .unwrap() - .get(rotation_info.slot) - .unwrap() - .clone(); - self.node_services.tpu.switch_to_leader( - &tpu_bank, - &self.poh_recorder, - self.tpu_sockets - .iter() - .map(|s| s.try_clone().expect("Failed to clone TPU sockets")) - .collect(), - self.broadcast_socket - .try_clone() - .expect("Failed to clone broadcast socket"), - self.sigverify_disabled, - rotation_info.slot, - &self.blocktree, - ); - } else { - self.node_services.tpu.switch_to_forwarder( - rotation_info.leader_id, - self.tpu_sockets - .iter() - .map(|s| s.try_clone().expect("Failed to clone TPU sockets")) - .collect(), - ); - } - } - - // Runs a thread to manage node role transitions. The returned closure can be used to signal the - // node to exit. - pub fn start( - mut self, - rotation_notifier: Option>, - ) -> (JoinHandle<()>, Arc, Receiver) { - let (sender, receiver) = channel(); - let exit = self.exit.clone(); - let timeout = Duration::from_secs(1); - let handle = spawn(move || loop { - if self.exit.load(Ordering::Relaxed) { - debug!("node shutdown requested"); - self.close().expect("Unable to close node"); - let _ = sender.send(true); - break; - } - - match self.rotation_receiver.recv_timeout(timeout) { - Ok(rotation_info) => { - trace!("{:?}: rotate at slot={}", self.id, rotation_info.slot); - //TODO: this will be called by the TVU every time it votes - //instead of here - info!( - "reset PoH... {} {}", - rotation_info.tick_height, rotation_info.blockhash - ); - self.poh_recorder - .lock() - .unwrap() - .reset(rotation_info.tick_height, rotation_info.blockhash); - let slot = rotation_info.slot; - self.rotate(rotation_info); - debug!("role transition complete"); - if let Some(ref rotation_notifier) = rotation_notifier { - rotation_notifier.send(slot).unwrap(); - } - } - Err(RecvTimeoutError::Timeout) => continue, - _ => (), - } - }); - (handle, exit, receiver) - } - - pub fn run(self, rotation_notifier: Option>) -> impl FnOnce() { - let (_, exit, receiver) = self.start(rotation_notifier); - move || { - exit.store(true, Ordering::Relaxed); - receiver.recv().unwrap(); - debug!("node shutdown complete"); } } // Used for notifying many nodes in parallel to exit - fn exit(&self) { + pub fn exit(&self) { self.exit.store(true, Ordering::Relaxed); // Need to force the poh_recorder to drop the WorkingBank, // which contains the channel to BroadcastStage. This should be @@ -375,6 +285,7 @@ impl Fullnode { // in motion because exit()/close() are only called by the run() loop // which is the sole initiator of rotations. self.poh_recorder.lock().unwrap().clear_bank(); + self.poh_service.exit(); if let Some(ref rpc_service) = self.rpc_service { rpc_service.exit(); } @@ -382,10 +293,9 @@ impl Fullnode { rpc_pubsub_service.exit(); } self.node_services.exit(); - self.poh_service.exit() } - fn close(self) -> Result<()> { + pub fn close(self) -> Result<()> { self.exit(); self.join() } @@ -418,6 +328,8 @@ impl Service for Fullnode { type JoinReturnType = (); fn join(self) -> Result<()> { + self.poh_service.join()?; + drop(self.poh_recorder); if let Some(rpc_service) = self.rpc_service { rpc_service.join()?; } @@ -425,11 +337,10 @@ impl Service for Fullnode { rpc_pubsub_service.join()?; } + self.rpc_working_bank_handle.join()?; self.gossip_service.join()?; self.node_services.join()?; trace!("exit node_services!"); - self.poh_service.join()?; - trace!("exit poh!"); Ok(()) } } @@ -483,12 +394,7 @@ pub fn make_active_set_entries( #[cfg(test)] mod tests { use super::*; - use crate::blocktree::{create_new_tmp_ledger, tmp_copy_blocktree}; - use crate::entry::make_consecutive_blobs; - use crate::streamer::responder; - use solana_sdk::hash::Hash; - use solana_sdk::timing::DEFAULT_SLOTS_PER_EPOCH; - use solana_sdk::timing::DEFAULT_TICKS_PER_SLOT; + use crate::blocktree::create_new_tmp_ledger; use std::fs::remove_dir_all; #[test] @@ -551,240 +457,4 @@ mod tests { remove_dir_all(path).unwrap(); } } - - #[test] - fn test_leader_to_leader_transition() { - solana_logger::setup(); - - let bootstrap_leader_keypair = Keypair::new(); - let bootstrap_leader_node = - Node::new_localhost_with_pubkey(bootstrap_leader_keypair.pubkey()); - - // Once the bootstrap leader hits the second epoch, because there are no other choices in - // the active set, this leader will remain the leader in the second epoch. In the second - // epoch, check that the same leader knows to shut down and restart as a leader again. - let ticks_per_slot = 5; - let slots_per_epoch = 2; - - let voting_keypair = Keypair::new(); - let fullnode_config = FullnodeConfig::default(); - - let (mut genesis_block, _mint_keypair) = - GenesisBlock::new_with_leader(10_000, bootstrap_leader_keypair.pubkey(), 500); - genesis_block.ticks_per_slot = ticks_per_slot; - genesis_block.slots_per_epoch = slots_per_epoch; - - let (bootstrap_leader_ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_block); - - // Start the bootstrap leader - let bootstrap_leader = Fullnode::new( - bootstrap_leader_node, - &Arc::new(bootstrap_leader_keypair), - &bootstrap_leader_ledger_path, - voting_keypair, - None, - &fullnode_config, - ); - - let (rotation_sender, rotation_receiver) = channel(); - let bootstrap_leader_exit = bootstrap_leader.run(Some(rotation_sender)); - - // Wait for the bootstrap leader to transition. Since there are no other nodes in the - // cluster it will continue to be the leader - assert_eq!(rotation_receiver.recv().unwrap(), 1); - bootstrap_leader_exit(); - } - - #[test] - #[ignore] - fn test_ledger_role_transition() { - solana_logger::setup(); - - let fullnode_config = FullnodeConfig::default(); - let ticks_per_slot = DEFAULT_TICKS_PER_SLOT; - - // Create the leader and validator nodes - let bootstrap_leader_keypair = Arc::new(Keypair::new()); - let validator_keypair = Arc::new(Keypair::new()); - let (bootstrap_leader_node, validator_node, bootstrap_leader_ledger_path, _, _) = - setup_leader_validator( - &bootstrap_leader_keypair, - &validator_keypair, - ticks_per_slot, - 0, - ); - let bootstrap_leader_info = bootstrap_leader_node.info.clone(); - - let validator_ledger_path = tmp_copy_blocktree!(&bootstrap_leader_ledger_path); - - let ledger_paths = vec![ - bootstrap_leader_ledger_path.clone(), - validator_ledger_path.clone(), - ]; - - { - // Test that a node knows to transition to a validator based on parsing the ledger - let bootstrap_leader = Fullnode::new( - bootstrap_leader_node, - &bootstrap_leader_keypair, - &bootstrap_leader_ledger_path, - Keypair::new(), - Some(&bootstrap_leader_info), - &fullnode_config, - ); - let (rotation_sender, rotation_receiver) = channel(); - let bootstrap_leader_exit = bootstrap_leader.run(Some(rotation_sender)); - assert_eq!(rotation_receiver.recv().unwrap(), (DEFAULT_SLOTS_PER_EPOCH)); - - // Test that a node knows to transition to a leader based on parsing the ledger - let validator = Fullnode::new( - validator_node, - &validator_keypair, - &validator_ledger_path, - Keypair::new(), - Some(&bootstrap_leader_info), - &fullnode_config, - ); - - let (rotation_sender, rotation_receiver) = channel(); - let validator_exit = validator.run(Some(rotation_sender)); - assert_eq!(rotation_receiver.recv().unwrap(), (DEFAULT_SLOTS_PER_EPOCH)); - - validator_exit(); - bootstrap_leader_exit(); - } - for path in ledger_paths { - Blocktree::destroy(&path).expect("Expected successful database destruction"); - let _ignored = remove_dir_all(&path); - } - } - - // TODO: Rework this test or TVU (make_consecutive_blobs sends blobs that can't be handled by - // the replay_stage) - #[test] - #[ignore] - fn test_validator_to_leader_transition() { - solana_logger::setup(); - // Make leader and validator node - let ticks_per_slot = 10; - let slots_per_epoch = 4; - let leader_keypair = Arc::new(Keypair::new()); - let validator_keypair = Arc::new(Keypair::new()); - let fullnode_config = FullnodeConfig::default(); - let (leader_node, validator_node, validator_ledger_path, ledger_initial_len, blockhash) = - setup_leader_validator(&leader_keypair, &validator_keypair, ticks_per_slot, 0); - - let leader_id = leader_keypair.pubkey(); - let validator_info = validator_node.info.clone(); - - info!("leader: {:?}", leader_id); - info!("validator: {:?}", validator_info.id); - - let voting_keypair = Keypair::new(); - - // Start the validator - let validator = Fullnode::new( - validator_node, - &validator_keypair, - &validator_ledger_path, - voting_keypair, - Some(&leader_node.info), - &fullnode_config, - ); - - let blobs_to_send = slots_per_epoch * ticks_per_slot + ticks_per_slot; - - // Send blobs to the validator from our mock leader - let t_responder = { - let (s_responder, r_responder) = channel(); - let blob_sockets: Vec> = - leader_node.sockets.tvu.into_iter().map(Arc::new).collect(); - let t_responder = responder( - "test_validator_to_leader_transition", - blob_sockets[0].clone(), - r_responder, - ); - - let tvu_address = &validator_info.tvu; - - let msgs = make_consecutive_blobs( - &leader_id, - blobs_to_send, - ledger_initial_len, - blockhash, - &tvu_address, - ) - .into_iter() - .rev() - .collect(); - s_responder.send(msgs).expect("send"); - t_responder - }; - - info!("waiting for validator to rotate into the leader role"); - let (rotation_sender, rotation_receiver) = channel(); - let validator_exit = validator.run(Some(rotation_sender)); - let rotation = rotation_receiver.recv().unwrap(); - assert_eq!(rotation, blobs_to_send); - - // Close the validator so that rocksdb has locks available - validator_exit(); - let (bank_forks, bank_forks_info, _, _) = - new_banks_from_blocktree(&validator_ledger_path, None); - let bank = bank_forks.working_bank(); - let entry_height = bank_forks_info[0].entry_height; - - assert!(bank.tick_height() >= bank.ticks_per_slot() * bank.slots_per_epoch()); - - assert!(entry_height >= ledger_initial_len); - - // Shut down - t_responder.join().expect("responder thread join"); - Blocktree::destroy(&validator_ledger_path) - .expect("Expected successful database destruction"); - let _ignored = remove_dir_all(&validator_ledger_path).unwrap(); - } - - fn setup_leader_validator( - leader_keypair: &Arc, - validator_keypair: &Arc, - ticks_per_slot: u64, - num_ending_slots: u64, - ) -> (Node, Node, String, u64, Hash) { - info!("validator: {}", validator_keypair.pubkey()); - info!("leader: {}", leader_keypair.pubkey()); - - let leader_node = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); - let validator_node = Node::new_localhost_with_pubkey(validator_keypair.pubkey()); - - let (mut genesis_block, mint_keypair) = - GenesisBlock::new_with_leader(10_000, leader_node.info.id, 500); - genesis_block.ticks_per_slot = ticks_per_slot; - - let (ledger_path, blockhash) = create_new_tmp_ledger!(&genesis_block); - - // Add entries so that the validator is in the active set, then finish up the slot with - // ticks (and maybe add extra slots full of empty ticks) - let (entries, _) = make_active_set_entries( - validator_keypair, - &mint_keypair, - 10, - 0, - &blockhash, - ticks_per_slot * (num_ending_slots + 1), - ); - - let blocktree = Blocktree::open_config(&ledger_path, ticks_per_slot).unwrap(); - let blockhash = entries.last().unwrap().hash; - let entry_height = ticks_per_slot + entries.len() as u64; - blocktree.write_entries(1, 0, 0, entries).unwrap(); - - ( - leader_node, - validator_node, - ledger_path, - entry_height, - blockhash, - ) - } } diff --git a/core/src/leader_confirmation_service.rs b/core/src/leader_confirmation_service.rs index 7a86f30b1..5d1de4bee 100644 --- a/core/src/leader_confirmation_service.rs +++ b/core/src/leader_confirmation_service.rs @@ -2,16 +2,16 @@ //! to generate a thread which regularly calculates the last confirmation times //! observed by the leader -use crate::service::Service; +use crate::poh_recorder::PohRecorder; use solana_metrics::{influxdb, submit}; use solana_runtime::bank::Bank; use solana_sdk::timing; use solana_vote_api::vote_state::VoteState; use std::result; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::thread::sleep; -use std::thread::{self, Builder, JoinHandle}; +use std::thread::{Builder, JoinHandle}; use std::time::Duration; #[derive(Debug, PartialEq, Eq)] @@ -20,14 +20,11 @@ pub enum ConfirmationError { } pub const COMPUTE_CONFIRMATION_MS: u64 = 100; - -pub struct LeaderConfirmationService { - thread_hdl: JoinHandle<()>, -} +pub struct LeaderConfirmationService {} impl LeaderConfirmationService { fn get_last_supermajority_timestamp( - bank: &Arc, + bank: &Bank, last_valid_validator_timestamp: u64, ) -> result::Result { let mut total_stake = 0; @@ -69,7 +66,7 @@ impl LeaderConfirmationService { Err(ConfirmationError::NoValidSupermajority) } - pub fn compute_confirmation(bank: &Arc, last_valid_validator_timestamp: &mut u64) { + pub fn compute_confirmation(bank: &Bank, last_valid_validator_timestamp: &mut u64) { if let Ok(super_majority_timestamp) = Self::get_last_supermajority_timestamp(bank, *last_valid_validator_timestamp) { @@ -90,9 +87,9 @@ impl LeaderConfirmationService { } /// Create a new LeaderConfirmationService for computing confirmation. - pub fn new(bank: &Arc, exit: Arc) -> Self { - let bank = bank.clone(); - let thread_hdl = Builder::new() + pub fn start(poh_recorder: &Arc>, exit: Arc) -> JoinHandle<()> { + let poh_recorder = poh_recorder.clone(); + Builder::new() .name("solana-leader-confirmation-service".to_string()) .spawn(move || { let mut last_valid_validator_timestamp = 0; @@ -100,21 +97,15 @@ impl LeaderConfirmationService { if exit.load(Ordering::Relaxed) { break; } - Self::compute_confirmation(&bank, &mut last_valid_validator_timestamp); + // dont hold this lock too long + let maybe_bank = poh_recorder.lock().unwrap().bank(); + if let Some(ref bank) = maybe_bank { + Self::compute_confirmation(bank, &mut last_valid_validator_timestamp); + } sleep(Duration::from_millis(COMPUTE_CONFIRMATION_MS)); } }) - .unwrap(); - - Self { thread_hdl } - } -} - -impl Service for LeaderConfirmationService { - type JoinReturnType = (); - - fn join(self) -> thread::Result<()> { - self.thread_hdl.join() + .unwrap() } } diff --git a/core/src/local_cluster.rs b/core/src/local_cluster.rs index bacf02eec..fe6681b76 100644 --- a/core/src/local_cluster.rs +++ b/core/src/local_cluster.rs @@ -3,6 +3,7 @@ use crate::client::mk_client; use crate::cluster_info::{Node, NodeInfo}; use crate::fullnode::{Fullnode, FullnodeConfig}; use crate::gossip_service::discover; +use crate::service::Service; use crate::thin_client::retry_get_balance; use crate::thin_client::ThinClient; use crate::voting_keypair::VotingKeypair; @@ -14,16 +15,14 @@ use solana_vote_api::vote_state::VoteState; use solana_vote_api::vote_transaction::VoteTransaction; use std::fs::remove_dir_all; use std::io::{Error, ErrorKind, Result}; -use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; -use std::thread::JoinHandle; pub struct LocalCluster { /// Keypair with funding to particpiate in the network pub funding_keypair: Keypair, /// Entry point from which the rest of the network can be discovered pub entry_point_info: NodeInfo, - fullnode_hdls: Vec<(JoinHandle<()>, Arc)>, + fullnodes: Vec, ledger_paths: Vec, } @@ -50,8 +49,7 @@ impl LocalCluster { None, &fullnode_config, ); - let (thread, exit, _) = leader_server.start(None); - let mut fullnode_hdls = vec![(thread, exit)]; + let mut fullnodes = vec![leader_server]; let mut client = mk_client(&leader_node_info); for _ in 0..(num_nodes - 1) { let validator_keypair = Arc::new(Keypair::new()); @@ -88,27 +86,26 @@ impl LocalCluster { Some(&leader_node_info), &fullnode_config, ); - let (thread, exit, _) = validator_server.start(None); - fullnode_hdls.push((thread, exit)); + fullnodes.push(validator_server); } discover(&leader_node_info, num_nodes); Self { funding_keypair: mint_keypair, entry_point_info: leader_node_info, - fullnode_hdls, + fullnodes, ledger_paths, } } pub fn exit(&self) { - for node in &self.fullnode_hdls { - node.1.store(true, Ordering::Relaxed); + for node in &self.fullnodes { + node.exit(); } } pub fn close(&mut self) { self.exit(); - while let Some(node) = self.fullnode_hdls.pop() { - node.0.join().expect("join"); + while let Some(node) = self.fullnodes.pop() { + node.join().unwrap(); } for path in &self.ledger_paths { remove_dir_all(path).unwrap(); diff --git a/core/src/poh_recorder.rs b/core/src/poh_recorder.rs index f4eddaac5..ec4b37c02 100644 --- a/core/src/poh_recorder.rs +++ b/core/src/poh_recorder.rs @@ -16,7 +16,7 @@ use crate::result::{Error, Result}; use solana_runtime::bank::Bank; use solana_sdk::hash::Hash; use solana_sdk::transaction::Transaction; -use std::sync::mpsc::Sender; +use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::Arc; #[derive(Debug, PartialEq, Eq, Clone)] @@ -26,10 +26,11 @@ pub enum PohRecorderError { MinHeightNotReached, } +pub type WorkingBankEntries = (Arc, Vec<(Entry, u64)>); + #[derive(Clone)] pub struct WorkingBank { pub bank: Arc, - pub sender: Sender>, pub min_tick_height: u64, pub max_tick_height: u64, } @@ -38,6 +39,7 @@ pub struct PohRecorder { pub poh: Poh, tick_cache: Vec<(Entry, u64)>, working_bank: Option, + sender: Sender, } impl PohRecorder { @@ -51,8 +53,19 @@ impl PohRecorder { self.poh.hash(); } + pub fn bank(&self) -> Option> { + self.working_bank.clone().map(|w| w.bank) + } // synchronize PoH with a bank pub fn reset(&mut self, tick_height: u64, blockhash: Hash) { + if self.poh.hash == blockhash { + assert_eq!(self.poh.tick_height, tick_height); + info!( + "reset skipped for: {},{}", + self.poh.hash, self.poh.tick_height + ); + return; + } let mut cache = vec![]; info!( "reset poh from: {},{} to: {},{}", @@ -66,6 +79,15 @@ impl PohRecorder { trace!("new working bank"); self.working_bank = Some(working_bank); } + pub fn set_bank(&mut self, bank: &Arc) { + let max_tick_height = (bank.slot() + 1) * bank.ticks_per_slot() - 1; + let working_bank = WorkingBank { + bank: bank.clone(), + min_tick_height: bank.tick_height(), + max_tick_height, + }; + self.set_working_bank(working_bank); + } // Flush cache will delay flushing the cache for a bank until it past the WorkingBank::min_tick_height // On a record flush will flush the cache at the WorkingBank::min_tick_height, since a record @@ -106,12 +128,16 @@ impl PohRecorder { for t in cache { working_bank.bank.register_tick(&t.0.hash); } - working_bank.sender.send(cache.to_vec()) + self.sender + .send((working_bank.bank.clone(), cache.to_vec())) } else { Ok(()) }; if self.poh.tick_height >= working_bank.max_tick_height { - info!("poh_record: max_tick_height reached, setting working bank to None"); + info!( + "poh_record: max_tick_height reached, setting working bank {} to None", + working_bank.bank.slot() + ); self.working_bank = None; } if e.is_err() { @@ -138,13 +164,21 @@ impl PohRecorder { self.record_and_send_txs(mixin, txs) } - pub fn new(tick_height: u64, last_entry_hash: Hash) -> Self { + /// A recorder to synchronize PoH with the following data structures + /// * bank - the LastId's queue is updated on `tick` and `record` events + /// * sender - the Entry channel that outputs to the ledger + pub fn new(tick_height: u64, last_entry_hash: Hash) -> (Self, Receiver) { let poh = Poh::new(last_entry_hash, tick_height); - PohRecorder { - poh, - tick_cache: vec![], - working_bank: None, - } + let (sender, receiver) = channel(); + ( + PohRecorder { + poh, + tick_cache: vec![], + working_bank: None, + sender, + }, + receiver, + ) } fn record_and_send_txs(&mut self, mixin: Hash, txs: Vec) -> Result<()> { @@ -160,9 +194,10 @@ impl PohRecorder { transactions: txs, }; trace!("sending entry {}", recorded_entry.is_tick()); - working_bank - .sender - .send(vec![(recorded_entry, poh_entry.tick_height)])?; + self.sender.send(( + working_bank.bank.clone(), + vec![(recorded_entry, poh_entry.tick_height)], + ))?; Ok(()) } @@ -186,13 +221,12 @@ mod tests { use crate::test_tx::test_tx; use solana_sdk::genesis_block::GenesisBlock; use solana_sdk::hash::hash; - use std::sync::mpsc::channel; use std::sync::Arc; #[test] fn test_poh_recorder_no_zero_tick() { let prev_hash = Hash::default(); - let mut poh_recorder = PohRecorder::new(0, prev_hash); + let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, prev_hash); poh_recorder.tick(); assert_eq!(poh_recorder.tick_cache.len(), 1); assert_eq!(poh_recorder.tick_cache[0].1, 1); @@ -202,7 +236,7 @@ mod tests { #[test] fn test_poh_recorder_tick_height_is_last_tick() { let prev_hash = Hash::default(); - let mut poh_recorder = PohRecorder::new(0, prev_hash); + let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, prev_hash); poh_recorder.tick(); poh_recorder.tick(); assert_eq!(poh_recorder.tick_cache.len(), 2); @@ -212,7 +246,7 @@ mod tests { #[test] fn test_poh_recorder_reset_clears_cache() { - let mut poh_recorder = PohRecorder::new(0, Hash::default()); + let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, Hash::default()); poh_recorder.tick(); assert_eq!(poh_recorder.tick_cache.len(), 1); poh_recorder.reset(0, Hash::default()); @@ -224,12 +258,10 @@ mod tests { let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let bank = Arc::new(Bank::new(&genesis_block)); let prev_hash = bank.last_blockhash(); - let (entry_sender, _) = channel(); - let mut poh_recorder = PohRecorder::new(0, prev_hash); + let (mut poh_recorder, _entry_receiver) = PohRecorder::new(0, prev_hash); let working_bank = WorkingBank { bank, - sender: entry_sender, min_tick_height: 2, max_tick_height: 3, }; @@ -244,12 +276,10 @@ mod tests { let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let bank = Arc::new(Bank::new(&genesis_block)); let prev_hash = bank.last_blockhash(); - let (entry_sender, entry_receiver) = channel(); - let mut poh_recorder = PohRecorder::new(0, prev_hash); + let (mut poh_recorder, entry_receiver) = PohRecorder::new(0, prev_hash); let working_bank = WorkingBank { - bank, - sender: entry_sender, + bank: bank.clone(), min_tick_height: 2, max_tick_height: 3, }; @@ -265,8 +295,9 @@ mod tests { poh_recorder.tick(); assert_eq!(poh_recorder.poh.tick_height, 3); assert_eq!(poh_recorder.tick_cache.len(), 0); - let e = entry_receiver.recv().expect("recv 1"); + let (bank_, e) = entry_receiver.recv().expect("recv 1"); assert_eq!(e.len(), 3); + assert_eq!(bank_.slot(), bank.slot()); assert!(poh_recorder.working_bank.is_none()); } @@ -275,8 +306,7 @@ mod tests { let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let bank = Arc::new(Bank::new(&genesis_block)); let prev_hash = bank.last_blockhash(); - let (entry_sender, entry_receiver) = channel(); - let mut poh_recorder = PohRecorder::new(0, prev_hash); + let (mut poh_recorder, entry_receiver) = PohRecorder::new(0, prev_hash); poh_recorder.tick(); poh_recorder.tick(); @@ -287,7 +317,6 @@ mod tests { let working_bank = WorkingBank { bank, - sender: entry_sender, min_tick_height: 2, max_tick_height: 3, }; @@ -296,7 +325,7 @@ mod tests { assert_eq!(poh_recorder.poh.tick_height, 5); assert!(poh_recorder.working_bank.is_none()); - let e = entry_receiver.recv().expect("recv 1"); + let (_, e) = entry_receiver.recv().expect("recv 1"); assert_eq!(e.len(), 3); } @@ -305,12 +334,10 @@ mod tests { let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let bank = Arc::new(Bank::new(&genesis_block)); let prev_hash = bank.last_blockhash(); - let (entry_sender, entry_receiver) = channel(); - let mut poh_recorder = PohRecorder::new(0, prev_hash); + let (mut poh_recorder, entry_receiver) = PohRecorder::new(0, prev_hash); let working_bank = WorkingBank { bank, - sender: entry_sender, min_tick_height: 2, max_tick_height: 3, }; @@ -327,12 +354,10 @@ mod tests { let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let bank = Arc::new(Bank::new(&genesis_block)); let prev_hash = bank.last_blockhash(); - let (entry_sender, entry_receiver) = channel(); - let mut poh_recorder = PohRecorder::new(0, prev_hash); + let (mut poh_recorder, entry_receiver) = PohRecorder::new(0, prev_hash); let working_bank = WorkingBank { bank, - sender: entry_sender, min_tick_height: 1, max_tick_height: 2, }; @@ -346,10 +371,10 @@ mod tests { assert_eq!(poh_recorder.tick_cache.len(), 0); //tick in the cache + entry - let e = entry_receiver.recv().expect("recv 1"); + let (_b, e) = entry_receiver.recv().expect("recv 1"); assert_eq!(e.len(), 1); assert!(e[0].0.is_tick()); - let e = entry_receiver.recv().expect("recv 2"); + let (_b, e) = entry_receiver.recv().expect("recv 2"); assert!(!e[0].0.is_tick()); } @@ -358,12 +383,10 @@ mod tests { let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let bank = Arc::new(Bank::new(&genesis_block)); let prev_hash = bank.last_blockhash(); - let (entry_sender, entry_receiver) = channel(); - let mut poh_recorder = PohRecorder::new(0, prev_hash); + let (mut poh_recorder, entry_receiver) = PohRecorder::new(0, prev_hash); let working_bank = WorkingBank { bank, - sender: entry_sender, min_tick_height: 1, max_tick_height: 2, }; @@ -375,7 +398,7 @@ mod tests { let h1 = hash(b"hello world!"); assert!(poh_recorder.record(h1, vec![tx.clone()]).is_err()); - let e = entry_receiver.recv().expect("recv 1"); + let (_bank, e) = entry_receiver.recv().expect("recv 1"); assert_eq!(e.len(), 2); assert!(e[0].0.is_tick()); assert!(e[1].0.is_tick()); @@ -386,12 +409,10 @@ mod tests { let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let bank = Arc::new(Bank::new(&genesis_block)); let prev_hash = bank.last_blockhash(); - let (entry_sender, entry_receiver) = channel(); - let mut poh_recorder = PohRecorder::new(0, prev_hash); + let (mut poh_recorder, entry_receiver) = PohRecorder::new(0, prev_hash); let working_bank = WorkingBank { bank, - sender: entry_sender, min_tick_height: 2, max_tick_height: 3, }; diff --git a/core/src/poh_service.rs b/core/src/poh_service.rs index 2f5fd3067..4e222ce94 100644 --- a/core/src/poh_service.rs +++ b/core/src/poh_service.rs @@ -117,20 +117,17 @@ mod tests { use solana_runtime::bank::Bank; use solana_sdk::genesis_block::GenesisBlock; use solana_sdk::hash::hash; - use std::sync::mpsc::channel; - use std::sync::mpsc::RecvError; #[test] fn test_poh_service() { let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let bank = Arc::new(Bank::new(&genesis_block)); let prev_hash = bank.last_blockhash(); - let (entry_sender, entry_receiver) = channel(); - let poh_recorder = Arc::new(Mutex::new(PohRecorder::new(bank.tick_height(), prev_hash))); + let (poh_recorder, entry_receiver) = PohRecorder::new(bank.tick_height(), prev_hash); + let poh_recorder = Arc::new(Mutex::new(poh_recorder)); let exit = Arc::new(AtomicBool::new(false)); let working_bank = WorkingBank { bank: bank.clone(), - sender: entry_sender, min_tick_height: bank.tick_height(), max_tick_height: std::u64::MAX, }; @@ -171,7 +168,7 @@ mod tests { let mut need_partial = true; while need_tick || need_entry || need_partial { - for entry in entry_receiver.recv().unwrap() { + for entry in entry_receiver.recv().unwrap().1 { let entry = &entry.0; if entry.is_tick() { assert!(entry.num_hashes <= HASHES_PER_TICK); @@ -199,43 +196,4 @@ mod tests { let _ = poh_service.join().unwrap(); let _ = entry_producer.join().unwrap(); } - - #[test] - fn test_poh_service_drops_working_bank() { - let (genesis_block, _mint_keypair) = GenesisBlock::new(2); - let bank = Arc::new(Bank::new(&genesis_block)); - let prev_hash = bank.last_blockhash(); - let (entry_sender, entry_receiver) = channel(); - let poh_recorder = Arc::new(Mutex::new(PohRecorder::new(bank.tick_height(), prev_hash))); - let exit = Arc::new(AtomicBool::new(false)); - let working_bank = WorkingBank { - bank: bank.clone(), - sender: entry_sender, - min_tick_height: bank.tick_height() + 3, - max_tick_height: bank.tick_height() + 5, - }; - - let poh_service = PohService::new( - poh_recorder.clone(), - &PohServiceConfig::default(), - Arc::new(AtomicBool::new(false)), - ); - - poh_recorder.lock().unwrap().set_working_bank(working_bank); - - // all 5 ticks are expected, there is no tick 0 - // First 4 ticks must be sent all at once, since bank shouldn't see them until - // the after bank's min_tick_height(3) is reached. - let entries = entry_receiver.recv().expect("recv 1"); - assert_eq!(entries.len(), 4); - let entries = entry_receiver.recv().expect("recv 2"); - assert_eq!(entries.len(), 1); - - //WorkingBank should be dropped by the PohService thread as well - assert_eq!(entry_receiver.recv(), Err(RecvError)); - - exit.store(true, Ordering::Relaxed); - poh_service.exit(); - let _ = poh_service.join().unwrap(); - } } diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 78e7c2ac8..ac3cc807d 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -8,10 +8,10 @@ use crate::cluster_info::ClusterInfo; use crate::entry::{Entry, EntryReceiver, EntrySender, EntrySlice}; use crate::leader_schedule_utils; use crate::packet::BlobError; +use crate::poh_recorder::PohRecorder; use crate::result; use crate::rpc_subscriptions::RpcSubscriptions; use crate::service::Service; -use crate::tvu::{TvuRotationInfo, TvuRotationSender}; use solana_metrics::counter::Counter; use solana_runtime::bank::Bank; use solana_sdk::hash::Hash; @@ -22,7 +22,7 @@ use solana_vote_api::vote_transaction::VoteTransaction; use std::collections::HashMap; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{channel, Receiver, RecvTimeoutError}; -use std::sync::{Arc, RwLock}; +use std::sync::{Arc, Mutex, RwLock}; use std::thread::{self, Builder, JoinHandle}; use std::time::Duration; use std::time::Instant; @@ -62,9 +62,9 @@ impl ReplayStage { _bank_forks_info: &[BankForksInfo], cluster_info: Arc>, exit: Arc, - to_leader_sender: &TvuRotationSender, ledger_signal_receiver: Receiver, subscriptions: &Arc, + poh_recorder: &Arc>, ) -> (Self, Receiver<(u64, Pubkey)>, EntryReceiver) where T: 'static + KeypairUtil + Send + Sync, @@ -73,9 +73,9 @@ impl ReplayStage { let (slot_full_sender, slot_full_receiver) = channel(); trace!("replay stage"); let exit_ = exit.clone(); - let to_leader_sender = to_leader_sender.clone(); let subscriptions = subscriptions.clone(); let bank_forks = bank_forks.clone(); + let poh_recorder = poh_recorder.clone(); let mut progress = HashMap::new(); @@ -84,6 +84,7 @@ impl ReplayStage { .name("solana-replay-stage".to_string()) .spawn(move || { let _exit = Finalizer::new(exit_.clone()); + let mut first_block = false; loop { let now = Instant::now(); // Stop getting entries if we get exit signal @@ -91,11 +92,11 @@ impl ReplayStage { break; } Self::generate_new_bank_forks(&blocktree, &mut bank_forks.write().unwrap()); - let live_bank_ids = bank_forks.read().unwrap().active_banks(); - trace!("live banks {:?}", live_bank_ids); + let active_banks = bank_forks.read().unwrap().active_banks(); + trace!("active banks {:?}", active_banks); let mut votable: Vec = vec![]; - for bank_id in live_bank_ids { - let bank = bank_forks.read().unwrap().get(bank_id).unwrap().clone(); + for bank_id in &active_banks { + let bank = bank_forks.read().unwrap().get(*bank_id).unwrap().clone(); if !Self::is_tpu(&bank, my_id) { Self::replay_blocktree_into_bank( &bank, @@ -104,17 +105,28 @@ impl ReplayStage { &forward_entry_sender, )?; } - let max_tick_height = (bank_id + 1) * bank.ticks_per_slot() - 1; + let max_tick_height = (*bank_id + 1) * bank.ticks_per_slot() - 1; if bank.tick_height() == max_tick_height { bank.freeze(); - votable.push(bank_id); - progress.remove(&bank_id); + info!("bank frozen {}", bank.slot()); + votable.push(*bank_id); + progress.remove(bank_id); let id = leader_schedule_utils::slot_leader_at(bank.slot(), &bank); if let Err(e) = slot_full_sender.send((bank.slot(), id)) { info!("{} slot_full alert failed: {:?}", my_id, e); } } } + let frozen = bank_forks.read().unwrap().frozen_banks(); + if votable.is_empty() + && frozen.len() == 1 + && active_banks.is_empty() + && !first_block + { + first_block = true; + votable.extend(frozen.keys()); + info!("voting on first block {:?}", votable); + } // TODO: fork selection // vote on the latest one for now votable.sort(); @@ -142,20 +154,31 @@ impl ReplayStage { ); cluster_info.write().unwrap().push_vote(vote); } + poh_recorder + .lock() + .unwrap() + .reset(parent.tick_height(), parent.last_blockhash()); + if next_leader == my_id { + let frozen = bank_forks.read().unwrap().frozen_banks(); + assert!(frozen.get(&next_slot).is_none()); + assert!(bank_forks.read().unwrap().get(next_slot).is_none()); + let tpu_bank = Bank::new_from_parent(&parent, my_id, next_slot); bank_forks.write().unwrap().insert(next_slot, tpu_bank); + if let Some(tpu_bank) = + bank_forks.read().unwrap().get(next_slot).cloned() + { + assert_eq!(bank_forks.read().unwrap().working_bank().slot(), tpu_bank.slot()); + debug!( + "poh_recorder new working bank: me: {} next_slot: {} next_leader: {}", + my_id, + tpu_bank.slot(), + next_leader + ); + poh_recorder.lock().unwrap().set_bank(&tpu_bank); + } } - debug!( - "to_leader_sender: me: {} next_slot: {} next_leader: {}", - my_id, next_slot, next_leader - ); - to_leader_sender.send(TvuRotationInfo { - tick_height: parent.tick_height(), - blockhash: parent.last_blockhash(), - slot: next_slot, - leader_id: next_leader, - })?; } inc_new_counter_info!( "replicate_stage-duration", @@ -166,7 +189,7 @@ impl ReplayStage { match result { Err(RecvTimeoutError::Timeout) => continue, Err(_) => break, - Ok(_) => debug!("blocktree signal"), + Ok(_) => trace!("blocktree signal"), }; } Ok(()) @@ -270,25 +293,31 @@ impl ReplayStage { // Find the next slot that chains to the old slot let frozen_banks = forks.frozen_banks(); let frozen_bank_ids: Vec = frozen_banks.keys().cloned().collect(); - trace!("generate new forks {:?}", frozen_bank_ids); + trace!("frozen_banks {:?}", frozen_bank_ids); let next_slots = blocktree .get_slots_since(&frozen_bank_ids) .expect("Db error"); + trace!("generate new forks {:?}", next_slots); for (parent_id, children) in next_slots { let parent_bank = frozen_banks .get(&parent_id) .expect("missing parent in bank forks") .clone(); for child_id in children { - let new_fork = forks.get(child_id).is_none(); - if new_fork { - let leader = leader_schedule_utils::slot_leader_at(child_id, &parent_bank); - trace!("new fork:{} parent:{}", child_id, parent_id); - forks.insert( - child_id, - Bank::new_from_parent(&parent_bank, leader, child_id), - ); + if frozen_banks.get(&child_id).is_some() { + trace!("child already frozen {}", child_id); + continue; } + if forks.get(child_id).is_some() { + trace!("child already active {}", child_id); + continue; + } + let leader = leader_schedule_utils::slot_leader_at(child_id, &parent_bank); + info!("new fork:{} parent:{}", child_id, parent_id); + forks.insert( + child_id, + Bank::new_from_parent(&parent_bank, leader, child_id), + ); } } } @@ -305,6 +334,7 @@ impl Service for ReplayStage { #[cfg(test)] mod test { use super::*; + use crate::banking_stage::create_test_recorder; use crate::blocktree::create_new_tmp_ledger; use crate::cluster_info::{ClusterInfo, Node}; use crate::entry::create_ticks; @@ -341,13 +371,13 @@ mod test { // Set up the replay stage let exit = Arc::new(AtomicBool::new(false)); let voting_keypair = Arc::new(Keypair::new()); - let (to_leader_sender, _to_leader_receiver) = channel(); { let (bank_forks, bank_forks_info, blocktree, l_receiver) = new_banks_from_blocktree(&my_ledger_path, None); let bank = bank_forks.working_bank(); let blocktree = Arc::new(blocktree); + let (poh_recorder, poh_service, _entry_receiver) = create_test_recorder(&bank); let (replay_stage, _slot_full_receiver, ledger_writer_recv) = ReplayStage::new( my_keypair.pubkey(), Some(voting_keypair.clone()), @@ -356,9 +386,9 @@ mod test { &bank_forks_info, cluster_info_me.clone(), exit.clone(), - &to_leader_sender, l_receiver, &Arc::new(RpcSubscriptions::default()), + &poh_recorder, ); let keypair = voting_keypair.as_ref(); @@ -378,6 +408,7 @@ mod test { replay_stage .close() .expect("Expect successful ReplayStage exit"); + poh_service.close().unwrap(); } let _ignored = remove_dir_all(&my_ledger_path); } diff --git a/core/src/thin_client.rs b/core/src/thin_client.rs index f89c47f0f..b5213ec12 100644 --- a/core/src/thin_client.rs +++ b/core/src/thin_client.rs @@ -498,7 +498,6 @@ mod tests { fn test_thin_client_basic() { solana_logger::setup(); let (server, leader_data, alice, ledger_path) = new_fullnode(); - let server_exit = server.run(None); let bob_pubkey = Keypair::new().pubkey(); info!( @@ -525,7 +524,7 @@ mod tests { let transaction_count = client.transaction_count(); assert_eq!(transaction_count, 1); - server_exit(); + server.close().unwrap(); remove_dir_all(ledger_path).unwrap(); } @@ -534,7 +533,6 @@ mod tests { fn test_bad_sig() { solana_logger::setup(); let (server, leader_data, alice, ledger_path) = new_fullnode(); - let server_exit = server.run(None); let bob_pubkey = Keypair::new().pubkey(); info!( "found leader: {:?}", @@ -562,7 +560,7 @@ mod tests { let balance = client.get_balance(&bob_pubkey); assert_eq!(balance.unwrap(), 1001); - server_exit(); + server.close().unwrap(); remove_dir_all(ledger_path).unwrap(); } @@ -570,7 +568,6 @@ mod tests { fn test_register_vote_account() { solana_logger::setup(); let (server, leader_data, alice, ledger_path) = new_fullnode(); - let server_exit = server.run(None); info!( "found leader: {:?}", poll_gossip_for_leader(leader_data.gossip, Some(5)).unwrap() @@ -626,7 +623,7 @@ mod tests { sleep(Duration::from_millis(900)); } - server_exit(); + server.close().unwrap(); remove_dir_all(ledger_path).unwrap(); } @@ -645,7 +642,6 @@ mod tests { fn test_zero_balance_after_nonzero() { solana_logger::setup(); let (server, leader_data, alice, ledger_path) = new_fullnode(); - let server_exit = server.run(None); let bob_keypair = Keypair::new(); info!( @@ -682,7 +678,7 @@ mod tests { info!("Bob's balance is {:?}", bob_balance); assert!(bob_balance.is_err(),); - server_exit(); + server.close().unwrap(); remove_dir_all(ledger_path).unwrap(); } } diff --git a/core/src/tpu.rs b/core/src/tpu.rs index a49cf5906..2e19f7ad9 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -1,29 +1,22 @@ //! The `tpu` module implements the Transaction Processing Unit, a //! multi-stage transaction processing pipeline in software. -use crate::banking_stage::{BankingStage, UnprocessedPackets}; +use crate::banking_stage::BankingStage; use crate::blocktree::Blocktree; use crate::broadcast_stage::BroadcastStage; use crate::cluster_info::ClusterInfo; use crate::cluster_info_vote_listener::ClusterInfoVoteListener; use crate::fetch_stage::FetchStage; -use crate::poh_recorder::PohRecorder; +use crate::poh_recorder::{PohRecorder, WorkingBankEntries}; use crate::service::Service; use crate::sigverify_stage::SigVerifyStage; -use crate::tpu_forwarder::TpuForwarder; -use solana_runtime::bank::Bank; use solana_sdk::pubkey::Pubkey; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::channel; +use std::sync::mpsc::{channel, Receiver}; use std::sync::{Arc, Mutex, RwLock}; use std::thread; -pub enum TpuMode { - Leader(LeaderServices), - Forwarder(ForwarderServices), -} - pub struct LeaderServices { fetch_stage: FetchStage, sigverify_stage: SigVerifyStage, @@ -49,7 +42,7 @@ impl LeaderServices { } } - fn exit(&self) { + pub fn exit(&self) { self.fetch_stage.close(); } @@ -67,187 +60,63 @@ impl LeaderServices { Ok(()) } - fn close(self) -> thread::Result<()> { - self.exit(); - self.join() - } -} - -pub struct ForwarderServices { - tpu_forwarder: TpuForwarder, -} - -impl ForwarderServices { - fn new(tpu_forwarder: TpuForwarder) -> Self { - ForwarderServices { tpu_forwarder } - } - - fn exit(&self) { - self.tpu_forwarder.close(); - } - - fn join(self) -> thread::Result<()> { - self.tpu_forwarder.join() - } - - fn close(self) -> thread::Result<()> { + pub fn close(self) -> thread::Result<()> { self.exit(); self.join() } } pub struct Tpu { - tpu_mode: Option, + leader_services: LeaderServices, exit: Arc, - id: Pubkey, - cluster_info: Arc>, + pub id: Pubkey, } impl Tpu { - pub fn new(id: Pubkey, cluster_info: &Arc>) -> Self { - Self { - tpu_mode: None, - exit: Arc::new(AtomicBool::new(false)), - id, - cluster_info: cluster_info.clone(), - } - } - - fn mode_exit(&mut self) { - match &mut self.tpu_mode { - Some(TpuMode::Leader(svcs)) => { - svcs.exit(); - } - Some(TpuMode::Forwarder(svcs)) => { - svcs.exit(); - } - None => (), - } - } - - fn mode_close(&mut self) { - let tpu_mode = self.tpu_mode.take(); - if let Some(tpu_mode) = tpu_mode { - match tpu_mode { - TpuMode::Leader(svcs) => { - let _ = svcs.close(); - } - TpuMode::Forwarder(svcs) => { - let _ = svcs.close(); - } - } - } - } - - fn forward_unprocessed_packets( - tpu: &std::net::SocketAddr, - unprocessed_packets: UnprocessedPackets, - ) -> std::io::Result<()> { - let socket = UdpSocket::bind("0.0.0.0:0")?; - for (packets, start_index) in unprocessed_packets { - let packets = packets.read().unwrap(); - for packet in packets.packets.iter().skip(start_index) { - socket.send_to(&packet.data[..packet.meta.size], tpu)?; - } - } - Ok(()) - } - - fn close_and_forward_unprocessed_packets(&mut self) { - self.mode_exit(); - - let unprocessed_packets = match self.tpu_mode.as_mut() { - Some(TpuMode::Leader(svcs)) => { - svcs.banking_stage.join_and_collect_unprocessed_packets() - } - Some(TpuMode::Forwarder(svcs)) => { - svcs.tpu_forwarder.join_and_collect_unprocessed_packets() - } - None => vec![], - }; - - if !unprocessed_packets.is_empty() { - let tpu = self.cluster_info.read().unwrap().leader_data().unwrap().tpu; - info!("forwarding unprocessed packets to new leader at {:?}", tpu); - Tpu::forward_unprocessed_packets(&tpu, unprocessed_packets).unwrap_or_else(|err| { - warn!("Failed to forward unprocessed transactions: {:?}", err) - }); - } - - self.mode_close(); - } - - pub fn switch_to_forwarder(&mut self, leader_id: Pubkey, transactions_sockets: Vec) { - self.close_and_forward_unprocessed_packets(); - - self.cluster_info.write().unwrap().set_leader(leader_id); - - let tpu_forwarder = TpuForwarder::new(transactions_sockets, self.cluster_info.clone()); - self.tpu_mode = Some(TpuMode::Forwarder(ForwarderServices::new(tpu_forwarder))); - } - - #[allow(clippy::too_many_arguments)] - pub fn switch_to_leader( - &mut self, - bank: &Arc, + pub fn new( + id: Pubkey, + cluster_info: &Arc>, poh_recorder: &Arc>, + entry_receiver: Receiver, transactions_sockets: Vec, broadcast_socket: UdpSocket, sigverify_disabled: bool, - slot: u64, blocktree: &Arc, - ) { - self.close_and_forward_unprocessed_packets(); + ) -> Self { + cluster_info.write().unwrap().set_leader(id); - self.cluster_info.write().unwrap().set_leader(self.id); - - self.exit = Arc::new(AtomicBool::new(false)); + let exit = Arc::new(AtomicBool::new(false)); let (packet_sender, packet_receiver) = channel(); - let fetch_stage = FetchStage::new_with_sender( - transactions_sockets, - self.exit.clone(), - &packet_sender.clone(), - ); - let cluster_info_vote_listener = ClusterInfoVoteListener::new( - self.exit.clone(), - self.cluster_info.clone(), - packet_sender, - ); + let fetch_stage = + FetchStage::new_with_sender(transactions_sockets, exit.clone(), &packet_sender.clone()); + let cluster_info_vote_listener = + ClusterInfoVoteListener::new(exit.clone(), cluster_info.clone(), packet_sender); let (sigverify_stage, verified_receiver) = SigVerifyStage::new(packet_receiver, sigverify_disabled); - // TODO: Fix BankingStage/BroadcastStage to operate on `slot` directly instead of - // `max_tick_height` - let max_tick_height = (slot + 1) * bank.ticks_per_slot() - 1; - let blob_index = blocktree - .meta(slot) - .expect("Database error") - .map(|meta| meta.consumed) - .unwrap_or(0); - - let (banking_stage, entry_receiver) = - BankingStage::new(&bank, poh_recorder, verified_receiver, max_tick_height); + let banking_stage = BankingStage::new(&cluster_info, poh_recorder, verified_receiver); let broadcast_stage = BroadcastStage::new( - slot, - bank, broadcast_socket, - self.cluster_info.clone(), - blob_index, + cluster_info.clone(), entry_receiver, - self.exit.clone(), + exit.clone(), blocktree, ); - let svcs = LeaderServices::new( + let leader_services = LeaderServices::new( fetch_stage, sigverify_stage, banking_stage, cluster_info_vote_listener, broadcast_stage, ); - self.tpu_mode = Some(TpuMode::Leader(svcs)); + Self { + leader_services, + exit, + id, + } } pub fn exit(&self) { @@ -258,8 +127,8 @@ impl Tpu { self.exit.load(Ordering::Relaxed) } - pub fn close(mut self) -> thread::Result<()> { - self.mode_close(); + pub fn close(self) -> thread::Result<()> { + self.exit(); self.join() } } @@ -268,11 +137,6 @@ impl Service for Tpu { type JoinReturnType = (); fn join(self) -> thread::Result<()> { - match self.tpu_mode { - Some(TpuMode::Leader(svcs)) => svcs.join()?, - Some(TpuMode::Forwarder(svcs)) => svcs.join()?, - None => (), - } - Ok(()) + self.leader_services.join() } } diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 3088c4f36..be0f8785b 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -18,29 +18,19 @@ use crate::blockstream_service::BlockstreamService; use crate::blocktree::Blocktree; use crate::blocktree_processor::BankForksInfo; use crate::cluster_info::ClusterInfo; +use crate::poh_recorder::PohRecorder; use crate::replay_stage::ReplayStage; use crate::retransmit_stage::RetransmitStage; use crate::rpc_subscriptions::RpcSubscriptions; use crate::service::Service; use crate::storage_stage::{StorageStage, StorageState}; -use solana_sdk::hash::Hash; -use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil}; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{channel, Receiver, Sender}; -use std::sync::{Arc, RwLock}; +use std::sync::mpsc::{channel, Receiver}; +use std::sync::{Arc, Mutex, RwLock}; use std::thread; -pub struct TvuRotationInfo { - pub tick_height: u64, // tick height, bank might not exist yet - pub blockhash: Hash, // blockhash that was voted on - pub slot: u64, // slot height to initiate a rotation - pub leader_id: Pubkey, // leader upon rotation -} -pub type TvuRotationSender = Sender; -pub type TvuRotationReceiver = Receiver; - pub struct Tvu { fetch_stage: BlobFetchStage, retransmit_stage: RetransmitStage, @@ -72,11 +62,11 @@ impl Tvu { sockets: Sockets, blocktree: Arc, storage_rotate_count: u64, - to_leader_sender: &TvuRotationSender, storage_state: &StorageState, blockstream: Option<&String>, ledger_signal_receiver: Receiver, subscriptions: &Arc, + poh_recorder: &Arc>, ) -> Self where T: 'static + KeypairUtil + Sync + Send, @@ -124,9 +114,9 @@ impl Tvu { &bank_forks_info, cluster_info.clone(), exit.clone(), - to_leader_sender, ledger_signal_receiver, subscriptions, + poh_recorder, ); let blockstream_service = if blockstream.is_some() { @@ -197,6 +187,7 @@ impl Service for Tvu { #[cfg(test)] pub mod tests { use super::*; + use crate::banking_stage::create_test_recorder; use crate::blocktree::get_tmp_ledger_path; use crate::cluster_info::{ClusterInfo, Node}; use crate::storage_stage::STORAGE_ROTATE_TEST_COUNT; @@ -228,7 +219,8 @@ pub mod tests { let blocktree_path = get_tmp_ledger_path!(); let (blocktree, l_receiver) = Blocktree::open_with_signal(&blocktree_path) .expect("Expected to successfully open ledger"); - let (sender, _receiver) = channel(); + let bank = bank_forks.working_bank(); + let (poh_recorder, poh_service, _entry_receiver) = create_test_recorder(&bank); let tvu = Tvu::new( Some(Arc::new(Keypair::new())), &Arc::new(RwLock::new(bank_forks)), @@ -243,12 +235,13 @@ pub mod tests { }, Arc::new(blocktree), STORAGE_ROTATE_TEST_COUNT, - &sender, &StorageState::default(), None, l_receiver, &Arc::new(RpcSubscriptions::default()), + &poh_recorder, ); tvu.close().expect("close"); + poh_service.close().expect("close"); } } diff --git a/fullnode/src/main.rs b/fullnode/src/main.rs index b23595849..904ff0183 100644 --- a/fullnode/src/main.rs +++ b/fullnode/src/main.rs @@ -4,6 +4,7 @@ use solana::client::mk_client; use solana::cluster_info::{Node, NodeInfo, FULLNODE_PORT_RANGE}; use solana::fullnode::{Fullnode, FullnodeConfig}; use solana::local_vote_signer_service::LocalVoteSignerService; +use solana::service::Service; use solana::socketaddr; use solana::thin_client::{poll_gossip_for_leader, ThinClient}; use solana::voting_keypair::{RemoteVoteSigner, VotingKeypair}; @@ -17,7 +18,6 @@ use std::fs::File; use std::io::{Error, ErrorKind, Result}; use std::net::{Ipv4Addr, SocketAddr}; use std::process::exit; -use std::sync::mpsc::channel; use std::sync::Arc; fn parse_identity(matches: &ArgMatches<'_>) -> (Keypair, SocketAddr) { @@ -286,9 +286,6 @@ fn main() { &fullnode_config, ); - let (rotation_sender, rotation_receiver) = channel(); - fullnode.run(Some(rotation_sender)); - if !fullnode_config.voting_disabled { let leader_node_info = loop { info!("Looking for leader..."); @@ -313,10 +310,6 @@ fn main() { File::create(filename).unwrap_or_else(|_| panic!("Unable to create: {}", filename)); } info!("Node initialized"); - loop { - info!( - "Node rotation event: {:?}", - rotation_receiver.recv().unwrap() - ); - } + fullnode.join().expect("fullnode exit"); + info!("Node exiting.."); } diff --git a/tests/replicator.rs b/tests/replicator.rs index 6457c3827..e01f59c2a 100644 --- a/tests/replicator.rs +++ b/tests/replicator.rs @@ -61,7 +61,6 @@ fn test_replicator_startup_basic() { None, &fullnode_config, ); - let leader_exit = leader.run(None); debug!( "leader: {:?}", @@ -91,7 +90,6 @@ fn test_replicator_startup_basic() { Some(&leader_info), &fullnode_config, ); - let validator_exit = validator.run(None); let bob = Keypair::new(); @@ -217,8 +215,8 @@ fn test_replicator_startup_basic() { } replicator.close(); - validator_exit(); - leader_exit(); + validator.close().unwrap(); + leader.close().unwrap(); } info!("cleanup"); diff --git a/tests/rpc.rs b/tests/rpc.rs index d02eb9c72..c0a0b7725 100644 --- a/tests/rpc.rs +++ b/tests/rpc.rs @@ -17,7 +17,6 @@ fn test_rpc_send_tx() { solana_logger::setup(); let (server, leader_data, alice, ledger_path) = new_fullnode(); - let server_exit = server.run(None); let bob_pubkey = Keypair::new().pubkey(); let client = reqwest::Client::new(); @@ -93,6 +92,6 @@ fn test_rpc_send_tx() { assert_eq!(confirmed_tx, true); - server_exit(); + server.close().unwrap(); remove_dir_all(ledger_path).unwrap(); } diff --git a/tests/tvu.rs b/tests/tvu.rs index 9386eb107..02cccb84e 100644 --- a/tests/tvu.rs +++ b/tests/tvu.rs @@ -3,6 +3,7 @@ extern crate solana; use log::*; use solana::bank_forks::BankForks; +use solana::banking_stage::create_test_recorder; use solana::blocktree::{get_tmp_ledger_path, Blocktree}; use solana::blocktree_processor::BankForksInfo; use solana::cluster_info::{ClusterInfo, Node}; @@ -110,7 +111,7 @@ fn test_replay() { .expect("Expected to successfully open ledger"); let vote_account_keypair = Arc::new(Keypair::new()); let voting_keypair = VotingKeypair::new_local(&vote_account_keypair); - let (to_leader_sender, _to_leader_receiver) = channel(); + let (poh_recorder, poh_service, _entry_receiver) = create_test_recorder(&bank); let tvu = Tvu::new( Some(Arc::new(voting_keypair)), &Arc::new(RwLock::new(bank_forks)), @@ -125,11 +126,11 @@ fn test_replay() { }, Arc::new(blocktree), STORAGE_ROTATE_TEST_COUNT, - &to_leader_sender, &StorageState::default(), None, ledger_signal_receiver, &Arc::new(RpcSubscriptions::default()), + &poh_recorder, ); let mut alice_ref_balance = starting_balance; @@ -182,6 +183,7 @@ fn test_replay() { let bob_balance = bank.get_balance(&bob_keypair.pubkey()); assert_eq!(bob_balance, starting_balance - alice_ref_balance); + poh_service.close().expect("close"); tvu.close().expect("close"); exit.store(true, Ordering::Relaxed); dr_l.join().expect("join"); diff --git a/wallet/tests/deploy.rs b/wallet/tests/deploy.rs index d7359d579..bd43cf6fb 100644 --- a/wallet/tests/deploy.rs +++ b/wallet/tests/deploy.rs @@ -20,7 +20,6 @@ fn test_wallet_deploy_program() { pathbuf.set_extension("so"); let (server, leader_data, alice, ledger_path) = new_fullnode(); - let server_exit = server.run(None); let (sender, receiver) = channel(); run_local_drone(alice, sender); @@ -76,6 +75,6 @@ fn test_wallet_deploy_program() { &elf ); - server_exit(); + server.close().unwrap(); remove_dir_all(ledger_path).unwrap(); } diff --git a/wallet/tests/pay.rs b/wallet/tests/pay.rs index e746ada1a..918c02df6 100644 --- a/wallet/tests/pay.rs +++ b/wallet/tests/pay.rs @@ -21,7 +21,6 @@ fn check_balance(expected_balance: u64, client: &RpcClient, pubkey: Pubkey) { #[test] fn test_wallet_timestamp_tx() { let (server, leader_data, alice, ledger_path) = new_fullnode(); - let server_exit = server.run(None); let bob_pubkey = Keypair::new().pubkey(); let (sender, receiver) = channel(); @@ -75,14 +74,13 @@ fn test_wallet_timestamp_tx() { check_balance(1, &rpc_client, process_id); // contract balance check_balance(10, &rpc_client, bob_pubkey); // recipient balance - server_exit(); + server.close().unwrap(); remove_dir_all(ledger_path).unwrap(); } #[test] fn test_wallet_witness_tx() { let (server, leader_data, alice, ledger_path) = new_fullnode(); - let server_exit = server.run(None); let bob_pubkey = Keypair::new().pubkey(); let (sender, receiver) = channel(); @@ -133,14 +131,13 @@ fn test_wallet_witness_tx() { check_balance(1, &rpc_client, process_id); // contract balance check_balance(10, &rpc_client, bob_pubkey); // recipient balance - server_exit(); + server.close().unwrap(); remove_dir_all(ledger_path).unwrap(); } #[test] fn test_wallet_cancel_tx() { let (server, leader_data, alice, ledger_path) = new_fullnode(); - let server_exit = server.run(None); let bob_pubkey = Keypair::new().pubkey(); let (sender, receiver) = channel(); @@ -191,6 +188,6 @@ fn test_wallet_cancel_tx() { check_balance(1, &rpc_client, process_id); // contract balance check_balance(0, &rpc_client, bob_pubkey); // recipient balance - server_exit(); + server.close().unwrap(); remove_dir_all(ledger_path).unwrap(); } diff --git a/wallet/tests/request_airdrop.rs b/wallet/tests/request_airdrop.rs index 3f75fd4c2..28da6e793 100644 --- a/wallet/tests/request_airdrop.rs +++ b/wallet/tests/request_airdrop.rs @@ -9,7 +9,6 @@ use std::sync::mpsc::channel; #[test] fn test_wallet_request_airdrop() { let (server, leader_data, alice, ledger_path) = new_fullnode(); - let server_exit = server.run(None); let (sender, receiver) = channel(); run_local_drone(alice, sender); let drone_addr = receiver.recv().unwrap(); @@ -30,6 +29,6 @@ fn test_wallet_request_airdrop() { .unwrap(); assert_eq!(balance, 50); - server_exit(); + server.close().unwrap(); remove_dir_all(ledger_path).unwrap(); }