Add estimated and actual block cost units metrics (#22326)
* - report cost details for transactions selected to be packed into block; - report estimated execution units packed into block, and actual units and time after execution * revert reporting per-transaction details * rollup transaction cost details (eg signature cost, wirte lock, data cost and execution costs) into block stats * change naming from units to cu, use struct to replace tuple
This commit is contained in:
parent
e14ae33e86
commit
1309a9cea0
|
@ -23,7 +23,7 @@ use {
|
||||||
accounts_db::ErrorCounters,
|
accounts_db::ErrorCounters,
|
||||||
bank::{Bank, TransactionBalancesSet, TransactionCheckResult, TransactionExecutionResult},
|
bank::{Bank, TransactionBalancesSet, TransactionCheckResult, TransactionExecutionResult},
|
||||||
bank_utils,
|
bank_utils,
|
||||||
cost_model::CostModel,
|
cost_model::{CostModel, TransactionCost},
|
||||||
transaction_batch::TransactionBatch,
|
transaction_batch::TransactionBatch,
|
||||||
vote_sender_types::ReplayVoteSender,
|
vote_sender_types::ReplayVoteSender,
|
||||||
},
|
},
|
||||||
|
@ -260,6 +260,14 @@ impl BankingStageStats {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Default)]
|
||||||
|
pub struct BatchedTransactionCostDetails {
|
||||||
|
pub batched_signature_cost: u64,
|
||||||
|
pub batched_write_lock_cost: u64,
|
||||||
|
pub batched_data_bytes_cost: u64,
|
||||||
|
pub batched_execute_cost: u64,
|
||||||
|
}
|
||||||
|
|
||||||
/// Stores the stage's thread handle and output receiver.
|
/// Stores the stage's thread handle and output receiver.
|
||||||
pub struct BankingStage {
|
pub struct BankingStage {
|
||||||
bank_thread_hdls: Vec<JoinHandle<()>>,
|
bank_thread_hdls: Vec<JoinHandle<()>>,
|
||||||
|
@ -856,7 +864,7 @@ impl BankingStage {
|
||||||
batch: &TransactionBatch,
|
batch: &TransactionBatch,
|
||||||
transaction_status_sender: Option<TransactionStatusSender>,
|
transaction_status_sender: Option<TransactionStatusSender>,
|
||||||
gossip_vote_sender: &ReplayVoteSender,
|
gossip_vote_sender: &ReplayVoteSender,
|
||||||
) -> (Result<usize, PohRecorderError>, Vec<usize>) {
|
) -> (Result<usize, PohRecorderError>, Vec<usize>, ExecuteTimings) {
|
||||||
let mut load_execute_time = Measure::start("load_execute_time");
|
let mut load_execute_time = Measure::start("load_execute_time");
|
||||||
// Use a shorter maximum age when adding transactions into the pipeline. This will reduce
|
// Use a shorter maximum age when adding transactions into the pipeline. This will reduce
|
||||||
// the likelihood of any single thread getting starved and processing old ids.
|
// the likelihood of any single thread getting starved and processing old ids.
|
||||||
|
@ -906,7 +914,7 @@ impl BankingStage {
|
||||||
);
|
);
|
||||||
retryable_txs.extend(retryable_record_txs);
|
retryable_txs.extend(retryable_record_txs);
|
||||||
if num_to_commit.is_err() {
|
if num_to_commit.is_err() {
|
||||||
return (num_to_commit, retryable_txs);
|
return (num_to_commit, retryable_txs, execute_timings);
|
||||||
}
|
}
|
||||||
record_time.stop();
|
record_time.stop();
|
||||||
|
|
||||||
|
@ -956,7 +964,7 @@ impl BankingStage {
|
||||||
execute_timings
|
execute_timings
|
||||||
);
|
);
|
||||||
|
|
||||||
(Ok(num_to_commit), retryable_txs)
|
(Ok(num_to_commit), retryable_txs, execute_timings)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn process_and_record_transactions(
|
pub fn process_and_record_transactions(
|
||||||
|
@ -973,6 +981,13 @@ impl BankingStage {
|
||||||
let transactions_qos_results =
|
let transactions_qos_results =
|
||||||
qos_service.select_transactions_per_cost(txs.iter(), tx_costs.iter(), bank);
|
qos_service.select_transactions_per_cost(txs.iter(), tx_costs.iter(), bank);
|
||||||
|
|
||||||
|
qos_service.accumulate_estimated_transaction_costs(
|
||||||
|
&Self::accumulate_batched_transaction_costs(
|
||||||
|
tx_costs.iter(),
|
||||||
|
transactions_qos_results.iter(),
|
||||||
|
),
|
||||||
|
);
|
||||||
|
|
||||||
// Only lock accounts for those transactions are selected for the block;
|
// Only lock accounts for those transactions are selected for the block;
|
||||||
// Once accounts are locked, other threads cannot encode transactions that will modify the
|
// Once accounts are locked, other threads cannot encode transactions that will modify the
|
||||||
// same account state
|
// same account state
|
||||||
|
@ -984,13 +999,14 @@ impl BankingStage {
|
||||||
// retryable_txs includes AccountInUse, WouldExceedMaxBlockCostLimit
|
// retryable_txs includes AccountInUse, WouldExceedMaxBlockCostLimit
|
||||||
// WouldExceedMaxAccountCostLimit, WouldExceedMaxVoteCostLimit
|
// WouldExceedMaxAccountCostLimit, WouldExceedMaxVoteCostLimit
|
||||||
// and WouldExceedMaxAccountDataCostLimit
|
// and WouldExceedMaxAccountDataCostLimit
|
||||||
let (result, mut retryable_txs) = Self::process_and_record_transactions_locked(
|
let (result, mut retryable_txs, execute_timings) =
|
||||||
bank,
|
Self::process_and_record_transactions_locked(
|
||||||
poh,
|
bank,
|
||||||
&batch,
|
poh,
|
||||||
transaction_status_sender,
|
&batch,
|
||||||
gossip_vote_sender,
|
transaction_status_sender,
|
||||||
);
|
gossip_vote_sender,
|
||||||
|
);
|
||||||
retryable_txs.iter_mut().for_each(|x| *x += chunk_offset);
|
retryable_txs.iter_mut().for_each(|x| *x += chunk_offset);
|
||||||
|
|
||||||
let mut unlock_time = Measure::start("unlock_time");
|
let mut unlock_time = Measure::start("unlock_time");
|
||||||
|
@ -998,6 +1014,10 @@ impl BankingStage {
|
||||||
drop(batch);
|
drop(batch);
|
||||||
unlock_time.stop();
|
unlock_time.stop();
|
||||||
|
|
||||||
|
let (cu, us) = Self::accumulate_execute_units_and_time(&execute_timings);
|
||||||
|
qos_service.accumulate_actual_execute_cu(cu);
|
||||||
|
qos_service.accumulate_actual_execute_time(us);
|
||||||
|
|
||||||
// reports qos service stats for this batch
|
// reports qos service stats for this batch
|
||||||
qos_service.report_metrics(bank.clone());
|
qos_service.report_metrics(bank.clone());
|
||||||
|
|
||||||
|
@ -1012,6 +1032,49 @@ impl BankingStage {
|
||||||
(result, retryable_txs)
|
(result, retryable_txs)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// rollup transaction cost details, eg signature_cost, write_lock_cost, data_bytes_cost and
|
||||||
|
// execution_cost from the batch of transactions selected for block.
|
||||||
|
fn accumulate_batched_transaction_costs<'a>(
|
||||||
|
transactions_costs: impl Iterator<Item = &'a TransactionCost>,
|
||||||
|
transaction_results: impl Iterator<Item = &'a transaction::Result<()>>,
|
||||||
|
) -> BatchedTransactionCostDetails {
|
||||||
|
let mut cost_details = BatchedTransactionCostDetails::default();
|
||||||
|
transactions_costs
|
||||||
|
.zip(transaction_results)
|
||||||
|
.for_each(|(cost, result)| {
|
||||||
|
if result.is_ok() {
|
||||||
|
cost_details.batched_signature_cost = cost_details
|
||||||
|
.batched_signature_cost
|
||||||
|
.saturating_add(cost.signature_cost);
|
||||||
|
cost_details.batched_write_lock_cost = cost_details
|
||||||
|
.batched_write_lock_cost
|
||||||
|
.saturating_add(cost.write_lock_cost);
|
||||||
|
cost_details.batched_data_bytes_cost = cost_details
|
||||||
|
.batched_data_bytes_cost
|
||||||
|
.saturating_add(cost.data_bytes_cost);
|
||||||
|
cost_details.batched_execute_cost = cost_details
|
||||||
|
.batched_execute_cost
|
||||||
|
.saturating_add(cost.execution_cost);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
cost_details
|
||||||
|
}
|
||||||
|
|
||||||
|
fn accumulate_execute_units_and_time(execute_timings: &ExecuteTimings) -> (u64, u64) {
|
||||||
|
let (units, times): (Vec<_>, Vec<_>) = execute_timings
|
||||||
|
.details
|
||||||
|
.per_program_timings
|
||||||
|
.iter()
|
||||||
|
.map(|(_program_id, program_timings)| {
|
||||||
|
(
|
||||||
|
program_timings.accumulated_units,
|
||||||
|
program_timings.accumulated_us,
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.unzip();
|
||||||
|
(units.iter().sum(), times.iter().sum())
|
||||||
|
}
|
||||||
|
|
||||||
/// Sends transactions to the bank.
|
/// Sends transactions to the bank.
|
||||||
///
|
///
|
||||||
/// Returns the number of transactions successfully processed by the bank, which may be less
|
/// Returns the number of transactions successfully processed by the bank, which may be less
|
||||||
|
@ -1492,6 +1555,7 @@ mod tests {
|
||||||
poh_recorder::{create_test_recorder, Record, WorkingBankEntry},
|
poh_recorder::{create_test_recorder, Record, WorkingBankEntry},
|
||||||
poh_service::PohService,
|
poh_service::PohService,
|
||||||
},
|
},
|
||||||
|
solana_program_runtime::timings::ProgramTiming,
|
||||||
solana_rpc::transaction_status_service::TransactionStatusService,
|
solana_rpc::transaction_status_service::TransactionStatusService,
|
||||||
solana_runtime::bank::TransactionExecutionDetails,
|
solana_runtime::bank::TransactionExecutionDetails,
|
||||||
solana_sdk::{
|
solana_sdk::{
|
||||||
|
@ -3372,4 +3436,74 @@ mod tests {
|
||||||
assert_eq!(vec![0, 1, 2], tx_packet_index);
|
assert_eq!(vec![0, 1, 2], tx_packet_index);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_accumulate_batched_transaction_costs() {
|
||||||
|
let tx_costs = vec![
|
||||||
|
TransactionCost {
|
||||||
|
signature_cost: 1,
|
||||||
|
write_lock_cost: 2,
|
||||||
|
data_bytes_cost: 3,
|
||||||
|
execution_cost: 10,
|
||||||
|
..TransactionCost::default()
|
||||||
|
},
|
||||||
|
TransactionCost {
|
||||||
|
signature_cost: 4,
|
||||||
|
write_lock_cost: 5,
|
||||||
|
data_bytes_cost: 6,
|
||||||
|
execution_cost: 20,
|
||||||
|
..TransactionCost::default()
|
||||||
|
},
|
||||||
|
TransactionCost {
|
||||||
|
signature_cost: 7,
|
||||||
|
write_lock_cost: 8,
|
||||||
|
data_bytes_cost: 9,
|
||||||
|
execution_cost: 40,
|
||||||
|
..TransactionCost::default()
|
||||||
|
},
|
||||||
|
];
|
||||||
|
let tx_results = vec![
|
||||||
|
Ok(()),
|
||||||
|
Ok(()),
|
||||||
|
Err(TransactionError::WouldExceedMaxBlockCostLimit),
|
||||||
|
];
|
||||||
|
// should only accumulate first two cost that are OK
|
||||||
|
let expected_signatures = 5;
|
||||||
|
let expected_write_locks = 7;
|
||||||
|
let expected_data_bytes = 9;
|
||||||
|
let expected_executions = 30;
|
||||||
|
let cost_details =
|
||||||
|
BankingStage::accumulate_batched_transaction_costs(tx_costs.iter(), tx_results.iter());
|
||||||
|
assert_eq!(expected_signatures, cost_details.batched_signature_cost);
|
||||||
|
assert_eq!(expected_write_locks, cost_details.batched_write_lock_cost);
|
||||||
|
assert_eq!(expected_data_bytes, cost_details.batched_data_bytes_cost);
|
||||||
|
assert_eq!(expected_executions, cost_details.batched_execute_cost);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_accumulate_execute_units_and_time() {
|
||||||
|
let mut execute_timings = ExecuteTimings::default();
|
||||||
|
let mut expected_units = 0;
|
||||||
|
let mut expected_us = 0;
|
||||||
|
|
||||||
|
for n in 0..10 {
|
||||||
|
execute_timings.details.per_program_timings.insert(
|
||||||
|
Pubkey::new_unique(),
|
||||||
|
ProgramTiming {
|
||||||
|
accumulated_us: n * 100,
|
||||||
|
accumulated_units: n * 1000,
|
||||||
|
count: n as u32,
|
||||||
|
errored_txs_compute_consumed: vec![],
|
||||||
|
total_errored_units: 0,
|
||||||
|
},
|
||||||
|
);
|
||||||
|
expected_us += n * 100;
|
||||||
|
expected_units += n * 1000;
|
||||||
|
}
|
||||||
|
|
||||||
|
let (units, us) = BankingStage::accumulate_execute_units_and_time(&execute_timings);
|
||||||
|
|
||||||
|
assert_eq!(expected_units, units);
|
||||||
|
assert_eq!(expected_us, us);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,6 +3,7 @@
|
||||||
//! how transactions are included in blocks, and optimize those blocks.
|
//! how transactions are included in blocks, and optimize those blocks.
|
||||||
//!
|
//!
|
||||||
use {
|
use {
|
||||||
|
crate::banking_stage::BatchedTransactionCostDetails,
|
||||||
crossbeam_channel::{unbounded, Receiver, Sender},
|
crossbeam_channel::{unbounded, Receiver, Sender},
|
||||||
solana_measure::measure::Measure,
|
solana_measure::measure::Measure,
|
||||||
solana_runtime::{
|
solana_runtime::{
|
||||||
|
@ -24,6 +25,10 @@ use {
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
pub enum QosMetrics {
|
||||||
|
BlockBatchUpdate { bank: Arc<Bank> },
|
||||||
|
}
|
||||||
|
|
||||||
// QosService is local to each banking thread, each instance of QosService provides services to
|
// QosService is local to each banking thread, each instance of QosService provides services to
|
||||||
// one banking thread.
|
// one banking thread.
|
||||||
// It hosts a private thread for async metrics reporting, tagged with banking thredas ID. Banking
|
// It hosts a private thread for async metrics reporting, tagged with banking thredas ID. Banking
|
||||||
|
@ -39,7 +44,7 @@ pub struct QosService {
|
||||||
cost_model: Arc<RwLock<CostModel>>,
|
cost_model: Arc<RwLock<CostModel>>,
|
||||||
// QosService hosts metrics object and a private reporting thread, as well as sender to
|
// QosService hosts metrics object and a private reporting thread, as well as sender to
|
||||||
// communicate with thread.
|
// communicate with thread.
|
||||||
report_sender: Sender<Arc<Bank>>,
|
report_sender: Sender<QosMetrics>,
|
||||||
metrics: Arc<QosServiceMetrics>,
|
metrics: Arc<QosServiceMetrics>,
|
||||||
// metrics reporting runs on a private thread
|
// metrics reporting runs on a private thread
|
||||||
reporting_thread: Option<JoinHandle<()>>,
|
reporting_thread: Option<JoinHandle<()>>,
|
||||||
|
@ -163,7 +168,7 @@ impl QosService {
|
||||||
// metrics are reported by bank slot
|
// metrics are reported by bank slot
|
||||||
pub fn report_metrics(&self, bank: Arc<Bank>) {
|
pub fn report_metrics(&self, bank: Arc<Bank>) {
|
||||||
self.report_sender
|
self.report_sender
|
||||||
.send(bank)
|
.send(QosMetrics::BlockBatchUpdate { bank })
|
||||||
.unwrap_or_else(|err| warn!("qos service report metrics failed: {:?}", err));
|
.unwrap_or_else(|err| warn!("qos service report metrics failed: {:?}", err));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -198,14 +203,48 @@ impl QosService {
|
||||||
.fetch_add(count, Ordering::Relaxed);
|
.fetch_add(count, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn accumulate_estimated_transaction_costs(
|
||||||
|
&self,
|
||||||
|
cost_details: &BatchedTransactionCostDetails,
|
||||||
|
) {
|
||||||
|
self.metrics
|
||||||
|
.estimated_signature_cu
|
||||||
|
.fetch_add(cost_details.batched_signature_cost, Ordering::Relaxed);
|
||||||
|
self.metrics
|
||||||
|
.estimated_write_lock_cu
|
||||||
|
.fetch_add(cost_details.batched_write_lock_cost, Ordering::Relaxed);
|
||||||
|
self.metrics
|
||||||
|
.estimated_data_bytes_cu
|
||||||
|
.fetch_add(cost_details.batched_data_bytes_cost, Ordering::Relaxed);
|
||||||
|
self.metrics
|
||||||
|
.estimated_execute_cu
|
||||||
|
.fetch_add(cost_details.batched_execute_cost, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn accumulate_actual_execute_cu(&self, units: u64) {
|
||||||
|
self.metrics
|
||||||
|
.actual_execute_cu
|
||||||
|
.fetch_add(units, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn accumulate_actual_execute_time(&self, micro_sec: u64) {
|
||||||
|
self.metrics
|
||||||
|
.actual_execute_time_us
|
||||||
|
.fetch_add(micro_sec, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
fn reporting_loop(
|
fn reporting_loop(
|
||||||
running_flag: Arc<AtomicBool>,
|
running_flag: Arc<AtomicBool>,
|
||||||
metrics: Arc<QosServiceMetrics>,
|
metrics: Arc<QosServiceMetrics>,
|
||||||
report_receiver: Receiver<Arc<Bank>>,
|
report_receiver: Receiver<QosMetrics>,
|
||||||
) {
|
) {
|
||||||
while running_flag.load(Ordering::Relaxed) {
|
while running_flag.load(Ordering::Relaxed) {
|
||||||
for bank in report_receiver.try_iter() {
|
for qos_metrics in report_receiver.try_iter() {
|
||||||
metrics.report(bank.slot());
|
match qos_metrics {
|
||||||
|
QosMetrics::BlockBatchUpdate { bank } => {
|
||||||
|
metrics.report(bank.slot());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
thread::sleep(Duration::from_millis(100));
|
thread::sleep(Duration::from_millis(100));
|
||||||
}
|
}
|
||||||
|
@ -267,6 +306,24 @@ struct QosServiceMetrics {
|
||||||
|
|
||||||
// number of transactions to be queued for retry due to its account data limits
|
// number of transactions to be queued for retry due to its account data limits
|
||||||
retried_txs_per_account_data_limit_count: AtomicU64,
|
retried_txs_per_account_data_limit_count: AtomicU64,
|
||||||
|
|
||||||
|
// accumulated estimated signature Compute Unites to be packed into block
|
||||||
|
estimated_signature_cu: AtomicU64,
|
||||||
|
|
||||||
|
// accumulated estimated write locks Compute Units to be packed into block
|
||||||
|
estimated_write_lock_cu: AtomicU64,
|
||||||
|
|
||||||
|
// accumulated estimated instructino data Compute Units to be packed into block
|
||||||
|
estimated_data_bytes_cu: AtomicU64,
|
||||||
|
|
||||||
|
// accumulated estimated program Compute Units to be packed into block
|
||||||
|
estimated_execute_cu: AtomicU64,
|
||||||
|
|
||||||
|
// accumulated actual program Compute Units that have been packed into block
|
||||||
|
actual_execute_cu: AtomicU64,
|
||||||
|
|
||||||
|
// accumulated actual program execute micro-sec that have been packed into block
|
||||||
|
actual_execute_time_us: AtomicU64,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl QosServiceMetrics {
|
impl QosServiceMetrics {
|
||||||
|
@ -352,6 +409,36 @@ impl QosServiceMetrics {
|
||||||
.swap(0, Ordering::Relaxed) as i64,
|
.swap(0, Ordering::Relaxed) as i64,
|
||||||
i64
|
i64
|
||||||
),
|
),
|
||||||
|
(
|
||||||
|
"estimated_signature_cu",
|
||||||
|
self.estimated_signature_cu.swap(0, Ordering::Relaxed) as i64,
|
||||||
|
i64
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"estimated_write_lock_cu",
|
||||||
|
self.estimated_write_lock_cu.swap(0, Ordering::Relaxed) as i64,
|
||||||
|
i64
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"estimated_data_bytes_cu",
|
||||||
|
self.estimated_data_bytes_cu.swap(0, Ordering::Relaxed) as i64,
|
||||||
|
i64
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"estimated_execute_cu",
|
||||||
|
self.estimated_execute_cu.swap(0, Ordering::Relaxed) as i64,
|
||||||
|
i64
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"actual_execute_cu",
|
||||||
|
self.actual_execute_cu.swap(0, Ordering::Relaxed) as i64,
|
||||||
|
i64
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"actual_execute_time_us",
|
||||||
|
self.actual_execute_time_us.swap(0, Ordering::Relaxed) as i64,
|
||||||
|
i64
|
||||||
|
),
|
||||||
);
|
);
|
||||||
self.slot.store(bank_slot, Ordering::Relaxed);
|
self.slot.store(bank_slot, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue