BankingStage Refactor: Simplify PacketReceiver (#29784)

This commit is contained in:
Andrew Fitzgerald 2023-02-02 07:58:55 -08:00 committed by GitHub
parent 385a6e01f7
commit fd3f26380e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 69 additions and 64 deletions

View File

@ -455,7 +455,6 @@ impl BankingStage {
let mut packet_deserializer = PacketDeserializer::new(packet_receiver);
let poh_recorder = poh_recorder.clone();
let cluster_info = cluster_info.clone();
let mut recv_start = Instant::now();
let transaction_status_sender = transaction_status_sender.clone();
let replay_vote_sender = replay_vote_sender.clone();
let data_budget = data_budget.clone();
@ -472,7 +471,6 @@ impl BankingStage {
&decision_maker,
&poh_recorder,
&cluster_info,
&mut recv_start,
i,
transaction_status_sender,
replay_vote_sender,
@ -737,7 +735,6 @@ impl BankingStage {
decision_maker: &DecisionMaker,
poh_recorder: &Arc<RwLock<PohRecorder>>,
cluster_info: &ClusterInfo,
recv_start: &mut Instant,
id: u32,
transaction_status_sender: Option<TransactionStatusSender>,
replay_vote_sender: ReplayVoteSender,
@ -788,35 +785,14 @@ impl BankingStage {
tracer_packet_stats.report(1000);
// Gossip thread will almost always not wait because the transaction storage will most likely not be empty
let recv_timeout = if !unprocessed_transaction_storage.is_empty() {
// If there are buffered packets, run the equivalent of try_recv to try reading more
// packets. This prevents starving BankingStage::consume_buffered_packets due to
// buffered_packet_batches containing transactions that exceed the cost model for
// the current bank.
Duration::from_millis(0)
} else {
// Default wait time
Duration::from_millis(100)
};
let (res, receive_and_buffer_packets_time) = measure!(
PacketReceiver::receive_and_buffer_packets(
packet_deserializer,
recv_start,
recv_timeout,
id,
&mut unprocessed_transaction_storage,
&mut banking_stage_stats,
&mut tracer_packet_stats,
&mut slot_metrics_tracker,
),
"receive_and_buffer_packets",
);
slot_metrics_tracker
.increment_receive_and_buffer_packets_us(receive_and_buffer_packets_time.as_us());
match res {
match PacketReceiver::receive_and_buffer_packets(
packet_deserializer,
id,
&mut unprocessed_transaction_storage,
&mut banking_stage_stats,
&mut tracer_packet_stats,
&mut slot_metrics_tracker,
) {
Ok(()) | Err(RecvTimeoutError::Timeout) => (),
Err(RecvTimeoutError::Disconnected) => break,
}

View File

@ -8,50 +8,85 @@ use {
unprocessed_transaction_storage::UnprocessedTransactionStorage,
},
crossbeam_channel::RecvTimeoutError,
solana_measure::measure::Measure,
solana_sdk::{
saturating_add_assign,
timing::{duration_as_ms, timestamp},
},
std::{
sync::atomic::Ordering,
time::{Duration, Instant},
},
solana_measure::{measure, measure::Measure, measure_us},
solana_sdk::{saturating_add_assign, timing::timestamp},
std::{sync::atomic::Ordering, time::Duration},
};
pub struct PacketReceiver;
impl PacketReceiver {
#[allow(clippy::too_many_arguments)]
/// Receive incoming packets, push into unprocessed buffer with packet indexes
pub fn receive_and_buffer_packets(
packet_deserializer: &mut PacketDeserializer,
recv_start: &mut Instant,
recv_timeout: Duration,
id: u32,
unprocessed_transaction_storage: &mut UnprocessedTransactionStorage,
banking_stage_stats: &mut BankingStageStats,
tracer_packet_stats: &mut TracerPacketStats,
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
) -> Result<(), RecvTimeoutError> {
let mut recv_time = Measure::start("receive_and_buffer_packets_recv");
let ReceivePacketResults {
let (result, recv_time_us) = measure_us!({
let recv_timeout = Self::get_receive_timeout(unprocessed_transaction_storage);
let mut recv_and_buffer_measure = Measure::start("recv_and_buffer");
packet_deserializer
.receive_packets(
recv_timeout,
unprocessed_transaction_storage.max_receive_size(),
)
.map(|receive_packet_results| {
Self::buffer_packets(
receive_packet_results,
id,
unprocessed_transaction_storage,
banking_stage_stats,
tracer_packet_stats,
slot_metrics_tracker,
);
recv_and_buffer_measure.stop();
// Only incremented if packets are received
banking_stage_stats
.receive_and_buffer_packets_elapsed
.fetch_add(recv_and_buffer_measure.as_us(), Ordering::Relaxed);
})
});
slot_metrics_tracker.increment_receive_and_buffer_packets_us(recv_time_us);
result
}
fn get_receive_timeout(
unprocessed_transaction_storage: &UnprocessedTransactionStorage,
) -> Duration {
// Gossip thread will almost always not wait because the transaction storage will most likely not be empty
if !unprocessed_transaction_storage.is_empty() {
// If there are buffered packets, run the equivalent of try_recv to try reading more
// packets. This prevents starving BankingStage::consume_buffered_packets due to
// buffered_packet_batches containing transactions that exceed the cost model for
// the current bank.
Duration::from_millis(0)
} else {
// Default wait time
Duration::from_millis(100)
}
}
fn buffer_packets(
ReceivePacketResults {
deserialized_packets,
new_tracer_stats_option,
passed_sigverify_count,
failed_sigverify_count,
} = packet_deserializer.handle_received_packets(
recv_timeout,
unprocessed_transaction_storage.max_receive_size(),
)?;
}: ReceivePacketResults,
id: u32,
unprocessed_transaction_storage: &mut UnprocessedTransactionStorage,
banking_stage_stats: &mut BankingStageStats,
tracer_packet_stats: &mut TracerPacketStats,
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
) {
let packet_count = deserialized_packets.len();
debug!(
"@{:?} process start stalled for: {:?}ms txs: {} id: {}",
timestamp(),
duration_as_ms(&recv_start.elapsed()),
packet_count,
id,
);
debug!("@{:?} txs: {} id: {}", timestamp(), packet_count, id);
if let Some(new_sigverify_stats) = &new_tracer_stats_option {
tracer_packet_stats.aggregate_sigverify_tracer_packet_stats(new_sigverify_stats);
@ -72,11 +107,7 @@ impl PacketReceiver {
slot_metrics_tracker,
tracer_packet_stats,
);
recv_time.stop();
banking_stage_stats
.receive_and_buffer_packets_elapsed
.fetch_add(recv_time.as_us(), Ordering::Relaxed);
banking_stage_stats
.receive_and_buffer_packets_count
.fetch_add(packet_count, Ordering::Relaxed);
@ -89,8 +120,6 @@ impl PacketReceiver {
banking_stage_stats
.current_buffered_packets_count
.swap(unprocessed_transaction_storage.len(), Ordering::Relaxed);
*recv_start = Instant::now();
Ok(())
}
fn push_unprocessed(

View File

@ -36,7 +36,7 @@ impl PacketDeserializer {
}
/// Handles receiving packet batches from sigverify and returns a vector of deserialized packets
pub fn handle_received_packets(
pub fn receive_packets(
&self,
recv_timeout: Duration,
capacity: usize,