Cleanup buffered packets (#15210)

carllin 2021-02-12 03:27:37 -08:00 committed by GitHub
parent 2a3501d431
commit 629dcd0f39
5 changed files with 475 additions and 114 deletions

Cargo.lock

@@ -3398,6 +3398,12 @@ dependencies = [
"winreg",
]
[[package]]
name = "retain_mut"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "53552c6c49e1e13f1a203ef0080ab3bbef0beb570a528993e83df057a9d9bba1"
[[package]]
name = "ring"
version = "0.16.12"
@@ -4268,6 +4274,7 @@ dependencies = [
"rayon",
"regex",
"reqwest 0.10.8",
"retain_mut",
"rustc_version",
"rustversion",
"serde",

core/Cargo.toml

@@ -44,6 +44,7 @@ rand_chacha = "0.2.2"
raptorq = "1.4.2"
rayon = "1.5.0"
regex = "1.3.9"
retain_mut = "0.1.2"
rustversion = "1.0.4"
serde = "1.0.122"
serde_bytes = "0.11"
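
The new `retain_mut` dependency backfills an API that std's `Vec` and `VecDeque` lacked at the time: a `retain` whose predicate receives `&mut T`, so each kept element can be updated in place while rejected elements are dropped. A minimal sketch of the crate's semantics (the example values are illustrative, not from this commit):

    use retain_mut::RetainMut;
    use std::collections::VecDeque;

    fn main() {
        let mut batches: VecDeque<Vec<usize>> =
            vec![vec![1, 2, 3], vec![], vec![4]].into_iter().collect();
        // Keep only batches that still have work, mutating the kept ones in place.
        batches.retain_mut(|batch| {
            batch.pop(); // mutable access to each element
            !batch.is_empty()
        });
        assert_eq!(batches, vec![vec![1, 2]].into_iter().collect::<VecDeque<_>>());
    }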

core/benches/banking_stage.rs

@@ -28,6 +28,7 @@ use solana_sdk::system_instruction;
use solana_sdk::system_transaction;
use solana_sdk::timing::{duration_as_us, timestamp};
use solana_sdk::transaction::Transaction;
use std::collections::VecDeque;
use std::sync::atomic::Ordering;
use std::sync::mpsc::Receiver;
use std::sync::Arc;
@@ -68,10 +69,10 @@ fn bench_consume_buffered(bencher: &mut Bencher) {
let len = 4096;
let chunk_size = 1024;
let batches = to_packets_chunked(&vec![tx; len], chunk_size);
let mut packets = vec![];
let mut packets = VecDeque::new();
for batch in batches {
let batch_len = batch.packets.len();
packets.push((batch, vec![0usize; batch_len]));
packets.push_back((batch, vec![0usize; batch_len]));
}
let (s, _r) = unbounded();
// This tests the performance of buffering packets.
@@ -81,9 +82,10 @@ fn bench_consume_buffered(bencher: &mut Bencher) {
&my_pubkey,
&poh_recorder,
&mut packets,
10_000,
None,
&s,
None::<Box<dyn Fn()>>,
None,
);
});

core/src/banking_stage.rs

@@ -8,6 +8,7 @@ use crate::{
};
use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError};
use itertools::Itertools;
use retain_mut::RetainMut;
use solana_ledger::{
blockstore::Blockstore,
blockstore_processor::{send_transaction_status_batch, TransactionStatusSender},
@@ -46,10 +47,10 @@ use solana_transaction_status::token_balances::{
};
use std::{
cmp,
collections::HashMap,
collections::{HashMap, VecDeque},
env,
net::UdpSocket,
sync::atomic::AtomicBool,
sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
sync::mpsc::Receiver,
sync::{Arc, Mutex},
thread::{self, Builder, JoinHandle},
@@ -58,7 +59,7 @@ use std::{
};
type PacketsAndOffsets = (Packets, Vec<usize>);
pub type UnprocessedPackets = Vec<PacketsAndOffsets>;
pub type UnprocessedPackets = VecDeque<PacketsAndOffsets>;
/// Transaction forwarding
pub const FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET: u64 = 2;
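
Presumably the motivation for switching `UnprocessedPackets` from `Vec` to `VecDeque`: the buffer evicts its oldest batch when full, and `Vec::remove(0)` shifts every remaining element while `VecDeque::pop_front` is O(1). A small sketch of the difference (illustrative values only):

    use std::collections::VecDeque;

    fn main() {
        let mut as_vec: Vec<u32> = (0..5).collect();
        let mut as_deque: VecDeque<u32> = (0..5).collect();

        as_vec.remove(0);     // O(n): shifts the four remaining elements left
        as_deque.pop_front(); // O(1): only the head index advances

        assert_eq!(as_vec, vec![1, 2, 3, 4]);
        assert_eq!(as_deque, VecDeque::from(vec![1, 2, 3, 4]));
    }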
@@ -70,6 +71,80 @@ const TOTAL_BUFFERED_PACKETS: usize = 500_000;
const MAX_NUM_TRANSACTIONS_PER_BATCH: usize = 128;
#[derive(Debug, Default)]
pub struct BankingStageStats {
last_report: AtomicU64,
id: u32,
process_packets_count: AtomicUsize,
new_tx_count: AtomicUsize,
dropped_batches_count: AtomicUsize,
newly_buffered_packets_count: AtomicUsize,
current_buffered_packets_count: AtomicUsize,
rebuffered_packets_count: AtomicUsize,
consumed_buffered_packets_count: AtomicUsize,
}
impl BankingStageStats {
pub fn new(id: u32) -> Self {
BankingStageStats {
id,
..BankingStageStats::default()
}
}
fn report(&self, report_interval_ms: u64) {
let should_report = {
let last = self.last_report.load(Ordering::Relaxed);
let now = solana_sdk::timing::timestamp();
now.saturating_sub(last) > report_interval_ms
&& self.last_report.compare_exchange(
last,
now,
Ordering::Relaxed,
Ordering::Relaxed,
) == Ok(last)
};
if should_report {
datapoint_info!(
"banking_stage-loop-stats",
("id", self.id as i64, i64),
(
"process_packets_count",
self.process_packets_count.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"new_tx_count",
self.new_tx_count.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"dropped_batches_count",
self.dropped_batches_count.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"newly_buffered_packets_count",
self.newly_buffered_packets_count.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"current_buffered_packets_count",
self.current_buffered_packets_count
.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"rebuffered_packets_count",
self.rebuffered_packets_count.swap(0, Ordering::Relaxed) as i64,
i64
),
);
}
}
}
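
`report` rate-limits metric emission without a lock: only the thread whose `compare_exchange` wins the race to bump `last_report` emits the datapoint, and each counter is drained with `swap(0, ..)` so every increment is reported exactly once. A stripped-down sketch of the same pattern, with hypothetical names (`Stats`, `events`) standing in for the real fields:

    use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};

    struct Stats {
        last_report_ms: AtomicU64,
        events: AtomicUsize,
    }

    impl Stats {
        fn report(&self, now_ms: u64, interval_ms: u64) {
            let last = self.last_report_ms.load(Ordering::Relaxed);
            // Only one caller can win the compare_exchange per interval, so
            // concurrent threads never double-report.
            if now_ms.saturating_sub(last) > interval_ms
                && self
                    .last_report_ms
                    .compare_exchange(last, now_ms, Ordering::Relaxed, Ordering::Relaxed)
                    == Ok(last)
            {
                // swap(0, ..) drains the counter so each event is reported once.
                println!("events: {}", self.events.swap(0, Ordering::Relaxed));
            }
        }
    }

    fn main() {
        let stats = Stats {
            last_report_ms: AtomicU64::new(0),
            events: AtomicUsize::new(0),
        };
        stats.events.fetch_add(3, Ordering::Relaxed);
        stats.report(200, 100); // prints "events: 3"
        stats.report(201, 100); // within the interval: prints nothing
    }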
/// Stores the stage's thread handle and output receiver.
pub struct BankingStage {
bank_thread_hdls: Vec<JoinHandle<()>>,
@@ -156,9 +231,10 @@ impl BankingStage {
Self { bank_thread_hdls }
}
fn filter_valid_packets_for_forwarding(all_packets: &[PacketsAndOffsets]) -> Vec<&Packet> {
fn filter_valid_packets_for_forwarding<'a>(
all_packets: impl Iterator<Item = &'a PacketsAndOffsets>,
) -> Vec<&'a Packet> {
all_packets
.iter()
.flat_map(|(p, valid_indexes)| valid_indexes.iter().map(move |x| &p.packets[*x]))
.collect()
}
@@ -166,9 +242,9 @@ impl BankingStage {
fn forward_buffered_packets(
socket: &std::net::UdpSocket,
tpu_forwards: &std::net::SocketAddr,
unprocessed_packets: &[PacketsAndOffsets],
unprocessed_packets: &VecDeque<PacketsAndOffsets>,
) -> std::io::Result<()> {
let packets = Self::filter_valid_packets_for_forwarding(unprocessed_packets);
let packets = Self::filter_valid_packets_for_forwarding(unprocessed_packets.iter());
inc_new_counter_info!("banking_stage-forwarded_packets", packets.len());
for p in packets {
socket.send_to(&p.data[..p.meta.size], &tpu_forwards)?;
@@ -177,81 +253,89 @@ impl BankingStage {
Ok(())
}
// Returns whether the given `Packets` has any more remaining unprocessed
// transactions
fn update_buffered_packets_with_new_unprocessed(
original_unprocessed_indexes: &mut Vec<usize>,
new_unprocessed_indexes: Vec<usize>,
) -> bool {
let has_more_unprocessed_transactions =
Self::packet_has_more_unprocessed_transactions(&new_unprocessed_indexes);
if has_more_unprocessed_transactions {
*original_unprocessed_indexes = new_unprocessed_indexes
};
has_more_unprocessed_transactions
}
pub fn consume_buffered_packets(
my_pubkey: &Pubkey,
poh_recorder: &Arc<Mutex<PohRecorder>>,
buffered_packets: &mut Vec<PacketsAndOffsets>,
batch_limit: usize,
buffered_packets: &mut UnprocessedPackets,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: &ReplayVoteSender,
) -> UnprocessedPackets {
let mut unprocessed_packets = vec![];
let mut rebuffered_packets = 0;
test_fn: Option<impl Fn()>,
banking_stage_stats: Option<&BankingStageStats>,
) {
let mut rebuffered_packets_len = 0;
let mut new_tx_count = 0;
let buffered_len = buffered_packets.len();
let mut buffered_packets_iter = buffered_packets.drain(..);
let mut dropped_batches_count = 0;
let mut proc_start = Measure::start("consume_buffered_process");
while let Some((msgs, unprocessed_indexes)) = buffered_packets_iter.next() {
let bank = poh_recorder.lock().unwrap().bank();
if bank.is_none() {
rebuffered_packets += unprocessed_indexes.len();
Self::push_unprocessed(
&mut unprocessed_packets,
msgs,
unprocessed_indexes,
&mut dropped_batches_count,
batch_limit,
);
continue;
}
let bank = bank.unwrap();
let (processed, verified_txs_len, new_unprocessed_indexes) =
Self::process_received_packets(
let mut reached_end_of_slot = None;
buffered_packets.retain_mut(|(msgs, ref mut original_unprocessed_indexes)| {
if let Some((next_leader, bank)) = &reached_end_of_slot {
// We've hit the end of this slot, no need to perform more processing,
// just filter the remaining packets for the invalid (e.g. too old) ones
let new_unprocessed_indexes = Self::filter_unprocessed_packets(
&bank,
&poh_recorder,
&msgs,
unprocessed_indexes.to_owned(),
transaction_status_sender.clone(),
gossip_vote_sender,
&original_unprocessed_indexes,
my_pubkey,
*next_leader,
);
new_tx_count += processed;
// Collect any unprocessed transactions in this batch for forwarding
rebuffered_packets += new_unprocessed_indexes.len();
Self::push_unprocessed(
&mut unprocessed_packets,
msgs,
new_unprocessed_indexes,
&mut dropped_batches_count,
batch_limit,
);
if processed < verified_txs_len {
let next_leader = poh_recorder.lock().unwrap().next_slot_leader();
// Walk through the rest of the transactions and filter out the invalid (e.g. too old) ones
#[allow(clippy::while_let_on_iterator)]
while let Some((msgs, unprocessed_indexes)) = buffered_packets_iter.next() {
let unprocessed_indexes = Self::filter_unprocessed_packets(
&bank,
&msgs,
&unprocessed_indexes,
my_pubkey,
next_leader,
);
Self::push_unprocessed(
&mut unprocessed_packets,
msgs,
unprocessed_indexes,
&mut dropped_batches_count,
batch_limit,
);
Self::update_buffered_packets_with_new_unprocessed(
original_unprocessed_indexes,
new_unprocessed_indexes,
)
} else {
let bank = poh_recorder.lock().unwrap().bank();
if let Some(bank) = bank {
let (processed, verified_txs_len, new_unprocessed_indexes) =
Self::process_received_packets(
&bank,
&poh_recorder,
&msgs,
original_unprocessed_indexes.to_owned(),
transaction_status_sender.clone(),
gossip_vote_sender,
);
if processed < verified_txs_len {
reached_end_of_slot =
Some((poh_recorder.lock().unwrap().next_slot_leader(), bank));
}
new_tx_count += processed;
// Out of the buffered packets just retried, collect any still unprocessed
// transactions in this batch for forwarding
rebuffered_packets_len += new_unprocessed_indexes.len();
let has_more_unprocessed_transactions =
Self::update_buffered_packets_with_new_unprocessed(
original_unprocessed_indexes,
new_unprocessed_indexes,
);
if let Some(test_fn) = &test_fn {
test_fn();
}
has_more_unprocessed_transactions
} else {
rebuffered_packets_len += original_unprocessed_indexes.len();
// If not processed yet, `original_unprocessed_indexes` must still contain
// packets to process.
assert!(Self::packet_has_more_unprocessed_transactions(
&original_unprocessed_indexes
));
true
}
}
}
});
proc_start.stop();
@@ -264,12 +348,14 @@ impl BankingStage {
(new_tx_count as f32) / (proc_start.as_s())
);
inc_new_counter_info!("banking_stage-rebuffered_packets", rebuffered_packets);
inc_new_counter_info!("banking_stage-consumed_buffered_packets", new_tx_count);
inc_new_counter_debug!("banking_stage-process_transactions", new_tx_count);
inc_new_counter_debug!("banking_stage-dropped_batches_count", dropped_batches_count);
unprocessed_packets
if let Some(stats) = banking_stage_stats {
stats
.rebuffered_packets_count
.fetch_add(rebuffered_packets_len, Ordering::Relaxed);
stats
.consumed_buffered_packets_count
.fetch_add(new_tx_count, Ordering::Relaxed);
}
}
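
The rewrite above replaces the old drain-and-rebuild loop with a single in-place `retain_mut` pass, and it stops consuming once a batch only partially processes (`processed < verified_txs_len`), which signals the end of the slot: from then on the remaining batches are merely filtered, never executed. A reduced sketch of that control flow, where `process` is a hypothetical stand-in for `process_received_packets`/`filter_unprocessed_packets`:

    use retain_mut::RetainMut;
    use std::collections::VecDeque;

    // `process` returns the indexes it could not process, plus a flag saying
    // whether the slot ended mid-batch.
    fn consume(
        buffered: &mut VecDeque<Vec<usize>>,
        mut process: impl FnMut(&[usize]) -> (Vec<usize>, bool),
    ) {
        let mut reached_end_of_slot = false;
        buffered.retain_mut(|indexes| {
            if reached_end_of_slot {
                // Slot is over: keep (but don't execute) whatever is unprocessed.
                return !indexes.is_empty();
            }
            let (remaining, slot_ended) = process(indexes);
            reached_end_of_slot = slot_ended;
            *indexes = remaining;
            !indexes.is_empty()
        });
    }

    fn main() {
        let mut buffered: VecDeque<Vec<usize>> =
            vec![vec![0, 1], vec![2]].into_iter().collect();
        let mut calls = 0;
        // The first batch processes index 0, then hits the end of the slot.
        consume(&mut buffered, |idx| {
            calls += 1;
            (idx[1..].to_vec(), true)
        });
        assert_eq!(calls, 1); // the second batch was retained untouched
        assert_eq!(
            buffered,
            vec![vec![1], vec![2]].into_iter().collect::<VecDeque<_>>()
        );
    }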
fn consume_or_forward_packets(
@@ -306,11 +392,11 @@ impl BankingStage {
socket: &std::net::UdpSocket,
poh_recorder: &Arc<Mutex<PohRecorder>>,
cluster_info: &ClusterInfo,
buffered_packets: &mut Vec<PacketsAndOffsets>,
buffered_packets: &mut UnprocessedPackets,
enable_forwarding: bool,
batch_limit: usize,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: &ReplayVoteSender,
banking_stage_stats: &BankingStageStats,
) -> BufferedPacketsDecision {
let (leader_at_slot_offset, poh_has_bank, would_be_leader) = {
let poh = poh_recorder.lock().unwrap();
@@ -332,15 +418,15 @@ impl BankingStage {
match decision {
BufferedPacketsDecision::Consume => {
let mut unprocessed = Self::consume_buffered_packets(
Self::consume_buffered_packets(
my_pubkey,
poh_recorder,
buffered_packets,
batch_limit,
transaction_status_sender,
gossip_vote_sender,
None::<Box<dyn Fn()>>,
Some(banking_stage_stats),
);
buffered_packets.append(&mut unprocessed);
}
BufferedPacketsDecision::Forward => {
if enable_forwarding {
@@ -386,7 +472,8 @@ impl BankingStage {
gossip_vote_sender: ReplayVoteSender,
) {
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut buffered_packets = vec![];
let mut buffered_packets = VecDeque::with_capacity(batch_limit);
let banking_stage_stats = BankingStageStats::new(id);
loop {
while !buffered_packets.is_empty() {
let decision = Self::process_buffered_packets(
@@ -396,9 +483,9 @@ impl BankingStage {
cluster_info,
&mut buffered_packets,
enable_forwarding,
batch_limit,
transaction_status_sender.clone(),
&gossip_vote_sender,
&banking_stage_stats,
);
if decision == BufferedPacketsDecision::Hold {
// If we are waiting on a new bank,
@@ -427,21 +514,14 @@ impl BankingStage {
batch_limit,
transaction_status_sender.clone(),
&gossip_vote_sender,
&mut buffered_packets,
&banking_stage_stats,
) {
Err(RecvTimeoutError::Timeout) => (),
Ok(()) | Err(RecvTimeoutError::Timeout) => (),
Err(RecvTimeoutError::Disconnected) => break,
Ok(mut unprocessed_packets) => {
if unprocessed_packets.is_empty() {
continue;
}
let num: usize = unprocessed_packets
.iter()
.map(|(_, unprocessed)| unprocessed.len())
.sum();
inc_new_counter_info!("banking_stage-buffered_packets", num);
buffered_packets.append(&mut unprocessed_packets);
}
}
banking_stage_stats.report(100);
}
}
@@ -502,7 +582,6 @@ impl BankingStage {
.lock()
.unwrap()
.record(bank_slot, hash, processed_transactions);
match res {
Ok(()) => (),
Err(PohRecorderError::MaxHeightReached) => {
@@ -943,7 +1022,9 @@ impl BankingStage {
batch_limit: usize,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: &ReplayVoteSender,
) -> Result<UnprocessedPackets, RecvTimeoutError> {
buffered_packets: &mut UnprocessedPackets,
banking_stage_stats: &BankingStageStats,
) -> Result<(), RecvTimeoutError> {
let mut recv_time = Measure::start("process_packets_recv");
let mms = verified_receiver.recv_timeout(recv_timeout)?;
recv_time.stop();
@@ -962,17 +1043,18 @@ impl BankingStage {
let mut new_tx_count = 0;
let mut mms_iter = mms.into_iter();
let mut unprocessed_packets = vec![];
let mut dropped_batches_count = 0;
let mut newly_buffered_packets_count = 0;
while let Some(msgs) = mms_iter.next() {
let packet_indexes = Self::generate_packet_indexes(&msgs.packets);
let bank = poh.lock().unwrap().bank();
if bank.is_none() {
Self::push_unprocessed(
&mut unprocessed_packets,
buffered_packets,
msgs,
packet_indexes,
&mut dropped_batches_count,
&mut newly_buffered_packets_count,
batch_limit,
);
continue;
@@ -992,10 +1074,11 @@ impl BankingStage {
// Collect any unprocessed transactions in this batch for forwarding
Self::push_unprocessed(
&mut unprocessed_packets,
buffered_packets,
msgs,
unprocessed_indexes,
&mut dropped_batches_count,
&mut newly_buffered_packets_count,
batch_limit,
);
@@ -1013,10 +1096,11 @@ impl BankingStage {
next_leader,
);
Self::push_unprocessed(
&mut unprocessed_packets,
buffered_packets,
msgs,
unprocessed_indexes,
&mut dropped_batches_count,
&mut newly_buffered_packets_count,
batch_limit,
);
}
@@ -1036,13 +1120,23 @@ impl BankingStage {
count,
id,
);
inc_new_counter_debug!("banking_stage-process_packets", count);
inc_new_counter_debug!("banking_stage-process_transactions", new_tx_count);
inc_new_counter_debug!("banking_stage-dropped_batches_count", dropped_batches_count);
banking_stage_stats
.process_packets_count
.fetch_add(count, Ordering::Relaxed);
banking_stage_stats
.new_tx_count
.fetch_add(new_tx_count, Ordering::Relaxed);
banking_stage_stats
.dropped_batches_count
.fetch_add(dropped_batches_count, Ordering::Relaxed);
banking_stage_stats
.newly_buffered_packets_count
.fetch_add(newly_buffered_packets_count, Ordering::Relaxed);
banking_stage_stats
.current_buffered_packets_count
.swap(buffered_packets.len(), Ordering::Relaxed);
*recv_start = Instant::now();
Ok(unprocessed_packets)
Ok(())
}
fn push_unprocessed(
@@ -1050,17 +1144,23 @@ impl BankingStage {
packets: Packets,
packet_indexes: Vec<usize>,
dropped_batches_count: &mut usize,
newly_buffered_packets_count: &mut usize,
batch_limit: usize,
) {
if !packet_indexes.is_empty() {
if Self::packet_has_more_unprocessed_transactions(&packet_indexes) {
if unprocessed_packets.len() >= batch_limit {
unprocessed_packets.remove(0);
*dropped_batches_count += 1;
unprocessed_packets.pop_front();
}
unprocessed_packets.push((packets, packet_indexes));
*newly_buffered_packets_count += packet_indexes.len();
unprocessed_packets.push_back((packets, packet_indexes));
}
}
fn packet_has_more_unprocessed_transactions(packet_indexes: &[usize]) -> bool {
!packet_indexes.is_empty()
}
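
`push_unprocessed` thus acts as a bounded drop-oldest buffer: batches with no unprocessed indexes are never buffered, and once `batch_limit` is reached the oldest batch is evicted before the new one is appended (`test_push_unprocessed_batch_limit` below exercises exactly this). A reduced sketch with simplified types standing in for `(Packets, Vec<usize>)`:

    use std::collections::VecDeque;

    // (batch_id, unprocessed_indexes) stands in for (Packets, Vec<usize>).
    fn push_unprocessed(
        queue: &mut VecDeque<(u32, Vec<usize>)>,
        batch: (u32, Vec<usize>),
        dropped_batches: &mut usize,
        batch_limit: usize,
    ) {
        if batch.1.is_empty() {
            return; // nothing unprocessed in this batch; don't buffer it
        }
        if queue.len() >= batch_limit {
            queue.pop_front(); // evict the oldest batch
            *dropped_batches += 1;
        }
        queue.push_back(batch);
    }

    fn main() {
        let mut queue = VecDeque::new();
        let mut dropped = 0;
        for id in 0..3 {
            push_unprocessed(&mut queue, (id, vec![0]), &mut dropped, 2);
        }
        assert_eq!(dropped, 1);
        assert_eq!(queue.front().unwrap().0, 1); // batch 0 was evicted
    }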
pub fn join(self) -> thread::Result<()> {
for bank_thread_hdl in self.bank_thread_hdls {
bank_thread_hdl.join()?;
@@ -1130,7 +1230,7 @@ mod tests {
transaction::TransactionError,
};
use solana_transaction_status::TransactionWithStatusMeta;
use std::{sync::atomic::Ordering, thread::sleep};
use std::{net::SocketAddr, path::Path, sync::atomic::Ordering, thread::sleep};
#[test]
fn test_banking_stage_shutdown1() {
@@ -1928,7 +2028,7 @@ mod tests {
})
.collect_vec();
let result = BankingStage::filter_valid_packets_for_forwarding(&all_packets);
let result = BankingStage::filter_valid_packets_for_forwarding(all_packets.iter());
assert_eq!(result.len(), 256);
@@ -2108,4 +2208,250 @@ mod tests {
}
Blockstore::destroy(&ledger_path).unwrap();
}
fn setup_conflicting_transactions(
ledger_path: &Path,
) -> (
Vec<Transaction>,
Arc<Bank>,
Arc<Mutex<PohRecorder>>,
Receiver<WorkingBankEntry>,
) {
Blockstore::destroy(&ledger_path).unwrap();
let genesis_config_info = create_genesis_config(10_000);
let GenesisConfigInfo {
genesis_config,
mint_keypair,
..
} = &genesis_config_info;
let blockstore =
Blockstore::open(&ledger_path).expect("Expected to be able to open database ledger");
let bank = Arc::new(Bank::new(&genesis_config));
let (poh_recorder, entry_receiver) = PohRecorder::new(
bank.tick_height(),
bank.last_blockhash(),
bank.slot(),
Some((4, 4)),
bank.ticks_per_slot(),
&solana_sdk::pubkey::new_rand(),
&Arc::new(blockstore),
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
&Arc::new(PohConfig::default()),
);
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
// Set up unparallelizable conflicting transactions
let pubkey0 = solana_sdk::pubkey::new_rand();
let pubkey1 = solana_sdk::pubkey::new_rand();
let pubkey2 = solana_sdk::pubkey::new_rand();
let transactions = vec![
system_transaction::transfer(&mint_keypair, &pubkey0, 1, genesis_config.hash()),
system_transaction::transfer(&mint_keypair, &pubkey1, 1, genesis_config.hash()),
system_transaction::transfer(&mint_keypair, &pubkey2, 1, genesis_config.hash()),
];
(transactions, bank, poh_recorder, entry_receiver)
}
#[test]
fn test_consume_buffered_packets() {
let ledger_path = get_tmp_ledger_path!();
{
let (transactions, bank, poh_recorder, _entry_receiver) =
setup_conflicting_transactions(&ledger_path);
let num_conflicting_transactions = transactions.len();
let mut packets_vec = to_packets_chunked(&transactions, num_conflicting_transactions);
assert_eq!(packets_vec.len(), 1);
assert_eq!(packets_vec[0].packets.len(), num_conflicting_transactions);
let all_packets = packets_vec.pop().unwrap();
let mut buffered_packets: UnprocessedPackets = vec![(
all_packets,
(0..num_conflicting_transactions).into_iter().collect(),
)]
.into_iter()
.collect();
let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();
// When the working bank in poh_recorder is None, no packets should be processed
assert!(!poh_recorder.lock().unwrap().has_bank());
BankingStage::consume_buffered_packets(
&Pubkey::default(),
&poh_recorder,
&mut buffered_packets,
None,
&gossip_vote_sender,
None::<Box<dyn Fn()>>,
None,
);
assert_eq!(buffered_packets[0].1.len(), num_conflicting_transactions);
// When the poh recorder has a bank, it should process all non-conflicting buffered packets.
// Processes one packet per iteration of the loop
for num_expected_unprocessed in (0..num_conflicting_transactions).rev() {
poh_recorder.lock().unwrap().set_bank(&bank);
BankingStage::consume_buffered_packets(
&Pubkey::default(),
&poh_recorder,
&mut buffered_packets,
None,
&gossip_vote_sender,
None::<Box<dyn Fn()>>,
None,
);
if num_expected_unprocessed == 0 {
assert!(buffered_packets.is_empty())
} else {
assert_eq!(buffered_packets[0].1.len(), num_expected_unprocessed);
}
}
}
Blockstore::destroy(&ledger_path).unwrap();
}
#[test]
fn test_consume_buffered_packets_interrupted() {
let ledger_path = get_tmp_ledger_path!();
{
let (transactions, bank, poh_recorder, _entry_receiver) =
setup_conflicting_transactions(&ledger_path);
let num_conflicting_transactions = transactions.len();
let packets_vec = to_packets_chunked(&transactions, 1);
assert_eq!(packets_vec.len(), num_conflicting_transactions);
for single_packets in &packets_vec {
assert_eq!(single_packets.packets.len(), 1);
}
let mut buffered_packets: UnprocessedPackets = packets_vec
.clone()
.into_iter()
.map(|single_packets| (single_packets, vec![0]))
.collect();
let (continue_sender, continue_receiver) = unbounded();
let (finished_packet_sender, finished_packet_receiver) = unbounded();
let test_fn = Some(move || {
finished_packet_sender.send(()).unwrap();
continue_receiver.recv().unwrap();
});
// When the poh recorder has a bank, it should process all non-conflicting buffered packets.
// Because each conflicting transaction is in its own `Packet` within `packets_vec`, each
// iteration of this loop processes one element of `packets_vec`.
let interrupted_iteration = 1;
poh_recorder.lock().unwrap().set_bank(&bank);
let poh_recorder_ = poh_recorder.clone();
let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();
// Start up thread to process the banks
let t_consume = Builder::new()
.name("consume-buffered-packets".to_string())
.spawn(move || {
BankingStage::consume_buffered_packets(
&Pubkey::default(),
&poh_recorder_,
&mut buffered_packets,
None,
&gossip_vote_sender,
test_fn,
None,
);
// Check everything is correct. All indexes after `interrupted_iteration`
// should still be unprocessed
assert_eq!(
buffered_packets.len(),
packets_vec[interrupted_iteration + 1..].iter().count()
);
for ((remaining_unprocessed_packet, _), original_packet) in buffered_packets
.iter()
.zip(&packets_vec[interrupted_iteration + 1..])
{
assert_eq!(
remaining_unprocessed_packet.packets[0],
original_packet.packets[0]
);
}
})
.unwrap();
for i in 0..=interrupted_iteration {
finished_packet_receiver.recv().unwrap();
if i == interrupted_iteration {
poh_recorder
.lock()
.unwrap()
.schedule_dummy_max_height_reached_failure();
}
continue_sender.send(()).unwrap();
}
t_consume.join().unwrap();
}
Blockstore::destroy(&ledger_path).unwrap();
}
#[test]
fn test_push_unprocessed_batch_limit() {
// Create `Packets` with 1 unprocessed element
let single_element_packets = Packets::new(vec![Packet::default()]);
let mut unprocessed_packets: UnprocessedPackets =
vec![(single_element_packets.clone(), vec![0])]
.into_iter()
.collect();
// Set the limit to 2
let batch_limit = 2;
// Create some new unprocessed packets
let new_packets = single_element_packets;
let packet_indexes = vec![];
let mut dropped_batches_count = 0;
let mut newly_buffered_packets_count = 0;
// Because the set of unprocessed `packet_indexes` is empty, the
// packets are not added to the unprocessed queue
BankingStage::push_unprocessed(
&mut unprocessed_packets,
new_packets.clone(),
packet_indexes,
&mut dropped_batches_count,
&mut newly_buffered_packets_count,
batch_limit,
);
assert_eq!(unprocessed_packets.len(), 1);
assert_eq!(dropped_batches_count, 0);
assert_eq!(newly_buffered_packets_count, 0);
// Because the set of unprocessed `packet_indexes` is non-empty, the
// packets are added to the unprocessed queue
let packet_indexes = vec![0];
BankingStage::push_unprocessed(
&mut unprocessed_packets,
new_packets,
packet_indexes.clone(),
&mut dropped_batches_count,
&mut newly_buffered_packets_count,
batch_limit,
);
assert_eq!(unprocessed_packets.len(), 2);
assert_eq!(dropped_batches_count, 0);
assert_eq!(newly_buffered_packets_count, 1);
// Because we've reached the batch limit, old unprocessed packets are
// dropped and the new one is appended to the end
let new_packets = Packets::new(vec![Packet::from_data(
&SocketAddr::from(([127, 0, 0, 1], 8001)),
42,
)
.unwrap()]);
assert_eq!(unprocessed_packets.len(), batch_limit);
BankingStage::push_unprocessed(
&mut unprocessed_packets,
new_packets.clone(),
packet_indexes,
&mut dropped_batches_count,
&mut newly_buffered_packets_count,
batch_limit,
);
assert_eq!(unprocessed_packets.len(), 2);
assert_eq!(unprocessed_packets[1].0.packets[0], new_packets.packets[0]);
assert_eq!(dropped_batches_count, 1);
assert_eq!(newly_buffered_packets_count, 2);
}
}

core/src/poh_recorder.rs

@@ -501,6 +501,11 @@ impl PohRecorder {
poh_config,
)
}
#[cfg(test)]
pub fn schedule_dummy_max_height_reached_failure(&mut self) {
self.reset(Hash::default(), 1, None);
}
}
#[cfg(test)]