validates gossip addresses before sending pull-requests

IP addresses need to be validated before sending packets to them.
This commit, sends a ping packet to nodes before any pull requests.
Pull requests are then only sent to the nodes which have responded with
the correct hash of their respective ping packet.
This commit is contained in:
behzad nouri 2021-04-20 14:06:13 -04:00
parent 2231017b35
commit 7cea2c4466
6 changed files with 315 additions and 138 deletions

View File

@ -108,8 +108,8 @@ pub const MAX_SNAPSHOT_HASHES: usize = 16;
const MAX_PRUNE_DATA_NODES: usize = 32;
/// Number of bytes in the randomly generated token sent with ping messages.
const GOSSIP_PING_TOKEN_SIZE: usize = 32;
const GOSSIP_PING_CACHE_CAPACITY: usize = 16384;
const GOSSIP_PING_CACHE_TTL: Duration = Duration::from_secs(640);
const GOSSIP_PING_CACHE_CAPACITY: usize = 65536;
const GOSSIP_PING_CACHE_TTL: Duration = Duration::from_secs(1280);
pub const DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS: u64 = 10_000;
pub const DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS: u64 = 60_000;
/// Minimum serialized size of a Protocol::PullResponse packet.
@ -317,7 +317,7 @@ pub fn make_accounts_hashes_message(
Some(CrdsValue::new_signed(message, keypair))
}
type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>;
pub(crate) type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>;
// TODO These messages should go through the gpu pipeline for spam filtering
#[frozen_abi(digest = "CH5BWuhAyvUiUQYgu2Lcwu7eoiW6bQitvtLS1yFsdmrE")]
@ -1566,21 +1566,29 @@ impl ClusterInfo {
})
}
#[allow(clippy::type_complexity)]
fn new_pull_requests(
&self,
thread_pool: &ThreadPool,
gossip_validators: Option<&HashSet<Pubkey>>,
stakes: &HashMap<Pubkey, u64>,
) -> Vec<(SocketAddr, Protocol)> {
) -> (
Vec<(SocketAddr, Ping)>, // Ping packets.
Vec<(SocketAddr, Protocol)>, // Pull requests
) {
let now = timestamp();
let mut pings = Vec::new();
let mut pulls: Vec<_> = {
let gossip = self.time_gossip_read_lock("new_pull_reqs", &self.stats.new_pull_requests);
match gossip.new_pull_request(
thread_pool,
self.keypair.deref(),
now,
gossip_validators,
stakes,
MAX_BLOOM_SIZE,
&self.ping_cache,
&mut pings,
) {
Err(_) => Vec::default(),
Ok((peer, filters)) => vec![(peer, filters)],
@ -1598,14 +1606,17 @@ impl ClusterInfo {
}
let self_info = CrdsData::ContactInfo(self.my_contact_info());
let self_info = CrdsValue::new_signed(self_info, &self.keypair);
pulls
let pulls = pulls
.into_iter()
.flat_map(|(peer, filters)| std::iter::repeat(peer.gossip).zip(filters))
.map(|(gossip_addr, filter)| {
let request = Protocol::PullRequest(filter, self_info.clone());
(gossip_addr, request)
})
.collect()
});
self.stats
.new_pull_requests_pings_count
.add_relaxed(pings.len() as u64);
(pings, pulls.collect())
}
fn drain_push_queue(&self) -> Vec<CrdsValue> {
@ -1676,11 +1687,16 @@ impl ClusterInfo {
.packets_sent_push_messages_count
.add_relaxed(out.len() as u64);
if generate_pull_requests {
let pull_requests = self.new_pull_requests(&thread_pool, gossip_validators, stakes);
let (pings, pull_requests) =
self.new_pull_requests(&thread_pool, gossip_validators, stakes);
self.stats
.packets_sent_pull_requests_count
.add_relaxed(pull_requests.len() as u64);
let pings = pings
.into_iter()
.map(|(addr, ping)| (addr, Protocol::PingMessage(ping)));
out.extend(pull_requests);
out.extend(pings);
}
out
}
@ -3576,6 +3592,11 @@ mod tests {
let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), 0);
let peer = ContactInfo::new_localhost(&peer_keypair.pubkey(), 0);
let cluster_info = ClusterInfo::new(contact_info, Arc::new(keypair));
cluster_info
.ping_cache
.lock()
.unwrap()
.mock_pong(peer.id, peer.gossip, Instant::now());
cluster_info.insert_info(peer);
cluster_info
.gossip
@ -3594,16 +3615,20 @@ mod tests {
.values()
.for_each(|v| v.par_iter().for_each(|v| assert!(v.verify())));
let mut pings = Vec::new();
cluster_info
.gossip
.write()
.unwrap()
.new_pull_request(
&thread_pool,
cluster_info.keypair.deref(),
timestamp(),
None,
&HashMap::new(),
MAX_BLOOM_SIZE,
&cluster_info.ping_cache,
&mut pings,
)
.ok()
.unwrap();
@ -3859,7 +3884,8 @@ mod tests {
let entrypoint_pubkey = solana_sdk::pubkey::new_rand();
let entrypoint = ContactInfo::new_localhost(&entrypoint_pubkey, timestamp());
cluster_info.set_entrypoint(entrypoint.clone());
let pulls = cluster_info.new_pull_requests(&thread_pool, None, &HashMap::new());
let (pings, pulls) = cluster_info.new_pull_requests(&thread_pool, None, &HashMap::new());
assert!(pings.is_empty());
assert_eq!(1, pulls.len() as u64);
match pulls.get(0) {
Some((addr, msg)) => {
@ -3886,7 +3912,8 @@ mod tests {
vec![entrypoint_crdsvalue],
&timeouts,
);
let pulls = cluster_info.new_pull_requests(&thread_pool, None, &HashMap::new());
let (pings, pulls) = cluster_info.new_pull_requests(&thread_pool, None, &HashMap::new());
assert_eq!(pings.len(), 1);
assert_eq!(1, pulls.len() as u64);
assert_eq!(*cluster_info.entrypoints.read().unwrap(), vec![entrypoint]);
}
@ -4062,26 +4089,34 @@ mod tests {
let other_node_pubkey = solana_sdk::pubkey::new_rand();
let other_node = ContactInfo::new_localhost(&other_node_pubkey, timestamp());
assert_ne!(other_node.gossip, entrypoint.gossip);
cluster_info.ping_cache.lock().unwrap().mock_pong(
other_node.id,
other_node.gossip,
Instant::now(),
);
cluster_info.insert_info(other_node.clone());
stakes.insert(other_node_pubkey, 10);
// Pull request 1: `other_node` is present but `entrypoint` was just added (so it has a
// fresh timestamp). There should only be one pull request to `other_node`
let pulls = cluster_info.new_pull_requests(&thread_pool, None, &stakes);
let (pings, pulls) = cluster_info.new_pull_requests(&thread_pool, None, &stakes);
assert!(pings.is_empty());
assert_eq!(1, pulls.len() as u64);
assert_eq!(pulls.get(0).unwrap().0, other_node.gossip);
// Pull request 2: pretend it's been a while since we've pulled from `entrypoint`. There should
// now be two pull requests
cluster_info.entrypoints.write().unwrap()[0].wallclock = 0;
let pulls = cluster_info.new_pull_requests(&thread_pool, None, &stakes);
let (pings, pulls) = cluster_info.new_pull_requests(&thread_pool, None, &stakes);
assert!(pings.is_empty());
assert_eq!(2, pulls.len() as u64);
assert_eq!(pulls.get(0).unwrap().0, other_node.gossip);
assert_eq!(pulls.get(1).unwrap().0, entrypoint.gossip);
// Pull request 3: `other_node` is present and `entrypoint` was just pulled from. There should
// only be one pull request to `other_node`
let pulls = cluster_info.new_pull_requests(&thread_pool, None, &stakes);
let (pings, pulls) = cluster_info.new_pull_requests(&thread_pool, None, &stakes);
assert!(pings.is_empty());
assert_eq!(1, pulls.len() as u64);
assert_eq!(pulls.get(0).unwrap().0, other_node.gossip);
}

View File

@ -68,6 +68,7 @@ pub(crate) struct GossipStats {
pub(crate) mark_pull_request: Counter,
pub(crate) new_pull_requests: Counter,
pub(crate) new_pull_requests_count: Counter,
pub(crate) new_pull_requests_pings_count: Counter,
pub(crate) new_push_requests2: Counter,
pub(crate) new_push_requests: Counter,
pub(crate) new_push_requests_num: Counter,
@ -242,6 +243,11 @@ pub(crate) fn submit_gossip_stats(stats: &GossipStats, gossip: &RwLock<CrdsGossi
stats.pull_request_ping_pong_check_failed_count.clear(),
i64
),
(
"new_pull_requests_pings_count",
stats.new_pull_requests_pings_count.clear(),
i64
),
(
"generate_pull_responses",
stats.generate_pull_responses.clear(),

View File

@ -4,6 +4,7 @@
//! packet::PACKET_DATA_SIZE size.
use crate::{
cluster_info::Ping,
contact_info::ContactInfo,
crds::{Crds, VersionedCrdsValue},
crds_gossip_error::CrdsGossipError,
@ -11,6 +12,7 @@ use crate::{
crds_gossip_push::{CrdsGossipPush, CRDS_GOSSIP_NUM_ACTIVE},
crds_value::{CrdsData, CrdsValue, CrdsValueLabel},
duplicate_shred::{self, DuplicateShredIndex, LeaderScheduleFn, MAX_DUPLICATE_SHREDS},
ping_pong::PingCache,
};
use rayon::ThreadPool;
use solana_ledger::shred::Shred;
@ -20,7 +22,11 @@ use solana_sdk::{
signature::{Keypair, Signer},
timing::timestamp,
};
use std::collections::{HashMap, HashSet};
use std::{
collections::{HashMap, HashSet},
net::SocketAddr,
sync::Mutex,
};
///The min size for bloom filters
pub const CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS: usize = 500;
@ -206,20 +212,25 @@ impl CrdsGossip {
pub fn new_pull_request(
&self,
thread_pool: &ThreadPool,
self_keypair: &Keypair,
now: u64,
gossip_validators: Option<&HashSet<Pubkey>>,
stakes: &HashMap<Pubkey, u64>,
bloom_size: usize,
ping_cache: &Mutex<PingCache>,
pings: &mut Vec<(SocketAddr, Ping)>,
) -> Result<(ContactInfo, Vec<CrdsFilter>), CrdsGossipError> {
self.pull.new_pull_request(
thread_pool,
&self.crds,
&self.id,
self_keypair,
self.shred_version,
now,
gossip_validators,
stakes,
bloom_size,
ping_cache,
pings,
)
}

View File

@ -10,12 +10,13 @@
//! of false positives.
use crate::{
cluster_info::CRDS_UNIQUE_PUBKEY_CAPACITY,
cluster_info::{Ping, CRDS_UNIQUE_PUBKEY_CAPACITY},
contact_info::ContactInfo,
crds::{Crds, CrdsError},
crds_gossip::{get_stake, get_weight, CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS},
crds_gossip_error::CrdsGossipError,
crds_value::{CrdsValue, CrdsValueLabel},
ping_pong::PingCache,
};
use itertools::Itertools;
use lru::LruCache;
@ -26,11 +27,16 @@ use solana_runtime::bloom::{AtomicBloom, Bloom};
use solana_sdk::{
hash::{hash, Hash},
pubkey::Pubkey,
signature::{Keypair, Signer},
};
use std::{
cmp,
collections::{HashMap, HashSet, VecDeque},
convert::TryInto,
net::SocketAddr,
sync::Mutex,
time::Instant,
};
use std::cmp;
use std::collections::VecDeque;
use std::collections::{HashMap, HashSet};
use std::convert::TryInto;
pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000;
// The maximum age of a value received over pull responses
@ -203,33 +209,62 @@ impl Default for CrdsGossipPull {
}
impl CrdsGossipPull {
/// generate a random request
#[allow(clippy::too_many_arguments)]
pub fn new_pull_request(
&self,
thread_pool: &ThreadPool,
crds: &Crds,
self_id: &Pubkey,
self_keypair: &Keypair,
self_shred_version: u16,
now: u64,
gossip_validators: Option<&HashSet<Pubkey>>,
stakes: &HashMap<Pubkey, u64>,
bloom_size: usize,
ping_cache: &Mutex<PingCache>,
pings: &mut Vec<(SocketAddr, Ping)>,
) -> Result<(ContactInfo, Vec<CrdsFilter>), CrdsGossipError> {
let options = self.pull_options(
crds,
&self_id,
self_shred_version,
now,
gossip_validators,
stakes,
);
if options.is_empty() {
let (weights, peers): (Vec<_>, Vec<_>) = self
.pull_options(
crds,
&self_keypair.pubkey(),
self_shred_version,
now,
gossip_validators,
stakes,
)
.into_iter()
.unzip();
if peers.is_empty() {
return Err(CrdsGossipError::NoPeers);
}
let filters = self.build_crds_filters(thread_pool, crds, bloom_size);
let index = WeightedIndex::new(options.iter().map(|weighted| weighted.0)).unwrap();
let random = index.sample(&mut rand::thread_rng());
let (_weight, peer) = options[random];
Ok((peer.clone(), filters))
let mut peers = {
let mut rng = rand::thread_rng();
let num_samples = peers.len() * 2;
let index = WeightedIndex::new(weights).unwrap();
let sample_peer = move || peers[index.sample(&mut rng)];
std::iter::repeat_with(sample_peer).take(num_samples)
};
let peer = {
let mut rng = rand::thread_rng();
let mut ping_cache = ping_cache.lock().unwrap();
let mut pingf = move || Ping::new_rand(&mut rng, self_keypair).ok();
let now = Instant::now();
peers.find(|peer| {
let node = (peer.id, peer.gossip);
let (check, ping) = ping_cache.check(now, node, &mut pingf);
if let Some(ping) = ping {
pings.push((peer.gossip, ping));
}
check
})
};
match peer {
None => Err(CrdsGossipError::NoPeers),
Some(peer) => {
let filters = self.build_crds_filters(thread_pool, crds, bloom_size);
Ok((peer.clone(), filters))
}
}
}
fn pull_options<'a>(
@ -629,7 +664,7 @@ mod test {
packet::PACKET_DATA_SIZE,
timing::timestamp,
};
use std::iter::repeat_with;
use std::{iter::repeat_with, time::Duration};
#[test]
fn test_hash_as_u64() {
@ -919,22 +954,29 @@ mod test {
fn test_new_pull_request() {
let thread_pool = ThreadPoolBuilder::new().build().unwrap();
let mut crds = Crds::default();
let node_keypair = Keypair::new();
let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&solana_sdk::pubkey::new_rand(),
&node_keypair.pubkey(),
0,
)));
let id = entry.label().pubkey();
let node = CrdsGossipPull::default();
let mut pings = Vec::new();
let ping_cache = Mutex::new(PingCache::new(
Duration::from_secs(20 * 60), // ttl
128, // capacity
));
assert_eq!(
node.new_pull_request(
&thread_pool,
&crds,
&id,
&node_keypair,
0,
0,
None,
&HashMap::new(),
PACKET_DATA_SIZE
PACKET_DATA_SIZE,
&ping_cache,
&mut pings,
),
Err(CrdsGossipError::NoPeers)
);
@ -944,30 +986,35 @@ mod test {
node.new_pull_request(
&thread_pool,
&crds,
&id,
&node_keypair,
0,
0,
None,
&HashMap::new(),
PACKET_DATA_SIZE
PACKET_DATA_SIZE,
&ping_cache,
&mut pings,
),
Err(CrdsGossipError::NoPeers)
);
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&solana_sdk::pubkey::new_rand(),
0,
)));
let new = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
ping_cache
.lock()
.unwrap()
.mock_pong(new.id, new.gossip, Instant::now());
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new));
crds.insert(new.clone(), 0).unwrap();
let req = node.new_pull_request(
&thread_pool,
&crds,
&id,
&node_keypair,
0,
0,
None,
&HashMap::new(),
PACKET_DATA_SIZE,
&ping_cache,
&mut pings,
);
let (peer, _) = req.unwrap();
assert_eq!(peer, *new.contact_info().unwrap());
@ -977,23 +1024,25 @@ mod test {
fn test_new_mark_creation_time() {
let now: u64 = 1_605_127_770_789;
let thread_pool = ThreadPoolBuilder::new().build().unwrap();
let mut ping_cache = PingCache::new(
Duration::from_secs(20 * 60), // ttl
128, // capacity
);
let mut crds = Crds::default();
let node_keypair = Keypair::new();
let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&solana_sdk::pubkey::new_rand(),
&node_keypair.pubkey(),
0,
)));
let node_pubkey = entry.label().pubkey();
let mut node = CrdsGossipPull::default();
crds.insert(entry, now).unwrap();
let old = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&solana_sdk::pubkey::new_rand(),
0,
)));
let old = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
ping_cache.mock_pong(old.id, old.gossip, Instant::now());
let old = CrdsValue::new_unsigned(CrdsData::ContactInfo(old));
crds.insert(old.clone(), now).unwrap();
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&solana_sdk::pubkey::new_rand(),
0,
)));
let new = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
ping_cache.mock_pong(new.id, new.gossip, Instant::now());
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new));
crds.insert(new.clone(), now).unwrap();
// set request creation time to now.
@ -1002,16 +1051,20 @@ mod test {
// odds of getting the other request should be close to 1.
let now = now + 1_000;
let mut pings = Vec::new();
let ping_cache = Mutex::new(ping_cache);
for _ in 0..10 {
let req = node.new_pull_request(
&thread_pool,
&crds,
&node_pubkey,
&node_keypair,
0,
now,
None,
&HashMap::new(),
PACKET_DATA_SIZE,
&ping_cache,
&mut pings,
);
let (peer, _) = req.unwrap();
assert_eq!(peer, *old.contact_info().unwrap());
@ -1056,29 +1109,35 @@ mod test {
#[test]
fn test_generate_pull_responses() {
let thread_pool = ThreadPoolBuilder::new().build().unwrap();
let node_keypair = Keypair::new();
let mut node_crds = Crds::default();
let mut ping_cache = PingCache::new(
Duration::from_secs(20 * 60), // ttl
128, // capacity
);
let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&solana_sdk::pubkey::new_rand(),
&node_keypair.pubkey(),
0,
)));
let caller = entry.clone();
let node_pubkey = entry.label().pubkey();
let node = CrdsGossipPull::default();
node_crds.insert(entry, 0).unwrap();
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&solana_sdk::pubkey::new_rand(),
0,
)));
let new = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
ping_cache.mock_pong(new.id, new.gossip, Instant::now());
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new));
node_crds.insert(new, 0).unwrap();
let mut pings = Vec::new();
let req = node.new_pull_request(
&thread_pool,
&node_crds,
&node_pubkey,
&node_keypair,
0,
0,
None,
&HashMap::new(),
PACKET_DATA_SIZE,
&Mutex::new(ping_cache),
&mut pings,
);
let mut dest_crds = Crds::default();
@ -1133,29 +1192,35 @@ mod test {
#[test]
fn test_process_pull_request() {
let thread_pool = ThreadPoolBuilder::new().build().unwrap();
let node_keypair = Keypair::new();
let mut node_crds = Crds::default();
let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&solana_sdk::pubkey::new_rand(),
&node_keypair.pubkey(),
0,
)));
let caller = entry.clone();
let node_pubkey = entry.label().pubkey();
let node = CrdsGossipPull::default();
node_crds.insert(entry, 0).unwrap();
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&solana_sdk::pubkey::new_rand(),
0,
)));
let mut ping_cache = PingCache::new(
Duration::from_secs(20 * 60), // ttl
128, // capacity
);
let new = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
ping_cache.mock_pong(new.id, new.gossip, Instant::now());
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new));
node_crds.insert(new, 0).unwrap();
let mut pings = Vec::new();
let req = node.new_pull_request(
&thread_pool,
&node_crds,
&node_pubkey,
&node_keypair,
0,
0,
None,
&HashMap::new(),
PACKET_DATA_SIZE,
&Mutex::new(ping_cache),
&mut pings,
);
let mut dest_crds = Crds::default();
@ -1193,34 +1258,37 @@ mod test {
#[test]
fn test_process_pull_request_response() {
let thread_pool = ThreadPoolBuilder::new().build().unwrap();
let node_keypair = Keypair::new();
let mut node_crds = Crds::default();
let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&solana_sdk::pubkey::new_rand(),
&node_keypair.pubkey(),
1,
)));
let caller = entry.clone();
let node_pubkey = entry.label().pubkey();
let mut node = CrdsGossipPull::default();
node_crds.insert(entry, 0).unwrap();
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&solana_sdk::pubkey::new_rand(),
1,
)));
let mut ping_cache = PingCache::new(
Duration::from_secs(20 * 60), // ttl
128, // capacity
);
let new = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 1);
ping_cache.mock_pong(new.id, new.gossip, Instant::now());
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new));
node_crds.insert(new, 0).unwrap();
let mut dest = CrdsGossipPull::default();
let mut dest_crds = Crds::default();
let new_id = solana_sdk::pubkey::new_rand();
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&new_id, 1,
)));
let new = ContactInfo::new_localhost(&new_id, 1);
ping_cache.mock_pong(new.id, new.gossip, Instant::now());
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new));
dest_crds.insert(new.clone(), 0).unwrap();
// node contains a key from the dest node, but at an older local timestamp
let same_key = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&new_id, 0,
)));
let same_key = ContactInfo::new_localhost(&new_id, 0);
ping_cache.mock_pong(same_key.id, same_key.gossip, Instant::now());
let same_key = CrdsValue::new_unsigned(CrdsData::ContactInfo(same_key));
assert_eq!(same_key.label(), new.label());
assert!(same_key.wallclock() < new.wallclock());
node_crds.insert(same_key.clone(), 0).unwrap();
@ -1232,17 +1300,21 @@ mod test {
0
);
let mut done = false;
let mut pings = Vec::new();
let ping_cache = Mutex::new(ping_cache);
for _ in 0..30 {
// there is a chance of a false positive with bloom filters
let req = node.new_pull_request(
&thread_pool,
&node_crds,
&node_pubkey,
&node_keypair,
0,
0,
None,
&HashMap::new(),
PACKET_DATA_SIZE,
&ping_cache,
&mut pings,
);
let (_, filters) = req.unwrap();
let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();

View File

@ -243,6 +243,11 @@ impl PingCache {
}
clone
}
/// Only for tests and simulations.
pub fn mock_pong(&mut self, node: Pubkey, socket: SocketAddr, now: Instant) {
self.pongs.put((node, socket), now);
}
}
#[cfg(test)]

