Purge banks, with the same slot, sequentially (#33149)

This commit is contained in:
Brooks 2023-09-05 17:06:35 -04:00 committed by GitHub
parent 3b108564f9
commit 2cf53b6293
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 149 additions and 13 deletions

View File

@ -502,21 +502,28 @@ pub struct PrunedBanksRequestHandler {
impl PrunedBanksRequestHandler {
pub fn handle_request(&self, bank: &Bank, is_serialized_with_abs: bool) -> usize {
let slots = self.pruned_banks_receiver.try_iter().collect::<Vec<_>>();
let count = slots.len();
bank.rc.accounts.accounts_db.thread_pool_clean.install(|| {
slots
.into_par_iter()
.for_each(|(pruned_slot, pruned_bank_id)| {
bank.rc.accounts.accounts_db.purge_slot(
pruned_slot,
pruned_bank_id,
is_serialized_with_abs,
);
});
let mut banks_to_purge: Vec<_> = self.pruned_banks_receiver.try_iter().collect();
// We need a stable sort to ensure we purge banks—with the same slot—in the same order
// they were sent into the channel.
banks_to_purge.sort_by_key(|(slot, _id)| *slot);
let num_banks_to_purge = banks_to_purge.len();
// Group the banks into slices with the same slot
let grouped_banks_to_purge: Vec<_> =
GroupBy::new(banks_to_purge.as_slice(), |a, b| a.0 == b.0).collect();
// Purge all the slots in parallel
// Banks for the same slot are purged sequentially
let accounts_db = bank.rc.accounts.accounts_db.as_ref();
accounts_db.thread_pool_clean.install(|| {
grouped_banks_to_purge.into_par_iter().for_each(|group| {
group.iter().for_each(|(slot, bank_id)| {
accounts_db.purge_slot(*slot, *bank_id, is_serialized_with_abs);
})
});
});
count
num_banks_to_purge
}
fn remove_dead_slots(
@ -790,6 +797,56 @@ fn cmp_requests_by_priority(
.then(slot_a.cmp(&slot_b))
}
/// An iterator over a slice producing non-overlapping runs
/// of elements using a predicate to separate them.
///
/// This can be used to extract sorted subslices.
///
/// (`Vec::group_by()`](https://doc.rust-lang.org/std/vec/struct.Vec.html#method.group_by)
/// is currently a nightly-only experimental API. Once the API is stablized, use it instead.
///
/// tracking issue: https://github.com/rust-lang/rust/issues/80552
/// rust-lang PR: https://github.com/rust-lang/rust/pull/79895/
/// implementation permalink: https://github.com/Kerollmops/rust/blob/8b53be660444d736bb6a6e1c6ba42c8180c968e7/library/core/src/slice/iter.rs#L2972-L3023
struct GroupBy<'a, T: 'a, P> {
slice: &'a [T],
predicate: P,
}
impl<'a, T: 'a, P> GroupBy<'a, T, P>
where
P: FnMut(&T, &T) -> bool,
{
fn new(slice: &'a [T], predicate: P) -> Self {
GroupBy { slice, predicate }
}
}
impl<'a, T: 'a, P> Iterator for GroupBy<'a, T, P>
where
P: FnMut(&T, &T) -> bool,
{
type Item = &'a [T];
#[inline]
fn next(&mut self) -> Option<Self::Item> {
if self.slice.is_empty() {
None
} else {
let mut len = 1;
let mut iter = self.slice.windows(2);
while let Some([l, r]) = iter.next() {
if (self.predicate)(l, r) {
len += 1
} else {
break;
}
}
let (head, tail) = self.slice.split_at(len);
self.slice = tail;
Some(head)
}
}
}
#[cfg(test)]
mod test {
use {
@ -1036,4 +1093,83 @@ mod test {
.get_next_snapshot_request(Some(480))
.is_none());
}
/// Ensure that we can prune banks with the same slot (if they were on different forks)
#[test]
fn test_pruned_banks_request_handler_handle_request() {
let (pruned_banks_sender, pruned_banks_receiver) = crossbeam_channel::unbounded();
let pruned_banks_request_handler = PrunedBanksRequestHandler {
pruned_banks_receiver,
};
let genesis_config_info = create_genesis_config(10);
let bank = Bank::new_for_tests(&genesis_config_info.genesis_config);
bank.set_startup_verification_complete();
bank.rc.accounts.accounts_db.enable_bank_drop_callback();
bank.set_callback(Some(Box::new(SendDroppedBankCallback::new(
pruned_banks_sender,
))));
let fork0_bank0 = Arc::new(bank);
let fork0_bank1 = Arc::new(Bank::new_from_parent(
fork0_bank0.clone(),
&Pubkey::new_unique(),
fork0_bank0.slot() + 1,
));
let fork1_bank1 = Arc::new(Bank::new_from_parent(
fork0_bank0.clone(),
&Pubkey::new_unique(),
fork0_bank0.slot() + 1,
));
let fork2_bank1 = Arc::new(Bank::new_from_parent(
fork0_bank0.clone(),
&Pubkey::new_unique(),
fork0_bank0.slot() + 1,
));
let fork0_bank2 = Arc::new(Bank::new_from_parent(
fork0_bank1.clone(),
&Pubkey::new_unique(),
fork0_bank1.slot() + 1,
));
let fork1_bank2 = Arc::new(Bank::new_from_parent(
fork1_bank1.clone(),
&Pubkey::new_unique(),
fork1_bank1.slot() + 1,
));
let fork0_bank3 = Arc::new(Bank::new_from_parent(
fork0_bank2.clone(),
&Pubkey::new_unique(),
fork0_bank2.slot() + 1,
));
let fork3_bank3 = Arc::new(Bank::new_from_parent(
fork0_bank2.clone(),
&Pubkey::new_unique(),
fork0_bank2.slot() + 1,
));
fork0_bank3.squash();
drop(fork3_bank3);
drop(fork1_bank2);
drop(fork0_bank2);
drop(fork1_bank1);
drop(fork2_bank1);
drop(fork0_bank1);
drop(fork0_bank0);
let num_banks_purged = pruned_banks_request_handler.handle_request(&fork0_bank3, true);
assert_eq!(num_banks_purged, 7);
}
// This test is for our copied impl of GroupBy, above.
// When it is removed, this test can be removed.
#[test]
fn test_group_by() {
let slice = &[1, 1, 1, 3, 3, 2, 2, 2, 1, 0];
let mut iter = GroupBy::new(slice, |a, b| a == b);
assert_eq!(iter.next(), Some(&[1, 1, 1][..]));
assert_eq!(iter.next(), Some(&[3, 3][..]));
assert_eq!(iter.next(), Some(&[2, 2, 2][..]));
assert_eq!(iter.next(), Some(&[1][..]));
assert_eq!(iter.next(), Some(&[0][..]));
assert_eq!(iter.next(), None);
}
}