Add --gossip-validator argument

This commit is contained in:
Michael Vines 2020-09-11 12:00:16 -07:00
parent 63a67f415e
commit daae638781
9 changed files with 275 additions and 59 deletions

View File

@ -421,7 +421,7 @@ impl ClusterInfo {
gossip.set_shred_version(me.my_shred_version()); gossip.set_shred_version(me.my_shred_version());
} }
me.insert_self(); me.insert_self();
me.push_self(&HashMap::new()); me.push_self(&HashMap::new(), None);
me me
} }
@ -453,13 +453,17 @@ impl ClusterInfo {
self.insert_self() self.insert_self()
} }
fn push_self(&self, stakes: &HashMap<Pubkey, u64>) { fn push_self(
&self,
stakes: &HashMap<Pubkey, u64>,
gossip_validators: Option<&HashSet<Pubkey>>,
) {
let now = timestamp(); let now = timestamp();
self.my_contact_info.write().unwrap().wallclock = now; self.my_contact_info.write().unwrap().wallclock = now;
let entry = let entry =
CrdsValue::new_signed(CrdsData::ContactInfo(self.my_contact_info()), &self.keypair); CrdsValue::new_signed(CrdsData::ContactInfo(self.my_contact_info()), &self.keypair);
let mut w_gossip = self.gossip.write().unwrap(); let mut w_gossip = self.gossip.write().unwrap();
w_gossip.refresh_push_active_set(stakes); w_gossip.refresh_push_active_set(stakes, gossip_validators);
w_gossip.process_push_message(&self.id(), vec![entry], now); w_gossip.process_push_message(&self.id(), vec![entry], now);
} }
@ -1363,13 +1367,17 @@ impl ClusterInfo {
messages messages
} }
fn new_pull_requests(&self, stakes: &HashMap<Pubkey, u64>) -> Vec<(SocketAddr, Protocol)> { fn new_pull_requests(
&self,
gossip_validators: Option<&HashSet<Pubkey>>,
stakes: &HashMap<Pubkey, u64>,
) -> Vec<(SocketAddr, Protocol)> {
let now = timestamp(); let now = timestamp();
let mut pulls: Vec<_> = { let mut pulls: Vec<_> = {
let r_gossip = let r_gossip =
self.time_gossip_read_lock("new_pull_reqs", &self.stats.new_pull_requests); self.time_gossip_read_lock("new_pull_reqs", &self.stats.new_pull_requests);
r_gossip r_gossip
.new_pull_request(now, stakes, MAX_BLOOM_SIZE) .new_pull_request(now, gossip_validators, stakes, MAX_BLOOM_SIZE)
.ok() .ok()
.into_iter() .into_iter()
.filter_map(|(peer, filters, me)| { .filter_map(|(peer, filters, me)| {
@ -1430,27 +1438,32 @@ impl ClusterInfo {
// Generate new push and pull requests // Generate new push and pull requests
fn generate_new_gossip_requests( fn generate_new_gossip_requests(
&self, &self,
gossip_validators: Option<&HashSet<Pubkey>>,
stakes: &HashMap<Pubkey, u64>, stakes: &HashMap<Pubkey, u64>,
generate_pull_requests: bool, generate_pull_requests: bool,
) -> Vec<(SocketAddr, Protocol)> { ) -> Vec<(SocketAddr, Protocol)> {
let pulls: Vec<_> = if generate_pull_requests { let mut pulls: Vec<_> = if generate_pull_requests {
self.new_pull_requests(stakes) self.new_pull_requests(gossip_validators, stakes)
} else { } else {
vec![] vec![]
}; };
let pushes: Vec<_> = self.new_push_requests(); let mut pushes: Vec<_> = self.new_push_requests();
vec![pulls, pushes].into_iter().flatten().collect()
pulls.append(&mut pushes);
pulls
} }
/// At random pick a node and try to get updated changes from them /// At random pick a node and try to get updated changes from them
fn run_gossip( fn run_gossip(
&self, &self,
gossip_validators: Option<&HashSet<Pubkey>>,
recycler: &PacketsRecycler, recycler: &PacketsRecycler,
stakes: &HashMap<Pubkey, u64>, stakes: &HashMap<Pubkey, u64>,
sender: &PacketSender, sender: &PacketSender,
generate_pull_requests: bool, generate_pull_requests: bool,
) -> Result<()> { ) -> Result<()> {
let reqs = self.generate_new_gossip_requests(&stakes, generate_pull_requests); let reqs =
self.generate_new_gossip_requests(gossip_validators, &stakes, generate_pull_requests);
if !reqs.is_empty() { if !reqs.is_empty() {
let packets = to_packets_with_destination(recycler.clone(), &reqs); let packets = to_packets_with_destination(recycler.clone(), &reqs);
sender.send(packets)?; sender.send(packets)?;
@ -1519,6 +1532,7 @@ impl ClusterInfo {
self: Arc<Self>, self: Arc<Self>,
bank_forks: Option<Arc<RwLock<BankForks>>>, bank_forks: Option<Arc<RwLock<BankForks>>>,
sender: PacketSender, sender: PacketSender,
gossip_validators: Option<HashSet<Pubkey>>,
exit: &Arc<AtomicBool>, exit: &Arc<AtomicBool>,
) -> JoinHandle<()> { ) -> JoinHandle<()> {
let exit = exit.clone(); let exit = exit.clone();
@ -1549,7 +1563,13 @@ impl ClusterInfo {
None => HashMap::new(), None => HashMap::new(),
}; };
let _ = self.run_gossip(&recycler, &stakes, &sender, generate_pull_requests); let _ = self.run_gossip(
gossip_validators.as_ref(),
&recycler,
&stakes,
&sender,
generate_pull_requests,
);
if exit.load(Ordering::Relaxed) { if exit.load(Ordering::Relaxed) {
return; return;
} }
@ -1561,7 +1581,7 @@ impl ClusterInfo {
//TODO: possibly tune this parameter //TODO: possibly tune this parameter
//we saw a deadlock passing an self.read().unwrap().timeout into sleep //we saw a deadlock passing an self.read().unwrap().timeout into sleep
if start - last_push > CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 2 { if start - last_push > CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 2 {
self.push_self(&stakes); self.push_self(&stakes, gossip_validators.as_ref());
last_push = timestamp(); last_push = timestamp();
} }
let elapsed = timestamp() - start; let elapsed = timestamp() - start;
@ -2703,8 +2723,8 @@ mod tests {
.gossip .gossip
.write() .write()
.unwrap() .unwrap()
.refresh_push_active_set(&HashMap::new()); .refresh_push_active_set(&HashMap::new(), None);
let reqs = cluster_info.generate_new_gossip_requests(&HashMap::new(), true); let reqs = cluster_info.generate_new_gossip_requests(None, &HashMap::new(), true);
//assert none of the addrs are invalid. //assert none of the addrs are invalid.
reqs.iter().all(|(addr, _)| { reqs.iter().all(|(addr, _)| {
let res = ContactInfo::is_valid_address(addr); let res = ContactInfo::is_valid_address(addr);
@ -2842,7 +2862,7 @@ mod tests {
.gossip .gossip
.write() .write()
.unwrap() .unwrap()
.refresh_push_active_set(&HashMap::new()); .refresh_push_active_set(&HashMap::new(), None);
//check that all types of gossip messages are signed correctly //check that all types of gossip messages are signed correctly
let (_, push_messages) = cluster_info let (_, push_messages) = cluster_info
.gossip .gossip
@ -2859,7 +2879,7 @@ mod tests {
.gossip .gossip
.write() .write()
.unwrap() .unwrap()
.new_pull_request(timestamp(), &HashMap::new(), MAX_BLOOM_SIZE) .new_pull_request(timestamp(), None, &HashMap::new(), MAX_BLOOM_SIZE)
.ok() .ok()
.unwrap(); .unwrap();
assert!(val.verify()); assert!(val.verify());
@ -3078,7 +3098,7 @@ mod tests {
let entrypoint_pubkey = Pubkey::new_rand(); let entrypoint_pubkey = Pubkey::new_rand();
let entrypoint = ContactInfo::new_localhost(&entrypoint_pubkey, timestamp()); let entrypoint = ContactInfo::new_localhost(&entrypoint_pubkey, timestamp());
cluster_info.set_entrypoint(entrypoint.clone()); cluster_info.set_entrypoint(entrypoint.clone());
let pulls = cluster_info.new_pull_requests(&HashMap::new()); let pulls = cluster_info.new_pull_requests(None, &HashMap::new());
assert_eq!(1, pulls.len() as u64); assert_eq!(1, pulls.len() as u64);
match pulls.get(0) { match pulls.get(0) {
Some((addr, msg)) => { Some((addr, msg)) => {
@ -3105,7 +3125,7 @@ mod tests {
vec![entrypoint_crdsvalue], vec![entrypoint_crdsvalue],
&timeouts, &timeouts,
); );
let pulls = cluster_info.new_pull_requests(&HashMap::new()); let pulls = cluster_info.new_pull_requests(None, &HashMap::new());
assert_eq!(1, pulls.len() as u64); assert_eq!(1, pulls.len() as u64);
assert_eq!(*cluster_info.entrypoint.read().unwrap(), Some(entrypoint)); assert_eq!(*cluster_info.entrypoint.read().unwrap(), Some(entrypoint));
} }
@ -3248,7 +3268,7 @@ mod tests {
// Pull request 1: `other_node` is present but `entrypoint` was just added (so it has a // Pull request 1: `other_node` is present but `entrypoint` was just added (so it has a
// fresh timestamp). There should only be one pull request to `other_node` // fresh timestamp). There should only be one pull request to `other_node`
let pulls = cluster_info.new_pull_requests(&stakes); let pulls = cluster_info.new_pull_requests(None, &stakes);
assert_eq!(1, pulls.len() as u64); assert_eq!(1, pulls.len() as u64);
assert_eq!(pulls.get(0).unwrap().0, other_node.gossip); assert_eq!(pulls.get(0).unwrap().0, other_node.gossip);
@ -3261,14 +3281,14 @@ mod tests {
.as_mut() .as_mut()
.unwrap() .unwrap()
.wallclock = 0; .wallclock = 0;
let pulls = cluster_info.new_pull_requests(&stakes); let pulls = cluster_info.new_pull_requests(None, &stakes);
assert_eq!(2, pulls.len() as u64); assert_eq!(2, pulls.len() as u64);
assert_eq!(pulls.get(0).unwrap().0, other_node.gossip); assert_eq!(pulls.get(0).unwrap().0, other_node.gossip);
assert_eq!(pulls.get(1).unwrap().0, entrypoint.gossip); assert_eq!(pulls.get(1).unwrap().0, entrypoint.gossip);
// Pull request 3: `other_node` is present and `entrypoint` was just pulled from. There should // Pull request 3: `other_node` is present and `entrypoint` was just pulled from. There should
// only be one pull request to `other_node` // only be one pull request to `other_node`
let pulls = cluster_info.new_pull_requests(&stakes); let pulls = cluster_info.new_pull_requests(None, &stakes);
assert_eq!(1, pulls.len() as u64); assert_eq!(1, pulls.len() as u64);
assert_eq!(pulls.get(0).unwrap().0, other_node.gossip); assert_eq!(pulls.get(0).unwrap().0, other_node.gossip);
} }

View File

@ -115,10 +115,15 @@ impl CrdsGossip {
/// refresh the push active set /// refresh the push active set
/// * ratio - number of actives to rotate /// * ratio - number of actives to rotate
pub fn refresh_push_active_set(&mut self, stakes: &HashMap<Pubkey, u64>) { pub fn refresh_push_active_set(
&mut self,
stakes: &HashMap<Pubkey, u64>,
gossip_validators: Option<&HashSet<Pubkey>>,
) {
self.push.refresh_push_active_set( self.push.refresh_push_active_set(
&self.crds, &self.crds,
stakes, stakes,
gossip_validators,
&self.id, &self.id,
self.shred_version, self.shred_version,
self.pull.pull_request_time.len(), self.pull.pull_request_time.len(),
@ -130,6 +135,7 @@ impl CrdsGossip {
pub fn new_pull_request( pub fn new_pull_request(
&self, &self,
now: u64, now: u64,
gossip_validators: Option<&HashSet<Pubkey>>,
stakes: &HashMap<Pubkey, u64>, stakes: &HashMap<Pubkey, u64>,
bloom_size: usize, bloom_size: usize,
) -> Result<(Pubkey, Vec<CrdsFilter>, CrdsValue), CrdsGossipError> { ) -> Result<(Pubkey, Vec<CrdsFilter>, CrdsValue), CrdsGossipError> {
@ -138,6 +144,7 @@ impl CrdsGossip {
&self.id, &self.id,
self.shred_version, self.shred_version,
now, now,
gossip_validators,
stakes, stakes,
bloom_size, bloom_size,
) )
@ -271,7 +278,7 @@ mod test {
0, 0,
) )
.unwrap(); .unwrap();
crds_gossip.refresh_push_active_set(&HashMap::new()); crds_gossip.refresh_push_active_set(&HashMap::new(), None);
let now = timestamp(); let now = timestamp();
//incorrect dest //incorrect dest
let mut res = crds_gossip.process_prune_msg( let mut res = crds_gossip.process_prune_msg(

View File

@ -185,10 +185,18 @@ impl CrdsGossipPull {
self_id: &Pubkey, self_id: &Pubkey,
self_shred_version: u16, self_shred_version: u16,
now: u64, now: u64,
gossip_validators: Option<&HashSet<Pubkey>>,
stakes: &HashMap<Pubkey, u64>, stakes: &HashMap<Pubkey, u64>,
bloom_size: usize, bloom_size: usize,
) -> Result<(Pubkey, Vec<CrdsFilter>, CrdsValue), CrdsGossipError> { ) -> Result<(Pubkey, Vec<CrdsFilter>, CrdsValue), CrdsGossipError> {
let options = self.pull_options(crds, &self_id, self_shred_version, now, stakes); let options = self.pull_options(
crds,
&self_id,
self_shred_version,
now,
gossip_validators,
stakes,
);
if options.is_empty() { if options.is_empty() {
return Err(CrdsGossipError::NoPeers); return Err(CrdsGossipError::NoPeers);
} }
@ -207,6 +215,7 @@ impl CrdsGossipPull {
self_id: &Pubkey, self_id: &Pubkey,
self_shred_version: u16, self_shred_version: u16,
now: u64, now: u64,
gossip_validators: Option<&HashSet<Pubkey>>,
stakes: &HashMap<Pubkey, u64>, stakes: &HashMap<Pubkey, u64>,
) -> Vec<(f32, &'a ContactInfo)> { ) -> Vec<(f32, &'a ContactInfo)> {
crds.table crds.table
@ -216,6 +225,8 @@ impl CrdsGossipPull {
v.id != *self_id v.id != *self_id
&& ContactInfo::is_valid_address(&v.gossip) && ContactInfo::is_valid_address(&v.gossip)
&& (self_shred_version == 0 || self_shred_version == v.shred_version) && (self_shred_version == 0 || self_shred_version == v.shred_version)
&& gossip_validators
.map_or(true, |gossip_validators| gossip_validators.contains(&v.id))
}) })
.map(|item| { .map(|item| {
let max_weight = f32::from(u16::max_value()) - 1.0; let max_weight = f32::from(u16::max_value()) - 1.0;
@ -609,7 +620,7 @@ mod test {
stakes.insert(id, i * 100); stakes.insert(id, i * 100);
} }
let now = 1024; let now = 1024;
let mut options = node.pull_options(&crds, &me.label().pubkey(), 0, now, &stakes); let mut options = node.pull_options(&crds, &me.label().pubkey(), 0, now, None, &stakes);
assert!(!options.is_empty()); assert!(!options.is_empty());
options.sort_by(|(weight_l, _), (weight_r, _)| weight_r.partial_cmp(weight_l).unwrap()); options.sort_by(|(weight_l, _), (weight_r, _)| weight_r.partial_cmp(weight_l).unwrap());
// check that the highest stake holder is also the heaviest weighted. // check that the highest stake holder is also the heaviest weighted.
@ -659,7 +670,7 @@ mod test {
// shred version 123 should ignore nodes with versions 0 and 456 // shred version 123 should ignore nodes with versions 0 and 456
let options = node let options = node
.pull_options(&crds, &me.label().pubkey(), 123, 0, &stakes) .pull_options(&crds, &me.label().pubkey(), 123, 0, None, &stakes)
.iter() .iter()
.map(|(_, c)| c.id) .map(|(_, c)| c.id)
.collect::<Vec<_>>(); .collect::<Vec<_>>();
@ -669,7 +680,7 @@ mod test {
// spy nodes will see all // spy nodes will see all
let options = node let options = node
.pull_options(&crds, &spy.label().pubkey(), 0, 0, &stakes) .pull_options(&crds, &spy.label().pubkey(), 0, 0, None, &stakes)
.iter() .iter()
.map(|(_, c)| c.id) .map(|(_, c)| c.id)
.collect::<Vec<_>>(); .collect::<Vec<_>>();
@ -679,6 +690,65 @@ mod test {
assert!(options.contains(&node_456.pubkey())); assert!(options.contains(&node_456.pubkey()));
} }
#[test]
fn test_pulls_only_from_allowed() {
let mut crds = Crds::default();
let stakes = HashMap::new();
let node = CrdsGossipPull::default();
let gossip = socketaddr!("127.0.0.1:1234");
let me = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo {
id: Pubkey::new_rand(),
gossip,
..ContactInfo::default()
}));
let node_123 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo {
id: Pubkey::new_rand(),
gossip,
..ContactInfo::default()
}));
crds.insert(me.clone(), 0).unwrap();
crds.insert(node_123.clone(), 0).unwrap();
// Empty gossip_validators -- will pull from nobody
let mut gossip_validators = HashSet::new();
let options = node.pull_options(
&crds,
&me.label().pubkey(),
0,
0,
Some(&gossip_validators),
&stakes,
);
assert!(options.is_empty());
// Unknown pubkey in gossip_validators -- will pull from nobody
gossip_validators.insert(Pubkey::new_rand());
let options = node.pull_options(
&crds,
&me.label().pubkey(),
0,
0,
Some(&gossip_validators),
&stakes,
);
assert!(options.is_empty());
// node_123 pubkey in gossip_validators -- will pull from it
gossip_validators.insert(node_123.pubkey());
let options = node.pull_options(
&crds,
&me.label().pubkey(),
0,
0,
Some(&gossip_validators),
&stakes,
);
assert_eq!(options.len(), 1);
assert_eq!(options[0].1.id, node_123.pubkey());
}
#[test] #[test]
fn test_crds_filter_set_get() { fn test_crds_filter_set_get() {
let mut crds_filter_set = let mut crds_filter_set =
@ -733,13 +803,13 @@ mod test {
let id = entry.label().pubkey(); let id = entry.label().pubkey();
let node = CrdsGossipPull::default(); let node = CrdsGossipPull::default();
assert_eq!( assert_eq!(
node.new_pull_request(&crds, &id, 0, 0, &HashMap::new(), PACKET_DATA_SIZE), node.new_pull_request(&crds, &id, 0, 0, None, &HashMap::new(), PACKET_DATA_SIZE),
Err(CrdsGossipError::NoPeers) Err(CrdsGossipError::NoPeers)
); );
crds.insert(entry.clone(), 0).unwrap(); crds.insert(entry.clone(), 0).unwrap();
assert_eq!( assert_eq!(
node.new_pull_request(&crds, &id, 0, 0, &HashMap::new(), PACKET_DATA_SIZE), node.new_pull_request(&crds, &id, 0, 0, None, &HashMap::new(), PACKET_DATA_SIZE),
Err(CrdsGossipError::NoPeers) Err(CrdsGossipError::NoPeers)
); );
@ -748,7 +818,7 @@ mod test {
0, 0,
))); )));
crds.insert(new.clone(), 0).unwrap(); crds.insert(new.clone(), 0).unwrap();
let req = node.new_pull_request(&crds, &id, 0, 0, &HashMap::new(), PACKET_DATA_SIZE); let req = node.new_pull_request(&crds, &id, 0, 0, None, &HashMap::new(), PACKET_DATA_SIZE);
let (to, _, self_info) = req.unwrap(); let (to, _, self_info) = req.unwrap();
assert_eq!(to, new.label().pubkey()); assert_eq!(to, new.label().pubkey());
assert_eq!(self_info, entry); assert_eq!(self_info, entry);
@ -785,6 +855,7 @@ mod test {
&node_pubkey, &node_pubkey,
0, 0,
u64::max_value(), u64::max_value(),
None,
&HashMap::new(), &HashMap::new(),
PACKET_DATA_SIZE, PACKET_DATA_SIZE,
); );
@ -814,6 +885,7 @@ mod test {
&node_pubkey, &node_pubkey,
0, 0,
0, 0,
None,
&HashMap::new(), &HashMap::new(),
PACKET_DATA_SIZE, PACKET_DATA_SIZE,
); );
@ -874,6 +946,7 @@ mod test {
&node_pubkey, &node_pubkey,
0, 0,
0, 0,
None,
&HashMap::new(), &HashMap::new(),
PACKET_DATA_SIZE, PACKET_DATA_SIZE,
); );
@ -948,6 +1021,7 @@ mod test {
&node_pubkey, &node_pubkey,
0, 0,
0, 0,
None,
&HashMap::new(), &HashMap::new(),
PACKET_DATA_SIZE, PACKET_DATA_SIZE,
); );

View File

@ -280,6 +280,7 @@ impl CrdsGossipPush {
&mut self, &mut self,
crds: &Crds, crds: &Crds,
stakes: &HashMap<Pubkey, u64>, stakes: &HashMap<Pubkey, u64>,
gossip_validators: Option<&HashSet<Pubkey>>,
self_id: &Pubkey, self_id: &Pubkey,
self_shred_version: u16, self_shred_version: u16,
network_size: usize, network_size: usize,
@ -288,7 +289,13 @@ impl CrdsGossipPush {
let need = Self::compute_need(self.num_active, self.active_set.len(), ratio); let need = Self::compute_need(self.num_active, self.active_set.len(), ratio);
let mut new_items = HashMap::new(); let mut new_items = HashMap::new();
let options: Vec<_> = self.push_options(crds, &self_id, self_shred_version, stakes); let options: Vec<_> = self.push_options(
crds,
&self_id,
self_shred_version,
stakes,
gossip_validators,
);
if options.is_empty() { if options.is_empty() {
return; return;
} }
@ -336,6 +343,7 @@ impl CrdsGossipPush {
self_id: &Pubkey, self_id: &Pubkey,
self_shred_version: u16, self_shred_version: u16,
stakes: &HashMap<Pubkey, u64>, stakes: &HashMap<Pubkey, u64>,
gossip_validators: Option<&HashSet<Pubkey>>,
) -> Vec<(f32, &'a ContactInfo)> { ) -> Vec<(f32, &'a ContactInfo)> {
crds.table crds.table
.values() .values()
@ -345,6 +353,9 @@ impl CrdsGossipPush {
info.id != *self_id info.id != *self_id
&& ContactInfo::is_valid_address(&info.gossip) && ContactInfo::is_valid_address(&info.gossip)
&& self_shred_version == info.shred_version && self_shred_version == info.shred_version
&& gossip_validators.map_or(true, |gossip_validators| {
gossip_validators.contains(&info.id)
})
}) })
.map(|(info, value)| { .map(|(info, value)| {
let max_weight = f32::from(u16::max_value()) - 1.0; let max_weight = f32::from(u16::max_value()) - 1.0;
@ -552,7 +563,7 @@ mod test {
))); )));
assert_eq!(crds.insert(value1.clone(), 0), Ok(None)); assert_eq!(crds.insert(value1.clone(), 0), Ok(None));
push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 0, 1, 1); push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1);
assert!(push.active_set.get(&value1.label().pubkey()).is_some()); assert!(push.active_set.get(&value1.label().pubkey()).is_some());
let value2 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( let value2 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
@ -562,7 +573,7 @@ mod test {
assert!(push.active_set.get(&value2.label().pubkey()).is_none()); assert!(push.active_set.get(&value2.label().pubkey()).is_none());
assert_eq!(crds.insert(value2.clone(), 0), Ok(None)); assert_eq!(crds.insert(value2.clone(), 0), Ok(None));
for _ in 0..30 { for _ in 0..30 {
push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 0, 1, 1); push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1);
if push.active_set.get(&value2.label().pubkey()).is_some() { if push.active_set.get(&value2.label().pubkey()).is_some() {
break; break;
} }
@ -575,7 +586,7 @@ mod test {
)); ));
assert_eq!(crds.insert(value2.clone(), 0), Ok(None)); assert_eq!(crds.insert(value2.clone(), 0), Ok(None));
} }
push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 0, 1, 1); push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1);
assert_eq!(push.active_set.len(), push.num_active); assert_eq!(push.active_set.len(), push.num_active);
} }
#[test] #[test]
@ -593,7 +604,7 @@ mod test {
crds.insert(peer.clone(), time).unwrap(); crds.insert(peer.clone(), time).unwrap();
stakes.insert(id, i * 100); stakes.insert(id, i * 100);
} }
let mut options = push.push_options(&crds, &Pubkey::default(), 0, &stakes); let mut options = push.push_options(&crds, &Pubkey::default(), 0, &stakes, None);
assert!(!options.is_empty()); assert!(!options.is_empty());
options.sort_by(|(weight_l, _), (weight_r, _)| weight_r.partial_cmp(weight_l).unwrap()); options.sort_by(|(weight_l, _), (weight_r, _)| weight_r.partial_cmp(weight_l).unwrap());
// check that the highest stake holder is also the heaviest weighted. // check that the highest stake holder is also the heaviest weighted.
@ -643,7 +654,7 @@ mod test {
// shred version 123 should ignore nodes with versions 0 and 456 // shred version 123 should ignore nodes with versions 0 and 456
let options = node let options = node
.push_options(&crds, &me.label().pubkey(), 123, &stakes) .push_options(&crds, &me.label().pubkey(), 123, &stakes, None)
.iter() .iter()
.map(|(_, c)| c.id) .map(|(_, c)| c.id)
.collect::<Vec<_>>(); .collect::<Vec<_>>();
@ -653,12 +664,71 @@ mod test {
// spy nodes should not push to people on different shred versions // spy nodes should not push to people on different shred versions
let options = node let options = node
.push_options(&crds, &spy.label().pubkey(), 0, &stakes) .push_options(&crds, &spy.label().pubkey(), 0, &stakes, None)
.iter() .iter()
.map(|(_, c)| c.id) .map(|(_, c)| c.id)
.collect::<Vec<_>>(); .collect::<Vec<_>>();
assert!(options.is_empty()); assert!(options.is_empty());
} }
#[test]
fn test_pushes_only_to_allowed() {
let mut crds = Crds::default();
let stakes = HashMap::new();
let node = CrdsGossipPush::default();
let gossip = socketaddr!("127.0.0.1:1234");
let me = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo {
id: Pubkey::new_rand(),
gossip,
..ContactInfo::default()
}));
let node_123 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo {
id: Pubkey::new_rand(),
gossip,
..ContactInfo::default()
}));
crds.insert(me.clone(), 0).unwrap();
crds.insert(node_123.clone(), 0).unwrap();
// Unknown pubkey in gossip_validators -- will push to nobody
let mut gossip_validators = HashSet::new();
let options = node.push_options(
&crds,
&me.label().pubkey(),
0,
&stakes,
Some(&gossip_validators),
);
assert!(options.is_empty());
// Unknown pubkey in gossip_validators -- will push to nobody
gossip_validators.insert(Pubkey::new_rand());
let options = node.push_options(
&crds,
&me.label().pubkey(),
0,
&stakes,
Some(&gossip_validators),
);
assert!(options.is_empty());
// node_123 pubkey in gossip_validators -- will push to it
gossip_validators.insert(node_123.pubkey());
let options = node.push_options(
&crds,
&me.label().pubkey(),
0,
&stakes,
Some(&gossip_validators),
);
assert_eq!(options.len(), 1);
assert_eq!(options[0].1.id, node_123.pubkey());
}
#[test] #[test]
fn test_new_push_messages() { fn test_new_push_messages() {
let mut crds = Crds::default(); let mut crds = Crds::default();
@ -668,7 +738,7 @@ mod test {
0, 0,
))); )));
assert_eq!(crds.insert(peer.clone(), 0), Ok(None)); assert_eq!(crds.insert(peer.clone(), 0), Ok(None));
push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 0, 1, 1); push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1);
let new_msg = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( let new_msg = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::new_rand(), &Pubkey::new_rand(),
@ -705,7 +775,7 @@ mod test {
push.process_push_message(&mut crds, &Pubkey::default(), peer_3.clone(), 0), push.process_push_message(&mut crds, &Pubkey::default(), peer_3.clone(), 0),
Ok(None) Ok(None)
); );
push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 0, 1, 1); push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1);
// push 3's contact info to 1 and 2 and 3 // push 3's contact info to 1 and 2 and 3
let new_msg = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( let new_msg = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
@ -728,7 +798,7 @@ mod test {
0, 0,
))); )));
assert_eq!(crds.insert(peer.clone(), 0), Ok(None)); assert_eq!(crds.insert(peer.clone(), 0), Ok(None));
push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 0, 1, 1); push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1);
let new_msg = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( let new_msg = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::new_rand(), &Pubkey::new_rand(),
@ -755,7 +825,7 @@ mod test {
0, 0,
))); )));
assert_eq!(crds.insert(peer, 0), Ok(None)); assert_eq!(crds.insert(peer, 0), Ok(None));
push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 0, 1, 1); push.refresh_push_active_set(&crds, &HashMap::new(), None, &Pubkey::default(), 0, 1, 1);
let mut ci = ContactInfo::new_localhost(&Pubkey::new_rand(), 0); let mut ci = ContactInfo::new_localhost(&Pubkey::new_rand(), 0);
ci.wallclock = 1; ci.wallclock = 1;

