Add shred version filters to Crds Accessors (#8027)

* Add shred version filters to Crds Accessors

* Adopt entrypoint shred_version if one isn't provided
This commit is contained in:
Sagar Dhawan 2020-01-30 00:15:37 -08:00 committed by GitHub
parent c2baf7b07d
commit 64c42e28dc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 66 additions and 5 deletions

View File

@ -522,6 +522,8 @@ impl Archiver {
let mut contact_info = node_info.clone();
contact_info.tvu = "0.0.0.0:0".parse().unwrap();
contact_info.wallclock = timestamp();
// copy over the adopted shred_version from the entrypoint
contact_info.shred_version = cluster_info.read().unwrap().my_data().shred_version;
{
let mut cluster_info_w = cluster_info.write().unwrap();
cluster_info_w.insert_self(contact_info);

View File

@ -413,6 +413,8 @@ impl ClusterInfo {
.values()
.filter_map(|x| x.value.contact_info())
.filter(|x| x.id != me.id)
/* shred_version not considered for rpc peers (ie, caller must select version
if desired) */
.filter(|x| ContactInfo::is_valid_address(&x.rpc))
.cloned()
.collect()
@ -440,13 +442,15 @@ impl ClusterInfo {
.values()
.filter_map(|x| x.value.contact_info())
.filter(|x| x.id != me)
/* shred_version not considered for gossip peers (ie, spy nodes do not set
shred_version) */
.filter(|x| ContactInfo::is_valid_address(&x.gossip))
.cloned()
.collect()
}
/// all validators that have a valid tvu port.
pub fn tvu_peers(&self) -> Vec<ContactInfo> {
/// all validators that have a valid tvu port regardless of `shred_version`.
pub fn all_tvu_peers(&self) -> Vec<ContactInfo> {
let me = self.my_data();
self.gossip
.crds
@ -460,7 +464,37 @@ impl ClusterInfo {
.collect()
}
/// all peers that have a valid storage addr
/// all validators that have a valid tvu port and are on the same `shred_version`.
pub fn tvu_peers(&self) -> Vec<ContactInfo> {
let me = self.my_data();
self.gossip
.crds
.table
.values()
.filter_map(|x| x.value.contact_info())
.filter(|x| ContactInfo::is_valid_address(&x.tvu))
.filter(|x| !ClusterInfo::is_archiver(x))
.filter(|x| x.id != me.id)
.filter(|x| x.shred_version == me.shred_version)
.cloned()
.collect()
}
/// all peers that have a valid storage addr regardless of `shred_version`.
pub fn all_storage_peers(&self) -> Vec<ContactInfo> {
let me = self.my_data();
self.gossip
.crds
.table
.values()
.filter_map(|x| x.value.contact_info())
.filter(|x| ContactInfo::is_valid_address(&x.storage_addr))
.filter(|x| x.id != me.id)
.cloned()
.collect()
}
/// all peers that have a valid storage addr and are on the same `shred_version`.
pub fn storage_peers(&self) -> Vec<ContactInfo> {
let me = self.my_data();
self.gossip
@ -470,6 +504,7 @@ impl ClusterInfo {
.filter_map(|x| x.value.contact_info())
.filter(|x| ContactInfo::is_valid_address(&x.storage_addr))
.filter(|x| x.id != me.id)
.filter(|x| x.shred_version == me.shred_version)
.cloned()
.collect()
}
@ -483,6 +518,7 @@ impl ClusterInfo {
.values()
.filter_map(|x| x.value.contact_info())
.filter(|x| x.id != me.id)
.filter(|x| x.shred_version == me.shred_version)
.filter(|x| ContactInfo::is_valid_address(&x.tvu))
.filter(|x| ContactInfo::is_valid_address(&x.tvu_forwards))
.cloned()
@ -495,6 +531,7 @@ impl ClusterInfo {
ClusterInfo::tvu_peers(self)
.into_iter()
.filter(|x| x.id != me.id)
.filter(|x| x.shred_version == me.shred_version)
.filter(|x| ContactInfo::is_valid_address(&x.gossip))
.filter(|x| {
self.get_epoch_state_for_node(&x.id, None)
@ -1057,6 +1094,7 @@ impl ClusterInfo {
.spawn(move || {
let mut last_push = timestamp();
let mut last_contact_info_trace = timestamp();
let mut adopt_shred_version = obj.read().unwrap().my_data().shred_version == 0;
let recycler = PacketsRecycler::default();
loop {
let start = timestamp();
@ -1097,6 +1135,27 @@ impl ClusterInfo {
("tabel_size", table_size as i64, i64),
("purge_stake_timeout", timeout as i64, i64)
);
// Adopt the entrypoint's `shred_version` if ours is unset
if adopt_shred_version {
// If gossip was given an entrypoint, lookup its id
let entrypoint_id = obj.read().unwrap().entrypoint.as_ref().map(|e| e.id);
if let Some(entrypoint_id) = entrypoint_id {
info!("Shred version unknown, looking for the entrypoint:{:?} Shred version", entrypoint_id);
// If a pull from the entrypoint was successful, it should exist in the crds table
let entrypoint = obj.read().unwrap().lookup(&entrypoint_id).cloned();
if let Some(entrypoint) = entrypoint {
let mut self_info = obj.read().unwrap().my_data();
if entrypoint.shred_version == 0 {
warn!("entrypoint is running an invalid shred_version: 0");
} else {
info!("Setting Shred version to {:?} from entrypoint {:?}", entrypoint.shred_version, entrypoint.id);
self_info.shred_version = entrypoint.shred_version;
obj.write().unwrap().insert_self(self_info);
adopt_shred_version = false;
}
}
}
}
//TODO: possibly tune this parameter
//we saw a deadlock passing an obj.read().unwrap().timeout into sleep
if start - last_push > CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 2 {

View File

@ -197,10 +197,10 @@ fn spy(
tvu_peers = spy_ref
.read()
.unwrap()
.tvu_peers()
.all_tvu_peers()
.into_iter()
.collect::<Vec<_>>();
archivers = spy_ref.read().unwrap().storage_peers();
archivers = spy_ref.read().unwrap().all_storage_peers();
if let Some(num) = num_nodes {
if tvu_peers.len() + archivers.len() >= num {
if let Some(gossip_addr) = find_node_by_gossip_addr {