Update entrypoint contact info even when shred version adoption is not requested

This commit is contained in:
Michael Vines 2020-12-21 19:34:49 -08:00
parent a14cfd660a
commit 3373082ffa
1 changed files with 86 additions and 18 deletions

View File

@ -1731,19 +1731,21 @@ impl ClusterInfo {
Ok(())
}
fn handle_adopt_shred_version(&self, adopt_shred_version: &mut bool) {
// Adopt the entrypoint's `shred_version` if ours is unset
if *adopt_shred_version {
// If gossip was given an entrypoint, look up the ContactInfo by the given
// entrypoint gossip adddress
let gossip_addr = self.entrypoint.read().unwrap().as_ref().map(|e| e.gossip);
fn process_entrypoint(&self, entrypoint_processed: &mut bool) {
if *entrypoint_processed {
return;
}
let gossip_addr = self.entrypoint.read().unwrap().as_ref().map(|e| e.gossip);
if let Some(gossip_addr) = gossip_addr {
// If a pull from the entrypoint was successful, it should exist in the crds table
// If a pull from the entrypoint was successful it should exist in the CRDS table
let entrypoint = self.lookup_contact_info_by_gossip_addr(&gossip_addr);
if let Some(entrypoint) = entrypoint {
// Adopt the entrypoint's `shred_version` if ours is unset
if self.my_shred_version() == 0 {
if entrypoint.shred_version == 0 {
info!("Unable to adopt entrypoint's shred version");
warn!("Unable to adopt entrypoint shred version of 0");
} else {
info!(
"Setting shred version to {:?} from entrypoint {:?}",
@ -1756,11 +1758,18 @@ impl ClusterInfo {
.unwrap()
.set_shred_version(entrypoint.shred_version);
self.insert_self();
*entrypoint_processed = true;
}
} else {
*entrypoint_processed = true;
}
// Update the entrypoint's id so future entrypoint pulls correctly reference it
*self.entrypoint.write().unwrap() = Some(entrypoint);
*adopt_shred_version = false;
}
}
}
} else {
// No entrypoint specified. Nothing more to process
*entrypoint_processed = true;
}
}
@ -1807,7 +1816,7 @@ impl ClusterInfo {
.spawn(move || {
let mut last_push = timestamp();
let mut last_contact_info_trace = timestamp();
let mut adopt_shred_version = self.my_shred_version() == 0;
let mut entrypoint_processed = false;
let recycler = PacketsRecycler::default();
let crds_data = vec![
CrdsData::Version(Version::new(self.id())),
@ -1854,7 +1863,7 @@ impl ClusterInfo {
self.handle_purge(&thread_pool, &bank_forks, &stakes);
self.handle_adopt_shred_version(&mut adopt_shred_version);
self.process_entrypoint(&mut entrypoint_processed);
//TODO: possibly tune this parameter
//we saw a deadlock passing an self.read().unwrap().timeout into sleep
@ -4225,12 +4234,69 @@ mod tests {
}
#[test]
fn test_handle_adopt_shred_version() {
fn test_process_entrypoint_adopt_shred_version() {
let node_keypair = Arc::new(Keypair::new());
let cluster_info = Arc::new(ClusterInfo::new(
ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()),
node_keypair,
));
assert_eq!(cluster_info.my_shred_version(), 0);
// Simulating starting up with default entrypoint, no known id, only a gossip
// address
let entrypoint_gossip_addr = socketaddr!("127.0.0.2:1234");
let mut entrypoint = ContactInfo::new_localhost(&Pubkey::default(), timestamp());
entrypoint.gossip = entrypoint_gossip_addr;
assert_eq!(entrypoint.shred_version, 0);
cluster_info.set_entrypoint(entrypoint);
// Simulate getting entrypoint ContactInfo from gossip with an entrypoint shred version of
// 0
let mut gossiped_entrypoint_info =
ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), timestamp());
gossiped_entrypoint_info.gossip = entrypoint_gossip_addr;
gossiped_entrypoint_info.shred_version = 0;
cluster_info.insert_info(gossiped_entrypoint_info.clone());
// Adopt the entrypoint's gossiped contact info and verify
let mut entrypoint_processed = false;
ClusterInfo::process_entrypoint(&cluster_info, &mut entrypoint_processed);
assert_eq!(
cluster_info.entrypoint.read().unwrap().as_ref().unwrap(),
&gossiped_entrypoint_info
);
assert!(!entrypoint_processed); // <--- entrypoint processing incomplete because shred adoption still pending
assert_eq!(cluster_info.my_shred_version(), 0); // <-- shred version still 0
// Simulate getting entrypoint ContactInfo from gossip with an entrypoint shred version of
// !0
gossiped_entrypoint_info.shred_version = 1;
cluster_info.insert_info(gossiped_entrypoint_info.clone());
// Adopt the entrypoint's gossiped contact info and verify
let mut entrypoint_processed = false;
ClusterInfo::process_entrypoint(&cluster_info, &mut entrypoint_processed);
assert_eq!(
cluster_info.entrypoint.read().unwrap().as_ref().unwrap(),
&gossiped_entrypoint_info
);
assert!(entrypoint_processed);
assert_eq!(cluster_info.my_shred_version(), 1); // <-- shred version now adopted from entrypoint
}
#[test]
fn test_process_entrypoint_without_adopt_shred_version() {
let node_keypair = Arc::new(Keypair::new());
let cluster_info = Arc::new(ClusterInfo::new(
{
let mut contact_info =
ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp());
contact_info.shred_version = 2;
contact_info
},
node_keypair,
));
assert_eq!(cluster_info.my_shred_version(), 2);
// Simulating starting up with default entrypoint, no known id, only a gossip
// address
@ -4248,11 +4314,13 @@ mod tests {
cluster_info.insert_info(gossiped_entrypoint_info.clone());
// Adopt the entrypoint's gossiped contact info and verify
ClusterInfo::handle_adopt_shred_version(&cluster_info, &mut true);
let mut entrypoint_processed = false;
ClusterInfo::process_entrypoint(&cluster_info, &mut entrypoint_processed);
assert_eq!(
cluster_info.entrypoint.read().unwrap().as_ref().unwrap(),
&gossiped_entrypoint_info
);
assert_eq!(cluster_info.my_shred_version(), 1);
assert!(entrypoint_processed);
assert_eq!(cluster_info.my_shred_version(), 2); // <--- No change to shred version
}
}