Metrics prioritization fees (#34653)

* Adding metrics for prioritization fees min/max per thread

* Adding scheduled transaction prioritization fees to the metrics

* Changes after andrews comments

* fixing Taos comments

* Adding metrics to the new scheduler

* Fixing getting of min max for TransactionStateContainer

* Fix clippy CI Issue

* Changes after andrews comments about min/max for new scheduler

* Creating a new structure to store prio fee metrics

* Reporting with prio fee stats banking_stage_scheduler_counts

* merging prioritization stats into SchedulerCountMetrics

* Minor changes after andrews review
This commit is contained in:
galactus 2024-02-01 22:06:45 +01:00 committed by GitHub
parent daa2449ad4
commit 35f900b03b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 287 additions and 22 deletions

View File

@ -659,7 +659,10 @@ impl BankingStage {
}
let (decision, make_decision_time) =
measure!(decision_maker.make_consume_or_forward_decision());
let metrics_action = slot_metrics_tracker.check_leader_slot_boundary(decision.bank_start());
let metrics_action = slot_metrics_tracker.check_leader_slot_boundary(
decision.bank_start(),
Some(unprocessed_transaction_storage),
);
slot_metrics_tracker.increment_make_decision_us(make_decision_time.as_us());
match decision {

View File

@ -212,6 +212,8 @@ impl ConsumeWorkerMetrics {
retryable_transaction_indexes,
execute_and_commit_timings,
error_counters,
min_prioritization_fees,
max_prioritization_fees,
..
}: &ExecuteAndCommitTransactionsOutput,
) {
@ -227,7 +229,20 @@ impl ConsumeWorkerMetrics {
self.count_metrics
.retryable_transaction_count
.fetch_add(retryable_transaction_indexes.len(), Ordering::Relaxed);
let min_prioritization_fees = self
.count_metrics
.min_prioritization_fees
.fetch_min(*min_prioritization_fees, Ordering::Relaxed);
let max_prioritization_fees = self
.count_metrics
.max_prioritization_fees
.fetch_max(*max_prioritization_fees, Ordering::Relaxed);
self.count_metrics
.min_prioritization_fees
.swap(min_prioritization_fees, Ordering::Relaxed);
self.count_metrics
.max_prioritization_fees
.swap(max_prioritization_fees, Ordering::Relaxed);
self.update_on_execute_and_commit_timings(execute_and_commit_timings);
self.update_on_error_counters(error_counters);
}
@ -368,7 +383,6 @@ impl ConsumeWorkerMetrics {
}
}
#[derive(Default)]
struct ConsumeWorkerCountMetrics {
transactions_attempted_execution_count: AtomicUsize,
executed_transactions_count: AtomicUsize,
@ -376,6 +390,23 @@ struct ConsumeWorkerCountMetrics {
retryable_transaction_count: AtomicUsize,
retryable_expired_bank_count: AtomicUsize,
cost_model_throttled_transactions_count: AtomicUsize,
min_prioritization_fees: AtomicU64,
max_prioritization_fees: AtomicU64,
}
impl Default for ConsumeWorkerCountMetrics {
fn default() -> Self {
Self {
transactions_attempted_execution_count: AtomicUsize::default(),
executed_transactions_count: AtomicUsize::default(),
executed_with_successful_result_count: AtomicUsize::default(),
retryable_transaction_count: AtomicUsize::default(),
retryable_expired_bank_count: AtomicUsize::default(),
cost_model_throttled_transactions_count: AtomicUsize::default(),
min_prioritization_fees: AtomicU64::new(u64::MAX),
max_prioritization_fees: AtomicU64::default(),
}
}
}
impl ConsumeWorkerCountMetrics {
@ -416,6 +447,17 @@ impl ConsumeWorkerCountMetrics {
.swap(0, Ordering::Relaxed),
i64
),
(
"min_prioritization_fees",
self.min_prioritization_fees
.swap(u64::MAX, Ordering::Relaxed),
i64
),
(
"max_prioritization_fees",
self.max_prioritization_fees.swap(0, Ordering::Relaxed),
i64
),
);
}
}

