diff --git a/gossip/src/crds_gossip.rs b/gossip/src/crds_gossip.rs index 521439a450..4fd6f993b9 100644 --- a/gossip/src/crds_gossip.rs +++ b/gossip/src/crds_gossip.rs @@ -184,14 +184,12 @@ impl CrdsGossip { pings: &mut Vec<(SocketAddr, Ping)>, socket_addr_space: &SocketAddrSpace, ) { - let network_size = self.crds.read().unwrap().num_nodes(); self.push.refresh_push_active_set( &self.crds, stakes, gossip_validators, self_keypair, self_shred_version, - network_size, ping_cache, pings, socket_addr_space, diff --git a/gossip/src/crds_gossip_push.rs b/gossip/src/crds_gossip_push.rs index 4b390ac12c..f0d50106cd 100644 --- a/gossip/src/crds_gossip_push.rs +++ b/gossip/src/crds_gossip_push.rs @@ -50,6 +50,7 @@ pub const CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS: u64 = 30000; const CRDS_GOSSIP_PRUNE_MSG_TIMEOUT_MS: u64 = 500; const CRDS_GOSSIP_PRUNE_STAKE_THRESHOLD_PCT: f64 = 0.15; const CRDS_GOSSIP_PRUNE_MIN_INGRESS_NODES: usize = 3; +const CRDS_GOSSIP_PUSH_ACTIVE_SET_SIZE: usize = CRDS_GOSSIP_PUSH_FANOUT * 3; pub struct CrdsGossipPush { /// Max bytes per message @@ -152,7 +153,8 @@ impl CrdsGossipPush { received_cache.record(origin, from, usize::from(num_dups)); self.num_old.fetch_add(1, Ordering::Relaxed); } - Err(_) => { + Err(CrdsError::InsertFailed | CrdsError::UnknownStakes) => { + received_cache.record(origin, from, /*num_dups:*/ usize::MAX); self.num_old.fetch_add(1, Ordering::Relaxed); } } @@ -243,7 +245,6 @@ impl CrdsGossipPush { gossip_validators: Option<&HashSet>, self_keypair: &Keypair, self_shred_version: u16, - network_size: usize, ping_cache: &Mutex, pings: &mut Vec<(SocketAddr, Ping)>, socket_addr_space: &SocketAddrSpace, @@ -276,8 +277,15 @@ impl CrdsGossipPush { if nodes.is_empty() { return; } + let cluster_size = crds.read().unwrap().num_pubkeys().max(stakes.len()); let mut active_set = self.active_set.write().unwrap(); - active_set.rotate(&mut rng, self.push_fanout * 3, network_size, &nodes, stakes) + active_set.rotate( + &mut rng, + CRDS_GOSSIP_PUSH_ACTIVE_SET_SIZE, + cluster_size, + &nodes, + stakes, + ) } } @@ -404,7 +412,6 @@ mod tests { None, // gossip_validtors &Keypair::new(), 0, // self_shred_version - 1, // network_size &ping_cache, &mut Vec::new(), // pings &SocketAddrSpace::Unspecified, @@ -472,7 +479,6 @@ mod tests { None, // gossip_validators &Keypair::new(), 0, // self_shred_version - 1, // network_size &ping_cache, &mut Vec::new(), &SocketAddrSpace::Unspecified, @@ -516,7 +522,6 @@ mod tests { None, // gossip_validators &Keypair::new(), 0, // self_shred_version - 1, // network_size &ping_cache, &mut Vec::new(), // pings &SocketAddrSpace::Unspecified, @@ -564,7 +569,6 @@ mod tests { None, // gossip_validators &Keypair::new(), 0, // self_shred_version - 1, // network_size &ping_cache, &mut Vec::new(), // pings &SocketAddrSpace::Unspecified, diff --git a/gossip/src/push_active_set.rs b/gossip/src/push_active_set.rs index 9196aae6d8..70902ccc44 100644 --- a/gossip/src/push_active_set.rs +++ b/gossip/src/push_active_set.rs @@ -13,6 +13,7 @@ const NUM_PUSH_ACTIVE_SET_ENTRIES: usize = 25; // min stake of { this node, crds value owner } // The entry represents set of gossip nodes to actively // push to for crds values belonging to the bucket. +#[derive(Default)] pub(crate) struct PushActiveSet([PushActiveSetEntry; NUM_PUSH_ACTIVE_SET_ENTRIES]); // Keys are gossip nodes to push messages to. @@ -165,12 +166,6 @@ impl PushActiveSetEntry { } } -impl Default for PushActiveSet { - fn default() -> Self { - Self(std::array::from_fn(|_| PushActiveSetEntry::default())) - } -} - // Maps stake to bucket index. fn get_stake_bucket(stake: Option<&u64>) -> usize { let stake = stake.copied().unwrap_or_default() / LAMPORTS_PER_SOL; diff --git a/gossip/src/received_cache.rs b/gossip/src/received_cache.rs index d1e7623dfd..8a82336312 100644 --- a/gossip/src/received_cache.rs +++ b/gossip/src/received_cache.rs @@ -83,6 +83,9 @@ impl ReceivedCacheEntry { *score = score.saturating_add(1); } else if self.nodes.len() < Self::CAPACITY { // Ensure that node is inserted into the cache for later pruning. + // This intentionally does not negatively impact node's score, in + // order to prevent replayed messages with spoofed addresses force + // pruning a good node. let _ = self.nodes.entry(node).or_default(); } }