removes delayed crds inserts when upserting gossip table (#16806)
It is crucial that VersionedCrdsValue::insert_timestamp does not go backward in time: https://github.com/solana-labs/solana/blob/ec37a843a/core/src/crds.rs#L67-L79 Otherwise methods such as get_votes and get_epoch_slots_since will break, which will break their downstream flow, including vote-listener and optimistic confirmation: https://github.com/solana-labs/solana/blob/ec37a843a/core/src/cluster_info.rs#L1197-L1215 https://github.com/solana-labs/solana/blob/ec37a843a/core/src/cluster_info.rs#L1274-L1298 For that, Crds::new_versioned is intended to be called "atomically" with Crds::insert_verioned (as the comment already says so): https://github.com/solana-labs/solana/blob/ec37a843a/core/src/crds.rs#L126-L129 However, currently this is violated in the code. For example, filter_pull_responses creates VersionedCrdsValues (with the current timestamp), then acquires an exclusive lock on gossip, then process_pull_responses writes those values to the crds table: https://github.com/solana-labs/solana/blob/ec37a843a/core/src/cluster_info.rs#L2375-L2392 Depending on the workload and lock contention, the insert_timestamps may well be in the past when these values finally are inserted into gossip. To avoid such scenarios, this commit: * removes Crds::new_versioned and Crd::insert_versioned. * makes VersionedCrdsValue constructor private, only invoked in Crds::insert, so that insert_timestamp is populated right before insert. This will improve insert_timestamp monotonicity as long as Crds::insert is not called with a stalled timestamp. Following commits may further improve this by calling timestamp() inside Crds::insert, and/or switching to std::time::Instant which guarantees monotonicity.
This commit is contained in:
parent
1c95e2bbee
commit
1ac2a8cfa5
|
@ -3,30 +3,34 @@
|
|||
extern crate test;
|
||||
|
||||
use rand::{thread_rng, Rng};
|
||||
use solana_core::contact_info::ContactInfo;
|
||||
use solana_core::crds::VersionedCrdsValue;
|
||||
use solana_core::crds_shards::CrdsShards;
|
||||
use solana_core::crds_value::{CrdsData, CrdsValue};
|
||||
use solana_sdk::pubkey;
|
||||
use solana_core::{
|
||||
crds::{Crds, VersionedCrdsValue},
|
||||
crds_shards::CrdsShards,
|
||||
crds_value::CrdsValue,
|
||||
};
|
||||
use solana_sdk::timing::timestamp;
|
||||
use std::iter::repeat_with;
|
||||
use test::Bencher;
|
||||
|
||||
const CRDS_SHARDS_BITS: u32 = 8;
|
||||
|
||||
fn new_test_crds_value() -> VersionedCrdsValue {
|
||||
let data = CrdsData::ContactInfo(ContactInfo::new_localhost(&pubkey::new_rand(), timestamp()));
|
||||
VersionedCrdsValue::new(timestamp(), CrdsValue::new_unsigned(data))
|
||||
fn new_test_crds_value<R: Rng>(rng: &mut R) -> VersionedCrdsValue {
|
||||
let value = CrdsValue::new_rand(rng, None);
|
||||
let label = value.label();
|
||||
let mut crds = Crds::default();
|
||||
crds.insert(value, timestamp()).unwrap();
|
||||
crds.remove(&label).unwrap()
|
||||
}
|
||||
|
||||
fn bench_crds_shards_find(bencher: &mut Bencher, num_values: usize, mask_bits: u32) {
|
||||
let values: Vec<VersionedCrdsValue> = std::iter::repeat_with(new_test_crds_value)
|
||||
let mut rng = thread_rng();
|
||||
let values: Vec<_> = repeat_with(|| new_test_crds_value(&mut rng))
|
||||
.take(num_values)
|
||||
.collect();
|
||||
let mut shards = CrdsShards::new(CRDS_SHARDS_BITS);
|
||||
for (index, value) in values.iter().enumerate() {
|
||||
assert!(shards.insert(index, value));
|
||||
}
|
||||
let mut rng = thread_rng();
|
||||
bencher.iter(|| {
|
||||
let mask = rng.gen();
|
||||
let _hits = shards.find(mask, mask_bits).count();
|
||||
|
|
192
core/src/crds.rs
192
core/src/crds.rs
|
@ -33,11 +33,11 @@ use indexmap::set::IndexSet;
|
|||
use rayon::{prelude::*, ThreadPool};
|
||||
use solana_sdk::hash::{hash, Hash};
|
||||
use solana_sdk::pubkey::Pubkey;
|
||||
use solana_sdk::signature::Keypair;
|
||||
use solana_sdk::timing::timestamp;
|
||||
use std::cmp;
|
||||
use std::collections::{hash_map, BTreeSet, HashMap};
|
||||
use std::ops::{Bound, Index, IndexMut};
|
||||
use std::{
|
||||
cmp::Ordering,
|
||||
collections::{hash_map, BTreeSet, HashMap},
|
||||
ops::{Bound, Index, IndexMut},
|
||||
};
|
||||
|
||||
const CRDS_SHARDS_BITS: u32 = 8;
|
||||
// Limit number of crds values associated with each unique pubkey. This
|
||||
|
@ -60,7 +60,9 @@ pub struct Crds {
|
|||
|
||||
#[derive(PartialEq, Debug)]
|
||||
pub enum CrdsError {
|
||||
InsertFailed,
|
||||
// Hash of the crds value which failed to insert should be recorded in
|
||||
// failed_inserts to be excluded from the next pull-request.
|
||||
InsertFailed(Hash),
|
||||
UnknownStakes,
|
||||
}
|
||||
|
||||
|
@ -71,26 +73,15 @@ pub enum CrdsError {
|
|||
pub struct VersionedCrdsValue {
|
||||
pub value: CrdsValue,
|
||||
/// local time when inserted
|
||||
pub insert_timestamp: u64,
|
||||
pub(crate) insert_timestamp: u64,
|
||||
/// local time when updated
|
||||
pub(crate) local_timestamp: u64,
|
||||
/// value hash
|
||||
pub value_hash: Hash,
|
||||
pub(crate) value_hash: Hash,
|
||||
}
|
||||
|
||||
impl PartialOrd for VersionedCrdsValue {
|
||||
fn partial_cmp(&self, other: &VersionedCrdsValue) -> Option<cmp::Ordering> {
|
||||
if self.value.label() != other.value.label() {
|
||||
None
|
||||
} else if self.value.wallclock() == other.value.wallclock() {
|
||||
Some(self.value_hash.cmp(&other.value_hash))
|
||||
} else {
|
||||
Some(self.value.wallclock().cmp(&other.value.wallclock()))
|
||||
}
|
||||
}
|
||||
}
|
||||
impl VersionedCrdsValue {
|
||||
pub fn new(local_timestamp: u64, value: CrdsValue) -> Self {
|
||||
fn new(local_timestamp: u64, value: CrdsValue) -> Self {
|
||||
let value_hash = hash(&serialize(&value).unwrap());
|
||||
VersionedCrdsValue {
|
||||
value,
|
||||
|
@ -99,13 +90,6 @@ impl VersionedCrdsValue {
|
|||
value_hash,
|
||||
}
|
||||
}
|
||||
|
||||
/// New random VersionedCrdsValue for tests and simulations.
|
||||
pub fn new_rand<R: rand::Rng>(rng: &mut R, keypair: Option<&Keypair>) -> Self {
|
||||
let delay = 10 * 60 * 1000; // 10 minutes
|
||||
let now = timestamp() - delay + rng.gen_range(0, 2 * delay);
|
||||
Self::new(now, CrdsValue::new_rand(rng, keypair))
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for Crds {
|
||||
|
@ -122,34 +106,46 @@ impl Default for Crds {
|
|||
}
|
||||
}
|
||||
|
||||
impl Crds {
|
||||
/// must be called atomically with `insert_versioned`
|
||||
pub fn new_versioned(&self, local_timestamp: u64, value: CrdsValue) -> VersionedCrdsValue {
|
||||
VersionedCrdsValue::new(local_timestamp, value)
|
||||
// Returns true if the first value updates the 2nd one.
|
||||
// Both values should have the same key/label.
|
||||
fn overrides(value: &CrdsValue, other: &VersionedCrdsValue) -> bool {
|
||||
assert_eq!(value.label(), other.value.label(), "labels mismatch!");
|
||||
match value.wallclock().cmp(&other.value.wallclock()) {
|
||||
Ordering::Less => false,
|
||||
Ordering::Greater => true,
|
||||
// Ties should be broken in a deterministic way across the cluster.
|
||||
// For backward compatibility this is done by comparing hash of
|
||||
// serialized values.
|
||||
Ordering::Equal => {
|
||||
let value_hash = hash(&serialize(&value).unwrap());
|
||||
other.value_hash < value_hash
|
||||
}
|
||||
pub fn would_insert(
|
||||
&self,
|
||||
}
|
||||
}
|
||||
|
||||
impl Crds {
|
||||
/// Returns true if the given value updates an existing one in the table.
|
||||
/// The value is outdated and fails to insert, if it already exists in the
|
||||
/// table with a more recent wallclock.
|
||||
pub(crate) fn upserts(&self, value: &CrdsValue) -> bool {
|
||||
match self.table.get(&value.label()) {
|
||||
Some(other) => overrides(value, other),
|
||||
None => true,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn insert(
|
||||
&mut self,
|
||||
value: CrdsValue,
|
||||
local_timestamp: u64,
|
||||
) -> (bool, VersionedCrdsValue) {
|
||||
let new_value = self.new_versioned(local_timestamp, value);
|
||||
let label = new_value.value.label();
|
||||
// New value is outdated and fails to insert, if it already exists in
|
||||
// the table with a more recent wallclock.
|
||||
let outdated = matches!(self.table.get(&label), Some(current) if new_value <= *current);
|
||||
(!outdated, new_value)
|
||||
}
|
||||
/// insert the new value, returns the old value if insert succeeds
|
||||
pub fn insert_versioned(
|
||||
&mut self,
|
||||
new_value: VersionedCrdsValue,
|
||||
) -> Result<Option<VersionedCrdsValue>, CrdsError> {
|
||||
let label = new_value.value.label();
|
||||
let label = value.label();
|
||||
let value = VersionedCrdsValue::new(local_timestamp, value);
|
||||
match self.table.entry(label) {
|
||||
Entry::Vacant(entry) => {
|
||||
let entry_index = entry.index();
|
||||
self.shards.insert(entry_index, &new_value);
|
||||
match new_value.value.data {
|
||||
self.shards.insert(entry_index, &value);
|
||||
match value.value.data {
|
||||
CrdsData::ContactInfo(_) => {
|
||||
self.nodes.insert(entry_index);
|
||||
}
|
||||
|
@ -158,52 +154,45 @@ impl Crds {
|
|||
}
|
||||
CrdsData::EpochSlots(_, _) => {
|
||||
self.epoch_slots
|
||||
.insert((new_value.insert_timestamp, entry_index));
|
||||
.insert((value.insert_timestamp, entry_index));
|
||||
}
|
||||
_ => (),
|
||||
};
|
||||
self.records
|
||||
.entry(new_value.value.pubkey())
|
||||
.entry(value.value.pubkey())
|
||||
.or_default()
|
||||
.insert(entry_index);
|
||||
entry.insert(new_value);
|
||||
entry.insert(value);
|
||||
self.num_inserts += 1;
|
||||
Ok(None)
|
||||
}
|
||||
Entry::Occupied(mut entry) if *entry.get() < new_value => {
|
||||
Entry::Occupied(mut entry) if overrides(&value.value, entry.get()) => {
|
||||
let entry_index = entry.index();
|
||||
self.shards.remove(entry_index, entry.get());
|
||||
self.shards.insert(entry_index, &new_value);
|
||||
if let CrdsData::EpochSlots(_, _) = new_value.value.data {
|
||||
self.shards.insert(entry_index, &value);
|
||||
if let CrdsData::EpochSlots(_, _) = value.value.data {
|
||||
self.epoch_slots
|
||||
.remove(&(entry.get().insert_timestamp, entry_index));
|
||||
self.epoch_slots
|
||||
.insert((new_value.insert_timestamp, entry_index));
|
||||
.insert((value.insert_timestamp, entry_index));
|
||||
}
|
||||
self.num_inserts += 1;
|
||||
// As long as the pubkey does not change, self.records
|
||||
// does not need to be updated.
|
||||
debug_assert_eq!(entry.get().value.pubkey(), new_value.value.pubkey());
|
||||
Ok(Some(entry.insert(new_value)))
|
||||
debug_assert_eq!(entry.get().value.pubkey(), value.value.pubkey());
|
||||
Ok(Some(entry.insert(value)))
|
||||
}
|
||||
_ => {
|
||||
trace!(
|
||||
"INSERT FAILED data: {} new.wallclock: {}",
|
||||
new_value.value.label(),
|
||||
new_value.value.wallclock(),
|
||||
value.value.label(),
|
||||
value.value.wallclock(),
|
||||
);
|
||||
Err(CrdsError::InsertFailed)
|
||||
Err(CrdsError::InsertFailed(value.value_hash))
|
||||
}
|
||||
}
|
||||
}
|
||||
pub fn insert(
|
||||
&mut self,
|
||||
value: CrdsValue,
|
||||
local_timestamp: u64,
|
||||
) -> Result<Option<VersionedCrdsValue>, CrdsError> {
|
||||
let new_value = self.new_versioned(local_timestamp, value);
|
||||
self.insert_versioned(new_value)
|
||||
}
|
||||
|
||||
pub fn lookup(&self, label: &CrdsValueLabel) -> Option<&CrdsValue> {
|
||||
self.table.get(label).map(|x| &x.value)
|
||||
}
|
||||
|
@ -504,10 +493,13 @@ impl Crds {
|
|||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
use crate::{contact_info::ContactInfo, crds_value::NodeInstance};
|
||||
use crate::{
|
||||
contact_info::ContactInfo,
|
||||
crds_value::{new_rand_timestamp, NodeInstance},
|
||||
};
|
||||
use rand::{thread_rng, Rng};
|
||||
use rayon::ThreadPoolBuilder;
|
||||
use solana_sdk::signature::Signer;
|
||||
use solana_sdk::signature::{Keypair, Signer};
|
||||
use std::{collections::HashSet, iter::repeat_with};
|
||||
|
||||
#[test]
|
||||
|
@ -523,8 +515,12 @@ mod test {
|
|||
fn test_update_old() {
|
||||
let mut crds = Crds::default();
|
||||
let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default()));
|
||||
let value_hash = hash(&serialize(&val).unwrap());
|
||||
assert_eq!(crds.insert(val.clone(), 0), Ok(None));
|
||||
assert_eq!(crds.insert(val.clone(), 1), Err(CrdsError::InsertFailed));
|
||||
assert_eq!(
|
||||
crds.insert(val.clone(), 1),
|
||||
Err(CrdsError::InsertFailed(value_hash))
|
||||
);
|
||||
assert_eq!(crds.table[&val.label()].local_timestamp, 0);
|
||||
}
|
||||
#[test]
|
||||
|
@ -718,8 +714,9 @@ mod test {
|
|||
let mut num_overrides = 0;
|
||||
for _ in 0..4096 {
|
||||
let keypair = &keypairs[rng.gen_range(0, keypairs.len())];
|
||||
let value = VersionedCrdsValue::new_rand(&mut rng, Some(keypair));
|
||||
match crds.insert_versioned(value) {
|
||||
let value = CrdsValue::new_rand(&mut rng, Some(keypair));
|
||||
let local_timestamp = new_rand_timestamp(&mut rng);
|
||||
match crds.insert(value, local_timestamp) {
|
||||
Ok(None) => {
|
||||
num_inserts += 1;
|
||||
check_crds_shards(&crds);
|
||||
|
@ -811,8 +808,9 @@ mod test {
|
|||
let mut num_overrides = 0;
|
||||
for k in 0..4096 {
|
||||
let keypair = &keypairs[rng.gen_range(0, keypairs.len())];
|
||||
let value = VersionedCrdsValue::new_rand(&mut rng, Some(keypair));
|
||||
match crds.insert_versioned(value) {
|
||||
let value = CrdsValue::new_rand(&mut rng, Some(keypair));
|
||||
let local_timestamp = new_rand_timestamp(&mut rng);
|
||||
match crds.insert(value, local_timestamp) {
|
||||
Ok(None) => {
|
||||
num_inserts += 1;
|
||||
}
|
||||
|
@ -870,8 +868,9 @@ mod test {
|
|||
let mut crds = Crds::default();
|
||||
for k in 0..4096 {
|
||||
let keypair = &keypairs[rng.gen_range(0, keypairs.len())];
|
||||
let value = VersionedCrdsValue::new_rand(&mut rng, Some(keypair));
|
||||
let _ = crds.insert_versioned(value);
|
||||
let value = CrdsValue::new_rand(&mut rng, Some(keypair));
|
||||
let local_timestamp = new_rand_timestamp(&mut rng);
|
||||
let _ = crds.insert(value, local_timestamp);
|
||||
if k % 64 == 0 {
|
||||
check_crds_records(&crds);
|
||||
}
|
||||
|
@ -911,8 +910,9 @@ mod test {
|
|||
let mut crds = Crds::default();
|
||||
for _ in 0..2048 {
|
||||
let keypair = &keypairs[rng.gen_range(0, keypairs.len())];
|
||||
let value = VersionedCrdsValue::new_rand(&mut rng, Some(keypair));
|
||||
let _ = crds.insert_versioned(value);
|
||||
let value = CrdsValue::new_rand(&mut rng, Some(keypair));
|
||||
let local_timestamp = new_rand_timestamp(&mut rng);
|
||||
let _ = crds.insert(value, local_timestamp);
|
||||
}
|
||||
let num_values = crds.table.len();
|
||||
let num_pubkeys = num_unique_pubkeys(crds.table.values());
|
||||
|
@ -967,8 +967,8 @@ mod test {
|
|||
let v2 = VersionedCrdsValue::new(1, val);
|
||||
assert_eq!(v1, v2);
|
||||
assert!(!(v1 != v2));
|
||||
assert_eq!(v1.partial_cmp(&v2), Some(cmp::Ordering::Equal));
|
||||
assert_eq!(v2.partial_cmp(&v1), Some(cmp::Ordering::Equal));
|
||||
assert!(!overrides(&v1.value, &v2));
|
||||
assert!(!overrides(&v2.value, &v1));
|
||||
}
|
||||
#[test]
|
||||
#[allow(clippy::neg_cmp_op_on_partial_ord)]
|
||||
|
@ -991,18 +991,12 @@ mod test {
|
|||
assert_ne!(v1.value_hash, v2.value_hash);
|
||||
assert!(v1 != v2);
|
||||
assert!(!(v1 == v2));
|
||||
if v1 > v2 {
|
||||
assert!(v1 > v2);
|
||||
assert!(v2 < v1);
|
||||
assert_eq!(v1.partial_cmp(&v2), Some(cmp::Ordering::Greater));
|
||||
assert_eq!(v2.partial_cmp(&v1), Some(cmp::Ordering::Less));
|
||||
} else if v2 > v1 {
|
||||
assert!(v1 < v2);
|
||||
assert!(v2 > v1);
|
||||
assert_eq!(v1.partial_cmp(&v2), Some(cmp::Ordering::Less));
|
||||
assert_eq!(v2.partial_cmp(&v1), Some(cmp::Ordering::Greater));
|
||||
if v1.value_hash > v2.value_hash {
|
||||
assert!(overrides(&v1.value, &v2));
|
||||
assert!(!overrides(&v2.value, &v1));
|
||||
} else {
|
||||
panic!("bad PartialOrd implementation?");
|
||||
assert!(overrides(&v2.value, &v1));
|
||||
assert!(!overrides(&v1.value, &v2));
|
||||
}
|
||||
}
|
||||
#[test]
|
||||
|
@ -1023,14 +1017,13 @@ mod test {
|
|||
))),
|
||||
);
|
||||
assert_eq!(v1.value.label(), v2.value.label());
|
||||
assert!(v1 > v2);
|
||||
assert!(!(v1 < v2));
|
||||
assert!(overrides(&v1.value, &v2));
|
||||
assert!(!overrides(&v2.value, &v1));
|
||||
assert!(v1 != v2);
|
||||
assert!(!(v1 == v2));
|
||||
assert_eq!(v1.partial_cmp(&v2), Some(cmp::Ordering::Greater));
|
||||
assert_eq!(v2.partial_cmp(&v1), Some(cmp::Ordering::Less));
|
||||
}
|
||||
#[test]
|
||||
#[should_panic(expected = "labels mismatch!")]
|
||||
#[allow(clippy::neg_cmp_op_on_partial_ord)]
|
||||
fn test_label_order() {
|
||||
let v1 = VersionedCrdsValue::new(
|
||||
|
@ -1049,11 +1042,6 @@ mod test {
|
|||
);
|
||||
assert_ne!(v1, v2);
|
||||
assert!(!(v1 == v2));
|
||||
assert!(!(v1 < v2));
|
||||
assert!(!(v1 > v2));
|
||||
assert!(!(v2 < v1));
|
||||
assert!(!(v2 > v1));
|
||||
assert_eq!(v1.partial_cmp(&v2), None);
|
||||
assert_eq!(v2.partial_cmp(&v1), None);
|
||||
assert!(!overrides(&v2.value, &v1));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -265,7 +265,11 @@ impl CrdsGossip {
|
|||
response: Vec<CrdsValue>,
|
||||
now: u64,
|
||||
process_pull_stats: &mut ProcessPullStats,
|
||||
) -> (Vec<VersionedCrdsValue>, Vec<VersionedCrdsValue>, Vec<Hash>) {
|
||||
) -> (
|
||||
Vec<CrdsValue>, // valid responses.
|
||||
Vec<CrdsValue>, // responses with expired timestamps.
|
||||
Vec<Hash>, // hash of outdated values.
|
||||
) {
|
||||
self.pull
|
||||
.filter_pull_responses(&self.crds, timeouts, response, now, process_pull_stats)
|
||||
}
|
||||
|
@ -274,8 +278,8 @@ impl CrdsGossip {
|
|||
pub fn process_pull_responses(
|
||||
&mut self,
|
||||
from: &Pubkey,
|
||||
responses: Vec<VersionedCrdsValue>,
|
||||
responses_expired_timeout: Vec<VersionedCrdsValue>,
|
||||
responses: Vec<CrdsValue>,
|
||||
responses_expired_timeout: Vec<CrdsValue>,
|
||||
failed_inserts: Vec<Hash>,
|
||||
now: u64,
|
||||
process_pull_stats: &mut ProcessPullStats,
|
||||
|
|
|
@ -12,7 +12,7 @@
|
|||
use crate::{
|
||||
cluster_info::CRDS_UNIQUE_PUBKEY_CAPACITY,
|
||||
contact_info::ContactInfo,
|
||||
crds::{Crds, VersionedCrdsValue},
|
||||
crds::{Crds, CrdsError},
|
||||
crds_gossip::{get_stake, get_weight, CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS},
|
||||
crds_gossip_error::CrdsGossipError,
|
||||
crds_value::{CrdsValue, CrdsValueLabel},
|
||||
|
@ -23,8 +23,10 @@ use rand::distributions::{Distribution, WeightedIndex};
|
|||
use rand::Rng;
|
||||
use rayon::{prelude::*, ThreadPool};
|
||||
use solana_runtime::bloom::{AtomicBloom, Bloom};
|
||||
use solana_sdk::hash::Hash;
|
||||
use solana_sdk::pubkey::Pubkey;
|
||||
use solana_sdk::{
|
||||
hash::{hash, Hash},
|
||||
pubkey::Pubkey,
|
||||
};
|
||||
use std::cmp;
|
||||
use std::collections::VecDeque;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
|
@ -330,16 +332,16 @@ impl CrdsGossipPull {
|
|||
responses: Vec<CrdsValue>,
|
||||
now: u64,
|
||||
stats: &mut ProcessPullStats,
|
||||
) -> (Vec<VersionedCrdsValue>, Vec<VersionedCrdsValue>, Vec<Hash>) {
|
||||
let mut versioned = vec![];
|
||||
let mut versioned_expired_timestamp = vec![];
|
||||
) -> (Vec<CrdsValue>, Vec<CrdsValue>, Vec<Hash>) {
|
||||
let mut active_values = vec![];
|
||||
let mut expired_values = vec![];
|
||||
let mut failed_inserts = vec![];
|
||||
let mut maybe_push = |response, values: &mut Vec<VersionedCrdsValue>| {
|
||||
let (push, value) = crds.would_insert(response, now);
|
||||
if push {
|
||||
values.push(value);
|
||||
let mut maybe_push = |response, values: &mut Vec<CrdsValue>| {
|
||||
if crds.upserts(&response) {
|
||||
values.push(response);
|
||||
} else {
|
||||
failed_inserts.push(value.value_hash)
|
||||
let response = bincode::serialize(&response).unwrap();
|
||||
failed_inserts.push(hash(&response));
|
||||
}
|
||||
};
|
||||
let default_timeout = timeouts
|
||||
|
@ -354,17 +356,17 @@ impl CrdsGossipPull {
|
|||
// owner exists in the table. If it doesn't, that implies that this
|
||||
// value can be discarded
|
||||
if now <= response.wallclock().saturating_add(timeout) {
|
||||
maybe_push(response, &mut versioned);
|
||||
maybe_push(response, &mut active_values);
|
||||
} else if crds.get_contact_info(owner).is_some() {
|
||||
// Silently insert this old value without bumping record
|
||||
// timestamps
|
||||
maybe_push(response, &mut versioned_expired_timestamp);
|
||||
maybe_push(response, &mut expired_values);
|
||||
} else {
|
||||
stats.timeout_count += 1;
|
||||
stats.failed_timeout += 1;
|
||||
}
|
||||
}
|
||||
(versioned, versioned_expired_timestamp, failed_inserts)
|
||||
(active_values, expired_values, failed_inserts)
|
||||
}
|
||||
|
||||
/// process a vec of pull responses
|
||||
|
@ -372,33 +374,34 @@ impl CrdsGossipPull {
|
|||
&mut self,
|
||||
crds: &mut Crds,
|
||||
from: &Pubkey,
|
||||
responses: Vec<VersionedCrdsValue>,
|
||||
responses_expired_timeout: Vec<VersionedCrdsValue>,
|
||||
responses: Vec<CrdsValue>,
|
||||
responses_expired_timeout: Vec<CrdsValue>,
|
||||
mut failed_inserts: Vec<Hash>,
|
||||
now: u64,
|
||||
stats: &mut ProcessPullStats,
|
||||
) -> Vec<(CrdsValueLabel, Hash, u64)> {
|
||||
let mut success = vec![];
|
||||
let mut owners = HashSet::new();
|
||||
for r in responses_expired_timeout {
|
||||
let value_hash = r.value_hash;
|
||||
match crds.insert_versioned(r) {
|
||||
for response in responses_expired_timeout {
|
||||
match crds.insert(response, now) {
|
||||
Ok(None) => (),
|
||||
Ok(Some(old)) => self.purged_values.push_back((old.value_hash, now)),
|
||||
Err(_) => failed_inserts.push(value_hash),
|
||||
Err(CrdsError::InsertFailed(value_hash)) => failed_inserts.push(value_hash),
|
||||
Err(CrdsError::UnknownStakes) => (),
|
||||
}
|
||||
}
|
||||
for r in responses {
|
||||
let label = r.value.label();
|
||||
let wc = r.value.wallclock();
|
||||
let hash = r.value_hash;
|
||||
match crds.insert_versioned(r) {
|
||||
Err(_) => failed_inserts.push(hash),
|
||||
for response in responses {
|
||||
let label = response.label();
|
||||
let wallclock = response.wallclock();
|
||||
match crds.insert(response, now) {
|
||||
Err(CrdsError::InsertFailed(value_hash)) => failed_inserts.push(value_hash),
|
||||
Err(CrdsError::UnknownStakes) => (),
|
||||
Ok(old) => {
|
||||
stats.success += 1;
|
||||
self.num_pulls += 1;
|
||||
owners.insert(label.pubkey());
|
||||
success.push((label, hash, wc));
|
||||
let value_hash = crds.get(&label).unwrap().value_hash;
|
||||
success.push((label, value_hash, wallclock));
|
||||
if let Some(val) = old {
|
||||
self.purged_values.push_back((val.value_hash, now))
|
||||
}
|
||||
|
|
|
@ -172,29 +172,29 @@ impl CrdsGossipPush {
|
|||
now: u64,
|
||||
) -> Result<Option<VersionedCrdsValue>, CrdsGossipError> {
|
||||
self.num_total += 1;
|
||||
if now > value.wallclock().checked_add(self.msg_timeout).unwrap_or(0) {
|
||||
return Err(CrdsGossipError::PushMessageTimeout);
|
||||
}
|
||||
if now + self.msg_timeout < value.wallclock() {
|
||||
let range = now.saturating_sub(self.msg_timeout)..=now.saturating_add(self.msg_timeout);
|
||||
if !range.contains(&value.wallclock()) {
|
||||
return Err(CrdsGossipError::PushMessageTimeout);
|
||||
}
|
||||
let label = value.label();
|
||||
let origin = label.pubkey();
|
||||
let new_value = crds.new_versioned(now, value);
|
||||
let value_hash = new_value.value_hash;
|
||||
let received_set = self
|
||||
.received_cache
|
||||
self.received_cache
|
||||
.entry(origin)
|
||||
.or_insert_with(HashMap::new);
|
||||
received_set.entry(*from).or_insert((false, 0)).1 = now;
|
||||
|
||||
let old = crds.insert_versioned(new_value);
|
||||
if old.is_err() {
|
||||
.or_default()
|
||||
.entry(*from)
|
||||
.and_modify(|(_pruned, timestamp)| *timestamp = now)
|
||||
.or_insert((/*pruned:*/ false, now));
|
||||
match crds.insert(value, now) {
|
||||
Err(_) => {
|
||||
self.num_old += 1;
|
||||
return Err(CrdsGossipError::PushMessageOldVersion);
|
||||
Err(CrdsGossipError::PushMessageOldVersion)
|
||||
}
|
||||
Ok(old) => {
|
||||
let value_hash = crds.get(&label).unwrap().value_hash;
|
||||
self.push_messages.insert(label, value_hash);
|
||||
Ok(old.unwrap())
|
||||
Ok(old)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// push pull responses
|
||||
|
|
|
@ -130,19 +130,17 @@ where
|
|||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
use crate::contact_info::ContactInfo;
|
||||
use crate::crds_value::{CrdsData, CrdsValue};
|
||||
use crate::{crds::Crds, crds_value::CrdsValue};
|
||||
use rand::{thread_rng, Rng};
|
||||
use solana_sdk::timing::timestamp;
|
||||
use std::collections::HashSet;
|
||||
use std::ops::Index;
|
||||
use std::{collections::HashSet, iter::repeat_with, ops::Index};
|
||||
|
||||
fn new_test_crds_value() -> VersionedCrdsValue {
|
||||
let data = CrdsData::ContactInfo(ContactInfo::new_localhost(
|
||||
&solana_sdk::pubkey::new_rand(),
|
||||
timestamp(),
|
||||
));
|
||||
VersionedCrdsValue::new(timestamp(), CrdsValue::new_unsigned(data))
|
||||
fn new_test_crds_value<R: Rng>(rng: &mut R) -> VersionedCrdsValue {
|
||||
let value = CrdsValue::new_rand(rng, None);
|
||||
let label = value.label();
|
||||
let mut crds = Crds::default();
|
||||
crds.insert(value, timestamp()).unwrap();
|
||||
crds.remove(&label).unwrap()
|
||||
}
|
||||
|
||||
// Returns true if the first mask_bits most significant bits of hash is the
|
||||
|
@ -176,7 +174,7 @@ mod test {
|
|||
fn test_crds_shards_round_trip() {
|
||||
let mut rng = thread_rng();
|
||||
// Generate some random hash and crds value labels.
|
||||
let mut values: Vec<_> = std::iter::repeat_with(new_test_crds_value)
|
||||
let mut values: Vec<_> = repeat_with(|| new_test_crds_value(&mut rng))
|
||||
.take(4096)
|
||||
.collect();
|
||||
// Insert everything into the crds shards.
|
||||
|
|
Loading…
Reference in New Issue