diff --git a/Cargo.lock b/Cargo.lock index bc3c4b5f0b..207621f8c7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3398,6 +3398,12 @@ dependencies = [ "winreg", ] +[[package]] +name = "retain_mut" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53552c6c49e1e13f1a203ef0080ab3bbef0beb570a528993e83df057a9d9bba1" + [[package]] name = "ring" version = "0.16.12" @@ -4268,6 +4274,7 @@ dependencies = [ "rayon", "regex", "reqwest 0.10.8", + "retain_mut", "rustc_version", "rustversion", "serde", diff --git a/core/Cargo.toml b/core/Cargo.toml index 43169cfb82..4579e945f5 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -44,6 +44,7 @@ rand_chacha = "0.2.2" raptorq = "1.4.2" rayon = "1.5.0" regex = "1.3.9" +retain_mut = "0.1.2" rustversion = "1.0.4" serde = "1.0.122" serde_bytes = "0.11" diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index bfda8d0729..d40731c221 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -28,6 +28,7 @@ use solana_sdk::system_instruction; use solana_sdk::system_transaction; use solana_sdk::timing::{duration_as_us, timestamp}; use solana_sdk::transaction::Transaction; +use std::collections::VecDeque; use std::sync::atomic::Ordering; use std::sync::mpsc::Receiver; use std::sync::Arc; @@ -68,10 +69,10 @@ fn bench_consume_buffered(bencher: &mut Bencher) { let len = 4096; let chunk_size = 1024; let batches = to_packets_chunked(&vec![tx; len], chunk_size); - let mut packets = vec![]; + let mut packets = VecDeque::new(); for batch in batches { let batch_len = batch.packets.len(); - packets.push((batch, vec![0usize; batch_len])); + packets.push_back((batch, vec![0usize; batch_len])); } let (s, _r) = unbounded(); // This tests the performance of buffering packets. @@ -81,9 +82,10 @@ fn bench_consume_buffered(bencher: &mut Bencher) { &my_pubkey, &poh_recorder, &mut packets, - 10_000, None, &s, + None::>, + None, ); }); diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index f333caa5dd..7108fa8990 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -8,6 +8,7 @@ use crate::{ }; use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError}; use itertools::Itertools; +use retain_mut::RetainMut; use solana_ledger::{ blockstore::Blockstore, blockstore_processor::{send_transaction_status_batch, TransactionStatusSender}, @@ -46,10 +47,10 @@ use solana_transaction_status::token_balances::{ }; use std::{ cmp, - collections::HashMap, + collections::{HashMap, VecDeque}, env, net::UdpSocket, - sync::atomic::AtomicBool, + sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}, sync::mpsc::Receiver, sync::{Arc, Mutex}, thread::{self, Builder, JoinHandle}, @@ -58,7 +59,7 @@ use std::{ }; type PacketsAndOffsets = (Packets, Vec); -pub type UnprocessedPackets = Vec; +pub type UnprocessedPackets = VecDeque; /// Transaction forwarding pub const FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET: u64 = 2; @@ -70,6 +71,80 @@ const TOTAL_BUFFERED_PACKETS: usize = 500_000; const MAX_NUM_TRANSACTIONS_PER_BATCH: usize = 128; +#[derive(Debug, Default)] +pub struct BankingStageStats { + last_report: AtomicU64, + id: u32, + process_packets_count: AtomicUsize, + new_tx_count: AtomicUsize, + dropped_batches_count: AtomicUsize, + newly_buffered_packets_count: AtomicUsize, + current_buffered_packets_count: AtomicUsize, + rebuffered_packets_count: AtomicUsize, + consumed_buffered_packets_count: AtomicUsize, +} + +impl BankingStageStats { + pub fn new(id: u32) -> Self { + BankingStageStats { + id, + ..BankingStageStats::default() + } + } + + fn report(&self, report_interval_ms: u64) { + let should_report = { + let last = self.last_report.load(Ordering::Relaxed); + let now = solana_sdk::timing::timestamp(); + now.saturating_sub(last) > report_interval_ms + && self.last_report.compare_exchange( + last, + now, + Ordering::Relaxed, + Ordering::Relaxed, + ) == Ok(last) + }; + + if should_report { + datapoint_info!( + "banking_stage-loop-stats", + ("id", self.id as i64, i64), + ( + "process_packets_count", + self.process_packets_count.swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "new_tx_count", + self.new_tx_count.swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "dropped_batches_count", + self.dropped_batches_count.swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "newly_buffered_packets_count", + self.newly_buffered_packets_count.swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "current_buffered_packets_count", + self.current_buffered_packets_count + .swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "rebuffered_packets_count", + self.rebuffered_packets_count.swap(0, Ordering::Relaxed) as i64, + i64 + ), + ); + } + } +} + /// Stores the stage's thread handle and output receiver. pub struct BankingStage { bank_thread_hdls: Vec>, @@ -156,9 +231,10 @@ impl BankingStage { Self { bank_thread_hdls } } - fn filter_valid_packets_for_forwarding(all_packets: &[PacketsAndOffsets]) -> Vec<&Packet> { + fn filter_valid_packets_for_forwarding<'a>( + all_packets: impl Iterator, + ) -> Vec<&'a Packet> { all_packets - .iter() .flat_map(|(p, valid_indexes)| valid_indexes.iter().map(move |x| &p.packets[*x])) .collect() } @@ -166,9 +242,9 @@ impl BankingStage { fn forward_buffered_packets( socket: &std::net::UdpSocket, tpu_forwards: &std::net::SocketAddr, - unprocessed_packets: &[PacketsAndOffsets], + unprocessed_packets: &VecDeque, ) -> std::io::Result<()> { - let packets = Self::filter_valid_packets_for_forwarding(unprocessed_packets); + let packets = Self::filter_valid_packets_for_forwarding(unprocessed_packets.iter()); inc_new_counter_info!("banking_stage-forwarded_packets", packets.len()); for p in packets { socket.send_to(&p.data[..p.meta.size], &tpu_forwards)?; @@ -177,81 +253,89 @@ impl BankingStage { Ok(()) } + // Returns whether the given `Packets` has any more remaining unprocessed + // transactions + fn update_buffered_packets_with_new_unprocessed( + original_unprocessed_indexes: &mut Vec, + new_unprocessed_indexes: Vec, + ) -> bool { + let has_more_unprocessed_transactions = + Self::packet_has_more_unprocessed_transactions(&new_unprocessed_indexes); + if has_more_unprocessed_transactions { + *original_unprocessed_indexes = new_unprocessed_indexes + }; + has_more_unprocessed_transactions + } + pub fn consume_buffered_packets( my_pubkey: &Pubkey, poh_recorder: &Arc>, - buffered_packets: &mut Vec, - batch_limit: usize, + buffered_packets: &mut UnprocessedPackets, transaction_status_sender: Option, gossip_vote_sender: &ReplayVoteSender, - ) -> UnprocessedPackets { - let mut unprocessed_packets = vec![]; - let mut rebuffered_packets = 0; + test_fn: Option, + banking_stage_stats: Option<&BankingStageStats>, + ) { + let mut rebuffered_packets_len = 0; let mut new_tx_count = 0; let buffered_len = buffered_packets.len(); - let mut buffered_packets_iter = buffered_packets.drain(..); - let mut dropped_batches_count = 0; - let mut proc_start = Measure::start("consume_buffered_process"); - while let Some((msgs, unprocessed_indexes)) = buffered_packets_iter.next() { - let bank = poh_recorder.lock().unwrap().bank(); - if bank.is_none() { - rebuffered_packets += unprocessed_indexes.len(); - Self::push_unprocessed( - &mut unprocessed_packets, - msgs, - unprocessed_indexes, - &mut dropped_batches_count, - batch_limit, - ); - continue; - } - let bank = bank.unwrap(); - - let (processed, verified_txs_len, new_unprocessed_indexes) = - Self::process_received_packets( + let mut reached_end_of_slot = None; + buffered_packets.retain_mut(|(msgs, ref mut original_unprocessed_indexes)| { + if let Some((next_leader, bank)) = &reached_end_of_slot { + // We've hit the end of this slot, no need to perform more processing, + // just filter the remaining packets for the invalid (e.g. too old) ones + let new_unprocessed_indexes = Self::filter_unprocessed_packets( &bank, - &poh_recorder, &msgs, - unprocessed_indexes.to_owned(), - transaction_status_sender.clone(), - gossip_vote_sender, + &original_unprocessed_indexes, + my_pubkey, + *next_leader, ); - - new_tx_count += processed; - - // Collect any unprocessed transactions in this batch for forwarding - rebuffered_packets += new_unprocessed_indexes.len(); - Self::push_unprocessed( - &mut unprocessed_packets, - msgs, - new_unprocessed_indexes, - &mut dropped_batches_count, - batch_limit, - ); - - if processed < verified_txs_len { - let next_leader = poh_recorder.lock().unwrap().next_slot_leader(); - // Walk thru rest of the transactions and filter out the invalid (e.g. too old) ones - #[allow(clippy::while_let_on_iterator)] - while let Some((msgs, unprocessed_indexes)) = buffered_packets_iter.next() { - let unprocessed_indexes = Self::filter_unprocessed_packets( - &bank, - &msgs, - &unprocessed_indexes, - my_pubkey, - next_leader, - ); - Self::push_unprocessed( - &mut unprocessed_packets, - msgs, - unprocessed_indexes, - &mut dropped_batches_count, - batch_limit, - ); + Self::update_buffered_packets_with_new_unprocessed( + original_unprocessed_indexes, + new_unprocessed_indexes, + ) + } else { + let bank = poh_recorder.lock().unwrap().bank(); + if let Some(bank) = bank { + let (processed, verified_txs_len, new_unprocessed_indexes) = + Self::process_received_packets( + &bank, + &poh_recorder, + &msgs, + original_unprocessed_indexes.to_owned(), + transaction_status_sender.clone(), + gossip_vote_sender, + ); + if processed < verified_txs_len { + reached_end_of_slot = + Some((poh_recorder.lock().unwrap().next_slot_leader(), bank)); + } + new_tx_count += processed; + // Out of the buffered packets just retried, collect any still unprocessed + // transactions in this batch for forwarding + rebuffered_packets_len += new_unprocessed_indexes.len(); + let has_more_unprocessed_transactions = + Self::update_buffered_packets_with_new_unprocessed( + original_unprocessed_indexes, + new_unprocessed_indexes, + ); + if let Some(test_fn) = &test_fn { + test_fn(); + } + has_more_unprocessed_transactions + } else { + rebuffered_packets_len += original_unprocessed_indexes.len(); + // `original_unprocessed_indexes` must have remaining packets to process + // if not yet processed. + assert!(Self::packet_has_more_unprocessed_transactions( + &original_unprocessed_indexes + )); + true } } - } + }); proc_start.stop(); @@ -264,12 +348,14 @@ impl BankingStage { (new_tx_count as f32) / (proc_start.as_s()) ); - inc_new_counter_info!("banking_stage-rebuffered_packets", rebuffered_packets); - inc_new_counter_info!("banking_stage-consumed_buffered_packets", new_tx_count); - inc_new_counter_debug!("banking_stage-process_transactions", new_tx_count); - inc_new_counter_debug!("banking_stage-dropped_batches_count", dropped_batches_count); - - unprocessed_packets + if let Some(stats) = banking_stage_stats { + stats + .rebuffered_packets_count + .fetch_add(rebuffered_packets_len, Ordering::Relaxed); + stats + .consumed_buffered_packets_count + .fetch_add(new_tx_count, Ordering::Relaxed); + } } fn consume_or_forward_packets( @@ -306,11 +392,11 @@ impl BankingStage { socket: &std::net::UdpSocket, poh_recorder: &Arc>, cluster_info: &ClusterInfo, - buffered_packets: &mut Vec, + buffered_packets: &mut UnprocessedPackets, enable_forwarding: bool, - batch_limit: usize, transaction_status_sender: Option, gossip_vote_sender: &ReplayVoteSender, + banking_stage_stats: &BankingStageStats, ) -> BufferedPacketsDecision { let (leader_at_slot_offset, poh_has_bank, would_be_leader) = { let poh = poh_recorder.lock().unwrap(); @@ -332,15 +418,15 @@ impl BankingStage { match decision { BufferedPacketsDecision::Consume => { - let mut unprocessed = Self::consume_buffered_packets( + Self::consume_buffered_packets( my_pubkey, poh_recorder, buffered_packets, - batch_limit, transaction_status_sender, gossip_vote_sender, + None::>, + Some(banking_stage_stats), ); - buffered_packets.append(&mut unprocessed); } BufferedPacketsDecision::Forward => { if enable_forwarding { @@ -386,7 +472,8 @@ impl BankingStage { gossip_vote_sender: ReplayVoteSender, ) { let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let mut buffered_packets = vec![]; + let mut buffered_packets = VecDeque::with_capacity(batch_limit); + let banking_stage_stats = BankingStageStats::new(id); loop { while !buffered_packets.is_empty() { let decision = Self::process_buffered_packets( @@ -396,9 +483,9 @@ impl BankingStage { cluster_info, &mut buffered_packets, enable_forwarding, - batch_limit, transaction_status_sender.clone(), &gossip_vote_sender, + &banking_stage_stats, ); if decision == BufferedPacketsDecision::Hold { // If we are waiting on a new bank, @@ -427,21 +514,14 @@ impl BankingStage { batch_limit, transaction_status_sender.clone(), &gossip_vote_sender, + &mut buffered_packets, + &banking_stage_stats, ) { - Err(RecvTimeoutError::Timeout) => (), + Ok(()) | Err(RecvTimeoutError::Timeout) => (), Err(RecvTimeoutError::Disconnected) => break, - Ok(mut unprocessed_packets) => { - if unprocessed_packets.is_empty() { - continue; - } - let num: usize = unprocessed_packets - .iter() - .map(|(_, unprocessed)| unprocessed.len()) - .sum(); - inc_new_counter_info!("banking_stage-buffered_packets", num); - buffered_packets.append(&mut unprocessed_packets); - } } + + banking_stage_stats.report(100); } } @@ -502,7 +582,6 @@ impl BankingStage { .lock() .unwrap() .record(bank_slot, hash, processed_transactions); - match res { Ok(()) => (), Err(PohRecorderError::MaxHeightReached) => { @@ -943,7 +1022,9 @@ impl BankingStage { batch_limit: usize, transaction_status_sender: Option, gossip_vote_sender: &ReplayVoteSender, - ) -> Result { + buffered_packets: &mut UnprocessedPackets, + banking_stage_stats: &BankingStageStats, + ) -> Result<(), RecvTimeoutError> { let mut recv_time = Measure::start("process_packets_recv"); let mms = verified_receiver.recv_timeout(recv_timeout)?; recv_time.stop(); @@ -962,17 +1043,18 @@ impl BankingStage { let mut new_tx_count = 0; let mut mms_iter = mms.into_iter(); - let mut unprocessed_packets = vec![]; let mut dropped_batches_count = 0; + let mut newly_buffered_packets_count = 0; while let Some(msgs) = mms_iter.next() { let packet_indexes = Self::generate_packet_indexes(&msgs.packets); let bank = poh.lock().unwrap().bank(); if bank.is_none() { Self::push_unprocessed( - &mut unprocessed_packets, + buffered_packets, msgs, packet_indexes, &mut dropped_batches_count, + &mut newly_buffered_packets_count, batch_limit, ); continue; @@ -992,10 +1074,11 @@ impl BankingStage { // Collect any unprocessed transactions in this batch for forwarding Self::push_unprocessed( - &mut unprocessed_packets, + buffered_packets, msgs, unprocessed_indexes, &mut dropped_batches_count, + &mut newly_buffered_packets_count, batch_limit, ); @@ -1013,10 +1096,11 @@ impl BankingStage { next_leader, ); Self::push_unprocessed( - &mut unprocessed_packets, + buffered_packets, msgs, unprocessed_indexes, &mut dropped_batches_count, + &mut newly_buffered_packets_count, batch_limit, ); } @@ -1036,13 +1120,23 @@ impl BankingStage { count, id, ); - inc_new_counter_debug!("banking_stage-process_packets", count); - inc_new_counter_debug!("banking_stage-process_transactions", new_tx_count); - inc_new_counter_debug!("banking_stage-dropped_batches_count", dropped_batches_count); - + banking_stage_stats + .process_packets_count + .fetch_add(count, Ordering::Relaxed); + banking_stage_stats + .new_tx_count + .fetch_add(new_tx_count, Ordering::Relaxed); + banking_stage_stats + .dropped_batches_count + .fetch_add(dropped_batches_count, Ordering::Relaxed); + banking_stage_stats + .newly_buffered_packets_count + .fetch_add(newly_buffered_packets_count, Ordering::Relaxed); + banking_stage_stats + .current_buffered_packets_count + .swap(buffered_packets.len(), Ordering::Relaxed); *recv_start = Instant::now(); - - Ok(unprocessed_packets) + Ok(()) } fn push_unprocessed( @@ -1050,17 +1144,23 @@ impl BankingStage { packets: Packets, packet_indexes: Vec, dropped_batches_count: &mut usize, + newly_buffered_packets_count: &mut usize, batch_limit: usize, ) { - if !packet_indexes.is_empty() { + if Self::packet_has_more_unprocessed_transactions(&packet_indexes) { if unprocessed_packets.len() >= batch_limit { - unprocessed_packets.remove(0); *dropped_batches_count += 1; + unprocessed_packets.pop_front(); } - unprocessed_packets.push((packets, packet_indexes)); + *newly_buffered_packets_count += packet_indexes.len(); + unprocessed_packets.push_back((packets, packet_indexes)); } } + fn packet_has_more_unprocessed_transactions(packet_indexes: &[usize]) -> bool { + !packet_indexes.is_empty() + } + pub fn join(self) -> thread::Result<()> { for bank_thread_hdl in self.bank_thread_hdls { bank_thread_hdl.join()?; @@ -1130,7 +1230,7 @@ mod tests { transaction::TransactionError, }; use solana_transaction_status::TransactionWithStatusMeta; - use std::{sync::atomic::Ordering, thread::sleep}; + use std::{net::SocketAddr, path::Path, sync::atomic::Ordering, thread::sleep}; #[test] fn test_banking_stage_shutdown1() { @@ -1928,7 +2028,7 @@ mod tests { }) .collect_vec(); - let result = BankingStage::filter_valid_packets_for_forwarding(&all_packets); + let result = BankingStage::filter_valid_packets_for_forwarding(all_packets.iter()); assert_eq!(result.len(), 256); @@ -2108,4 +2208,250 @@ mod tests { } Blockstore::destroy(&ledger_path).unwrap(); } + + fn setup_conflicting_transactions( + ledger_path: &Path, + ) -> ( + Vec, + Arc, + Arc>, + Receiver, + ) { + Blockstore::destroy(&ledger_path).unwrap(); + let genesis_config_info = create_genesis_config(10_000); + let GenesisConfigInfo { + genesis_config, + mint_keypair, + .. + } = &genesis_config_info; + let blockstore = + Blockstore::open(&ledger_path).expect("Expected to be able to open database ledger"); + let bank = Arc::new(Bank::new(&genesis_config)); + let (poh_recorder, entry_receiver) = PohRecorder::new( + bank.tick_height(), + bank.last_blockhash(), + bank.slot(), + Some((4, 4)), + bank.ticks_per_slot(), + &solana_sdk::pubkey::new_rand(), + &Arc::new(blockstore), + &Arc::new(LeaderScheduleCache::new_from_bank(&bank)), + &Arc::new(PohConfig::default()), + ); + let poh_recorder = Arc::new(Mutex::new(poh_recorder)); + + // Set up unparallelizable conflicting transactions + let pubkey0 = solana_sdk::pubkey::new_rand(); + let pubkey1 = solana_sdk::pubkey::new_rand(); + let pubkey2 = solana_sdk::pubkey::new_rand(); + let transactions = vec![ + system_transaction::transfer(&mint_keypair, &pubkey0, 1, genesis_config.hash()), + system_transaction::transfer(&mint_keypair, &pubkey1, 1, genesis_config.hash()), + system_transaction::transfer(&mint_keypair, &pubkey2, 1, genesis_config.hash()), + ]; + (transactions, bank, poh_recorder, entry_receiver) + } + + #[test] + fn test_consume_buffered_packets() { + let ledger_path = get_tmp_ledger_path!(); + { + let (transactions, bank, poh_recorder, _entry_receiver) = + setup_conflicting_transactions(&ledger_path); + let num_conflicting_transactions = transactions.len(); + let mut packets_vec = to_packets_chunked(&transactions, num_conflicting_transactions); + assert_eq!(packets_vec.len(), 1); + assert_eq!(packets_vec[0].packets.len(), num_conflicting_transactions); + let all_packets = packets_vec.pop().unwrap(); + let mut buffered_packets: UnprocessedPackets = vec![( + all_packets, + (0..num_conflicting_transactions).into_iter().collect(), + )] + .into_iter() + .collect(); + + let (gossip_vote_sender, _gossip_vote_receiver) = unbounded(); + + // When the working bank in poh_recorder is None, no packets should be processed + assert!(!poh_recorder.lock().unwrap().has_bank()); + BankingStage::consume_buffered_packets( + &Pubkey::default(), + &poh_recorder, + &mut buffered_packets, + None, + &gossip_vote_sender, + None::>, + None, + ); + assert_eq!(buffered_packets[0].1.len(), num_conflicting_transactions); + // When the poh recorder has a bank, should process all non conflicting buffered packets. + // Processes one packet per iteration of the loop + for num_expected_unprocessed in (0..num_conflicting_transactions).rev() { + poh_recorder.lock().unwrap().set_bank(&bank); + BankingStage::consume_buffered_packets( + &Pubkey::default(), + &poh_recorder, + &mut buffered_packets, + None, + &gossip_vote_sender, + None::>, + None, + ); + if num_expected_unprocessed == 0 { + assert!(buffered_packets.is_empty()) + } else { + assert_eq!(buffered_packets[0].1.len(), num_expected_unprocessed); + } + } + } + Blockstore::destroy(&ledger_path).unwrap(); + } + + #[test] + fn test_consume_buffered_packets_interrupted() { + let ledger_path = get_tmp_ledger_path!(); + { + let (transactions, bank, poh_recorder, _entry_receiver) = + setup_conflicting_transactions(&ledger_path); + let num_conflicting_transactions = transactions.len(); + let packets_vec = to_packets_chunked(&transactions, 1); + assert_eq!(packets_vec.len(), num_conflicting_transactions); + for single_packets in &packets_vec { + assert_eq!(single_packets.packets.len(), 1); + } + let mut buffered_packets: UnprocessedPackets = packets_vec + .clone() + .into_iter() + .map(|single_packets| (single_packets, vec![0])) + .collect(); + + let (continue_sender, continue_receiver) = unbounded(); + let (finished_packet_sender, finished_packet_receiver) = unbounded(); + + let test_fn = Some(move || { + finished_packet_sender.send(()).unwrap(); + continue_receiver.recv().unwrap(); + }); + // When the poh recorder has a bank, it should process all non conflicting buffered packets. + // Because each conflicting transaction is in it's own `Packet` within `packets_vec`, then + // each iteration of this loop will process one element of `packets_vec`per iteration of the + // loop. + let interrupted_iteration = 1; + poh_recorder.lock().unwrap().set_bank(&bank); + let poh_recorder_ = poh_recorder.clone(); + let (gossip_vote_sender, _gossip_vote_receiver) = unbounded(); + // Start up thread to process the banks + let t_consume = Builder::new() + .name("consume-buffered-packets".to_string()) + .spawn(move || { + BankingStage::consume_buffered_packets( + &Pubkey::default(), + &poh_recorder_, + &mut buffered_packets, + None, + &gossip_vote_sender, + test_fn, + None, + ); + + // Check everything is correct. All indexes after `interrupted_iteration` + // should still be unprocessed + assert_eq!( + buffered_packets.len(), + packets_vec[interrupted_iteration + 1..].iter().count() + ); + for ((remaining_unprocessed_packet, _), original_packet) in buffered_packets + .iter() + .zip(&packets_vec[interrupted_iteration + 1..]) + { + assert_eq!( + remaining_unprocessed_packet.packets[0], + original_packet.packets[0] + ); + } + }) + .unwrap(); + + for i in 0..=interrupted_iteration { + finished_packet_receiver.recv().unwrap(); + if i == interrupted_iteration { + poh_recorder + .lock() + .unwrap() + .schedule_dummy_max_height_reached_failure(); + } + continue_sender.send(()).unwrap(); + } + + t_consume.join().unwrap(); + } + Blockstore::destroy(&ledger_path).unwrap(); + } + + #[test] + fn test_push_unprocessed_batch_limit() { + // Create `Packets` with 1 unprocessed element + let single_element_packets = Packets::new(vec![Packet::default()]); + let mut unprocessed_packets: UnprocessedPackets = + vec![(single_element_packets.clone(), vec![0])] + .into_iter() + .collect(); + // Set the limit to 2 + let batch_limit = 2; + // Create some new unprocessed packets + let new_packets = single_element_packets; + let packet_indexes = vec![]; + + let mut dropped_batches_count = 0; + let mut newly_buffered_packets_count = 0; + // Because the set of unprocessed `packet_indexes` is empty, the + // packets are not added to the unprocessed queue + BankingStage::push_unprocessed( + &mut unprocessed_packets, + new_packets.clone(), + packet_indexes, + &mut dropped_batches_count, + &mut newly_buffered_packets_count, + batch_limit, + ); + assert_eq!(unprocessed_packets.len(), 1); + assert_eq!(dropped_batches_count, 0); + assert_eq!(newly_buffered_packets_count, 0); + + // Because the set of unprocessed `packet_indexes` is non-empty, the + // packets are added to the unprocessed queue + let packet_indexes = vec![0]; + BankingStage::push_unprocessed( + &mut unprocessed_packets, + new_packets, + packet_indexes.clone(), + &mut dropped_batches_count, + &mut newly_buffered_packets_count, + batch_limit, + ); + assert_eq!(unprocessed_packets.len(), 2); + assert_eq!(dropped_batches_count, 0); + assert_eq!(newly_buffered_packets_count, 1); + + // Because we've reached the batch limit, old unprocessed packets are + // dropped and the new one is appended to the end + let new_packets = Packets::new(vec![Packet::from_data( + &SocketAddr::from(([127, 0, 0, 1], 8001)), + 42, + ) + .unwrap()]); + assert_eq!(unprocessed_packets.len(), batch_limit); + BankingStage::push_unprocessed( + &mut unprocessed_packets, + new_packets.clone(), + packet_indexes, + &mut dropped_batches_count, + &mut newly_buffered_packets_count, + batch_limit, + ); + assert_eq!(unprocessed_packets.len(), 2); + assert_eq!(unprocessed_packets[1].0.packets[0], new_packets.packets[0]); + assert_eq!(dropped_batches_count, 1); + assert_eq!(newly_buffered_packets_count, 2); + } } diff --git a/core/src/poh_recorder.rs b/core/src/poh_recorder.rs index f95b32f678..6fa6ea3a58 100644 --- a/core/src/poh_recorder.rs +++ b/core/src/poh_recorder.rs @@ -501,6 +501,11 @@ impl PohRecorder { poh_config, ) } + + #[cfg(test)] + pub fn schedule_dummy_max_height_reached_failure(&mut self) { + self.reset(Hash::default(), 1, None); + } } #[cfg(test)]