Switch concurrent replay from feature to param (#27401)

Author: Brennan Watt (2022-08-26 12:36:02 -07:00), committed by GitHub
parent 713e86e877
commit 46a48760db
7 changed files with 41 additions and 57 deletions
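
With this change, concurrent replay of slots on different forks is no longer activated by an on-chain feature gate; each operator opts in at validator startup instead. A minimal invocation sketch (the ledger path is a placeholder; only the --replay-slots-concurrently flag comes from this commit):

    solana-validator --ledger <LEDGER_PATH> --replay-slots-concurrently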


@@ -165,6 +165,7 @@ pub struct ReplayStageConfig {
     // Stops voting until this slot has been reached. Should be used to avoid
     // duplicate voting which can lead to slashing.
     pub wait_to_vote_slot: Option<Slot>,
+    pub replay_slots_concurrently: bool,
 }
 
 #[derive(Default)]
@@ -424,6 +425,7 @@ impl ReplayStage {
             ancestor_hashes_replay_update_sender,
             tower_storage,
             wait_to_vote_slot,
+            replay_slots_concurrently,
         } = config;
 
         trace!("replay stage");
@@ -528,7 +530,8 @@ impl ReplayStage {
                 &ancestor_hashes_replay_update_sender,
                 block_metadata_notifier.clone(),
                 &mut replay_timing,
-                log_messages_bytes_limit
+                log_messages_bytes_limit,
+                replay_slots_concurrently
             );
 
             replay_active_banks_time.stop();
@@ -2603,7 +2606,8 @@ impl ReplayStage {
         block_metadata_notifier: Option<BlockMetadataNotifierLock>,
         replay_timing: &mut ReplayTiming,
         log_messages_bytes_limit: Option<usize>,
-    ) -> bool {
+        replay_slots_concurrently: bool,
+    ) -> bool /* completed a bank */ {
         let active_bank_slots = bank_forks.read().unwrap().active_bank_slots();
         let num_active_banks = active_bank_slots.len();
         trace!(
@@ -2612,49 +2616,8 @@
             active_bank_slots
         );
         if num_active_banks > 0 {
-            let replay_result_vec = if num_active_banks > 1 {
-                if bank_forks
-                    .read()
-                    .unwrap()
-                    .get(active_bank_slots[0])
-                    .unwrap()
-                    .concurrent_replay_of_forks()
-                {
-                    Self::replay_active_banks_concurrently(
-                        blockstore,
-                        bank_forks,
-                        my_pubkey,
-                        vote_account,
-                        progress,
-                        transaction_status_sender,
-                        verify_recyclers,
-                        replay_vote_sender,
-                        replay_timing,
-                        log_messages_bytes_limit,
-                        &active_bank_slots,
-                    )
-                } else {
-                    active_bank_slots
-                        .iter()
-                        .map(|bank_slot| {
-                            Self::replay_active_bank(
-                                blockstore,
-                                bank_forks,
-                                my_pubkey,
-                                vote_account,
-                                progress,
-                                transaction_status_sender,
-                                verify_recyclers,
-                                replay_vote_sender,
-                                replay_timing,
-                                log_messages_bytes_limit,
-                                *bank_slot,
-                            )
-                        })
-                        .collect()
-                }
-            } else {
-                vec![Self::replay_active_bank(
+            let replay_result_vec = if num_active_banks > 1 && replay_slots_concurrently {
+                Self::replay_active_banks_concurrently(
                     blockstore,
                     bank_forks,
                     my_pubkey,
@@ -2665,8 +2628,27 @@
                     replay_vote_sender,
                     replay_timing,
                     log_messages_bytes_limit,
-                    active_bank_slots[0],
-                )]
+                    &active_bank_slots,
+                )
+            } else {
+                active_bank_slots
+                    .iter()
+                    .map(|bank_slot| {
+                        Self::replay_active_bank(
+                            blockstore,
+                            bank_forks,
+                            my_pubkey,
+                            vote_account,
+                            progress,
+                            transaction_status_sender,
+                            verify_recyclers,
+                            replay_vote_sender,
+                            replay_timing,
+                            log_messages_bytes_limit,
+                            *bank_slot,
+                        )
+                    })
+                    .collect()
             };
 
             Self::process_replay_results(


@@ -84,6 +84,7 @@ pub struct TvuConfig {
     pub rocksdb_compaction_interval: Option<u64>,
     pub rocksdb_max_compaction_jitter: Option<u64>,
     pub wait_for_vote_to_start_leader: bool,
+    pub replay_slots_concurrently: bool,
 }
 
 impl Tvu {
@@ -237,6 +238,7 @@ impl Tvu {
             ancestor_hashes_replay_update_sender,
             tower_storage: tower_storage.clone(),
             wait_to_vote_slot,
+            replay_slots_concurrently: tvu_config.replay_slots_concurrently,
         };
 
         let (voting_sender, voting_receiver) = unbounded();


@@ -175,6 +175,7 @@ pub struct ValidatorConfig {
     pub wait_to_vote_slot: Option<Slot>,
     pub ledger_column_options: LedgerColumnOptions,
     pub runtime_config: RuntimeConfig,
+    pub replay_slots_concurrently: bool,
 }
 
 impl Default for ValidatorConfig {
@@ -238,6 +239,7 @@ impl Default for ValidatorConfig {
             wait_to_vote_slot: None,
             ledger_column_options: LedgerColumnOptions::default(),
             runtime_config: RuntimeConfig::default(),
+            replay_slots_concurrently: false,
         }
     }
 }
@@ -982,6 +984,7 @@ impl Validator {
                 rocksdb_compaction_interval: config.rocksdb_compaction_interval,
                 rocksdb_max_compaction_jitter: config.rocksdb_compaction_interval,
                 wait_for_vote_to_start_leader,
+                replay_slots_concurrently: config.replay_slots_concurrently,
             },
             &max_slots,
             &cost_model,


@@ -65,6 +65,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
         wait_to_vote_slot: config.wait_to_vote_slot,
         ledger_column_options: config.ledger_column_options.clone(),
         runtime_config: config.runtime_config.clone(),
+        replay_slots_concurrently: config.replay_slots_concurrently,
     }
 }
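
Since ValidatorConfig::default() now sets the flag to false, programmatic setups (e.g. local-cluster tests that clone configs via safe_clone_config) must opt in explicitly. A minimal sketch using only items shown in this diff:

    // Hypothetical test setup: enable concurrent replay, keep every
    // other field at its default value.
    let config = ValidatorConfig {
        replay_slots_concurrently: true,
        ..ValidatorConfig::default()
    };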


@@ -7523,11 +7523,6 @@ impl Bank {
             .is_active(&feature_set::preserve_rent_epoch_for_rent_exempt_accounts::id())
     }
 
-    pub fn concurrent_replay_of_forks(&self) -> bool {
-        self.feature_set
-            .is_active(&feature_set::concurrent_replay_of_forks::id())
-    }
-
     pub fn read_cost_tracker(&self) -> LockResult<RwLockReadGuard<CostTracker>> {
         self.cost_tracker.read()
     }


@@ -486,10 +486,6 @@ pub mod sign_repair_requests {
     solana_sdk::declare_id!("sigrs6u1EWeHuoKFkY8RR7qcSsPmrAeBBPESyf5pnYe");
 }
 
-pub mod concurrent_replay_of_forks {
-    solana_sdk::declare_id!("9F2Dcu8xkBPKxiiy65XKPZYdCG3VZDpjDTuSmeYLozJe");
-}
-
 pub mod check_ping_ancestor_requests {
     solana_sdk::declare_id!("AXLB87anNaUQtqBSsxkm4gvNzYY985aLtNtpJC94uWLJ");
 }
@@ -630,7 +626,6 @@ lazy_static! {
         (use_default_units_in_fee_calculation::id(), "use default units per instruction in fee calculation #26785"),
         (compact_vote_state_updates::id(), "Compact vote state updates to lower block size"),
         (sign_repair_requests::id(), "sign repair requests #26834"),
-        (concurrent_replay_of_forks::id(), "Allow slots from different forks to be replayed concurrently #26465"),
         (check_ping_ancestor_requests::id(), "ancestor hash repair socket ping/pong support #26963"),
         (incremental_snapshot_only_incremental_hash_calculation::id(), "only hash accounts in incremental snapshot during incremental snapshot creation #26799"),
         (disable_cpi_setting_executable_and_rent_epoch::id(), "disable setting is_executable and_rent_epoch in CPI #26987"),


@@ -1819,6 +1819,11 @@ pub fn main() {
                 .value_name("BYTES")
                 .help("Maximum number of bytes written to the program log before truncation")
         )
+        .arg(
+            Arg::with_name("replay_slots_concurrently")
+                .long("replay-slots-concurrently")
+                .help("Allow concurrent replay of slots on different forks")
+        )
         .after_help("The default subcommand is run")
         .subcommand(
             SubCommand::with_name("exit")
@@ -2759,6 +2764,7 @@ pub fn main() {
             ..RuntimeConfig::default()
         },
         staked_nodes_overrides: staked_nodes_overrides.clone(),
+        replay_slots_concurrently: matches.is_present("replay_slots_concurrently"),
         ..ValidatorConfig::default()
     };