sort storages by slot before scan (#17411)
* sort storages by slot before scan * fix return value
This commit is contained in:
parent
72bb271a94
commit
ef5169ff24
|
@ -30,6 +30,7 @@ use crate::{
|
|||
append_vec::{AppendVec, StoredAccountMeta, StoredMeta, StoredMetaWriteVersion},
|
||||
contains::Contains,
|
||||
read_only_accounts_cache::ReadOnlyAccountsCache,
|
||||
sorted_storages::SortedStorages,
|
||||
};
|
||||
use blake3::traits::digest::Digest;
|
||||
use crossbeam_channel::{unbounded, Receiver, Sender};
|
||||
|
@ -4207,7 +4208,7 @@ impl AccountsDb {
|
|||
|
||||
/// Scan through all the account storage in parallel
|
||||
fn scan_account_storage_no_bank<F, B>(
|
||||
snapshot_storages: &[SnapshotStorage],
|
||||
snapshot_storages: &SortedStorages,
|
||||
scan_func: F,
|
||||
) -> Vec<B>
|
||||
where
|
||||
|
@ -4216,19 +4217,24 @@ impl AccountsDb {
|
|||
{
|
||||
// Without chunks, we end up with 1 output vec for each outer snapshot storage.
|
||||
// This results in too many vectors to be efficient.
|
||||
const MAX_ITEMS_PER_CHUNK: usize = 5_000;
|
||||
snapshot_storages
|
||||
.par_chunks(MAX_ITEMS_PER_CHUNK)
|
||||
.map(|storages: &[Vec<Arc<AccountStorageEntry>>]| {
|
||||
const MAX_ITEMS_PER_CHUNK: Slot = 5_000;
|
||||
let chunks = 1 + (snapshot_storages.range_width() as Slot / MAX_ITEMS_PER_CHUNK);
|
||||
(0..chunks)
|
||||
.into_par_iter()
|
||||
.map(|chunk| {
|
||||
let mut retval = B::default();
|
||||
|
||||
for sub_storages in storages {
|
||||
for storage in sub_storages {
|
||||
let slot = storage.slot();
|
||||
let accounts = storage.accounts.accounts(0);
|
||||
accounts.into_iter().for_each(|stored_account| {
|
||||
scan_func(LoadedAccount::Stored(stored_account), &mut retval, slot)
|
||||
});
|
||||
let start = snapshot_storages.range().start + chunk * MAX_ITEMS_PER_CHUNK;
|
||||
let end = std::cmp::min(start + MAX_ITEMS_PER_CHUNK, snapshot_storages.range().end);
|
||||
for slot in start..end {
|
||||
let sub_storages = snapshot_storages.get(slot);
|
||||
if let Some(sub_storages) = sub_storages {
|
||||
for storage in sub_storages {
|
||||
let slot = storage.slot();
|
||||
let accounts = storage.accounts.accounts(0);
|
||||
accounts.into_iter().for_each(|stored_account| {
|
||||
scan_func(LoadedAccount::Stored(stored_account), &mut retval, slot)
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
retval
|
||||
|
@ -4294,7 +4300,7 @@ impl AccountsDb {
|
|||
}
|
||||
|
||||
fn scan_snapshot_stores(
|
||||
storage: &[SnapshotStorage],
|
||||
storage: &SortedStorages,
|
||||
mut stats: &mut crate::accounts_hash::HashStats,
|
||||
bins: usize,
|
||||
bin_range: &Range<usize>,
|
||||
|
@ -4308,7 +4314,7 @@ impl AccountsDb {
|
|||
let mismatch_found = AtomicU64::new(0);
|
||||
|
||||
let result: Vec<Vec<Vec<CalculateHashIntermediate>>> = Self::scan_account_storage_no_bank(
|
||||
&storage,
|
||||
storage,
|
||||
|loaded_account: LoadedAccount,
|
||||
accum: &mut Vec<Vec<CalculateHashIntermediate>>,
|
||||
slot: Slot| {
|
||||
|
@ -4395,6 +4401,11 @@ impl AccountsDb {
|
|||
let mut previous_pass = PreviousPass::default();
|
||||
let mut final_result = (Hash::default(), 0);
|
||||
|
||||
let mut sort_time = Measure::start("sort_storages");
|
||||
let storages = SortedStorages::new(storages);
|
||||
sort_time.stop();
|
||||
stats.storage_sort_us = sort_time.as_us();
|
||||
|
||||
for pass in 0..num_scan_passes {
|
||||
let bounds = Range {
|
||||
start: pass * bins_per_pass,
|
||||
|
@ -4402,7 +4413,7 @@ impl AccountsDb {
|
|||
};
|
||||
|
||||
let result = Self::scan_snapshot_stores(
|
||||
storages,
|
||||
&storages,
|
||||
&mut stats,
|
||||
PUBKEY_BINS_FOR_CALCULATING_HASHES,
|
||||
&bounds,
|
||||
|
@ -5607,8 +5618,8 @@ pub mod tests {
|
|||
ancestors
|
||||
}
|
||||
|
||||
fn empty_storages<'a>() -> &'a [SnapshotStorage] {
|
||||
&[]
|
||||
fn empty_storages<'a>() -> SortedStorages<'a> {
|
||||
SortedStorages::new(&[])
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -5740,15 +5751,14 @@ pub mod tests {
|
|||
sample_storages_and_account_in_slot(1)
|
||||
}
|
||||
|
||||
fn get_storage_refs(input: &[SnapshotStorage]) -> &[SnapshotStorage] {
|
||||
// these types will be refactored later and this will be a convenient helper function for tests
|
||||
input
|
||||
fn get_storage_refs(input: &[SnapshotStorage]) -> SortedStorages {
|
||||
SortedStorages::new(input)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_accountsdb_scan_snapshot_stores() {
|
||||
solana_logger::setup();
|
||||
let (mut storages, raw_expected) = sample_storages_and_accounts();
|
||||
let (storages, raw_expected) = sample_storages_and_accounts();
|
||||
|
||||
let bins = 1;
|
||||
let mut stats = HashStats::default();
|
||||
|
@ -5822,15 +5832,23 @@ pub mod tests {
|
|||
expected[128].push(raw_expected[2].clone());
|
||||
expected[bins - 1].push(raw_expected.last().unwrap().clone());
|
||||
assert_eq!(result, vec![expected]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_accountsdb_scan_snapshot_stores_2nd_chunk() {
|
||||
// enough stores to get to 2nd chunk
|
||||
let bins = 1;
|
||||
const MAX_ITEMS_PER_CHUNK: usize = 5_000;
|
||||
storages.splice(0..0, vec![vec![]; MAX_ITEMS_PER_CHUNK]);
|
||||
let slot = MAX_ITEMS_PER_CHUNK as Slot;
|
||||
let (storages, raw_expected) = sample_storages_and_account_in_slot(slot);
|
||||
let storage_data = vec![(&storages[0], slot)];
|
||||
|
||||
let sorted_storages =
|
||||
SortedStorages::new_debug(&storage_data[..], 0, MAX_ITEMS_PER_CHUNK + 1);
|
||||
|
||||
let mut stats = HashStats::default();
|
||||
let result = AccountsDb::scan_snapshot_stores(
|
||||
&get_storage_refs(&storages),
|
||||
&sorted_storages,
|
||||
&mut stats,
|
||||
bins,
|
||||
&Range {
|
||||
|
@ -5849,7 +5867,7 @@ pub mod tests {
|
|||
#[test]
|
||||
fn test_accountsdb_scan_snapshot_stores_binning() {
|
||||
let mut stats = HashStats::default();
|
||||
let (mut storages, raw_expected) = sample_storages_and_accounts();
|
||||
let (storages, raw_expected) = sample_storages_and_accounts();
|
||||
|
||||
// just the first bin of 2
|
||||
let bins = 2;
|
||||
|
@ -5927,16 +5945,24 @@ pub mod tests {
|
|||
}
|
||||
assert_eq!(result, vec![expected]);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_accountsdb_scan_snapshot_stores_binning_2nd_chunk() {
|
||||
// enough stores to get to 2nd chunk
|
||||
// range is for only 1 bin out of 256.
|
||||
let bins = 256;
|
||||
const MAX_ITEMS_PER_CHUNK: usize = 5_000;
|
||||
storages.splice(0..0, vec![vec![]; MAX_ITEMS_PER_CHUNK]);
|
||||
let slot = MAX_ITEMS_PER_CHUNK as Slot;
|
||||
let (storages, raw_expected) = sample_storages_and_account_in_slot(slot);
|
||||
let storage_data = vec![(&storages[0], slot)];
|
||||
|
||||
let sorted_storages =
|
||||
SortedStorages::new_debug(&storage_data[..], 0, MAX_ITEMS_PER_CHUNK + 1);
|
||||
|
||||
let mut stats = HashStats::default();
|
||||
let result = AccountsDb::scan_snapshot_stores(
|
||||
&get_storage_refs(&storages),
|
||||
&sorted_storages,
|
||||
&mut stats,
|
||||
bins,
|
||||
&Range {
|
||||
|
|
|
@ -29,6 +29,7 @@ pub struct HashStats {
|
|||
pub unreduced_entries: usize,
|
||||
pub num_snapshot_storage: usize,
|
||||
pub collect_snapshots_us: u64,
|
||||
pub storage_sort_us: u64,
|
||||
}
|
||||
impl HashStats {
|
||||
fn log(&mut self) {
|
||||
|
@ -37,6 +38,7 @@ impl HashStats {
|
|||
+ self.hash_time_total_us
|
||||
+ self.sort_time_total_us
|
||||
+ self.collect_snapshots_us
|
||||
+ self.storage_sort_us
|
||||
+ self.flatten_time_total_us;
|
||||
datapoint_info!(
|
||||
"calculate_accounts_hash_without_index",
|
||||
|
@ -46,6 +48,7 @@ impl HashStats {
|
|||
("sort", self.sort_time_total_us, i64),
|
||||
("hash_total", self.hash_total, i64),
|
||||
("flatten", self.flatten_time_total_us, i64),
|
||||
("storage_sort_us", self.storage_sort_us, i64),
|
||||
("unreduced_entries", self.unreduced_entries as i64, i64),
|
||||
(
|
||||
"collect_snapshots_us",
|
||||
|
|
|
@ -34,6 +34,7 @@ pub mod secondary_index;
|
|||
pub mod serde_snapshot;
|
||||
pub mod snapshot_package;
|
||||
pub mod snapshot_utils;
|
||||
pub mod sorted_storages;
|
||||
pub mod stakes;
|
||||
pub mod status_cache;
|
||||
mod system_instruction_processor;
|
||||
|
|
|
@ -0,0 +1,117 @@
|
|||
use crate::accounts_db::SnapshotStorage;
|
||||
use log::*;
|
||||
use solana_measure::measure::Measure;
|
||||
use solana_sdk::clock::Slot;
|
||||
use std::ops::Range;
|
||||
|
||||
pub struct SortedStorages<'a> {
|
||||
range: Range<Slot>,
|
||||
storages: Vec<Option<&'a SnapshotStorage>>,
|
||||
count: usize,
|
||||
}
|
||||
|
||||
impl<'a> SortedStorages<'a> {
|
||||
pub fn get(&self, slot: Slot) -> Option<&SnapshotStorage> {
|
||||
if !self.range.contains(&slot) {
|
||||
None
|
||||
} else {
|
||||
let index = (slot - self.range.start) as usize;
|
||||
self.storages[index]
|
||||
}
|
||||
}
|
||||
|
||||
pub fn range_width(&self) -> Slot {
|
||||
self.range.end - self.range.start
|
||||
}
|
||||
|
||||
pub fn range(&self) -> &Range<Slot> {
|
||||
&self.range
|
||||
}
|
||||
|
||||
pub fn len(&self) -> usize {
|
||||
self.count
|
||||
}
|
||||
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.len() == 0
|
||||
}
|
||||
|
||||
pub fn new(source: &'a [SnapshotStorage]) -> Self {
|
||||
let mut min = Slot::MAX;
|
||||
let mut max = Slot::MIN;
|
||||
let mut count = 0;
|
||||
let mut time = Measure::start("get slot");
|
||||
let slots = source
|
||||
.iter()
|
||||
.map(|storages| {
|
||||
count += storages.len();
|
||||
if !storages.is_empty() {
|
||||
storages.first().map(|storage| {
|
||||
let slot = storage.slot();
|
||||
min = std::cmp::min(slot, min);
|
||||
max = std::cmp::max(slot + 1, max);
|
||||
slot
|
||||
})
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
time.stop();
|
||||
let mut time2 = Measure::start("sort");
|
||||
let range;
|
||||
let mut storages;
|
||||
if min > max {
|
||||
range = Range::default();
|
||||
storages = vec![];
|
||||
} else {
|
||||
range = Range {
|
||||
start: min,
|
||||
end: max,
|
||||
};
|
||||
let len = max - min;
|
||||
storages = vec![None; len as usize];
|
||||
source
|
||||
.iter()
|
||||
.zip(slots)
|
||||
.for_each(|(original_storages, slot)| {
|
||||
if let Some(slot) = slot {
|
||||
let index = (slot - min) as usize;
|
||||
assert!(storages[index].is_none());
|
||||
storages[index] = Some(original_storages);
|
||||
}
|
||||
});
|
||||
}
|
||||
time2.stop();
|
||||
debug!("SortedStorages, times: {}, {}", time.as_us(), time2.as_us());
|
||||
Self {
|
||||
range,
|
||||
storages,
|
||||
count,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
pub mod tests {
    use super::*;

    impl<'a> SortedStorages<'a> {
        /// Test-only constructor that builds a `SortedStorages` covering exactly
        /// `min..min + len`, with the given `(storages, slot)` pairs filed under
        /// their slots and every other slot left empty.
        ///
        /// `count` is set to the number of pairs in `source`.
        ///
        /// # Panics
        ///
        /// Panics if any slot in `source` lies outside `min..min + len`.
        pub fn new_debug(source: &[(&'a SnapshotStorage, Slot)], min: Slot, len: usize) -> Self {
            let mut storages = vec![None; len];
            let range = Range {
                start: min,
                end: min + len as Slot,
            };
            let count = source.len();
            for (storage, slot) in source {
                assert!(
                    range.contains(slot),
                    "slot {} is outside of range {:?}",
                    slot,
                    range
                );
                // `get` reads index `slot - range.start`, so entries must be placed
                // relative to `min` rather than at the absolute slot number.
                let index = (*slot - min) as usize;
                storages[index] = Some(*storage);
            }

            Self {
                range,
                storages,
                count,
            }
        }
    }
}
|
Loading…
Reference in New Issue