Cleanup blockstore_processor::load_frozen_forks() (#31475)

The function seemed to be in need of some cleanup. Changes include:
- Remove redundant argument
- Shift related variable definitions to be adjacent
- Shift several variables into inner scope
- Rename several variables
- Insert newlines into log statement that was very long in addition to
  moving some of the logged values to debug
This commit is contained in:
steviez 2023-05-04 23:36:02 -05:00 committed by GitHub
parent cc24d2207c
commit c77fa29747
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 46 additions and 39 deletions

View File

@ -779,17 +779,12 @@ pub fn process_blockstore_from_root(
} }
let mut timing = ExecuteTimings::default(); let mut timing = ExecuteTimings::default();
let (num_slots_processed, num_new_roots_found) = if let Some(start_slot_meta) = blockstore
// Iterate and replay slots from blockstore starting from `start_slot`
let mut num_slots_processed = 0;
let mut num_new_roots_found = 0;
if let Some(start_slot_meta) = blockstore
.meta(start_slot) .meta(start_slot)
.unwrap_or_else(|_| panic!("Failed to get meta for slot {start_slot}")) .unwrap_or_else(|_| panic!("Failed to get meta for slot {start_slot}"))
{ {
(num_slots_processed, num_new_roots_found) = load_frozen_forks( load_frozen_forks(
bank_forks, bank_forks,
start_slot,
&start_slot_meta, &start_slot_meta,
blockstore, blockstore,
leader_schedule_cache, leader_schedule_cache,
@ -798,7 +793,7 @@ pub fn process_blockstore_from_root(
cache_block_meta_sender, cache_block_meta_sender,
&mut timing, &mut timing,
accounts_background_request_sender, accounts_background_request_sender,
)?; )?
} else { } else {
// If there's no meta in the blockstore for the input `start_slot`, // If there's no meta in the blockstore for the input `start_slot`,
// then we started from a snapshot and are unable to process anything. // then we started from a snapshot and are unable to process anything.
@ -809,6 +804,7 @@ pub fn process_blockstore_from_root(
"Starting slot {} is not in Blockstore, unable to process", "Starting slot {} is not in Blockstore, unable to process",
start_slot start_slot
); );
(0, 0)
}; };
let processing_time = now.elapsed(); let processing_time = now.elapsed();
@ -1369,7 +1365,6 @@ fn process_next_slots(
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
fn load_frozen_forks( fn load_frozen_forks(
bank_forks: &RwLock<BankForks>, bank_forks: &RwLock<BankForks>,
start_slot: Slot,
start_slot_meta: &SlotMeta, start_slot_meta: &SlotMeta,
blockstore: &Blockstore, blockstore: &Blockstore,
leader_schedule_cache: &LeaderScheduleCache, leader_schedule_cache: &LeaderScheduleCache,
@ -1379,28 +1374,26 @@ fn load_frozen_forks(
timing: &mut ExecuteTimings, timing: &mut ExecuteTimings,
accounts_background_request_sender: &AbsRequestSender, accounts_background_request_sender: &AbsRequestSender,
) -> result::Result<(u64, usize), BlockstoreProcessorError> { ) -> result::Result<(u64, usize), BlockstoreProcessorError> {
let recyclers = VerifyRecyclers::default();
let mut all_banks = HashMap::new();
let mut last_status_report = Instant::now();
let mut pending_slots = vec![];
// The total number of slots processed
let mut total_slots_elapsed = 0;
// The total number of newly identified root slots
let mut total_rooted_slots = 0;
// The number of slots processed between status report updates
let mut slots_elapsed = 0;
let mut txs = 0;
let blockstore_max_root = blockstore.max_root(); let blockstore_max_root = blockstore.max_root();
let mut root = bank_forks.read().unwrap().root(); let mut root = bank_forks.read().unwrap().root();
let max_root = std::cmp::max(root, blockstore_max_root); let max_root = std::cmp::max(root, blockstore_max_root);
info!( info!(
"load_frozen_forks() latest root from blockstore: {}, max_root: {}", "load_frozen_forks() latest root from blockstore: {}, max_root: {}",
blockstore_max_root, max_root, blockstore_max_root, max_root,
); );
// The total number of slots processed
let mut total_slots_processed = 0;
// The total number of newly identified root slots
let mut total_rooted_slots = 0;
let mut pending_slots = vec![];
process_next_slots( process_next_slots(
&bank_forks.read().unwrap().get(start_slot).unwrap(), &bank_forks
.read()
.unwrap()
.get(start_slot_meta.slot)
.unwrap(),
start_slot_meta, start_slot_meta,
blockstore, blockstore,
leader_schedule_cache, leader_schedule_cache,
@ -1410,26 +1403,43 @@ fn load_frozen_forks(
let on_halt_store_hash_raw_data_for_debug = opts.on_halt_store_hash_raw_data_for_debug; let on_halt_store_hash_raw_data_for_debug = opts.on_halt_store_hash_raw_data_for_debug;
if Some(bank_forks.read().unwrap().root()) != opts.halt_at_slot { if Some(bank_forks.read().unwrap().root()) != opts.halt_at_slot {
let recyclers = VerifyRecyclers::default();
let mut all_banks = HashMap::new();
const STATUS_REPORT_INTERVAL: Duration = Duration::from_secs(2);
let mut last_status_report = Instant::now();
let mut slots_processed = 0;
let mut txs = 0;
let mut set_root_us = 0; let mut set_root_us = 0;
let mut root_retain_us = 0; let mut root_retain_us = 0;
let mut process_single_slot_us = 0; let mut process_single_slot_us = 0;
let mut voting_us = 0; let mut voting_us = 0;
while !pending_slots.is_empty() { while !pending_slots.is_empty() {
timing.details.per_program_timings.clear(); timing.details.per_program_timings.clear();
let (meta, bank, last_entry_hash) = pending_slots.pop().unwrap(); let (meta, bank, last_entry_hash) = pending_slots.pop().unwrap();
let slot = bank.slot(); let slot = bank.slot();
if last_status_report.elapsed() > Duration::from_secs(2) { if last_status_report.elapsed() > STATUS_REPORT_INTERVAL {
let secs = last_status_report.elapsed().as_secs() as f32; let secs = last_status_report.elapsed().as_secs() as f32;
last_status_report = Instant::now(); let slots_per_sec = slots_processed as f32 / secs;
let txs_per_sec = txs as f32 / secs;
info!( info!(
"processing ledger: slot={}, last root slot={} slots={} slots/s={:?} txs/s={}, set_root_us={set_root_us}, root_retain_us={root_retain_us}, process_single_slot_us:{process_single_slot_us}, voting_us: {voting_us}", "processing ledger: slot={slot}, \
slot, root_slot={root} \
root, slots={slots_processed}, \
slots_elapsed, slots/s={slots_per_sec}, \
slots_elapsed as f32 / secs, txs/s={txs_per_sec}"
txs as f32 / secs,
); );
slots_elapsed = 0; debug!(
"processing ledger timing: \
set_root_us={set_root_us}, \
root_retain_us={root_retain_us}, \
process_single_slot_us:{process_single_slot_us}, \
voting_us: {voting_us}"
);
last_status_report = Instant::now();
slots_processed = 0;
txs = 0; txs = 0;
set_root_us = 0; set_root_us = 0;
root_retain_us = 0; root_retain_us = 0;
@ -1438,7 +1448,6 @@ fn load_frozen_forks(
} }
let mut progress = ConfirmationProgress::new(last_entry_hash); let mut progress = ConfirmationProgress::new(last_entry_hash);
let mut m = Measure::start("process_single_slot"); let mut m = Measure::start("process_single_slot");
let bank = bank_forks.write().unwrap().insert_from_ledger(bank); let bank = bank_forks.write().unwrap().insert_from_ledger(bank);
if process_single_slot( if process_single_slot(
@ -1459,15 +1468,14 @@ fn load_frozen_forks(
} }
txs += progress.num_txs; txs += progress.num_txs;
// Block must be frozen by this point, otherwise `process_single_slot` would // Block must be frozen by this point; otherwise,
// have errored above // process_single_slot() would have errored above.
assert!(bank.is_frozen()); assert!(bank.is_frozen());
all_banks.insert(bank.slot(), bank.clone()); all_banks.insert(bank.slot(), bank.clone());
m.stop(); m.stop();
process_single_slot_us += m.as_us(); process_single_slot_us += m.as_us();
let mut m = Measure::start("voting"); let mut m = Measure::start("voting");
// If we've reached the last known root in blockstore, start looking // If we've reached the last known root in blockstore, start looking
// for newer cluster confirmed roots // for newer cluster confirmed roots
let new_root_bank = { let new_root_bank = {
@ -1520,7 +1528,6 @@ fn load_frozen_forks(
None None
} }
}; };
m.stop(); m.stop();
voting_us += m.as_us(); voting_us += m.as_us();
@ -1546,8 +1553,8 @@ fn load_frozen_forks(
root_retain_us += m.as_us(); root_retain_us += m.as_us();
} }
slots_elapsed += 1; slots_processed += 1;
total_slots_elapsed += 1; total_slots_processed += 1;
trace!( trace!(
"Bank for {}slot {} is complete", "Bank for {}slot {} is complete",
@ -1583,7 +1590,7 @@ fn load_frozen_forks(
.run_final_hash_calc(on_halt_store_hash_raw_data_for_debug); .run_final_hash_calc(on_halt_store_hash_raw_data_for_debug);
} }
Ok((total_slots_elapsed, total_rooted_slots)) Ok((total_slots_processed, total_rooted_slots))
} }
// `roots` is sorted largest to smallest by root slot // `roots` is sorted largest to smallest by root slot