Allow process_blocktree() to start processing from any root (#5484)
* Remove unnecessary entry_height from BankInfo
* Refactor process_blocktree to support process_blocktree_from_root
* Refactor to process blocktree after loading from snapshot
* On restart make sure bank_forks contains all the banks between the root and the tip of each fork, not just the head of each fork
* Account for 1 tick_per_slot in bank 0 so that blockhash of bank0 matches the tick
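The change splits ledger replay into two entry points. A sketch of the resulting shape (signatures reconstructed from the hunks and call sites below; the leading parameters of process_blocktree are elided in its hunk header):

    pub fn process_blocktree(
        genesis_block: &GenesisBlock,
        blocktree: &Blocktree,
        account_paths: Option<String>,
        verify_ledger: bool,
        dev_halt_at_slot: Option<Slot>,
    ) -> result::Result<(BankForks, Vec<BankForksInfo>, LeaderScheduleCache), BlocktreeProcessorError>;

    pub fn process_blocktree_from_root(
        blocktree: &Blocktree,
        bank: Arc<Bank>,
        verify_ledger: bool,
        dev_halt_at_slot: Option<Slot>,
    ) -> result::Result<(BankForks, Vec<BankForksInfo>, LeaderScheduleCache), BlocktreeProcessorError>;

process_blocktree now only builds bank 0 from genesis and delegates to process_blocktree_from_root, so a validator restarting from a snapshot can resume replay at the snapshot's root instead of genesis.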
parent 58d4e32c97
commit cd14a940d8
bank_forks.rs

@@ -1,19 +1,14 @@
 //! The `bank_forks` module implements BankForks a DAG of checkpointed Banks
-use crate::result::{Error, Result};
+use crate::result::Result;
 use crate::snapshot_package::SnapshotPackageSender;
-use crate::snapshot_package::{TAR_ACCOUNTS_DIR, TAR_SNAPSHOTS_DIR};
 use crate::snapshot_utils;
-use crate::snapshot_utils::untar_snapshot_in;
-use fs_extra::dir::CopyOptions;
 use solana_measure::measure::Measure;
 use solana_metrics::inc_new_counter_info;
 use solana_runtime::bank::Bank;
 use solana_runtime::status_cache::MAX_CACHE_ENTRIES;
 use solana_sdk::timing;
 use std::collections::{HashMap, HashSet};
-use std::fs;
-use std::io::{Error as IOError, ErrorKind};
 use std::ops::Index;
 use std::path::{Path, PathBuf};
 use std::sync::Arc;
@@ -164,18 +159,29 @@ impl BankForks {
         self.banks.get(&bank_slot)
     }

-    pub fn new_from_banks(initial_banks: &[Arc<Bank>], root: u64) -> Self {
+    pub fn new_from_banks(initial_forks: &[Arc<Bank>], rooted_path: Vec<u64>) -> Self {
         let mut banks = HashMap::new();
-        let working_bank = initial_banks[0].clone();
-        for bank in initial_banks {
+        let working_bank = initial_forks[0].clone();
+
+        // Iterate through the heads of all the different forks
+        for bank in initial_forks {
             banks.insert(bank.slot(), bank.clone());
+            let parents = bank.parents();
+            for parent in parents {
+                if banks.contains_key(&parent.slot()) {
+                    // All ancestors have already been inserted by another fork
+                    break;
+                }
+                banks.insert(parent.slot(), parent.clone());
+            }
         }
+
         Self {
-            root,
+            root: *rooted_path.last().unwrap(),
             banks,
             working_bank,
             snapshot_config: None,
-            slots_since_snapshot: vec![],
+            slots_since_snapshot: rooted_path,
             confidence: HashMap::new(),
         }
     }
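A sketch of the new constructor's contract, assuming a single fork 0 <- 1 <- 2 whose latest root is slot 1 (`tip_bank_2` is a hypothetical Arc<Bank> for slot 2 whose parents are slots 1 and 0):

    // Pass only fork tips; ancestors are pulled in via bank.parents().
    // `rooted_path` lists the rooted slots oldest-first; its last entry becomes the root.
    let bank_forks = BankForks::new_from_banks(&[tip_bank_2], vec![0, 1]);
    assert_eq!(bank_forks.root(), 1);
    // slots_since_snapshot is seeded with the rooted path itself: [0, 1]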
@@ -367,54 +373,6 @@ impl BankForks {
     pub fn snapshot_config(&self) -> &Option<SnapshotConfig> {
         &self.snapshot_config
     }
-
-    pub fn load_from_snapshot<P: AsRef<Path>>(
-        account_paths: String,
-        snapshot_config: &SnapshotConfig,
-        snapshot_tar: P,
-    ) -> Result<Self> {
-        // Untar the snapshot into a temp directory under `snapshot_config.snapshot_path()`
-        let unpack_dir = tempfile::tempdir_in(snapshot_config.snapshot_path())?;
-        untar_snapshot_in(&snapshot_tar, &unpack_dir)?;
-
-        let unpacked_accounts_dir = unpack_dir.as_ref().join(TAR_ACCOUNTS_DIR);
-        let unpacked_snapshots_dir = unpack_dir.as_ref().join(TAR_SNAPSHOTS_DIR);
-        let snapshot_paths = snapshot_utils::get_snapshot_paths(&unpacked_snapshots_dir);
-        let bank = snapshot_utils::bank_from_snapshots(
-            account_paths,
-            &snapshot_paths,
-            unpacked_accounts_dir,
-        )?;
-
-        let bank = Arc::new(bank);
-        // Move the unpacked snapshots into `snapshot_config.snapshot_path()`
-        let dir_files = fs::read_dir(unpacked_snapshots_dir).expect("Invalid snapshot path");
-        let paths: Vec<PathBuf> = dir_files
-            .filter_map(|entry| entry.ok().map(|e| e.path()))
-            .collect();
-        let mut copy_options = CopyOptions::new();
-        copy_options.overwrite = true;
-        fs_extra::move_items(&paths, snapshot_config.snapshot_path(), &copy_options)?;
-
-        let mut banks = HashMap::new();
-        banks.insert(bank.slot(), bank.clone());
-        let root = bank.slot();
-        let snapshot_paths = snapshot_utils::get_snapshot_paths(&snapshot_config.snapshot_path);
-        if snapshot_paths.is_empty() {
-            return Err(Error::IO(IOError::new(
-                ErrorKind::Other,
-                "no snapshots found",
-            )));
-        }
-        Ok(BankForks {
-            banks,
-            working_bank: bank,
-            root,
-            snapshot_config: None,
-            slots_since_snapshot: vec![snapshot_paths.last().unwrap().slot],
-            confidence: HashMap::new(),
-        })
-    }
 }

 #[cfg(test)]
@@ -429,6 +387,7 @@ mod tests {
     use solana_sdk::pubkey::Pubkey;
     use solana_sdk::signature::{Keypair, KeypairUtil};
     use solana_sdk::system_transaction;
+    use std::fs;
     use std::sync::atomic::AtomicBool;
     use std::sync::mpsc::channel;
     use tempfile::TempDir;
@@ -541,35 +500,35 @@ mod tests {
         );
     }

-    fn restore_from_snapshot(bank_forks: BankForks, account_paths: String, last_slot: u64) {
-        let (snapshot_path, snapshot_package_output_path) = bank_forks
+    fn restore_from_snapshot(old_bank_forks: BankForks, account_paths: String) {
+        let (snapshot_path, snapshot_package_output_path) = old_bank_forks
             .snapshot_config
             .as_ref()
             .map(|c| (&c.snapshot_path, &c.snapshot_package_output_path))
             .unwrap();

-        let new = BankForks::load_from_snapshot(
+        let deserialized_bank = snapshot_utils::bank_from_archive(
             account_paths,
-            bank_forks.snapshot_config.as_ref().unwrap(),
+            old_bank_forks.snapshot_config.as_ref().unwrap(),
             snapshot_utils::get_snapshot_tar_path(snapshot_package_output_path),
         )
         .unwrap();

-        for (slot, _) in new.banks.iter() {
-            if *slot > 0 {
-                let bank = bank_forks.banks.get(slot).unwrap().clone();
-                let new_bank = new.banks.get(slot).unwrap();
-                bank.compare_bank(&new_bank);
-            }
-        }
+        let bank = old_bank_forks
+            .banks
+            .get(&deserialized_bank.slot())
+            .unwrap()
+            .clone();
+        bank.compare_bank(&deserialized_bank);

-        assert_eq!(new.working_bank().slot(), last_slot);
-        for (slot, _) in new.banks.iter() {
-            snapshot_utils::remove_snapshot(*slot, snapshot_path).unwrap();
+        let slot_snapshot_paths = snapshot_utils::get_snapshot_paths(&snapshot_path);
+
+        for p in slot_snapshot_paths {
+            snapshot_utils::remove_snapshot(p.slot, &snapshot_path).unwrap();
         }
     }

-    // creates banks upto "last_slot" and runs the input function `f` on each bank created
+    // creates banks up to "last_slot" and runs the input function `f` on each bank created
     // also marks each bank as root and generates snapshots
     // finally tries to restore from the last bank's snapshot and compares the restored bank to the
     // `last_slot` bank
@@ -627,7 +586,6 @@ mod tests {
         restore_from_snapshot(
             bank_forks,
             accounts_dir.path().to_str().unwrap().to_string(),
-            last_slot,
         );
     }
blocktree_processor.rs

@@ -1,5 +1,5 @@
 use crate::bank_forks::BankForks;
-use crate::blocktree::Blocktree;
+use crate::blocktree::{Blocktree, SlotMeta};
 use crate::entry::{Entry, EntrySlice};
 use crate::leader_schedule_cache::LeaderScheduleCache;
 use rayon::prelude::*;
@@ -8,6 +8,7 @@ use solana_metrics::{datapoint, datapoint_error, inc_new_counter_debug};
 use solana_runtime::bank::Bank;
 use solana_runtime::locked_accounts_results::LockedAccountsResults;
 use solana_sdk::genesis_block::GenesisBlock;
+use solana_sdk::hash::Hash;
 use solana_sdk::timing::{duration_as_ms, Slot, MAX_RECENT_BLOCKHASHES};
 use solana_sdk::transaction::Result;
 use std::result;
@@ -129,7 +130,6 @@ pub fn process_entries(bank: &Bank, entries: &[Entry]) -> Result<()> {
 #[derive(Debug, PartialEq)]
 pub struct BankForksInfo {
     pub bank_slot: u64,
-    pub entry_height: u64,
 }

 #[derive(Debug)]
@@ -144,160 +144,65 @@ pub fn process_blocktree(
     verify_ledger: bool,
     dev_halt_at_slot: Option<Slot>,
 ) -> result::Result<(BankForks, Vec<BankForksInfo>, LeaderScheduleCache), BlocktreeProcessorError> {
-    let now = Instant::now();
-    info!("processing ledger...");
+    info!("processing ledger from bank 0...");

     // Setup bank for slot 0
-    let mut pending_slots = {
-        let slot = 0;
-        let bank = Arc::new(Bank::new_with_paths(&genesis_block, account_paths));
-        let entry_height = 0;
-        let last_entry_hash = bank.last_blockhash();
+    let bank0 = Arc::new(Bank::new_with_paths(&genesis_block, account_paths));
+    process_bank_0(&bank0, blocktree, verify_ledger)?;
+    process_blocktree_from_root(blocktree, bank0, verify_ledger, dev_halt_at_slot)
+}

-        // Load the metadata for this slot
-        let meta = blocktree
-            .meta(slot)
-            .map_err(|err| {
-                warn!("Failed to load meta for slot {}: {:?}", slot, err);
-                BlocktreeProcessorError::LedgerVerificationFailed
-            })?
-            .unwrap();
+// Process blocktree from a known root bank
+pub fn process_blocktree_from_root(
+    blocktree: &Blocktree,
+    bank: Arc<Bank>,
+    verify_ledger: bool,
+    dev_halt_at_slot: Option<Slot>,
+) -> result::Result<(BankForks, Vec<BankForksInfo>, LeaderScheduleCache), BlocktreeProcessorError> {
+    info!("processing ledger from root: {}...", bank.slot());
+    // Starting slot must be a root, and thus has no parents
+    assert!(bank.parent().is_none());
+    let start_slot = bank.slot();
+    let now = Instant::now();
+    let mut rooted_path = vec![start_slot];
+    let dev_halt_at_slot = dev_halt_at_slot.unwrap_or(std::u64::MAX);

-        vec![(slot, meta, bank, entry_height, last_entry_hash)]
+    blocktree
+        .set_roots(&[start_slot])
+        .expect("Couldn't set root on startup");
+
+    let meta = blocktree.meta(start_slot).unwrap();
+
+    // Iterate and replay slots from blocktree starting from `start_slot`
+    let (bank_forks, bank_forks_info, leader_schedule_cache) = {
+        if let Some(meta) = meta {
+            let epoch_schedule = bank.epoch_schedule();
+            let mut leader_schedule_cache = LeaderScheduleCache::new(*epoch_schedule, &bank);
+            let fork_info = process_pending_slots(
+                &bank,
+                &meta,
+                blocktree,
+                &mut leader_schedule_cache,
+                &mut rooted_path,
+                verify_ledger,
+                dev_halt_at_slot,
+            )?;
+            let (banks, bank_forks_info): (Vec<_>, Vec<_>) = fork_info.into_iter().unzip();
+            let bank_forks = BankForks::new_from_banks(&banks, rooted_path);
+            (bank_forks, bank_forks_info, leader_schedule_cache)
+        } else {
+            // If there's no meta for the input `start_slot`, then we started from a snapshot
+            // and there's no point in processing the rest of blocktree and implies blocktree
+            // should be empty past this point.
+            let bfi = BankForksInfo {
+                bank_slot: start_slot,
+            };
+            let leader_schedule_cache = LeaderScheduleCache::new_from_bank(&bank);
+            let bank_forks = BankForks::new_from_banks(&[bank], rooted_path);
+            (bank_forks, vec![bfi], leader_schedule_cache)
+        }
+    };

-    blocktree.set_roots(&[0]).expect("Couldn't set first root");
-
-    let leader_schedule_cache =
-        LeaderScheduleCache::new(*pending_slots[0].2.epoch_schedule(), &pending_slots[0].2);
-
-    let mut fork_info = vec![];
-    let mut last_status_report = Instant::now();
-    let mut root = 0;
-    let dev_halt_at_slot = dev_halt_at_slot.unwrap_or(std::u64::MAX);
-    while !pending_slots.is_empty() {
-        let (slot, meta, bank, mut entry_height, mut last_entry_hash) =
-            pending_slots.pop().unwrap();
-
-        if last_status_report.elapsed() > Duration::from_secs(2) {
-            info!("processing ledger...block {}", slot);
-            last_status_report = Instant::now();
-        }
-
-        // Fetch all entries for this slot
-        let mut entries = blocktree.get_slot_entries(slot, 0, None).map_err(|err| {
-            warn!("Failed to load entries for slot {}: {:?}", slot, err);
-            BlocktreeProcessorError::LedgerVerificationFailed
-        })?;
-
-        if slot == 0 {
-            // The first entry in the ledger is a pseudo-tick used only to ensure the number of ticks
-            // in slot 0 is the same as the number of ticks in all subsequent slots. It is not
-            // processed by the bank, skip over it.
-            if entries.is_empty() {
-                warn!("entry0 not present");
-                return Err(BlocktreeProcessorError::LedgerVerificationFailed);
-            }
-            let entry0 = entries.remove(0);
-            if !(entry0.is_tick() && entry0.verify(&last_entry_hash)) {
-                warn!("Ledger proof of history failed at entry0");
-                return Err(BlocktreeProcessorError::LedgerVerificationFailed);
-            }
-            last_entry_hash = entry0.hash;
-            entry_height += 1;
-        }
-
-        if !entries.is_empty() {
-            if verify_ledger && !entries.verify(&last_entry_hash) {
-                warn!(
-                    "Ledger proof of history failed at slot: {}, entry: {}",
-                    slot, entry_height
-                );
-                return Err(BlocktreeProcessorError::LedgerVerificationFailed);
-            }
-
-            process_entries(&bank, &entries).map_err(|err| {
-                warn!("Failed to process entries for slot {}: {:?}", slot, err);
-                BlocktreeProcessorError::LedgerVerificationFailed
-            })?;
-
-            last_entry_hash = entries.last().unwrap().hash;
-            entry_height += entries.len() as u64;
-        }
-
-        bank.freeze(); // all banks handled by this routine are created from complete slots
-
-        if blocktree.is_root(slot) {
-            root = slot;
-            leader_schedule_cache.set_root(&bank);
-            bank.squash();
-            pending_slots.clear();
-            fork_info.clear();
-        }
-
-        if slot >= dev_halt_at_slot {
-            let bfi = BankForksInfo {
-                bank_slot: slot,
-                entry_height,
-            };
-            fork_info.push((bank, bfi));
-            break;
-        }
-
-        if meta.next_slots.is_empty() {
-            // Reached the end of this fork. Record the final entry height and last entry.hash
-            let bfi = BankForksInfo {
-                bank_slot: slot,
-                entry_height,
-            };
-            fork_info.push((bank, bfi));
-            continue;
-        }
-
-        // This is a fork point, create a new child bank for each fork
-        for next_slot in meta.next_slots {
-            let next_meta = blocktree
-                .meta(next_slot)
-                .map_err(|err| {
-                    warn!("Failed to load meta for slot {}: {:?}", slot, err);
-                    BlocktreeProcessorError::LedgerVerificationFailed
-                })?
-                .unwrap();
-
-            // only process full slots in blocktree_processor, replay_stage
-            // handles any partials
-            if next_meta.is_full() {
-                let next_bank = Arc::new(Bank::new_from_parent(
-                    &bank,
-                    &leader_schedule_cache
-                        .slot_leader_at(next_slot, Some(&bank))
-                        .unwrap(),
-                    next_slot,
-                ));
-                trace!("Add child bank for slot={}", next_slot);
-                // bank_forks.insert(*next_slot, child_bank);
-                pending_slots.push((
-                    next_slot,
-                    next_meta,
-                    next_bank,
-                    entry_height,
-                    last_entry_hash,
-                ));
-            } else {
-                let bfi = BankForksInfo {
-                    bank_slot: slot,
-                    entry_height,
-                };
-                fork_info.push((bank.clone(), bfi));
-            }
-        }
-
-        // reverse sort by slot, so the next slot to be processed can be pop()ed
-        // TODO: remove me once leader_scheduler can hang with out-of-order slots?
-        pending_slots.sort_by(|a, b| b.0.cmp(&a.0));
-    }
-
-    let (banks, bank_forks_info): (Vec<_>, Vec<_>) = fork_info.into_iter().unzip();
-    let bank_forks = BankForks::new_from_banks(&banks, root);
     info!(
         "processing ledger...complete in {}ms, forks={}...",
         duration_as_ms(&now.elapsed()),
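A sketch of how a caller picks between the two entry points, mirroring get_bank_forks() in validator.rs further down (`snapshot_bank` is a hypothetical Option<Bank> deserialized from a snapshot tar):

    let (bank_forks, bank_forks_info, leader_schedule_cache) = match snapshot_bank {
        // Restarting from a snapshot: resume replay at the snapshot's root bank
        Some(bank) => process_blocktree_from_root(&blocktree, Arc::new(bank), true, None)?,
        // Cold start: build bank 0 from genesis and replay the whole ledger
        None => process_blocktree(&genesis_block, &blocktree, None, true, None)?,
    };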
@@ -307,6 +212,195 @@ pub fn process_blocktree(
     Ok((bank_forks, bank_forks_info, leader_schedule_cache))
 }

+fn verify_and_process_entries(
+    bank: &Bank,
+    entries: &[Entry],
+    verify_ledger: bool,
+    last_entry_hash: Hash,
+) -> result::Result<Hash, BlocktreeProcessorError> {
+    assert!(!entries.is_empty());
+
+    if verify_ledger && !entries.verify(&last_entry_hash) {
+        warn!("Ledger proof of history failed at slot: {}", bank.slot());
+        return Err(BlocktreeProcessorError::LedgerVerificationFailed);
+    }
+
+    process_entries(&bank, &entries).map_err(|err| {
+        warn!(
+            "Failed to process entries for slot {}: {:?}",
+            bank.slot(),
+            err
+        );
+        BlocktreeProcessorError::LedgerVerificationFailed
+    })?;
+
+    Ok(entries.last().unwrap().hash)
+}
+
+// Special handling required for processing the entries in slot 0
+fn process_bank_0(
+    bank0: &Bank,
+    blocktree: &Blocktree,
+    verify_ledger: bool,
+) -> result::Result<(), BlocktreeProcessorError> {
+    assert_eq!(bank0.slot(), 0);
+
+    // Fetch all entries for this slot
+    let mut entries = blocktree.get_slot_entries(0, 0, None).map_err(|err| {
+        warn!("Failed to load entries for slot 0, err: {:?}", err);
+        BlocktreeProcessorError::LedgerVerificationFailed
+    })?;
+
+    // The first entry in the ledger is a pseudo-tick used only to ensure the number of ticks
+    // in slot 0 is the same as the number of ticks in all subsequent slots. It is not
+    // processed by the bank, skip over it.
+    if entries.is_empty() {
+        warn!("entry0 not present");
+        return Err(BlocktreeProcessorError::LedgerVerificationFailed);
+    }
+    let entry0 = entries.remove(0);
+    if !(entry0.is_tick() && entry0.verify(&bank0.last_blockhash())) {
+        warn!("Ledger proof of history failed at entry0");
+        return Err(BlocktreeProcessorError::LedgerVerificationFailed);
+    }
+
+    if !entries.is_empty() {
+        verify_and_process_entries(bank0, &entries, verify_ledger, entry0.hash)?;
+    } else {
+        bank0.register_tick(&entry0.hash);
+    }
+
+    bank0.freeze();
+
+    Ok(())
+}
+
+// Given a slot, add its children to the pending slots queue if those children slots are
+// complete
+fn process_next_slots(
+    bank: &Arc<Bank>,
+    meta: &SlotMeta,
+    blocktree: &Blocktree,
+    leader_schedule_cache: &LeaderScheduleCache,
+    pending_slots: &mut Vec<(u64, SlotMeta, Arc<Bank>, Hash)>,
+    fork_info: &mut Vec<(Arc<Bank>, BankForksInfo)>,
+) -> result::Result<(), BlocktreeProcessorError> {
+    if meta.next_slots.is_empty() {
+        // Reached the end of this fork. Record the final entry height and last entry.hash
+        let bfi = BankForksInfo {
+            bank_slot: bank.slot(),
+        };
+        fork_info.push((bank.clone(), bfi));
+        return Ok(());
+    }
+
+    // This is a fork point if there are multiple children, create a new child bank for each fork
+    for next_slot in &meta.next_slots {
+        let next_meta = blocktree
+            .meta(*next_slot)
+            .map_err(|err| {
+                warn!("Failed to load meta for slot {}: {:?}", next_slot, err);
+                BlocktreeProcessorError::LedgerVerificationFailed
+            })?
+            .unwrap();
+
+        // Only process full slots in blocktree_processor, replay_stage
+        // handles any partials
+        if next_meta.is_full() {
+            let next_bank = Arc::new(Bank::new_from_parent(
+                &bank,
+                &leader_schedule_cache
+                    .slot_leader_at(*next_slot, Some(&bank))
+                    .unwrap(),
+                *next_slot,
+            ));
+            trace!("Add child bank {} of slot={}", next_slot, bank.slot());
+            pending_slots.push((*next_slot, next_meta, next_bank, bank.last_blockhash()));
+        } else {
+            let bfi = BankForksInfo {
+                bank_slot: bank.slot(),
+            };
+            fork_info.push((bank.clone(), bfi));
+        }
+    }
+
+    // Reverse sort by slot, so the next slot to be processed can be popped
+    // TODO: remove me once leader_scheduler can hang with out-of-order slots?
+    pending_slots.sort_by(|a, b| b.0.cmp(&a.0));
+    Ok(())
+}
+
+// Iterate through blocktree processing slots starting from the root slot pointed to by the
+// given `meta`
+fn process_pending_slots(
+    root_bank: &Arc<Bank>,
+    root_meta: &SlotMeta,
+    blocktree: &Blocktree,
+    leader_schedule_cache: &mut LeaderScheduleCache,
+    rooted_path: &mut Vec<u64>,
+    verify_ledger: bool,
+    dev_halt_at_slot: Slot,
+) -> result::Result<Vec<(Arc<Bank>, BankForksInfo)>, BlocktreeProcessorError> {
+    let mut fork_info = vec![];
+    let mut last_status_report = Instant::now();
+    let mut pending_slots = vec![];
+    process_next_slots(
+        root_bank,
+        root_meta,
+        blocktree,
+        leader_schedule_cache,
+        &mut pending_slots,
+        &mut fork_info,
+    )?;
+
+    while !pending_slots.is_empty() {
+        let (slot, meta, bank, last_entry_hash) = pending_slots.pop().unwrap();
+
+        if last_status_report.elapsed() > Duration::from_secs(2) {
+            info!("processing ledger...block {}", slot);
+            last_status_report = Instant::now();
+        }
+
+        // Fetch all entries for this slot
+        let entries = blocktree.get_slot_entries(slot, 0, None).map_err(|err| {
+            warn!("Failed to load entries for slot {}: {:?}", slot, err);
+            BlocktreeProcessorError::LedgerVerificationFailed
+        })?;
+
+        verify_and_process_entries(&bank, &entries, verify_ledger, last_entry_hash)?;
+
+        bank.freeze(); // all banks handled by this routine are created from complete slots
+
+        if blocktree.is_root(slot) {
+            let parents = bank.parents().into_iter().map(|b| b.slot()).rev().skip(1);
+            let parents: Vec<_> = parents.collect();
+            rooted_path.extend(parents);
+            rooted_path.push(slot);
+            leader_schedule_cache.set_root(&bank);
+            bank.squash();
+            pending_slots.clear();
+            fork_info.clear();
+        }
+
+        if slot >= dev_halt_at_slot {
+            let bfi = BankForksInfo { bank_slot: slot };
+            fork_info.push((bank, bfi));
+            break;
+        }
+
+        process_next_slots(
+            &bank,
+            &meta,
+            blocktree,
+            leader_schedule_cache,
+            &mut pending_slots,
+            &mut fork_info,
+        )?;
+    }
+
+    Ok(fork_info)
+}

 #[cfg(test)]
 pub mod tests {
     use super::*;
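The rooted-path bookkeeping above is the heart of the "all banks between the root and the tip" fix. A minimal, self-contained illustration of the `.rev().skip(1)` step, with plain integers standing in for banks (the values match the test expectation further down):

    fn main() {
        // bank.parents() yields ancestors newest-first; the last element is the
        // previous root, which rooted_path already contains, hence rev().skip(1).
        let parents = vec![4u64, 3, 2, 1]; // hypothetical ancestors of slot 5
        let mut rooted_path = vec![1u64];  // starts at the previous root
        rooted_path.extend(parents.into_iter().rev().skip(1)); // [1, 2, 3, 4]
        rooted_path.push(5);               // the newly rooted slot
        assert_eq!(rooted_path, vec![1, 2, 3, 4, 5]);
    }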
@@ -393,7 +487,6 @@ pub mod tests {
             bank_forks_info[0],
             BankForksInfo {
                 bank_slot: 0, // slot 1 isn't "full", we stop at slot zero
-                entry_height: ticks_per_slot,
             }
         );
     }
@@ -453,7 +546,6 @@ pub mod tests {
             bank_forks_info[0],
             BankForksInfo {
                 bank_slot: 4, // Fork 2's head is slot 4
-                entry_height: ticks_per_slot * 3,
             }
         );
         assert!(&bank_forks[4]
@@ -464,10 +556,7 @@ pub mod tests {
             .is_empty());

-        // Ensure bank_forks holds the right banks
-        for info in bank_forks_info {
-            assert_eq!(bank_forks[info.bank_slot].slot(), info.bank_slot);
-            assert!(bank_forks[info.bank_slot].is_frozen());
-        }
+        verify_fork_infos(&bank_forks, &bank_forks_info);

         assert_eq!(bank_forks.root(), 4);
     }
@@ -526,7 +615,6 @@ pub mod tests {
             bank_forks_info[0],
             BankForksInfo {
                 bank_slot: 3, // Fork 1's head is slot 3
-                entry_height: ticks_per_slot * 4,
             }
         );
         assert_eq!(
@@ -541,7 +629,6 @@ pub mod tests {
             bank_forks_info[1],
             BankForksInfo {
                 bank_slot: 4, // Fork 2's head is slot 4
-                entry_height: ticks_per_slot * 3,
             }
         );
         assert_eq!(
@@ -556,10 +643,7 @@ pub mod tests {
         assert_eq!(bank_forks.root(), 1);

-        // Ensure bank_forks holds the right banks
-        for info in bank_forks_info {
-            assert_eq!(bank_forks[info.bank_slot].slot(), info.bank_slot);
-            assert!(bank_forks[info.bank_slot].is_frozen());
-        }
+        verify_fork_infos(&bank_forks, &bank_forks_info);
     }

     #[test]
@@ -607,7 +691,6 @@ pub mod tests {
             bank_forks_info[0],
             BankForksInfo {
                 bank_slot: last_slot + 1, // Head is last_slot + 1
-                entry_height: ticks_per_slot * (last_slot + 2),
             }
         );
@@ -734,19 +817,12 @@ pub mod tests {
         blocktree
             .write_entries(1, 0, 0, genesis_block.ticks_per_slot, &entries)
             .unwrap();
-        let entry_height = genesis_block.ticks_per_slot + entries.len() as u64;
         let (bank_forks, bank_forks_info, _) =
             process_blocktree(&genesis_block, &blocktree, None, true, None).unwrap();

         assert_eq!(bank_forks_info.len(), 1);
         assert_eq!(bank_forks.root(), 0);
-        assert_eq!(
-            bank_forks_info[0],
-            BankForksInfo {
-                bank_slot: 1,
-                entry_height,
-            }
-        );
+        assert_eq!(bank_forks_info[0], BankForksInfo { bank_slot: 1 });

         let bank = bank_forks[1].clone();
         assert_eq!(
@@ -770,13 +846,7 @@ pub mod tests {
             process_blocktree(&genesis_block, &blocktree, None, true, None).unwrap();

         assert_eq!(bank_forks_info.len(), 1);
-        assert_eq!(
-            bank_forks_info[0],
-            BankForksInfo {
-                bank_slot: 0,
-                entry_height: 1,
-            }
-        );
+        assert_eq!(bank_forks_info[0], BankForksInfo { bank_slot: 0 });
         let bank = bank_forks[0].clone();
         assert_eq!(bank.tick_height(), 0);
     }
@@ -1264,6 +1334,83 @@ pub mod tests {
         assert_eq!(bank.process_transaction(&fail_tx), Ok(()));
     }

+    #[test]
+    fn test_process_blocktree_from_root() {
+        let GenesisBlockInfo {
+            mut genesis_block, ..
+        } = create_genesis_block(123);
+
+        let ticks_per_slot = 1;
+        genesis_block.ticks_per_slot = ticks_per_slot;
+        let (ledger_path, blockhash) = create_new_tmp_ledger!(&genesis_block);
+        let blocktree = Blocktree::open(&ledger_path).unwrap();
+
+        /*
+            Build a blocktree in the ledger with the following fork structure:
+
+                 slot 0 (all ticks)
+                   |
+                 slot 1 (all ticks)
+                   |
+                 slot 2 (all ticks)
+                   |
+                 slot 3 (all ticks) -> root
+                   |
+                 slot 4 (all ticks)
+                   |
+                 slot 5 (all ticks) -> root
+                   |
+                 slot 6 (all ticks)
+        */
+
+        let mut last_hash = blockhash;
+        for i in 0..6 {
+            last_hash =
+                fill_blocktree_slot_with_ticks(&blocktree, ticks_per_slot, i + 1, i, last_hash);
+        }
+        blocktree.set_roots(&[3, 5]).unwrap();
+
+        // Set up bank1
+        let bank0 = Arc::new(Bank::new(&genesis_block));
+        process_bank_0(&bank0, &blocktree, true).unwrap();
+        let bank1 = Arc::new(Bank::new_from_parent(&bank0, &Pubkey::default(), 1));
+        bank1.squash();
+        let slot1_entries = blocktree.get_slot_entries(1, 0, None).unwrap();
+        verify_and_process_entries(&bank1, &slot1_entries, true, bank0.last_blockhash()).unwrap();
+
+        // Test process_blocktree_from_root() from slot 1 onwards
+        let (bank_forks, bank_forks_info, _) =
+            process_blocktree_from_root(&blocktree, bank1, true, None).unwrap();
+
+        assert_eq!(bank_forks_info.len(), 1); // One fork
+        assert_eq!(
+            bank_forks_info[0],
+            BankForksInfo {
+                bank_slot: 6, // The head of the fork is slot 6
+            }
+        );
+
+        // slots_since_snapshot should contain everything on the rooted path
+        assert_eq!(
+            bank_forks.slots_since_snapshot().to_vec(),
+            vec![1, 2, 3, 4, 5]
+        );
+        assert_eq!(bank_forks.root(), 5);
+
+        // Verify the parents of the head of the fork
+        assert_eq!(
+            &bank_forks[6]
+                .parents()
+                .iter()
+                .map(|bank| bank.slot())
+                .collect::<Vec<_>>(),
+            &[5]
+        );
+
+        // Check that bank forks has the correct banks
+        verify_fork_infos(&bank_forks, &bank_forks_info);
+    }
+
     #[test]
     #[ignore]
     fn test_process_entries_stress() {
@@ -1357,4 +1504,22 @@ pub mod tests {
         let bank = Bank::new_with_paths(&genesis_block, account_paths);
         bank.epoch_schedule().clone()
     }

+    // Check that `bank_forks` contains all the ancestors and banks for each fork identified in
+    // `bank_forks_info`
+    fn verify_fork_infos(bank_forks: &BankForks, bank_forks_info: &[BankForksInfo]) {
+        for fork in bank_forks_info {
+            let head_slot = fork.bank_slot;
+            let head_bank = &bank_forks[head_slot];
+            let mut parents = head_bank.parents();
+            parents.push(head_bank.clone());
+
+            // Ensure the tip of each fork and all its parents are in the given bank_forks
+            for parent in parents {
+                let parent_bank = &bank_forks[parent.slot()];
+                assert_eq!(parent_bank.slot(), parent.slot());
+                assert!(parent_bank.is_frozen());
+            }
+        }
+    }
 }
consensus.rs

@@ -1059,7 +1059,7 @@ mod test {
         let arc_bank0 = Arc::new(bank0);
         let bank_forks = Arc::new(RwLock::new(BankForks::new_from_banks(
             &[arc_bank0.clone()],
-            0,
+            vec![0],
         )));
         let pubkey = Pubkey::new_rand();
         let mut tower = Tower::new_from_forks(&bank_forks.read().unwrap(), &pubkey);
snapshot_utils.rs

@@ -1,7 +1,10 @@
+use crate::bank_forks::SnapshotConfig;
 use crate::result::{Error, Result};
 use crate::snapshot_package::SnapshotPackage;
+use crate::snapshot_package::{TAR_ACCOUNTS_DIR, TAR_SNAPSHOTS_DIR};
 use bincode::{deserialize_from, serialize_into};
 use flate2::read::GzDecoder;
+use fs_extra::dir::CopyOptions;
 use solana_runtime::bank::Bank;
 use solana_runtime::status_cache::SlotDelta;
 use solana_sdk::transaction;
@@ -168,7 +171,48 @@ pub fn remove_snapshot<P: AsRef<Path>>(slot: u64, snapshot_path: P) -> Result<()
     Ok(())
 }

-pub fn bank_from_snapshots<P>(
+pub fn bank_from_archive<P: AsRef<Path>>(
     account_paths: String,
+    snapshot_config: &SnapshotConfig,
+    snapshot_tar: P,
+) -> Result<Bank> {
+    // Untar the snapshot into a temp directory under `snapshot_config.snapshot_path()`
+    let unpack_dir = tempfile::tempdir_in(snapshot_config.snapshot_path())?;
+    untar_snapshot_in(&snapshot_tar, &unpack_dir)?;
+
+    let unpacked_accounts_dir = unpack_dir.as_ref().join(TAR_ACCOUNTS_DIR);
+    let unpacked_snapshots_dir = unpack_dir.as_ref().join(TAR_SNAPSHOTS_DIR);
+    let snapshot_paths = get_snapshot_paths(&unpacked_snapshots_dir);
+    let bank = rebuild_bank_from_snapshots(account_paths, &snapshot_paths, unpacked_accounts_dir)?;
+
+    // Move the unpacked snapshots into `snapshot_config.snapshot_path()`
+    let dir_files = fs::read_dir(unpacked_snapshots_dir).expect("Invalid snapshot path");
+    let paths: Vec<PathBuf> = dir_files
+        .filter_map(|entry| entry.ok().map(|e| e.path()))
+        .collect();
+    let mut copy_options = CopyOptions::new();
+    copy_options.overwrite = true;
+    fs_extra::move_items(&paths, snapshot_config.snapshot_path(), &copy_options)?;
+
+    Ok(bank)
+}
+
+pub fn get_snapshot_tar_path<P: AsRef<Path>>(snapshot_output_dir: P) -> PathBuf {
+    snapshot_output_dir.as_ref().join("snapshot.tgz")
+}
+
+pub fn untar_snapshot_in<P: AsRef<Path>, Q: AsRef<Path>>(
+    snapshot_tar: P,
+    unpack_dir: Q,
+) -> Result<()> {
+    let tar_gz = File::open(snapshot_tar)?;
+    let tar = GzDecoder::new(tar_gz);
+    let mut archive = Archive::new(tar);
+    archive.unpack(&unpack_dir)?;
+    Ok(())
+}
+
+fn rebuild_bank_from_snapshots<P>(
+    local_account_paths: String,
     snapshot_paths: &[SlotSnapshotPaths],
     append_vecs_path: P,
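Usage follows the restore_from_snapshot test in bank_forks.rs above; a sketch, assuming a populated snapshot_config and an existing snapshot tar:

    let bank = snapshot_utils::bank_from_archive(
        account_paths,
        &snapshot_config,
        snapshot_utils::get_snapshot_tar_path(&snapshot_package_output_path),
    )?;
    // The caller then hands `bank` to process_blocktree_from_root()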
@@ -203,21 +247,6 @@
     Ok(bank)
 }

-pub fn get_snapshot_tar_path<P: AsRef<Path>>(snapshot_output_dir: P) -> PathBuf {
-    snapshot_output_dir.as_ref().join("snapshot.tgz")
-}
-
-pub fn untar_snapshot_in<P: AsRef<Path>, Q: AsRef<Path>>(
-    snapshot_tar: P,
-    unpack_dir: Q,
-) -> Result<()> {
-    let tar_gz = File::open(snapshot_tar)?;
-    let tar = GzDecoder::new(tar_gz);
-    let mut archive = Archive::new(tar);
-    archive.unpack(&unpack_dir)?;
-    Ok(())
-}
-
 fn get_snapshot_file_name(slot: u64) -> String {
     slot.to_string()
 }
storage_stage.rs

@@ -659,7 +659,10 @@ mod tests {
         let cluster_info = test_cluster_info(&keypair.pubkey());
         let GenesisBlockInfo { genesis_block, .. } = create_genesis_block(1000);
         let bank = Arc::new(Bank::new(&genesis_block));
-        let bank_forks = Arc::new(RwLock::new(BankForks::new_from_banks(&[bank.clone()], 0)));
+        let bank_forks = Arc::new(RwLock::new(BankForks::new_from_banks(
+            &[bank.clone()],
+            vec![0],
+        )));
         let (_slot_sender, slot_receiver) = channel();
         let storage_state = StorageState::new(
             &bank.last_blockhash(),
@@ -699,7 +702,10 @@ mod tests {
         let blocktree = Arc::new(Blocktree::open(&ledger_path).unwrap());
         let slot = 1;
         let bank = Arc::new(Bank::new(&genesis_block));
-        let bank_forks = Arc::new(RwLock::new(BankForks::new_from_banks(&[bank.clone()], 0)));
+        let bank_forks = Arc::new(RwLock::new(BankForks::new_from_banks(
+            &[bank.clone()],
+            vec![0],
+        )));

         let cluster_info = test_cluster_info(&keypair.pubkey());
         let (bank_sender, bank_receiver) = channel();
@@ -791,7 +797,10 @@ mod tests {

         let bank = Bank::new(&genesis_block);
         let bank = Arc::new(bank);
-        let bank_forks = Arc::new(RwLock::new(BankForks::new_from_banks(&[bank.clone()], 0)));
+        let bank_forks = Arc::new(RwLock::new(BankForks::new_from_banks(
+            &[bank.clone()],
+            vec![0],
+        )));
         let cluster_info = test_cluster_info(&keypair.pubkey());

         let (bank_sender, bank_receiver) = channel();
validator.rs

@@ -355,30 +355,28 @@ fn get_bank_forks(
         // Check that the snapshot tar exists, try to load the snapshot if it does
         if tar.exists() {
             // Fail hard here if snapshot fails to load, don't silently continue
-            let bank_forks = BankForks::load_from_snapshot(
-                //&genesis_block,
+            let deserialized_bank = snapshot_utils::bank_from_archive(
                 account_paths
                     .clone()
                     .expect("Account paths not present when booting from snapshot"),
                 snapshot_config,
-                tar,
+                &tar,
             )
             .expect("Load from snapshot failed");

-            let bank = &bank_forks.working_bank();
-            let fork_info = BankForksInfo {
-                bank_slot: bank.slot(),
-                entry_height: bank.tick_height(),
-            };
-            result = Some((
-                bank_forks,
-                vec![fork_info],
-                LeaderScheduleCache::new_from_bank(bank),
-            ));
+            result = Some(
+                blocktree_processor::process_blocktree_from_root(
+                    blocktree,
+                    Arc::new(deserialized_bank),
+                    verify_ledger,
+                    dev_halt_at_slot,
+                )
+                .expect("processing blocktree after loading snapshot failed"),
+            );
         }
     }

-    // If loading from a snapshot failed/snapshot didn't exist
+    // If a snapshot doesn't exist
     if result.is_none() {
         result = Some(
             blocktree_processor::process_blocktree(
bank.rs

@@ -724,12 +724,12 @@ impl Bank {

         // TODO: put this assert back in
         // assert!(!self.is_frozen());

-        let current_tick_height = {
+        if self.ticks_per_slot() != 1 || self.slot() != 0 {
             self.tick_height.fetch_add(1, Ordering::Relaxed);
-            self.tick_height.load(Ordering::Relaxed) as u64
-        };
-        inc_new_counter_debug!("bank-register_tick-registered", 1);
+            inc_new_counter_debug!("bank-register_tick-registered", 1);
+        }
+
+        let current_tick_height = self.tick_height.load(Ordering::Relaxed) as u64;

         // Register a new block hash if at the last tick in the slot
         if current_tick_height % self.ticks_per_slot == self.ticks_per_slot - 1 {
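The guard above is what the last commit-message bullet refers to: for bank 0 with one tick per slot, the slot's only tick is the ledger's pseudo-tick, which process_bank_0() feeds through register_tick() without counting it. A plain-integer sketch of why the blockhash still gets registered (assumes tick_height starts at 0):

    let (ticks_per_slot, slot) = (1u64, 0u64);
    let mut tick_height = 0u64;
    if ticks_per_slot != 1 || slot != 0 {
        tick_height += 1; // every other (slot, ticks_per_slot) pair bumps first
    }
    let current_tick_height = tick_height; // stays 0 for bank 0
    // 0 % 1 == 1 - 1, so entry0's hash is registered as bank 0's blockhash
    // while bank 0's tick height remains 0, matching the pseudo-tick comment.
    assert!(current_tick_height % ticks_per_slot == ticks_per_slot - 1);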