Gossip no longer pushes/pulls from nodes with a different shred version (#9868)
This commit is contained in:
parent
72312ad615
commit
16ddd001f6
|
@ -267,7 +267,11 @@ impl ClusterInfo {
|
|||
my_contact_info: RwLock::new(contact_info),
|
||||
id,
|
||||
};
|
||||
me.gossip.write().unwrap().set_self(&id);
|
||||
{
|
||||
let mut gossip = me.gossip.write().unwrap();
|
||||
gossip.set_self(&id);
|
||||
gossip.set_shred_version(me.my_shred_version());
|
||||
}
|
||||
me.insert_self();
|
||||
me.push_self(&HashMap::new());
|
||||
me
|
||||
|
@ -361,16 +365,23 @@ impl ClusterInfo {
|
|||
let now = timestamp();
|
||||
let mut spy_nodes = 0;
|
||||
let mut archivers = 0;
|
||||
let mut different_shred_nodes = 0;
|
||||
let my_pubkey = self.id();
|
||||
let my_shred_version = self.my_shred_version();
|
||||
let nodes: Vec<_> = self
|
||||
.all_peers()
|
||||
.into_iter()
|
||||
.map(|(node, last_updated)| {
|
||||
.filter_map(|(node, last_updated)| {
|
||||
if Self::is_spy_node(&node) {
|
||||
spy_nodes += 1;
|
||||
} else if Self::is_archiver(&node) {
|
||||
archivers += 1;
|
||||
}
|
||||
|
||||
if my_shred_version != 0 && (node.shred_version != 0 && node.shred_version != my_shred_version) {
|
||||
different_shred_nodes += 1;
|
||||
None
|
||||
} else {
|
||||
fn addr_to_string(default_ip: &IpAddr, addr: &SocketAddr) -> String {
|
||||
if ContactInfo::is_valid_address(addr) {
|
||||
if &addr.ip() == default_ip {
|
||||
|
@ -384,7 +395,7 @@ impl ClusterInfo {
|
|||
}
|
||||
|
||||
let ip_addr = node.gossip.ip();
|
||||
format!(
|
||||
Some(format!(
|
||||
"{:15} {:2}| {:5} | {:44} | {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {}\n",
|
||||
if ContactInfo::is_valid_address(&node.gossip) {
|
||||
ip_addr.to_string()
|
||||
|
@ -405,7 +416,8 @@ impl ClusterInfo {
|
|||
addr_to_string(&ip_addr, &node.rpc),
|
||||
addr_to_string(&ip_addr, &node.rpc_pubsub),
|
||||
node.shred_version,
|
||||
)
|
||||
))
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
|
@ -415,7 +427,7 @@ impl ClusterInfo {
|
|||
------------------+-------+----------------------------------------------+\
|
||||
------+------+------+------+------+------+------+------+------+------+--------\n\
|
||||
{}\
|
||||
Nodes: {}{}{}",
|
||||
Nodes: {}{}{}{}",
|
||||
nodes.join(""),
|
||||
nodes.len() - spy_nodes - archivers,
|
||||
if archivers > 0 {
|
||||
|
@ -427,6 +439,14 @@ impl ClusterInfo {
|
|||
format!("\nSpies: {}", spy_nodes)
|
||||
} else {
|
||||
"".to_string()
|
||||
},
|
||||
if spy_nodes > 0 {
|
||||
format!(
|
||||
"\nNodes with different shred version: {}",
|
||||
different_shred_nodes
|
||||
)
|
||||
} else {
|
||||
"".to_string()
|
||||
}
|
||||
)
|
||||
}
|
||||
|
@ -1350,6 +1370,10 @@ impl ClusterInfo {
|
|||
);
|
||||
obj.my_contact_info.write().unwrap().shred_version =
|
||||
entrypoint.shred_version;
|
||||
obj.gossip
|
||||
.write()
|
||||
.unwrap()
|
||||
.set_shred_version(entrypoint.shred_version);
|
||||
obj.insert_self();
|
||||
adopt_shred_version = false;
|
||||
}
|
||||
|
@ -1735,7 +1759,10 @@ impl ClusterInfo {
|
|||
requests.push(more_reqs)
|
||||
}
|
||||
if num_requests >= MAX_GOSSIP_TRAFFIC {
|
||||
warn!("Too much gossip traffic, ignoring some messages");
|
||||
warn!(
|
||||
"Too much gossip traffic, ignoring some messages (requests={}, max requests={})",
|
||||
num_requests, MAX_GOSSIP_TRAFFIC
|
||||
);
|
||||
}
|
||||
let epoch_ms;
|
||||
let stakes: HashMap<_, _> = match bank_forks {
|
||||
|
|
|
@ -20,6 +20,7 @@ pub const CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS: usize = 500;
|
|||
pub struct CrdsGossip {
|
||||
pub crds: Crds,
|
||||
pub id: Pubkey,
|
||||
pub shred_version: u16,
|
||||
pub push: CrdsGossipPush,
|
||||
pub pull: CrdsGossipPull,
|
||||
}
|
||||
|
@ -29,6 +30,7 @@ impl Default for CrdsGossip {
|
|||
CrdsGossip {
|
||||
crds: Crds::default(),
|
||||
id: Pubkey::default(),
|
||||
shred_version: 0,
|
||||
push: CrdsGossipPush::default(),
|
||||
pull: CrdsGossipPull::default(),
|
||||
}
|
||||
|
@ -39,6 +41,9 @@ impl CrdsGossip {
|
|||
pub fn set_self(&mut self, id: &Pubkey) {
|
||||
self.id = *id;
|
||||
}
|
||||
pub fn set_shred_version(&mut self, shred_version: u16) {
|
||||
self.shred_version = shred_version;
|
||||
}
|
||||
|
||||
/// process a push message to the network
|
||||
pub fn process_push_message(
|
||||
|
@ -122,6 +127,7 @@ impl CrdsGossip {
|
|||
&self.crds,
|
||||
stakes,
|
||||
&self.id,
|
||||
self.shred_version,
|
||||
self.pull.pull_request_time.len(),
|
||||
CRDS_GOSSIP_NUM_ACTIVE,
|
||||
)
|
||||
|
@ -134,8 +140,14 @@ impl CrdsGossip {
|
|||
stakes: &HashMap<Pubkey, u64>,
|
||||
bloom_size: usize,
|
||||
) -> Result<(Pubkey, Vec<CrdsFilter>, CrdsValue), CrdsGossipError> {
|
||||
self.pull
|
||||
.new_pull_request(&self.crds, &self.id, now, stakes, bloom_size)
|
||||
self.pull.new_pull_request(
|
||||
&self.crds,
|
||||
&self.id,
|
||||
self.shred_version,
|
||||
now,
|
||||
stakes,
|
||||
bloom_size,
|
||||
)
|
||||
}
|
||||
|
||||
/// time when a request to `from` was initiated
|
||||
|
|
|
@ -144,11 +144,12 @@ impl CrdsGossipPull {
|
|||
&self,
|
||||
crds: &Crds,
|
||||
self_id: &Pubkey,
|
||||
self_shred_version: u16,
|
||||
now: u64,
|
||||
stakes: &HashMap<Pubkey, u64>,
|
||||
bloom_size: usize,
|
||||
) -> Result<(Pubkey, Vec<CrdsFilter>, CrdsValue), CrdsGossipError> {
|
||||
let options = self.pull_options(crds, &self_id, now, stakes);
|
||||
let options = self.pull_options(crds, &self_id, self_shred_version, now, stakes);
|
||||
if options.is_empty() {
|
||||
return Err(CrdsGossipError::NoPeers);
|
||||
}
|
||||
|
@ -165,13 +166,20 @@ impl CrdsGossipPull {
|
|||
&self,
|
||||
crds: &'a Crds,
|
||||
self_id: &Pubkey,
|
||||
self_shred_version: u16,
|
||||
now: u64,
|
||||
stakes: &HashMap<Pubkey, u64>,
|
||||
) -> Vec<(f32, &'a ContactInfo)> {
|
||||
crds.table
|
||||
.values()
|
||||
.filter_map(|v| v.value.contact_info())
|
||||
.filter(|v| v.id != *self_id && ContactInfo::is_valid_address(&v.gossip))
|
||||
.filter(|v| {
|
||||
v.id != *self_id
|
||||
&& ContactInfo::is_valid_address(&v.gossip)
|
||||
&& (self_shred_version == 0
|
||||
|| v.shred_version == 0
|
||||
|| self_shred_version == v.shred_version)
|
||||
})
|
||||
.map(|item| {
|
||||
let max_weight = f32::from(u16::max_value()) - 1.0;
|
||||
let req_time: u64 = *self.pull_request_time.get(&item.id).unwrap_or(&0);
|
||||
|
@ -402,7 +410,7 @@ mod test {
|
|||
stakes.insert(id, i * 100);
|
||||
}
|
||||
let now = 1024;
|
||||
let mut options = node.pull_options(&crds, &me.label().pubkey(), now, &stakes);
|
||||
let mut options = node.pull_options(&crds, &me.label().pubkey(), 0, now, &stakes);
|
||||
assert!(!options.is_empty());
|
||||
options.sort_by(|(weight_l, _), (weight_r, _)| weight_r.partial_cmp(weight_l).unwrap());
|
||||
// check that the highest stake holder is also the heaviest weighted.
|
||||
|
@ -412,6 +420,66 @@ mod test {
|
|||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_no_pulls_from_different_shred_versions() {
|
||||
let mut crds = Crds::default();
|
||||
let stakes = HashMap::new();
|
||||
let node = CrdsGossipPull::default();
|
||||
|
||||
let gossip = socketaddr!("127.0.0.1:1234");
|
||||
|
||||
let me = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo {
|
||||
id: Pubkey::new_rand(),
|
||||
shred_version: 123,
|
||||
gossip: gossip.clone(),
|
||||
..ContactInfo::default()
|
||||
}));
|
||||
let spy = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo {
|
||||
id: Pubkey::new_rand(),
|
||||
shred_version: 0,
|
||||
gossip: gossip.clone(),
|
||||
..ContactInfo::default()
|
||||
}));
|
||||
let node_123 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo {
|
||||
id: Pubkey::new_rand(),
|
||||
shred_version: 123,
|
||||
gossip: gossip.clone(),
|
||||
..ContactInfo::default()
|
||||
}));
|
||||
let node_456 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo {
|
||||
id: Pubkey::new_rand(),
|
||||
shred_version: 456,
|
||||
gossip: gossip.clone(),
|
||||
..ContactInfo::default()
|
||||
}));
|
||||
|
||||
crds.insert(me.clone(), 0).unwrap();
|
||||
crds.insert(spy.clone(), 0).unwrap();
|
||||
crds.insert(node_123.clone(), 0).unwrap();
|
||||
crds.insert(node_456.clone(), 0).unwrap();
|
||||
|
||||
// shred version 123 should ignore 456 nodes
|
||||
let options = node
|
||||
.pull_options(&crds, &me.label().pubkey(), 123, 0, &stakes)
|
||||
.iter()
|
||||
.map(|(_, c)| c.id)
|
||||
.collect::<Vec<_>>();
|
||||
assert_eq!(options.len(), 2);
|
||||
assert!(options.contains(&spy.pubkey()));
|
||||
assert!(options.contains(&node_123.pubkey()));
|
||||
|
||||
// spy nodes will see all
|
||||
let options = node
|
||||
.pull_options(&crds, &spy.label().pubkey(), 0, 0, &stakes)
|
||||
.iter()
|
||||
.map(|(_, c)| c.id)
|
||||
.collect::<Vec<_>>();
|
||||
assert_eq!(options.len(), 3);
|
||||
assert!(options.contains(&me.pubkey()));
|
||||
assert!(options.contains(&node_123.pubkey()));
|
||||
assert!(options.contains(&node_456.pubkey()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_new_pull_request() {
|
||||
let mut crds = Crds::default();
|
||||
|
@ -422,13 +490,13 @@ mod test {
|
|||
let id = entry.label().pubkey();
|
||||
let node = CrdsGossipPull::default();
|
||||
assert_eq!(
|
||||
node.new_pull_request(&crds, &id, 0, &HashMap::new(), PACKET_DATA_SIZE),
|
||||
node.new_pull_request(&crds, &id, 0, 0, &HashMap::new(), PACKET_DATA_SIZE),
|
||||
Err(CrdsGossipError::NoPeers)
|
||||
);
|
||||
|
||||
crds.insert(entry.clone(), 0).unwrap();
|
||||
assert_eq!(
|
||||
node.new_pull_request(&crds, &id, 0, &HashMap::new(), PACKET_DATA_SIZE),
|
||||
node.new_pull_request(&crds, &id, 0, 0, &HashMap::new(), PACKET_DATA_SIZE),
|
||||
Err(CrdsGossipError::NoPeers)
|
||||
);
|
||||
|
||||
|
@ -437,7 +505,7 @@ mod test {
|
|||
0,
|
||||
)));
|
||||
crds.insert(new.clone(), 0).unwrap();
|
||||
let req = node.new_pull_request(&crds, &id, 0, &HashMap::new(), PACKET_DATA_SIZE);
|
||||
let req = node.new_pull_request(&crds, &id, 0, 0, &HashMap::new(), PACKET_DATA_SIZE);
|
||||
let (to, _, self_info) = req.unwrap();
|
||||
assert_eq!(to, new.label().pubkey());
|
||||
assert_eq!(self_info, entry);
|
||||
|
@ -472,6 +540,7 @@ mod test {
|
|||
let req = node.new_pull_request(
|
||||
&crds,
|
||||
&node_pubkey,
|
||||
0,
|
||||
u64::max_value(),
|
||||
&HashMap::new(),
|
||||
PACKET_DATA_SIZE,
|
||||
|
@ -501,6 +570,7 @@ mod test {
|
|||
&node_crds,
|
||||
&node_pubkey,
|
||||
0,
|
||||
0,
|
||||
&HashMap::new(),
|
||||
PACKET_DATA_SIZE,
|
||||
);
|
||||
|
@ -573,6 +643,7 @@ mod test {
|
|||
&node_crds,
|
||||
&node_pubkey,
|
||||
0,
|
||||
0,
|
||||
&HashMap::new(),
|
||||
PACKET_DATA_SIZE,
|
||||
);
|
||||
|
|
|
@ -236,13 +236,14 @@ impl CrdsGossipPush {
|
|||
crds: &Crds,
|
||||
stakes: &HashMap<Pubkey, u64>,
|
||||
self_id: &Pubkey,
|
||||
self_shred_version: u16,
|
||||
network_size: usize,
|
||||
ratio: usize,
|
||||
) {
|
||||
let need = Self::compute_need(self.num_active, self.active_set.len(), ratio);
|
||||
let mut new_items = HashMap::new();
|
||||
|
||||
let options: Vec<_> = self.push_options(crds, &self_id, stakes);
|
||||
let options: Vec<_> = self.push_options(crds, &self_id, self_shred_version, stakes);
|
||||
if options.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
@ -288,13 +289,20 @@ impl CrdsGossipPush {
|
|||
&self,
|
||||
crds: &'a Crds,
|
||||
self_id: &Pubkey,
|
||||
self_shred_version: u16,
|
||||
stakes: &HashMap<Pubkey, u64>,
|
||||
) -> Vec<(f32, &'a ContactInfo)> {
|
||||
crds.table
|
||||
.values()
|
||||
.filter(|v| v.value.contact_info().is_some())
|
||||
.map(|v| (v.value.contact_info().unwrap(), v))
|
||||
.filter(|(info, _)| info.id != *self_id && ContactInfo::is_valid_address(&info.gossip))
|
||||
.filter(|(info, _)| {
|
||||
info.id != *self_id
|
||||
&& ContactInfo::is_valid_address(&info.gossip)
|
||||
&& (self_shred_version == 0
|
||||
|| info.shred_version == 0
|
||||
|| self_shred_version == info.shred_version)
|
||||
})
|
||||
.map(|(info, value)| {
|
||||
let max_weight = f32::from(u16::max_value()) - 1.0;
|
||||
let last_updated: u64 = value.local_timestamp;
|
||||
|
@ -510,7 +518,7 @@ mod test {
|
|||
)));
|
||||
|
||||
assert_eq!(crds.insert(value1.clone(), 0), Ok(None));
|
||||
push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 1, 1);
|
||||
push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 0, 1, 1);
|
||||
|
||||
assert!(push.active_set.get(&value1.label().pubkey()).is_some());
|
||||
let value2 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
|
||||
|
@ -520,7 +528,7 @@ mod test {
|
|||
assert!(push.active_set.get(&value2.label().pubkey()).is_none());
|
||||
assert_eq!(crds.insert(value2.clone(), 0), Ok(None));
|
||||
for _ in 0..30 {
|
||||
push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 1, 1);
|
||||
push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 0, 1, 1);
|
||||
if push.active_set.get(&value2.label().pubkey()).is_some() {
|
||||
break;
|
||||
}
|
||||
|
@ -533,7 +541,7 @@ mod test {
|
|||
));
|
||||
assert_eq!(crds.insert(value2.clone(), 0), Ok(None));
|
||||
}
|
||||
push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 1, 1);
|
||||
push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 0, 1, 1);
|
||||
assert_eq!(push.active_set.len(), push.num_active);
|
||||
}
|
||||
#[test]
|
||||
|
@ -551,7 +559,7 @@ mod test {
|
|||
crds.insert(peer.clone(), time).unwrap();
|
||||
stakes.insert(id, i * 100);
|
||||
}
|
||||
let mut options = push.push_options(&crds, &Pubkey::default(), &stakes);
|
||||
let mut options = push.push_options(&crds, &Pubkey::default(), 0, &stakes);
|
||||
assert!(!options.is_empty());
|
||||
options.sort_by(|(weight_l, _), (weight_r, _)| weight_r.partial_cmp(weight_l).unwrap());
|
||||
// check that the highest stake holder is also the heaviest weighted.
|
||||
|
@ -560,6 +568,66 @@ mod test {
|
|||
10_000_u64
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_no_pushes_to_from_different_shred_versions() {
|
||||
let mut crds = Crds::default();
|
||||
let stakes = HashMap::new();
|
||||
let node = CrdsGossipPush::default();
|
||||
|
||||
let gossip = socketaddr!("127.0.0.1:1234");
|
||||
|
||||
let me = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo {
|
||||
id: Pubkey::new_rand(),
|
||||
shred_version: 123,
|
||||
gossip: gossip.clone(),
|
||||
..ContactInfo::default()
|
||||
}));
|
||||
let spy = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo {
|
||||
id: Pubkey::new_rand(),
|
||||
shred_version: 0,
|
||||
gossip: gossip.clone(),
|
||||
..ContactInfo::default()
|
||||
}));
|
||||
let node_123 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo {
|
||||
id: Pubkey::new_rand(),
|
||||
shred_version: 123,
|
||||
gossip: gossip.clone(),
|
||||
..ContactInfo::default()
|
||||
}));
|
||||
let node_456 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo {
|
||||
id: Pubkey::new_rand(),
|
||||
shred_version: 456,
|
||||
gossip: gossip.clone(),
|
||||
..ContactInfo::default()
|
||||
}));
|
||||
|
||||
crds.insert(me.clone(), 0).unwrap();
|
||||
crds.insert(spy.clone(), 0).unwrap();
|
||||
crds.insert(node_123.clone(), 0).unwrap();
|
||||
crds.insert(node_456.clone(), 0).unwrap();
|
||||
|
||||
// shred version 123 should ignore 456 nodes
|
||||
let options = node
|
||||
.push_options(&crds, &me.label().pubkey(), 123, &stakes)
|
||||
.iter()
|
||||
.map(|(_, c)| c.id)
|
||||
.collect::<Vec<_>>();
|
||||
assert_eq!(options.len(), 2);
|
||||
assert!(options.contains(&spy.pubkey()));
|
||||
assert!(options.contains(&node_123.pubkey()));
|
||||
|
||||
// spy nodes will see all
|
||||
let options = node
|
||||
.push_options(&crds, &spy.label().pubkey(), 0, &stakes)
|
||||
.iter()
|
||||
.map(|(_, c)| c.id)
|
||||
.collect::<Vec<_>>();
|
||||
assert_eq!(options.len(), 3);
|
||||
assert!(options.contains(&me.pubkey()));
|
||||
assert!(options.contains(&node_123.pubkey()));
|
||||
assert!(options.contains(&node_456.pubkey()));
|
||||
}
|
||||
#[test]
|
||||
fn test_new_push_messages() {
|
||||
let mut crds = Crds::default();
|
||||
|
@ -569,7 +637,7 @@ mod test {
|
|||
0,
|
||||
)));
|
||||
assert_eq!(crds.insert(peer.clone(), 0), Ok(None));
|
||||
push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 1, 1);
|
||||
push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 0, 1, 1);
|
||||
|
||||
let new_msg = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
|
||||
&Pubkey::new_rand(),
|
||||
|
@ -606,7 +674,7 @@ mod test {
|
|||
push.process_push_message(&mut crds, &Pubkey::default(), peer_3.clone(), 0),
|
||||
Ok(None)
|
||||
);
|
||||
push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 1, 1);
|
||||
push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 0, 1, 1);
|
||||
|
||||
// push 3's contact info to 1 and 2 and 3
|
||||
let new_msg = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
|
||||
|
@ -628,7 +696,7 @@ mod test {
|
|||
0,
|
||||
)));
|
||||
assert_eq!(crds.insert(peer.clone(), 0), Ok(None));
|
||||
push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 1, 1);
|
||||
push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 0, 1, 1);
|
||||
|
||||
let new_msg = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
|
||||
&Pubkey::new_rand(),
|
||||
|
@ -651,7 +719,7 @@ mod test {
|
|||
0,
|
||||
)));
|
||||
assert_eq!(crds.insert(peer.clone(), 0), Ok(None));
|
||||
push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 1, 1);
|
||||
push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 0, 1, 1);
|
||||
|
||||
let mut ci = ContactInfo::new_localhost(&Pubkey::new_rand(), 0);
|
||||
ci.wallclock = 1;
|
||||
|
|
Loading…
Reference in New Issue