Move PohService and PohRecorder out of banking_stage and into fullnode (#2852)

* Move PohService out of banking_stage and into fullnode.

* 10 second slots
This commit is contained in:
anatoly yakovenko 2019-02-26 10:48:18 -08:00 committed by GitHub
parent 9420ba52e9
commit 6dcb97af9e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 185 additions and 76 deletions

View File

@ -7,7 +7,8 @@ use rayon::prelude::*;
use solana::banking_stage::BankingStage; use solana::banking_stage::BankingStage;
use solana::entry::Entry; use solana::entry::Entry;
use solana::packet::to_packets_chunked; use solana::packet::to_packets_chunked;
use solana::poh_service::PohServiceConfig; use solana::poh_recorder::PohRecorder;
use solana::poh_service::{PohService, PohServiceConfig};
use solana_runtime::bank::Bank; use solana_runtime::bank::Bank;
use solana_sdk::genesis_block::GenesisBlock; use solana_sdk::genesis_block::GenesisBlock;
use solana_sdk::hash::hash; use solana_sdk::hash::hash;
@ -16,8 +17,9 @@ use solana_sdk::signature::{KeypairUtil, Signature};
use solana_sdk::system_transaction::SystemTransaction; use solana_sdk::system_transaction::SystemTransaction;
use solana_sdk::timing::MAX_ENTRY_IDS; use solana_sdk::timing::MAX_ENTRY_IDS;
use std::iter; use std::iter;
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::{channel, Receiver}; use std::sync::mpsc::{channel, Receiver};
use std::sync::Arc; use std::sync::{Arc, Mutex};
use std::time::Duration; use std::time::Duration;
use test::Bencher; use test::Bencher;
@ -39,6 +41,20 @@ fn check_txs(receiver: &Receiver<Vec<(Entry, u64)>>, ref_tx_count: usize) {
assert_eq!(total, ref_tx_count); assert_eq!(total, ref_tx_count);
} }
fn create_test_recorder(bank: &Arc<Bank>) -> (Arc<Mutex<PohRecorder>>, PohService) {
let exit = Arc::new(AtomicBool::new(false));
let poh_recorder = Arc::new(Mutex::new(PohRecorder::new(
bank.tick_height(),
bank.last_id(),
)));
let poh_service = PohService::new(
poh_recorder.clone(),
&PohServiceConfig::default(),
exit.clone(),
);
(poh_recorder, poh_service)
}
#[bench] #[bench]
#[ignore] #[ignore]
fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) { fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
@ -101,11 +117,11 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
(x, iter::repeat(1).take(len).collect()) (x, iter::repeat(1).take(len).collect())
}) })
.collect(); .collect();
let (poh_recorder, poh_service) = create_test_recorder(&bank);
let (_stage, signal_receiver) = BankingStage::new( let (_stage, signal_receiver) = BankingStage::new(
&bank, &bank,
&poh_recorder,
verified_receiver, verified_receiver,
PohServiceConfig::default(),
&genesis_block.last_id(),
std::u64::MAX, std::u64::MAX,
genesis_block.bootstrap_leader_id, genesis_block.bootstrap_leader_id,
); );
@ -129,6 +145,7 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
start += half_len; start += half_len;
start %= verified.len(); start %= verified.len();
}); });
poh_service.close().unwrap();
} }
#[bench] #[bench]
@ -209,11 +226,11 @@ fn bench_banking_stage_multi_programs(bencher: &mut Bencher) {
(x, iter::repeat(1).take(len).collect()) (x, iter::repeat(1).take(len).collect())
}) })
.collect(); .collect();
let (poh_recorder, poh_service) = create_test_recorder(&bank);
let (_stage, signal_receiver) = BankingStage::new( let (_stage, signal_receiver) = BankingStage::new(
&bank, &bank,
&poh_recorder,
verified_receiver, verified_receiver,
PohServiceConfig::default(),
&genesis_block.last_id(),
std::u64::MAX, std::u64::MAX,
genesis_block.bootstrap_leader_id, genesis_block.bootstrap_leader_id,
); );
@ -237,4 +254,5 @@ fn bench_banking_stage_multi_programs(bencher: &mut Bencher) {
start += half_len; start += half_len;
start %= verified.len(); start %= verified.len();
}); });
poh_service.close().unwrap();
} }

View File

@ -6,7 +6,7 @@ pub const NUM_TICKS_PER_SECOND: usize = 10;
// At 10 ticks/s, 8 ticks per slot implies that leader rotation and voting will happen // At 10 ticks/s, 8 ticks per slot implies that leader rotation and voting will happen
// every 800 ms. A fast voting cadence ensures faster finality and convergence // every 800 ms. A fast voting cadence ensures faster finality and convergence
pub const DEFAULT_TICKS_PER_SLOT: u64 = 8; pub const DEFAULT_TICKS_PER_SLOT: u64 = 80;
pub const DEFAULT_SLOTS_PER_EPOCH: u64 = 64; pub const DEFAULT_SLOTS_PER_EPOCH: u64 = 64;
/// The number of most recent `last_id` values that the bank will track the signatures /// The number of most recent `last_id` values that the bank will track the signatures

View File

