diff --git a/ledger/src/bank_forks_utils.rs b/ledger/src/bank_forks_utils.rs
index ac84e56823..c8aa1c0876 100644
--- a/ledger/src/bank_forks_utils.rs
+++ b/ledger/src/bank_forks_utils.rs
@@ -210,10 +210,11 @@ fn load_from_snapshot(
         incremental: starting_incremental_snapshot_hash,
     };
 
+    let bank_forks = BankForks::new(deserialized_bank);
     to_loadresult(
         blockstore_processor::process_blockstore_from_root(
             blockstore,
-            deserialized_bank,
+            bank_forks,
             &process_options,
             &VerifyRecyclers::default(),
             transaction_status_sender,
diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs
index 5f3c2e1483..1bb58d1e5b 100644
--- a/ledger/src/blockstore_processor.rs
+++ b/ledger/src/blockstore_processor.rs
@@ -590,11 +590,12 @@ pub fn process_blockstore(
         opts.accounts_db_config.clone(),
         accounts_update_notifier,
     );
-    let bank0 = Arc::new(bank0);
+    let bank_forks = BankForks::new(bank0);
+
     info!("processing ledger for slot 0...");
     let recyclers = VerifyRecyclers::default();
     process_bank_0(
-        &bank0,
+        &bank_forks.root_bank(),
         blockstore,
         &opts,
         &recyclers,
@@ -602,7 +603,7 @@ pub fn process_blockstore(
     );
     do_process_blockstore_from_root(
         blockstore,
-        bank0,
+        bank_forks,
         &opts,
         &recyclers,
         None,
@@ -618,7 +619,7 @@
 #[allow(clippy::too_many_arguments)]
 pub(crate) fn process_blockstore_from_root(
     blockstore: &Blockstore,
-    bank: Bank,
+    bank_forks: BankForks,
     opts: &ProcessOptions,
     recyclers: &VerifyRecyclers,
     transaction_status_sender: Option<&TransactionStatusSender>,
@@ -630,7 +631,7 @@ pub(crate) fn process_blockstore_from_root(
 ) -> BlockstoreProcessorResult {
     do_process_blockstore_from_root(
         blockstore,
-        Arc::new(bank),
+        bank_forks,
         opts,
         recyclers,
         transaction_status_sender,
@@ -645,7 +646,7 @@
 #[allow(clippy::too_many_arguments)]
 fn do_process_blockstore_from_root(
     blockstore: &Blockstore,
-    bank: Arc<Bank>,
+    mut bank_forks: BankForks,
     opts: &ProcessOptions,
     recyclers: &VerifyRecyclers,
     transaction_status_sender: Option<&TransactionStatusSender>,
@@ -655,13 +656,14 @@ fn do_process_blockstore_from_root(
     timings: BankFromArchiveTimings,
     mut last_full_snapshot_slot: Option<Slot>,
 ) -> BlockstoreProcessorResult {
-    info!("processing ledger from slot {}...", bank.slot());
-
     // Starting slot must be a root, and thus has no parents
+    assert_eq!(bank_forks.banks().len(), 1);
+    let bank = bank_forks.root_bank();
     assert!(bank.parent().is_none());
+
     let start_slot = bank.slot();
+    info!("processing ledger from slot {}...", start_slot);
     let now = Instant::now();
-    let mut root = start_slot;
 
     if let Some(ref new_hard_forks) = opts.new_hard_forks {
         let hard_forks = bank.hard_forks();
@@ -684,7 +686,10 @@ fn do_process_blockstore_from_root(
             .set_roots(std::iter::once(&start_slot))
             .expect("Couldn't set root slot on startup");
     } else {
-        assert!(blockstore.is_root(start_slot), "starting slot isn't root and can't update due to being secondary blockstore access: {}", start_slot);
+        assert!(
+            blockstore.is_root(start_slot),
+            "starting slot isn't root and can't update due to being secondary blockstore access: {}", start_slot
+        );
     }
 
     if let Ok(metas) = blockstore.slot_meta_iterator(start_slot) {
@@ -699,16 +704,17 @@ fn do_process_blockstore_from_root(
     if opts.full_leader_cache {
         leader_schedule_cache.set_max_schedules(std::usize::MAX);
     }
-    let initial_forks = if let Some(meta) = blockstore
+
+    if let Some(start_slot_meta) = blockstore
         .meta(start_slot)
         .unwrap_or_else(|_| panic!("Failed to get meta for slot {}", start_slot))
     {
-        let mut initial_forks = load_frozen_forks(
-            &bank,
-            &meta,
+        load_frozen_forks(
+            &mut bank_forks,
+            start_slot,
+            &start_slot_meta,
             blockstore,
             &mut leader_schedule_cache,
-            &mut root,
             opts,
             recyclers,
             transaction_status_sender,
@@ -718,19 +724,11 @@ fn do_process_blockstore_from_root(
             &mut timing,
             &mut last_full_snapshot_slot,
         )?;
-        initial_forks.sort_by_key(|bank| bank.slot());
-        initial_forks
     } else {
         // If there's no meta for the input `start_slot`, then we started from a snapshot
        // and there's no point in processing the rest of blockstore and implies blockstore
         // should be empty past this point.
-        vec![bank]
     };
 
-    if initial_forks.is_empty() {
-        return Err(BlockstoreProcessorError::NoValidForksFound);
-    }
-
-    let bank_forks = BankForks::new_from_banks(&initial_forks, root);
 
     let processing_time = now.elapsed();
 
@@ -743,7 +741,9 @@ fn do_process_blockstore_from_root(
         .root_bank()
         .calculate_and_verify_capitalization(debug_verify)
     {
-        return Err(BlockstoreProcessorError::RootBankWithMismatchedCapitalization(root));
+        return Err(
+            BlockstoreProcessorError::RootBankWithMismatchedCapitalization(bank_forks.root()),
+        );
     }
     time_cap.stop();
 
@@ -752,7 +752,7 @@ fn do_process_blockstore_from_root(
         ("total_time_us", processing_time.as_micros(), i64),
         ("frozen_banks", bank_forks.frozen_banks().len(), i64),
         ("slot", bank_forks.root(), i64),
-        ("forks", initial_forks.len(), i64),
+        ("forks", bank_forks.banks().len(), i64),
         ("calculate_capitalization_us", time_cap.as_us(), i64),
         (
            "full_snapshot_untar_us",
@@ -777,23 +777,17 @@ fn do_process_blockstore_from_root(
     );
     info!("ledger processing timing: {:?}", timing);
 
+    let mut bank_slots = bank_forks.banks().keys().collect::<Vec<_>>();
+    bank_slots.sort_unstable();
+
     info!(
-        "ledger processed in {}. root slot is {}, {} fork{} at {}, with {} frozen bank{}",
+        "ledger processed in {}. root slot is {}, {} bank{}: {}",
         HumanTime::from(chrono::Duration::from_std(processing_time).unwrap())
             .to_text_en(Accuracy::Precise, Tense::Present),
         bank_forks.root(),
-        initial_forks.len(),
-        if initial_forks.len() > 1 { "s" } else { "" },
-        initial_forks
-            .iter()
-            .map(|b| b.slot().to_string())
-            .join(", "),
-        bank_forks.frozen_banks().len(),
-        if bank_forks.frozen_banks().len() > 1 {
-            "s"
-        } else {
-            ""
-        },
+        bank_slots.len(),
+        if bank_slots.len() > 1 { "s" } else { "" },
+        bank_slots.iter().map(|slot| slot.to_string()).join(", "),
     );
 
     assert!(bank_forks.active_banks().is_empty());
@@ -801,8 +795,8 @@ fn do_process_blockstore_from_root(
 }
 
 /// Verify that a segment of entries has the correct number of ticks and hashes
-pub fn verify_ticks(
-    bank: &Arc<Bank>,
+fn verify_ticks(
+    bank: &Bank,
     entries: &[Entry],
     slot_full: bool,
     tick_hash_count: &mut u64,
@@ -1114,14 +1108,8 @@ fn process_next_slots(
     meta: &SlotMeta,
     blockstore: &Blockstore,
     leader_schedule_cache: &LeaderScheduleCache,
-    pending_slots: &mut Vec<(SlotMeta, Arc<Bank>, Hash)>,
-    initial_forks: &mut HashMap<Slot, Arc<Bank>>,
+    pending_slots: &mut Vec<(SlotMeta, Bank, Hash)>,
 ) -> result::Result<(), BlockstoreProcessorError> {
-    if let Some(parent) = bank.parent() {
-        initial_forks.remove(&parent.slot());
-    }
-    initial_forks.insert(bank.slot(), bank.clone());
-
     if meta.next_slots.is_empty() {
         return Ok(());
     }
@@ -1139,13 +1127,13 @@ fn process_next_slots(
         // Only process full slots in blockstore_processor, replay_stage
         // handles any partials
         if next_meta.is_full() {
-            let next_bank = Arc::new(Bank::new_from_parent(
+            let next_bank = Bank::new_from_parent(
                 bank,
                 &leader_schedule_cache
                     .slot_leader_at(*next_slot, Some(bank))
                     .unwrap(),
                 *next_slot,
-            ));
+            );
             trace!(
                 "New bank for slot {}, parent slot is {}",
                 next_slot,
@@ -1164,11 +1152,11 @@ fn process_next_slots(
 // given `meta` and return a vector of frozen bank forks
 #[allow(clippy::too_many_arguments)]
 fn load_frozen_forks(
-    root_bank: &Arc<Bank>,
-    root_meta: &SlotMeta,
+    bank_forks: &mut BankForks,
+    start_slot: Slot,
+    start_slot_meta: &SlotMeta,
     blockstore: &Blockstore,
     leader_schedule_cache: &mut LeaderScheduleCache,
-    root: &mut Slot,
     opts: &ProcessOptions,
     recyclers: &VerifyRecyclers,
     transaction_status_sender: Option<&TransactionStatusSender>,
@@ -1177,33 +1165,32 @@ fn load_frozen_forks(
     accounts_package_sender: AccountsPackageSender,
     timing: &mut ExecuteTimings,
     last_full_snapshot_slot: &mut Option<Slot>,
-) -> result::Result<Vec<Arc<Bank>>, BlockstoreProcessorError> {
-    let mut initial_forks = HashMap::new();
+) -> result::Result<(), BlockstoreProcessorError> {
     let mut all_banks = HashMap::new();
     let mut last_status_report = Instant::now();
     let mut last_free = Instant::now();
     let mut pending_slots = vec![];
-    let mut last_root = root_bank.slot();
     let mut slots_elapsed = 0;
     let mut txs = 0;
     let blockstore_max_root = blockstore.max_root();
-    let max_root = std::cmp::max(root_bank.slot(), blockstore_max_root);
+    let mut root = bank_forks.root();
+    let max_root = std::cmp::max(root, blockstore_max_root);
+
     info!(
         "load_frozen_forks() latest root from blockstore: {}, max_root: {}",
         blockstore_max_root, max_root,
     );
 
     process_next_slots(
-        root_bank,
-        root_meta,
+        bank_forks.get(start_slot).unwrap(),
+        start_slot_meta,
         blockstore,
         leader_schedule_cache,
         &mut pending_slots,
-        &mut initial_forks,
     )?;
 
     let dev_halt_at_slot = opts.dev_halt_at_slot.unwrap_or(std::u64::MAX);
-    if root_bank.slot() != dev_halt_at_slot {
+    if bank_forks.root() != dev_halt_at_slot {
         while !pending_slots.is_empty() {
             timing.details.per_program_timings.clear();
             let (meta, bank, last_entry_hash) = pending_slots.pop().unwrap();
@@ -1214,7 +1201,7 @@ fn load_frozen_forks(
                 info!(
                     "processing ledger: slot={}, last root slot={} slots={} slots/s={:?} txs/s={}",
                     slot,
-                    last_root,
+                    root,
                     slots_elapsed,
                     slots_elapsed as f32 / secs,
                     txs as f32 / secs,
@@ -1225,6 +1212,7 @@ fn load_frozen_forks(
 
             let mut progress = ConfirmationProgress::new(last_entry_hash);
 
+            let bank = bank_forks.insert(bank);
             if process_single_slot(
                 blockstore,
                 &bank,
@@ -1238,6 +1226,7 @@ fn load_frozen_forks(
             )
             .is_err()
             {
+                assert!(bank_forks.remove(bank.slot()).is_some());
                 continue;
             }
             txs += progress.num_txs;
@@ -1250,13 +1239,13 @@ fn load_frozen_forks(
             // If we've reached the last known root in blockstore, start looking
             // for newer cluster confirmed roots
             let new_root_bank = {
-                if *root >= max_root {
+                if bank_forks.root() >= max_root {
                     supermajority_root_from_vote_accounts(
                         bank.slot(),
                         bank.total_epoch_stake(),
                         &bank.vote_accounts(),
                     ).and_then(|supermajority_root| {
-                        if supermajority_root > *root {
+                        if supermajority_root > root {
                             // If there's a cluster confirmed root greater than our last
                             // replayed root, then because the cluster confirmed root should
                             // be descended from our last root, it must exist in `all_banks`
@@ -1264,15 +1253,18 @@ fn load_frozen_forks(
                             // cluster root must be a descendant of our root, otherwise something
                             // is drastically wrong
-                            assert!(cluster_root_bank.ancestors.contains_key(root));
-                            info!("blockstore processor found new cluster confirmed root: {}, observed in bank: {}", cluster_root_bank.slot(), bank.slot());
+                            assert!(cluster_root_bank.ancestors.contains_key(&root));
+                            info!(
+                                "blockstore processor found new cluster confirmed root: {}, observed in bank: {}",
+                                cluster_root_bank.slot(), bank.slot()
+                            );
 
                             // Ensure cluster-confirmed root and parents are set as root in blockstore
                             let mut rooted_slots = vec![];
                             let mut new_root_bank = cluster_root_bank.clone();
                             loop {
-                                if new_root_bank.slot() == *root { break; } // Found the last root in the chain, yay!
-                                assert!(new_root_bank.slot() > *root);
+                                if new_root_bank.slot() == root { break; } // Found the last root in the chain, yay!
+                                assert!(new_root_bank.slot() > root);
                                 rooted_slots.push((new_root_bank.slot(), new_root_bank.hash()));
                                 // As noted, the cluster confirmed root should be descended from
@@ -1295,11 +1287,14 @@ fn load_frozen_forks(
             };
 
             if let Some(new_root_bank) = new_root_bank {
-                *root = new_root_bank.slot();
-                last_root = new_root_bank.slot();
+                root = new_root_bank.slot();
                 leader_schedule_cache.set_root(new_root_bank);
-                new_root_bank.squash();
+                let _ = bank_forks.set_root(
+                    root,
+                    &solana_runtime::accounts_background_service::AbsRequestSender::default(),
+                    None,
+                );
 
                 if let Some(snapshot_config) = snapshot_config {
                     let block_height = new_root_bank.block_height();
@@ -1307,8 +1302,8 @@ fn load_frozen_forks(
                         block_height,
                         snapshot_config.full_snapshot_archive_interval_slots,
                     ) {
-                        info!("Taking snapshot of new root bank that has crossed the full snapshot interval! slot: {}", *root);
-                        *last_full_snapshot_slot = Some(*root);
+                        info!("Taking snapshot of new root bank that has crossed the full snapshot interval! slot: {}", root);
+                        *last_full_snapshot_slot = Some(root);
                         new_root_bank.exhaustively_free_unused_resource(*last_full_snapshot_slot);
                         last_free = Instant::now();
                         new_root_bank.update_accounts_hash_with_index_option(
@@ -1331,7 +1326,7 @@ fn load_frozen_forks(
                         trace!(
                             "took bank snapshot for new root bank, block height: {}, slot: {}",
                             block_height,
-                            *root
+                            root
                         );
                     }
                 }
@@ -1346,16 +1341,15 @@ fn load_frozen_forks(
 
                 // Filter out all non descendants of the new root
                 pending_slots
-                    .retain(|(_, pending_bank, _)| pending_bank.ancestors.contains_key(root));
-                initial_forks.retain(|_, fork_tip_bank| fork_tip_bank.ancestors.contains_key(root));
-                all_banks.retain(|_, bank| bank.ancestors.contains_key(root));
+                    .retain(|(_, pending_bank, _)| pending_bank.ancestors.contains_key(&root));
+                all_banks.retain(|_, bank| bank.ancestors.contains_key(&root));
             }
 
             slots_elapsed += 1;
 
             trace!(
                 "Bank for {}slot {} is complete",
-                if last_root == slot { "root " } else { "" },
+                if root == slot { "root " } else { "" },
                 slot,
             );
 
@@ -1365,7 +1359,6 @@ fn load_frozen_forks(
                 blockstore,
                 leader_schedule_cache,
                 &mut pending_slots,
-                &mut initial_forks,
             )?;
 
             if slot >= dev_halt_at_slot {
@@ -1376,7 +1369,7 @@ fn load_frozen_forks(
         }
     }
 
-    Ok(initial_forks.values().cloned().collect::<Vec<_>>())
+    Ok(())
 }
 
 // `roots` is sorted largest to smallest by root slot
@@ -3163,7 +3156,8 @@ pub mod tests {
         blockstore.set_roots(vec![3, 5].iter()).unwrap();
 
         // Set up bank1
-        let bank0 = Arc::new(Bank::new_for_tests(&genesis_config));
+        let mut bank_forks = BankForks::new(Bank::new_for_tests(&genesis_config));
+        let bank0 = bank_forks.get(0).unwrap().clone();
         let opts = ProcessOptions {
             poh_verify: true,
             accounts_db_test_hash_calculation: true,
         };
         let recyclers = VerifyRecyclers::default();
         process_bank_0(&bank0, &blockstore, &opts, &recyclers, None);
-        let bank1 = Arc::new(Bank::new_from_parent(&bank0, &Pubkey::default(), 1));
+        let bank1 = bank_forks.insert(Bank::new_from_parent(&bank0, &Pubkey::default(), 1));
         confirm_full_slot(
             &blockstore,
             &bank1,
@@ -3183,13 +3177,17 @@ pub mod tests {
             &mut ExecuteTimings::default(),
         )
         .unwrap();
-        bank1.squash();
+        bank_forks.set_root(
+            1,
+            &solana_runtime::accounts_background_service::AbsRequestSender::default(),
+            None,
+        );
 
         // Test process_blockstore_from_root() from slot 1 onwards
         let (accounts_package_sender, _) = unbounded();
         let (bank_forks, ..) = do_process_blockstore_from_root(
             &blockstore,
-            bank1,
+            bank_forks,
             &opts,
             &recyclers,
             None,
@@ -3255,7 +3253,8 @@ pub mod tests {
         blockstore.set_roots(roots_to_set.iter()).unwrap();
 
         // Set up bank1
-        let bank0 = Arc::new(Bank::new_for_tests(&genesis_config));
+        let mut bank_forks = BankForks::new(Bank::new_for_tests(&genesis_config));
+        let bank0 = bank_forks.get(0).unwrap().clone();
         let opts = ProcessOptions {
             poh_verify: true,
             accounts_db_test_hash_calculation: true,
         };
         let recyclers = VerifyRecyclers::default();
         process_bank_0(&bank0, &blockstore, &opts, &recyclers, None);
 
         let slot_start_processing = 1;
-        let bank = Arc::new(Bank::new_from_parent(
+        let bank = bank_forks.insert(Bank::new_from_parent(
             &bank0,
             &Pubkey::default(),
             slot_start_processing,
@@ -3281,7 +3280,11 @@ pub mod tests {
             &mut ExecuteTimings::default(),
         )
         .unwrap();
-        bank.squash();
+        bank_forks.set_root(
+            1,
+            &solana_runtime::accounts_background_service::AbsRequestSender::default(),
+            None,
+        );
 
         let bank_snapshots_tempdir = TempDir::new().unwrap();
         let snapshot_config = SnapshotConfig {
@@ -3294,7 +3297,7 @@ pub mod tests {
 
         do_process_blockstore_from_root(
             &blockstore,
-            bank,
+            bank_forks,
             &opts,
             &recyclers,
             None,
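
For reference, a minimal sketch of the calling pattern this change establishes, pieced together from the calls visible in the hunks above. It is not compiled against this branch; `create_genesis_config`, `freeze()`, and the exact crate versions are assumptions rather than part of the diff. The caller now owns the `BankForks`, seeds it with the root bank, registers descendants through `insert()`, and roots through `set_root()` instead of squashing banks directly.

use solana_runtime::{
    accounts_background_service::AbsRequestSender,
    bank::Bank,
    bank_forks::BankForks,
    genesis_utils::create_genesis_config,
};
use solana_sdk::pubkey::Pubkey;

fn main() {
    let genesis_config = create_genesis_config(10_000).genesis_config;

    // The caller constructs BankForks up front and hands it to the processor,
    // mirroring process_blockstore() and bank_forks_utils::load_from_snapshot() above.
    let mut bank_forks = BankForks::new(Bank::new_for_tests(&genesis_config));
    let bank0 = bank_forks.get(0).unwrap().clone();

    // New descendants are registered through insert(), which hands back the Arc-wrapped bank.
    let bank1 = bank_forks.insert(Bank::new_from_parent(&bank0, &Pubkey::default(), 1));
    bank1.freeze();

    // Rooting goes through set_root(), which also prunes non-descendants,
    // replacing the bare squash() calls removed in the tests above.
    bank_forks.set_root(1, &AbsRequestSender::default(), None);
    assert_eq!(bank_forks.root(), 1);
}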