diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index 69a82c6c87..84a3b32eef 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -66,6 +66,8 @@ fn bench_consume_buffered(bencher: &mut Bencher) { let (exit, poh_recorder, poh_service, _signal_receiver) = create_test_recorder(&bank, &blockstore, None); + let recorder = poh_recorder.lock().unwrap().recorder(); + let tx = test_tx(); let len = 4096; let chunk_size = 1024; @@ -88,6 +90,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) { &s, None::>, None, + &recorder, ); }); diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 28eaf33790..07665c55eb 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -4,7 +4,7 @@ use crate::{ cluster_info::ClusterInfo, packet_hasher::PacketHasher, - poh_recorder::{PohRecorder, PohRecorderError, WorkingBankEntry}, + poh_recorder::{PohRecorder, PohRecorderError, TransactionRecorder, WorkingBankEntry}, poh_service::{self, PohService}, }; use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError}; @@ -295,6 +295,7 @@ impl BankingStage { gossip_vote_sender: &ReplayVoteSender, test_fn: Option, banking_stage_stats: Option<&BankingStageStats>, + recorder: &TransactionRecorder, ) { let mut rebuffered_packets_len = 0; let mut new_tx_count = 0; @@ -323,7 +324,7 @@ impl BankingStage { Self::process_packets_transactions( &bank, &bank_creation_time, - &poh_recorder, + &recorder, &msgs, original_unprocessed_indexes.to_owned(), transaction_status_sender.clone(), @@ -428,6 +429,7 @@ impl BankingStage { transaction_status_sender: Option, gossip_vote_sender: &ReplayVoteSender, banking_stage_stats: &BankingStageStats, + recorder: &TransactionRecorder, ) -> BufferedPacketsDecision { let bank_start; let ( @@ -467,6 +469,7 @@ impl BankingStage { gossip_vote_sender, None::>, Some(banking_stage_stats), + recorder, ); } BufferedPacketsDecision::Forward => { @@ -544,6 +547,7 @@ impl BankingStage { gossip_vote_sender: ReplayVoteSender, duplicates: &Arc, PacketHasher)>>, ) { + let recorder = poh_recorder.lock().unwrap().recorder(); let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut buffered_packets = VecDeque::with_capacity(batch_limit); let banking_stage_stats = BankingStageStats::new(id); @@ -552,13 +556,14 @@ impl BankingStage { let decision = Self::process_buffered_packets( &my_pubkey, &socket, - poh_recorder, + &poh_recorder, cluster_info, &mut buffered_packets, enable_forwarding, transaction_status_sender.clone(), &gossip_vote_sender, &banking_stage_stats, + &recorder, ); if matches!(decision, BufferedPacketsDecision::Hold) || matches!(decision, BufferedPacketsDecision::ForwardAndHold) @@ -592,6 +597,7 @@ impl BankingStage { &mut buffered_packets, &banking_stage_stats, duplicates, + &recorder, ) { Ok(()) | Err(RecvTimeoutError::Timeout) => (), Err(RecvTimeoutError::Disconnected) => break, @@ -625,7 +631,7 @@ impl BankingStage { bank_slot: Slot, txs: &[Transaction], results: &[TransactionExecutionResult], - poh: &Arc>, + recorder: &TransactionRecorder, ) -> (Result, Vec) { let mut processed_generation = Measure::start("record::process_generation"); let (processed_transactions, processed_transactions_indexes): (Vec<_>, Vec<_>) = results @@ -655,10 +661,7 @@ impl BankingStage { let mut poh_record = Measure::start("record::poh_record"); // record and unlock will unlock all the successful transactions - let res = poh - .lock() - .unwrap() - .record(bank_slot, hash, processed_transactions); + let res = recorder.record(bank_slot, hash, processed_transactions); match res { Ok(()) => (), Err(PohRecorderError::MaxHeightReached) => { @@ -683,7 +686,7 @@ impl BankingStage { fn process_and_record_transactions_locked( bank: &Arc, - poh: &Arc>, + poh: &TransactionRecorder, batch: &TransactionBatch, transaction_status_sender: Option, gossip_vote_sender: &ReplayVoteSender, @@ -802,7 +805,7 @@ impl BankingStage { pub fn process_and_record_transactions( bank: &Arc, txs: &[Transaction], - poh: &Arc>, + poh: &TransactionRecorder, chunk_offset: usize, transaction_status_sender: Option, gossip_vote_sender: &ReplayVoteSender, @@ -846,7 +849,7 @@ impl BankingStage { bank: &Arc, bank_creation_time: &Instant, transactions: &[Transaction], - poh: &Arc>, + poh: &TransactionRecorder, transaction_status_sender: Option, gossip_vote_sender: &ReplayVoteSender, ) -> (usize, Vec) { @@ -1017,7 +1020,7 @@ impl BankingStage { fn process_packets_transactions( bank: &Arc, bank_creation_time: &Instant, - poh: &Arc>, + poh: &TransactionRecorder, msgs: &Packets, packet_indexes: Vec, transaction_status_sender: Option, @@ -1131,6 +1134,7 @@ impl BankingStage { buffered_packets: &mut UnprocessedPackets, banking_stage_stats: &BankingStageStats, duplicates: &Arc, PacketHasher)>>, + recorder: &TransactionRecorder, ) -> Result<(), RecvTimeoutError> { let mut recv_time = Measure::start("process_packets_recv"); let mms = verified_receiver.recv_timeout(recv_timeout)?; @@ -1173,7 +1177,7 @@ impl BankingStage { Self::process_packets_transactions( &bank, &bank_creation_time, - &poh, + recorder, &msgs, packet_indexes, transaction_status_sender.clone(), @@ -1309,7 +1313,7 @@ pub fn create_test_recorder( ) { let exit = Arc::new(AtomicBool::new(false)); let poh_config = Arc::new(poh_config.unwrap_or_default()); - let (mut poh_recorder, entry_receiver) = PohRecorder::new( + let (mut poh_recorder, entry_receiver, record_receiver) = PohRecorder::new( bank.tick_height(), bank.last_blockhash(), bank.slot(), @@ -1330,6 +1334,7 @@ pub fn create_test_recorder( bank.ticks_per_slot(), poh_service::DEFAULT_PINNED_CPU_CORE, poh_service::DEFAULT_HASHES_PER_BATCH, + record_receiver, ); (exit, poh_recorder, poh_service, entry_receiver) @@ -1339,7 +1344,7 @@ pub fn create_test_recorder( mod tests { use super::*; use crate::{ - cluster_info::Node, poh_recorder::WorkingBank, + cluster_info::Node, poh_recorder::Record, poh_recorder::WorkingBank, transaction_status_service::TransactionStatusService, }; use crossbeam_channel::unbounded; @@ -1359,7 +1364,15 @@ mod tests { transaction::TransactionError, }; use solana_transaction_status::TransactionWithStatusMeta; - use std::{net::SocketAddr, path::Path, sync::atomic::Ordering, thread::sleep}; + use std::{ + net::SocketAddr, + path::Path, + sync::{ + atomic::{AtomicBool, Ordering}, + mpsc::Receiver, + }, + thread::sleep, + }; #[test] fn test_banking_stage_shutdown1() { @@ -1684,6 +1697,8 @@ mod tests { #[test] fn test_bank_record_transactions() { + solana_logger::setup(); + let GenesisConfigInfo { genesis_config, mint_keypair, @@ -1701,7 +1716,8 @@ mod tests { { let blockstore = Blockstore::open(&ledger_path) .expect("Expected to be able to open database ledger"); - let (poh_recorder, entry_receiver) = PohRecorder::new( + let (poh_recorder, entry_receiver, record_receiver) = PohRecorder::new( + // TODO use record_receiver bank.tick_height(), bank.last_blockhash(), bank.slot(), @@ -1712,8 +1728,11 @@ mod tests { &Arc::new(LeaderScheduleCache::new_from_bank(&bank)), &Arc::new(PohConfig::default()), ); + let recorder = poh_recorder.recorder(); let poh_recorder = Arc::new(Mutex::new(poh_recorder)); + let (poh_simulator, exit) = simulate_poh(record_receiver, &poh_recorder); + poh_recorder.lock().unwrap().set_working_bank(working_bank); let pubkey = solana_sdk::pubkey::new_rand(); let keypair2 = Keypair::new(); @@ -1725,12 +1744,8 @@ mod tests { ]; let mut results = vec![(Ok(()), None), (Ok(()), None)]; - let _ = BankingStage::record_transactions( - bank.slot(), - &transactions, - &results, - &poh_recorder, - ); + let _ = + BankingStage::record_transactions(bank.slot(), &transactions, &results, &recorder); let (_bank, (entry, _tick_height)) = entry_receiver.recv().unwrap(); assert_eq!(entry.transactions.len(), transactions.len()); @@ -1742,12 +1757,8 @@ mod tests { )), None, ); - let (res, retryable) = BankingStage::record_transactions( - bank.slot(), - &transactions, - &results, - &poh_recorder, - ); + let (res, retryable) = + BankingStage::record_transactions(bank.slot(), &transactions, &results, &recorder); res.unwrap(); assert!(retryable.is_empty()); let (_bank, (entry, _tick_height)) = entry_receiver.recv().unwrap(); @@ -1755,12 +1766,8 @@ mod tests { // Other TransactionErrors should not be recorded results[0] = (Err(TransactionError::AccountNotFound), None); - let (res, retryable) = BankingStage::record_transactions( - bank.slot(), - &transactions, - &results, - &poh_recorder, - ); + let (res, retryable) = + BankingStage::record_transactions(bank.slot(), &transactions, &results, &recorder); res.unwrap(); assert!(retryable.is_empty()); let (_bank, (entry, _tick_height)) = entry_receiver.recv().unwrap(); @@ -1773,7 +1780,7 @@ mod tests { bank.slot() + 1, &transactions, &results, - &poh_recorder, + &recorder, ); assert_matches!(res, Err(PohRecorderError::MaxHeightReached)); // The first result was an error so it's filtered out. The second result was Ok(), @@ -1781,6 +1788,9 @@ mod tests { assert_eq!(retryable, vec![1]); // Should receive nothing from PohRecorder b/c record failed assert!(entry_receiver.try_recv().is_err()); + + exit.store(true, Ordering::Relaxed); + let _ = poh_simulator.join(); } Blockstore::destroy(&ledger_path).unwrap(); } @@ -2052,7 +2062,7 @@ mod tests { { let blockstore = Blockstore::open(&ledger_path) .expect("Expected to be able to open database ledger"); - let (poh_recorder, entry_receiver) = PohRecorder::new( + let (poh_recorder, entry_receiver, record_receiver) = PohRecorder::new( bank.tick_height(), bank.last_blockhash(), bank.slot(), @@ -2063,15 +2073,18 @@ mod tests { &Arc::new(LeaderScheduleCache::new_from_bank(&bank)), &Arc::new(PohConfig::default()), ); + let recorder = poh_recorder.recorder(); let poh_recorder = Arc::new(Mutex::new(poh_recorder)); + let (poh_simulator, exit) = simulate_poh(record_receiver, &poh_recorder); + poh_recorder.lock().unwrap().set_working_bank(working_bank); let (gossip_vote_sender, _gossip_vote_receiver) = unbounded(); BankingStage::process_and_record_transactions( &bank, &transactions, - &poh_recorder, + &recorder, 0, None, &gossip_vote_sender, @@ -2108,7 +2121,7 @@ mod tests { BankingStage::process_and_record_transactions( &bank, &transactions, - &poh_recorder, + &recorder, 0, None, &gossip_vote_sender, @@ -2117,11 +2130,36 @@ mod tests { Err(PohRecorderError::MaxHeightReached) ); + exit.store(true, Ordering::Relaxed); + let _ = poh_simulator.join(); + assert_eq!(bank.get_balance(&pubkey), 1); } Blockstore::destroy(&ledger_path).unwrap(); } + fn simulate_poh( + record_receiver: Receiver, + poh_recorder: &Arc>, + ) -> (JoinHandle<()>, Arc) { + let exit = Arc::new(AtomicBool::new(false)); + let exit_ = exit.clone(); + let poh_recorder = poh_recorder.clone(); + let tick_producer = Builder::new() + .name("solana-simulate_poh".to_string()) + .spawn(move || loop { + PohService::read_record_receiver_and_process( + &poh_recorder, + &record_receiver, + Duration::from_millis(10), + ); + if exit_.load(Ordering::Relaxed) { + break; + } + }); + (tick_producer.unwrap(), exit) + } + #[test] fn test_bank_process_and_record_transactions_account_in_use() { solana_logger::setup(); @@ -2150,7 +2188,7 @@ mod tests { { let blockstore = Blockstore::open(&ledger_path) .expect("Expected to be able to open database ledger"); - let (poh_recorder, _entry_receiver) = PohRecorder::new( + let (poh_recorder, _entry_receiver, record_receiver) = PohRecorder::new( bank.tick_height(), bank.last_blockhash(), bank.slot(), @@ -2161,21 +2199,27 @@ mod tests { &Arc::new(LeaderScheduleCache::new_from_bank(&bank)), &Arc::new(PohConfig::default()), ); + let recorder = poh_recorder.recorder(); let poh_recorder = Arc::new(Mutex::new(poh_recorder)); poh_recorder.lock().unwrap().set_working_bank(working_bank); + let (poh_simulator, exit) = simulate_poh(record_receiver, &poh_recorder); + let (gossip_vote_sender, _gossip_vote_receiver) = unbounded(); let (result, unprocessed) = BankingStage::process_and_record_transactions( &bank, &transactions, - &poh_recorder, + &recorder, 0, None, &gossip_vote_sender, ); + exit.store(true, Ordering::Relaxed); + let _ = poh_simulator.join(); + assert!(result.is_ok()); assert_eq!(unprocessed.len(), 1); } @@ -2245,7 +2289,7 @@ mod tests { { let blockstore = Blockstore::open(&ledger_path) .expect("Expected to be able to open database ledger"); - let (poh_recorder, _entry_receiver) = PohRecorder::new( + let (poh_recorder, _entry_receiver, record_receiver) = PohRecorder::new( bank.tick_height(), bank.last_blockhash(), bank.slot(), @@ -2257,9 +2301,12 @@ mod tests { &Arc::new(PohConfig::default()), ); - // Poh Recorder has not working bank, so should throw MaxHeightReached error on + // Poh Recorder has no working bank, so should throw MaxHeightReached error on // record - let poh_recorder = Arc::new(Mutex::new(poh_recorder)); + let recorder = poh_recorder.recorder(); + + let (poh_simulator, exit) = + simulate_poh(record_receiver, &Arc::new(Mutex::new(poh_recorder))); let (gossip_vote_sender, _gossip_vote_receiver) = unbounded(); @@ -2268,7 +2315,7 @@ mod tests { &bank, &Instant::now(), &transactions, - &poh_recorder, + &recorder, None, &gossip_vote_sender, ); @@ -2278,6 +2325,9 @@ mod tests { retryable_txs.sort_unstable(); let expected: Vec = (0..transactions.len()).collect(); assert_eq!(retryable_txs, expected); + + exit.store(true, Ordering::Relaxed); + let _ = poh_simulator.join(); } Blockstore::destroy(&ledger_path).unwrap(); @@ -2324,7 +2374,7 @@ mod tests { let blockstore = Blockstore::open(&ledger_path) .expect("Expected to be able to open database ledger"); let blockstore = Arc::new(blockstore); - let (poh_recorder, _entry_receiver) = PohRecorder::new( + let (poh_recorder, _entry_receiver, record_receiver) = PohRecorder::new( bank.tick_height(), bank.last_blockhash(), bank.slot(), @@ -2335,8 +2385,11 @@ mod tests { &Arc::new(LeaderScheduleCache::new_from_bank(&bank)), &Arc::new(PohConfig::default()), ); + let recorder = poh_recorder.recorder(); let poh_recorder = Arc::new(Mutex::new(poh_recorder)); + let (poh_simulator, exit) = simulate_poh(record_receiver, &poh_recorder); + poh_recorder.lock().unwrap().set_working_bank(working_bank); let shreds = entries_to_test_shreds(entries, bank.slot(), 0, true, 0); @@ -2355,7 +2408,7 @@ mod tests { let _ = BankingStage::process_and_record_transactions( &bank, &transactions, - &poh_recorder, + &recorder, 0, Some(TransactionStatusSender { sender: transaction_status_sender, @@ -2388,10 +2441,14 @@ mod tests { assert_eq!(meta, None); } } + + exit.store(true, Ordering::Relaxed); + let _ = poh_simulator.join(); } Blockstore::destroy(&ledger_path).unwrap(); } + #[allow(clippy::type_complexity)] fn setup_conflicting_transactions( ledger_path: &Path, ) -> ( @@ -2399,6 +2456,8 @@ mod tests { Arc, Arc>, Receiver, + JoinHandle<()>, + Arc, ) { Blockstore::destroy(&ledger_path).unwrap(); let genesis_config_info = create_genesis_config(10_000); @@ -2410,7 +2469,7 @@ mod tests { let blockstore = Blockstore::open(&ledger_path).expect("Expected to be able to open database ledger"); let bank = Arc::new(Bank::new(&genesis_config)); - let (poh_recorder, entry_receiver) = PohRecorder::new( + let (poh_recorder, entry_receiver, record_receiver) = PohRecorder::new( bank.tick_height(), bank.last_blockhash(), bank.slot(), @@ -2432,15 +2491,25 @@ mod tests { system_transaction::transfer(&mint_keypair, &pubkey1, 1, genesis_config.hash()), system_transaction::transfer(&mint_keypair, &pubkey2, 1, genesis_config.hash()), ]; - (transactions, bank, poh_recorder, entry_receiver) + let (poh_simulator, exit) = simulate_poh(record_receiver, &poh_recorder); + + ( + transactions, + bank, + poh_recorder, + entry_receiver, + poh_simulator, + exit, + ) } #[test] fn test_consume_buffered_packets() { let ledger_path = get_tmp_ledger_path!(); { - let (transactions, bank, poh_recorder, _entry_receiver) = + let (transactions, bank, poh_recorder, _entry_receiver, poh_simulator, exit) = setup_conflicting_transactions(&ledger_path); + let recorder = poh_recorder.lock().unwrap().recorder(); let num_conflicting_transactions = transactions.len(); let mut packets_vec = to_packets_chunked(&transactions, num_conflicting_transactions); assert_eq!(packets_vec.len(), 1); @@ -2468,6 +2537,7 @@ mod tests { &gossip_vote_sender, None::>, None, + &recorder, ); assert_eq!(buffered_packets[0].1.len(), num_conflicting_transactions); // When the poh recorder has a bank, should process all non conflicting buffered packets. @@ -2483,6 +2553,7 @@ mod tests { &gossip_vote_sender, None::>, None, + &recorder, ); if num_expected_unprocessed == 0 { assert!(buffered_packets.is_empty()) @@ -2490,6 +2561,8 @@ mod tests { assert_eq!(buffered_packets[0].1.len(), num_expected_unprocessed); } } + exit.store(true, Ordering::Relaxed); + let _ = poh_simulator.join(); } Blockstore::destroy(&ledger_path).unwrap(); } @@ -2498,7 +2571,7 @@ mod tests { fn test_consume_buffered_packets_interrupted() { let ledger_path = get_tmp_ledger_path!(); { - let (transactions, bank, poh_recorder, _entry_receiver) = + let (transactions, bank, poh_recorder, _entry_receiver, poh_simulator, exit) = setup_conflicting_transactions(&ledger_path); let num_conflicting_transactions = transactions.len(); let packets_vec = to_packets_chunked(&transactions, 1); @@ -2526,6 +2599,7 @@ mod tests { let interrupted_iteration = 1; poh_recorder.lock().unwrap().set_bank(&bank); let poh_recorder_ = poh_recorder.clone(); + let recorder = poh_recorder_.lock().unwrap().recorder(); let (gossip_vote_sender, _gossip_vote_receiver) = unbounded(); // Start up thread to process the banks let t_consume = Builder::new() @@ -2540,6 +2614,7 @@ mod tests { &gossip_vote_sender, test_fn, None, + &recorder, ); // Check everything is correct. All indexes after `interrupted_iteration` @@ -2573,6 +2648,8 @@ mod tests { } t_consume.join().unwrap(); + exit.store(true, Ordering::Relaxed); + let _ = poh_simulator.join(); } Blockstore::destroy(&ledger_path).unwrap(); } diff --git a/core/src/poh_recorder.rs b/core/src/poh_recorder.rs index 2c22e9213e..d182460d96 100644 --- a/core/src/poh_recorder.rs +++ b/core/src/poh_recorder.rs @@ -51,6 +51,80 @@ type Result = std::result::Result; pub type WorkingBankEntry = (Arc, (Entry, u64)); pub type BankStart = (Arc, Arc); +pub struct Record { + pub mixin: Hash, + pub transactions: Vec, + pub slot: Slot, + pub sender: Sender>, +} +impl Record { + pub fn new( + mixin: Hash, + transactions: Vec, + slot: Slot, + sender: Sender>, + ) -> Self { + Self { + mixin, + transactions, + slot, + sender, + } + } +} + +pub struct TransactionRecorder { + // shared by all users of PohRecorder + pub record_sender: Sender, + // unique to this caller + pub result_sender: Sender>, + pub result_receiver: Receiver>, +} + +impl Clone for TransactionRecorder { + fn clone(&self) -> Self { + TransactionRecorder::new(self.record_sender.clone()) + } +} + +impl TransactionRecorder { + pub fn new(record_sender: Sender) -> Self { + let (result_sender, result_receiver) = channel(); + Self { + // shared + record_sender, + // unique to this caller + result_sender, + result_receiver, + } + } + pub fn record( + &self, + bank_slot: Slot, + mixin: Hash, + transactions: Vec, + ) -> Result<()> { + let res = self.record_sender.send(Record::new( + mixin, + transactions, + bank_slot, + self.result_sender.clone(), + )); + if res.is_err() { + // If the channel is dropped, then the validator is shutting down so return that we are hitting + // the max tick height to stop transaction processing and flush any transactions in the pipeline. + return Err(PohRecorderError::MaxHeightReached); + } + let res = self + .result_receiver + .recv_timeout(std::time::Duration::from_millis(2000)); + match res { + Err(_err) => Err(PohRecorderError::MaxHeightReached), + Ok(result) => result, + } + } +} + #[derive(Clone)] pub struct WorkingBank { pub bank: Arc, @@ -86,6 +160,7 @@ pub struct PohRecorder { record_us: u64, ticks_from_record: u64, last_metric: Instant, + record_sender: Sender, } impl PohRecorder { @@ -162,6 +237,10 @@ impl PohRecorder { self.ticks_per_slot } + pub fn recorder(&self) -> TransactionRecorder { + TransactionRecorder::new(self.record_sender.clone()) + } + fn is_same_fork_as_previous_leader(&self, slot: Slot) -> bool { (slot.saturating_sub(NUM_CONSECUTIVE_LEADER_SLOTS)..slot).any(|slot| { // Check if the last slot Poh reset to was any of the @@ -503,12 +582,13 @@ impl PohRecorder { clear_bank_signal: Option>, leader_schedule_cache: &Arc, poh_config: &Arc, - ) -> (Self, Receiver) { + ) -> (Self, Receiver, Receiver) { let poh = Arc::new(Mutex::new(Poh::new( last_entry_hash, poh_config.hashes_per_tick, ))); let (sender, receiver) = channel(); + let (record_sender, record_receiver) = channel(); let (leader_first_tick_height, leader_last_tick_height, grace_ticks) = Self::compute_leader_slot_tick_heights(next_leader_slot, ticks_per_slot); ( @@ -539,8 +619,10 @@ impl PohRecorder { tick_overhead_us: 0, ticks_from_record: 0, last_metric: Instant::now(), + record_sender, }, receiver, + record_receiver, ) } @@ -557,7 +639,7 @@ impl PohRecorder { blockstore: &Arc, leader_schedule_cache: &Arc, poh_config: &Arc, - ) -> (Self, Receiver) { + ) -> (Self, Receiver, Receiver) { Self::new_with_clear_signal( tick_height, last_entry_hash, @@ -609,7 +691,7 @@ mod tests { let blockstore = Blockstore::open(&ledger_path) .expect("Expected to be able to open database ledger"); - let (mut poh_recorder, _entry_receiver) = PohRecorder::new( + let (mut poh_recorder, _entry_receiver, _record_receiver) = PohRecorder::new( 0, prev_hash, 0, @@ -636,7 +718,7 @@ mod tests { let blockstore = Blockstore::open(&ledger_path) .expect("Expected to be able to open database ledger"); - let (mut poh_recorder, _entry_receiver) = PohRecorder::new( + let (mut poh_recorder, _entry_receiver, _record_receiver) = PohRecorder::new( 0, prev_hash, 0, @@ -662,7 +744,7 @@ mod tests { { let blockstore = Blockstore::open(&ledger_path) .expect("Expected to be able to open database ledger"); - let (mut poh_recorder, _entry_receiver) = PohRecorder::new( + let (mut poh_recorder, _entry_receiver, _record_receiver) = PohRecorder::new( 0, Hash::default(), 0, @@ -690,7 +772,7 @@ mod tests { let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2); let bank = Arc::new(Bank::new(&genesis_config)); let prev_hash = bank.last_blockhash(); - let (mut poh_recorder, _entry_receiver) = PohRecorder::new( + let (mut poh_recorder, _entry_receiver, _record_receiver) = PohRecorder::new( 0, prev_hash, 0, @@ -726,7 +808,7 @@ mod tests { let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2); let bank = Arc::new(Bank::new(&genesis_config)); let prev_hash = bank.last_blockhash(); - let (mut poh_recorder, entry_receiver) = PohRecorder::new( + let (mut poh_recorder, entry_receiver, _record_receiver) = PohRecorder::new( 0, prev_hash, 0, @@ -777,7 +859,7 @@ mod tests { let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2); let bank = Arc::new(Bank::new(&genesis_config)); let prev_hash = bank.last_blockhash(); - let (mut poh_recorder, entry_receiver) = PohRecorder::new( + let (mut poh_recorder, entry_receiver, _record_receiver) = PohRecorder::new( 0, prev_hash, 0, @@ -826,7 +908,7 @@ mod tests { let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2); let bank = Arc::new(Bank::new(&genesis_config)); let prev_hash = bank.last_blockhash(); - let (mut poh_recorder, entry_receiver) = PohRecorder::new( + let (mut poh_recorder, entry_receiver, _record_receiver) = PohRecorder::new( 0, prev_hash, 0, @@ -864,7 +946,7 @@ mod tests { let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2); let bank = Arc::new(Bank::new(&genesis_config)); let prev_hash = bank.last_blockhash(); - let (mut poh_recorder, _entry_receiver) = PohRecorder::new( + let (mut poh_recorder, _entry_receiver, _record_receiver) = PohRecorder::new( 0, prev_hash, 0, @@ -906,7 +988,7 @@ mod tests { let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2); let bank = Arc::new(Bank::new(&genesis_config)); let prev_hash = bank.last_blockhash(); - let (mut poh_recorder, entry_receiver) = PohRecorder::new( + let (mut poh_recorder, entry_receiver, _record_receiver) = PohRecorder::new( 0, prev_hash, 0, @@ -952,7 +1034,7 @@ mod tests { let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2); let bank = Arc::new(Bank::new(&genesis_config)); let prev_hash = bank.last_blockhash(); - let (mut poh_recorder, entry_receiver) = PohRecorder::new( + let (mut poh_recorder, entry_receiver, _record_receiver) = PohRecorder::new( 0, prev_hash, 0, @@ -996,7 +1078,7 @@ mod tests { let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2); let bank = Arc::new(Bank::new(&genesis_config)); let prev_hash = bank.last_blockhash(); - let (mut poh_recorder, entry_receiver) = PohRecorder::new( + let (mut poh_recorder, entry_receiver, _record_receiver) = PohRecorder::new( 0, prev_hash, 0, @@ -1033,7 +1115,7 @@ mod tests { { let blockstore = Blockstore::open(&ledger_path) .expect("Expected to be able to open database ledger"); - let (mut poh_recorder, _entry_receiver) = PohRecorder::new( + let (mut poh_recorder, _entry_receiver, _record_receiver) = PohRecorder::new( 0, Hash::default(), 0, @@ -1060,7 +1142,7 @@ mod tests { { let blockstore = Blockstore::open(&ledger_path) .expect("Expected to be able to open database ledger"); - let (mut poh_recorder, _entry_receiver) = PohRecorder::new( + let (mut poh_recorder, _entry_receiver, _record_receiver) = PohRecorder::new( 0, Hash::default(), 0, @@ -1088,7 +1170,7 @@ mod tests { { let blockstore = Blockstore::open(&ledger_path) .expect("Expected to be able to open database ledger"); - let (mut poh_recorder, _entry_receiver) = PohRecorder::new( + let (mut poh_recorder, _entry_receiver, _record_receiver) = PohRecorder::new( 0, Hash::default(), 0, @@ -1121,7 +1203,7 @@ mod tests { .expect("Expected to be able to open database ledger"); let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2); let bank = Arc::new(Bank::new(&genesis_config)); - let (mut poh_recorder, _entry_receiver) = PohRecorder::new( + let (mut poh_recorder, _entry_receiver, _record_receiver) = PohRecorder::new( 0, Hash::default(), 0, @@ -1155,18 +1237,19 @@ mod tests { let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2); let bank = Arc::new(Bank::new(&genesis_config)); let (sender, receiver) = sync_channel(1); - let (mut poh_recorder, _entry_receiver) = PohRecorder::new_with_clear_signal( - 0, - Hash::default(), - 0, - None, - bank.ticks_per_slot(), - &Pubkey::default(), - &Arc::new(blockstore), - Some(sender), - &Arc::new(LeaderScheduleCache::default()), - &Arc::new(PohConfig::default()), - ); + let (mut poh_recorder, _entry_receiver, _record_receiver) = + PohRecorder::new_with_clear_signal( + 0, + Hash::default(), + 0, + None, + bank.ticks_per_slot(), + &Pubkey::default(), + &Arc::new(blockstore), + Some(sender), + &Arc::new(LeaderScheduleCache::default()), + &Arc::new(PohConfig::default()), + ); poh_recorder.set_bank(&bank); poh_recorder.clear_bank(); assert!(receiver.try_recv().is_ok()); @@ -1189,7 +1272,7 @@ mod tests { let bank = Arc::new(Bank::new(&genesis_config)); let prev_hash = bank.last_blockhash(); - let (mut poh_recorder, _entry_receiver) = PohRecorder::new( + let (mut poh_recorder, _entry_receiver, _record_receiver) = PohRecorder::new( 0, prev_hash, 0, @@ -1238,7 +1321,7 @@ mod tests { let bank = Arc::new(Bank::new(&genesis_config)); let prev_hash = bank.last_blockhash(); let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank)); - let (mut poh_recorder, _entry_receiver) = PohRecorder::new( + let (mut poh_recorder, _entry_receiver, _record_receiver) = PohRecorder::new( 0, prev_hash, 0, @@ -1300,7 +1383,7 @@ mod tests { let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2); let bank = Arc::new(Bank::new(&genesis_config)); let prev_hash = bank.last_blockhash(); - let (mut poh_recorder, _entry_receiver) = PohRecorder::new( + let (mut poh_recorder, _entry_receiver, _record_receiver) = PohRecorder::new( 0, prev_hash, 0, @@ -1429,7 +1512,7 @@ mod tests { let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2); let bank = Arc::new(Bank::new(&genesis_config)); let prev_hash = bank.last_blockhash(); - let (mut poh_recorder, _entry_receiver) = PohRecorder::new( + let (mut poh_recorder, _entry_receiver, _record_receiver) = PohRecorder::new( 0, prev_hash, 0, @@ -1497,7 +1580,7 @@ mod tests { let bank = Arc::new(Bank::new(&genesis_config)); let genesis_hash = bank.last_blockhash(); - let (mut poh_recorder, _entry_receiver) = PohRecorder::new( + let (mut poh_recorder, _entry_receiver, _record_receiver) = PohRecorder::new( 0, bank.last_blockhash(), 0, diff --git a/core/src/poh_service.rs b/core/src/poh_service.rs index cb3594c5ba..a2b1aa6b96 100644 --- a/core/src/poh_service.rs +++ b/core/src/poh_service.rs @@ -1,12 +1,13 @@ //! The `poh_service` module implements a service that records the passing of //! "ticks", a measure of time in the PoH stream -use crate::poh_recorder::PohRecorder; +use crate::poh_recorder::{PohRecorder, Record}; +use solana_ledger::poh::Poh; use solana_measure::measure::Measure; use solana_sdk::poh_config::PohConfig; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, Mutex}; +use std::sync::{mpsc::Receiver, Arc, Mutex}; use std::thread::{self, sleep, Builder, JoinHandle}; -use std::time::Instant; +use std::time::{Duration, Instant}; pub struct PohService { tick_producer: JoinHandle<()>, @@ -24,6 +25,54 @@ pub const DEFAULT_PINNED_CPU_CORE: usize = 0; const TARGET_SLOT_ADJUSTMENT_NS: u64 = 50_000_000; +#[derive(Debug)] +struct PohTiming { + num_ticks: u64, + num_hashes: u64, + total_sleep_us: u64, + total_lock_time_ns: u64, + total_hash_time_ns: u64, + total_tick_time_ns: u64, + last_metric: Instant, +} + +impl PohTiming { + fn new() -> Self { + Self { + num_ticks: 0, + num_hashes: 0, + total_sleep_us: 0, + total_lock_time_ns: 0, + total_hash_time_ns: 0, + total_tick_time_ns: 0, + last_metric: Instant::now(), + } + } + fn report(&mut self, ticks_per_slot: u64) { + if self.last_metric.elapsed().as_millis() > 1000 { + let elapsed_us = self.last_metric.elapsed().as_micros() as u64; + let us_per_slot = (elapsed_us * ticks_per_slot) / self.num_ticks; + datapoint_info!( + "poh-service", + ("ticks", self.num_ticks as i64, i64), + ("hashes", self.num_hashes as i64, i64), + ("elapsed_us", us_per_slot, i64), + ("total_sleep_us", self.total_sleep_us, i64), + ("total_tick_time_us", self.total_tick_time_ns / 1000, i64), + ("total_lock_time_us", self.total_lock_time_ns / 1000, i64), + ("total_hash_time_us", self.total_hash_time_ns / 1000, i64), + ); + self.total_sleep_us = 0; + self.num_ticks = 0; + self.num_hashes = 0; + self.total_tick_time_ns = 0; + self.total_lock_time_ns = 0; + self.total_hash_time_ns = 0; + self.last_metric = Instant::now(); + } + } +} + impl PohService { pub fn new( poh_recorder: Arc>, @@ -32,6 +81,7 @@ impl PohService { ticks_per_slot: u64, pinned_cpu_core: usize, hashes_per_batch: u64, + record_receiver: Receiver, ) -> Self { let poh_exit_ = poh_exit.clone(); let poh_config = poh_config.clone(); @@ -41,12 +91,18 @@ impl PohService { solana_sys_tuner::request_realtime_poh(); if poh_config.hashes_per_tick.is_none() { if poh_config.target_tick_count.is_none() { - Self::sleepy_tick_producer(poh_recorder, &poh_config, &poh_exit_); + Self::sleepy_tick_producer( + poh_recorder, + &poh_config, + &poh_exit_, + record_receiver, + ); } else { Self::short_lived_sleepy_tick_producer( poh_recorder, &poh_config, &poh_exit_, + record_receiver, ); } } else { @@ -69,6 +125,7 @@ impl PohService { poh_config.target_tick_duration.as_nanos() as u64 - adjustment_per_tick, ticks_per_slot, hashes_per_batch, + record_receiver, ); } poh_exit_.store(true, Ordering::Relaxed); @@ -82,20 +139,53 @@ impl PohService { poh_recorder: Arc>, poh_config: &PohConfig, poh_exit: &AtomicBool, + record_receiver: Receiver, ) { while !poh_exit.load(Ordering::Relaxed) { + Self::read_record_receiver_and_process( + &poh_recorder, + &record_receiver, + Duration::from_millis(0), + ); sleep(poh_config.target_tick_duration); poh_recorder.lock().unwrap().tick(); } } + pub fn read_record_receiver_and_process( + poh_recorder: &Arc>, + record_receiver: &Receiver, + timeout: Duration, + ) { + let record = record_receiver.recv_timeout(timeout); + if let Ok(record) = record { + if record + .sender + .send(poh_recorder.lock().unwrap().record( + record.slot, + record.mixin, + record.transactions, + )) + .is_err() + { + panic!("Error returning mixin hash"); + } + } + } + fn short_lived_sleepy_tick_producer( poh_recorder: Arc>, poh_config: &PohConfig, poh_exit: &AtomicBool, + record_receiver: Receiver, ) { let mut warned = false; for _ in 0..poh_config.target_tick_count.unwrap() { + Self::read_record_receiver_and_process( + &poh_recorder, + &record_receiver, + Duration::from_millis(0), + ); sleep(poh_config.target_tick_duration); poh_recorder.lock().unwrap().tick(); if poh_exit.load(Ordering::Relaxed) && !warned { @@ -105,78 +195,121 @@ impl PohService { } } + fn record_or_hash( + next_record: &mut Option, + poh_recorder: &Arc>, + timing: &mut PohTiming, + record_receiver: &Receiver, + hashes_per_batch: u64, + poh: &Arc>, + ) -> bool { + match next_record.take() { + Some(mut record) => { + // received message to record + // so, record for as long as we have queued up record requests + let mut lock_time = Measure::start("lock"); + let mut poh_recorder_l = poh_recorder.lock().unwrap(); + lock_time.stop(); + timing.total_lock_time_ns += lock_time.as_ns(); + loop { + let res = poh_recorder_l.record( + record.slot, + record.mixin, + std::mem::take(&mut record.transactions), + ); + let _ = record.sender.send(res); // what do we do on failure here? Ignore for now. + timing.num_hashes += 1; // note: may have also ticked inside record + + let new_record_result = record_receiver.try_recv(); + match new_record_result { + Ok(new_record) => { + // we already have second request to record, so record again while we still have the mutex + record = new_record; + } + Err(_) => { + break; + } + } + } + // PohRecorder.record would have ticked if it needed to, so should_tick will be false + } + None => { + // did not receive instructions to record, so hash until we notice we've been asked to record (or we need to tick) and then remember what to record + let mut lock_time = Measure::start("lock"); + let mut poh_l = poh.lock().unwrap(); + lock_time.stop(); + timing.total_lock_time_ns += lock_time.as_ns(); + loop { + timing.num_hashes += hashes_per_batch; + let mut hash_time = Measure::start("hash"); + let should_tick = poh_l.hash(hashes_per_batch); + hash_time.stop(); + timing.total_hash_time_ns += hash_time.as_ns(); + if should_tick { + return true; // nothing else can be done. tick required. + } + // check to see if a record request has been sent + let get_again = record_receiver.try_recv(); + match get_again { + Ok(record) => { + // remember the record we just received as the next record to occur + *next_record = Some(record); + break; + } + Err(_) => { + continue; + } + } + } + } + }; + false // should_tick = false for all code that reaches here + } + fn tick_producer( poh_recorder: Arc>, poh_exit: &AtomicBool, target_tick_ns: u64, ticks_per_slot: u64, hashes_per_batch: u64, + record_receiver: Receiver, ) { let poh = poh_recorder.lock().unwrap().poh.clone(); let mut now = Instant::now(); - let mut last_metric = Instant::now(); - let mut num_ticks = 0; - let mut num_hashes = 0; - let mut total_sleep_us = 0; - let mut total_lock_time_ns = 0; - let mut total_hash_time_ns = 0; - let mut total_tick_time_ns = 0; + let mut timing = PohTiming::new(); + let mut next_record = None; loop { - num_hashes += hashes_per_batch; - let should_tick = { - let mut lock_time = Measure::start("lock"); - let mut poh_l = poh.lock().unwrap(); - lock_time.stop(); - total_lock_time_ns += lock_time.as_ns(); - let mut hash_time = Measure::start("hash"); - let r = poh_l.hash(hashes_per_batch); - hash_time.stop(); - total_hash_time_ns += hash_time.as_ns(); - r - }; + let should_tick = Self::record_or_hash( + &mut next_record, + &poh_recorder, + &mut timing, + &record_receiver, + hashes_per_batch, + &poh, + ); if should_tick { - // Lock PohRecorder only for the final hash... + // Lock PohRecorder only for the final hash. record_or_hash will lock PohRecorder for record calls but not for hashing. { let mut lock_time = Measure::start("lock"); let mut poh_recorder_l = poh_recorder.lock().unwrap(); lock_time.stop(); - total_lock_time_ns += lock_time.as_ns(); + timing.total_lock_time_ns += lock_time.as_ns(); let mut tick_time = Measure::start("tick"); poh_recorder_l.tick(); tick_time.stop(); - total_tick_time_ns += tick_time.as_ns(); + timing.total_tick_time_ns += tick_time.as_ns(); } - num_ticks += 1; + timing.num_ticks += 1; let elapsed_ns = now.elapsed().as_nanos() as u64; // sleep is not accurate enough to get a predictable time. // Kernel can not schedule the thread for a while. while (now.elapsed().as_nanos() as u64) < target_tick_ns { std::hint::spin_loop(); } - total_sleep_us += (now.elapsed().as_nanos() as u64 - elapsed_ns) / 1000; + timing.total_sleep_us += (now.elapsed().as_nanos() as u64 - elapsed_ns) / 1000; now = Instant::now(); - if last_metric.elapsed().as_millis() > 1000 { - let elapsed_us = last_metric.elapsed().as_micros() as u64; - let us_per_slot = (elapsed_us * ticks_per_slot) / num_ticks; - datapoint_info!( - "poh-service", - ("ticks", num_ticks as i64, i64), - ("hashes", num_hashes as i64, i64), - ("elapsed_us", us_per_slot, i64), - ("total_sleep_us", total_sleep_us, i64), - ("total_tick_time_us", total_tick_time_ns / 1000, i64), - ("total_lock_time_us", total_lock_time_ns / 1000, i64), - ("total_hash_time_us", total_hash_time_ns / 1000, i64), - ); - total_sleep_us = 0; - num_ticks = 0; - num_hashes = 0; - total_tick_time_ns = 0; - total_lock_time_ns = 0; - total_hash_time_ns = 0; - last_metric = Instant::now(); - } + timing.report(ticks_per_slot); if poh_exit.load(Ordering::Relaxed) { break; } @@ -225,7 +358,7 @@ mod tests { target_tick_duration, target_tick_count: None, }); - let (poh_recorder, entry_receiver) = PohRecorder::new( + let (poh_recorder, entry_receiver, record_receiver) = PohRecorder::new( bank.tick_height(), prev_hash, bank.slot(), @@ -305,6 +438,7 @@ mod tests { 0, DEFAULT_PINNED_CPU_CORE, hashes_per_batch, + record_receiver, ); poh_recorder.lock().unwrap().set_working_bank(working_bank); diff --git a/core/src/send_transaction_service.rs b/core/src/send_transaction_service.rs index c4c4811271..1a50712f9f 100644 --- a/core/src/send_transaction_service.rs +++ b/core/src/send_transaction_service.rs @@ -801,7 +801,7 @@ mod test { ); let bank = Arc::new(Bank::new(&genesis_config)); - let (poh_recorder, _entry_receiver) = PohRecorder::new( + let (poh_recorder, _entry_receiver, _record_receiver) = PohRecorder::new( 0, bank.last_blockhash(), 0, diff --git a/core/src/validator.rs b/core/src/validator.rs index 08aa18536a..cbebfd52d3 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -486,24 +486,25 @@ impl Validator { ); let poh_config = Arc::new(genesis_config.poh_config.clone()); - let (mut poh_recorder, entry_receiver) = PohRecorder::new_with_clear_signal( - bank.tick_height(), - bank.last_blockhash(), - bank.slot(), - leader_schedule_cache.next_leader_slot( - &id, + let (mut poh_recorder, entry_receiver, record_receiver) = + PohRecorder::new_with_clear_signal( + bank.tick_height(), + bank.last_blockhash(), bank.slot(), - &bank, - Some(&blockstore), - GRACE_TICKS_FACTOR * MAX_GRACE_SLOTS, - ), - bank.ticks_per_slot(), - &id, - &blockstore, - blockstore.new_shreds_signals.first().cloned(), - &leader_schedule_cache, - &poh_config, - ); + leader_schedule_cache.next_leader_slot( + &id, + bank.slot(), + &bank, + Some(&blockstore), + GRACE_TICKS_FACTOR * MAX_GRACE_SLOTS, + ), + bank.ticks_per_slot(), + &id, + &blockstore, + blockstore.new_shreds_signals.first().cloned(), + &leader_schedule_cache, + &poh_config, + ); if config.snapshot_config.is_some() { poh_recorder.set_bank(&bank); } @@ -644,6 +645,7 @@ impl Validator { bank.ticks_per_slot(), config.poh_pinned_cpu_core, config.poh_hashes_per_batch, + record_receiver, ); assert_eq!( blockstore.new_shreds_signals.len(),