From fc6cee9c068df6cc3a5850b6152695ea3ae72438 Mon Sep 17 00:00:00 2001 From: janlegner <32453746+janlegner@users.noreply.github.com> Date: Thu, 11 Aug 2022 23:34:04 +0200 Subject: [PATCH] allow staked nodes weight override (#26870) * Allowed staked nodes weight override (#26407) * Allowed staked nodes weight override, passing only HashMap over to core module Co-authored-by: Ondra Chaloupka --- Cargo.lock | 1 + core/src/staked_nodes_updater_service.rs | 48 ++++++++++++- core/src/tpu.rs | 5 +- core/src/validator.rs | 3 + local-cluster/src/validator_configs.rs | 1 + programs/bpf/Cargo.lock | 1 + test-validator/src/lib.rs | 4 ++ validator/Cargo.toml | 1 + validator/src/admin_rpc_service.rs | 62 ++++++++++++++++- validator/src/bin/solana-test-validator.rs | 1 + validator/src/main.rs | 78 +++++++++++++++++++++- 11 files changed, 199 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 415c4c0c6..250918eb9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6515,6 +6515,7 @@ dependencies = [ "rayon", "serde", "serde_json", + "serde_yaml", "signal-hook", "solana-clap-utils", "solana-cli-config", diff --git a/core/src/staked_nodes_updater_service.rs b/core/src/staked_nodes_updater_service.rs index 06ca95522..97b9914ea 100644 --- a/core/src/staked_nodes_updater_service.rs +++ b/core/src/staked_nodes_updater_service.rs @@ -8,7 +8,7 @@ use { net::IpAddr, sync::{ atomic::{AtomicBool, Ordering}, - Arc, RwLock, + Arc, RwLock, RwLockReadGuard, }, thread::{self, sleep, Builder, JoinHandle}, time::{Duration, Instant}, @@ -27,12 +27,14 @@ impl StakedNodesUpdaterService { cluster_info: Arc, bank_forks: Arc>, shared_staked_nodes: Arc>, + shared_staked_nodes_overrides: Arc>>, ) -> Self { let thread_hdl = Builder::new() .name("sol-sn-updater".to_string()) .spawn(move || { let mut last_stakes = Instant::now(); while !exit.load(Ordering::Relaxed) { + let overrides = shared_staked_nodes_overrides.read().unwrap(); let mut new_ip_to_stake = HashMap::new(); let mut new_id_to_stake = HashMap::new(); let mut total_stake = 0; @@ -47,6 +49,7 @@ impl StakedNodesUpdaterService { &mut min_stake, &bank_forks, &cluster_info, + &overrides, ) { let mut shared = shared_staked_nodes.write().unwrap(); shared.total_stake = total_stake; @@ -69,6 +72,7 @@ impl StakedNodesUpdaterService { min_stake: &mut u64, bank_forks: &RwLock, cluster_info: &ClusterInfo, + overrides: &RwLockReadGuard>, ) -> bool { if last_stakes.elapsed() > IP_TO_STAKE_REFRESH_DURATION { let root_bank = bank_forks.read().unwrap().root_bank(); @@ -96,6 +100,14 @@ impl StakedNodesUpdaterService { Some((node.tvu.ip(), *stake)) }) .collect(); + Self::override_stake( + cluster_info, + total_stake, + id_to_stake, + ip_to_stake, + overrides, + ); + *last_stakes = Instant::now(); true } else { @@ -104,6 +116,40 @@ impl StakedNodesUpdaterService { } } + fn override_stake( + cluster_info: &ClusterInfo, + total_stake: &mut u64, + id_to_stake_map: &mut HashMap, + ip_to_stake_map: &mut HashMap, + staked_map_overrides: &HashMap, + ) { + for (id_override, stake_override) in staked_map_overrides.iter() { + if let Some(ip_override) = + cluster_info + .all_peers() + .into_iter() + .find_map(|(node, _seen_time)| { + if node.id == *id_override { + return Some(node.tvu.ip()); + } + None + }) + { + if let Some(previous_stake) = id_to_stake_map.get(id_override) { + *total_stake -= previous_stake; + } + *total_stake += stake_override; + id_to_stake_map.insert(*id_override, *stake_override); + ip_to_stake_map.insert(ip_override, *stake_override); + } else { + error!( + "staked nodes overrides configuration for id {} with stake {} does not match existing IP. Skipping", + id_override, stake_override + ); + } + } + } + pub fn join(self) -> thread::Result<()> { self.thread_hdl.join() } diff --git a/core/src/tpu.rs b/core/src/tpu.rs index cfd546869..e969ba90e 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -29,12 +29,13 @@ use { cost_model::CostModel, vote_sender_types::{ReplayVoteReceiver, ReplayVoteSender}, }, - solana_sdk::signature::Keypair, + solana_sdk::{pubkey::Pubkey, signature::Keypair}, solana_streamer::{ quic::{spawn_server, StreamStats, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS}, streamer::StakedNodes, }, std::{ + collections::HashMap, net::UdpSocket, sync::{atomic::AtomicBool, Arc, RwLock}, thread, @@ -98,6 +99,7 @@ impl Tpu { log_messages_bytes_limit: Option, enable_quic_servers: bool, staked_nodes: &Arc>, + shared_staked_nodes_overrides: Arc>>, ) -> Self { let TpuSockets { transactions: transactions_sockets, @@ -130,6 +132,7 @@ impl Tpu { cluster_info.clone(), bank_forks.clone(), staked_nodes.clone(), + shared_staked_nodes_overrides, ); let (find_packet_sender_stake_sender, find_packet_sender_stake_receiver) = unbounded(); diff --git a/core/src/validator.rs b/core/src/validator.rs index 119c7d8a4..1ed2af259 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -168,6 +168,7 @@ pub struct ValidatorConfig { pub accounts_db_test_hash_calculation: bool, pub accounts_db_skip_shrink: bool, pub tpu_coalesce_ms: u64, + pub staked_nodes_overrides: Arc>>, pub validator_exit: Arc>, pub no_wait_for_vote_to_start_leader: bool, pub accounts_shrink_ratio: AccountShrinkThreshold, @@ -230,6 +231,7 @@ impl Default for ValidatorConfig { accounts_db_test_hash_calculation: false, accounts_db_skip_shrink: false, tpu_coalesce_ms: DEFAULT_TPU_COALESCE_MS, + staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())), validator_exit: Arc::new(RwLock::new(Exit::default())), no_wait_for_vote_to_start_leader: true, accounts_shrink_ratio: AccountShrinkThreshold::default(), @@ -1038,6 +1040,7 @@ impl Validator { config.runtime_config.log_messages_bytes_limit, config.enable_quic_servers, &staked_nodes, + config.staked_nodes_overrides.clone(), ); datapoint_info!( diff --git a/local-cluster/src/validator_configs.rs b/local-cluster/src/validator_configs.rs index e717b4681..4c3b281cb 100644 --- a/local-cluster/src/validator_configs.rs +++ b/local-cluster/src/validator_configs.rs @@ -56,6 +56,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig { accounts_db_test_hash_calculation: config.accounts_db_test_hash_calculation, accounts_db_skip_shrink: config.accounts_db_skip_shrink, tpu_coalesce_ms: config.tpu_coalesce_ms, + staked_nodes_overrides: config.staked_nodes_overrides.clone(), validator_exit: Arc::new(RwLock::new(Exit::default())), poh_hashes_per_batch: config.poh_hashes_per_batch, no_wait_for_vote_to_start_leader: config.no_wait_for_vote_to_start_leader, diff --git a/programs/bpf/Cargo.lock b/programs/bpf/Cargo.lock index 824a2d3be..86b764efa 100644 --- a/programs/bpf/Cargo.lock +++ b/programs/bpf/Cargo.lock @@ -5805,6 +5805,7 @@ dependencies = [ "rayon", "serde", "serde_json", + "serde_yaml", "signal-hook", "solana-clap-utils", "solana-cli-config", diff --git a/test-validator/src/lib.rs b/test-validator/src/lib.rs index 73f4f7005..c5eea8203 100644 --- a/test-validator/src/lib.rs +++ b/test-validator/src/lib.rs @@ -1,4 +1,5 @@ #![allow(clippy::integer_arithmetic)] + use { log::*, solana_cli_output::CliAccount, @@ -115,6 +116,7 @@ pub struct TestValidatorGenesis { pub validator_exit: Arc>, pub start_progress: Arc>, pub authorized_voter_keypairs: Arc>>>, + pub staked_nodes_overrides: Arc>>, pub max_ledger_shreds: Option, pub max_genesis_archive_unpacked_size: Option, pub geyser_plugin_config_files: Option>, @@ -144,6 +146,7 @@ impl Default for TestValidatorGenesis { validator_exit: Arc::>::default(), start_progress: Arc::>::default(), authorized_voter_keypairs: Arc::>>>::default(), + staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())), max_ledger_shreds: Option::::default(), max_genesis_archive_unpacked_size: Option::::default(), geyser_plugin_config_files: Option::>::default(), @@ -785,6 +788,7 @@ impl TestValidator { rocksdb_compaction_interval: Some(100), // Compact every 100 slots max_ledger_shreds: config.max_ledger_shreds, no_wait_for_vote_to_start_leader: true, + staked_nodes_overrides: config.staked_nodes_overrides.clone(), accounts_db_config, runtime_config, ..ValidatorConfig::default_for_test() diff --git a/validator/Cargo.toml b/validator/Cargo.toml index 0478a7bfb..106bc27d6 100644 --- a/validator/Cargo.toml +++ b/validator/Cargo.toml @@ -29,6 +29,7 @@ rand = "0.7.0" rayon = "1.5.3" serde = "1.0.143" serde_json = "1.0.83" +serde_yaml = "0.8.26" solana-clap-utils = { path = "../clap-utils", version = "=1.12.0" } solana-cli-config = { path = "../cli-config", version = "=1.12.0" } solana-client = { path = "../client", version = "=1.12.0" } diff --git a/validator/src/admin_rpc_service.rs b/validator/src/admin_rpc_service.rs index 2c32cc8e2..ab61c2ee6 100644 --- a/validator/src/admin_rpc_service.rs +++ b/validator/src/admin_rpc_service.rs @@ -5,7 +5,7 @@ use { jsonrpc_ipc_server::{RequestContext, ServerBuilder}, jsonrpc_server_utils::tokio, log::*, - serde::{Deserialize, Serialize}, + serde::{de::Deserializer, Deserialize, Serialize}, solana_core::{ consensus::Tower, tower_storage::TowerStorage, validator::ValidatorStartProgress, }, @@ -17,6 +17,8 @@ use { signature::{read_keypair_file, Keypair, Signer}, }, std::{ + collections::HashMap, + error, fmt::{self, Display}, net::SocketAddr, path::{Path, PathBuf}, @@ -41,6 +43,7 @@ pub struct AdminRpcRequestMetadata { pub validator_exit: Arc>, pub authorized_voter_keypairs: Arc>>>, pub tower_storage: Arc, + pub staked_nodes_overrides: Arc>>, pub post_init: Arc>>, } impl Metadata for AdminRpcRequestMetadata {} @@ -175,6 +178,9 @@ pub trait AdminRpc { require_tower: bool, ) -> Result<()>; + #[rpc(meta, name = "setStakedNodesOverrides")] + fn set_staked_nodes_overrides(&self, meta: Self::Metadata, path: String) -> Result<()>; + #[rpc(meta, name = "contactInfo")] fn contact_info(&self, meta: Self::Metadata) -> Result; } @@ -294,6 +300,24 @@ impl AdminRpc for AdminRpcImpl { AdminRpcImpl::set_identity_keypair(meta, identity_keypair, require_tower) } + fn set_staked_nodes_overrides(&self, meta: Self::Metadata, path: String) -> Result<()> { + let loaded_config = load_staked_nodes_overrides(&path) + .map_err(|err| { + error!( + "Failed to load staked nodes overrides from {}: {}", + &path, err + ); + jsonrpc_core::error::Error::internal_error() + })? + .staked_map_id; + let mut write_staked_nodes = meta.staked_nodes_overrides.write().unwrap(); + write_staked_nodes.clear(); + write_staked_nodes.extend(loaded_config.into_iter()); + info!("Staked nodes overrides loaded from {}", path); + debug!("overrides map: {:?}", write_staked_nodes); + Ok(()) + } + fn contact_info(&self, meta: Self::Metadata) -> Result { meta.with_post_init(|post_init| Ok(post_init.cluster_info.my_contact_info().into())) } @@ -426,3 +450,39 @@ pub async fn connect(ledger_path: &Path) -> std::result::Result jsonrpc_server_utils::tokio::runtime::Runtime { jsonrpc_server_utils::tokio::runtime::Runtime::new().expect("new tokio runtime") } + +#[derive(Default, Deserialize, Clone)] +pub struct StakedNodesOverrides { + #[serde(deserialize_with = "deserialize_pubkey_map")] + pub staked_map_id: HashMap, +} + +pub fn deserialize_pubkey_map<'de, D>(des: D) -> std::result::Result, D::Error> +where + D: Deserializer<'de>, +{ + let container: HashMap = serde::Deserialize::deserialize(des)?; + let mut container_typed: HashMap = HashMap::new(); + for (key, value) in container.iter() { + let typed_key = Pubkey::try_from(key.as_str()) + .map_err(|_| serde::de::Error::invalid_type(serde::de::Unexpected::Map, &"PubKey"))?; + container_typed.insert(typed_key, *value); + } + Ok(container_typed) +} + +pub fn load_staked_nodes_overrides( + path: &String, +) -> std::result::Result> { + debug!("Loading staked nodes overrides configuration from {}", path); + if Path::new(&path).exists() { + let file = std::fs::File::open(path)?; + Ok(serde_yaml::from_reader(file)?) + } else { + Err(format!( + "Staked nodes overrides provided '{}' a non-existing file path.", + path + ) + .into()) + } +} diff --git a/validator/src/bin/solana-test-validator.rs b/validator/src/bin/solana-test-validator.rs index 84f032e96..aec84f88f 100644 --- a/validator/src/bin/solana-test-validator.rs +++ b/validator/src/bin/solana-test-validator.rs @@ -702,6 +702,7 @@ fn main() { start_time: std::time::SystemTime::now(), validator_exit: genesis.validator_exit.clone(), authorized_voter_keypairs: genesis.authorized_voter_keypairs.clone(), + staked_nodes_overrides: genesis.staked_nodes_overrides.clone(), post_init: admin_service_post_init.clone(), tower_storage: tower_storage.clone(), }, diff --git a/validator/src/main.rs b/validator/src/main.rs index c8b3fac1b..6d3551350 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -73,8 +73,12 @@ use { }, solana_streamer::socket::SocketAddrSpace, solana_validator::{ - admin_rpc_service, bootstrap, dashboard::Dashboard, ledger_lockfile, lock_ledger, - new_spinner_progress_bar, println_name_value, redirect_stderr_to_file, + admin_rpc_service, + admin_rpc_service::{load_staked_nodes_overrides, StakedNodesOverrides}, + bootstrap, + dashboard::Dashboard, + ledger_lockfile, lock_ledger, new_spinner_progress_bar, println_name_value, + redirect_stderr_to_file, }, std::{ collections::{HashSet, VecDeque}, @@ -1233,7 +1237,18 @@ pub fn main() { .takes_value(true) .default_value(default_tpu_connection_pool_size) .validator(is_parsable::) - .help("Controls the TPU connection pool size per remote addresss"), + .help("Controls the TPU connection pool size per remote address"), + ) + .arg( + Arg::with_name("staked_nodes_overrides") + .long("staked-nodes-overrides") + .value_name("PATH") + .takes_value(true) + .help("Provide path to a yaml file with custom overrides for stakes of specific + identities. Overriding the amount of stake this validator considers + as valid for other peers in network. The stake amount is used for calculating + number of QUIC streams permitted from the peer and vote packet sender stage. + Format of the file: `staked_map_id: {: }"), ) .arg( Arg::with_name("rocksdb_max_compaction_jitter") @@ -1924,6 +1939,19 @@ pub fn main() { ) .after_help("Note: the new filter only applies to the currently running validator instance") ) + .subcommand( + SubCommand::with_name("staked-nodes-overrides") + .about("Overrides stakes of specific node identities.") + .arg( + Arg::with_name("path") + .value_name("PATH") + .takes_value(true) + .required(true) + .help("Provide path to a file with custom overrides for stakes of specific validator identities."), + ) + .after_help("Note: the new staked nodes overrides only applies to the \ + currently running validator instance") + ) .subcommand( SubCommand::with_name("wait-for-restart-window") .about("Monitor the validator for a good time to restart") @@ -2108,6 +2136,30 @@ pub fn main() { monitor_validator(&ledger_path); return; } + ("staked-nodes-overrides", Some(subcommand_matches)) => { + if !subcommand_matches.is_present("path") { + println!( + "staked-nodes-overrides requires argument of location of the configuration" + ); + exit(1); + } + + let path = subcommand_matches.value_of("path").unwrap(); + + let admin_client = admin_rpc_service::connect(&ledger_path); + admin_rpc_service::runtime() + .block_on(async move { + admin_client + .await? + .set_staked_nodes_overrides(path.to_string()) + .await + }) + .unwrap_or_else(|err| { + println!("setStakedNodesOverrides request failed: {}", err); + exit(1); + }); + return; + } ("set-identity", Some(subcommand_matches)) => { let require_tower = subcommand_matches.is_present("require_tower"); @@ -2238,6 +2290,24 @@ pub fn main() { }); let authorized_voter_keypairs = Arc::new(RwLock::new(authorized_voter_keypairs)); + let staked_nodes_overrides_path = matches + .value_of("staked_nodes_overrides") + .map(str::to_string); + let staked_nodes_overrides = Arc::new(RwLock::new( + match staked_nodes_overrides_path { + None => StakedNodesOverrides::default(), + Some(p) => load_staked_nodes_overrides(&p).unwrap_or_else(|err| { + error!("Failed to load stake-nodes-overrides from {}: {}", &p, err); + clap::Error::with_description( + "Failed to load configuration of stake-nodes-overrides argument", + clap::ErrorKind::InvalidValue, + ) + .exit() + }), + } + .staked_map_id, + )); + let init_complete_file = matches.value_of("init_complete_file"); if matches.is_present("no_check_vote_account") { @@ -2680,6 +2750,7 @@ pub fn main() { ..RuntimeConfig::default() }, enable_quic_servers, + staked_nodes_overrides: staked_nodes_overrides.clone(), ..ValidatorConfig::default() }; @@ -2950,6 +3021,7 @@ pub fn main() { authorized_voter_keypairs: authorized_voter_keypairs.clone(), post_init: admin_service_post_init.clone(), tower_storage: validator_config.tower_storage.clone(), + staked_nodes_overrides, }, );