From 409fe3bca112edc7aefafb6c7afe31c1e78f4f8a Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Thu, 10 Dec 2020 17:01:55 +0000 Subject: [PATCH] adds the instance token to crds-labels for node-instance crds-values (#14037) If a node "a" receives instance-info from node "b1" it will override any instance-info associated with "b1" pubkey in its crds table. This makes it less likely that when "b1" receives crds values from "a" (either through pull or push), it sees other instances of itself (because node "a" discarded them when it received "b1" instance info). In order for the crds table to contain all instance-info associated with the same pubkey at the same time, we need to add the instance tokens to the keys in the crds table (i.e. the CrdsValueLabel). --- core/src/cluster_info.rs | 4 +- core/src/crds_value.rs | 80 +++++++++++++++++++++++++++++++++------- 2 files changed, 68 insertions(+), 16 deletions(-) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index ee6dfb245d..5b7046d709 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -557,7 +557,7 @@ impl ClusterInfo { socket: UdpSocket::bind("0.0.0.0:0").unwrap(), local_message_pending_push_queue: RwLock::new(vec![]), contact_debug_interval: DEFAULT_CONTACT_DEBUG_INTERVAL, - instance: NodeInstance::new(id, timestamp()), + instance: NodeInstance::new(&mut rand::thread_rng(), id, timestamp()), }; { let mut gossip = me.gossip.write().unwrap(); @@ -592,7 +592,7 @@ impl ClusterInfo { .clone(), ), contact_debug_interval: self.contact_debug_interval, - instance: NodeInstance::new(*new_id, timestamp()), + instance: NodeInstance::new(&mut rand::thread_rng(), *new_id, timestamp()), } } diff --git a/core/src/crds_value.rs b/core/src/crds_value.rs index aec4911bce..74e59c0b60 100644 --- a/core/src/crds_value.rs +++ b/core/src/crds_value.rs @@ -3,7 +3,7 @@ use crate::contact_info::ContactInfo; use crate::deprecated; use crate::epoch_slots::EpochSlots; use bincode::{serialize, serialized_size}; -use rand::Rng; +use rand::{CryptoRng, Rng}; use solana_sdk::sanitize::{Sanitize, SanitizeError}; use solana_sdk::timing::timestamp; use solana_sdk::{ @@ -334,12 +334,15 @@ pub struct NodeInstance { } impl NodeInstance { - pub fn new(pubkey: Pubkey, now: u64) -> Self { + pub fn new(rng: &mut R, pubkey: Pubkey, now: u64) -> Self + where + R: Rng + CryptoRng, + { Self { from: pubkey, wallclock: now, timestamp: now, - token: rand::thread_rng().gen(), + token: rng.gen(), } } @@ -386,7 +389,7 @@ pub enum CrdsValueLabel { AccountsHashes(Pubkey), LegacyVersion(Pubkey), Version(Pubkey), - NodeInstance(Pubkey), + NodeInstance(Pubkey, u64 /*token*/), } impl fmt::Display for CrdsValueLabel { @@ -400,7 +403,7 @@ impl fmt::Display for CrdsValueLabel { CrdsValueLabel::AccountsHashes(_) => write!(f, "AccountsHashes({})", self.pubkey()), CrdsValueLabel::LegacyVersion(_) => write!(f, "LegacyVersion({})", self.pubkey()), CrdsValueLabel::Version(_) => write!(f, "Version({})", self.pubkey()), - CrdsValueLabel::NodeInstance(_) => write!(f, "NodeInstance({})", self.pubkey()), + CrdsValueLabel::NodeInstance(_, _) => write!(f, "NodeInstance({})", self.pubkey()), } } } @@ -416,7 +419,7 @@ impl CrdsValueLabel { CrdsValueLabel::AccountsHashes(p) => *p, CrdsValueLabel::LegacyVersion(p) => *p, CrdsValueLabel::Version(p) => *p, - CrdsValueLabel::NodeInstance(p) => *p, + CrdsValueLabel::NodeInstance(p, _ /*token*/) => *p, } } } @@ -489,7 +492,7 @@ impl CrdsValue { CrdsData::EpochSlots(ix, _) => CrdsValueLabel::EpochSlots(*ix, self.pubkey()), CrdsData::LegacyVersion(_) => CrdsValueLabel::LegacyVersion(self.pubkey()), CrdsData::Version(_) => CrdsValueLabel::Version(self.pubkey()), - CrdsData::NodeInstance(_) => CrdsValueLabel::NodeInstance(self.pubkey()), + CrdsData::NodeInstance(node) => CrdsValueLabel::NodeInstance(self.pubkey(), node.token), } } pub fn contact_info(&self) -> Option<&ContactInfo> { @@ -555,15 +558,16 @@ impl CrdsValue { } /// Return all the possible labels for a record identified by Pubkey. + /// Excludes NodeInstance, which is pushed priodically, and does not need + /// to update its local-timestmap. pub fn record_labels(key: Pubkey) -> impl Iterator { - const CRDS_VALUE_LABEL_STUBS: [fn(Pubkey) -> CrdsValueLabel; 7] = [ + const CRDS_VALUE_LABEL_STUBS: [fn(Pubkey) -> CrdsValueLabel; 6] = [ CrdsValueLabel::ContactInfo, CrdsValueLabel::LowestSlot, CrdsValueLabel::SnapshotHashes, CrdsValueLabel::AccountsHashes, CrdsValueLabel::LegacyVersion, CrdsValueLabel::Version, - CrdsValueLabel::NodeInstance, ]; CRDS_VALUE_LABEL_STUBS .iter() @@ -651,7 +655,7 @@ mod test { #[test] fn test_labels() { - let mut hits = [false; 7 + MAX_VOTES as usize + MAX_EPOCH_SLOTS as usize]; + let mut hits = [false; 6 + MAX_VOTES as usize + MAX_EPOCH_SLOTS as usize]; // this method should cover all the possible labels for v in CrdsValue::record_labels(Pubkey::default()) { match &v { @@ -661,10 +665,10 @@ mod test { CrdsValueLabel::AccountsHashes(_) => hits[3] = true, CrdsValueLabel::LegacyVersion(_) => hits[4] = true, CrdsValueLabel::Version(_) => hits[5] = true, - CrdsValueLabel::NodeInstance(_) => hits[6] = true, - CrdsValueLabel::Vote(ix, _) => hits[*ix as usize + 7] = true, + CrdsValueLabel::NodeInstance(_, _) => panic!("NodeInstance!?"), + CrdsValueLabel::Vote(ix, _) => hits[*ix as usize + 6] = true, CrdsValueLabel::EpochSlots(ix, _) => { - hits[*ix as usize + MAX_VOTES as usize + 7] = true + hits[*ix as usize + MAX_VOTES as usize + 6] = true } } } @@ -875,6 +879,53 @@ mod test { assert!(currents.len() <= keys.len() * 5); } + #[test] + fn test_node_instance_crds_lable() { + fn make_crds_value(node: NodeInstance) -> CrdsValue { + CrdsValue::new_unsigned(CrdsData::NodeInstance(node)) + } + let mut rng = rand::thread_rng(); + let now = timestamp(); + let pubkey = Pubkey::new_unique(); + let node = NodeInstance::new(&mut rng, pubkey, now); + assert_eq!( + make_crds_value(node.clone()).label(), + make_crds_value(node.with_wallclock(now + 8)).label() + ); + let other = NodeInstance { + from: Pubkey::new_unique(), + ..node + }; + assert_ne!( + make_crds_value(node.clone()).label(), + make_crds_value(other).label() + ); + let other = NodeInstance { + wallclock: now + 8, + ..node + }; + assert_eq!( + make_crds_value(node.clone()).label(), + make_crds_value(other).label() + ); + let other = NodeInstance { + timestamp: now + 8, + ..node + }; + assert_eq!( + make_crds_value(node.clone()).label(), + make_crds_value(other).label() + ); + let other = NodeInstance { + token: rng.gen(), + ..node + }; + assert_ne!( + make_crds_value(node).label(), + make_crds_value(other).label() + ); + } + #[test] fn test_check_duplicate_instance() { fn make_crds_value(node: NodeInstance) -> CrdsValue { @@ -883,7 +934,7 @@ mod test { let now = timestamp(); let mut rng = rand::thread_rng(); let pubkey = Pubkey::new_unique(); - let node = NodeInstance::new(pubkey, now); + let node = NodeInstance::new(&mut rng, pubkey, now); // Same token is not a duplicate. assert!(!node.check_duplicate(&make_crds_value(NodeInstance { from: pubkey, @@ -944,6 +995,7 @@ mod test { .should_force_push(&pubkey) ); let node = CrdsValue::new_unsigned(CrdsData::NodeInstance(NodeInstance::new( + &mut rng, pubkey, timestamp(), )));