bypasses rayon thread-pool for single entry batches (#28077)
With no parallelization, thread-pool only adds overhead.
This commit is contained in:
parent
b9849179c9
commit
72537e7e07
|
@ -2793,14 +2793,28 @@ impl Blockstore {
|
||||||
.map(|(_, end_index)| u64::from(*end_index) - start_index + 1)
|
.map(|(_, end_index)| u64::from(*end_index) - start_index + 1)
|
||||||
.unwrap_or(0);
|
.unwrap_or(0);
|
||||||
|
|
||||||
let entries: Result<Vec<Vec<Entry>>> = PAR_THREAD_POOL.install(|| {
|
let entries: Result<Vec<Vec<Entry>>> = if completed_ranges.len() <= 1 {
|
||||||
completed_ranges
|
completed_ranges
|
||||||
.par_iter()
|
.into_iter()
|
||||||
.map(|(start_index, end_index)| {
|
.map(|(start_index, end_index)| {
|
||||||
self.get_entries_in_data_block(slot, *start_index, *end_index, Some(&slot_meta))
|
self.get_entries_in_data_block(slot, start_index, end_index, Some(&slot_meta))
|
||||||
})
|
})
|
||||||
.collect()
|
.collect()
|
||||||
});
|
} else {
|
||||||
|
PAR_THREAD_POOL.install(|| {
|
||||||
|
completed_ranges
|
||||||
|
.into_par_iter()
|
||||||
|
.map(|(start_index, end_index)| {
|
||||||
|
self.get_entries_in_data_block(
|
||||||
|
slot,
|
||||||
|
start_index,
|
||||||
|
end_index,
|
||||||
|
Some(&slot_meta),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.collect()
|
||||||
|
})
|
||||||
|
};
|
||||||
let entries: Vec<Entry> = entries?.into_iter().flatten().collect();
|
let entries: Vec<Entry> = entries?.into_iter().flatten().collect();
|
||||||
Ok((entries, num_shreds, slot_meta.is_full()))
|
Ok((entries, num_shreds, slot_meta.is_full()))
|
||||||
}
|
}
|
||||||
|
|
|
@ -931,15 +931,25 @@ pub(super) fn make_shreds_from_data(
|
||||||
.collect();
|
.collect();
|
||||||
// Generate coding shreds, populate merkle branch
|
// Generate coding shreds, populate merkle branch
|
||||||
// for all shreds and attach signature.
|
// for all shreds and attach signature.
|
||||||
let shreds = thread_pool.install(|| {
|
let shreds: Result<Vec<_>, Error> = if shreds.len() <= 1 {
|
||||||
shreds
|
shreds
|
||||||
.into_par_iter()
|
.into_iter()
|
||||||
.zip(next_code_index)
|
.zip(next_code_index)
|
||||||
.map(|(shreds, next_code_index)| {
|
.map(|(shreds, next_code_index)| {
|
||||||
make_erasure_batch(keypair, shreds, next_code_index, reed_solomon_cache)
|
make_erasure_batch(keypair, shreds, next_code_index, reed_solomon_cache)
|
||||||
})
|
})
|
||||||
.collect::<Result<Vec<_>, Error>>()
|
.collect()
|
||||||
});
|
} else {
|
||||||
|
thread_pool.install(|| {
|
||||||
|
shreds
|
||||||
|
.into_par_iter()
|
||||||
|
.zip(next_code_index)
|
||||||
|
.map(|(shreds, next_code_index)| {
|
||||||
|
make_erasure_batch(keypair, shreds, next_code_index, reed_solomon_cache)
|
||||||
|
})
|
||||||
|
.collect()
|
||||||
|
})
|
||||||
|
};
|
||||||
stats.gen_coding_elapsed += now.elapsed().as_micros() as u64;
|
stats.gen_coding_elapsed += now.elapsed().as_micros() as u64;
|
||||||
shreds
|
shreds
|
||||||
}
|
}
|
||||||
|
|
|
@ -216,15 +216,29 @@ impl Shredder {
|
||||||
)
|
)
|
||||||
.collect();
|
.collect();
|
||||||
// 1) Generate coding shreds
|
// 1) Generate coding shreds
|
||||||
let mut coding_shreds: Vec<_> = PAR_THREAD_POOL.install(|| {
|
let mut coding_shreds: Vec<_> = if chunks.len() <= 1 {
|
||||||
chunks
|
chunks
|
||||||
.into_par_iter()
|
.into_iter()
|
||||||
.zip(next_code_index)
|
.zip(next_code_index)
|
||||||
.flat_map(|(shreds, next_code_index)| {
|
.flat_map(|(shreds, next_code_index)| {
|
||||||
Shredder::generate_coding_shreds(&shreds, next_code_index, reed_solomon_cache)
|
Shredder::generate_coding_shreds(&shreds, next_code_index, reed_solomon_cache)
|
||||||
})
|
})
|
||||||
.collect()
|
.collect()
|
||||||
});
|
} else {
|
||||||
|
PAR_THREAD_POOL.install(|| {
|
||||||
|
chunks
|
||||||
|
.into_par_iter()
|
||||||
|
.zip(next_code_index)
|
||||||
|
.flat_map(|(shreds, next_code_index)| {
|
||||||
|
Shredder::generate_coding_shreds(
|
||||||
|
&shreds,
|
||||||
|
next_code_index,
|
||||||
|
reed_solomon_cache,
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.collect()
|
||||||
|
})
|
||||||
|
};
|
||||||
gen_coding_time.stop();
|
gen_coding_time.stop();
|
||||||
|
|
||||||
let mut sign_coding_time = Measure::start("sign_coding_shreds");
|
let mut sign_coding_time = Measure::start("sign_coding_shreds");
|
||||||
|
|
Loading…
Reference in New Issue