checks for duplicate validator instances using gossip
This commit is contained in:
parent
14e241be35
commit
8cd5eb9863
|
@ -18,8 +18,8 @@ use crate::{
|
|||
crds_gossip_error::CrdsGossipError,
|
||||
crds_gossip_pull::{CrdsFilter, ProcessPullStats, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS},
|
||||
crds_value::{
|
||||
self, CrdsData, CrdsValue, CrdsValueLabel, EpochSlotsIndex, LowestSlot, SnapshotHash,
|
||||
Version, Vote, MAX_WALLCLOCK,
|
||||
self, CrdsData, CrdsValue, CrdsValueLabel, EpochSlotsIndex, LowestSlot, NodeInstance,
|
||||
SnapshotHash, Version, Vote, MAX_WALLCLOCK,
|
||||
},
|
||||
data_budget::DataBudget,
|
||||
epoch_slots::EpochSlots,
|
||||
|
@ -300,6 +300,7 @@ pub struct ClusterInfo {
|
|||
socket: UdpSocket,
|
||||
local_message_pending_push_queue: RwLock<Vec<(CrdsValue, u64)>>,
|
||||
contact_debug_interval: u64,
|
||||
instance: RwLock<NodeInstance>,
|
||||
}
|
||||
|
||||
impl Default for ClusterInfo {
|
||||
|
@ -556,6 +557,7 @@ impl ClusterInfo {
|
|||
socket: UdpSocket::bind("0.0.0.0:0").unwrap(),
|
||||
local_message_pending_push_queue: RwLock::new(vec![]),
|
||||
contact_debug_interval: DEFAULT_CONTACT_DEBUG_INTERVAL,
|
||||
instance: RwLock::new(NodeInstance::new(id, timestamp())),
|
||||
};
|
||||
{
|
||||
let mut gossip = me.gossip.write().unwrap();
|
||||
|
@ -590,6 +592,7 @@ impl ClusterInfo {
|
|||
.clone(),
|
||||
),
|
||||
contact_debug_interval: self.contact_debug_interval,
|
||||
instance: RwLock::new(NodeInstance::new(*new_id, timestamp())),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -614,16 +617,25 @@ impl ClusterInfo {
|
|||
) {
|
||||
let now = timestamp();
|
||||
self.my_contact_info.write().unwrap().wallclock = now;
|
||||
let entry =
|
||||
CrdsValue::new_signed(CrdsData::ContactInfo(self.my_contact_info()), &self.keypair);
|
||||
self.instance.write().unwrap().update_wallclock(now);
|
||||
let entries: Vec<_> = vec![
|
||||
CrdsData::ContactInfo(self.my_contact_info()),
|
||||
CrdsData::NodeInstance(self.instance.read().unwrap().clone()),
|
||||
]
|
||||
.into_iter()
|
||||
.map(|v| CrdsValue::new_signed(v, &self.keypair))
|
||||
.collect();
|
||||
{
|
||||
let mut local_message_pending_push_queue =
|
||||
self.local_message_pending_push_queue.write().unwrap();
|
||||
for entry in entries {
|
||||
local_message_pending_push_queue.push((entry, now));
|
||||
}
|
||||
}
|
||||
self.gossip
|
||||
.write()
|
||||
.unwrap()
|
||||
.refresh_push_active_set(stakes, gossip_validators);
|
||||
self.local_message_pending_push_queue
|
||||
.write()
|
||||
.unwrap()
|
||||
.push((entry, now));
|
||||
}
|
||||
|
||||
// TODO kill insert_info, only used by tests
|
||||
|
@ -2497,8 +2509,8 @@ impl ClusterInfo {
|
|||
stakes: HashMap<Pubkey, u64>,
|
||||
feature_set: Option<&FeatureSet>,
|
||||
epoch_time_ms: u64,
|
||||
) {
|
||||
let mut timer = Measure::start("process_gossip_packets_time");
|
||||
) -> Result<()> {
|
||||
let _st = ScopedTimer::from(&self.stats.process_gossip_packets_time);
|
||||
let packets: Vec<_> = thread_pool.install(|| {
|
||||
packets
|
||||
.into_par_iter()
|
||||
|
@ -2511,6 +2523,17 @@ impl ClusterInfo {
|
|||
})
|
||||
.collect()
|
||||
});
|
||||
// Check if there is a duplicate instance of
|
||||
// this node with more recent timestamp.
|
||||
let self_instance = self.instance.read().unwrap().clone();
|
||||
let check_duplicate_instance = |values: &[CrdsValue]| {
|
||||
for value in values {
|
||||
if self_instance.check_duplicate(value) {
|
||||
return Err(Error::DuplicateNodeInstance);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
};
|
||||
// Split packets based on their types.
|
||||
let mut pull_requests = vec![];
|
||||
let mut pull_responses = vec![];
|
||||
|
@ -2523,8 +2546,14 @@ impl ClusterInfo {
|
|||
Protocol::PullRequest(filter, caller) => {
|
||||
pull_requests.push((from_addr, filter, caller))
|
||||
}
|
||||
Protocol::PullResponse(from, data) => pull_responses.push((from, data)),
|
||||
Protocol::PushMessage(from, data) => push_messages.push((from, data)),
|
||||
Protocol::PullResponse(from, data) => {
|
||||
check_duplicate_instance(&data)?;
|
||||
pull_responses.push((from, data));
|
||||
}
|
||||
Protocol::PushMessage(from, data) => {
|
||||
check_duplicate_instance(&data)?;
|
||||
push_messages.push((from, data));
|
||||
}
|
||||
Protocol::PruneMessage(from, data) => prune_messages.push((from, data)),
|
||||
Protocol::PingMessage(ping) => ping_messages.push((from_addr, ping)),
|
||||
Protocol::PongMessage(pong) => pong_messages.push((from_addr, pong)),
|
||||
|
@ -2549,9 +2578,7 @@ impl ClusterInfo {
|
|||
response_sender,
|
||||
feature_set,
|
||||
);
|
||||
self.stats
|
||||
.process_gossip_packets_time
|
||||
.add_measure(&mut timer);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Process messages from the network
|
||||
|
@ -2598,7 +2625,7 @@ impl ClusterInfo {
|
|||
stakes,
|
||||
feature_set.as_deref(),
|
||||
epoch_time_ms,
|
||||
);
|
||||
)?;
|
||||
|
||||
self.print_reset_stats(last_print);
|
||||
|
||||
|
@ -2863,25 +2890,34 @@ impl ClusterInfo {
|
|||
.build()
|
||||
.unwrap();
|
||||
let mut last_print = Instant::now();
|
||||
loop {
|
||||
let e = self.run_listen(
|
||||
while !exit.load(Ordering::Relaxed) {
|
||||
if let Err(err) = self.run_listen(
|
||||
&recycler,
|
||||
bank_forks.as_ref(),
|
||||
&requests_receiver,
|
||||
&response_sender,
|
||||
&thread_pool,
|
||||
&mut last_print,
|
||||
);
|
||||
if exit.load(Ordering::Relaxed) {
|
||||
return;
|
||||
}
|
||||
if e.is_err() {
|
||||
let r_gossip = self.gossip.read().unwrap();
|
||||
debug!(
|
||||
"{}: run_listen timeout, table size: {}",
|
||||
self.id(),
|
||||
r_gossip.crds.len()
|
||||
);
|
||||
) {
|
||||
match err {
|
||||
Error::RecvTimeoutError(_) => {
|
||||
let table_size = self.gossip.read().unwrap().crds.len();
|
||||
debug!(
|
||||
"{}: run_listen timeout, table size: {}",
|
||||
self.id(),
|
||||
table_size,
|
||||
);
|
||||
}
|
||||
Error::DuplicateNodeInstance => {
|
||||
error!(
|
||||
"duplicate running instances of the same validator node: {}",
|
||||
self.id()
|
||||
);
|
||||
exit.store(true, Ordering::Relaxed);
|
||||
return;
|
||||
}
|
||||
_ => error!("gossip run_listen failed: {}", err),
|
||||
}
|
||||
}
|
||||
thread_mem_usage::datapoint("solana-listen");
|
||||
}
|
||||
|
|
|
@ -247,7 +247,7 @@ impl CrdsGossipPush {
|
|||
for i in start..(start + push_fanout) {
|
||||
let index = i % self.active_set.len();
|
||||
let (peer, filter) = self.active_set.get_index(index).unwrap();
|
||||
if !filter.contains(&origin) {
|
||||
if !filter.contains(&origin) || value.should_force_push(peer) {
|
||||
trace!("new_push_messages insert {} {:?}", *peer, value);
|
||||
push_messages.entry(*peer).or_default().push(value.clone());
|
||||
num_pushes += 1;
|
||||
|
|
|
@ -79,6 +79,7 @@ pub enum CrdsData {
|
|||
EpochSlots(EpochSlotsIndex, EpochSlots),
|
||||
LegacyVersion(LegacyVersion),
|
||||
Version(Version),
|
||||
NodeInstance(NodeInstance),
|
||||
}
|
||||
|
||||
impl Sanitize for CrdsData {
|
||||
|
@ -107,6 +108,7 @@ impl Sanitize for CrdsData {
|
|||
}
|
||||
CrdsData::LegacyVersion(version) => version.sanitize(),
|
||||
CrdsData::Version(version) => version.sanitize(),
|
||||
CrdsData::NodeInstance(node) => node.sanitize(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -323,6 +325,53 @@ impl Version {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, AbiExample, Deserialize, Serialize)]
|
||||
pub struct NodeInstance {
|
||||
from: Pubkey,
|
||||
wallclock: u64,
|
||||
timestamp: u64, // Timestamp when the instance was created.
|
||||
token: u64, // Randomly generated value at node instantiation.
|
||||
}
|
||||
|
||||
impl NodeInstance {
|
||||
pub fn new(pubkey: Pubkey, now: u64) -> Self {
|
||||
Self {
|
||||
from: pubkey,
|
||||
wallclock: now,
|
||||
timestamp: now,
|
||||
token: rand::thread_rng().gen(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn update_wallclock(&mut self, now: u64) {
|
||||
if self.wallclock < now {
|
||||
self.wallclock = now;
|
||||
}
|
||||
}
|
||||
|
||||
// Returns true if the crds-value is a duplicate instance
|
||||
// of this node, with a more recent timestamp.
|
||||
pub fn check_duplicate(&self, other: &CrdsValue) -> bool {
|
||||
match &other.data {
|
||||
CrdsData::NodeInstance(other) => {
|
||||
self.token != other.token
|
||||
&& self.timestamp <= other.timestamp
|
||||
&& self.from == other.from
|
||||
}
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Sanitize for NodeInstance {
|
||||
fn sanitize(&self) -> Result<(), SanitizeError> {
|
||||
if self.wallclock >= MAX_WALLCLOCK {
|
||||
return Err(SanitizeError::ValueOutOfBounds);
|
||||
}
|
||||
self.from.sanitize()
|
||||
}
|
||||
}
|
||||
|
||||
/// Type of the replicated value
|
||||
/// These are labels for values in a record that is associated with `Pubkey`
|
||||
#[derive(PartialEq, Hash, Eq, Clone, Debug)]
|
||||
|
@ -335,6 +384,7 @@ pub enum CrdsValueLabel {
|
|||
AccountsHashes(Pubkey),
|
||||
LegacyVersion(Pubkey),
|
||||
Version(Pubkey),
|
||||
NodeInstance(Pubkey),
|
||||
}
|
||||
|
||||
impl fmt::Display for CrdsValueLabel {
|
||||
|
@ -348,6 +398,7 @@ impl fmt::Display for CrdsValueLabel {
|
|||
CrdsValueLabel::AccountsHashes(_) => write!(f, "AccountsHashes({})", self.pubkey()),
|
||||
CrdsValueLabel::LegacyVersion(_) => write!(f, "LegacyVersion({})", self.pubkey()),
|
||||
CrdsValueLabel::Version(_) => write!(f, "Version({})", self.pubkey()),
|
||||
CrdsValueLabel::NodeInstance(_) => write!(f, "NodeInstance({})", self.pubkey()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -363,6 +414,7 @@ impl CrdsValueLabel {
|
|||
CrdsValueLabel::AccountsHashes(p) => *p,
|
||||
CrdsValueLabel::LegacyVersion(p) => *p,
|
||||
CrdsValueLabel::Version(p) => *p,
|
||||
CrdsValueLabel::NodeInstance(p) => *p,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -409,6 +461,7 @@ impl CrdsValue {
|
|||
CrdsData::EpochSlots(_, p) => p.wallclock,
|
||||
CrdsData::LegacyVersion(version) => version.wallclock,
|
||||
CrdsData::Version(version) => version.wallclock,
|
||||
CrdsData::NodeInstance(node) => node.wallclock,
|
||||
}
|
||||
}
|
||||
pub fn pubkey(&self) -> Pubkey {
|
||||
|
@ -421,6 +474,7 @@ impl CrdsValue {
|
|||
CrdsData::EpochSlots(_, p) => p.from,
|
||||
CrdsData::LegacyVersion(version) => version.from,
|
||||
CrdsData::Version(version) => version.from,
|
||||
CrdsData::NodeInstance(node) => node.from,
|
||||
}
|
||||
}
|
||||
pub fn label(&self) -> CrdsValueLabel {
|
||||
|
@ -433,6 +487,7 @@ impl CrdsValue {
|
|||
CrdsData::EpochSlots(ix, _) => CrdsValueLabel::EpochSlots(*ix, self.pubkey()),
|
||||
CrdsData::LegacyVersion(_) => CrdsValueLabel::LegacyVersion(self.pubkey()),
|
||||
CrdsData::Version(_) => CrdsValueLabel::Version(self.pubkey()),
|
||||
CrdsData::NodeInstance(_) => CrdsValueLabel::NodeInstance(self.pubkey()),
|
||||
}
|
||||
}
|
||||
pub fn contact_info(&self) -> Option<&ContactInfo> {
|
||||
|
@ -499,13 +554,14 @@ impl CrdsValue {
|
|||
|
||||
/// Return all the possible labels for a record identified by Pubkey.
|
||||
pub fn record_labels(key: Pubkey) -> impl Iterator<Item = CrdsValueLabel> {
|
||||
const CRDS_VALUE_LABEL_STUBS: [fn(Pubkey) -> CrdsValueLabel; 6] = [
|
||||
const CRDS_VALUE_LABEL_STUBS: [fn(Pubkey) -> CrdsValueLabel; 7] = [
|
||||
CrdsValueLabel::ContactInfo,
|
||||
CrdsValueLabel::LowestSlot,
|
||||
CrdsValueLabel::SnapshotHashes,
|
||||
CrdsValueLabel::AccountsHashes,
|
||||
CrdsValueLabel::LegacyVersion,
|
||||
CrdsValueLabel::Version,
|
||||
CrdsValueLabel::NodeInstance,
|
||||
];
|
||||
CRDS_VALUE_LABEL_STUBS
|
||||
.iter()
|
||||
|
@ -545,6 +601,15 @@ impl CrdsValue {
|
|||
.vote_index()
|
||||
.expect("all values must be votes")
|
||||
}
|
||||
|
||||
/// Returns true if, regardless of prunes, this crds-value
|
||||
/// should be pushed to the receiving node.
|
||||
pub fn should_force_push(&self, peer: &Pubkey) -> bool {
|
||||
match &self.data {
|
||||
CrdsData::NodeInstance(node) => node.from == *peer,
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Filters out an iterator of crds values, returning
|
||||
|
@ -584,7 +649,7 @@ mod test {
|
|||
|
||||
#[test]
|
||||
fn test_labels() {
|
||||
let mut hits = [false; 6 + MAX_VOTES as usize + MAX_EPOCH_SLOTS as usize];
|
||||
let mut hits = [false; 7 + MAX_VOTES as usize + MAX_EPOCH_SLOTS as usize];
|
||||
// this method should cover all the possible labels
|
||||
for v in CrdsValue::record_labels(Pubkey::default()) {
|
||||
match &v {
|
||||
|
@ -594,9 +659,10 @@ mod test {
|
|||
CrdsValueLabel::AccountsHashes(_) => hits[3] = true,
|
||||
CrdsValueLabel::LegacyVersion(_) => hits[4] = true,
|
||||
CrdsValueLabel::Version(_) => hits[5] = true,
|
||||
CrdsValueLabel::Vote(ix, _) => hits[*ix as usize + 6] = true,
|
||||
CrdsValueLabel::NodeInstance(_) => hits[6] = true,
|
||||
CrdsValueLabel::Vote(ix, _) => hits[*ix as usize + 7] = true,
|
||||
CrdsValueLabel::EpochSlots(ix, _) => {
|
||||
hits[*ix as usize + MAX_VOTES as usize + 6] = true
|
||||
hits[*ix as usize + MAX_VOTES as usize + 7] = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -806,4 +872,68 @@ mod test {
|
|||
// cannot be more than 5 times number of keys.
|
||||
assert!(currents.len() <= keys.len() * 5);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_check_duplicate_instance() {
|
||||
fn make_crds_value(node: NodeInstance) -> CrdsValue {
|
||||
CrdsValue::new_unsigned(CrdsData::NodeInstance(node))
|
||||
}
|
||||
let now = timestamp();
|
||||
let mut rng = rand::thread_rng();
|
||||
let pubkey = Pubkey::new_unique();
|
||||
let node = NodeInstance::new(pubkey, now);
|
||||
// Same token is not a duplicate.
|
||||
assert!(!node.check_duplicate(&make_crds_value(NodeInstance {
|
||||
from: pubkey,
|
||||
wallclock: now + 1,
|
||||
timestamp: now + 1,
|
||||
token: node.token,
|
||||
})));
|
||||
// Older timestamp is not a duplicate.
|
||||
assert!(!node.check_duplicate(&make_crds_value(NodeInstance {
|
||||
from: pubkey,
|
||||
wallclock: now + 1,
|
||||
timestamp: now - 1,
|
||||
token: rng.gen(),
|
||||
})));
|
||||
// Duplicate instance.
|
||||
assert!(node.check_duplicate(&make_crds_value(NodeInstance {
|
||||
from: pubkey,
|
||||
wallclock: 0,
|
||||
timestamp: now,
|
||||
token: rng.gen(),
|
||||
})));
|
||||
// Different pubkey is not a duplicate.
|
||||
assert!(!node.check_duplicate(&make_crds_value(NodeInstance {
|
||||
from: Pubkey::new_unique(),
|
||||
wallclock: now + 1,
|
||||
timestamp: now + 1,
|
||||
token: rng.gen(),
|
||||
})));
|
||||
// Differnt crds value is not a duplicate.
|
||||
assert!(
|
||||
!node.check_duplicate(&CrdsValue::new_unsigned(CrdsData::ContactInfo(
|
||||
ContactInfo::new_rand(&mut rng, Some(pubkey))
|
||||
)))
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_should_force_push() {
|
||||
let mut rng = rand::thread_rng();
|
||||
let pubkey = Pubkey::new_unique();
|
||||
assert!(
|
||||
!CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_rand(
|
||||
&mut rng,
|
||||
Some(pubkey),
|
||||
)))
|
||||
.should_force_push(&pubkey)
|
||||
);
|
||||
let node = CrdsValue::new_unsigned(CrdsData::NodeInstance(NodeInstance::new(
|
||||
pubkey,
|
||||
timestamp(),
|
||||
)));
|
||||
assert!(node.should_force_push(&pubkey));
|
||||
assert!(!node.should_force_push(&Pubkey::new_unique()));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -32,6 +32,7 @@ pub enum Error {
|
|||
FsExtra(fs_extra::error::Error),
|
||||
SnapshotError(snapshot_utils::SnapshotError),
|
||||
WeightedIndexError(rand::distributions::weighted::WeightedError),
|
||||
DuplicateNodeInstance,
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
|
|
Loading…
Reference in New Issue