Make ReplayStage own the threadpool for tx replay (#190)

The threadpool used to replay multiple transactions in parallel is
currently global state via a lazy_static definition. Making this pool
owned by ReplayStage will enable subsequent work to make the pool
size configurable on the CLI.

This makes `ReplayStage` create and hold the threadpool which is passed
down to blockstore_processor::confirm_slot().

blockstore_processor::process_blockstore_from_root() now creates its
own threadpool as well; however, this pool is only alive for
the scope of that function and does not persist for the lifetime of the
process.
This commit is contained in:
steviez 2024-03-12 13:21:11 -05:00 committed by GitHub
parent 2078153aa1
commit 7a144e2b9f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 86 additions and 22 deletions

View File

@ -51,6 +51,7 @@ use {
solana_measure::measure::Measure,
solana_poh::poh_recorder::{PohLeaderStatus, PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS},
solana_program_runtime::timings::ExecuteTimings,
solana_rayon_threadlimit::get_max_thread_count,
solana_rpc::{
optimistically_confirmed_bank_tracker::{BankNotification, BankNotificationSenderConfig},
rpc_subscriptions::RpcSubscriptions,
@ -652,16 +653,23 @@ impl ReplayStage {
r_bank_forks.get_vote_only_mode_signal(),
)
};
// Thread pool to (maybe) replay multiple threads in parallel
let replay_mode = if replay_slots_concurrently {
ForkReplayMode::Serial
} else {
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(MAX_CONCURRENT_FORKS_TO_REPLAY)
.thread_name(|i| format!("solReplay{i:02}"))
.thread_name(|i| format!("solReplayFork{i:02}"))
.build()
.expect("new rayon threadpool");
ForkReplayMode::Parallel(pool)
};
// Thread pool to replay multiple transactions within one block in parallel
let replay_tx_thread_pool = rayon::ThreadPoolBuilder::new()
.num_threads(get_max_thread_count())
.thread_name(|i| format!("solReplayTx{i:02}"))
.build()
.expect("new rayon threadpool");
Self::reset_poh_recorder(
&my_pubkey,
@ -724,6 +732,7 @@ impl ReplayStage {
&mut replay_timing,
log_messages_bytes_limit,
&replay_mode,
&replay_tx_thread_pool,
&prioritization_fee_cache,
&mut purge_repair_slot_counter,
);
@ -2136,6 +2145,7 @@ impl ReplayStage {
fn replay_blockstore_into_bank(
bank: &BankWithScheduler,
blockstore: &Blockstore,
replay_tx_thread_pool: &ThreadPool,
replay_stats: &RwLock<ReplaySlotStats>,
replay_progress: &RwLock<ConfirmationProgress>,
transaction_status_sender: Option<&TransactionStatusSender>,
@ -2154,6 +2164,7 @@ impl ReplayStage {
blockstore_processor::confirm_slot(
blockstore,
bank,
replay_tx_thread_pool,
&mut w_replay_stats,
&mut w_replay_progress,
false,
@ -2712,7 +2723,8 @@ impl ReplayStage {
fn replay_active_banks_concurrently(
blockstore: &Blockstore,
bank_forks: &RwLock<BankForks>,
thread_pool: &ThreadPool,
fork_thread_pool: &ThreadPool,
replay_tx_thread_pool: &ThreadPool,
my_pubkey: &Pubkey,
vote_account: &Pubkey,
progress: &mut ProgressMap,
@ -2730,7 +2742,7 @@ impl ReplayStage {
let longest_replay_time_us = AtomicU64::new(0);
// Allow for concurrent replaying of slots from different forks.
let replay_result_vec: Vec<ReplaySlotFromBlockstore> = thread_pool.install(|| {
let replay_result_vec: Vec<ReplaySlotFromBlockstore> = fork_thread_pool.install(|| {
active_bank_slots
.into_par_iter()
.map(|bank_slot| {
@ -2744,7 +2756,7 @@ impl ReplayStage {
trace!(
"Replay active bank: slot {}, thread_idx {}",
bank_slot,
thread_pool.current_thread_index().unwrap_or_default()
fork_thread_pool.current_thread_index().unwrap_or_default()
);
let mut progress_lock = progress.write().unwrap();
if progress_lock
@ -2797,6 +2809,7 @@ impl ReplayStage {
let blockstore_result = Self::replay_blockstore_into_bank(
&bank,
blockstore,
replay_tx_thread_pool,
&replay_stats,
&replay_progress,
transaction_status_sender,
@ -2826,6 +2839,7 @@ impl ReplayStage {
fn replay_active_bank(
blockstore: &Blockstore,
bank_forks: &RwLock<BankForks>,
replay_tx_thread_pool: &ThreadPool,
my_pubkey: &Pubkey,
vote_account: &Pubkey,
progress: &mut ProgressMap,
@ -2884,6 +2898,7 @@ impl ReplayStage {
let blockstore_result = Self::replay_blockstore_into_bank(
&bank,
blockstore,
replay_tx_thread_pool,
&bank_progress.replay_stats,
&bank_progress.replay_progress,
transaction_status_sender,
@ -3183,6 +3198,7 @@ impl ReplayStage {
replay_timing: &mut ReplayLoopTiming,
log_messages_bytes_limit: Option<usize>,
replay_mode: &ForkReplayMode,
replay_tx_thread_pool: &ThreadPool,
prioritization_fee_cache: &PrioritizationFeeCache,
purge_repair_slot_counter: &mut PurgeRepairSlotCounter,
) -> bool /* completed a bank */ {
@ -3199,11 +3215,12 @@ impl ReplayStage {
let replay_result_vec = match replay_mode {
// Skip the overhead of the threadpool if there is only one bank to play
ForkReplayMode::Parallel(thread_pool) if num_active_banks > 1 => {
ForkReplayMode::Parallel(fork_thread_pool) if num_active_banks > 1 => {
Self::replay_active_banks_concurrently(
blockstore,
bank_forks,
thread_pool,
fork_thread_pool,
replay_tx_thread_pool,
my_pubkey,
vote_account,
progress,
@ -3223,6 +3240,7 @@ impl ReplayStage {
Self::replay_active_bank(
blockstore,
bank_forks,
replay_tx_thread_pool,
my_pubkey,
vote_account,
progress,
@ -5034,9 +5052,15 @@ pub(crate) mod tests {
blockstore.insert_shreds(shreds, None, false).unwrap();
let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));
let exit = Arc::new(AtomicBool::new(false));
let replay_tx_thread_pool = rayon::ThreadPoolBuilder::new()
.num_threads(1)
.thread_name(|i| format!("solReplayTest{i:02}"))
.build()
.expect("new rayon threadpool");
let res = ReplayStage::replay_blockstore_into_bank(
&bank1,
&blockstore,
&replay_tx_thread_pool,
&bank1_progress.replay_stats,
&bank1_progress.replay_progress,
None,

View File

@ -89,16 +89,6 @@ struct ReplayEntry {
starting_index: usize,
}
// Global rayon thread pool exposed as a `lazy_static!` static. Worker threads
// are named `solBstoreProcNN` (zero-padded worker index), and a failure to
// build the pool panics through `unwrap()`.
// get_max_thread_count to match number of threads in the old code.
// see: https://github.com/solana-labs/solana/pull/24853
lazy_static! {
static ref PAR_THREAD_POOL: ThreadPool = rayon::ThreadPoolBuilder::new()
.num_threads(get_max_thread_count())
.thread_name(|i| format!("solBstoreProc{i:02}"))
.build()
.unwrap();
}
fn first_err(results: &[Result<()>]) -> Result<()> {
for r in results {
if r.is_err() {
@ -139,6 +129,14 @@ fn get_first_error(
first_err
}
/// Builds a rayon thread pool configured with `num_threads` worker threads.
///
/// Each worker thread is named `solReplayTxNN`, where `NN` is the worker
/// index zero-padded to two digits.
///
/// # Panics
///
/// Panics with the message "new rayon threadpool" if the pool cannot be built.
fn create_thread_pool(num_threads: usize) -> ThreadPool {
    // Naming closure kept separate so the builder chain reads as plain configuration.
    let name_worker = |i: usize| format!("solReplayTx{i:02}");
    let builder = rayon::ThreadPoolBuilder::new()
        .num_threads(num_threads)
        .thread_name(name_worker);
    builder.build().expect("new rayon threadpool")
}
pub fn execute_batch(
batch: &TransactionBatchWithIndexes,
bank: &Arc<Bank>,
@ -242,6 +240,7 @@ impl ExecuteBatchesInternalMetrics {
fn execute_batches_internal(
bank: &Arc<Bank>,
replay_tx_thread_pool: &ThreadPool,
batches: &[TransactionBatchWithIndexes],
transaction_status_sender: Option<&TransactionStatusSender>,
replay_vote_sender: Option<&ReplayVoteSender>,
@ -253,7 +252,7 @@ fn execute_batches_internal(
Mutex::new(HashMap::new());
let mut execute_batches_elapsed = Measure::start("execute_batches_elapsed");
let results: Vec<Result<()>> = PAR_THREAD_POOL.install(|| {
let results: Vec<Result<()>> = replay_tx_thread_pool.install(|| {
batches
.into_par_iter()
.map(|transaction_batch| {
@ -275,7 +274,7 @@ fn execute_batches_internal(
"execute_batch",
);
let thread_index = PAR_THREAD_POOL.current_thread_index().unwrap();
let thread_index = replay_tx_thread_pool.current_thread_index().unwrap();
execution_timings_per_thread
.lock()
.unwrap()
@ -324,6 +323,7 @@ fn execute_batches_internal(
// invocation).
fn process_batches(
bank: &BankWithScheduler,
replay_tx_thread_pool: &ThreadPool,
batches: &[TransactionBatchWithIndexes],
transaction_status_sender: Option<&TransactionStatusSender>,
replay_vote_sender: Option<&ReplayVoteSender>,
@ -348,6 +348,7 @@ fn process_batches(
);
rebatch_and_execute_batches(
bank,
replay_tx_thread_pool,
batches,
transaction_status_sender,
replay_vote_sender,
@ -398,6 +399,7 @@ fn rebatch_transactions<'a>(
fn rebatch_and_execute_batches(
bank: &Arc<Bank>,
replay_tx_thread_pool: &ThreadPool,
batches: &[TransactionBatchWithIndexes],
transaction_status_sender: Option<&TransactionStatusSender>,
replay_vote_sender: Option<&ReplayVoteSender>,
@ -481,6 +483,7 @@ fn rebatch_and_execute_batches(
let execute_batches_internal_metrics = execute_batches_internal(
bank,
replay_tx_thread_pool,
rebatched_txs,
transaction_status_sender,
replay_vote_sender,
@ -506,6 +509,7 @@ pub fn process_entries_for_tests(
transaction_status_sender: Option<&TransactionStatusSender>,
replay_vote_sender: Option<&ReplayVoteSender>,
) -> Result<()> {
let replay_tx_thread_pool = create_thread_pool(1);
let verify_transaction = {
let bank = bank.clone_with_scheduler();
move |versioned_tx: VersionedTransaction| -> Result<SanitizedTransaction> {
@ -533,6 +537,7 @@ pub fn process_entries_for_tests(
let ignored_prioritization_fee_cache = PrioritizationFeeCache::new(0u64);
let result = process_entries(
bank,
&replay_tx_thread_pool,
&mut replay_entries,
transaction_status_sender,
replay_vote_sender,
@ -547,6 +552,7 @@ pub fn process_entries_for_tests(
fn process_entries(
bank: &BankWithScheduler,
replay_tx_thread_pool: &ThreadPool,
entries: &mut [ReplayEntry],
transaction_status_sender: Option<&TransactionStatusSender>,
replay_vote_sender: Option<&ReplayVoteSender>,
@ -572,6 +578,7 @@ fn process_entries(
// execute the group and register the tick
process_batches(
bank,
replay_tx_thread_pool,
&batches,
transaction_status_sender,
replay_vote_sender,
@ -625,6 +632,7 @@ fn process_entries(
// execute the current queue and try to process this entry again
process_batches(
bank,
replay_tx_thread_pool,
&batches,
transaction_status_sender,
replay_vote_sender,
@ -640,6 +648,7 @@ fn process_entries(
}
process_batches(
bank,
replay_tx_thread_pool,
&batches,
transaction_status_sender,
replay_vote_sender,
@ -805,6 +814,7 @@ pub(crate) fn process_blockstore_for_bank_0(
let bank_forks = BankForks::new_rw_arc(bank0);
info!("Processing ledger for slot 0...");
let replay_tx_thread_pool = create_thread_pool(get_max_thread_count());
process_bank_0(
&bank_forks
.read()
@ -812,6 +822,7 @@ pub(crate) fn process_blockstore_for_bank_0(
.get_with_scheduler(bank0_slot)
.unwrap(),
blockstore,
&replay_tx_thread_pool,
opts,
&VerifyRecyclers::default(),
cache_block_meta_sender,
@ -871,10 +882,12 @@ pub fn process_blockstore_from_root(
.meta(start_slot)
.unwrap_or_else(|_| panic!("Failed to get meta for slot {start_slot}"))
{
let replay_tx_thread_pool = create_thread_pool(get_max_thread_count());
load_frozen_forks(
bank_forks,
&start_slot_meta,
blockstore,
&replay_tx_thread_pool,
leader_schedule_cache,
opts,
transaction_status_sender,
@ -978,9 +991,11 @@ fn verify_ticks(
Ok(())
}
#[allow(clippy::too_many_arguments)]
fn confirm_full_slot(
blockstore: &Blockstore,
bank: &BankWithScheduler,
replay_tx_thread_pool: &ThreadPool,
opts: &ProcessOptions,
recyclers: &VerifyRecyclers,
progress: &mut ConfirmationProgress,
@ -996,6 +1011,7 @@ fn confirm_full_slot(
confirm_slot(
blockstore,
bank,
replay_tx_thread_pool,
&mut confirmation_timing,
progress,
skip_verification,
@ -1142,6 +1158,7 @@ impl ConfirmationProgress {
pub fn confirm_slot(
blockstore: &Blockstore,
bank: &BankWithScheduler,
replay_tx_thread_pool: &ThreadPool,
timing: &mut ConfirmationTiming,
progress: &mut ConfirmationProgress,
skip_verification: bool,
@ -1171,6 +1188,7 @@ pub fn confirm_slot(
confirm_slot_entries(
bank,
replay_tx_thread_pool,
slot_entries_load_result,
timing,
progress,
@ -1187,6 +1205,7 @@ pub fn confirm_slot(
#[allow(clippy::too_many_arguments)]
fn confirm_slot_entries(
bank: &BankWithScheduler,
replay_tx_thread_pool: &ThreadPool,
slot_entries_load_result: (Vec<Entry>, u64, bool),
timing: &mut ConfirmationTiming,
progress: &mut ConfirmationProgress,
@ -1328,6 +1347,7 @@ fn confirm_slot_entries(
.collect();
let process_result = process_entries(
bank,
replay_tx_thread_pool,
&mut replay_entries,
transaction_status_sender,
replay_vote_sender,
@ -1385,6 +1405,7 @@ fn confirm_slot_entries(
fn process_bank_0(
bank0: &BankWithScheduler,
blockstore: &Blockstore,
replay_tx_thread_pool: &ThreadPool,
opts: &ProcessOptions,
recyclers: &VerifyRecyclers,
cache_block_meta_sender: Option<&CacheBlockMetaSender>,
@ -1395,6 +1416,7 @@ fn process_bank_0(
confirm_full_slot(
blockstore,
bank0,
replay_tx_thread_pool,
opts,
recyclers,
&mut progress,
@ -1479,6 +1501,7 @@ fn load_frozen_forks(
bank_forks: &RwLock<BankForks>,
start_slot_meta: &SlotMeta,
blockstore: &Blockstore,
replay_tx_thread_pool: &ThreadPool,
leader_schedule_cache: &LeaderScheduleCache,
opts: &ProcessOptions,
transaction_status_sender: Option<&TransactionStatusSender>,
@ -1566,6 +1589,7 @@ fn load_frozen_forks(
if process_single_slot(
blockstore,
&bank,
replay_tx_thread_pool,
opts,
&recyclers,
&mut progress,
@ -1771,6 +1795,7 @@ fn supermajority_root_from_vote_accounts(
fn process_single_slot(
blockstore: &Blockstore,
bank: &BankWithScheduler,
replay_tx_thread_pool: &ThreadPool,
opts: &ProcessOptions,
recyclers: &VerifyRecyclers,
progress: &mut ConfirmationProgress,
@ -1785,6 +1810,7 @@ fn process_single_slot(
confirm_full_slot(
blockstore,
bank,
replay_tx_thread_pool,
opts,
recyclers,
progress,
@ -3692,7 +3718,16 @@ pub mod tests {
..ProcessOptions::default()
};
let recyclers = VerifyRecyclers::default();
process_bank_0(&bank0, &blockstore, &opts, &recyclers, None, None);
let replay_tx_thread_pool = create_thread_pool(1);
process_bank_0(
&bank0,
&blockstore,
&replay_tx_thread_pool,
&opts,
&recyclers,
None,
None,
);
let bank0_last_blockhash = bank0.last_blockhash();
let bank1 = bank_forks.write().unwrap().insert(Bank::new_from_parent(
bank0.clone_without_scheduler(),
@ -3702,6 +3737,7 @@ pub mod tests {
confirm_full_slot(
&blockstore,
&bank1,
&replay_tx_thread_pool,
&opts,
&recyclers,
&mut ConfirmationProgress::new(bank0_last_blockhash),
@ -4342,8 +4378,10 @@ pub mod tests {
slot_full: bool,
prev_entry_hash: Hash,
) -> result::Result<(), BlockstoreProcessorError> {
let replay_tx_thread_pool = create_thread_pool(1);
confirm_slot_entries(
&BankWithScheduler::new_without_scheduler(bank.clone()),
&replay_tx_thread_pool,
(slot_entries, 0, slot_full),
&mut ConfirmationTiming::default(),
&mut ConfirmationProgress::new(prev_entry_hash),
@ -4400,6 +4438,7 @@ pub mod tests {
let bank = BankWithScheduler::new_without_scheduler(
Bank::new_with_bank_forks_for_tests(&genesis_config).0,
);
let replay_tx_thread_pool = create_thread_pool(1);
let mut timing = ConfirmationTiming::default();
let mut progress = ConfirmationProgress::new(genesis_hash);
let amount = genesis_config.rent.minimum_balance(0);
@ -4436,6 +4475,7 @@ pub mod tests {
confirm_slot_entries(
&bank,
&replay_tx_thread_pool,
(vec![entry], 0, false),
&mut timing,
&mut progress,
@ -4480,6 +4520,7 @@ pub mod tests {
confirm_slot_entries(
&bank,
&replay_tx_thread_pool,
(vec![entry], 0, false),
&mut timing,
&mut progress,
@ -4592,10 +4633,12 @@ pub mod tests {
transaction_indexes: (0..txs.len()).collect(),
};
let replay_tx_thread_pool = create_thread_pool(1);
let mut batch_execution_timing = BatchExecutionTiming::default();
let ignored_prioritization_fee_cache = PrioritizationFeeCache::new(0u64);
assert!(process_batches(
&bank,
&replay_tx_thread_pool,
&[batch_with_indexes],
None,
None,

View File

@ -39,8 +39,5 @@ extern crate solana_metrics;
#[macro_use]
extern crate log;
#[macro_use]
extern crate lazy_static;
#[macro_use]
extern crate solana_frozen_abi_macro;