diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 65938363af..0890def4c5 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -421,7 +421,7 @@ impl ClusterInfo { gossip.set_shred_version(me.my_shred_version()); } me.insert_self(); - me.push_self(&HashMap::new()); + me.push_self(&HashMap::new(), None); me } @@ -453,13 +453,17 @@ impl ClusterInfo { self.insert_self() } - fn push_self(&self, stakes: &HashMap) { + fn push_self( + &self, + stakes: &HashMap, + gossip_validators: Option<&HashSet>, + ) { let now = timestamp(); self.my_contact_info.write().unwrap().wallclock = now; let entry = CrdsValue::new_signed(CrdsData::ContactInfo(self.my_contact_info()), &self.keypair); let mut w_gossip = self.gossip.write().unwrap(); - w_gossip.refresh_push_active_set(stakes); + w_gossip.refresh_push_active_set(stakes, gossip_validators); w_gossip.process_push_message(&self.id(), vec![entry], now); } @@ -1363,13 +1367,17 @@ impl ClusterInfo { messages } - fn new_pull_requests(&self, stakes: &HashMap) -> Vec<(SocketAddr, Protocol)> { + fn new_pull_requests( + &self, + gossip_validators: Option<&HashSet>, + stakes: &HashMap, + ) -> Vec<(SocketAddr, Protocol)> { let now = timestamp(); let mut pulls: Vec<_> = { let r_gossip = self.time_gossip_read_lock("new_pull_reqs", &self.stats.new_pull_requests); r_gossip - .new_pull_request(now, stakes, MAX_BLOOM_SIZE) + .new_pull_request(now, gossip_validators, stakes, MAX_BLOOM_SIZE) .ok() .into_iter() .filter_map(|(peer, filters, me)| { @@ -1430,27 +1438,32 @@ impl ClusterInfo { // Generate new push and pull requests fn generate_new_gossip_requests( &self, + gossip_validators: Option<&HashSet>, stakes: &HashMap, generate_pull_requests: bool, ) -> Vec<(SocketAddr, Protocol)> { - let pulls: Vec<_> = if generate_pull_requests { - self.new_pull_requests(stakes) + let mut pulls: Vec<_> = if generate_pull_requests { + self.new_pull_requests(gossip_validators, stakes) } else { vec![] }; - let pushes: Vec<_> = self.new_push_requests(); - vec![pulls, pushes].into_iter().flatten().collect() + let mut pushes: Vec<_> = self.new_push_requests(); + + pulls.append(&mut pushes); + pulls } /// At random pick a node and try to get updated changes from them fn run_gossip( &self, + gossip_validators: Option<&HashSet>, recycler: &PacketsRecycler, stakes: &HashMap, sender: &PacketSender, generate_pull_requests: bool, ) -> Result<()> { - let reqs = self.generate_new_gossip_requests(&stakes, generate_pull_requests); + let reqs = + self.generate_new_gossip_requests(gossip_validators, &stakes, generate_pull_requests); if !reqs.is_empty() { let packets = to_packets_with_destination(recycler.clone(), &reqs); sender.send(packets)?; @@ -1519,6 +1532,7 @@ impl ClusterInfo { self: Arc, bank_forks: Option>>, sender: PacketSender, + gossip_validators: Option>, exit: &Arc, ) -> JoinHandle<()> { let exit = exit.clone(); @@ -1549,7 +1563,13 @@ impl ClusterInfo { None => HashMap::new(), }; - let _ = self.run_gossip(&recycler, &stakes, &sender, generate_pull_requests); + let _ = self.run_gossip( + gossip_validators.as_ref(), + &recycler, + &stakes, + &sender, + generate_pull_requests, + ); if exit.load(Ordering::Relaxed) { return; } @@ -1561,7 +1581,7 @@ impl ClusterInfo { //TODO: possibly tune this parameter //we saw a deadlock passing an self.read().unwrap().timeout into sleep if start - last_push > CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 2 { - self.push_self(&stakes); + self.push_self(&stakes, gossip_validators.as_ref()); last_push = timestamp(); } let elapsed = timestamp() - start; @@ -2703,8 +2723,8 @@ mod tests { .gossip .write() .unwrap() - .refresh_push_active_set(&HashMap::new()); - let reqs = cluster_info.generate_new_gossip_requests(&HashMap::new(), true); + .refresh_push_active_set(&HashMap::new(), None); + let reqs = cluster_info.generate_new_gossip_requests(None, &HashMap::new(), true); //assert none of the addrs are invalid. reqs.iter().all(|(addr, _)| { let res = ContactInfo::is_valid_address(addr); @@ -2842,7 +2862,7 @@ mod tests { .gossip .write() .unwrap() - .refresh_push_active_set(&HashMap::new()); + .refresh_push_active_set(&HashMap::new(), None); //check that all types of gossip messages are signed correctly let (_, push_messages) = cluster_info .gossip @@ -2859,7 +2879,7 @@ mod tests { .gossip .write() .unwrap() - .new_pull_request(timestamp(), &HashMap::new(), MAX_BLOOM_SIZE) + .new_pull_request(timestamp(), None, &HashMap::new(), MAX_BLOOM_SIZE) .ok() .unwrap(); assert!(val.verify()); @@ -3078,7 +3098,7 @@ mod tests { let entrypoint_pubkey = Pubkey::new_rand(); let entrypoint = ContactInfo::new_localhost(&entrypoint_pubkey, timestamp()); cluster_info.set_entrypoint(entrypoint.clone()); - let pulls = cluster_info.new_pull_requests(&HashMap::new()); + let pulls = cluster_info.new_pull_requests(None, &HashMap::new()); assert_eq!(1, pulls.len() as u64); match pulls.get(0) { Some((addr, msg)) => { @@ -3105,7 +3125,7 @@ mod tests { vec![entrypoint_crdsvalue], &timeouts, ); - let pulls = cluster_info.new_pull_requests(&HashMap::new()); + let pulls = cluster_info.new_pull_requests(None, &HashMap::new()); assert_eq!(1, pulls.len() as u64); assert_eq!(*cluster_info.entrypoint.read().unwrap(), Some(entrypoint)); } @@ -3248,7 +3268,7 @@ mod tests { // Pull request 1: `other_node` is present but `entrypoint` was just added (so it has a // fresh timestamp). There should only be one pull request to `other_node` - let pulls = cluster_info.new_pull_requests(&stakes); + let pulls = cluster_info.new_pull_requests(None, &stakes); assert_eq!(1, pulls.len() as u64); assert_eq!(pulls.get(0).unwrap().0, other_node.gossip); @@ -3261,14 +3281,14 @@ mod tests { .as_mut() .unwrap() .wallclock = 0; - let pulls = cluster_info.new_pull_requests(&stakes); + let pulls = cluster_info.new_pull_requests(None, &stakes); assert_eq!(2, pulls.len() as u64); assert_eq!(pulls.get(0).unwrap().0, other_node.gossip); assert_eq!(pulls.get(1).unwrap().0, entrypoint.gossip); // Pull request 3: `other_node` is present and `entrypoint` was just pulled from. There should // only be one pull request to `other_node` - let pulls = cluster_info.new_pull_requests(&stakes); + let pulls = cluster_info.new_pull_requests(None, &stakes); assert_eq!(1, pulls.len() as u64); assert_eq!(pulls.get(0).unwrap().0, other_node.gossip); } diff --git a/core/src/crds_gossip.rs b/core/src/crds_gossip.rs index 53520930f2..c207611770 100644 --- a/core/src/crds_gossip.rs +++ b/core/src/crds_gossip.rs @@ -115,10 +115,15 @@ impl CrdsGossip { /// refresh the push active set /// * ratio - number of actives to rotate - pub fn refresh_push_active_set(&mut self, stakes: &HashMap) { + pub fn refresh_push_active_set( + &mut self, + stakes: &HashMap, + gossip_validators: Option<&HashSet>, + ) { self.push.refresh_push_active_set( &self.crds, stakes, + gossip_validators, &self.id, self.shred_version, self.pull.pull_request_time.len(), @@ -130,6 +135,7 @@ impl CrdsGossip { pub fn new_pull_request( &self, now: u64, + gossip_validators: Option<&HashSet>, stakes: &HashMap, bloom_size: usize, ) -> Result<(Pubkey, Vec, CrdsValue), CrdsGossipError> { @@ -138,6 +144,7 @@ impl CrdsGossip { &self.id, self.shred_version, now, + gossip_validators, stakes, bloom_size, ) @@ -271,7 +278,7 @@ mod test { 0, ) .unwrap(); - crds_gossip.refresh_push_active_set(&HashMap::new()); + crds_gossip.refresh_push_active_set(&HashMap::new(), None); let now = timestamp(); //incorrect dest let mut res = crds_gossip.process_prune_msg( diff --git a/core/src/crds_gossip_pull.rs b/core/src/crds_gossip_pull.rs index 9dea319a1f..ee5a8569a9 100644 --- a/core/src/crds_gossip_pull.rs +++ b/core/src/crds_gossip_pull.rs @@ -185,10 +185,18 @@ impl CrdsGossipPull { self_id: &Pubkey, self_shred_version: u16, now: u64, + gossip_validators: Option<&HashSet>, stakes: &HashMap, bloom_size: usize, ) -> Result<(Pubkey, Vec, CrdsValue), CrdsGossipError> { - let options = self.pull_options(crds, &self_id, self_shred_version, now, stakes); + let options = self.pull_options( + crds, + &self_id, + self_shred_version, + now, + gossip_validators, + stakes, + ); if options.is_empty() { return Err(CrdsGossipError::NoPeers); } @@ -207,6 +215,7 @@ impl CrdsGossipPull { self_id: &Pubkey, self_shred_version: u16, now: u64, + gossip_validators: Option<&HashSet>, stakes: &HashMap, ) -> Vec<(f32, &'a ContactInfo)> { crds.table @@ -216,6 +225,8 @@ impl CrdsGossipPull { v.id != *self_id && ContactInfo::is_valid_address(&v.gossip) && (self_shred_version == 0 || self_shred_version == v.shred_version) + && gossip_validators + .map_or(true, |gossip_validators| gossip_validators.contains(&v.id)) }) .map(|item| { let max_weight = f32::from(u16::max_value()) - 1.0; @@ -609,7 +620,7 @@ mod test { stakes.insert(id, i * 100); } let now = 1024; - let mut options = node.pull_options(&crds, &me.label().pubkey(), 0, now, &stakes); + let mut options = node.pull_options(&crds, &me.label().pubkey(), 0, now, None, &stakes); assert!(!options.is_empty()); options.sort_by(|(weight_l, _), (weight_r, _)| weight_r.partial_cmp(weight_l).unwrap()); // check that the highest stake holder is also the heaviest weighted. @@ -659,7 +670,7 @@ mod test { // shred version 123 should ignore nodes with versions 0 and 456 let options = node - .pull_options(&crds, &me.label().pubkey(), 123, 0, &stakes) + .pull_options(&crds, &me.label().pubkey(), 123, 0, None, &stakes) .iter() .map(|(_, c)| c.id) .collect::>(); @@ -669,7 +680,7 @@ mod test { // spy nodes will see all let options = node - .pull_options(&crds, &spy.label().pubkey(), 0, 0, &stakes) + .pull_options(&crds, &spy.label().pubkey(), 0, 0, None, &stakes) .iter() .map(|(_, c)| c.id) .collect::>(); @@ -679,6 +690,65 @@ mod test { assert!(options.contains(&node_456.pubkey())); } + #[test] + fn test_pulls_only_from_allowed() { + let mut crds = Crds::default(); + let stakes = HashMap::new(); + let node = CrdsGossipPull::default(); + let gossip = socketaddr!("127.0.0.1:1234"); + + let me = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo { + id: Pubkey::new_rand(), + gossip, + ..ContactInfo::default() + })); + let node_123 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo { + id: Pubkey::new_rand(), + gossip, + ..ContactInfo::default() + })); + + crds.insert(me.clone(), 0).unwrap(); + crds.insert(node_123.clone(), 0).unwrap(); + + // Empty gossip_validators -- will pull from nobody + let mut gossip_validators = HashSet::new(); + let options = node.pull_options( + &crds, + &me.label().pubkey(), + 0, + 0, + Some(&gossip_validators), + &stakes, + ); + assert!(options.is_empty()); + + // Unknown pubkey in gossip_validators -- will pull from nobody + gossip_validators.insert(Pubkey::new_rand()); + let options = node.pull_options( + &crds, + &me.label().pubkey(), + 0, + 0, + Some(&gossip_validators), + &stakes, + ); + assert!(options.is_empty()); + + // node_123 pubkey in gossip_validators -- will pull from it + gossip_validators.insert(node_123.pubkey()); + let options = node.pull_options( + &crds, + &me.label().pubkey(), + 0, + 0, + Some(&gossip_validators), + &stakes, + ); + assert_eq!(options.len(), 1); + assert_eq!(options[0].1.id, node_123.pubkey()); + } + #[test] fn test_crds_filter_set_get() { let mut crds_filter_set = @@ -733,13 +803,13 @@ mod test { let id = entry.label().pubkey(); let node = CrdsGossipPull::default(); assert_eq!( - node.new_pull_request(&crds, &id, 0, 0, &HashMap::new(), PACKET_DATA_SIZE), + node.new_pull_request(&crds, &id, 0, 0, None, &HashMap::new(), PACKET_DATA_SIZE), Err(CrdsGossipError::NoPeers) ); crds.insert(entry.clone(), 0).unwrap(); assert_eq!( - node.new_pull_request(&crds, &id, 0, 0, &HashMap::new(), PACKET_DATA_SIZE), + node.new_pull_request(&crds, &id, 0, 0, None, &HashMap::new(), PACKET_DATA_SIZE), Err(CrdsGossipError::NoPeers) ); @@ -748,7 +818,7 @@ mod test { 0, ))); crds.insert(new.clone(), 0).unwrap(); - let req = node.new_pull_request(&crds, &id, 0, 0, &HashMap::new(), PACKET_DATA_SIZE); + let req = node.new_pull_request(&crds, &id, 0, 0, None, &HashMap::new(), PACKET_DATA_SIZE); let (to, _, self_info) = req.unwrap(); assert_eq!(to, new.label().pubkey()); assert_eq!(self_info, entry); @@ -785,6 +855,7 @@ mod test { &node_pubkey, 0, u64::max_value(), + None, &HashMap::new(), PACKET_DATA_SIZE, ); @@ -814,6 +885,7 @@ mod test { &node_pubkey, 0, 0, + None, &HashMap::new(), PACKET_DATA_SIZE, ); @@ -874,6 +946,7 @@ mod test { &node_pubkey, 0, 0, + None, &HashMap::new(), PACKET_DATA_SIZE, ); @@ -948,6 +1021,7 @@ mod test { &node_pubkey, 0, 0, + None, &HashMap::new(), PACKET_DATA_SIZE, ); diff --git a/core/src/crds_gossip_push.rs b/core/src/crds_gossip_push.rs index 28d723a8f0..21ab578a57 100644 --- a/core/src/crds_gossip_push.rs +++ b/core/src/crds_gossip_push.rs @@ -280,6 +280,7 @@ impl CrdsGossipPush { &mut self, crds: &Crds, stakes: &HashMap, + gossip_validators: Option<&HashSet>, self_id: &Pubkey, self_shred_version: u16, network_size: usize, @@ -288,7 +289,13 @@ impl CrdsGossipPush { let need = Self::compute_need(self.num_active, self.active_set.len(), ratio); let mut new_items = HashMap::new(); - let options: Vec<_> = self.push_options(crds, &self_id, self_shred_version, stakes); + let options: Vec<_> = self.push_options( + crds, + &self_id, + self_shred_version, + stakes, + gossip_validators, + ); if options.is_empty() { return; } @@ -336,6 +343,7 @@ impl CrdsGossipPush { self_id: &Pubkey, self_shred_version: u16, stakes: &HashMap, + gossip_validators: Option<&HashSet>, ) -> Vec<(f32, &'a ContactInfo)> { crds.table .values() @@ -345,6 +353,9 @@ impl CrdsGossipPush { info.id != *self_id && ContactInfo::is_valid_address(&info.gossip) && self_shred_version == info.shred_version + && gossip_validators.map_or(true, |gossip_validators| { + gossip_validators.contains(&info.id) + }) }) .map(|(info, value)| { let max_weight = f32::from(u16::max_value()) - 1.0; @@ -552,7 +563,7 @@ mod test { ))); assert_eq!(crds.insert(value1.clone(), 0), Ok(None)); - push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 0, 1, 1); + push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1); assert!(push.active_set.get(&value1.label().pubkey()).is_some()); let value2 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( @@ -562,7 +573,7 @@ mod test { assert!(push.active_set.get(&value2.label().pubkey()).is_none()); assert_eq!(crds.insert(value2.clone(), 0), Ok(None)); for _ in 0..30 { - push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 0, 1, 1); + push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1); if push.active_set.get(&value2.label().pubkey()).is_some() { break; } @@ -575,7 +586,7 @@ mod test { )); assert_eq!(crds.insert(value2.clone(), 0), Ok(None)); } - push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 0, 1, 1); + push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1); assert_eq!(push.active_set.len(), push.num_active); } #[test] @@ -593,7 +604,7 @@ mod test { crds.insert(peer.clone(), time).unwrap(); stakes.insert(id, i * 100); } - let mut options = push.push_options(&crds, &Pubkey::default(), 0, &stakes); + let mut options = push.push_options(&crds, &Pubkey::default(), 0, &stakes, None); assert!(!options.is_empty()); options.sort_by(|(weight_l, _), (weight_r, _)| weight_r.partial_cmp(weight_l).unwrap()); // check that the highest stake holder is also the heaviest weighted. @@ -643,7 +654,7 @@ mod test { // shred version 123 should ignore nodes with versions 0 and 456 let options = node - .push_options(&crds, &me.label().pubkey(), 123, &stakes) + .push_options(&crds, &me.label().pubkey(), 123, &stakes, None) .iter() .map(|(_, c)| c.id) .collect::>(); @@ -653,12 +664,71 @@ mod test { // spy nodes should not push to people on different shred versions let options = node - .push_options(&crds, &spy.label().pubkey(), 0, &stakes) + .push_options(&crds, &spy.label().pubkey(), 0, &stakes, None) .iter() .map(|(_, c)| c.id) .collect::>(); assert!(options.is_empty()); } + + #[test] + fn test_pushes_only_to_allowed() { + let mut crds = Crds::default(); + let stakes = HashMap::new(); + let node = CrdsGossipPush::default(); + let gossip = socketaddr!("127.0.0.1:1234"); + + let me = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo { + id: Pubkey::new_rand(), + gossip, + ..ContactInfo::default() + })); + let node_123 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo { + id: Pubkey::new_rand(), + gossip, + ..ContactInfo::default() + })); + + crds.insert(me.clone(), 0).unwrap(); + crds.insert(node_123.clone(), 0).unwrap(); + + // Unknown pubkey in gossip_validators -- will push to nobody + let mut gossip_validators = HashSet::new(); + let options = node.push_options( + &crds, + &me.label().pubkey(), + 0, + &stakes, + Some(&gossip_validators), + ); + + assert!(options.is_empty()); + + // Unknown pubkey in gossip_validators -- will push to nobody + gossip_validators.insert(Pubkey::new_rand()); + let options = node.push_options( + &crds, + &me.label().pubkey(), + 0, + &stakes, + Some(&gossip_validators), + ); + assert!(options.is_empty()); + + // node_123 pubkey in gossip_validators -- will push to it + gossip_validators.insert(node_123.pubkey()); + let options = node.push_options( + &crds, + &me.label().pubkey(), + 0, + &stakes, + Some(&gossip_validators), + ); + + assert_eq!(options.len(), 1); + assert_eq!(options[0].1.id, node_123.pubkey()); + } + #[test] fn test_new_push_messages() { let mut crds = Crds::default(); @@ -668,7 +738,7 @@ mod test { 0, ))); assert_eq!(crds.insert(peer.clone(), 0), Ok(None)); - push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 0, 1, 1); + push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1); let new_msg = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( &Pubkey::new_rand(), @@ -705,7 +775,7 @@ mod test { push.process_push_message(&mut crds, &Pubkey::default(), peer_3.clone(), 0), Ok(None) ); - push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 0, 1, 1); + push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1); // push 3's contact info to 1 and 2 and 3 let new_msg = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( @@ -728,7 +798,7 @@ mod test { 0, ))); assert_eq!(crds.insert(peer.clone(), 0), Ok(None)); - push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 0, 1, 1); + push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1); let new_msg = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( &Pubkey::new_rand(), @@ -755,7 +825,7 @@ mod test { 0, ))); assert_eq!(crds.insert(peer, 0), Ok(None)); - push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 0, 1, 1); + push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1); let mut ci = ContactInfo::new_localhost(&Pubkey::new_rand(), 0); ci.wallclock = 1; diff --git a/core/src/gossip_service.rs b/core/src/gossip_service.rs index 3671c95ffb..9892b62b8f 100644 --- a/core/src/gossip_service.rs +++ b/core/src/gossip_service.rs @@ -6,15 +6,22 @@ use rand::{thread_rng, Rng}; use solana_client::thin_client::{create_client, ThinClient}; use solana_perf::recycler::Recycler; use solana_runtime::bank_forks::BankForks; -use solana_sdk::pubkey::Pubkey; -use solana_sdk::signature::{Keypair, Signer}; +use solana_sdk::{ + pubkey::Pubkey, + signature::{Keypair, Signer}, +}; use solana_streamer::streamer; -use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, UdpSocket}; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::channel; -use std::sync::{Arc, RwLock}; -use std::thread::{self, sleep, JoinHandle}; -use std::time::{Duration, Instant}; +use std::{ + collections::HashSet, + net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, UdpSocket}, + sync::{ + atomic::{AtomicBool, Ordering}, + mpsc::channel, + {Arc, RwLock}, + }, + thread::{self, sleep, JoinHandle}, + time::{Duration, Instant}, +}; pub struct GossipService { thread_hdls: Vec>, @@ -25,6 +32,7 @@ impl GossipService { cluster_info: &Arc, bank_forks: Option>>, gossip_socket: UdpSocket, + gossip_validators: Option>, exit: &Arc, ) -> Self { let (request_sender, request_receiver) = channel(); @@ -50,7 +58,13 @@ impl GossipService { response_sender.clone(), exit, ); - let t_gossip = ClusterInfo::gossip(cluster_info.clone(), bank_forks, response_sender, exit); + let t_gossip = ClusterInfo::gossip( + cluster_info.clone(), + bank_forks, + response_sender, + gossip_validators, + exit, + ); let thread_hdls = vec![t_receiver, t_responder, t_listen, t_gossip]; Self { thread_hdls } } @@ -265,7 +279,7 @@ fn make_gossip_node( cluster_info.set_entrypoint(ContactInfo::new_gossip_entry_point(entrypoint)); } let cluster_info = Arc::new(cluster_info); - let gossip_service = GossipService::new(&cluster_info, None, gossip_socket, &exit); + let gossip_service = GossipService::new(&cluster_info, None, gossip_socket, None, &exit); (gossip_service, ip_echo, cluster_info) } @@ -284,7 +298,7 @@ mod tests { let tn = Node::new_localhost(); let cluster_info = ClusterInfo::new_with_invalid_keypair(tn.info.clone()); let c = Arc::new(cluster_info); - let d = GossipService::new(&c, None, tn.sockets.gossip, &exit); + let d = GossipService::new(&c, None, tn.sockets.gossip, None, &exit); exit.store(true, Ordering::Relaxed); d.join().unwrap(); } diff --git a/core/src/validator.rs b/core/src/validator.rs index f0871c5d51..002b1b7482 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -87,6 +87,7 @@ pub struct ValidatorConfig { pub new_hard_forks: Option>, pub trusted_validators: Option>, // None = trust all pub repair_validators: Option>, // None = repair from all + pub gossip_validators: Option>, // None = gossip with all pub halt_on_trusted_validators_accounts_hash_mismatch: bool, pub accounts_hash_fault_injection_slots: u64, // 0 = no fault injection pub frozen_accounts: Vec, @@ -116,6 +117,7 @@ impl Default for ValidatorConfig { new_hard_forks: None, trusted_validators: None, repair_validators: None, + gossip_validators: None, halt_on_trusted_validators_accounts_hash_mismatch: false, accounts_hash_fault_injection_slots: 0, frozen_accounts: vec![], @@ -395,6 +397,7 @@ impl Validator { &cluster_info, Some(bank_forks.clone()), node.sockets.gossip, + config.gossip_validators.clone(), &exit, ); diff --git a/core/tests/crds_gossip.rs b/core/tests/crds_gossip.rs index 6384b5cf48..451ee3733c 100644 --- a/core/tests/crds_gossip.rs +++ b/core/tests/crds_gossip.rs @@ -222,7 +222,7 @@ fn network_simulator(network: &mut Network, max_convergance: f64) { network_values.par_iter().for_each(|node| { node.lock() .unwrap() - .refresh_push_active_set(&HashMap::new()); + .refresh_push_active_set(&HashMap::new(), None); }); let mut total_bytes = bytes_tx; for second in 1..num { @@ -361,7 +361,7 @@ fn network_run_push(network: &mut Network, start: usize, end: usize) -> (usize, network_values.par_iter().for_each(|node| { node.lock() .unwrap() - .refresh_push_active_set(&HashMap::new()); + .refresh_push_active_set(&HashMap::new(), None); }); } total = network_values @@ -408,7 +408,7 @@ fn network_run_pull( .filter_map(|from| { from.lock() .unwrap() - .new_pull_request(now, &HashMap::new(), cluster_info::MAX_BLOOM_SIZE) + .new_pull_request(now, None, &HashMap::new(), cluster_info::MAX_BLOOM_SIZE) .ok() }) .collect() @@ -581,7 +581,7 @@ fn test_prune_errors() { 0, ) .unwrap(); - crds_gossip.refresh_push_active_set(&HashMap::new()); + crds_gossip.refresh_push_active_set(&HashMap::new(), None); let now = timestamp(); //incorrect dest let mut res = crds_gossip.process_prune_msg( diff --git a/core/tests/gossip.rs b/core/tests/gossip.rs index 637342ba3a..58d16d2469 100644 --- a/core/tests/gossip.rs +++ b/core/tests/gossip.rs @@ -19,7 +19,8 @@ fn test_node(exit: &Arc) -> (Arc, GossipService, UdpSoc let keypair = Arc::new(Keypair::new()); let mut test_node = Node::new_localhost_with_pubkey(&keypair.pubkey()); let cluster_info = Arc::new(ClusterInfo::new(test_node.info.clone(), keypair)); - let gossip_service = GossipService::new(&cluster_info, None, test_node.sockets.gossip, exit); + let gossip_service = + GossipService::new(&cluster_info, None, test_node.sockets.gossip, None, exit); let _ = cluster_info.my_contact_info(); ( cluster_info, @@ -39,6 +40,7 @@ fn test_node_with_bank( &cluster_info, Some(bank_forks), test_node.sockets.gossip, + None, exit, ); let _ = cluster_info.my_contact_info(); diff --git a/validator/src/main.rs b/validator/src/main.rs index 81d21e14aa..78d0454f83 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -142,6 +142,7 @@ fn start_gossip_node( gossip_addr: &SocketAddr, gossip_socket: UdpSocket, expected_shred_version: Option, + gossip_validators: Option>, ) -> (Arc, Arc, GossipService) { let cluster_info = ClusterInfo::new( ClusterInfo::gossip_contact_info( @@ -155,7 +156,13 @@ fn start_gossip_node( let cluster_info = Arc::new(cluster_info); let gossip_exit_flag = Arc::new(AtomicBool::new(false)); - let gossip_service = GossipService::new(&cluster_info, None, gossip_socket, &gossip_exit_flag); + let gossip_service = GossipService::new( + &cluster_info, + None, + gossip_socket, + gossip_validators, + &gossip_exit_flag, + ); (cluster_info, gossip_exit_flag, gossip_service) } @@ -862,7 +869,18 @@ pub fn main() { .multiple(true) .takes_value(true) .help("A list of validators to request repairs from. If specified, repair will not \ - request from validators outside this set [default: request repairs from all validators]") + request from validators outside this set [default: all validators]") + ) + .arg( + Arg::with_name("gossip_validators") + .long("gossip-validator") + .validator(is_pubkey) + .value_name("PUBKEY") + .multiple(true) + .takes_value(true) + .help("A list of validators to gossip with. If specified, gossip \ + will not pull/pull from from validators outside this set. \ + [default: all validators]") ) .arg( Arg::with_name("no_rocksdb_compaction") @@ -979,6 +997,12 @@ pub fn main() { "repair_validators", "--repair-validator", ); + let gossip_validators = validators_set( + &identity_keypair.pubkey(), + &matches, + "gossip_validators", + "--gossip-validator", + ); let bind_address = solana_net_utils::parse_host(matches.value_of("bind_address").unwrap()) .expect("invalid bind_address"); @@ -1029,6 +1053,7 @@ pub fn main() { wait_for_supermajority: value_t!(matches, "wait_for_supermajority", Slot).ok(), trusted_validators, repair_validators, + gossip_validators, frozen_accounts: values_t!(matches, "frozen_accounts", Pubkey).unwrap_or_default(), no_rocksdb_compaction, wal_recovery_mode, @@ -1329,6 +1354,7 @@ pub fn main() { &node.info.gossip, node.sockets.gossip.try_clone().unwrap(), validator_config.expected_shred_version, + validator_config.gossip_validators.clone(), )); }