MultiIteratorScanner - improve banking stage performance with high contention

Andrew Fitzgerald 2022-11-14 13:04:21 -06:00
parent e269fe3383
commit ee2f760d3d
5 changed files with 737 additions and 294 deletions

View File

@ -104,7 +104,6 @@ fn bench_consume_buffered(bencher: &mut Bencher) {
&recorder,
&QosService::new(Arc::new(RwLock::new(CostModel::default())), 1),
&mut LeaderSlotMetricsTracker::new(0),
10,
None,
);
});

View File

@ -17,7 +17,7 @@ use {
tracer_packet_stats::TracerPacketStats,
unprocessed_packet_batches::*,
unprocessed_transaction_storage::{
ThreadType, UnprocessedTransactionStorage, UNPROCESSED_BUFFER_STEP_SIZE,
ConsumeScannerPayload, ThreadType, UnprocessedTransactionStorage,
},
},
core::iter::repeat,
@ -161,7 +161,7 @@ pub struct BankingStageStats {
consume_buffered_packets_elapsed: AtomicU64,
receive_and_buffer_packets_elapsed: AtomicU64,
filter_pending_packets_elapsed: AtomicU64,
packet_conversion_elapsed: AtomicU64,
pub(crate) packet_conversion_elapsed: AtomicU64,
transaction_processing_elapsed: AtomicU64,
}
@ -616,8 +616,9 @@ impl BankingStage {
#[allow(clippy::too_many_arguments)]
fn do_process_packets(
max_tx_ingestion_ns: u128,
poh_recorder: &Arc<RwLock<PohRecorder>>,
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
working_bank: &Arc<Bank>,
bank_creation_time: &Arc<Instant>,
payload: &mut ConsumeScannerPayload,
recorder: &TransactionRecorder,
transaction_status_sender: &Option<TransactionStatusSender>,
gossip_vote_sender: &ReplayVoteSender,
@ -626,90 +627,71 @@ impl BankingStage {
log_messages_bytes_limit: Option<usize>,
consumed_buffered_packets_count: &mut usize,
rebuffered_packet_count: &mut usize,
reached_end_of_slot: &mut bool,
test_fn: &Option<impl Fn()>,
packets_to_process: &Vec<Arc<ImmutableDeserializedPacket>>,
) -> Option<Vec<usize>> {
// TODO: Right now we iterate through buffer and try the highest weighted transaction once
// but we should retry the highest weighted transactions more often.
let (bank_start, poh_recorder_lock_time) = measure!(
poh_recorder.read().unwrap().bank_start(),
"poh_recorder.read",
);
slot_metrics_tracker.increment_consume_buffered_packets_poh_recorder_lock_us(
poh_recorder_lock_time.as_us(),
);
if payload.reached_end_of_slot {
return None;
}
let packets_to_process_len = packets_to_process.len();
if let Some(BankStart {
working_bank,
bank_creation_time,
}) = bank_start
let (process_transactions_summary, process_packets_transactions_time) = measure!(
Self::process_packets_transactions(
working_bank,
bank_creation_time,
recorder,
&payload.sanitized_transactions,
transaction_status_sender,
gossip_vote_sender,
banking_stage_stats,
qos_service,
payload.slot_metrics_tracker,
log_messages_bytes_limit
),
"process_packets_transactions",
);
payload
.slot_metrics_tracker
.increment_process_packets_transactions_us(process_packets_transactions_time.as_us());
// Clear payload for next iteration
payload.sanitized_transactions.clear();
payload.write_accounts.clear();
let ProcessTransactionsSummary {
reached_max_poh_height,
retryable_transaction_indexes,
..
} = process_transactions_summary;
if reached_max_poh_height
|| !Bank::should_bank_still_be_processing_txs(bank_creation_time, max_tx_ingestion_ns)
{
let (process_transactions_summary, process_packets_transactions_time) = measure!(
Self::process_packets_transactions(
&working_bank,
&bank_creation_time,
recorder,
packets_to_process.iter().map(|p| &**p),
transaction_status_sender,
gossip_vote_sender,
banking_stage_stats,
qos_service,
slot_metrics_tracker,
log_messages_bytes_limit
),
"process_packets_transactions",
);
slot_metrics_tracker.increment_process_packets_transactions_us(
process_packets_transactions_time.as_us(),
);
let ProcessTransactionsSummary {
reached_max_poh_height,
retryable_transaction_indexes,
..
} = process_transactions_summary;
if reached_max_poh_height
|| !Bank::should_bank_still_be_processing_txs(
&bank_creation_time,
max_tx_ingestion_ns,
)
{
*reached_end_of_slot = true;
}
// The difference between all transactions passed to execution and the ones that
// are retryable is the set of transactions that were either:
// 1) Committed into the block
// 2) Dropped without being committed because they had some fatal error (too old,
// duplicate signature, etc.)
//
// Note: This assumes that every packet deserializes into one transaction!
*consumed_buffered_packets_count +=
packets_to_process_len.saturating_sub(retryable_transaction_indexes.len());
// Out of the buffered packets just retried, collect any still unprocessed
// transactions in this batch for forwarding
*rebuffered_packet_count += retryable_transaction_indexes.len();
if let Some(test_fn) = test_fn {
test_fn();
}
slot_metrics_tracker
.increment_retryable_packets_count(retryable_transaction_indexes.len() as u64);
Some(retryable_transaction_indexes)
} else if *reached_end_of_slot {
None
} else {
// mark as end-of-slot to avoid aggressively lock poh for the remaining for
// packet batches in buffer
*reached_end_of_slot = true;
None
payload.reached_end_of_slot = true;
}
// The difference between all transactions passed to execution and the ones that
// are retryable is the set of transactions that were either:
// 1) Committed into the block
// 2) Dropped without being committed because they had some fatal error (too old,
// duplicate signature, etc.)
//
// Note: This assumes that every packet deserializes into one transaction!
*consumed_buffered_packets_count +=
packets_to_process_len.saturating_sub(retryable_transaction_indexes.len());
// Out of the buffered packets just retried, collect any still unprocessed
// transactions in this batch for forwarding
*rebuffered_packet_count += retryable_transaction_indexes.len();
if let Some(test_fn) = test_fn {
test_fn();
}
payload
.slot_metrics_tracker
.increment_retryable_packets_count(retryable_transaction_indexes.len() as u64);
Some(retryable_transaction_indexes)
}
#[allow(clippy::too_many_arguments)]
@ -725,38 +707,57 @@ impl BankingStage {
recorder: &TransactionRecorder,
qos_service: &QosService,
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
num_packets_to_process_per_iteration: usize,
log_messages_bytes_limit: Option<usize>,
) {
let mut rebuffered_packet_count = 0;
let mut consumed_buffered_packets_count = 0;
let mut proc_start = Measure::start("consume_buffered_process");
let mut reached_end_of_slot = false;
let reached_end_of_slot;
let num_packets_to_process = unprocessed_transaction_storage.len();
let bank = poh_recorder.read().unwrap().bank();
unprocessed_transaction_storage.process_packets(
bank,
num_packets_to_process_per_iteration,
|packets_to_process| {
Self::do_process_packets(
max_tx_ingestion_ns,
poh_recorder,
slot_metrics_tracker,
recorder,
transaction_status_sender,
gossip_vote_sender,
banking_stage_stats,
qos_service,
log_messages_bytes_limit,
&mut consumed_buffered_packets_count,
&mut rebuffered_packet_count,
&mut reached_end_of_slot,
&test_fn,
packets_to_process,
)
},
// TODO: Right now we iterate through buffer and try the highest weighted transaction once
// but we should retry the highest weighted transactions more often.
let (bank_start, poh_recorder_lock_time) = measure!(
poh_recorder.read().unwrap().bank_start(),
"poh_recorder.read",
);
slot_metrics_tracker.increment_consume_buffered_packets_poh_recorder_lock_us(
poh_recorder_lock_time.as_us(),
);
if let Some(BankStart {
working_bank,
bank_creation_time,
}) = bank_start
{
let returned_payload = unprocessed_transaction_storage.process_packets(
working_bank.clone(),
banking_stage_stats,
slot_metrics_tracker,
|packets_to_process, payload| {
Self::do_process_packets(
max_tx_ingestion_ns,
&working_bank,
&bank_creation_time,
payload,
recorder,
transaction_status_sender,
gossip_vote_sender,
banking_stage_stats,
qos_service,
log_messages_bytes_limit,
&mut consumed_buffered_packets_count,
&mut rebuffered_packet_count,
&test_fn,
packets_to_process,
)
},
);
reached_end_of_slot = returned_payload.reached_end_of_slot;
} else {
reached_end_of_slot = true;
}
if reached_end_of_slot {
slot_metrics_tracker.set_end_of_slot_unprocessed_buffer_len(
@ -900,7 +901,6 @@ impl BankingStage {
recorder,
qos_service,
slot_metrics_tracker,
UNPROCESSED_BUFFER_STEP_SIZE,
log_messages_bytes_limit
),
"consume_buffered_packets",
@ -1827,27 +1827,21 @@ impl BankingStage {
/// This function returns a vector containing the indexes of all valid transactions. A valid
/// transaction has result Ok() as the value
fn filter_valid_transaction_indexes(
valid_txs: &[TransactionCheckResult],
transaction_indexes: &[usize],
) -> Vec<usize> {
fn filter_valid_transaction_indexes(valid_txs: &[TransactionCheckResult]) -> Vec<usize> {
valid_txs
.iter()
.enumerate()
.filter_map(|(index, (x, _h))| if x.is_ok() { Some(index) } else { None })
.map(|x| transaction_indexes[x])
.collect_vec()
}
/// This function filters pending packets that are still valid
/// # Arguments
/// * `transactions` - a batch of transactions deserialized from packets
/// * `transaction_to_packet_indexes` - maps each transaction to a packet index
/// * `pending_indexes` - identifies which indexes in the `transactions` list are still pending
fn filter_pending_packets_from_pending_txs(
bank: &Arc<Bank>,
transactions: &[SanitizedTransaction],
transaction_to_packet_indexes: &[usize],
pending_indexes: &[usize],
) -> Vec<usize> {
let filter =
@ -1859,7 +1853,7 @@ impl BankingStage {
FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET,
);
Self::filter_valid_transaction_indexes(&results, transaction_to_packet_indexes)
Self::filter_valid_transaction_indexes(&results)
}
#[allow(clippy::too_many_arguments)]
@ -1867,7 +1861,7 @@ impl BankingStage {
bank: &'a Arc<Bank>,
bank_creation_time: &Instant,
poh: &'a TransactionRecorder,
deserialized_packets: impl Iterator<Item = &'a ImmutableDeserializedPacket>,
sanitized_transactions: &[SanitizedTransaction],
transaction_status_sender: &Option<TransactionStatusSender>,
gossip_vote_sender: &'a ReplayVoteSender,
banking_stage_stats: &'a BankingStageStats,
@ -1875,39 +1869,12 @@ impl BankingStage {
slot_metrics_tracker: &'a mut LeaderSlotMetricsTracker,
log_messages_bytes_limit: Option<usize>,
) -> ProcessTransactionsSummary {
// Convert packets to transactions
let ((transactions, transaction_to_packet_indexes), packet_conversion_time): (
(Vec<SanitizedTransaction>, Vec<usize>),
_,
) = measure!(
deserialized_packets
.enumerate()
.filter_map(|(i, deserialized_packet)| {
deserialized_packet
.build_sanitized_transaction(
&bank.feature_set,
bank.vote_only_bank(),
bank.as_ref(),
)
.map(|transaction| (transaction, i))
})
.unzip(),
"packet_conversion",
);
let packet_conversion_us = packet_conversion_time.as_us();
slot_metrics_tracker.increment_transactions_from_packets_us(packet_conversion_us);
banking_stage_stats
.packet_conversion_elapsed
.fetch_add(packet_conversion_us, Ordering::Relaxed);
inc_new_counter_info!("banking_stage-packet_conversion", 1);
// Process transactions
let (mut process_transactions_summary, process_transactions_time) = measure!(
Self::process_transactions(
bank,
bank_creation_time,
&transactions,
sanitized_transactions,
poh,
transaction_status_sender,
gossip_vote_sender,
@ -1938,8 +1905,7 @@ impl BankingStage {
let (filtered_retryable_transaction_indexes, filter_retryable_packets_time) = measure!(
Self::filter_pending_packets_from_pending_txs(
bank,
&transactions,
&transaction_to_packet_indexes,
sanitized_transactions,
retryable_transaction_indexes,
),
"filter_pending_packets_time",
@ -2164,7 +2130,6 @@ mod tests {
},
std::{
borrow::Cow,
collections::HashSet,
path::Path,
sync::atomic::{AtomicBool, Ordering},
thread::sleep,
@ -2639,33 +2604,27 @@ mod tests {
#[test]
fn test_bank_filter_valid_transaction_indexes() {
assert_eq!(
BankingStage::filter_valid_transaction_indexes(
&[
(Err(TransactionError::BlockhashNotFound), None),
(Err(TransactionError::BlockhashNotFound), None),
(Ok(()), None),
(Err(TransactionError::BlockhashNotFound), None),
(Ok(()), None),
(Ok(()), None),
],
&[2, 4, 5, 9, 11, 13]
),
[5, 11, 13]
BankingStage::filter_valid_transaction_indexes(&[
(Err(TransactionError::BlockhashNotFound), None),
(Err(TransactionError::BlockhashNotFound), None),
(Ok(()), None),
(Err(TransactionError::BlockhashNotFound), None),
(Ok(()), None),
(Ok(()), None),
]),
[2, 4, 5]
);
assert_eq!(
BankingStage::filter_valid_transaction_indexes(
&[
(Ok(()), None),
(Err(TransactionError::BlockhashNotFound), None),
(Err(TransactionError::BlockhashNotFound), None),
(Ok(()), None),
(Ok(()), None),
(Ok(()), None),
],
&[1, 6, 7, 9, 31, 43]
),
[1, 9, 31, 43]
BankingStage::filter_valid_transaction_indexes(&[
(Ok(()), None),
(Err(TransactionError::BlockhashNotFound), None),
(Err(TransactionError::BlockhashNotFound), None),
(Ok(()), None),
(Ok(()), None),
(Ok(()), None),
]),
[0, 3, 4, 5]
);
}
@ -3844,36 +3803,27 @@ mod tests {
&recorder,
&QosService::new(Arc::new(RwLock::new(CostModel::default())), 1),
&mut LeaderSlotMetricsTracker::new(0),
num_conflicting_transactions,
None,
);
assert_eq!(buffered_packet_batches.len(), num_conflicting_transactions);
// When the poh recorder has a bank, should process all non conflicting buffered packets.
// Processes one packet per iteration of the loop
let num_packets_to_process_per_iteration = num_conflicting_transactions;
for num_expected_unprocessed in (0..num_conflicting_transactions).rev() {
poh_recorder.write().unwrap().set_bank(&bank, false);
BankingStage::consume_buffered_packets(
&Pubkey::default(),
max_tx_processing_ns,
&poh_recorder,
&mut buffered_packet_batches,
&None,
&gossip_vote_sender,
None::<Box<dyn Fn()>>,
&BankingStageStats::default(),
&recorder,
&QosService::new(Arc::new(RwLock::new(CostModel::default())), 1),
&mut LeaderSlotMetricsTracker::new(0),
num_packets_to_process_per_iteration,
None,
);
if num_expected_unprocessed == 0 {
assert!(buffered_packet_batches.is_empty())
} else {
assert_eq!(buffered_packet_batches.len(), num_expected_unprocessed);
}
}
// When the working bank in poh_recorder is Some, all packets should be processed.
// Multi-Iterator will process them 1-by-1 if all txs are conflicting.
poh_recorder.write().unwrap().set_bank(&bank, false);
BankingStage::consume_buffered_packets(
&Pubkey::default(),
max_tx_processing_ns,
&poh_recorder,
&mut buffered_packet_batches,
&None,
&gossip_vote_sender,
None::<Box<dyn Fn()>>,
&BankingStageStats::default(),
&recorder,
&QosService::new(Arc::new(RwLock::new(CostModel::default())), 1),
&mut LeaderSlotMetricsTracker::new(0),
None,
);
assert!(buffered_packet_batches.is_empty());
poh_recorder
.read()
.unwrap()
@ -3897,11 +3847,8 @@ mod tests {
finished_packet_sender.send(()).unwrap();
continue_receiver.recv().unwrap();
});
// When the poh recorder has a bank, it should process all non conflicting buffered packets.
// Because each conflicting transaction is in it's own `Packet` within a `PacketBatch`, then
// each iteration of this loop will process one element of the batch per iteration of the
// loop.
let interrupted_iteration = 1;
// When the poh recorder has a bank, it should process all buffered packets.
let num_conflicting_transactions = transactions.len();
poh_recorder.write().unwrap().set_bank(&bank, false);
let poh_recorder_ = poh_recorder.clone();
let recorder = poh_recorder_.read().unwrap().recorder();
@ -3917,19 +3864,14 @@ mod tests {
)
.unwrap();
assert_eq!(deserialized_packets.len(), num_conflicting_transactions);
let num_packets_to_process_per_iteration = 1;
let mut buffered_packet_batches =
UnprocessedTransactionStorage::new_transaction_storage(
UnprocessedPacketBatches::from_iter(
deserialized_packets.clone().into_iter(),
deserialized_packets.into_iter(),
num_conflicting_transactions,
),
ThreadType::Transactions,
);
let all_packet_message_hashes: HashSet<Hash> = buffered_packet_batches
.iter()
.map(|packet| *packet.immutable_section().message_hash())
.collect();
BankingStage::consume_buffered_packets(
&Pubkey::default(),
std::u128::MAX,
@ -3942,40 +3884,28 @@ mod tests {
&recorder,
&QosService::new(Arc::new(RwLock::new(CostModel::default())), 1),
&mut LeaderSlotMetricsTracker::new(0),
num_packets_to_process_per_iteration,
None,
);
// Check everything is correct. All indexes after `interrupted_iteration`
// should still be unprocessed
assert_eq!(
buffered_packet_batches.len(),
deserialized_packets[interrupted_iteration + 1..].len()
);
for packet in buffered_packet_batches.iter() {
assert!(all_packet_message_hashes
.contains(packet.immutable_section().message_hash()));
}
// Check everything is correct. All valid packets should be processed.
assert!(buffered_packet_batches.is_empty());
})
.unwrap();
for i in 0..=interrupted_iteration {
// Should be calling `test_fn` for each non-conflicting batch.
// In this case each batch is of size 1.
for i in 0..num_conflicting_transactions {
finished_packet_receiver.recv().unwrap();
if i == interrupted_iteration {
if i + 1 == num_conflicting_transactions {
poh_recorder
.write()
.read()
.unwrap()
.schedule_dummy_max_height_reached_failure();
.is_exited
.store(true, Ordering::Relaxed);
}
continue_sender.send(()).unwrap();
}
t_consume.join().unwrap();
poh_recorder
.read()
.unwrap()
.is_exited
.store(true, Ordering::Relaxed);
let _ = poh_simulator.join();
}
Blockstore::destroy(ledger_path.path()).unwrap();

View File

@ -37,6 +37,7 @@ pub mod leader_slot_banking_stage_metrics;
pub mod leader_slot_banking_stage_timing_metrics;
pub mod ledger_cleanup_service;
pub mod ledger_metric_report_service;
pub mod multi_iterator_scanner;
pub mod optimistic_confirmation_verifier;
pub mod outstanding_requests;
pub mod packet_deserializer;

View File

@ -0,0 +1,346 @@
//! Provides an iterator interface that creates non-conflicting batches of elements to process.
//!
//! The problem that this structure is targeting is as follows:
//! We have a slice of transactions we want to process in batches where transactions
//! in the same batch do not conflict with each other. This allows us to process them in
//! parallel. The original slice is ordered by priority, and it is often the case
//! that high-priority transactions conflict with each other. This means
//! we cannot simply grab chunks of transactions.
//! The solution is to create a MultiIteratorScanner that will use multiple iterators, up
//! to the desired batch size, and create batches of transactions that do not conflict with
//! each other. The MultiIteratorScanner stores state for the current positions of each iterator,
//! as well as which transactions have already been handled. If a transaction is invalid it can
//! also be skipped without being considered for future batches.
//!
/// Output from the element checker used in `MultiIteratorScanner::iterate`.
pub enum ProcessingDecision {
/// Should be processed by the scanner.
Now,
/// Should be skipped by the scanner on this pass - process later.
Later,
/// Should be skipped and marked as handled so we don't try processing it again.
Never,
}
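For illustration, an element checker usually defers elements whose resources are already claimed by the in-progress batch and claims resources for the ones it accepts. A minimal sketch, assuming a `usize` element type and a `HashSet` standing in for the lock set (neither is part of this commit):
// Hedged sketch of a `should_process`-style checker: defer items whose resource is
// already claimed by the in-progress batch, otherwise claim it and process now.
fn example_should_process(
    item: &usize,
    locked: &mut std::collections::HashSet<usize>,
) -> ProcessingDecision {
    if locked.contains(item) {
        ProcessingDecision::Later
    } else {
        locked.insert(*item);
        ProcessingDecision::Now
    }
}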
/// Iterates over a slice, creating valid non-self-conflicting batches of elements to process;
/// elements in different batches are not guaranteed to be non-conflicting.
/// Conflicting elements are guaranteed to be processed in the order they appear in the slice,
/// as long as the `should_process` function is appropriately marking resources as used.
/// It is also guaranteed that elements within the batch are in the order they appear in
/// the slice. The batch size is not guaranteed to be `max_iterators` - it can be smaller.
///
/// # Example:
///
/// Assume transactions with same letter conflict with each other. A typical priority ordered
/// buffer might look like:
///
/// // [A, A, B, A, C, D, B, C, D]
///
/// If we want to have batches of size 4, the MultiIteratorScanner will proceed as follows:
///
/// // [A, A, B, A, C, D, B, C, D]
/// // ^ ^ ^ ^
///
/// // [A, A, B, A, C, D, B, C, D]
/// // ^ ^ ^ ^
///
/// // [A, A, B, A, C, D, B, C, D]
/// // ^
/// The iterator will iterate with batches:
///
/// // [[A, B, C, D], [A, B, C, D], [A]]
///
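/// A minimal usage sketch (illustrative only; the integer element type and the
/// trivial `should_process` closure are assumptions, not taken from real callers):
///
/// // let slice = [1, 2, 3];
/// // let mut scanner =
/// //     MultiIteratorScanner::new(&slice, 2, (), |_item, _payload| ProcessingDecision::Now);
/// // while let Some((batch, _payload)) = scanner.iterate() {
/// //     // `batch` holds up to `max_iterators` non-conflicting items, in slice order
/// // }
/// // let _payload = scanner.finalize();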
pub struct MultiIteratorScanner<'a, T, U, F>
where
F: FnMut(&T, &mut U) -> ProcessingDecision,
{
/// Maximum number of iterators to use.
max_iterators: usize,
/// Slice that we're iterating over
slice: &'a [T],
/// Payload - used to store shared mutable state between scanner and the processing function.
payload: U,
/// Function that checks if an element should be processed. This function is also responsible
/// for marking resources, such as locks, as used.
should_process: F,
/// Store whether an element has already been handled
already_handled: Vec<bool>,
/// Current indices inside `slice` for multiple iterators
current_positions: Vec<usize>,
/// Container to store items for iteration - Should only be used in `get_current_items()`
current_items: Vec<&'a T>,
/// Whether the scanner has been initialized (set on the first call to `iterate`)
initialized: bool,
}
impl<'a, T, U, F> MultiIteratorScanner<'a, T, U, F>
where
F: FnMut(&T, &mut U) -> ProcessingDecision,
{
pub fn new(slice: &'a [T], max_iterators: usize, payload: U, should_process: F) -> Self {
assert!(max_iterators > 0);
Self {
max_iterators,
slice,
payload,
should_process,
already_handled: vec![false; slice.len()],
current_positions: Vec::with_capacity(max_iterators),
current_items: Vec::with_capacity(max_iterators),
initialized: false,
}
}
/// Returns a slice of the item references at the current positions of the iterators
/// and a mutable reference to the payload.
///
/// Returns None if the scanner is done iterating.
pub fn iterate(&mut self) -> Option<(&[&'a T], &mut U)> {
if !self.initialized {
self.initialized = true;
self.initialize_current_positions();
} else {
self.advance_current_positions();
}
self.get_current_items()
}
/// Consume the iterator and return the payload.
pub fn finalize(self) -> U {
self.payload
}
/// Initialize the `current_positions` vector for the first batch.
fn initialize_current_positions(&mut self) {
let mut last_index = 0;
for _iterator_index in 0..self.max_iterators {
match self.march_iterator(last_index) {
Some(index) => {
self.current_positions.push(index);
last_index = index.saturating_add(1);
}
None => break,
}
}
}
/// March iterators forward to find the next batch of items.
fn advance_current_positions(&mut self) {
if let Some(mut prev_index) = self.current_positions.first().copied() {
for iterator_index in 0..self.current_positions.len() {
// If the previous iterator has passed this iterator, we should start
// at its position + 1 to avoid duplicate re-traversal.
let start_index = (self.current_positions[iterator_index].saturating_add(1))
.max(prev_index.saturating_add(1));
match self.march_iterator(start_index) {
Some(index) => {
self.current_positions[iterator_index] = index;
prev_index = index;
}
None => {
// Drop current positions that go past the end of the slice
self.current_positions.truncate(iterator_index);
break;
}
}
}
}
}
/// Get the current items from the slice using `self.current_positions`.
/// Returns `None` if there are no more items.
fn get_current_items(&mut self) -> Option<(&[&'a T], &mut U)> {
self.current_items.clear();
for index in &self.current_positions {
self.current_items.push(&self.slice[*index]);
}
(!self.current_items.is_empty()).then_some((&self.current_items, &mut self.payload))
}
/// Moves the iterator to its next position. If we've reached the end of the slice, we return None.
fn march_iterator(&mut self, starting_index: usize) -> Option<usize> {
let mut found = None;
for index in starting_index..self.slice.len() {
if !self.already_handled[index] {
match (self.should_process)(&self.slice[index], &mut self.payload) {
ProcessingDecision::Now => {
self.already_handled[index] = true;
found = Some(index);
break;
}
ProcessingDecision::Later => {
// Do nothing - iterator will try this element in a future batch
}
ProcessingDecision::Never => {
self.already_handled[index] = true;
}
}
}
}
found
}
}
#[cfg(test)]
mod tests {
use {super::MultiIteratorScanner, crate::multi_iterator_scanner::ProcessingDecision};
struct TestScannerPayload {
locks: Vec<bool>,
}
fn test_scanner_locking_should_process(
item: &i32,
payload: &mut TestScannerPayload,
) -> ProcessingDecision {
if payload.locks[*item as usize] {
ProcessingDecision::Later
} else {
payload.locks[*item as usize] = true;
ProcessingDecision::Now
}
}
#[test]
fn test_multi_iterator_scanner_empty() {
let slice: Vec<i32> = vec![];
let mut scanner = MultiIteratorScanner::new(&slice, 2, (), |_, _| ProcessingDecision::Now);
assert!(scanner.iterate().is_none());
}
#[test]
fn test_multi_iterator_scanner_iterate() {
let slice = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11];
let should_process = |_item: &i32, _payload: &mut ()| ProcessingDecision::Now;
let mut scanner = MultiIteratorScanner::new(&slice, 2, (), should_process);
let mut actual_batches = vec![];
while let Some((batch, _payload)) = scanner.iterate() {
actual_batches.push(batch.to_vec());
}
// Batch 1: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
// ^ ^
// Batch 2: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
// ^ ^
// Batch 3: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
// ^ ^
// Batch 4: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
// ^ ^
// Batch 5: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
// ^ ^
// Batch 6: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
// ^
let expected_batches = vec![
vec![&1, &2],
vec![&3, &4],
vec![&5, &6],
vec![&7, &8],
vec![&9, &10],
vec![&11],
];
assert_eq!(actual_batches, expected_batches);
}
#[test]
fn test_multi_iterator_scanner_iterate_with_gaps() {
let slice = [0, 0, 0, 1, 2, 3, 1];
let payload = TestScannerPayload {
locks: vec![false; 4],
};
let mut scanner =
MultiIteratorScanner::new(&slice, 2, payload, test_scanner_locking_should_process);
let mut actual_batches = vec![];
while let Some((batch, payload)) = scanner.iterate() {
// free the resources
for item in batch {
payload.locks[**item as usize] = false;
}
actual_batches.push(batch.to_vec());
}
// Batch 1: [0, 0, 0, 1, 2, 3, 1]
// ^ ^
// Batch 2: [0, 0, 0, 1, 2, 3, 1]
// ^ ^
// Batch 3: [0, 0, 0, 1, 2, 3, 1]
// ^ ^
// Batch 4: [0, 0, 0, 1, 2, 3, 1]
// -----------^ (--- indicates where the 0th iterator marched from)
let expected_batches = vec![vec![&0, &1], vec![&0, &2], vec![&0, &3], vec![&1]];
assert_eq!(actual_batches, expected_batches);
let TestScannerPayload { locks } = scanner.finalize();
assert_eq!(locks, vec![false; 4]);
}
#[test]
fn test_multi_iterator_scanner_iterate_conflicts_not_at_front() {
let slice = [1, 2, 3, 0, 0, 0, 3, 2, 1];
let payload = TestScannerPayload {
locks: vec![false; 4],
};
let mut scanner =
MultiIteratorScanner::new(&slice, 2, payload, test_scanner_locking_should_process);
let mut actual_batches = vec![];
while let Some((batch, payload)) = scanner.iterate() {
// free the resources
for item in batch {
payload.locks[**item as usize] = false;
}
actual_batches.push(batch.to_vec());
}
// Batch 1: [1, 2, 3, 0, 0, 0, 3, 2, 1]
// ^ ^
// Batch 2: [1, 2, 3, 0, 0, 0, 3, 2, 1]
// ^ ^
// Batch 3: [1, 2, 3, 0, 0, 0, 3, 2, 1]
// ^ ^
// Batch 4: [1, 2, 3, 0, 0, 0, 3, 2, 1]
// ^ ^
// Batch 5: [1, 2, 3, 0, 0, 0, 3, 2, 1]
// ^
let expected_batches = vec![
vec![&1, &2],
vec![&3, &0],
vec![&0, &3],
vec![&0, &2],
vec![&1],
];
assert_eq!(actual_batches, expected_batches);
let TestScannerPayload { locks } = scanner.finalize();
assert_eq!(locks, vec![false; 4]);
}
#[test]
fn test_multi_iterator_scanner_iterate_with_never_process() {
let slice = [0, 4, 1, 2];
let should_process = |item: &i32, _payload: &mut ()| match item {
4 => ProcessingDecision::Never,
_ => ProcessingDecision::Now,
};
let mut scanner = MultiIteratorScanner::new(&slice, 2, (), should_process);
let mut actual_batches = vec![];
while let Some((batch, _payload)) = scanner.iterate() {
actual_batches.push(batch.to_vec());
}
// Batch 1: [0, 4, 1, 2]
// ^ ^
// Batch 2: [0, 4, 1, 2]
// ^
let expected_batches = vec![vec![&0, &1], vec![&2]];
assert_eq!(actual_batches, expected_batches);
}
}

View File

@ -1,12 +1,14 @@
use {
crate::{
banking_stage::{FilterForwardingResults, ForwardOption},
banking_stage::{BankingStageStats, FilterForwardingResults, ForwardOption},
forward_packet_batches_by_accounts::ForwardPacketBatchesByAccounts,
immutable_deserialized_packet::ImmutableDeserializedPacket,
latest_unprocessed_votes::{
LatestUnprocessedVotes, LatestValidatorVotePacket, VoteBatchInsertionMetrics,
VoteSource,
},
leader_slot_banking_stage_metrics::LeaderSlotMetricsTracker,
multi_iterator_scanner::{MultiIteratorScanner, ProcessingDecision},
unprocessed_packet_batches::{
DeserializedPacket, PacketBatchInsertionMetrics, UnprocessedPacketBatches,
},
@ -16,13 +18,19 @@ use {
solana_measure::measure,
solana_runtime::bank::Bank,
solana_sdk::{
clock::FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, saturating_add_assign,
transaction::SanitizedTransaction,
clock::FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, pubkey::Pubkey,
saturating_add_assign, transaction::SanitizedTransaction,
},
std::{
collections::HashSet,
sync::{atomic::Ordering, Arc},
},
std::sync::Arc,
};
pub const UNPROCESSED_BUFFER_STEP_SIZE: usize = 128;
// Step-size set to be 64, equal to the maximum batch/entry size. With the
// multi-iterator change, there's no point in getting larger batches of
// non-conflicting transactions.
pub const UNPROCESSED_BUFFER_STEP_SIZE: usize = 64;
/// Maximum number of votes a single receive call will accept
const MAX_NUM_VOTES_RECEIVE: usize = 10_000;
@ -119,6 +127,112 @@ fn filter_processed_packets<'a, F>(
}
}
/// Convenient wrapper for shared state between banking stage processing and the
/// multi-iterator checking function.
pub struct ConsumeScannerPayload<'a> {
pub reached_end_of_slot: bool,
pub write_accounts: HashSet<Pubkey>,
pub sanitized_transactions: Vec<SanitizedTransaction>,
pub slot_metrics_tracker: &'a mut LeaderSlotMetricsTracker,
}
fn consume_scan_should_process_packet(
bank: &Bank,
banking_stage_stats: &BankingStageStats,
packet: &ImmutableDeserializedPacket,
payload: &mut ConsumeScannerPayload,
) -> ProcessingDecision {
// If the end of the slot has already been reached, return Now so the scanner quickly loops over the remaining packets
if payload.reached_end_of_slot {
return ProcessingDecision::Now;
}
// Before sanitization, let's quickly check the static keys (performance optimization)
let message = &packet.transaction().get_message().message;
let static_keys = message.static_account_keys();
for key in static_keys.iter().enumerate().filter_map(|(idx, key)| {
if message.is_maybe_writable(idx) {
Some(key)
} else {
None
}
}) {
if payload.write_accounts.contains(key) {
return ProcessingDecision::Later;
}
}
// Try to deserialize the packet
let (maybe_sanitized_transaction, sanitization_time) = measure!(
packet.build_sanitized_transaction(&bank.feature_set, bank.vote_only_bank(), bank)
);
let sanitization_time_us = sanitization_time.as_us();
payload
.slot_metrics_tracker
.increment_transactions_from_packets_us(sanitization_time_us);
banking_stage_stats
.packet_conversion_elapsed
.fetch_add(sanitization_time_us, Ordering::Relaxed);
if let Some(sanitized_transaction) = maybe_sanitized_transaction {
let message = sanitized_transaction.message();
let conflicts_with_batch = message.account_keys().iter().enumerate().any(|(idx, key)| {
if message.is_writable(idx) {
payload.write_accounts.contains(key)
} else {
false
}
});
if conflicts_with_batch {
ProcessingDecision::Later
} else {
message
.account_keys()
.iter()
.enumerate()
.for_each(|(idx, key)| {
if message.is_writable(idx) {
payload.write_accounts.insert(*key);
}
});
payload.sanitized_transactions.push(sanitized_transaction);
ProcessingDecision::Now
}
} else {
ProcessingDecision::Never
}
}
fn create_consume_multi_iterator<'a, 'b, F>(
packets: &'a [Arc<ImmutableDeserializedPacket>],
slot_metrics_tracker: &'b mut LeaderSlotMetricsTracker,
should_process_packet: F,
) -> MultiIteratorScanner<'a, Arc<ImmutableDeserializedPacket>, ConsumeScannerPayload<'b>, F>
where
F: FnMut(
&Arc<ImmutableDeserializedPacket>,
&mut ConsumeScannerPayload<'b>,
) -> ProcessingDecision,
'b: 'a,
{
let payload = ConsumeScannerPayload {
reached_end_of_slot: false,
write_accounts: HashSet::new(),
sanitized_transactions: Vec::with_capacity(UNPROCESSED_BUFFER_STEP_SIZE),
slot_metrics_tracker,
};
MultiIteratorScanner::new(
packets,
UNPROCESSED_BUFFER_STEP_SIZE,
payload,
should_process_packet,
)
}
impl UnprocessedTransactionStorage {
pub fn new_transaction_storage(
unprocessed_packet_batches: UnprocessedPacketBatches,
@ -233,22 +347,34 @@ impl UnprocessedTransactionStorage {
/// The processing function takes a stream of packets ready to process, and returns the indices
/// of the unprocessed packets that are eligible for retry. A return value of None means that
/// all packets are unprocessed and eligible for retry.
pub fn process_packets<F>(
#[must_use]
pub fn process_packets<'a, F>(
&mut self,
bank: Option<Arc<Bank>>,
batch_size: usize,
bank: Arc<Bank>,
banking_stage_stats: &BankingStageStats,
slot_metrics_tracker: &'a mut LeaderSlotMetricsTracker,
processing_function: F,
) where
F: FnMut(&Vec<Arc<ImmutableDeserializedPacket>>) -> Option<Vec<usize>>,
) -> ConsumeScannerPayload<'a>
where
F: FnMut(
&Vec<Arc<ImmutableDeserializedPacket>>,
&mut ConsumeScannerPayload,
) -> Option<Vec<usize>>,
{
match (self, bank) {
(Self::LocalTransactionStorage(transaction_storage), _) => {
transaction_storage.process_packets(batch_size, processing_function)
}
(Self::VoteStorage(vote_storage), Some(bank)) => {
vote_storage.process_packets(bank, batch_size, processing_function)
}
_ => {}
match self {
Self::LocalTransactionStorage(transaction_storage) => transaction_storage
.process_packets(
&bank,
banking_stage_stats,
slot_metrics_tracker,
processing_function,
),
Self::VoteStorage(vote_storage) => vote_storage.process_packets(
bank,
banking_stage_stats,
slot_metrics_tracker,
processing_function,
),
}
}
}
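For reference, the `processing_function` passed to `process_packets` above has the shape `FnMut(&Vec<Arc<ImmutableDeserializedPacket>>, &mut ConsumeScannerPayload) -> Option<Vec<usize>>`. A minimal sketch of such a closure (illustrative only, not code from this commit):
// Illustrative closure: report every packet in the batch as retryable by index.
// Returning None would mark the whole batch as unprocessed and retryable as well.
let processing_function = |packets: &Vec<Arc<ImmutableDeserializedPacket>>,
                           _payload: &mut ConsumeScannerPayload| {
    Some((0..packets.len()).collect::<Vec<usize>>())
};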
@ -312,39 +438,62 @@ impl VoteStorage {
FilterForwardingResults::default()
}
fn process_packets<F>(&mut self, bank: Arc<Bank>, batch_size: usize, mut processing_function: F)
fn process_packets<'a, F>(
&mut self,
bank: Arc<Bank>,
banking_stage_stats: &BankingStageStats,
slot_metrics_tracker: &'a mut LeaderSlotMetricsTracker,
mut processing_function: F,
) -> ConsumeScannerPayload<'a>
where
F: FnMut(&Vec<Arc<ImmutableDeserializedPacket>>) -> Option<Vec<usize>>,
F: FnMut(
&Vec<Arc<ImmutableDeserializedPacket>>,
&mut ConsumeScannerPayload,
) -> Option<Vec<usize>>,
{
if matches!(self.vote_source, VoteSource::Gossip) {
panic!("Gossip vote thread should not be processing transactions");
}
// Insert the retryable votes back in
self.latest_unprocessed_votes.insert_batch(
// Based on the stake distribution present in the supplied bank, drain the unprocessed votes
// from each validator using a weighted random ordering. Votes from validators with
// 0 stake are ignored.
self.latest_unprocessed_votes
.drain_unprocessed(bank)
.into_iter()
.chunks(batch_size)
.into_iter()
.flat_map(|vote_packets| {
let vote_packets = vote_packets.into_iter().collect_vec();
if let Some(retryable_vote_indices) = processing_function(&vote_packets) {
retryable_vote_indices
.iter()
.map(|i| vote_packets[*i].clone())
.collect_vec()
} else {
vote_packets
}
})
.filter_map(|packet| {
LatestValidatorVotePacket::new_from_immutable(packet, self.vote_source).ok()
}),
let should_process_packet =
|packet: &Arc<ImmutableDeserializedPacket>, payload: &mut ConsumeScannerPayload| {
consume_scan_should_process_packet(&bank, banking_stage_stats, packet, payload)
};
// Based on the stake distribution present in the supplied bank, drain the unprocessed votes
// from each validator using a weighted random ordering. Votes from validators with
// 0 stake are ignored.
let all_vote_packets = self
.latest_unprocessed_votes
.drain_unprocessed(bank.clone());
let mut scanner = create_consume_multi_iterator(
&all_vote_packets,
slot_metrics_tracker,
should_process_packet,
);
while let Some((packets, payload)) = scanner.iterate() {
let vote_packets = packets.iter().map(|p| (*p).clone()).collect_vec();
if let Some(retryable_vote_indices) = processing_function(&vote_packets, payload) {
self.latest_unprocessed_votes.insert_batch(
retryable_vote_indices.iter().filter_map(|i| {
LatestValidatorVotePacket::new_from_immutable(
vote_packets[*i].clone(),
self.vote_source,
)
.ok()
}),
);
} else {
self.latest_unprocessed_votes
.insert_batch(vote_packets.into_iter().filter_map(|packet| {
LatestValidatorVotePacket::new_from_immutable(packet, self.vote_source).ok()
}));
}
}
scanner.finalize()
}
}
@ -711,35 +860,53 @@ impl ThreadLocalUnprocessedPackets {
)
}
fn process_packets<F>(&mut self, batch_size: usize, mut processing_function: F)
fn process_packets<'a, F>(
&mut self,
bank: &Bank,
banking_stage_stats: &BankingStageStats,
slot_metrics_tracker: &'a mut LeaderSlotMetricsTracker,
mut processing_function: F,
) -> ConsumeScannerPayload<'a>
where
F: FnMut(&Vec<Arc<ImmutableDeserializedPacket>>) -> Option<Vec<usize>>,
F: FnMut(
&Vec<Arc<ImmutableDeserializedPacket>>,
&mut ConsumeScannerPayload,
) -> Option<Vec<usize>>,
{
let mut retryable_packets = self.take_priority_queue();
let original_capacity = retryable_packets.capacity();
let mut new_retryable_packets = MinMaxHeap::with_capacity(original_capacity);
new_retryable_packets.extend(
retryable_packets
.drain_desc()
.chunks(batch_size)
.into_iter()
.flat_map(|packets_to_process| {
let packets_to_process = packets_to_process.into_iter().collect_vec();
if let Some(retryable_transaction_indexes) =
processing_function(&packets_to_process)
{
self.collect_retained_packets(
&packets_to_process,
&retryable_transaction_indexes,
)
} else {
packets_to_process
}
}),
let all_packets_to_process = retryable_packets.drain_desc().collect_vec();
let should_process_packet =
|packet: &Arc<ImmutableDeserializedPacket>, payload: &mut ConsumeScannerPayload| {
consume_scan_should_process_packet(bank, banking_stage_stats, packet, payload)
};
let mut scanner = create_consume_multi_iterator(
&all_packets_to_process,
slot_metrics_tracker,
should_process_packet,
);
while let Some((packets_to_process, payload)) = scanner.iterate() {
let packets_to_process = packets_to_process
.iter()
.map(|p| (*p).clone())
.collect_vec();
let retryable_packets = if let Some(retryable_transaction_indexes) =
processing_function(&packets_to_process, payload)
{
self.collect_retained_packets(&packets_to_process, &retryable_transaction_indexes)
} else {
packets_to_process
};
new_retryable_packets.extend(retryable_packets);
}
self.unprocessed_packet_batches.packet_priority_queue = new_retryable_packets;
self.verify_priority_queue(original_capacity);
scanner.finalize()
}
/// Prepare a chunk of packets for forwarding, filter out already forwarded packets while