Make ReplayStage own the threadpool for tx replay (#190)
The threadpool used to replay multiple transactions in parallel is currently global state via a lazy_static definition. Making this pool owned by ReplayStage will enable subsequent work to make the pool size configurable on the CLI. This makes `ReplayStage` create and hold the threadpool which is passed down to blockstore_processor::confirm_slot(). blockstore_processor::process_blockstore_from_root() now creates its' own threadpool as well; however, this pool is only alive while for the scope of that function and does not persist the lifetime of the process.
This commit is contained in:
parent
e8b5f8f375
commit
55408093cd
|
@ -51,6 +51,7 @@ use {
|
||||||
solana_measure::measure::Measure,
|
solana_measure::measure::Measure,
|
||||||
solana_poh::poh_recorder::{PohLeaderStatus, PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS},
|
solana_poh::poh_recorder::{PohLeaderStatus, PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS},
|
||||||
solana_program_runtime::timings::ExecuteTimings,
|
solana_program_runtime::timings::ExecuteTimings,
|
||||||
|
solana_rayon_threadlimit::get_max_thread_count,
|
||||||
solana_rpc::{
|
solana_rpc::{
|
||||||
optimistically_confirmed_bank_tracker::{BankNotification, BankNotificationSenderConfig},
|
optimistically_confirmed_bank_tracker::{BankNotification, BankNotificationSenderConfig},
|
||||||
rpc_subscriptions::RpcSubscriptions,
|
rpc_subscriptions::RpcSubscriptions,
|
||||||
|
@ -652,16 +653,23 @@ impl ReplayStage {
|
||||||
r_bank_forks.get_vote_only_mode_signal(),
|
r_bank_forks.get_vote_only_mode_signal(),
|
||||||
)
|
)
|
||||||
};
|
};
|
||||||
|
// Thread pool to (maybe) replay multiple threads in parallel
|
||||||
let replay_mode = if replay_slots_concurrently {
|
let replay_mode = if replay_slots_concurrently {
|
||||||
ForkReplayMode::Serial
|
ForkReplayMode::Serial
|
||||||
} else {
|
} else {
|
||||||
let pool = rayon::ThreadPoolBuilder::new()
|
let pool = rayon::ThreadPoolBuilder::new()
|
||||||
.num_threads(MAX_CONCURRENT_FORKS_TO_REPLAY)
|
.num_threads(MAX_CONCURRENT_FORKS_TO_REPLAY)
|
||||||
.thread_name(|i| format!("solReplay{i:02}"))
|
.thread_name(|i| format!("solReplayFork{i:02}"))
|
||||||
.build()
|
.build()
|
||||||
.expect("new rayon threadpool");
|
.expect("new rayon threadpool");
|
||||||
ForkReplayMode::Parallel(pool)
|
ForkReplayMode::Parallel(pool)
|
||||||
};
|
};
|
||||||
|
// Thread pool to replay multiple transactions within one block in parallel
|
||||||
|
let replay_tx_thread_pool = rayon::ThreadPoolBuilder::new()
|
||||||
|
.num_threads(get_max_thread_count())
|
||||||
|
.thread_name(|i| format!("solReplayTx{i:02}"))
|
||||||
|
.build()
|
||||||
|
.expect("new rayon threadpool");
|
||||||
|
|
||||||
Self::reset_poh_recorder(
|
Self::reset_poh_recorder(
|
||||||
&my_pubkey,
|
&my_pubkey,
|
||||||
|
@ -724,6 +732,7 @@ impl ReplayStage {
|
||||||
&mut replay_timing,
|
&mut replay_timing,
|
||||||
log_messages_bytes_limit,
|
log_messages_bytes_limit,
|
||||||
&replay_mode,
|
&replay_mode,
|
||||||
|
&replay_tx_thread_pool,
|
||||||
&prioritization_fee_cache,
|
&prioritization_fee_cache,
|
||||||
&mut purge_repair_slot_counter,
|
&mut purge_repair_slot_counter,
|
||||||
);
|
);
|
||||||
|
@ -2136,6 +2145,7 @@ impl ReplayStage {
|
||||||
fn replay_blockstore_into_bank(
|
fn replay_blockstore_into_bank(
|
||||||
bank: &BankWithScheduler,
|
bank: &BankWithScheduler,
|
||||||
blockstore: &Blockstore,
|
blockstore: &Blockstore,
|
||||||
|
replay_tx_thread_pool: &ThreadPool,
|
||||||
replay_stats: &RwLock<ReplaySlotStats>,
|
replay_stats: &RwLock<ReplaySlotStats>,
|
||||||
replay_progress: &RwLock<ConfirmationProgress>,
|
replay_progress: &RwLock<ConfirmationProgress>,
|
||||||
transaction_status_sender: Option<&TransactionStatusSender>,
|
transaction_status_sender: Option<&TransactionStatusSender>,
|
||||||
|
@ -2154,6 +2164,7 @@ impl ReplayStage {
|
||||||
blockstore_processor::confirm_slot(
|
blockstore_processor::confirm_slot(
|
||||||
blockstore,
|
blockstore,
|
||||||
bank,
|
bank,
|
||||||
|
replay_tx_thread_pool,
|
||||||
&mut w_replay_stats,
|
&mut w_replay_stats,
|
||||||
&mut w_replay_progress,
|
&mut w_replay_progress,
|
||||||
false,
|
false,
|
||||||
|
@ -2712,7 +2723,8 @@ impl ReplayStage {
|
||||||
fn replay_active_banks_concurrently(
|
fn replay_active_banks_concurrently(
|
||||||
blockstore: &Blockstore,
|
blockstore: &Blockstore,
|
||||||
bank_forks: &RwLock<BankForks>,
|
bank_forks: &RwLock<BankForks>,
|
||||||
thread_pool: &ThreadPool,
|
fork_thread_pool: &ThreadPool,
|
||||||
|
replay_tx_thread_pool: &ThreadPool,
|
||||||
my_pubkey: &Pubkey,
|
my_pubkey: &Pubkey,
|
||||||
vote_account: &Pubkey,
|
vote_account: &Pubkey,
|
||||||
progress: &mut ProgressMap,
|
progress: &mut ProgressMap,
|
||||||
|
@ -2730,7 +2742,7 @@ impl ReplayStage {
|
||||||
let longest_replay_time_us = AtomicU64::new(0);
|
let longest_replay_time_us = AtomicU64::new(0);
|
||||||
|
|
||||||
// Allow for concurrent replaying of slots from different forks.
|
// Allow for concurrent replaying of slots from different forks.
|
||||||
let replay_result_vec: Vec<ReplaySlotFromBlockstore> = thread_pool.install(|| {
|
let replay_result_vec: Vec<ReplaySlotFromBlockstore> = fork_thread_pool.install(|| {
|
||||||
active_bank_slots
|
active_bank_slots
|
||||||
.into_par_iter()
|
.into_par_iter()
|
||||||
.map(|bank_slot| {
|
.map(|bank_slot| {
|
||||||
|
@ -2744,7 +2756,7 @@ impl ReplayStage {
|
||||||
trace!(
|
trace!(
|
||||||
"Replay active bank: slot {}, thread_idx {}",
|
"Replay active bank: slot {}, thread_idx {}",
|
||||||
bank_slot,
|
bank_slot,
|
||||||
thread_pool.current_thread_index().unwrap_or_default()
|
fork_thread_pool.current_thread_index().unwrap_or_default()
|
||||||
);
|
);
|
||||||
let mut progress_lock = progress.write().unwrap();
|
let mut progress_lock = progress.write().unwrap();
|
||||||
if progress_lock
|
if progress_lock
|
||||||
|
@ -2797,6 +2809,7 @@ impl ReplayStage {
|
||||||
let blockstore_result = Self::replay_blockstore_into_bank(
|
let blockstore_result = Self::replay_blockstore_into_bank(
|
||||||
&bank,
|
&bank,
|
||||||
blockstore,
|
blockstore,
|
||||||
|
replay_tx_thread_pool,
|
||||||
&replay_stats,
|
&replay_stats,
|
||||||
&replay_progress,
|
&replay_progress,
|
||||||
transaction_status_sender,
|
transaction_status_sender,
|
||||||
|
@ -2826,6 +2839,7 @@ impl ReplayStage {
|
||||||
fn replay_active_bank(
|
fn replay_active_bank(
|
||||||
blockstore: &Blockstore,
|
blockstore: &Blockstore,
|
||||||
bank_forks: &RwLock<BankForks>,
|
bank_forks: &RwLock<BankForks>,
|
||||||
|
replay_tx_thread_pool: &ThreadPool,
|
||||||
my_pubkey: &Pubkey,
|
my_pubkey: &Pubkey,
|
||||||
vote_account: &Pubkey,
|
vote_account: &Pubkey,
|
||||||
progress: &mut ProgressMap,
|
progress: &mut ProgressMap,
|
||||||
|
@ -2884,6 +2898,7 @@ impl ReplayStage {
|
||||||
let blockstore_result = Self::replay_blockstore_into_bank(
|
let blockstore_result = Self::replay_blockstore_into_bank(
|
||||||
&bank,
|
&bank,
|
||||||
blockstore,
|
blockstore,
|
||||||
|
replay_tx_thread_pool,
|
||||||
&bank_progress.replay_stats,
|
&bank_progress.replay_stats,
|
||||||
&bank_progress.replay_progress,
|
&bank_progress.replay_progress,
|
||||||
transaction_status_sender,
|
transaction_status_sender,
|
||||||
|
@ -3183,6 +3198,7 @@ impl ReplayStage {
|
||||||
replay_timing: &mut ReplayLoopTiming,
|
replay_timing: &mut ReplayLoopTiming,
|
||||||
log_messages_bytes_limit: Option<usize>,
|
log_messages_bytes_limit: Option<usize>,
|
||||||
replay_mode: &ForkReplayMode,
|
replay_mode: &ForkReplayMode,
|
||||||
|
replay_tx_thread_pool: &ThreadPool,
|
||||||
prioritization_fee_cache: &PrioritizationFeeCache,
|
prioritization_fee_cache: &PrioritizationFeeCache,
|
||||||
purge_repair_slot_counter: &mut PurgeRepairSlotCounter,
|
purge_repair_slot_counter: &mut PurgeRepairSlotCounter,
|
||||||
) -> bool /* completed a bank */ {
|
) -> bool /* completed a bank */ {
|
||||||
|
@ -3199,11 +3215,12 @@ impl ReplayStage {
|
||||||
|
|
||||||
let replay_result_vec = match replay_mode {
|
let replay_result_vec = match replay_mode {
|
||||||
// Skip the overhead of the threadpool if there is only one bank to play
|
// Skip the overhead of the threadpool if there is only one bank to play
|
||||||
ForkReplayMode::Parallel(thread_pool) if num_active_banks > 1 => {
|
ForkReplayMode::Parallel(fork_thread_pool) if num_active_banks > 1 => {
|
||||||
Self::replay_active_banks_concurrently(
|
Self::replay_active_banks_concurrently(
|
||||||
blockstore,
|
blockstore,
|
||||||
bank_forks,
|
bank_forks,
|
||||||
thread_pool,
|
fork_thread_pool,
|
||||||
|
replay_tx_thread_pool,
|
||||||
my_pubkey,
|
my_pubkey,
|
||||||
vote_account,
|
vote_account,
|
||||||
progress,
|
progress,
|
||||||
|
@ -3223,6 +3240,7 @@ impl ReplayStage {
|
||||||
Self::replay_active_bank(
|
Self::replay_active_bank(
|
||||||
blockstore,
|
blockstore,
|
||||||
bank_forks,
|
bank_forks,
|
||||||
|
replay_tx_thread_pool,
|
||||||
my_pubkey,
|
my_pubkey,
|
||||||
vote_account,
|
vote_account,
|
||||||
progress,
|
progress,
|
||||||
|
@ -5034,9 +5052,15 @@ pub(crate) mod tests {
|
||||||
blockstore.insert_shreds(shreds, None, false).unwrap();
|
blockstore.insert_shreds(shreds, None, false).unwrap();
|
||||||
let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));
|
let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));
|
||||||
let exit = Arc::new(AtomicBool::new(false));
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
|
let replay_tx_thread_pool = rayon::ThreadPoolBuilder::new()
|
||||||
|
.num_threads(1)
|
||||||
|
.thread_name(|i| format!("solReplayTest{i:02}"))
|
||||||
|
.build()
|
||||||
|
.expect("new rayon threadpool");
|
||||||
let res = ReplayStage::replay_blockstore_into_bank(
|
let res = ReplayStage::replay_blockstore_into_bank(
|
||||||
&bank1,
|
&bank1,
|
||||||
&blockstore,
|
&blockstore,
|
||||||
|
&replay_tx_thread_pool,
|
||||||
&bank1_progress.replay_stats,
|
&bank1_progress.replay_stats,
|
||||||
&bank1_progress.replay_progress,
|
&bank1_progress.replay_progress,
|
||||||
None,
|
None,
|
||||||
|
|
|
@ -89,16 +89,6 @@ struct ReplayEntry {
|
||||||
starting_index: usize,
|
starting_index: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
// get_max_thread_count to match number of threads in the old code.
|
|
||||||
// see: https://github.com/solana-labs/solana/pull/24853
|
|
||||||
lazy_static! {
|
|
||||||
static ref PAR_THREAD_POOL: ThreadPool = rayon::ThreadPoolBuilder::new()
|
|
||||||
.num_threads(get_max_thread_count())
|
|
||||||
.thread_name(|i| format!("solBstoreProc{i:02}"))
|
|
||||||
.build()
|
|
||||||
.unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
fn first_err(results: &[Result<()>]) -> Result<()> {
|
fn first_err(results: &[Result<()>]) -> Result<()> {
|
||||||
for r in results {
|
for r in results {
|
||||||
if r.is_err() {
|
if r.is_err() {
|
||||||
|
@ -139,6 +129,14 @@ fn get_first_error(
|
||||||
first_err
|
first_err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn create_thread_pool(num_threads: usize) -> ThreadPool {
|
||||||
|
rayon::ThreadPoolBuilder::new()
|
||||||
|
.num_threads(num_threads)
|
||||||
|
.thread_name(|i| format!("solReplayTx{i:02}"))
|
||||||
|
.build()
|
||||||
|
.expect("new rayon threadpool")
|
||||||
|
}
|
||||||
|
|
||||||
pub fn execute_batch(
|
pub fn execute_batch(
|
||||||
batch: &TransactionBatchWithIndexes,
|
batch: &TransactionBatchWithIndexes,
|
||||||
bank: &Arc<Bank>,
|
bank: &Arc<Bank>,
|
||||||
|
@ -242,6 +240,7 @@ impl ExecuteBatchesInternalMetrics {
|
||||||
|
|
||||||
fn execute_batches_internal(
|
fn execute_batches_internal(
|
||||||
bank: &Arc<Bank>,
|
bank: &Arc<Bank>,
|
||||||
|
replay_tx_thread_pool: &ThreadPool,
|
||||||
batches: &[TransactionBatchWithIndexes],
|
batches: &[TransactionBatchWithIndexes],
|
||||||
transaction_status_sender: Option<&TransactionStatusSender>,
|
transaction_status_sender: Option<&TransactionStatusSender>,
|
||||||
replay_vote_sender: Option<&ReplayVoteSender>,
|
replay_vote_sender: Option<&ReplayVoteSender>,
|
||||||
|
@ -253,7 +252,7 @@ fn execute_batches_internal(
|
||||||
Mutex::new(HashMap::new());
|
Mutex::new(HashMap::new());
|
||||||
|
|
||||||
let mut execute_batches_elapsed = Measure::start("execute_batches_elapsed");
|
let mut execute_batches_elapsed = Measure::start("execute_batches_elapsed");
|
||||||
let results: Vec<Result<()>> = PAR_THREAD_POOL.install(|| {
|
let results: Vec<Result<()>> = replay_tx_thread_pool.install(|| {
|
||||||
batches
|
batches
|
||||||
.into_par_iter()
|
.into_par_iter()
|
||||||
.map(|transaction_batch| {
|
.map(|transaction_batch| {
|
||||||
|
@ -275,7 +274,7 @@ fn execute_batches_internal(
|
||||||
"execute_batch",
|
"execute_batch",
|
||||||
);
|
);
|
||||||
|
|
||||||
let thread_index = PAR_THREAD_POOL.current_thread_index().unwrap();
|
let thread_index = replay_tx_thread_pool.current_thread_index().unwrap();
|
||||||
execution_timings_per_thread
|
execution_timings_per_thread
|
||||||
.lock()
|
.lock()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
|
@ -324,6 +323,7 @@ fn execute_batches_internal(
|
||||||
// invocation).
|
// invocation).
|
||||||
fn process_batches(
|
fn process_batches(
|
||||||
bank: &BankWithScheduler,
|
bank: &BankWithScheduler,
|
||||||
|
replay_tx_thread_pool: &ThreadPool,
|
||||||
batches: &[TransactionBatchWithIndexes],
|
batches: &[TransactionBatchWithIndexes],
|
||||||
transaction_status_sender: Option<&TransactionStatusSender>,
|
transaction_status_sender: Option<&TransactionStatusSender>,
|
||||||
replay_vote_sender: Option<&ReplayVoteSender>,
|
replay_vote_sender: Option<&ReplayVoteSender>,
|
||||||
|
@ -348,6 +348,7 @@ fn process_batches(
|
||||||
);
|
);
|
||||||
rebatch_and_execute_batches(
|
rebatch_and_execute_batches(
|
||||||
bank,
|
bank,
|
||||||
|
replay_tx_thread_pool,
|
||||||
batches,
|
batches,
|
||||||
transaction_status_sender,
|
transaction_status_sender,
|
||||||
replay_vote_sender,
|
replay_vote_sender,
|
||||||
|
@ -398,6 +399,7 @@ fn rebatch_transactions<'a>(
|
||||||
|
|
||||||
fn rebatch_and_execute_batches(
|
fn rebatch_and_execute_batches(
|
||||||
bank: &Arc<Bank>,
|
bank: &Arc<Bank>,
|
||||||
|
replay_tx_thread_pool: &ThreadPool,
|
||||||
batches: &[TransactionBatchWithIndexes],
|
batches: &[TransactionBatchWithIndexes],
|
||||||
transaction_status_sender: Option<&TransactionStatusSender>,
|
transaction_status_sender: Option<&TransactionStatusSender>,
|
||||||
replay_vote_sender: Option<&ReplayVoteSender>,
|
replay_vote_sender: Option<&ReplayVoteSender>,
|
||||||
|
@ -481,6 +483,7 @@ fn rebatch_and_execute_batches(
|
||||||
|
|
||||||
let execute_batches_internal_metrics = execute_batches_internal(
|
let execute_batches_internal_metrics = execute_batches_internal(
|
||||||
bank,
|
bank,
|
||||||
|
replay_tx_thread_pool,
|
||||||
rebatched_txs,
|
rebatched_txs,
|
||||||
transaction_status_sender,
|
transaction_status_sender,
|
||||||
replay_vote_sender,
|
replay_vote_sender,
|
||||||
|
@ -506,6 +509,7 @@ pub fn process_entries_for_tests(
|
||||||
transaction_status_sender: Option<&TransactionStatusSender>,
|
transaction_status_sender: Option<&TransactionStatusSender>,
|
||||||
replay_vote_sender: Option<&ReplayVoteSender>,
|
replay_vote_sender: Option<&ReplayVoteSender>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
|
let replay_tx_thread_pool = create_thread_pool(1);
|
||||||
let verify_transaction = {
|
let verify_transaction = {
|
||||||
let bank = bank.clone_with_scheduler();
|
let bank = bank.clone_with_scheduler();
|
||||||
move |versioned_tx: VersionedTransaction| -> Result<SanitizedTransaction> {
|
move |versioned_tx: VersionedTransaction| -> Result<SanitizedTransaction> {
|
||||||
|
@ -533,6 +537,7 @@ pub fn process_entries_for_tests(
|
||||||
let ignored_prioritization_fee_cache = PrioritizationFeeCache::new(0u64);
|
let ignored_prioritization_fee_cache = PrioritizationFeeCache::new(0u64);
|
||||||
let result = process_entries(
|
let result = process_entries(
|
||||||
bank,
|
bank,
|
||||||
|
&replay_tx_thread_pool,
|
||||||
&mut replay_entries,
|
&mut replay_entries,
|
||||||
transaction_status_sender,
|
transaction_status_sender,
|
||||||
replay_vote_sender,
|
replay_vote_sender,
|
||||||
|
@ -547,6 +552,7 @@ pub fn process_entries_for_tests(
|
||||||
|
|
||||||
fn process_entries(
|
fn process_entries(
|
||||||
bank: &BankWithScheduler,
|
bank: &BankWithScheduler,
|
||||||
|
replay_tx_thread_pool: &ThreadPool,
|
||||||
entries: &mut [ReplayEntry],
|
entries: &mut [ReplayEntry],
|
||||||
transaction_status_sender: Option<&TransactionStatusSender>,
|
transaction_status_sender: Option<&TransactionStatusSender>,
|
||||||
replay_vote_sender: Option<&ReplayVoteSender>,
|
replay_vote_sender: Option<&ReplayVoteSender>,
|
||||||
|
@ -572,6 +578,7 @@ fn process_entries(
|
||||||
// execute the group and register the tick
|
// execute the group and register the tick
|
||||||
process_batches(
|
process_batches(
|
||||||
bank,
|
bank,
|
||||||
|
replay_tx_thread_pool,
|
||||||
&batches,
|
&batches,
|
||||||
transaction_status_sender,
|
transaction_status_sender,
|
||||||
replay_vote_sender,
|
replay_vote_sender,
|
||||||
|
@ -625,6 +632,7 @@ fn process_entries(
|
||||||
// execute the current queue and try to process this entry again
|
// execute the current queue and try to process this entry again
|
||||||
process_batches(
|
process_batches(
|
||||||
bank,
|
bank,
|
||||||
|
replay_tx_thread_pool,
|
||||||
&batches,
|
&batches,
|
||||||
transaction_status_sender,
|
transaction_status_sender,
|
||||||
replay_vote_sender,
|
replay_vote_sender,
|
||||||
|
@ -640,6 +648,7 @@ fn process_entries(
|
||||||
}
|
}
|
||||||
process_batches(
|
process_batches(
|
||||||
bank,
|
bank,
|
||||||
|
replay_tx_thread_pool,
|
||||||
&batches,
|
&batches,
|
||||||
transaction_status_sender,
|
transaction_status_sender,
|
||||||
replay_vote_sender,
|
replay_vote_sender,
|
||||||
|
@ -805,6 +814,7 @@ pub(crate) fn process_blockstore_for_bank_0(
|
||||||
let bank_forks = BankForks::new_rw_arc(bank0);
|
let bank_forks = BankForks::new_rw_arc(bank0);
|
||||||
|
|
||||||
info!("Processing ledger for slot 0...");
|
info!("Processing ledger for slot 0...");
|
||||||
|
let replay_tx_thread_pool = create_thread_pool(get_max_thread_count());
|
||||||
process_bank_0(
|
process_bank_0(
|
||||||
&bank_forks
|
&bank_forks
|
||||||
.read()
|
.read()
|
||||||
|
@ -812,6 +822,7 @@ pub(crate) fn process_blockstore_for_bank_0(
|
||||||
.get_with_scheduler(bank0_slot)
|
.get_with_scheduler(bank0_slot)
|
||||||
.unwrap(),
|
.unwrap(),
|
||||||
blockstore,
|
blockstore,
|
||||||
|
&replay_tx_thread_pool,
|
||||||
opts,
|
opts,
|
||||||
&VerifyRecyclers::default(),
|
&VerifyRecyclers::default(),
|
||||||
cache_block_meta_sender,
|
cache_block_meta_sender,
|
||||||
|
@ -871,10 +882,12 @@ pub fn process_blockstore_from_root(
|
||||||
.meta(start_slot)
|
.meta(start_slot)
|
||||||
.unwrap_or_else(|_| panic!("Failed to get meta for slot {start_slot}"))
|
.unwrap_or_else(|_| panic!("Failed to get meta for slot {start_slot}"))
|
||||||
{
|
{
|
||||||
|
let replay_tx_thread_pool = create_thread_pool(get_max_thread_count());
|
||||||
load_frozen_forks(
|
load_frozen_forks(
|
||||||
bank_forks,
|
bank_forks,
|
||||||
&start_slot_meta,
|
&start_slot_meta,
|
||||||
blockstore,
|
blockstore,
|
||||||
|
&replay_tx_thread_pool,
|
||||||
leader_schedule_cache,
|
leader_schedule_cache,
|
||||||
opts,
|
opts,
|
||||||
transaction_status_sender,
|
transaction_status_sender,
|
||||||
|
@ -978,9 +991,11 @@ fn verify_ticks(
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(clippy::too_many_arguments)]
|
||||||
fn confirm_full_slot(
|
fn confirm_full_slot(
|
||||||
blockstore: &Blockstore,
|
blockstore: &Blockstore,
|
||||||
bank: &BankWithScheduler,
|
bank: &BankWithScheduler,
|
||||||
|
replay_tx_thread_pool: &ThreadPool,
|
||||||
opts: &ProcessOptions,
|
opts: &ProcessOptions,
|
||||||
recyclers: &VerifyRecyclers,
|
recyclers: &VerifyRecyclers,
|
||||||
progress: &mut ConfirmationProgress,
|
progress: &mut ConfirmationProgress,
|
||||||
|
@ -996,6 +1011,7 @@ fn confirm_full_slot(
|
||||||
confirm_slot(
|
confirm_slot(
|
||||||
blockstore,
|
blockstore,
|
||||||
bank,
|
bank,
|
||||||
|
replay_tx_thread_pool,
|
||||||
&mut confirmation_timing,
|
&mut confirmation_timing,
|
||||||
progress,
|
progress,
|
||||||
skip_verification,
|
skip_verification,
|
||||||
|
@ -1142,6 +1158,7 @@ impl ConfirmationProgress {
|
||||||
pub fn confirm_slot(
|
pub fn confirm_slot(
|
||||||
blockstore: &Blockstore,
|
blockstore: &Blockstore,
|
||||||
bank: &BankWithScheduler,
|
bank: &BankWithScheduler,
|
||||||
|
replay_tx_thread_pool: &ThreadPool,
|
||||||
timing: &mut ConfirmationTiming,
|
timing: &mut ConfirmationTiming,
|
||||||
progress: &mut ConfirmationProgress,
|
progress: &mut ConfirmationProgress,
|
||||||
skip_verification: bool,
|
skip_verification: bool,
|
||||||
|
@ -1171,6 +1188,7 @@ pub fn confirm_slot(
|
||||||
|
|
||||||
confirm_slot_entries(
|
confirm_slot_entries(
|
||||||
bank,
|
bank,
|
||||||
|
replay_tx_thread_pool,
|
||||||
slot_entries_load_result,
|
slot_entries_load_result,
|
||||||
timing,
|
timing,
|
||||||
progress,
|
progress,
|
||||||
|
@ -1187,6 +1205,7 @@ pub fn confirm_slot(
|
||||||
#[allow(clippy::too_many_arguments)]
|
#[allow(clippy::too_many_arguments)]
|
||||||
fn confirm_slot_entries(
|
fn confirm_slot_entries(
|
||||||
bank: &BankWithScheduler,
|
bank: &BankWithScheduler,
|
||||||
|
replay_tx_thread_pool: &ThreadPool,
|
||||||
slot_entries_load_result: (Vec<Entry>, u64, bool),
|
slot_entries_load_result: (Vec<Entry>, u64, bool),
|
||||||
timing: &mut ConfirmationTiming,
|
timing: &mut ConfirmationTiming,
|
||||||
progress: &mut ConfirmationProgress,
|
progress: &mut ConfirmationProgress,
|
||||||
|
@ -1328,6 +1347,7 @@ fn confirm_slot_entries(
|
||||||
.collect();
|
.collect();
|
||||||
let process_result = process_entries(
|
let process_result = process_entries(
|
||||||
bank,
|
bank,
|
||||||
|
replay_tx_thread_pool,
|
||||||
&mut replay_entries,
|
&mut replay_entries,
|
||||||
transaction_status_sender,
|
transaction_status_sender,
|
||||||
replay_vote_sender,
|
replay_vote_sender,
|
||||||
|
@ -1385,6 +1405,7 @@ fn confirm_slot_entries(
|
||||||
fn process_bank_0(
|
fn process_bank_0(
|
||||||
bank0: &BankWithScheduler,
|
bank0: &BankWithScheduler,
|
||||||
blockstore: &Blockstore,
|
blockstore: &Blockstore,
|
||||||
|
replay_tx_thread_pool: &ThreadPool,
|
||||||
opts: &ProcessOptions,
|
opts: &ProcessOptions,
|
||||||
recyclers: &VerifyRecyclers,
|
recyclers: &VerifyRecyclers,
|
||||||
cache_block_meta_sender: Option<&CacheBlockMetaSender>,
|
cache_block_meta_sender: Option<&CacheBlockMetaSender>,
|
||||||
|
@ -1395,6 +1416,7 @@ fn process_bank_0(
|
||||||
confirm_full_slot(
|
confirm_full_slot(
|
||||||
blockstore,
|
blockstore,
|
||||||
bank0,
|
bank0,
|
||||||
|
replay_tx_thread_pool,
|
||||||
opts,
|
opts,
|
||||||
recyclers,
|
recyclers,
|
||||||
&mut progress,
|
&mut progress,
|
||||||
|
@ -1479,6 +1501,7 @@ fn load_frozen_forks(
|
||||||
bank_forks: &RwLock<BankForks>,
|
bank_forks: &RwLock<BankForks>,
|
||||||
start_slot_meta: &SlotMeta,
|
start_slot_meta: &SlotMeta,
|
||||||
blockstore: &Blockstore,
|
blockstore: &Blockstore,
|
||||||
|
replay_tx_thread_pool: &ThreadPool,
|
||||||
leader_schedule_cache: &LeaderScheduleCache,
|
leader_schedule_cache: &LeaderScheduleCache,
|
||||||
opts: &ProcessOptions,
|
opts: &ProcessOptions,
|
||||||
transaction_status_sender: Option<&TransactionStatusSender>,
|
transaction_status_sender: Option<&TransactionStatusSender>,
|
||||||
|
@ -1566,6 +1589,7 @@ fn load_frozen_forks(
|
||||||
if process_single_slot(
|
if process_single_slot(
|
||||||
blockstore,
|
blockstore,
|
||||||
&bank,
|
&bank,
|
||||||
|
replay_tx_thread_pool,
|
||||||
opts,
|
opts,
|
||||||
&recyclers,
|
&recyclers,
|
||||||
&mut progress,
|
&mut progress,
|
||||||
|
@ -1771,6 +1795,7 @@ fn supermajority_root_from_vote_accounts(
|
||||||
fn process_single_slot(
|
fn process_single_slot(
|
||||||
blockstore: &Blockstore,
|
blockstore: &Blockstore,
|
||||||
bank: &BankWithScheduler,
|
bank: &BankWithScheduler,
|
||||||
|
replay_tx_thread_pool: &ThreadPool,
|
||||||
opts: &ProcessOptions,
|
opts: &ProcessOptions,
|
||||||
recyclers: &VerifyRecyclers,
|
recyclers: &VerifyRecyclers,
|
||||||
progress: &mut ConfirmationProgress,
|
progress: &mut ConfirmationProgress,
|
||||||
|
@ -1785,6 +1810,7 @@ fn process_single_slot(
|
||||||
confirm_full_slot(
|
confirm_full_slot(
|
||||||
blockstore,
|
blockstore,
|
||||||
bank,
|
bank,
|
||||||
|
replay_tx_thread_pool,
|
||||||
opts,
|
opts,
|
||||||
recyclers,
|
recyclers,
|
||||||
progress,
|
progress,
|
||||||
|
@ -3692,7 +3718,16 @@ pub mod tests {
|
||||||
..ProcessOptions::default()
|
..ProcessOptions::default()
|
||||||
};
|
};
|
||||||
let recyclers = VerifyRecyclers::default();
|
let recyclers = VerifyRecyclers::default();
|
||||||
process_bank_0(&bank0, &blockstore, &opts, &recyclers, None, None);
|
let replay_tx_thread_pool = create_thread_pool(1);
|
||||||
|
process_bank_0(
|
||||||
|
&bank0,
|
||||||
|
&blockstore,
|
||||||
|
&replay_tx_thread_pool,
|
||||||
|
&opts,
|
||||||
|
&recyclers,
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
);
|
||||||
let bank0_last_blockhash = bank0.last_blockhash();
|
let bank0_last_blockhash = bank0.last_blockhash();
|
||||||
let bank1 = bank_forks.write().unwrap().insert(Bank::new_from_parent(
|
let bank1 = bank_forks.write().unwrap().insert(Bank::new_from_parent(
|
||||||
bank0.clone_without_scheduler(),
|
bank0.clone_without_scheduler(),
|
||||||
|
@ -3702,6 +3737,7 @@ pub mod tests {
|
||||||
confirm_full_slot(
|
confirm_full_slot(
|
||||||
&blockstore,
|
&blockstore,
|
||||||
&bank1,
|
&bank1,
|
||||||
|
&replay_tx_thread_pool,
|
||||||
&opts,
|
&opts,
|
||||||
&recyclers,
|
&recyclers,
|
||||||
&mut ConfirmationProgress::new(bank0_last_blockhash),
|
&mut ConfirmationProgress::new(bank0_last_blockhash),
|
||||||
|
@ -4342,8 +4378,10 @@ pub mod tests {
|
||||||
slot_full: bool,
|
slot_full: bool,
|
||||||
prev_entry_hash: Hash,
|
prev_entry_hash: Hash,
|
||||||
) -> result::Result<(), BlockstoreProcessorError> {
|
) -> result::Result<(), BlockstoreProcessorError> {
|
||||||
|
let replay_tx_thread_pool = create_thread_pool(1);
|
||||||
confirm_slot_entries(
|
confirm_slot_entries(
|
||||||
&BankWithScheduler::new_without_scheduler(bank.clone()),
|
&BankWithScheduler::new_without_scheduler(bank.clone()),
|
||||||
|
&replay_tx_thread_pool,
|
||||||
(slot_entries, 0, slot_full),
|
(slot_entries, 0, slot_full),
|
||||||
&mut ConfirmationTiming::default(),
|
&mut ConfirmationTiming::default(),
|
||||||
&mut ConfirmationProgress::new(prev_entry_hash),
|
&mut ConfirmationProgress::new(prev_entry_hash),
|
||||||
|
@ -4400,6 +4438,7 @@ pub mod tests {
|
||||||
let bank = BankWithScheduler::new_without_scheduler(
|
let bank = BankWithScheduler::new_without_scheduler(
|
||||||
Bank::new_with_bank_forks_for_tests(&genesis_config).0,
|
Bank::new_with_bank_forks_for_tests(&genesis_config).0,
|
||||||
);
|
);
|
||||||
|
let replay_tx_thread_pool = create_thread_pool(1);
|
||||||
let mut timing = ConfirmationTiming::default();
|
let mut timing = ConfirmationTiming::default();
|
||||||
let mut progress = ConfirmationProgress::new(genesis_hash);
|
let mut progress = ConfirmationProgress::new(genesis_hash);
|
||||||
let amount = genesis_config.rent.minimum_balance(0);
|
let amount = genesis_config.rent.minimum_balance(0);
|
||||||
|
@ -4436,6 +4475,7 @@ pub mod tests {
|
||||||
|
|
||||||
confirm_slot_entries(
|
confirm_slot_entries(
|
||||||
&bank,
|
&bank,
|
||||||
|
&replay_tx_thread_pool,
|
||||||
(vec![entry], 0, false),
|
(vec![entry], 0, false),
|
||||||
&mut timing,
|
&mut timing,
|
||||||
&mut progress,
|
&mut progress,
|
||||||
|
@ -4480,6 +4520,7 @@ pub mod tests {
|
||||||
|
|
||||||
confirm_slot_entries(
|
confirm_slot_entries(
|
||||||
&bank,
|
&bank,
|
||||||
|
&replay_tx_thread_pool,
|
||||||
(vec![entry], 0, false),
|
(vec![entry], 0, false),
|
||||||
&mut timing,
|
&mut timing,
|
||||||
&mut progress,
|
&mut progress,
|
||||||
|
@ -4592,10 +4633,12 @@ pub mod tests {
|
||||||
transaction_indexes: (0..txs.len()).collect(),
|
transaction_indexes: (0..txs.len()).collect(),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let replay_tx_thread_pool = create_thread_pool(1);
|
||||||
let mut batch_execution_timing = BatchExecutionTiming::default();
|
let mut batch_execution_timing = BatchExecutionTiming::default();
|
||||||
let ignored_prioritization_fee_cache = PrioritizationFeeCache::new(0u64);
|
let ignored_prioritization_fee_cache = PrioritizationFeeCache::new(0u64);
|
||||||
assert!(process_batches(
|
assert!(process_batches(
|
||||||
&bank,
|
&bank,
|
||||||
|
&replay_tx_thread_pool,
|
||||||
&[batch_with_indexes],
|
&[batch_with_indexes],
|
||||||
None,
|
None,
|
||||||
None,
|
None,
|
||||||
|
|
|
@ -39,8 +39,5 @@ extern crate solana_metrics;
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate log;
|
extern crate log;
|
||||||
|
|
||||||
#[macro_use]
|
|
||||||
extern crate lazy_static;
|
|
||||||
|
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate solana_frozen_abi_macro;
|
extern crate solana_frozen_abi_macro;
|
||||||
|
|
Loading…
Reference in New Issue