ledger: Extract `BatchExecutionTiming` (#30806)

Extracted time metrics related to transaction execution into a separate
structure.  This allows me to call `process_entries_with_callback()`
without locking the whole instance of `ConfirmationTiming`, passing just
the `BatchExecutionTiming` part.

I want to add a new metric that starts at the beginning of the
`confirm_slot_entries()` call and ends at the very end.  In order to
use a `scopeguard::defer`, I need to be able to have an exclusive
reference to it for the whole body of `confirm_slot_entries()`.

Plus a few minor renamings to clarify what the verification and result
variables actually store.  Also corrected a few messages that
incorrectly mentioned PoH verification, while they were actually issued
for transaction verification failures.
This commit is contained in:
Illia Bobyr 2023-03-28 15:37:34 -07:00 committed by GitHub
parent b72be0f086
commit 564f8c9b17
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 117 additions and 109 deletions

View File

@ -65,7 +65,7 @@ impl ReplaySlotStats {
i64
),
("replay_time", self.replay_elapsed as i64, i64),
("execute_batches_us", self.execute_batches_us as i64, i64),
("execute_batches_us", self.batch_execute.wall_clock_us as i64, i64),
(
"replay_total_elapsed",
self.started.elapsed().as_micros() as i64,
@ -77,14 +77,15 @@ impl ReplaySlotStats {
("total_shreds", num_shreds as i64, i64),
// Everything inside the `eager!` block will be eagerly expanded before
// evaluation of the rest of the surrounding macro.
eager!{report_execute_timings!(self.execute_timings)}
eager!{report_execute_timings!(self.batch_execute.totals)}
);
};
self.end_to_end_execute_timings.report_stats(slot);
self.batch_execute.slowest_thread.report_stats(slot);
let mut per_pubkey_timings: Vec<_> = self
.execute_timings
.batch_execute
.totals
.details
.per_program_timings
.iter()

View File

@ -2585,10 +2585,12 @@ impl ReplayStage {
let r_replay_stats = replay_stats.read().unwrap();
let replay_progress = bank_progress.replay_progress.clone();
let r_replay_progress = replay_progress.read().unwrap();
debug!("bank {} is completed replay from blockstore, contribute to update cost with {:?}",
debug!(
"bank {} has completed replay from blockstore, \
contribute to update cost with {:?}",
bank.slot(),
r_replay_stats.execute_timings
);
r_replay_stats.batch_execute.totals
);
did_complete_bank = true;
let _ = cluster_slots_update_sender.send(vec![bank_slot]);
if let Some(transaction_status_sender) = transaction_status_sender {
@ -2682,7 +2684,7 @@ impl ReplayStage {
r_replay_progress.num_shreds,
bank_complete_time.as_us(),
);
execute_timings.accumulate(&r_replay_stats.execute_timings);
execute_timings.accumulate(&r_replay_stats.batch_execute.totals);
} else {
trace!(
"bank {} not completed tick_height: {}, max_tick_height: {}",

View File

@ -311,7 +311,7 @@ fn execute_batches(
batches: &[TransactionBatchWithIndexes],
transaction_status_sender: Option<&TransactionStatusSender>,
replay_vote_sender: Option<&ReplayVoteSender>,
confirmation_timing: &mut ConfirmationTiming,
timing: &mut BatchExecutionTiming,
log_messages_bytes_limit: Option<usize>,
prioritization_fee_cache: &PrioritizationFeeCache,
) -> Result<()> {
@ -398,7 +398,7 @@ fn execute_batches(
prioritization_fee_cache,
)?;
confirmation_timing.process_execute_batches_internal_metrics(execute_batches_internal_metrics);
timing.accumulate(execute_batches_internal_metrics);
Ok(())
}
@ -425,7 +425,7 @@ pub fn process_entries_for_tests(
};
let mut entry_starting_index: usize = bank.transaction_count().try_into().unwrap();
let mut confirmation_timing = ConfirmationTiming::default();
let mut batch_timing = BatchExecutionTiming::default();
let mut replay_entries: Vec<_> =
entry::verify_transactions(entries, Arc::new(verify_transaction))?
.into_iter()
@ -448,12 +448,12 @@ pub fn process_entries_for_tests(
randomize,
transaction_status_sender,
replay_vote_sender,
&mut confirmation_timing,
&mut batch_timing,
None,
&_ignored_prioritization_fee_cache,
);
debug!("process_entries: {:?}", confirmation_timing);
debug!("process_entries: {:?}", batch_timing);
result
}
@ -464,7 +464,7 @@ fn process_entries(
randomize: bool,
transaction_status_sender: Option<&TransactionStatusSender>,
replay_vote_sender: Option<&ReplayVoteSender>,
confirmation_timing: &mut ConfirmationTiming,
batch_timing: &mut BatchExecutionTiming,
log_messages_bytes_limit: Option<usize>,
prioritization_fee_cache: &PrioritizationFeeCache,
) -> Result<()> {
@ -490,7 +490,7 @@ fn process_entries(
&batches,
transaction_status_sender,
replay_vote_sender,
confirmation_timing,
batch_timing,
log_messages_bytes_limit,
prioritization_fee_cache,
)?;
@ -553,7 +553,7 @@ fn process_entries(
&batches,
transaction_status_sender,
replay_vote_sender,
confirmation_timing,
batch_timing,
log_messages_bytes_limit,
prioritization_fee_cache,
)?;
@ -568,7 +568,7 @@ fn process_entries(
&batches,
transaction_status_sender,
replay_vote_sender,
confirmation_timing,
batch_timing,
log_messages_bytes_limit,
prioritization_fee_cache,
)?;
@ -915,7 +915,7 @@ fn confirm_full_slot(
&_ignored_prioritization_fee_cache,
)?;
timing.accumulate(&confirmation_timing.execute_timings);
timing.accumulate(&confirmation_timing.batch_execute.totals);
if !bank.is_complete() {
Err(BlockstoreProcessorError::InvalidBlock(
@ -939,10 +939,6 @@ pub struct ConfirmationTiming {
/// In microseconds.
pub replay_elapsed: u64,
/// Wall clock time used by the transaction execution part of pipeline. `replay_elapsed`
/// includes this time. In microseconds.
pub execute_batches_us: u64,
/// Wall clock times, used for the PoH verification of entries. In microseconds.
pub poh_verify_elapsed: u64,
@ -958,65 +954,8 @@ pub struct ConfirmationTiming {
/// microseconds.
pub fetch_fail_elapsed: u64,
/// Time used in transaction execution. Across multiple threads, running `execute_batch()`.
pub execute_timings: ExecuteTimings,
/// Time used to execute transactions, via `execute_batch()`, in the thread that consumed the
/// most time.
pub end_to_end_execute_timings: ThreadExecuteTimings,
}
impl ConfirmationTiming {
fn process_execute_batches_internal_metrics(
&mut self,
execute_batches_internal_metrics: ExecuteBatchesInternalMetrics,
) {
let ConfirmationTiming {
execute_timings: cumulative_execute_timings,
execute_batches_us: cumulative_execute_batches_us,
end_to_end_execute_timings,
..
} = self;
saturating_add_assign!(
*cumulative_execute_batches_us,
execute_batches_internal_metrics.execute_batches_us
);
cumulative_execute_timings.saturating_add_in_place(
ExecuteTimingType::TotalBatchesLen,
execute_batches_internal_metrics.total_batches_len,
);
cumulative_execute_timings.saturating_add_in_place(ExecuteTimingType::NumExecuteBatches, 1);
let mut current_max_thread_execution_time: Option<ThreadExecuteTimings> = None;
for (_, thread_execution_time) in execute_batches_internal_metrics
.execution_timings_per_thread
.into_iter()
{
let ThreadExecuteTimings {
total_thread_us,
execute_timings,
..
} = &thread_execution_time;
cumulative_execute_timings.accumulate(execute_timings);
if *total_thread_us
> current_max_thread_execution_time
.as_ref()
.map(|thread_execution_time| thread_execution_time.total_thread_us)
.unwrap_or(0)
{
current_max_thread_execution_time = Some(thread_execution_time);
}
}
if let Some(current_max_thread_execution_time) = current_max_thread_execution_time {
end_to_end_execute_timings.accumulate(&current_max_thread_execution_time);
end_to_end_execute_timings
.execute_timings
.saturating_add_in_place(ExecuteTimingType::NumExecuteBatches, 1);
};
}
/// `batch_execute()` measurements.
pub batch_execute: BatchExecutionTiming,
}
impl Default for ConfirmationTiming {
@ -1024,17 +963,63 @@ impl Default for ConfirmationTiming {
Self {
started: Instant::now(),
replay_elapsed: 0,
execute_batches_us: 0,
poh_verify_elapsed: 0,
transaction_verify_elapsed: 0,
fetch_elapsed: 0,
fetch_fail_elapsed: 0,
execute_timings: ExecuteTimings::default(),
end_to_end_execute_timings: ThreadExecuteTimings::default(),
batch_execute: BatchExecutionTiming::default(),
}
}
}
/// Measures times related to transaction execution in a slot.
///
/// Extracted from `ConfirmationTiming` so that it can be passed to
/// `process_entries()` on its own, without borrowing the whole parent structure.
#[derive(Debug, Default)]
pub struct BatchExecutionTiming {
    /// Time used by transaction execution.  Accumulated across multiple threads that are running
    /// `execute_batch()`.
    pub totals: ExecuteTimings,

    /// Wall clock time used by the transaction execution part of the pipeline.
    /// [`ConfirmationTiming::replay_elapsed`] includes this time.  In microseconds.
    pub wall_clock_us: u64,

    /// Time used to execute transactions, via `execute_batch()`, in the thread that consumed the
    /// most time.
    pub slowest_thread: ThreadExecuteTimings,
}
impl BatchExecutionTiming {
    /// Folds the metrics from one `execute_batches()` invocation into the slot-level totals.
    ///
    /// Adds the batch wall clock time to `wall_clock_us`, sums the per-thread execution timings
    /// into `totals`, and folds the timings of the single thread with the largest
    /// `total_thread_us` into `slowest_thread`.
    fn accumulate(&mut self, new_batch: ExecuteBatchesInternalMetrics) {
        let Self {
            totals,
            wall_clock_us,
            slowest_thread,
        } = self;

        // Saturating add: timing counters should clamp rather than wrap on overflow.
        saturating_add_assign!(*wall_clock_us, new_batch.execute_batches_us);

        use ExecuteTimingType::{NumExecuteBatches, TotalBatchesLen};
        totals.saturating_add_in_place(TotalBatchesLen, new_batch.total_batches_len);
        // Each call to this method corresponds to exactly one `execute_batches()` invocation.
        totals.saturating_add_in_place(NumExecuteBatches, 1);

        // Sum the execution timings of every worker thread into the slot totals.
        for thread_times in new_batch.execution_timings_per_thread.values() {
            totals.accumulate(&thread_times.execute_timings);
        }

        // Find the thread that consumed the most wall clock time in this batch, and accumulate
        // its timings separately in `slowest_thread`.
        let slowest = new_batch
            .execution_timings_per_thread
            .values()
            .max_by_key(|thread_times| thread_times.total_thread_us);

        if let Some(slowest) = slowest {
            slowest_thread.accumulate(slowest);
            slowest_thread
                .execute_timings
                .saturating_add_in_place(NumExecuteBatches, 1);
        };
    }
}
#[derive(Default)]
pub struct ConfirmationProgress {
pub last_entry: Hash,
@ -1110,6 +1095,14 @@ fn confirm_slot_entries(
log_messages_bytes_limit: Option<usize>,
prioritization_fee_cache: &PrioritizationFeeCache,
) -> result::Result<(), BlockstoreProcessorError> {
let ConfirmationTiming {
replay_elapsed,
poh_verify_elapsed,
transaction_verify_elapsed,
batch_execute: batch_execute_timing,
..
} = timing;
let slot = bank.slot();
let (entries, num_shreds, slot_full) = slot_entries_load_result;
let num_entries = entries.len();
@ -1175,29 +1168,33 @@ fn confirm_slot_entries(
}
};
let check_start = Instant::now();
let check_result = entry::start_verify_transactions(
let transaction_verification_start = Instant::now();
let transaction_verification_result = entry::start_verify_transactions(
entries,
skip_verification,
recyclers.clone(),
Arc::new(verify_transaction),
);
let transaction_cpu_duration_us = timing::duration_as_us(&check_start.elapsed());
let transaction_cpu_duration_us =
timing::duration_as_us(&transaction_verification_start.elapsed());
let mut check_result = match check_result {
Ok(check_result) => check_result,
let mut transaction_verification_result = match transaction_verification_result {
Ok(transaction_verification_result) => transaction_verification_result,
Err(err) => {
warn!("Ledger proof of history failed at slot: {}", bank.slot());
warn!(
"Ledger transaction signature verification failed at slot: {}",
bank.slot()
);
return Err(err.into());
}
};
let entries = check_result.entries();
assert!(entries.is_some());
let entries = transaction_verification_result
.entries()
.expect("Transaction verification generates entries");
let mut replay_elapsed = Measure::start("replay_elapsed");
let mut replay_timer = Measure::start("replay_elapsed");
let mut replay_entries: Vec<_> = entries
.unwrap()
.into_iter()
.zip(entry_starting_indexes)
.map(|(entry, starting_index)| ReplayEntry {
@ -1212,30 +1209,38 @@ fn confirm_slot_entries(
true, // shuffle transactions.
transaction_status_sender,
replay_vote_sender,
timing,
batch_execute_timing,
log_messages_bytes_limit,
prioritization_fee_cache,
)
.map_err(BlockstoreProcessorError::from);
replay_elapsed.stop();
timing.replay_elapsed += replay_elapsed.as_us();
replay_timer.stop();
*replay_elapsed += replay_timer.as_us();
// If running signature verification on the GPU, wait for that computation to finish, and get
// the result of it. If we did the signature verification on the CPU, this just returns the
// already-computed result produced in start_verify_transactions. Either way, check the result
// of the signature verification.
if !check_result.finish_verify() {
warn!("Ledger proof of history failed at slot: {}", bank.slot());
return Err(TransactionError::SignatureFailure.into());
{
// If running signature verification on the GPU, wait for that computation to finish, and
// get the result of it. If we did the signature verification on the CPU, this just returns
// the already-computed result produced in start_verify_transactions. Either way, check the
// result of the signature verification.
let valid = transaction_verification_result.finish_verify();
// The GPU Entry verification (if any) is kicked off right when the CPU-side Entry
// verification finishes, so these times should be disjoint
*transaction_verify_elapsed +=
transaction_cpu_duration_us + transaction_verification_result.gpu_verify_duration();
if !valid {
warn!(
"Ledger transaction signature verification failed at slot: {}",
bank.slot()
);
return Err(TransactionError::SignatureFailure.into());
}
}
if let Some(mut verifier) = verifier {
let verified = verifier.finish_verify();
timing.poh_verify_elapsed += verifier.poh_duration_us();
// The GPU Entry verification (if any) is kicked off right when the CPU-side Entry
// verification finishes, so these times should be disjoint
timing.transaction_verify_elapsed +=
transaction_cpu_duration_us + check_result.gpu_verify_duration();
*poh_verify_elapsed += verifier.poh_duration_us();
if !verified {
warn!("Ledger proof of history failed at slot: {}", bank.slot());
return Err(BlockError::InvalidEntryHash.into());