From 16ddd001f6164ec57492931b2cc9e368233726f5 Mon Sep 17 00:00:00 2001 From: Michael Vines Date: Tue, 5 May 2020 20:15:19 -0700 Subject: [PATCH] Gossip no longer pushes/pulls from nodes with a different shred version (#9868) --- core/src/cluster_info.rs | 103 ++++++++++++++++++++++------------- core/src/crds_gossip.rs | 16 +++++- core/src/crds_gossip_pull.rs | 83 ++++++++++++++++++++++++++-- core/src/crds_gossip_push.rs | 88 ++++++++++++++++++++++++++---- 4 files changed, 234 insertions(+), 56 deletions(-) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 96da73a12b..20ca23881d 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -267,7 +267,11 @@ impl ClusterInfo { my_contact_info: RwLock::new(contact_info), id, }; - me.gossip.write().unwrap().set_self(&id); + { + let mut gossip = me.gossip.write().unwrap(); + gossip.set_self(&id); + gossip.set_shred_version(me.my_shred_version()); + } me.insert_self(); me.push_self(&HashMap::new()); me @@ -361,51 +365,59 @@ impl ClusterInfo { let now = timestamp(); let mut spy_nodes = 0; let mut archivers = 0; + let mut different_shred_nodes = 0; let my_pubkey = self.id(); + let my_shred_version = self.my_shred_version(); let nodes: Vec<_> = self .all_peers() .into_iter() - .map(|(node, last_updated)| { + .filter_map(|(node, last_updated)| { if Self::is_spy_node(&node) { spy_nodes += 1; } else if Self::is_archiver(&node) { archivers += 1; } - fn addr_to_string(default_ip: &IpAddr, addr: &SocketAddr) -> String { - if ContactInfo::is_valid_address(addr) { - if &addr.ip() == default_ip { - addr.port().to_string() - } else { - addr.to_string() - } - } else { - "none".to_string() - } - } - let ip_addr = node.gossip.ip(); - format!( - "{:15} {:2}| {:5} | {:44} | {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {}\n", - if ContactInfo::is_valid_address(&node.gossip) { - ip_addr.to_string() - } else { - "none".to_string() - }, - if node.id == my_pubkey { "me" } else { "" }.to_string(), - 
now.saturating_sub(last_updated), - node.id.to_string(), - addr_to_string(&ip_addr, &node.gossip), - addr_to_string(&ip_addr, &node.tpu), - addr_to_string(&ip_addr, &node.tpu_forwards), - addr_to_string(&ip_addr, &node.tvu), - addr_to_string(&ip_addr, &node.tvu_forwards), - addr_to_string(&ip_addr, &node.repair), - addr_to_string(&ip_addr, &node.serve_repair), - addr_to_string(&ip_addr, &node.storage_addr), - addr_to_string(&ip_addr, &node.rpc), - addr_to_string(&ip_addr, &node.rpc_pubsub), - node.shred_version, - ) + if my_shred_version != 0 && (node.shred_version != 0 && node.shred_version != my_shred_version) { + different_shred_nodes += 1; + None + } else { + fn addr_to_string(default_ip: &IpAddr, addr: &SocketAddr) -> String { + if ContactInfo::is_valid_address(addr) { + if &addr.ip() == default_ip { + addr.port().to_string() + } else { + addr.to_string() + } + } else { + "none".to_string() + } + } + + let ip_addr = node.gossip.ip(); + Some(format!( + "{:15} {:2}| {:5} | {:44} | {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {}\n", + if ContactInfo::is_valid_address(&node.gossip) { + ip_addr.to_string() + } else { + "none".to_string() + }, + if node.id == my_pubkey { "me" } else { "" }.to_string(), + now.saturating_sub(last_updated), + node.id.to_string(), + addr_to_string(&ip_addr, &node.gossip), + addr_to_string(&ip_addr, &node.tpu), + addr_to_string(&ip_addr, &node.tpu_forwards), + addr_to_string(&ip_addr, &node.tvu), + addr_to_string(&ip_addr, &node.tvu_forwards), + addr_to_string(&ip_addr, &node.repair), + addr_to_string(&ip_addr, &node.serve_repair), + addr_to_string(&ip_addr, &node.storage_addr), + addr_to_string(&ip_addr, &node.rpc), + addr_to_string(&ip_addr, &node.rpc_pubsub), + node.shred_version, + )) + } }) .collect(); @@ -415,7 +427,7 @@ impl ClusterInfo { ------------------+-------+----------------------------------------------+\ ------+------+------+------+------+------+------+------+------+------+--------\n\ {}\ - Nodes: {}{}{}", 
+ Nodes: {}{}{}{}", nodes.join(""), nodes.len() - spy_nodes - archivers, if archivers > 0 { @@ -427,6 +439,14 @@ impl ClusterInfo { format!("\nSpies: {}", spy_nodes) } else { "".to_string() + }, + if different_shred_nodes > 0 { + format!( + "\nNodes with different shred version: {}", + different_shred_nodes + ) + } else { + "".to_string() } ) } @@ -1350,6 +1370,10 @@ impl ClusterInfo { ); obj.my_contact_info.write().unwrap().shred_version = entrypoint.shred_version; + obj.gossip + .write() + .unwrap() + .set_shred_version(entrypoint.shred_version); obj.insert_self(); adopt_shred_version = false; } @@ -1735,7 +1759,10 @@ impl ClusterInfo { requests.push(more_reqs) } if num_requests >= MAX_GOSSIP_TRAFFIC { - warn!("Too much gossip traffic, ignoring some messages"); + warn!( + "Too much gossip traffic, ignoring some messages (requests={}, max requests={})", + num_requests, MAX_GOSSIP_TRAFFIC + ); } let epoch_ms; let stakes: HashMap<_, _> = match bank_forks { diff --git a/core/src/crds_gossip.rs b/core/src/crds_gossip.rs index 2c9300809d..2d5ee4c6fb 100644 --- a/core/src/crds_gossip.rs +++ b/core/src/crds_gossip.rs @@ -20,6 +20,7 @@ pub const CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS: usize = 500; pub struct CrdsGossip { pub crds: Crds, pub id: Pubkey, + pub shred_version: u16, pub push: CrdsGossipPush, pub pull: CrdsGossipPull, } @@ -29,6 +30,7 @@ impl Default for CrdsGossip { CrdsGossip { crds: Crds::default(), id: Pubkey::default(), + shred_version: 0, push: CrdsGossipPush::default(), pull: CrdsGossipPull::default(), } @@ -39,6 +41,9 @@ impl CrdsGossip { pub fn set_self(&mut self, id: &Pubkey) { self.id = *id; } + pub fn set_shred_version(&mut self, shred_version: u16) { + self.shred_version = shred_version; + } /// process a push message to the network pub fn process_push_message( @@ -122,6 +127,7 @@ impl CrdsGossip { &self.crds, stakes, &self.id, + self.shred_version, self.pull.pull_request_time.len(), CRDS_GOSSIP_NUM_ACTIVE, ) @@ -134,8 +140,14 @@ impl CrdsGossip { stakes: &HashMap, 
bloom_size: usize, ) -> Result<(Pubkey, Vec, CrdsValue), CrdsGossipError> { - self.pull - .new_pull_request(&self.crds, &self.id, now, stakes, bloom_size) + self.pull.new_pull_request( + &self.crds, + &self.id, + self.shred_version, + now, + stakes, + bloom_size, + ) } /// time when a request to `from` was initiated diff --git a/core/src/crds_gossip_pull.rs b/core/src/crds_gossip_pull.rs index 3958c8fdc3..ad3947df87 100644 --- a/core/src/crds_gossip_pull.rs +++ b/core/src/crds_gossip_pull.rs @@ -144,11 +144,12 @@ impl CrdsGossipPull { &self, crds: &Crds, self_id: &Pubkey, + self_shred_version: u16, now: u64, stakes: &HashMap, bloom_size: usize, ) -> Result<(Pubkey, Vec, CrdsValue), CrdsGossipError> { - let options = self.pull_options(crds, &self_id, now, stakes); + let options = self.pull_options(crds, &self_id, self_shred_version, now, stakes); if options.is_empty() { return Err(CrdsGossipError::NoPeers); } @@ -165,13 +166,20 @@ impl CrdsGossipPull { &self, crds: &'a Crds, self_id: &Pubkey, + self_shred_version: u16, now: u64, stakes: &HashMap, ) -> Vec<(f32, &'a ContactInfo)> { crds.table .values() .filter_map(|v| v.value.contact_info()) - .filter(|v| v.id != *self_id && ContactInfo::is_valid_address(&v.gossip)) + .filter(|v| { + v.id != *self_id + && ContactInfo::is_valid_address(&v.gossip) + && (self_shred_version == 0 + || v.shred_version == 0 + || self_shred_version == v.shred_version) + }) .map(|item| { let max_weight = f32::from(u16::max_value()) - 1.0; let req_time: u64 = *self.pull_request_time.get(&item.id).unwrap_or(&0); @@ -402,7 +410,7 @@ mod test { stakes.insert(id, i * 100); } let now = 1024; - let mut options = node.pull_options(&crds, &me.label().pubkey(), now, &stakes); + let mut options = node.pull_options(&crds, &me.label().pubkey(), 0, now, &stakes); assert!(!options.is_empty()); options.sort_by(|(weight_l, _), (weight_r, _)| weight_r.partial_cmp(weight_l).unwrap()); // check that the highest stake holder is also the heaviest weighted. 
@@ -412,6 +420,66 @@ mod test { ); } + #[test] + fn test_no_pulls_from_different_shred_versions() { + let mut crds = Crds::default(); + let stakes = HashMap::new(); + let node = CrdsGossipPull::default(); + + let gossip = socketaddr!("127.0.0.1:1234"); + + let me = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo { + id: Pubkey::new_rand(), + shred_version: 123, + gossip: gossip.clone(), + ..ContactInfo::default() + })); + let spy = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo { + id: Pubkey::new_rand(), + shred_version: 0, + gossip: gossip.clone(), + ..ContactInfo::default() + })); + let node_123 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo { + id: Pubkey::new_rand(), + shred_version: 123, + gossip: gossip.clone(), + ..ContactInfo::default() + })); + let node_456 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo { + id: Pubkey::new_rand(), + shred_version: 456, + gossip: gossip.clone(), + ..ContactInfo::default() + })); + + crds.insert(me.clone(), 0).unwrap(); + crds.insert(spy.clone(), 0).unwrap(); + crds.insert(node_123.clone(), 0).unwrap(); + crds.insert(node_456.clone(), 0).unwrap(); + + // shred version 123 should ignore 456 nodes + let options = node + .pull_options(&crds, &me.label().pubkey(), 123, 0, &stakes) + .iter() + .map(|(_, c)| c.id) + .collect::>(); + assert_eq!(options.len(), 2); + assert!(options.contains(&spy.pubkey())); + assert!(options.contains(&node_123.pubkey())); + + // spy nodes will see all + let options = node + .pull_options(&crds, &spy.label().pubkey(), 0, 0, &stakes) + .iter() + .map(|(_, c)| c.id) + .collect::>(); + assert_eq!(options.len(), 3); + assert!(options.contains(&me.pubkey())); + assert!(options.contains(&node_123.pubkey())); + assert!(options.contains(&node_456.pubkey())); + } + #[test] fn test_new_pull_request() { let mut crds = Crds::default(); @@ -422,13 +490,13 @@ mod test { let id = entry.label().pubkey(); let node = CrdsGossipPull::default(); assert_eq!( - 
node.new_pull_request(&crds, &id, 0, &HashMap::new(), PACKET_DATA_SIZE), + node.new_pull_request(&crds, &id, 0, 0, &HashMap::new(), PACKET_DATA_SIZE), Err(CrdsGossipError::NoPeers) ); crds.insert(entry.clone(), 0).unwrap(); assert_eq!( - node.new_pull_request(&crds, &id, 0, &HashMap::new(), PACKET_DATA_SIZE), + node.new_pull_request(&crds, &id, 0, 0, &HashMap::new(), PACKET_DATA_SIZE), Err(CrdsGossipError::NoPeers) ); @@ -437,7 +505,7 @@ mod test { 0, ))); crds.insert(new.clone(), 0).unwrap(); - let req = node.new_pull_request(&crds, &id, 0, &HashMap::new(), PACKET_DATA_SIZE); + let req = node.new_pull_request(&crds, &id, 0, 0, &HashMap::new(), PACKET_DATA_SIZE); let (to, _, self_info) = req.unwrap(); assert_eq!(to, new.label().pubkey()); assert_eq!(self_info, entry); @@ -472,6 +540,7 @@ mod test { let req = node.new_pull_request( &crds, &node_pubkey, + 0, u64::max_value(), &HashMap::new(), PACKET_DATA_SIZE, @@ -501,6 +570,7 @@ mod test { &node_crds, &node_pubkey, 0, + 0, &HashMap::new(), PACKET_DATA_SIZE, ); @@ -573,6 +643,7 @@ mod test { &node_crds, &node_pubkey, 0, + 0, &HashMap::new(), PACKET_DATA_SIZE, ); diff --git a/core/src/crds_gossip_push.rs b/core/src/crds_gossip_push.rs index 12b9dd139f..6b701f4c7e 100644 --- a/core/src/crds_gossip_push.rs +++ b/core/src/crds_gossip_push.rs @@ -236,13 +236,14 @@ impl CrdsGossipPush { crds: &Crds, stakes: &HashMap, self_id: &Pubkey, + self_shred_version: u16, network_size: usize, ratio: usize, ) { let need = Self::compute_need(self.num_active, self.active_set.len(), ratio); let mut new_items = HashMap::new(); - let options: Vec<_> = self.push_options(crds, &self_id, stakes); + let options: Vec<_> = self.push_options(crds, &self_id, self_shred_version, stakes); if options.is_empty() { return; } @@ -288,13 +289,20 @@ impl CrdsGossipPush { &self, crds: &'a Crds, self_id: &Pubkey, + self_shred_version: u16, stakes: &HashMap, ) -> Vec<(f32, &'a ContactInfo)> { crds.table .values() .filter(|v| v.value.contact_info().is_some()) 
.map(|v| (v.value.contact_info().unwrap(), v)) - .filter(|(info, _)| info.id != *self_id && ContactInfo::is_valid_address(&info.gossip)) + .filter(|(info, _)| { + info.id != *self_id + && ContactInfo::is_valid_address(&info.gossip) + && (self_shred_version == 0 + || info.shred_version == 0 + || self_shred_version == info.shred_version) + }) .map(|(info, value)| { let max_weight = f32::from(u16::max_value()) - 1.0; let last_updated: u64 = value.local_timestamp; @@ -510,7 +518,7 @@ mod test { ))); assert_eq!(crds.insert(value1.clone(), 0), Ok(None)); - push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 1, 1); + push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 0, 1, 1); assert!(push.active_set.get(&value1.label().pubkey()).is_some()); let value2 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( @@ -520,7 +528,7 @@ mod test { assert!(push.active_set.get(&value2.label().pubkey()).is_none()); assert_eq!(crds.insert(value2.clone(), 0), Ok(None)); for _ in 0..30 { - push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 1, 1); + push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 0, 1, 1); if push.active_set.get(&value2.label().pubkey()).is_some() { break; } @@ -533,7 +541,7 @@ mod test { )); assert_eq!(crds.insert(value2.clone(), 0), Ok(None)); } - push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 1, 1); + push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 0, 1, 1); assert_eq!(push.active_set.len(), push.num_active); } #[test] @@ -551,7 +559,7 @@ mod test { crds.insert(peer.clone(), time).unwrap(); stakes.insert(id, i * 100); } - let mut options = push.push_options(&crds, &Pubkey::default(), &stakes); + let mut options = push.push_options(&crds, &Pubkey::default(), 0, &stakes); assert!(!options.is_empty()); options.sort_by(|(weight_l, _), (weight_r, _)| weight_r.partial_cmp(weight_l).unwrap()); // check that the 
highest stake holder is also the heaviest weighted. @@ -560,6 +568,66 @@ mod test { 10_000_u64 ); } + + #[test] + fn test_no_pushes_to_from_different_shred_versions() { + let mut crds = Crds::default(); + let stakes = HashMap::new(); + let node = CrdsGossipPush::default(); + + let gossip = socketaddr!("127.0.0.1:1234"); + + let me = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo { + id: Pubkey::new_rand(), + shred_version: 123, + gossip: gossip.clone(), + ..ContactInfo::default() + })); + let spy = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo { + id: Pubkey::new_rand(), + shred_version: 0, + gossip: gossip.clone(), + ..ContactInfo::default() + })); + let node_123 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo { + id: Pubkey::new_rand(), + shred_version: 123, + gossip: gossip.clone(), + ..ContactInfo::default() + })); + let node_456 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo { + id: Pubkey::new_rand(), + shred_version: 456, + gossip: gossip.clone(), + ..ContactInfo::default() + })); + + crds.insert(me.clone(), 0).unwrap(); + crds.insert(spy.clone(), 0).unwrap(); + crds.insert(node_123.clone(), 0).unwrap(); + crds.insert(node_456.clone(), 0).unwrap(); + + // shred version 123 should ignore 456 nodes + let options = node + .push_options(&crds, &me.label().pubkey(), 123, &stakes) + .iter() + .map(|(_, c)| c.id) + .collect::>(); + assert_eq!(options.len(), 2); + assert!(options.contains(&spy.pubkey())); + assert!(options.contains(&node_123.pubkey())); + + // spy nodes will see all + let options = node + .push_options(&crds, &spy.label().pubkey(), 0, &stakes) + .iter() + .map(|(_, c)| c.id) + .collect::>(); + assert_eq!(options.len(), 3); + assert!(options.contains(&me.pubkey())); + assert!(options.contains(&node_123.pubkey())); + assert!(options.contains(&node_456.pubkey())); + } #[test] fn test_new_push_messages() { let mut crds = Crds::default(); @@ -569,7 +637,7 @@ mod test { 0, ))); 
assert_eq!(crds.insert(peer.clone(), 0), Ok(None)); - push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 1, 1); + push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 0, 1, 1); let new_msg = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( &Pubkey::new_rand(), @@ -606,7 +674,7 @@ mod test { push.process_push_message(&mut crds, &Pubkey::default(), peer_3.clone(), 0), Ok(None) ); - push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 1, 1); + push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 0, 1, 1); // push 3's contact info to 1 and 2 and 3 let new_msg = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( @@ -628,7 +696,7 @@ mod test { 0, ))); assert_eq!(crds.insert(peer.clone(), 0), Ok(None)); - push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 1, 1); + push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 0, 1, 1); let new_msg = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( &Pubkey::new_rand(), @@ -651,7 +719,7 @@ mod test { 0, ))); assert_eq!(crds.insert(peer.clone(), 0), Ok(None)); - push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 1, 1); + push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 0, 1, 1); let mut ci = ContactInfo::new_localhost(&Pubkey::new_rand(), 0); ci.wallclock = 1;