diff --git a/Cargo.lock b/Cargo.lock
index afe38aaaa8..0151f87c65 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4618,6 +4618,7 @@ dependencies = [
  "fs_extra",
  "histogram",
  "itertools",
+ "lazy_static",
  "log",
  "lru",
  "matches",
@@ -4721,6 +4722,7 @@ dependencies = [
  "crossbeam-channel",
  "dlopen",
  "dlopen_derive",
+ "lazy_static",
  "log",
  "matches",
  "rand 0.7.3",
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 14cb7bba5a..a92f919380 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -25,6 +25,7 @@ etcd-client = { version = "0.9.1", features = ["tls"] }
 fs_extra = "1.2.0"
 histogram = "0.6.9"
 itertools = "0.10.3"
+lazy_static = "1.4.0"
 log = "0.4.17"
 lru = "0.7.5"
 min-max-heap = "1.3.0"
diff --git a/core/src/find_packet_sender_stake_stage.rs b/core/src/find_packet_sender_stake_stage.rs
index d584f74b70..4ec3f2d8d2 100644
--- a/core/src/find_packet_sender_stake_stage.rs
+++ b/core/src/find_packet_sender_stake_stage.rs
@@ -1,5 +1,6 @@
 use {
     crossbeam_channel::{Receiver, RecvTimeoutError, Sender},
+    lazy_static::lazy_static,
     rayon::{prelude::*, ThreadPool},
     solana_gossip::cluster_info::ClusterInfo,
     solana_measure::measure::Measure,
@@ -9,7 +10,6 @@ use {
     solana_sdk::timing::timestamp,
     solana_streamer::streamer::{self, StreamerError},
     std::{
-        cell::RefCell,
         collections::HashMap,
         net::IpAddr,
         sync::{Arc, RwLock},
@@ -20,11 +20,13 @@ use {
 
 const IP_TO_STAKE_REFRESH_DURATION: Duration = Duration::from_secs(5);
 
-thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
-    .num_threads(get_thread_count())
-    .thread_name(|ix| format!("transaction_sender_stake_stage_{}", ix))
-    .build()
-    .unwrap()));
+lazy_static! {
+    static ref PAR_THREAD_POOL: ThreadPool = rayon::ThreadPoolBuilder::new()
+        .num_threads(get_thread_count())
+        .thread_name(|ix| format!("transaction_sender_stake_stage_{}", ix))
+        .build()
+        .unwrap();
+}
 
 pub type FindPacketSenderStakeSender = Sender<Vec<PacketBatch>>;
 pub type FindPacketSenderStakeReceiver = Receiver<Vec<PacketBatch>>;
@@ -166,16 +168,14 @@ impl FindPacketSenderStakeStage {
     }
 
     fn apply_sender_stakes(batches: &mut [PacketBatch], ip_to_stake: &HashMap<IpAddr, u64>) {
-        PAR_THREAD_POOL.with(|thread_pool| {
-            thread_pool.borrow().install(|| {
-                batches
-                    .into_par_iter()
-                    .flat_map(|batch| batch.packets.par_iter_mut())
-                    .for_each(|packet| {
-                        packet.meta.sender_stake =
-                            *ip_to_stake.get(&packet.meta.addr().ip()).unwrap_or(&0);
-                    });
-            })
+        PAR_THREAD_POOL.install(|| {
+            batches
+                .into_par_iter()
+                .flat_map(|batch| batch.packets.par_iter_mut())
+                .for_each(|packet| {
+                    packet.meta.sender_stake =
+                        *ip_to_stake.get(&packet.meta.addr().ip()).unwrap_or(&0);
+                });
         });
     }
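The mechanical change repeated across all of the files in this diff is the same: a `thread_local!` static holding `RefCell<ThreadPool>` becomes one process-wide pool behind `lazy_static!`, and every `.with(|pool| pool.borrow().install(...))` call collapses to `PAR_THREAD_POOL.install(...)`. A minimal standalone sketch of the before/after shape, assuming only the rayon and lazy_static crates (the pool name, thread count, and thread-name prefix here are illustrative):

```rust
use lazy_static::lazy_static;
use rayon::{prelude::*, ThreadPool};

// Before: thread_local!(static POOL: RefCell<ThreadPool> = RefCell::new(
//     rayon::ThreadPoolBuilder::new().num_threads(4).build().unwrap()));
// That builds one pool per OS thread that touches the static, behind a
// RefCell because thread_local statics are not directly mutable.
//
// After: a single pool shared by every caller. ThreadPool is Sync, so
// no RefCell or .with()/.borrow() ceremony is required.
lazy_static! {
    static ref POOL: ThreadPool = rayon::ThreadPoolBuilder::new()
        .num_threads(4)
        .thread_name(|ix| format!("example_{}", ix))
        .build()
        .unwrap();
}

fn main() {
    // install() runs the closure on the shared pool's worker threads.
    let sum: u64 = POOL.install(|| (0..1_000u64).into_par_iter().sum());
    assert_eq!(sum, 499_500);
}
```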
diff --git a/entry/Cargo.toml b/entry/Cargo.toml
index 16985fbbdc..bf4f3932cf 100644
--- a/entry/Cargo.toml
+++ b/entry/Cargo.toml
@@ -14,6 +14,7 @@ bincode = "1.3.3"
 crossbeam-channel = "0.5"
 dlopen = "0.1.8"
 dlopen_derive = "0.1.4"
+lazy_static = "1.4.0"
 log = "0.4.17"
 rand = "0.7.0"
 rayon = "1.5.2"
diff --git a/entry/src/entry.rs b/entry/src/entry.rs
index 97ccf98c62..ad7abbd4d8 100644
--- a/entry/src/entry.rs
+++ b/entry/src/entry.rs
@@ -7,6 +7,7 @@ use {
     crossbeam_channel::{Receiver, Sender},
     dlopen::symbor::{Container, SymBorApi, Symbol},
     dlopen_derive::SymBorApi,
+    lazy_static::lazy_static,
     log::*,
     rand::{thread_rng, Rng},
     rayon::{prelude::*, ThreadPool},
@@ -21,7 +22,7 @@ use {
         recycler::Recycler,
         sigverify,
     },
-    solana_rayon_threadlimit::get_thread_count,
+    solana_rayon_threadlimit::get_max_thread_count,
     solana_sdk::{
         hash::Hash,
         packet::Meta,
@@ -32,7 +33,6 @@ use {
         },
     },
     std::{
-        cell::RefCell,
         cmp,
         ffi::OsStr,
         sync::{
@@ -44,11 +44,15 @@ use {
     },
 };
 
-thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
-    .num_threads(get_thread_count())
-    .thread_name(|ix| format!("entry_{}", ix))
-    .build()
-    .unwrap()));
+// get_max_thread_count to match number of threads in the old code.
+// see: https://github.com/solana-labs/solana/pull/24853
+lazy_static! {
+    static ref PAR_THREAD_POOL: ThreadPool = rayon::ThreadPoolBuilder::new()
+        .num_threads(get_max_thread_count())
+        .thread_name(|ix| format!("entry_{}", ix))
+        .build()
+        .unwrap();
+}
 
 pub type EntrySender = Sender<Vec<Entry>>;
 pub type EntryReceiver = Receiver<Vec<Entry>>;
@@ -341,25 +345,22 @@ impl EntryVerificationState {
             .expect("unwrap Arc")
             .into_inner()
             .expect("into_inner");
-        let res = PAR_THREAD_POOL.with(|thread_pool| {
-            thread_pool.borrow().install(|| {
-                hashes
-                    .into_par_iter()
-                    .cloned()
-                    .zip(verification_state.verifications.take().unwrap())
-                    .all(|(hash, (action, expected))| {
-                        let actual = match action {
-                            VerifyAction::Mixin(mixin) => {
-                                Poh::new(hash, None).record(mixin).unwrap().hash
-                            }
-                            VerifyAction::Tick => Poh::new(hash, None).tick().unwrap().hash,
-                            VerifyAction::None => hash,
-                        };
-                        actual == expected
-                    })
-            })
+        let res = PAR_THREAD_POOL.install(|| {
+            hashes
+                .into_par_iter()
+                .cloned()
+                .zip(verification_state.verifications.take().unwrap())
+                .all(|(hash, (action, expected))| {
+                    let actual = match action {
+                        VerifyAction::Mixin(mixin) => {
+                            Poh::new(hash, None).record(mixin).unwrap().hash
+                        }
+                        VerifyAction::Tick => Poh::new(hash, None).tick().unwrap().hash,
+                        VerifyAction::None => hash,
+                    };
+                    actual == expected
+                })
         });
-
         verify_check_time.stop();
         self.poh_duration_us += gpu_time_us + verify_check_time.as_us();
@@ -381,25 +382,23 @@ pub fn verify_transactions(
     entries: Vec<Entry>,
     verify: Arc<dyn Fn(VersionedTransaction) -> Result<SanitizedTransaction> + Send + Sync>,
 ) -> Result<Vec<EntryType>> {
-    PAR_THREAD_POOL.with(|thread_pool| {
-        thread_pool.borrow().install(|| {
-            entries
-                .into_par_iter()
-                .map(|entry| {
-                    if entry.transactions.is_empty() {
-                        Ok(EntryType::Tick(entry.hash))
-                    } else {
-                        Ok(EntryType::Transactions(
-                            entry
-                                .transactions
-                                .into_par_iter()
-                                .map(verify.as_ref())
-                                .collect::<Result<Vec<_>>>()?,
-                        ))
-                    }
-                })
-                .collect()
-        })
+    PAR_THREAD_POOL.install(|| {
+        entries
+            .into_par_iter()
+            .map(|entry| {
+                if entry.transactions.is_empty() {
+                    Ok(EntryType::Tick(entry.hash))
+                } else {
+                    Ok(EntryType::Transactions(
+                        entry
+                            .transactions
+                            .into_par_iter()
+                            .map(verify.as_ref())
+                            .collect::<Result<Vec<_>>>()?,
+                    ))
+                }
+            })
+            .collect()
     })
 }
@@ -615,23 +614,20 @@ impl EntrySlice for [Entry] {
             transactions: vec![],
         }];
         let entry_pairs = genesis.par_iter().chain(self).zip(self);
-        let res = PAR_THREAD_POOL.with(|thread_pool| {
-            thread_pool.borrow().install(|| {
-                entry_pairs.all(|(x0, x1)| {
-                    let r = x1.verify(&x0.hash);
-                    if !r {
-                        warn!(
-                            "entry invalid!: x0: {:?}, x1: {:?} num txs: {}",
-                            x0.hash,
-                            x1.hash,
-                            x1.transactions.len()
-                        );
-                    }
-                    r
-                })
+        let res = PAR_THREAD_POOL.install(|| {
+            entry_pairs.all(|(x0, x1)| {
+                let r = x1.verify(&x0.hash);
+                if !r {
+                    warn!(
+                        "entry invalid!: x0: {:?}, x1: {:?} num txs: {}",
+                        x0.hash,
+                        x1.hash,
+                        x1.transactions.len()
+                    );
+                }
+                r
             })
         });
-
         let poh_duration_us = timing::duration_as_us(&now.elapsed());
         EntryVerificationState {
             verification_status: if res {
@@ -675,45 +671,43 @@ impl EntrySlice for [Entry] {
         num_hashes.resize(aligned_len, 0);
         let num_hashes: Vec<_> = num_hashes.chunks(simd_len).collect();
 
-        let res = PAR_THREAD_POOL.with(|thread_pool| {
-            thread_pool.borrow().install(|| {
-                hashes_chunked
-                    .par_iter_mut()
-                    .zip(num_hashes)
-                    .enumerate()
-                    .all(|(i, (chunk, num_hashes))| {
-                        match simd_len {
-                            8 => unsafe {
-                                (api().unwrap().poh_verify_many_simd_avx2)(
-                                    chunk.as_mut_ptr(),
-                                    num_hashes.as_ptr(),
-                                );
-                            },
-                            16 => unsafe {
-                                (api().unwrap().poh_verify_many_simd_avx512skx)(
-                                    chunk.as_mut_ptr(),
-                                    num_hashes.as_ptr(),
-                                );
-                            },
-                            _ => {
-                                panic!("unsupported simd len: {}", simd_len);
-                            }
-                        }
-                        let entry_start = i * simd_len;
-                        // The last chunk may produce indexes larger than what we have in the reference entries
-                        // because it is aligned to simd_len.
-                        let entry_end = std::cmp::min(entry_start + simd_len, self.len());
-                        self[entry_start..entry_end]
-                            .iter()
-                            .enumerate()
-                            .all(|(j, ref_entry)| {
-                                let start = j * HASH_BYTES;
-                                let end = start + HASH_BYTES;
-                                let hash = Hash::new(&chunk[start..end]);
-                                compare_hashes(hash, ref_entry)
-                            })
-                    })
-            })
+        let res = PAR_THREAD_POOL.install(|| {
+            hashes_chunked
+                .par_iter_mut()
+                .zip(num_hashes)
+                .enumerate()
+                .all(|(i, (chunk, num_hashes))| {
+                    match simd_len {
+                        8 => unsafe {
+                            (api().unwrap().poh_verify_many_simd_avx2)(
+                                chunk.as_mut_ptr(),
+                                num_hashes.as_ptr(),
+                            );
+                        },
+                        16 => unsafe {
+                            (api().unwrap().poh_verify_many_simd_avx512skx)(
+                                chunk.as_mut_ptr(),
+                                num_hashes.as_ptr(),
+                            );
+                        },
+                        _ => {
+                            panic!("unsupported simd len: {}", simd_len);
+                        }
+                    }
+                    let entry_start = i * simd_len;
+                    // The last chunk may produce indexes larger than what we have in the reference entries
+                    // because it is aligned to simd_len.
+                    let entry_end = std::cmp::min(entry_start + simd_len, self.len());
+                    self[entry_start..entry_end]
+                        .iter()
+                        .enumerate()
+                        .all(|(j, ref_entry)| {
+                            let start = j * HASH_BYTES;
+                            let end = start + HASH_BYTES;
+                            let hash = Hash::new(&chunk[start..end]);
+                            compare_hashes(hash, ref_entry)
+                        })
+                })
         });
         let poh_duration_us = timing::duration_as_us(&now.elapsed());
         EntryVerificationState {
@@ -812,26 +806,23 @@ impl EntrySlice for [Entry] {
             timing::duration_as_us(&gpu_wait.elapsed())
         });
 
-        let verifications = PAR_THREAD_POOL.with(|thread_pool| {
-            thread_pool.borrow().install(|| {
-                self.into_par_iter()
-                    .map(|entry| {
-                        let answer = entry.hash;
-                        let action = if entry.transactions.is_empty() {
-                            if entry.num_hashes == 0 {
-                                VerifyAction::None
-                            } else {
-                                VerifyAction::Tick
-                            }
-                        } else {
-                            VerifyAction::Mixin(hash_transactions(&entry.transactions))
-                        };
-                        (action, answer)
-                    })
-                    .collect()
-            })
+        let verifications = PAR_THREAD_POOL.install(|| {
+            self.into_par_iter()
+                .map(|entry| {
+                    let answer = entry.hash;
+                    let action = if entry.transactions.is_empty() {
+                        if entry.num_hashes == 0 {
+                            VerifyAction::None
+                        } else {
+                            VerifyAction::Tick
+                        }
+                    } else {
+                        VerifyAction::Mixin(hash_transactions(&entry.transactions))
+                    };
+                    (action, answer)
+                })
+                .collect()
         });
-
         let device_verification_data = DeviceVerificationData::Gpu(GpuVerificationData {
             thread_h: Some(gpu_verify_thread),
             verifications: Some(verifications),
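entry.rs is the first file where the shared pool is sized with get_max_thread_count() instead of get_thread_count(). The rationale in the comment can be made concrete: a thread_local! static is initialized once per OS thread that touches it, so under the old code two calling threads could each build a full-size pool. A standalone sketch of that per-thread duplication (a counter stands in for the ThreadPool; toy code, not from this tree):

```rust
use std::{
    cell::RefCell,
    sync::atomic::{AtomicUsize, Ordering},
    thread,
};

static POOLS_BUILT: AtomicUsize = AtomicUsize::new(0);

thread_local! {
    // Stand-in for RefCell<ThreadPool>: the initializer runs once per
    // OS thread that accesses the static.
    static POOL: RefCell<usize> =
        RefCell::new(POOLS_BUILT.fetch_add(1, Ordering::SeqCst));
}

fn main() {
    let handles: Vec<_> = (0..2)
        .map(|_| thread::spawn(|| POOL.with(|p| *p.borrow())))
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    // Two threads touched the static, so two "pools" were built. With N
    // rayon workers per pool that was up to 2 * N threads in total, which
    // is why the single shared replacement is sized with
    // get_max_thread_count() (twice the normal cap) to stay comparable.
    assert_eq!(POOLS_BUILT.load(Ordering::SeqCst), 2);
}
```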
diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs
index 5d985d8cad..79d433d541 100644
--- a/ledger/src/blockstore.rs
+++ b/ledger/src/blockstore.rs
@@ -28,7 +28,7 @@ use {
         datapoint_debug, datapoint_error,
         poh_timing_point::{send_poh_timing_point, PohTimingSender, SlotPohTimingInfo},
     },
-    solana_rayon_threadlimit::get_thread_count,
+    solana_rayon_threadlimit::get_max_thread_count,
     solana_runtime::hardened_unpack::{unpack_genesis_archive, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE},
     solana_sdk::{
         clock::{Slot, UnixTimestamp, DEFAULT_TICKS_PER_SECOND, MS_PER_TICK},
@@ -75,17 +75,20 @@ pub use {
 pub const BLOCKSTORE_DIRECTORY_ROCKS_LEVEL: &str = "rocksdb";
 pub const BLOCKSTORE_DIRECTORY_ROCKS_FIFO: &str = "rocksdb_fifo";
 
-thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
-    .num_threads(get_thread_count())
-    .thread_name(|ix| format!("blockstore_{}", ix))
-    .build()
-    .unwrap()));
-
-thread_local!(static PAR_THREAD_POOL_ALL_CPUS: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
-    .num_threads(num_cpus::get())
-    .thread_name(|ix| format!("blockstore_{}", ix))
-    .build()
-    .unwrap()));
+// get_max_thread_count to match number of threads in the old code.
+// see: https://github.com/solana-labs/solana/pull/24853
+lazy_static! {
+    static ref PAR_THREAD_POOL: ThreadPool = rayon::ThreadPoolBuilder::new()
+        .num_threads(get_max_thread_count())
+        .thread_name(|ix| format!("blockstore_{}", ix))
+        .build()
+        .unwrap();
+    static ref PAR_THREAD_POOL_ALL_CPUS: ThreadPool = rayon::ThreadPoolBuilder::new()
+        .num_threads(num_cpus::get())
+        .thread_name(|ix| format!("blockstore_{}", ix))
+        .build()
+        .unwrap();
+}
 
 pub const MAX_REPLAY_WAKE_UP_SIGNALS: usize = 1;
 pub const MAX_COMPLETED_SLOTS_IN_CHANNEL: usize = 100_000;
@@ -2790,22 +2793,14 @@ impl Blockstore {
             .map(|(_, end_index)| u64::from(*end_index) - start_index + 1)
             .unwrap_or(0);
 
-        let entries: Result<Vec<Vec<Entry>>> = PAR_THREAD_POOL.with(|thread_pool| {
-            thread_pool.borrow().install(|| {
-                completed_ranges
-                    .par_iter()
-                    .map(|(start_index, end_index)| {
-                        self.get_entries_in_data_block(
-                            slot,
-                            *start_index,
-                            *end_index,
-                            Some(&slot_meta),
-                        )
-                    })
-                    .collect()
-            })
+        let entries: Result<Vec<Vec<Entry>>> = PAR_THREAD_POOL.install(|| {
+            completed_ranges
+                .par_iter()
+                .map(|(start_index, end_index)| {
+                    self.get_entries_in_data_block(slot, *start_index, *end_index, Some(&slot_meta))
+                })
+                .collect()
         });
-
         let entries: Vec<Entry> = entries?.into_iter().flatten().collect();
         Ok((entries, num_shreds, slot_meta.is_full()))
     }
@@ -2935,23 +2930,15 @@ impl Blockstore {
         }
         let slot_meta = slot_meta.unwrap();
-        let entries: Vec<Vec<Entry>> = PAR_THREAD_POOL_ALL_CPUS.with(|thread_pool| {
-            thread_pool.borrow().install(|| {
-                completed_ranges
-                    .par_iter()
-                    .map(|(start_index, end_index)| {
-                        self.get_entries_in_data_block(
-                            slot,
-                            *start_index,
-                            *end_index,
-                            Some(&slot_meta),
-                        )
-                        .unwrap_or_default()
-                    })
-                    .collect()
-            })
+        let entries: Vec<Vec<Entry>> = PAR_THREAD_POOL_ALL_CPUS.install(|| {
+            completed_ranges
+                .par_iter()
+                .map(|(start_index, end_index)| {
+                    self.get_entries_in_data_block(slot, *start_index, *end_index, Some(&slot_meta))
+                        .unwrap_or_default()
+                })
+                .collect()
         });
-
         entries.into_iter().flatten().collect()
     }
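Both Blockstore call sites keep the same fetch-then-flatten shape: one Vec of entries per completed range, collected in parallel, then flattened (errors propagated through ? in the first hunk, swallowed via unwrap_or_default() in the second). A toy version of the fallible variant, with made-up range and entry types:

```rust
use rayon::prelude::*;

// Hypothetical stand-in for get_entries_in_data_block: each inclusive
// range yields its own Vec of "entries".
fn get_range(range: (u32, u32)) -> Result<Vec<u64>, String> {
    let (start, end) = range;
    if start > end {
        return Err(format!("bad range: {}..={}", start, end));
    }
    Ok((start..=end).map(u64::from).collect())
}

fn get_all(ranges: &[(u32, u32)]) -> Result<Vec<u64>, String> {
    // Collecting into Result<Vec<_>, _> short-circuits once an Err is
    // observed; on success, the per-range vectors are flattened.
    let per_range: Result<Vec<Vec<u64>>, String> =
        ranges.par_iter().map(|range| get_range(*range)).collect();
    Ok(per_range?.into_iter().flatten().collect())
}

fn main() {
    assert_eq!(get_all(&[(0, 2), (3, 4)]).unwrap(), vec![0, 1, 2, 3, 4]);
    assert!(get_all(&[(5, 1)]).is_err());
}
```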
diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs
index df0b8ec576..38a9b7571d 100644
--- a/ledger/src/blockstore_processor.rs
+++ b/ledger/src/blockstore_processor.rs
@@ -15,7 +15,7 @@ use {
     solana_measure::measure::Measure,
     solana_metrics::{datapoint_error, inc_new_counter_debug},
     solana_program_runtime::timings::{ExecuteTimingType, ExecuteTimings},
-    solana_rayon_threadlimit::get_thread_count,
+    solana_rayon_threadlimit::{get_max_thread_count, get_thread_count},
     solana_runtime::{
         accounts_background_service::AbsRequestSender,
         accounts_db::{AccountShrinkThreshold, AccountsDbConfig},
@@ -55,7 +55,6 @@ use {
     },
     std::{
         borrow::Cow,
-        cell::RefCell,
         collections::{HashMap, HashSet},
         path::PathBuf,
         result,
@@ -94,12 +93,15 @@ impl BlockCostCapacityMeter {
     }
 }
 
-thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
-    .num_threads(get_thread_count())
-    .thread_name(|ix| format!("blockstore_processor_{}", ix))
-    .build()
-    .unwrap())
-);
+// get_max_thread_count to match number of threads in the old code.
+// see: https://github.com/solana-labs/solana/pull/24853
+lazy_static! {
+    static ref PAR_THREAD_POOL: ThreadPool = rayon::ThreadPoolBuilder::new()
+        .num_threads(get_max_thread_count())
+        .thread_name(|ix| format!("blockstore_processor_{}", ix))
+        .build()
+        .unwrap();
+}
 
 fn first_err(results: &[Result<()>]) -> Result<()> {
     for r in results {
@@ -263,29 +265,26 @@ fn execute_batches_internal(
 ) -> Result<()> {
     inc_new_counter_debug!("bank-par_execute_entries-count", batches.len());
     let (results, new_timings): (Vec<Result<()>>, Vec<ExecuteTimings>) =
-        PAR_THREAD_POOL.with(|thread_pool| {
-            thread_pool.borrow().install(|| {
-                batches
-                    .into_par_iter()
-                    .map(|batch| {
-                        let mut timings = ExecuteTimings::default();
-                        let result = execute_batch(
-                            batch,
-                            bank,
-                            transaction_status_sender,
-                            replay_vote_sender,
-                            &mut timings,
-                            cost_capacity_meter.clone(),
-                        );
-                        if let Some(entry_callback) = entry_callback {
-                            entry_callback(bank);
-                        }
-                        (result, timings)
-                    })
-                    .unzip()
-            })
+        PAR_THREAD_POOL.install(|| {
+            batches
+                .into_par_iter()
+                .map(|batch| {
+                    let mut timings = ExecuteTimings::default();
+                    let result = execute_batch(
+                        batch,
+                        bank,
+                        transaction_status_sender,
+                        replay_vote_sender,
+                        &mut timings,
+                        cost_capacity_meter.clone(),
+                    );
+                    if let Some(entry_callback) = entry_callback {
+                        entry_callback(bank);
+                    }
+                    (result, timings)
+                })
+                .unzip()
         });
-
     timings.saturating_add_in_place(ExecuteTimingType::TotalBatchesLen, batches.len() as u64);
     timings.saturating_add_in_place(ExecuteTimingType::NumExecuteBatches, 1);
     for timing in new_timings {
@@ -546,7 +545,6 @@ pub struct ProcessOptions {
     pub full_leader_cache: bool,
     pub halt_at_slot: Option<Slot>,
     pub entry_callback: Option<ProcessCallback>,
-    pub override_num_threads: Option<usize>,
     pub new_hard_forks: Option<Vec<Slot>>,
     pub debug_keys: Option<Arc<HashSet<Pubkey>>>,
     pub account_indexes: AccountSecondaryIndexes,
@@ -635,15 +633,6 @@ pub fn process_blockstore_from_root(
     cache_block_meta_sender: Option<&CacheBlockMetaSender>,
     accounts_background_request_sender: &AbsRequestSender,
 ) -> result::Result<(), BlockstoreProcessorError> {
-    if let Some(num_threads) = opts.override_num_threads {
-        PAR_THREAD_POOL.with(|pool| {
-            *pool.borrow_mut() = rayon::ThreadPoolBuilder::new()
-                .num_threads(num_threads)
-                .build()
-                .unwrap()
-        });
-    }
-
     // Starting slot must be a root, and thus has no parents
     assert_eq!(bank_forks.read().unwrap().banks().len(), 1);
     let bank = bank_forks.read().unwrap().root_bank();
@@ -2409,23 +2398,6 @@ pub mod tests {
         assert_eq!(bank.tick_height(), 1);
     }
 
-    #[test]
-    fn test_process_ledger_options_override_threads() {
-        let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(123);
-        let (ledger_path, _blockhash) = create_new_tmp_ledger_auto_delete!(&genesis_config);
-
-        let blockstore = Blockstore::open(ledger_path.path()).unwrap();
-        let opts = ProcessOptions {
-            override_num_threads: Some(1),
-            accounts_db_test_hash_calculation: true,
-            ..ProcessOptions::default()
-        };
-        test_process_blockstore(&genesis_config, &blockstore, &opts);
-        PAR_THREAD_POOL.with(|pool| {
-            assert_eq!(pool.borrow().current_num_threads(), 1);
-        });
-    }
-
     #[test]
     fn test_process_ledger_options_full_leader_cache() {
         let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(123);
@@ -2493,7 +2465,6 @@ pub mod tests {
         };
 
         let opts = ProcessOptions {
-            override_num_threads: Some(1),
            entry_callback: Some(entry_callback),
            accounts_db_test_hash_calculation: true,
            ..ProcessOptions::default()
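A lazy_static pool is built once on first use and cannot be swapped out afterwards, which is why override_num_threads and its test are removed here rather than ported. Callers that genuinely need a specific thread count can build a scoped local pool instead; a sketch of that substitute (hypothetical helper, not part of this change):

```rust
use rayon::ThreadPoolBuilder;

fn process_with_threads(num_threads: usize) {
    // A locally built pool is dropped at the end of scope, so the custom
    // thread count never leaks into unrelated callers the way mutating
    // the shared thread_local static used to.
    let pool = ThreadPoolBuilder::new()
        .num_threads(num_threads)
        .build()
        .unwrap();
    pool.install(|| {
        // ... run the parallel work here ...
    });
}
```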
diff --git a/ledger/src/shredder.rs b/ledger/src/shredder.rs
index 24e950f30c..508c871c11 100644
--- a/ledger/src/shredder.rs
+++ b/ledger/src/shredder.rs
@@ -5,6 +5,7 @@ use {
         },
         shred_stats::ProcessShredsStats,
     },
+    lazy_static::lazy_static,
     rayon::{prelude::*, ThreadPool},
     reed_solomon_erasure::{
         galois_8::Field,
@@ -14,14 +15,16 @@ use {
     solana_measure::measure::Measure,
     solana_rayon_threadlimit::get_thread_count,
     solana_sdk::{clock::Slot, signature::Keypair},
-    std::{cell::RefCell, fmt::Debug},
+    std::fmt::Debug,
 };
 
-thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
-    .num_threads(get_thread_count())
-    .thread_name(|ix| format!("shredder_{}", ix))
-    .build()
-    .unwrap()));
+lazy_static! {
+    static ref PAR_THREAD_POOL: ThreadPool = rayon::ThreadPoolBuilder::new()
+        .num_threads(get_thread_count())
+        .thread_name(|ix| format!("shredder_{}", ix))
+        .build()
+        .unwrap();
+}
 
 type ReedSolomon = reed_solomon_erasure::ReedSolomon<Field>;
 
@@ -138,17 +141,15 @@ impl Shredder {
             shred.sign(keypair);
             shred
         };
-        let data_shreds: Vec<Shred> = PAR_THREAD_POOL.with(|thread_pool| {
-            thread_pool.borrow().install(|| {
-                serialized_shreds
-                    .par_chunks(payload_capacity)
-                    .enumerate()
-                    .map(|(i, shred_data)| {
-                        let shred_index = next_shred_index + i as u32;
-                        make_data_shred(shred_index, shred_data)
-                    })
-                    .collect()
-            })
+        let data_shreds: Vec<Shred> = PAR_THREAD_POOL.install(|| {
+            serialized_shreds
+                .par_chunks(payload_capacity)
+                .enumerate()
+                .map(|(i, shred_data)| {
+                    let shred_index = next_shred_index + i as u32;
+                    make_data_shred(shred_index, shred_data)
+                })
+                .collect()
         });
         gen_data_time.stop();
 
@@ -170,43 +171,39 @@ impl Shredder {
         }
         let mut gen_coding_time = Measure::start("gen_coding_shreds");
         // 1) Generate coding shreds
-        let mut coding_shreds: Vec<_> = PAR_THREAD_POOL.with(|thread_pool| {
-            thread_pool.borrow().install(|| {
-                data_shreds
-                    .par_chunks(MAX_DATA_SHREDS_PER_FEC_BLOCK as usize)
-                    .enumerate()
-                    .flat_map(|(i, shred_data_batch)| {
-                        // Assumption here is that, for now, each fec block has
-                        // as many coding shreds as data shreds (except for the
-                        // last one in the slot).
-                        // TODO: tie this more closely with
-                        // generate_coding_shreds.
-                        let next_code_index = next_code_index
-                            .checked_add(
-                                u32::try_from(i)
-                                    .unwrap()
-                                    .checked_mul(MAX_DATA_SHREDS_PER_FEC_BLOCK)
-                                    .unwrap(),
-                            )
-                            .unwrap();
-                        Shredder::generate_coding_shreds(
-                            shred_data_batch,
-                            is_last_in_slot,
-                            next_code_index,
-                        )
-                    })
-                    .collect()
-            })
+        let mut coding_shreds: Vec<_> = PAR_THREAD_POOL.install(|| {
+            data_shreds
+                .par_chunks(MAX_DATA_SHREDS_PER_FEC_BLOCK as usize)
+                .enumerate()
+                .flat_map(|(i, shred_data_batch)| {
+                    // Assumption here is that, for now, each fec block has
+                    // as many coding shreds as data shreds (except for the
+                    // last one in the slot).
+                    // TODO: tie this more closely with
+                    // generate_coding_shreds.
+                    let next_code_index = next_code_index
+                        .checked_add(
+                            u32::try_from(i)
+                                .unwrap()
+                                .checked_mul(MAX_DATA_SHREDS_PER_FEC_BLOCK)
+                                .unwrap(),
+                        )
+                        .unwrap();
+                    Shredder::generate_coding_shreds(
+                        shred_data_batch,
+                        is_last_in_slot,
+                        next_code_index,
+                    )
+                })
+                .collect()
         });
         gen_coding_time.stop();
 
         let mut sign_coding_time = Measure::start("sign_coding_shreds");
         // 2) Sign coding shreds
-        PAR_THREAD_POOL.with(|thread_pool| {
-            thread_pool.borrow().install(|| {
-                coding_shreds.par_iter_mut().for_each(|coding_shred| {
-                    coding_shred.sign(keypair);
-                })
+        PAR_THREAD_POOL.install(|| {
+            coding_shreds.par_iter_mut().for_each(|coding_shred| {
+                coding_shred.sign(keypair);
             })
         });
         sign_coding_time.stop();
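The code-index arithmetic inside the flat_map above gives each FEC batch a disjoint index range, which is what makes handing out indexes from parallel iterations safe. A worked example, assuming the MAX_DATA_SHREDS_PER_FEC_BLOCK = 32 value used in this tree:

```rust
const MAX_DATA_SHREDS_PER_FEC_BLOCK: u32 = 32;

// Same checked arithmetic as the shredder: batch i starts at
// next_code_index + i * 32, so batch ranges can never overlap.
fn batch_code_index(next_code_index: u32, batch: usize) -> u32 {
    next_code_index
        .checked_add(
            u32::try_from(batch)
                .unwrap()
                .checked_mul(MAX_DATA_SHREDS_PER_FEC_BLOCK)
                .unwrap(),
        )
        .unwrap()
}

fn main() {
    // Starting from index 100, batches 0, 1, 2 get 100, 132, 164.
    assert_eq!(batch_code_index(100, 0), 100);
    assert_eq!(batch_code_index(100, 1), 132);
    assert_eq!(batch_code_index(100, 2), 164);
}
```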
diff --git a/programs/bpf/Cargo.lock b/programs/bpf/Cargo.lock
index 04dabf4310..9dfbf8f5be 100644
--- a/programs/bpf/Cargo.lock
+++ b/programs/bpf/Cargo.lock
@@ -4292,6 +4292,7 @@ dependencies = [
  "fs_extra",
  "histogram",
  "itertools",
+ "lazy_static",
  "log",
  "lru",
  "min-max-heap",
@@ -4353,6 +4354,7 @@ dependencies = [
  "crossbeam-channel",
  "dlopen",
  "dlopen_derive",
+ "lazy_static",
  "log",
  "rand 0.7.3",
  "rayon",
diff --git a/rayon-threadlimit/src/lib.rs b/rayon-threadlimit/src/lib.rs
index 16605b9351..81086d0bf1 100644
--- a/rayon-threadlimit/src/lib.rs
+++ b/rayon-threadlimit/src/lib.rs
@@ -16,3 +16,9 @@ lazy_static! {
 pub fn get_thread_count() -> usize {
     *MAX_RAYON_THREADS
 }
+
+// Only used in legacy code.
+// Use get_thread_count instead in all new code.
+pub fn get_max_thread_count() -> usize {
+    get_thread_count().saturating_mul(2)
+}
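For downstream users the intended split is what the new comment states: new code sizes pools with get_thread_count(), and get_max_thread_count() exists only so that pools replacing old per-thread thread_local pools keep a comparable worker count. A hypothetical consumer illustrating that split (the helper itself is not part of this diff):

```rust
use rayon::ThreadPool;
use solana_rayon_threadlimit::{get_max_thread_count, get_thread_count};

fn new_pool(name: &'static str, replaces_thread_local_pool: bool) -> ThreadPool {
    let num_threads = if replaces_thread_local_pool {
        get_max_thread_count() // 2x cap, matches the old effective parallelism
    } else {
        get_thread_count() // the normal process-wide cap
    };
    rayon::ThreadPoolBuilder::new()
        .num_threads(num_threads)
        .thread_name(move |ix| format!("{}_{}", name, ix))
        .build()
        .unwrap()
}
```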