View File

@ -26,6 +26,7 @@ use {
bank::{Bank, LoadAndExecuteTransactionsOutput},
svm::account_loader::validate_fee_payer,
transaction_batch::TransactionBatch,
transaction_priority_details::GetTransactionPriorityDetails,
},
solana_sdk::{
clock::{Slot, FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, MAX_PROCESSING_AGE},
@ -69,6 +70,8 @@ pub struct ExecuteAndCommitTransactionsOutput {
pub commit_transactions_result: Result<Vec<CommitTransactionDetails>, PohRecorderError>,
pub(crate) execute_and_commit_timings: LeaderExecuteAndCommitTimings,
pub(crate) error_counters: TransactionErrorMetrics,
pub(crate) min_prioritization_fees: u64,
pub(crate) max_prioritization_fees: u64,
}
pub struct Consumer {
@ -291,6 +294,8 @@ impl Consumer {
let mut total_execute_and_commit_timings = LeaderExecuteAndCommitTimings::default();
let mut total_error_counters = TransactionErrorMetrics::default();
let mut reached_max_poh_height = false;
let mut overall_min_prioritization_fees: u64 = u64::MAX;
let mut overall_max_prioritization_fees: u64 = 0;
while chunk_start != transactions.len() {
let chunk_end = std::cmp::min(
transactions.len(),
@ -321,6 +326,8 @@ impl Consumer {
commit_transactions_result: new_commit_transactions_result,
execute_and_commit_timings: new_execute_and_commit_timings,
error_counters: new_error_counters,
min_prioritization_fees,
max_prioritization_fees,
..
} = execute_and_commit_transactions_output;
@ -330,6 +337,10 @@ impl Consumer {
total_transactions_attempted_execution_count,
new_transactions_attempted_execution_count
);
overall_min_prioritization_fees =
std::cmp::min(overall_min_prioritization_fees, min_prioritization_fees);
overall_max_prioritization_fees =
std::cmp::min(overall_max_prioritization_fees, max_prioritization_fees);
trace!(
"process_transactions result: {:?}",
@ -390,6 +401,8 @@ impl Consumer {
cost_model_us: total_cost_model_us,
execute_and_commit_timings: total_execute_and_commit_timings,
error_counters: total_error_counters,
min_prioritization_fees: overall_min_prioritization_fees,
max_prioritization_fees: overall_max_prioritization_fees,
}
}
@ -567,6 +580,19 @@ impl Consumer {
});
execute_and_commit_timings.collect_balances_us = collect_balances_us;
let min_max = batch
.sanitized_transactions()
.iter()
.filter_map(|transaction| {
let round_compute_unit_price_enabled = false; // TODO get from working_bank.feature_set
transaction
.get_transaction_priority_details(round_compute_unit_price_enabled)
.map(|details| details.priority)
})
.minmax();
let (min_prioritization_fees, max_prioritization_fees) =
min_max.into_option().unwrap_or_default();
let (load_and_execute_transactions_output, load_execute_us) = measure_us!(bank
.load_and_execute_transactions(
batch,
@ -648,6 +674,8 @@ impl Consumer {
commit_transactions_result: Err(recorder_err),
execute_and_commit_timings,
error_counters,
min_prioritization_fees,
max_prioritization_fees,
};
}
@ -703,6 +731,8 @@ impl Consumer {
commit_transactions_result: Ok(commit_transaction_statuses),
execute_and_commit_timings,
error_counters,
min_prioritization_fees,
max_prioritization_fees,
}
}

View File

@ -1,7 +1,9 @@
use {
super::{
leader_slot_timing_metrics::{LeaderExecuteAndCommitTimings, LeaderSlotTimingMetrics},
unprocessed_transaction_storage::InsertPacketBatchSummary,
unprocessed_transaction_storage::{
InsertPacketBatchSummary, UnprocessedTransactionStorage,
},
},
solana_accounts_db::transaction_error_metrics::*,
solana_poh::poh_recorder::BankStart,
@ -52,6 +54,53 @@ pub(crate) struct ProcessTransactionsSummary {
// Breakdown of all the transaction errors from transactions passed for execution
pub error_counters: TransactionErrorMetrics,
pub min_prioritization_fees: u64,
pub max_prioritization_fees: u64,
}
// Metrics describing prioritization fee information for each transaction storage before processing transactions
#[derive(Debug, Default)]
struct LeaderPrioritizationFeesMetrics {
// minimum prioritization fees in the MinMaxHeap
min_prioritization_fees_per_cu: u64,
// maximum prioritization fees in the MinMaxHeap
max_prioritization_fees_per_cu: u64,
}
impl LeaderPrioritizationFeesMetrics {
fn new(unprocessed_transaction_storage: Option<&UnprocessedTransactionStorage>) -> Self {
if let Some(unprocessed_transaction_storage) = unprocessed_transaction_storage {
Self {
min_prioritization_fees_per_cu: unprocessed_transaction_storage
.get_min_priority()
.unwrap_or_default(),
max_prioritization_fees_per_cu: unprocessed_transaction_storage
.get_max_priority()
.unwrap_or_default(),
}
} else {
Self::default()
}
}
fn report(&self, id: u32, slot: Slot) {
datapoint_info!(
"banking_stage-leader_prioritization_fees_info",
("id", id, i64),
("slot", slot, i64),
(
"min_prioritization_fees_per_cu",
self.min_prioritization_fees_per_cu,
i64
),
(
"max_prioritization_fees_per_cu",
self.max_prioritization_fees_per_cu,
i64
)
);
}
}
// Metrics describing packets ingested/processed in various parts of BankingStage during this
@ -138,6 +187,11 @@ struct LeaderSlotPacketCountMetrics {
// total number of forwardable batches that were attempted for forwarding. A forwardable batch
// is defined in `ForwardPacketBatchesByAccounts` in `forward_packet_batches_by_accounts.rs`
forwardable_batches_count: u64,
// min prioritization fees for scheduled transactions
min_prioritization_fees: u64,
// max prioritization fees for scheduled transactions
max_prioritization_fees: u64,
}
impl LeaderSlotPacketCountMetrics {
@ -255,6 +309,16 @@ impl LeaderSlotPacketCountMetrics {
self.end_of_slot_unprocessed_buffer_len as i64,
i64
),
(
"min_prioritization_fees",
self.min_prioritization_fees as i64,
i64
),
(
"max_prioritization_fees",
self.max_prioritization_fees as i64,
i64
),
);
}
}
@ -277,12 +341,19 @@ pub(crate) struct LeaderSlotMetrics {
timing_metrics: LeaderSlotTimingMetrics,
prioritization_fees_metric: LeaderPrioritizationFeesMetrics,
// Used by tests to check if the `self.report()` method was called
is_reported: bool,
}
impl LeaderSlotMetrics {
pub(crate) fn new(id: u32, slot: Slot, bank_creation_time: &Instant) -> Self {
pub(crate) fn new(
id: u32,
slot: Slot,
bank_creation_time: &Instant,
unprocessed_transaction_storage: Option<&UnprocessedTransactionStorage>,
) -> Self {
Self {
id,
slot,
@ -290,6 +361,9 @@ impl LeaderSlotMetrics {
transaction_error_metrics: TransactionErrorMetrics::new(),
vote_packet_count_metrics: VotePacketCountMetrics::new(),
timing_metrics: LeaderSlotTimingMetrics::new(bank_creation_time),
prioritization_fees_metric: LeaderPrioritizationFeesMetrics::new(
unprocessed_transaction_storage,
),
is_reported: false,
}
}
@ -301,6 +375,7 @@ impl LeaderSlotMetrics {
self.transaction_error_metrics.report(self.id, self.slot);
self.packet_count_metrics.report(self.id, self.slot);
self.vote_packet_count_metrics.report(self.id, self.slot);
self.prioritization_fees_metric.report(self.id, self.slot);
}
/// Returns `Some(self.slot)` if the metrics have been reported, otherwise returns None
@ -372,6 +447,7 @@ impl LeaderSlotMetricsTracker {
pub(crate) fn check_leader_slot_boundary(
&mut self,
bank_start: Option<&BankStart>,
unprocessed_transaction_storage: Option<&UnprocessedTransactionStorage>,
) -> MetricsTrackerAction {
match (self.leader_slot_metrics.as_mut(), bank_start) {
(None, None) => MetricsTrackerAction::Noop,
@ -387,6 +463,7 @@ impl LeaderSlotMetricsTracker {
self.id,
bank_start.working_bank.slot(),
&bank_start.bank_creation_time,
unprocessed_transaction_storage,
)))
}
@ -398,6 +475,7 @@ impl LeaderSlotMetricsTracker {
self.id,
bank_start.working_bank.slot(),
&bank_start.bank_creation_time,
unprocessed_transaction_storage,
)))
} else {
MetricsTrackerAction::Noop
@ -449,6 +527,8 @@ impl LeaderSlotMetricsTracker {
cost_model_us,
ref execute_and_commit_timings,
error_counters,
min_prioritization_fees,
max_prioritization_fees,
..
} = process_transactions_summary;
@ -525,6 +605,23 @@ impl LeaderSlotMetricsTracker {
*cost_model_us
);
leader_slot_metrics
.packet_count_metrics
.min_prioritization_fees = std::cmp::min(
leader_slot_metrics
.packet_count_metrics
.min_prioritization_fees,
*min_prioritization_fees,
);
leader_slot_metrics
.packet_count_metrics
.max_prioritization_fees = std::cmp::min(
leader_slot_metrics
.packet_count_metrics
.max_prioritization_fees,
*max_prioritization_fees,
);
leader_slot_metrics
.timing_metrics
.execute_and_commit_timings
@ -896,7 +993,7 @@ mod tests {
..
} = setup_test_slot_boundary_banks();
// Test that with no bank being tracked, and no new bank being tracked, nothing is reported
let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None);
let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None, None);
assert_eq!(
mem::discriminant(&MetricsTrackerAction::Noop),
mem::discriminant(&action)
@ -916,8 +1013,8 @@ mod tests {
// Test case where the thread has not detected a leader bank, and now sees a leader bank.
// Metrics should not be reported because leader slot has not ended
assert!(leader_slot_metrics_tracker.leader_slot_metrics.is_none());
let action =
leader_slot_metrics_tracker.check_leader_slot_boundary(Some(&first_poh_recorder_bank));
let action = leader_slot_metrics_tracker
.check_leader_slot_boundary(Some(&first_poh_recorder_bank), None);
assert_eq!(
mem::discriminant(&MetricsTrackerAction::NewTracker(None)),
mem::discriminant(&action)
@ -941,12 +1038,12 @@ mod tests {
{
// Setup first_bank
let action = leader_slot_metrics_tracker
.check_leader_slot_boundary(Some(&first_poh_recorder_bank));
.check_leader_slot_boundary(Some(&first_poh_recorder_bank), None);
assert!(leader_slot_metrics_tracker.apply_action(action).is_none());
}
{
// Assert reporting if slot has ended
let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None);
let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None, None);
assert_eq!(
mem::discriminant(&MetricsTrackerAction::ReportAndResetTracker),
mem::discriminant(&action)
@ -959,7 +1056,7 @@ mod tests {
}
{
// Assert no-op if still no new bank
let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None);
let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None, None);
assert_eq!(
mem::discriminant(&MetricsTrackerAction::Noop),
mem::discriminant(&action)
@ -981,13 +1078,13 @@ mod tests {
{
// Setup with first_bank
let action = leader_slot_metrics_tracker
.check_leader_slot_boundary(Some(&first_poh_recorder_bank));
.check_leader_slot_boundary(Some(&first_poh_recorder_bank), None);
assert!(leader_slot_metrics_tracker.apply_action(action).is_none());
}
{
// Assert nop-op if same bank
let action = leader_slot_metrics_tracker
.check_leader_slot_boundary(Some(&first_poh_recorder_bank));
.check_leader_slot_boundary(Some(&first_poh_recorder_bank), None);
assert_eq!(
mem::discriminant(&MetricsTrackerAction::Noop),
mem::discriminant(&action)
@ -996,7 +1093,7 @@ mod tests {
}
{
// Assert reporting if slot has ended
let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None);
let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None, None);
assert_eq!(
mem::discriminant(&MetricsTrackerAction::ReportAndResetTracker),
mem::discriminant(&action)
@ -1025,13 +1122,13 @@ mod tests {
{
// Setup with first_bank
let action = leader_slot_metrics_tracker
.check_leader_slot_boundary(Some(&first_poh_recorder_bank));
.check_leader_slot_boundary(Some(&first_poh_recorder_bank), None);
assert!(leader_slot_metrics_tracker.apply_action(action).is_none());
}
{
// Assert reporting if new bank
let action = leader_slot_metrics_tracker
.check_leader_slot_boundary(Some(&next_poh_recorder_bank));
.check_leader_slot_boundary(Some(&next_poh_recorder_bank), None);
assert_eq!(
mem::discriminant(&MetricsTrackerAction::ReportAndNewTracker(None)),
mem::discriminant(&action)
@ -1044,7 +1141,7 @@ mod tests {
}
{
// Assert reporting if slot has ended
let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None);
let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None, None);
assert_eq!(
mem::discriminant(&MetricsTrackerAction::ReportAndResetTracker),
mem::discriminant(&action)
@ -1072,13 +1169,13 @@ mod tests {
{
// Setup with next_bank
let action = leader_slot_metrics_tracker
.check_leader_slot_boundary(Some(&next_poh_recorder_bank));
.check_leader_slot_boundary(Some(&next_poh_recorder_bank), None);
assert!(leader_slot_metrics_tracker.apply_action(action).is_none());
}
{
// Assert reporting if new bank
let action = leader_slot_metrics_tracker
.check_leader_slot_boundary(Some(&first_poh_recorder_bank));
.check_leader_slot_boundary(Some(&first_poh_recorder_bank), None);
assert_eq!(
mem::discriminant(&MetricsTrackerAction::ReportAndNewTracker(None)),
mem::discriminant(&action)
@ -1091,7 +1188,7 @@ mod tests {
}
{
// Assert reporting if slot has ended
let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None);
let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None, None);
assert_eq!(
mem::discriminant(&MetricsTrackerAction::ReportAndResetTracker),
mem::discriminant(&action)

View File

@ -17,6 +17,7 @@ use {
TOTAL_BUFFERED_PACKETS,
},
crossbeam_channel::RecvTimeoutError,
itertools::MinMaxResult,
solana_accounts_db::transaction_error_metrics::TransactionErrorMetrics,
solana_cost_model::cost_model::CostModel,
solana_measure::measure_us,
@ -95,10 +96,11 @@ impl SchedulerController {
if !self.receive_and_buffer_packets(&decision) {
break;
}
// Report metrics only if there is data.
// Reset intervals when appropriate, regardless of report.
let should_report = self.count_metrics.has_data();
self.count_metrics
.update_prioritization_stats(self.container.get_min_max_prioritization_fees());
self.count_metrics.maybe_report_and_reset(should_report);
self.timing_metrics.maybe_report_and_reset(should_report);
self.worker_metrics
@ -419,6 +421,10 @@ struct SchedulerCountMetrics {
num_dropped_on_age_and_status: usize,
/// Number of transactions that were dropped due to exceeded capacity.
num_dropped_on_capacity: usize,
/// Min prioritization fees in the transaction container
min_prioritization_fees: u64,
/// Max prioritization fees in the transaction container
max_prioritization_fees: u64,
}
impl SchedulerCountMetrics {
@ -468,7 +474,17 @@ impl SchedulerCountMetrics {
self.num_dropped_on_age_and_status,
i64
),
("num_dropped_on_capacity", self.num_dropped_on_capacity, i64)
("num_dropped_on_capacity", self.num_dropped_on_capacity, i64),
(
"min_prioritization_fees",
self.get_min_prioritization_fees(),
i64
),
(
"max_prioritization_fees",
self.get_max_prioritization_fees(),
i64
)
);
}
@ -504,6 +520,38 @@ impl SchedulerCountMetrics {
self.num_dropped_on_clear = 0;
self.num_dropped_on_age_and_status = 0;
self.num_dropped_on_capacity = 0;
self.min_prioritization_fees = u64::MAX;
self.max_prioritization_fees = 0;
}
pub fn update_prioritization_stats(&mut self, min_max_fees: MinMaxResult<u64>) {
// update min/max priotization fees
match min_max_fees {
itertools::MinMaxResult::NoElements => {
// do nothing
}
itertools::MinMaxResult::OneElement(e) => {
self.min_prioritization_fees = e;
self.max_prioritization_fees = e;
}
itertools::MinMaxResult::MinMax(min, max) => {
self.min_prioritization_fees = min;
self.max_prioritization_fees = max;
}
}
}
pub fn get_min_prioritization_fees(&self) -> u64 {
// to avoid getting u64::max recorded by metrics / in case of edge cases
if self.min_prioritization_fees != u64::MAX {
self.min_prioritization_fees
} else {
0
}
}
pub fn get_max_prioritization_fees(&self) -> u64 {
self.max_prioritization_fees
}
}

View File

@ -4,6 +4,7 @@ use {
transaction_state::{SanitizedTransactionTTL, TransactionState},
},
crate::banking_stage::scheduler_messages::TransactionId,
itertools::MinMaxResult,
min_max_heap::MinMaxHeap,
solana_cost_model::transaction_cost::TransactionCost,
solana_runtime::transaction_priority_details::TransactionPriorityDetails,
@ -149,6 +150,16 @@ impl TransactionStateContainer {
.remove(id)
.expect("transaction must exist");
}
pub(crate) fn get_min_max_prioritization_fees(&self) -> MinMaxResult<u64> {
match self.priority_queue.peek_min() {
Some(min) => match self.priority_queue.peek_max() {
Some(max) => MinMaxResult::MinMax(min.priority, max.priority),
None => MinMaxResult::OneElement(min.priority),
},
None => MinMaxResult::NoElements,
}
}
}
#[cfg(test)]

View File

@ -193,6 +193,14 @@ impl UnprocessedPacketBatches {
self.packet_priority_queue.is_empty()
}
pub fn get_min_priority(&self) -> Option<u64> {
self.packet_priority_queue.peek_min().map(|x| x.priority())
}
pub fn get_max_priority(&self) -> Option<u64> {
self.packet_priority_queue.peek_max().map(|x| x.priority())
}
fn push_internal(&mut self, deserialized_packet: DeserializedPacket) {
// Push into the priority queue
self.packet_priority_queue

View File

@ -282,6 +282,24 @@ impl UnprocessedTransactionStorage {
}
}
pub fn get_min_priority(&self) -> Option<u64> {
match self {
Self::VoteStorage(_) => None,
Self::LocalTransactionStorage(transaction_storage) => {
transaction_storage.get_min_priority()
}
}
}
pub fn get_max_priority(&self) -> Option<u64> {
match self {
Self::VoteStorage(_) => None,
Self::LocalTransactionStorage(transaction_storage) => {
transaction_storage.get_max_priority()
}
}
}
/// Returns the maximum number of packets a receive should accept
pub fn max_receive_size(&self) -> usize {
match self {
@ -529,6 +547,14 @@ impl ThreadLocalUnprocessedPackets {
self.unprocessed_packet_batches.len()
}
pub fn get_min_priority(&self) -> Option<u64> {
self.unprocessed_packet_batches.get_min_priority()
}
pub fn get_max_priority(&self) -> Option<u64> {
self.unprocessed_packet_batches.get_max_priority()
}
fn max_receive_size(&self) -> usize {
self.unprocessed_packet_batches.capacity() - self.unprocessed_packet_batches.len()
}