dedups gossip addresses, taking the one with highest weight (#29421)
dedups gossip addresses, keeping only the one with the highest weight In order to avoid traffic congestion or sending duplicate packets, when sampling gossip nodes if several nodes have the same gossip address (because they are behind a relayer or whatever), they need to be deduplicated into one.
This commit is contained in:
parent
d22c1d1ce4
commit
e5323166b3
|
@ -112,7 +112,9 @@ impl ContactInfo {
|
||||||
let delay = 10 * 60 * 1000; // 10 minutes
|
let delay = 10 * 60 * 1000; // 10 minutes
|
||||||
let now = timestamp() - delay + rng.gen_range(0, 2 * delay);
|
let now = timestamp() - delay + rng.gen_range(0, 2 * delay);
|
||||||
let pubkey = pubkey.unwrap_or_else(solana_sdk::pubkey::new_rand);
|
let pubkey = pubkey.unwrap_or_else(solana_sdk::pubkey::new_rand);
|
||||||
ContactInfo::new_localhost(&pubkey, now)
|
let mut node = ContactInfo::new_localhost(&pubkey, now);
|
||||||
|
node.gossip.set_port(rng.gen_range(1024, u16::MAX));
|
||||||
|
node
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|
|
@ -17,6 +17,7 @@ use {
|
||||||
duplicate_shred::{self, DuplicateShredIndex, LeaderScheduleFn, MAX_DUPLICATE_SHREDS},
|
duplicate_shred::{self, DuplicateShredIndex, LeaderScheduleFn, MAX_DUPLICATE_SHREDS},
|
||||||
ping_pong::PingCache,
|
ping_pong::PingCache,
|
||||||
},
|
},
|
||||||
|
itertools::Itertools,
|
||||||
rayon::ThreadPool,
|
rayon::ThreadPool,
|
||||||
solana_ledger::shred::Shred,
|
solana_ledger::shred::Shred,
|
||||||
solana_sdk::{
|
solana_sdk::{
|
||||||
|
@ -353,6 +354,22 @@ pub fn get_weight(max_weight: f32, time_since_last_selected: u32, stake: f32) ->
|
||||||
1.0_f32.max(weight.min(max_weight))
|
1.0_f32.max(weight.min(max_weight))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Dedups gossip addresses, keeping only the one with the highest weight.
|
||||||
|
pub(crate) fn dedup_gossip_addresses<I, T: PartialOrd>(
|
||||||
|
nodes: I,
|
||||||
|
) -> HashMap</*gossip:*/ SocketAddr, (/*weight:*/ T, ContactInfo)>
|
||||||
|
where
|
||||||
|
I: IntoIterator<Item = (/*weight:*/ T, ContactInfo)>,
|
||||||
|
{
|
||||||
|
nodes
|
||||||
|
.into_iter()
|
||||||
|
.into_grouping_map_by(|(_weight, node)| node.gossip)
|
||||||
|
.aggregate(|acc, _node_gossip, (weight, node)| match acc {
|
||||||
|
Some((ref w, _)) if w >= &weight => acc,
|
||||||
|
Some(_) | None => Some((weight, node)),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod test {
|
mod test {
|
||||||
use {
|
use {
|
||||||
|
|
|
@ -17,7 +17,7 @@ use {
|
||||||
cluster_info_metrics::GossipStats,
|
cluster_info_metrics::GossipStats,
|
||||||
contact_info::ContactInfo,
|
contact_info::ContactInfo,
|
||||||
crds::{Crds, GossipRoute, VersionedCrdsValue},
|
crds::{Crds, GossipRoute, VersionedCrdsValue},
|
||||||
crds_gossip::{get_stake, get_weight},
|
crds_gossip::{self, get_stake, get_weight},
|
||||||
crds_gossip_error::CrdsGossipError,
|
crds_gossip_error::CrdsGossipError,
|
||||||
crds_value::CrdsValue,
|
crds_value::CrdsValue,
|
||||||
ping_pong::PingCache,
|
ping_pong::PingCache,
|
||||||
|
@ -244,22 +244,25 @@ impl CrdsGossipPull {
|
||||||
);
|
);
|
||||||
// Check for nodes which have responded to ping messages.
|
// Check for nodes which have responded to ping messages.
|
||||||
let mut rng = rand::thread_rng();
|
let mut rng = rand::thread_rng();
|
||||||
let (weights, peers): (Vec<_>, Vec<_>) = {
|
let peers: Vec<_> = {
|
||||||
let mut ping_cache = ping_cache.lock().unwrap();
|
let mut ping_cache = ping_cache.lock().unwrap();
|
||||||
let mut pingf = move || Ping::new_rand(&mut rng, self_keypair).ok();
|
let mut pingf = move || Ping::new_rand(&mut rng, self_keypair).ok();
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
peers
|
peers
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.filter_map(|(weight, peer)| {
|
.filter(|(_weight, peer)| {
|
||||||
let node = (peer.id, peer.gossip);
|
let node = (peer.id, peer.gossip);
|
||||||
let (check, ping) = ping_cache.check(now, node, &mut pingf);
|
let (check, ping) = ping_cache.check(now, node, &mut pingf);
|
||||||
if let Some(ping) = ping {
|
if let Some(ping) = ping {
|
||||||
pings.push((peer.gossip, ping));
|
pings.push((peer.gossip, ping));
|
||||||
}
|
}
|
||||||
check.then_some((weight, peer))
|
check
|
||||||
})
|
})
|
||||||
.unzip()
|
.collect()
|
||||||
};
|
};
|
||||||
|
let (weights, peers): (Vec<_>, Vec<_>) = crds_gossip::dedup_gossip_addresses(peers)
|
||||||
|
.into_values()
|
||||||
|
.unzip();
|
||||||
if peers.is_empty() {
|
if peers.is_empty() {
|
||||||
return Err(CrdsGossipError::NoPeers);
|
return Err(CrdsGossipError::NoPeers);
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,7 +16,7 @@ use {
|
||||||
cluster_info::{Ping, CRDS_UNIQUE_PUBKEY_CAPACITY},
|
cluster_info::{Ping, CRDS_UNIQUE_PUBKEY_CAPACITY},
|
||||||
contact_info::ContactInfo,
|
contact_info::ContactInfo,
|
||||||
crds::{Crds, CrdsError, Cursor, GossipRoute},
|
crds::{Crds, CrdsError, Cursor, GossipRoute},
|
||||||
crds_gossip::{get_stake, get_weight},
|
crds_gossip::{self, get_stake, get_weight},
|
||||||
crds_value::CrdsValue,
|
crds_value::CrdsValue,
|
||||||
ping_pong::PingCache,
|
ping_pong::PingCache,
|
||||||
received_cache::ReceivedCache,
|
received_cache::ReceivedCache,
|
||||||
|
@ -299,22 +299,26 @@ impl CrdsGossipPush {
|
||||||
socket_addr_space,
|
socket_addr_space,
|
||||||
);
|
);
|
||||||
// Check for nodes which have responded to ping messages.
|
// Check for nodes which have responded to ping messages.
|
||||||
let (weights, peers): (Vec<_>, Vec<_>) = {
|
let peers: Vec<_> = {
|
||||||
let mut ping_cache = ping_cache.lock().unwrap();
|
let mut ping_cache = ping_cache.lock().unwrap();
|
||||||
let mut pingf = move || Ping::new_rand(&mut rng, self_keypair).ok();
|
let mut pingf = move || Ping::new_rand(&mut rng, self_keypair).ok();
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
peers
|
peers
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.filter_map(|(weight, peer)| {
|
.filter(|(_weight, peer)| {
|
||||||
let node = (peer.id, peer.gossip);
|
let node = (peer.id, peer.gossip);
|
||||||
let (check, ping) = ping_cache.check(now, node, &mut pingf);
|
let (check, ping) = ping_cache.check(now, node, &mut pingf);
|
||||||
if let Some(ping) = ping {
|
if let Some(ping) = ping {
|
||||||
pings.push((peer.gossip, ping));
|
pings.push((peer.gossip, ping));
|
||||||
}
|
}
|
||||||
check.then_some((weight, peer.id))
|
check
|
||||||
})
|
})
|
||||||
.unzip()
|
.collect()
|
||||||
};
|
};
|
||||||
|
let (weights, peers): (Vec<_>, Vec<_>) = crds_gossip::dedup_gossip_addresses(peers)
|
||||||
|
.into_values()
|
||||||
|
.map(|(weight, node)| (weight, node.id))
|
||||||
|
.unzip();
|
||||||
if peers.is_empty() {
|
if peers.is_empty() {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -572,7 +576,8 @@ mod tests {
|
||||||
|
|
||||||
let active_set = push.active_set.read().unwrap();
|
let active_set = push.active_set.read().unwrap();
|
||||||
assert!(active_set.get(&value1.label().pubkey()).is_some());
|
assert!(active_set.get(&value1.label().pubkey()).is_some());
|
||||||
let value2 = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
|
let mut value2 = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
|
||||||
|
value2.gossip.set_port(1245);
|
||||||
ping_cache
|
ping_cache
|
||||||
.lock()
|
.lock()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
|
@ -608,8 +613,9 @@ mod tests {
|
||||||
let active_set = push.active_set.read().unwrap();
|
let active_set = push.active_set.read().unwrap();
|
||||||
assert!(active_set.get(&value2.label().pubkey()).is_some());
|
assert!(active_set.get(&value2.label().pubkey()).is_some());
|
||||||
}
|
}
|
||||||
for _ in 0..push.num_active {
|
for k in 0..push.num_active {
|
||||||
let value2 = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
|
let mut value2 = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
|
||||||
|
value2.gossip.set_port(1246 + k as u16);
|
||||||
ping_cache
|
ping_cache
|
||||||
.lock()
|
.lock()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
|
|
|
@ -148,7 +148,8 @@ impl CrdsData {
|
||||||
// the mainnet crds table.
|
// the mainnet crds table.
|
||||||
match kind {
|
match kind {
|
||||||
0 => CrdsData::ContactInfo(ContactInfo::new_rand(rng, pubkey)),
|
0 => CrdsData::ContactInfo(ContactInfo::new_rand(rng, pubkey)),
|
||||||
1 => CrdsData::LowestSlot(rng.gen(), LowestSlot::new_rand(rng, pubkey)),
|
// Index for LowestSlot is deprecated and should be zero.
|
||||||
|
1 => CrdsData::LowestSlot(0, LowestSlot::new_rand(rng, pubkey)),
|
||||||
2 => CrdsData::SnapshotHashes(SnapshotHashes::new_rand(rng, pubkey)),
|
2 => CrdsData::SnapshotHashes(SnapshotHashes::new_rand(rng, pubkey)),
|
||||||
3 => CrdsData::AccountsHashes(SnapshotHashes::new_rand(rng, pubkey)),
|
3 => CrdsData::AccountsHashes(SnapshotHashes::new_rand(rng, pubkey)),
|
||||||
4 => CrdsData::Version(Version::new_rand(rng, pubkey)),
|
4 => CrdsData::Version(Version::new_rand(rng, pubkey)),
|
||||||
|
@ -864,7 +865,7 @@ mod test {
|
||||||
let index = rng.gen_range(0, keys.len());
|
let index = rng.gen_range(0, keys.len());
|
||||||
CrdsValue::new_rand(&mut rng, Some(&keys[index]))
|
CrdsValue::new_rand(&mut rng, Some(&keys[index]))
|
||||||
})
|
})
|
||||||
.take(2048)
|
.take(1 << 12)
|
||||||
.collect();
|
.collect();
|
||||||
let mut currents = HashMap::new();
|
let mut currents = HashMap::new();
|
||||||
for value in filter_current(&values) {
|
for value in filter_current(&values) {
|
||||||
|
@ -888,10 +889,12 @@ mod test {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
assert_eq!(count, currents.len());
|
assert_eq!(count, currents.len());
|
||||||
// Currently CrdsData::new_rand is only implemented for 5 different
|
// Currently CrdsData::new_rand is implemented for:
|
||||||
// kinds and excludes EpochSlots, and so the unique labels cannot be
|
// AccountsHashes, ContactInfo, LowestSlot, SnapshotHashes, Version
|
||||||
// more than (5 + MAX_VOTES) times number of keys.
|
// EpochSlots x MAX_EPOCH_SLOTS
|
||||||
assert!(currents.len() <= keys.len() * (5 + MAX_VOTES as usize));
|
// Vote x MAX_VOTES
|
||||||
|
let num_kinds = 5 + MAX_VOTES as usize + MAX_EPOCH_SLOTS as usize;
|
||||||
|
assert!(currents.len() <= keys.len() * num_kinds);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
|
Loading…
Reference in New Issue