diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 8a29d037d..48641297f 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -51,7 +51,6 @@ use { solana_measure::measure::Measure, solana_poh::poh_recorder::{PohLeaderStatus, PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS}, solana_program_runtime::timings::ExecuteTimings, - solana_rayon_threadlimit::get_max_thread_count, solana_rpc::{ optimistically_confirmed_bank_tracker::{BankNotification, BankNotificationSenderConfig}, rpc_subscriptions::RpcSubscriptions, @@ -80,6 +79,7 @@ use { solana_vote_program::vote_state::VoteTransaction, std::{ collections::{HashMap, HashSet}, + num::NonZeroUsize, result, sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, @@ -95,11 +95,9 @@ pub const SUPERMINORITY_THRESHOLD: f64 = 1f64 / 3f64; pub const MAX_UNCONFIRMED_SLOTS: usize = 5; pub const DUPLICATE_LIVENESS_THRESHOLD: f64 = 0.1; pub const DUPLICATE_THRESHOLD: f64 = 1.0 - SWITCH_FORK_THRESHOLD - DUPLICATE_LIVENESS_THRESHOLD; + const MAX_VOTE_SIGNATURES: usize = 200; const MAX_VOTE_REFRESH_INTERVAL_MILLIS: usize = 5000; -// Expect this number to be small enough to minimize thread pool overhead while large enough -// to be able to replay all active forks at the same time in most cases. -const MAX_CONCURRENT_FORKS_TO_REPLAY: usize = 4; const MAX_REPAIR_RETRY_LOOP_ATTEMPTS: usize = 10; #[derive(PartialEq, Eq, Debug)] @@ -291,7 +289,8 @@ pub struct ReplayStageConfig { // Stops voting until this slot has been reached. Should be used to avoid // duplicate voting which can lead to slashing. pub wait_to_vote_slot: Option, - pub replay_slots_concurrently: bool, + pub replay_forks_threads: NonZeroUsize, + pub replay_transactions_threads: NonZeroUsize, } /// Timing information for the ReplayStage main processing loop @@ -574,7 +573,8 @@ impl ReplayStage { ancestor_hashes_replay_update_sender, tower_storage, wait_to_vote_slot, - replay_slots_concurrently, + replay_forks_threads, + replay_transactions_threads, } = config; trace!("replay stage"); @@ -654,19 +654,19 @@ impl ReplayStage { ) }; // Thread pool to (maybe) replay multiple threads in parallel - let replay_mode = if replay_slots_concurrently { + let replay_mode = if replay_forks_threads.get() == 1 { + ForkReplayMode::Serial + } else { let pool = rayon::ThreadPoolBuilder::new() - .num_threads(MAX_CONCURRENT_FORKS_TO_REPLAY) + .num_threads(replay_forks_threads.get()) .thread_name(|i| format!("solReplayFork{i:02}")) .build() .expect("new rayon threadpool"); ForkReplayMode::Parallel(pool) - } else { - ForkReplayMode::Serial }; // Thread pool to replay multiple transactions within one block in parallel let replay_tx_thread_pool = rayon::ThreadPoolBuilder::new() - .num_threads(get_max_thread_count()) + .num_threads(replay_transactions_threads.get()) .thread_name(|i| format!("solReplayTx{i:02}")) .build() .expect("new rayon threadpool"); diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 47bc9a790..2e64fe067 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -53,6 +53,7 @@ use { std::{ collections::HashSet, net::{SocketAddr, UdpSocket}, + num::NonZeroUsize, sync::{atomic::AtomicBool, Arc, RwLock}, thread::{self, JoinHandle}, }, @@ -81,7 +82,6 @@ pub struct TvuSockets { pub ancestor_hashes_requests: UdpSocket, } -#[derive(Default)] pub struct TvuConfig { pub max_ledger_shreds: Option, pub shred_version: u16, @@ -90,7 +90,22 @@ pub struct TvuConfig { // Validators which should be given priority when serving repairs pub repair_whitelist: Arc>>, pub wait_for_vote_to_start_leader: bool, - pub replay_slots_concurrently: bool, + pub replay_forks_threads: NonZeroUsize, + pub replay_transactions_threads: NonZeroUsize, +} + +impl Default for TvuConfig { + fn default() -> Self { + Self { + max_ledger_shreds: None, + shred_version: 0, + repair_validators: None, + repair_whitelist: Arc::new(RwLock::new(HashSet::default())), + wait_for_vote_to_start_leader: false, + replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"), + replay_transactions_threads: NonZeroUsize::new(1).expect("1 is non-zero"), + } + } } impl Tvu { @@ -265,7 +280,8 @@ impl Tvu { ancestor_hashes_replay_update_sender, tower_storage: tower_storage.clone(), wait_to_vote_slot, - replay_slots_concurrently: tvu_config.replay_slots_concurrently, + replay_forks_threads: tvu_config.replay_forks_threads, + replay_transactions_threads: tvu_config.replay_transactions_threads, }; let (voting_sender, voting_receiver) = unbounded(); diff --git a/core/src/validator.rs b/core/src/validator.rs index 3d2a93dae..98a267aea 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -74,6 +74,7 @@ use { poh_service::{self, PohService}, }, solana_program_runtime::runtime_config::RuntimeConfig, + solana_rayon_threadlimit::get_max_thread_count, solana_rpc::{ max_slots::MaxSlots, optimistically_confirmed_bank_tracker::{ @@ -123,6 +124,7 @@ use { std::{ collections::{HashMap, HashSet}, net::SocketAddr, + num::NonZeroUsize, path::{Path, PathBuf}, sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, @@ -260,7 +262,6 @@ pub struct ValidatorConfig { pub wait_to_vote_slot: Option, pub ledger_column_options: LedgerColumnOptions, pub runtime_config: RuntimeConfig, - pub replay_slots_concurrently: bool, pub banking_trace_dir_byte_limit: banking_trace::DirByteLimit, pub block_verification_method: BlockVerificationMethod, pub block_production_method: BlockProductionMethod, @@ -268,6 +269,8 @@ pub struct ValidatorConfig { pub use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup, pub wen_restart_proto_path: Option, pub unified_scheduler_handler_threads: Option, + pub replay_forks_threads: NonZeroUsize, + pub replay_transactions_threads: NonZeroUsize, } impl Default for ValidatorConfig { @@ -328,7 +331,6 @@ impl Default for ValidatorConfig { wait_to_vote_slot: None, ledger_column_options: LedgerColumnOptions::default(), runtime_config: RuntimeConfig::default(), - replay_slots_concurrently: false, banking_trace_dir_byte_limit: 0, block_verification_method: BlockVerificationMethod::default(), block_production_method: BlockProductionMethod::default(), @@ -336,6 +338,8 @@ impl Default for ValidatorConfig { use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup::default(), wen_restart_proto_path: None, unified_scheduler_handler_threads: None, + replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"), + replay_transactions_threads: NonZeroUsize::new(1).expect("1 is non-zero"), } } } @@ -346,6 +350,9 @@ impl ValidatorConfig { enforce_ulimit_nofile: false, rpc_config: JsonRpcConfig::default_for_test(), block_production_method: BlockProductionMethod::ThreadLocalMultiIterator, + replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"), + replay_transactions_threads: NonZeroUsize::new(get_max_thread_count()) + .expect("thread count is non-zero"), ..Self::default() } } @@ -1305,7 +1312,8 @@ impl Validator { repair_validators: config.repair_validators.clone(), repair_whitelist: config.repair_whitelist.clone(), wait_for_vote_to_start_leader, - replay_slots_concurrently: config.replay_slots_concurrently, + replay_forks_threads: config.replay_forks_threads, + replay_transactions_threads: config.replay_transactions_threads, }, &max_slots, block_metadata_notifier, diff --git a/local-cluster/src/validator_configs.rs b/local-cluster/src/validator_configs.rs index 33883bb02..450452034 100644 --- a/local-cluster/src/validator_configs.rs +++ b/local-cluster/src/validator_configs.rs @@ -61,7 +61,6 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig { wait_to_vote_slot: config.wait_to_vote_slot, ledger_column_options: config.ledger_column_options.clone(), runtime_config: config.runtime_config.clone(), - replay_slots_concurrently: config.replay_slots_concurrently, banking_trace_dir_byte_limit: config.banking_trace_dir_byte_limit, block_verification_method: config.block_verification_method.clone(), block_production_method: config.block_production_method.clone(), @@ -69,6 +68,8 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig { use_snapshot_archives_at_startup: config.use_snapshot_archives_at_startup, wen_restart_proto_path: config.wen_restart_proto_path.clone(), unified_scheduler_handler_threads: config.unified_scheduler_handler_threads, + replay_forks_threads: config.replay_forks_threads, + replay_transactions_threads: config.replay_transactions_threads, } } diff --git a/validator/src/cli.rs b/validator/src/cli.rs index f127273c8..8cae6667f 100644 --- a/validator/src/cli.rs +++ b/validator/src/cli.rs @@ -52,6 +52,9 @@ use { std::{path::PathBuf, str::FromStr}, }; +pub mod thread_args; +use thread_args::{thread_args, DefaultThreadArgs}; + const EXCLUDE_KEY: &str = "account-index-exclude-key"; const INCLUDE_KEY: &str = "account-index-include-key"; // The default minimal snapshot download speed (bytes/second) @@ -1466,11 +1469,6 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> { .value_name("BYTES") .help("Maximum number of bytes written to the program log before truncation"), ) - .arg( - Arg::with_name("replay_slots_concurrently") - .long("replay-slots-concurrently") - .help("Allow concurrent replay of slots on different forks"), - ) .arg( Arg::with_name("banking_trace_dir_byte_limit") // expose friendly alternative name to cli than internal @@ -1555,6 +1553,7 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> { ", ), ) + .args(&thread_args(&default_args.thread_args)) .args(&get_deprecated_arguments()) .after_help("The default subcommand is run") .subcommand( @@ -2073,6 +2072,13 @@ fn deprecated_arguments() -> Vec { .long("no-rocksdb-compaction") .takes_value(false) .help("Disable manual compaction of the ledger database")); + add_arg!( + Arg::with_name("replay_slots_concurrently") + .long("replay-slots-concurrently") + .help("Allow concurrent replay of slots on different forks") + .conflicts_with("replay_forks_threads"), + replaced_by: "replay_forks_threads", + usage_warning: "Equivalent behavior to this flag would be --replay-forks-threads 4"); add_arg!(Arg::with_name("rocksdb_compaction_interval") .long("rocksdb-compaction-interval-slots") .value_name("ROCKSDB_COMPACTION_INTERVAL_SLOTS") @@ -2195,6 +2201,8 @@ pub struct DefaultArgs { pub banking_trace_dir_byte_limit: String, pub wen_restart_path: String, + + pub thread_args: DefaultThreadArgs, } impl DefaultArgs { @@ -2277,6 +2285,7 @@ impl DefaultArgs { wait_for_restart_window_max_delinquent_stake: "5".to_string(), banking_trace_dir_byte_limit: BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT.to_string(), wen_restart_path: "wen_restart_progress.proto".to_string(), + thread_args: DefaultThreadArgs::default(), } } } diff --git a/validator/src/cli/thread_args.rs b/validator/src/cli/thread_args.rs new file mode 100644 index 000000000..53d8cf15d --- /dev/null +++ b/validator/src/cli/thread_args.rs @@ -0,0 +1,115 @@ +//! Arguments for controlling the number of threads allocated for various tasks + +use { + clap::{value_t_or_exit, Arg, ArgMatches}, + solana_clap_utils::{hidden_unless_forced, input_validators::is_within_range}, + solana_rayon_threadlimit::get_max_thread_count, + std::{num::NonZeroUsize, ops::RangeInclusive}, +}; + +// Need this struct to provide &str whose lifetime matches that of the CLAP Arg's +pub struct DefaultThreadArgs { + pub replay_forks_threads: String, + pub replay_transactions_threads: String, +} + +impl Default for DefaultThreadArgs { + fn default() -> Self { + Self { + replay_forks_threads: ReplayForksThreadsArg::default().to_string(), + replay_transactions_threads: ReplayTransactionsThreadsArg::default().to_string(), + } + } +} + +pub fn thread_args<'a>(defaults: &DefaultThreadArgs) -> Vec> { + vec![ + new_thread_arg::(&defaults.replay_forks_threads), + new_thread_arg::(&defaults.replay_transactions_threads), + ] +} + +fn new_thread_arg<'a, T: ThreadArg>(default: &str) -> Arg<'_, 'a> { + Arg::with_name(T::NAME) + .long(T::LONG_NAME) + .takes_value(true) + .value_name("NUMBER") + .default_value(default) + .validator(|num| is_within_range(num, T::range())) + .hidden(hidden_unless_forced()) + .help(T::HELP) +} + +pub struct NumThreadConfig { + pub replay_forks_threads: NonZeroUsize, + pub replay_transactions_threads: NonZeroUsize, +} + +pub fn parse_num_threads_args(matches: &ArgMatches) -> NumThreadConfig { + NumThreadConfig { + replay_forks_threads: if matches.is_present("replay_slots_concurrently") { + NonZeroUsize::new(4).expect("4 is non-zero") + } else { + value_t_or_exit!(matches, ReplayForksThreadsArg::NAME, NonZeroUsize) + }, + replay_transactions_threads: value_t_or_exit!( + matches, + ReplayTransactionsThreadsArg::NAME, + NonZeroUsize + ), + } +} + +/// Configuration for CLAP arguments that control the number of threads for various functions +trait ThreadArg { + /// The argument's name + const NAME: &'static str; + /// The argument's long name + const LONG_NAME: &'static str; + /// The argument's help message + const HELP: &'static str; + + /// The default number of threads + fn default() -> usize; + /// The minimum allowed number of threads (inclusive) + fn min() -> usize { + 1 + } + /// The maximum allowed number of threads (inclusive) + fn max() -> usize { + // By default, no thread pool should scale over the number of the machine's threads + get_max_thread_count() + } + /// The range of allowed number of threads (inclusive on both ends) + fn range() -> RangeInclusive { + RangeInclusive::new(Self::min(), Self::max()) + } +} + +struct ReplayForksThreadsArg; +impl ThreadArg for ReplayForksThreadsArg { + const NAME: &'static str = "replay_forks_threads"; + const LONG_NAME: &'static str = "replay-forks-threads"; + const HELP: &'static str = "Number of threads to use for replay of blocks on different forks"; + + fn default() -> usize { + // Default to single threaded fork execution + 1 + } + fn max() -> usize { + // Choose a value that is small enough to limit the overhead of having a large thread pool + // while also being large enough to allow replay of all active forks in most scenarios + 4 + } +} + +struct ReplayTransactionsThreadsArg; +impl ThreadArg for ReplayTransactionsThreadsArg { + const NAME: &'static str = "replay_transactions_threads"; + const LONG_NAME: &'static str = "replay-transactions-threads"; + const HELP: &'static str = "Number of threads to use for transaction replay"; + + fn default() -> usize { + get_max_thread_count() + } +} diff --git a/validator/src/main.rs b/validator/src/main.rs index cadd27590..560500319 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -6,7 +6,7 @@ use { admin_rpc_service, admin_rpc_service::{load_staked_nodes_overrides, StakedNodesOverrides}, bootstrap, - cli::{app, warn_for_deprecated_arguments, DefaultArgs}, + cli::{self, app, warn_for_deprecated_arguments, DefaultArgs}, dashboard::Dashboard, ledger_lockfile, lock_ledger, new_spinner_progress_bar, println_name_value, redirect_stderr_to_file, @@ -1331,6 +1331,11 @@ pub fn main() { let full_api = matches.is_present("full_rpc_api"); + let cli::thread_args::NumThreadConfig { + replay_forks_threads, + replay_transactions_threads, + } = cli::thread_args::parse_num_threads_args(&matches); + let mut validator_config = ValidatorConfig { require_tower: matches.is_present("require_tower"), tower_storage, @@ -1464,12 +1469,13 @@ pub fn main() { ..RuntimeConfig::default() }, staked_nodes_overrides: staked_nodes_overrides.clone(), - replay_slots_concurrently: matches.is_present("replay_slots_concurrently"), use_snapshot_archives_at_startup: value_t_or_exit!( matches, use_snapshot_archives_at_startup::cli::NAME, UseSnapshotArchivesAtStartup ), + replay_forks_threads, + replay_transactions_threads, ..ValidatorConfig::default() };