Uses fold+reduce in de_dup_accounts() (#32765)
This commit is contained in:
parent
f4287d70bb
commit
6ff390802b
|
@ -24,7 +24,7 @@ use {
|
|||
path::PathBuf,
|
||||
sync::{
|
||||
atomic::{AtomicU64, AtomicUsize, Ordering},
|
||||
Arc, Mutex,
|
||||
Arc,
|
||||
},
|
||||
},
|
||||
tempfile::tempfile_in,
|
||||
|
@ -785,28 +785,53 @@ impl<'a> AccountsHasher<'a> {
|
|||
let _ = self.active_stats.activate(ActiveStatItem::HashDeDup);
|
||||
|
||||
let mut zeros = Measure::start("eliminate zeros");
|
||||
let sum = Mutex::new(0u64);
|
||||
let hash_total = AtomicUsize::default();
|
||||
let hashes: Vec<_> = (0..max_bin)
|
||||
let (hashes, hash_total, lamports_total) = (0..max_bin)
|
||||
.into_par_iter()
|
||||
.map(|bin| {
|
||||
let (hashes_file, lamports_bin) =
|
||||
self.de_dup_accounts_in_parallel(sorted_data_by_pubkey, bin, max_bin, stats);
|
||||
{
|
||||
hash_total.fetch_add(hashes_file.count(), Ordering::Relaxed);
|
||||
let mut lamports_sum = sum.lock().unwrap();
|
||||
*lamports_sum = Self::checked_cast_for_capitalization(
|
||||
*lamports_sum as u128 + lamports_bin as u128,
|
||||
.fold(
|
||||
|| {
|
||||
(
|
||||
/*hashes files*/ Vec::with_capacity(max_bin),
|
||||
/*hashes count*/ 0_usize,
|
||||
/*lamports sum*/ 0_u64,
|
||||
)
|
||||
},
|
||||
|mut accum, bin| {
|
||||
let (hashes_file, lamports_bin) = self.de_dup_accounts_in_parallel(
|
||||
sorted_data_by_pubkey,
|
||||
bin,
|
||||
max_bin,
|
||||
stats,
|
||||
);
|
||||
}
|
||||
hashes_file
|
||||
})
|
||||
.collect();
|
||||
accum.2 = accum
|
||||
.2
|
||||
.checked_add(lamports_bin)
|
||||
.expect("summing capitalization cannot overflow");
|
||||
accum.1 += hashes_file.count();
|
||||
accum.0.push(hashes_file);
|
||||
accum
|
||||
},
|
||||
)
|
||||
.reduce(
|
||||
|| {
|
||||
(
|
||||
/*hashes files*/ Vec::with_capacity(max_bin),
|
||||
/*hashes count*/ 0,
|
||||
/*lamports sum*/ 0,
|
||||
)
|
||||
},
|
||||
|mut a, mut b| {
|
||||
a.2 =
|
||||
a.2.checked_add(b.2)
|
||||
.expect("summing capitalization cannot overflow");
|
||||
a.1 += b.1;
|
||||
a.0.append(&mut b.0);
|
||||
a
|
||||
},
|
||||
);
|
||||
zeros.stop();
|
||||
stats.zeros_time_total_us += zeros.as_us();
|
||||
let lamports_sum = sum.into_inner().unwrap();
|
||||
stats.hash_total += hash_total.load(Ordering::Relaxed);
|
||||
(hashes, lamports_sum)
|
||||
stats.hash_total += hash_total;
|
||||
(hashes, lamports_total)
|
||||
}
|
||||
|
||||
/// returns the item referenced by `min_index`
|
||||
|
|
Loading…
Reference in New Issue