Cache banks in BankForks until optional largest_confirmed_root (#9678)

automerge
This commit is contained in:
Tyera Eulberg 2020-04-24 16:49:57 -06:00 committed by GitHub
parent d7f37a703e
commit a7f33b5014
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 106 additions and 22 deletions

View File

@ -253,7 +253,7 @@ fn main() {
poh_recorder.lock().unwrap().set_bank(&bank); poh_recorder.lock().unwrap().set_bank(&bank);
assert!(poh_recorder.lock().unwrap().bank().is_some()); assert!(poh_recorder.lock().unwrap().bank().is_some());
if bank.slot() > 32 { if bank.slot() > 32 {
bank_forks.set_root(root, &None); bank_forks.set_root(root, &None, None);
root += 1; root += 1;
} }
debug!( debug!(

View File

@ -637,6 +637,7 @@ pub mod test {
&mut self.progress, &mut self.progress,
&None, &None,
&mut HashSet::new(), &mut HashSet::new(),
None,
); );
} }

View File

@ -217,13 +217,15 @@ impl ReplayStage {
let ancestors = Arc::new(bank_forks.read().unwrap().ancestors()); let ancestors = Arc::new(bank_forks.read().unwrap().ancestors());
let descendants = HashMap::new(); let descendants = HashMap::new();
let forks_root = bank_forks.read().unwrap().root();
let start = allocated.get(); let start = allocated.get();
let mut frozen_banks: Vec<_> = bank_forks let mut frozen_banks: Vec<_> = bank_forks
.read() .read()
.unwrap() .unwrap()
.frozen_banks() .frozen_banks()
.values() .into_iter()
.cloned() .filter(|(slot, _)| *slot >= forks_root)
.map(|(_, bank)| bank)
.collect(); .collect();
let newly_computed_slot_stats = Self::compute_bank_stats( let newly_computed_slot_stats = Self::compute_bank_stats(
&my_pubkey, &my_pubkey,
@ -326,6 +328,7 @@ impl ReplayStage {
&latest_root_senders, &latest_root_senders,
&mut all_pubkeys, &mut all_pubkeys,
&subscriptions, &subscriptions,
&block_commitment_cache,
)?; )?;
}; };
@ -697,6 +700,7 @@ impl ReplayStage {
latest_root_senders: &[Sender<Slot>], latest_root_senders: &[Sender<Slot>],
all_pubkeys: &mut HashSet<Rc<Pubkey>>, all_pubkeys: &mut HashSet<Rc<Pubkey>>,
subscriptions: &Arc<RpcSubscriptions>, subscriptions: &Arc<RpcSubscriptions>,
block_commitment_cache: &Arc<RwLock<BlockCommitmentCache>>,
) -> Result<()> { ) -> Result<()> {
if bank.is_empty() { if bank.is_empty() {
inc_new_counter_info!("replay_stage-voted_empty_bank", 1); inc_new_counter_info!("replay_stage-voted_empty_bank", 1);
@ -722,12 +726,19 @@ impl ReplayStage {
blockstore blockstore
.set_roots(&rooted_slots) .set_roots(&rooted_slots)
.expect("Ledger set roots failed"); .expect("Ledger set roots failed");
let largest_confirmed_root = Some(
block_commitment_cache
.read()
.unwrap()
.largest_confirmed_root(),
);
Self::handle_new_root( Self::handle_new_root(
new_root, new_root,
&bank_forks, &bank_forks,
progress, progress,
accounts_hash_sender, accounts_hash_sender,
all_pubkeys, all_pubkeys,
largest_confirmed_root,
); );
subscriptions.notify_roots(rooted_slots); subscriptions.notify_roots(rooted_slots);
latest_root_senders.iter().for_each(|s| { latest_root_senders.iter().for_each(|s| {
@ -1482,17 +1493,19 @@ impl ReplayStage {
} }
pub(crate) fn handle_new_root( pub(crate) fn handle_new_root(
new_root: u64, new_root: Slot,
bank_forks: &RwLock<BankForks>, bank_forks: &RwLock<BankForks>,
progress: &mut ProgressMap, progress: &mut ProgressMap,
accounts_hash_sender: &Option<AccountsPackageSender>, accounts_hash_sender: &Option<AccountsPackageSender>,
all_pubkeys: &mut HashSet<Rc<Pubkey>>, all_pubkeys: &mut HashSet<Rc<Pubkey>>,
largest_confirmed_root: Option<Slot>,
) { ) {
let old_epoch = bank_forks.read().unwrap().root_bank().epoch(); let old_epoch = bank_forks.read().unwrap().root_bank().epoch();
bank_forks bank_forks.write().unwrap().set_root(
.write() new_root,
.unwrap() accounts_hash_sender,
.set_root(new_root, accounts_hash_sender); largest_confirmed_root,
);
let r_bank_forks = bank_forks.read().unwrap(); let r_bank_forks = bank_forks.read().unwrap();
let new_epoch = bank_forks.read().unwrap().root_bank().epoch(); let new_epoch = bank_forks.read().unwrap().root_bank().epoch();
if old_epoch != new_epoch { if old_epoch != new_epoch {
@ -1513,7 +1526,11 @@ impl ReplayStage {
// Find the next slot that chains to the old slot // Find the next slot that chains to the old slot
let forks = bank_forks.read().unwrap(); let forks = bank_forks.read().unwrap();
let frozen_banks = forks.frozen_banks(); let frozen_banks = forks.frozen_banks();
let frozen_bank_slots: Vec<u64> = frozen_banks.keys().cloned().collect(); let frozen_bank_slots: Vec<u64> = frozen_banks
.keys()
.cloned()
.filter(|s| *s >= forks.root())
.collect();
let next_slots = blockstore let next_slots = blockstore
.get_slots_since(&frozen_bank_slots) .get_slots_since(&frozen_bank_slots)
.expect("Db error"); .expect("Db error");
@ -2097,12 +2114,66 @@ pub(crate) mod tests {
for i in 0..=root { for i in 0..=root {
progress.insert(i, ForkProgress::new(Hash::default(), None, None, 0, 0)); progress.insert(i, ForkProgress::new(Hash::default(), None, None, 0, 0));
} }
ReplayStage::handle_new_root(root, &bank_forks, &mut progress, &None, &mut HashSet::new()); ReplayStage::handle_new_root(
root,
&bank_forks,
&mut progress,
&None,
&mut HashSet::new(),
None,
);
assert_eq!(bank_forks.read().unwrap().root(), root); assert_eq!(bank_forks.read().unwrap().root(), root);
assert_eq!(progress.len(), 1); assert_eq!(progress.len(), 1);
assert!(progress.get(&root).is_some()); assert!(progress.get(&root).is_some());
} }
#[test]
fn test_handle_new_root_ahead_of_largest_confirmed_root() {
let genesis_config = create_genesis_config(10_000).genesis_config;
let bank0 = Bank::new(&genesis_config);
let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank0)));
let confirmed_root = 1;
let fork = 2;
let bank1 = Bank::new_from_parent(
bank_forks.read().unwrap().get(0).unwrap(),
&Pubkey::default(),
confirmed_root,
);
bank_forks.write().unwrap().insert(bank1);
let bank2 = Bank::new_from_parent(
bank_forks.read().unwrap().get(confirmed_root).unwrap(),
&Pubkey::default(),
fork,
);
bank_forks.write().unwrap().insert(bank2);
let root = 3;
let root_bank = Bank::new_from_parent(
bank_forks.read().unwrap().get(confirmed_root).unwrap(),
&Pubkey::default(),
root,
);
bank_forks.write().unwrap().insert(root_bank);
let mut progress = ProgressMap::default();
for i in 0..=root {
progress.insert(i, ForkProgress::new(Hash::default(), None, None, 0, 0));
}
ReplayStage::handle_new_root(
root,
&bank_forks,
&mut progress,
&None,
&mut HashSet::new(),
Some(confirmed_root),
);
assert_eq!(bank_forks.read().unwrap().root(), root);
assert!(bank_forks.read().unwrap().get(confirmed_root).is_some());
assert!(bank_forks.read().unwrap().get(fork).is_none());
assert_eq!(progress.len(), 2);
assert!(progress.get(&root).is_some());
assert!(progress.get(&confirmed_root).is_some());
assert!(progress.get(&fork).is_none());
}
#[test] #[test]
fn test_dead_fork_transaction_error() { fn test_dead_fork_transaction_error() {
let keypair1 = Keypair::new(); let keypair1 = Keypair::new();
@ -3072,7 +3143,7 @@ pub(crate) mod tests {
bank_forks.insert(Bank::new_from_parent(&bank0, &Pubkey::default(), 9)); bank_forks.insert(Bank::new_from_parent(&bank0, &Pubkey::default(), 9));
let bank9 = bank_forks.get(9).unwrap().clone(); let bank9 = bank_forks.get(9).unwrap().clone();
bank_forks.insert(Bank::new_from_parent(&bank9, &Pubkey::default(), 10)); bank_forks.insert(Bank::new_from_parent(&bank9, &Pubkey::default(), 10));
bank_forks.set_root(9, &None); bank_forks.set_root(9, &None, None);
let total_epoch_stake = bank0.total_epoch_stake(); let total_epoch_stake = bank0.total_epoch_stake();
// Insert new ForkProgress for slot 10 and its // Insert new ForkProgress for slot 10 and its
@ -3165,7 +3236,7 @@ pub(crate) mod tests {
let stake_per_validator = 10_000; let stake_per_validator = 10_000;
let (mut bank_forks, mut progress_map) = initialize_state(&keypairs, stake_per_validator); let (mut bank_forks, mut progress_map) = initialize_state(&keypairs, stake_per_validator);
bank_forks.set_root(0, &None); bank_forks.set_root(0, &None, None);
let total_epoch_stake = bank_forks.root_bank().total_epoch_stake(); let total_epoch_stake = bank_forks.root_bank().total_epoch_stake();
// Insert new ForkProgress representing a slot for all slots 1..=num_banks. Only // Insert new ForkProgress representing a slot for all slots 1..=num_banks. Only
@ -3247,7 +3318,7 @@ pub(crate) mod tests {
let stake_per_validator = 10_000; let stake_per_validator = 10_000;
let (mut bank_forks, mut progress_map) = initialize_state(&keypairs, stake_per_validator); let (mut bank_forks, mut progress_map) = initialize_state(&keypairs, stake_per_validator);
bank_forks.set_root(0, &None); bank_forks.set_root(0, &None, None);
let total_epoch_stake = num_validators as u64 * stake_per_validator; let total_epoch_stake = num_validators as u64 * stake_per_validator;

View File

@ -1528,7 +1528,7 @@ pub mod tests {
Bank::new_from_parent(&parent_bank, parent_bank.collector_id(), *root); Bank::new_from_parent(&parent_bank, parent_bank.collector_id(), *root);
parent_bank = bank_forks.write().unwrap().insert(new_bank); parent_bank = bank_forks.write().unwrap().insert(new_bank);
parent_bank.squash(); parent_bank.squash();
bank_forks.write().unwrap().set_root(*root, &None); bank_forks.write().unwrap().set_root(*root, &None, None);
let parent = if i > 0 { roots[i - 1] } else { 1 }; let parent = if i > 0 { roots[i - 1] } else { 1 };
fill_blockstore_slot_with_ticks(&blockstore, 5, *root, parent, Hash::default()); fill_blockstore_slot_with_ticks(&blockstore, 5, *root, parent, Hash::default());
} }

View File

@ -139,7 +139,7 @@ mod tests {
// and to allow snapshotting of bank and the purging logic on status_cache to // and to allow snapshotting of bank and the purging logic on status_cache to
// kick in // kick in
if slot % set_root_interval == 0 || slot == last_slot - 1 { if slot % set_root_interval == 0 || slot == last_slot - 1 {
bank_forks.set_root(bank.slot(), &sender); bank_forks.set_root(bank.slot(), &sender, None);
} }
} }
// Generate a snapshot package for last bank // Generate a snapshot package for last bank
@ -380,9 +380,11 @@ mod tests {
snapshot_test_config.bank_forks.insert(new_bank); snapshot_test_config.bank_forks.insert(new_bank);
current_bank = snapshot_test_config.bank_forks[new_slot].clone(); current_bank = snapshot_test_config.bank_forks[new_slot].clone();
} }
snapshot_test_config snapshot_test_config.bank_forks.set_root(
.bank_forks current_bank.slot(),
.set_root(current_bank.slot(), &snapshot_sender); &snapshot_sender,
None,
);
} }
let num_old_slots = num_set_roots * *add_root_interval - MAX_CACHE_ENTRIES + 1; let num_old_slots = num_set_roots * *add_root_interval - MAX_CACHE_ENTRIES + 1;

View File

@ -186,6 +186,7 @@ impl BankForks {
&mut self, &mut self,
root: Slot, root: Slot,
accounts_package_sender: &Option<AccountsPackageSender>, accounts_package_sender: &Option<AccountsPackageSender>,
largest_confirmed_root: Option<Slot>,
) { ) {
let old_epoch = self.root_bank().epoch(); let old_epoch = self.root_bank().epoch();
self.root = root; self.root = root;
@ -263,7 +264,7 @@ impl BankForks {
} }
let new_tx_count = root_bank.transaction_count(); let new_tx_count = root_bank.transaction_count();
self.prune_non_root(root); self.prune_non_root(root, largest_confirmed_root);
inc_new_counter_info!( inc_new_counter_info!(
"bank-forks_set_root_ms", "bank-forks_set_root_ms",
@ -334,10 +335,19 @@ impl BankForks {
Ok(()) Ok(())
} }
fn prune_non_root(&mut self, root: Slot) { fn prune_non_root(&mut self, root: Slot, largest_confirmed_root: Option<Slot>) {
let descendants = self.descendants(); let descendants = self.descendants();
self.banks self.banks.retain(|slot, _| {
.retain(|slot, _| slot == &root || descendants[&root].contains(slot)); *slot == root
|| descendants[&root].contains(slot)
|| (*slot < root
&& *slot >= largest_confirmed_root.unwrap_or(root)
&& descendants[slot].contains(&root))
});
datapoint_debug!(
"bank_forks_purge_non_root",
("num_banks_retained", self.banks.len(), i64),
);
} }
pub fn set_snapshot_config(&mut self, snapshot_config: Option<SnapshotConfig>) { pub fn set_snapshot_config(&mut self, snapshot_config: Option<SnapshotConfig>) {