allow staked nodes weight override (#26870)

* Allowed staked nodes weight override (#26407)

* Allowed staked nodes weight override, passing only HashMap over to core module

Co-authored-by: Ondra Chaloupka <chalda@chainkeepers.io>
This commit is contained in:
janlegner 2022-08-11 23:34:04 +02:00 committed by GitHub
parent da4028b24f
commit fc6cee9c06
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 199 additions and 6 deletions

1
Cargo.lock generated
View File

@ -6515,6 +6515,7 @@ dependencies = [
"rayon",
"serde",
"serde_json",
"serde_yaml",
"signal-hook",
"solana-clap-utils",
"solana-cli-config",

View File

@ -8,7 +8,7 @@ use {
net::IpAddr,
sync::{
atomic::{AtomicBool, Ordering},
Arc, RwLock,
Arc, RwLock, RwLockReadGuard,
},
thread::{self, sleep, Builder, JoinHandle},
time::{Duration, Instant},
@ -27,12 +27,14 @@ impl StakedNodesUpdaterService {
cluster_info: Arc<ClusterInfo>,
bank_forks: Arc<RwLock<BankForks>>,
shared_staked_nodes: Arc<RwLock<StakedNodes>>,
shared_staked_nodes_overrides: Arc<RwLock<HashMap<Pubkey, u64>>>,
) -> Self {
let thread_hdl = Builder::new()
.name("sol-sn-updater".to_string())
.spawn(move || {
let mut last_stakes = Instant::now();
while !exit.load(Ordering::Relaxed) {
let overrides = shared_staked_nodes_overrides.read().unwrap();
let mut new_ip_to_stake = HashMap::new();
let mut new_id_to_stake = HashMap::new();
let mut total_stake = 0;
@ -47,6 +49,7 @@ impl StakedNodesUpdaterService {
&mut min_stake,
&bank_forks,
&cluster_info,
&overrides,
) {
let mut shared = shared_staked_nodes.write().unwrap();
shared.total_stake = total_stake;
@ -69,6 +72,7 @@ impl StakedNodesUpdaterService {
min_stake: &mut u64,
bank_forks: &RwLock<BankForks>,
cluster_info: &ClusterInfo,
overrides: &RwLockReadGuard<HashMap<Pubkey, u64>>,
) -> bool {
if last_stakes.elapsed() > IP_TO_STAKE_REFRESH_DURATION {
let root_bank = bank_forks.read().unwrap().root_bank();
@ -96,6 +100,14 @@ impl StakedNodesUpdaterService {
Some((node.tvu.ip(), *stake))
})
.collect();
Self::override_stake(
cluster_info,
total_stake,
id_to_stake,
ip_to_stake,
overrides,
);
*last_stakes = Instant::now();
true
} else {
@ -104,6 +116,40 @@ impl StakedNodesUpdaterService {
}
}
fn override_stake(
cluster_info: &ClusterInfo,
total_stake: &mut u64,
id_to_stake_map: &mut HashMap<Pubkey, u64>,
ip_to_stake_map: &mut HashMap<IpAddr, u64>,
staked_map_overrides: &HashMap<Pubkey, u64>,
) {
for (id_override, stake_override) in staked_map_overrides.iter() {
if let Some(ip_override) =
cluster_info
.all_peers()
.into_iter()
.find_map(|(node, _seen_time)| {
if node.id == *id_override {
return Some(node.tvu.ip());
}
None
})
{
if let Some(previous_stake) = id_to_stake_map.get(id_override) {
*total_stake -= previous_stake;
}
*total_stake += stake_override;
id_to_stake_map.insert(*id_override, *stake_override);
ip_to_stake_map.insert(ip_override, *stake_override);
} else {
error!(
"staked nodes overrides configuration for id {} with stake {} does not match existing IP. Skipping",
id_override, stake_override
);
}
}
}
pub fn join(self) -> thread::Result<()> {
self.thread_hdl.join()
}

View File

@ -29,12 +29,13 @@ use {
cost_model::CostModel,
vote_sender_types::{ReplayVoteReceiver, ReplayVoteSender},
},
solana_sdk::signature::Keypair,
solana_sdk::{pubkey::Pubkey, signature::Keypair},
solana_streamer::{
quic::{spawn_server, StreamStats, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS},
streamer::StakedNodes,
},
std::{
collections::HashMap,
net::UdpSocket,
sync::{atomic::AtomicBool, Arc, RwLock},
thread,
@ -98,6 +99,7 @@ impl Tpu {
log_messages_bytes_limit: Option<usize>,
enable_quic_servers: bool,
staked_nodes: &Arc<RwLock<StakedNodes>>,
shared_staked_nodes_overrides: Arc<RwLock<HashMap<Pubkey, u64>>>,
) -> Self {
let TpuSockets {
transactions: transactions_sockets,
@ -130,6 +132,7 @@ impl Tpu {
cluster_info.clone(),
bank_forks.clone(),
staked_nodes.clone(),
shared_staked_nodes_overrides,
);
let (find_packet_sender_stake_sender, find_packet_sender_stake_receiver) = unbounded();

View File

@ -168,6 +168,7 @@ pub struct ValidatorConfig {
pub accounts_db_test_hash_calculation: bool,
pub accounts_db_skip_shrink: bool,
pub tpu_coalesce_ms: u64,
pub staked_nodes_overrides: Arc<RwLock<HashMap<Pubkey, u64>>>,
pub validator_exit: Arc<RwLock<Exit>>,
pub no_wait_for_vote_to_start_leader: bool,
pub accounts_shrink_ratio: AccountShrinkThreshold,
@ -230,6 +231,7 @@ impl Default for ValidatorConfig {
accounts_db_test_hash_calculation: false,
accounts_db_skip_shrink: false,
tpu_coalesce_ms: DEFAULT_TPU_COALESCE_MS,
staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())),
validator_exit: Arc::new(RwLock::new(Exit::default())),
no_wait_for_vote_to_start_leader: true,
accounts_shrink_ratio: AccountShrinkThreshold::default(),
@ -1038,6 +1040,7 @@ impl Validator {
config.runtime_config.log_messages_bytes_limit,
config.enable_quic_servers,
&staked_nodes,
config.staked_nodes_overrides.clone(),
);
datapoint_info!(

View File

@ -56,6 +56,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
accounts_db_test_hash_calculation: config.accounts_db_test_hash_calculation,
accounts_db_skip_shrink: config.accounts_db_skip_shrink,
tpu_coalesce_ms: config.tpu_coalesce_ms,
staked_nodes_overrides: config.staked_nodes_overrides.clone(),
validator_exit: Arc::new(RwLock::new(Exit::default())),
poh_hashes_per_batch: config.poh_hashes_per_batch,
no_wait_for_vote_to_start_leader: config.no_wait_for_vote_to_start_leader,

View File

@ -5805,6 +5805,7 @@ dependencies = [
"rayon",
"serde",
"serde_json",
"serde_yaml",
"signal-hook",
"solana-clap-utils",
"solana-cli-config",

View File

@ -1,4 +1,5 @@
#![allow(clippy::integer_arithmetic)]
use {
log::*,
solana_cli_output::CliAccount,
@ -115,6 +116,7 @@ pub struct TestValidatorGenesis {
pub validator_exit: Arc<RwLock<Exit>>,
pub start_progress: Arc<RwLock<ValidatorStartProgress>>,
pub authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
pub staked_nodes_overrides: Arc<RwLock<HashMap<Pubkey, u64>>>,
pub max_ledger_shreds: Option<u64>,
pub max_genesis_archive_unpacked_size: Option<u64>,
pub geyser_plugin_config_files: Option<Vec<PathBuf>>,
@ -144,6 +146,7 @@ impl Default for TestValidatorGenesis {
validator_exit: Arc::<RwLock<Exit>>::default(),
start_progress: Arc::<RwLock<ValidatorStartProgress>>::default(),
authorized_voter_keypairs: Arc::<RwLock<Vec<Arc<Keypair>>>>::default(),
staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())),
max_ledger_shreds: Option::<u64>::default(),
max_genesis_archive_unpacked_size: Option::<u64>::default(),
geyser_plugin_config_files: Option::<Vec<PathBuf>>::default(),
@ -785,6 +788,7 @@ impl TestValidator {
rocksdb_compaction_interval: Some(100), // Compact every 100 slots
max_ledger_shreds: config.max_ledger_shreds,
no_wait_for_vote_to_start_leader: true,
staked_nodes_overrides: config.staked_nodes_overrides.clone(),
accounts_db_config,
runtime_config,
..ValidatorConfig::default_for_test()

View File

@ -29,6 +29,7 @@ rand = "0.7.0"
rayon = "1.5.3"
serde = "1.0.143"
serde_json = "1.0.83"
serde_yaml = "0.8.26"
solana-clap-utils = { path = "../clap-utils", version = "=1.12.0" }
solana-cli-config = { path = "../cli-config", version = "=1.12.0" }
solana-client = { path = "../client", version = "=1.12.0" }

View File

@ -5,7 +5,7 @@ use {
jsonrpc_ipc_server::{RequestContext, ServerBuilder},
jsonrpc_server_utils::tokio,
log::*,
serde::{Deserialize, Serialize},
serde::{de::Deserializer, Deserialize, Serialize},
solana_core::{
consensus::Tower, tower_storage::TowerStorage, validator::ValidatorStartProgress,
},
@ -17,6 +17,8 @@ use {
signature::{read_keypair_file, Keypair, Signer},
},
std::{
collections::HashMap,
error,
fmt::{self, Display},
net::SocketAddr,
path::{Path, PathBuf},
@ -41,6 +43,7 @@ pub struct AdminRpcRequestMetadata {
pub validator_exit: Arc<RwLock<Exit>>,
pub authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
pub tower_storage: Arc<dyn TowerStorage>,
pub staked_nodes_overrides: Arc<RwLock<HashMap<Pubkey, u64>>>,
pub post_init: Arc<RwLock<Option<AdminRpcRequestMetadataPostInit>>>,
}
impl Metadata for AdminRpcRequestMetadata {}
@ -175,6 +178,9 @@ pub trait AdminRpc {
require_tower: bool,
) -> Result<()>;
#[rpc(meta, name = "setStakedNodesOverrides")]
fn set_staked_nodes_overrides(&self, meta: Self::Metadata, path: String) -> Result<()>;
#[rpc(meta, name = "contactInfo")]
fn contact_info(&self, meta: Self::Metadata) -> Result<AdminRpcContactInfo>;
}
@ -294,6 +300,24 @@ impl AdminRpc for AdminRpcImpl {
AdminRpcImpl::set_identity_keypair(meta, identity_keypair, require_tower)
}
fn set_staked_nodes_overrides(&self, meta: Self::Metadata, path: String) -> Result<()> {
let loaded_config = load_staked_nodes_overrides(&path)
.map_err(|err| {
error!(
"Failed to load staked nodes overrides from {}: {}",
&path, err
);
jsonrpc_core::error::Error::internal_error()
})?
.staked_map_id;
let mut write_staked_nodes = meta.staked_nodes_overrides.write().unwrap();
write_staked_nodes.clear();
write_staked_nodes.extend(loaded_config.into_iter());
info!("Staked nodes overrides loaded from {}", path);
debug!("overrides map: {:?}", write_staked_nodes);
Ok(())
}
fn contact_info(&self, meta: Self::Metadata) -> Result<AdminRpcContactInfo> {
meta.with_post_init(|post_init| Ok(post_init.cluster_info.my_contact_info().into()))
}
@ -426,3 +450,39 @@ pub async fn connect(ledger_path: &Path) -> std::result::Result<gen_client::Clie
pub fn runtime() -> jsonrpc_server_utils::tokio::runtime::Runtime {
jsonrpc_server_utils::tokio::runtime::Runtime::new().expect("new tokio runtime")
}
#[derive(Default, Deserialize, Clone)]
pub struct StakedNodesOverrides {
#[serde(deserialize_with = "deserialize_pubkey_map")]
pub staked_map_id: HashMap<Pubkey, u64>,
}
pub fn deserialize_pubkey_map<'de, D>(des: D) -> std::result::Result<HashMap<Pubkey, u64>, D::Error>
where
D: Deserializer<'de>,
{
let container: HashMap<String, u64> = serde::Deserialize::deserialize(des)?;
let mut container_typed: HashMap<Pubkey, u64> = HashMap::new();
for (key, value) in container.iter() {
let typed_key = Pubkey::try_from(key.as_str())
.map_err(|_| serde::de::Error::invalid_type(serde::de::Unexpected::Map, &"PubKey"))?;
container_typed.insert(typed_key, *value);
}
Ok(container_typed)
}
pub fn load_staked_nodes_overrides(
path: &String,
) -> std::result::Result<StakedNodesOverrides, Box<dyn error::Error>> {
debug!("Loading staked nodes overrides configuration from {}", path);
if Path::new(&path).exists() {
let file = std::fs::File::open(path)?;
Ok(serde_yaml::from_reader(file)?)
} else {
Err(format!(
"Staked nodes overrides provided '{}' a non-existing file path.",
path
)
.into())
}
}

View File

@ -702,6 +702,7 @@ fn main() {
start_time: std::time::SystemTime::now(),
validator_exit: genesis.validator_exit.clone(),
authorized_voter_keypairs: genesis.authorized_voter_keypairs.clone(),
staked_nodes_overrides: genesis.staked_nodes_overrides.clone(),
post_init: admin_service_post_init.clone(),
tower_storage: tower_storage.clone(),
},

View File

@ -73,8 +73,12 @@ use {
},
solana_streamer::socket::SocketAddrSpace,
solana_validator::{
admin_rpc_service, bootstrap, dashboard::Dashboard, ledger_lockfile, lock_ledger,
new_spinner_progress_bar, println_name_value, redirect_stderr_to_file,
admin_rpc_service,
admin_rpc_service::{load_staked_nodes_overrides, StakedNodesOverrides},
bootstrap,
dashboard::Dashboard,
ledger_lockfile, lock_ledger, new_spinner_progress_bar, println_name_value,
redirect_stderr_to_file,
},
std::{
collections::{HashSet, VecDeque},
@ -1233,7 +1237,18 @@ pub fn main() {
.takes_value(true)
.default_value(default_tpu_connection_pool_size)
.validator(is_parsable::<usize>)
.help("Controls the TPU connection pool size per remote addresss"),
.help("Controls the TPU connection pool size per remote address"),
)
.arg(
Arg::with_name("staked_nodes_overrides")
.long("staked-nodes-overrides")
.value_name("PATH")
.takes_value(true)
.help("Provide path to a yaml file with custom overrides for stakes of specific
identities. Overriding the amount of stake this validator considers
as valid for other peers in network. The stake amount is used for calculating
number of QUIC streams permitted from the peer and vote packet sender stage.
Format of the file: `staked_map_id: {<pubkey>: <SOL stake amount>}"),
)
.arg(
Arg::with_name("rocksdb_max_compaction_jitter")
@ -1924,6 +1939,19 @@ pub fn main() {
)
.after_help("Note: the new filter only applies to the currently running validator instance")
)
.subcommand(
SubCommand::with_name("staked-nodes-overrides")
.about("Overrides stakes of specific node identities.")
.arg(
Arg::with_name("path")
.value_name("PATH")
.takes_value(true)
.required(true)
.help("Provide path to a file with custom overrides for stakes of specific validator identities."),
)
.after_help("Note: the new staked nodes overrides only applies to the \
currently running validator instance")
)
.subcommand(
SubCommand::with_name("wait-for-restart-window")
.about("Monitor the validator for a good time to restart")
@ -2108,6 +2136,30 @@ pub fn main() {
monitor_validator(&ledger_path);
return;
}
("staked-nodes-overrides", Some(subcommand_matches)) => {
if !subcommand_matches.is_present("path") {
println!(
"staked-nodes-overrides requires argument of location of the configuration"
);
exit(1);
}
let path = subcommand_matches.value_of("path").unwrap();
let admin_client = admin_rpc_service::connect(&ledger_path);
admin_rpc_service::runtime()
.block_on(async move {
admin_client
.await?
.set_staked_nodes_overrides(path.to_string())
.await
})
.unwrap_or_else(|err| {
println!("setStakedNodesOverrides request failed: {}", err);
exit(1);
});
return;
}
("set-identity", Some(subcommand_matches)) => {
let require_tower = subcommand_matches.is_present("require_tower");
@ -2238,6 +2290,24 @@ pub fn main() {
});
let authorized_voter_keypairs = Arc::new(RwLock::new(authorized_voter_keypairs));
let staked_nodes_overrides_path = matches
.value_of("staked_nodes_overrides")
.map(str::to_string);
let staked_nodes_overrides = Arc::new(RwLock::new(
match staked_nodes_overrides_path {
None => StakedNodesOverrides::default(),
Some(p) => load_staked_nodes_overrides(&p).unwrap_or_else(|err| {
error!("Failed to load stake-nodes-overrides from {}: {}", &p, err);
clap::Error::with_description(
"Failed to load configuration of stake-nodes-overrides argument",
clap::ErrorKind::InvalidValue,
)
.exit()
}),
}
.staked_map_id,
));
let init_complete_file = matches.value_of("init_complete_file");
if matches.is_present("no_check_vote_account") {
@ -2680,6 +2750,7 @@ pub fn main() {
..RuntimeConfig::default()
},
enable_quic_servers,
staked_nodes_overrides: staked_nodes_overrides.clone(),
..ValidatorConfig::default()
};
@ -2950,6 +3021,7 @@ pub fn main() {
authorized_voter_keypairs: authorized_voter_keypairs.clone(),
post_init: admin_service_post_init.clone(),
tower_storage: validator_config.tower_storage.clone(),
staked_nodes_overrides,
},
);