removes shred_version from LegacyContactInfo public interface (#31304)

Working towards LegacyContactInfo => ContactInfo migration, the commit
adds more api parity between the two.
This commit is contained in:
behzad nouri 2023-04-24 15:19:33 +00:00 committed by GitHub
parent c3eb17f2a3
commit 1b08d01a80
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 37 additions and 26 deletions

View File

@ -2119,7 +2119,7 @@ fn get_stake_percent_in_gossip(bank: &Bank, cluster_info: &ClusterInfo, log: boo
let vote_state_node_pubkey = vote_account.node_pubkey().unwrap_or_default(); let vote_state_node_pubkey = vote_account.node_pubkey().unwrap_or_default();
if let Some(peer) = peers.get(&vote_state_node_pubkey) { if let Some(peer) = peers.get(&vote_state_node_pubkey) {
if peer.shred_version == my_shred_version { if peer.shred_version() == my_shred_version {
trace!( trace!(
"observed {} in gossip, (activated_stake={})", "observed {} in gossip, (activated_stake={})",
vote_state_node_pubkey, vote_state_node_pubkey,

View File

@ -751,7 +751,7 @@ impl ClusterInfo {
.filter(|addr| self.socket_addr_space.check(addr))?; .filter(|addr| self.socket_addr_space.check(addr))?;
let node_version = self.get_node_version(&node.id); let node_version = self.get_node_version(&node.id);
if my_shred_version != 0 if my_shred_version != 0
&& (node.shred_version != 0 && node.shred_version != my_shred_version) && (node.shred_version() != 0 && node.shred_version() != my_shred_version)
{ {
return None; return None;
} }
@ -769,7 +769,7 @@ impl ClusterInfo {
}, },
self.addr_to_string(&Some(rpc_addr), &node.rpc().ok()), self.addr_to_string(&Some(rpc_addr), &node.rpc().ok()),
self.addr_to_string(&Some(rpc_addr), &node.rpc_pubsub().ok()), self.addr_to_string(&Some(rpc_addr), &node.rpc_pubsub().ok()),
node.shred_version, node.shred_version(),
)) ))
}) })
.collect(); .collect();
@ -803,7 +803,7 @@ impl ClusterInfo {
} }
let node_version = self.get_node_version(&node.id); let node_version = self.get_node_version(&node.id);
if my_shred_version != 0 && (node.shred_version != 0 && node.shred_version != my_shred_version) { if my_shred_version != 0 && (node.shred_version() != 0 && node.shred_version() != my_shred_version) {
different_shred_nodes = different_shred_nodes.saturating_add(1); different_shred_nodes = different_shred_nodes.saturating_add(1);
None None
} else { } else {
@ -837,7 +837,7 @@ impl ClusterInfo {
self.addr_to_string(&ip_addr, &node.tvu_forwards().ok()), self.addr_to_string(&ip_addr, &node.tvu_forwards().ok()),
self.addr_to_string(&ip_addr, &node.repair().ok()), self.addr_to_string(&ip_addr, &node.repair().ok()),
self.addr_to_string(&ip_addr, &node.serve_repair().ok()), self.addr_to_string(&ip_addr, &node.serve_repair().ok()),
node.shred_version, node.shred_version(),
)) ))
} }
}) })
@ -1310,7 +1310,7 @@ impl ClusterInfo {
.get_nodes_contact_info() .get_nodes_contact_info()
.filter(|node| { .filter(|node| {
node.id != self_pubkey node.id != self_pubkey
&& node.shred_version == self_shred_version && node.shred_version() == self_shred_version
&& self.check_socket_addr_space(&node.tvu()) && self.check_socket_addr_space(&node.tvu())
}) })
.cloned() .cloned()
@ -1327,7 +1327,7 @@ impl ClusterInfo {
.get_nodes_contact_info() .get_nodes_contact_info()
.filter(|node| { .filter(|node| {
node.id != self_pubkey node.id != self_pubkey
&& node.shred_version == self_shred_version && node.shred_version() == self_shred_version
&& self.check_socket_addr_space(&node.tvu()) && self.check_socket_addr_space(&node.tvu())
&& self.check_socket_addr_space(&node.serve_repair()) && self.check_socket_addr_space(&node.serve_repair())
&& match gossip_crds.get::<&LowestSlot>(node.id) { && match gossip_crds.get::<&LowestSlot>(node.id) {
@ -1666,16 +1666,17 @@ impl ClusterInfo {
if self.my_shred_version() == 0 { if self.my_shred_version() == 0 {
if let Some(entrypoint) = entrypoints if let Some(entrypoint) = entrypoints
.iter() .iter()
.find(|entrypoint| entrypoint.shred_version != 0) .find(|entrypoint| entrypoint.shred_version() != 0)
{ {
info!( info!(
"Setting shred version to {:?} from entrypoint {:?}", "Setting shred version to {:?} from entrypoint {:?}",
entrypoint.shred_version, entrypoint.id entrypoint.shred_version(),
entrypoint.id
); );
self.my_contact_info self.my_contact_info
.write() .write()
.unwrap() .unwrap()
.set_shred_version(entrypoint.shred_version); .set_shred_version(entrypoint.shred_version());
} }
} }
self.my_shred_version() != 0 self.my_shred_version() != 0
@ -3146,7 +3147,7 @@ fn filter_on_shred_version(
Protocol::PullRequest(_, caller) => match &caller.data { Protocol::PullRequest(_, caller) => match &caller.data {
// Allow spy nodes with shred-verion == 0 to pull from other nodes. // Allow spy nodes with shred-verion == 0 to pull from other nodes.
CrdsData::LegacyContactInfo(node) CrdsData::LegacyContactInfo(node)
if node.shred_version == 0 || node.shred_version == self_shred_version => if node.shred_version() == 0 || node.shred_version() == self_shred_version =>
{ {
Some(msg) Some(msg)
} }
@ -4059,7 +4060,7 @@ RPC Enabled Nodes: 1"#;
let mut rng = rand::thread_rng(); let mut rng = rand::thread_rng();
let node_pubkey = Pubkey::new_unique(); let node_pubkey = Pubkey::new_unique();
let mut node = LegacyContactInfo::new_rand(&mut rng, Some(node_pubkey)); let mut node = LegacyContactInfo::new_rand(&mut rng, Some(node_pubkey));
node.shred_version = 42; node.set_shred_version(42);
let epoch_slots = EpochSlots::new_rand(&mut rng, Some(node_pubkey)); let epoch_slots = EpochSlots::new_rand(&mut rng, Some(node_pubkey));
let entries = vec![ let entries = vec![
CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(node)), CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(node)),
@ -4490,12 +4491,12 @@ RPC Enabled Nodes: 1"#;
let entrypoint1_gossip_addr = socketaddr!("127.0.0.2:1234"); let entrypoint1_gossip_addr = socketaddr!("127.0.0.2:1234");
let mut entrypoint1 = LegacyContactInfo::new_localhost(&Pubkey::default(), timestamp()); let mut entrypoint1 = LegacyContactInfo::new_localhost(&Pubkey::default(), timestamp());
entrypoint1.set_gossip(entrypoint1_gossip_addr).unwrap(); entrypoint1.set_gossip(entrypoint1_gossip_addr).unwrap();
assert_eq!(entrypoint1.shred_version, 0); assert_eq!(entrypoint1.shred_version(), 0);
let entrypoint2_gossip_addr = socketaddr!("127.0.0.2:5678"); let entrypoint2_gossip_addr = socketaddr!("127.0.0.2:5678");
let mut entrypoint2 = LegacyContactInfo::new_localhost(&Pubkey::default(), timestamp()); let mut entrypoint2 = LegacyContactInfo::new_localhost(&Pubkey::default(), timestamp());
entrypoint2.set_gossip(entrypoint2_gossip_addr).unwrap(); entrypoint2.set_gossip(entrypoint2_gossip_addr).unwrap();
assert_eq!(entrypoint2.shred_version, 0); assert_eq!(entrypoint2.shred_version(), 0);
cluster_info.set_entrypoints(vec![entrypoint1, entrypoint2]); cluster_info.set_entrypoints(vec![entrypoint1, entrypoint2]);
// Simulate getting entrypoint ContactInfo from gossip with an entrypoint1 shred version of // Simulate getting entrypoint ContactInfo from gossip with an entrypoint1 shred version of
@ -4582,7 +4583,7 @@ RPC Enabled Nodes: 1"#;
let entrypoint_gossip_addr = socketaddr!("127.0.0.2:1234"); let entrypoint_gossip_addr = socketaddr!("127.0.0.2:1234");
let mut entrypoint = LegacyContactInfo::new_localhost(&Pubkey::default(), timestamp()); let mut entrypoint = LegacyContactInfo::new_localhost(&Pubkey::default(), timestamp());
entrypoint.set_gossip(entrypoint_gossip_addr).unwrap(); entrypoint.set_gossip(entrypoint_gossip_addr).unwrap();
assert_eq!(entrypoint.shred_version, 0); assert_eq!(entrypoint.shred_version(), 0);
cluster_info.set_entrypoint(entrypoint); cluster_info.set_entrypoint(entrypoint);
// Simulate getting entrypoint ContactInfo from gossip // Simulate getting entrypoint ContactInfo from gossip
@ -4765,7 +4766,7 @@ RPC Enabled Nodes: 1"#;
let keypair = Keypair::new(); let keypair = Keypair::new();
peers.push(keypair.pubkey()); peers.push(keypair.pubkey());
let mut rand_ci = LegacyContactInfo::new_rand(&mut rng, Some(keypair.pubkey())); let mut rand_ci = LegacyContactInfo::new_rand(&mut rng, Some(keypair.pubkey()));
rand_ci.shred_version = shred_version; rand_ci.set_shred_version(shred_version);
rand_ci.set_wallclock(timestamp()); rand_ci.set_wallclock(timestamp());
CrdsValue::new_signed(CrdsData::LegacyContactInfo(rand_ci), &keypair) CrdsValue::new_signed(CrdsData::LegacyContactInfo(rand_ci), &keypair)
}) })

View File

@ -223,7 +223,7 @@ impl Crds {
match &value.value.data { match &value.value.data {
CrdsData::LegacyContactInfo(node) => { CrdsData::LegacyContactInfo(node) => {
self.nodes.insert(entry_index); self.nodes.insert(entry_index);
self.shred_versions.insert(pubkey, node.shred_version); self.shred_versions.insert(pubkey, node.shred_version());
} }
CrdsData::Vote(_, _) => { CrdsData::Vote(_, _) => {
self.votes.insert(value.ordinal, entry_index); self.votes.insert(value.ordinal, entry_index);
@ -249,7 +249,7 @@ impl Crds {
self.shards.insert(entry_index, &value); self.shards.insert(entry_index, &value);
match &value.value.data { match &value.value.data {
CrdsData::LegacyContactInfo(node) => { CrdsData::LegacyContactInfo(node) => {
self.shred_versions.insert(pubkey, node.shred_version); self.shred_versions.insert(pubkey, node.shred_version());
// self.nodes does not need to be updated since the // self.nodes does not need to be updated since the
// entry at this index was and stays contact-info. // entry at this index was and stays contact-info.
debug_assert_matches!( debug_assert_matches!(
@ -1285,7 +1285,7 @@ mod tests {
// Initial insertion of a node with shred version: // Initial insertion of a node with shred version:
let mut node = ContactInfo::new_rand(&mut rng, Some(pubkey)); let mut node = ContactInfo::new_rand(&mut rng, Some(pubkey));
let wallclock = node.wallclock(); let wallclock = node.wallclock();
node.shred_version = 42; node.set_shred_version(42);
let node = CrdsData::LegacyContactInfo(node); let node = CrdsData::LegacyContactInfo(node);
let node = CrdsValue::new_unsigned(node); let node = CrdsValue::new_unsigned(node);
assert_eq!( assert_eq!(
@ -1296,7 +1296,7 @@ mod tests {
// An outdated value should not update shred-version: // An outdated value should not update shred-version:
let mut node = ContactInfo::new_rand(&mut rng, Some(pubkey)); let mut node = ContactInfo::new_rand(&mut rng, Some(pubkey));
node.set_wallclock(wallclock - 1); // outdated. node.set_wallclock(wallclock - 1); // outdated.
node.shred_version = 8; node.set_shred_version(8);
let node = CrdsData::LegacyContactInfo(node); let node = CrdsData::LegacyContactInfo(node);
let node = CrdsValue::new_unsigned(node); let node = CrdsValue::new_unsigned(node);
assert_eq!( assert_eq!(
@ -1307,7 +1307,7 @@ mod tests {
// Update shred version: // Update shred version:
let mut node = ContactInfo::new_rand(&mut rng, Some(pubkey)); let mut node = ContactInfo::new_rand(&mut rng, Some(pubkey));
node.set_wallclock(wallclock + 1); // so that it overrides the prev one. node.set_wallclock(wallclock + 1); // so that it overrides the prev one.
node.shred_version = 8; node.set_shred_version(8);
let node = CrdsData::LegacyContactInfo(node); let node = CrdsData::LegacyContactInfo(node);
let node = CrdsValue::new_unsigned(node); let node = CrdsValue::new_unsigned(node);
assert_eq!( assert_eq!(

View File

@ -357,7 +357,7 @@ pub(crate) fn get_gossip_nodes<R: Rng>(
}) })
.filter(|node| { .filter(|node| {
&node.id != pubkey &node.id != pubkey
&& verify_shred_version(node.shred_version) && verify_shred_version(node.shred_version())
&& node && node
.gossip() .gossip()
.map(|addr| socket_addr_space.check(&addr)) .map(|addr| socket_addr_space.check(&addr))

View File

@ -44,7 +44,7 @@ pub struct LegacyContactInfo {
/// latest wallclock picked /// latest wallclock picked
wallclock: u64, wallclock: u64,
/// node shred version /// node shred version
pub shred_version: u16, shred_version: u16,
} }
impl Sanitize for LegacyContactInfo { impl Sanitize for LegacyContactInfo {
@ -161,10 +161,20 @@ impl LegacyContactInfo {
self.wallclock self.wallclock
} }
#[inline]
pub fn shred_version(&self) -> u16 {
self.shred_version
}
pub fn set_wallclock(&mut self, wallclock: u64) { pub fn set_wallclock(&mut self, wallclock: u64) {
self.wallclock = wallclock; self.wallclock = wallclock;
} }
#[cfg(test)]
pub(crate) fn set_shred_version(&mut self, shred_version: u16) {
self.shred_version = shred_version
}
get_socket!(gossip); get_socket!(gossip);
get_socket!(tvu); get_socket!(tvu);
get_socket!(tvu_forwards); get_socket!(tvu_forwards);

View File

@ -3452,7 +3452,7 @@ pub mod rpc_full {
.all_peers() .all_peers()
.iter() .iter()
.filter_map(|(contact_info, _)| { .filter_map(|(contact_info, _)| {
if my_shred_version == contact_info.shred_version if my_shred_version == contact_info.shred_version()
&& contact_info && contact_info
.gossip() .gossip()
.map(|addr| socket_addr_space.check(&addr)) .map(|addr| socket_addr_space.check(&addr))

View File

@ -193,7 +193,7 @@ fn get_rpc_peers(
.gossip() .gossip()
.ok() .ok()
.and_then(|addr| cluster_info.lookup_contact_info_by_gossip_addr(&addr)) .and_then(|addr| cluster_info.lookup_contact_info_by_gossip_addr(&addr))
.map_or(false, |entrypoint| entrypoint.shred_version == 0) .map_or(false, |entrypoint| entrypoint.shred_version() == 0)
}); });
if all_zero_shred_versions { if all_zero_shred_versions {
@ -215,7 +215,7 @@ fn get_rpc_peers(
let mut rpc_peers = cluster_info let mut rpc_peers = cluster_info
.all_rpc_peers() .all_rpc_peers()
.into_iter() .into_iter()
.filter(|contact_info| contact_info.shred_version == shred_version) .filter(|contact_info| contact_info.shred_version() == shred_version)
.collect::<Vec<_>>(); .collect::<Vec<_>>();
if bootstrap_config.only_known_rpc { if bootstrap_config.only_known_rpc {