adds the instance token to crds-labels for node-instance crds-values (#14037)
If a node "a" receives instance-info from node "b1" it will override any instance-info associated with "b1" pubkey in its crds table. This makes it less likely that when "b1" receives crds values from "a" (either through pull or push), it sees other instances of itself (because node "a" discarded them when it received "b1" instance info). In order for the crds table to contain all instance-info associated with the same pubkey at the same time, we need to add the instance tokens to the keys in the crds table (i.e. the CrdsValueLabel).
This commit is contained in:
parent
5c95d8e963
commit
409fe3bca1
|
@ -557,7 +557,7 @@ impl ClusterInfo {
|
||||||
socket: UdpSocket::bind("0.0.0.0:0").unwrap(),
|
socket: UdpSocket::bind("0.0.0.0:0").unwrap(),
|
||||||
local_message_pending_push_queue: RwLock::new(vec![]),
|
local_message_pending_push_queue: RwLock::new(vec![]),
|
||||||
contact_debug_interval: DEFAULT_CONTACT_DEBUG_INTERVAL,
|
contact_debug_interval: DEFAULT_CONTACT_DEBUG_INTERVAL,
|
||||||
instance: NodeInstance::new(id, timestamp()),
|
instance: NodeInstance::new(&mut rand::thread_rng(), id, timestamp()),
|
||||||
};
|
};
|
||||||
{
|
{
|
||||||
let mut gossip = me.gossip.write().unwrap();
|
let mut gossip = me.gossip.write().unwrap();
|
||||||
|
@ -592,7 +592,7 @@ impl ClusterInfo {
|
||||||
.clone(),
|
.clone(),
|
||||||
),
|
),
|
||||||
contact_debug_interval: self.contact_debug_interval,
|
contact_debug_interval: self.contact_debug_interval,
|
||||||
instance: NodeInstance::new(*new_id, timestamp()),
|
instance: NodeInstance::new(&mut rand::thread_rng(), *new_id, timestamp()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -3,7 +3,7 @@ use crate::contact_info::ContactInfo;
|
||||||
use crate::deprecated;
|
use crate::deprecated;
|
||||||
use crate::epoch_slots::EpochSlots;
|
use crate::epoch_slots::EpochSlots;
|
||||||
use bincode::{serialize, serialized_size};
|
use bincode::{serialize, serialized_size};
|
||||||
use rand::Rng;
|
use rand::{CryptoRng, Rng};
|
||||||
use solana_sdk::sanitize::{Sanitize, SanitizeError};
|
use solana_sdk::sanitize::{Sanitize, SanitizeError};
|
||||||
use solana_sdk::timing::timestamp;
|
use solana_sdk::timing::timestamp;
|
||||||
use solana_sdk::{
|
use solana_sdk::{
|
||||||
|
@ -334,12 +334,15 @@ pub struct NodeInstance {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl NodeInstance {
|
impl NodeInstance {
|
||||||
pub fn new(pubkey: Pubkey, now: u64) -> Self {
|
pub fn new<R>(rng: &mut R, pubkey: Pubkey, now: u64) -> Self
|
||||||
|
where
|
||||||
|
R: Rng + CryptoRng,
|
||||||
|
{
|
||||||
Self {
|
Self {
|
||||||
from: pubkey,
|
from: pubkey,
|
||||||
wallclock: now,
|
wallclock: now,
|
||||||
timestamp: now,
|
timestamp: now,
|
||||||
token: rand::thread_rng().gen(),
|
token: rng.gen(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -386,7 +389,7 @@ pub enum CrdsValueLabel {
|
||||||
AccountsHashes(Pubkey),
|
AccountsHashes(Pubkey),
|
||||||
LegacyVersion(Pubkey),
|
LegacyVersion(Pubkey),
|
||||||
Version(Pubkey),
|
Version(Pubkey),
|
||||||
NodeInstance(Pubkey),
|
NodeInstance(Pubkey, u64 /*token*/),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl fmt::Display for CrdsValueLabel {
|
impl fmt::Display for CrdsValueLabel {
|
||||||
|
@ -400,7 +403,7 @@ impl fmt::Display for CrdsValueLabel {
|
||||||
CrdsValueLabel::AccountsHashes(_) => write!(f, "AccountsHashes({})", self.pubkey()),
|
CrdsValueLabel::AccountsHashes(_) => write!(f, "AccountsHashes({})", self.pubkey()),
|
||||||
CrdsValueLabel::LegacyVersion(_) => write!(f, "LegacyVersion({})", self.pubkey()),
|
CrdsValueLabel::LegacyVersion(_) => write!(f, "LegacyVersion({})", self.pubkey()),
|
||||||
CrdsValueLabel::Version(_) => write!(f, "Version({})", self.pubkey()),
|
CrdsValueLabel::Version(_) => write!(f, "Version({})", self.pubkey()),
|
||||||
CrdsValueLabel::NodeInstance(_) => write!(f, "NodeInstance({})", self.pubkey()),
|
CrdsValueLabel::NodeInstance(_, _) => write!(f, "NodeInstance({})", self.pubkey()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -416,7 +419,7 @@ impl CrdsValueLabel {
|
||||||
CrdsValueLabel::AccountsHashes(p) => *p,
|
CrdsValueLabel::AccountsHashes(p) => *p,
|
||||||
CrdsValueLabel::LegacyVersion(p) => *p,
|
CrdsValueLabel::LegacyVersion(p) => *p,
|
||||||
CrdsValueLabel::Version(p) => *p,
|
CrdsValueLabel::Version(p) => *p,
|
||||||
CrdsValueLabel::NodeInstance(p) => *p,
|
CrdsValueLabel::NodeInstance(p, _ /*token*/) => *p,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -489,7 +492,7 @@ impl CrdsValue {
|
||||||
CrdsData::EpochSlots(ix, _) => CrdsValueLabel::EpochSlots(*ix, self.pubkey()),
|
CrdsData::EpochSlots(ix, _) => CrdsValueLabel::EpochSlots(*ix, self.pubkey()),
|
||||||
CrdsData::LegacyVersion(_) => CrdsValueLabel::LegacyVersion(self.pubkey()),
|
CrdsData::LegacyVersion(_) => CrdsValueLabel::LegacyVersion(self.pubkey()),
|
||||||
CrdsData::Version(_) => CrdsValueLabel::Version(self.pubkey()),
|
CrdsData::Version(_) => CrdsValueLabel::Version(self.pubkey()),
|
||||||
CrdsData::NodeInstance(_) => CrdsValueLabel::NodeInstance(self.pubkey()),
|
CrdsData::NodeInstance(node) => CrdsValueLabel::NodeInstance(self.pubkey(), node.token),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pub fn contact_info(&self) -> Option<&ContactInfo> {
|
pub fn contact_info(&self) -> Option<&ContactInfo> {
|
||||||
|
@ -555,15 +558,16 @@ impl CrdsValue {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Return all the possible labels for a record identified by Pubkey.
|
/// Return all the possible labels for a record identified by Pubkey.
|
||||||
|
/// Excludes NodeInstance, which is pushed priodically, and does not need
|
||||||
|
/// to update its local-timestmap.
|
||||||
pub fn record_labels(key: Pubkey) -> impl Iterator<Item = CrdsValueLabel> {
|
pub fn record_labels(key: Pubkey) -> impl Iterator<Item = CrdsValueLabel> {
|
||||||
const CRDS_VALUE_LABEL_STUBS: [fn(Pubkey) -> CrdsValueLabel; 7] = [
|
const CRDS_VALUE_LABEL_STUBS: [fn(Pubkey) -> CrdsValueLabel; 6] = [
|
||||||
CrdsValueLabel::ContactInfo,
|
CrdsValueLabel::ContactInfo,
|
||||||
CrdsValueLabel::LowestSlot,
|
CrdsValueLabel::LowestSlot,
|
||||||
CrdsValueLabel::SnapshotHashes,
|
CrdsValueLabel::SnapshotHashes,
|
||||||
CrdsValueLabel::AccountsHashes,
|
CrdsValueLabel::AccountsHashes,
|
||||||
CrdsValueLabel::LegacyVersion,
|
CrdsValueLabel::LegacyVersion,
|
||||||
CrdsValueLabel::Version,
|
CrdsValueLabel::Version,
|
||||||
CrdsValueLabel::NodeInstance,
|
|
||||||
];
|
];
|
||||||
CRDS_VALUE_LABEL_STUBS
|
CRDS_VALUE_LABEL_STUBS
|
||||||
.iter()
|
.iter()
|
||||||
|
@ -651,7 +655,7 @@ mod test {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_labels() {
|
fn test_labels() {
|
||||||
let mut hits = [false; 7 + MAX_VOTES as usize + MAX_EPOCH_SLOTS as usize];
|
let mut hits = [false; 6 + MAX_VOTES as usize + MAX_EPOCH_SLOTS as usize];
|
||||||
// this method should cover all the possible labels
|
// this method should cover all the possible labels
|
||||||
for v in CrdsValue::record_labels(Pubkey::default()) {
|
for v in CrdsValue::record_labels(Pubkey::default()) {
|
||||||
match &v {
|
match &v {
|
||||||
|
@ -661,10 +665,10 @@ mod test {
|
||||||
CrdsValueLabel::AccountsHashes(_) => hits[3] = true,
|
CrdsValueLabel::AccountsHashes(_) => hits[3] = true,
|
||||||
CrdsValueLabel::LegacyVersion(_) => hits[4] = true,
|
CrdsValueLabel::LegacyVersion(_) => hits[4] = true,
|
||||||
CrdsValueLabel::Version(_) => hits[5] = true,
|
CrdsValueLabel::Version(_) => hits[5] = true,
|
||||||
CrdsValueLabel::NodeInstance(_) => hits[6] = true,
|
CrdsValueLabel::NodeInstance(_, _) => panic!("NodeInstance!?"),
|
||||||
CrdsValueLabel::Vote(ix, _) => hits[*ix as usize + 7] = true,
|
CrdsValueLabel::Vote(ix, _) => hits[*ix as usize + 6] = true,
|
||||||
CrdsValueLabel::EpochSlots(ix, _) => {
|
CrdsValueLabel::EpochSlots(ix, _) => {
|
||||||
hits[*ix as usize + MAX_VOTES as usize + 7] = true
|
hits[*ix as usize + MAX_VOTES as usize + 6] = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -875,6 +879,53 @@ mod test {
|
||||||
assert!(currents.len() <= keys.len() * 5);
|
assert!(currents.len() <= keys.len() * 5);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_node_instance_crds_lable() {
|
||||||
|
fn make_crds_value(node: NodeInstance) -> CrdsValue {
|
||||||
|
CrdsValue::new_unsigned(CrdsData::NodeInstance(node))
|
||||||
|
}
|
||||||
|
let mut rng = rand::thread_rng();
|
||||||
|
let now = timestamp();
|
||||||
|
let pubkey = Pubkey::new_unique();
|
||||||
|
let node = NodeInstance::new(&mut rng, pubkey, now);
|
||||||
|
assert_eq!(
|
||||||
|
make_crds_value(node.clone()).label(),
|
||||||
|
make_crds_value(node.with_wallclock(now + 8)).label()
|
||||||
|
);
|
||||||
|
let other = NodeInstance {
|
||||||
|
from: Pubkey::new_unique(),
|
||||||
|
..node
|
||||||
|
};
|
||||||
|
assert_ne!(
|
||||||
|
make_crds_value(node.clone()).label(),
|
||||||
|
make_crds_value(other).label()
|
||||||
|
);
|
||||||
|
let other = NodeInstance {
|
||||||
|
wallclock: now + 8,
|
||||||
|
..node
|
||||||
|
};
|
||||||
|
assert_eq!(
|
||||||
|
make_crds_value(node.clone()).label(),
|
||||||
|
make_crds_value(other).label()
|
||||||
|
);
|
||||||
|
let other = NodeInstance {
|
||||||
|
timestamp: now + 8,
|
||||||
|
..node
|
||||||
|
};
|
||||||
|
assert_eq!(
|
||||||
|
make_crds_value(node.clone()).label(),
|
||||||
|
make_crds_value(other).label()
|
||||||
|
);
|
||||||
|
let other = NodeInstance {
|
||||||
|
token: rng.gen(),
|
||||||
|
..node
|
||||||
|
};
|
||||||
|
assert_ne!(
|
||||||
|
make_crds_value(node).label(),
|
||||||
|
make_crds_value(other).label()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_check_duplicate_instance() {
|
fn test_check_duplicate_instance() {
|
||||||
fn make_crds_value(node: NodeInstance) -> CrdsValue {
|
fn make_crds_value(node: NodeInstance) -> CrdsValue {
|
||||||
|
@ -883,7 +934,7 @@ mod test {
|
||||||
let now = timestamp();
|
let now = timestamp();
|
||||||
let mut rng = rand::thread_rng();
|
let mut rng = rand::thread_rng();
|
||||||
let pubkey = Pubkey::new_unique();
|
let pubkey = Pubkey::new_unique();
|
||||||
let node = NodeInstance::new(pubkey, now);
|
let node = NodeInstance::new(&mut rng, pubkey, now);
|
||||||
// Same token is not a duplicate.
|
// Same token is not a duplicate.
|
||||||
assert!(!node.check_duplicate(&make_crds_value(NodeInstance {
|
assert!(!node.check_duplicate(&make_crds_value(NodeInstance {
|
||||||
from: pubkey,
|
from: pubkey,
|
||||||
|
@ -944,6 +995,7 @@ mod test {
|
||||||
.should_force_push(&pubkey)
|
.should_force_push(&pubkey)
|
||||||
);
|
);
|
||||||
let node = CrdsValue::new_unsigned(CrdsData::NodeInstance(NodeInstance::new(
|
let node = CrdsValue::new_unsigned(CrdsData::NodeInstance(NodeInstance::new(
|
||||||
|
&mut rng,
|
||||||
pubkey,
|
pubkey,
|
||||||
timestamp(),
|
timestamp(),
|
||||||
)));
|
)));
|
||||||
|
|
Loading…
Reference in New Issue