diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs
index 083e4e5b33..247d8a1a40 100644
--- a/ledger/src/blockstore_processor.rs
+++ b/ledger/src/blockstore_processor.rs
@@ -779,17 +779,12 @@ pub fn process_blockstore_from_root(
     }
 
     let mut timing = ExecuteTimings::default();
-
-    // Iterate and replay slots from blockstore starting from `start_slot`
-    let mut num_slots_processed = 0;
-    let mut num_new_roots_found = 0;
-    if let Some(start_slot_meta) = blockstore
+    let (num_slots_processed, num_new_roots_found) = if let Some(start_slot_meta) = blockstore
         .meta(start_slot)
         .unwrap_or_else(|_| panic!("Failed to get meta for slot {start_slot}"))
     {
-        (num_slots_processed, num_new_roots_found) = load_frozen_forks(
+        load_frozen_forks(
             bank_forks,
-            start_slot,
             &start_slot_meta,
             blockstore,
             leader_schedule_cache,
@@ -798,7 +793,7 @@ pub fn process_blockstore_from_root(
             cache_block_meta_sender,
             &mut timing,
             accounts_background_request_sender,
-        )?;
+        )?
     } else {
         // If there's no meta in the blockstore for the input `start_slot`,
         // then we started from a snapshot and are unable to process anything.
@@ -809,6 +804,7 @@
             "Starting slot {} is not in Blockstore, unable to process",
             start_slot
         );
+        (0, 0)
     };
 
     let processing_time = now.elapsed();
@@ -1369,7 +1365,6 @@ fn process_next_slots(
 #[allow(clippy::too_many_arguments)]
 fn load_frozen_forks(
     bank_forks: &RwLock<BankForks>,
-    start_slot: Slot,
     start_slot_meta: &SlotMeta,
     blockstore: &Blockstore,
     leader_schedule_cache: &LeaderScheduleCache,
@@ -1379,28 +1374,26 @@
     timing: &mut ExecuteTimings,
     accounts_background_request_sender: &AbsRequestSender,
 ) -> result::Result<(u64, usize), BlockstoreProcessorError> {
-    let recyclers = VerifyRecyclers::default();
-    let mut all_banks = HashMap::new();
-    let mut last_status_report = Instant::now();
-    let mut pending_slots = vec![];
-    // The total number of slots processed
-    let mut total_slots_elapsed = 0;
-    // The total number of newly identified root slots
-    let mut total_rooted_slots = 0;
-    // The number of slots processed between status report updates
-    let mut slots_elapsed = 0;
-    let mut txs = 0;
     let blockstore_max_root = blockstore.max_root();
     let mut root = bank_forks.read().unwrap().root();
     let max_root = std::cmp::max(root, blockstore_max_root);
-
     info!(
         "load_frozen_forks() latest root from blockstore: {}, max_root: {}",
         blockstore_max_root, max_root,
     );
 
+    // The total number of slots processed
+    let mut total_slots_processed = 0;
+    // The total number of newly identified root slots
+    let mut total_rooted_slots = 0;
+
+    let mut pending_slots = vec![];
     process_next_slots(
-        &bank_forks.read().unwrap().get(start_slot).unwrap(),
+        &bank_forks
+            .read()
+            .unwrap()
+            .get(start_slot_meta.slot)
+            .unwrap(),
         start_slot_meta,
        blockstore,
         leader_schedule_cache,
@@ -1410,26 +1403,43 @@
 
     let on_halt_store_hash_raw_data_for_debug = opts.on_halt_store_hash_raw_data_for_debug;
     if Some(bank_forks.read().unwrap().root()) != opts.halt_at_slot {
+        let recyclers = VerifyRecyclers::default();
+        let mut all_banks = HashMap::new();
+
+        const STATUS_REPORT_INTERVAL: Duration = Duration::from_secs(2);
+        let mut last_status_report = Instant::now();
+        let mut slots_processed = 0;
+        let mut txs = 0;
         let mut set_root_us = 0;
         let mut root_retain_us = 0;
         let mut process_single_slot_us = 0;
         let mut voting_us = 0;
+
         while !pending_slots.is_empty() {
             timing.details.per_program_timings.clear();
             let (meta, bank, last_entry_hash) = pending_slots.pop().unwrap();
             let slot = bank.slot();
-            if last_status_report.elapsed() > Duration::from_secs(2) {
+            if last_status_report.elapsed() > STATUS_REPORT_INTERVAL {
                 let secs = last_status_report.elapsed().as_secs() as f32;
-                last_status_report = Instant::now();
+                let slots_per_sec = slots_processed as f32 / secs;
+                let txs_per_sec = txs as f32 / secs;
                 info!(
-                    "processing ledger: slot={}, last root slot={} slots={} slots/s={:?} txs/s={}, set_root_us={set_root_us}, root_retain_us={root_retain_us}, process_single_slot_us:{process_single_slot_us}, voting_us: {voting_us}",
-                    slot,
-                    root,
-                    slots_elapsed,
-                    slots_elapsed as f32 / secs,
-                    txs as f32 / secs,
+                    "processing ledger: slot={slot}, \
+                    root_slot={root} \
+                    slots={slots_processed}, \
+                    slots/s={slots_per_sec}, \
+                    txs/s={txs_per_sec}"
                 );
-                slots_elapsed = 0;
+                debug!(
+                    "processing ledger timing: \
+                    set_root_us={set_root_us}, \
+                    root_retain_us={root_retain_us}, \
+                    process_single_slot_us:{process_single_slot_us}, \
+                    voting_us: {voting_us}"
+                );
+
+                last_status_report = Instant::now();
+                slots_processed = 0;
                 txs = 0;
                 set_root_us = 0;
                 root_retain_us = 0;
@@ -1438,7 +1448,6 @@ fn load_frozen_forks(
             }
 
             let mut progress = ConfirmationProgress::new(last_entry_hash);
-
             let mut m = Measure::start("process_single_slot");
             let bank = bank_forks.write().unwrap().insert_from_ledger(bank);
             if process_single_slot(
@@ -1459,15 +1468,14 @@
             }
             txs += progress.num_txs;
 
-            // Block must be frozen by this point, otherwise `process_single_slot` would
-            // have errored above
+            // Block must be frozen by this point; otherwise,
+            // process_single_slot() would have errored above.
             assert!(bank.is_frozen());
             all_banks.insert(bank.slot(), bank.clone());
             m.stop();
             process_single_slot_us += m.as_us();
 
             let mut m = Measure::start("voting");
-
             // If we've reached the last known root in blockstore, start looking
             // for newer cluster confirmed roots
             let new_root_bank = {
@@ -1520,7 +1528,6 @@
                     None
                 }
             };
-
             m.stop();
             voting_us += m.as_us();
 
@@ -1546,8 +1553,8 @@
                 root_retain_us += m.as_us();
             }
 
-            slots_elapsed += 1;
-            total_slots_elapsed += 1;
+            slots_processed += 1;
+            total_slots_processed += 1;
 
             trace!(
                 "Bank for {}slot {} is complete",
@@ -1583,7 +1590,7 @@
             .run_final_hash_calc(on_halt_store_hash_raw_data_for_debug);
     }
 
-    Ok((total_slots_elapsed, total_rooted_slots))
+    Ok((total_slots_processed, total_rooted_slots))
 }
 
 // `roots` is sorted largest to smallest by root slot
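For readers skimming the first hunk: the refactor replaces pre-declared mutable counters with a single binding whose value is produced by the `if let`/`else` expression itself, with `(0, 0)` as the fallback in the `else` arm. Below is a minimal, self-contained sketch of that binding pattern only; `lookup_meta` and `replay_forks` are hypothetical stand-ins for illustration, not the real Blockstore or load_frozen_forks APIs.

// Sketch: bind the result of an if-let expression instead of mutating
// pre-declared counters. Hypothetical helpers, not the Solana code.
fn lookup_meta(slot: u64) -> Option<u64> {
    // Pretend meta exists for `slot` and report the last slot to replay.
    Some(slot + 3)
}

fn replay_forks(start: u64, end: u64) -> Result<(u64, usize), String> {
    // Pretend every slot in [start, end) replays and one new root is found.
    Ok((end - start, 1))
}

fn main() -> Result<(), String> {
    let start_slot = 10;
    let (num_slots_processed, num_new_roots_found) = if let Some(end) = lookup_meta(start_slot) {
        // `?` still works inside the branch; the whole expression yields a tuple.
        replay_forks(start_slot, end)?
    } else {
        // No meta for `start_slot`: nothing to process, mirroring the (0, 0) fallback.
        (0, 0)
    };
    println!("processed {num_slots_processed} slots, found {num_new_roots_found} new roots");
    Ok(())
}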