From 8a298f1628cb00df1b429acb3fb573ba3491193a Mon Sep 17 00:00:00 2001
From: Andrew Fitzgerald
Date: Mon, 20 Nov 2023 10:46:04 -0800
Subject: [PATCH] TransactionScheduler: detailed consume worker metrics (#33895)

---
 core/src/banking_stage.rs                     |   4 +
 core/src/banking_stage/consume_worker.rs      | 501 +++++++++++++++++-
 core/src/banking_stage/consumer.rs            |  14 +-
 .../scheduler_controller.rs                   |   9 +
 4 files changed, 510 insertions(+), 18 deletions(-)

diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs
index e4e5f3125..bf052ab5c 100644
--- a/core/src/banking_stage.rs
+++ b/core/src/banking_stage.rs
@@ -556,9 +556,11 @@ impl BankingStage {
         let (finished_work_sender, finished_work_receiver) = unbounded();
 
         // Spawn the worker threads
+        let mut worker_metrics = Vec::with_capacity(num_workers as usize);
         for (index, work_receiver) in work_receivers.into_iter().enumerate() {
             let id = (index as u32).saturating_add(NUM_VOTE_PROCESSING_THREADS);
             let consume_worker = ConsumeWorker::new(
+                id,
                 work_receiver,
                 Consumer::new(
                     committer.clone(),
@@ -570,6 +572,7 @@ impl BankingStage {
                 poh_recorder.read().unwrap().new_leader_bank_notifier(),
             );
 
+            worker_metrics.push(consume_worker.metrics_handle());
             bank_thread_hdls.push(
                 Builder::new()
                     .name(format!("solCoWorker{id:02}"))
@@ -590,6 +593,7 @@ impl BankingStage {
             packet_deserializer,
             bank_forks,
             scheduler,
+            worker_metrics,
         );
         Builder::new()
             .name("solBnkTxSched".to_string())
diff --git a/core/src/banking_stage/consume_worker.rs b/core/src/banking_stage/consume_worker.rs
index f18f3da5d..edcefbd61 100644
--- a/core/src/banking_stage/consume_worker.rs
+++ b/core/src/banking_stage/consume_worker.rs
@@ -1,12 +1,21 @@
 use {
     super::{
         consumer::{Consumer, ExecuteAndCommitTransactionsOutput, ProcessTransactionBatchOutput},
+        leader_slot_timing_metrics::LeaderExecuteAndCommitTimings,
         scheduler_messages::{ConsumeWork, FinishedConsumeWork},
     },
     crossbeam_channel::{Receiver, RecvError, SendError, Sender},
+    solana_accounts_db::transaction_error_metrics::TransactionErrorMetrics,
     solana_poh::leader_bank_notifier::LeaderBankNotifier,
     solana_runtime::bank::Bank,
-    std::{sync::Arc, time::Duration},
+    solana_sdk::timing::AtomicInterval,
+    std::{
+        sync::{
+            atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
+            Arc,
+        },
+        time::Duration,
+    },
     thiserror::Error,
 };
@@ -24,11 +33,13 @@ pub(crate) struct ConsumeWorker {
     consumed_sender: Sender<FinishedConsumeWork>,
     leader_bank_notifier: Arc<LeaderBankNotifier>,
+    metrics: Arc<ConsumeWorkerMetrics>,
 }
 
 #[allow(dead_code)]
 impl ConsumeWorker {
     pub fn new(
+        id: u32,
         consume_receiver: Receiver<ConsumeWork>,
         consumer: Consumer,
         consumed_sender: Sender<FinishedConsumeWork>,
         leader_bank_notifier: Arc<LeaderBankNotifier>,
     ) -> Self {
         Self {
             consume_receiver,
             consumer,
             consumed_sender,
             leader_bank_notifier,
+            metrics: Arc::new(ConsumeWorkerMetrics::new(id)),
         }
     }
 
+    pub fn metrics_handle(&self) -> Arc<ConsumeWorkerMetrics> {
+        self.metrics.clone()
+    }
+
     pub fn run(self) -> Result<(), ConsumeWorkerError> {
         loop {
             let work = self.consume_receiver.recv()?;
@@ -70,22 +86,20 @@ impl ConsumeWorker {
 
     /// Consume a single batch.
     fn consume(&self, bank: &Arc<Bank>, work: ConsumeWork) -> Result<(), ConsumeWorkerError> {
-        let ProcessTransactionBatchOutput {
-            execute_and_commit_transactions_output:
-                ExecuteAndCommitTransactionsOutput {
-                    retryable_transaction_indexes,
-                    ..
-                },
-            ..
-        } = self.consumer.process_and_record_aged_transactions(
+        let output = self.consumer.process_and_record_aged_transactions(
             bank,
             &work.transactions,
             &work.max_age_slots,
         );
+        self.metrics.update_for_consume(&output);
+        self.metrics.has_data.store(true, Ordering::Relaxed);
+
         self.consumed_sender.send(FinishedConsumeWork {
             work,
-            retryable_indexes: retryable_transaction_indexes,
+            retryable_indexes: output
+                .execute_and_commit_transactions_output
+                .retryable_transaction_indexes,
         })?;
         Ok(())
     }
@@ -107,7 +121,17 @@ impl ConsumeWorker {
 
     /// Send transactions back to scheduler as retryable.
     fn retry(&self, work: ConsumeWork) -> Result<(), ConsumeWorkerError> {
-        let retryable_indexes = (0..work.transactions.len()).collect();
+        let retryable_indexes: Vec<_> = (0..work.transactions.len()).collect();
+        let num_retryable = retryable_indexes.len();
+        self.metrics
+            .count_metrics
+            .retryable_transaction_count
+            .fetch_add(num_retryable, Ordering::Relaxed);
+        self.metrics
+            .count_metrics
+            .retryable_expired_bank_count
+            .fetch_add(num_retryable, Ordering::Relaxed);
+        self.metrics.has_data.store(true, Ordering::Relaxed);
         self.consumed_sender.send(FinishedConsumeWork {
             work,
             retryable_indexes,
@@ -122,6 +146,460 @@ fn try_drain_iter<T>(work: T, receiver: &Receiver<T>) -> impl Iterator<Item = T>
     std::iter::once(work).chain(receiver.try_iter())
 }
 
+/// Metrics tracking number of packets processed by the consume worker.
+/// These are atomic, and intended to be reported by the scheduling thread
+/// since the consume worker thread is sleeping unless there is work to be
+/// done.
+pub(crate) struct ConsumeWorkerMetrics {
+    id: u32,
+    interval: AtomicInterval,
+    has_data: AtomicBool,
+
+    count_metrics: ConsumeWorkerCountMetrics,
+    error_metrics: ConsumeWorkerTransactionErrorMetrics,
+    timing_metrics: ConsumeWorkerTimingMetrics,
+}
+
+impl ConsumeWorkerMetrics {
+    /// Report and reset metrics iff the interval has elapsed and the worker did some work.
+    pub fn maybe_report_and_reset(&self) {
+        const REPORT_INTERVAL_MS: u64 = 1000;
+        if self.interval.should_update(REPORT_INTERVAL_MS)
+            && self.has_data.swap(false, Ordering::Relaxed)
+        {
+            self.count_metrics.report_and_reset(self.id);
+            self.timing_metrics.report_and_reset(self.id);
+            self.error_metrics.report_and_reset(self.id);
+        }
+    }
+
+    fn new(id: u32) -> Self {
+        Self {
+            id,
+            interval: AtomicInterval::default(),
+            has_data: AtomicBool::new(false),
+            count_metrics: ConsumeWorkerCountMetrics::default(),
+            error_metrics: ConsumeWorkerTransactionErrorMetrics::default(),
+            timing_metrics: ConsumeWorkerTimingMetrics::default(),
+        }
+    }
+
+    fn update_for_consume(
+        &self,
+        ProcessTransactionBatchOutput {
+            cost_model_throttled_transactions_count,
+            cost_model_us,
+            execute_and_commit_transactions_output,
+        }: &ProcessTransactionBatchOutput,
+    ) {
+        self.count_metrics
+            .cost_model_throttled_transactions_count
+            .fetch_add(*cost_model_throttled_transactions_count, Ordering::Relaxed);
+        self.timing_metrics
+            .cost_model_us
+            .fetch_add(*cost_model_us, Ordering::Relaxed);
+        self.update_on_execute_and_commit_transactions_output(
+            execute_and_commit_transactions_output,
+        );
+    }
+
+    fn update_on_execute_and_commit_transactions_output(
+        &self,
+        ExecuteAndCommitTransactionsOutput {
+            transactions_attempted_execution_count,
+            executed_transactions_count,
+            executed_with_successful_result_count,
+            retryable_transaction_indexes,
+            execute_and_commit_timings,
+            error_counters,
+            ..
+        }: &ExecuteAndCommitTransactionsOutput,
+    ) {
+        self.count_metrics
+            .transactions_attempted_execution_count
+            .fetch_add(*transactions_attempted_execution_count, Ordering::Relaxed);
+        self.count_metrics
+            .executed_transactions_count
+            .fetch_add(*executed_transactions_count, Ordering::Relaxed);
+        self.count_metrics
+            .executed_with_successful_result_count
+            .fetch_add(*executed_with_successful_result_count, Ordering::Relaxed);
+        self.count_metrics
+            .retryable_transaction_count
+            .fetch_add(retryable_transaction_indexes.len(), Ordering::Relaxed);
+
+        self.update_on_execute_and_commit_timings(execute_and_commit_timings);
+        self.update_on_error_counters(error_counters);
+    }
+
+    fn update_on_execute_and_commit_timings(
+        &self,
+        LeaderExecuteAndCommitTimings {
+            collect_balances_us,
+            load_execute_us,
+            freeze_lock_us,
+            record_us,
+            commit_us,
+            find_and_send_votes_us,
+            ..
+        }: &LeaderExecuteAndCommitTimings,
+    ) {
+        self.timing_metrics
+            .collect_balances_us
+            .fetch_add(*collect_balances_us, Ordering::Relaxed);
+        self.timing_metrics
+            .load_execute_us
+            .fetch_add(*load_execute_us, Ordering::Relaxed);
+        self.timing_metrics
+            .freeze_lock_us
+            .fetch_add(*freeze_lock_us, Ordering::Relaxed);
+        self.timing_metrics
+            .record_us
+            .fetch_add(*record_us, Ordering::Relaxed);
+        self.timing_metrics
+            .commit_us
+            .fetch_add(*commit_us, Ordering::Relaxed);
+        self.timing_metrics
+            .find_and_send_votes_us
+            .fetch_add(*find_and_send_votes_us, Ordering::Relaxed);
+    }
+
+    fn update_on_error_counters(
+        &self,
+        TransactionErrorMetrics {
+            total,
+            account_in_use,
+            too_many_account_locks,
+            account_loaded_twice,
+            account_not_found,
+            blockhash_not_found,
+            blockhash_too_old,
+            call_chain_too_deep,
+            already_processed,
+            instruction_error,
+            insufficient_funds,
+            invalid_account_for_fee,
+            invalid_account_index,
+            invalid_program_for_execution,
+            not_allowed_during_cluster_maintenance,
+            invalid_writable_account,
+            invalid_rent_paying_account,
+            would_exceed_max_block_cost_limit,
+            would_exceed_max_account_cost_limit,
+            would_exceed_max_vote_cost_limit,
+            would_exceed_account_data_block_limit,
+            max_loaded_accounts_data_size_exceeded,
+            program_execution_temporarily_restricted,
+        }: &TransactionErrorMetrics,
+    ) {
+        self.error_metrics
+            .total
+            .fetch_add(*total, Ordering::Relaxed);
+        self.error_metrics
+            .account_in_use
+            .fetch_add(*account_in_use, Ordering::Relaxed);
+        self.error_metrics
+            .too_many_account_locks
+            .fetch_add(*too_many_account_locks, Ordering::Relaxed);
+        self.error_metrics
+            .account_loaded_twice
+            .fetch_add(*account_loaded_twice, Ordering::Relaxed);
+        self.error_metrics
+            .account_not_found
+            .fetch_add(*account_not_found, Ordering::Relaxed);
+        self.error_metrics
+            .blockhash_not_found
+            .fetch_add(*blockhash_not_found, Ordering::Relaxed);
+        self.error_metrics
+            .blockhash_too_old
+            .fetch_add(*blockhash_too_old, Ordering::Relaxed);
+        self.error_metrics
+            .call_chain_too_deep
+            .fetch_add(*call_chain_too_deep, Ordering::Relaxed);
+        self.error_metrics
+            .already_processed
+            .fetch_add(*already_processed, Ordering::Relaxed);
+        self.error_metrics
+            .instruction_error
+            .fetch_add(*instruction_error, Ordering::Relaxed);
+        self.error_metrics
+            .insufficient_funds
+            .fetch_add(*insufficient_funds, Ordering::Relaxed);
+        self.error_metrics
+            .invalid_account_for_fee
+            .fetch_add(*invalid_account_for_fee, Ordering::Relaxed);
+        self.error_metrics
+            .invalid_account_index
+            .fetch_add(*invalid_account_index, Ordering::Relaxed);
+        self.error_metrics
+            .invalid_program_for_execution
+            .fetch_add(*invalid_program_for_execution, Ordering::Relaxed);
+        self.error_metrics
+            .not_allowed_during_cluster_maintenance
+            .fetch_add(*not_allowed_during_cluster_maintenance, Ordering::Relaxed);
+        self.error_metrics
+            .invalid_writable_account
+            .fetch_add(*invalid_writable_account, Ordering::Relaxed);
+        self.error_metrics
+            .invalid_rent_paying_account
+            .fetch_add(*invalid_rent_paying_account, Ordering::Relaxed);
+        self.error_metrics
+            .would_exceed_max_block_cost_limit
+            .fetch_add(*would_exceed_max_block_cost_limit, Ordering::Relaxed);
+        self.error_metrics
+            .would_exceed_max_account_cost_limit
+            .fetch_add(*would_exceed_max_account_cost_limit, Ordering::Relaxed);
+        self.error_metrics
+            .would_exceed_max_vote_cost_limit
+            .fetch_add(*would_exceed_max_vote_cost_limit, Ordering::Relaxed);
+        self.error_metrics
+            .would_exceed_account_data_block_limit
+            .fetch_add(*would_exceed_account_data_block_limit, Ordering::Relaxed);
+        self.error_metrics
+            .max_loaded_accounts_data_size_exceeded
+            .fetch_add(*max_loaded_accounts_data_size_exceeded, Ordering::Relaxed);
+        self.error_metrics
+            .program_execution_temporarily_restricted
+            .fetch_add(*program_execution_temporarily_restricted, Ordering::Relaxed);
+    }
+}
+
+#[derive(Default)]
+struct ConsumeWorkerCountMetrics {
+    transactions_attempted_execution_count: AtomicUsize,
+    executed_transactions_count: AtomicUsize,
+    executed_with_successful_result_count: AtomicUsize,
+    retryable_transaction_count: AtomicUsize,
+    retryable_expired_bank_count: AtomicUsize,
+    cost_model_throttled_transactions_count: AtomicUsize,
+}
+
+impl ConsumeWorkerCountMetrics {
+    fn report_and_reset(&self, id: u32) {
+        datapoint_info!(
+            "banking_stage_worker_counts",
+            ("id", id, i64),
+            (
+                "transactions_attempted_execution_count",
+                self.transactions_attempted_execution_count
+                    .swap(0, Ordering::Relaxed),
+                i64
+            ),
+            (
+                "executed_transactions_count",
+                self.executed_transactions_count.swap(0, Ordering::Relaxed),
+                i64
+            ),
+            (
+                "executed_with_successful_result_count",
+                self.executed_with_successful_result_count
+                    .swap(0, Ordering::Relaxed),
+                i64
+            ),
+            (
+                "retryable_transaction_count",
+                self.retryable_transaction_count.swap(0, Ordering::Relaxed),
+                i64
+            ),
+            (
+                "retryable_expired_bank_count",
+                self.retryable_expired_bank_count.swap(0, Ordering::Relaxed),
+                i64
+            ),
+            (
+                "cost_model_throttled_transactions_count",
+                self.cost_model_throttled_transactions_count
+                    .swap(0, Ordering::Relaxed),
+                i64
+            ),
+        );
+    }
+}
+
+#[derive(Default)]
+struct ConsumeWorkerTimingMetrics {
+    cost_model_us: AtomicU64,
+    collect_balances_us: AtomicU64,
+    load_execute_us: AtomicU64,
+    freeze_lock_us: AtomicU64,
+    record_us: AtomicU64,
+    commit_us: AtomicU64,
+    find_and_send_votes_us: AtomicU64,
+}
+
+impl ConsumeWorkerTimingMetrics {
+    fn report_and_reset(&self, id: u32) {
+        datapoint_info!(
+            "banking_stage_worker_timing",
+            ("id", id, i64),
+            (
+                "cost_model_us",
+                self.cost_model_us.swap(0, Ordering::Relaxed),
+                i64
+            ),
+            (
+                "collect_balances_us",
+                self.collect_balances_us.swap(0, Ordering::Relaxed),
+                i64
+            ),
+            (
+                "load_execute_us",
+                self.load_execute_us.swap(0, Ordering::Relaxed),
+                i64
+            ),
+            (
+                "freeze_lock_us",
+                self.freeze_lock_us.swap(0, Ordering::Relaxed),
+                i64
+            ),
+            ("record_us", self.record_us.swap(0, Ordering::Relaxed), i64),
+            ("commit_us", self.commit_us.swap(0, Ordering::Relaxed), i64),
+            (
+                "find_and_send_votes_us",
+                self.find_and_send_votes_us.swap(0, Ordering::Relaxed),
+                i64
+            ),
+        );
+    }
+}
+
+#[derive(Default)]
+struct ConsumeWorkerTransactionErrorMetrics {
+    total: AtomicUsize,
+    account_in_use: AtomicUsize,
+    too_many_account_locks: AtomicUsize,
+    account_loaded_twice: AtomicUsize,
+    account_not_found: AtomicUsize,
+    blockhash_not_found: AtomicUsize,
+    blockhash_too_old: AtomicUsize,
+    call_chain_too_deep: AtomicUsize,
+    already_processed: AtomicUsize,
+    instruction_error: AtomicUsize,
+    insufficient_funds: AtomicUsize,
+    invalid_account_for_fee: AtomicUsize,
+    invalid_account_index: AtomicUsize,
+    invalid_program_for_execution: AtomicUsize,
+    not_allowed_during_cluster_maintenance: AtomicUsize,
+    invalid_writable_account: AtomicUsize,
+    invalid_rent_paying_account: AtomicUsize,
+    would_exceed_max_block_cost_limit: AtomicUsize,
+    would_exceed_max_account_cost_limit: AtomicUsize,
+    would_exceed_max_vote_cost_limit: AtomicUsize,
+    would_exceed_account_data_block_limit: AtomicUsize,
+    max_loaded_accounts_data_size_exceeded: AtomicUsize,
+    program_execution_temporarily_restricted: AtomicUsize,
+}
+
+impl ConsumeWorkerTransactionErrorMetrics {
+    fn report_and_reset(&self, id: u32) {
+        datapoint_info!(
+            "banking_stage_worker_error_metrics",
+            ("id", id, i64),
+            ("total", self.total.swap(0, Ordering::Relaxed), i64),
+            (
+                "account_in_use",
+                self.account_in_use.swap(0, Ordering::Relaxed),
+                i64
+            ),
+            (
+                "too_many_account_locks",
+                self.too_many_account_locks.swap(0, Ordering::Relaxed),
+                i64
+            ),
+            (
+                "account_loaded_twice",
+                self.account_loaded_twice.swap(0, Ordering::Relaxed),
+                i64
+            ),
+            (
+                "account_not_found",
+                self.account_not_found.swap(0, Ordering::Relaxed),
+                i64
+            ),
+            (
+                "blockhash_not_found",
+                self.blockhash_not_found.swap(0, Ordering::Relaxed),
+                i64
+            ),
+            (
+                "blockhash_too_old",
+                self.blockhash_too_old.swap(0, Ordering::Relaxed),
+                i64
+            ),
+            (
+                "call_chain_too_deep",
+                self.call_chain_too_deep.swap(0, Ordering::Relaxed),
+                i64
+            ),
+            (
+                "already_processed",
+                self.already_processed.swap(0, Ordering::Relaxed),
+                i64
+            ),
+            (
+                "instruction_error",
+                self.instruction_error.swap(0, Ordering::Relaxed),
+                i64
+            ),
+            (
+                "insufficient_funds",
+                self.insufficient_funds.swap(0, Ordering::Relaxed),
+                i64
+            ),
+            (
+                "invalid_account_for_fee",
+                self.invalid_account_for_fee.swap(0, Ordering::Relaxed),
+                i64
+            ),
+            (
+                "invalid_account_index",
+                self.invalid_account_index.swap(0, Ordering::Relaxed),
+                i64
+            ),
+            (
+                "invalid_program_for_execution",
+                self.invalid_program_for_execution
+                    .swap(0, Ordering::Relaxed),
+                i64
+            ),
+            (
+                "not_allowed_during_cluster_maintenance",
+                self.not_allowed_during_cluster_maintenance
+                    .swap(0, Ordering::Relaxed),
+                i64
+            ),
+            (
+                "invalid_writable_account",
+                self.invalid_writable_account.swap(0, Ordering::Relaxed),
+                i64
+            ),
+            (
+                "invalid_rent_paying_account",
+                self.invalid_rent_paying_account.swap(0, Ordering::Relaxed),
+                i64
+            ),
+            (
+                "would_exceed_max_block_cost_limit",
+                self.would_exceed_max_block_cost_limit
+                    .swap(0, Ordering::Relaxed),
+                i64
+            ),
+            (
+                "would_exceed_max_account_cost_limit",
+                self.would_exceed_max_account_cost_limit
+                    .swap(0, Ordering::Relaxed),
+                i64
+            ),
+            (
+                "would_exceed_max_vote_cost_limit",
+                self.would_exceed_max_vote_cost_limit
+                    .swap(0, Ordering::Relaxed),
+                i64
+            ),
+        );
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use {
@@ -207,6 +685,7 @@ mod tests {
         let (consume_sender, consume_receiver) = unbounded();
         let (consumed_sender, consumed_receiver) = unbounded();
         let worker = ConsumeWorker::new(
+            0,
             consume_receiver,
             consumer,
             consumed_sender,
diff --git a/core/src/banking_stage/consumer.rs b/core/src/banking_stage/consumer.rs
index 19a3aa515..67134b78c 100644
--- a/core/src/banking_stage/consumer.rs
+++ b/core/src/banking_stage/consumer.rs
@@ -41,29 +41,29 @@ pub const TARGET_NUM_TRANSACTIONS_PER_BATCH: usize = 64;
 
 pub struct ProcessTransactionBatchOutput {
     // The number of transactions filtered out by the cost model
-    cost_model_throttled_transactions_count: usize,
+    pub(crate) cost_model_throttled_transactions_count: usize,
     // Amount of time spent running the cost model
-    cost_model_us: u64,
+    pub(crate) cost_model_us: u64,
     pub execute_and_commit_transactions_output: ExecuteAndCommitTransactionsOutput,
 }
 
 pub struct ExecuteAndCommitTransactionsOutput {
     // Total number of transactions that were passed as candidates for execution
-    transactions_attempted_execution_count: usize,
+    pub(crate) transactions_attempted_execution_count: usize,
     // The number of transactions of that were executed. See description of in `ProcessTransactionsSummary`
     // for possible outcomes of execution.
-    executed_transactions_count: usize,
+    pub(crate) executed_transactions_count: usize,
     // Total number of the executed transactions that returned success/not
     // an error.
-    executed_with_successful_result_count: usize,
+    pub(crate) executed_with_successful_result_count: usize,
     // Transactions that either were not executed, or were executed and failed to be committed due
     // to the block ending.
     pub(crate) retryable_transaction_indexes: Vec<usize>,
     // A result that indicates whether transactions were successfully
     // committed into the Poh stream.
     pub commit_transactions_result: Result<Vec<CommitTransactionDetails>, PohRecorderError>,
-    execute_and_commit_timings: LeaderExecuteAndCommitTimings,
-    error_counters: TransactionErrorMetrics,
+    pub(crate) execute_and_commit_timings: LeaderExecuteAndCommitTimings,
+    pub(crate) error_counters: TransactionErrorMetrics,
 }
 
 pub struct Consumer {
diff --git a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs
index 2688e243c..d268bb1e0 100644
--- a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs
+++ b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs
@@ -9,6 +9,7 @@ use {
         transaction_state_container::TransactionStateContainer,
     },
     crate::banking_stage::{
+        consume_worker::ConsumeWorkerMetrics,
         decision_maker::{BufferedPacketsDecision, DecisionMaker},
         immutable_deserialized_packet::ImmutableDeserializedPacket,
         packet_deserializer::PacketDeserializer,
@@ -42,6 +43,8 @@ pub(crate) struct SchedulerController {
     count_metrics: SchedulerCountMetrics,
     /// Metrics tracking time spent in different code sections.
     timing_metrics: SchedulerTimingMetrics,
+    /// Metric report handles for the worker threads.
+    worker_metrics: Vec<Arc<ConsumeWorkerMetrics>>,
 }
 
 impl SchedulerController {
@@ -50,6 +53,7 @@ impl SchedulerController {
         packet_deserializer: PacketDeserializer,
         bank_forks: Arc<RwLock<BankForks>>,
         scheduler: PrioGraphScheduler,
+        worker_metrics: Vec<Arc<ConsumeWorkerMetrics>>,
     ) -> Self {
         Self {
             decision_maker,
@@ -60,6 +64,7 @@ impl SchedulerController {
             scheduler,
             count_metrics: SchedulerCountMetrics::default(),
             timing_metrics: SchedulerTimingMetrics::default(),
+            worker_metrics,
         }
     }
 
@@ -90,6 +95,9 @@ impl SchedulerController {
             let should_report = self.count_metrics.has_data();
             self.count_metrics.maybe_report_and_reset(should_report);
             self.timing_metrics.maybe_report_and_reset(should_report);
+            self.worker_metrics
+                .iter()
+                .for_each(|metrics| metrics.maybe_report_and_reset());
         }
         Ok(())
     }
@@ -449,6 +457,7 @@ mod tests {
             packet_deserializer,
             bank_forks,
             PrioGraphScheduler::new(consume_work_senders, finished_consume_work_receiver),
+            vec![], // no actual workers with metrics to report, this can be empty
         );
 
         (test_frame, scheduler_controller)