@ -7,14 +7,12 @@ use crate::leader_confirmation_service::LeaderConfirmationService;
use crate::packet::Packets; use crate::packet::Packets;
use crate::packet::SharedPackets; use crate::packet::SharedPackets;
use crate::poh_recorder::{PohRecorder, PohRecorderError, WorkingBank}; use crate::poh_recorder::{PohRecorder, PohRecorderError, WorkingBank};
use crate::poh_service::{PohService, PohServiceConfig};
use crate::result::{Error, Result}; use crate::result::{Error, Result};
use crate::service::Service; use crate::service::Service;
use crate::sigverify_stage::VerifiedPackets; use crate::sigverify_stage::VerifiedPackets;
use bincode::deserialize; use bincode::deserialize;
use solana_metrics::counter::Counter; use solana_metrics::counter::Counter;
use solana_runtime::bank::{self, Bank, BankError}; use solana_runtime::bank::{self, Bank, BankError};
use solana_sdk::hash::Hash;
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use solana_sdk::timing::{self, duration_as_us, MAX_ENTRY_IDS}; use solana_sdk::timing::{self, duration_as_us, MAX_ENTRY_IDS};
use solana_sdk::transaction::Transaction; use solana_sdk::transaction::Transaction;
@ -34,9 +32,8 @@ pub const NUM_THREADS: u32 = 10;
/// Stores the stage's thread handle and output receiver. /// Stores the stage's thread handle and output receiver.
pub struct BankingStage { pub struct BankingStage {
bank_thread_hdls: Vec<JoinHandle<UnprocessedPackets>>, bank_thread_hdls: Vec<JoinHandle<UnprocessedPackets>>,
poh_service: PohService, exit: Arc<AtomicBool>,
leader_confirmation_service: LeaderConfirmationService, leader_confirmation_service: LeaderConfirmationService,
poh_exit: Arc<AtomicBool>,
} }
impl BankingStage { impl BankingStage {
@ -44,14 +41,12 @@ impl BankingStage {
#[allow(clippy::new_ret_no_self)] #[allow(clippy::new_ret_no_self)]
pub fn new( pub fn new(
bank: &Arc<Bank>, bank: &Arc<Bank>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
verified_receiver: Receiver<VerifiedPackets>, verified_receiver: Receiver<VerifiedPackets>,
config: PohServiceConfig,
last_entry_id: &Hash,
max_tick_height: u64, max_tick_height: u64,
leader_id: Pubkey, leader_id: Pubkey,
) -> (Self, Receiver<Vec<(Entry, u64)>>) { ) -> (Self, Receiver<Vec<(Entry, u64)>>) {
let (entry_sender, entry_receiver) = channel(); let (entry_sender, entry_receiver) = channel();
let shared_verified_receiver = Arc::new(Mutex::new(verified_receiver));
let working_bank = WorkingBank { let working_bank = WorkingBank {
bank: bank.clone(), bank: bank.clone(),
sender: entry_sender, sender: entry_sender,
@ -59,29 +54,31 @@ impl BankingStage {
max_tick_height, max_tick_height,
}; };
let poh_recorder = Arc::new(Mutex::new(PohRecorder::new( info!(
bank.tick_height(), "new working bank {} {} {}",
*last_entry_id, working_bank.min_tick_height,
))); working_bank.max_tick_height,
poh_recorder.lock().unwrap().poh.tick_height
);
poh_recorder.lock().unwrap().set_working_bank(working_bank);
let shared_verified_receiver = Arc::new(Mutex::new(verified_receiver));
// Single thread to generate entries from many banks. // Single thread to generate entries from many banks.
// This thread talks to poh_service and broadcasts the entries once they have been recorded. // This thread talks to poh_service and broadcasts the entries once they have been recorded.
// Once an entry has been recorded, its last_id is registered with the bank. // Once an entry has been recorded, its last_id is registered with the bank.
let poh_exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
poh_recorder.lock().unwrap().set_working_bank(working_bank);
let poh_service = PohService::new(poh_recorder.clone(), config, poh_exit.clone());
// Single thread to compute confirmation // Single thread to compute confirmation
let leader_confirmation_service = let leader_confirmation_service =
LeaderConfirmationService::new(bank.clone(), leader_id, poh_exit.clone()); LeaderConfirmationService::new(bank.clone(), leader_id, exit.clone());
// Many banks that process transactions in parallel. // Many banks that process transactions in parallel.
let bank_thread_hdls: Vec<JoinHandle<UnprocessedPackets>> = (0..Self::num_threads()) let bank_thread_hdls: Vec<JoinHandle<UnprocessedPackets>> = (0..Self::num_threads())
.map(|_| { .map(|_| {
let thread_bank = bank.clone();
let thread_verified_receiver = shared_verified_receiver.clone(); let thread_verified_receiver = shared_verified_receiver.clone();
let thread_poh_recorder = poh_recorder.clone(); let thread_poh_recorder = poh_recorder.clone();
let thread_bank = bank.clone();
Builder::new() Builder::new()
.name("solana-banking-stage-tx".to_string()) .name("solana-banking-stage-tx".to_string())
.spawn(move || { .spawn(move || {
@ -110,9 +107,8 @@ impl BankingStage {
( (
Self { Self {
bank_thread_hdls, bank_thread_hdls,
poh_service, exit,
leader_confirmation_service, leader_confirmation_service,
poh_exit,
}, },
entry_receiver, entry_receiver,
) )
@ -213,7 +209,7 @@ impl BankingStage {
/// Returns the number of transactions successfully processed by the bank, which may be less /// Returns the number of transactions successfully processed by the bank, which may be less
/// than the total number if max PoH height was reached and the bank halted /// than the total number if max PoH height was reached and the bank halted
fn process_transactions( fn process_transactions(
bank: &Arc<Bank>, bank: &Bank,
transactions: &[Transaction], transactions: &[Transaction],
poh: &Arc<Mutex<PohRecorder>>, poh: &Arc<Mutex<PohRecorder>>,
) -> Result<(usize)> { ) -> Result<(usize)> {
@ -226,7 +222,9 @@ impl BankingStage {
&transactions[chunk_start..chunk_end], &transactions[chunk_start..chunk_end],
poh, poh,
); );
trace!("process_transcations: {:?}", result);
if let Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) = result { if let Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) = result {
info!("process transactions: max height reached");
break; break;
} }
result?; result?;
@ -237,7 +235,7 @@ impl BankingStage {
/// Process the incoming packets /// Process the incoming packets
pub fn process_packets( pub fn process_packets(
bank: &Arc<Bank>, bank: &Bank,
verified_receiver: &Arc<Mutex<Receiver<VerifiedPackets>>>, verified_receiver: &Arc<Mutex<Receiver<VerifiedPackets>>>,
poh: &Arc<Mutex<PohRecorder>>, poh: &Arc<Mutex<PohRecorder>>,
) -> Result<UnprocessedPackets> { ) -> Result<UnprocessedPackets> {
@ -340,9 +338,8 @@ impl Service for BankingStage {
for bank_thread_hdl in self.bank_thread_hdls { for bank_thread_hdl in self.bank_thread_hdls {
bank_thread_hdl.join()?; bank_thread_hdl.join()?;
} }
self.poh_exit.store(true, Ordering::Relaxed); self.exit.store(true, Ordering::Relaxed);
self.leader_confirmation_service.join()?; self.leader_confirmation_service.join()?;
self.poh_service.join()?;
Ok(()) Ok(())
} }
} }
@ -352,6 +349,7 @@ mod tests {
use super::*; use super::*;
use crate::entry::EntrySlice; use crate::entry::EntrySlice;
use crate::packet::to_packets; use crate::packet::to_packets;
use crate::poh_service::{PohService, PohServiceConfig};
use solana_sdk::genesis_block::GenesisBlock; use solana_sdk::genesis_block::GenesisBlock;
use solana_sdk::native_program::ProgramError; use solana_sdk::native_program::ProgramError;
use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::signature::{Keypair, KeypairUtil};
@ -359,21 +357,36 @@ mod tests {
use solana_sdk::timing::DEFAULT_TICKS_PER_SLOT; use solana_sdk::timing::DEFAULT_TICKS_PER_SLOT;
use std::thread::sleep; use std::thread::sleep;
fn create_test_recorder(bank: &Arc<Bank>) -> (Arc<Mutex<PohRecorder>>, PohService) {
let exit = Arc::new(AtomicBool::new(false));
let poh_recorder = Arc::new(Mutex::new(PohRecorder::new(
bank.tick_height(),
bank.last_id(),
)));
let poh_service = PohService::new(
poh_recorder.clone(),
&PohServiceConfig::default(),
exit.clone(),
);
(poh_recorder, poh_service)
}
#[test] #[test]
fn test_banking_stage_shutdown1() { fn test_banking_stage_shutdown1() {
let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let (genesis_block, _mint_keypair) = GenesisBlock::new(2);
let bank = Arc::new(Bank::new(&genesis_block)); let bank = Arc::new(Bank::new(&genesis_block));
let (verified_sender, verified_receiver) = channel(); let (verified_sender, verified_receiver) = channel();
let (poh_recorder, poh_service) = create_test_recorder(&bank);
let (banking_stage, _entry_receiver) = BankingStage::new( let (banking_stage, _entry_receiver) = BankingStage::new(
&bank, &bank,
&poh_recorder,
verified_receiver, verified_receiver,
PohServiceConfig::default(),
&bank.last_id(),
DEFAULT_TICKS_PER_SLOT, DEFAULT_TICKS_PER_SLOT,
genesis_block.bootstrap_leader_id, genesis_block.bootstrap_leader_id,
); );
drop(verified_sender); drop(verified_sender);
banking_stage.join().unwrap(); banking_stage.join().unwrap();
poh_service.close().unwrap();
} }
#[test] #[test]
@ -382,11 +395,11 @@ mod tests {
let bank = Arc::new(Bank::new(&genesis_block)); let bank = Arc::new(Bank::new(&genesis_block));
let start_hash = bank.last_id(); let start_hash = bank.last_id();
let (verified_sender, verified_receiver) = channel(); let (verified_sender, verified_receiver) = channel();
let (poh_recorder, poh_service) = create_test_recorder(&bank);
let (banking_stage, entry_receiver) = BankingStage::new( let (banking_stage, entry_receiver) = BankingStage::new(
&bank, &bank,
&poh_recorder,
verified_receiver, verified_receiver,
PohServiceConfig::Sleep(Duration::from_millis(1)),
&bank.last_id(),
DEFAULT_TICKS_PER_SLOT, DEFAULT_TICKS_PER_SLOT,
genesis_block.bootstrap_leader_id, genesis_block.bootstrap_leader_id,
); );
@ -401,6 +414,7 @@ mod tests {
assert!(entries.verify(&start_hash)); assert!(entries.verify(&start_hash));
assert_eq!(entries[entries.len() - 1].id, bank.last_id()); assert_eq!(entries[entries.len() - 1].id, bank.last_id());
banking_stage.join().unwrap(); banking_stage.join().unwrap();
poh_service.close().unwrap();
} }
#[test] #[test]
@ -409,11 +423,11 @@ mod tests {
let bank = Arc::new(Bank::new(&genesis_block)); let bank = Arc::new(Bank::new(&genesis_block));
let start_hash = bank.last_id(); let start_hash = bank.last_id();
let (verified_sender, verified_receiver) = channel(); let (verified_sender, verified_receiver) = channel();
let (poh_recorder, poh_service) = create_test_recorder(&bank);
let (banking_stage, entry_receiver) = BankingStage::new( let (banking_stage, entry_receiver) = BankingStage::new(
&bank, &bank,
&poh_recorder,
verified_receiver, verified_receiver,
PohServiceConfig::default(),
&bank.last_id(),
DEFAULT_TICKS_PER_SLOT, DEFAULT_TICKS_PER_SLOT,
genesis_block.bootstrap_leader_id, genesis_block.bootstrap_leader_id,
); );
@ -457,6 +471,7 @@ mod tests {
}); });
drop(entry_receiver); drop(entry_receiver);
banking_stage.join().unwrap(); banking_stage.join().unwrap();
poh_service.close().unwrap();
} }
#[test] #[test]
fn test_banking_stage_entryfication() { fn test_banking_stage_entryfication() {
@ -466,11 +481,11 @@ mod tests {
let (genesis_block, mint_keypair) = GenesisBlock::new(2); let (genesis_block, mint_keypair) = GenesisBlock::new(2);
let bank = Arc::new(Bank::new(&genesis_block)); let bank = Arc::new(Bank::new(&genesis_block));
let (verified_sender, verified_receiver) = channel(); let (verified_sender, verified_receiver) = channel();
let (poh_recorder, poh_service) = create_test_recorder(&bank);
let (banking_stage, entry_receiver) = BankingStage::new( let (banking_stage, entry_receiver) = BankingStage::new(
&bank, &bank,
&poh_recorder,
verified_receiver, verified_receiver,
PohServiceConfig::default(),
&bank.last_id(),
DEFAULT_TICKS_PER_SLOT, DEFAULT_TICKS_PER_SLOT,
genesis_block.bootstrap_leader_id, genesis_block.bootstrap_leader_id,
); );
@ -523,20 +538,22 @@ mod tests {
.for_each(|x| assert_eq!(*x, Ok(()))); .for_each(|x| assert_eq!(*x, Ok(())));
} }
assert_eq!(bank.get_balance(&alice.pubkey()), 1); assert_eq!(bank.get_balance(&alice.pubkey()), 1);
poh_service.close().unwrap();
} }
// Test that when the max_tick_height is reached, the banking stage exits // Test that when the max_tick_height is reached, the banking stage exits
#[test] #[test]
fn test_max_tick_height_shutdown() { fn test_max_tick_height_shutdown() {
solana_logger::setup();
let (genesis_block, _mint_keypair) = GenesisBlock::new(2); let (genesis_block, _mint_keypair) = GenesisBlock::new(2);
let bank = Arc::new(Bank::new(&genesis_block)); let bank = Arc::new(Bank::new(&genesis_block));
let (verified_sender, verified_receiver) = channel(); let (verified_sender, verified_receiver) = channel();
let max_tick_height = 10; let max_tick_height = 10;
let (poh_recorder, poh_service) = create_test_recorder(&bank);
let (banking_stage, _entry_receiver) = BankingStage::new( let (banking_stage, _entry_receiver) = BankingStage::new(
&bank, &bank,
&poh_recorder,
verified_receiver, verified_receiver,
PohServiceConfig::default(),
&bank.last_id(),
max_tick_height, max_tick_height,
genesis_block.bootstrap_leader_id, genesis_block.bootstrap_leader_id,
); );
@ -551,6 +568,7 @@ mod tests {
drop(verified_sender); drop(verified_sender);
banking_stage.join().unwrap(); banking_stage.join().unwrap();
poh_service.close().unwrap();
} }
#[test] #[test]
@ -560,11 +578,11 @@ mod tests {
let bank = Arc::new(Bank::new(&genesis_block)); let bank = Arc::new(Bank::new(&genesis_block));
let ticks_per_slot = 1; let ticks_per_slot = 1;
let (verified_sender, verified_receiver) = channel(); let (verified_sender, verified_receiver) = channel();
let (poh_recorder, poh_service) = create_test_recorder(&bank);
let (mut banking_stage, _entry_receiver) = BankingStage::new( let (mut banking_stage, _entry_receiver) = BankingStage::new(
&bank, &bank,
&poh_recorder,
verified_receiver, verified_receiver,
PohServiceConfig::default(),
&bank.last_id(),
ticks_per_slot, ticks_per_slot,
genesis_block.bootstrap_leader_id, genesis_block.bootstrap_leader_id,
); );
@ -599,6 +617,7 @@ mod tests {
let (packets, start_index) = &unprocessed_packets[0]; let (packets, start_index) = &unprocessed_packets[0];
assert_eq!(packets.read().unwrap().packets.len(), 1); // TODO: maybe compare actual packet contents too assert_eq!(packets.read().unwrap().packets.len(), 1); // TODO: maybe compare actual packet contents too
assert_eq!(*start_index, 0); assert_eq!(*start_index, 0);
poh_service.close().unwrap();
} }
#[test] #[test]
@ -617,6 +636,7 @@ mod tests {
bank.tick_height(), bank.tick_height(),
bank.last_id(), bank.last_id(),
))); )));
poh_recorder.lock().unwrap().set_working_bank(working_bank);
let pubkey = Keypair::new().pubkey(); let pubkey = Keypair::new().pubkey();
let transactions = vec![ let transactions = vec![
@ -625,7 +645,6 @@ mod tests {
]; ];
let mut results = vec![Ok(()), Ok(())]; let mut results = vec![Ok(()), Ok(())];
poh_recorder.lock().unwrap().set_working_bank(working_bank);
BankingStage::record_transactions(&transactions, &results, &poh_recorder).unwrap(); BankingStage::record_transactions(&transactions, &results, &poh_recorder).unwrap();
let entries = entry_receiver.recv().unwrap(); let entries = entry_receiver.recv().unwrap();
assert_eq!(entries[0].0.transactions.len(), transactions.len()); assert_eq!(entries[0].0.transactions.len(), transactions.len());
@ -648,6 +667,7 @@ mod tests {
#[test] #[test]
fn test_bank_process_and_record_transactions() { fn test_bank_process_and_record_transactions() {
solana_logger::setup();
let (genesis_block, mint_keypair) = GenesisBlock::new(10_000); let (genesis_block, mint_keypair) = GenesisBlock::new(10_000);
let bank = Arc::new(Bank::new(&genesis_block)); let bank = Arc::new(Bank::new(&genesis_block));
let pubkey = Keypair::new().pubkey(); let pubkey = Keypair::new().pubkey();
@ -678,18 +698,21 @@ mod tests {
let mut need_tick = true; let mut need_tick = true;
// read entries until I find mine, might be ticks... // read entries until I find mine, might be ticks...
while need_tick { while let Ok(entries) = entry_receiver.recv() {
let entries = entry_receiver.recv().unwrap();
for (entry, _) in entries { for (entry, _) in entries {
if !entry.is_tick() { if !entry.is_tick() {
trace!("got entry");
assert_eq!(entry.transactions.len(), transactions.len()); assert_eq!(entry.transactions.len(), transactions.len());
assert_eq!(bank.get_balance(&pubkey), 1); assert_eq!(bank.get_balance(&pubkey), 1);
} else {
need_tick = false; need_tick = false;
} else {
break;
} }
} }
} }
assert_eq!(need_tick, false);
let transactions = vec![SystemTransaction::new_move( let transactions = vec![SystemTransaction::new_move(
&mint_keypair, &mint_keypair,
pubkey, pubkey,

View File

@ -6,7 +6,8 @@ use crate::blocktree_processor::{self, BankForksInfo};
use crate::cluster_info::{ClusterInfo, Node, NodeInfo}; use crate::cluster_info::{ClusterInfo, Node, NodeInfo};
use crate::gossip_service::GossipService; use crate::gossip_service::GossipService;
use crate::leader_scheduler::{LeaderScheduler, LeaderSchedulerConfig}; use crate::leader_scheduler::{LeaderScheduler, LeaderSchedulerConfig};
use crate::poh_service::PohServiceConfig; use crate::poh_recorder::PohRecorder;
use crate::poh_service::{PohService, PohServiceConfig};
use crate::rpc_pubsub_service::PubSubService; use crate::rpc_pubsub_service::PubSubService;
use crate::rpc_service::JsonRpcService; use crate::rpc_service::JsonRpcService;
use crate::rpc_subscriptions::RpcSubscriptions; use crate::rpc_subscriptions::RpcSubscriptions;
@ -23,7 +24,7 @@ use std::net::UdpSocket;
use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender}; use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, Mutex, RwLock};
use std::thread::{spawn, Result}; use std::thread::{spawn, Result};
use std::time::Duration; use std::time::Duration;
@ -102,6 +103,8 @@ pub struct Fullnode {
blocktree: Arc<Blocktree>, blocktree: Arc<Blocktree>,
bank_forks: Arc<RwLock<BankForks>>, bank_forks: Arc<RwLock<BankForks>>,
leader_scheduler: Arc<RwLock<LeaderScheduler>>, leader_scheduler: Arc<RwLock<LeaderScheduler>>,
poh_service: PohService,
poh_recorder: Arc<Mutex<PohRecorder>>,
} }
impl Fullnode { impl Fullnode {
@ -124,9 +127,25 @@ impl Fullnode {
let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::new( let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::new(
&config.leader_scheduler_config, &config.leader_scheduler_config,
))); )));
let (bank_forks, bank_forks_info, blocktree, ledger_signal_receiver) = let (mut bank_forks, bank_forks_info, blocktree, ledger_signal_receiver) =
new_banks_from_blocktree(ledger_path, config.ticks_per_slot(), &leader_scheduler); new_banks_from_blocktree(ledger_path, config.ticks_per_slot(), &leader_scheduler);
let exit = Arc::new(AtomicBool::new(false));
let bank_info = &bank_forks_info[0];
bank_forks.set_working_bank_id(bank_info.bank_id);
let bank = bank_forks.working_bank();
info!(
"starting PoH... {} {}",
bank.tick_height(),
bank_info.last_entry_id
);
let poh_recorder = Arc::new(Mutex::new(PohRecorder::new(
bank.tick_height(),
bank_info.last_entry_id,
)));
let poh_service = PohService::new(poh_recorder.clone(), &config.tick_config, exit.clone());
info!("node info: {:?}", node.info); info!("node info: {:?}", node.info);
info!("node entrypoint_info: {:?}", entrypoint_info_option); info!("node entrypoint_info: {:?}", entrypoint_info_option);
info!( info!(
@ -134,7 +153,6 @@ impl Fullnode {
node.sockets.gossip.local_addr().unwrap() node.sockets.gossip.local_addr().unwrap()
); );
let exit = Arc::new(AtomicBool::new(false));
let blocktree = Arc::new(blocktree); let blocktree = Arc::new(blocktree);
let bank_forks = Arc::new(RwLock::new(bank_forks)); let bank_forks = Arc::new(RwLock::new(bank_forks));
@ -253,6 +271,8 @@ impl Fullnode {
blocktree, blocktree,
bank_forks, bank_forks,
leader_scheduler, leader_scheduler,
poh_service,
poh_recorder,
} }
} }
@ -293,7 +313,7 @@ impl Fullnode {
}; };
self.node_services.tpu.switch_to_leader( self.node_services.tpu.switch_to_leader(
self.bank_forks.read().unwrap().working_bank(), self.bank_forks.read().unwrap().working_bank(),
PohServiceConfig::default(), &self.poh_recorder,
self.tpu_sockets self.tpu_sockets
.iter() .iter()
.map(|s| s.try_clone().expect("Failed to clone TPU sockets")) .map(|s| s.try_clone().expect("Failed to clone TPU sockets"))
@ -303,7 +323,6 @@ impl Fullnode {
.expect("Failed to clone broadcast socket"), .expect("Failed to clone broadcast socket"),
self.sigverify_disabled, self.sigverify_disabled,
rotation_info.slot, rotation_info.slot,
rotation_info.last_entry_id,
&self.blocktree, &self.blocktree,
); );
transition transition
@ -339,6 +358,13 @@ impl Fullnode {
match self.rotation_receiver.recv_timeout(timeout) { match self.rotation_receiver.recv_timeout(timeout) {
Ok(rotation_info) => { Ok(rotation_info) => {
trace!("{:?}: rotate at slot={}", self.id, rotation_info.slot);
//TODO: this will be called by the TVU every time it votes
//instead of here
self.poh_recorder.lock().unwrap().reset(
rotation_info.bank.tick_height(),
rotation_info.last_entry_id,
);
let slot = rotation_info.slot; let slot = rotation_info.slot;
let transition = self.rotate(rotation_info); let transition = self.rotate(rotation_info);
debug!("role transition complete: {:?}", transition); debug!("role transition complete: {:?}", transition);
@ -360,16 +386,25 @@ impl Fullnode {
// Used for notifying many nodes in parallel to exit // Used for notifying many nodes in parallel to exit
fn exit(&self) { fn exit(&self) {
self.exit.store(true, Ordering::Relaxed); self.exit.store(true, Ordering::Relaxed);
// Need to force the poh_recorder to drop the WorkingBank,
// which contains the channel to BroadcastStage. This should be
// sufficient as long as no other rotations are happening that
// can cause the Tpu to restart a BankingStage and reset a
// WorkingBank in poh_recorder. It follows no other rotations can be
// in motion because exit()/close() are only called by the run() loop
// which is the sole initiator of rotations.
self.poh_recorder.lock().unwrap().clear_bank();
if let Some(ref rpc_service) = self.rpc_service { if let Some(ref rpc_service) = self.rpc_service {
rpc_service.exit(); rpc_service.exit();
} }
if let Some(ref rpc_pubsub_service) = self.rpc_pubsub_service { if let Some(ref rpc_pubsub_service) = self.rpc_pubsub_service {
rpc_pubsub_service.exit(); rpc_pubsub_service.exit();
} }
self.node_services.exit() self.node_services.exit();
self.poh_service.exit()
} }
pub fn close(self) -> Result<()> { fn close(self) -> Result<()> {
self.exit(); self.exit();
self.join() self.join()
} }
@ -411,6 +446,9 @@ impl Service for Fullnode {
self.gossip_service.join()?; self.gossip_service.join()?;
self.node_services.join()?; self.node_services.join()?;
trace!("exit node_services!");
self.poh_service.join()?;
trace!("exit poh!");
Ok(()) Ok(())
} }
} }

View File

@ -35,7 +35,7 @@ pub struct WorkingBank {
} }
pub struct PohRecorder { pub struct PohRecorder {
poh: Poh, pub poh: Poh,
tick_cache: Vec<(Entry, u64)>, tick_cache: Vec<(Entry, u64)>,
working_bank: Option<WorkingBank>, working_bank: Option<WorkingBank>,
} }

View File

@ -5,11 +5,12 @@ use crate::poh_recorder::PohRecorder;
use crate::service::Service; use crate::service::Service;
use solana_sdk::timing::NUM_TICKS_PER_SECOND; use solana_sdk::timing::NUM_TICKS_PER_SECOND;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::SyncSender;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::thread::{self, sleep, Builder, JoinHandle}; use std::thread::{self, sleep, Builder, JoinHandle};
use std::time::Duration; use std::time::Duration;
#[derive(Copy, Clone)] #[derive(Clone)]
pub enum PohServiceConfig { pub enum PohServiceConfig {
/// * `Tick` - Run full PoH thread. Tick is a rough estimate of how many hashes to roll before /// * `Tick` - Run full PoH thread. Tick is a rough estimate of how many hashes to roll before
/// transmitting a new entry. /// transmitting a new entry.
@ -17,6 +18,8 @@ pub enum PohServiceConfig {
/// * `Sleep`- Low power mode. Sleep is a rough estimate of how long to sleep before rolling 1 /// * `Sleep`- Low power mode. Sleep is a rough estimate of how long to sleep before rolling 1
/// PoH once and producing 1 tick. /// PoH once and producing 1 tick.
Sleep(Duration), Sleep(Duration),
/// each node in simulation will be blocked until the receiver reads their step
Step(SyncSender<()>),
} }
impl Default for PohServiceConfig { impl Default for PohServiceConfig {
@ -43,7 +46,7 @@ impl PohService {
pub fn new( pub fn new(
poh_recorder: Arc<Mutex<PohRecorder>>, poh_recorder: Arc<Mutex<PohRecorder>>,
config: PohServiceConfig, config: &PohServiceConfig,
poh_exit: Arc<AtomicBool>, poh_exit: Arc<AtomicBool>,
) -> Self { ) -> Self {
// PohService is a headless producer, so when it exits it should notify the banking stage. // PohService is a headless producer, so when it exits it should notify the banking stage.
@ -51,11 +54,12 @@ impl PohService {
// signal. // signal.
let poh_exit_ = poh_exit.clone(); let poh_exit_ = poh_exit.clone();
// Single thread to generate ticks // Single thread to generate ticks
let config = config.clone();
let tick_producer = Builder::new() let tick_producer = Builder::new()
.name("solana-poh-service-tick_producer".to_string()) .name("solana-poh-service-tick_producer".to_string())
.spawn(move || { .spawn(move || {
let poh_recorder = poh_recorder; let poh_recorder = poh_recorder;
Self::tick_producer(&poh_recorder, config, &poh_exit_); Self::tick_producer(&poh_recorder, &config, &poh_exit_);
poh_exit_.store(true, Ordering::Relaxed); poh_exit_.store(true, Ordering::Relaxed);
}) })
.unwrap(); .unwrap();
@ -68,18 +72,24 @@ impl PohService {
fn tick_producer( fn tick_producer(
poh: &Arc<Mutex<PohRecorder>>, poh: &Arc<Mutex<PohRecorder>>,
config: PohServiceConfig, config: &PohServiceConfig,
poh_exit: &AtomicBool, poh_exit: &AtomicBool,
) { ) {
loop { loop {
match config { match config {
PohServiceConfig::Tick(num) => { PohServiceConfig::Tick(num) => {
for _ in 1..num { for _ in 1..*num {
poh.lock().unwrap().hash(); poh.lock().unwrap().hash();
} }
} }
PohServiceConfig::Sleep(duration) => { PohServiceConfig::Sleep(duration) => {
sleep(duration); sleep(*duration);
}
PohServiceConfig::Step(sender) => {
let r = sender.send(());
if r.is_err() {
break;
}
} }
} }
poh.lock().unwrap().tick(); poh.lock().unwrap().tick();
@ -149,7 +159,7 @@ mod tests {
const HASHES_PER_TICK: u64 = 2; const HASHES_PER_TICK: u64 = 2;
let poh_service = PohService::new( let poh_service = PohService::new(
poh_recorder.clone(), poh_recorder.clone(),
PohServiceConfig::Tick(HASHES_PER_TICK as usize), &PohServiceConfig::Tick(HASHES_PER_TICK as usize),
Arc::new(AtomicBool::new(false)), Arc::new(AtomicBool::new(false)),
); );
poh_recorder.lock().unwrap().set_working_bank(working_bank); poh_recorder.lock().unwrap().set_working_bank(working_bank);
@ -207,7 +217,7 @@ mod tests {
let poh_service = PohService::new( let poh_service = PohService::new(
poh_recorder.clone(), poh_recorder.clone(),
PohServiceConfig::default(), &PohServiceConfig::default(),
Arc::new(AtomicBool::new(false)), Arc::new(AtomicBool::new(false)),
); );

View File

@ -256,13 +256,18 @@ impl ThinClient {
/// Request a new last Entry ID from the server. This method blocks /// Request a new last Entry ID from the server. This method blocks
/// until the server sends a response. /// until the server sends a response.
pub fn get_next_last_id(&mut self, previous_last_id: &Hash) -> Hash { pub fn get_next_last_id(&mut self, previous_last_id: &Hash) -> Hash {
self.get_next_last_id_ext(previous_last_id, &|| {
sleep(Duration::from_millis(100));
})
}
pub fn get_next_last_id_ext(&mut self, previous_last_id: &Hash, func: &Fn()) -> Hash {
loop { loop {
let last_id = self.get_last_id(); let last_id = self.get_last_id();
if last_id != *previous_last_id { if last_id != *previous_last_id {
break last_id; break last_id;
} }
debug!("Got same last_id ({:?}), will retry...", last_id); debug!("Got same last_id ({:?}), will retry...", last_id);
sleep(Duration::from_millis(100)); func()
} }
} }

View File

@ -7,17 +7,16 @@ use crate::broadcast_service::BroadcastService;
use crate::cluster_info::ClusterInfo; use crate::cluster_info::ClusterInfo;
use crate::cluster_info_vote_listener::ClusterInfoVoteListener; use crate::cluster_info_vote_listener::ClusterInfoVoteListener;
use crate::fetch_stage::FetchStage; use crate::fetch_stage::FetchStage;
use crate::poh_service::PohServiceConfig; use crate::poh_recorder::PohRecorder;
use crate::service::Service; use crate::service::Service;
use crate::sigverify_stage::SigVerifyStage; use crate::sigverify_stage::SigVerifyStage;
use crate::tpu_forwarder::TpuForwarder; use crate::tpu_forwarder::TpuForwarder;
use solana_runtime::bank::Bank; use solana_runtime::bank::Bank;
use solana_sdk::hash::Hash;
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use std::net::UdpSocket; use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel; use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, Mutex, RwLock};
use std::thread; use std::thread;
pub enum TpuMode { pub enum TpuMode {
@ -191,12 +190,11 @@ impl Tpu {
pub fn switch_to_leader( pub fn switch_to_leader(
&mut self, &mut self,
bank: Arc<Bank>, bank: Arc<Bank>,
tick_duration: PohServiceConfig, poh_recorder: &Arc<Mutex<PohRecorder>>,
transactions_sockets: Vec<UdpSocket>, transactions_sockets: Vec<UdpSocket>,
broadcast_socket: UdpSocket, broadcast_socket: UdpSocket,
sigverify_disabled: bool, sigverify_disabled: bool,
slot: u64, slot: u64,
last_entry_id: Hash,
blocktree: &Arc<Blocktree>, blocktree: &Arc<Blocktree>,
) { ) {
self.close_and_forward_unprocessed_packets(); self.close_and_forward_unprocessed_packets();
@ -230,9 +228,8 @@ impl Tpu {
let (banking_stage, entry_receiver) = BankingStage::new( let (banking_stage, entry_receiver) = BankingStage::new(
&bank, &bank,
poh_recorder,
verified_receiver, verified_receiver,
tick_duration,
&last_entry_id,
max_tick_height, max_tick_height,
self.id, self.id,
); );

View File

@ -7,6 +7,7 @@ use solana::entry::{reconstruct_entries_from_blobs, Entry};
use solana::fullnode::{new_banks_from_blocktree, Fullnode, FullnodeConfig, FullnodeReturnType}; use solana::fullnode::{new_banks_from_blocktree, Fullnode, FullnodeConfig, FullnodeReturnType};
use solana::gossip_service::{converge, make_listening_node}; use solana::gossip_service::{converge, make_listening_node};
use solana::leader_scheduler::{make_active_set_entries, LeaderScheduler, LeaderSchedulerConfig}; use solana::leader_scheduler::{make_active_set_entries, LeaderScheduler, LeaderSchedulerConfig};
use solana::poh_service::PohServiceConfig;
use solana::result; use solana::result;
use solana::service::Service; use solana::service::Service;
use solana::thin_client::{poll_gossip_for_leader, retry_get_balance}; use solana::thin_client::{poll_gossip_for_leader, retry_get_balance};
@ -20,7 +21,7 @@ use std::collections::{HashSet, VecDeque};
use std::env; use std::env;
use std::fs::remove_dir_all; use std::fs::remove_dir_all;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, TryRecvError}; use std::sync::mpsc::{channel, sync_channel, TryRecvError};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread::{sleep, Builder}; use std::thread::{sleep, Builder};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
@ -1672,8 +1673,10 @@ fn test_fullnode_rotate(
// Create fullnode config, and set leader scheduler policies // Create fullnode config, and set leader scheduler policies
let mut fullnode_config = FullnodeConfig::default(); let mut fullnode_config = FullnodeConfig::default();
let (tick_step_sender, tick_step_receiver) = sync_channel(1);
fullnode_config.leader_scheduler_config.ticks_per_slot = ticks_per_slot; fullnode_config.leader_scheduler_config.ticks_per_slot = ticks_per_slot;
fullnode_config.leader_scheduler_config.slots_per_epoch = slots_per_epoch; fullnode_config.leader_scheduler_config.slots_per_epoch = slots_per_epoch;
fullnode_config.tick_config = PohServiceConfig::Step(tick_step_sender);
// Note: when debugging failures in this test, disabling voting can help keep the log noise // Note: when debugging failures in this test, disabling voting can help keep the log noise
// down by removing the extra vote transactions // down by removing the extra vote transactions
@ -1869,8 +1872,10 @@ fn test_fullnode_rotate(
if transact { if transact {
let mut client = mk_client(&leader_info); let mut client = mk_client(&leader_info);
client_last_id = client.get_next_last_id_ext(&client_last_id, &|| {
client_last_id = client.get_next_last_id(&client_last_id); tick_step_receiver.recv().expect("tick step");
sleep(Duration::from_millis(100));
});
info!("Transferring 500 tokens, last_id={:?}", client_last_id); info!("Transferring 500 tokens, last_id={:?}", client_last_id);
expected_bob_balance += 500; expected_bob_balance += 500;
@ -1878,14 +1883,24 @@ fn test_fullnode_rotate(
.transfer(500, &mint_keypair, bob, &client_last_id) .transfer(500, &mint_keypair, bob, &client_last_id)
.unwrap(); .unwrap();
debug!("transfer send, signature is {:?}", signature); debug!("transfer send, signature is {:?}", signature);
client.poll_for_signature(&signature).unwrap(); for _ in 0..30 {
if client.poll_for_signature(&signature).is_err() {
tick_step_receiver.recv().expect("tick step");
info!("poll for signature tick step received");
} else {
break;
}
}
debug!("transfer signature confirmed"); debug!("transfer signature confirmed");
let actual_bob_balance = let actual_bob_balance =
retry_get_balance(&mut client, &bob, Some(expected_bob_balance)).unwrap(); retry_get_balance(&mut client, &bob, Some(expected_bob_balance)).unwrap();
assert_eq!(actual_bob_balance, expected_bob_balance); assert_eq!(actual_bob_balance, expected_bob_balance);
debug!("account balance confirmed: {}", actual_bob_balance); debug!("account balance confirmed: {}", actual_bob_balance);
client_last_id = client.get_next_last_id(&client_last_id); client_last_id = client.get_next_last_id_ext(&client_last_id, &|| {
tick_step_receiver.recv().expect("tick step");
sleep(Duration::from_millis(100));
});
} else { } else {
if include_validator { if include_validator {
trace!("waiting for leader and validator to reach max tick height..."); trace!("waiting for leader and validator to reach max tick height...");
@ -1893,6 +1908,8 @@ fn test_fullnode_rotate(
trace!("waiting for leader to reach max tick height..."); trace!("waiting for leader to reach max tick height...");
} }
} }
tick_step_receiver.recv().expect("tick step");
info!("tick step received");
} }
if transact { if transact {
@ -1901,6 +1918,7 @@ fn test_fullnode_rotate(
} }
info!("Shutting down"); info!("Shutting down");
drop(tick_step_receiver);
for node_exit in node_exits { for node_exit in node_exits {
node_exit(); node_exit();
} }