AcctIdx: hold ranges in memory uses multiple threads (#22031)
This commit is contained in:
parent
5b464a32f5
commit
bdae2993e0
|
@ -870,13 +870,17 @@ impl Accounts {
|
|||
)
|
||||
}
|
||||
|
||||
pub fn hold_range_in_memory<R>(&self, range: &R, start_holding: bool)
|
||||
where
|
||||
R: RangeBounds<Pubkey> + std::fmt::Debug,
|
||||
pub fn hold_range_in_memory<R>(
|
||||
&self,
|
||||
range: &R,
|
||||
start_holding: bool,
|
||||
thread_pool: &rayon::ThreadPool,
|
||||
) where
|
||||
R: RangeBounds<Pubkey> + std::fmt::Debug + Sync,
|
||||
{
|
||||
self.accounts_db
|
||||
.accounts_index
|
||||
.hold_range_in_memory(range, start_holding)
|
||||
.hold_range_in_memory(range, start_holding, thread_pool)
|
||||
}
|
||||
|
||||
pub fn load_to_collect_rent_eagerly<R: RangeBounds<Pubkey> + std::fmt::Debug>(
|
||||
|
@ -1331,12 +1335,12 @@ mod tests {
|
|||
fn test_hold_range_in_memory() {
|
||||
let accts = Accounts::default_for_tests();
|
||||
let range = Pubkey::new(&[0; 32])..=Pubkey::new(&[0xff; 32]);
|
||||
accts.hold_range_in_memory(&range, true);
|
||||
accts.hold_range_in_memory(&range, false);
|
||||
accts.hold_range_in_memory(&range, true);
|
||||
accts.hold_range_in_memory(&range, true);
|
||||
accts.hold_range_in_memory(&range, false);
|
||||
accts.hold_range_in_memory(&range, false);
|
||||
accts.hold_range_in_memory(&range, true, &test_thread_pool());
|
||||
accts.hold_range_in_memory(&range, false, &test_thread_pool());
|
||||
accts.hold_range_in_memory(&range, true, &test_thread_pool());
|
||||
accts.hold_range_in_memory(&range, true, &test_thread_pool());
|
||||
accts.hold_range_in_memory(&range, false, &test_thread_pool());
|
||||
accts.hold_range_in_memory(&range, false, &test_thread_pool());
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -1353,7 +1357,7 @@ mod tests {
|
|||
let range2_inclusive = range2.start..=range2.end;
|
||||
assert_eq!(0, idx.bin_calculator.bin_from_pubkey(&range2.start));
|
||||
assert_eq!(0, idx.bin_calculator.bin_from_pubkey(&range2.end));
|
||||
accts.hold_range_in_memory(&range, true);
|
||||
accts.hold_range_in_memory(&range, true, &test_thread_pool());
|
||||
idx.account_maps.iter().enumerate().for_each(|(_bin, map)| {
|
||||
let map = map.read().unwrap();
|
||||
assert_eq!(
|
||||
|
@ -1361,7 +1365,7 @@ mod tests {
|
|||
vec![range.clone()]
|
||||
);
|
||||
});
|
||||
accts.hold_range_in_memory(&range2, true);
|
||||
accts.hold_range_in_memory(&range2, true, &test_thread_pool());
|
||||
idx.account_maps.iter().enumerate().for_each(|(bin, map)| {
|
||||
let map = map.read().unwrap();
|
||||
let expected = if bin == 0 {
|
||||
|
@ -1376,8 +1380,12 @@ mod tests {
|
|||
bin
|
||||
);
|
||||
});
|
||||
accts.hold_range_in_memory(&range, false);
|
||||
accts.hold_range_in_memory(&range2, false);
|
||||
accts.hold_range_in_memory(&range, false, &test_thread_pool());
|
||||
accts.hold_range_in_memory(&range2, false, &test_thread_pool());
|
||||
}
|
||||
|
||||
fn test_thread_pool() -> rayon::ThreadPool {
|
||||
crate::accounts_db::make_min_priority_thread_pool()
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
|
@ -13,6 +13,10 @@ use {
|
|||
log::*,
|
||||
ouroboros::self_referencing,
|
||||
rand::{thread_rng, Rng},
|
||||
rayon::{
|
||||
iter::{IntoParallelIterator, ParallelIterator},
|
||||
ThreadPool,
|
||||
},
|
||||
solana_measure::measure::Measure,
|
||||
solana_sdk::{
|
||||
clock::{BankId, Slot},
|
||||
|
@ -742,21 +746,22 @@ impl<'a, T: IndexValue> AccountsIndexIterator<'a, T> {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn hold_range_in_memory<R>(&self, range: &R, start_holding: bool)
|
||||
pub fn hold_range_in_memory<R>(&self, range: &R, start_holding: bool, thread_pool: &ThreadPool)
|
||||
where
|
||||
R: RangeBounds<Pubkey> + Debug,
|
||||
R: RangeBounds<Pubkey> + Debug + Sync,
|
||||
{
|
||||
// forward this hold request ONLY to the bins which contain keys in the specified range
|
||||
let (start_bin, bin_range) = self.bin_start_and_range();
|
||||
self.account_maps
|
||||
.iter()
|
||||
.skip(start_bin)
|
||||
.take(bin_range)
|
||||
.for_each(|map| {
|
||||
// the idea is this range shouldn't be more than a few buckets, but the process of loading from disk buckets is very slow
|
||||
// so, parallelize the bucket loads
|
||||
thread_pool.install(|| {
|
||||
(0..bin_range).into_par_iter().for_each(|idx| {
|
||||
let map = &self.account_maps[idx + start_bin];
|
||||
map.read()
|
||||
.unwrap()
|
||||
.hold_range_in_memory(range, start_holding);
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1460,12 +1465,12 @@ impl<T: IndexValue> AccountsIndex<T> {
|
|||
rv.map(|index| slice.len() - 1 - index)
|
||||
}
|
||||
|
||||
pub fn hold_range_in_memory<R>(&self, range: &R, start_holding: bool)
|
||||
pub fn hold_range_in_memory<R>(&self, range: &R, start_holding: bool, thread_pool: &ThreadPool)
|
||||
where
|
||||
R: RangeBounds<Pubkey> + Debug,
|
||||
R: RangeBounds<Pubkey> + Debug + Sync,
|
||||
{
|
||||
let iter = self.iter(Some(range), true);
|
||||
iter.hold_range_in_memory(range, start_holding);
|
||||
iter.hold_range_in_memory(range, start_holding, thread_pool);
|
||||
}
|
||||
|
||||
pub fn set_startup(&self, value: bool) {
|
||||
|
|
|
@ -4179,7 +4179,10 @@ impl Bank {
|
|||
fn collect_rent_in_partition(&self, partition: Partition) -> usize {
|
||||
let subrange = Self::pubkey_range_from_partition(partition);
|
||||
|
||||
self.rc.accounts.hold_range_in_memory(&subrange, true);
|
||||
let thread_pool = &self.rc.accounts.accounts_db.thread_pool;
|
||||
self.rc
|
||||
.accounts
|
||||
.hold_range_in_memory(&subrange, true, thread_pool);
|
||||
|
||||
let accounts = self
|
||||
.rc
|
||||
|
@ -4213,7 +4216,9 @@ impl Bank {
|
|||
.unwrap()
|
||||
.extend(rent_debits.into_unordered_rewards_iter());
|
||||
|
||||
self.rc.accounts.hold_range_in_memory(&subrange, false);
|
||||
self.rc
|
||||
.accounts
|
||||
.hold_range_in_memory(&subrange, false, thread_pool);
|
||||
account_count
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue