TransactionScheduler: Scheduler Count and Timing metrics (#33893)
This commit is contained in:
parent
ae4b62c6f5
commit
9bb82a3901
|
@ -197,20 +197,30 @@ impl PrioGraphScheduler {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Receive completed batches of transactions without blocking.
|
/// Receive completed batches of transactions without blocking.
|
||||||
|
/// Returns (num_transactions, num_retryable_transactions) on success.
|
||||||
pub fn receive_completed(
|
pub fn receive_completed(
|
||||||
&mut self,
|
&mut self,
|
||||||
container: &mut TransactionStateContainer,
|
container: &mut TransactionStateContainer,
|
||||||
) -> Result<(), SchedulerError> {
|
) -> Result<(usize, usize), SchedulerError> {
|
||||||
while self.try_receive_completed(container)? {}
|
let mut total_num_transactions = 0;
|
||||||
Ok(())
|
let mut total_num_retryable = 0;
|
||||||
|
loop {
|
||||||
|
let (num_transactions, num_retryable) = self.try_receive_completed(container)?;
|
||||||
|
if num_transactions == 0 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
total_num_transactions += num_transactions;
|
||||||
|
total_num_retryable += num_retryable;
|
||||||
|
}
|
||||||
|
Ok((total_num_transactions, total_num_retryable))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Receive completed batches of transactions.
|
/// Receive completed batches of transactions.
|
||||||
/// Returns `Ok(true)` if a batch was received, `Ok(false)` if no batch was received.
|
/// Returns `Ok((num_transactions, num_retryable))` if a batch was received, `Ok((0, 0))` if no batch was received.
|
||||||
fn try_receive_completed(
|
fn try_receive_completed(
|
||||||
&mut self,
|
&mut self,
|
||||||
container: &mut TransactionStateContainer,
|
container: &mut TransactionStateContainer,
|
||||||
) -> Result<bool, SchedulerError> {
|
) -> Result<(usize, usize), SchedulerError> {
|
||||||
match self.finished_consume_work_receiver.try_recv() {
|
match self.finished_consume_work_receiver.try_recv() {
|
||||||
Ok(FinishedConsumeWork {
|
Ok(FinishedConsumeWork {
|
||||||
work:
|
work:
|
||||||
|
@ -222,6 +232,9 @@ impl PrioGraphScheduler {
|
||||||
},
|
},
|
||||||
retryable_indexes,
|
retryable_indexes,
|
||||||
}) => {
|
}) => {
|
||||||
|
let num_transactions = ids.len();
|
||||||
|
let num_retryable = retryable_indexes.len();
|
||||||
|
|
||||||
// Free the locks
|
// Free the locks
|
||||||
self.complete_batch(batch_id, &transactions);
|
self.complete_batch(batch_id, &transactions);
|
||||||
|
|
||||||
|
@ -246,9 +259,9 @@ impl PrioGraphScheduler {
|
||||||
container.remove_by_id(&id);
|
container.remove_by_id(&id);
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(true)
|
Ok((num_transactions, num_retryable))
|
||||||
}
|
}
|
||||||
Err(TryRecvError::Empty) => Ok(false),
|
Err(TryRecvError::Empty) => Ok((0, 0)),
|
||||||
Err(TryRecvError::Disconnected) => Err(SchedulerError::DisconnectedRecvChannel(
|
Err(TryRecvError::Disconnected) => Err(SchedulerError::DisconnectedRecvChannel(
|
||||||
"finished consume work",
|
"finished consume work",
|
||||||
)),
|
)),
|
||||||
|
|
|
@ -15,7 +15,9 @@ use {
|
||||||
TOTAL_BUFFERED_PACKETS,
|
TOTAL_BUFFERED_PACKETS,
|
||||||
},
|
},
|
||||||
crossbeam_channel::RecvTimeoutError,
|
crossbeam_channel::RecvTimeoutError,
|
||||||
|
solana_measure::measure_us,
|
||||||
solana_runtime::bank_forks::BankForks,
|
solana_runtime::bank_forks::BankForks,
|
||||||
|
solana_sdk::{saturating_add_assign, timing::AtomicInterval},
|
||||||
std::{
|
std::{
|
||||||
sync::{Arc, RwLock},
|
sync::{Arc, RwLock},
|
||||||
time::Duration,
|
time::Duration,
|
||||||
|
@ -36,6 +38,10 @@ pub(crate) struct SchedulerController {
|
||||||
container: TransactionStateContainer,
|
container: TransactionStateContainer,
|
||||||
/// State for scheduling and communicating with worker threads.
|
/// State for scheduling and communicating with worker threads.
|
||||||
scheduler: PrioGraphScheduler,
|
scheduler: PrioGraphScheduler,
|
||||||
|
/// Metrics tracking counts on transactions in different states.
|
||||||
|
count_metrics: SchedulerCountMetrics,
|
||||||
|
/// Metrics tracking time spent in different code sections.
|
||||||
|
timing_metrics: SchedulerTimingMetrics,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SchedulerController {
|
impl SchedulerController {
|
||||||
|
@ -52,6 +58,8 @@ impl SchedulerController {
|
||||||
transaction_id_generator: TransactionIdGenerator::default(),
|
transaction_id_generator: TransactionIdGenerator::default(),
|
||||||
container: TransactionStateContainer::with_capacity(TOTAL_BUFFERED_PACKETS),
|
container: TransactionStateContainer::with_capacity(TOTAL_BUFFERED_PACKETS),
|
||||||
scheduler,
|
scheduler,
|
||||||
|
count_metrics: SchedulerCountMetrics::default(),
|
||||||
|
timing_metrics: SchedulerTimingMetrics::default(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -67,13 +75,21 @@ impl SchedulerController {
|
||||||
// `Forward` will drop packets from the buffer instead of forwarding.
|
// `Forward` will drop packets from the buffer instead of forwarding.
|
||||||
// During receiving, since packets would be dropped from buffer anyway, we can
|
// During receiving, since packets would be dropped from buffer anyway, we can
|
||||||
// bypass sanitization and buffering and immediately drop the packets.
|
// bypass sanitization and buffering and immediately drop the packets.
|
||||||
let decision = self.decision_maker.make_consume_or_forward_decision();
|
let (decision, decision_time_us) =
|
||||||
|
measure_us!(self.decision_maker.make_consume_or_forward_decision());
|
||||||
|
saturating_add_assign!(self.timing_metrics.decision_time_us, decision_time_us);
|
||||||
|
|
||||||
self.process_transactions(&decision)?;
|
self.process_transactions(&decision)?;
|
||||||
self.scheduler.receive_completed(&mut self.container)?;
|
self.receive_completed()?;
|
||||||
if !self.receive_packets(&decision) {
|
if !self.receive_and_buffer_packets(&decision) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Report metrics only if there is data.
|
||||||
|
// Reset intervals when appropriate, regardless of report.
|
||||||
|
let should_report = self.count_metrics.has_data();
|
||||||
|
self.count_metrics.maybe_report_and_reset(should_report);
|
||||||
|
self.timing_metrics.maybe_report_and_reset(should_report);
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -86,10 +102,14 @@ impl SchedulerController {
|
||||||
) -> Result<(), SchedulerError> {
|
) -> Result<(), SchedulerError> {
|
||||||
match decision {
|
match decision {
|
||||||
BufferedPacketsDecision::Consume(_bank_start) => {
|
BufferedPacketsDecision::Consume(_bank_start) => {
|
||||||
let _num_scheduled = self.scheduler.schedule(&mut self.container)?;
|
let (num_scheduled, schedule_time_us) =
|
||||||
|
measure_us!(self.scheduler.schedule(&mut self.container)?);
|
||||||
|
saturating_add_assign!(self.count_metrics.num_scheduled, num_scheduled);
|
||||||
|
saturating_add_assign!(self.timing_metrics.schedule_time_us, schedule_time_us);
|
||||||
}
|
}
|
||||||
BufferedPacketsDecision::Forward => {
|
BufferedPacketsDecision::Forward => {
|
||||||
self.clear_container();
|
let (_, clear_time_us) = measure_us!(self.clear_container());
|
||||||
|
saturating_add_assign!(self.timing_metrics.clear_time_us, clear_time_us);
|
||||||
}
|
}
|
||||||
BufferedPacketsDecision::ForwardAndHold | BufferedPacketsDecision::Hold => {}
|
BufferedPacketsDecision::ForwardAndHold | BufferedPacketsDecision::Hold => {}
|
||||||
}
|
}
|
||||||
|
@ -102,11 +122,25 @@ impl SchedulerController {
|
||||||
fn clear_container(&mut self) {
|
fn clear_container(&mut self) {
|
||||||
while let Some(id) = self.container.pop() {
|
while let Some(id) = self.container.pop() {
|
||||||
self.container.remove_by_id(&id.id);
|
self.container.remove_by_id(&id.id);
|
||||||
|
saturating_add_assign!(self.count_metrics.num_dropped_on_clear, 1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Receives completed transactions from the workers and updates metrics.
|
||||||
|
fn receive_completed(&mut self) -> Result<(), SchedulerError> {
|
||||||
|
let ((num_transactions, num_retryable), receive_completed_time_us) =
|
||||||
|
measure_us!(self.scheduler.receive_completed(&mut self.container)?);
|
||||||
|
saturating_add_assign!(self.count_metrics.num_finished, num_transactions);
|
||||||
|
saturating_add_assign!(self.count_metrics.num_retryable, num_retryable);
|
||||||
|
saturating_add_assign!(
|
||||||
|
self.timing_metrics.receive_completed_time_us,
|
||||||
|
receive_completed_time_us
|
||||||
|
);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
/// Returns whether the packet receiver is still connected.
|
/// Returns whether the packet receiver is still connected.
|
||||||
fn receive_packets(&mut self, decision: &BufferedPacketsDecision) -> bool {
|
fn receive_and_buffer_packets(&mut self, decision: &BufferedPacketsDecision) -> bool {
|
||||||
let remaining_queue_capacity = self.container.remaining_queue_capacity();
|
let remaining_queue_capacity = self.container.remaining_queue_capacity();
|
||||||
|
|
||||||
const MAX_PACKET_RECEIVE_TIME: Duration = Duration::from_millis(100);
|
const MAX_PACKET_RECEIVE_TIME: Duration = Duration::from_millis(100);
|
||||||
|
@ -125,17 +159,29 @@ impl SchedulerController {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let received_packet_results = self
|
let (received_packet_results, receive_time_us) = measure_us!(self
|
||||||
.packet_receiver
|
.packet_receiver
|
||||||
.receive_packets(recv_timeout, remaining_queue_capacity);
|
.receive_packets(recv_timeout, remaining_queue_capacity));
|
||||||
|
saturating_add_assign!(self.timing_metrics.receive_time_us, receive_time_us);
|
||||||
|
|
||||||
match (received_packet_results, should_buffer) {
|
match received_packet_results {
|
||||||
(Ok(receive_packet_results), true) => {
|
Ok(receive_packet_results) => {
|
||||||
self.buffer_packets(receive_packet_results.deserialized_packets)
|
let num_received_packets = receive_packet_results.deserialized_packets.len();
|
||||||
|
saturating_add_assign!(self.count_metrics.num_received, num_received_packets);
|
||||||
|
if should_buffer {
|
||||||
|
let (_, buffer_time_us) = measure_us!(
|
||||||
|
self.buffer_packets(receive_packet_results.deserialized_packets)
|
||||||
|
);
|
||||||
|
saturating_add_assign!(self.timing_metrics.buffer_time_us, buffer_time_us);
|
||||||
|
} else {
|
||||||
|
saturating_add_assign!(
|
||||||
|
self.count_metrics.num_dropped_on_receive,
|
||||||
|
num_received_packets
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
(Ok(receive_packet_results), false) => drop(receive_packet_results),
|
Err(RecvTimeoutError::Timeout) => {}
|
||||||
(Err(RecvTimeoutError::Timeout), _) => {}
|
Err(RecvTimeoutError::Disconnected) => return false,
|
||||||
(Err(RecvTimeoutError::Disconnected), _) => return false,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
true
|
true
|
||||||
|
@ -151,6 +197,7 @@ impl SchedulerController {
|
||||||
let Some(transaction) =
|
let Some(transaction) =
|
||||||
packet.build_sanitized_transaction(feature_set, vote_only, bank.as_ref())
|
packet.build_sanitized_transaction(feature_set, vote_only, bank.as_ref())
|
||||||
else {
|
else {
|
||||||
|
saturating_add_assign!(self.count_metrics.num_dropped_on_sanitization, 1);
|
||||||
continue;
|
continue;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -160,15 +207,151 @@ impl SchedulerController {
|
||||||
max_age_slot: last_slot_in_epoch,
|
max_age_slot: last_slot_in_epoch,
|
||||||
};
|
};
|
||||||
let transaction_priority_details = packet.priority_details();
|
let transaction_priority_details = packet.priority_details();
|
||||||
self.container.insert_new_transaction(
|
if self.container.insert_new_transaction(
|
||||||
transaction_id,
|
transaction_id,
|
||||||
transaction_ttl,
|
transaction_ttl,
|
||||||
transaction_priority_details,
|
transaction_priority_details,
|
||||||
);
|
) {
|
||||||
|
saturating_add_assign!(self.count_metrics.num_dropped_on_capacity, 1);
|
||||||
|
}
|
||||||
|
saturating_add_assign!(self.count_metrics.num_buffered, 1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
struct SchedulerCountMetrics {
|
||||||
|
interval: AtomicInterval,
|
||||||
|
|
||||||
|
/// Number of packets received.
|
||||||
|
num_received: usize,
|
||||||
|
/// Number of packets buffered.
|
||||||
|
num_buffered: usize,
|
||||||
|
|
||||||
|
/// Number of transactions scheduled.
|
||||||
|
num_scheduled: usize,
|
||||||
|
/// Number of completed transactions received from workers.
|
||||||
|
num_finished: usize,
|
||||||
|
/// Number of transactions that were retryable.
|
||||||
|
num_retryable: usize,
|
||||||
|
|
||||||
|
/// Number of transactions that were immediately dropped on receive.
|
||||||
|
num_dropped_on_receive: usize,
|
||||||
|
/// Number of transactions that were dropped due to sanitization failure.
|
||||||
|
num_dropped_on_sanitization: usize,
|
||||||
|
/// Number of transactions that were dropped due to clearing.
|
||||||
|
num_dropped_on_clear: usize,
|
||||||
|
/// Number of transactions that were dropped due to exceeded capacity.
|
||||||
|
num_dropped_on_capacity: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SchedulerCountMetrics {
|
||||||
|
fn maybe_report_and_reset(&mut self, should_report: bool) {
|
||||||
|
const REPORT_INTERVAL_MS: u64 = 1000;
|
||||||
|
if self.interval.should_update(REPORT_INTERVAL_MS) {
|
||||||
|
if should_report {
|
||||||
|
self.report();
|
||||||
|
}
|
||||||
|
self.reset();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn report(&self) {
|
||||||
|
datapoint_info!(
|
||||||
|
"banking_stage_scheduler_counts",
|
||||||
|
("num_received", self.num_received, i64),
|
||||||
|
("num_buffered", self.num_buffered, i64),
|
||||||
|
("num_scheduled", self.num_scheduled, i64),
|
||||||
|
("num_finished", self.num_finished, i64),
|
||||||
|
("num_retryable", self.num_retryable, i64),
|
||||||
|
("num_dropped_on_receive", self.num_dropped_on_receive, i64),
|
||||||
|
(
|
||||||
|
"num_dropped_on_sanitization",
|
||||||
|
self.num_dropped_on_sanitization,
|
||||||
|
i64
|
||||||
|
),
|
||||||
|
("num_dropped_on_clear", self.num_dropped_on_clear, i64),
|
||||||
|
("num_dropped_on_capacity", self.num_dropped_on_capacity, i64)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn has_data(&self) -> bool {
|
||||||
|
self.num_received != 0
|
||||||
|
|| self.num_buffered != 0
|
||||||
|
|| self.num_scheduled != 0
|
||||||
|
|| self.num_finished != 0
|
||||||
|
|| self.num_retryable != 0
|
||||||
|
|| self.num_dropped_on_receive != 0
|
||||||
|
|| self.num_dropped_on_sanitization != 0
|
||||||
|
|| self.num_dropped_on_clear != 0
|
||||||
|
|| self.num_dropped_on_capacity != 0
|
||||||
|
}
|
||||||
|
|
||||||
|
fn reset(&mut self) {
|
||||||
|
self.num_received = 0;
|
||||||
|
self.num_buffered = 0;
|
||||||
|
self.num_scheduled = 0;
|
||||||
|
self.num_finished = 0;
|
||||||
|
self.num_retryable = 0;
|
||||||
|
self.num_dropped_on_receive = 0;
|
||||||
|
self.num_dropped_on_sanitization = 0;
|
||||||
|
self.num_dropped_on_clear = 0;
|
||||||
|
self.num_dropped_on_capacity = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
struct SchedulerTimingMetrics {
|
||||||
|
interval: AtomicInterval,
|
||||||
|
/// Time spent making processing decisions.
|
||||||
|
decision_time_us: u64,
|
||||||
|
/// Time spent receiving packets.
|
||||||
|
receive_time_us: u64,
|
||||||
|
/// Time spent buffering packets.
|
||||||
|
buffer_time_us: u64,
|
||||||
|
/// Time spent scheduling transactions.
|
||||||
|
schedule_time_us: u64,
|
||||||
|
/// Time spent clearing transactions from the container.
|
||||||
|
clear_time_us: u64,
|
||||||
|
/// Time spent receiving completed transactions.
|
||||||
|
receive_completed_time_us: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SchedulerTimingMetrics {
|
||||||
|
fn maybe_report_and_reset(&mut self, should_report: bool) {
|
||||||
|
const REPORT_INTERVAL_MS: u64 = 1000;
|
||||||
|
if self.interval.should_update(REPORT_INTERVAL_MS) {
|
||||||
|
if should_report {
|
||||||
|
self.report();
|
||||||
|
}
|
||||||
|
self.reset();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn report(&self) {
|
||||||
|
datapoint_info!(
|
||||||
|
"banking_stage_scheduler_timing",
|
||||||
|
("decision_time", self.decision_time_us, i64),
|
||||||
|
("receive_time", self.receive_time_us, i64),
|
||||||
|
("buffer_time", self.buffer_time_us, i64),
|
||||||
|
("schedule_time", self.schedule_time_us, i64),
|
||||||
|
("clear_time", self.clear_time_us, i64),
|
||||||
|
(
|
||||||
|
"receive_completed_time",
|
||||||
|
self.receive_completed_time_us,
|
||||||
|
i64
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn reset(&mut self) {
|
||||||
|
self.receive_time_us = 0;
|
||||||
|
self.buffer_time_us = 0;
|
||||||
|
self.schedule_time_us = 0;
|
||||||
|
self.receive_completed_time_us = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use {
|
use {
|
||||||
|
|
|
@ -119,12 +119,13 @@ impl TransactionStateContainer {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Insert a new transaction into the container's queues and maps.
|
/// Insert a new transaction into the container's queues and maps.
|
||||||
|
/// Returns `true` if a packet was dropped due to capacity limits.
|
||||||
pub(crate) fn insert_new_transaction(
|
pub(crate) fn insert_new_transaction(
|
||||||
&mut self,
|
&mut self,
|
||||||
transaction_id: TransactionId,
|
transaction_id: TransactionId,
|
||||||
transaction_ttl: SanitizedTransactionTTL,
|
transaction_ttl: SanitizedTransactionTTL,
|
||||||
transaction_priority_details: TransactionPriorityDetails,
|
transaction_priority_details: TransactionPriorityDetails,
|
||||||
) {
|
) -> bool {
|
||||||
let priority_id =
|
let priority_id =
|
||||||
TransactionPriorityId::new(transaction_priority_details.priority, transaction_id);
|
TransactionPriorityId::new(transaction_priority_details.priority, transaction_id);
|
||||||
self.id_to_transaction_state.insert(
|
self.id_to_transaction_state.insert(
|
||||||
|
@ -151,12 +152,15 @@ impl TransactionStateContainer {
|
||||||
|
|
||||||
/// Pushes a transaction id into the priority queue. If the queue is full, the lowest priority
|
/// Pushes a transaction id into the priority queue. If the queue is full, the lowest priority
|
||||||
/// transaction will be dropped (removed from the queue and map).
|
/// transaction will be dropped (removed from the queue and map).
|
||||||
pub(crate) fn push_id_into_queue(&mut self, priority_id: TransactionPriorityId) {
|
/// Returns `true` if a packet was dropped due to capacity limits.
|
||||||
|
pub(crate) fn push_id_into_queue(&mut self, priority_id: TransactionPriorityId) -> bool {
|
||||||
if self.remaining_queue_capacity() == 0 {
|
if self.remaining_queue_capacity() == 0 {
|
||||||
let popped_id = self.priority_queue.push_pop_min(priority_id);
|
let popped_id = self.priority_queue.push_pop_min(priority_id);
|
||||||
self.remove_by_id(&popped_id.id);
|
self.remove_by_id(&popped_id.id);
|
||||||
|
true
|
||||||
} else {
|
} else {
|
||||||
self.priority_queue.push(priority_id);
|
self.priority_queue.push(priority_id);
|
||||||
|
false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue