push live packets straight to the buffer; leader only processes packets from the buffer

Tao Zhu 2021-12-16 11:44:10 -06:00 committed by Tao Zhu
parent c7b0917e1a
commit 3d6ab96587
1 changed file with 25 additions and 123 deletions

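For orientation before the diff: the change strips the inline transaction processing out of the old process_packets and leaves a function that only receives verified packet batches and appends them, with their packet indexes, to the unprocessed buffer; the leader-side consumption happens elsewhere by draining that buffer. Below is a minimal sketch of that receive-and-buffer shape, not the real implementation: Packet, PacketBatch, and UnprocessedPacketBatches are simplified stand-ins, and a std mpsc receiver stands in for the crossbeam receiver used in banking_stage.rs.

use std::{
    collections::VecDeque,
    sync::mpsc::{Receiver, RecvTimeoutError},
    time::Duration,
};

// Simplified stand-ins for the real Solana types; not the actual definitions.
struct Packet;
struct PacketBatch {
    packets: Vec<Packet>,
}
type UnprocessedPacketBatches = VecDeque<(PacketBatch, Vec<usize>)>;

/// Receive incoming packet batches and push them, unprocessed, into the buffer.
/// The consuming side of the banking thread later drains the buffer when the
/// node is the leader.
fn receive_and_buffer_packets(
    verified_receiver: &Receiver<Vec<PacketBatch>>,
    recv_timeout: Duration,
    buffered_packet_batches: &mut UnprocessedPacketBatches,
    batch_limit: usize,
) -> Result<(), RecvTimeoutError> {
    let packet_batches = verified_receiver.recv_timeout(recv_timeout)?;
    for batch in packet_batches {
        // Every packet index in the batch is still unprocessed at this point.
        let packet_indexes: Vec<usize> = (0..batch.packets.len()).collect();
        // Keep the buffer bounded by dropping the oldest batch, roughly what
        // the real push_unprocessed does while bumping the drop counters.
        if buffered_packet_batches.len() >= batch_limit {
            buffered_packet_batches.pop_front();
        }
        buffered_packet_batches.push_back((batch, packet_indexes));
    }
    Ok(())
}

fn main() {
    use std::sync::mpsc::channel;
    let (sender, receiver) = channel();
    sender
        .send(vec![PacketBatch { packets: vec![Packet, Packet] }])
        .unwrap();
    let mut buffer = UnprocessedPacketBatches::new();
    // A plain receive timeout is non-fatal, mirroring the match arms in the
    // banking loop shown in the diff.
    match receive_and_buffer_packets(&receiver, Duration::from_millis(100), &mut buffer, 500) {
        Ok(()) | Err(RecvTimeoutError::Timeout) => {}
        Err(RecvTimeoutError::Disconnected) => {}
    }
    assert_eq!(buffer.len(), 1);
}

As in the diff's banking loop, Ok(()) and a receive timeout are treated the same, while a disconnected channel ends the loop.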

@@ -85,8 +85,7 @@ const MIN_THREADS_BANKING: u32 = 1;
pub struct BankingStageStats {
last_report: AtomicInterval,
id: u32,
process_packets_count: AtomicUsize,
new_tx_count: AtomicUsize,
receive_and_buffer_packets_count: AtomicUsize,
dropped_packet_batches_count: AtomicUsize,
dropped_packets_count: AtomicUsize,
pub(crate) dropped_duplicated_packets_count: AtomicUsize,
@@ -98,7 +97,7 @@ pub struct BankingStageStats {
// Timing
consume_buffered_packets_elapsed: AtomicU64,
process_packets_elapsed: AtomicU64,
receive_and_buffer_packets_elapsed: AtomicU64,
handle_retryable_packets_elapsed: AtomicU64,
filter_pending_packets_elapsed: AtomicU64,
pub(crate) packet_duplicate_check_elapsed: AtomicU64,
@@ -116,8 +115,9 @@ impl BankingStageStats {
}
fn is_empty(&self) -> bool {
0 == self.process_packets_count.load(Ordering::Relaxed) as u64
+ self.new_tx_count.load(Ordering::Relaxed) as u64
0 == self
.receive_and_buffer_packets_count
.load(Ordering::Relaxed) as u64
+ self.dropped_packet_batches_count.load(Ordering::Relaxed) as u64
+ self.dropped_packets_count.load(Ordering::Relaxed) as u64
+ self
@@ -133,7 +133,9 @@ impl BankingStageStats {
+ self
.consume_buffered_packets_elapsed
.load(Ordering::Relaxed)
+ self.process_packets_elapsed.load(Ordering::Relaxed)
+ self
.receive_and_buffer_packets_elapsed
.load(Ordering::Relaxed)
+ self
.handle_retryable_packets_elapsed
.load(Ordering::Relaxed)
@@ -156,13 +158,9 @@ impl BankingStageStats {
"banking_stage-loop-stats",
("id", self.id as i64, i64),
(
"process_packets_count",
self.process_packets_count.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"new_tx_count",
self.new_tx_count.swap(0, Ordering::Relaxed) as i64,
"receive_and_buffer_packets_count",
self.receive_and_buffer_packets_count
.swap(0, Ordering::Relaxed) as i64,
i64
),
(
@@ -216,8 +214,9 @@ impl BankingStageStats {
i64
),
(
"process_packets_elapsed",
self.process_packets_elapsed.swap(0, Ordering::Relaxed) as i64,
"receive_and_buffer_packets_elapsed",
self.receive_and_buffer_packets_elapsed
.swap(0, Ordering::Relaxed) as i64,
i64
),
(
@@ -753,21 +752,15 @@ impl BankingStage {
Duration::from_millis(100)
};
match Self::process_packets(
&my_pubkey,
match Self::receive_and_buffer_packets(
verified_receiver,
poh_recorder,
recv_start,
recv_timeout,
id,
batch_limit,
transaction_status_sender.clone(),
&gossip_vote_sender,
&mut buffered_packet_batches,
&banking_stage_stats,
packet_deduper,
&recorder,
&qos_service,
) {
Ok(()) | Err(RecvTimeoutError::Timeout) => (),
Err(RecvTimeoutError::Disconnected) => break,
@@ -1315,24 +1308,18 @@ impl BankingStage {
}
#[allow(clippy::too_many_arguments)]
/// Process the incoming packets
fn process_packets(
my_pubkey: &Pubkey,
/// Receive incoming packets, push into unprocessed buffer with packet indexes
fn receive_and_buffer_packets(
verified_receiver: &CrossbeamReceiver<Vec<PacketBatch>>,
poh: &Arc<Mutex<PohRecorder>>,
recv_start: &mut Instant,
recv_timeout: Duration,
id: u32,
batch_limit: usize,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: &ReplayVoteSender,
buffered_packet_batches: &mut UnprocessedPacketBatches,
banking_stage_stats: &BankingStageStats,
packet_deduper: &PacketDeduper,
recorder: &TransactionRecorder,
qos_service: &QosService,
) -> Result<(), RecvTimeoutError> {
let mut recv_time = Measure::start("process_packets_recv");
let mut recv_time = Measure::start("receive_and_buffer_packets_recv");
let packet_batches = verified_receiver.recv_timeout(recv_timeout)?;
recv_time.stop();
@@ -1345,64 +1332,18 @@ impl BankingStage {
packet_count,
id,
);
let mut proc_start = Measure::start("process_packets_transactions_process");
let mut new_tx_count = 0;
let mut proc_start = Measure::start("receive_and_buffer_packets_transactions_process");
let mut packet_batch_iter = packet_batches.into_iter();
let packet_batch_iter = packet_batches.into_iter();
let mut dropped_packets_count = 0;
let mut dropped_packet_batches_count = 0;
let mut newly_buffered_packets_count = 0;
while let Some(packet_batch) = packet_batch_iter.next() {
for packet_batch in packet_batch_iter {
let packet_indexes = Self::generate_packet_indexes(&packet_batch.packets);
let poh_recorder_bank = poh.lock().unwrap().get_poh_recorder_bank();
let working_bank_start = poh_recorder_bank.working_bank_start();
if PohRecorder::get_working_bank_if_not_expired(&working_bank_start).is_none() {
qos_service
.accumulate_tpu_buffered_packets_count(packet_batch.packets.len() as u64);
Self::push_unprocessed(
buffered_packet_batches,
packet_batch,
packet_indexes,
&mut dropped_packet_batches_count,
&mut dropped_packets_count,
&mut newly_buffered_packets_count,
batch_limit,
packet_deduper,
banking_stage_stats,
);
continue;
}
// Destructure the `BankStart` behind an Arc
let BankStart {
working_bank,
bank_creation_time,
} = &*working_bank_start.unwrap();
qos_service.accumulate_tpu_ingested_packets_count(packet_batch.packets.len() as u64);
let (processed, verified_txs_len, unprocessed_indexes) =
Self::process_packets_transactions(
working_bank,
bank_creation_time,
recorder,
&packet_batch,
packet_indexes,
transaction_status_sender.clone(),
gossip_vote_sender,
banking_stage_stats,
qos_service,
);
new_tx_count += processed;
qos_service.accumulated_verified_txs_count(verified_txs_len as u64);
qos_service.accumulated_processed_txs_count(processed as u64);
qos_service.accumulated_retryable_txs_count(unprocessed_indexes.len() as u64);
// Collect any unprocessed transactions in this batch for forwarding
Self::push_unprocessed(
buffered_packet_batches,
packet_batch,
unprocessed_indexes,
packet_indexes,
&mut dropped_packet_batches_count,
&mut dropped_packets_count,
&mut newly_buffered_packets_count,
@@ -1410,62 +1351,23 @@ impl BankingStage {
packet_deduper,
banking_stage_stats,
);
// If there were retryable transactions, add the unexpired ones to the buffered queue
if processed < verified_txs_len {
let mut handle_retryable_packets_time = Measure::start("handle_retryable_packets");
let next_leader = poh.lock().unwrap().next_slot_leader();
// Walk thru rest of the transactions and filter out the invalid (e.g. too old) ones
#[allow(clippy::while_let_on_iterator)]
while let Some(packet_batch) = packet_batch_iter.next() {
let packet_indexes = Self::generate_packet_indexes(&packet_batch.packets);
let unprocessed_indexes = Self::filter_unprocessed_packets(
working_bank,
&packet_batch,
&packet_indexes,
my_pubkey,
next_leader,
banking_stage_stats,
);
Self::push_unprocessed(
buffered_packet_batches,
packet_batch,
unprocessed_indexes,
&mut dropped_packet_batches_count,
&mut dropped_packets_count,
&mut newly_buffered_packets_count,
batch_limit,
packet_deduper,
banking_stage_stats,
);
}
handle_retryable_packets_time.stop();
banking_stage_stats
.handle_retryable_packets_elapsed
.fetch_add(handle_retryable_packets_time.as_us(), Ordering::Relaxed);
}
}
proc_start.stop();
debug!(
"@{:?} done processing transaction batches: {} time: {:?}ms tx count: {} tx/s: {} total count: {} id: {}",
"@{:?} done processing transaction batches: {} time: {:?}ms total count: {} id: {}",
timestamp(),
packet_batches_len,
proc_start.as_ms(),
new_tx_count,
(new_tx_count as f32) / (proc_start.as_s()),
packet_count,
id,
);
banking_stage_stats
.process_packets_elapsed
.receive_and_buffer_packets_elapsed
.fetch_add(proc_start.as_us(), Ordering::Relaxed);
banking_stage_stats
.process_packets_count
.receive_and_buffer_packets_count
.fetch_add(packet_count, Ordering::Relaxed);
banking_stage_stats
.new_tx_count
.fetch_add(new_tx_count, Ordering::Relaxed);
banking_stage_stats
.dropped_packet_batches_count
.fetch_add(dropped_packet_batches_count, Ordering::Relaxed);