BankingStage Refactor: Add state to PacketReceiver (#30090)

This commit is contained in:
Andrew Fitzgerald 2023-02-03 11:35:43 -08:00 committed by GitHub
parent 785a6e3a69
commit 8914d1af27
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 25 additions and 19 deletions

View File

@ -18,7 +18,6 @@ use {
leader_slot_banking_stage_timing_metrics::{ leader_slot_banking_stage_timing_metrics::{
LeaderExecuteAndCommitTimings, RecordTransactionsTimings, LeaderExecuteAndCommitTimings, RecordTransactionsTimings,
}, },
packet_deserializer::PacketDeserializer,
qos_service::QosService, qos_service::QosService,
tracer_packet_stats::TracerPacketStats, tracer_packet_stats::TracerPacketStats,
unprocessed_packet_batches::*, unprocessed_packet_batches::*,
@ -411,9 +410,9 @@ impl BankingStage {
.unwrap_or(false); .unwrap_or(false);
// Many banks that process transactions in parallel. // Many banks that process transactions in parallel.
let bank_thread_hdls: Vec<JoinHandle<()>> = (0..num_threads) let bank_thread_hdls: Vec<JoinHandle<()>> = (0..num_threads)
.map(|i| { .map(|id| {
let (packet_receiver, unprocessed_transaction_storage) = let (packet_receiver, unprocessed_transaction_storage) =
match (i, should_split_voting_threads) { match (id, should_split_voting_threads) {
(0, false) => ( (0, false) => (
gossip_vote_receiver.clone(), gossip_vote_receiver.clone(),
UnprocessedTransactionStorage::new_transaction_storage( UnprocessedTransactionStorage::new_transaction_storage(
@ -451,7 +450,7 @@ impl BankingStage {
), ),
}; };
let mut packet_deserializer = PacketDeserializer::new(packet_receiver); let mut packet_receiver = PacketReceiver::new(id, packet_receiver);
let poh_recorder = poh_recorder.clone(); let poh_recorder = poh_recorder.clone();
let transaction_status_sender = transaction_status_sender.clone(); let transaction_status_sender = transaction_status_sender.clone();
let replay_vote_sender = replay_vote_sender.clone(); let replay_vote_sender = replay_vote_sender.clone();
@ -466,14 +465,14 @@ impl BankingStage {
); );
Builder::new() Builder::new()
.name(format!("solBanknStgTx{i:02}")) .name(format!("solBanknStgTx{id:02}"))
.spawn(move || { .spawn(move || {
Self::process_loop( Self::process_loop(
&mut packet_deserializer, &mut packet_receiver,
&decision_maker, &decision_maker,
&forwarder, &forwarder,
&poh_recorder, &poh_recorder,
i, id,
transaction_status_sender, transaction_status_sender,
replay_vote_sender, replay_vote_sender,
log_messages_bytes_limit, log_messages_bytes_limit,
@ -707,7 +706,7 @@ impl BankingStage {
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
fn process_loop( fn process_loop(
packet_deserializer: &mut PacketDeserializer, packet_receiver: &mut PacketReceiver,
decision_maker: &DecisionMaker, decision_maker: &DecisionMaker,
forwarder: &Forwarder, forwarder: &Forwarder,
poh_recorder: &Arc<RwLock<PohRecorder>>, poh_recorder: &Arc<RwLock<PohRecorder>>,
@ -752,9 +751,7 @@ impl BankingStage {
tracer_packet_stats.report(1000); tracer_packet_stats.report(1000);
match PacketReceiver::receive_and_buffer_packets( match packet_receiver.receive_and_buffer_packets(
packet_deserializer,
id,
&mut unprocessed_transaction_storage, &mut unprocessed_transaction_storage,
&mut banking_stage_stats, &mut banking_stage_stats,
&mut tracer_packet_stats, &mut tracer_packet_stats,

View File

@ -1,6 +1,7 @@
use { use {
super::BankingStageStats, super::BankingStageStats,
crate::{ crate::{
banking_trace::BankingPacketReceiver,
immutable_deserialized_packet::ImmutableDeserializedPacket, immutable_deserialized_packet::ImmutableDeserializedPacket,
leader_slot_banking_stage_metrics::LeaderSlotMetricsTracker, leader_slot_banking_stage_metrics::LeaderSlotMetricsTracker,
packet_deserializer::{PacketDeserializer, ReceivePacketResults}, packet_deserializer::{PacketDeserializer, ReceivePacketResults},
@ -13,13 +14,22 @@ use {
std::{sync::atomic::Ordering, time::Duration}, std::{sync::atomic::Ordering, time::Duration},
}; };
pub struct PacketReceiver; pub struct PacketReceiver {
id: u32,
packet_deserializer: PacketDeserializer,
}
impl PacketReceiver { impl PacketReceiver {
pub fn new(id: u32, banking_packet_receiver: BankingPacketReceiver) -> Self {
Self {
id,
packet_deserializer: PacketDeserializer::new(banking_packet_receiver),
}
}
/// Receive incoming packets, push into unprocessed buffer with packet indexes /// Receive incoming packets, push into unprocessed buffer with packet indexes
pub fn receive_and_buffer_packets( pub fn receive_and_buffer_packets(
packet_deserializer: &mut PacketDeserializer, &mut self,
id: u32,
unprocessed_transaction_storage: &mut UnprocessedTransactionStorage, unprocessed_transaction_storage: &mut UnprocessedTransactionStorage,
banking_stage_stats: &mut BankingStageStats, banking_stage_stats: &mut BankingStageStats,
tracer_packet_stats: &mut TracerPacketStats, tracer_packet_stats: &mut TracerPacketStats,
@ -28,15 +38,14 @@ impl PacketReceiver {
let (result, recv_time_us) = measure_us!({ let (result, recv_time_us) = measure_us!({
let recv_timeout = Self::get_receive_timeout(unprocessed_transaction_storage); let recv_timeout = Self::get_receive_timeout(unprocessed_transaction_storage);
let mut recv_and_buffer_measure = Measure::start("recv_and_buffer"); let mut recv_and_buffer_measure = Measure::start("recv_and_buffer");
packet_deserializer self.packet_deserializer
.receive_packets( .receive_packets(
recv_timeout, recv_timeout,
unprocessed_transaction_storage.max_receive_size(), unprocessed_transaction_storage.max_receive_size(),
) )
.map(|receive_packet_results| { .map(|receive_packet_results| {
Self::buffer_packets( self.buffer_packets(
receive_packet_results, receive_packet_results,
id,
unprocessed_transaction_storage, unprocessed_transaction_storage,
banking_stage_stats, banking_stage_stats,
tracer_packet_stats, tracer_packet_stats,
@ -73,20 +82,20 @@ impl PacketReceiver {
} }
fn buffer_packets( fn buffer_packets(
&self,
ReceivePacketResults { ReceivePacketResults {
deserialized_packets, deserialized_packets,
new_tracer_stats_option, new_tracer_stats_option,
passed_sigverify_count, passed_sigverify_count,
failed_sigverify_count, failed_sigverify_count,
}: ReceivePacketResults, }: ReceivePacketResults,
id: u32,
unprocessed_transaction_storage: &mut UnprocessedTransactionStorage, unprocessed_transaction_storage: &mut UnprocessedTransactionStorage,
banking_stage_stats: &mut BankingStageStats, banking_stage_stats: &mut BankingStageStats,
tracer_packet_stats: &mut TracerPacketStats, tracer_packet_stats: &mut TracerPacketStats,
slot_metrics_tracker: &mut LeaderSlotMetricsTracker, slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
) { ) {
let packet_count = deserialized_packets.len(); let packet_count = deserialized_packets.len();
debug!("@{:?} txs: {} id: {}", timestamp(), packet_count, id); debug!("@{:?} txs: {} id: {}", timestamp(), packet_count, self.id);
if let Some(new_sigverify_stats) = &new_tracer_stats_option { if let Some(new_sigverify_stats) = &new_tracer_stats_option {
tracer_packet_stats.aggregate_sigverify_tracer_packet_stats(new_sigverify_stats); tracer_packet_stats.aggregate_sigverify_tracer_packet_stats(new_sigverify_stats);