View File

@ -6,15 +6,22 @@ use rand::{thread_rng, Rng};
use solana_client::thin_client::{create_client, ThinClient}; use solana_client::thin_client::{create_client, ThinClient};
use solana_perf::recycler::Recycler; use solana_perf::recycler::Recycler;
use solana_runtime::bank_forks::BankForks; use solana_runtime::bank_forks::BankForks;
use solana_sdk::pubkey::Pubkey; use solana_sdk::{
use solana_sdk::signature::{Keypair, Signer}; pubkey::Pubkey,
signature::{Keypair, Signer},
};
use solana_streamer::streamer; use solana_streamer::streamer;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, UdpSocket}; use std::{
use std::sync::atomic::{AtomicBool, Ordering}; collections::HashSet,
use std::sync::mpsc::channel; net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, UdpSocket},
use std::sync::{Arc, RwLock}; sync::{
use std::thread::{self, sleep, JoinHandle}; atomic::{AtomicBool, Ordering},
use std::time::{Duration, Instant}; mpsc::channel,
{Arc, RwLock},
},
thread::{self, sleep, JoinHandle},
time::{Duration, Instant},
};
pub struct GossipService { pub struct GossipService {
thread_hdls: Vec<JoinHandle<()>>, thread_hdls: Vec<JoinHandle<()>>,
@ -25,6 +32,7 @@ impl GossipService {
cluster_info: &Arc<ClusterInfo>, cluster_info: &Arc<ClusterInfo>,
bank_forks: Option<Arc<RwLock<BankForks>>>, bank_forks: Option<Arc<RwLock<BankForks>>>,
gossip_socket: UdpSocket, gossip_socket: UdpSocket,
gossip_validators: Option<HashSet<Pubkey>>,
exit: &Arc<AtomicBool>, exit: &Arc<AtomicBool>,
) -> Self { ) -> Self {
let (request_sender, request_receiver) = channel(); let (request_sender, request_receiver) = channel();
@ -50,7 +58,13 @@ impl GossipService {
response_sender.clone(), response_sender.clone(),
exit, exit,
); );
let t_gossip = ClusterInfo::gossip(cluster_info.clone(), bank_forks, response_sender, exit); let t_gossip = ClusterInfo::gossip(
cluster_info.clone(),
bank_forks,
response_sender,
gossip_validators,
exit,
);
let thread_hdls = vec![t_receiver, t_responder, t_listen, t_gossip]; let thread_hdls = vec![t_receiver, t_responder, t_listen, t_gossip];
Self { thread_hdls } Self { thread_hdls }
} }
@ -265,7 +279,7 @@ fn make_gossip_node(
cluster_info.set_entrypoint(ContactInfo::new_gossip_entry_point(entrypoint)); cluster_info.set_entrypoint(ContactInfo::new_gossip_entry_point(entrypoint));
} }
let cluster_info = Arc::new(cluster_info); let cluster_info = Arc::new(cluster_info);
let gossip_service = GossipService::new(&cluster_info, None, gossip_socket, &exit); let gossip_service = GossipService::new(&cluster_info, None, gossip_socket, None, &exit);
(gossip_service, ip_echo, cluster_info) (gossip_service, ip_echo, cluster_info)
} }
@ -284,7 +298,7 @@ mod tests {
let tn = Node::new_localhost(); let tn = Node::new_localhost();
let cluster_info = ClusterInfo::new_with_invalid_keypair(tn.info.clone()); let cluster_info = ClusterInfo::new_with_invalid_keypair(tn.info.clone());
let c = Arc::new(cluster_info); let c = Arc::new(cluster_info);
let d = GossipService::new(&c, None, tn.sockets.gossip, &exit); let d = GossipService::new(&c, None, tn.sockets.gossip, None, &exit);
exit.store(true, Ordering::Relaxed); exit.store(true, Ordering::Relaxed);
d.join().unwrap(); d.join().unwrap();
} }

