Add end-to-end replay slot metrics (#25752)

This commit is contained in:
carllin 2022-07-05 13:58:51 -05:00 committed by GitHub
parent 7e4a5de99c
commit ce39c14025
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 481 additions and 265 deletions

9
Cargo.lock generated
View File

@ -1213,6 +1213,12 @@ version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10"
[[package]]
name = "eager"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "abe71d579d1812060163dff96056261deb5bf6729b100fa2e36a68b9649ba3d3"
[[package]]
name = "ed25519"
version = "1.2.0"
@ -4932,6 +4938,7 @@ dependencies = [
"chrono",
"crossbeam-channel",
"dashmap",
"eager",
"etcd-client",
"fs_extra",
"histogram",
@ -5705,6 +5712,7 @@ version = "1.11.2"
dependencies = [
"base64 0.13.0",
"bincode",
"eager",
"enum-iterator",
"itertools",
"libc",
@ -5718,6 +5726,7 @@ dependencies = [
"solana-frozen-abi-macro 1.11.2",
"solana-logger 1.11.2",
"solana-measure",
"solana-metrics",
"solana-sdk 1.11.2",
"thiserror",
]

View File

@ -21,6 +21,7 @@ bs58 = "0.4.0"
chrono = { version = "0.4.11", features = ["serde"] }
crossbeam-channel = "0.5"
dashmap = { version = "4.0.2", features = ["rayon", "raw-api"] }
eager = "0.1.0"
etcd-client = { version = "0.8.1", features = ["tls"] }
fs_extra = "1.2.0"
histogram = "0.6.9"

View File

@ -1,5 +1,6 @@
#![cfg_attr(RUSTC_WITH_SPECIALIZATION, feature(min_specialization))]
#![allow(clippy::integer_arithmetic)]
#![recursion_limit = "2048"]
//! The `solana` library implements the Solana high-performance blockchain architecture.
//! It includes a full Rust implementation of the architecture (see
//! [Validator](server/struct.Validator.html)) as well as hooks to GPU implementations of its most
@ -78,6 +79,9 @@ pub mod voting_service;
pub mod warm_quic_cache_service;
pub mod window_service;
#[macro_use]
extern crate eager;
#[macro_use]
extern crate log;

View File

@ -6,7 +6,7 @@ use {
replay_stage::SUPERMINORITY_THRESHOLD,
},
solana_ledger::blockstore_processor::{ConfirmationProgress, ConfirmationTiming},
solana_program_runtime::timings::ExecuteTimingType,
solana_program_runtime::{report_execute_timings, timings::ExecuteTimingType},
solana_runtime::{bank::Bank, bank_forks::BankForks, vote_account::VoteAccountsHashMap},
solana_sdk::{clock::Slot, hash::Hash, pubkey::Pubkey},
std::{
@ -43,219 +43,43 @@ impl ReplaySlotStats {
num_shreds: u64,
bank_complete_time_us: u64,
) {
datapoint_info!(
"replay-slot-stats",
("slot", slot as i64, i64),
("fetch_entries_time", self.fetch_elapsed as i64, i64),
(
"fetch_entries_fail_time",
self.fetch_fail_elapsed as i64,
i64
),
(
"entry_poh_verification_time",
self.poh_verify_elapsed as i64,
i64
),
(
"entry_transaction_verification_time",
self.transaction_verify_elapsed as i64,
i64
),
("replay_time", self.replay_elapsed as i64, i64),
(
"replay_total_elapsed",
self.started.elapsed().as_micros() as i64,
i64
),
("total_entries", num_entries as i64, i64),
("total_shreds", num_shreds as i64, i64),
(
"check_us",
*self
.execute_timings
.metrics
.index(ExecuteTimingType::CheckUs),
i64
),
(
"load_us",
*self
.execute_timings
.metrics
.index(ExecuteTimingType::LoadUs),
i64
),
(
"execute_us",
*self
.execute_timings
.metrics
.index(ExecuteTimingType::ExecuteUs),
i64
),
(
"store_us",
*self
.execute_timings
.metrics
.index(ExecuteTimingType::StoreUs),
i64
),
(
"update_stakes_cache_us",
*self
.execute_timings
.metrics
.index(ExecuteTimingType::UpdateStakesCacheUs),
i64
),
("bank_complete_time_us", bank_complete_time_us, i64),
(
"total_batches_len",
*self
.execute_timings
.metrics
.index(ExecuteTimingType::TotalBatchesLen),
i64
),
(
"num_execute_batches",
*self
.execute_timings
.metrics
.index(ExecuteTimingType::NumExecuteBatches),
i64
),
(
"execute_details_serialize_us",
self.execute_timings.details.serialize_us,
i64
),
(
"execute_details_create_vm_us",
self.execute_timings.details.create_vm_us,
i64
),
(
"execute_details_execute_inner_us",
self.execute_timings.details.execute_us,
i64
),
(
"execute_details_deserialize_us",
self.execute_timings.details.deserialize_us,
i64
),
(
"execute_details_get_or_create_executor_us",
self.execute_timings.details.get_or_create_executor_us,
i64
),
(
"execute_details_changed_account_count",
self.execute_timings.details.changed_account_count,
i64
),
(
"execute_details_total_account_count",
self.execute_timings.details.total_account_count,
i64
),
(
"execute_details_total_data_size",
self.execute_timings.details.total_data_size,
i64
),
(
"execute_details_data_size_changed",
self.execute_timings.details.data_size_changed,
i64
),
(
"execute_details_create_executor_register_syscalls_us",
self.execute_timings
.details
.create_executor_register_syscalls_us,
i64
),
(
"execute_details_create_executor_load_elf_us",
self.execute_timings.details.create_executor_load_elf_us,
i64
),
(
"execute_details_create_executor_verify_code_us",
self.execute_timings.details.create_executor_verify_code_us,
i64
),
(
"execute_details_create_executor_jit_compile_us",
self.execute_timings.details.create_executor_jit_compile_us,
i64
),
(
"execute_accessories_feature_set_clone_us",
self.execute_timings
.execute_accessories
.feature_set_clone_us,
i64
),
(
"execute_accessories_compute_budget_process_transaction_us",
self.execute_timings
.execute_accessories
.compute_budget_process_transaction_us,
i64
),
(
"execute_accessories_get_executors_us",
self.execute_timings.execute_accessories.get_executors_us,
i64
),
(
"execute_accessories_process_message_us",
self.execute_timings.execute_accessories.process_message_us,
i64
),
(
"execute_accessories_update_executors_us",
self.execute_timings.execute_accessories.update_executors_us,
i64
),
(
"execute_accessories_process_instructions_total_us",
self.execute_timings
.execute_accessories
.process_instructions
.total_us,
i64
),
(
"execute_accessories_process_instructions_verify_caller_us",
self.execute_timings
.execute_accessories
.process_instructions
.verify_caller_us,
i64
),
(
"execute_accessories_process_instructions_process_executable_chain_us",
self.execute_timings
.execute_accessories
.process_instructions
.process_executable_chain_us,
i64
),
(
"execute_accessories_process_instructions_verify_callee_us",
self.execute_timings
.execute_accessories
.process_instructions
.verify_callee_us,
i64
),
);
lazy! {
datapoint_info!(
"replay-slot-stats",
("slot", slot as i64, i64),
("fetch_entries_time", self.fetch_elapsed as i64, i64),
(
"fetch_entries_fail_time",
self.fetch_fail_elapsed as i64,
i64
),
(
"entry_poh_verification_time",
self.poh_verify_elapsed as i64,
i64
),
(
"entry_transaction_verification_time",
self.transaction_verify_elapsed as i64,
i64
),
("replay_time", self.replay_elapsed as i64, i64),
("execute_batches_us", self.execute_batches_us as i64, i64),
(
"replay_total_elapsed",
self.started.elapsed().as_micros() as i64,
i64
),
("bank_complete_time_us", bank_complete_time_us, i64),
("total_entries", num_entries as i64, i64),
("total_shreds", num_shreds as i64, i64),
// Everything inside the `eager!` block will be eagerly expanded before
// evaluation of the rest of the surrounding macro.
eager!{report_execute_timings!(self.execute_timings)}
);
};
self.end_to_end_execute_timings.report_stats(slot);
let mut per_pubkey_timings: Vec<_> = self
.execute_timings

View File

@ -12,9 +12,9 @@ use {
solana_entry::entry::{
self, create_ticks, Entry, EntrySlice, EntryType, EntryVerificationStatus, VerifyRecyclers,
},
solana_measure::measure::Measure,
solana_measure::{measure, measure::Measure},
solana_metrics::{datapoint_error, inc_new_counter_debug},
solana_program_runtime::timings::{ExecuteTimingType, ExecuteTimings},
solana_program_runtime::timings::{ExecuteTimingType, ExecuteTimings, ThreadExecuteTimings},
solana_rayon_threadlimit::{get_max_thread_count, get_thread_count},
solana_runtime::{
accounts_background_service::AbsRequestSender,
@ -58,7 +58,7 @@ use {
collections::{HashMap, HashSet},
path::PathBuf,
result,
sync::{Arc, RwLock},
sync::{Arc, Mutex, RwLock},
time::{Duration, Instant},
},
thiserror::Error,
@ -266,47 +266,91 @@ fn execute_batch(
first_err.map(|(result, _)| result).unwrap_or(Ok(()))
}
// Timing data gathered by a single call to `execute_batches_internal`, later
// folded into `ConfirmationTiming` via `process_execute_batches_internal_metrics`.
#[derive(Default)]
struct ExecuteBatchesInternalMetrics {
// Per-thread execution timings, keyed by the rayon pool's thread index
// (see `PAR_THREAD_POOL.current_thread_index()` in `execute_batches_internal`).
execution_timings_per_thread: HashMap<usize, ThreadExecuteTimings>,
// Number of transaction batches executed by this call.
total_batches_len: u64,
// Wall-clock time spent executing all batches, in microseconds
// (from `Measure::as_us`).
execute_batches_us: u64,
}
// Executes `batches` in parallel on `PAR_THREAD_POOL`, collecting per-thread
// timing data, and returns the first error (if any) or the aggregated
// `ExecuteBatchesInternalMetrics`.
//
// NOTE(review): this rendering lost the diff's +/- markers, so the function
// below interleaves the PRE-change implementation (old `-> Result<()>`
// signature and the `unzip`-based body) with the POST-change one (the
// `Mutex<HashMap<..>>`/`measure!`-based body returning
// `ExecuteBatchesInternalMetrics`). The post-change version supersedes the
// other; the two cannot both compile as written here.
fn execute_batches_internal(
bank: &Arc<Bank>,
batches: &[TransactionBatchWithIndexes],
entry_callback: Option<&ProcessCallback>,
transaction_status_sender: Option<&TransactionStatusSender>,
replay_vote_sender: Option<&ReplayVoteSender>,
timings: &mut ExecuteTimings,
cost_capacity_meter: Arc<RwLock<BlockCostCapacityMeter>>,
tx_costs: &[u64],
// NOTE(review): old (removed) signature terminator — superseded by the next line.
) -> Result<()> {
) -> Result<ExecuteBatchesInternalMetrics> {
inc_new_counter_debug!("bank-par_execute_entries-count", batches.len());
// NOTE(review): begin PRE-change body (removed by this commit) — timings were
// accumulated directly into the caller-supplied `timings`.
let (results, new_timings): (Vec<Result<()>>, Vec<ExecuteTimings>) =
PAR_THREAD_POOL.install(|| {
batches
.into_par_iter()
.enumerate()
.map(|(index, batch)| {
let mut timings = ExecuteTimings::default();
let result = execute_batch(
batch,
bank,
transaction_status_sender,
replay_vote_sender,
&mut timings,
cost_capacity_meter.clone(),
tx_costs[index],
);
if let Some(entry_callback) = entry_callback {
entry_callback(bank);
}
(result, timings)
})
.unzip()
});
timings.saturating_add_in_place(ExecuteTimingType::TotalBatchesLen, batches.len() as u64);
timings.saturating_add_in_place(ExecuteTimingType::NumExecuteBatches, 1);
for timing in new_timings {
timings.accumulate(&timing);
}
// NOTE(review): begin POST-change body (added by this commit) — timings are
// instead bucketed per rayon worker thread behind a Mutex and returned to the
// caller as `ExecuteBatchesInternalMetrics`.
let execution_timings_per_thread: Mutex<HashMap<usize, ThreadExecuteTimings>> =
Mutex::new(HashMap::new());
// NOTE(review): stray PRE-change return line, replaced by the `first_err(&results)?;`
// + `Ok(..)` sequence at the bottom.
first_err(&results)
let mut execute_batches_elapsed = Measure::start("execute_batches_elapsed");
let results: Vec<Result<()>> = PAR_THREAD_POOL.install(|| {
batches
.into_par_iter()
.enumerate()
.map(|(index, transaction_batch_with_indexes)| {
let transaction_count = transaction_batch_with_indexes
.batch
.sanitized_transactions()
.len() as u64;
let mut timings = ExecuteTimings::default();
// Time the whole batch (execute_batch + entry callback) so
// per-thread totals reflect actual wall-clock work.
let (result, execute_batches_time): (Result<()>, Measure) = measure!(
{
let result = execute_batch(
transaction_batch_with_indexes,
bank,
transaction_status_sender,
replay_vote_sender,
&mut timings,
cost_capacity_meter.clone(),
tx_costs[index],
);
if let Some(entry_callback) = entry_callback {
entry_callback(bank);
}
result
},
"execute_batch",
);
// Safe to unwrap: this closure always runs on a pool worker thread.
let thread_index = PAR_THREAD_POOL.current_thread_index().unwrap();
execution_timings_per_thread
.lock()
.unwrap()
.entry(thread_index)
.and_modify(|thread_execution_time| {
let ThreadExecuteTimings {
total_thread_us,
total_transactions_executed,
execute_timings: total_thread_execute_timings,
} = thread_execution_time;
*total_thread_us += execute_batches_time.as_us();
*total_transactions_executed += transaction_count;
total_thread_execute_timings
.saturating_add_in_place(ExecuteTimingType::TotalBatchesLen, 1);
total_thread_execute_timings.accumulate(&timings);
})
.or_insert(ThreadExecuteTimings {
total_thread_us: execute_batches_time.as_us(),
total_transactions_executed: transaction_count,
execute_timings: timings,
});
result
})
.collect()
});
execute_batches_elapsed.stop();
// Propagate the first batch error, if any, before reporting metrics.
first_err(&results)?;
Ok(ExecuteBatchesInternalMetrics {
execution_timings_per_thread: execution_timings_per_thread.into_inner().unwrap(),
total_batches_len: batches.len() as u64,
execute_batches_us: execute_batches_elapsed.as_us(),
})
}
fn rebatch_transactions<'a>(
@ -335,7 +379,7 @@ fn execute_batches(
entry_callback: Option<&ProcessCallback>,
transaction_status_sender: Option<&TransactionStatusSender>,
replay_vote_sender: Option<&ReplayVoteSender>,
timings: &mut ExecuteTimings,
confirmation_timing: &mut ConfirmationTiming,
cost_capacity_meter: Arc<RwLock<BlockCostCapacityMeter>>,
cost_model: &CostModel,
) -> Result<()> {
@ -414,16 +458,18 @@ fn execute_batches(
batches
};
execute_batches_internal(
let execute_batches_internal_metrics = execute_batches_internal(
bank,
rebatched_txs,
entry_callback,
transaction_status_sender,
replay_vote_sender,
timings,
cost_capacity_meter,
&tx_batch_costs,
)
)?;
confirmation_timing.process_execute_batches_internal_metrics(execute_batches_internal_metrics);
Ok(())
}
/// Process an ordered list of entries in parallel
@ -448,8 +494,8 @@ pub fn process_entries_for_tests(
}
};
let mut timings = ExecuteTimings::default();
let mut entry_starting_index: usize = bank.transaction_count().try_into().unwrap();
let mut confirmation_timing = ConfirmationTiming::default();
let mut replay_entries: Vec<_> =
entry::verify_transactions(entries, Arc::new(verify_transaction))?
.into_iter()
@ -464,6 +510,7 @@ pub fn process_entries_for_tests(
}
})
.collect();
let result = process_entries_with_callback(
bank,
&mut replay_entries,
@ -472,11 +519,11 @@ pub fn process_entries_for_tests(
transaction_status_sender,
replay_vote_sender,
None,
&mut timings,
&mut confirmation_timing,
Arc::new(RwLock::new(BlockCostCapacityMeter::default())),
);
debug!("process_entries: {:?}", timings);
debug!("process_entries: {:?}", confirmation_timing);
result
}
@ -490,7 +537,7 @@ fn process_entries_with_callback(
transaction_status_sender: Option<&TransactionStatusSender>,
replay_vote_sender: Option<&ReplayVoteSender>,
transaction_cost_metrics_sender: Option<&TransactionCostMetricsSender>,
timings: &mut ExecuteTimings,
confirmation_timing: &mut ConfirmationTiming,
cost_capacity_meter: Arc<RwLock<BlockCostCapacityMeter>>,
) -> Result<()> {
// accumulator for entries that can be processed in parallel
@ -517,7 +564,7 @@ fn process_entries_with_callback(
entry_callback,
transaction_status_sender,
replay_vote_sender,
timings,
confirmation_timing,
cost_capacity_meter.clone(),
&cost_model,
)?;
@ -587,7 +634,7 @@ fn process_entries_with_callback(
entry_callback,
transaction_status_sender,
replay_vote_sender,
timings,
confirmation_timing,
cost_capacity_meter.clone(),
&cost_model,
)?;
@ -603,7 +650,7 @@ fn process_entries_with_callback(
entry_callback,
transaction_status_sender,
replay_vote_sender,
timings,
confirmation_timing,
cost_capacity_meter,
&cost_model,
)?;
@ -765,8 +812,8 @@ pub fn process_blockstore_from_root(
}
let mut timing = ExecuteTimings::default();
// Iterate and replay slots from blockstore starting from `start_slot`
// Iterate and replay slots from blockstore starting from `start_slot`
if let Some(start_slot_meta) = blockstore
.meta(start_slot)
.unwrap_or_else(|_| panic!("Failed to get meta for slot {}", start_slot))
@ -926,14 +973,70 @@ fn confirm_full_slot(
}
}
/// Timing measurements accumulated while confirming (replaying) a slot,
/// reported via the "replay-slot-stats" datapoint.
#[derive(Debug)]
pub struct ConfirmationTiming {
/// When confirmation of this slot started.
pub started: Instant,
/// Microseconds spent replaying entries (reported as "replay_time").
pub replay_elapsed: u64,
/// Cumulative wall-clock microseconds spent inside `execute_batches_internal`,
/// accumulated by `process_execute_batches_internal_metrics`.
pub execute_batches_us: u64,
/// Microseconds spent on entry PoH verification.
pub poh_verify_elapsed: u64,
/// Microseconds spent on entry transaction (signature) verification.
pub transaction_verify_elapsed: u64,
/// Microseconds spent fetching entries.
pub fetch_elapsed: u64,
/// Microseconds attributed to failed entry fetches.
pub fetch_fail_elapsed: u64,
/// Cumulative execution timings summed across all worker threads.
pub execute_timings: ExecuteTimings,
/// Timings of the slowest thread of each batch run, accumulated as an
/// approximation of the end-to-end critical path (see
/// `process_execute_batches_internal_metrics`).
pub end_to_end_execute_timings: ThreadExecuteTimings,
}
impl ConfirmationTiming {
// Folds one `ExecuteBatchesInternalMetrics` (the result of a single
// `execute_batches_internal` run) into the cumulative slot-level timings.
fn process_execute_batches_internal_metrics(
&mut self,
execute_batches_internal_metrics: ExecuteBatchesInternalMetrics,
) {
// Destructure to get simultaneous mutable borrows of the three fields
// updated below.
let ConfirmationTiming {
execute_timings: ref mut cumulative_execute_timings,
execute_batches_us: ref mut cumulative_execute_batches_us,
ref mut end_to_end_execute_timings,
..
} = self;
saturating_add_assign!(
*cumulative_execute_batches_us,
execute_batches_internal_metrics.execute_batches_us
);
cumulative_execute_timings.saturating_add_in_place(
ExecuteTimingType::TotalBatchesLen,
execute_batches_internal_metrics.total_batches_len,
);
// Each call corresponds to exactly one `execute_batches_internal` run.
cumulative_execute_timings.saturating_add_in_place(ExecuteTimingType::NumExecuteBatches, 1);
// Sum every thread's timings into the cumulative totals, while tracking
// the single busiest thread (largest `total_thread_us`) of this run.
let mut current_max_thread_execution_time: Option<ThreadExecuteTimings> = None;
for (_, thread_execution_time) in execute_batches_internal_metrics
.execution_timings_per_thread
.into_iter()
{
let ThreadExecuteTimings {
total_thread_us,
execute_timings,
..
} = &thread_execution_time;
cumulative_execute_timings.accumulate(execute_timings);
if *total_thread_us
> current_max_thread_execution_time
.as_ref()
.map(|thread_execution_time| thread_execution_time.total_thread_us)
.unwrap_or(0)
{
current_max_thread_execution_time = Some(thread_execution_time);
}
}
// Only the slowest thread's timings feed the end-to-end metric: the run's
// wall-clock is bounded by its busiest thread.
if let Some(current_max_thread_execution_time) = current_max_thread_execution_time {
end_to_end_execute_timings.accumulate(&current_max_thread_execution_time);
end_to_end_execute_timings
.execute_timings
.saturating_add_in_place(ExecuteTimingType::NumExecuteBatches, 1);
};
}
}
impl Default for ConfirmationTiming {
@ -941,11 +1044,13 @@ impl Default for ConfirmationTiming {
Self {
started: Instant::now(),
replay_elapsed: 0,
execute_batches_us: 0,
poh_verify_elapsed: 0,
transaction_verify_elapsed: 0,
fetch_elapsed: 0,
fetch_fail_elapsed: 0,
execute_timings: ExecuteTimings::default(),
end_to_end_execute_timings: ThreadExecuteTimings::default(),
}
}
}
@ -1104,7 +1209,6 @@ fn confirm_slot_entries(
assert!(entries.is_some());
let mut replay_elapsed = Measure::start("replay_elapsed");
let mut execute_timings = ExecuteTimings::default();
let cost_capacity_meter = Arc::new(RwLock::new(BlockCostCapacityMeter::default()));
let mut replay_entries: Vec<_> = entries
.unwrap()
@ -1124,15 +1228,13 @@ fn confirm_slot_entries(
transaction_status_sender,
replay_vote_sender,
transaction_cost_metrics_sender,
&mut execute_timings,
timing,
cost_capacity_meter,
)
.map_err(BlockstoreProcessorError::from);
replay_elapsed.stop();
timing.replay_elapsed += replay_elapsed.as_us();
timing.execute_timings.accumulate(&execute_timings);
// If running signature verification on the GPU, wait for that
// computation to finish, and get the result of it. If we did the
// signature verification on the CPU, this just returns the

View File

@ -12,6 +12,7 @@ edition = "2021"
[dependencies]
base64 = "0.13"
bincode = "1.3.3"
eager = "0.1.0"
itertools = "0.10.1"
libc = "0.2.101"
libloading = "0.7.0"
@ -22,6 +23,7 @@ serde = { version = "1.0.129", features = ["derive", "rc"] }
solana-frozen-abi = { path = "../frozen-abi", version = "=1.11.2" }
solana-frozen-abi-macro = { path = "../frozen-abi/macro", version = "=1.11.2" }
solana-measure = { path = "../measure", version = "=1.11.2" }
solana-metrics = { path = "../metrics", version = "=1.11.2" }
solana-sdk = { path = "../sdk", version = "=1.11.2" }
thiserror = "1.0"
enum-iterator = "0.8.1"

View File

@ -1,6 +1,13 @@
#![cfg_attr(RUSTC_WITH_SPECIALIZATION, feature(min_specialization))]
#![deny(clippy::integer_arithmetic)]
#![deny(clippy::indexing_slicing)]
#![recursion_limit = "2048"]
#[macro_use]
extern crate eager;
#[macro_use]
extern crate solana_metrics;
pub mod accounts_data_meter;
pub mod compute_budget;

View File

@ -1,7 +1,7 @@
use {
core::fmt,
enum_iterator::IntoEnumIterator,
solana_sdk::{pubkey::Pubkey, saturating_add_assign},
solana_sdk::{clock::Slot, pubkey::Pubkey, saturating_add_assign},
std::{
collections::HashMap,
ops::{Index, IndexMut},
@ -50,6 +50,7 @@ pub enum ExecuteTimingType {
NumExecuteBatches,
CollectLogsUs,
TotalBatchesLen,
UpdateTransactionStatuses,
}
/// Fixed-size array of cumulative counters, one slot per `ExecuteTimingType` variant.
pub struct Metrics([u64; ExecuteTimingType::ITEM_COUNT]);
@ -79,6 +80,252 @@ impl core::fmt::Debug for Metrics {
}
}
// The auxiliary variable that must always be provided to eager_macro_rules! must use the
// identifier `eager_1`. Macros declared with `eager_macro_rules!` can then be used inside
// an eager! block.
//
// `report_execute_timings!` expands to a comma-separated list of
// `("name", value, i64)` field tuples — NOT a complete expression. It is only
// valid when eagerly spliced (via `eager!`) into the argument list of a
// `datapoint_info!` invocation; see `ThreadExecuteTimings::report_stats` and
// `ReplaySlotStats::report_stats` for the two call sites.
eager_macro_rules! { $eager_1
#[macro_export]
macro_rules! report_execute_timings {
($self: expr) => {
(
"validate_transactions_us",
*$self
.metrics
.index(ExecuteTimingType::CheckUs),
i64
),
(
"load_us",
*$self
.metrics
.index(ExecuteTimingType::LoadUs),
i64
),
(
"execute_us",
*$self
.metrics
.index(ExecuteTimingType::ExecuteUs),
i64
),
(
"collect_logs_us",
*$self
.metrics
.index(ExecuteTimingType::CollectLogsUs),
i64
),
(
"store_us",
*$self
.metrics
.index(ExecuteTimingType::StoreUs),
i64
),
(
"update_stakes_cache_us",
*$self
.metrics
.index(ExecuteTimingType::UpdateStakesCacheUs),
i64
),
(
"total_batches_len",
*$self
.metrics
.index(ExecuteTimingType::TotalBatchesLen),
i64
),
(
"num_execute_batches",
*$self
.metrics
.index(ExecuteTimingType::NumExecuteBatches),
i64
),
(
"update_transaction_statuses",
*$self
.metrics
.index(ExecuteTimingType::UpdateTransactionStatuses),
i64
),
(
"execute_details_serialize_us",
$self.details.serialize_us,
i64
),
(
"execute_details_create_vm_us",
$self.details.create_vm_us,
i64
),
(
"execute_details_execute_inner_us",
$self.details.execute_us,
i64
),
(
"execute_details_deserialize_us",
$self.details.deserialize_us,
i64
),
(
"execute_details_get_or_create_executor_us",
$self.details.get_or_create_executor_us,
i64
),
(
"execute_details_changed_account_count",
$self.details.changed_account_count,
i64
),
(
"execute_details_total_account_count",
$self.details.total_account_count,
i64
),
(
"execute_details_total_data_size",
$self.details.total_data_size,
i64
),
(
"execute_details_data_size_changed",
$self.details.data_size_changed,
i64
),
(
"execute_details_create_executor_register_syscalls_us",
$self
.details
.create_executor_register_syscalls_us,
i64
),
(
"execute_details_create_executor_load_elf_us",
$self.details.create_executor_load_elf_us,
i64
),
(
"execute_details_create_executor_verify_code_us",
$self.details.create_executor_verify_code_us,
i64
),
(
"execute_details_create_executor_jit_compile_us",
$self.details.create_executor_jit_compile_us,
i64
),
(
"execute_accessories_feature_set_clone_us",
$self
.execute_accessories
.feature_set_clone_us,
i64
),
(
"execute_accessories_compute_budget_process_transaction_us",
$self
.execute_accessories
.compute_budget_process_transaction_us,
i64
),
(
"execute_accessories_get_executors_us",
$self.execute_accessories.get_executors_us,
i64
),
(
"execute_accessories_process_message_us",
$self.execute_accessories.process_message_us,
i64
),
(
"execute_accessories_update_executors_us",
$self.execute_accessories.update_executors_us,
i64
),
(
"execute_accessories_process_instructions_total_us",
$self
.execute_accessories
.process_instructions
.total_us,
i64
),
(
"execute_accessories_process_instructions_verify_caller_us",
$self
.execute_accessories
.process_instructions
.verify_caller_us,
i64
),
(
"execute_accessories_process_instructions_process_executable_chain_us",
$self
.execute_accessories
.process_instructions
.process_executable_chain_us,
i64
),
(
"execute_accessories_process_instructions_verify_callee_us",
$self
.execute_accessories
.process_instructions
.verify_callee_us,
i64
),
}
}
}
/// Execution timings attributed to a single worker thread while executing
/// transaction batches (see `execute_batches_internal`).
#[derive(Debug, Default)]
pub struct ThreadExecuteTimings {
/// Total wall-clock microseconds this thread spent executing batches.
pub total_thread_us: u64,
/// Number of transactions this thread executed.
pub total_transactions_executed: u64,
/// Detailed per-category execution timings for this thread.
pub execute_timings: ExecuteTimings,
}
impl ThreadExecuteTimings {
// Emits this thread-level timing data as the "replay-slot-end-to-end-stats"
// datapoint for `slot`. The `lazy!`/`eager!` wrappers let
// `report_execute_timings!` expand to a field list before `datapoint_info!`
// itself is evaluated.
pub fn report_stats(&self, slot: Slot) {
lazy! {
datapoint_info!(
"replay-slot-end-to-end-stats",
("slot", slot as i64, i64),
("total_thread_us", self.total_thread_us as i64, i64),
("total_transactions_executed", self.total_transactions_executed as i64, i64),
// Everything inside the `eager!` block will be eagerly expanded before
// evaluation of the rest of the surrounding macro.
eager!{report_execute_timings!(self.execute_timings)}
);
};
}
// Saturating-adds all of `other`'s timings into `self`.
pub fn accumulate(&mut self, other: &ThreadExecuteTimings) {
// NOTE(review): `TotalBatchesLen` is added explicitly here and
// `ExecuteTimings::accumulate` below may add it again — confirm that
// `accumulate` excludes `TotalBatchesLen`, otherwise it is double-counted.
self.execute_timings.saturating_add_in_place(
ExecuteTimingType::TotalBatchesLen,
*other
.execute_timings
.metrics
.index(ExecuteTimingType::TotalBatchesLen),
);
self.execute_timings.accumulate(&other.execute_timings);
saturating_add_assign!(self.total_thread_us, other.total_thread_us);
saturating_add_assign!(
self.total_transactions_executed,
other.total_transactions_executed
);
}
}
#[derive(Debug, Default)]
pub struct ExecuteTimings {
pub metrics: Metrics,

View File

@ -1023,6 +1023,12 @@ dependencies = [
"syn 0.15.44",
]
[[package]]
name = "eager"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "abe71d579d1812060163dff96056261deb5bf6729b100fa2e36a68b9649ba3d3"
[[package]]
name = "ed25519"
version = "1.0.1"
@ -4561,6 +4567,7 @@ dependencies = [
"chrono",
"crossbeam-channel",
"dashmap",
"eager",
"etcd-client",
"fs_extra",
"histogram",
@ -5063,6 +5070,7 @@ version = "1.11.2"
dependencies = [
"base64 0.13.0",
"bincode",
"eager",
"enum-iterator",
"itertools",
"libc",
@ -5075,6 +5083,7 @@ dependencies = [
"solana-frozen-abi 1.11.2",
"solana-frozen-abi-macro 1.11.2",
"solana-measure",
"solana-metrics",
"solana-sdk 1.11.2",
"thiserror",
]

View File

@ -4575,6 +4575,7 @@ impl Bank {
let transaction_log_collector_config =
self.transaction_log_collector_config.read().unwrap();
let mut collect_logs_time = Measure::start("collect_logs_time");
for (execution_result, tx) in execution_results.iter().zip(sanitized_txs) {
if let Some(debug_keys) = &self.transaction_debug_keys {
for key in tx.message().account_keys().iter() {
@ -4664,6 +4665,10 @@ impl Bank {
}
}
}
collect_logs_time.stop();
timings
.saturating_add_in_place(ExecuteTimingType::CollectLogsUs, collect_logs_time.as_us());
if *err_count > 0 {
debug!(
"{} errors of {} txs",
@ -5003,9 +5008,15 @@ impl Bank {
update_stakes_cache_time.as_us(),
);
let mut update_transaction_statuses_time = Measure::start("update_transaction_statuses");
self.update_transaction_statuses(sanitized_txs, &execution_results);
let fee_collection_results =
self.filter_program_errors_and_collect_fee(sanitized_txs, &execution_results);
update_transaction_statuses_time.stop();
timings.saturating_add_in_place(
ExecuteTimingType::UpdateTransactionStatuses,
update_transaction_statuses_time.as_us(),
);
TransactionResults {
fee_collection_results,