Introduce slot-specific packet metrics (#22906)
This commit is contained in:
parent
4bd6a231d2
commit
2f9e30a1f7
|
@ -10,6 +10,7 @@ use {
|
||||||
rayon::prelude::*,
|
rayon::prelude::*,
|
||||||
solana_core::{
|
solana_core::{
|
||||||
banking_stage::{BankingStage, BankingStageStats},
|
banking_stage::{BankingStage, BankingStageStats},
|
||||||
|
leader_slot_banking_stage_metrics::LeaderSlotMetricsTracker,
|
||||||
qos_service::QosService,
|
qos_service::QosService,
|
||||||
},
|
},
|
||||||
solana_entry::entry::{next_hash, Entry},
|
solana_entry::entry::{next_hash, Entry},
|
||||||
|
@ -98,6 +99,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) {
|
||||||
&BankingStageStats::default(),
|
&BankingStageStats::default(),
|
||||||
&recorder,
|
&recorder,
|
||||||
&QosService::new(Arc::new(RwLock::new(CostModel::default())), 1),
|
&QosService::new(Arc::new(RwLock::new(CostModel::default())), 1),
|
||||||
|
&mut LeaderSlotMetricsTracker::new(0),
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
|
@ -2,7 +2,10 @@
|
||||||
//! to contruct a software pipeline. The stage uses all available CPU cores and
|
//! to contruct a software pipeline. The stage uses all available CPU cores and
|
||||||
//! can do its processing in parallel with signature verification on the GPU.
|
//! can do its processing in parallel with signature verification on the GPU.
|
||||||
use {
|
use {
|
||||||
crate::qos_service::QosService,
|
crate::{
|
||||||
|
leader_slot_banking_stage_metrics::{LeaderSlotMetricsTracker, ProcessTransactionsSummary},
|
||||||
|
qos_service::QosService,
|
||||||
|
},
|
||||||
crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError},
|
crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError},
|
||||||
histogram::Histogram,
|
histogram::Histogram,
|
||||||
itertools::Itertools,
|
itertools::Itertools,
|
||||||
|
@ -85,47 +88,6 @@ const MAX_NUM_TRANSACTIONS_PER_BATCH: usize = 128;
|
||||||
const NUM_VOTE_PROCESSING_THREADS: u32 = 2;
|
const NUM_VOTE_PROCESSING_THREADS: u32 = 2;
|
||||||
const MIN_THREADS_BANKING: u32 = 1;
|
const MIN_THREADS_BANKING: u32 = 1;
|
||||||
|
|
||||||
/// A summary of what happened to transactions passed to the execution pipeline.
|
|
||||||
/// Transactions can
|
|
||||||
/// 1) Did not even make it to execution due to being filtered out by things like AccountInUse
|
|
||||||
/// lock conflictss or CostModel compute limits. These types of errors are retryable and
|
|
||||||
/// counted in `Self::retryable_transaction_indexes`.
|
|
||||||
/// 2) Did not execute due to some fatal error like too old, or duplicate signature. These
|
|
||||||
/// will be dropped from the transactions queue and not counted in `Self::retryable_transaction_indexes`
|
|
||||||
/// 3) Were executed and committed, captured by `committed_transactions_count` below.
|
|
||||||
/// 4) Were executed and failed commit, captured by `failed_commit_count` below.
|
|
||||||
struct ProcessTransactionsSummary {
|
|
||||||
// Returns true if we hit the end of the block/max PoH height for the block before
|
|
||||||
// processing all the transactions in the batch.
|
|
||||||
reached_max_poh_height: bool,
|
|
||||||
|
|
||||||
// Total number of transactions that were passed as candidates for execution. See description
|
|
||||||
// of struct above for possible outcomes for these transactions
|
|
||||||
#[allow(dead_code)]
|
|
||||||
transactions_attempted_execution_count: usize,
|
|
||||||
|
|
||||||
// Total number of transactions that made it into the block
|
|
||||||
#[allow(dead_code)]
|
|
||||||
committed_transactions_count: usize,
|
|
||||||
|
|
||||||
// Total number of transactions that made it into the block where the transactions
|
|
||||||
// output from execution was success/no error.
|
|
||||||
#[allow(dead_code)]
|
|
||||||
committed_transactions_with_successful_result_count: usize,
|
|
||||||
|
|
||||||
// All transactions that were executed but then failed record because the
|
|
||||||
// slot ended
|
|
||||||
#[allow(dead_code)]
|
|
||||||
failed_commit_count: usize,
|
|
||||||
|
|
||||||
// Indexes of transactions in the transactions slice that were not committed but are retryable
|
|
||||||
retryable_transaction_indexes: Vec<usize>,
|
|
||||||
|
|
||||||
// The number of transactions filtered out by the cost model
|
|
||||||
#[allow(dead_code)]
|
|
||||||
cost_model_throttled_transactions_count: usize,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct ProcessTransactionBatchOutput {
|
pub struct ProcessTransactionBatchOutput {
|
||||||
// The number of transactions filtered out by the cost model
|
// The number of transactions filtered out by the cost model
|
||||||
cost_model_throttled_transactions_count: usize,
|
cost_model_throttled_transactions_count: usize,
|
||||||
|
@ -164,6 +126,7 @@ pub struct BankingStageStats {
|
||||||
current_buffered_packet_batches_count: AtomicUsize,
|
current_buffered_packet_batches_count: AtomicUsize,
|
||||||
rebuffered_packets_count: AtomicUsize,
|
rebuffered_packets_count: AtomicUsize,
|
||||||
consumed_buffered_packets_count: AtomicUsize,
|
consumed_buffered_packets_count: AtomicUsize,
|
||||||
|
end_of_slot_filtered_invalid_count: AtomicUsize,
|
||||||
batch_packet_indexes_len: Histogram,
|
batch_packet_indexes_len: Histogram,
|
||||||
|
|
||||||
// Timing
|
// Timing
|
||||||
|
@ -171,7 +134,6 @@ pub struct BankingStageStats {
|
||||||
receive_and_buffer_packets_elapsed: AtomicU64,
|
receive_and_buffer_packets_elapsed: AtomicU64,
|
||||||
handle_retryable_packets_elapsed: AtomicU64,
|
handle_retryable_packets_elapsed: AtomicU64,
|
||||||
filter_pending_packets_elapsed: AtomicU64,
|
filter_pending_packets_elapsed: AtomicU64,
|
||||||
pub(crate) packet_duplicate_check_elapsed: AtomicU64,
|
|
||||||
packet_conversion_elapsed: AtomicU64,
|
packet_conversion_elapsed: AtomicU64,
|
||||||
unprocessed_packet_conversion_elapsed: AtomicU64,
|
unprocessed_packet_conversion_elapsed: AtomicU64,
|
||||||
transaction_processing_elapsed: AtomicU64,
|
transaction_processing_elapsed: AtomicU64,
|
||||||
|
@ -215,7 +177,6 @@ impl BankingStageStats {
|
||||||
.handle_retryable_packets_elapsed
|
.handle_retryable_packets_elapsed
|
||||||
.load(Ordering::Relaxed)
|
.load(Ordering::Relaxed)
|
||||||
+ self.filter_pending_packets_elapsed.load(Ordering::Relaxed)
|
+ self.filter_pending_packets_elapsed.load(Ordering::Relaxed)
|
||||||
+ self.packet_duplicate_check_elapsed.load(Ordering::Relaxed)
|
|
||||||
+ self.packet_conversion_elapsed.load(Ordering::Relaxed)
|
+ self.packet_conversion_elapsed.load(Ordering::Relaxed)
|
||||||
+ self
|
+ self
|
||||||
.unprocessed_packet_conversion_elapsed
|
.unprocessed_packet_conversion_elapsed
|
||||||
|
@ -283,6 +244,12 @@ impl BankingStageStats {
|
||||||
.swap(0, Ordering::Relaxed) as i64,
|
.swap(0, Ordering::Relaxed) as i64,
|
||||||
i64
|
i64
|
||||||
),
|
),
|
||||||
|
(
|
||||||
|
"end_of_slot_filtered_invalid_count",
|
||||||
|
self.end_of_slot_filtered_invalid_count
|
||||||
|
.swap(0, Ordering::Relaxed) as i64,
|
||||||
|
i64
|
||||||
|
),
|
||||||
(
|
(
|
||||||
"consume_buffered_packets_elapsed",
|
"consume_buffered_packets_elapsed",
|
||||||
self.consume_buffered_packets_elapsed
|
self.consume_buffered_packets_elapsed
|
||||||
|
@ -307,12 +274,6 @@ impl BankingStageStats {
|
||||||
.swap(0, Ordering::Relaxed) as i64,
|
.swap(0, Ordering::Relaxed) as i64,
|
||||||
i64
|
i64
|
||||||
),
|
),
|
||||||
(
|
|
||||||
"packet_duplicate_check_elapsed",
|
|
||||||
self.packet_duplicate_check_elapsed
|
|
||||||
.swap(0, Ordering::Relaxed) as i64,
|
|
||||||
i64
|
|
||||||
),
|
|
||||||
(
|
(
|
||||||
"packet_conversion_elapsed",
|
"packet_conversion_elapsed",
|
||||||
self.packet_conversion_elapsed.swap(0, Ordering::Relaxed) as i64,
|
self.packet_conversion_elapsed.swap(0, Ordering::Relaxed) as i64,
|
||||||
|
@ -485,13 +446,14 @@ impl BankingStage {
|
||||||
.collect()
|
.collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Forwards all valid, unprocessed packets in the buffer, up to a rate limit. Returns
|
||||||
|
/// the number of successfully forwarded packets in second part of tuple
|
||||||
fn forward_buffered_packets(
|
fn forward_buffered_packets(
|
||||||
socket: &std::net::UdpSocket,
|
socket: &std::net::UdpSocket,
|
||||||
tpu_forwards: &std::net::SocketAddr,
|
tpu_forwards: &std::net::SocketAddr,
|
||||||
buffered_packet_batches: &UnprocessedPacketBatches,
|
packets: Vec<&Packet>,
|
||||||
data_budget: &DataBudget,
|
data_budget: &DataBudget,
|
||||||
) -> std::io::Result<()> {
|
) -> (std::io::Result<()>, usize) {
|
||||||
let packets = Self::filter_valid_packets_for_forwarding(buffered_packet_batches.iter());
|
|
||||||
const INTERVAL_MS: u64 = 100;
|
const INTERVAL_MS: u64 = 100;
|
||||||
const MAX_BYTES_PER_SECOND: usize = 10_000 * 1200;
|
const MAX_BYTES_PER_SECOND: usize = 10_000 * 1200;
|
||||||
const MAX_BYTES_PER_INTERVAL: usize = MAX_BYTES_PER_SECOND * INTERVAL_MS as usize / 1000;
|
const MAX_BYTES_PER_INTERVAL: usize = MAX_BYTES_PER_SECOND * INTERVAL_MS as usize / 1000;
|
||||||
|
@ -518,11 +480,11 @@ impl BankingStage {
|
||||||
inc_new_counter_info!("banking_stage-forwarded_packets", packet_vec.len());
|
inc_new_counter_info!("banking_stage-forwarded_packets", packet_vec.len());
|
||||||
if let Err(SendPktsError::IoError(ioerr, _num_failed)) = batch_send(socket, &packet_vec)
|
if let Err(SendPktsError::IoError(ioerr, _num_failed)) = batch_send(socket, &packet_vec)
|
||||||
{
|
{
|
||||||
return Err(ioerr);
|
return (Err(ioerr), 0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
(Ok(()), packet_vec.len())
|
||||||
}
|
}
|
||||||
|
|
||||||
// Returns whether the given `PacketBatch` has any more remaining unprocessed
|
// Returns whether the given `PacketBatch` has any more remaining unprocessed
|
||||||
|
@ -551,6 +513,7 @@ impl BankingStage {
|
||||||
banking_stage_stats: &BankingStageStats,
|
banking_stage_stats: &BankingStageStats,
|
||||||
recorder: &TransactionRecorder,
|
recorder: &TransactionRecorder,
|
||||||
qos_service: &QosService,
|
qos_service: &QosService,
|
||||||
|
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
|
||||||
) {
|
) {
|
||||||
let mut rebuffered_packet_count = 0;
|
let mut rebuffered_packet_count = 0;
|
||||||
let mut consumed_buffered_packets_count = 0;
|
let mut consumed_buffered_packets_count = 0;
|
||||||
|
@ -566,7 +529,7 @@ impl BankingStage {
|
||||||
if let Some((next_leader, bank)) = &reached_end_of_slot {
|
if let Some((next_leader, bank)) = &reached_end_of_slot {
|
||||||
// We've hit the end of this slot, no need to perform more processing,
|
// We've hit the end of this slot, no need to perform more processing,
|
||||||
// just filter the remaining packets for the invalid (e.g. too old) ones
|
// just filter the remaining packets for the invalid (e.g. too old) ones
|
||||||
let new_unprocessed_indexes = Self::filter_unprocessed_packets(
|
let new_unprocessed_indexes = Self::filter_unprocessed_packets_at_end_of_slot(
|
||||||
bank,
|
bank,
|
||||||
packet_batch,
|
packet_batch,
|
||||||
original_unprocessed_indexes,
|
original_unprocessed_indexes,
|
||||||
|
@ -574,6 +537,19 @@ impl BankingStage {
|
||||||
*next_leader,
|
*next_leader,
|
||||||
banking_stage_stats,
|
banking_stage_stats,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
let end_of_slot_filtered_invalid_count = original_unprocessed_indexes
|
||||||
|
.len()
|
||||||
|
.saturating_sub(new_unprocessed_indexes.len());
|
||||||
|
|
||||||
|
slot_metrics_tracker.increment_end_of_slot_filtered_invalid_count(
|
||||||
|
end_of_slot_filtered_invalid_count as u64,
|
||||||
|
);
|
||||||
|
|
||||||
|
banking_stage_stats
|
||||||
|
.end_of_slot_filtered_invalid_count
|
||||||
|
.fetch_add(end_of_slot_filtered_invalid_count, Ordering::Relaxed);
|
||||||
|
|
||||||
Self::update_buffered_packets_with_new_unprocessed(
|
Self::update_buffered_packets_with_new_unprocessed(
|
||||||
original_unprocessed_indexes,
|
original_unprocessed_indexes,
|
||||||
new_unprocessed_indexes,
|
new_unprocessed_indexes,
|
||||||
|
@ -595,6 +571,7 @@ impl BankingStage {
|
||||||
gossip_vote_sender,
|
gossip_vote_sender,
|
||||||
banking_stage_stats,
|
banking_stage_stats,
|
||||||
qos_service,
|
qos_service,
|
||||||
|
slot_metrics_tracker,
|
||||||
);
|
);
|
||||||
let ProcessTransactionsSummary {
|
let ProcessTransactionsSummary {
|
||||||
reached_max_poh_height,
|
reached_max_poh_height,
|
||||||
|
@ -719,6 +696,7 @@ impl BankingStage {
|
||||||
recorder: &TransactionRecorder,
|
recorder: &TransactionRecorder,
|
||||||
data_budget: &DataBudget,
|
data_budget: &DataBudget,
|
||||||
qos_service: &QosService,
|
qos_service: &QosService,
|
||||||
|
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
|
||||||
) -> BufferedPacketsDecision {
|
) -> BufferedPacketsDecision {
|
||||||
let bank_start;
|
let bank_start;
|
||||||
let (
|
let (
|
||||||
|
@ -760,6 +738,7 @@ impl BankingStage {
|
||||||
banking_stage_stats,
|
banking_stage_stats,
|
||||||
recorder,
|
recorder,
|
||||||
qos_service,
|
qos_service,
|
||||||
|
slot_metrics_tracker,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
BufferedPacketsDecision::Forward => {
|
BufferedPacketsDecision::Forward => {
|
||||||
|
@ -771,6 +750,7 @@ impl BankingStage {
|
||||||
socket,
|
socket,
|
||||||
false,
|
false,
|
||||||
data_budget,
|
data_budget,
|
||||||
|
slot_metrics_tracker,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
BufferedPacketsDecision::ForwardAndHold => {
|
BufferedPacketsDecision::ForwardAndHold => {
|
||||||
|
@ -782,10 +762,12 @@ impl BankingStage {
|
||||||
socket,
|
socket,
|
||||||
true,
|
true,
|
||||||
data_budget,
|
data_budget,
|
||||||
|
slot_metrics_tracker,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
_ => (),
|
_ => (),
|
||||||
}
|
}
|
||||||
|
|
||||||
decision
|
decision
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -797,6 +779,7 @@ impl BankingStage {
|
||||||
socket: &UdpSocket,
|
socket: &UdpSocket,
|
||||||
hold: bool,
|
hold: bool,
|
||||||
data_budget: &DataBudget,
|
data_budget: &DataBudget,
|
||||||
|
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
|
||||||
) {
|
) {
|
||||||
let addr = match forward_option {
|
let addr = match forward_option {
|
||||||
ForwardOption::NotForward => {
|
ForwardOption::NotForward => {
|
||||||
|
@ -814,13 +797,35 @@ impl BankingStage {
|
||||||
Some(addr) => addr,
|
Some(addr) => addr,
|
||||||
None => return,
|
None => return,
|
||||||
};
|
};
|
||||||
let _ = Self::forward_buffered_packets(socket, &addr, buffered_packet_batches, data_budget);
|
|
||||||
|
let forwardable_packets =
|
||||||
|
Self::filter_valid_packets_for_forwarding(buffered_packet_batches.iter());
|
||||||
|
let forwardable_packets_len = forwardable_packets.len();
|
||||||
|
let (_forward_result, sucessful_forwarded_packets_count) =
|
||||||
|
Self::forward_buffered_packets(socket, &addr, forwardable_packets, data_budget);
|
||||||
|
let failed_forwarded_packets_count =
|
||||||
|
forwardable_packets_len.saturating_sub(sucessful_forwarded_packets_count);
|
||||||
|
|
||||||
|
if failed_forwarded_packets_count > 0 {
|
||||||
|
slot_metrics_tracker
|
||||||
|
.increment_failed_forwarded_packets_count(failed_forwarded_packets_count as u64);
|
||||||
|
slot_metrics_tracker.increment_packet_batch_forward_failure_count(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
if sucessful_forwarded_packets_count > 0 {
|
||||||
|
slot_metrics_tracker.increment_successful_forwarded_packets_count(
|
||||||
|
sucessful_forwarded_packets_count as u64,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
if hold {
|
if hold {
|
||||||
buffered_packet_batches.retain(|(_, index, _)| !index.is_empty());
|
buffered_packet_batches.retain(|(_, index, _)| !index.is_empty());
|
||||||
for (_, _, forwarded) in buffered_packet_batches.iter_mut() {
|
for (_, _, forwarded) in buffered_packet_batches.iter_mut() {
|
||||||
*forwarded = true;
|
*forwarded = true;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
slot_metrics_tracker
|
||||||
|
.increment_cleared_from_buffer_after_forward_count(forwardable_packets_len as u64);
|
||||||
buffered_packet_batches.clear();
|
buffered_packet_batches.clear();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -844,6 +849,7 @@ impl BankingStage {
|
||||||
let mut buffered_packet_batches = VecDeque::with_capacity(batch_limit);
|
let mut buffered_packet_batches = VecDeque::with_capacity(batch_limit);
|
||||||
let mut banking_stage_stats = BankingStageStats::new(id);
|
let mut banking_stage_stats = BankingStageStats::new(id);
|
||||||
let qos_service = QosService::new(cost_model, id);
|
let qos_service = QosService::new(cost_model, id);
|
||||||
|
let mut slot_metrics_tracker = LeaderSlotMetricsTracker::new(id);
|
||||||
loop {
|
loop {
|
||||||
let my_pubkey = cluster_info.id();
|
let my_pubkey = cluster_info.id();
|
||||||
while !buffered_packet_batches.is_empty() {
|
while !buffered_packet_batches.is_empty() {
|
||||||
|
@ -860,6 +866,7 @@ impl BankingStage {
|
||||||
&recorder,
|
&recorder,
|
||||||
data_budget,
|
data_budget,
|
||||||
&qos_service,
|
&qos_service,
|
||||||
|
&mut slot_metrics_tracker,
|
||||||
);
|
);
|
||||||
if matches!(decision, BufferedPacketsDecision::Hold)
|
if matches!(decision, BufferedPacketsDecision::Hold)
|
||||||
|| matches!(decision, BufferedPacketsDecision::ForwardAndHold)
|
|| matches!(decision, BufferedPacketsDecision::ForwardAndHold)
|
||||||
|
@ -870,6 +877,12 @@ impl BankingStage {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let current_poh_bank = {
|
||||||
|
let poh = poh_recorder.lock().unwrap();
|
||||||
|
poh.bank_start()
|
||||||
|
};
|
||||||
|
slot_metrics_tracker.update_on_leader_slot_boundary(¤t_poh_bank);
|
||||||
|
|
||||||
let recv_timeout = if !buffered_packet_batches.is_empty() {
|
let recv_timeout = if !buffered_packet_batches.is_empty() {
|
||||||
// If packets are buffered, let's wait for less time on recv from the channel.
|
// If packets are buffered, let's wait for less time on recv from the channel.
|
||||||
// This helps detect the next leader faster, and processing the buffered
|
// This helps detect the next leader faster, and processing the buffered
|
||||||
|
@ -888,6 +901,7 @@ impl BankingStage {
|
||||||
batch_limit,
|
batch_limit,
|
||||||
&mut buffered_packet_batches,
|
&mut buffered_packet_batches,
|
||||||
&mut banking_stage_stats,
|
&mut banking_stage_stats,
|
||||||
|
&mut slot_metrics_tracker,
|
||||||
) {
|
) {
|
||||||
Ok(()) | Err(RecvTimeoutError::Timeout) => (),
|
Ok(()) | Err(RecvTimeoutError::Timeout) => (),
|
||||||
Err(RecvTimeoutError::Disconnected) => break,
|
Err(RecvTimeoutError::Disconnected) => break,
|
||||||
|
@ -1312,6 +1326,7 @@ impl BankingStage {
|
||||||
|
|
||||||
// If `bank_creation_time` is None, it's a test so ignore the option so
|
// If `bank_creation_time` is None, it's a test so ignore the option so
|
||||||
// allow processing
|
// allow processing
|
||||||
|
// TODO adding timing metrics here from when bank was added to now
|
||||||
let should_bank_still_be_processing_txs =
|
let should_bank_still_be_processing_txs =
|
||||||
Bank::should_bank_still_be_processing_txs(bank_creation_time, bank.ns_per_slot);
|
Bank::should_bank_still_be_processing_txs(bank_creation_time, bank.ns_per_slot);
|
||||||
match (
|
match (
|
||||||
|
@ -1469,6 +1484,7 @@ impl BankingStage {
|
||||||
gossip_vote_sender: &ReplayVoteSender,
|
gossip_vote_sender: &ReplayVoteSender,
|
||||||
banking_stage_stats: &BankingStageStats,
|
banking_stage_stats: &BankingStageStats,
|
||||||
qos_service: &QosService,
|
qos_service: &QosService,
|
||||||
|
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
|
||||||
) -> ProcessTransactionsSummary {
|
) -> ProcessTransactionsSummary {
|
||||||
let mut packet_conversion_time = Measure::start("packet_conversion");
|
let mut packet_conversion_time = Measure::start("packet_conversion");
|
||||||
let (transactions, transaction_to_packet_indexes) = Self::transactions_from_packets(
|
let (transactions, transaction_to_packet_indexes) = Self::transactions_from_packets(
|
||||||
|
@ -1499,6 +1515,11 @@ impl BankingStage {
|
||||||
..
|
..
|
||||||
} = process_transactions_summary;
|
} = process_transactions_summary;
|
||||||
|
|
||||||
|
slot_metrics_tracker.accumulate_process_transactions_summary(&process_transactions_summary);
|
||||||
|
|
||||||
|
let retryable_tx_count = retryable_transaction_indexes.len();
|
||||||
|
inc_new_counter_info!("banking_stage-unprocessed_transactions", retryable_tx_count);
|
||||||
|
|
||||||
let mut filter_pending_packets_time = Measure::start("filter_pending_packets_time");
|
let mut filter_pending_packets_time = Measure::start("filter_pending_packets_time");
|
||||||
let filtered_retryable_tx_indexes = Self::filter_pending_packets_from_pending_txs(
|
let filtered_retryable_tx_indexes = Self::filter_pending_packets_from_pending_txs(
|
||||||
bank,
|
bank,
|
||||||
|
@ -1508,6 +1529,12 @@ impl BankingStage {
|
||||||
);
|
);
|
||||||
filter_pending_packets_time.stop();
|
filter_pending_packets_time.stop();
|
||||||
|
|
||||||
|
let retryable_packets_filtered_count = retryable_transaction_indexes
|
||||||
|
.len()
|
||||||
|
.saturating_sub(filtered_retryable_tx_indexes.len());
|
||||||
|
slot_metrics_tracker
|
||||||
|
.increment_retryable_packets_filtered_count(retryable_packets_filtered_count as u64);
|
||||||
|
|
||||||
inc_new_counter_info!(
|
inc_new_counter_info!(
|
||||||
"banking_stage-dropped_tx_before_forwarding",
|
"banking_stage-dropped_tx_before_forwarding",
|
||||||
retryable_transaction_indexes
|
retryable_transaction_indexes
|
||||||
|
@ -1530,7 +1557,7 @@ impl BankingStage {
|
||||||
process_transactions_summary
|
process_transactions_summary
|
||||||
}
|
}
|
||||||
|
|
||||||
fn filter_unprocessed_packets(
|
fn filter_unprocessed_packets_at_end_of_slot(
|
||||||
bank: &Arc<Bank>,
|
bank: &Arc<Bank>,
|
||||||
packet_batch: &PacketBatch,
|
packet_batch: &PacketBatch,
|
||||||
transaction_indexes: &[usize],
|
transaction_indexes: &[usize],
|
||||||
|
@ -1600,6 +1627,7 @@ impl BankingStage {
|
||||||
batch_limit: usize,
|
batch_limit: usize,
|
||||||
buffered_packet_batches: &mut UnprocessedPacketBatches,
|
buffered_packet_batches: &mut UnprocessedPacketBatches,
|
||||||
banking_stage_stats: &mut BankingStageStats,
|
banking_stage_stats: &mut BankingStageStats,
|
||||||
|
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
|
||||||
) -> Result<(), RecvTimeoutError> {
|
) -> Result<(), RecvTimeoutError> {
|
||||||
let mut recv_time = Measure::start("receive_and_buffer_packets_recv");
|
let mut recv_time = Measure::start("receive_and_buffer_packets_recv");
|
||||||
let packet_batches = verified_receiver.recv_timeout(recv_timeout)?;
|
let packet_batches = verified_receiver.recv_timeout(recv_timeout)?;
|
||||||
|
@ -1622,6 +1650,15 @@ impl BankingStage {
|
||||||
let mut newly_buffered_packets_count = 0;
|
let mut newly_buffered_packets_count = 0;
|
||||||
for packet_batch in packet_batch_iter {
|
for packet_batch in packet_batch_iter {
|
||||||
let packet_indexes = Self::generate_packet_indexes(&packet_batch.packets);
|
let packet_indexes = Self::generate_packet_indexes(&packet_batch.packets);
|
||||||
|
// Track all the packets incoming from sigverify, both valid and invalid
|
||||||
|
slot_metrics_tracker.increment_total_new_valid_packets(packet_indexes.len() as u64);
|
||||||
|
slot_metrics_tracker.increment_newly_failed_sigverify_count(
|
||||||
|
packet_batch
|
||||||
|
.packets
|
||||||
|
.len()
|
||||||
|
.saturating_sub(packet_indexes.len()) as u64,
|
||||||
|
);
|
||||||
|
|
||||||
Self::push_unprocessed(
|
Self::push_unprocessed(
|
||||||
buffered_packet_batches,
|
buffered_packet_batches,
|
||||||
packet_batch,
|
packet_batch,
|
||||||
|
@ -1631,6 +1668,7 @@ impl BankingStage {
|
||||||
&mut newly_buffered_packets_count,
|
&mut newly_buffered_packets_count,
|
||||||
batch_limit,
|
batch_limit,
|
||||||
banking_stage_stats,
|
banking_stage_stats,
|
||||||
|
slot_metrics_tracker,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
proc_start.stop();
|
proc_start.stop();
|
||||||
|
@ -1681,12 +1719,16 @@ impl BankingStage {
|
||||||
newly_buffered_packets_count: &mut usize,
|
newly_buffered_packets_count: &mut usize,
|
||||||
batch_limit: usize,
|
batch_limit: usize,
|
||||||
banking_stage_stats: &mut BankingStageStats,
|
banking_stage_stats: &mut BankingStageStats,
|
||||||
|
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
|
||||||
) {
|
) {
|
||||||
if Self::packet_has_more_unprocessed_transactions(&packet_indexes) {
|
if Self::packet_has_more_unprocessed_transactions(&packet_indexes) {
|
||||||
if unprocessed_packet_batches.len() >= batch_limit {
|
if unprocessed_packet_batches.len() >= batch_limit {
|
||||||
*dropped_packet_batches_count += 1;
|
*dropped_packet_batches_count += 1;
|
||||||
if let Some(dropped_batch) = unprocessed_packet_batches.pop_front() {
|
if let Some(dropped_batch) = unprocessed_packet_batches.pop_front() {
|
||||||
*dropped_packets_count += dropped_batch.1.len();
|
*dropped_packets_count += dropped_batch.1.len();
|
||||||
|
slot_metrics_tracker.increment_exceeded_buffer_limit_dropped_packets_count(
|
||||||
|
dropped_batch.1.len() as u64,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
let _ = banking_stage_stats
|
let _ = banking_stage_stats
|
||||||
|
@ -1694,6 +1736,8 @@ impl BankingStage {
|
||||||
.increment(packet_indexes.len() as u64);
|
.increment(packet_indexes.len() as u64);
|
||||||
|
|
||||||
*newly_buffered_packets_count += packet_indexes.len();
|
*newly_buffered_packets_count += packet_indexes.len();
|
||||||
|
slot_metrics_tracker
|
||||||
|
.increment_newly_buffered_packets_count(packet_indexes.len() as u64);
|
||||||
unprocessed_packet_batches.push_back((packet_batch, packet_indexes, false));
|
unprocessed_packet_batches.push_back((packet_batch, packet_indexes, false));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3296,6 +3340,7 @@ mod tests {
|
||||||
&BankingStageStats::default(),
|
&BankingStageStats::default(),
|
||||||
&recorder,
|
&recorder,
|
||||||
&QosService::new(Arc::new(RwLock::new(CostModel::default())), 1),
|
&QosService::new(Arc::new(RwLock::new(CostModel::default())), 1),
|
||||||
|
&mut LeaderSlotMetricsTracker::new(0),
|
||||||
);
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
buffered_packet_batches[0].1.len(),
|
buffered_packet_batches[0].1.len(),
|
||||||
|
@ -3316,6 +3361,7 @@ mod tests {
|
||||||
&BankingStageStats::default(),
|
&BankingStageStats::default(),
|
||||||
&recorder,
|
&recorder,
|
||||||
&QosService::new(Arc::new(RwLock::new(CostModel::default())), 1),
|
&QosService::new(Arc::new(RwLock::new(CostModel::default())), 1),
|
||||||
|
&mut LeaderSlotMetricsTracker::new(0),
|
||||||
);
|
);
|
||||||
if num_expected_unprocessed == 0 {
|
if num_expected_unprocessed == 0 {
|
||||||
assert!(buffered_packet_batches.is_empty())
|
assert!(buffered_packet_batches.is_empty())
|
||||||
|
@ -3382,6 +3428,7 @@ mod tests {
|
||||||
&BankingStageStats::default(),
|
&BankingStageStats::default(),
|
||||||
&recorder,
|
&recorder,
|
||||||
&QosService::new(Arc::new(RwLock::new(CostModel::default())), 1),
|
&QosService::new(Arc::new(RwLock::new(CostModel::default())), 1),
|
||||||
|
&mut LeaderSlotMetricsTracker::new(0),
|
||||||
);
|
);
|
||||||
|
|
||||||
// Check everything is correct. All indexes after `interrupted_iteration`
|
// Check everything is correct. All indexes after `interrupted_iteration`
|
||||||
|
@ -3479,6 +3526,7 @@ mod tests {
|
||||||
&send_socket,
|
&send_socket,
|
||||||
true,
|
true,
|
||||||
&data_budget,
|
&data_budget,
|
||||||
|
&mut LeaderSlotMetricsTracker::new(0),
|
||||||
);
|
);
|
||||||
|
|
||||||
recv_socket
|
recv_socket
|
||||||
|
@ -3578,6 +3626,7 @@ mod tests {
|
||||||
&send_socket,
|
&send_socket,
|
||||||
hold,
|
hold,
|
||||||
&DataBudget::default(),
|
&DataBudget::default(),
|
||||||
|
&mut LeaderSlotMetricsTracker::new(0),
|
||||||
);
|
);
|
||||||
|
|
||||||
recv_socket
|
recv_socket
|
||||||
|
@ -3639,6 +3688,7 @@ mod tests {
|
||||||
&mut newly_buffered_packets_count,
|
&mut newly_buffered_packets_count,
|
||||||
batch_limit,
|
batch_limit,
|
||||||
&mut banking_stage_stats,
|
&mut banking_stage_stats,
|
||||||
|
&mut LeaderSlotMetricsTracker::new(0),
|
||||||
);
|
);
|
||||||
assert_eq!(unprocessed_packets.len(), 1);
|
assert_eq!(unprocessed_packets.len(), 1);
|
||||||
assert_eq!(dropped_packet_batches_count, 0);
|
assert_eq!(dropped_packet_batches_count, 0);
|
||||||
|
@ -3657,6 +3707,7 @@ mod tests {
|
||||||
&mut newly_buffered_packets_count,
|
&mut newly_buffered_packets_count,
|
||||||
batch_limit,
|
batch_limit,
|
||||||
&mut banking_stage_stats,
|
&mut banking_stage_stats,
|
||||||
|
&mut LeaderSlotMetricsTracker::new(0),
|
||||||
);
|
);
|
||||||
assert_eq!(unprocessed_packets.len(), 2);
|
assert_eq!(unprocessed_packets.len(), 2);
|
||||||
assert_eq!(dropped_packet_batches_count, 0);
|
assert_eq!(dropped_packet_batches_count, 0);
|
||||||
|
@ -3680,6 +3731,7 @@ mod tests {
|
||||||
&mut newly_buffered_packets_count,
|
&mut newly_buffered_packets_count,
|
||||||
batch_limit,
|
batch_limit,
|
||||||
&mut banking_stage_stats,
|
&mut banking_stage_stats,
|
||||||
|
&mut LeaderSlotMetricsTracker::new(0),
|
||||||
);
|
);
|
||||||
assert_eq!(unprocessed_packets.len(), 2);
|
assert_eq!(unprocessed_packets.len(), 2);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
|
|
|
@ -0,0 +1,723 @@
|
||||||
|
use {
|
||||||
|
solana_poh::poh_recorder::BankStart,
|
||||||
|
solana_sdk::{clock::Slot, saturating_add_assign},
|
||||||
|
std::time::Instant,
|
||||||
|
};
|
||||||
|
|
||||||
|
/// A summary of what happened to transactions passed to the execution pipeline.
|
||||||
|
/// Transactions can
|
||||||
|
/// 1) Did not even make it to execution due to being filtered out by things like AccountInUse
|
||||||
|
/// lock conflicts or CostModel compute limits. These types of errors are retryable and
|
||||||
|
/// counted in `Self::retryable_transaction_indexes`.
|
||||||
|
/// 2) Did not execute due to some fatal error like too old, or duplicate signature. These
|
||||||
|
/// will be dropped from the transactions queue and not counted in `Self::retryable_transaction_indexes`
|
||||||
|
/// 3) Were executed and committed, captured by `committed_transactions_count` below.
|
||||||
|
/// 4) Were executed and failed commit, captured by `failed_commit_count` below.
|
||||||
|
pub(crate) struct ProcessTransactionsSummary {
|
||||||
|
// Returns true if we hit the end of the block/max PoH height for the block before
|
||||||
|
// processing all the transactions in the batch.
|
||||||
|
pub reached_max_poh_height: bool,
|
||||||
|
|
||||||
|
// Total number of transactions that were passed as candidates for execution. See description
|
||||||
|
// of struct above for possible outcomes for these transactions
|
||||||
|
pub transactions_attempted_execution_count: usize,
|
||||||
|
|
||||||
|
// Total number of transactions that made it into the block
|
||||||
|
pub committed_transactions_count: usize,
|
||||||
|
|
||||||
|
// Total number of transactions that made it into the block where the transactions
|
||||||
|
// output from execution was success/no error.
|
||||||
|
pub committed_transactions_with_successful_result_count: usize,
|
||||||
|
|
||||||
|
// All transactions that were executed but then failed record because the
|
||||||
|
// slot ended
|
||||||
|
pub failed_commit_count: usize,
|
||||||
|
|
||||||
|
// Indexes of transactions in the transactions slice that were not committed but are retryable
|
||||||
|
pub retryable_transaction_indexes: Vec<usize>,
|
||||||
|
|
||||||
|
// The number of transactions filtered out by the cost model
|
||||||
|
pub cost_model_throttled_transactions_count: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Metrics capturing wallclock time spent in various parts of BankingStage during this
|
||||||
|
// validator's leader slot
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct LeaderSlotTimingMetrics {
|
||||||
|
bank_detected_time: Instant,
|
||||||
|
|
||||||
|
// Delay from when the bank was created to when this thread detected it
|
||||||
|
bank_detected_delay_us: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl LeaderSlotTimingMetrics {
|
||||||
|
fn new(bank_creation_time: &Instant) -> Self {
|
||||||
|
Self {
|
||||||
|
bank_detected_time: Instant::now(),
|
||||||
|
bank_detected_delay_us: bank_creation_time.elapsed().as_micros() as u64,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn report(&self, id: u32, slot: Slot) {
|
||||||
|
let bank_detected_to_now = self.bank_detected_time.elapsed().as_micros() as u64;
|
||||||
|
datapoint_info!(
|
||||||
|
"banking_stage-leader_slot_loop_timings",
|
||||||
|
("id", id as i64, i64),
|
||||||
|
("slot", slot as i64, i64),
|
||||||
|
("bank_detected_to_now_us", bank_detected_to_now, i64),
|
||||||
|
(
|
||||||
|
"bank_creation_to_now_us",
|
||||||
|
bank_detected_to_now + self.bank_detected_delay_us,
|
||||||
|
i64
|
||||||
|
),
|
||||||
|
("bank_detected_delay_us", self.bank_detected_delay_us, i64),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Metrics describing packets ingested/processed in various parts of BankingStage during this
|
||||||
|
// validator's leader slot
|
||||||
|
#[derive(Debug, Default)]
|
||||||
|
struct LeaderSlotPacketCountMetrics {
|
||||||
|
// total number of live packets TPU received from verified receiver for processing.
|
||||||
|
total_new_valid_packets: u64,
|
||||||
|
|
||||||
|
// total number of packets TPU received from sigverify that failed signature verification.
|
||||||
|
newly_failed_sigverify_count: u64,
|
||||||
|
|
||||||
|
// total number of dropped packet due to the thread's buffered packets capacity being reached.
|
||||||
|
exceeded_buffer_limit_dropped_packets_count: u64,
|
||||||
|
|
||||||
|
// total number of packets that got added to the pending buffer after arriving to BankingStage
|
||||||
|
newly_buffered_packets_count: u64,
|
||||||
|
|
||||||
|
// total number of transactions in the buffer that were filtered out due to things like age and
|
||||||
|
// duplicate signature checks
|
||||||
|
retryable_packets_filtered_count: u64,
|
||||||
|
|
||||||
|
// total number of transactions that attempted execution in this slot. Should equal the sum
|
||||||
|
// of `committed_transactions_count`, `retryable_errored_transaction_count`, and
|
||||||
|
// `nonretryable_errored_transactions_count`.
|
||||||
|
transactions_attempted_execution_count: u64,
|
||||||
|
|
||||||
|
// total number of transactions that were executed and committed into the block
|
||||||
|
// on this thread
|
||||||
|
committed_transactions_count: u64,
|
||||||
|
|
||||||
|
// total number of transactions that were executed, got a successful execution output/no error,
|
||||||
|
// and were then committed into the block
|
||||||
|
committed_transactions_with_successful_result_count: u64,
|
||||||
|
|
||||||
|
// total number of transactions that were not executed or failed commit, BUT were added back to the buffered
|
||||||
|
// queue becaus they were retryable errors
|
||||||
|
retryable_errored_transaction_count: u64,
|
||||||
|
|
||||||
|
// total number of transactions that attempted execution due to some fatal error (too old, duplicate signature, etc.)
|
||||||
|
// AND were dropped from the buffered queue
|
||||||
|
nonretryable_errored_transactions_count: u64,
|
||||||
|
|
||||||
|
// total number of transactions that were executed, but failed to be committed into the Poh stream because
|
||||||
|
// the block ended. Some of these may be already counted in `nonretryable_errored_transactions_count` if they
|
||||||
|
// then hit the age limit after failing to be comitted.
|
||||||
|
executed_transactions_failed_commit_count: u64,
|
||||||
|
|
||||||
|
// total number of transactions that were excluded from the block because they were too expensive
|
||||||
|
// according to the cost model. These transactions are added back to the buffered queue and are
|
||||||
|
// already counted in `self.retrayble_errored_transaction_count`.
|
||||||
|
cost_model_throttled_transactions_count: u64,
|
||||||
|
|
||||||
|
// total number of forwardsable packets that failed forwarding
|
||||||
|
failed_forwarded_packets_count: u64,
|
||||||
|
|
||||||
|
// total number of forwardsable packets that were successfully forwarded
|
||||||
|
successful_forwarded_packets_count: u64,
|
||||||
|
|
||||||
|
// total number of attempted forwards that failed. Note this is not a count of the number of packets
|
||||||
|
// that failed, just the total number of batches of packets that failed forwarding
|
||||||
|
packet_batch_forward_failure_count: u64,
|
||||||
|
|
||||||
|
// total number of valid unprocessed packets in the buffer that were removed after being forwarded
|
||||||
|
cleared_from_buffer_after_forward_count: u64,
|
||||||
|
|
||||||
|
// total number of packets removed at the end of the slot due to being too old, duplicate, etc.
|
||||||
|
end_of_slot_filtered_invalid_count: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl LeaderSlotPacketCountMetrics {
|
||||||
|
fn new() -> Self {
|
||||||
|
Self { ..Self::default() }
|
||||||
|
}
|
||||||
|
|
||||||
|
fn report(&self, id: u32, slot: Slot) {
|
||||||
|
datapoint_info!(
|
||||||
|
"banking_stage-leader_slot_packet_counts",
|
||||||
|
("id", id as i64, i64),
|
||||||
|
("slot", slot as i64, i64),
|
||||||
|
(
|
||||||
|
"total_new_valid_packets",
|
||||||
|
self.total_new_valid_packets as i64,
|
||||||
|
i64
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"newly_failed_sigverify_count",
|
||||||
|
self.newly_failed_sigverify_count as i64,
|
||||||
|
i64
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"exceeded_buffer_limit_dropped_packets_count",
|
||||||
|
self.exceeded_buffer_limit_dropped_packets_count as i64,
|
||||||
|
i64
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"newly_buffered_packets_count",
|
||||||
|
self.newly_buffered_packets_count as i64,
|
||||||
|
i64
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"retryable_packets_filtered_count",
|
||||||
|
self.retryable_packets_filtered_count as i64,
|
||||||
|
i64
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"transactions_attempted_execution_count",
|
||||||
|
self.transactions_attempted_execution_count as i64,
|
||||||
|
i64
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"committed_transactions_count",
|
||||||
|
self.committed_transactions_count as i64,
|
||||||
|
i64
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"committed_transactions_with_successful_result_count",
|
||||||
|
self.committed_transactions_with_successful_result_count as i64,
|
||||||
|
i64
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"retryable_errored_transaction_count",
|
||||||
|
self.retryable_errored_transaction_count as i64,
|
||||||
|
i64
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"nonretryable_errored_transactions_count",
|
||||||
|
self.nonretryable_errored_transactions_count as i64,
|
||||||
|
i64
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"executed_transactions_failed_commit_count",
|
||||||
|
self.executed_transactions_failed_commit_count as i64,
|
||||||
|
i64
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"cost_model_throttled_transactions_count",
|
||||||
|
self.cost_model_throttled_transactions_count as i64,
|
||||||
|
i64
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"failed_forwarded_packets_count",
|
||||||
|
self.failed_forwarded_packets_count as i64,
|
||||||
|
i64
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"successful_forwarded_packets_count",
|
||||||
|
self.successful_forwarded_packets_count as i64,
|
||||||
|
i64
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"packet_batch_forward_failure_count",
|
||||||
|
self.packet_batch_forward_failure_count as i64,
|
||||||
|
i64
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"cleared_from_buffer_after_forward_count",
|
||||||
|
self.cleared_from_buffer_after_forward_count as i64,
|
||||||
|
i64
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"end_of_slot_filtered_invalid_count",
|
||||||
|
self.end_of_slot_filtered_invalid_count as i64,
|
||||||
|
i64
|
||||||
|
),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub(crate) struct LeaderSlotMetrics {
|
||||||
|
// banking_stage creates one QosService instance per working threads, that is uniquely
|
||||||
|
// identified by id. This field allows to categorize metrics for gossip votes, TPU votes
|
||||||
|
// and other transactions.
|
||||||
|
id: u32,
|
||||||
|
|
||||||
|
// aggregate metrics per slot
|
||||||
|
slot: Slot,
|
||||||
|
|
||||||
|
packet_count_metrics: LeaderSlotPacketCountMetrics,
|
||||||
|
|
||||||
|
timing_metrics: LeaderSlotTimingMetrics,
|
||||||
|
|
||||||
|
// Used by tests to check if the `self.report()` method was called
|
||||||
|
is_reported: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl LeaderSlotMetrics {
|
||||||
|
pub(crate) fn new(id: u32, slot: Slot, bank_creation_time: &Instant) -> Self {
|
||||||
|
Self {
|
||||||
|
id,
|
||||||
|
slot,
|
||||||
|
packet_count_metrics: LeaderSlotPacketCountMetrics::new(),
|
||||||
|
timing_metrics: LeaderSlotTimingMetrics::new(bank_creation_time),
|
||||||
|
is_reported: false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn report(&mut self) {
|
||||||
|
self.is_reported = true;
|
||||||
|
|
||||||
|
self.timing_metrics.report(self.id, self.slot);
|
||||||
|
self.packet_count_metrics.report(self.id, self.slot);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns `Some(self.slot)` if the metrics have been reported, otherwise returns None
|
||||||
|
fn reported_slot(&self) -> Option<Slot> {
|
||||||
|
if self.is_reported {
|
||||||
|
Some(self.slot)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct LeaderSlotMetricsTracker {
|
||||||
|
// Only `Some` if BankingStage detects it's time to construct our leader slot,
|
||||||
|
// otherwise `None`
|
||||||
|
leader_slot_metrics: Option<LeaderSlotMetrics>,
|
||||||
|
id: u32,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl LeaderSlotMetricsTracker {
|
||||||
|
pub fn new(id: u32) -> Self {
|
||||||
|
Self {
|
||||||
|
leader_slot_metrics: None,
|
||||||
|
id,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Returns reported slot if metrics were reported
|
||||||
|
pub(crate) fn update_on_leader_slot_boundary(
|
||||||
|
&mut self,
|
||||||
|
bank_start: &Option<BankStart>,
|
||||||
|
) -> Option<Slot> {
|
||||||
|
match (self.leader_slot_metrics.as_mut(), bank_start) {
|
||||||
|
(None, None) => None,
|
||||||
|
|
||||||
|
(Some(leader_slot_metrics), None) => {
|
||||||
|
leader_slot_metrics.report();
|
||||||
|
// Ensure tests catch that `report()` method was called
|
||||||
|
let reported_slot = leader_slot_metrics.reported_slot();
|
||||||
|
// Slot has ended, time to report metrics
|
||||||
|
self.leader_slot_metrics = None;
|
||||||
|
reported_slot
|
||||||
|
}
|
||||||
|
|
||||||
|
(None, Some(bank_start)) => {
|
||||||
|
// Our leader slot has begain, time to create a new slot tracker
|
||||||
|
self.leader_slot_metrics = Some(LeaderSlotMetrics::new(
|
||||||
|
self.id,
|
||||||
|
bank_start.working_bank.slot(),
|
||||||
|
&bank_start.bank_creation_time,
|
||||||
|
));
|
||||||
|
self.leader_slot_metrics.as_ref().unwrap().reported_slot()
|
||||||
|
}
|
||||||
|
|
||||||
|
(Some(leader_slot_metrics), Some(bank_start)) => {
|
||||||
|
if leader_slot_metrics.slot != bank_start.working_bank.slot() {
|
||||||
|
// Last slot has ended, new slot has began
|
||||||
|
leader_slot_metrics.report();
|
||||||
|
// Ensure tests catch that `report()` method was called
|
||||||
|
let reported_slot = leader_slot_metrics.reported_slot();
|
||||||
|
self.leader_slot_metrics = Some(LeaderSlotMetrics::new(
|
||||||
|
self.id,
|
||||||
|
bank_start.working_bank.slot(),
|
||||||
|
&bank_start.bank_creation_time,
|
||||||
|
));
|
||||||
|
reported_slot
|
||||||
|
} else {
|
||||||
|
leader_slot_metrics.reported_slot()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn accumulate_process_transactions_summary(
|
||||||
|
&mut self,
|
||||||
|
process_transactions_summary: &ProcessTransactionsSummary,
|
||||||
|
) {
|
||||||
|
if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
|
||||||
|
let ProcessTransactionsSummary {
|
||||||
|
transactions_attempted_execution_count,
|
||||||
|
committed_transactions_count,
|
||||||
|
committed_transactions_with_successful_result_count,
|
||||||
|
failed_commit_count,
|
||||||
|
ref retryable_transaction_indexes,
|
||||||
|
cost_model_throttled_transactions_count,
|
||||||
|
..
|
||||||
|
} = process_transactions_summary;
|
||||||
|
|
||||||
|
saturating_add_assign!(
|
||||||
|
leader_slot_metrics
|
||||||
|
.packet_count_metrics
|
||||||
|
.transactions_attempted_execution_count,
|
||||||
|
*transactions_attempted_execution_count as u64
|
||||||
|
);
|
||||||
|
|
||||||
|
saturating_add_assign!(
|
||||||
|
leader_slot_metrics
|
||||||
|
.packet_count_metrics
|
||||||
|
.committed_transactions_count,
|
||||||
|
*committed_transactions_count as u64
|
||||||
|
);
|
||||||
|
|
||||||
|
saturating_add_assign!(
|
||||||
|
leader_slot_metrics
|
||||||
|
.packet_count_metrics
|
||||||
|
.committed_transactions_with_successful_result_count,
|
||||||
|
*committed_transactions_with_successful_result_count as u64
|
||||||
|
);
|
||||||
|
|
||||||
|
saturating_add_assign!(
|
||||||
|
leader_slot_metrics
|
||||||
|
.packet_count_metrics
|
||||||
|
.executed_transactions_failed_commit_count,
|
||||||
|
*failed_commit_count as u64
|
||||||
|
);
|
||||||
|
|
||||||
|
saturating_add_assign!(
|
||||||
|
leader_slot_metrics
|
||||||
|
.packet_count_metrics
|
||||||
|
.retryable_errored_transaction_count,
|
||||||
|
retryable_transaction_indexes.len() as u64
|
||||||
|
);
|
||||||
|
|
||||||
|
saturating_add_assign!(
|
||||||
|
leader_slot_metrics
|
||||||
|
.packet_count_metrics
|
||||||
|
.nonretryable_errored_transactions_count,
|
||||||
|
transactions_attempted_execution_count
|
||||||
|
.saturating_sub(*committed_transactions_count)
|
||||||
|
.saturating_sub(retryable_transaction_indexes.len()) as u64
|
||||||
|
);
|
||||||
|
|
||||||
|
saturating_add_assign!(
|
||||||
|
leader_slot_metrics
|
||||||
|
.packet_count_metrics
|
||||||
|
.cost_model_throttled_transactions_count,
|
||||||
|
*cost_model_throttled_transactions_count as u64
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn increment_total_new_valid_packets(&mut self, count: u64) {
|
||||||
|
if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
|
||||||
|
saturating_add_assign!(
|
||||||
|
leader_slot_metrics
|
||||||
|
.packet_count_metrics
|
||||||
|
.total_new_valid_packets,
|
||||||
|
count
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn increment_newly_failed_sigverify_count(&mut self, count: u64) {
|
||||||
|
if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
|
||||||
|
saturating_add_assign!(
|
||||||
|
leader_slot_metrics
|
||||||
|
.packet_count_metrics
|
||||||
|
.newly_failed_sigverify_count,
|
||||||
|
count
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn increment_exceeded_buffer_limit_dropped_packets_count(&mut self, count: u64) {
|
||||||
|
if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
|
||||||
|
saturating_add_assign!(
|
||||||
|
leader_slot_metrics
|
||||||
|
.packet_count_metrics
|
||||||
|
.exceeded_buffer_limit_dropped_packets_count,
|
||||||
|
count
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn increment_newly_buffered_packets_count(&mut self, count: u64) {
|
||||||
|
if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
|
||||||
|
saturating_add_assign!(
|
||||||
|
leader_slot_metrics
|
||||||
|
.packet_count_metrics
|
||||||
|
.newly_buffered_packets_count,
|
||||||
|
count
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn increment_retryable_packets_filtered_count(&mut self, count: u64) {
|
||||||
|
if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
|
||||||
|
saturating_add_assign!(
|
||||||
|
leader_slot_metrics
|
||||||
|
.packet_count_metrics
|
||||||
|
.retryable_packets_filtered_count,
|
||||||
|
count
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn increment_failed_forwarded_packets_count(&mut self, count: u64) {
|
||||||
|
if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
|
||||||
|
saturating_add_assign!(
|
||||||
|
leader_slot_metrics
|
||||||
|
.packet_count_metrics
|
||||||
|
.failed_forwarded_packets_count,
|
||||||
|
count
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn increment_successful_forwarded_packets_count(&mut self, count: u64) {
|
||||||
|
if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
|
||||||
|
saturating_add_assign!(
|
||||||
|
leader_slot_metrics
|
||||||
|
.packet_count_metrics
|
||||||
|
.successful_forwarded_packets_count,
|
||||||
|
count
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn increment_packet_batch_forward_failure_count(&mut self, count: u64) {
|
||||||
|
if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
|
||||||
|
saturating_add_assign!(
|
||||||
|
leader_slot_metrics
|
||||||
|
.packet_count_metrics
|
||||||
|
.packet_batch_forward_failure_count,
|
||||||
|
count
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn increment_cleared_from_buffer_after_forward_count(&mut self, count: u64) {
|
||||||
|
if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
|
||||||
|
saturating_add_assign!(
|
||||||
|
leader_slot_metrics
|
||||||
|
.packet_count_metrics
|
||||||
|
.cleared_from_buffer_after_forward_count,
|
||||||
|
count
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn increment_end_of_slot_filtered_invalid_count(&mut self, count: u64) {
|
||||||
|
if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
|
||||||
|
saturating_add_assign!(
|
||||||
|
leader_slot_metrics
|
||||||
|
.packet_count_metrics
|
||||||
|
.end_of_slot_filtered_invalid_count,
|
||||||
|
count
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use {
|
||||||
|
super::*,
|
||||||
|
solana_runtime::{bank::Bank, genesis_utils::create_genesis_config},
|
||||||
|
solana_sdk::pubkey::Pubkey,
|
||||||
|
std::sync::Arc,
|
||||||
|
};
|
||||||
|
|
||||||
|
struct TestSlotBoundaryComponents {
|
||||||
|
first_bank: Arc<Bank>,
|
||||||
|
first_poh_recorder_bank: BankStart,
|
||||||
|
next_bank: Arc<Bank>,
|
||||||
|
next_poh_recorder_bank: BankStart,
|
||||||
|
leader_slot_metrics_tracker: LeaderSlotMetricsTracker,
|
||||||
|
}
|
||||||
|
|
||||||
|
fn setup_test_slot_boundary_banks() -> TestSlotBoundaryComponents {
|
||||||
|
let genesis = create_genesis_config(10);
|
||||||
|
let first_bank = Arc::new(Bank::new_for_tests(&genesis.genesis_config));
|
||||||
|
let first_poh_recorder_bank = BankStart {
|
||||||
|
working_bank: first_bank.clone(),
|
||||||
|
bank_creation_time: Arc::new(Instant::now()),
|
||||||
|
};
|
||||||
|
|
||||||
|
// Create a child descended from the first bank
|
||||||
|
let next_bank = Arc::new(Bank::new_from_parent(
|
||||||
|
&first_bank,
|
||||||
|
&Pubkey::new_unique(),
|
||||||
|
first_bank.slot() + 1,
|
||||||
|
));
|
||||||
|
let next_poh_recorder_bank = BankStart {
|
||||||
|
working_bank: next_bank.clone(),
|
||||||
|
bank_creation_time: Arc::new(Instant::now()),
|
||||||
|
};
|
||||||
|
|
||||||
|
let banking_stage_thread_id = 0;
|
||||||
|
let leader_slot_metrics_tracker = LeaderSlotMetricsTracker::new(banking_stage_thread_id);
|
||||||
|
|
||||||
|
TestSlotBoundaryComponents {
|
||||||
|
first_bank,
|
||||||
|
first_poh_recorder_bank,
|
||||||
|
next_bank,
|
||||||
|
next_poh_recorder_bank,
|
||||||
|
leader_slot_metrics_tracker,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
pub fn test_update_on_leader_slot_boundary_not_leader_to_not_leader() {
|
||||||
|
let TestSlotBoundaryComponents {
|
||||||
|
mut leader_slot_metrics_tracker,
|
||||||
|
..
|
||||||
|
} = setup_test_slot_boundary_banks();
|
||||||
|
// Test that with no bank being tracked, and no new bank being tracked, nothing is reported
|
||||||
|
assert!(leader_slot_metrics_tracker
|
||||||
|
.update_on_leader_slot_boundary(&None)
|
||||||
|
.is_none());
|
||||||
|
assert!(leader_slot_metrics_tracker.leader_slot_metrics.is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
pub fn test_update_on_leader_slot_boundary_not_leader_to_leader() {
|
||||||
|
let TestSlotBoundaryComponents {
|
||||||
|
first_poh_recorder_bank,
|
||||||
|
mut leader_slot_metrics_tracker,
|
||||||
|
..
|
||||||
|
} = setup_test_slot_boundary_banks();
|
||||||
|
|
||||||
|
// Test case where the thread has not detected a leader bank, and now sees a leader bank.
|
||||||
|
// Metrics should not be reported because leader slot has not ended
|
||||||
|
assert!(leader_slot_metrics_tracker.leader_slot_metrics.is_none());
|
||||||
|
assert!(leader_slot_metrics_tracker
|
||||||
|
.update_on_leader_slot_boundary(&Some(first_poh_recorder_bank))
|
||||||
|
.is_none());
|
||||||
|
assert!(leader_slot_metrics_tracker.leader_slot_metrics.is_some());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
pub fn test_update_on_leader_slot_boundary_leader_to_not_leader() {
|
||||||
|
let TestSlotBoundaryComponents {
|
||||||
|
first_bank,
|
||||||
|
first_poh_recorder_bank,
|
||||||
|
mut leader_slot_metrics_tracker,
|
||||||
|
..
|
||||||
|
} = setup_test_slot_boundary_banks();
|
||||||
|
|
||||||
|
// Test case where the thread has a leader bank, and now detects there's no more leader bank,
|
||||||
|
// implying the slot has ended. Metrics should be reported for `first_bank.slot()`,
|
||||||
|
// because that leader slot has just ended.
|
||||||
|
assert!(leader_slot_metrics_tracker
|
||||||
|
.update_on_leader_slot_boundary(&Some(first_poh_recorder_bank))
|
||||||
|
.is_none());
|
||||||
|
assert_eq!(
|
||||||
|
leader_slot_metrics_tracker
|
||||||
|
.update_on_leader_slot_boundary(&None)
|
||||||
|
.unwrap(),
|
||||||
|
first_bank.slot()
|
||||||
|
);
|
||||||
|
assert!(leader_slot_metrics_tracker.leader_slot_metrics.is_none());
|
||||||
|
assert!(leader_slot_metrics_tracker
|
||||||
|
.update_on_leader_slot_boundary(&None)
|
||||||
|
.is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
pub fn test_update_on_leader_slot_boundary_leader_to_leader_same_slot() {
|
||||||
|
let TestSlotBoundaryComponents {
|
||||||
|
first_bank,
|
||||||
|
first_poh_recorder_bank,
|
||||||
|
mut leader_slot_metrics_tracker,
|
||||||
|
..
|
||||||
|
} = setup_test_slot_boundary_banks();
|
||||||
|
|
||||||
|
// Test case where the thread has a leader bank, and now detects the same leader bank,
|
||||||
|
// implying the slot is still running. Metrics should not be reported
|
||||||
|
assert!(leader_slot_metrics_tracker
|
||||||
|
.update_on_leader_slot_boundary(&Some(first_poh_recorder_bank.clone()))
|
||||||
|
.is_none());
|
||||||
|
assert!(leader_slot_metrics_tracker
|
||||||
|
.update_on_leader_slot_boundary(&Some(first_poh_recorder_bank))
|
||||||
|
.is_none());
|
||||||
|
assert_eq!(
|
||||||
|
leader_slot_metrics_tracker
|
||||||
|
.update_on_leader_slot_boundary(&None)
|
||||||
|
.unwrap(),
|
||||||
|
first_bank.slot()
|
||||||
|
);
|
||||||
|
assert!(leader_slot_metrics_tracker.leader_slot_metrics.is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
pub fn test_update_on_leader_slot_boundary_leader_to_leader_bigger_slot() {
|
||||||
|
let TestSlotBoundaryComponents {
|
||||||
|
first_bank,
|
||||||
|
first_poh_recorder_bank,
|
||||||
|
next_bank,
|
||||||
|
next_poh_recorder_bank,
|
||||||
|
mut leader_slot_metrics_tracker,
|
||||||
|
} = setup_test_slot_boundary_banks();
|
||||||
|
|
||||||
|
// Test case where the thread has a leader bank, and now detects there's a new leader bank
|
||||||
|
// for a bigger slot, implying the slot has ended. Metrics should be reported for the
|
||||||
|
// smaller slot
|
||||||
|
assert!(leader_slot_metrics_tracker
|
||||||
|
.update_on_leader_slot_boundary(&Some(first_poh_recorder_bank))
|
||||||
|
.is_none());
|
||||||
|
assert_eq!(
|
||||||
|
leader_slot_metrics_tracker
|
||||||
|
.update_on_leader_slot_boundary(&Some(next_poh_recorder_bank))
|
||||||
|
.unwrap(),
|
||||||
|
first_bank.slot()
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
leader_slot_metrics_tracker
|
||||||
|
.update_on_leader_slot_boundary(&None)
|
||||||
|
.unwrap(),
|
||||||
|
next_bank.slot()
|
||||||
|
);
|
||||||
|
assert!(leader_slot_metrics_tracker.leader_slot_metrics.is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
pub fn test_update_on_leader_slot_boundary_leader_to_leader_smaller_slot() {
|
||||||
|
let TestSlotBoundaryComponents {
|
||||||
|
first_bank,
|
||||||
|
first_poh_recorder_bank,
|
||||||
|
next_bank,
|
||||||
|
next_poh_recorder_bank,
|
||||||
|
mut leader_slot_metrics_tracker,
|
||||||
|
} = setup_test_slot_boundary_banks();
|
||||||
|
// Test case where the thread has a leader bank, and now detects there's a new leader bank
|
||||||
|
// for a samller slot, implying the slot has ended. Metrics should be reported for the
|
||||||
|
// bigger slot
|
||||||
|
assert!(leader_slot_metrics_tracker
|
||||||
|
.update_on_leader_slot_boundary(&Some(next_poh_recorder_bank))
|
||||||
|
.is_none());
|
||||||
|
assert_eq!(
|
||||||
|
leader_slot_metrics_tracker
|
||||||
|
.update_on_leader_slot_boundary(&Some(first_poh_recorder_bank))
|
||||||
|
.unwrap(),
|
||||||
|
next_bank.slot()
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
leader_slot_metrics_tracker
|
||||||
|
.update_on_leader_slot_boundary(&None)
|
||||||
|
.unwrap(),
|
||||||
|
first_bank.slot()
|
||||||
|
);
|
||||||
|
assert!(leader_slot_metrics_tracker.leader_slot_metrics.is_none());
|
||||||
|
}
|
||||||
|
}
|
|
@ -28,6 +28,7 @@ pub mod fork_choice;
|
||||||
pub mod gen_keys;
|
pub mod gen_keys;
|
||||||
pub mod heaviest_subtree_fork_choice;
|
pub mod heaviest_subtree_fork_choice;
|
||||||
pub mod latest_validator_votes_for_frozen_banks;
|
pub mod latest_validator_votes_for_frozen_banks;
|
||||||
|
pub mod leader_slot_banking_stage_metrics;
|
||||||
pub mod ledger_cleanup_service;
|
pub mod ledger_cleanup_service;
|
||||||
pub mod optimistic_confirmation_verifier;
|
pub mod optimistic_confirmation_verifier;
|
||||||
pub mod outstanding_requests;
|
pub mod outstanding_requests;
|
||||||
|
|
|
@ -117,8 +117,9 @@ impl QosService {
|
||||||
txs_costs
|
txs_costs
|
||||||
}
|
}
|
||||||
|
|
||||||
// Given a list of transactions and their costs, this function returns a corresponding
|
/// Given a list of transactions and their costs, this function returns a corresponding
|
||||||
// list of Results that indicate if a transaction is selected to be included in the current block,
|
/// list of Results that indicate if a transaction is selected to be included in the current block,
|
||||||
|
/// and a count of the number of transactions that would fit in the block
|
||||||
pub fn select_transactions_per_cost<'a>(
|
pub fn select_transactions_per_cost<'a>(
|
||||||
&self,
|
&self,
|
||||||
transactions: impl Iterator<Item = &'a SanitizedTransaction>,
|
transactions: impl Iterator<Item = &'a SanitizedTransaction>,
|
||||||
|
@ -174,37 +175,6 @@ impl QosService {
|
||||||
.unwrap_or_else(|err| warn!("qos service report metrics failed: {:?}", err));
|
.unwrap_or_else(|err| warn!("qos service report metrics failed: {:?}", err));
|
||||||
}
|
}
|
||||||
|
|
||||||
// metrics accumulating apis
|
|
||||||
pub fn accumulate_tpu_ingested_packets_count(&self, count: u64) {
|
|
||||||
self.metrics
|
|
||||||
.tpu_ingested_packets_count
|
|
||||||
.fetch_add(count, Ordering::Relaxed);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn accumulate_tpu_buffered_packets_count(&self, count: u64) {
|
|
||||||
self.metrics
|
|
||||||
.tpu_buffered_packets_count
|
|
||||||
.fetch_add(count, Ordering::Relaxed);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn accumulated_verified_txs_count(&self, count: u64) {
|
|
||||||
self.metrics
|
|
||||||
.verified_txs_count
|
|
||||||
.fetch_add(count, Ordering::Relaxed);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn accumulated_processed_txs_count(&self, count: u64) {
|
|
||||||
self.metrics
|
|
||||||
.processed_txs_count
|
|
||||||
.fetch_add(count, Ordering::Relaxed);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn accumulated_retryable_txs_count(&self, count: u64) {
|
|
||||||
self.metrics
|
|
||||||
.retryable_txs_count
|
|
||||||
.fetch_add(count, Ordering::Relaxed);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn accumulate_estimated_transaction_costs(
|
pub fn accumulate_estimated_transaction_costs(
|
||||||
&self,
|
&self,
|
||||||
cost_details: &BatchedTransactionCostDetails,
|
cost_details: &BatchedTransactionCostDetails,
|
||||||
|
@ -263,24 +233,6 @@ struct QosServiceMetrics {
|
||||||
// aggregate metrics per slot
|
// aggregate metrics per slot
|
||||||
slot: AtomicU64,
|
slot: AtomicU64,
|
||||||
|
|
||||||
// accumulated number of live packets TPU received from verified receiver for processing.
|
|
||||||
tpu_ingested_packets_count: AtomicU64,
|
|
||||||
|
|
||||||
// accumulated number of live packets TPU put into buffer due to no active bank.
|
|
||||||
tpu_buffered_packets_count: AtomicU64,
|
|
||||||
|
|
||||||
// accumulated number of verified txs, which excludes unsanitized transactions and
|
|
||||||
// non-vote transactions when in vote-only mode from ingested packets
|
|
||||||
verified_txs_count: AtomicU64,
|
|
||||||
|
|
||||||
// accumulated number of transactions been processed, includes those landed and those to be
|
|
||||||
// returned (due to AccountInUse, and other QoS related reasons)
|
|
||||||
processed_txs_count: AtomicU64,
|
|
||||||
|
|
||||||
// accumulated number of transactions buffered for retry, often due to AccountInUse and QoS
|
|
||||||
// reasons, includes retried_txs_per_block_limit_count and retried_txs_per_account_limit_count
|
|
||||||
retryable_txs_count: AtomicU64,
|
|
||||||
|
|
||||||
// accumulated time in micro-sec spent in computing transaction cost. It is the main performance
|
// accumulated time in micro-sec spent in computing transaction cost. It is the main performance
|
||||||
// overhead introduced by cost_model
|
// overhead introduced by cost_model
|
||||||
compute_cost_time: AtomicU64,
|
compute_cost_time: AtomicU64,
|
||||||
|
@ -342,31 +294,6 @@ impl QosServiceMetrics {
|
||||||
"qos-service-stats",
|
"qos-service-stats",
|
||||||
("id", self.id as i64, i64),
|
("id", self.id as i64, i64),
|
||||||
("bank_slot", bank_slot as i64, i64),
|
("bank_slot", bank_slot as i64, i64),
|
||||||
(
|
|
||||||
"tpu_ingested_packets_count",
|
|
||||||
self.tpu_ingested_packets_count.swap(0, Ordering::Relaxed) as i64,
|
|
||||||
i64
|
|
||||||
),
|
|
||||||
(
|
|
||||||
"tpu_buffered_packets_count",
|
|
||||||
self.tpu_buffered_packets_count.swap(0, Ordering::Relaxed) as i64,
|
|
||||||
i64
|
|
||||||
),
|
|
||||||
(
|
|
||||||
"verified_txs_count",
|
|
||||||
self.verified_txs_count.swap(0, Ordering::Relaxed) as i64,
|
|
||||||
i64
|
|
||||||
),
|
|
||||||
(
|
|
||||||
"processed_txs_count",
|
|
||||||
self.processed_txs_count.swap(0, Ordering::Relaxed) as i64,
|
|
||||||
i64
|
|
||||||
),
|
|
||||||
(
|
|
||||||
"retryable_txs_count",
|
|
||||||
self.retryable_txs_count.swap(0, Ordering::Relaxed) as i64,
|
|
||||||
i64
|
|
||||||
),
|
|
||||||
(
|
(
|
||||||
"compute_cost_time",
|
"compute_cost_time",
|
||||||
self.compute_cost_time.swap(0, Ordering::Relaxed) as i64,
|
self.compute_cost_time.swap(0, Ordering::Relaxed) as i64,
|
||||||
|
|
|
@ -60,6 +60,7 @@ type Result<T> = std::result::Result<T, PohRecorderError>;
|
||||||
|
|
||||||
pub type WorkingBankEntry = (Arc<Bank>, (Entry, u64));
|
pub type WorkingBankEntry = (Arc<Bank>, (Entry, u64));
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
pub struct BankStart {
|
pub struct BankStart {
|
||||||
pub working_bank: Arc<Bank>,
|
pub working_bank: Arc<Bank>,
|
||||||
pub bank_creation_time: Arc<Instant>,
|
pub bank_creation_time: Arc<Instant>,
|
||||||
|
|
Loading…
Reference in New Issue