diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index b368cb7008..508818d120 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -68,7 +68,6 @@ use { collections::HashMap, env, net::{SocketAddr, UdpSocket}, - rc::Rc, sync::{ atomic::{AtomicU64, AtomicUsize, Ordering}, Arc, RwLock, @@ -618,7 +617,7 @@ impl BankingStage { MinMaxHeap::with_capacity(capacity), ) }; - let retryable_packets: MinMaxHeap> = retryable_packets + let retryable_packets: MinMaxHeap> = retryable_packets .drain_desc() .chunks(num_packets_to_process_per_iteration) .into_iter() @@ -1053,95 +1052,91 @@ impl BankingStage { // `ForwardPacketBatchesByAccounts.rs`. let mut accepting_packets = true; // batch iterate through buffered_packet_batches in desc priority order - let retained_priority_queue: MinMaxHeap> = - original_priority_queue - .drain_desc() - .chunks(batch_size) - .into_iter() - .flat_map(|packets_to_process| { - let packets_to_process = packets_to_process.into_iter().collect_vec(); + let retained_priority_queue: MinMaxHeap<_> = original_priority_queue + .drain_desc() + .chunks(batch_size) + .into_iter() + .flat_map(|packets_to_process| { + let packets_to_process = packets_to_process.into_iter().collect_vec(); - // Vec of same size of `packets_to_process`, each indicates - // corresponding packet is tracer packet. - let tracer_packet_indexes = packets_to_process + // Vec of same size of `packets_to_process`, each indicates + // corresponding packet is tracer packet. + let tracer_packet_indexes = packets_to_process + .iter() + .map(|deserialized_packet| { + deserialized_packet + .original_packet() + .meta + .is_tracer_packet() + }) + .collect::>(); + saturating_add_assign!( + total_tracer_packets_in_buffer, + tracer_packet_indexes .iter() - .map(|deserialized_packet| { - deserialized_packet - .original_packet() - .meta - .is_tracer_packet() - }) - .collect::>(); - saturating_add_assign!( - total_tracer_packets_in_buffer, - tracer_packet_indexes - .iter() - .filter(|is_tracer| **is_tracer) - .count() - ); + .filter(|is_tracer| **is_tracer) + .count() + ); - if accepting_packets { - let ( - (sanitized_transactions, transaction_to_packet_indexes), - packet_conversion_time, - ): ((Vec, Vec), _) = measure!( - Self::sanitize_unforwarded_packets( - buffered_packet_batches, - &packets_to_process, - bank, - ), - "sanitize_packet", - ); - saturating_add_assign!( - total_packet_conversion_us, - packet_conversion_time.as_us() - ); - - let (forwardable_transaction_indexes, filter_packets_time) = measure!( - Self::filter_invalid_transactions(&sanitized_transactions, bank,), - "filter_packets", - ); - saturating_add_assign!( - total_filter_packets_us, - filter_packets_time.as_us() - ); - - for forwardable_transaction_index in &forwardable_transaction_indexes { - saturating_add_assign!(total_forwardable_packets, 1); - let forwardable_packet_index = - transaction_to_packet_indexes[*forwardable_transaction_index]; - if tracer_packet_indexes[forwardable_packet_index] { - saturating_add_assign!(total_forwardable_tracer_packets, 1); - } - } - - accepting_packets = Self::add_filtered_packets_to_forward_buffer( - forward_buffer, - &packets_to_process, - &sanitized_transactions, - &transaction_to_packet_indexes, - &forwardable_transaction_indexes, - &mut dropped_tx_before_forwarding_count, - ); - - Self::collect_retained_packets( + if accepting_packets { + let ( + (sanitized_transactions, transaction_to_packet_indexes), + packet_conversion_time, + ): ((Vec, Vec), _) = measure!( + Self::sanitize_unforwarded_packets( buffered_packet_batches, &packets_to_process, - &Self::prepare_filtered_packet_indexes( - &transaction_to_packet_indexes, - &forwardable_transaction_indexes, - ), - ) - } else { - // skip sanitizing and filtering if not longer able to add more packets for forwarding - saturating_add_assign!( - dropped_tx_before_forwarding_count, - packets_to_process.len() - ); - packets_to_process + bank, + ), + "sanitize_packet", + ); + saturating_add_assign!( + total_packet_conversion_us, + packet_conversion_time.as_us() + ); + + let (forwardable_transaction_indexes, filter_packets_time) = measure!( + Self::filter_invalid_transactions(&sanitized_transactions, bank,), + "filter_packets", + ); + saturating_add_assign!(total_filter_packets_us, filter_packets_time.as_us()); + + for forwardable_transaction_index in &forwardable_transaction_indexes { + saturating_add_assign!(total_forwardable_packets, 1); + let forwardable_packet_index = + transaction_to_packet_indexes[*forwardable_transaction_index]; + if tracer_packet_indexes[forwardable_packet_index] { + saturating_add_assign!(total_forwardable_tracer_packets, 1); + } } - }) - .collect(); + + accepting_packets = Self::add_filtered_packets_to_forward_buffer( + forward_buffer, + &packets_to_process, + &sanitized_transactions, + &transaction_to_packet_indexes, + &forwardable_transaction_indexes, + &mut dropped_tx_before_forwarding_count, + ); + + Self::collect_retained_packets( + buffered_packet_batches, + &packets_to_process, + &Self::prepare_filtered_packet_indexes( + &transaction_to_packet_indexes, + &forwardable_transaction_indexes, + ), + ) + } else { + // skip sanitizing and filtering if not longer able to add more packets for forwarding + saturating_add_assign!( + dropped_tx_before_forwarding_count, + packets_to_process.len() + ); + packets_to_process + } + }) + .collect(); // replace packet priority queue buffered_packet_batches.packet_priority_queue = retained_priority_queue; @@ -1163,7 +1158,7 @@ impl BankingStage { /// Take buffered_packet_batches's priority_queue out, leave empty MinMaxHeap in its place. fn swap_priority_queue( buffered_packet_batches: &mut UnprocessedPacketBatches, - ) -> MinMaxHeap> { + ) -> MinMaxHeap> { let capacity = buffered_packet_batches.capacity(); std::mem::replace( &mut buffered_packet_batches.packet_priority_queue, @@ -1174,7 +1169,7 @@ impl BankingStage { /// sanitize un-forwarded packet into SanitizedTransaction for validation and forwarding. fn sanitize_unforwarded_packets( buffered_packet_batches: &mut UnprocessedPacketBatches, - packets_to_process: &[Rc], + packets_to_process: &[Arc], bank: &Arc, ) -> (Vec, Vec) { // Get ref of ImmutableDeserializedPacket @@ -1244,9 +1239,9 @@ impl BankingStage { fn collect_retained_packets( buffered_packet_batches: &mut UnprocessedPacketBatches, - packets_to_process: &[Rc], + packets_to_process: &[Arc], retained_packet_indexes: &[usize], - ) -> Vec> { + ) -> Vec> { Self::remove_non_retained_packets( buffered_packet_batches, packets_to_process, @@ -1262,7 +1257,7 @@ impl BankingStage { /// been removed from UnprocessedPacketBatches.packet_priority_queue fn remove_non_retained_packets( buffered_packet_batches: &mut UnprocessedPacketBatches, - packets_to_process: &[Rc], + packets_to_process: &[Arc], retained_packet_indexes: &[usize], ) { Self::filter_processed_packets( @@ -1283,7 +1278,7 @@ impl BankingStage { /// returns if forward_buffer is still accepting packets, and how many packets added. fn add_filtered_packets_to_forward_buffer( forward_buffer: &mut ForwardPacketBatchesByAccounts, - packets_to_process: &[Rc], + packets_to_process: &[Arc], transactions: &[SanitizedTransaction], transaction_to_packet_indexes: &[usize], retained_transaction_indexes: &[usize], diff --git a/core/src/forward_packet_batches_by_accounts.rs b/core/src/forward_packet_batches_by_accounts.rs index 1de865a426..9132b14a21 100644 --- a/core/src/forward_packet_batches_by_accounts.rs +++ b/core/src/forward_packet_batches_by_accounts.rs @@ -6,7 +6,7 @@ use { cost_tracker::{CostTracker, CostTrackerError}, }, solana_sdk::{pubkey::Pubkey, transaction::SanitizedTransaction}, - std::rc::Rc, + std::sync::Arc, }; /// `ForwardBatch` to have half of default cost_tracker limits, as smaller batch @@ -27,7 +27,7 @@ pub struct ForwardBatch { cost_tracker: CostTracker, // `forwardable_packets` keeps forwardable packets in a vector in its // original fee prioritized order - forwardable_packets: Vec>, + forwardable_packets: Vec>, } impl Default for ForwardBatch { @@ -59,7 +59,7 @@ impl ForwardBatch { &mut self, write_lock_accounts: &[Pubkey], compute_units: u64, - immutable_packet: Rc, + immutable_packet: Arc, ) -> Result { let res = self.cost_tracker.try_add_requested_cus( write_lock_accounts, @@ -125,7 +125,7 @@ impl ForwardPacketBatchesByAccounts { pub fn try_add_packet( &mut self, sanitized_transaction: &SanitizedTransaction, - packet: Rc, + packet: Arc, ) -> bool { if self.accepting_packets { // get write_lock_accounts @@ -164,7 +164,7 @@ impl ForwardPacketBatchesByAccounts { &mut self, write_lock_accounts: &[Pubkey], compute_units: u64, - immutable_packet: Rc, + immutable_packet: Arc, ) -> bool { for forward_batch in self.forward_batches.iter_mut() { if forward_batch diff --git a/core/src/latest_unprocessed_votes.rs b/core/src/latest_unprocessed_votes.rs index d829f587c4..fe73582abd 100644 --- a/core/src/latest_unprocessed_votes.rs +++ b/core/src/latest_unprocessed_votes.rs @@ -14,7 +14,6 @@ use { std::{ collections::HashMap, ops::DerefMut, - rc::Rc, sync::{Arc, RwLock}, }, }; @@ -30,26 +29,23 @@ pub enum VoteSource { pub struct LatestValidatorVotePacket { vote_source: VoteSource, pubkey: Pubkey, - vote: Option>, + vote: Option>, slot: Slot, forwarded: bool, } -unsafe impl Send for LatestValidatorVotePacket {} -unsafe impl Sync for LatestValidatorVotePacket {} - impl LatestValidatorVotePacket { pub fn new(packet: Packet, vote_source: VoteSource) -> Result { if !packet.meta.is_simple_vote_tx() { return Err(DeserializedPacketError::VoteTransactionError); } - let vote = Rc::new(ImmutableDeserializedPacket::new(packet, None)?); + let vote = Arc::new(ImmutableDeserializedPacket::new(packet, None)?); Self::new_from_immutable(vote, vote_source) } pub fn new_from_immutable( - vote: Rc, + vote: Arc, vote_source: VoteSource, ) -> Result { let message = vote.transaction().get_message(); @@ -80,7 +76,7 @@ impl LatestValidatorVotePacket { } } - pub fn get_vote_packet(&self) -> Rc { + pub fn get_vote_packet(&self) -> Arc { self.vote.as_ref().unwrap().clone() } @@ -101,7 +97,7 @@ impl LatestValidatorVotePacket { self.vote.is_none() } - pub fn take_vote(&mut self) -> Option> { + pub fn take_vote(&mut self) -> Option> { self.vote.take() } } @@ -293,7 +289,7 @@ impl LatestUnprocessedVotes { } /// Drains all votes yet to be processed sorted by a weighted random ordering by stake - pub fn drain_unprocessed(&self, bank: Arc) -> Vec> { + pub fn drain_unprocessed(&self, bank: Arc) -> Vec> { let pubkeys_by_stake = weighted_random_order_by_stake( &bank, self.latest_votes_per_pubkey.read().unwrap().keys(), diff --git a/core/src/unprocessed_packet_batches.rs b/core/src/unprocessed_packet_batches.rs index 4e6ad8d24b..7b7ccb7963 100644 --- a/core/src/unprocessed_packet_batches.rs +++ b/core/src/unprocessed_packet_batches.rs @@ -11,7 +11,6 @@ use { std::{ cmp::Ordering, collections::{hash_map::Entry, HashMap}, - rc::Rc, sync::Arc, }, }; @@ -20,14 +19,14 @@ use { /// SanitizedTransaction #[derive(Debug, Clone, PartialEq, Eq)] pub struct DeserializedPacket { - immutable_section: Rc, + immutable_section: Arc, pub forwarded: bool, } impl DeserializedPacket { pub fn from_immutable_section(immutable_section: ImmutableDeserializedPacket) -> Self { Self { - immutable_section: Rc::new(immutable_section), + immutable_section: Arc::new(immutable_section), forwarded: false, } } @@ -51,12 +50,12 @@ impl DeserializedPacket { let immutable_section = ImmutableDeserializedPacket::new(packet, priority_details)?; Ok(Self { - immutable_section: Rc::new(immutable_section), + immutable_section: Arc::new(immutable_section), forwarded: false, }) } - pub fn immutable_section(&self) -> &Rc { + pub fn immutable_section(&self) -> &Arc { &self.immutable_section } } @@ -80,7 +79,7 @@ impl Ord for DeserializedPacket { /// to pick proper packets to add to the block. #[derive(Debug, Default)] pub struct UnprocessedPacketBatches { - pub packet_priority_queue: MinMaxHeap>, + pub packet_priority_queue: MinMaxHeap>, pub message_hash_to_transaction: HashMap, batch_limit: usize, } @@ -170,7 +169,7 @@ impl UnprocessedPacketBatches { { // TODO: optimize this only when number of packets // with outdated blockhash is high - let new_packet_priority_queue: MinMaxHeap> = self + let new_packet_priority_queue: MinMaxHeap> = self .packet_priority_queue .drain() .filter(|immutable_packet| { diff --git a/core/src/unprocessed_transaction_storage.rs b/core/src/unprocessed_transaction_storage.rs index 97b57c8718..6642da00a4 100644 --- a/core/src/unprocessed_transaction_storage.rs +++ b/core/src/unprocessed_transaction_storage.rs @@ -14,7 +14,7 @@ use { min_max_heap::MinMaxHeap, solana_perf::packet::PacketBatch, solana_runtime::bank::Bank, - std::{rc::Rc, sync::Arc}, + std::sync::Arc, }; const MAX_STAKED_VALIDATORS: usize = 10_000; @@ -203,7 +203,7 @@ impl UnprocessedTransactionStorage { batch_size: usize, processing_function: F, ) where - F: FnMut(&Vec>) -> Option>, + F: FnMut(&Vec>) -> Option>, { match (self, bank) { (Self::LocalTransactionStorage(transaction_storage), _) => { @@ -273,7 +273,7 @@ impl VoteStorage { fn process_packets(&mut self, bank: Arc, batch_size: usize, mut processing_function: F) where - F: FnMut(&Vec>) -> Option>, + F: FnMut(&Vec>) -> Option>, { if matches!(self.vote_source, VoteSource::Gossip) { panic!("Gossip vote thread should not be processing transactions"); @@ -370,7 +370,7 @@ impl ThreadLocalUnprocessedPackets { fn process_packets(&mut self, batch_size: usize, mut processing_function: F) where - F: FnMut(&Vec>) -> Option>, + F: FnMut(&Vec>) -> Option>, { let mut retryable_packets = { let capacity = self.unprocessed_packet_batches.capacity(); @@ -379,7 +379,7 @@ impl ThreadLocalUnprocessedPackets { MinMaxHeap::with_capacity(capacity), ) }; - let retryable_packets: MinMaxHeap> = retryable_packets + let retryable_packets: MinMaxHeap> = retryable_packets .drain_desc() .chunks(batch_size) .into_iter()