diff --git a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs
index 23e15562e..3ec47a3e1 100644
--- a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs
+++ b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs
@@ -197,20 +197,30 @@ impl PrioGraphScheduler {
     }
 
     /// Receive completed batches of transactions without blocking.
+    /// Returns `(num_transactions, num_retryable_transactions)` on success.
     pub fn receive_completed(
         &mut self,
         container: &mut TransactionStateContainer,
-    ) -> Result<(), SchedulerError> {
-        while self.try_receive_completed(container)? {}
-        Ok(())
+    ) -> Result<(usize, usize), SchedulerError> {
+        let mut total_num_transactions = 0;
+        let mut total_num_retryable = 0;
+        loop {
+            let (num_transactions, num_retryable) = self.try_receive_completed(container)?;
+            if num_transactions == 0 {
+                break;
+            }
+            total_num_transactions += num_transactions;
+            total_num_retryable += num_retryable;
+        }
+        Ok((total_num_transactions, total_num_retryable))
     }
 
     /// Receive completed batches of transactions.
-    /// Returns `Ok(true)` if a batch was received, `Ok(false)` if no batch was received.
+    /// Returns `Ok((num_transactions, num_retryable))` if a batch was received, `Ok((0, 0))` if no batch was received.
     fn try_receive_completed(
         &mut self,
         container: &mut TransactionStateContainer,
-    ) -> Result<bool, SchedulerError> {
+    ) -> Result<(usize, usize), SchedulerError> {
         match self.finished_consume_work_receiver.try_recv() {
             Ok(FinishedConsumeWork {
                 work:
@@ -222,6 +232,9 @@ impl PrioGraphScheduler {
                     },
                 retryable_indexes,
             }) => {
+                let num_transactions = ids.len();
+                let num_retryable = retryable_indexes.len();
+
                 // Free the locks
                 self.complete_batch(batch_id, &transactions);
 
@@ -246,9 +259,9 @@ impl PrioGraphScheduler {
                     container.remove_by_id(&id);
                 }
 
-                Ok(true)
+                Ok((num_transactions, num_retryable))
             }
-            Err(TryRecvError::Empty) => Ok(false),
+            Err(TryRecvError::Empty) => Ok((0, 0)),
             Err(TryRecvError::Disconnected) => Err(SchedulerError::DisconnectedRecvChannel(
                 "finished consume work",
             )),
diff --git a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs
index 8c1dc4f91..2688e243c 100644
--- a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs
+++ b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs
@@ -15,7 +15,9 @@ use {
         TOTAL_BUFFERED_PACKETS,
     },
     crossbeam_channel::RecvTimeoutError,
+    solana_measure::measure_us,
     solana_runtime::bank_forks::BankForks,
+    solana_sdk::{saturating_add_assign, timing::AtomicInterval},
     std::{
         sync::{Arc, RwLock},
         time::Duration,
@@ -36,6 +38,10 @@ pub(crate) struct SchedulerController {
     container: TransactionStateContainer,
     /// State for scheduling and communicating with worker threads.
     scheduler: PrioGraphScheduler,
+    /// Metrics tracking counts on transactions in different states.
+    count_metrics: SchedulerCountMetrics,
+    /// Metrics tracking time spent in different code sections.
+    timing_metrics: SchedulerTimingMetrics,
 }
 
 impl SchedulerController {
@@ -52,6 +58,8 @@ impl SchedulerController {
             transaction_id_generator: TransactionIdGenerator::default(),
             container: TransactionStateContainer::with_capacity(TOTAL_BUFFERED_PACKETS),
             scheduler,
+            count_metrics: SchedulerCountMetrics::default(),
+            timing_metrics: SchedulerTimingMetrics::default(),
         }
     }
 
@@ -67,13 +75,21 @@ impl SchedulerController {
             // `Forward` will drop packets from the buffer instead of forwarding.
             // During receiving, since packets would be dropped from buffer anyway, we can
             // bypass sanitization and buffering and immediately drop the packets.
-            let decision = self.decision_maker.make_consume_or_forward_decision();
+            let (decision, decision_time_us) =
+                measure_us!(self.decision_maker.make_consume_or_forward_decision());
+            saturating_add_assign!(self.timing_metrics.decision_time_us, decision_time_us);
             self.process_transactions(&decision)?;
-            self.scheduler.receive_completed(&mut self.container)?;
-            if !self.receive_packets(&decision) {
+            self.receive_completed()?;
+            if !self.receive_and_buffer_packets(&decision) {
                 break;
             }
+
+            // Report metrics only if there is data.
+            // Reset intervals when appropriate, regardless of report.
+            let should_report = self.count_metrics.has_data();
+            self.count_metrics.maybe_report_and_reset(should_report);
+            self.timing_metrics.maybe_report_and_reset(should_report);
         }
 
         Ok(())
@@ -86,10 +102,14 @@ impl SchedulerController {
     ) -> Result<(), SchedulerError> {
         match decision {
             BufferedPacketsDecision::Consume(_bank_start) => {
-                let _num_scheduled = self.scheduler.schedule(&mut self.container)?;
+                let (num_scheduled, schedule_time_us) =
+                    measure_us!(self.scheduler.schedule(&mut self.container)?);
+                saturating_add_assign!(self.count_metrics.num_scheduled, num_scheduled);
+                saturating_add_assign!(self.timing_metrics.schedule_time_us, schedule_time_us);
             }
             BufferedPacketsDecision::Forward => {
-                self.clear_container();
+                let (_, clear_time_us) = measure_us!(self.clear_container());
+                saturating_add_assign!(self.timing_metrics.clear_time_us, clear_time_us);
             }
             BufferedPacketsDecision::ForwardAndHold | BufferedPacketsDecision::Hold => {}
         }
@@ -102,11 +122,25 @@ impl SchedulerController {
     fn clear_container(&mut self) {
         while let Some(id) = self.container.pop() {
             self.container.remove_by_id(&id.id);
+            saturating_add_assign!(self.count_metrics.num_dropped_on_clear, 1);
         }
     }
 
+    /// Receives completed transactions from the workers and updates metrics.
+    fn receive_completed(&mut self) -> Result<(), SchedulerError> {
+        let ((num_transactions, num_retryable), receive_completed_time_us) =
+            measure_us!(self.scheduler.receive_completed(&mut self.container)?);
+        saturating_add_assign!(self.count_metrics.num_finished, num_transactions);
+        saturating_add_assign!(self.count_metrics.num_retryable, num_retryable);
+        saturating_add_assign!(
+            self.timing_metrics.receive_completed_time_us,
+            receive_completed_time_us
+        );
+        Ok(())
+    }
+
     /// Returns whether the packet receiver is still connected.
-    fn receive_packets(&mut self, decision: &BufferedPacketsDecision) -> bool {
+    fn receive_and_buffer_packets(&mut self, decision: &BufferedPacketsDecision) -> bool {
         let remaining_queue_capacity = self.container.remaining_queue_capacity();
 
         const MAX_PACKET_RECEIVE_TIME: Duration = Duration::from_millis(100);
@@ -125,17 +159,29 @@ impl SchedulerController {
             }
         };
 
-        let received_packet_results = self
+        let (received_packet_results, receive_time_us) = measure_us!(self
             .packet_receiver
-            .receive_packets(recv_timeout, remaining_queue_capacity);
+            .receive_packets(recv_timeout, remaining_queue_capacity));
+        saturating_add_assign!(self.timing_metrics.receive_time_us, receive_time_us);
 
-        match (received_packet_results, should_buffer) {
-            (Ok(receive_packet_results), true) => {
-                self.buffer_packets(receive_packet_results.deserialized_packets)
+        match received_packet_results {
+            Ok(receive_packet_results) => {
+                let num_received_packets = receive_packet_results.deserialized_packets.len();
+                saturating_add_assign!(self.count_metrics.num_received, num_received_packets);
+                if should_buffer {
+                    let (_, buffer_time_us) = measure_us!(
+                        self.buffer_packets(receive_packet_results.deserialized_packets)
+                    );
+                    saturating_add_assign!(self.timing_metrics.buffer_time_us, buffer_time_us);
+                } else {
+                    saturating_add_assign!(
+                        self.count_metrics.num_dropped_on_receive,
+                        num_received_packets
+                    );
+                }
             }
-            (Ok(receive_packet_results), false) => drop(receive_packet_results),
-            (Err(RecvTimeoutError::Timeout), _) => {}
-            (Err(RecvTimeoutError::Disconnected), _) => return false,
+            Err(RecvTimeoutError::Timeout) => {}
+            Err(RecvTimeoutError::Disconnected) => return false,
         }
 
         true
@@ -151,6 +197,7 @@ impl SchedulerController {
             let Some(transaction) =
                 packet.build_sanitized_transaction(feature_set, vote_only, bank.as_ref())
             else {
+                saturating_add_assign!(self.count_metrics.num_dropped_on_sanitization, 1);
                 continue;
             };
 
@@ -160,15 +207,151 @@ impl SchedulerController {
                 max_age_slot: last_slot_in_epoch,
             };
             let transaction_priority_details = packet.priority_details();
-            self.container.insert_new_transaction(
+            if self.container.insert_new_transaction(
                 transaction_id,
                 transaction_ttl,
                 transaction_priority_details,
-            );
+            ) {
+                saturating_add_assign!(self.count_metrics.num_dropped_on_capacity, 1);
+            }
+            saturating_add_assign!(self.count_metrics.num_buffered, 1);
         }
     }
 }
 
+#[derive(Default)]
+struct SchedulerCountMetrics {
+    interval: AtomicInterval,
+
+    /// Number of packets received.
+    num_received: usize,
+    /// Number of packets buffered.
+    num_buffered: usize,
+
+    /// Number of transactions scheduled.
+    num_scheduled: usize,
+    /// Number of completed transactions received from workers.
+    num_finished: usize,
+    /// Number of transactions that were retryable.
+    num_retryable: usize,
+
+    /// Number of transactions that were immediately dropped on receive.
+    num_dropped_on_receive: usize,
+    /// Number of transactions that were dropped due to sanitization failure.
+    num_dropped_on_sanitization: usize,
+    /// Number of transactions that were dropped due to clearing.
+    num_dropped_on_clear: usize,
+    /// Number of transactions that were dropped due to exceeded capacity.
+    num_dropped_on_capacity: usize,
+}
+
+impl SchedulerCountMetrics {
+    fn maybe_report_and_reset(&mut self, should_report: bool) {
+        const REPORT_INTERVAL_MS: u64 = 1000;
+        if self.interval.should_update(REPORT_INTERVAL_MS) {
+            if should_report {
+                self.report();
+            }
+            self.reset();
+        }
+    }
+
+    fn report(&self) {
+        datapoint_info!(
+            "banking_stage_scheduler_counts",
+            ("num_received", self.num_received, i64),
+            ("num_buffered", self.num_buffered, i64),
+            ("num_scheduled", self.num_scheduled, i64),
+            ("num_finished", self.num_finished, i64),
+            ("num_retryable", self.num_retryable, i64),
+            ("num_dropped_on_receive", self.num_dropped_on_receive, i64),
+            (
+                "num_dropped_on_sanitization",
+                self.num_dropped_on_sanitization,
+                i64
+            ),
+            ("num_dropped_on_clear", self.num_dropped_on_clear, i64),
+            ("num_dropped_on_capacity", self.num_dropped_on_capacity, i64)
+        );
+    }
+
+    fn has_data(&self) -> bool {
+        self.num_received != 0
+            || self.num_buffered != 0
+            || self.num_scheduled != 0
+            || self.num_finished != 0
+            || self.num_retryable != 0
+            || self.num_dropped_on_receive != 0
+            || self.num_dropped_on_sanitization != 0
+            || self.num_dropped_on_clear != 0
+            || self.num_dropped_on_capacity != 0
+    }
+
+    fn reset(&mut self) {
+        self.num_received = 0;
+        self.num_buffered = 0;
+        self.num_scheduled = 0;
+        self.num_finished = 0;
+        self.num_retryable = 0;
+        self.num_dropped_on_receive = 0;
+        self.num_dropped_on_sanitization = 0;
+        self.num_dropped_on_clear = 0;
+        self.num_dropped_on_capacity = 0;
+    }
+}
+
+#[derive(Default)]
+struct SchedulerTimingMetrics {
+    interval: AtomicInterval,
+    /// Time spent making processing decisions.
+    decision_time_us: u64,
+    /// Time spent receiving packets.
+    receive_time_us: u64,
+    /// Time spent buffering packets.
+    buffer_time_us: u64,
+    /// Time spent scheduling transactions.
+    schedule_time_us: u64,
+    /// Time spent clearing transactions from the container.
+    clear_time_us: u64,
+    /// Time spent receiving completed transactions.
+    receive_completed_time_us: u64,
+}
+
+impl SchedulerTimingMetrics {
+    fn maybe_report_and_reset(&mut self, should_report: bool) {
+        const REPORT_INTERVAL_MS: u64 = 1000;
+        if self.interval.should_update(REPORT_INTERVAL_MS) {
+            if should_report {
+                self.report();
+            }
+            self.reset();
+        }
+    }
+
+    fn report(&self) {
+        datapoint_info!(
+            "banking_stage_scheduler_timing",
+            ("decision_time", self.decision_time_us, i64),
+            ("receive_time", self.receive_time_us, i64),
+            ("buffer_time", self.buffer_time_us, i64),
+            ("schedule_time", self.schedule_time_us, i64),
+            ("clear_time", self.clear_time_us, i64),
+            (
+                "receive_completed_time",
+                self.receive_completed_time_us,
+                i64
+            )
+        );
+    }
+
+    fn reset(&mut self) {
+        self.decision_time_us = 0;
+        self.receive_time_us = 0;
+        self.buffer_time_us = 0;
+        self.schedule_time_us = 0;
+        self.clear_time_us = 0;
+        self.receive_completed_time_us = 0;
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use {
diff --git a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs
index 768076533..10401a88e 100644
--- a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs
+++ b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs
@@ -119,12 +119,13 @@ impl TransactionStateContainer {
     }
 
     /// Insert a new transaction into the container's queues and maps.
+    /// Returns `true` if a packet was dropped due to capacity limits.
     pub(crate) fn insert_new_transaction(
         &mut self,
         transaction_id: TransactionId,
         transaction_ttl: SanitizedTransactionTTL,
         transaction_priority_details: TransactionPriorityDetails,
-    ) {
+    ) -> bool {
         let priority_id =
             TransactionPriorityId::new(transaction_priority_details.priority, transaction_id);
         self.id_to_transaction_state.insert(
@@ -151,12 +152,15 @@ impl TransactionStateContainer {
 
     /// Pushes a transaction id into the priority queue. If the queue is full, the lowest priority
     /// transaction will be dropped (removed from the queue and map).
-    pub(crate) fn push_id_into_queue(&mut self, priority_id: TransactionPriorityId) {
+    /// Returns `true` if a packet was dropped due to capacity limits.
+    pub(crate) fn push_id_into_queue(&mut self, priority_id: TransactionPriorityId) -> bool {
        if self.remaining_queue_capacity() == 0 {
            let popped_id = self.priority_queue.push_pop_min(priority_id);
            self.remove_by_id(&popped_id.id);
+            true
        } else {
            self.priority_queue.push(priority_id);
+            false
        }
    }
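
Note: the sketch below is illustrative only and not part of the patch. It isolates the interval-gated report/reset pattern that SchedulerCountMetrics and SchedulerTimingMetrics follow above: the report is gated on whether the caller saw any data, but the accumulated values are cleared every interval so a stale window never bleeds into the next report. IntervalTracker, SketchMetrics, and Counts are hypothetical stand-ins (IntervalTracker loosely models solana_sdk::timing::AtomicInterval, and println! stands in for datapoint_info!).

use std::time::{Duration, Instant};

/// Hypothetical, simplified stand-in for an interval tracker.
/// The first call to `should_update` returns `true` here.
#[derive(Default)]
struct IntervalTracker(Option<Instant>);

impl IntervalTracker {
    /// Returns `true` and restarts the window once `interval_ms` has elapsed.
    fn should_update(&mut self, interval_ms: u64) -> bool {
        let elapsed = self
            .0
            .map(|last| last.elapsed() >= Duration::from_millis(interval_ms))
            .unwrap_or(true);
        if elapsed {
            self.0 = Some(Instant::now());
        }
        elapsed
    }
}

#[derive(Default)]
struct Counts {
    num_received: usize,
    num_buffered: usize,
}

#[derive(Default)]
struct SketchMetrics {
    interval: IntervalTracker,
    counts: Counts,
}

impl SketchMetrics {
    fn has_data(&self) -> bool {
        self.counts.num_received != 0 || self.counts.num_buffered != 0
    }

    /// Same shape as `maybe_report_and_reset` in the diff: report only when
    /// `should_report` is set, but reset the counters every interval regardless.
    fn maybe_report_and_reset(&mut self, should_report: bool) {
        const REPORT_INTERVAL_MS: u64 = 1000;
        if self.interval.should_update(REPORT_INTERVAL_MS) {
            if should_report {
                println!(
                    "scheduler_counts num_received={} num_buffered={}",
                    self.counts.num_received, self.counts.num_buffered
                );
            }
            self.counts = Counts::default();
        }
    }
}

fn main() {
    let mut metrics = SketchMetrics::default();
    metrics.counts.num_received += 8;
    metrics.counts.num_buffered += 8;

    // Mirrors the end of the controller loop in the diff.
    let should_report = metrics.has_data();
    metrics.maybe_report_and_reset(should_report);
}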