From 2cf53b6293917d71a20e5ae336a80b2cf6539ff0 Mon Sep 17 00:00:00 2001 From: Brooks Date: Tue, 5 Sep 2023 17:06:35 -0400 Subject: [PATCH] Purge banks, with the same slot, sequentially (#33149) --- runtime/src/accounts_background_service.rs | 162 +++++++++++++++++++-- 1 file changed, 149 insertions(+), 13 deletions(-) diff --git a/runtime/src/accounts_background_service.rs b/runtime/src/accounts_background_service.rs index 365913502..0e7c15837 100644 --- a/runtime/src/accounts_background_service.rs +++ b/runtime/src/accounts_background_service.rs @@ -502,21 +502,28 @@ pub struct PrunedBanksRequestHandler { impl PrunedBanksRequestHandler { pub fn handle_request(&self, bank: &Bank, is_serialized_with_abs: bool) -> usize { - let slots = self.pruned_banks_receiver.try_iter().collect::>(); - let count = slots.len(); - bank.rc.accounts.accounts_db.thread_pool_clean.install(|| { - slots - .into_par_iter() - .for_each(|(pruned_slot, pruned_bank_id)| { - bank.rc.accounts.accounts_db.purge_slot( - pruned_slot, - pruned_bank_id, - is_serialized_with_abs, - ); - }); + let mut banks_to_purge: Vec<_> = self.pruned_banks_receiver.try_iter().collect(); + // We need a stable sort to ensure we purge banks—with the same slot—in the same order + // they were sent into the channel. + banks_to_purge.sort_by_key(|(slot, _id)| *slot); + let num_banks_to_purge = banks_to_purge.len(); + + // Group the banks into slices with the same slot + let grouped_banks_to_purge: Vec<_> = + GroupBy::new(banks_to_purge.as_slice(), |a, b| a.0 == b.0).collect(); + + // Purge all the slots in parallel + // Banks for the same slot are purged sequentially + let accounts_db = bank.rc.accounts.accounts_db.as_ref(); + accounts_db.thread_pool_clean.install(|| { + grouped_banks_to_purge.into_par_iter().for_each(|group| { + group.iter().for_each(|(slot, bank_id)| { + accounts_db.purge_slot(*slot, *bank_id, is_serialized_with_abs); + }) + }); }); - count + num_banks_to_purge } fn remove_dead_slots( @@ -790,6 +797,56 @@ fn cmp_requests_by_priority( .then(slot_a.cmp(&slot_b)) } +/// An iterator over a slice producing non-overlapping runs +/// of elements using a predicate to separate them. +/// +/// This can be used to extract sorted subslices. +/// +/// (`Vec::group_by()`](https://doc.rust-lang.org/std/vec/struct.Vec.html#method.group_by) +/// is currently a nightly-only experimental API. Once the API is stablized, use it instead. +/// +/// tracking issue: https://github.com/rust-lang/rust/issues/80552 +/// rust-lang PR: https://github.com/rust-lang/rust/pull/79895/ +/// implementation permalink: https://github.com/Kerollmops/rust/blob/8b53be660444d736bb6a6e1c6ba42c8180c968e7/library/core/src/slice/iter.rs#L2972-L3023 +struct GroupBy<'a, T: 'a, P> { + slice: &'a [T], + predicate: P, +} +impl<'a, T: 'a, P> GroupBy<'a, T, P> +where + P: FnMut(&T, &T) -> bool, +{ + fn new(slice: &'a [T], predicate: P) -> Self { + GroupBy { slice, predicate } + } +} +impl<'a, T: 'a, P> Iterator for GroupBy<'a, T, P> +where + P: FnMut(&T, &T) -> bool, +{ + type Item = &'a [T]; + + #[inline] + fn next(&mut self) -> Option { + if self.slice.is_empty() { + None + } else { + let mut len = 1; + let mut iter = self.slice.windows(2); + while let Some([l, r]) = iter.next() { + if (self.predicate)(l, r) { + len += 1 + } else { + break; + } + } + let (head, tail) = self.slice.split_at(len); + self.slice = tail; + Some(head) + } + } +} + #[cfg(test)] mod test { use { @@ -1036,4 +1093,83 @@ mod test { .get_next_snapshot_request(Some(480)) .is_none()); } + + /// Ensure that we can prune banks with the same slot (if they were on different forks) + #[test] + fn test_pruned_banks_request_handler_handle_request() { + let (pruned_banks_sender, pruned_banks_receiver) = crossbeam_channel::unbounded(); + let pruned_banks_request_handler = PrunedBanksRequestHandler { + pruned_banks_receiver, + }; + let genesis_config_info = create_genesis_config(10); + let bank = Bank::new_for_tests(&genesis_config_info.genesis_config); + bank.set_startup_verification_complete(); + bank.rc.accounts.accounts_db.enable_bank_drop_callback(); + bank.set_callback(Some(Box::new(SendDroppedBankCallback::new( + pruned_banks_sender, + )))); + + let fork0_bank0 = Arc::new(bank); + let fork0_bank1 = Arc::new(Bank::new_from_parent( + fork0_bank0.clone(), + &Pubkey::new_unique(), + fork0_bank0.slot() + 1, + )); + let fork1_bank1 = Arc::new(Bank::new_from_parent( + fork0_bank0.clone(), + &Pubkey::new_unique(), + fork0_bank0.slot() + 1, + )); + let fork2_bank1 = Arc::new(Bank::new_from_parent( + fork0_bank0.clone(), + &Pubkey::new_unique(), + fork0_bank0.slot() + 1, + )); + let fork0_bank2 = Arc::new(Bank::new_from_parent( + fork0_bank1.clone(), + &Pubkey::new_unique(), + fork0_bank1.slot() + 1, + )); + let fork1_bank2 = Arc::new(Bank::new_from_parent( + fork1_bank1.clone(), + &Pubkey::new_unique(), + fork1_bank1.slot() + 1, + )); + let fork0_bank3 = Arc::new(Bank::new_from_parent( + fork0_bank2.clone(), + &Pubkey::new_unique(), + fork0_bank2.slot() + 1, + )); + let fork3_bank3 = Arc::new(Bank::new_from_parent( + fork0_bank2.clone(), + &Pubkey::new_unique(), + fork0_bank2.slot() + 1, + )); + fork0_bank3.squash(); + + drop(fork3_bank3); + drop(fork1_bank2); + drop(fork0_bank2); + drop(fork1_bank1); + drop(fork2_bank1); + drop(fork0_bank1); + drop(fork0_bank0); + let num_banks_purged = pruned_banks_request_handler.handle_request(&fork0_bank3, true); + assert_eq!(num_banks_purged, 7); + } + + // This test is for our copied impl of GroupBy, above. + // When it is removed, this test can be removed. + #[test] + fn test_group_by() { + let slice = &[1, 1, 1, 3, 3, 2, 2, 2, 1, 0]; + + let mut iter = GroupBy::new(slice, |a, b| a == b); + assert_eq!(iter.next(), Some(&[1, 1, 1][..])); + assert_eq!(iter.next(), Some(&[3, 3][..])); + assert_eq!(iter.next(), Some(&[2, 2, 2][..])); + assert_eq!(iter.next(), Some(&[1][..])); + assert_eq!(iter.next(), Some(&[0][..])); + assert_eq!(iter.next(), None); + } }