From a7f33b50145c0efd4fd3fc4e6865c80e1a188bd9 Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Fri, 24 Apr 2020 16:49:57 -0600 Subject: [PATCH] Cache banks in BankForks until optional largest_confirmed_root (#9678) automerge --- banking-bench/src/main.rs | 2 +- core/src/consensus.rs | 1 + core/src/replay_stage.rs | 95 ++++++++++++++++++++++++++++++++++----- core/src/rpc.rs | 2 +- core/tests/bank_forks.rs | 10 +++-- ledger/src/bank_forks.rs | 18 ++++++-- 6 files changed, 106 insertions(+), 22 deletions(-) diff --git a/banking-bench/src/main.rs b/banking-bench/src/main.rs index 176117d23e..bba9ec24da 100644 --- a/banking-bench/src/main.rs +++ b/banking-bench/src/main.rs @@ -253,7 +253,7 @@ fn main() { poh_recorder.lock().unwrap().set_bank(&bank); assert!(poh_recorder.lock().unwrap().bank().is_some()); if bank.slot() > 32 { - bank_forks.set_root(root, &None); + bank_forks.set_root(root, &None, None); root += 1; } debug!( diff --git a/core/src/consensus.rs b/core/src/consensus.rs index 10a7795d9a..055b02c394 100644 --- a/core/src/consensus.rs +++ b/core/src/consensus.rs @@ -637,6 +637,7 @@ pub mod test { &mut self.progress, &None, &mut HashSet::new(), + None, ); } diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 149fa0c6ad..0e5e0a5e10 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -217,13 +217,15 @@ impl ReplayStage { let ancestors = Arc::new(bank_forks.read().unwrap().ancestors()); let descendants = HashMap::new(); + let forks_root = bank_forks.read().unwrap().root(); let start = allocated.get(); let mut frozen_banks: Vec<_> = bank_forks .read() .unwrap() .frozen_banks() - .values() - .cloned() + .into_iter() + .filter(|(slot, _)| *slot >= forks_root) + .map(|(_, bank)| bank) .collect(); let newly_computed_slot_stats = Self::compute_bank_stats( &my_pubkey, @@ -326,6 +328,7 @@ impl ReplayStage { &latest_root_senders, &mut all_pubkeys, &subscriptions, + &block_commitment_cache, )?; }; @@ -697,6 +700,7 @@ impl ReplayStage { latest_root_senders: &[Sender], all_pubkeys: &mut HashSet>, subscriptions: &Arc, + block_commitment_cache: &Arc>, ) -> Result<()> { if bank.is_empty() { inc_new_counter_info!("replay_stage-voted_empty_bank", 1); @@ -722,12 +726,19 @@ impl ReplayStage { blockstore .set_roots(&rooted_slots) .expect("Ledger set roots failed"); + let largest_confirmed_root = Some( + block_commitment_cache + .read() + .unwrap() + .largest_confirmed_root(), + ); Self::handle_new_root( new_root, &bank_forks, progress, accounts_hash_sender, all_pubkeys, + largest_confirmed_root, ); subscriptions.notify_roots(rooted_slots); latest_root_senders.iter().for_each(|s| { @@ -1482,17 +1493,19 @@ impl ReplayStage { } pub(crate) fn handle_new_root( - new_root: u64, + new_root: Slot, bank_forks: &RwLock, progress: &mut ProgressMap, accounts_hash_sender: &Option, all_pubkeys: &mut HashSet>, + largest_confirmed_root: Option, ) { let old_epoch = bank_forks.read().unwrap().root_bank().epoch(); - bank_forks - .write() - .unwrap() - .set_root(new_root, accounts_hash_sender); + bank_forks.write().unwrap().set_root( + new_root, + accounts_hash_sender, + largest_confirmed_root, + ); let r_bank_forks = bank_forks.read().unwrap(); let new_epoch = bank_forks.read().unwrap().root_bank().epoch(); if old_epoch != new_epoch { @@ -1513,7 +1526,11 @@ impl ReplayStage { // Find the next slot that chains to the old slot let forks = bank_forks.read().unwrap(); let frozen_banks = forks.frozen_banks(); - let frozen_bank_slots: Vec = frozen_banks.keys().cloned().collect(); + let frozen_bank_slots: Vec = frozen_banks + .keys() + .cloned() + .filter(|s| *s >= forks.root()) + .collect(); let next_slots = blockstore .get_slots_since(&frozen_bank_slots) .expect("Db error"); @@ -2097,12 +2114,66 @@ pub(crate) mod tests { for i in 0..=root { progress.insert(i, ForkProgress::new(Hash::default(), None, None, 0, 0)); } - ReplayStage::handle_new_root(root, &bank_forks, &mut progress, &None, &mut HashSet::new()); + ReplayStage::handle_new_root( + root, + &bank_forks, + &mut progress, + &None, + &mut HashSet::new(), + None, + ); assert_eq!(bank_forks.read().unwrap().root(), root); assert_eq!(progress.len(), 1); assert!(progress.get(&root).is_some()); } + #[test] + fn test_handle_new_root_ahead_of_largest_confirmed_root() { + let genesis_config = create_genesis_config(10_000).genesis_config; + let bank0 = Bank::new(&genesis_config); + let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank0))); + let confirmed_root = 1; + let fork = 2; + let bank1 = Bank::new_from_parent( + bank_forks.read().unwrap().get(0).unwrap(), + &Pubkey::default(), + confirmed_root, + ); + bank_forks.write().unwrap().insert(bank1); + let bank2 = Bank::new_from_parent( + bank_forks.read().unwrap().get(confirmed_root).unwrap(), + &Pubkey::default(), + fork, + ); + bank_forks.write().unwrap().insert(bank2); + let root = 3; + let root_bank = Bank::new_from_parent( + bank_forks.read().unwrap().get(confirmed_root).unwrap(), + &Pubkey::default(), + root, + ); + bank_forks.write().unwrap().insert(root_bank); + let mut progress = ProgressMap::default(); + for i in 0..=root { + progress.insert(i, ForkProgress::new(Hash::default(), None, None, 0, 0)); + } + ReplayStage::handle_new_root( + root, + &bank_forks, + &mut progress, + &None, + &mut HashSet::new(), + Some(confirmed_root), + ); + assert_eq!(bank_forks.read().unwrap().root(), root); + assert!(bank_forks.read().unwrap().get(confirmed_root).is_some()); + assert!(bank_forks.read().unwrap().get(fork).is_none()); + assert_eq!(progress.len(), 2); + assert!(progress.get(&root).is_some()); + assert!(progress.get(&confirmed_root).is_some()); + assert!(progress.get(&fork).is_none()); + } + #[test] fn test_dead_fork_transaction_error() { let keypair1 = Keypair::new(); @@ -3072,7 +3143,7 @@ pub(crate) mod tests { bank_forks.insert(Bank::new_from_parent(&bank0, &Pubkey::default(), 9)); let bank9 = bank_forks.get(9).unwrap().clone(); bank_forks.insert(Bank::new_from_parent(&bank9, &Pubkey::default(), 10)); - bank_forks.set_root(9, &None); + bank_forks.set_root(9, &None, None); let total_epoch_stake = bank0.total_epoch_stake(); // Insert new ForkProgress for slot 10 and its @@ -3165,7 +3236,7 @@ pub(crate) mod tests { let stake_per_validator = 10_000; let (mut bank_forks, mut progress_map) = initialize_state(&keypairs, stake_per_validator); - bank_forks.set_root(0, &None); + bank_forks.set_root(0, &None, None); let total_epoch_stake = bank_forks.root_bank().total_epoch_stake(); // Insert new ForkProgress representing a slot for all slots 1..=num_banks. Only @@ -3247,7 +3318,7 @@ pub(crate) mod tests { let stake_per_validator = 10_000; let (mut bank_forks, mut progress_map) = initialize_state(&keypairs, stake_per_validator); - bank_forks.set_root(0, &None); + bank_forks.set_root(0, &None, None); let total_epoch_stake = num_validators as u64 * stake_per_validator; diff --git a/core/src/rpc.rs b/core/src/rpc.rs index bd76d152f9..17aeee545d 100644 --- a/core/src/rpc.rs +++ b/core/src/rpc.rs @@ -1528,7 +1528,7 @@ pub mod tests { Bank::new_from_parent(&parent_bank, parent_bank.collector_id(), *root); parent_bank = bank_forks.write().unwrap().insert(new_bank); parent_bank.squash(); - bank_forks.write().unwrap().set_root(*root, &None); + bank_forks.write().unwrap().set_root(*root, &None, None); let parent = if i > 0 { roots[i - 1] } else { 1 }; fill_blockstore_slot_with_ticks(&blockstore, 5, *root, parent, Hash::default()); } diff --git a/core/tests/bank_forks.rs b/core/tests/bank_forks.rs index da84b01526..572674d331 100644 --- a/core/tests/bank_forks.rs +++ b/core/tests/bank_forks.rs @@ -139,7 +139,7 @@ mod tests { // and to allow snapshotting of bank and the purging logic on status_cache to // kick in if slot % set_root_interval == 0 || slot == last_slot - 1 { - bank_forks.set_root(bank.slot(), &sender); + bank_forks.set_root(bank.slot(), &sender, None); } } // Generate a snapshot package for last bank @@ -380,9 +380,11 @@ mod tests { snapshot_test_config.bank_forks.insert(new_bank); current_bank = snapshot_test_config.bank_forks[new_slot].clone(); } - snapshot_test_config - .bank_forks - .set_root(current_bank.slot(), &snapshot_sender); + snapshot_test_config.bank_forks.set_root( + current_bank.slot(), + &snapshot_sender, + None, + ); } let num_old_slots = num_set_roots * *add_root_interval - MAX_CACHE_ENTRIES + 1; diff --git a/ledger/src/bank_forks.rs b/ledger/src/bank_forks.rs index a0c85973b3..278e6be0c8 100644 --- a/ledger/src/bank_forks.rs +++ b/ledger/src/bank_forks.rs @@ -186,6 +186,7 @@ impl BankForks { &mut self, root: Slot, accounts_package_sender: &Option, + largest_confirmed_root: Option, ) { let old_epoch = self.root_bank().epoch(); self.root = root; @@ -263,7 +264,7 @@ impl BankForks { } let new_tx_count = root_bank.transaction_count(); - self.prune_non_root(root); + self.prune_non_root(root, largest_confirmed_root); inc_new_counter_info!( "bank-forks_set_root_ms", @@ -334,10 +335,19 @@ impl BankForks { Ok(()) } - fn prune_non_root(&mut self, root: Slot) { + fn prune_non_root(&mut self, root: Slot, largest_confirmed_root: Option) { let descendants = self.descendants(); - self.banks - .retain(|slot, _| slot == &root || descendants[&root].contains(slot)); + self.banks.retain(|slot, _| { + *slot == root + || descendants[&root].contains(slot) + || (*slot < root + && *slot >= largest_confirmed_root.unwrap_or(root) + && descendants[slot].contains(&root)) + }); + datapoint_debug!( + "bank_forks_purge_non_root", + ("num_banks_retained", self.banks.len(), i64), + ); } pub fn set_snapshot_config(&mut self, snapshot_config: Option) {