View File

@ -87,6 +87,7 @@ pub struct ValidatorConfig {
pub new_hard_forks: Option<Vec<Slot>>, pub new_hard_forks: Option<Vec<Slot>>,
pub trusted_validators: Option<HashSet<Pubkey>>, // None = trust all pub trusted_validators: Option<HashSet<Pubkey>>, // None = trust all
pub repair_validators: Option<HashSet<Pubkey>>, // None = repair from all pub repair_validators: Option<HashSet<Pubkey>>, // None = repair from all
pub gossip_validators: Option<HashSet<Pubkey>>, // None = gossip with all
pub halt_on_trusted_validators_accounts_hash_mismatch: bool, pub halt_on_trusted_validators_accounts_hash_mismatch: bool,
pub accounts_hash_fault_injection_slots: u64, // 0 = no fault injection pub accounts_hash_fault_injection_slots: u64, // 0 = no fault injection
pub frozen_accounts: Vec<Pubkey>, pub frozen_accounts: Vec<Pubkey>,
@ -116,6 +117,7 @@ impl Default for ValidatorConfig {
new_hard_forks: None, new_hard_forks: None,
trusted_validators: None, trusted_validators: None,
repair_validators: None, repair_validators: None,
gossip_validators: None,
halt_on_trusted_validators_accounts_hash_mismatch: false, halt_on_trusted_validators_accounts_hash_mismatch: false,
accounts_hash_fault_injection_slots: 0, accounts_hash_fault_injection_slots: 0,
frozen_accounts: vec![], frozen_accounts: vec![],
@ -395,6 +397,7 @@ impl Validator {
&cluster_info, &cluster_info,
Some(bank_forks.clone()), Some(bank_forks.clone()),
node.sockets.gossip, node.sockets.gossip,
config.gossip_validators.clone(),
&exit, &exit,
); );

View File

@ -222,7 +222,7 @@ fn network_simulator(network: &mut Network, max_convergance: f64) {
network_values.par_iter().for_each(|node| { network_values.par_iter().for_each(|node| {
node.lock() node.lock()
.unwrap() .unwrap()
.refresh_push_active_set(&HashMap::new()); .refresh_push_active_set(&HashMap::new(), None);
}); });
let mut total_bytes = bytes_tx; let mut total_bytes = bytes_tx;
for second in 1..num { for second in 1..num {
@ -361,7 +361,7 @@ fn network_run_push(network: &mut Network, start: usize, end: usize) -> (usize,
network_values.par_iter().for_each(|node| { network_values.par_iter().for_each(|node| {
node.lock() node.lock()
.unwrap() .unwrap()
.refresh_push_active_set(&HashMap::new()); .refresh_push_active_set(&HashMap::new(), None);
}); });
} }
total = network_values total = network_values
@ -408,7 +408,7 @@ fn network_run_pull(
.filter_map(|from| { .filter_map(|from| {
from.lock() from.lock()
.unwrap() .unwrap()
.new_pull_request(now, &HashMap::new(), cluster_info::MAX_BLOOM_SIZE) .new_pull_request(now, None, &HashMap::new(), cluster_info::MAX_BLOOM_SIZE)
.ok() .ok()
}) })
.collect() .collect()
@ -581,7 +581,7 @@ fn test_prune_errors() {
0, 0,
) )
.unwrap(); .unwrap();
crds_gossip.refresh_push_active_set(&HashMap::new()); crds_gossip.refresh_push_active_set(&HashMap::new(), None);
let now = timestamp(); let now = timestamp();
//incorrect dest //incorrect dest
let mut res = crds_gossip.process_prune_msg( let mut res = crds_gossip.process_prune_msg(

View File

@ -19,7 +19,8 @@ fn test_node(exit: &Arc<AtomicBool>) -> (Arc<ClusterInfo>, GossipService, UdpSoc
let keypair = Arc::new(Keypair::new()); let keypair = Arc::new(Keypair::new());
let mut test_node = Node::new_localhost_with_pubkey(&keypair.pubkey()); let mut test_node = Node::new_localhost_with_pubkey(&keypair.pubkey());
let cluster_info = Arc::new(ClusterInfo::new(test_node.info.clone(), keypair)); let cluster_info = Arc::new(ClusterInfo::new(test_node.info.clone(), keypair));
let gossip_service = GossipService::new(&cluster_info, None, test_node.sockets.gossip, exit); let gossip_service =
GossipService::new(&cluster_info, None, test_node.sockets.gossip, None, exit);
let _ = cluster_info.my_contact_info(); let _ = cluster_info.my_contact_info();
( (
cluster_info, cluster_info,
@ -39,6 +40,7 @@ fn test_node_with_bank(
&cluster_info, &cluster_info,
Some(bank_forks), Some(bank_forks),
test_node.sockets.gossip, test_node.sockets.gossip,
None,
exit, exit,
); );
let _ = cluster_info.my_contact_info(); let _ = cluster_info.my_contact_info();

View File

@ -142,6 +142,7 @@ fn start_gossip_node(
gossip_addr: &SocketAddr, gossip_addr: &SocketAddr,
gossip_socket: UdpSocket, gossip_socket: UdpSocket,
expected_shred_version: Option<u16>, expected_shred_version: Option<u16>,
gossip_validators: Option<HashSet<Pubkey>>,
) -> (Arc<ClusterInfo>, Arc<AtomicBool>, GossipService) { ) -> (Arc<ClusterInfo>, Arc<AtomicBool>, GossipService) {
let cluster_info = ClusterInfo::new( let cluster_info = ClusterInfo::new(
ClusterInfo::gossip_contact_info( ClusterInfo::gossip_contact_info(
@ -155,7 +156,13 @@ fn start_gossip_node(
let cluster_info = Arc::new(cluster_info); let cluster_info = Arc::new(cluster_info);
let gossip_exit_flag = Arc::new(AtomicBool::new(false)); let gossip_exit_flag = Arc::new(AtomicBool::new(false));
let gossip_service = GossipService::new(&cluster_info, None, gossip_socket, &gossip_exit_flag); let gossip_service = GossipService::new(
&cluster_info,
None,
gossip_socket,
gossip_validators,
&gossip_exit_flag,
);
(cluster_info, gossip_exit_flag, gossip_service) (cluster_info, gossip_exit_flag, gossip_service)
} }
@ -862,7 +869,18 @@ pub fn main() {
.multiple(true) .multiple(true)
.takes_value(true) .takes_value(true)
.help("A list of validators to request repairs from. If specified, repair will not \ .help("A list of validators to request repairs from. If specified, repair will not \
request from validators outside this set [default: request repairs from all validators]") request from validators outside this set [default: all validators]")
)
.arg(
Arg::with_name("gossip_validators")
.long("gossip-validator")
.validator(is_pubkey)
.value_name("PUBKEY")
.multiple(true)
.takes_value(true)
.help("A list of validators to gossip with. If specified, gossip \
will not pull/pull from from validators outside this set. \
[default: all validators]")
) )
.arg( .arg(
Arg::with_name("no_rocksdb_compaction") Arg::with_name("no_rocksdb_compaction")
@ -979,6 +997,12 @@ pub fn main() {
"repair_validators", "repair_validators",
"--repair-validator", "--repair-validator",
); );
let gossip_validators = validators_set(
&identity_keypair.pubkey(),
&matches,
"gossip_validators",
"--gossip-validator",
);
let bind_address = solana_net_utils::parse_host(matches.value_of("bind_address").unwrap()) let bind_address = solana_net_utils::parse_host(matches.value_of("bind_address").unwrap())
.expect("invalid bind_address"); .expect("invalid bind_address");
@ -1029,6 +1053,7 @@ pub fn main() {
wait_for_supermajority: value_t!(matches, "wait_for_supermajority", Slot).ok(), wait_for_supermajority: value_t!(matches, "wait_for_supermajority", Slot).ok(),
trusted_validators, trusted_validators,
repair_validators, repair_validators,
gossip_validators,
frozen_accounts: values_t!(matches, "frozen_accounts", Pubkey).unwrap_or_default(), frozen_accounts: values_t!(matches, "frozen_accounts", Pubkey).unwrap_or_default(),
no_rocksdb_compaction, no_rocksdb_compaction,
wal_recovery_mode, wal_recovery_mode,
@ -1329,6 +1354,7 @@ pub fn main() {
&node.info.gossip, &node.info.gossip,
node.sockets.gossip.try_clone().unwrap(), node.sockets.gossip.try_clone().unwrap(),
validator_config.expected_shred_version, validator_config.expected_shred_version,
validator_config.gossip_validators.clone(),
)); ));
} }