requires stakes for propagating crds values through gossip (#15561)

behzad nouri 2021-03-12 15:50:14 +00:00 committed by GitHub
parent be99cef593
commit f2865dfd63
5 changed files with 149 additions and 21 deletions
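
The change gates propagation of CRDS values on the origin node's stake: values from nodes holding less than 1 SOL (MIN_STAKE_FOR_GOSSIP) are no longer forwarded in push messages or pull responses, except for a handful of types that unstaked nodes still need to publish (contact info, votes, epoch slots, snapshot hashes, version). As a minimal, self-contained sketch of that retention policy — using simplified stand-in types instead of the real CrdsValue/CrdsData and u8 keys in place of Pubkey, so this is illustrative only, not the code in this commit:

use std::collections::HashMap;

const MIN_STAKE_FOR_GOSSIP: u64 = 1_000_000_000; // 1 SOL, i.e. LAMPORTS_PER_SOL

#[derive(Debug)]
enum GossipValue {
    ContactInfo { from: u8 }, // exempt: peers must stay discoverable
    Vote { from: u8 },        // exempt: new validators may have no stake yet
    LowestSlot { from: u8 },  // gated on the origin's stake
}

fn retain_staked(values: &mut Vec<GossipValue>, stakes: &HashMap<u8, u64>) {
    values.retain(|value| match value {
        GossipValue::ContactInfo { .. } | GossipValue::Vote { .. } => true,
        GossipValue::LowestSlot { from } => {
            stakes.get(from).copied().unwrap_or_default() >= MIN_STAKE_FOR_GOSSIP
        }
    });
}

fn main() {
    let stakes: HashMap<u8, u64> = [(1u8, 2_000_000_000u64)].into_iter().collect();
    let mut values = vec![
        GossipValue::ContactInfo { from: 7 }, // unstaked origin, exempt type: kept
        GossipValue::Vote { from: 7 },        // unstaked origin, exempt type: kept
        GossipValue::LowestSlot { from: 7 },  // unstaked origin, gated type: dropped
        GossipValue::LowestSlot { from: 1 },  // origin staked above the threshold: kept
    ];
    retain_staked(&mut values, &stakes);
    println!("{:?}", values);
}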

View File

@ -116,6 +116,11 @@ pub const DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS: u64 = 60_000;
const PULL_RESPONSE_MIN_SERIALIZED_SIZE: usize = 161;
// Limit number of unique pubkeys in the crds table.
const CRDS_UNIQUE_PUBKEY_CAPACITY: usize = 4096;
/// Minimum stake that a node should have so that its CRDS values are
/// propagated through gossip (a few types are exempted).
const MIN_STAKE_FOR_GOSSIP: u64 = solana_sdk::native_token::LAMPORTS_PER_SOL;
/// Minimum number of staked nodes for enforcing stakes in gossip.
const MIN_NUM_STAKED_NODES: usize = 500;
#[derive(Debug, PartialEq, Eq)]
pub enum ClusterInfoError {
@ -287,6 +292,8 @@ struct GossipStats {
prune_message_len: Counter,
pull_request_ping_pong_check_failed_count: Counter,
purge: Counter,
require_stake_for_gossip_unknown_feature_set: Counter,
require_stake_for_gossip_unknown_stakes: Counter,
trim_crds_table_failed: Counter,
trim_crds_table_purged_values_count: Counter,
epoch_slots_lookup: Counter,
@ -537,6 +544,33 @@ struct ResponseScore {
score: u64, // Relative score of the response
}
// Retains only CRDS values associated with nodes that have enough stake.
// (some CRDS types are exempted)
fn retain_staked(values: &mut Vec<CrdsValue>, stakes: &HashMap<Pubkey, u64>) {
values.retain(|value| {
match value.data {
CrdsData::ContactInfo(_) => true,
// May impact new validators starting up without any stake yet.
CrdsData::Vote(_, _) => true,
// Unstaked nodes can still help repair.
CrdsData::EpochSlots(_, _) => true,
// Unstaked nodes can still serve snapshots.
CrdsData::SnapshotHashes(_) => true,
// Otherwise unstaked voting nodes will show up with no version in
// the various dashboards.
CrdsData::Version(_) => true,
CrdsData::LowestSlot(_, _)
| CrdsData::AccountsHashes(_)
| CrdsData::LegacyVersion(_)
| CrdsData::NodeInstance(_)
| CrdsData::DuplicateShred(_, _) => {
let stake = stakes.get(&value.pubkey()).copied();
stake.unwrap_or_default() >= MIN_STAKE_FOR_GOSSIP
}
}
})
}
impl ClusterInfo {
/// Without a valid keypair gossip will not function. Only useful for tests.
pub fn new_with_invalid_keypair(contact_info: ContactInfo) -> Self {
@ -1683,11 +1717,21 @@ impl ClusterInfo {
let mut gossip = self.gossip.write().unwrap();
gossip.process_push_messages(pending_push_messages);
}
fn new_push_requests(&self) -> Vec<(SocketAddr, Protocol)> {
fn new_push_requests(
&self,
stakes: &HashMap<Pubkey, u64>,
require_stake_for_gossip: bool,
) -> Vec<(SocketAddr, Protocol)> {
let self_id = self.id();
let (_, push_messages) = self
let mut push_messages = self
.time_gossip_write_lock("new_push_requests", &self.stats.new_push_requests)
.new_push_messages(self.drain_push_queue(), timestamp());
if require_stake_for_gossip {
push_messages.retain(|_, data| {
retain_staked(data, stakes);
!data.is_empty()
})
}
let push_messages: Vec<_> = {
let gossip =
self.time_gossip_read_lock("push_req_lookup", &self.stats.new_push_requests2);
@ -1719,6 +1763,7 @@ impl ClusterInfo {
gossip_validators: Option<&HashSet<Pubkey>>,
stakes: &HashMap<Pubkey, u64>,
generate_pull_requests: bool,
require_stake_for_gossip: bool,
) -> Vec<(SocketAddr, Protocol)> {
self.trim_crds_table(CRDS_UNIQUE_PUBKEY_CAPACITY, &stakes);
let mut pulls: Vec<_> = if generate_pull_requests {
@ -1726,7 +1771,7 @@ impl ClusterInfo {
} else {
vec![]
};
let mut pushes: Vec<_> = self.new_push_requests();
let mut pushes: Vec<_> = self.new_push_requests(stakes, require_stake_for_gossip);
self.stats
.packets_sent_pull_requests_count
.add_relaxed(pulls.len() as u64);
@ -1746,12 +1791,14 @@ impl ClusterInfo {
stakes: &HashMap<Pubkey, u64>,
sender: &PacketSender,
generate_pull_requests: bool,
require_stake_for_gossip: bool,
) -> Result<()> {
let reqs = self.generate_new_gossip_requests(
thread_pool,
gossip_validators,
&stakes,
stakes,
generate_pull_requests,
require_stake_for_gossip,
);
if !reqs.is_empty() {
let packets = to_packets_with_destination(recycler.clone(), &reqs);
@ -1920,13 +1967,18 @@ impl ClusterInfo {
last_contact_info_save = start;
}
let stakes: HashMap<_, _> = match bank_forks {
let (stakes, feature_set) = match bank_forks {
Some(ref bank_forks) => {
bank_forks.read().unwrap().root_bank().staked_nodes()
let root_bank = bank_forks.read().unwrap().root_bank();
(
root_bank.staked_nodes(),
Some(root_bank.feature_set.clone()),
)
}
None => HashMap::new(),
None => (HashMap::new(), None),
};
let require_stake_for_gossip =
self.require_stake_for_gossip(feature_set.as_deref(), &stakes);
let _ = self.run_gossip(
&thread_pool,
gossip_validators.as_ref(),
@ -1934,6 +1986,7 @@ impl ClusterInfo {
&stakes,
&sender,
generate_pull_requests,
require_stake_for_gossip,
);
if exit.load(Ordering::Relaxed) {
return;
@ -2014,6 +2067,7 @@ impl ClusterInfo {
stakes: &HashMap<Pubkey, u64>,
response_sender: &PacketSender,
feature_set: Option<&FeatureSet>,
require_stake_for_gossip: bool,
) {
let _st = ScopedTimer::from(&self.stats.handle_batch_pull_requests_time);
if requests.is_empty() {
@ -2055,7 +2109,13 @@ impl ClusterInfo {
self.stats
.pull_requests_count
.add_relaxed(requests.len() as u64);
let response = self.handle_pull_requests(recycler, requests, stakes, feature_set);
let response = self.handle_pull_requests(
recycler,
requests,
stakes,
feature_set,
require_stake_for_gossip,
);
if !response.is_empty() {
self.stats
.packets_sent_pull_responses_count
@ -2132,6 +2192,7 @@ impl ClusterInfo {
requests: Vec<PullData>,
stakes: &HashMap<Pubkey, u64>,
feature_set: Option<&FeatureSet>,
require_stake_for_gossip: bool,
) -> Packets {
let mut time = Measure::start("handle_pull_requests");
let callers = crds_value::filter_current(requests.iter().map(|r| &r.caller));
@ -2153,13 +2214,17 @@ impl ClusterInfo {
let now = timestamp();
let self_id = self.id();
let pull_responses = self
let mut pull_responses = self
.time_gossip_read_lock(
"generate_pull_responses",
&self.stats.generate_pull_responses,
)
.generate_pull_responses(&caller_and_filters, output_size_limit, now);
if require_stake_for_gossip {
for resp in &mut pull_responses {
retain_staked(resp, stakes);
}
}
let pull_responses: Vec<_> = pull_responses
.into_iter()
.zip(addrs.into_iter())
@ -2450,6 +2515,7 @@ impl ClusterInfo {
recycler: &PacketsRecycler,
stakes: &HashMap<Pubkey, u64>,
response_sender: &PacketSender,
require_stake_for_gossip: bool,
) {
let _st = ScopedTimer::from(&self.stats.handle_batch_push_messages_time);
if messages.is_empty() {
@ -2559,7 +2625,7 @@ impl ClusterInfo {
self.stats
.push_response_count
.add_relaxed(packets.packets.len() as u64);
let new_push_requests = self.new_push_requests();
let new_push_requests = self.new_push_requests(stakes, require_stake_for_gossip);
inc_new_counter_debug!("cluster_info-push_message-pushes", new_push_requests.len());
for (address, request) in new_push_requests {
if ContactInfo::is_valid_address(&address) {
@ -2602,6 +2668,33 @@ impl ClusterInfo {
}
}
fn require_stake_for_gossip(
&self,
feature_set: Option<&FeatureSet>,
stakes: &HashMap<Pubkey, u64>,
) -> bool {
match feature_set {
None => {
self.stats
.require_stake_for_gossip_unknown_feature_set
.add_relaxed(1);
false
}
Some(feature_set) => {
if !feature_set.is_active(&feature_set::require_stake_for_gossip::id()) {
false
} else if stakes.len() < MIN_NUM_STAKED_NODES {
self.stats
.require_stake_for_gossip_unknown_stakes
.add_relaxed(1);
false
} else {
true
}
}
}
}
fn process_packets(
&self,
packets: VecDeque<Packet>,
@ -2681,6 +2774,17 @@ impl ClusterInfo {
self.stats
.packets_received_prune_messages_count
.add_relaxed(prune_messages.len() as u64);
let require_stake_for_gossip = self.require_stake_for_gossip(feature_set, &stakes);
if require_stake_for_gossip {
for (_, data) in &mut pull_responses {
retain_staked(data, &stakes);
}
for (_, data) in &mut push_messages {
retain_staked(data, &stakes);
}
pull_responses.retain(|(_, data)| !data.is_empty());
push_messages.retain(|(_, data)| !data.is_empty());
}
self.handle_batch_ping_messages(ping_messages, recycler, response_sender);
self.handle_batch_prune_messages(prune_messages);
self.handle_batch_push_messages(
@ -2689,6 +2793,7 @@ impl ClusterInfo {
recycler,
&stakes,
response_sender,
require_stake_for_gossip,
);
self.handle_batch_pull_responses(pull_responses, thread_pool, &stakes, epoch_time_ms);
self.trim_crds_table(CRDS_UNIQUE_PUBKEY_CAPACITY, &stakes);
@ -2700,6 +2805,7 @@ impl ClusterInfo {
&stakes,
response_sender,
feature_set,
require_stake_for_gossip,
);
Ok(())
}
@ -3039,6 +3145,18 @@ impl ClusterInfo {
self.stats.packets_sent_push_messages_count.clear(),
i64
),
(
"require_stake_for_gossip_unknown_feature_set",
self.stats
.require_stake_for_gossip_unknown_feature_set
.clear(),
i64
),
(
"require_stake_for_gossip_unknown_stakes",
self.stats.require_stake_for_gossip_unknown_stakes.clear(),
i64
),
(
"trim_crds_table_failed",
self.stats.trim_crds_table_failed.clear(),
@ -3725,8 +3843,13 @@ mod tests {
.write()
.unwrap()
.refresh_push_active_set(&HashMap::new(), None);
let reqs =
cluster_info.generate_new_gossip_requests(&thread_pool, None, &HashMap::new(), true);
let reqs = cluster_info.generate_new_gossip_requests(
&thread_pool,
None, // gossip_validators
&HashMap::new(),
true, // generate_pull_requests
false, // require_stake_for_gossip
);
//assert none of the addrs are invalid.
reqs.iter().all(|(addr, _)| {
let res = ContactInfo::is_valid_address(addr);
@ -3837,7 +3960,7 @@ mod tests {
.unwrap()
.refresh_push_active_set(&HashMap::new(), None);
//check that all types of gossip messages are signed correctly
let (_, push_messages) = cluster_info
let push_messages = cluster_info
.gossip
.write()
.unwrap()

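The filter above is only enforced when two conditions hold: the require_stake_for_gossip feature must be active on the root bank, and the stakes map must contain at least MIN_NUM_STAKED_NODES (500) entries; otherwise propagation stays unfiltered and one of the two new counters records why. A rough standalone sketch of that decision, with FeatureGate and Stats as hypothetical stand-ins for the real FeatureSet and GossipStats, could look like:

use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};

const MIN_NUM_STAKED_NODES: usize = 500;

struct FeatureGate {
    require_stake_for_gossip_active: bool,
}

#[derive(Default)]
struct Stats {
    unknown_feature_set: AtomicU64,
    unknown_stakes: AtomicU64,
}

fn require_stake_for_gossip(
    feature_gate: Option<&FeatureGate>,
    stakes: &HashMap<[u8; 32], u64>,
    stats: &Stats,
) -> bool {
    match feature_gate {
        // No root bank yet (e.g. during startup): do not enforce, but record it.
        None => {
            stats.unknown_feature_set.fetch_add(1, Ordering::Relaxed);
            false
        }
        Some(gate) if !gate.require_stake_for_gossip_active => false,
        // Feature is active but the stakes map looks too small to trust.
        Some(_) if stakes.len() < MIN_NUM_STAKED_NODES => {
            stats.unknown_stakes.fetch_add(1, Ordering::Relaxed);
            false
        }
        Some(_) => true,
    }
}

fn main() {
    let stats = Stats::default();
    let stakes: HashMap<[u8; 32], u64> = HashMap::new();
    let gate = FeatureGate { require_stake_for_gossip_active: true };
    // Feature is active but far fewer than 500 staked nodes are known: not enforced.
    assert!(!require_stake_for_gossip(Some(&gate), &stakes, &stats));
    assert_eq!(stats.unknown_stakes.load(Ordering::Relaxed), 1);
}

The 500-node floor presumably guards against enforcing the filter from a stakes map that has not been fully populated yet, which would otherwise drop most of the cluster's values.
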
View File

@ -28,7 +28,7 @@ use crate::contact_info::ContactInfo;
use crate::crds_shards::CrdsShards;
use crate::crds_value::{CrdsData, CrdsValue, CrdsValueLabel, LowestSlot};
use bincode::serialize;
use indexmap::map::{rayon::ParValues, Entry, IndexMap, Values};
use indexmap::map::{rayon::ParValues, Entry, IndexMap};
use indexmap::set::IndexSet;
use rayon::{prelude::*, ThreadPool};
use solana_sdk::hash::{hash, Hash};
@ -272,7 +272,8 @@ impl Crds {
self.table.is_empty()
}
pub fn values(&self) -> Values<'_, CrdsValueLabel, VersionedCrdsValue> {
#[cfg(test)]
pub(crate) fn values(&self) -> impl Iterator<Item = &VersionedCrdsValue> {
self.table.values()
}

View File

@ -106,10 +106,9 @@ impl CrdsGossip {
&mut self,
pending_push_messages: Vec<(CrdsValue, u64)>,
now: u64,
) -> (Pubkey, HashMap<Pubkey, Vec<CrdsValue>>) {
) -> HashMap<Pubkey, Vec<CrdsValue>> {
self.process_push_messages(pending_push_messages);
let push_messages = self.push.new_push_messages(&self.crds, now);
(self.id, push_messages)
self.push.new_push_messages(&self.crds, now)
}
pub(crate) fn push_duplicate_shred(

View File

@ -302,7 +302,7 @@ fn network_run_push(
let mut node_lock = node.lock().unwrap();
let timeouts = node_lock.make_timeouts_test();
node_lock.purge(thread_pool, now, &timeouts);
node_lock.new_push_messages(vec![], now)
(node_lock.id, node_lock.new_push_messages(vec![], now))
})
.collect();
let transfered: Vec<_> = requests

View File

@ -111,6 +111,10 @@ pub mod skip_ro_deserialization {
solana_sdk::declare_id!("6Sw5JV84f7QkDe8gvRxpcPWFnPpfpgEnNziiy8sELaCp");
}
pub mod require_stake_for_gossip {
solana_sdk::declare_id!("6oNzd5Z3M2L1xo4Q5hoox7CR2DuW7m1ETLWH5jHJthwa");
}
lazy_static! {
/// Map of feature identifiers to user-visible description
pub static ref FEATURE_NAMES: HashMap<Pubkey, &'static str> = [
@ -139,6 +143,7 @@ lazy_static! {
(check_program_owner::id(), "limit programs to operating on accounts owned by itself"),
(cpi_share_ro_and_exec_accounts::id(), "Share RO and Executable accounts during cross-program invocations"),
(skip_ro_deserialization::id(), "Skip deserialization of read-only accounts"),
(require_stake_for_gossip::id(), "require stakes for propagating crds values through gossip #15561"),
/*************** ADD NEW FEATURES HERE ***************/
]
.iter()