records outdated gossip crds upserts in received-cache (#29973)
This commit is contained in:
parent
2da02992b7
commit
5284736b6f
|
@ -184,14 +184,12 @@ impl CrdsGossip {
|
|||
pings: &mut Vec<(SocketAddr, Ping)>,
|
||||
socket_addr_space: &SocketAddrSpace,
|
||||
) {
|
||||
let network_size = self.crds.read().unwrap().num_nodes();
|
||||
self.push.refresh_push_active_set(
|
||||
&self.crds,
|
||||
stakes,
|
||||
gossip_validators,
|
||||
self_keypair,
|
||||
self_shred_version,
|
||||
network_size,
|
||||
ping_cache,
|
||||
pings,
|
||||
socket_addr_space,
|
||||
|
|
|
@ -50,6 +50,7 @@ pub const CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS: u64 = 30000;
|
|||
const CRDS_GOSSIP_PRUNE_MSG_TIMEOUT_MS: u64 = 500;
|
||||
const CRDS_GOSSIP_PRUNE_STAKE_THRESHOLD_PCT: f64 = 0.15;
|
||||
const CRDS_GOSSIP_PRUNE_MIN_INGRESS_NODES: usize = 3;
|
||||
const CRDS_GOSSIP_PUSH_ACTIVE_SET_SIZE: usize = CRDS_GOSSIP_PUSH_FANOUT * 3;
|
||||
|
||||
pub struct CrdsGossipPush {
|
||||
/// Max bytes per message
|
||||
|
@ -152,7 +153,8 @@ impl CrdsGossipPush {
|
|||
received_cache.record(origin, from, usize::from(num_dups));
|
||||
self.num_old.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
Err(_) => {
|
||||
Err(CrdsError::InsertFailed | CrdsError::UnknownStakes) => {
|
||||
received_cache.record(origin, from, /*num_dups:*/ usize::MAX);
|
||||
self.num_old.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
@ -243,7 +245,6 @@ impl CrdsGossipPush {
|
|||
gossip_validators: Option<&HashSet<Pubkey>>,
|
||||
self_keypair: &Keypair,
|
||||
self_shred_version: u16,
|
||||
network_size: usize,
|
||||
ping_cache: &Mutex<PingCache>,
|
||||
pings: &mut Vec<(SocketAddr, Ping)>,
|
||||
socket_addr_space: &SocketAddrSpace,
|
||||
|
@ -276,8 +277,15 @@ impl CrdsGossipPush {
|
|||
if nodes.is_empty() {
|
||||
return;
|
||||
}
|
||||
let cluster_size = crds.read().unwrap().num_pubkeys().max(stakes.len());
|
||||
let mut active_set = self.active_set.write().unwrap();
|
||||
active_set.rotate(&mut rng, self.push_fanout * 3, network_size, &nodes, stakes)
|
||||
active_set.rotate(
|
||||
&mut rng,
|
||||
CRDS_GOSSIP_PUSH_ACTIVE_SET_SIZE,
|
||||
cluster_size,
|
||||
&nodes,
|
||||
stakes,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -404,7 +412,6 @@ mod tests {
|
|||
None, // gossip_validtors
|
||||
&Keypair::new(),
|
||||
0, // self_shred_version
|
||||
1, // network_size
|
||||
&ping_cache,
|
||||
&mut Vec::new(), // pings
|
||||
&SocketAddrSpace::Unspecified,
|
||||
|
@ -472,7 +479,6 @@ mod tests {
|
|||
None, // gossip_validators
|
||||
&Keypair::new(),
|
||||
0, // self_shred_version
|
||||
1, // network_size
|
||||
&ping_cache,
|
||||
&mut Vec::new(),
|
||||
&SocketAddrSpace::Unspecified,
|
||||
|
@ -516,7 +522,6 @@ mod tests {
|
|||
None, // gossip_validators
|
||||
&Keypair::new(),
|
||||
0, // self_shred_version
|
||||
1, // network_size
|
||||
&ping_cache,
|
||||
&mut Vec::new(), // pings
|
||||
&SocketAddrSpace::Unspecified,
|
||||
|
@ -564,7 +569,6 @@ mod tests {
|
|||
None, // gossip_validators
|
||||
&Keypair::new(),
|
||||
0, // self_shred_version
|
||||
1, // network_size
|
||||
&ping_cache,
|
||||
&mut Vec::new(), // pings
|
||||
&SocketAddrSpace::Unspecified,
|
||||
|
|
|
@ -13,6 +13,7 @@ const NUM_PUSH_ACTIVE_SET_ENTRIES: usize = 25;
|
|||
// min stake of { this node, crds value owner }
|
||||
// The entry represents set of gossip nodes to actively
|
||||
// push to for crds values belonging to the bucket.
|
||||
#[derive(Default)]
|
||||
pub(crate) struct PushActiveSet([PushActiveSetEntry; NUM_PUSH_ACTIVE_SET_ENTRIES]);
|
||||
|
||||
// Keys are gossip nodes to push messages to.
|
||||
|
@ -165,12 +166,6 @@ impl PushActiveSetEntry {
|
|||
}
|
||||
}
|
||||
|
||||
impl Default for PushActiveSet {
|
||||
fn default() -> Self {
|
||||
Self(std::array::from_fn(|_| PushActiveSetEntry::default()))
|
||||
}
|
||||
}
|
||||
|
||||
// Maps stake to bucket index.
|
||||
fn get_stake_bucket(stake: Option<&u64>) -> usize {
|
||||
let stake = stake.copied().unwrap_or_default() / LAMPORTS_PER_SOL;
|
||||
|
|
|
@ -83,6 +83,9 @@ impl ReceivedCacheEntry {
|
|||
*score = score.saturating_add(1);
|
||||
} else if self.nodes.len() < Self::CAPACITY {
|
||||
// Ensure that node is inserted into the cache for later pruning.
|
||||
// This intentionally does not negatively impact node's score, in
|
||||
// order to prevent replayed messages with spoofed addresses force
|
||||
// pruning a good node.
|
||||
let _ = self.nodes.entry(node).or_default();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue