From f2865dfd63cfa95f2286f9a6e5651ac984fa7a75 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Fri, 12 Mar 2021 15:50:14 +0000 Subject: [PATCH] requires stakes for propagating crds values through gossip (#15561) --- core/src/cluster_info.rs | 153 ++++++++++++++++++++++++++++++++++---- core/src/crds.rs | 5 +- core/src/crds_gossip.rs | 5 +- core/tests/crds_gossip.rs | 2 +- sdk/src/feature_set.rs | 5 ++ 5 files changed, 149 insertions(+), 21 deletions(-) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 47ec43e3fb..b1cc79fbc4 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -116,6 +116,11 @@ pub const DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS: u64 = 60_000; const PULL_RESPONSE_MIN_SERIALIZED_SIZE: usize = 161; // Limit number of unique pubkeys in the crds table. const CRDS_UNIQUE_PUBKEY_CAPACITY: usize = 4096; +/// Minimum stake that a node should have so that its CRDS values are +/// propagated through gossip (few types are exempted). +const MIN_STAKE_FOR_GOSSIP: u64 = solana_sdk::native_token::LAMPORTS_PER_SOL; +/// Minimum number of staked nodes for enforcing stakes in gossip. +const MIN_NUM_STAKED_NODES: usize = 500; #[derive(Debug, PartialEq, Eq)] pub enum ClusterInfoError { @@ -287,6 +292,8 @@ struct GossipStats { prune_message_len: Counter, pull_request_ping_pong_check_failed_count: Counter, purge: Counter, + require_stake_for_gossip_unknown_feature_set: Counter, + require_stake_for_gossip_unknown_stakes: Counter, trim_crds_table_failed: Counter, trim_crds_table_purged_values_count: Counter, epoch_slots_lookup: Counter, @@ -537,6 +544,33 @@ struct ResponseScore { score: u64, // Relative score of the response } +// Retains only CRDS values associated with nodes with enough stake. +// (some crds types are exempted) +fn retain_staked(values: &mut Vec, stakes: &HashMap) { + values.retain(|value| { + match value.data { + CrdsData::ContactInfo(_) => true, + // May Impact new validators starting up without any stake yet. + CrdsData::Vote(_, _) => true, + // Unstaked nodes can still help repair. + CrdsData::EpochSlots(_, _) => true, + // Unstaked nodes can still serve snapshots. + CrdsData::SnapshotHashes(_) => true, + // Otherwise unstaked voting nodes will show up with no version in + // the various dashboards. + CrdsData::Version(_) => true, + CrdsData::LowestSlot(_, _) + | CrdsData::AccountsHashes(_) + | CrdsData::LegacyVersion(_) + | CrdsData::NodeInstance(_) + | CrdsData::DuplicateShred(_, _) => { + let stake = stakes.get(&value.pubkey()).copied(); + stake.unwrap_or_default() >= MIN_STAKE_FOR_GOSSIP + } + } + }) +} + impl ClusterInfo { /// Without a valid keypair gossip will not function. Only useful for tests. pub fn new_with_invalid_keypair(contact_info: ContactInfo) -> Self { @@ -1683,11 +1717,21 @@ impl ClusterInfo { let mut gossip = self.gossip.write().unwrap(); gossip.process_push_messages(pending_push_messages); } - fn new_push_requests(&self) -> Vec<(SocketAddr, Protocol)> { + fn new_push_requests( + &self, + stakes: &HashMap, + require_stake_for_gossip: bool, + ) -> Vec<(SocketAddr, Protocol)> { let self_id = self.id(); - let (_, push_messages) = self + let mut push_messages = self .time_gossip_write_lock("new_push_requests", &self.stats.new_push_requests) .new_push_messages(self.drain_push_queue(), timestamp()); + if require_stake_for_gossip { + push_messages.retain(|_, data| { + retain_staked(data, stakes); + !data.is_empty() + }) + } let push_messages: Vec<_> = { let gossip = self.time_gossip_read_lock("push_req_lookup", &self.stats.new_push_requests2); @@ -1719,6 +1763,7 @@ impl ClusterInfo { gossip_validators: Option<&HashSet>, stakes: &HashMap, generate_pull_requests: bool, + require_stake_for_gossip: bool, ) -> Vec<(SocketAddr, Protocol)> { self.trim_crds_table(CRDS_UNIQUE_PUBKEY_CAPACITY, &stakes); let mut pulls: Vec<_> = if generate_pull_requests { @@ -1726,7 +1771,7 @@ impl ClusterInfo { } else { vec![] }; - let mut pushes: Vec<_> = self.new_push_requests(); + let mut pushes: Vec<_> = self.new_push_requests(stakes, require_stake_for_gossip); self.stats .packets_sent_pull_requests_count .add_relaxed(pulls.len() as u64); @@ -1746,12 +1791,14 @@ impl ClusterInfo { stakes: &HashMap, sender: &PacketSender, generate_pull_requests: bool, + require_stake_for_gossip: bool, ) -> Result<()> { let reqs = self.generate_new_gossip_requests( thread_pool, gossip_validators, - &stakes, + stakes, generate_pull_requests, + require_stake_for_gossip, ); if !reqs.is_empty() { let packets = to_packets_with_destination(recycler.clone(), &reqs); @@ -1920,13 +1967,18 @@ impl ClusterInfo { last_contact_info_save = start; } - let stakes: HashMap<_, _> = match bank_forks { + let (stakes, feature_set) = match bank_forks { Some(ref bank_forks) => { - bank_forks.read().unwrap().root_bank().staked_nodes() + let root_bank = bank_forks.read().unwrap().root_bank(); + ( + root_bank.staked_nodes(), + Some(root_bank.feature_set.clone()), + ) } - None => HashMap::new(), + None => (HashMap::new(), None), }; - + let require_stake_for_gossip = + self.require_stake_for_gossip(feature_set.as_deref(), &stakes); let _ = self.run_gossip( &thread_pool, gossip_validators.as_ref(), @@ -1934,6 +1986,7 @@ impl ClusterInfo { &stakes, &sender, generate_pull_requests, + require_stake_for_gossip, ); if exit.load(Ordering::Relaxed) { return; @@ -2014,6 +2067,7 @@ impl ClusterInfo { stakes: &HashMap, response_sender: &PacketSender, feature_set: Option<&FeatureSet>, + require_stake_for_gossip: bool, ) { let _st = ScopedTimer::from(&self.stats.handle_batch_pull_requests_time); if requests.is_empty() { @@ -2055,7 +2109,13 @@ impl ClusterInfo { self.stats .pull_requests_count .add_relaxed(requests.len() as u64); - let response = self.handle_pull_requests(recycler, requests, stakes, feature_set); + let response = self.handle_pull_requests( + recycler, + requests, + stakes, + feature_set, + require_stake_for_gossip, + ); if !response.is_empty() { self.stats .packets_sent_pull_responses_count @@ -2132,6 +2192,7 @@ impl ClusterInfo { requests: Vec, stakes: &HashMap, feature_set: Option<&FeatureSet>, + require_stake_for_gossip: bool, ) -> Packets { let mut time = Measure::start("handle_pull_requests"); let callers = crds_value::filter_current(requests.iter().map(|r| &r.caller)); @@ -2153,13 +2214,17 @@ impl ClusterInfo { let now = timestamp(); let self_id = self.id(); - let pull_responses = self + let mut pull_responses = self .time_gossip_read_lock( "generate_pull_responses", &self.stats.generate_pull_responses, ) .generate_pull_responses(&caller_and_filters, output_size_limit, now); - + if require_stake_for_gossip { + for resp in &mut pull_responses { + retain_staked(resp, stakes); + } + } let pull_responses: Vec<_> = pull_responses .into_iter() .zip(addrs.into_iter()) @@ -2450,6 +2515,7 @@ impl ClusterInfo { recycler: &PacketsRecycler, stakes: &HashMap, response_sender: &PacketSender, + require_stake_for_gossip: bool, ) { let _st = ScopedTimer::from(&self.stats.handle_batch_push_messages_time); if messages.is_empty() { @@ -2559,7 +2625,7 @@ impl ClusterInfo { self.stats .push_response_count .add_relaxed(packets.packets.len() as u64); - let new_push_requests = self.new_push_requests(); + let new_push_requests = self.new_push_requests(stakes, require_stake_for_gossip); inc_new_counter_debug!("cluster_info-push_message-pushes", new_push_requests.len()); for (address, request) in new_push_requests { if ContactInfo::is_valid_address(&address) { @@ -2602,6 +2668,33 @@ impl ClusterInfo { } } + fn require_stake_for_gossip( + &self, + feature_set: Option<&FeatureSet>, + stakes: &HashMap, + ) -> bool { + match feature_set { + None => { + self.stats + .require_stake_for_gossip_unknown_feature_set + .add_relaxed(1); + false + } + Some(feature_set) => { + if !feature_set.is_active(&feature_set::require_stake_for_gossip::id()) { + false + } else if stakes.len() < MIN_NUM_STAKED_NODES { + self.stats + .require_stake_for_gossip_unknown_stakes + .add_relaxed(1); + false + } else { + true + } + } + } + } + fn process_packets( &self, packets: VecDeque, @@ -2681,6 +2774,17 @@ impl ClusterInfo { self.stats .packets_received_prune_messages_count .add_relaxed(prune_messages.len() as u64); + let require_stake_for_gossip = self.require_stake_for_gossip(feature_set, &stakes); + if require_stake_for_gossip { + for (_, data) in &mut pull_responses { + retain_staked(data, &stakes); + } + for (_, data) in &mut push_messages { + retain_staked(data, &stakes); + } + pull_responses.retain(|(_, data)| !data.is_empty()); + push_messages.retain(|(_, data)| !data.is_empty()); + } self.handle_batch_ping_messages(ping_messages, recycler, response_sender); self.handle_batch_prune_messages(prune_messages); self.handle_batch_push_messages( @@ -2689,6 +2793,7 @@ impl ClusterInfo { recycler, &stakes, response_sender, + require_stake_for_gossip, ); self.handle_batch_pull_responses(pull_responses, thread_pool, &stakes, epoch_time_ms); self.trim_crds_table(CRDS_UNIQUE_PUBKEY_CAPACITY, &stakes); @@ -2700,6 +2805,7 @@ impl ClusterInfo { &stakes, response_sender, feature_set, + require_stake_for_gossip, ); Ok(()) } @@ -3039,6 +3145,18 @@ impl ClusterInfo { self.stats.packets_sent_push_messages_count.clear(), i64 ), + ( + "require_stake_for_gossip_unknown_feature_set", + self.stats + .require_stake_for_gossip_unknown_feature_set + .clear(), + i64 + ), + ( + "require_stake_for_gossip_unknown_stakes", + self.stats.require_stake_for_gossip_unknown_stakes.clear(), + i64 + ), ( "trim_crds_table_failed", self.stats.trim_crds_table_failed.clear(), @@ -3725,8 +3843,13 @@ mod tests { .write() .unwrap() .refresh_push_active_set(&HashMap::new(), None); - let reqs = - cluster_info.generate_new_gossip_requests(&thread_pool, None, &HashMap::new(), true); + let reqs = cluster_info.generate_new_gossip_requests( + &thread_pool, + None, // gossip_validators + &HashMap::new(), + true, // generate_pull_requests + false, // require_stake_for_gossip + ); //assert none of the addrs are invalid. reqs.iter().all(|(addr, _)| { let res = ContactInfo::is_valid_address(addr); @@ -3837,7 +3960,7 @@ mod tests { .unwrap() .refresh_push_active_set(&HashMap::new(), None); //check that all types of gossip messages are signed correctly - let (_, push_messages) = cluster_info + let push_messages = cluster_info .gossip .write() .unwrap() diff --git a/core/src/crds.rs b/core/src/crds.rs index 0cdef2d1ef..18a0dd6d64 100644 --- a/core/src/crds.rs +++ b/core/src/crds.rs @@ -28,7 +28,7 @@ use crate::contact_info::ContactInfo; use crate::crds_shards::CrdsShards; use crate::crds_value::{CrdsData, CrdsValue, CrdsValueLabel, LowestSlot}; use bincode::serialize; -use indexmap::map::{rayon::ParValues, Entry, IndexMap, Values}; +use indexmap::map::{rayon::ParValues, Entry, IndexMap}; use indexmap::set::IndexSet; use rayon::{prelude::*, ThreadPool}; use solana_sdk::hash::{hash, Hash}; @@ -272,7 +272,8 @@ impl Crds { self.table.is_empty() } - pub fn values(&self) -> Values<'_, CrdsValueLabel, VersionedCrdsValue> { + #[cfg(test)] + pub(crate) fn values(&self) -> impl Iterator { self.table.values() } diff --git a/core/src/crds_gossip.rs b/core/src/crds_gossip.rs index f528b0e241..ab0399ca0d 100644 --- a/core/src/crds_gossip.rs +++ b/core/src/crds_gossip.rs @@ -106,10 +106,9 @@ impl CrdsGossip { &mut self, pending_push_messages: Vec<(CrdsValue, u64)>, now: u64, - ) -> (Pubkey, HashMap>) { + ) -> HashMap> { self.process_push_messages(pending_push_messages); - let push_messages = self.push.new_push_messages(&self.crds, now); - (self.id, push_messages) + self.push.new_push_messages(&self.crds, now) } pub(crate) fn push_duplicate_shred( diff --git a/core/tests/crds_gossip.rs b/core/tests/crds_gossip.rs index 88ce7ad04e..7f62626256 100644 --- a/core/tests/crds_gossip.rs +++ b/core/tests/crds_gossip.rs @@ -302,7 +302,7 @@ fn network_run_push( let mut node_lock = node.lock().unwrap(); let timeouts = node_lock.make_timeouts_test(); node_lock.purge(thread_pool, now, &timeouts); - node_lock.new_push_messages(vec![], now) + (node_lock.id, node_lock.new_push_messages(vec![], now)) }) .collect(); let transfered: Vec<_> = requests diff --git a/sdk/src/feature_set.rs b/sdk/src/feature_set.rs index 0d6a308080..66e5db8764 100644 --- a/sdk/src/feature_set.rs +++ b/sdk/src/feature_set.rs @@ -111,6 +111,10 @@ pub mod skip_ro_deserialization { solana_sdk::declare_id!("6Sw5JV84f7QkDe8gvRxpcPWFnPpfpgEnNziiy8sELaCp"); } +pub mod require_stake_for_gossip { + solana_sdk::declare_id!("6oNzd5Z3M2L1xo4Q5hoox7CR2DuW7m1ETLWH5jHJthwa"); +} + lazy_static! { /// Map of feature identifiers to user-visible description pub static ref FEATURE_NAMES: HashMap = [ @@ -139,6 +143,7 @@ lazy_static! { (check_program_owner::id(), "limit programs to operating on accounts owned by itself"), (cpi_share_ro_and_exec_accounts::id(), "Share RO and Executable accounts during cross-program invocations"), (skip_ro_deserialization::id(), "Skip deserialization of read-only accounts"), + (require_stake_for_gossip::id(), "require stakes for propagating crds values through gossip #15561"), /*************** ADD NEW FEATURES HERE ***************/ ] .iter()