TransactionScheduler: detailed consume worker metrics (#33895)
This commit is contained in:
parent
c0a4fc870c
commit
8a298f1628
|
@ -556,9 +556,11 @@ impl BankingStage {
|
|||
let (finished_work_sender, finished_work_receiver) = unbounded();
|
||||
|
||||
// Spawn the worker threads
|
||||
let mut worker_metrics = Vec::with_capacity(num_workers as usize);
|
||||
for (index, work_receiver) in work_receivers.into_iter().enumerate() {
|
||||
let id = (index as u32).saturating_add(NUM_VOTE_PROCESSING_THREADS);
|
||||
let consume_worker = ConsumeWorker::new(
|
||||
id,
|
||||
work_receiver,
|
||||
Consumer::new(
|
||||
committer.clone(),
|
||||
|
@ -570,6 +572,7 @@ impl BankingStage {
|
|||
poh_recorder.read().unwrap().new_leader_bank_notifier(),
|
||||
);
|
||||
|
||||
worker_metrics.push(consume_worker.metrics_handle());
|
||||
bank_thread_hdls.push(
|
||||
Builder::new()
|
||||
.name(format!("solCoWorker{id:02}"))
|
||||
|
@ -590,6 +593,7 @@ impl BankingStage {
|
|||
packet_deserializer,
|
||||
bank_forks,
|
||||
scheduler,
|
||||
worker_metrics,
|
||||
);
|
||||
Builder::new()
|
||||
.name("solBnkTxSched".to_string())
|
||||
|
|
|
@ -1,12 +1,21 @@
|
|||
use {
|
||||
super::{
|
||||
consumer::{Consumer, ExecuteAndCommitTransactionsOutput, ProcessTransactionBatchOutput},
|
||||
leader_slot_timing_metrics::LeaderExecuteAndCommitTimings,
|
||||
scheduler_messages::{ConsumeWork, FinishedConsumeWork},
|
||||
},
|
||||
crossbeam_channel::{Receiver, RecvError, SendError, Sender},
|
||||
solana_accounts_db::transaction_error_metrics::TransactionErrorMetrics,
|
||||
solana_poh::leader_bank_notifier::LeaderBankNotifier,
|
||||
solana_runtime::bank::Bank,
|
||||
std::{sync::Arc, time::Duration},
|
||||
solana_sdk::timing::AtomicInterval,
|
||||
std::{
|
||||
sync::{
|
||||
atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
|
||||
Arc,
|
||||
},
|
||||
time::Duration,
|
||||
},
|
||||
thiserror::Error,
|
||||
};
|
||||
|
||||
|
@ -24,11 +33,13 @@ pub(crate) struct ConsumeWorker {
|
|||
consumed_sender: Sender<FinishedConsumeWork>,
|
||||
|
||||
leader_bank_notifier: Arc<LeaderBankNotifier>,
|
||||
metrics: Arc<ConsumeWorkerMetrics>,
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
impl ConsumeWorker {
|
||||
pub fn new(
|
||||
id: u32,
|
||||
consume_receiver: Receiver<ConsumeWork>,
|
||||
consumer: Consumer,
|
||||
consumed_sender: Sender<FinishedConsumeWork>,
|
||||
|
@ -39,9 +50,14 @@ impl ConsumeWorker {
|
|||
consumer,
|
||||
consumed_sender,
|
||||
leader_bank_notifier,
|
||||
metrics: Arc::new(ConsumeWorkerMetrics::new(id)),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn metrics_handle(&self) -> Arc<ConsumeWorkerMetrics> {
|
||||
self.metrics.clone()
|
||||
}
|
||||
|
||||
pub fn run(self) -> Result<(), ConsumeWorkerError> {
|
||||
loop {
|
||||
let work = self.consume_receiver.recv()?;
|
||||
|
@ -70,22 +86,20 @@ impl ConsumeWorker {
|
|||
|
||||
/// Consume a single batch.
|
||||
fn consume(&self, bank: &Arc<Bank>, work: ConsumeWork) -> Result<(), ConsumeWorkerError> {
|
||||
let ProcessTransactionBatchOutput {
|
||||
execute_and_commit_transactions_output:
|
||||
ExecuteAndCommitTransactionsOutput {
|
||||
retryable_transaction_indexes,
|
||||
..
|
||||
},
|
||||
..
|
||||
} = self.consumer.process_and_record_aged_transactions(
|
||||
let output = self.consumer.process_and_record_aged_transactions(
|
||||
bank,
|
||||
&work.transactions,
|
||||
&work.max_age_slots,
|
||||
);
|
||||
|
||||
self.metrics.update_for_consume(&output);
|
||||
self.metrics.has_data.store(true, Ordering::Relaxed);
|
||||
|
||||
self.consumed_sender.send(FinishedConsumeWork {
|
||||
work,
|
||||
retryable_indexes: retryable_transaction_indexes,
|
||||
retryable_indexes: output
|
||||
.execute_and_commit_transactions_output
|
||||
.retryable_transaction_indexes,
|
||||
})?;
|
||||
Ok(())
|
||||
}
|
||||
|
@ -107,7 +121,17 @@ impl ConsumeWorker {
|
|||
|
||||
/// Send transactions back to scheduler as retryable.
|
||||
fn retry(&self, work: ConsumeWork) -> Result<(), ConsumeWorkerError> {
|
||||
let retryable_indexes = (0..work.transactions.len()).collect();
|
||||
let retryable_indexes: Vec<_> = (0..work.transactions.len()).collect();
|
||||
let num_retryable = retryable_indexes.len();
|
||||
self.metrics
|
||||
.count_metrics
|
||||
.retryable_transaction_count
|
||||
.fetch_add(num_retryable, Ordering::Relaxed);
|
||||
self.metrics
|
||||
.count_metrics
|
||||
.retryable_expired_bank_count
|
||||
.fetch_add(num_retryable, Ordering::Relaxed);
|
||||
self.metrics.has_data.store(true, Ordering::Relaxed);
|
||||
self.consumed_sender.send(FinishedConsumeWork {
|
||||
work,
|
||||
retryable_indexes,
|
||||
|
@ -122,6 +146,460 @@ fn try_drain_iter<T>(work: T, receiver: &Receiver<T>) -> impl Iterator<Item = T>
|
|||
std::iter::once(work).chain(receiver.try_iter())
|
||||
}
|
||||
|
||||
/// Metrics tracking number of packets processed by the consume worker.
|
||||
/// These are atomic, and intended to be reported by the scheduling thread
|
||||
/// since the consume worker thread is sleeping unless there is work to be
|
||||
/// done.
|
||||
pub(crate) struct ConsumeWorkerMetrics {
|
||||
id: u32,
|
||||
interval: AtomicInterval,
|
||||
has_data: AtomicBool,
|
||||
|
||||
count_metrics: ConsumeWorkerCountMetrics,
|
||||
error_metrics: ConsumeWorkerTransactionErrorMetrics,
|
||||
timing_metrics: ConsumeWorkerTimingMetrics,
|
||||
}
|
||||
|
||||
impl ConsumeWorkerMetrics {
|
||||
/// Report and reset metrics iff the interval has elapsed and the worker did some work.
|
||||
pub fn maybe_report_and_reset(&self) {
|
||||
const REPORT_INTERVAL_MS: u64 = 1000;
|
||||
if self.interval.should_update(REPORT_INTERVAL_MS)
|
||||
&& self.has_data.swap(false, Ordering::Relaxed)
|
||||
{
|
||||
self.count_metrics.report_and_reset(self.id);
|
||||
self.timing_metrics.report_and_reset(self.id);
|
||||
self.error_metrics.report_and_reset(self.id);
|
||||
}
|
||||
}
|
||||
|
||||
fn new(id: u32) -> Self {
|
||||
Self {
|
||||
id,
|
||||
interval: AtomicInterval::default(),
|
||||
has_data: AtomicBool::new(false),
|
||||
count_metrics: ConsumeWorkerCountMetrics::default(),
|
||||
error_metrics: ConsumeWorkerTransactionErrorMetrics::default(),
|
||||
timing_metrics: ConsumeWorkerTimingMetrics::default(),
|
||||
}
|
||||
}
|
||||
|
||||
fn update_for_consume(
|
||||
&self,
|
||||
ProcessTransactionBatchOutput {
|
||||
cost_model_throttled_transactions_count,
|
||||
cost_model_us,
|
||||
execute_and_commit_transactions_output,
|
||||
}: &ProcessTransactionBatchOutput,
|
||||
) {
|
||||
self.count_metrics
|
||||
.cost_model_throttled_transactions_count
|
||||
.fetch_add(*cost_model_throttled_transactions_count, Ordering::Relaxed);
|
||||
self.timing_metrics
|
||||
.cost_model_us
|
||||
.fetch_add(*cost_model_us, Ordering::Relaxed);
|
||||
self.update_on_execute_and_commit_transactions_output(
|
||||
execute_and_commit_transactions_output,
|
||||
);
|
||||
}
|
||||
|
||||
fn update_on_execute_and_commit_transactions_output(
|
||||
&self,
|
||||
ExecuteAndCommitTransactionsOutput {
|
||||
transactions_attempted_execution_count,
|
||||
executed_transactions_count,
|
||||
executed_with_successful_result_count,
|
||||
retryable_transaction_indexes,
|
||||
execute_and_commit_timings,
|
||||
error_counters,
|
||||
..
|
||||
}: &ExecuteAndCommitTransactionsOutput,
|
||||
) {
|
||||
self.count_metrics
|
||||
.transactions_attempted_execution_count
|
||||
.fetch_add(*transactions_attempted_execution_count, Ordering::Relaxed);
|
||||
self.count_metrics
|
||||
.executed_transactions_count
|
||||
.fetch_add(*executed_transactions_count, Ordering::Relaxed);
|
||||
self.count_metrics
|
||||
.executed_with_successful_result_count
|
||||
.fetch_add(*executed_with_successful_result_count, Ordering::Relaxed);
|
||||
self.count_metrics
|
||||
.retryable_transaction_count
|
||||
.fetch_add(retryable_transaction_indexes.len(), Ordering::Relaxed);
|
||||
|
||||
self.update_on_execute_and_commit_timings(execute_and_commit_timings);
|
||||
self.update_on_error_counters(error_counters);
|
||||
}
|
||||
|
||||
fn update_on_execute_and_commit_timings(
|
||||
&self,
|
||||
LeaderExecuteAndCommitTimings {
|
||||
collect_balances_us,
|
||||
load_execute_us,
|
||||
freeze_lock_us,
|
||||
record_us,
|
||||
commit_us,
|
||||
find_and_send_votes_us,
|
||||
..
|
||||
}: &LeaderExecuteAndCommitTimings,
|
||||
) {
|
||||
self.timing_metrics
|
||||
.collect_balances_us
|
||||
.fetch_add(*collect_balances_us, Ordering::Relaxed);
|
||||
self.timing_metrics
|
||||
.load_execute_us
|
||||
.fetch_add(*load_execute_us, Ordering::Relaxed);
|
||||
self.timing_metrics
|
||||
.freeze_lock_us
|
||||
.fetch_add(*freeze_lock_us, Ordering::Relaxed);
|
||||
self.timing_metrics
|
||||
.record_us
|
||||
.fetch_add(*record_us, Ordering::Relaxed);
|
||||
self.timing_metrics
|
||||
.commit_us
|
||||
.fetch_add(*commit_us, Ordering::Relaxed);
|
||||
self.timing_metrics
|
||||
.find_and_send_votes_us
|
||||
.fetch_add(*find_and_send_votes_us, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
fn update_on_error_counters(
|
||||
&self,
|
||||
TransactionErrorMetrics {
|
||||
total,
|
||||
account_in_use,
|
||||
too_many_account_locks,
|
||||
account_loaded_twice,
|
||||
account_not_found,
|
||||
blockhash_not_found,
|
||||
blockhash_too_old,
|
||||
call_chain_too_deep,
|
||||
already_processed,
|
||||
instruction_error,
|
||||
insufficient_funds,
|
||||
invalid_account_for_fee,
|
||||
invalid_account_index,
|
||||
invalid_program_for_execution,
|
||||
not_allowed_during_cluster_maintenance,
|
||||
invalid_writable_account,
|
||||
invalid_rent_paying_account,
|
||||
would_exceed_max_block_cost_limit,
|
||||
would_exceed_max_account_cost_limit,
|
||||
would_exceed_max_vote_cost_limit,
|
||||
would_exceed_account_data_block_limit,
|
||||
max_loaded_accounts_data_size_exceeded,
|
||||
program_execution_temporarily_restricted,
|
||||
}: &TransactionErrorMetrics,
|
||||
) {
|
||||
self.error_metrics
|
||||
.total
|
||||
.fetch_add(*total, Ordering::Relaxed);
|
||||
self.error_metrics
|
||||
.account_in_use
|
||||
.fetch_add(*account_in_use, Ordering::Relaxed);
|
||||
self.error_metrics
|
||||
.too_many_account_locks
|
||||
.fetch_add(*too_many_account_locks, Ordering::Relaxed);
|
||||
self.error_metrics
|
||||
.account_loaded_twice
|
||||
.fetch_add(*account_loaded_twice, Ordering::Relaxed);
|
||||
self.error_metrics
|
||||
.account_not_found
|
||||
.fetch_add(*account_not_found, Ordering::Relaxed);
|
||||
self.error_metrics
|
||||
.blockhash_not_found
|
||||
.fetch_add(*blockhash_not_found, Ordering::Relaxed);
|
||||
self.error_metrics
|
||||
.blockhash_too_old
|
||||
.fetch_add(*blockhash_too_old, Ordering::Relaxed);
|
||||
self.error_metrics
|
||||
.call_chain_too_deep
|
||||
.fetch_add(*call_chain_too_deep, Ordering::Relaxed);
|
||||
self.error_metrics
|
||||
.already_processed
|
||||
.fetch_add(*already_processed, Ordering::Relaxed);
|
||||
self.error_metrics
|
||||
.instruction_error
|
||||
.fetch_add(*instruction_error, Ordering::Relaxed);
|
||||
self.error_metrics
|
||||
.insufficient_funds
|
||||
.fetch_add(*insufficient_funds, Ordering::Relaxed);
|
||||
self.error_metrics
|
||||
.invalid_account_for_fee
|
||||
.fetch_add(*invalid_account_for_fee, Ordering::Relaxed);
|
||||
self.error_metrics
|
||||
.invalid_account_index
|
||||
.fetch_add(*invalid_account_index, Ordering::Relaxed);
|
||||
self.error_metrics
|
||||
.invalid_program_for_execution
|
||||
.fetch_add(*invalid_program_for_execution, Ordering::Relaxed);
|
||||
self.error_metrics
|
||||
.not_allowed_during_cluster_maintenance
|
||||
.fetch_add(*not_allowed_during_cluster_maintenance, Ordering::Relaxed);
|
||||
self.error_metrics
|
||||
.invalid_writable_account
|
||||
.fetch_add(*invalid_writable_account, Ordering::Relaxed);
|
||||
self.error_metrics
|
||||
.invalid_rent_paying_account
|
||||
.fetch_add(*invalid_rent_paying_account, Ordering::Relaxed);
|
||||
self.error_metrics
|
||||
.would_exceed_max_block_cost_limit
|
||||
.fetch_add(*would_exceed_max_block_cost_limit, Ordering::Relaxed);
|
||||
self.error_metrics
|
||||
.would_exceed_max_account_cost_limit
|
||||
.fetch_add(*would_exceed_max_account_cost_limit, Ordering::Relaxed);
|
||||
self.error_metrics
|
||||
.would_exceed_max_vote_cost_limit
|
||||
.fetch_add(*would_exceed_max_vote_cost_limit, Ordering::Relaxed);
|
||||
self.error_metrics
|
||||
.would_exceed_account_data_block_limit
|
||||
.fetch_add(*would_exceed_account_data_block_limit, Ordering::Relaxed);
|
||||
self.error_metrics
|
||||
.max_loaded_accounts_data_size_exceeded
|
||||
.fetch_add(*max_loaded_accounts_data_size_exceeded, Ordering::Relaxed);
|
||||
self.error_metrics
|
||||
.program_execution_temporarily_restricted
|
||||
.fetch_add(*program_execution_temporarily_restricted, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct ConsumeWorkerCountMetrics {
|
||||
transactions_attempted_execution_count: AtomicUsize,
|
||||
executed_transactions_count: AtomicUsize,
|
||||
executed_with_successful_result_count: AtomicUsize,
|
||||
retryable_transaction_count: AtomicUsize,
|
||||
retryable_expired_bank_count: AtomicUsize,
|
||||
cost_model_throttled_transactions_count: AtomicUsize,
|
||||
}
|
||||
|
||||
impl ConsumeWorkerCountMetrics {
|
||||
fn report_and_reset(&self, id: u32) {
|
||||
datapoint_info!(
|
||||
"banking_stage_worker_counts",
|
||||
("id", id, i64),
|
||||
(
|
||||
"transactions_attempted_execution_count",
|
||||
self.transactions_attempted_execution_count
|
||||
.swap(0, Ordering::Relaxed),
|
||||
i64
|
||||
),
|
||||
(
|
||||
"executed_transactions_count",
|
||||
self.executed_transactions_count.swap(0, Ordering::Relaxed),
|
||||
i64
|
||||
),
|
||||
(
|
||||
"executed_with_successful_result_count",
|
||||
self.executed_with_successful_result_count
|
||||
.swap(0, Ordering::Relaxed),
|
||||
i64
|
||||
),
|
||||
(
|
||||
"retryable_transaction_count",
|
||||
self.retryable_transaction_count.swap(0, Ordering::Relaxed),
|
||||
i64
|
||||
),
|
||||
(
|
||||
"retryable_expired_bank_count",
|
||||
self.retryable_expired_bank_count.swap(0, Ordering::Relaxed),
|
||||
i64
|
||||
),
|
||||
(
|
||||
"cost_model_throttled_transactions_count",
|
||||
self.cost_model_throttled_transactions_count
|
||||
.swap(0, Ordering::Relaxed),
|
||||
i64
|
||||
),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct ConsumeWorkerTimingMetrics {
|
||||
cost_model_us: AtomicU64,
|
||||
collect_balances_us: AtomicU64,
|
||||
load_execute_us: AtomicU64,
|
||||
freeze_lock_us: AtomicU64,
|
||||
record_us: AtomicU64,
|
||||
commit_us: AtomicU64,
|
||||
find_and_send_votes_us: AtomicU64,
|
||||
}
|
||||
|
||||
impl ConsumeWorkerTimingMetrics {
|
||||
fn report_and_reset(&self, id: u32) {
|
||||
datapoint_info!(
|
||||
"banking_stage_worker_timing",
|
||||
("id", id, i64),
|
||||
(
|
||||
"cost_model_us",
|
||||
self.cost_model_us.swap(0, Ordering::Relaxed),
|
||||
i64
|
||||
),
|
||||
(
|
||||
"collect_balances_us",
|
||||
self.collect_balances_us.swap(0, Ordering::Relaxed),
|
||||
i64
|
||||
),
|
||||
(
|
||||
"load_execute_us",
|
||||
self.load_execute_us.swap(0, Ordering::Relaxed),
|
||||
i64
|
||||
),
|
||||
(
|
||||
"freeze_lock_us",
|
||||
self.freeze_lock_us.swap(0, Ordering::Relaxed),
|
||||
i64
|
||||
),
|
||||
("record_us", self.record_us.swap(0, Ordering::Relaxed), i64),
|
||||
("commit_us", self.commit_us.swap(0, Ordering::Relaxed), i64),
|
||||
(
|
||||
"find_and_send_votes_us",
|
||||
self.find_and_send_votes_us.swap(0, Ordering::Relaxed),
|
||||
i64
|
||||
),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct ConsumeWorkerTransactionErrorMetrics {
|
||||
total: AtomicUsize,
|
||||
account_in_use: AtomicUsize,
|
||||
too_many_account_locks: AtomicUsize,
|
||||
account_loaded_twice: AtomicUsize,
|
||||
account_not_found: AtomicUsize,
|
||||
blockhash_not_found: AtomicUsize,
|
||||
blockhash_too_old: AtomicUsize,
|
||||
call_chain_too_deep: AtomicUsize,
|
||||
already_processed: AtomicUsize,
|
||||
instruction_error: AtomicUsize,
|
||||
insufficient_funds: AtomicUsize,
|
||||
invalid_account_for_fee: AtomicUsize,
|
||||
invalid_account_index: AtomicUsize,
|
||||
invalid_program_for_execution: AtomicUsize,
|
||||
not_allowed_during_cluster_maintenance: AtomicUsize,
|
||||
invalid_writable_account: AtomicUsize,
|
||||
invalid_rent_paying_account: AtomicUsize,
|
||||
would_exceed_max_block_cost_limit: AtomicUsize,
|
||||
would_exceed_max_account_cost_limit: AtomicUsize,
|
||||
would_exceed_max_vote_cost_limit: AtomicUsize,
|
||||
would_exceed_account_data_block_limit: AtomicUsize,
|
||||
max_loaded_accounts_data_size_exceeded: AtomicUsize,
|
||||
program_execution_temporarily_restricted: AtomicUsize,
|
||||
}
|
||||
|
||||
impl ConsumeWorkerTransactionErrorMetrics {
|
||||
fn report_and_reset(&self, id: u32) {
|
||||
datapoint_info!(
|
||||
"banking_stage_worker_error_metrics",
|
||||
("id", id, i64),
|
||||
("total", self.total.swap(0, Ordering::Relaxed), i64),
|
||||
(
|
||||
"account_in_use",
|
||||
self.account_in_use.swap(0, Ordering::Relaxed),
|
||||
i64
|
||||
),
|
||||
(
|
||||
"too_many_account_locks",
|
||||
self.too_many_account_locks.swap(0, Ordering::Relaxed),
|
||||
i64
|
||||
),
|
||||
(
|
||||
"account_loaded_twice",
|
||||
self.account_loaded_twice.swap(0, Ordering::Relaxed),
|
||||
i64
|
||||
),
|
||||
(
|
||||
"account_not_found",
|
||||
self.account_not_found.swap(0, Ordering::Relaxed),
|
||||
i64
|
||||
),
|
||||
(
|
||||
"blockhash_not_found",
|
||||
self.blockhash_not_found.swap(0, Ordering::Relaxed),
|
||||
i64
|
||||
),
|
||||
(
|
||||
"blockhash_too_old",
|
||||
self.blockhash_too_old.swap(0, Ordering::Relaxed),
|
||||
i64
|
||||
),
|
||||
(
|
||||
"call_chain_too_deep",
|
||||
self.call_chain_too_deep.swap(0, Ordering::Relaxed),
|
||||
i64
|
||||
),
|
||||
(
|
||||
"already_processed",
|
||||
self.already_processed.swap(0, Ordering::Relaxed),
|
||||
i64
|
||||
),
|
||||
(
|
||||
"instruction_error",
|
||||
self.instruction_error.swap(0, Ordering::Relaxed),
|
||||
i64
|
||||
),
|
||||
(
|
||||
"insufficient_funds",
|
||||
self.insufficient_funds.swap(0, Ordering::Relaxed),
|
||||
i64
|
||||
),
|
||||
(
|
||||
"invalid_account_for_fee",
|
||||
self.invalid_account_for_fee.swap(0, Ordering::Relaxed),
|
||||
i64
|
||||
),
|
||||
(
|
||||
"invalid_account_index",
|
||||
self.invalid_account_index.swap(0, Ordering::Relaxed),
|
||||
i64
|
||||
),
|
||||
(
|
||||
"invalid_program_for_execution",
|
||||
self.invalid_program_for_execution
|
||||
.swap(0, Ordering::Relaxed),
|
||||
i64
|
||||
),
|
||||
(
|
||||
"not_allowed_during_cluster_maintenance",
|
||||
self.not_allowed_during_cluster_maintenance
|
||||
.swap(0, Ordering::Relaxed),
|
||||
i64
|
||||
),
|
||||
(
|
||||
"invalid_writable_account",
|
||||
self.invalid_writable_account.swap(0, Ordering::Relaxed),
|
||||
i64
|
||||
),
|
||||
(
|
||||
"invalid_rent_paying_account",
|
||||
self.invalid_rent_paying_account.swap(0, Ordering::Relaxed),
|
||||
i64
|
||||
),
|
||||
(
|
||||
"would_exceed_max_block_cost_limit",
|
||||
self.would_exceed_max_block_cost_limit
|
||||
.swap(0, Ordering::Relaxed),
|
||||
i64
|
||||
),
|
||||
(
|
||||
"would_exceed_max_account_cost_limit",
|
||||
self.would_exceed_max_account_cost_limit
|
||||
.swap(0, Ordering::Relaxed),
|
||||
i64
|
||||
),
|
||||
(
|
||||
"would_exceed_max_vote_cost_limit",
|
||||
self.would_exceed_max_vote_cost_limit
|
||||
.swap(0, Ordering::Relaxed),
|
||||
i64
|
||||
),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use {
|
||||
|
@ -207,6 +685,7 @@ mod tests {
|
|||
let (consume_sender, consume_receiver) = unbounded();
|
||||
let (consumed_sender, consumed_receiver) = unbounded();
|
||||
let worker = ConsumeWorker::new(
|
||||
0,
|
||||
consume_receiver,
|
||||
consumer,
|
||||
consumed_sender,
|
||||
|
|
|
@ -41,29 +41,29 @@ pub const TARGET_NUM_TRANSACTIONS_PER_BATCH: usize = 64;
|
|||
|
||||
pub struct ProcessTransactionBatchOutput {
|
||||
// The number of transactions filtered out by the cost model
|
||||
cost_model_throttled_transactions_count: usize,
|
||||
pub(crate) cost_model_throttled_transactions_count: usize,
|
||||
// Amount of time spent running the cost model
|
||||
cost_model_us: u64,
|
||||
pub(crate) cost_model_us: u64,
|
||||
pub execute_and_commit_transactions_output: ExecuteAndCommitTransactionsOutput,
|
||||
}
|
||||
|
||||
pub struct ExecuteAndCommitTransactionsOutput {
|
||||
// Total number of transactions that were passed as candidates for execution
|
||||
transactions_attempted_execution_count: usize,
|
||||
pub(crate) transactions_attempted_execution_count: usize,
|
||||
// The number of transactions of that were executed. See description of in `ProcessTransactionsSummary`
|
||||
// for possible outcomes of execution.
|
||||
executed_transactions_count: usize,
|
||||
pub(crate) executed_transactions_count: usize,
|
||||
// Total number of the executed transactions that returned success/not
|
||||
// an error.
|
||||
executed_with_successful_result_count: usize,
|
||||
pub(crate) executed_with_successful_result_count: usize,
|
||||
// Transactions that either were not executed, or were executed and failed to be committed due
|
||||
// to the block ending.
|
||||
pub(crate) retryable_transaction_indexes: Vec<usize>,
|
||||
// A result that indicates whether transactions were successfully
|
||||
// committed into the Poh stream.
|
||||
pub commit_transactions_result: Result<Vec<CommitTransactionDetails>, PohRecorderError>,
|
||||
execute_and_commit_timings: LeaderExecuteAndCommitTimings,
|
||||
error_counters: TransactionErrorMetrics,
|
||||
pub(crate) execute_and_commit_timings: LeaderExecuteAndCommitTimings,
|
||||
pub(crate) error_counters: TransactionErrorMetrics,
|
||||
}
|
||||
|
||||
pub struct Consumer {
|
||||
|
|
|
@ -9,6 +9,7 @@ use {
|
|||
transaction_state_container::TransactionStateContainer,
|
||||
},
|
||||
crate::banking_stage::{
|
||||
consume_worker::ConsumeWorkerMetrics,
|
||||
decision_maker::{BufferedPacketsDecision, DecisionMaker},
|
||||
immutable_deserialized_packet::ImmutableDeserializedPacket,
|
||||
packet_deserializer::PacketDeserializer,
|
||||
|
@ -42,6 +43,8 @@ pub(crate) struct SchedulerController {
|
|||
count_metrics: SchedulerCountMetrics,
|
||||
/// Metrics tracking time spent in different code sections.
|
||||
timing_metrics: SchedulerTimingMetrics,
|
||||
/// Metric report handles for the worker threads.
|
||||
worker_metrics: Vec<Arc<ConsumeWorkerMetrics>>,
|
||||
}
|
||||
|
||||
impl SchedulerController {
|
||||
|
@ -50,6 +53,7 @@ impl SchedulerController {
|
|||
packet_deserializer: PacketDeserializer,
|
||||
bank_forks: Arc<RwLock<BankForks>>,
|
||||
scheduler: PrioGraphScheduler,
|
||||
worker_metrics: Vec<Arc<ConsumeWorkerMetrics>>,
|
||||
) -> Self {
|
||||
Self {
|
||||
decision_maker,
|
||||
|
@ -60,6 +64,7 @@ impl SchedulerController {
|
|||
scheduler,
|
||||
count_metrics: SchedulerCountMetrics::default(),
|
||||
timing_metrics: SchedulerTimingMetrics::default(),
|
||||
worker_metrics,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -90,6 +95,9 @@ impl SchedulerController {
|
|||
let should_report = self.count_metrics.has_data();
|
||||
self.count_metrics.maybe_report_and_reset(should_report);
|
||||
self.timing_metrics.maybe_report_and_reset(should_report);
|
||||
self.worker_metrics
|
||||
.iter()
|
||||
.for_each(|metrics| metrics.maybe_report_and_reset());
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
@ -449,6 +457,7 @@ mod tests {
|
|||
packet_deserializer,
|
||||
bank_forks,
|
||||
PrioGraphScheduler::new(consume_work_senders, finished_consume_work_receiver),
|
||||
vec![], // no actual workers with metrics to report, this can be empty
|
||||
);
|
||||
|
||||
(test_frame, scheduler_controller)
|
||||
|
|
Loading…
Reference in New Issue