Ledger-tool recreate snapshot of starting snapshots

Co-authored-by: Carl Lin <carl@solana.com>
carllin 2020-12-04 21:14:59 -08:00 committed by GitHub
parent ca35bb3ac8
commit 51d8f36dae
1 changed file with 141 additions and 107 deletions


@@ -820,118 +820,121 @@ fn load_frozen_forks(
    )?;

    let dev_halt_at_slot = opts.dev_halt_at_slot.unwrap_or(std::u64::MAX);
    if root_bank.slot() != dev_halt_at_slot {
        while !pending_slots.is_empty() {
            let (meta, bank, last_entry_hash) = pending_slots.pop().unwrap();
            let slot = bank.slot();
            if last_status_report.elapsed() > Duration::from_secs(2) {
                let secs = last_status_report.elapsed().as_secs() as f32;
                last_status_report = Instant::now();
                info!(
                    "processing ledger: slot={}, last root slot={} slots={} slots/s={:?} txs/s={}",
                    slot,
                    last_root_slot,
                    slots_elapsed,
                    slots_elapsed as f32 / secs,
                    txs as f32 / secs,
                );
                slots_elapsed = 0;
                txs = 0;
            }

            let allocated = thread_mem_usage::Allocatedp::default();
            let initial_allocation = allocated.get();

            let mut progress = ConfirmationProgress::new(last_entry_hash);

            if process_single_slot(
                blockstore,
                &bank,
                opts,
                recyclers,
                &mut progress,
                transaction_status_sender.clone(),
                None,
            )
            .is_err()
            {
                continue;
            }
            txs += progress.num_txs;

            // Block must be frozen by this point, otherwise `process_single_slot` would
            // have errored above
            assert!(bank.is_frozen());
            all_banks.insert(bank.slot(), bank.clone());

            // If we've reached the last known root in blockstore, start looking
            // for newer cluster confirmed roots
            let new_root_bank = {
                if *root == max_root {
                    supermajority_root_from_vote_accounts(
                        bank.slot(),
                        bank.total_epoch_stake(),
                        bank.vote_accounts(),
                    ).and_then(|supermajority_root| {
                        if supermajority_root > *root {
                            // If there's a cluster confirmed root greater than our last
                            // replayed root, then because the cluster confirmed root should
                            // be descended from our last root, it must exist in `all_banks`
                            let cluster_root_bank = all_banks.get(&supermajority_root).unwrap();

                            // cluster root must be a descendant of our root, otherwise something
                            // is drastically wrong
                            assert!(cluster_root_bank.ancestors.contains_key(root));
                            info!("blockstore processor found new cluster confirmed root: {}, observed in bank: {}", cluster_root_bank.slot(), bank.slot());
                            Some(cluster_root_bank)
                        } else {
                            None
                        }
                    })
                } else if blockstore.is_root(slot) {
                    Some(&bank)
                } else {
                    None
                }
            };

            if let Some(new_root_bank) = new_root_bank {
                *root = new_root_bank.slot();
                last_root_slot = new_root_bank.slot();
                leader_schedule_cache.set_root(&new_root_bank);
                new_root_bank.squash();

                if last_free.elapsed() > Duration::from_secs(30) {
                    // This could take few secs; so update last_free later
                    new_root_bank.exhaustively_free_unused_resource();
                    last_free = Instant::now();
                }

                // Filter out all non descendants of the new root
                pending_slots.retain(|(_, pending_bank, _)| pending_bank.ancestors.contains_key(root));
                initial_forks.retain(|_, fork_tip_bank| fork_tip_bank.ancestors.contains_key(root));
                all_banks.retain(|_, bank| bank.ancestors.contains_key(root));
            }

            slots_elapsed += 1;

            trace!(
                "Bank for {}slot {} is complete. {} bytes allocated",
                if last_root_slot == slot { "root " } else { "" },
                slot,
                allocated.since(initial_allocation)
            );

            process_next_slots(
                &bank,
                &meta,
                blockstore,
                leader_schedule_cache,
                &mut pending_slots,
                &mut initial_forks,
            )?;

            if slot >= dev_halt_at_slot {
                break;
            }
        }
    }
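
The control flow above, reduced to a standalone sketch: if the starting root already sits at the requested halt slot, the replay loop is skipped entirely; otherwise pending slots are replayed until the halt slot is reached. The names below (load_frozen_forks_sketch, replay, plain u64 slots) are simplified stand-ins for illustration, not the actual types or functions used in load_frozen_forks.

    // Stand-in for replaying one slot; the real code verifies entries and
    // freezes a Bank rather than printing.
    fn replay(slot: u64) {
        println!("replaying slot {}", slot);
    }

    // Mirrors the guard in the hunk above: skip replay entirely when the
    // starting root is already the requested halt slot, otherwise replay
    // pending slots (treated as a stack) and stop once the halt slot is hit.
    fn load_frozen_forks_sketch(
        root_slot: u64,
        mut pending_slots: Vec<u64>,
        dev_halt_at_slot: Option<u64>,
    ) {
        let dev_halt_at_slot = dev_halt_at_slot.unwrap_or(u64::MAX);
        if root_slot != dev_halt_at_slot {
            while let Some(slot) = pending_slots.pop() {
                replay(slot);
                if slot >= dev_halt_at_slot {
                    break;
                }
            }
        }
    }

    fn main() {
        // Starting root equals the halt slot: nothing is replayed.
        load_frozen_forks_sketch(0, vec![1, 2], Some(0));
        // No halt slot: every pending slot is replayed.
        load_frozen_forks_sketch(0, vec![2, 1], None);
    }
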
@@ -2607,6 +2610,37 @@ pub mod tests {
        assert_eq!(bank.process_transaction(&fail_tx), Ok(()));
    }

    #[test]
    fn test_halt_at_slot_starting_snapshot_root() {
        let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(123);

        // Create roots at slots 0, 1
        let forks = tr(0) / tr(1);
        let ledger_path = get_tmp_ledger_path!();
        let blockstore = Blockstore::open(&ledger_path).unwrap();
        blockstore.add_tree(
            forks,
            false,
            true,
            genesis_config.ticks_per_slot,
            genesis_config.hash(),
        );
        blockstore.set_roots(&[0, 1]).unwrap();

        // Specify halting at slot 0
        let opts = ProcessOptions {
            poh_verify: true,
            dev_halt_at_slot: Some(0),
            ..ProcessOptions::default()
        };
        let (bank_forks, _leader_schedule) =
            process_blockstore(&genesis_config, &blockstore, Vec::new(), opts).unwrap();

        // Should be able to fetch slot 0 because we specified halting at slot 0, even
        // if there is a greater root at slot 1.
        assert!(bank_forks.get(0).is_some());
    }

    #[test]
    fn test_process_blockstore_from_root() {
        let GenesisConfigInfo {