ImmutableDeserializedPacket rc to arc (#28145)

This commit is contained in:
apfitzge 2022-09-30 12:07:48 -05:00 committed by GitHub
parent 857a388005
commit 82558226f7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 108 additions and 118 deletions

View File

@ -68,7 +68,6 @@ use {
collections::HashMap,
env,
net::{SocketAddr, UdpSocket},
rc::Rc,
sync::{
atomic::{AtomicU64, AtomicUsize, Ordering},
Arc, RwLock,
@ -618,7 +617,7 @@ impl BankingStage {
MinMaxHeap::with_capacity(capacity),
)
};
let retryable_packets: MinMaxHeap<Rc<ImmutableDeserializedPacket>> = retryable_packets
let retryable_packets: MinMaxHeap<Arc<ImmutableDeserializedPacket>> = retryable_packets
.drain_desc()
.chunks(num_packets_to_process_per_iteration)
.into_iter()
@ -1053,95 +1052,91 @@ impl BankingStage {
// `ForwardPacketBatchesByAccounts.rs`.
let mut accepting_packets = true;
// batch iterate through buffered_packet_batches in desc priority order
let retained_priority_queue: MinMaxHeap<Rc<ImmutableDeserializedPacket>> =
original_priority_queue
.drain_desc()
.chunks(batch_size)
.into_iter()
.flat_map(|packets_to_process| {
let packets_to_process = packets_to_process.into_iter().collect_vec();
let retained_priority_queue: MinMaxHeap<_> = original_priority_queue
.drain_desc()
.chunks(batch_size)
.into_iter()
.flat_map(|packets_to_process| {
let packets_to_process = packets_to_process.into_iter().collect_vec();
// Vec<bool> of same size of `packets_to_process`, each indicates
// corresponding packet is tracer packet.
let tracer_packet_indexes = packets_to_process
// Vec<bool> of same size of `packets_to_process`, each indicates
// corresponding packet is tracer packet.
let tracer_packet_indexes = packets_to_process
.iter()
.map(|deserialized_packet| {
deserialized_packet
.original_packet()
.meta
.is_tracer_packet()
})
.collect::<Vec<_>>();
saturating_add_assign!(
total_tracer_packets_in_buffer,
tracer_packet_indexes
.iter()
.map(|deserialized_packet| {
deserialized_packet
.original_packet()
.meta
.is_tracer_packet()
})
.collect::<Vec<_>>();
saturating_add_assign!(
total_tracer_packets_in_buffer,
tracer_packet_indexes
.iter()
.filter(|is_tracer| **is_tracer)
.count()
);
.filter(|is_tracer| **is_tracer)
.count()
);
if accepting_packets {
let (
(sanitized_transactions, transaction_to_packet_indexes),
packet_conversion_time,
): ((Vec<SanitizedTransaction>, Vec<usize>), _) = measure!(
Self::sanitize_unforwarded_packets(
buffered_packet_batches,
&packets_to_process,
bank,
),
"sanitize_packet",
);
saturating_add_assign!(
total_packet_conversion_us,
packet_conversion_time.as_us()
);
let (forwardable_transaction_indexes, filter_packets_time) = measure!(
Self::filter_invalid_transactions(&sanitized_transactions, bank,),
"filter_packets",
);
saturating_add_assign!(
total_filter_packets_us,
filter_packets_time.as_us()
);
for forwardable_transaction_index in &forwardable_transaction_indexes {
saturating_add_assign!(total_forwardable_packets, 1);
let forwardable_packet_index =
transaction_to_packet_indexes[*forwardable_transaction_index];
if tracer_packet_indexes[forwardable_packet_index] {
saturating_add_assign!(total_forwardable_tracer_packets, 1);
}
}
accepting_packets = Self::add_filtered_packets_to_forward_buffer(
forward_buffer,
&packets_to_process,
&sanitized_transactions,
&transaction_to_packet_indexes,
&forwardable_transaction_indexes,
&mut dropped_tx_before_forwarding_count,
);
Self::collect_retained_packets(
if accepting_packets {
let (
(sanitized_transactions, transaction_to_packet_indexes),
packet_conversion_time,
): ((Vec<SanitizedTransaction>, Vec<usize>), _) = measure!(
Self::sanitize_unforwarded_packets(
buffered_packet_batches,
&packets_to_process,
&Self::prepare_filtered_packet_indexes(
&transaction_to_packet_indexes,
&forwardable_transaction_indexes,
),
)
} else {
// skip sanitizing and filtering if no longer able to add more packets for forwarding
saturating_add_assign!(
dropped_tx_before_forwarding_count,
packets_to_process.len()
);
packets_to_process
bank,
),
"sanitize_packet",
);
saturating_add_assign!(
total_packet_conversion_us,
packet_conversion_time.as_us()
);
let (forwardable_transaction_indexes, filter_packets_time) = measure!(
Self::filter_invalid_transactions(&sanitized_transactions, bank,),
"filter_packets",
);
saturating_add_assign!(total_filter_packets_us, filter_packets_time.as_us());
for forwardable_transaction_index in &forwardable_transaction_indexes {
saturating_add_assign!(total_forwardable_packets, 1);
let forwardable_packet_index =
transaction_to_packet_indexes[*forwardable_transaction_index];
if tracer_packet_indexes[forwardable_packet_index] {
saturating_add_assign!(total_forwardable_tracer_packets, 1);
}
}
})
.collect();
accepting_packets = Self::add_filtered_packets_to_forward_buffer(
forward_buffer,
&packets_to_process,
&sanitized_transactions,
&transaction_to_packet_indexes,
&forwardable_transaction_indexes,
&mut dropped_tx_before_forwarding_count,
);
Self::collect_retained_packets(
buffered_packet_batches,
&packets_to_process,
&Self::prepare_filtered_packet_indexes(
&transaction_to_packet_indexes,
&forwardable_transaction_indexes,
),
)
} else {
// skip sanitizing and filtering if no longer able to add more packets for forwarding
saturating_add_assign!(
dropped_tx_before_forwarding_count,
packets_to_process.len()
);
packets_to_process
}
})
.collect();
// replace packet priority queue
buffered_packet_batches.packet_priority_queue = retained_priority_queue;
@ -1163,7 +1158,7 @@ impl BankingStage {
/// Take buffered_packet_batches's priority_queue out, leave empty MinMaxHeap in its place.
fn swap_priority_queue(
buffered_packet_batches: &mut UnprocessedPacketBatches,
) -> MinMaxHeap<Rc<ImmutableDeserializedPacket>> {
) -> MinMaxHeap<Arc<ImmutableDeserializedPacket>> {
let capacity = buffered_packet_batches.capacity();
std::mem::replace(
&mut buffered_packet_batches.packet_priority_queue,
@ -1174,7 +1169,7 @@ impl BankingStage {
/// sanitize un-forwarded packet into SanitizedTransaction for validation and forwarding.
fn sanitize_unforwarded_packets(
buffered_packet_batches: &mut UnprocessedPacketBatches,
packets_to_process: &[Rc<ImmutableDeserializedPacket>],
packets_to_process: &[Arc<ImmutableDeserializedPacket>],
bank: &Arc<Bank>,
) -> (Vec<SanitizedTransaction>, Vec<usize>) {
// Get ref of ImmutableDeserializedPacket
@ -1244,9 +1239,9 @@ impl BankingStage {
fn collect_retained_packets(
buffered_packet_batches: &mut UnprocessedPacketBatches,
packets_to_process: &[Rc<ImmutableDeserializedPacket>],
packets_to_process: &[Arc<ImmutableDeserializedPacket>],
retained_packet_indexes: &[usize],
) -> Vec<Rc<ImmutableDeserializedPacket>> {
) -> Vec<Arc<ImmutableDeserializedPacket>> {
Self::remove_non_retained_packets(
buffered_packet_batches,
packets_to_process,
@ -1262,7 +1257,7 @@ impl BankingStage {
/// been removed from UnprocessedPacketBatches.packet_priority_queue
fn remove_non_retained_packets(
buffered_packet_batches: &mut UnprocessedPacketBatches,
packets_to_process: &[Rc<ImmutableDeserializedPacket>],
packets_to_process: &[Arc<ImmutableDeserializedPacket>],
retained_packet_indexes: &[usize],
) {
Self::filter_processed_packets(
@ -1283,7 +1278,7 @@ impl BankingStage {
/// returns if forward_buffer is still accepting packets, and how many packets added.
fn add_filtered_packets_to_forward_buffer(
forward_buffer: &mut ForwardPacketBatchesByAccounts,
packets_to_process: &[Rc<ImmutableDeserializedPacket>],
packets_to_process: &[Arc<ImmutableDeserializedPacket>],
transactions: &[SanitizedTransaction],
transaction_to_packet_indexes: &[usize],
retained_transaction_indexes: &[usize],

View File

@ -6,7 +6,7 @@ use {
cost_tracker::{CostTracker, CostTrackerError},
},
solana_sdk::{pubkey::Pubkey, transaction::SanitizedTransaction},
std::rc::Rc,
std::sync::Arc,
};
/// `ForwardBatch` to have half of default cost_tracker limits, as smaller batch
@ -27,7 +27,7 @@ pub struct ForwardBatch {
cost_tracker: CostTracker,
// `forwardable_packets` keeps forwardable packets in a vector in its
// original fee prioritized order
forwardable_packets: Vec<Rc<ImmutableDeserializedPacket>>,
forwardable_packets: Vec<Arc<ImmutableDeserializedPacket>>,
}
impl Default for ForwardBatch {
@ -59,7 +59,7 @@ impl ForwardBatch {
&mut self,
write_lock_accounts: &[Pubkey],
compute_units: u64,
immutable_packet: Rc<ImmutableDeserializedPacket>,
immutable_packet: Arc<ImmutableDeserializedPacket>,
) -> Result<u64, CostTrackerError> {
let res = self.cost_tracker.try_add_requested_cus(
write_lock_accounts,
@ -125,7 +125,7 @@ impl ForwardPacketBatchesByAccounts {
pub fn try_add_packet(
&mut self,
sanitized_transaction: &SanitizedTransaction,
packet: Rc<ImmutableDeserializedPacket>,
packet: Arc<ImmutableDeserializedPacket>,
) -> bool {
if self.accepting_packets {
// get write_lock_accounts
@ -164,7 +164,7 @@ impl ForwardPacketBatchesByAccounts {
&mut self,
write_lock_accounts: &[Pubkey],
compute_units: u64,
immutable_packet: Rc<ImmutableDeserializedPacket>,
immutable_packet: Arc<ImmutableDeserializedPacket>,
) -> bool {
for forward_batch in self.forward_batches.iter_mut() {
if forward_batch

View File

@ -14,7 +14,6 @@ use {
std::{
collections::HashMap,
ops::DerefMut,
rc::Rc,
sync::{Arc, RwLock},
},
};
@ -30,26 +29,23 @@ pub enum VoteSource {
pub struct LatestValidatorVotePacket {
vote_source: VoteSource,
pubkey: Pubkey,
vote: Option<Rc<ImmutableDeserializedPacket>>,
vote: Option<Arc<ImmutableDeserializedPacket>>,
slot: Slot,
forwarded: bool,
}
unsafe impl Send for LatestValidatorVotePacket {}
unsafe impl Sync for LatestValidatorVotePacket {}
impl LatestValidatorVotePacket {
pub fn new(packet: Packet, vote_source: VoteSource) -> Result<Self, DeserializedPacketError> {
if !packet.meta.is_simple_vote_tx() {
return Err(DeserializedPacketError::VoteTransactionError);
}
let vote = Rc::new(ImmutableDeserializedPacket::new(packet, None)?);
let vote = Arc::new(ImmutableDeserializedPacket::new(packet, None)?);
Self::new_from_immutable(vote, vote_source)
}
pub fn new_from_immutable(
vote: Rc<ImmutableDeserializedPacket>,
vote: Arc<ImmutableDeserializedPacket>,
vote_source: VoteSource,
) -> Result<Self, DeserializedPacketError> {
let message = vote.transaction().get_message();
@ -80,7 +76,7 @@ impl LatestValidatorVotePacket {
}
}
pub fn get_vote_packet(&self) -> Rc<ImmutableDeserializedPacket> {
pub fn get_vote_packet(&self) -> Arc<ImmutableDeserializedPacket> {
self.vote.as_ref().unwrap().clone()
}
@ -101,7 +97,7 @@ impl LatestValidatorVotePacket {
self.vote.is_none()
}
pub fn take_vote(&mut self) -> Option<Rc<ImmutableDeserializedPacket>> {
pub fn take_vote(&mut self) -> Option<Arc<ImmutableDeserializedPacket>> {
self.vote.take()
}
}
@ -293,7 +289,7 @@ impl LatestUnprocessedVotes {
}
/// Drains all votes yet to be processed sorted by a weighted random ordering by stake
pub fn drain_unprocessed(&self, bank: Arc<Bank>) -> Vec<Rc<ImmutableDeserializedPacket>> {
pub fn drain_unprocessed(&self, bank: Arc<Bank>) -> Vec<Arc<ImmutableDeserializedPacket>> {
let pubkeys_by_stake = weighted_random_order_by_stake(
&bank,
self.latest_votes_per_pubkey.read().unwrap().keys(),

View File

@ -11,7 +11,6 @@ use {
std::{
cmp::Ordering,
collections::{hash_map::Entry, HashMap},
rc::Rc,
sync::Arc,
},
};
@ -20,14 +19,14 @@ use {
/// SanitizedTransaction
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeserializedPacket {
immutable_section: Rc<ImmutableDeserializedPacket>,
immutable_section: Arc<ImmutableDeserializedPacket>,
pub forwarded: bool,
}
impl DeserializedPacket {
pub fn from_immutable_section(immutable_section: ImmutableDeserializedPacket) -> Self {
Self {
immutable_section: Rc::new(immutable_section),
immutable_section: Arc::new(immutable_section),
forwarded: false,
}
}
@ -51,12 +50,12 @@ impl DeserializedPacket {
let immutable_section = ImmutableDeserializedPacket::new(packet, priority_details)?;
Ok(Self {
immutable_section: Rc::new(immutable_section),
immutable_section: Arc::new(immutable_section),
forwarded: false,
})
}
pub fn immutable_section(&self) -> &Rc<ImmutableDeserializedPacket> {
pub fn immutable_section(&self) -> &Arc<ImmutableDeserializedPacket> {
&self.immutable_section
}
}
@ -80,7 +79,7 @@ impl Ord for DeserializedPacket {
/// to pick proper packets to add to the block.
#[derive(Debug, Default)]
pub struct UnprocessedPacketBatches {
pub packet_priority_queue: MinMaxHeap<Rc<ImmutableDeserializedPacket>>,
pub packet_priority_queue: MinMaxHeap<Arc<ImmutableDeserializedPacket>>,
pub message_hash_to_transaction: HashMap<Hash, DeserializedPacket>,
batch_limit: usize,
}
@ -170,7 +169,7 @@ impl UnprocessedPacketBatches {
{
// TODO: optimize this only when number of packets
// with outdated blockhash is high
let new_packet_priority_queue: MinMaxHeap<Rc<ImmutableDeserializedPacket>> = self
let new_packet_priority_queue: MinMaxHeap<Arc<ImmutableDeserializedPacket>> = self
.packet_priority_queue
.drain()
.filter(|immutable_packet| {

View File

@ -14,7 +14,7 @@ use {
min_max_heap::MinMaxHeap,
solana_perf::packet::PacketBatch,
solana_runtime::bank::Bank,
std::{rc::Rc, sync::Arc},
std::sync::Arc,
};
const MAX_STAKED_VALIDATORS: usize = 10_000;
@ -203,7 +203,7 @@ impl UnprocessedTransactionStorage {
batch_size: usize,
processing_function: F,
) where
F: FnMut(&Vec<Rc<ImmutableDeserializedPacket>>) -> Option<Vec<usize>>,
F: FnMut(&Vec<Arc<ImmutableDeserializedPacket>>) -> Option<Vec<usize>>,
{
match (self, bank) {
(Self::LocalTransactionStorage(transaction_storage), _) => {
@ -273,7 +273,7 @@ impl VoteStorage {
fn process_packets<F>(&mut self, bank: Arc<Bank>, batch_size: usize, mut processing_function: F)
where
F: FnMut(&Vec<Rc<ImmutableDeserializedPacket>>) -> Option<Vec<usize>>,
F: FnMut(&Vec<Arc<ImmutableDeserializedPacket>>) -> Option<Vec<usize>>,
{
if matches!(self.vote_source, VoteSource::Gossip) {
panic!("Gossip vote thread should not be processing transactions");
@ -370,7 +370,7 @@ impl ThreadLocalUnprocessedPackets {
fn process_packets<F>(&mut self, batch_size: usize, mut processing_function: F)
where
F: FnMut(&Vec<Rc<ImmutableDeserializedPacket>>) -> Option<Vec<usize>>,
F: FnMut(&Vec<Arc<ImmutableDeserializedPacket>>) -> Option<Vec<usize>>,
{
let mut retryable_packets = {
let capacity = self.unprocessed_packet_batches.capacity();
@ -379,7 +379,7 @@ impl ThreadLocalUnprocessedPackets {
MinMaxHeap::with_capacity(capacity),
)
};
let retryable_packets: MinMaxHeap<Rc<ImmutableDeserializedPacket>> = retryable_packets
let retryable_packets: MinMaxHeap<Arc<ImmutableDeserializedPacket>> = retryable_packets
.drain_desc()
.chunks(batch_size)
.into_iter()