Concurrent slot replay (#26465)

* Concurrently replay slots from different forks

* Split out concurrent and single-bank replay paths

* Move processing of replay results into a sub-function for readability

* Add a feature switch for concurrent replay
Brennan Watt 2022-07-28 11:33:19 -07:00 committed by GitHub
parent 013d045981
commit 467cb5def5
6 changed files with 387 additions and 91 deletions
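
For orientation before the diff: the change splits replay into three steps. Each active bank is replayed (concurrently on a bounded rayon pool when the feature gate is on, otherwise sequentially), each worker returns a per-slot result record, and all results are then processed in one pass. The sketch below is a hypothetical, simplified stand-in for that flow, not the validator's actual API; `Slot`, `ReplayResult`, `replay_one_slot`, and the boolean gate are illustrative, and a `rayon` crate dependency is assumed.

```rust
// Simplified sketch of the replay dispatch introduced by this commit.
use rayon::prelude::*;

type Slot = u64;

struct ReplayResult {
    bank_slot: Slot,
    tx_count: usize,
}

// Stand-in for replaying a single slot's entries out of the blockstore.
fn replay_one_slot(slot: Slot) -> ReplayResult {
    ReplayResult { bank_slot: slot, tx_count: 0 }
}

fn replay_active_slots(active_slots: &[Slot], concurrent_replay_enabled: bool) -> Vec<ReplayResult> {
    if concurrent_replay_enabled && active_slots.len() > 1 {
        // Concurrent path: each active fork tip is replayed on a worker thread.
        active_slots.par_iter().map(|slot| replay_one_slot(*slot)).collect()
    } else {
        // Sequential path: the existing single-bank replay, one slot at a time.
        active_slots.iter().map(|slot| replay_one_slot(*slot)).collect()
    }
}

fn process_replay_results(results: &[ReplayResult]) -> usize {
    // Bank completion, freezing, and fork-choice updates stay single threaded here.
    results.iter().map(|r| r.tx_count).sum()
}

fn main() {
    let results = replay_active_slots(&[10, 11, 12], true);
    println!(
        "replayed {} slots, {} transactions",
        results.len(),
        process_replay_results(&results)
    );
}
```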


@@ -188,8 +188,8 @@ pub struct ForkProgress {
     pub is_dead: bool,
     pub fork_stats: ForkStats,
     pub propagated_stats: PropagatedStats,
-    pub replay_stats: ReplaySlotStats,
-    pub replay_progress: ConfirmationProgress,
+    pub replay_stats: Arc<RwLock<ReplaySlotStats>>,
+    pub replay_progress: Arc<RwLock<ConfirmationProgress>>,
     pub retransmit_info: RetransmitInfo,
     // Note `num_blocks_on_fork` and `num_dropped_blocks_on_fork` only
     // count new blocks replayed since last restart, which won't include
@@ -235,8 +235,8 @@ impl ForkProgress {
         Self {
             is_dead: false,
             fork_stats: ForkStats::default(),
-            replay_stats: ReplaySlotStats::default(),
-            replay_progress: ConfirmationProgress::new(last_entry),
+            replay_stats: Arc::new(RwLock::new(ReplaySlotStats::default())),
+            replay_progress: Arc::new(RwLock::new(ConfirmationProgress::new(last_entry))),
             num_blocks_on_fork,
             num_dropped_blocks_on_fork,
             propagated_stats: PropagatedStats {
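
The two hunks above wrap `ForkProgress`'s per-slot `replay_stats` and `replay_progress` in `Arc<RwLock<…>>`. That is what lets a replay worker clone a handle out of the progress map, release the map, and update only its own slot's stats while other forks are being replayed. A minimal, std-only sketch of that pattern, with a hypothetical `SlotStats` standing in for `ReplaySlotStats`:

```rust
use std::{
    collections::HashMap,
    sync::{Arc, RwLock},
    thread,
};

// Hypothetical stand-in for ReplaySlotStats.
#[derive(Default, Debug)]
struct SlotStats {
    num_txs: usize,
}

fn main() {
    // Progress map keyed by slot; each entry owns an Arc handle to its stats.
    let mut progress: HashMap<u64, Arc<RwLock<SlotStats>>> = HashMap::new();
    progress.insert(42, Arc::new(RwLock::new(SlotStats::default())));

    // Clone the handle out of the map; the map itself is then free for other slots.
    let stats_handle = progress.get(&42).unwrap().clone();
    let worker = thread::spawn(move || {
        // The worker takes the write lock only on this slot's stats.
        stats_handle.write().unwrap().num_txs += 100;
    });
    worker.join().unwrap();

    println!("{:?}", *progress[&42].read().unwrap());
}
```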


@@ -1,4 +1,5 @@
 //! The `replay_stage` replays transactions broadcast by the leader.
 use {
     crate::{
         ancestor_hashes_service::AncestorHashesReplayUpdateSender,
@@ -18,7 +19,7 @@
         fork_choice::{ForkChoice, SelectVoteAndResetForkResult},
         heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice,
         latest_validator_votes_for_frozen_banks::LatestValidatorVotesForFrozenBanks,
-        progress_map::{ForkProgress, ProgressMap, PropagatedStats},
+        progress_map::{ForkProgress, ProgressMap, PropagatedStats, ReplaySlotStats},
         repair_service::DuplicateSlotsResetReceiver,
         rewards_recorder_service::RewardsRecorderSender,
         tower_storage::{SavedTower, SavedTowerVersions, TowerStorage},
@@ -28,6 +29,8 @@
         window_service::DuplicateSlotReceiver,
     },
     crossbeam_channel::{Receiver, RecvTimeoutError, Sender},
+    lazy_static::lazy_static,
+    rayon::{prelude::*, ThreadPool},
     solana_client::rpc_response::SlotUpdate,
     solana_entry::entry::VerifyRecyclers,
     solana_geyser_plugin_manager::block_metadata_notifier_interface::BlockMetadataNotifierLock,
@@ -35,7 +38,9 @@
     solana_ledger::{
         block_error::BlockError,
         blockstore::Blockstore,
-        blockstore_processor::{self, BlockstoreProcessorError, TransactionStatusSender},
+        blockstore_processor::{
+            self, BlockstoreProcessorError, ConfirmationProgress, TransactionStatusSender,
+        },
         leader_schedule_cache::LeaderScheduleCache,
         leader_schedule_utils::first_of_consecutive_leader_slots,
     },
@@ -70,7 +75,7 @@
         collections::{HashMap, HashSet},
         result,
         sync::{
-            atomic::{AtomicBool, Ordering},
+            atomic::{AtomicBool, AtomicU64, Ordering},
             Arc, RwLock,
         },
         thread::{self, Builder, JoinHandle},
@@ -85,6 +90,17 @@ pub const DUPLICATE_LIVENESS_THRESHOLD: f64 = 0.1;
 pub const DUPLICATE_THRESHOLD: f64 = 1.0 - SWITCH_FORK_THRESHOLD - DUPLICATE_LIVENESS_THRESHOLD;
 const MAX_VOTE_SIGNATURES: usize = 200;
 const MAX_VOTE_REFRESH_INTERVAL_MILLIS: usize = 5000;
+
+// Expect this number to be small enough to minimize thread pool overhead while large enough
+// to be able to replay all active forks at the same time in most cases.
+const MAX_CONCURRENT_FORKS_TO_REPLAY: usize = 4;
+lazy_static! {
+    static ref PAR_THREAD_POOL: ThreadPool = rayon::ThreadPoolBuilder::new()
+        .num_threads(MAX_CONCURRENT_FORKS_TO_REPLAY)
+        .thread_name(|ix| format!("replay_{}", ix))
+        .build()
+        .unwrap();
+}

 #[derive(PartialEq, Eq, Debug)]
 pub enum HeaviestForkFailures {
@@ -113,6 +129,12 @@ impl Drop for Finalizer {
     }
 }
+
+struct ReplaySlotFromBlockstore {
+    is_slot_dead: bool,
+    bank_slot: Slot,
+    replay_result: Option<Result<usize /* tx count */, BlockstoreProcessorError>>,
+}

 struct LastVoteRefreshTime {
     last_refresh_time: Instant,
     last_print_time: Instant,
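
`ReplaySlotFromBlockstore` is the record each replay worker returns: whether the slot was already dead, which slot it was, and the replay outcome (`None` when the node produced the block itself and replay was skipped). A hedged sketch of how such records are consumed after the parallel phase, using a hypothetical `SlotReplayOutcome` with a plain `String` error in place of `BlockstoreProcessorError`:

```rust
// Minimal, illustrative consumer of per-slot replay results.
struct SlotReplayOutcome {
    is_slot_dead: bool,
    bank_slot: u64,
    // None when this validator produced the block itself and skipped replay.
    replay_result: Option<Result<usize, String>>,
}

fn process_outcomes(outcomes: &[SlotReplayOutcome]) -> usize {
    let mut tx_count = 0;
    for outcome in outcomes {
        if outcome.is_slot_dead {
            continue; // already marked dead, nothing to process
        }
        match &outcome.replay_result {
            Some(Ok(replayed)) => tx_count += replayed,
            Some(Err(err)) => {
                // In the real code this is where the slot gets marked dead.
                eprintln!("slot {} failed replay: {}", outcome.bank_slot, err);
            }
            None => {} // leader's own slot: no blockstore replay was attempted
        }
    }
    tx_count
}

fn main() {
    let outcomes = vec![
        SlotReplayOutcome { is_slot_dead: false, bank_slot: 7, replay_result: Some(Ok(25)) },
        SlotReplayOutcome { is_slot_dead: true, bank_slot: 8, replay_result: None },
    ];
    assert_eq!(process_outcomes(&outcomes), 25);
}
```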
@@ -174,7 +196,7 @@ pub struct ReplayTiming {
     generate_new_bank_forks_get_slots_since_us: u64,
     generate_new_bank_forks_loop_us: u64,
     generate_new_bank_forks_write_lock_us: u64,
-    replay_blockstore_us: u64,
+    replay_blockstore_us: u64, //< When processing forks concurrently, only captures the longest fork
 }

 impl ReplayTiming {
     #[allow(clippy::too_many_arguments)]
@@ -1685,21 +1707,24 @@ impl ReplayStage {
     fn replay_blockstore_into_bank(
         bank: &Arc<Bank>,
         blockstore: &Blockstore,
-        bank_progress: &mut ForkProgress,
+        replay_stats: &RwLock<ReplaySlotStats>,
+        replay_progress: &RwLock<ConfirmationProgress>,
         transaction_status_sender: Option<&TransactionStatusSender>,
         replay_vote_sender: &ReplayVoteSender,
         verify_recyclers: &VerifyRecyclers,
         log_messages_bytes_limit: Option<usize>,
     ) -> result::Result<usize, BlockstoreProcessorError> {
-        let tx_count_before = bank_progress.replay_progress.num_txs;
+        let mut w_replay_stats = replay_stats.write().unwrap();
+        let mut w_replay_progress = replay_progress.write().unwrap();
+        let tx_count_before = w_replay_progress.num_txs;
         // All errors must lead to marking the slot as dead, otherwise,
         // the `check_slot_agrees_with_cluster()` called by `replay_active_banks()`
         // will break!
         blockstore_processor::confirm_slot(
             blockstore,
             bank,
-            &mut bank_progress.replay_stats,
-            &mut bank_progress.replay_progress,
+            &mut w_replay_stats,
+            &mut w_replay_progress,
             false,
             transaction_status_sender,
             Some(replay_vote_sender),
@@ -1708,7 +1733,7 @@
             false,
             log_messages_bytes_limit,
         )?;
-        let tx_count_after = bank_progress.replay_progress.num_txs;
+        let tx_count_after = w_replay_progress.num_txs;
         let tx_count = tx_count_after - tx_count_before;
         Ok(tx_count)
     }
@@ -2191,17 +2216,189 @@
     }

     #[allow(clippy::too_many_arguments)]
-    fn replay_active_banks(
+    fn replay_active_banks_concurrently(
         blockstore: &Blockstore,
         bank_forks: &RwLock<BankForks>,
         my_pubkey: &Pubkey,
         vote_account: &Pubkey,
         progress: &mut ProgressMap,
         transaction_status_sender: Option<&TransactionStatusSender>,
-        cache_block_meta_sender: Option<&CacheBlockMetaSender>,
         verify_recyclers: &VerifyRecyclers,
-        heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice,
         replay_vote_sender: &ReplayVoteSender,
+        replay_timing: &mut ReplayTiming,
+        log_messages_bytes_limit: Option<usize>,
+        active_bank_slots: &[Slot],
+    ) -> Vec<ReplaySlotFromBlockstore> {
+        // Make mutable shared structures thread safe.
+        let progress = RwLock::new(progress);
+        let longest_replay_time_us = AtomicU64::new(0);
+
+        // Allow for concurrent replaying of slots from different forks.
+        let replay_result_vec: Vec<ReplaySlotFromBlockstore> = PAR_THREAD_POOL.install(|| {
+            active_bank_slots
+                .into_par_iter()
+                .map(|bank_slot| {
+                    let bank_slot = *bank_slot;
+                    let mut replay_result = ReplaySlotFromBlockstore {
+                        is_slot_dead: false,
+                        bank_slot,
+                        replay_result: None,
+                    };
+                    let my_pubkey = &my_pubkey.clone();
+                    trace!(
+                        "Replay active bank: slot {}, thread_idx {}",
+                        bank_slot,
+                        PAR_THREAD_POOL.current_thread_index().unwrap_or_default()
+                    );
+                    let mut progress_lock = progress.write().unwrap();
+                    if progress_lock
+                        .get(&bank_slot)
+                        .map(|p| p.is_dead)
+                        .unwrap_or(false)
+                    {
+                        // If the fork was marked as dead, don't replay it
+                        debug!("bank_slot {:?} is marked dead", bank_slot);
+                        replay_result.is_slot_dead = true;
+                        return replay_result;
+                    }
+
+                    let bank = &bank_forks.read().unwrap().get(bank_slot).unwrap();
+                    let parent_slot = bank.parent_slot();
+                    let (num_blocks_on_fork, num_dropped_blocks_on_fork) = {
+                        let stats = progress_lock
+                            .get(&parent_slot)
+                            .expect("parent of active bank must exist in progress map");
+                        let num_blocks_on_fork = stats.num_blocks_on_fork + 1;
+                        let new_dropped_blocks = bank.slot() - parent_slot - 1;
+                        let num_dropped_blocks_on_fork =
+                            stats.num_dropped_blocks_on_fork + new_dropped_blocks;
+                        (num_blocks_on_fork, num_dropped_blocks_on_fork)
+                    };
+                    let prev_leader_slot = progress_lock.get_bank_prev_leader_slot(bank);
+                    let bank_progress = progress_lock.entry(bank.slot()).or_insert_with(|| {
+                        ForkProgress::new_from_bank(
+                            bank,
+                            my_pubkey,
+                            &vote_account.clone(),
+                            prev_leader_slot,
+                            num_blocks_on_fork,
+                            num_dropped_blocks_on_fork,
+                        )
+                    });
+
+                    let replay_stats = bank_progress.replay_stats.clone();
+                    let replay_progress = bank_progress.replay_progress.clone();
+                    drop(progress_lock);
+
+                    if bank.collector_id() != my_pubkey {
+                        let mut replay_blockstore_time =
+                            Measure::start("replay_blockstore_into_bank");
+                        let blockstore_result = Self::replay_blockstore_into_bank(
+                            bank,
+                            blockstore,
+                            &replay_stats,
+                            &replay_progress,
+                            transaction_status_sender,
+                            &replay_vote_sender.clone(),
+                            &verify_recyclers.clone(),
+                            log_messages_bytes_limit,
+                        );
+                        replay_blockstore_time.stop();
+                        replay_result.replay_result = Some(blockstore_result);
+                        longest_replay_time_us
+                            .fetch_max(replay_blockstore_time.as_us(), Ordering::Relaxed);
+                    }
+                    replay_result
+                })
+                .collect()
+        });
+        // Accumulating time across all slots could inflate this number and make it seem like an
+        // overly large amount of time is being spent on blockstore compared to other activities.
+        replay_timing.replay_blockstore_us += longest_replay_time_us.load(Ordering::Relaxed);
+
+        replay_result_vec
+    }
+
+    #[allow(clippy::too_many_arguments)]
+    fn replay_active_bank(
+        blockstore: &Blockstore,
+        bank_forks: &RwLock<BankForks>,
+        my_pubkey: &Pubkey,
+        vote_account: &Pubkey,
+        progress: &mut ProgressMap,
+        transaction_status_sender: Option<&TransactionStatusSender>,
+        verify_recyclers: &VerifyRecyclers,
+        replay_vote_sender: &ReplayVoteSender,
+        replay_timing: &mut ReplayTiming,
+        log_messages_bytes_limit: Option<usize>,
+        bank_slot: Slot,
+    ) -> ReplaySlotFromBlockstore {
+        let mut replay_result = ReplaySlotFromBlockstore {
+            is_slot_dead: false,
+            bank_slot,
+            replay_result: None,
+        };
+        let my_pubkey = &my_pubkey.clone();
+        trace!("Replay active bank: slot {}", bank_slot);
+        if progress.get(&bank_slot).map(|p| p.is_dead).unwrap_or(false) {
+            // If the fork was marked as dead, don't replay it
+            debug!("bank_slot {:?} is marked dead", bank_slot);
+            replay_result.is_slot_dead = true;
+        } else {
+            let bank = &bank_forks.read().unwrap().get(bank_slot).unwrap();
+            let parent_slot = bank.parent_slot();
+            let prev_leader_slot = progress.get_bank_prev_leader_slot(bank);
+            let (num_blocks_on_fork, num_dropped_blocks_on_fork) = {
+                let stats = progress
+                    .get(&parent_slot)
+                    .expect("parent of active bank must exist in progress map");
+                let num_blocks_on_fork = stats.num_blocks_on_fork + 1;
+                let new_dropped_blocks = bank.slot() - parent_slot - 1;
+                let num_dropped_blocks_on_fork =
+                    stats.num_dropped_blocks_on_fork + new_dropped_blocks;
+                (num_blocks_on_fork, num_dropped_blocks_on_fork)
+            };
+            let bank_progress = progress.entry(bank.slot()).or_insert_with(|| {
+                ForkProgress::new_from_bank(
+                    bank,
+                    my_pubkey,
+                    &vote_account.clone(),
+                    prev_leader_slot,
+                    num_blocks_on_fork,
+                    num_dropped_blocks_on_fork,
+                )
+            });
+
+            if bank.collector_id() != my_pubkey {
+                let mut replay_blockstore_time = Measure::start("replay_blockstore_into_bank");
+                let blockstore_result = Self::replay_blockstore_into_bank(
+                    bank,
+                    blockstore,
+                    &bank_progress.replay_stats,
+                    &bank_progress.replay_progress,
+                    transaction_status_sender,
+                    &replay_vote_sender.clone(),
+                    &verify_recyclers.clone(),
+                    log_messages_bytes_limit,
+                );
+                replay_blockstore_time.stop();
+                replay_result.replay_result = Some(blockstore_result);
+                replay_timing.replay_blockstore_us += replay_blockstore_time.as_us();
+            }
+        }
+        replay_result
+    }
+
+    #[allow(clippy::too_many_arguments)]
+    fn process_replay_results(
+        blockstore: &Blockstore,
+        bank_forks: &RwLock<BankForks>,
+        progress: &mut ProgressMap,
+        transaction_status_sender: Option<&TransactionStatusSender>,
+        cache_block_meta_sender: Option<&CacheBlockMetaSender>,
+        heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice,
         bank_notification_sender: &Option<BankNotificationSender>,
         rewards_recorder_sender: &Option<RewardsRecorderSender>,
         rpc_subscriptions: &Arc<RpcSubscriptions>,
@@ -2215,72 +2412,29 @@
         duplicate_slots_to_repair: &mut DuplicateSlotsToRepair,
         ancestor_hashes_replay_update_sender: &AncestorHashesReplayUpdateSender,
         block_metadata_notifier: Option<BlockMetadataNotifierLock>,
-        replay_timing: &mut ReplayTiming,
-        log_messages_bytes_limit: Option<usize>,
+        replay_result_vec: &[ReplaySlotFromBlockstore],
     ) -> bool {
+        // TODO: See if processing of blockstore replay results and bank completion can be made thread safe.
         let mut did_complete_bank = false;
         let mut tx_count = 0;
         let mut execute_timings = ExecuteTimings::default();
-        let active_banks = bank_forks.read().unwrap().active_banks();
-        trace!("active banks {:?}", active_banks);
-
-        for bank_slot in &active_banks {
-            // If the fork was marked as dead, don't replay it
-            if progress.get(bank_slot).map(|p| p.is_dead).unwrap_or(false) {
-                debug!("bank_slot {:?} is marked dead", *bank_slot);
+        for replay_result in replay_result_vec {
+            if replay_result.is_slot_dead {
                 continue;
             }
-
-            let bank = bank_forks.read().unwrap().get(*bank_slot).unwrap();
-            let parent_slot = bank.parent_slot();
-            let prev_leader_slot = progress.get_bank_prev_leader_slot(&bank);
-            let (num_blocks_on_fork, num_dropped_blocks_on_fork) = {
-                let stats = progress
-                    .get(&parent_slot)
-                    .expect("parent of active bank must exist in progress map");
-                let num_blocks_on_fork = stats.num_blocks_on_fork + 1;
-                let new_dropped_blocks = bank.slot() - parent_slot - 1;
-                let num_dropped_blocks_on_fork =
-                    stats.num_dropped_blocks_on_fork + new_dropped_blocks;
-                (num_blocks_on_fork, num_dropped_blocks_on_fork)
-            };
-
-            // Insert a progress entry even for slots this node is the leader for, so that
-            // 1) confirm_forks can report confirmation, 2) we can cache computations about
-            // this bank in `select_forks()`
-            let bank_progress = &mut progress.entry(bank.slot()).or_insert_with(|| {
-                ForkProgress::new_from_bank(
-                    &bank,
-                    my_pubkey,
-                    vote_account,
-                    prev_leader_slot,
-                    num_blocks_on_fork,
-                    num_dropped_blocks_on_fork,
-                )
-            });
-
-            if bank.collector_id() != my_pubkey {
-                let root_slot = bank_forks.read().unwrap().root();
-                let mut replay_blockstore_time = Measure::start("replay_blockstore_into_bank");
-                let replay_result = Self::replay_blockstore_into_bank(
-                    &bank,
-                    blockstore,
-                    bank_progress,
-                    transaction_status_sender,
-                    replay_vote_sender,
-                    verify_recyclers,
-                    log_messages_bytes_limit,
-                );
-                replay_blockstore_time.stop();
-                replay_timing.replay_blockstore_us += replay_blockstore_time.as_us();
+            let bank_slot = replay_result.bank_slot;
+            let bank = &bank_forks.read().unwrap().get(bank_slot).unwrap();
+            if let Some(replay_result) = &replay_result.replay_result {
                 match replay_result {
                     Ok(replay_tx_count) => tx_count += replay_tx_count,
                     Err(err) => {
                         // Error means the slot needs to be marked as dead
                         Self::mark_dead_slot(
                             blockstore,
-                            &bank,
-                            root_slot,
-                            &err,
+                            bank,
+                            bank_forks.read().unwrap().root(),
+                            err,
                             rpc_subscriptions,
                             duplicate_slots_tracker,
                             gossip_duplicate_confirmed_slots,
@@ -2296,20 +2450,27 @@
                     }
                 }
             }
-            assert_eq!(*bank_slot, bank.slot());
+            assert_eq!(bank_slot, bank.slot());
             if bank.is_complete() {
                 let mut bank_complete_time = Measure::start("bank_complete_time");
-                execute_timings.accumulate(&bank_progress.replay_stats.execute_timings);
-                debug!("bank {} is completed replay from blockstore, contribute to update cost with {:?}",
-                    bank.slot(),
-                    bank_progress.replay_stats.execute_timings
-                );
+                let bank_progress = progress
+                    .get_mut(&bank.slot())
+                    .expect("Bank fork progress entry missing for completed bank");
+
+                let replay_stats = bank_progress.replay_stats.clone();
+                let r_replay_stats = replay_stats.read().unwrap();
+                let replay_progress = bank_progress.replay_progress.clone();
+                let r_replay_progress = replay_progress.read().unwrap();
+                debug!("bank {} is completed replay from blockstore, contribute to update cost with {:?}",
+                    bank.slot(),
+                    r_replay_stats.execute_timings
+                );
                 did_complete_bank = true;
                 info!("bank frozen: {}", bank.slot());
-                let _ = cluster_slots_update_sender.send(vec![*bank_slot]);
+                let _ = cluster_slots_update_sender.send(vec![bank_slot]);
                 if let Some(transaction_status_sender) = transaction_status_sender {
-                    transaction_status_sender.send_transaction_status_freeze_message(&bank);
+                    transaction_status_sender.send_transaction_status_freeze_message(bank);
                 }
                 bank.freeze();
                 // report cost tracker stats
@@ -2319,8 +2480,7 @@
                         warn!("cost_update_sender failed sending bank stats: {:?}", err)
                     });
-                let bank_hash = bank.hash();
-                assert_ne!(bank_hash, Hash::default());
+                assert_ne!(bank.hash(), Hash::default());
                 // Needs to be updated before `check_slot_agrees_with_cluster()` so that
                 // any updates in `check_slot_agrees_with_cluster()` on fork choice take
                 // effect
@@ -2353,7 +2513,7 @@
                         .send(BankNotification::Frozen(bank.clone()))
                         .unwrap_or_else(|err| warn!("bank_notification_sender failed: {:?}", err));
                 }
-                blockstore_processor::cache_block_meta(&bank, cache_block_meta_sender);
+                blockstore_processor::cache_block_meta(bank, cache_block_meta_sender);

                 let bank_hash = bank.hash();
                 if let Some(new_frozen_voters) =
@@ -2368,7 +2528,7 @@
                     );
                 }
             }
-            Self::record_rewards(&bank, rewards_recorder_sender);
+            Self::record_rewards(bank, rewards_recorder_sender);
             if let Some(ref block_metadata_notifier) = block_metadata_notifier {
                 let block_metadata_notifier = block_metadata_notifier.read().unwrap();
                 block_metadata_notifier.notify_block_metadata(
@@ -2381,12 +2541,13 @@
                 }
                 bank_complete_time.stop();

-                bank_progress.replay_stats.report_stats(
+                r_replay_stats.report_stats(
                     bank.slot(),
-                    bank_progress.replay_progress.num_entries,
-                    bank_progress.replay_progress.num_shreds,
+                    r_replay_progress.num_entries,
+                    r_replay_progress.num_shreds,
                     bank_complete_time.as_us(),
                 );
+                execute_timings.accumulate(&r_replay_stats.execute_timings);
             } else {
                 trace!(
                     "bank {} not completed tick_height: {}, max_tick_height: {}",
@@ -2397,7 +2558,7 @@
             }
         }

-        // send accumulated execute-timings to cost_update_service
+        // Send accumulated execute-timings to cost_update_service.
         if !execute_timings.details.per_program_timings.is_empty() {
             cost_update_sender
                 .send(CostUpdate::ExecuteTiming {
@@ -2405,11 +2566,129 @@
                 })
                 .unwrap_or_else(|err| warn!("cost_update_sender failed: {:?}", err));
         }

         inc_new_counter_info!("replay_stage-replay_transactions", tx_count);
         did_complete_bank
     }

+    #[allow(clippy::too_many_arguments)]
+    fn replay_active_banks(
+        blockstore: &Blockstore,
+        bank_forks: &RwLock<BankForks>,
+        my_pubkey: &Pubkey,
+        vote_account: &Pubkey,
+        progress: &mut ProgressMap,
+        transaction_status_sender: Option<&TransactionStatusSender>,
+        cache_block_meta_sender: Option<&CacheBlockMetaSender>,
+        verify_recyclers: &VerifyRecyclers,
+        heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice,
+        replay_vote_sender: &ReplayVoteSender,
+        bank_notification_sender: &Option<BankNotificationSender>,
+        rewards_recorder_sender: &Option<RewardsRecorderSender>,
+        rpc_subscriptions: &Arc<RpcSubscriptions>,
+        duplicate_slots_tracker: &mut DuplicateSlotsTracker,
+        gossip_duplicate_confirmed_slots: &GossipDuplicateConfirmedSlots,
+        epoch_slots_frozen_slots: &mut EpochSlotsFrozenSlots,
+        unfrozen_gossip_verified_vote_hashes: &mut UnfrozenGossipVerifiedVoteHashes,
+        latest_validator_votes_for_frozen_banks: &mut LatestValidatorVotesForFrozenBanks,
+        cluster_slots_update_sender: &ClusterSlotsUpdateSender,
+        cost_update_sender: &Sender<CostUpdate>,
+        duplicate_slots_to_repair: &mut DuplicateSlotsToRepair,
+        ancestor_hashes_replay_update_sender: &AncestorHashesReplayUpdateSender,
+        block_metadata_notifier: Option<BlockMetadataNotifierLock>,
+        replay_timing: &mut ReplayTiming,
+        log_messages_bytes_limit: Option<usize>,
+    ) -> bool {
+        let active_bank_slots = bank_forks.read().unwrap().active_bank_slots();
+        let num_active_banks = active_bank_slots.len();
+        warn!(
+            "{} active bank(s) to replay: {:?}",
+            num_active_banks, active_bank_slots
+        );
+        if num_active_banks > 0 {
+            let replay_result_vec = if num_active_banks > 1 {
+                if bank_forks
+                    .read()
+                    .unwrap()
+                    .get(active_bank_slots[0])
+                    .unwrap()
+                    .concurrent_replay_of_forks()
+                {
+                    Self::replay_active_banks_concurrently(
+                        blockstore,
+                        bank_forks,
+                        my_pubkey,
+                        vote_account,
+                        progress,
+                        transaction_status_sender,
+                        verify_recyclers,
+                        replay_vote_sender,
+                        replay_timing,
+                        log_messages_bytes_limit,
+                        &active_bank_slots,
+                    )
+                } else {
+                    active_bank_slots
+                        .iter()
+                        .map(|bank_slot| {
+                            Self::replay_active_bank(
+                                blockstore,
+                                bank_forks,
+                                my_pubkey,
+                                vote_account,
+                                progress,
+                                transaction_status_sender,
+                                verify_recyclers,
+                                replay_vote_sender,
+                                replay_timing,
+                                log_messages_bytes_limit,
+                                *bank_slot,
+                            )
+                        })
+                        .collect()
+                }
+            } else {
+                vec![Self::replay_active_bank(
+                    blockstore,
+                    bank_forks,
+                    my_pubkey,
+                    vote_account,
+                    progress,
+                    transaction_status_sender,
+                    verify_recyclers,
+                    replay_vote_sender,
+                    replay_timing,
+                    log_messages_bytes_limit,
+                    active_bank_slots[0],
+                )]
+            };
+
+            Self::process_replay_results(
+                blockstore,
+                bank_forks,
+                progress,
+                transaction_status_sender,
+                cache_block_meta_sender,
+                heaviest_subtree_fork_choice,
+                bank_notification_sender,
+                rewards_recorder_sender,
+                rpc_subscriptions,
+                duplicate_slots_tracker,
+                gossip_duplicate_confirmed_slots,
+                epoch_slots_frozen_slots,
+                unfrozen_gossip_verified_vote_hashes,
+                latest_validator_votes_for_frozen_banks,
+                cluster_slots_update_sender,
+                cost_update_sender,
+                duplicate_slots_to_repair,
+                ancestor_hashes_replay_update_sender,
+                block_metadata_notifier,
+                &replay_result_vec,
+            )
+        } else {
+            false
+        }
+    }
+
     #[allow(clippy::too_many_arguments)]
     pub fn compute_bank_stats(
         my_vote_pubkey: &Pubkey,
@@ -2988,7 +3267,13 @@
                 .get(*slot)
                 .expect("bank in progress must exist in BankForks")
                 .clone();
-            let duration = prog.replay_stats.started.elapsed().as_millis();
+            let duration = prog
+                .replay_stats
+                .read()
+                .unwrap()
+                .started
+                .elapsed()
+                .as_millis();
             if bank.is_frozen() && tower.is_slot_confirmed(*slot, voted_stakes, total_stake) {
                 info!("validator fork confirmed {} {}ms", *slot, duration);
                 datapoint_info!("validator-confirmation", ("duration_ms", duration, i64));
@@ -3884,7 +4169,8 @@ pub(crate) mod tests {
         let res = ReplayStage::replay_blockstore_into_bank(
             &bank1,
             &blockstore,
-            bank1_progress,
+            &bank1_progress.replay_stats,
+            &bank1_progress.replay_progress,
             None,
             &replay_vote_sender,
             &VerifyRecyclers::default(),


@@ -894,7 +894,7 @@ pub fn process_blockstore_from_root(
             if bank_slots.len() > 1 { "s" } else { "" },
             bank_slots.iter().map(|slot| slot.to_string()).join(", "),
         );
-        assert!(bank_forks.active_banks().is_empty());
+        assert!(bank_forks.active_bank_slots().is_empty());
     }

     Ok(())


@@ -7507,6 +7507,11 @@ impl Bank {
             .is_active(&feature_set::preserve_rent_epoch_for_rent_exempt_accounts::id())
     }

+    pub fn concurrent_replay_of_forks(&self) -> bool {
+        self.feature_set
+            .is_active(&feature_set::concurrent_replay_of_forks::id())
+    }
+
     pub fn read_cost_tracker(&self) -> LockResult<RwLockReadGuard<CostTracker>> {
         self.cost_tracker.read()
     }
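
`Bank::concurrent_replay_of_forks()` above is a plain feature-gate lookup; `replay_active_banks()` consults it on the first active bank to choose between the concurrent and sequential paths. A self-contained sketch of that gate pattern, with a string id and toy `FeatureSet`/`Bank` types standing in for the real ones:

```rust
use std::collections::HashSet;

// Illustrative stand-in for the runtime's FeatureSet keyed by feature pubkey.
#[derive(Default)]
struct FeatureSet {
    active: HashSet<&'static str>,
}

impl FeatureSet {
    fn is_active(&self, id: &str) -> bool {
        self.active.contains(id)
    }
}

struct Bank {
    feature_set: FeatureSet,
}

impl Bank {
    // Mirrors the shape of Bank::concurrent_replay_of_forks() from the diff,
    // with a string id in place of the on-chain feature pubkey.
    fn concurrent_replay_of_forks(&self) -> bool {
        self.feature_set.is_active("concurrent_replay_of_forks")
    }
}

fn main() {
    let mut feature_set = FeatureSet::default();
    feature_set.active.insert("concurrent_replay_of_forks");
    let bank = Bank { feature_set };
    // Callers branch on the gate, e.g. picking the concurrent or sequential path.
    assert!(bank.concurrent_replay_of_forks());
}
```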


@@ -107,7 +107,7 @@ impl BankForks {
             .collect()
     }

-    pub fn active_banks(&self) -> Vec<Slot> {
+    pub fn active_bank_slots(&self) -> Vec<Slot> {
         self.banks
             .iter()
             .filter(|(_, v)| !v.is_frozen())
@@ -635,7 +635,7 @@ mod tests {
         let mut bank_forks = BankForks::new(bank);
         let child_bank = Bank::new_from_parent(&bank_forks[0u64], &Pubkey::default(), 1);
         bank_forks.insert(child_bank);
-        assert_eq!(bank_forks.active_banks(), vec![1]);
+        assert_eq!(bank_forks.active_bank_slots(), vec![1]);
     }

     #[test]


@@ -484,6 +484,10 @@ pub mod compact_vote_state_updates {
     solana_sdk::declare_id!("86HpNqzutEZwLcPxS6EHDcMNYWk6ikhteg9un7Y2PBKE");
 }

+pub mod concurrent_replay_of_forks {
+    solana_sdk::declare_id!("9F2Dcu8xkBPKxiiy65XKPZYdCG3VZDpjDTuSmeYLozJe");
+}
+
 lazy_static! {
     /// Map of feature identifiers to user-visible description
     pub static ref FEATURE_NAMES: HashMap<Pubkey, &'static str> = [
@@ -599,6 +603,7 @@
         (loosen_cpi_size_restriction::id(), "loosen cpi size restrictions #26641"),
         (use_default_units_in_fee_calculation::id(), "use default units per instruction in fee calculation #26785"),
         (compact_vote_state_updates::id(), "Compact vote state updates to lower block size"),
+        (concurrent_replay_of_forks::id(), "Allow slots from different forks to be replayed concurrently #26465"),
         /*************** ADD NEW FEATURES HERE ***************/
     ]
     .iter()