View File

@ -4,35 +4,65 @@ use log::*;
use rayon::prelude::*;
use rayon::{ThreadPool, ThreadPoolBuilder};
use serial_test::serial;
use solana_core::cluster_info;
use solana_core::contact_info::ContactInfo;
use solana_core::crds_gossip::*;
use solana_core::crds_gossip_error::CrdsGossipError;
use solana_core::crds_gossip_pull::{ProcessPullStats, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS};
use solana_core::crds_gossip_push::CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS;
use solana_core::crds_value::CrdsValueLabel;
use solana_core::crds_value::{CrdsData, CrdsValue};
use solana_core::{
cluster_info,
contact_info::ContactInfo,
crds_gossip::*,
crds_gossip_error::CrdsGossipError,
crds_gossip_pull::{ProcessPullStats, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS},
crds_gossip_push::CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS,
crds_value::{CrdsData, CrdsValue, CrdsValueLabel},
ping_pong::PingCache,
};
use solana_rayon_threadlimit::get_thread_count;
use solana_sdk::hash::hash;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::timing::timestamp;
use std::collections::{HashMap, HashSet};
use std::ops::Deref;
use std::sync::{Arc, Mutex};
use solana_sdk::{
hash::hash,
pubkey::Pubkey,
signature::{Keypair, Signer},
timing::timestamp,
};
use std::{
collections::{HashMap, HashSet},
ops::Deref,
sync::{Arc, Mutex},
time::{Duration, Instant},
};
#[derive(Clone)]
struct Node {
keypair: Arc<Keypair>,
contact_info: ContactInfo,
gossip: Arc<Mutex<CrdsGossip>>,
ping_cache: Arc<Mutex<PingCache>>,
stake: u64,
}
impl Node {
fn new(gossip: Arc<Mutex<CrdsGossip>>) -> Self {
Node { gossip, stake: 0 }
fn new(
keypair: Arc<Keypair>,
contact_info: ContactInfo,
gossip: Arc<Mutex<CrdsGossip>>,
) -> Self {
Self::staked(keypair, contact_info, gossip, 0)
}
fn staked(gossip: Arc<Mutex<CrdsGossip>>, stake: u64) -> Self {
Node { gossip, stake }
fn staked(
keypair: Arc<Keypair>,
contact_info: ContactInfo,
gossip: Arc<Mutex<CrdsGossip>>,
stake: u64,
) -> Self {
let ping_cache = Arc::new(Mutex::new(PingCache::new(
Duration::from_secs(20 * 60), // ttl
2048, // capacity
)));
Node {
keypair,
contact_info,
gossip,
ping_cache,
stake,
}
}
}
@ -77,71 +107,72 @@ fn stakes(network: &Network) -> HashMap<Pubkey, u64> {
}
fn star_network_create(num: usize) -> Network {
let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&solana_sdk::pubkey::new_rand(),
0,
)));
let node_keypair = Arc::new(Keypair::new());
let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0);
let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone()));
let mut network: HashMap<_, _> = (1..num)
.map(|_| {
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&solana_sdk::pubkey::new_rand(),
0,
)));
let node_keypair = Arc::new(Keypair::new());
let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0);
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone()));
let id = new.label().pubkey();
let mut node = CrdsGossip::default();
node.crds.insert(new.clone(), timestamp()).unwrap();
node.crds.insert(entry.clone(), timestamp()).unwrap();
node.set_self(&id);
(new.label().pubkey(), Node::new(Arc::new(Mutex::new(node))))
let node = Node::new(node_keypair, contact_info, Arc::new(Mutex::new(node)));
(new.label().pubkey(), node)
})
.collect();
let mut node = CrdsGossip::default();
let id = entry.label().pubkey();
node.crds.insert(entry, timestamp()).unwrap();
node.set_self(&id);
network.insert(id, Node::new(Arc::new(Mutex::new(node))));
let node = Node::new(node_keypair, contact_info, Arc::new(Mutex::new(node)));
network.insert(id, node);
Network::new(network)
}
fn rstar_network_create(num: usize) -> Network {
let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&solana_sdk::pubkey::new_rand(),
0,
)));
let node_keypair = Arc::new(Keypair::new());
let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0);
let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone()));
let mut origin = CrdsGossip::default();
let id = entry.label().pubkey();
origin.crds.insert(entry, timestamp()).unwrap();
origin.set_self(&id);
let mut network: HashMap<_, _> = (1..num)
.map(|_| {
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&solana_sdk::pubkey::new_rand(),
0,
)));
let node_keypair = Arc::new(Keypair::new());
let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0);
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone()));
let id = new.label().pubkey();
let mut node = CrdsGossip::default();
node.crds.insert(new.clone(), timestamp()).unwrap();
origin.crds.insert(new.clone(), timestamp()).unwrap();
node.set_self(&id);
(new.label().pubkey(), Node::new(Arc::new(Mutex::new(node))))
let node = Node::new(node_keypair, contact_info, Arc::new(Mutex::new(node)));
(new.label().pubkey(), node)
})
.collect();
network.insert(id, Node::new(Arc::new(Mutex::new(origin))));
let node = Node::new(node_keypair, contact_info, Arc::new(Mutex::new(origin)));
network.insert(id, node);
Network::new(network)
}
fn ring_network_create(num: usize) -> Network {
let mut network: HashMap<_, _> = (0..num)
.map(|_| {
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&solana_sdk::pubkey::new_rand(),
0,
)));
let node_keypair = Arc::new(Keypair::new());
let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0);
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone()));
let id = new.label().pubkey();
let mut node = CrdsGossip::default();
node.crds.insert(new.clone(), timestamp()).unwrap();
node.set_self(&id);
(new.label().pubkey(), Node::new(Arc::new(Mutex::new(node))))
let node = Node::new(node_keypair, contact_info, Arc::new(Mutex::new(node)));
(new.label().pubkey(), node)
})
.collect();
let keys: Vec<Pubkey> = network.keys().cloned().collect();
@ -171,18 +202,20 @@ fn connected_staked_network_create(stakes: &[u64]) -> Network {
let num = stakes.len();
let mut network: HashMap<_, _> = (0..num)
.map(|n| {
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&solana_sdk::pubkey::new_rand(),
0,
)));
let node_keypair = Arc::new(Keypair::new());
let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0);
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info.clone()));
let id = new.label().pubkey();
let mut node = CrdsGossip::default();
node.crds.insert(new.clone(), timestamp()).unwrap();
node.set_self(&id);
(
new.label().pubkey(),
Node::staked(Arc::new(Mutex::new(node)), stakes[n]),
)
let node = Node::staked(
node_keypair,
contact_info,
Arc::new(Mutex::new(node)),
stakes[n],
);
(new.label().pubkey(), node)
})
.collect();
@ -416,22 +449,37 @@ fn network_run_pull(
let network_values: Vec<Node> = network.values().cloned().collect();
let mut timeouts = HashMap::new();
timeouts.insert(Pubkey::default(), CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS);
for node in &network_values {
let mut ping_cache = node.ping_cache.lock().unwrap();
for other in &network_values {
if node.keypair.pubkey() != other.keypair.pubkey() {
ping_cache.mock_pong(
other.keypair.pubkey(),
other.contact_info.gossip,
Instant::now(),
);
}
}
}
for t in start..end {
let now = t as u64 * 100;
let requests: Vec<_> = {
network_values
.par_iter()
.filter_map(|from| {
let mut pings = Vec::new();
let (peer, filters) = from
.lock()
.unwrap()
.new_pull_request(
&thread_pool,
from.keypair.deref(),
now,
None,
&HashMap::new(),
cluster_info::MAX_BLOOM_SIZE,
from.ping_cache.deref(),
&mut pings,
)
.ok()?;
let gossip = from.gossip.lock().unwrap();