Scheduler: Leader-Slot metrics for Scheduler (#35087)

Andrew Fitzgerald 2024-02-23 17:06:22 -08:00 committed by GitHub
parent 54706a885b
commit 9f581113bd
2 changed files with 227 additions and 98 deletions


@@ -54,9 +54,11 @@ pub(crate) struct SchedulerController {
container: TransactionStateContainer,
/// State for scheduling and communicating with worker threads.
scheduler: PrioGraphScheduler,
-/// Metrics tracking counts on transactions in different states.
+/// Metrics tracking counts on transactions in different states
+/// over an interval and during a leader slot.
count_metrics: SchedulerCountMetrics,
-/// Metrics tracking time spent in different code sections.
+/// Metrics tracking time spent in different code sections
+/// over an interval and during a leader slot.
timing_metrics: SchedulerTimingMetrics,
/// Metric report handles for the worker threads.
worker_metrics: Vec<Arc<ConsumeWorkerMetrics>>,
@@ -97,7 +99,15 @@ impl SchedulerController {
// bypass sanitization and buffering and immediately drop the packets.
let (decision, decision_time_us) =
measure_us!(self.decision_maker.make_consume_or_forward_decision());
-saturating_add_assign!(self.timing_metrics.decision_time_us, decision_time_us);
+self.timing_metrics.update(|timing_metrics| {
+saturating_add_assign!(timing_metrics.decision_time_us, decision_time_us);
+});
+let new_leader_slot = decision.bank_start().map(|b| b.working_bank.slot());
+self.count_metrics
+.maybe_report_and_reset_slot(new_leader_slot);
+self.timing_metrics
+.maybe_report_and_reset_slot(new_leader_slot);
self.process_transactions(&decision)?;
self.receive_completed()?;
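
The hunk above is what drives the new per-slot reporting: the leader slot is taken from the scheduling decision via decision.bank_start().map(|b| b.working_bank.slot()), and both metric trackers flush and reset whenever that slot changes. A minimal sketch of the boundary detection, with stand-in types in place of the real decision and metrics structs:

    // Stand-in types; the real trackers hold many more counters.
    type Slot = u64;

    #[derive(Default)]
    struct SlotMetrics {
        slot: Option<Slot>,
        num_scheduled: u64,
    }

    impl SlotMetrics {
        fn maybe_report_and_reset(&mut self, slot: Option<Slot>) {
            if self.slot != slot {
                // Report only if a slot was being tracked; None means
                // this validator was not leader.
                if let Some(prev) = self.slot {
                    println!("slot {prev}: num_scheduled={}", self.num_scheduled);
                }
                self.num_scheduled = 0;
                self.slot = slot;
            }
        }
    }

    fn main() {
        let mut m = SlotMetrics::default();
        m.maybe_report_and_reset(Some(5)); // become leader for slot 5
        m.num_scheduled += 10;
        m.maybe_report_and_reset(Some(5)); // same slot: no report
        m.maybe_report_and_reset(Some(6)); // slot change: report slot 5, reset
        m.maybe_report_and_reset(None); // leadership over: report slot 6
    }

Calling this once per loop iteration, as the controller does, emits a slot's datapoint as soon as the scheduler observes the next slot (or loses leadership), rather than waiting for the 1s interval report.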
@@ -106,11 +116,15 @@ impl SchedulerController {
}
// Report metrics only if there is data.
// Reset intervals when appropriate, regardless of report.
-let should_report = self.count_metrics.has_data();
-self.count_metrics
-.update_priority_stats(self.container.get_min_max_priority());
-self.count_metrics.maybe_report_and_reset(should_report);
-self.timing_metrics.maybe_report_and_reset(should_report);
+let should_report = self.count_metrics.interval_has_data();
+let priority_min_max = self.container.get_min_max_priority();
+self.count_metrics.update(|count_metrics| {
+count_metrics.update_priority_stats(priority_min_max);
+});
+self.count_metrics
+.maybe_report_and_reset_interval(should_report);
+self.timing_metrics
+.maybe_report_and_reset_interval(should_report);
self.worker_metrics
.iter()
.for_each(|metrics| metrics.maybe_report_and_reset());
@@ -133,31 +147,41 @@ impl SchedulerController {
},
|_| true // no pre-lock filter for now
)?);
-saturating_add_assign!(
-self.count_metrics.num_scheduled,
-scheduling_summary.num_scheduled
-);
-saturating_add_assign!(
-self.count_metrics.num_unschedulable,
-scheduling_summary.num_unschedulable
-);
-saturating_add_assign!(
-self.count_metrics.num_schedule_filtered_out,
-scheduling_summary.num_filtered_out
-);
-saturating_add_assign!(
-self.timing_metrics.schedule_filter_time_us,
-scheduling_summary.filter_time_us
-);
-saturating_add_assign!(self.timing_metrics.schedule_time_us, schedule_time_us);
+self.count_metrics.update(|count_metrics| {
+saturating_add_assign!(
+count_metrics.num_scheduled,
+scheduling_summary.num_scheduled
+);
+saturating_add_assign!(
+count_metrics.num_unschedulable,
+scheduling_summary.num_unschedulable
+);
+saturating_add_assign!(
+count_metrics.num_schedule_filtered_out,
+scheduling_summary.num_filtered_out
+);
+});
+self.timing_metrics.update(|timing_metrics| {
+saturating_add_assign!(
+timing_metrics.schedule_filter_time_us,
+scheduling_summary.filter_time_us
+);
+saturating_add_assign!(timing_metrics.schedule_time_us, schedule_time_us);
+});
}
BufferedPacketsDecision::Forward => {
let (_, clear_time_us) = measure_us!(self.clear_container());
-saturating_add_assign!(self.timing_metrics.clear_time_us, clear_time_us);
+self.timing_metrics.update(|timing_metrics| {
+saturating_add_assign!(timing_metrics.clear_time_us, clear_time_us);
+});
}
BufferedPacketsDecision::ForwardAndHold => {
let (_, clean_time_us) = measure_us!(self.clean_queue());
-saturating_add_assign!(self.timing_metrics.clean_time_us, clean_time_us);
+self.timing_metrics.update(|timing_metrics| {
+saturating_add_assign!(timing_metrics.clean_time_us, clean_time_us);
+});
}
BufferedPacketsDecision::Hold => {}
}
@@ -192,10 +216,15 @@ impl SchedulerController {
/// Clears the transaction state container.
/// This only clears pending transactions, and does **not** clear in-flight transactions.
fn clear_container(&mut self) {
+let mut num_dropped_on_clear: usize = 0;
while let Some(id) = self.container.pop() {
self.container.remove_by_id(&id.id);
-saturating_add_assign!(self.count_metrics.num_dropped_on_clear, 1);
+saturating_add_assign!(num_dropped_on_clear, 1);
}
+self.count_metrics.update(|count_metrics| {
+saturating_add_assign!(count_metrics.num_dropped_on_clear, num_dropped_on_clear);
+});
}
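
Note the shape of this change: the drop count is accumulated in a local inside the loop and folded into the metrics with a single update call afterwards, so the closure, which now writes to both the interval-scoped and the slot-scoped copies, runs once per batch instead of once per popped transaction. The same pattern appears in clean_queue and buffer_packets below. A minimal sketch with stub types standing in for the real container and metrics:

    #[derive(Default)]
    struct Counts {
        num_dropped_on_clear: u64,
    }

    #[derive(Default)]
    struct Metrics {
        interval: Counts,
        slot: Counts,
    }

    impl Metrics {
        // One closure applied to both copies, mirroring
        // SchedulerCountMetrics::update from this commit.
        fn update(&mut self, f: impl Fn(&mut Counts)) {
            f(&mut self.interval);
            f(&mut self.slot);
        }
    }

    fn clear_container(queue: &mut Vec<u64>, metrics: &mut Metrics) {
        let mut num_dropped_on_clear: u64 = 0;
        while queue.pop().is_some() {
            num_dropped_on_clear += 1; // cheap local bump in the hot loop
        }
        // One update per batch, not one per popped item.
        metrics.update(|c| {
            c.num_dropped_on_clear = c.num_dropped_on_clear.saturating_add(num_dropped_on_clear);
        });
    }

    fn main() {
        let (mut queue, mut metrics) = (vec![1, 2, 3], Metrics::default());
        clear_container(&mut queue, &mut metrics);
        assert_eq!(metrics.interval.num_dropped_on_clear, 3);
        assert_eq!(metrics.slot.num_dropped_on_clear, 3);
    }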
/// Clean unprocessable transactions from the queue. These will be transactions that are
@@ -215,7 +244,7 @@ impl SchedulerController {
const CHUNK_SIZE: usize = 128;
let mut error_counters = TransactionErrorMetrics::default();
+let mut num_dropped_on_age_and_status: usize = 0;
for chunk in transaction_ids.chunks(CHUNK_SIZE) {
let lock_results = vec![Ok(()); chunk.len()];
let sanitized_txs: Vec<_> = chunk
@@ -238,23 +267,36 @@ impl SchedulerController {
for ((result, _nonce, _lamports), id) in check_results.into_iter().zip(chunk.iter()) {
if result.is_err() {
-saturating_add_assign!(self.count_metrics.num_dropped_on_age_and_status, 1);
+saturating_add_assign!(num_dropped_on_age_and_status, 1);
self.container.remove_by_id(&id.id);
}
}
}
+self.count_metrics.update(|count_metrics| {
+saturating_add_assign!(
+count_metrics.num_dropped_on_age_and_status,
+num_dropped_on_age_and_status
+);
+});
}
/// Receives completed transactions from the workers and updates metrics.
fn receive_completed(&mut self) -> Result<(), SchedulerError> {
let ((num_transactions, num_retryable), receive_completed_time_us) =
measure_us!(self.scheduler.receive_completed(&mut self.container)?);
-saturating_add_assign!(self.count_metrics.num_finished, num_transactions);
-saturating_add_assign!(self.count_metrics.num_retryable, num_retryable);
-saturating_add_assign!(
-self.timing_metrics.receive_completed_time_us,
-receive_completed_time_us
-);
+self.count_metrics.update(|count_metrics| {
+saturating_add_assign!(count_metrics.num_finished, num_transactions);
+saturating_add_assign!(count_metrics.num_retryable, num_retryable);
+});
+self.timing_metrics.update(|timing_metrics| {
+saturating_add_assign!(
+timing_metrics.receive_completed_time_us,
+receive_completed_time_us
+);
+});
Ok(())
}
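
receive_completed keeps the same measure_us!-based timing as before; only the accumulation moved into update closures. For reference, a hand-rolled equivalent of what that macro does (the real macro comes from Solana's measure utilities and yields the expression's value paired with elapsed microseconds):

    use std::time::Instant;

    // Stand-in for measure_us!: evaluate a closure, return (result, elapsed_us).
    fn measure_us<T>(f: impl FnOnce() -> T) -> (T, u64) {
        let start = Instant::now();
        let result = f();
        (result, start.elapsed().as_micros() as u64)
    }

    fn main() {
        // Stand-in values for scheduler.receive_completed(&mut container).
        let ((num_finished, num_retryable), receive_completed_time_us) =
            measure_us(|| (12u64, 3u64));
        println!("finished={num_finished} retryable={num_retryable} in {receive_completed_time_us}us");
    }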
@@ -281,22 +323,33 @@ impl SchedulerController {
let (received_packet_results, receive_time_us) = measure_us!(self
.packet_receiver
.receive_packets(recv_timeout, remaining_queue_capacity));
-saturating_add_assign!(self.timing_metrics.receive_time_us, receive_time_us);
+self.timing_metrics.update(|timing_metrics| {
+saturating_add_assign!(timing_metrics.receive_time_us, receive_time_us);
+});
match received_packet_results {
Ok(receive_packet_results) => {
let num_received_packets = receive_packet_results.deserialized_packets.len();
-saturating_add_assign!(self.count_metrics.num_received, num_received_packets);
+self.count_metrics.update(|count_metrics| {
+saturating_add_assign!(count_metrics.num_received, num_received_packets);
+});
if should_buffer {
let (_, buffer_time_us) = measure_us!(
self.buffer_packets(receive_packet_results.deserialized_packets)
);
-saturating_add_assign!(self.timing_metrics.buffer_time_us, buffer_time_us);
+self.timing_metrics.update(|timing_metrics| {
+saturating_add_assign!(timing_metrics.buffer_time_us, buffer_time_us);
+});
} else {
-saturating_add_assign!(
-self.count_metrics.num_dropped_on_receive,
-num_received_packets
-);
+self.count_metrics.update(|count_metrics| {
+saturating_add_assign!(
+count_metrics.num_dropped_on_receive,
+num_received_packets
+);
+});
}
}
Err(RecvTimeoutError::Timeout) => {}
@@ -348,6 +401,8 @@ impl SchedulerController {
let post_lock_validation_count = transactions.len();
let mut post_transaction_check_count: usize = 0;
+let mut num_dropped_on_capacity: usize = 0;
+let mut num_buffered: usize = 0;
for ((transaction, fee_budget_limits), _) in transactions
.into_iter()
.zip(fee_budget_limits_vec)
@@ -370,9 +425,9 @@ impl SchedulerController {
priority,
cost,
) {
-saturating_add_assign!(self.count_metrics.num_dropped_on_capacity, 1);
+saturating_add_assign!(num_dropped_on_capacity, 1);
}
-saturating_add_assign!(self.count_metrics.num_buffered, 1);
+saturating_add_assign!(num_buffered, 1);
}
// Update metrics for transactions that were dropped.
@@ -382,18 +437,25 @@ impl SchedulerController {
let num_dropped_on_transaction_checks =
post_lock_validation_count.saturating_sub(post_transaction_check_count);
-saturating_add_assign!(
-self.count_metrics.num_dropped_on_sanitization,
-num_dropped_on_sanitization
-);
-saturating_add_assign!(
-self.count_metrics.num_dropped_on_validate_locks,
-num_dropped_on_lock_validation
-);
-saturating_add_assign!(
-self.count_metrics.num_dropped_on_receive_transaction_checks,
-num_dropped_on_transaction_checks
-);
+self.count_metrics.update(|count_metrics| {
+saturating_add_assign!(
+count_metrics.num_dropped_on_capacity,
+num_dropped_on_capacity
+);
+saturating_add_assign!(count_metrics.num_buffered, num_buffered);
+saturating_add_assign!(
+count_metrics.num_dropped_on_sanitization,
+num_dropped_on_sanitization
+);
+saturating_add_assign!(
+count_metrics.num_dropped_on_validate_locks,
+num_dropped_on_lock_validation
+);
+saturating_add_assign!(
+count_metrics.num_dropped_on_receive_transaction_checks,
+num_dropped_on_transaction_checks
+);
+});
}
}


@@ -1,15 +1,45 @@
use {
itertools::MinMaxResult,
-solana_sdk::timing::AtomicInterval,
-std::ops::{Deref, DerefMut},
+solana_sdk::{clock::Slot, timing::AtomicInterval},
};
#[derive(Default)]
pub struct SchedulerCountMetrics {
+interval: IntervalSchedulerCountMetrics,
+slot: SlotSchedulerCountMetrics,
}
+impl SchedulerCountMetrics {
+pub fn update(&mut self, update: impl Fn(&mut SchedulerCountMetricsInner)) {
+update(&mut self.interval.metrics);
+update(&mut self.slot.metrics);
+}
+pub fn maybe_report_and_reset_slot(&mut self, slot: Option<Slot>) {
+self.slot.maybe_report_and_reset(slot);
+}
+pub fn maybe_report_and_reset_interval(&mut self, should_report: bool) {
+self.interval.maybe_report_and_reset(should_report);
+}
+pub fn interval_has_data(&self) -> bool {
+self.interval.metrics.has_data()
+}
+}
+#[derive(Default)]
+struct IntervalSchedulerCountMetrics {
+interval: AtomicInterval,
+metrics: SchedulerCountMetricsInner,
+}
+#[derive(Default)]
+struct SlotSchedulerCountMetrics {
+slot: Option<Slot>,
+metrics: SchedulerCountMetricsInner,
+}
#[derive(Default)]
pub struct SchedulerCountMetricsInner {
/// Number of packets received.
@@ -49,35 +79,36 @@ pub struct SchedulerCountMetricsInner {
pub max_prioritization_fees: u64,
}
-impl Deref for SchedulerCountMetrics {
-type Target = SchedulerCountMetricsInner;
-fn deref(&self) -> &Self::Target {
-&self.metrics
-}
-}
-impl DerefMut for SchedulerCountMetrics {
-fn deref_mut(&mut self) -> &mut Self::Target {
-&mut self.metrics
-}
-}
-impl SchedulerCountMetrics {
-pub fn maybe_report_and_reset(&mut self, should_report: bool) {
+impl IntervalSchedulerCountMetrics {
+fn maybe_report_and_reset(&mut self, should_report: bool) {
const REPORT_INTERVAL_MS: u64 = 1000;
if self.interval.should_update(REPORT_INTERVAL_MS) {
if should_report {
self.report("banking_stage_scheduler_counts");
self.metrics.report("banking_stage_scheduler_counts", None);
}
-self.reset();
+self.metrics.reset();
}
}
}
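
The interval wrapper keeps the pre-existing cadence: at most one datapoint per REPORT_INTERVAL_MS, with the counters reset whenever the interval rolls over even if nothing was reported. A sketch of that behavior using std::time::Instant in place of solana_sdk's AtomicInterval:

    use std::time::{Duration, Instant};

    struct IntervalMetrics {
        last_report: Instant,
        num_received: u64,
    }

    impl IntervalMetrics {
        const REPORT_INTERVAL: Duration = Duration::from_millis(1000);

        fn maybe_report_and_reset(&mut self, should_report: bool) {
            if self.last_report.elapsed() >= Self::REPORT_INTERVAL {
                if should_report {
                    println!("num_received={}", self.num_received);
                }
                // Reset regardless of reporting, so each interval
                // covers at most ~1s of activity.
                self.num_received = 0;
                self.last_report = Instant::now();
            }
        }
    }

    fn main() {
        let mut m = IntervalMetrics { last_report: Instant::now(), num_received: 0 };
        m.num_received += 1;
        m.maybe_report_and_reset(true); // no-op until the interval elapses
    }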
+impl SlotSchedulerCountMetrics {
+fn maybe_report_and_reset(&mut self, slot: Option<Slot>) {
+if self.slot != slot {
+// Only report if there was an assigned slot.
+if self.slot.is_some() {
+self.metrics
+.report("banking_stage_scheduler_slot_counts", self.slot);
+}
+self.metrics.reset();
+self.slot = slot;
+}
+}
+}
impl SchedulerCountMetricsInner {
-fn report(&self, name: &'static str) {
-datapoint_info!(
-name,
+fn report(&self, name: &'static str, slot: Option<Slot>) {
+let mut datapoint = create_datapoint!(
+@point name,
("num_received", self.num_received, i64),
("num_buffered", self.num_buffered, i64),
("num_scheduled", self.num_scheduled, i64),
@@ -115,6 +146,10 @@ impl SchedulerCountMetricsInner {
("min_priority", self.get_min_priority(), i64),
("max_priority", self.get_max_priority(), i64)
);
+if let Some(slot) = slot {
+datapoint.add_field_i64("slot", slot as i64);
+}
+solana_metrics::submit(datapoint, log::Level::Info);
}
pub fn has_data(&self) -> bool {
@@ -186,10 +221,37 @@
#[derive(Default)]
pub struct SchedulerTimingMetrics {
+interval: IntervalSchedulerTimingMetrics,
+slot: SlotSchedulerTimingMetrics,
}
+impl SchedulerTimingMetrics {
+pub fn update(&mut self, update: impl Fn(&mut SchedulerTimingMetricsInner)) {
+update(&mut self.interval.metrics);
+update(&mut self.slot.metrics);
+}
+pub fn maybe_report_and_reset_slot(&mut self, slot: Option<Slot>) {
+self.slot.maybe_report_and_reset(slot);
+}
+pub fn maybe_report_and_reset_interval(&mut self, should_report: bool) {
+self.interval.maybe_report_and_reset(should_report);
+}
+}
+#[derive(Default)]
+struct IntervalSchedulerTimingMetrics {
+interval: AtomicInterval,
+metrics: SchedulerTimingMetricsInner,
+}
+#[derive(Default)]
+struct SlotSchedulerTimingMetrics {
+slot: Option<Slot>,
+metrics: SchedulerTimingMetricsInner,
+}
#[derive(Default)]
pub struct SchedulerTimingMetricsInner {
/// Time spent making processing decisions.
@@ -210,35 +272,36 @@ pub struct SchedulerTimingMetricsInner {
pub receive_completed_time_us: u64,
}
-impl Deref for SchedulerTimingMetrics {
-type Target = SchedulerTimingMetricsInner;
-fn deref(&self) -> &Self::Target {
-&self.metrics
-}
-}
-impl DerefMut for SchedulerTimingMetrics {
-fn deref_mut(&mut self) -> &mut Self::Target {
-&mut self.metrics
-}
-}
-impl SchedulerTimingMetrics {
-pub fn maybe_report_and_reset(&mut self, should_report: bool) {
+impl IntervalSchedulerTimingMetrics {
+fn maybe_report_and_reset(&mut self, should_report: bool) {
const REPORT_INTERVAL_MS: u64 = 1000;
if self.interval.should_update(REPORT_INTERVAL_MS) {
if should_report {
self.report("banking_stage_scheduler_timing");
self.metrics.report("banking_stage_scheduler_timing", None);
}
-self.reset();
+self.metrics.reset();
}
}
}
+impl SlotSchedulerTimingMetrics {
+fn maybe_report_and_reset(&mut self, slot: Option<Slot>) {
+if self.slot != slot {
+// Only report if there was an assigned slot.
+if self.slot.is_some() {
+self.metrics
+.report("banking_stage_scheduler_slot_timing", self.slot);
+}
+self.metrics.reset();
+self.slot = slot;
+}
+}
+}
impl SchedulerTimingMetricsInner {
-fn report(&self, name: &'static str) {
-datapoint_info!(
-name,
+fn report(&self, name: &'static str, slot: Option<Slot>) {
+let mut datapoint = create_datapoint!(
+@point name,
("decision_time_us", self.decision_time_us, i64),
("receive_time_us", self.receive_time_us, i64),
("buffer_time_us", self.buffer_time_us, i64),
@@ -252,6 +315,10 @@ impl SchedulerTimingMetricsInner {
i64
)
);
+if let Some(slot) = slot {
+datapoint.add_field_i64("slot", slot as i64);
+}
+solana_metrics::submit(datapoint, log::Level::Info);
}
fn reset(&mut self) {
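
Both report methods now build their datapoint explicitly so the slot field can be attached conditionally: interval reports pass None and omit the field, per-slot reports pass the slot that just ended. A sketch reusing the same solana_metrics calls that appear in this diff (field list abbreviated; crate setup assumed):

    use solana_metrics::create_datapoint;

    fn report(num_received: u64, slot: Option<u64>) {
        let mut datapoint = create_datapoint!(
            @point "banking_stage_scheduler_slot_counts",
            ("num_received", num_received, i64)
        );
        if let Some(slot) = slot {
            // Only per-slot reports carry the slot tag.
            datapoint.add_field_i64("slot", slot as i64);
        }
        solana_metrics::submit(datapoint, log::Level::Info);
    }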