From cd14a940d8464c52ce401a076243daafd0b6c514 Mon Sep 17 00:00:00 2001 From: carllin Date: Tue, 13 Aug 2019 17:20:14 -0700 Subject: [PATCH] Allow process_blocktree() to start processing from any root (#5484) * Remove unnecessary entry_height from BankInfo * Refactor process_blocktree to support process_blocktree_from_root * Refactor to process blocktree after loading from snapshot * On restart make sure bank_forks contains all the banks between the root and the tip of each fork, not just the head of each fork * Account for 1 tick_per_slot in bank 0 so that blockhash of bank0 matches the tick --- core/src/bank_forks.rs | 108 ++----- core/src/blocktree_processor.rs | 523 +++++++++++++++++++++----------- core/src/replay_stage.rs | 2 +- core/src/snapshot_utils.rs | 61 +++- core/src/storage_stage.rs | 15 +- core/src/validator.rs | 26 +- runtime/src/bank.rs | 10 +- 7 files changed, 452 insertions(+), 293 deletions(-) diff --git a/core/src/bank_forks.rs b/core/src/bank_forks.rs index f7ccd4872f..047b79f2a8 100644 --- a/core/src/bank_forks.rs +++ b/core/src/bank_forks.rs @@ -1,19 +1,14 @@ //! The `bank_forks` module implments BankForks a DAG of checkpointed Banks -use crate::result::{Error, Result}; +use crate::result::Result; use crate::snapshot_package::SnapshotPackageSender; -use crate::snapshot_package::{TAR_ACCOUNTS_DIR, TAR_SNAPSHOTS_DIR}; use crate::snapshot_utils; -use crate::snapshot_utils::untar_snapshot_in; -use fs_extra::dir::CopyOptions; use solana_measure::measure::Measure; use solana_metrics::inc_new_counter_info; use solana_runtime::bank::Bank; use solana_runtime::status_cache::MAX_CACHE_ENTRIES; use solana_sdk::timing; use std::collections::{HashMap, HashSet}; -use std::fs; -use std::io::{Error as IOError, ErrorKind}; use std::ops::Index; use std::path::{Path, PathBuf}; use std::sync::Arc; @@ -164,18 +159,29 @@ impl BankForks { self.banks.get(&bank_slot) } - pub fn new_from_banks(initial_banks: &[Arc], root: u64) -> Self { + pub fn new_from_banks(initial_forks: &[Arc], rooted_path: Vec) -> Self { let mut banks = HashMap::new(); - let working_bank = initial_banks[0].clone(); - for bank in initial_banks { + let working_bank = initial_forks[0].clone(); + + // Iterate through the heads of all the different forks + for bank in initial_forks { banks.insert(bank.slot(), bank.clone()); + let parents = bank.parents(); + for parent in parents { + if banks.contains_key(&parent.slot()) { + // All ancestors have already been inserted by another fork + break; + } + banks.insert(parent.slot(), parent.clone()); + } } + Self { - root, + root: *rooted_path.last().unwrap(), banks, working_bank, snapshot_config: None, - slots_since_snapshot: vec![], + slots_since_snapshot: rooted_path, confidence: HashMap::new(), } } @@ -367,54 +373,6 @@ impl BankForks { pub fn snapshot_config(&self) -> &Option { &self.snapshot_config } - - pub fn load_from_snapshot>( - account_paths: String, - snapshot_config: &SnapshotConfig, - snapshot_tar: P, - ) -> Result { - // Untar the snapshot into a temp directory under `snapshot_config.snapshot_path()` - let unpack_dir = tempfile::tempdir_in(snapshot_config.snapshot_path())?; - untar_snapshot_in(&snapshot_tar, &unpack_dir)?; - - let unpacked_accounts_dir = unpack_dir.as_ref().join(TAR_ACCOUNTS_DIR); - let unpacked_snapshots_dir = unpack_dir.as_ref().join(TAR_SNAPSHOTS_DIR); - let snapshot_paths = snapshot_utils::get_snapshot_paths(&unpacked_snapshots_dir); - let bank = snapshot_utils::bank_from_snapshots( - account_paths, - &snapshot_paths, - unpacked_accounts_dir, - )?; - - let bank = Arc::new(bank); - // Move the unpacked snapshots into `snapshot_config.snapshot_path()` - let dir_files = fs::read_dir(unpacked_snapshots_dir).expect("Invalid snapshot path"); - let paths: Vec = dir_files - .filter_map(|entry| entry.ok().map(|e| e.path())) - .collect(); - let mut copy_options = CopyOptions::new(); - copy_options.overwrite = true; - fs_extra::move_items(&paths, snapshot_config.snapshot_path(), ©_options)?; - - let mut banks = HashMap::new(); - banks.insert(bank.slot(), bank.clone()); - let root = bank.slot(); - let snapshot_paths = snapshot_utils::get_snapshot_paths(&snapshot_config.snapshot_path); - if snapshot_paths.is_empty() { - return Err(Error::IO(IOError::new( - ErrorKind::Other, - "no snapshots found", - ))); - } - Ok(BankForks { - banks, - working_bank: bank, - root, - snapshot_config: None, - slots_since_snapshot: vec![snapshot_paths.last().unwrap().slot], - confidence: HashMap::new(), - }) - } } #[cfg(test)] @@ -429,6 +387,7 @@ mod tests { use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::system_transaction; + use std::fs; use std::sync::atomic::AtomicBool; use std::sync::mpsc::channel; use tempfile::TempDir; @@ -541,35 +500,35 @@ mod tests { ); } - fn restore_from_snapshot(bank_forks: BankForks, account_paths: String, last_slot: u64) { - let (snapshot_path, snapshot_package_output_path) = bank_forks + fn restore_from_snapshot(old_bank_forks: BankForks, account_paths: String) { + let (snapshot_path, snapshot_package_output_path) = old_bank_forks .snapshot_config .as_ref() .map(|c| (&c.snapshot_path, &c.snapshot_package_output_path)) .unwrap(); - let new = BankForks::load_from_snapshot( + let deserialized_bank = snapshot_utils::bank_from_archive( account_paths, - bank_forks.snapshot_config.as_ref().unwrap(), + old_bank_forks.snapshot_config.as_ref().unwrap(), snapshot_utils::get_snapshot_tar_path(snapshot_package_output_path), ) .unwrap(); - for (slot, _) in new.banks.iter() { - if *slot > 0 { - let bank = bank_forks.banks.get(slot).unwrap().clone(); - let new_bank = new.banks.get(slot).unwrap(); - bank.compare_bank(&new_bank); - } - } + let bank = old_bank_forks + .banks + .get(&deserialized_bank.slot()) + .unwrap() + .clone(); + bank.compare_bank(&deserialized_bank); - assert_eq!(new.working_bank().slot(), last_slot); - for (slot, _) in new.banks.iter() { - snapshot_utils::remove_snapshot(*slot, snapshot_path).unwrap(); + let slot_snapshot_paths = snapshot_utils::get_snapshot_paths(&snapshot_path); + + for p in slot_snapshot_paths { + snapshot_utils::remove_snapshot(p.slot, &snapshot_path).unwrap(); } } - // creates banks upto "last_slot" and runs the input function `f` on each bank created + // creates banks up to "last_slot" and runs the input function `f` on each bank created // also marks each bank as root and generates snapshots // finally tries to restore from the last bank's snapshot and compares the restored bank to the // `last_slot` bank @@ -627,7 +586,6 @@ mod tests { restore_from_snapshot( bank_forks, accounts_dir.path().to_str().unwrap().to_string(), - last_slot, ); } diff --git a/core/src/blocktree_processor.rs b/core/src/blocktree_processor.rs index c280308957..0949e2d627 100644 --- a/core/src/blocktree_processor.rs +++ b/core/src/blocktree_processor.rs @@ -1,5 +1,5 @@ use crate::bank_forks::BankForks; -use crate::blocktree::Blocktree; +use crate::blocktree::{Blocktree, SlotMeta}; use crate::entry::{Entry, EntrySlice}; use crate::leader_schedule_cache::LeaderScheduleCache; use rayon::prelude::*; @@ -8,6 +8,7 @@ use solana_metrics::{datapoint, datapoint_error, inc_new_counter_debug}; use solana_runtime::bank::Bank; use solana_runtime::locked_accounts_results::LockedAccountsResults; use solana_sdk::genesis_block::GenesisBlock; +use solana_sdk::hash::Hash; use solana_sdk::timing::{duration_as_ms, Slot, MAX_RECENT_BLOCKHASHES}; use solana_sdk::transaction::Result; use std::result; @@ -129,7 +130,6 @@ pub fn process_entries(bank: &Bank, entries: &[Entry]) -> Result<()> { #[derive(Debug, PartialEq)] pub struct BankForksInfo { pub bank_slot: u64, - pub entry_height: u64, } #[derive(Debug)] @@ -144,160 +144,65 @@ pub fn process_blocktree( verify_ledger: bool, dev_halt_at_slot: Option, ) -> result::Result<(BankForks, Vec, LeaderScheduleCache), BlocktreeProcessorError> { - let now = Instant::now(); - info!("processing ledger..."); + info!("processing ledger from bank 0..."); + // Setup bank for slot 0 - let mut pending_slots = { - let slot = 0; - let bank = Arc::new(Bank::new_with_paths(&genesis_block, account_paths)); - let entry_height = 0; - let last_entry_hash = bank.last_blockhash(); + let bank0 = Arc::new(Bank::new_with_paths(&genesis_block, account_paths)); + process_bank_0(&bank0, blocktree, verify_ledger)?; + process_blocktree_from_root(blocktree, bank0, verify_ledger, dev_halt_at_slot) +} - // Load the metadata for this slot - let meta = blocktree - .meta(slot) - .map_err(|err| { - warn!("Failed to load meta for slot {}: {:?}", slot, err); - BlocktreeProcessorError::LedgerVerificationFailed - })? - .unwrap(); +// Process blocktree from a known root bank +pub fn process_blocktree_from_root( + blocktree: &Blocktree, + bank: Arc, + verify_ledger: bool, + dev_halt_at_slot: Option, +) -> result::Result<(BankForks, Vec, LeaderScheduleCache), BlocktreeProcessorError> { + info!("processing ledger from root: {}...", bank.slot()); + // Starting slot must be a root, and thus has no parents + assert!(bank.parent().is_none()); + let start_slot = bank.slot(); + let now = Instant::now(); + let mut rooted_path = vec![start_slot]; + let dev_halt_at_slot = dev_halt_at_slot.unwrap_or(std::u64::MAX); - vec![(slot, meta, bank, entry_height, last_entry_hash)] + blocktree + .set_roots(&[start_slot]) + .expect("Couldn't set root on startup"); + + let meta = blocktree.meta(start_slot).unwrap(); + + // Iterate and replay slots from blocktree starting from `start_slot` + let (bank_forks, bank_forks_info, leader_schedule_cache) = { + if let Some(meta) = meta { + let epoch_schedule = bank.epoch_schedule(); + let mut leader_schedule_cache = LeaderScheduleCache::new(*epoch_schedule, &bank); + let fork_info = process_pending_slots( + &bank, + &meta, + blocktree, + &mut leader_schedule_cache, + &mut rooted_path, + verify_ledger, + dev_halt_at_slot, + )?; + let (banks, bank_forks_info): (Vec<_>, Vec<_>) = fork_info.into_iter().unzip(); + let bank_forks = BankForks::new_from_banks(&banks, rooted_path); + (bank_forks, bank_forks_info, leader_schedule_cache) + } else { + // If there's no meta for the input `start_slot`, then we started from a snapshot + // and there's no point in processing the rest of blocktree and implies blocktree + // should be empty past this point. + let bfi = BankForksInfo { + bank_slot: start_slot, + }; + let leader_schedule_cache = LeaderScheduleCache::new_from_bank(&bank); + let bank_forks = BankForks::new_from_banks(&[bank], rooted_path); + (bank_forks, vec![bfi], leader_schedule_cache) + } }; - blocktree.set_roots(&[0]).expect("Couldn't set first root"); - - let leader_schedule_cache = - LeaderScheduleCache::new(*pending_slots[0].2.epoch_schedule(), &pending_slots[0].2); - - let mut fork_info = vec![]; - let mut last_status_report = Instant::now(); - let mut root = 0; - let dev_halt_at_slot = dev_halt_at_slot.unwrap_or(std::u64::MAX); - while !pending_slots.is_empty() { - let (slot, meta, bank, mut entry_height, mut last_entry_hash) = - pending_slots.pop().unwrap(); - - if last_status_report.elapsed() > Duration::from_secs(2) { - info!("processing ledger...block {}", slot); - last_status_report = Instant::now(); - } - - // Fetch all entries for this slot - let mut entries = blocktree.get_slot_entries(slot, 0, None).map_err(|err| { - warn!("Failed to load entries for slot {}: {:?}", slot, err); - BlocktreeProcessorError::LedgerVerificationFailed - })?; - - if slot == 0 { - // The first entry in the ledger is a pseudo-tick used only to ensure the number of ticks - // in slot 0 is the same as the number of ticks in all subsequent slots. It is not - // processed by the bank, skip over it. - if entries.is_empty() { - warn!("entry0 not present"); - return Err(BlocktreeProcessorError::LedgerVerificationFailed); - } - let entry0 = entries.remove(0); - if !(entry0.is_tick() && entry0.verify(&last_entry_hash)) { - warn!("Ledger proof of history failed at entry0"); - return Err(BlocktreeProcessorError::LedgerVerificationFailed); - } - last_entry_hash = entry0.hash; - entry_height += 1; - } - - if !entries.is_empty() { - if verify_ledger && !entries.verify(&last_entry_hash) { - warn!( - "Ledger proof of history failed at slot: {}, entry: {}", - slot, entry_height - ); - return Err(BlocktreeProcessorError::LedgerVerificationFailed); - } - - process_entries(&bank, &entries).map_err(|err| { - warn!("Failed to process entries for slot {}: {:?}", slot, err); - BlocktreeProcessorError::LedgerVerificationFailed - })?; - - last_entry_hash = entries.last().unwrap().hash; - entry_height += entries.len() as u64; - } - - bank.freeze(); // all banks handled by this routine are created from complete slots - - if blocktree.is_root(slot) { - root = slot; - leader_schedule_cache.set_root(&bank); - bank.squash(); - pending_slots.clear(); - fork_info.clear(); - } - - if slot >= dev_halt_at_slot { - let bfi = BankForksInfo { - bank_slot: slot, - entry_height, - }; - fork_info.push((bank, bfi)); - break; - } - - if meta.next_slots.is_empty() { - // Reached the end of this fork. Record the final entry height and last entry.hash - let bfi = BankForksInfo { - bank_slot: slot, - entry_height, - }; - fork_info.push((bank, bfi)); - continue; - } - - // This is a fork point, create a new child bank for each fork - for next_slot in meta.next_slots { - let next_meta = blocktree - .meta(next_slot) - .map_err(|err| { - warn!("Failed to load meta for slot {}: {:?}", slot, err); - BlocktreeProcessorError::LedgerVerificationFailed - })? - .unwrap(); - - // only process full slots in blocktree_processor, replay_stage - // handles any partials - if next_meta.is_full() { - let next_bank = Arc::new(Bank::new_from_parent( - &bank, - &leader_schedule_cache - .slot_leader_at(next_slot, Some(&bank)) - .unwrap(), - next_slot, - )); - trace!("Add child bank for slot={}", next_slot); - // bank_forks.insert(*next_slot, child_bank); - pending_slots.push(( - next_slot, - next_meta, - next_bank, - entry_height, - last_entry_hash, - )); - } else { - let bfi = BankForksInfo { - bank_slot: slot, - entry_height, - }; - fork_info.push((bank.clone(), bfi)); - } - } - - // reverse sort by slot, so the next slot to be processed can be pop()ed - // TODO: remove me once leader_scheduler can hang with out-of-order slots? - pending_slots.sort_by(|a, b| b.0.cmp(&a.0)); - } - - let (banks, bank_forks_info): (Vec<_>, Vec<_>) = fork_info.into_iter().unzip(); - let bank_forks = BankForks::new_from_banks(&banks, root); info!( "processing ledger...complete in {}ms, forks={}...", duration_as_ms(&now.elapsed()), @@ -307,6 +212,195 @@ pub fn process_blocktree( Ok((bank_forks, bank_forks_info, leader_schedule_cache)) } +fn verify_and_process_entries( + bank: &Bank, + entries: &[Entry], + verify_ledger: bool, + last_entry_hash: Hash, +) -> result::Result { + assert!(!entries.is_empty()); + + if verify_ledger && !entries.verify(&last_entry_hash) { + warn!("Ledger proof of history failed at slot: {}", bank.slot()); + return Err(BlocktreeProcessorError::LedgerVerificationFailed); + } + + process_entries(&bank, &entries).map_err(|err| { + warn!( + "Failed to process entries for slot {}: {:?}", + bank.slot(), + err + ); + BlocktreeProcessorError::LedgerVerificationFailed + })?; + + Ok(entries.last().unwrap().hash) +} + +// Special handling required for processing the entries in slot 0 +fn process_bank_0( + bank0: &Bank, + blocktree: &Blocktree, + verify_ledger: bool, +) -> result::Result<(), BlocktreeProcessorError> { + assert_eq!(bank0.slot(), 0); + + // Fetch all entries for this slot + let mut entries = blocktree.get_slot_entries(0, 0, None).map_err(|err| { + warn!("Failed to load entries for slot 0, err: {:?}", err); + BlocktreeProcessorError::LedgerVerificationFailed + })?; + + // The first entry in the ledger is a pseudo-tick used only to ensure the number of ticks + // in slot 0 is the same as the number of ticks in all subsequent slots. It is not + // processed by the bank, skip over it. + if entries.is_empty() { + warn!("entry0 not present"); + return Err(BlocktreeProcessorError::LedgerVerificationFailed); + } + let entry0 = entries.remove(0); + if !(entry0.is_tick() && entry0.verify(&bank0.last_blockhash())) { + warn!("Ledger proof of history failed at entry0"); + return Err(BlocktreeProcessorError::LedgerVerificationFailed); + } + + if !entries.is_empty() { + verify_and_process_entries(bank0, &entries, verify_ledger, entry0.hash)?; + } else { + bank0.register_tick(&entry0.hash); + } + + bank0.freeze(); + + Ok(()) +} + +// Given a slot, add its children to the pending slots queue if those children slots are +// complete +fn process_next_slots( + bank: &Arc, + meta: &SlotMeta, + blocktree: &Blocktree, + leader_schedule_cache: &LeaderScheduleCache, + pending_slots: &mut Vec<(u64, SlotMeta, Arc, Hash)>, + fork_info: &mut Vec<(Arc, BankForksInfo)>, +) -> result::Result<(), BlocktreeProcessorError> { + if meta.next_slots.is_empty() { + // Reached the end of this fork. Record the final entry height and last entry.hash + let bfi = BankForksInfo { + bank_slot: bank.slot(), + }; + fork_info.push((bank.clone(), bfi)); + return Ok(()); + } + + // This is a fork point if there are multiple children, create a new child bank for each fork + for next_slot in &meta.next_slots { + let next_meta = blocktree + .meta(*next_slot) + .map_err(|err| { + warn!("Failed to load meta for slot {}: {:?}", next_slot, err); + BlocktreeProcessorError::LedgerVerificationFailed + })? + .unwrap(); + + // Only process full slots in blocktree_processor, replay_stage + // handles any partials + if next_meta.is_full() { + let next_bank = Arc::new(Bank::new_from_parent( + &bank, + &leader_schedule_cache + .slot_leader_at(*next_slot, Some(&bank)) + .unwrap(), + *next_slot, + )); + trace!("Add child bank {} of slot={}", next_slot, bank.slot()); + pending_slots.push((*next_slot, next_meta, next_bank, bank.last_blockhash())); + } else { + let bfi = BankForksInfo { + bank_slot: bank.slot(), + }; + fork_info.push((bank.clone(), bfi)); + } + } + + // Reverse sort by slot, so the next slot to be processed can be popped + // TODO: remove me once leader_scheduler can hang with out-of-order slots? + pending_slots.sort_by(|a, b| b.0.cmp(&a.0)); + Ok(()) +} + +// Iterate through blocktree processing slots starting from the root slot pointed to by the +// given `meta` +fn process_pending_slots( + root_bank: &Arc, + root_meta: &SlotMeta, + blocktree: &Blocktree, + leader_schedule_cache: &mut LeaderScheduleCache, + rooted_path: &mut Vec, + verify_ledger: bool, + dev_halt_at_slot: Slot, +) -> result::Result, BankForksInfo)>, BlocktreeProcessorError> { + let mut fork_info = vec![]; + let mut last_status_report = Instant::now(); + let mut pending_slots = vec![]; + process_next_slots( + root_bank, + root_meta, + blocktree, + leader_schedule_cache, + &mut pending_slots, + &mut fork_info, + )?; + + while !pending_slots.is_empty() { + let (slot, meta, bank, last_entry_hash) = pending_slots.pop().unwrap(); + + if last_status_report.elapsed() > Duration::from_secs(2) { + info!("processing ledger...block {}", slot); + last_status_report = Instant::now(); + } + + // Fetch all entries for this slot + let entries = blocktree.get_slot_entries(slot, 0, None).map_err(|err| { + warn!("Failed to load entries for slot {}: {:?}", slot, err); + BlocktreeProcessorError::LedgerVerificationFailed + })?; + + verify_and_process_entries(&bank, &entries, verify_ledger, last_entry_hash)?; + + bank.freeze(); // all banks handled by this routine are created from complete slots + + if blocktree.is_root(slot) { + let parents = bank.parents().into_iter().map(|b| b.slot()).rev().skip(1); + let parents: Vec<_> = parents.collect(); + rooted_path.extend(parents); + rooted_path.push(slot); + leader_schedule_cache.set_root(&bank); + bank.squash(); + pending_slots.clear(); + fork_info.clear(); + } + + if slot >= dev_halt_at_slot { + let bfi = BankForksInfo { bank_slot: slot }; + fork_info.push((bank, bfi)); + break; + } + + process_next_slots( + &bank, + &meta, + blocktree, + leader_schedule_cache, + &mut pending_slots, + &mut fork_info, + )?; + } + + Ok(fork_info) +} + #[cfg(test)] pub mod tests { use super::*; @@ -393,7 +487,6 @@ pub mod tests { bank_forks_info[0], BankForksInfo { bank_slot: 0, // slot 1 isn't "full", we stop at slot zero - entry_height: ticks_per_slot, } ); } @@ -453,7 +546,6 @@ pub mod tests { bank_forks_info[0], BankForksInfo { bank_slot: 4, // Fork 2's head is slot 4 - entry_height: ticks_per_slot * 3, } ); assert!(&bank_forks[4] @@ -464,10 +556,7 @@ pub mod tests { .is_empty()); // Ensure bank_forks holds the right banks - for info in bank_forks_info { - assert_eq!(bank_forks[info.bank_slot].slot(), info.bank_slot); - assert!(bank_forks[info.bank_slot].is_frozen()); - } + verify_fork_infos(&bank_forks, &bank_forks_info); assert_eq!(bank_forks.root(), 4); } @@ -526,7 +615,6 @@ pub mod tests { bank_forks_info[0], BankForksInfo { bank_slot: 3, // Fork 1's head is slot 3 - entry_height: ticks_per_slot * 4, } ); assert_eq!( @@ -541,7 +629,6 @@ pub mod tests { bank_forks_info[1], BankForksInfo { bank_slot: 4, // Fork 2's head is slot 4 - entry_height: ticks_per_slot * 3, } ); assert_eq!( @@ -556,10 +643,7 @@ pub mod tests { assert_eq!(bank_forks.root(), 1); // Ensure bank_forks holds the right banks - for info in bank_forks_info { - assert_eq!(bank_forks[info.bank_slot].slot(), info.bank_slot); - assert!(bank_forks[info.bank_slot].is_frozen()); - } + verify_fork_infos(&bank_forks, &bank_forks_info); } #[test] @@ -607,7 +691,6 @@ pub mod tests { bank_forks_info[0], BankForksInfo { bank_slot: last_slot + 1, // Head is last_slot + 1 - entry_height: ticks_per_slot * (last_slot + 2), } ); @@ -734,19 +817,12 @@ pub mod tests { blocktree .write_entries(1, 0, 0, genesis_block.ticks_per_slot, &entries) .unwrap(); - let entry_height = genesis_block.ticks_per_slot + entries.len() as u64; let (bank_forks, bank_forks_info, _) = process_blocktree(&genesis_block, &blocktree, None, true, None).unwrap(); assert_eq!(bank_forks_info.len(), 1); assert_eq!(bank_forks.root(), 0); - assert_eq!( - bank_forks_info[0], - BankForksInfo { - bank_slot: 1, - entry_height, - } - ); + assert_eq!(bank_forks_info[0], BankForksInfo { bank_slot: 1 }); let bank = bank_forks[1].clone(); assert_eq!( @@ -770,13 +846,7 @@ pub mod tests { process_blocktree(&genesis_block, &blocktree, None, true, None).unwrap(); assert_eq!(bank_forks_info.len(), 1); - assert_eq!( - bank_forks_info[0], - BankForksInfo { - bank_slot: 0, - entry_height: 1, - } - ); + assert_eq!(bank_forks_info[0], BankForksInfo { bank_slot: 0 }); let bank = bank_forks[0].clone(); assert_eq!(bank.tick_height(), 0); } @@ -1264,6 +1334,83 @@ pub mod tests { assert_eq!(bank.process_transaction(&fail_tx), Ok(())); } + #[test] + fn test_process_blocktree_from_root() { + let GenesisBlockInfo { + mut genesis_block, .. + } = create_genesis_block(123); + + let ticks_per_slot = 1; + genesis_block.ticks_per_slot = ticks_per_slot; + let (ledger_path, blockhash) = create_new_tmp_ledger!(&genesis_block); + let blocktree = Blocktree::open(&ledger_path).unwrap(); + + /* + Build a blocktree in the ledger with the following fork structure: + + slot 0 (all ticks) + | + slot 1 (all ticks) + | + slot 2 (all ticks) + | + slot 3 (all ticks) -> root + | + slot 4 (all ticks) + | + slot 5 (all ticks) -> root + | + slot 6 (all ticks) + */ + + let mut last_hash = blockhash; + for i in 0..6 { + last_hash = + fill_blocktree_slot_with_ticks(&blocktree, ticks_per_slot, i + 1, i, last_hash); + } + blocktree.set_roots(&[3, 5]).unwrap(); + + // Set up bank1 + let bank0 = Arc::new(Bank::new(&genesis_block)); + process_bank_0(&bank0, &blocktree, true).unwrap(); + let bank1 = Arc::new(Bank::new_from_parent(&bank0, &Pubkey::default(), 1)); + bank1.squash(); + let slot1_entries = blocktree.get_slot_entries(1, 0, None).unwrap(); + verify_and_process_entries(&bank1, &slot1_entries, true, bank0.last_blockhash()).unwrap(); + + // Test process_blocktree_from_root() from slot 1 onwards + let (bank_forks, bank_forks_info, _) = + process_blocktree_from_root(&blocktree, bank1, true, None).unwrap(); + + assert_eq!(bank_forks_info.len(), 1); // One fork + assert_eq!( + bank_forks_info[0], + BankForksInfo { + bank_slot: 6, // The head of the fork is slot 6 + } + ); + + // slots_since_snapshot should contain everything on the rooted path + assert_eq!( + bank_forks.slots_since_snapshot().to_vec(), + vec![1, 2, 3, 4, 5] + ); + assert_eq!(bank_forks.root(), 5); + + // Verify the parents of the head of the fork + assert_eq!( + &bank_forks[6] + .parents() + .iter() + .map(|bank| bank.slot()) + .collect::>(), + &[5] + ); + + // Check that bank forks has the correct banks + verify_fork_infos(&bank_forks, &bank_forks_info); + } + #[test] #[ignore] fn test_process_entries_stress() { @@ -1357,4 +1504,22 @@ pub mod tests { let bank = Bank::new_with_paths(&genesis_block, account_paths); bank.epoch_schedule().clone() } + + // Check that `bank_forks` contains all the ancestors and banks for each fork identified in + // `bank_forks_info` + fn verify_fork_infos(bank_forks: &BankForks, bank_forks_info: &[BankForksInfo]) { + for fork in bank_forks_info { + let head_slot = fork.bank_slot; + let head_bank = &bank_forks[head_slot]; + let mut parents = head_bank.parents(); + parents.push(head_bank.clone()); + + // Ensure the tip of each fork and all its parents are in the given bank_forks + for parent in parents { + let parent_bank = &bank_forks[parent.slot()]; + assert_eq!(parent_bank.slot(), parent.slot()); + assert!(parent_bank.is_frozen()); + } + } + } } diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 3786fd224a..9d213a413e 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -1059,7 +1059,7 @@ mod test { let arc_bank0 = Arc::new(bank0); let bank_forks = Arc::new(RwLock::new(BankForks::new_from_banks( &[arc_bank0.clone()], - 0, + vec![0], ))); let pubkey = Pubkey::new_rand(); let mut tower = Tower::new_from_forks(&bank_forks.read().unwrap(), &pubkey); diff --git a/core/src/snapshot_utils.rs b/core/src/snapshot_utils.rs index 684f4a804b..626b16542c 100644 --- a/core/src/snapshot_utils.rs +++ b/core/src/snapshot_utils.rs @@ -1,7 +1,10 @@ +use crate::bank_forks::SnapshotConfig; use crate::result::{Error, Result}; use crate::snapshot_package::SnapshotPackage; +use crate::snapshot_package::{TAR_ACCOUNTS_DIR, TAR_SNAPSHOTS_DIR}; use bincode::{deserialize_from, serialize_into}; use flate2::read::GzDecoder; +use fs_extra::dir::CopyOptions; use solana_runtime::bank::Bank; use solana_runtime::status_cache::SlotDelta; use solana_sdk::transaction; @@ -168,7 +171,48 @@ pub fn remove_snapshot>(slot: u64, snapshot_path: P) -> Result<() Ok(()) } -pub fn bank_from_snapshots

( +pub fn bank_from_archive>( + account_paths: String, + snapshot_config: &SnapshotConfig, + snapshot_tar: P, +) -> Result { + // Untar the snapshot into a temp directory under `snapshot_config.snapshot_path()` + let unpack_dir = tempfile::tempdir_in(snapshot_config.snapshot_path())?; + untar_snapshot_in(&snapshot_tar, &unpack_dir)?; + + let unpacked_accounts_dir = unpack_dir.as_ref().join(TAR_ACCOUNTS_DIR); + let unpacked_snapshots_dir = unpack_dir.as_ref().join(TAR_SNAPSHOTS_DIR); + let snapshot_paths = get_snapshot_paths(&unpacked_snapshots_dir); + let bank = rebuild_bank_from_snapshots(account_paths, &snapshot_paths, unpacked_accounts_dir)?; + + // Move the unpacked snapshots into `snapshot_config.snapshot_path()` + let dir_files = fs::read_dir(unpacked_snapshots_dir).expect("Invalid snapshot path"); + let paths: Vec = dir_files + .filter_map(|entry| entry.ok().map(|e| e.path())) + .collect(); + let mut copy_options = CopyOptions::new(); + copy_options.overwrite = true; + fs_extra::move_items(&paths, snapshot_config.snapshot_path(), ©_options)?; + + Ok(bank) +} + +pub fn get_snapshot_tar_path>(snapshot_output_dir: P) -> PathBuf { + snapshot_output_dir.as_ref().join("snapshot.tgz") +} + +pub fn untar_snapshot_in, Q: AsRef>( + snapshot_tar: P, + unpack_dir: Q, +) -> Result<()> { + let tar_gz = File::open(snapshot_tar)?; + let tar = GzDecoder::new(tar_gz); + let mut archive = Archive::new(tar); + archive.unpack(&unpack_dir)?; + Ok(()) +} + +fn rebuild_bank_from_snapshots

( local_account_paths: String, snapshot_paths: &[SlotSnapshotPaths], append_vecs_path: P, @@ -203,21 +247,6 @@ where Ok(bank) } -pub fn get_snapshot_tar_path>(snapshot_output_dir: P) -> PathBuf { - snapshot_output_dir.as_ref().join("snapshot.tgz") -} - -pub fn untar_snapshot_in, Q: AsRef>( - snapshot_tar: P, - unpack_dir: Q, -) -> Result<()> { - let tar_gz = File::open(snapshot_tar)?; - let tar = GzDecoder::new(tar_gz); - let mut archive = Archive::new(tar); - archive.unpack(&unpack_dir)?; - Ok(()) -} - fn get_snapshot_file_name(slot: u64) -> String { slot.to_string() } diff --git a/core/src/storage_stage.rs b/core/src/storage_stage.rs index 8294351191..8bd7d6407c 100644 --- a/core/src/storage_stage.rs +++ b/core/src/storage_stage.rs @@ -659,7 +659,10 @@ mod tests { let cluster_info = test_cluster_info(&keypair.pubkey()); let GenesisBlockInfo { genesis_block, .. } = create_genesis_block(1000); let bank = Arc::new(Bank::new(&genesis_block)); - let bank_forks = Arc::new(RwLock::new(BankForks::new_from_banks(&[bank.clone()], 0))); + let bank_forks = Arc::new(RwLock::new(BankForks::new_from_banks( + &[bank.clone()], + vec![0], + ))); let (_slot_sender, slot_receiver) = channel(); let storage_state = StorageState::new( &bank.last_blockhash(), @@ -699,7 +702,10 @@ mod tests { let blocktree = Arc::new(Blocktree::open(&ledger_path).unwrap()); let slot = 1; let bank = Arc::new(Bank::new(&genesis_block)); - let bank_forks = Arc::new(RwLock::new(BankForks::new_from_banks(&[bank.clone()], 0))); + let bank_forks = Arc::new(RwLock::new(BankForks::new_from_banks( + &[bank.clone()], + vec![0], + ))); let cluster_info = test_cluster_info(&keypair.pubkey()); let (bank_sender, bank_receiver) = channel(); @@ -791,7 +797,10 @@ mod tests { let bank = Bank::new(&genesis_block); let bank = Arc::new(bank); - let bank_forks = Arc::new(RwLock::new(BankForks::new_from_banks(&[bank.clone()], 0))); + let bank_forks = Arc::new(RwLock::new(BankForks::new_from_banks( + &[bank.clone()], + vec![0], + ))); let cluster_info = test_cluster_info(&keypair.pubkey()); let (bank_sender, bank_receiver) = channel(); diff --git a/core/src/validator.rs b/core/src/validator.rs index 3fbcfdeb40..27d0707f35 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -355,30 +355,28 @@ fn get_bank_forks( // Check that the snapshot tar exists, try to load the snapshot if it does if tar.exists() { // Fail hard here if snapshot fails to load, don't silently continue - let bank_forks = BankForks::load_from_snapshot( - //&genesis_block, + let deserialized_bank = snapshot_utils::bank_from_archive( account_paths .clone() .expect("Account paths not present when booting from snapshot"), snapshot_config, - tar, + &tar, ) .expect("Load from snapshot failed"); - let bank = &bank_forks.working_bank(); - let fork_info = BankForksInfo { - bank_slot: bank.slot(), - entry_height: bank.tick_height(), - }; - result = Some(( - bank_forks, - vec![fork_info], - LeaderScheduleCache::new_from_bank(bank), - )); + result = Some( + blocktree_processor::process_blocktree_from_root( + blocktree, + Arc::new(deserialized_bank), + verify_ledger, + dev_halt_at_slot, + ) + .expect("processing blocktree after loading snapshot failed"), + ); } } - // If loading from a snapshot failed/snapshot didn't exist + // If a snapshot doesn't exist if result.is_none() { result = Some( blocktree_processor::process_blocktree( diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 0ddb9ab09e..fb9ad2325b 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -724,12 +724,12 @@ impl Bank { // TODO: put this assert back in // assert!(!self.is_frozen()); - - let current_tick_height = { + if self.ticks_per_slot() != 1 || self.slot() != 0 { self.tick_height.fetch_add(1, Ordering::Relaxed); - self.tick_height.load(Ordering::Relaxed) as u64 - }; - inc_new_counter_debug!("bank-register_tick-registered", 1); + inc_new_counter_debug!("bank-register_tick-registered", 1); + } + + let current_tick_height = self.tick_height.load(Ordering::Relaxed) as u64; // Register a new block hash if at the last tick in the slot if current_tick_height % self.ticks_per_slot == self.ticks_per_slot - 1 {