filters crds values obtained through gossip by their shred version (#18072)

filter_by_shred_version does not check the shred-version of the owner of
the crds-value. It only checks the shred-version of the node which is
relaying the value:
https://github.com/solana-labs/solana/blob/5cc073420/gossip/src/cluster_info.rs#L2274-L2289

So crds-values with different shred versions can still pass through this
function as long as they are relayed by a node with matching shred
version; and so, a single node can bridge different shred values
through-out the cluster.
This commit is contained in:
behzad nouri 2021-06-23 14:16:05 +00:00 committed by GitHub
parent 3b1738c000
commit 69a5f0e6cd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 91 additions and 135 deletions

View File

@ -16,7 +16,7 @@ use {
crate::{
cluster_info_metrics::{submit_gossip_stats, Counter, GossipStats, ScopedTimer},
contact_info::ContactInfo,
crds::Cursor,
crds::{Crds, Cursor},
crds_gossip::CrdsGossip,
crds_gossip_error::CrdsGossipError,
crds_gossip_pull::{CrdsFilter, ProcessPullStats, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS},
@ -1927,7 +1927,6 @@ impl ClusterInfo {
return;
}
let self_pubkey = self.id();
let self_shred_version = self.my_shred_version();
let requests: Vec<_> = thread_pool.install(|| {
requests
.into_par_iter()
@ -1939,17 +1938,7 @@ impl ClusterInfo {
inc_new_counter_debug!("cluster_info-window-request-loopback", 1);
false
}
Some(caller) => {
if self_shred_version != 0
&& caller.shred_version != 0
&& caller.shred_version != self_shred_version
{
self.stats.skip_pull_shred_version.add_relaxed(1);
false
} else {
true
}
}
Some(_) => true,
})
.map(|(from_addr, filter, caller)| PullData {
from_addr,
@ -2202,23 +2191,11 @@ impl ClusterInfo {
fn handle_pull_response(
&self,
from: &Pubkey,
mut crds_values: Vec<CrdsValue>,
crds_values: Vec<CrdsValue>,
timeouts: &HashMap<Pubkey, u64>,
) -> (usize, usize, usize) {
let len = crds_values.len();
trace!("PullResponse me: {} from: {} len={}", self.id(), from, len);
let shred_version = {
let gossip = self.gossip.read().unwrap();
gossip.crds.get_shred_version(from).unwrap_or_default()
};
Self::filter_by_shred_version(
from,
&mut crds_values,
shred_version,
self.my_shred_version(),
);
let filtered_len = crds_values.len();
let mut pull_stats = ProcessPullStats::default();
let (filtered_pulls, filtered_pulls_expired_timeout, failed_inserts) = self
.time_gossip_read_lock("filter_pull_resp", &self.stats.filter_pull_response)
@ -2238,14 +2215,8 @@ impl ClusterInfo {
&mut pull_stats,
);
}
self.stats
.skip_pull_response_shred_version
.add_relaxed((len - filtered_len) as u64);
self.stats.process_pull_response_count.add_relaxed(1);
self.stats
.process_pull_response_len
.add_relaxed(filtered_len as u64);
self.stats.process_pull_response_len.add_relaxed(len as u64);
self.stats
.process_pull_response_timeout
.add_relaxed(pull_stats.timeout_count as u64);
@ -2266,23 +2237,6 @@ impl ClusterInfo {
)
}
fn filter_by_shred_version(
from: &Pubkey,
crds_values: &mut Vec<CrdsValue>,
shred_version: u16,
my_shred_version: u16,
) {
// Always run filter on spies
if my_shred_version != 0 && shred_version != my_shred_version {
// Allow someone to update their own ContactInfo so they
// can change shred versions if needed.
crds_values.retain(|crds_value| match &crds_value.data {
CrdsData::ContactInfo(contact_info) => contact_info.id == *from,
_ => false,
});
}
}
fn handle_batch_ping_messages<I>(
&self,
pings: I,
@ -2355,41 +2309,10 @@ impl ClusterInfo {
self.stats
.push_message_count
.add_relaxed(messages.len() as u64);
// Obtain shred versions of the origins.
let shred_versions: Vec<_> = {
let gossip = self.gossip.read().unwrap();
messages
.iter()
.map(|(from, _)| gossip.crds.get_shred_version(from).unwrap_or_default())
.collect()
};
// Filter out data if the origin has different shred version.
let self_shred_version = self.my_shred_version();
let num_crds_values: u64 = messages.iter().map(|(_, data)| data.len() as u64).sum();
let messages: Vec<_> = messages
.into_iter()
.zip(shred_versions)
.filter_map(|((from, mut crds_values), shred_version)| {
Self::filter_by_shred_version(
&from,
&mut crds_values,
shred_version,
self_shred_version,
);
if crds_values.is_empty() {
None
} else {
Some((from, crds_values))
}
})
.collect();
let num_filtered_crds_values = messages.iter().map(|(_, data)| data.len() as u64).sum();
self.stats
.push_message_value_count
.add_relaxed(num_filtered_crds_values);
self.stats
.skip_push_message_shred_version
.add_relaxed(num_crds_values - num_filtered_crds_values);
.add_relaxed(num_crds_values);
// Origins' pubkeys of upserted crds values.
let origins: HashSet<_> = {
let mut gossip =
@ -2512,6 +2435,28 @@ impl ClusterInfo {
should_check_duplicate_instance: bool,
) -> Result<(), GossipError> {
let _st = ScopedTimer::from(&self.stats.process_gossip_packets_time);
// Filter out values if the shred-versions are different.
let self_shred_version = self.my_shred_version();
let packets = if self_shred_version == 0 {
packets
} else {
let gossip = self.gossip.read().unwrap();
thread_pool.install(|| {
packets
.into_par_iter()
.with_min_len(1024)
.filter_map(|(from, msg)| {
let msg = filter_on_shred_version(
msg,
self_shred_version,
&gossip.crds,
&self.stats,
)?;
Some((from, msg))
})
.collect()
})
};
// Check if there is a duplicate instance of
// this node with more recent timestamp.
let check_duplicate_instance = |values: &[CrdsValue]| {
@ -3099,6 +3044,70 @@ pub fn stake_weight_peers(
ClusterInfo::sorted_stakes_with_index(peers, stakes)
}
// Filters out values from nodes with different shred-version.
fn filter_on_shred_version(
mut msg: Protocol,
self_shred_version: u16,
crds: &Crds,
stats: &GossipStats,
) -> Option<Protocol> {
let filter_values = |from: &Pubkey, values: &mut Vec<CrdsValue>, skipped_counter: &Counter| {
let num_values = values.len();
if crds.get_shred_version(from) == Some(self_shred_version) {
// Retain values with the same shred-vesion, or those which are
// contact-info so that shred-versions can be updated.
values.retain(|value| match &value.data {
CrdsData::ContactInfo(_) => true,
_ => crds.get_shred_version(&value.pubkey()) == Some(self_shred_version),
})
} else {
// Only allow node to update its own contact info in case their
// shred-version changes.
values.retain(|value| match &value.data {
CrdsData::ContactInfo(node) => node.id == *from,
_ => false,
})
}
let num_skipped = num_values - values.len();
if num_skipped != 0 {
skipped_counter.add_relaxed(num_skipped as u64);
}
};
match &mut msg {
Protocol::PullRequest(_, caller) => match &caller.data {
// Allow spy nodes with shred-verion == 0 to pull from other nodes.
CrdsData::ContactInfo(node)
if node.shred_version == 0 || node.shred_version == self_shred_version =>
{
Some(msg)
}
_ => {
stats.skip_pull_shred_version.add_relaxed(1);
None
}
},
Protocol::PullResponse(from, values) => {
filter_values(from, values, &stats.skip_pull_response_shred_version);
if values.is_empty() {
None
} else {
Some(msg)
}
}
Protocol::PushMessage(from, values) => {
filter_values(from, values, &stats.skip_push_message_shred_version);
if values.is_empty() {
None
} else {
Some(msg)
}
}
Protocol::PruneMessage(_, _) | Protocol::PingMessage(_) | Protocol::PongMessage(_) => {
Some(msg)
}
}
}
#[cfg(test)]
mod tests {
use {
@ -3294,59 +3303,6 @@ mod tests {
vec![entrypoint_crdsvalue]
}
#[test]
fn test_filter_shred_version() {
let from = solana_sdk::pubkey::new_rand();
let my_shred_version = 1;
let other_shred_version = 1;
// Allow same shred_version
let mut values = test_crds_values(from);
ClusterInfo::filter_by_shred_version(
&from,
&mut values,
other_shred_version,
my_shred_version,
);
assert_eq!(values.len(), 1);
// Allow shred_version=0.
let other_shred_version = 0;
ClusterInfo::filter_by_shred_version(
&from,
&mut values,
other_shred_version,
my_shred_version,
);
assert_eq!(values.len(), 1);
// Change to sender's ContactInfo version, allow that.
let other_shred_version = 2;
ClusterInfo::filter_by_shred_version(
&from,
&mut values,
other_shred_version,
my_shred_version,
);
assert_eq!(values.len(), 1);
let snapshot_hash_data = CrdsValue::new_unsigned(CrdsData::SnapshotHashes(SnapshotHash {
from: solana_sdk::pubkey::new_rand(),
hashes: vec![],
wallclock: 0,
}));
values.push(snapshot_hash_data);
// Change to sender's ContactInfo version, allow that.
let other_shred_version = 2;
ClusterInfo::filter_by_shred_version(
&from,
&mut values,
other_shred_version,
my_shred_version,
);
assert_eq!(values.len(), 1);
}
#[test]
fn test_max_snapshot_hashes_with_push_messages() {
let mut rng = rand::thread_rng();