Rework do_process_blockstore_from_root to use BankForks

This commit is contained in:
Michael Vines 2022-03-03 11:46:29 +01:00
parent 93c8e04d51
commit 0d33b54d74
2 changed files with 92 additions and 88 deletions

View File

@ -210,10 +210,11 @@ fn load_from_snapshot(
incremental: starting_incremental_snapshot_hash,
};
let bank_forks = BankForks::new(deserialized_bank);
to_loadresult(
blockstore_processor::process_blockstore_from_root(
blockstore,
deserialized_bank,
bank_forks,
&process_options,
&VerifyRecyclers::default(),
transaction_status_sender,

View File

@ -590,11 +590,12 @@ pub fn process_blockstore(
opts.accounts_db_config.clone(),
accounts_update_notifier,
);
let bank0 = Arc::new(bank0);
let bank_forks = BankForks::new(bank0);
info!("processing ledger for slot 0...");
let recyclers = VerifyRecyclers::default();
process_bank_0(
&bank0,
&bank_forks.root_bank(),
blockstore,
&opts,
&recyclers,
@ -602,7 +603,7 @@ pub fn process_blockstore(
);
do_process_blockstore_from_root(
blockstore,
bank0,
bank_forks,
&opts,
&recyclers,
None,
@ -618,7 +619,7 @@ pub fn process_blockstore(
#[allow(clippy::too_many_arguments)]
pub(crate) fn process_blockstore_from_root(
blockstore: &Blockstore,
bank: Bank,
bank_forks: BankForks,
opts: &ProcessOptions,
recyclers: &VerifyRecyclers,
transaction_status_sender: Option<&TransactionStatusSender>,
@ -630,7 +631,7 @@ pub(crate) fn process_blockstore_from_root(
) -> BlockstoreProcessorResult {
do_process_blockstore_from_root(
blockstore,
Arc::new(bank),
bank_forks,
opts,
recyclers,
transaction_status_sender,
@ -645,7 +646,7 @@ pub(crate) fn process_blockstore_from_root(
#[allow(clippy::too_many_arguments)]
fn do_process_blockstore_from_root(
blockstore: &Blockstore,
bank: Arc<Bank>,
mut bank_forks: BankForks,
opts: &ProcessOptions,
recyclers: &VerifyRecyclers,
transaction_status_sender: Option<&TransactionStatusSender>,
@ -655,13 +656,14 @@ fn do_process_blockstore_from_root(
timings: BankFromArchiveTimings,
mut last_full_snapshot_slot: Option<Slot>,
) -> BlockstoreProcessorResult {
info!("processing ledger from slot {}...", bank.slot());
// Starting slot must be a root, and thus has no parents
assert_eq!(bank_forks.banks().len(), 1);
let bank = bank_forks.root_bank();
assert!(bank.parent().is_none());
let start_slot = bank.slot();
info!("processing ledger from slot {}...", start_slot);
let now = Instant::now();
let mut root = start_slot;
if let Some(ref new_hard_forks) = opts.new_hard_forks {
let hard_forks = bank.hard_forks();
@ -684,7 +686,10 @@ fn do_process_blockstore_from_root(
.set_roots(std::iter::once(&start_slot))
.expect("Couldn't set root slot on startup");
} else {
assert!(blockstore.is_root(start_slot), "starting slot isn't root and can't update due to being secondary blockstore access: {}", start_slot);
assert!(
blockstore.is_root(start_slot),
"starting slot isn't root and can't update due to being secondary blockstore access: {}", start_slot
);
}
if let Ok(metas) = blockstore.slot_meta_iterator(start_slot) {
@ -699,16 +704,17 @@ fn do_process_blockstore_from_root(
if opts.full_leader_cache {
leader_schedule_cache.set_max_schedules(std::usize::MAX);
}
let initial_forks = if let Some(meta) = blockstore
if let Some(start_slot_meta) = blockstore
.meta(start_slot)
.unwrap_or_else(|_| panic!("Failed to get meta for slot {}", start_slot))
{
let mut initial_forks = load_frozen_forks(
&bank,
&meta,
load_frozen_forks(
&mut bank_forks,
start_slot,
&start_slot_meta,
blockstore,
&mut leader_schedule_cache,
&mut root,
opts,
recyclers,
transaction_status_sender,
@ -718,19 +724,11 @@ fn do_process_blockstore_from_root(
&mut timing,
&mut last_full_snapshot_slot,
)?;
initial_forks.sort_by_key(|bank| bank.slot());
initial_forks
} else {
// If there's no meta for the input `start_slot`, then we started from a snapshot
// and there's no point in processing the rest of blockstore and implies blockstore
// should be empty past this point.
vec![bank]
};
if initial_forks.is_empty() {
return Err(BlockstoreProcessorError::NoValidForksFound);
}
let bank_forks = BankForks::new_from_banks(&initial_forks, root);
let processing_time = now.elapsed();
@ -743,7 +741,9 @@ fn do_process_blockstore_from_root(
.root_bank()
.calculate_and_verify_capitalization(debug_verify)
{
return Err(BlockstoreProcessorError::RootBankWithMismatchedCapitalization(root));
return Err(
BlockstoreProcessorError::RootBankWithMismatchedCapitalization(bank_forks.root()),
);
}
time_cap.stop();
@ -752,7 +752,7 @@ fn do_process_blockstore_from_root(
("total_time_us", processing_time.as_micros(), i64),
("frozen_banks", bank_forks.frozen_banks().len(), i64),
("slot", bank_forks.root(), i64),
("forks", initial_forks.len(), i64),
("forks", bank_forks.banks().len(), i64),
("calculate_capitalization_us", time_cap.as_us(), i64),
(
"full_snapshot_untar_us",
@ -777,23 +777,17 @@ fn do_process_blockstore_from_root(
);
info!("ledger processing timing: {:?}", timing);
let mut bank_slots = bank_forks.banks().keys().collect::<Vec<_>>();
bank_slots.sort_unstable();
info!(
"ledger processed in {}. root slot is {}, {} fork{} at {}, with {} frozen bank{}",
"ledger processed in {}. root slot is {}, {} bank{}: {}",
HumanTime::from(chrono::Duration::from_std(processing_time).unwrap())
.to_text_en(Accuracy::Precise, Tense::Present),
bank_forks.root(),
initial_forks.len(),
if initial_forks.len() > 1 { "s" } else { "" },
initial_forks
.iter()
.map(|b| b.slot().to_string())
.join(", "),
bank_forks.frozen_banks().len(),
if bank_forks.frozen_banks().len() > 1 {
"s"
} else {
""
},
bank_slots.len(),
if bank_slots.len() > 1 { "s" } else { "" },
bank_slots.iter().map(|slot| slot.to_string()).join(", "),
);
assert!(bank_forks.active_banks().is_empty());
@ -801,8 +795,8 @@ fn do_process_blockstore_from_root(
}
/// Verify that a segment of entries has the correct number of ticks and hashes
pub fn verify_ticks(
bank: &Arc<Bank>,
fn verify_ticks(
bank: &Bank,
entries: &[Entry],
slot_full: bool,
tick_hash_count: &mut u64,
@ -1114,14 +1108,8 @@ fn process_next_slots(
meta: &SlotMeta,
blockstore: &Blockstore,
leader_schedule_cache: &LeaderScheduleCache,
pending_slots: &mut Vec<(SlotMeta, Arc<Bank>, Hash)>,
initial_forks: &mut HashMap<Slot, Arc<Bank>>,
pending_slots: &mut Vec<(SlotMeta, Bank, Hash)>,
) -> result::Result<(), BlockstoreProcessorError> {
if let Some(parent) = bank.parent() {
initial_forks.remove(&parent.slot());
}
initial_forks.insert(bank.slot(), bank.clone());
if meta.next_slots.is_empty() {
return Ok(());
}
@ -1139,13 +1127,13 @@ fn process_next_slots(
// Only process full slots in blockstore_processor, replay_stage
// handles any partials
if next_meta.is_full() {
let next_bank = Arc::new(Bank::new_from_parent(
let next_bank = Bank::new_from_parent(
bank,
&leader_schedule_cache
.slot_leader_at(*next_slot, Some(bank))
.unwrap(),
*next_slot,
));
);
trace!(
"New bank for slot {}, parent slot is {}",
next_slot,
@ -1164,11 +1152,11 @@ fn process_next_slots(
// given `meta` and return a vector of frozen bank forks
#[allow(clippy::too_many_arguments)]
fn load_frozen_forks(
root_bank: &Arc<Bank>,
root_meta: &SlotMeta,
bank_forks: &mut BankForks,
start_slot: Slot,
start_slot_meta: &SlotMeta,
blockstore: &Blockstore,
leader_schedule_cache: &mut LeaderScheduleCache,
root: &mut Slot,
opts: &ProcessOptions,
recyclers: &VerifyRecyclers,
transaction_status_sender: Option<&TransactionStatusSender>,
@ -1177,33 +1165,32 @@ fn load_frozen_forks(
accounts_package_sender: AccountsPackageSender,
timing: &mut ExecuteTimings,
last_full_snapshot_slot: &mut Option<Slot>,
) -> result::Result<Vec<Arc<Bank>>, BlockstoreProcessorError> {
let mut initial_forks = HashMap::new();
) -> result::Result<(), BlockstoreProcessorError> {
let mut all_banks = HashMap::new();
let mut last_status_report = Instant::now();
let mut last_free = Instant::now();
let mut pending_slots = vec![];
let mut last_root = root_bank.slot();
let mut slots_elapsed = 0;
let mut txs = 0;
let blockstore_max_root = blockstore.max_root();
let max_root = std::cmp::max(root_bank.slot(), blockstore_max_root);
let mut root = bank_forks.root();
let max_root = std::cmp::max(root, blockstore_max_root);
info!(
"load_frozen_forks() latest root from blockstore: {}, max_root: {}",
blockstore_max_root, max_root,
);
process_next_slots(
root_bank,
root_meta,
bank_forks.get(start_slot).unwrap(),
start_slot_meta,
blockstore,
leader_schedule_cache,
&mut pending_slots,
&mut initial_forks,
)?;
let dev_halt_at_slot = opts.dev_halt_at_slot.unwrap_or(std::u64::MAX);
if root_bank.slot() != dev_halt_at_slot {
if bank_forks.root() != dev_halt_at_slot {
while !pending_slots.is_empty() {
timing.details.per_program_timings.clear();
let (meta, bank, last_entry_hash) = pending_slots.pop().unwrap();
@ -1214,7 +1201,7 @@ fn load_frozen_forks(
info!(
"processing ledger: slot={}, last root slot={} slots={} slots/s={:?} txs/s={}",
slot,
last_root,
root,
slots_elapsed,
slots_elapsed as f32 / secs,
txs as f32 / secs,
@ -1225,6 +1212,7 @@ fn load_frozen_forks(
let mut progress = ConfirmationProgress::new(last_entry_hash);
let bank = bank_forks.insert(bank);
if process_single_slot(
blockstore,
&bank,
@ -1238,6 +1226,7 @@ fn load_frozen_forks(
)
.is_err()
{
assert!(bank_forks.remove(bank.slot()).is_some());
continue;
}
txs += progress.num_txs;
@ -1250,13 +1239,13 @@ fn load_frozen_forks(
// If we've reached the last known root in blockstore, start looking
// for newer cluster confirmed roots
let new_root_bank = {
if *root >= max_root {
if bank_forks.root() >= max_root {
supermajority_root_from_vote_accounts(
bank.slot(),
bank.total_epoch_stake(),
&bank.vote_accounts(),
).and_then(|supermajority_root| {
if supermajority_root > *root {
if supermajority_root > root {
// If there's a cluster confirmed root greater than our last
// replayed root, then because the cluster confirmed root should
// be descended from our last root, it must exist in `all_banks`
@ -1264,15 +1253,18 @@ fn load_frozen_forks(
// cluster root must be a descendant of our root, otherwise something
// is drastically wrong
assert!(cluster_root_bank.ancestors.contains_key(root));
info!("blockstore processor found new cluster confirmed root: {}, observed in bank: {}", cluster_root_bank.slot(), bank.slot());
assert!(cluster_root_bank.ancestors.contains_key(&root));
info!(
"blockstore processor found new cluster confirmed root: {}, observed in bank: {}",
cluster_root_bank.slot(), bank.slot()
);
// Ensure cluster-confirmed root and parents are set as root in blockstore
let mut rooted_slots = vec![];
let mut new_root_bank = cluster_root_bank.clone();
loop {
if new_root_bank.slot() == *root { break; } // Found the last root in the chain, yay!
assert!(new_root_bank.slot() > *root);
if new_root_bank.slot() == root { break; } // Found the last root in the chain, yay!
assert!(new_root_bank.slot() > root);
rooted_slots.push((new_root_bank.slot(), new_root_bank.hash()));
// As noted, the cluster confirmed root should be descended from
@ -1295,11 +1287,14 @@ fn load_frozen_forks(
};
if let Some(new_root_bank) = new_root_bank {
*root = new_root_bank.slot();
last_root = new_root_bank.slot();
root = new_root_bank.slot();
leader_schedule_cache.set_root(new_root_bank);
new_root_bank.squash();
let _ = bank_forks.set_root(
root,
&solana_runtime::accounts_background_service::AbsRequestSender::default(),
None,
);
if let Some(snapshot_config) = snapshot_config {
let block_height = new_root_bank.block_height();
@ -1307,8 +1302,8 @@ fn load_frozen_forks(
block_height,
snapshot_config.full_snapshot_archive_interval_slots,
) {
info!("Taking snapshot of new root bank that has crossed the full snapshot interval! slot: {}", *root);
*last_full_snapshot_slot = Some(*root);
info!("Taking snapshot of new root bank that has crossed the full snapshot interval! slot: {}", root);
*last_full_snapshot_slot = Some(root);
new_root_bank.exhaustively_free_unused_resource(*last_full_snapshot_slot);
last_free = Instant::now();
new_root_bank.update_accounts_hash_with_index_option(
@ -1331,7 +1326,7 @@ fn load_frozen_forks(
trace!(
"took bank snapshot for new root bank, block height: {}, slot: {}",
block_height,
*root
root
);
}
}
@ -1346,16 +1341,15 @@ fn load_frozen_forks(
// Filter out all non descendants of the new root
pending_slots
.retain(|(_, pending_bank, _)| pending_bank.ancestors.contains_key(root));
initial_forks.retain(|_, fork_tip_bank| fork_tip_bank.ancestors.contains_key(root));
all_banks.retain(|_, bank| bank.ancestors.contains_key(root));
.retain(|(_, pending_bank, _)| pending_bank.ancestors.contains_key(&root));
all_banks.retain(|_, bank| bank.ancestors.contains_key(&root));
}
slots_elapsed += 1;
trace!(
"Bank for {}slot {} is complete",
if last_root == slot { "root " } else { "" },
if root == slot { "root " } else { "" },
slot,
);
@ -1365,7 +1359,6 @@ fn load_frozen_forks(
blockstore,
leader_schedule_cache,
&mut pending_slots,
&mut initial_forks,
)?;
if slot >= dev_halt_at_slot {
@ -1376,7 +1369,7 @@ fn load_frozen_forks(
}
}
Ok(initial_forks.values().cloned().collect::<Vec<_>>())
Ok(())
}
// `roots` is sorted largest to smallest by root slot
@ -3163,7 +3156,8 @@ pub mod tests {
blockstore.set_roots(vec![3, 5].iter()).unwrap();
// Set up bank1
let bank0 = Arc::new(Bank::new_for_tests(&genesis_config));
let mut bank_forks = BankForks::new(Bank::new_for_tests(&genesis_config));
let bank0 = bank_forks.get(0).unwrap().clone();
let opts = ProcessOptions {
poh_verify: true,
accounts_db_test_hash_calculation: true,
@ -3171,7 +3165,7 @@ pub mod tests {
};
let recyclers = VerifyRecyclers::default();
process_bank_0(&bank0, &blockstore, &opts, &recyclers, None);
let bank1 = Arc::new(Bank::new_from_parent(&bank0, &Pubkey::default(), 1));
let bank1 = bank_forks.insert(Bank::new_from_parent(&bank0, &Pubkey::default(), 1));
confirm_full_slot(
&blockstore,
&bank1,
@ -3183,13 +3177,17 @@ pub mod tests {
&mut ExecuteTimings::default(),
)
.unwrap();
bank1.squash();
bank_forks.set_root(
1,
&solana_runtime::accounts_background_service::AbsRequestSender::default(),
None,
);
// Test process_blockstore_from_root() from slot 1 onwards
let (accounts_package_sender, _) = unbounded();
let (bank_forks, ..) = do_process_blockstore_from_root(
&blockstore,
bank1,
bank_forks,
&opts,
&recyclers,
None,
@ -3255,7 +3253,8 @@ pub mod tests {
blockstore.set_roots(roots_to_set.iter()).unwrap();
// Set up bank1
let bank0 = Arc::new(Bank::new_for_tests(&genesis_config));
let mut bank_forks = BankForks::new(Bank::new_for_tests(&genesis_config));
let bank0 = bank_forks.get(0).unwrap().clone();
let opts = ProcessOptions {
poh_verify: true,
accounts_db_test_hash_calculation: true,
@ -3265,7 +3264,7 @@ pub mod tests {
process_bank_0(&bank0, &blockstore, &opts, &recyclers, None);
let slot_start_processing = 1;
let bank = Arc::new(Bank::new_from_parent(
let bank = bank_forks.insert(Bank::new_from_parent(
&bank0,
&Pubkey::default(),
slot_start_processing,
@ -3281,7 +3280,11 @@ pub mod tests {
&mut ExecuteTimings::default(),
)
.unwrap();
bank.squash();
bank_forks.set_root(
1,
&solana_runtime::accounts_background_service::AbsRequestSender::default(),
None,
);
let bank_snapshots_tempdir = TempDir::new().unwrap();
let snapshot_config = SnapshotConfig {
@ -3294,7 +3297,7 @@ pub mod tests {
do_process_blockstore_from_root(
&blockstore,
bank,
bank_forks,
&opts,
&recyclers,
None,