diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index f39216a4e0..69a82c6c87 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -81,6 +81,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) { bencher.iter(move || { let _ignored = BankingStage::consume_buffered_packets( &my_pubkey, + std::u128::MAX, &poh_recorder, &mut packets, None, @@ -154,6 +155,9 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { let (verified_sender, verified_receiver) = unbounded(); let (vote_sender, vote_receiver) = unbounded(); + let mut bank = Bank::new(&genesis_config); + // Allow arbitrary transaction processing time for the purposes of this bench + bank.ns_per_slot = std::u128::MAX; let bank = Arc::new(Bank::new(&genesis_config)); debug!("threads: {} txs: {}", num_threads, txes); diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index d64881ccb7..fc3b176b62 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -159,9 +159,9 @@ pub struct BankingStage { bank_thread_hdls: Vec>, } -#[derive(Debug, PartialEq, Eq, Clone)] +#[derive(Debug, Clone)] pub enum BufferedPacketsDecision { - Consume, + Consume(u128), Forward, ForwardAndHold, Hold, @@ -288,6 +288,7 @@ impl BankingStage { pub fn consume_buffered_packets( my_pubkey: &Pubkey, + max_tx_ingestion_ns: u128, poh_recorder: &Arc>, buffered_packets: &mut UnprocessedPackets, transaction_status_sender: Option, @@ -316,18 +317,24 @@ impl BankingStage { new_unprocessed_indexes, ) } else { - let bank = poh_recorder.lock().unwrap().bank(); - if let Some(bank) = bank { + let bank_start = poh_recorder.lock().unwrap().bank_start(); + if let Some((bank, bank_creation_time)) = bank_start { let (processed, verified_txs_len, new_unprocessed_indexes) = - Self::process_received_packets( + Self::process_packets_transactions( &bank, + &bank_creation_time, &poh_recorder, &msgs, original_unprocessed_indexes.to_owned(), transaction_status_sender.clone(), gossip_vote_sender, ); - if processed < verified_txs_len { + if processed < verified_txs_len + || !Bank::should_bank_still_be_processing_txs( + &bank_creation_time, + max_tx_ingestion_ns, + ) + { reached_end_of_slot = Some((poh_recorder.lock().unwrap().next_slot_leader(), bank)); } @@ -380,7 +387,7 @@ impl BankingStage { fn consume_or_forward_packets( my_pubkey: &Pubkey, leader_pubkey: Option, - bank_is_available: bool, + bank_still_processing_txs: Option<&Arc>, would_be_leader: bool, would_be_leader_shortly: bool, ) -> BufferedPacketsDecision { @@ -389,9 +396,9 @@ impl BankingStage { BufferedPacketsDecision::Hold, // else process the packets |x| { - if bank_is_available { + if let Some(bank) = bank_still_processing_txs { // If the bank is available, this node is the leader - BufferedPacketsDecision::Consume + BufferedPacketsDecision::Consume(bank.ns_per_slot) } else if would_be_leader_shortly { // If the node will be the leader soon, hold the packets for now BufferedPacketsDecision::Hold @@ -422,11 +429,18 @@ impl BankingStage { gossip_vote_sender: &ReplayVoteSender, banking_stage_stats: &BankingStageStats, ) -> BufferedPacketsDecision { - let (leader_at_slot_offset, poh_has_bank, would_be_leader, would_be_leader_shortly) = { + let bank_start; + let ( + leader_at_slot_offset, + bank_still_processing_txs, + would_be_leader, + would_be_leader_shortly, + ) = { let poh = poh_recorder.lock().unwrap(); + bank_start = poh.bank_start(); ( poh.leader_after_n_slots(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET), - poh.has_bank(), + PohRecorder::get_bank_still_processing_txs(&bank_start), poh.would_be_leader(HOLD_TRANSACTIONS_SLOT_OFFSET * DEFAULT_TICKS_PER_SLOT), poh.would_be_leader( (FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET - 1) * DEFAULT_TICKS_PER_SLOT, @@ -437,15 +451,16 @@ impl BankingStage { let decision = Self::consume_or_forward_packets( my_pubkey, leader_at_slot_offset, - poh_has_bank, + bank_still_processing_txs, would_be_leader, would_be_leader_shortly, ); match decision { - BufferedPacketsDecision::Consume => { + BufferedPacketsDecision::Consume(max_tx_ingestion_ns) => { Self::consume_buffered_packets( my_pubkey, + max_tx_ingestion_ns, poh_recorder, buffered_packets, transaction_status_sender, @@ -545,8 +560,8 @@ impl BankingStage { &gossip_vote_sender, &banking_stage_stats, ); - if decision == BufferedPacketsDecision::Hold - || decision == BufferedPacketsDecision::ForwardAndHold + if matches!(decision, BufferedPacketsDecision::Hold) + || matches!(decision, BufferedPacketsDecision::ForwardAndHold) { // If we are waiting on a new bank, // check the receiver for more transactions/for exiting @@ -829,6 +844,7 @@ impl BankingStage { /// than the total number if max PoH height was reached and the bank halted fn process_transactions( bank: &Arc, + bank_creation_time: &Instant, transactions: &[Transaction], poh: &Arc>, transaction_status_sender: Option, @@ -855,17 +871,25 @@ impl BankingStage { // Add the retryable txs (transactions that errored in a way that warrants a retry) // to the list of unprocessed txs. unprocessed_txs.extend_from_slice(&retryable_txs_in_chunk); - if let Err(PohRecorderError::MaxHeightReached) = result { - info!( - "process transactions: max height reached slot: {} height: {}", - bank.slot(), - bank.tick_height() - ); - // process_and_record_transactions has returned all retryable errors in - // transactions[chunk_start..chunk_end], so we just need to push the remaining - // transactions into the unprocessed queue. - unprocessed_txs.extend(chunk_end..transactions.len()); - break; + + // If `bank_creation_time` is None, it's a test so ignore the option so + // allow processing + let should_bank_still_be_processing_txs = + Bank::should_bank_still_be_processing_txs(bank_creation_time, bank.ns_per_slot); + match (result, should_bank_still_be_processing_txs) { + (Err(PohRecorderError::MaxHeightReached), _) | (_, false) => { + info!( + "process transactions: max height reached slot: {} height: {}", + bank.slot(), + bank.tick_height() + ); + // process_and_record_transactions has returned all retryable errors in + // transactions[chunk_start..chunk_end], so we just need to push the remaining + // transactions into the unprocessed queue. + unprocessed_txs.extend(chunk_end..transactions.len()); + break; + } + _ => (), } // Don't exit early on any other type of error, continue processing... chunk_start = chunk_end; @@ -990,8 +1014,9 @@ impl BankingStage { Self::filter_valid_transaction_indexes(&result, transaction_to_packet_indexes) } - fn process_received_packets( + fn process_packets_transactions( bank: &Arc, + bank_creation_time: &Instant, poh: &Arc>, msgs: &Packets, packet_indexes: Vec, @@ -1013,6 +1038,7 @@ impl BankingStage { let (processed, unprocessed_tx_indexes) = Self::process_transactions( bank, + bank_creation_time, &transactions, poh, transaction_status_sender, @@ -1120,7 +1146,7 @@ impl BankingStage { id, ); inc_new_counter_debug!("banking_stage-transactions_received", count); - let mut proc_start = Measure::start("process_received_packets_process"); + let mut proc_start = Measure::start("process_packets_transactions_process"); let mut new_tx_count = 0; let mut mms_iter = mms.into_iter(); @@ -1128,8 +1154,8 @@ impl BankingStage { let mut newly_buffered_packets_count = 0; while let Some(msgs) = mms_iter.next() { let packet_indexes = Self::generate_packet_indexes(&msgs.packets); - let bank = poh.lock().unwrap().bank(); - if bank.is_none() { + let bank_start = poh.lock().unwrap().bank_start(); + if PohRecorder::get_bank_still_processing_txs(&bank_start).is_none() { Self::push_unprocessed( buffered_packets, msgs, @@ -1141,16 +1167,18 @@ impl BankingStage { ); continue; } - let bank = bank.unwrap(); + let (bank, bank_creation_time) = bank_start.unwrap(); - let (processed, verified_txs_len, unprocessed_indexes) = Self::process_received_packets( - &bank, - &poh, - &msgs, - packet_indexes, - transaction_status_sender.clone(), - gossip_vote_sender, - ); + let (processed, verified_txs_len, unprocessed_indexes) = + Self::process_packets_transactions( + &bank, + &bank_creation_time, + &poh, + &msgs, + packet_indexes, + transaction_status_sender.clone(), + gossip_vote_sender, + ); new_tx_count += processed; @@ -1165,6 +1193,7 @@ impl BankingStage { duplicates, ); + // If there were retryable transactions, add the unexpired ones to the buffered queue if processed < verified_txs_len { let next_leader = poh.lock().unwrap().next_slot_leader(); // Walk thru rest of the transactions and filter out the invalid (e.g. too old) ones @@ -1661,8 +1690,10 @@ mod tests { .. } = create_genesis_config(10_000); let bank = Arc::new(Bank::new(&genesis_config)); + let start = Arc::new(Instant::now()); let working_bank = WorkingBank { bank: bank.clone(), + start, min_tick_height: bank.tick_height(), max_tick_height: std::u64::MAX, }; @@ -1915,80 +1946,80 @@ mod tests { fn test_should_process_or_forward_packets() { let my_pubkey = solana_sdk::pubkey::new_rand(); let my_pubkey1 = solana_sdk::pubkey::new_rand(); - - assert_eq!( - BankingStage::consume_or_forward_packets(&my_pubkey, None, true, false, false), + let bank = Arc::new(Bank::default()); + assert_matches!( + BankingStage::consume_or_forward_packets(&my_pubkey, None, Some(&bank), false, false), BufferedPacketsDecision::Hold ); - assert_eq!( - BankingStage::consume_or_forward_packets(&my_pubkey, None, false, false, false), + assert_matches!( + BankingStage::consume_or_forward_packets(&my_pubkey, None, None, false, false), BufferedPacketsDecision::Hold ); - assert_eq!( - BankingStage::consume_or_forward_packets(&my_pubkey1, None, false, false, false), + assert_matches!( + BankingStage::consume_or_forward_packets(&my_pubkey1, None, None, false, false), BufferedPacketsDecision::Hold ); - assert_eq!( + assert_matches!( BankingStage::consume_or_forward_packets( &my_pubkey, Some(my_pubkey1), - false, + None, false, false ), BufferedPacketsDecision::Forward ); - assert_eq!( + assert_matches!( BankingStage::consume_or_forward_packets( &my_pubkey, Some(my_pubkey1), - false, + None, true, true ), BufferedPacketsDecision::Hold ); - assert_eq!( + assert_matches!( BankingStage::consume_or_forward_packets( &my_pubkey, Some(my_pubkey1), - false, + None, true, false ), BufferedPacketsDecision::ForwardAndHold ); - assert_eq!( + assert_matches!( BankingStage::consume_or_forward_packets( &my_pubkey, Some(my_pubkey1), - true, + Some(&bank), false, false ), - BufferedPacketsDecision::Consume + BufferedPacketsDecision::Consume(_) ); - assert_eq!( + assert_matches!( BankingStage::consume_or_forward_packets( &my_pubkey1, Some(my_pubkey1), - false, + None, false, false ), BufferedPacketsDecision::Hold ); - assert_eq!( + assert_matches!( BankingStage::consume_or_forward_packets( &my_pubkey1, Some(my_pubkey1), - true, + Some(&bank), false, false ), - BufferedPacketsDecision::Consume + BufferedPacketsDecision::Consume(_) ); } @@ -2010,8 +2041,10 @@ mod tests { genesis_config.hash(), )]; + let start = Arc::new(Instant::now()); let working_bank = WorkingBank { bank: bank.clone(), + start, min_tick_height: bank.tick_height(), max_tick_height: bank.tick_height() + 1, }; @@ -2106,8 +2139,10 @@ mod tests { system_transaction::transfer(&mint_keypair, &pubkey1, 1, genesis_config.hash()), ]; + let start = Arc::new(Instant::now()); let working_bank = WorkingBank { bank: bank.clone(), + start, min_tick_height: bank.tick_height(), max_tick_height: bank.tick_height() + 1, }; @@ -2196,6 +2231,9 @@ mod tests { mint_keypair, .. } = create_genesis_config(10_000); + let mut bank = Bank::new(&genesis_config); + // Allow arbitrary transaction processing time for the purposes of this test + bank.ns_per_slot = std::u128::MAX; let bank = Arc::new(Bank::new(&genesis_config)); let pubkey = solana_sdk::pubkey::new_rand(); @@ -2231,6 +2269,7 @@ mod tests { let (processed_transactions_count, mut retryable_txs) = BankingStage::process_transactions( &bank, + &Instant::now(), &transactions, &poh_recorder, None, @@ -2276,8 +2315,10 @@ mod tests { let transactions = vec![success_tx, ix_error_tx, fail_tx]; bank.transfer(4, &mint_keypair, &keypair1.pubkey()).unwrap(); + let start = Arc::new(Instant::now()); let working_bank = WorkingBank { bank: bank.clone(), + start, min_tick_height: bank.tick_height(), max_tick_height: bank.tick_height() + 1, }; @@ -2420,8 +2461,10 @@ mod tests { // When the working bank in poh_recorder is None, no packets should be processed assert!(!poh_recorder.lock().unwrap().has_bank()); + let max_tx_processing_ns = std::u128::MAX; BankingStage::consume_buffered_packets( &Pubkey::default(), + max_tx_processing_ns, &poh_recorder, &mut buffered_packets, None, @@ -2436,6 +2479,7 @@ mod tests { poh_recorder.lock().unwrap().set_bank(&bank); BankingStage::consume_buffered_packets( &Pubkey::default(), + max_tx_processing_ns, &poh_recorder, &mut buffered_packets, None, @@ -2492,6 +2536,7 @@ mod tests { .spawn(move || { BankingStage::consume_buffered_packets( &Pubkey::default(), + std::u128::MAX, &poh_recorder_, &mut buffered_packets, None, diff --git a/core/src/poh_recorder.rs b/core/src/poh_recorder.rs index 4492bf642f..abd140b9f5 100644 --- a/core/src/poh_recorder.rs +++ b/core/src/poh_recorder.rs @@ -49,10 +49,12 @@ pub enum PohRecorderError { type Result = std::result::Result; pub type WorkingBankEntry = (Arc, (Entry, u64)); +pub type BankStart = (Arc, Arc); #[derive(Clone)] pub struct WorkingBank { pub bank: Arc, + pub start: Arc, pub min_tick_height: u64, pub max_tick_height: u64, } @@ -97,7 +99,14 @@ impl PohRecorder { self.grace_ticks = grace_ticks; self.leader_first_tick_height = leader_first_tick_height; self.leader_last_tick_height = leader_last_tick_height; + + datapoint_info!( + "leader-slot-start-to-cleared-elapsed-ms", + ("slot", bank.slot(), i64), + ("elapsed", working_bank.start.elapsed().as_millis(), i64), + ); } + if let Some(ref signal) = self.clear_bank_signal { let _ = signal.try_send(true); } @@ -126,7 +135,13 @@ impl PohRecorder { } pub fn bank(&self) -> Option> { - self.working_bank.clone().map(|w| w.bank) + self.working_bank.as_ref().map(|w| w.bank.clone()) + } + + pub fn bank_start(&self) -> Option { + self.working_bank + .as_ref() + .map(|w| (w.bank.clone(), w.start.clone())) } pub fn has_bank(&self) -> bool { @@ -273,11 +288,15 @@ impl PohRecorder { trace!("new working bank"); assert_eq!(working_bank.bank.ticks_per_slot(), self.ticks_per_slot()); self.working_bank = Some(working_bank); + // TODO: adjust the working_bank.start time based on number of ticks + // that have already elapsed based on current tick height. let _ = self.flush_cache(false); } + pub fn set_bank(&mut self, bank: &Arc) { let working_bank = WorkingBank { bank: bank.clone(), + start: Arc::new(Instant::now()), min_tick_height: bank.tick_height(), max_tick_height: bank.max_tick_height(), }; @@ -517,6 +536,18 @@ impl PohRecorder { ) } + // Filters the return result of PohRecorder::bank_start(), returns the bank + // if it's still processing transactions + pub fn get_bank_still_processing_txs(bank_start: &Option) -> Option<&Arc> { + bank_start.as_ref().and_then(|(bank, bank_creation_time)| { + if Bank::should_bank_still_be_processing_txs(bank_creation_time, bank.ns_per_slot) { + Some(bank) + } else { + None + } + }) + } + #[cfg(test)] pub fn schedule_dummy_max_height_reached_failure(&mut self) { self.reset(Hash::default(), 1, None); @@ -635,8 +666,10 @@ mod tests { &Arc::new(PohConfig::default()), ); + let start = Arc::new(Instant::now()); let working_bank = WorkingBank { bank, + start, min_tick_height: 2, max_tick_height: 3, }; @@ -669,8 +702,10 @@ mod tests { &Arc::new(PohConfig::default()), ); + let start = Arc::new(Instant::now()); let working_bank = WorkingBank { bank: bank.clone(), + start, min_tick_height: 2, max_tick_height: 3, }; @@ -725,8 +760,10 @@ mod tests { assert_eq!(poh_recorder.tick_cache.last().unwrap().1, 4); assert_eq!(poh_recorder.tick_height, 4); + let start = Arc::new(Instant::now()); let working_bank = WorkingBank { bank, + start, min_tick_height: 2, max_tick_height: 3, }; @@ -765,8 +802,10 @@ mod tests { &Arc::new(PohConfig::default()), ); + let start = Arc::new(Instant::now()); let working_bank = WorkingBank { bank: bank.clone(), + start, min_tick_height: 2, max_tick_height: 3, }; @@ -801,8 +840,10 @@ mod tests { &Arc::new(PohConfig::default()), ); + let start = Arc::new(Instant::now()); let working_bank = WorkingBank { bank: bank.clone(), + start, min_tick_height: 1, max_tick_height: 2, }; @@ -841,8 +882,10 @@ mod tests { &Arc::new(PohConfig::default()), ); + let start = Arc::new(Instant::now()); let working_bank = WorkingBank { bank: bank.clone(), + start, min_tick_height: 1, max_tick_height: 2, }; @@ -885,8 +928,10 @@ mod tests { &Arc::new(PohConfig::default()), ); + let start = Arc::new(Instant::now()); let working_bank = WorkingBank { bank: bank.clone(), + start, min_tick_height: 1, max_tick_height: 2, }; @@ -927,8 +972,10 @@ mod tests { &Arc::new(PohConfig::default()), ); + let start = Arc::new(Instant::now()); let working_bank = WorkingBank { bank, + start, min_tick_height: 2, max_tick_height: 3, }; @@ -1049,8 +1096,10 @@ mod tests { &Arc::new(LeaderScheduleCache::new_from_bank(&bank)), &Arc::new(PohConfig::default()), ); + let start = Arc::new(Instant::now()); let working_bank = WorkingBank { bank, + start, min_tick_height: 2, max_tick_height: 3, }; @@ -1118,8 +1167,10 @@ mod tests { let end_slot = 3; let max_tick_height = (end_slot + 1) * ticks_per_slot; + let start = Arc::new(Instant::now()); let working_bank = WorkingBank { bank: bank.clone(), + start, min_tick_height: 1, max_tick_height, }; diff --git a/core/src/poh_service.rs b/core/src/poh_service.rs index 02ff4c3f4e..de152a68ef 100644 --- a/core/src/poh_service.rs +++ b/core/src/poh_service.rs @@ -208,8 +208,10 @@ mod tests { ); let poh_recorder = Arc::new(Mutex::new(poh_recorder)); let exit = Arc::new(AtomicBool::new(false)); + let start = Arc::new(Instant::now()); let working_bank = WorkingBank { bank: bank.clone(), + start, min_tick_height: bank.tick_height(), max_tick_height: std::u64::MAX, }; diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 8b70bd5056..9d373b8877 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -90,6 +90,7 @@ use std::{ LockResult, RwLockWriteGuard, {Arc, RwLock, RwLockReadGuard}, }, time::Duration, + time::Instant, }; pub const SECONDS_PER_YEAR: f64 = 365.25 * 24.0 * 60.0 * 60.0; @@ -782,7 +783,7 @@ pub struct Bank { ticks_per_slot: u64, /// length of a slot in ns - ns_per_slot: u128, + pub ns_per_slot: u128, /// genesis time, used for computed clock genesis_creation_time: UnixTimestamp, @@ -4638,6 +4639,16 @@ impl Bank { .is_active(&feature_set::check_init_vote_data::id()) } + // Check if the wallclock time from bank creation to now has exceeded the allotted + // time for transaction processing + pub fn should_bank_still_be_processing_txs( + bank_creation_time: &Instant, + max_tx_ingestion_nanos: u128, + ) -> bool { + // Do this check outside of the poh lock, hence not a method on PohRecorder + bank_creation_time.elapsed().as_nanos() <= max_tx_ingestion_nanos + } + pub fn deactivate_feature(&mut self, id: &Pubkey) { let mut feature_set = Arc::make_mut(&mut self.feature_set).clone(); feature_set.active.remove(&id); diff --git a/sdk/program/src/clock.rs b/sdk/program/src/clock.rs index f9e8d38aa3..4f2bc36912 100644 --- a/sdk/program/src/clock.rs +++ b/sdk/program/src/clock.rs @@ -6,6 +6,8 @@ pub const DEFAULT_TICKS_PER_SECOND: u64 = 160; pub const MS_PER_TICK: u64 = 1000 / DEFAULT_TICKS_PER_SECOND; +pub const SLOT_MS: u64 = (DEFAULT_TICKS_PER_SLOT * 1000) / DEFAULT_TICKS_PER_SECOND; + // At 160 ticks/s, 64 ticks per slot implies that leader rotation and voting will happen // every 400 ms. A fast voting cadence ensures faster finality and convergence pub const DEFAULT_TICKS_PER_SLOT: u64 = 64;