diff --git a/core/src/progress_map.rs b/core/src/progress_map.rs
index 823d3cbb2..fc0697505 100644
--- a/core/src/progress_map.rs
+++ b/core/src/progress_map.rs
@@ -188,8 +188,8 @@ pub struct ForkProgress {
     pub is_dead: bool,
     pub fork_stats: ForkStats,
     pub propagated_stats: PropagatedStats,
-    pub replay_stats: ReplaySlotStats,
-    pub replay_progress: ConfirmationProgress,
+    pub replay_stats: Arc<RwLock<ReplaySlotStats>>,
+    pub replay_progress: Arc<RwLock<ConfirmationProgress>>,
     pub retransmit_info: RetransmitInfo,
     // Note `num_blocks_on_fork` and `num_dropped_blocks_on_fork` only
     // count new blocks replayed since last restart, which won't include
@@ -235,8 +235,8 @@ impl ForkProgress {
         Self {
             is_dead: false,
             fork_stats: ForkStats::default(),
-            replay_stats: ReplaySlotStats::default(),
-            replay_progress: ConfirmationProgress::new(last_entry),
+            replay_stats: Arc::new(RwLock::new(ReplaySlotStats::default())),
+            replay_progress: Arc::new(RwLock::new(ConfirmationProgress::new(last_entry))),
             num_blocks_on_fork,
             num_dropped_blocks_on_fork,
             propagated_stats: PropagatedStats {
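Note on this change: wrapping `replay_stats` and `replay_progress` in `Arc<RwLock<...>>` lets a fork's replay state be cloned out of the shared progress map and locked on its own, which is what allows several forks to be replayed at the same time in the replay_stage changes below. A minimal sketch of that access pattern, under the assumption that the stand-in `Stats` type and the pared-down `ForkProgress` model only the two fields touched by this hunk:

    use std::sync::{Arc, RwLock};

    // Illustrative stand-in for ReplaySlotStats / ConfirmationProgress.
    #[derive(Default)]
    struct Stats {
        num_txs: usize,
    }

    // Only the two ForkProgress fields changed by this hunk are modeled here.
    struct ForkProgress {
        replay_stats: Arc<RwLock<Stats>>,
        replay_progress: Arc<RwLock<Stats>>,
    }

    fn main() {
        let fork = ForkProgress {
            replay_stats: Arc::new(RwLock::new(Stats::default())),
            replay_progress: Arc::new(RwLock::new(Stats::default())),
        };

        // Clone the handles out of the (briefly locked) progress map, then take only
        // this fork's locks for the duration of replay, as the new replay path does.
        let (replay_stats, replay_progress) =
            (fork.replay_stats.clone(), fork.replay_progress.clone());
        let _w_stats = replay_stats.write().unwrap();
        let mut w_progress = replay_progress.write().unwrap();
        w_progress.num_txs += 1;
    }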
diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs
index 22a872265..d1f408b1c 100644
--- a/core/src/replay_stage.rs
+++ b/core/src/replay_stage.rs
@@ -1,4 +1,5 @@
 //! The `replay_stage` replays transactions broadcast by the leader.
+
 use {
     crate::{
         ancestor_hashes_service::AncestorHashesReplayUpdateSender,
@@ -18,7 +19,7 @@ use {
         fork_choice::{ForkChoice, SelectVoteAndResetForkResult},
         heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice,
         latest_validator_votes_for_frozen_banks::LatestValidatorVotesForFrozenBanks,
-        progress_map::{ForkProgress, ProgressMap, PropagatedStats},
+        progress_map::{ForkProgress, ProgressMap, PropagatedStats, ReplaySlotStats},
         repair_service::DuplicateSlotsResetReceiver,
         rewards_recorder_service::RewardsRecorderSender,
         tower_storage::{SavedTower, SavedTowerVersions, TowerStorage},
@@ -28,6 +29,8 @@ use {
         window_service::DuplicateSlotReceiver,
     },
     crossbeam_channel::{Receiver, RecvTimeoutError, Sender},
+    lazy_static::lazy_static,
+    rayon::{prelude::*, ThreadPool},
     solana_client::rpc_response::SlotUpdate,
     solana_entry::entry::VerifyRecyclers,
     solana_geyser_plugin_manager::block_metadata_notifier_interface::BlockMetadataNotifierLock,
@@ -35,7 +38,9 @@ use {
     solana_ledger::{
         block_error::BlockError,
         blockstore::Blockstore,
-        blockstore_processor::{self, BlockstoreProcessorError, TransactionStatusSender},
+        blockstore_processor::{
+            self, BlockstoreProcessorError, ConfirmationProgress, TransactionStatusSender,
+        },
         leader_schedule_cache::LeaderScheduleCache,
         leader_schedule_utils::first_of_consecutive_leader_slots,
     },
@@ -70,7 +75,7 @@ use {
         collections::{HashMap, HashSet},
         result,
         sync::{
-            atomic::{AtomicBool, Ordering},
+            atomic::{AtomicBool, AtomicU64, Ordering},
            Arc, RwLock,
        },
        thread::{self, Builder, JoinHandle},
@@ -85,6 +90,17 @@ pub const DUPLICATE_LIVENESS_THRESHOLD: f64 = 0.1;
 pub const DUPLICATE_THRESHOLD: f64 = 1.0 - SWITCH_FORK_THRESHOLD - DUPLICATE_LIVENESS_THRESHOLD;
 const MAX_VOTE_SIGNATURES: usize = 200;
 const MAX_VOTE_REFRESH_INTERVAL_MILLIS: usize = 5000;
+// Expect this number to be small enough to minimize thread pool overhead, yet large enough
+// to replay all active forks at the same time in most cases.
+const MAX_CONCURRENT_FORKS_TO_REPLAY: usize = 4;
+
+lazy_static! {
+    static ref PAR_THREAD_POOL: ThreadPool = rayon::ThreadPoolBuilder::new()
+        .num_threads(MAX_CONCURRENT_FORKS_TO_REPLAY)
+        .thread_name(|ix| format!("replay_{}", ix))
+        .build()
+        .unwrap();
+}
 
 #[derive(PartialEq, Eq, Debug)]
 pub enum HeaviestForkFailures {
@@ -113,6 +129,12 @@ impl Drop for Finalizer {
     }
 }
 
+struct ReplaySlotFromBlockstore {
+    is_slot_dead: bool,
+    bank_slot: Slot,
+    replay_result: Option<Result<usize, BlockstoreProcessorError>>,
+}
+
 struct LastVoteRefreshTime {
     last_refresh_time: Instant,
     last_print_time: Instant,
@@ -174,7 +196,7 @@ pub struct ReplayTiming {
     generate_new_bank_forks_get_slots_since_us: u64,
     generate_new_bank_forks_loop_us: u64,
     generate_new_bank_forks_write_lock_us: u64,
-    replay_blockstore_us: u64,
+    replay_blockstore_us: u64, //< When processing forks concurrently, only captures the longest fork
 }
 impl ReplayTiming {
     #[allow(clippy::too_many_arguments)]
@@ -1685,21 +1707,24 @@ impl ReplayStage {
     fn replay_blockstore_into_bank(
         bank: &Arc<Bank>,
         blockstore: &Blockstore,
-        bank_progress: &mut ForkProgress,
+        replay_stats: &RwLock<ReplaySlotStats>,
+        replay_progress: &RwLock<ConfirmationProgress>,
         transaction_status_sender: Option<&TransactionStatusSender>,
         replay_vote_sender: &ReplayVoteSender,
         verify_recyclers: &VerifyRecyclers,
         log_messages_bytes_limit: Option<usize>,
     ) -> result::Result<usize, BlockstoreProcessorError> {
-        let tx_count_before = bank_progress.replay_progress.num_txs;
+        let mut w_replay_stats = replay_stats.write().unwrap();
+        let mut w_replay_progress = replay_progress.write().unwrap();
+        let tx_count_before = w_replay_progress.num_txs;
         // All errors must lead to marking the slot as dead, otherwise,
         // the `check_slot_agrees_with_cluster()` called by `replay_active_banks()`
         // will break!
         blockstore_processor::confirm_slot(
             blockstore,
             bank,
-            &mut bank_progress.replay_stats,
-            &mut bank_progress.replay_progress,
+            &mut w_replay_stats,
+            &mut w_replay_progress,
             false,
             transaction_status_sender,
             Some(replay_vote_sender),
@@ -1708,7 +1733,7 @@ impl ReplayStage {
             false,
             log_messages_bytes_limit,
         )?;
-        let tx_count_after = bank_progress.replay_progress.num_txs;
+        let tx_count_after = w_replay_progress.num_txs;
         let tx_count = tx_count_after - tx_count_before;
         Ok(tx_count)
     }
@@ -2191,17 +2216,189 @@ impl ReplayStage {
     }
 
     #[allow(clippy::too_many_arguments)]
-    fn replay_active_banks(
+    fn replay_active_banks_concurrently(
         blockstore: &Blockstore,
         bank_forks: &RwLock<BankForks>,
         my_pubkey: &Pubkey,
         vote_account: &Pubkey,
         progress: &mut ProgressMap,
         transaction_status_sender: Option<&TransactionStatusSender>,
-        cache_block_meta_sender: Option<&CacheBlockMetaSender>,
         verify_recyclers: &VerifyRecyclers,
-        heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice,
         replay_vote_sender: &ReplayVoteSender,
+        replay_timing: &mut ReplayTiming,
+        log_messages_bytes_limit: Option<usize>,
+        active_bank_slots: &[Slot],
+    ) -> Vec<ReplaySlotFromBlockstore> {
+        // Make mutable shared structures thread safe.
+        let progress = RwLock::new(progress);
+        let longest_replay_time_us = AtomicU64::new(0);
+
+        // Allow for concurrent replaying of slots from different forks.
+        let replay_result_vec: Vec<ReplaySlotFromBlockstore> = PAR_THREAD_POOL.install(|| {
+            active_bank_slots
+                .into_par_iter()
+                .map(|bank_slot| {
+                    let bank_slot = *bank_slot;
+                    let mut replay_result = ReplaySlotFromBlockstore {
+                        is_slot_dead: false,
+                        bank_slot,
+                        replay_result: None,
+                    };
+                    let my_pubkey = &my_pubkey.clone();
+                    trace!(
+                        "Replay active bank: slot {}, thread_idx {}",
+                        bank_slot,
+                        PAR_THREAD_POOL.current_thread_index().unwrap_or_default()
+                    );
+                    let mut progress_lock = progress.write().unwrap();
+                    if progress_lock
+                        .get(&bank_slot)
+                        .map(|p| p.is_dead)
+                        .unwrap_or(false)
+                    {
+                        // If the fork was marked as dead, don't replay it
+                        debug!("bank_slot {:?} is marked dead", bank_slot);
+                        replay_result.is_slot_dead = true;
+                        return replay_result;
+                    }
+
+                    let bank = &bank_forks.read().unwrap().get(bank_slot).unwrap();
+                    let parent_slot = bank.parent_slot();
+                    let (num_blocks_on_fork, num_dropped_blocks_on_fork) = {
+                        let stats = progress_lock
+                            .get(&parent_slot)
+                            .expect("parent of active bank must exist in progress map");
+                        let num_blocks_on_fork = stats.num_blocks_on_fork + 1;
+                        let new_dropped_blocks = bank.slot() - parent_slot - 1;
+                        let num_dropped_blocks_on_fork =
+                            stats.num_dropped_blocks_on_fork + new_dropped_blocks;
+                        (num_blocks_on_fork, num_dropped_blocks_on_fork)
+                    };
+                    let prev_leader_slot = progress_lock.get_bank_prev_leader_slot(bank);
+
+                    let bank_progress = progress_lock.entry(bank.slot()).or_insert_with(|| {
+                        ForkProgress::new_from_bank(
+                            bank,
+                            my_pubkey,
+                            &vote_account.clone(),
+                            prev_leader_slot,
+                            num_blocks_on_fork,
+                            num_dropped_blocks_on_fork,
+                        )
+                    });
+
+                    let replay_stats = bank_progress.replay_stats.clone();
+                    let replay_progress = bank_progress.replay_progress.clone();
+                    drop(progress_lock);
+
+                    if bank.collector_id() != my_pubkey {
+                        let mut replay_blockstore_time =
+                            Measure::start("replay_blockstore_into_bank");
+                        let blockstore_result = Self::replay_blockstore_into_bank(
+                            bank,
+                            blockstore,
+                            &replay_stats,
+                            &replay_progress,
+                            transaction_status_sender,
+                            &replay_vote_sender.clone(),
+                            &verify_recyclers.clone(),
+                            log_messages_bytes_limit,
+                        );
+                        replay_blockstore_time.stop();
+                        replay_result.replay_result = Some(blockstore_result);
+                        longest_replay_time_us
+                            .fetch_max(replay_blockstore_time.as_us(), Ordering::Relaxed);
+                    }
+                    replay_result
+                })
+                .collect()
+        });
+        // Record only the longest fork's replay time: accumulating time across all slots would
+        // inflate this number and make it seem like an overly large amount of time is being
+        // spent on blockstore compared to other activities.
+        replay_timing.replay_blockstore_us += longest_replay_time_us.load(Ordering::Relaxed);
+
+        replay_result_vec
+    }
+
+    #[allow(clippy::too_many_arguments)]
+    fn replay_active_bank(
+        blockstore: &Blockstore,
+        bank_forks: &RwLock<BankForks>,
+        my_pubkey: &Pubkey,
+        vote_account: &Pubkey,
+        progress: &mut ProgressMap,
+        transaction_status_sender: Option<&TransactionStatusSender>,
+        verify_recyclers: &VerifyRecyclers,
+        replay_vote_sender: &ReplayVoteSender,
+        replay_timing: &mut ReplayTiming,
+        log_messages_bytes_limit: Option<usize>,
+        bank_slot: Slot,
+    ) -> ReplaySlotFromBlockstore {
+        let mut replay_result = ReplaySlotFromBlockstore {
+            is_slot_dead: false,
+            bank_slot,
+            replay_result: None,
+        };
+        let my_pubkey = &my_pubkey.clone();
+        trace!("Replay active bank: slot {}", bank_slot);
+        if progress.get(&bank_slot).map(|p| p.is_dead).unwrap_or(false) {
+            // If the fork was marked as dead, don't replay it
+            debug!("bank_slot {:?} is marked dead", bank_slot);
+            replay_result.is_slot_dead = true;
+        } else {
+            let bank = &bank_forks.read().unwrap().get(bank_slot).unwrap();
+            let parent_slot = bank.parent_slot();
+            let prev_leader_slot = progress.get_bank_prev_leader_slot(bank);
+            let (num_blocks_on_fork, num_dropped_blocks_on_fork) = {
+                let stats = progress
+                    .get(&parent_slot)
+                    .expect("parent of active bank must exist in progress map");
+                let num_blocks_on_fork = stats.num_blocks_on_fork + 1;
+                let new_dropped_blocks = bank.slot() - parent_slot - 1;
+                let num_dropped_blocks_on_fork =
+                    stats.num_dropped_blocks_on_fork + new_dropped_blocks;
+                (num_blocks_on_fork, num_dropped_blocks_on_fork)
+            };
+
+            let bank_progress = progress.entry(bank.slot()).or_insert_with(|| {
+                ForkProgress::new_from_bank(
+                    bank,
+                    my_pubkey,
+                    &vote_account.clone(),
+                    prev_leader_slot,
+                    num_blocks_on_fork,
+                    num_dropped_blocks_on_fork,
+                )
+            });
+
+            if bank.collector_id() != my_pubkey {
+                let mut replay_blockstore_time = Measure::start("replay_blockstore_into_bank");
+                let blockstore_result = Self::replay_blockstore_into_bank(
+                    bank,
+                    blockstore,
+                    &bank_progress.replay_stats,
+                    &bank_progress.replay_progress,
+                    transaction_status_sender,
+                    &replay_vote_sender.clone(),
+                    &verify_recyclers.clone(),
+                    log_messages_bytes_limit,
+                );
+                replay_blockstore_time.stop();
+                replay_result.replay_result = Some(blockstore_result);
+                replay_timing.replay_blockstore_us += replay_blockstore_time.as_us();
+            }
+        }
+        replay_result
+    }
+
+    #[allow(clippy::too_many_arguments)]
+    fn process_replay_results(
+        blockstore: &Blockstore,
+        bank_forks: &RwLock<BankForks>,
+        progress: &mut ProgressMap,
+        transaction_status_sender: Option<&TransactionStatusSender>,
+        cache_block_meta_sender: Option<&CacheBlockMetaSender>,
+        heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice,
         bank_notification_sender: &Option<BankNotificationSender>,
         rewards_recorder_sender: &Option<RewardsRecorderSender>,
         rpc_subscriptions: &Arc<RpcSubscriptions>,
@@ -2215,72 +2412,29 @@ impl ReplayStage {
         duplicate_slots_tracker: &mut DuplicateSlotsTracker,
         gossip_duplicate_confirmed_slots: &GossipDuplicateConfirmedSlots,
         epoch_slots_frozen_slots: &mut EpochSlotsFrozenSlots,
         unfrozen_gossip_verified_vote_hashes: &mut UnfrozenGossipVerifiedVoteHashes,
         latest_validator_votes_for_frozen_banks: &mut LatestValidatorVotesForFrozenBanks,
         cluster_slots_update_sender: &ClusterSlotsUpdateSender,
         cost_update_sender: &Sender<CostUpdate>,
         duplicate_slots_to_repair: &mut DuplicateSlotsToRepair,
         ancestor_hashes_replay_update_sender: &AncestorHashesReplayUpdateSender,
         block_metadata_notifier: Option<BlockMetadataNotifierLock>,
-        replay_timing: &mut ReplayTiming,
-        log_messages_bytes_limit: Option<usize>,
+        replay_result_vec: &[ReplaySlotFromBlockstore],
     ) -> bool {
+        // TODO: See if processing of blockstore replay results and bank completion can be made thread safe.
         let mut did_complete_bank = false;
         let mut tx_count = 0;
         let mut execute_timings = ExecuteTimings::default();
-        let active_banks = bank_forks.read().unwrap().active_banks();
-        trace!("active banks {:?}", active_banks);
-
-        for bank_slot in &active_banks {
-            // If the fork was marked as dead, don't replay it
-            if progress.get(bank_slot).map(|p| p.is_dead).unwrap_or(false) {
-                debug!("bank_slot {:?} is marked dead", *bank_slot);
+        for replay_result in replay_result_vec {
+            if replay_result.is_slot_dead {
                 continue;
             }
-            let bank = bank_forks.read().unwrap().get(*bank_slot).unwrap();
-            let parent_slot = bank.parent_slot();
-            let prev_leader_slot = progress.get_bank_prev_leader_slot(&bank);
-            let (num_blocks_on_fork, num_dropped_blocks_on_fork) = {
-                let stats = progress
-                    .get(&parent_slot)
-                    .expect("parent of active bank must exist in progress map");
-                let num_blocks_on_fork = stats.num_blocks_on_fork + 1;
-                let new_dropped_blocks = bank.slot() - parent_slot - 1;
-                let num_dropped_blocks_on_fork =
-                    stats.num_dropped_blocks_on_fork + new_dropped_blocks;
-                (num_blocks_on_fork, num_dropped_blocks_on_fork)
-            };
-
-            // Insert a progress entry even for slots this node is the leader for, so that
-            // 1) confirm_forks can report confirmation, 2) we can cache computations about
-            // this bank in `select_forks()`
-            let bank_progress = &mut progress.entry(bank.slot()).or_insert_with(|| {
-                ForkProgress::new_from_bank(
-                    &bank,
-                    my_pubkey,
-                    vote_account,
-                    prev_leader_slot,
-                    num_blocks_on_fork,
-                    num_dropped_blocks_on_fork,
-                )
-            });
-            if bank.collector_id() != my_pubkey {
-                let root_slot = bank_forks.read().unwrap().root();
-                let mut replay_blockstore_time = Measure::start("replay_blockstore_into_bank");
-                let replay_result = Self::replay_blockstore_into_bank(
-                    &bank,
-                    blockstore,
-                    bank_progress,
-                    transaction_status_sender,
-                    replay_vote_sender,
-                    verify_recyclers,
-                    log_messages_bytes_limit,
-                );
-                replay_blockstore_time.stop();
-                replay_timing.replay_blockstore_us += replay_blockstore_time.as_us();
+            let bank_slot = replay_result.bank_slot;
+            let bank = &bank_forks.read().unwrap().get(bank_slot).unwrap();
+            if let Some(replay_result) = &replay_result.replay_result {
                 match replay_result {
                     Ok(replay_tx_count) => tx_count += replay_tx_count,
                     Err(err) => {
                         // Error means the slot needs to be marked as dead
                         Self::mark_dead_slot(
                             blockstore,
-                            &bank,
-                            root_slot,
-                            &err,
+                            bank,
+                            bank_forks.read().unwrap().root(),
+                            err,
                             rpc_subscriptions,
                             duplicate_slots_tracker,
                             gossip_duplicate_confirmed_slots,
@@ -2296,20 +2450,27 @@ impl ReplayStage {
                     }
                 }
             }
-            assert_eq!(*bank_slot, bank.slot());
+
+            assert_eq!(bank_slot, bank.slot());
             if bank.is_complete() {
                 let mut bank_complete_time = Measure::start("bank_complete_time");
-                execute_timings.accumulate(&bank_progress.replay_stats.execute_timings);
-                debug!("bank {} is completed replay from blockstore, contribute to update cost with {:?}",
-                    bank.slot(),
-                    bank_progress.replay_stats.execute_timings
-                );
+                let bank_progress = progress
+                    .get_mut(&bank.slot())
+                    .expect("Bank fork progress entry missing for completed bank");
+                let replay_stats = bank_progress.replay_stats.clone();
+                let r_replay_stats = replay_stats.read().unwrap();
+                let replay_progress = bank_progress.replay_progress.clone();
+                let r_replay_progress = replay_progress.read().unwrap();
+                debug!("bank {} is completed replay from blockstore, contribute to update cost with {:?}",
+                    bank.slot(),
+                    r_replay_stats.execute_timings
+                );
                 did_complete_bank = true;
                 info!("bank frozen: {}", bank.slot());
-                let _ = cluster_slots_update_sender.send(vec![*bank_slot]);
+                let _ = cluster_slots_update_sender.send(vec![bank_slot]);
                 if let Some(transaction_status_sender) = transaction_status_sender {
-                    transaction_status_sender.send_transaction_status_freeze_message(&bank);
+                    transaction_status_sender.send_transaction_status_freeze_message(bank);
                 }
                 bank.freeze();
                 // report cost tracker stats
@@ -2319,8 +2480,7 @@ impl ReplayStage {
                         warn!("cost_update_sender failed sending bank stats: {:?}", err)
                     });
-                let bank_hash = bank.hash();
-                assert_ne!(bank_hash, Hash::default());
+                assert_ne!(bank.hash(), Hash::default());
                 // Needs to be updated before `check_slot_agrees_with_cluster()` so that
                 // any updates in `check_slot_agrees_with_cluster()` on fork choice take
                 // effect
@@ -2353,7 +2513,7 @@ impl ReplayStage {
                         .send(BankNotification::Frozen(bank.clone()))
                         .unwrap_or_else(|err| warn!("bank_notification_sender failed: {:?}", err));
                 }
-                blockstore_processor::cache_block_meta(&bank, cache_block_meta_sender);
+                blockstore_processor::cache_block_meta(bank, cache_block_meta_sender);
                 let bank_hash = bank.hash();
                 if let Some(new_frozen_voters) =
@@ -2368,7 +2528,7 @@ impl ReplayStage {
                         );
                     }
                 }
-                Self::record_rewards(&bank, rewards_recorder_sender);
+                Self::record_rewards(bank, rewards_recorder_sender);
                 if let Some(ref block_metadata_notifier) = block_metadata_notifier {
                     let block_metadata_notifier = block_metadata_notifier.read().unwrap();
                     block_metadata_notifier.notify_block_metadata(
@@ -2381,12 +2541,13 @@ impl ReplayStage {
                 }
                 bank_complete_time.stop();
-                bank_progress.replay_stats.report_stats(
+                r_replay_stats.report_stats(
                     bank.slot(),
-                    bank_progress.replay_progress.num_entries,
-                    bank_progress.replay_progress.num_shreds,
+                    r_replay_progress.num_entries,
+                    r_replay_progress.num_shreds,
                     bank_complete_time.as_us(),
                 );
+                execute_timings.accumulate(&r_replay_stats.execute_timings);
             } else {
                 trace!(
                     "bank {} not completed tick_height: {}, max_tick_height: {}",
@@ -2397,7 +2558,7 @@ impl ReplayStage {
             }
         }
-        // send accumulated execute-timings to cost_update_service
+        // Send accumulated execute-timings to cost_update_service.
         if !execute_timings.details.per_program_timings.is_empty() {
             cost_update_sender
                 .send(CostUpdate::ExecuteTiming {
                     execute_timings,
                 })
                 .unwrap_or_else(|err| warn!("cost_update_sender failed: {:?}", err));
         }
-
         inc_new_counter_info!("replay_stage-replay_transactions", tx_count);
         did_complete_bank
     }
 
+    #[allow(clippy::too_many_arguments)]
+    fn replay_active_banks(
+        blockstore: &Blockstore,
+        bank_forks: &RwLock<BankForks>,
+        my_pubkey: &Pubkey,
+        vote_account: &Pubkey,
+        progress: &mut ProgressMap,
+        transaction_status_sender: Option<&TransactionStatusSender>,
+        cache_block_meta_sender: Option<&CacheBlockMetaSender>,
+        verify_recyclers: &VerifyRecyclers,
+        heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice,
+        replay_vote_sender: &ReplayVoteSender,
+        bank_notification_sender: &Option<BankNotificationSender>,
+        rewards_recorder_sender: &Option<RewardsRecorderSender>,
+        rpc_subscriptions: &Arc<RpcSubscriptions>,
+        duplicate_slots_tracker: &mut DuplicateSlotsTracker,
+        gossip_duplicate_confirmed_slots: &GossipDuplicateConfirmedSlots,
+        epoch_slots_frozen_slots: &mut EpochSlotsFrozenSlots,
+        unfrozen_gossip_verified_vote_hashes: &mut UnfrozenGossipVerifiedVoteHashes,
+        latest_validator_votes_for_frozen_banks: &mut LatestValidatorVotesForFrozenBanks,
+        cluster_slots_update_sender: &ClusterSlotsUpdateSender,
+        cost_update_sender: &Sender<CostUpdate>,
+        duplicate_slots_to_repair: &mut DuplicateSlotsToRepair,
+        ancestor_hashes_replay_update_sender: &AncestorHashesReplayUpdateSender,
+        block_metadata_notifier: Option<BlockMetadataNotifierLock>,
+        replay_timing: &mut ReplayTiming,
+        log_messages_bytes_limit: Option<usize>,
+    ) -> bool {
+        let active_bank_slots = bank_forks.read().unwrap().active_bank_slots();
+        let num_active_banks = active_bank_slots.len();
+        warn!(
+            "{} active bank(s) to replay: {:?}",
+            num_active_banks, active_bank_slots
+        );
+        if num_active_banks > 0 {
+            let replay_result_vec = if num_active_banks > 1 {
+                if bank_forks
+                    .read()
+                    .unwrap()
+                    .get(active_bank_slots[0])
+                    .unwrap()
+                    .concurrent_replay_of_forks()
+                {
+                    Self::replay_active_banks_concurrently(
+                        blockstore,
+                        bank_forks,
+                        my_pubkey,
+                        vote_account,
+                        progress,
+                        transaction_status_sender,
+                        verify_recyclers,
+                        replay_vote_sender,
+                        replay_timing,
+                        log_messages_bytes_limit,
+                        &active_bank_slots,
+                    )
+                } else {
+                    active_bank_slots
+                        .iter()
+                        .map(|bank_slot| {
+                            Self::replay_active_bank(
+                                blockstore,
+                                bank_forks,
+                                my_pubkey,
+                                vote_account,
+                                progress,
+                                transaction_status_sender,
+                                verify_recyclers,
+                                replay_vote_sender,
+                                replay_timing,
+                                log_messages_bytes_limit,
+                                *bank_slot,
+                            )
+                        })
+                        .collect()
+                }
+            } else {
+                vec![Self::replay_active_bank(
+                    blockstore,
+                    bank_forks,
+                    my_pubkey,
+                    vote_account,
+                    progress,
+                    transaction_status_sender,
+                    verify_recyclers,
+                    replay_vote_sender,
+                    replay_timing,
+                    log_messages_bytes_limit,
+                    active_bank_slots[0],
+                )]
+            };
+
+            Self::process_replay_results(
+                blockstore,
+                bank_forks,
+                progress,
+                transaction_status_sender,
+                cache_block_meta_sender,
+                heaviest_subtree_fork_choice,
+                bank_notification_sender,
+                rewards_recorder_sender,
+                rpc_subscriptions,
+                duplicate_slots_tracker,
+                gossip_duplicate_confirmed_slots,
+                epoch_slots_frozen_slots,
+                unfrozen_gossip_verified_vote_hashes,
+                latest_validator_votes_for_frozen_banks,
+                cluster_slots_update_sender,
+                cost_update_sender,
+                duplicate_slots_to_repair,
+                ancestor_hashes_replay_update_sender,
+                block_metadata_notifier,
+                &replay_result_vec,
+            )
+        } else {
+            false
+        }
+    }
+
     #[allow(clippy::too_many_arguments)]
     pub fn compute_bank_stats(
         my_vote_pubkey: &Pubkey,
@@ -2988,7 +3267,13 @@ impl ReplayStage {
                     .get(*slot)
                     .expect("bank in progress must exist in BankForks")
                     .clone();
-            let duration = prog.replay_stats.started.elapsed().as_millis();
+            let duration = prog
+                .replay_stats
+                .read()
+                .unwrap()
+                .started
+                .elapsed()
+                .as_millis();
             if bank.is_frozen() && tower.is_slot_confirmed(*slot, voted_stakes, total_stake) {
                 info!("validator fork confirmed {} {}ms", *slot, duration);
                 datapoint_info!("validator-confirmation", ("duration_ms", duration, i64));
@@ -3884,7 +4169,8 @@ pub(crate) mod tests {
         let res = ReplayStage::replay_blockstore_into_bank(
             &bank1,
             &blockstore,
-            bank1_progress,
+            &bank1_progress.replay_stats,
+            &bank1_progress.replay_progress,
             None,
             &replay_vote_sender,
             &VerifyRecyclers::default(),
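Taken together, the replay_stage.rs changes above follow one pattern: a fixed-size rayon pool replays each active fork in parallel, per-fork progress lives behind `Arc<RwLock<...>>` so the shared progress map is only locked briefly, and an `AtomicU64` keeps the longest single-fork replay time via `fetch_max`. The standalone sketch below models that pattern outside the validator; `ForkState`, `replay_one_fork`, the simulated sleep, and the 4-thread limit are illustrative assumptions, not code from this patch:

    use {
        rayon::{prelude::*, ThreadPoolBuilder},
        std::{
            collections::HashMap,
            sync::{
                atomic::{AtomicU64, Ordering},
                Arc, RwLock,
            },
            thread,
            time::{Duration, Instant},
        },
    };

    // Stand-in for per-fork replay progress (ReplaySlotStats / ConfirmationProgress).
    #[derive(Default, Debug)]
    struct ForkState {
        num_txs: u64,
    }

    // Stand-in for replaying one fork's slot from the blockstore.
    fn replay_one_fork(slot: u64, state: &RwLock<ForkState>) {
        let mut w_state = state.write().unwrap(); // lock only this fork's state
        thread::sleep(Duration::from_millis(10 * slot)); // pretend work
        w_state.num_txs += slot;
    }

    fn main() {
        // Bounded pool, mirroring MAX_CONCURRENT_FORKS_TO_REPLAY = 4.
        let pool = ThreadPoolBuilder::new()
            .num_threads(4)
            .thread_name(|ix| format!("replay_{}", ix))
            .build()
            .unwrap();

        // Shared "progress map": each fork's state is Arc<RwLock<..>> so a clone can
        // outlive the brief map lock.
        let progress: RwLock<HashMap<u64, Arc<RwLock<ForkState>>>> = RwLock::new(HashMap::new());
        let active_slots = vec![3u64, 1, 4, 2];
        let longest_replay_us = AtomicU64::new(0);

        pool.install(|| {
            active_slots.par_iter().for_each(|slot| {
                // Briefly lock the map to fetch/insert this fork's handle, then drop the lock.
                let state = {
                    let mut map = progress.write().unwrap();
                    map.entry(*slot).or_default().clone()
                };
                let started = Instant::now();
                replay_one_fork(*slot, &state);
                // Keep only the longest fork's time, like longest_replay_time_us above.
                longest_replay_us
                    .fetch_max(started.elapsed().as_micros() as u64, Ordering::Relaxed);
            });
        });

        println!(
            "longest fork replay: {} us, progress: {:?}",
            longest_replay_us.load(Ordering::Relaxed),
            progress.read().unwrap()
        );
    }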
diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs
index 9128efe6a..43208cf70 100644
--- a/ledger/src/blockstore_processor.rs
+++ b/ledger/src/blockstore_processor.rs
@@ -894,7 +894,7 @@ pub fn process_blockstore_from_root(
             if bank_slots.len() > 1 { "s" } else { "" },
             bank_slots.iter().map(|slot| slot.to_string()).join(", "),
         );
-        assert!(bank_forks.active_banks().is_empty());
+        assert!(bank_forks.active_bank_slots().is_empty());
     }
 
     Ok(())
diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs
index 4bc2b8bfe..c38eea8ce 100644
--- a/runtime/src/bank.rs
+++ b/runtime/src/bank.rs
@@ -7507,6 +7507,11 @@ impl Bank {
             .is_active(&feature_set::preserve_rent_epoch_for_rent_exempt_accounts::id())
     }
 
+    pub fn concurrent_replay_of_forks(&self) -> bool {
+        self.feature_set
+            .is_active(&feature_set::concurrent_replay_of_forks::id())
+    }
+
     pub fn read_cost_tracker(&self) -> LockResult<RwLockReadGuard<CostTracker>> {
         self.cost_tracker.read()
     }
diff --git a/runtime/src/bank_forks.rs b/runtime/src/bank_forks.rs
index 54e448ae4..6785b962a 100644
--- a/runtime/src/bank_forks.rs
+++ b/runtime/src/bank_forks.rs
@@ -107,7 +107,7 @@ impl BankForks {
             .collect()
     }
 
-    pub fn active_banks(&self) -> Vec<Slot> {
+    pub fn active_bank_slots(&self) -> Vec<Slot> {
         self.banks
             .iter()
             .filter(|(_, v)| !v.is_frozen())
@@ -635,7 +635,7 @@ mod tests {
         let mut bank_forks = BankForks::new(bank);
         let child_bank = Bank::new_from_parent(&bank_forks[0u64], &Pubkey::default(), 1);
         bank_forks.insert(child_bank);
-        assert_eq!(bank_forks.active_banks(), vec![1]);
+        assert_eq!(bank_forks.active_bank_slots(), vec![1]);
     }
 
     #[test]
diff --git a/sdk/src/feature_set.rs b/sdk/src/feature_set.rs
index 899120dad..4486cdf5a 100644
--- a/sdk/src/feature_set.rs
+++ b/sdk/src/feature_set.rs
@@ -484,6 +484,10 @@ pub mod compact_vote_state_updates {
     solana_sdk::declare_id!("86HpNqzutEZwLcPxS6EHDcMNYWk6ikhteg9un7Y2PBKE");
 }
 
+pub mod concurrent_replay_of_forks {
+    solana_sdk::declare_id!("9F2Dcu8xkBPKxiiy65XKPZYdCG3VZDpjDTuSmeYLozJe");
+}
+
 lazy_static! {
     /// Map of feature identifiers to user-visible description
     pub static ref FEATURE_NAMES: HashMap<Pubkey, &'static str> = [
@@ -599,6 +603,7 @@ lazy_static! {
         (loosen_cpi_size_restriction::id(), "loosen cpi size restrictions #26641"),
         (use_default_units_in_fee_calculation::id(), "use default units per instruction in fee calculation #26785"),
         (compact_vote_state_updates::id(), "Compact vote state updates to lower block size"),
+        (concurrent_replay_of_forks::id(), "Allow slots from different forks to be replayed concurrently #26465"),
        /*************** ADD NEW FEATURES HERE ***************/
    ]
    .iter()
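The feature gate added above controls when the concurrent path runs: `replay_active_banks()` only dispatches to `replay_active_banks_concurrently()` once `concurrent_replay_of_forks` is active on the bank being replayed. A small test-style sketch of consulting that gate, assuming this patch is applied (so the `concurrent_replay_of_forks` module exists) and that `FeatureSet::default()` is all-inactive while `FeatureSet::all_enabled()` is all-active:

    use solana_sdk::feature_set::{concurrent_replay_of_forks, FeatureSet};

    fn main() {
        // Before cluster activation the gate reports inactive, so replay stays on the
        // sequential replay_active_bank() path.
        let inactive = FeatureSet::default();
        assert!(!inactive.is_active(&concurrent_replay_of_forks::id()));

        // Once activated (modeled here with all_enabled()), Bank::concurrent_replay_of_forks()
        // returns true and the rayon-backed path is used when multiple forks are active.
        let active = FeatureSet::all_enabled();
        assert!(active.is_active(&concurrent_replay_of_forks::id()));
    }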