separate packet_deserializer inside banking_stage (#27120)

* separate packet_deserializer inside banking_stage

* Make ReceivePacketResults into a struct with named fields
This commit is contained in:
apfitzge 2022-09-01 10:00:48 -05:00 committed by GitHub
parent 874085aadc
commit 3bdc5b3f2b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 174 additions and 87 deletions

View File

@ -10,6 +10,7 @@ use {
leader_slot_banking_stage_timing_metrics::{
LeaderExecuteAndCommitTimings, RecordTransactionsTimings,
},
packet_deserializer::{PacketDeserializer, ReceivePacketResults},
qos_service::QosService,
sigverify::SigverifyTracerPacketStats,
tracer_packet_stats::TracerPacketStats,
@ -456,6 +457,7 @@ impl BankingStage {
_ => (verified_receiver.clone(), ForwardOption::ForwardTransaction),
};
let mut packet_deserializer = PacketDeserializer::new(verified_receiver);
let poh_recorder = poh_recorder.clone();
let cluster_info = cluster_info.clone();
let mut recv_start = Instant::now();
@ -469,7 +471,7 @@ impl BankingStage {
.name(format!("solBanknStgTx{:02}", i))
.spawn(move || {
Self::process_loop(
&verified_receiver,
&mut packet_deserializer,
&poh_recorder,
&cluster_info,
&mut recv_start,
@ -1079,7 +1081,7 @@ impl BankingStage {
#[allow(clippy::too_many_arguments)]
fn process_loop(
verified_receiver: &BankingPacketReceiver,
packet_deserializer: &mut PacketDeserializer,
poh_recorder: &Arc<RwLock<PohRecorder>>,
cluster_info: &ClusterInfo,
recv_start: &mut Instant,
@ -1151,7 +1153,7 @@ impl BankingStage {
let (res, receive_and_buffer_packets_time) = measure!(
Self::receive_and_buffer_packets(
verified_receiver,
packet_deserializer,
recv_start,
recv_timeout,
id,
@ -1986,57 +1988,10 @@ impl BankingStage {
process_transactions_summary
}
/// Returns the indexes of the packets in `vers` that were not marked
/// discarded (i.e. that passed sigverify); discarded packets are skipped.
fn generate_packet_indexes(vers: &PacketBatch) -> Vec<usize> {
vers.iter()
.enumerate()
.filter(|(_, pkt)| !pkt.meta.discard())
.map(|(index, _)| index)
.collect()
}
/// Receives verified packet batches from sigverify until one of: the
/// `recv_timeout` budget is spent, `packet_count_upperbound` is reached, the
/// packet counter would overflow, or the channel has no more queued batches.
/// Tracer packet stats from each delivery are aggregated into a single value.
///
/// Returns `RecvTimeoutError` only if nothing arrives within `recv_timeout`.
fn receive_until(
verified_receiver: &BankingPacketReceiver,
recv_timeout: Duration,
packet_count_upperbound: usize,
) -> Result<(Vec<PacketBatch>, Option<SigverifyTracerPacketStats>), RecvTimeoutError> {
let start = Instant::now();
// Block (up to the timeout) for the first delivery; a timeout here is
// propagated to the caller via `?`.
let (mut packet_batches, mut aggregated_tracer_packet_stats_option) =
verified_receiver.recv_timeout(recv_timeout)?;
let mut num_packets_received: usize = packet_batches.iter().map(|batch| batch.len()).sum();
// Non-blocking drain of anything else already queued on the channel.
while let Ok((packet_batch, tracer_packet_stats_option)) = verified_receiver.try_recv() {
trace!("got more packet batches in banking stage");
// Use `overflowing_add` so an overflow terminates the loop below
// instead of panicking/wrapping silently.
let (packets_received, packet_count_overflowed) = num_packets_received
.overflowing_add(packet_batch.iter().map(|batch| batch.len()).sum());
packet_batches.extend(packet_batch);
// Fold the incoming tracer stats into the running aggregate, or seed
// the aggregate if this is the first delivery that carried stats.
if let Some(tracer_packet_stats) = &tracer_packet_stats_option {
if let Some(aggregated_tracer_packet_stats) =
&mut aggregated_tracer_packet_stats_option
{
aggregated_tracer_packet_stats.aggregate(tracer_packet_stats);
} else {
aggregated_tracer_packet_stats_option = tracer_packet_stats_option;
}
}
// Spend any leftover receive time budget to greedily receive more packet batches,
// until the upperbound of the packet count is reached.
if start.elapsed() >= recv_timeout
|| packet_count_overflowed
|| packets_received >= packet_count_upperbound
{
break;
}
// Counter is only advanced when we keep looping; batches received on
// the breaking iteration are still included in the returned vec.
num_packets_received = packets_received;
}
Ok((packet_batches, aggregated_tracer_packet_stats_option))
}
#[allow(clippy::too_many_arguments)]
/// Receive incoming packets, push into unprocessed buffer with packet indexes
fn receive_and_buffer_packets(
verified_receiver: &BankingPacketReceiver,
packet_deserializer: &mut PacketDeserializer,
recv_start: &mut Instant,
recv_timeout: Duration,
id: u32,
@ -2046,18 +2001,16 @@ impl BankingStage {
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
) -> Result<(), RecvTimeoutError> {
let mut recv_time = Measure::start("receive_and_buffer_packets_recv");
let (packet_batches, new_sigverify_tracer_packet_stats_option) = Self::receive_until(
verified_receiver,
let ReceivePacketResults {
deserialized_packets,
new_tracer_stats_option,
passed_sigverify_count,
failed_sigverify_count,
} = packet_deserializer.handle_received_packets(
recv_timeout,
buffered_packet_batches.capacity() - buffered_packet_batches.len(),
)?;
if let Some(new_sigverify_tracer_packet_stats) = &new_sigverify_tracer_packet_stats_option {
tracer_packet_stats
.aggregate_sigverify_tracer_packet_stats(new_sigverify_tracer_packet_stats);
}
let packet_count: usize = packet_batches.iter().map(|x| x.len()).sum();
let packet_count = deserialized_packets.len();
debug!(
"@{:?} process start stalled for: {:?}ms txs: {} id: {}",
timestamp(),
@ -2066,28 +2019,25 @@ impl BankingStage {
id,
);
let packet_batch_iter = packet_batches.into_iter();
if let Some(new_sigverify_stats) = &new_tracer_stats_option {
tracer_packet_stats.aggregate_sigverify_tracer_packet_stats(new_sigverify_stats);
}
// Track all the packets incoming from sigverify, both valid and invalid
slot_metrics_tracker.increment_total_new_valid_packets(passed_sigverify_count);
slot_metrics_tracker.increment_newly_failed_sigverify_count(failed_sigverify_count);
let mut dropped_packets_count = 0;
let mut newly_buffered_packets_count = 0;
for packet_batch in packet_batch_iter {
let packet_indexes = Self::generate_packet_indexes(&packet_batch);
// Track all the packets incoming from sigverify, both valid and invalid
slot_metrics_tracker.increment_total_new_valid_packets(packet_indexes.len() as u64);
slot_metrics_tracker.increment_newly_failed_sigverify_count(
packet_batch.len().saturating_sub(packet_indexes.len()) as u64,
);
Self::push_unprocessed(
buffered_packet_batches,
&packet_batch,
&packet_indexes,
&mut dropped_packets_count,
&mut newly_buffered_packets_count,
banking_stage_stats,
slot_metrics_tracker,
tracer_packet_stats,
)
}
Self::push_unprocessed(
buffered_packet_batches,
deserialized_packets,
&mut dropped_packets_count,
&mut newly_buffered_packets_count,
banking_stage_stats,
slot_metrics_tracker,
tracer_packet_stats,
);
recv_time.stop();
banking_stage_stats
@ -2114,26 +2064,27 @@ impl BankingStage {
fn push_unprocessed(
unprocessed_packet_batches: &mut UnprocessedPacketBatches,
packet_batch: &PacketBatch,
packet_indexes: &[usize],
deserialized_packets: Vec<ImmutableDeserializedPacket>,
dropped_packets_count: &mut usize,
newly_buffered_packets_count: &mut usize,
banking_stage_stats: &mut BankingStageStats,
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
tracer_packet_stats: &mut TracerPacketStats,
) {
if !packet_indexes.is_empty() {
if !deserialized_packets.is_empty() {
let _ = banking_stage_stats
.batch_packet_indexes_len
.increment(packet_indexes.len() as u64);
.increment(deserialized_packets.len() as u64);
*newly_buffered_packets_count += packet_indexes.len();
*newly_buffered_packets_count += deserialized_packets.len();
slot_metrics_tracker
.increment_newly_buffered_packets_count(packet_indexes.len() as u64);
.increment_newly_buffered_packets_count(deserialized_packets.len() as u64);
let (number_of_dropped_packets, number_of_dropped_tracer_packets) =
unprocessed_packet_batches.insert_batch(
unprocessed_packet_batches::deserialize_packets(packet_batch, packet_indexes),
deserialized_packets
.into_iter()
.map(DeserializedPacket::from_immutable_section),
);
saturating_add_assign!(*dropped_packets_count, number_of_dropped_packets);

View File

@ -38,6 +38,7 @@ pub mod ledger_cleanup_service;
pub mod ledger_metric_report_service;
pub mod optimistic_confirmation_verifier;
pub mod outstanding_requests;
pub mod packet_deserializer;
mod packet_hasher;
pub mod packet_threshold;
pub mod poh_timing_report_service;

View File

@ -0,0 +1,128 @@
//! Deserializes packets from sigverify stage. Owned by banking stage.
use {
crate::{
immutable_deserialized_packet::ImmutableDeserializedPacket,
sigverify::SigverifyTracerPacketStats,
},
crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError},
solana_perf::packet::PacketBatch,
std::time::{Duration, Instant},
};
pub type BankingPacketBatch = (Vec<PacketBatch>, Option<SigverifyTracerPacketStats>);
pub type BankingPacketReceiver = CrossbeamReceiver<BankingPacketBatch>;
/// Results from deserializing packet batches.
pub struct ReceivePacketResults {
/// Deserialized packets from all received packet batches.
///
/// NOTE: contains only packets that were not marked discarded and also
/// deserialized successfully, so its length can be smaller than
/// `passed_sigverify_count`.
pub deserialized_packets: Vec<ImmutableDeserializedPacket>,
/// Aggregate tracer stats for all received packet batches
pub new_tracer_stats_option: Option<SigverifyTracerPacketStats>,
/// Number of packets passing sigverify
pub passed_sigverify_count: u64,
/// Number of packets failing sigverify
pub failed_sigverify_count: u64,
}
/// Owns the receiving end of the sigverify -> banking stage channel and
/// converts the raw `PacketBatch`es it pulls off that channel into
/// deserialized packets for banking stage to buffer.
pub struct PacketDeserializer {
/// Receiver for packet batches from sigverify stage
packet_batch_receiver: BankingPacketReceiver,
}
impl PacketDeserializer {
pub fn new(packet_batch_receiver: BankingPacketReceiver) -> Self {
Self {
packet_batch_receiver,
}
}
/// Handles receiving packet batches from sigverify and returns a vector of deserialized packets
pub fn handle_received_packets(
&self,
recv_timeout: Duration,
capacity: usize,
) -> Result<ReceivePacketResults, RecvTimeoutError> {
let (packet_batches, sigverify_tracer_stats_option) =
self.receive_until(recv_timeout, capacity)?;
let packet_count: usize = packet_batches.iter().map(|x| x.len()).sum();
let mut passed_sigverify_count: usize = 0;
let mut failed_sigverify_count: usize = 0;
let mut deserialized_packets = Vec::with_capacity(packet_count);
for packet_batch in packet_batches {
let packet_indexes = Self::generate_packet_indexes(&packet_batch);
passed_sigverify_count += packet_indexes.len();
failed_sigverify_count += packet_batch.len().saturating_sub(packet_indexes.len());
deserialized_packets.extend(Self::deserialize_packets(&packet_batch, &packet_indexes));
}
Ok(ReceivePacketResults {
deserialized_packets,
new_tracer_stats_option: sigverify_tracer_stats_option,
passed_sigverify_count: passed_sigverify_count as u64,
failed_sigverify_count: failed_sigverify_count as u64,
})
}
/// Receives packet batches from sigverify stage with a timeout, and aggregates tracer packet stats
fn receive_until(
&self,
recv_timeout: Duration,
packet_count_upperbound: usize,
) -> Result<(Vec<PacketBatch>, Option<SigverifyTracerPacketStats>), RecvTimeoutError> {
let start = Instant::now();
let (mut packet_batches, mut aggregated_tracer_packet_stats_option) =
self.packet_batch_receiver.recv_timeout(recv_timeout)?;
let mut num_packets_received: usize = packet_batches.iter().map(|batch| batch.len()).sum();
while let Ok((packet_batch, tracer_packet_stats_option)) =
self.packet_batch_receiver.try_recv()
{
trace!("got more packet batches in packet deserializer");
let (packets_received, packet_count_overflowed) = num_packets_received
.overflowing_add(packet_batch.iter().map(|batch| batch.len()).sum());
packet_batches.extend(packet_batch);
if let Some(tracer_packet_stats) = &tracer_packet_stats_option {
if let Some(aggregated_tracer_packet_stats) =
&mut aggregated_tracer_packet_stats_option
{
aggregated_tracer_packet_stats.aggregate(tracer_packet_stats);
} else {
aggregated_tracer_packet_stats_option = tracer_packet_stats_option;
}
}
if start.elapsed() >= recv_timeout
|| packet_count_overflowed
|| packets_received >= packet_count_upperbound
{
break;
}
num_packets_received = packets_received;
}
Ok((packet_batches, aggregated_tracer_packet_stats_option))
}
fn generate_packet_indexes(packet_batch: &PacketBatch) -> Vec<usize> {
packet_batch
.iter()
.enumerate()
.filter(|(_, pkt)| !pkt.meta.discard())
.map(|(index, _)| index)
.collect()
}
fn deserialize_packets<'a>(
packet_batch: &'a PacketBatch,
packet_indexes: &'a [usize],
) -> impl Iterator<Item = ImmutableDeserializedPacket> + 'a {
packet_indexes.iter().filter_map(move |packet_index| {
ImmutableDeserializedPacket::new(packet_batch[*packet_index].clone(), None).ok()
})
}
}

View File

@ -25,6 +25,13 @@ pub struct DeserializedPacket {
}
impl DeserializedPacket {
/// Wraps an already-deserialized packet in a `DeserializedPacket`, sharing it
/// via `Rc` and marking it as not yet forwarded.
pub fn from_immutable_section(immutable_section: ImmutableDeserializedPacket) -> Self {
Self {
immutable_section: Rc::new(immutable_section),
forwarded: false,
}
}
/// Deserializes `packet` into a `DeserializedPacket`, delegating to
/// `new_internal` with a `None` second argument (NOTE(review): presumably a
/// priority/override value — confirm against `new_internal`'s signature).
pub fn new(packet: Packet) -> Result<Self, DeserializedPacketError> {
Self::new_internal(packet, None)
}