Stake weighted pruning for the gossip network (#4769)

* Stake weighted pruning

* Fix compile error

* Fix clippy errors

* Add helper for creating a connected staked network

* Bug fixes and test groundwork

* Small refactor

* Anatoly's feedback and tests

* Doc updates

* @rob-solana's feedback

* Fix test bug and add log trace

* @rob-solana's feedback
This commit is contained in:
Justin Starry 2019-06-26 03:30:16 -04:00 committed by Michael Vines
parent d6737b8cc9
commit 861d6468ca
7 changed files with 425 additions and 100 deletions

View File

@ -34,8 +34,8 @@ Nodes send push messages to `PUSH_FANOUT` push peers.
Upon receiving a push message, a node examines the message for:
1. Duplication: if the message has been seen before, the node responds with
`PushMessagePrune` and drops the message
1. Duplication: if the message has been seen before, the node drops the message
and may respond with `PushMessagePrune` if forwarded from a low staked node
2. New data: if the message is new to the node
* Stores the new information with an updated version in its cluster info and
@ -51,7 +51,7 @@ Upon receiving a push message, a node examines the message for:
A nodes selects its push peers at random from the active set of known peers.
The node keeps this selection for a relatively long time. When a prune message
is received, the node drops the push peer that sent the prune. Prune is an
indication that there is another, faster path to that node than direct push.
indication that there is another, higher stake weighted path to that node than direct push.
The set of push peers is kept fresh by rotating a new node into the set every
`PUSH_MSG_TIMEOUT/2` milliseconds.

View File

@ -47,7 +47,7 @@ use solana_sdk::transaction::Transaction;
use std::borrow::Borrow;
use std::borrow::Cow;
use std::cmp::min;
use std::collections::{BTreeSet, HashMap};
use std::collections::{BTreeSet, HashMap, HashSet};
use std::fmt;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicBool, Ordering};
@ -206,7 +206,8 @@ impl ClusterInfo {
let mut entry = CrdsValue::ContactInfo(my_data);
entry.sign(&self.keypair);
self.gossip.refresh_push_active_set(stakes);
self.gossip.process_push_message(vec![entry], now);
self.gossip
.process_push_message(&self.id(), vec![entry], now);
}
// TODO kill insert_info, only used by tests
@ -316,7 +317,8 @@ impl ClusterInfo {
let now = timestamp();
let mut entry = CrdsValue::EpochSlots(EpochSlots::new(id, root, slots, now));
entry.sign(&self.keypair);
self.gossip.process_push_message(vec![entry], now);
self.gossip
.process_push_message(&self.id(), vec![entry], now);
}
pub fn push_vote(&mut self, vote: Transaction) {
@ -324,7 +326,8 @@ impl ClusterInfo {
let vote = Vote::new(&self.id(), vote, now);
let mut entry = CrdsValue::Vote(vote);
entry.sign(&self.keypair);
self.gossip.process_push_message(vec![entry], now);
self.gossip
.process_push_message(&self.id(), vec![entry], now);
}
/// Get votes in the crds
@ -1071,12 +1074,13 @@ impl ClusterInfo {
fn handle_blob(
obj: &Arc<RwLock<Self>>,
blocktree: Option<&Arc<Blocktree>>,
stakes: &HashMap<Pubkey, u64>,
blob: &Blob,
) -> Vec<SharedBlob> {
deserialize(&blob.data[..blob.meta.size])
.into_iter()
.flat_map(|request| {
ClusterInfo::handle_protocol(obj, &blob.meta.addr(), blocktree, request)
ClusterInfo::handle_protocol(obj, &blob.meta.addr(), blocktree, stakes, request)
})
.collect()
}
@ -1120,6 +1124,7 @@ impl ClusterInfo {
inc_new_counter_debug!("cluster_info-pull_request-rsp", len);
to_shared_blob(rsp, from.gossip).ok().into_iter().collect()
}
fn handle_pull_response(me: &Arc<RwLock<Self>>, from: &Pubkey, data: Vec<CrdsValue>) {
let len = data.len();
let now = Instant::now();
@ -1134,40 +1139,52 @@ impl ClusterInfo {
report_time_spent("ReceiveUpdates", &now.elapsed(), &format!(" len: {}", len));
}
fn handle_push_message(
me: &Arc<RwLock<Self>>,
from: &Pubkey,
data: Vec<CrdsValue>,
stakes: &HashMap<Pubkey, u64>,
) -> Vec<SharedBlob> {
let self_id = me.read().unwrap().gossip.id;
inc_new_counter_debug!("cluster_info-push_message", 1, 0, 1000);
let prunes: Vec<_> = me
let updated: Vec<_> =
me.write()
.unwrap()
.gossip
.process_push_message(from, data, timestamp());
let updated_labels: Vec<_> = updated.into_iter().map(|u| u.value.label()).collect();
let prunes_map: HashMap<Pubkey, HashSet<Pubkey>> = me
.write()
.unwrap()
.gossip
.process_push_message(data, timestamp());
.prune_received_cache(updated_labels, stakes);
if !prunes.is_empty() {
inc_new_counter_debug!("cluster_info-push_message-prunes", prunes.len());
let ci = me.read().unwrap().lookup(from).cloned();
let pushes: Vec<_> = me.write().unwrap().new_push_requests();
inc_new_counter_debug!("cluster_info-push_message-pushes", pushes.len());
let mut rsp: Vec<_> = ci
.and_then(|ci| {
let mut rsp: Vec<_> = prunes_map
.into_iter()
.map(|(from, prune_set)| {
inc_new_counter_debug!("cluster_info-push_message-prunes", prune_set.len());
me.read().unwrap().lookup(&from).cloned().and_then(|ci| {
let mut prune_msg = PruneData {
pubkey: self_id,
prunes,
prunes: prune_set.into_iter().collect(),
signature: Signature::default(),
destination: *from,
destination: from,
wallclock: timestamp(),
};
prune_msg.sign(&me.read().unwrap().keypair);
let rsp = Protocol::PruneMessage(self_id, prune_msg);
to_shared_blob(rsp, ci.gossip).ok()
})
.into_iter()
})
.flatten()
.collect();
if !rsp.is_empty() {
let pushes: Vec<_> = me.write().unwrap().new_push_requests();
inc_new_counter_debug!("cluster_info-push_message-pushes", pushes.len());
let mut blobs: Vec<_> = pushes
.into_iter()
.filter_map(|(remote_gossip_addr, req)| {
@ -1269,6 +1286,7 @@ impl ClusterInfo {
me: &Arc<RwLock<Self>>,
from_addr: &SocketAddr,
blocktree: Option<&Arc<Blocktree>>,
stakes: &HashMap<Pubkey, u64>,
request: Protocol,
) -> Vec<SharedBlob> {
match request {
@ -1300,7 +1318,7 @@ impl ClusterInfo {
}
ret
});
Self::handle_push_message(me, &from, data)
Self::handle_push_message(me, &from, data, stakes)
}
Protocol::PruneMessage(from, data) => {
if data.verify() {
@ -1335,6 +1353,7 @@ impl ClusterInfo {
fn run_listen(
obj: &Arc<RwLock<Self>>,
blocktree: Option<&Arc<Blocktree>>,
bank_forks: Option<&Arc<RwLock<BankForks>>>,
requests_receiver: &BlobReceiver,
response_sender: &BlobSender,
) -> Result<()> {
@ -1345,8 +1364,16 @@ impl ClusterInfo {
reqs.append(&mut more);
}
let mut resps = Vec::new();
let stakes: HashMap<_, _> = match bank_forks {
Some(ref bank_forks) => {
staking_utils::staked_nodes(&bank_forks.read().unwrap().working_bank())
}
None => HashMap::new(),
};
for req in reqs {
let mut resp = Self::handle_blob(obj, blocktree, &req.read().unwrap());
let mut resp = Self::handle_blob(obj, blocktree, &stakes, &req.read().unwrap());
resps.append(&mut resp);
}
response_sender.send(resps)?;
@ -1355,6 +1382,7 @@ impl ClusterInfo {
pub fn listen(
me: Arc<RwLock<Self>>,
blocktree: Option<Arc<Blocktree>>,
bank_forks: Option<Arc<RwLock<BankForks>>>,
requests_receiver: BlobReceiver,
response_sender: BlobSender,
exit: &Arc<AtomicBool>,
@ -1366,6 +1394,7 @@ impl ClusterInfo {
let e = Self::run_listen(
&me,
blocktree.as_ref(),
bank_forks.as_ref(),
&requests_receiver,
&response_sender,
);

View File

@ -3,15 +3,16 @@
//! designed to run with a simulator or over a UDP network connection with messages up to a
//! packet::BLOB_DATA_SIZE size.
use crate::crds::Crds;
use crate::crds::{Crds, VersionedCrdsValue};
use crate::crds_gossip_error::CrdsGossipError;
use crate::crds_gossip_pull::CrdsGossipPull;
use crate::crds_gossip_push::{CrdsGossipPush, CRDS_GOSSIP_NUM_ACTIVE};
use crate::crds_value::CrdsValue;
use crate::crds_value::{CrdsValue, CrdsValueLabel};
use solana_runtime::bloom::Bloom;
use solana_sdk::hash::Hash;
use solana_sdk::pubkey::Pubkey;
use std::collections::HashMap;
use solana_sdk::signature::Signable;
use std::collections::{HashMap, HashSet};
///The min size for bloom filters
pub const CRDS_GOSSIP_BLOOM_SIZE: usize = 1000;
@ -39,25 +40,24 @@ impl CrdsGossip {
pub fn set_self(&mut self, id: &Pubkey) {
self.id = *id;
}
/// process a push message to the network
pub fn process_push_message(&mut self, values: Vec<CrdsValue>, now: u64) -> Vec<Pubkey> {
let labels: Vec<_> = values.iter().map(CrdsValue::label).collect();
let results: Vec<_> = values
pub fn process_push_message(
&mut self,
from: &Pubkey,
values: Vec<CrdsValue>,
now: u64,
) -> Vec<VersionedCrdsValue> {
values
.into_iter()
.map(|val| self.push.process_push_message(&mut self.crds, val, now))
.collect();
results
.into_iter()
.zip(labels)
.filter_map(|(r, d)| {
if r == Err(CrdsGossipError::PushMessagePrune) {
Some(d.pubkey())
} else if let Ok(Some(val)) = r {
.filter_map(|val| {
let res = self
.push
.process_push_message(&mut self.crds, from, val, now);
if let Ok(Some(val)) = res {
self.pull
.record_old_hash(val.value_hash, val.local_timestamp);
None
Some(val)
} else {
None
}
@ -65,6 +65,31 @@ impl CrdsGossip {
.collect()
}
/// remove redundant paths in the network
pub fn prune_received_cache(
&mut self,
labels: Vec<CrdsValueLabel>,
stakes: &HashMap<Pubkey, u64>,
) -> HashMap<Pubkey, HashSet<Pubkey>> {
let id = &self.id;
let crds = &self.crds;
let push = &mut self.push;
let versioned = labels
.into_iter()
.filter_map(|label| crds.lookup_versioned(&label));
let mut prune_map: HashMap<Pubkey, HashSet<_>> = HashMap::new();
for val in versioned {
let origin = val.value.pubkey();
let hash = val.value_hash;
let peers = push.prune_received_cache(id, &origin, hash, stakes);
for from in peers {
prune_map.entry(from).or_default().insert(origin);
}
}
prune_map
}
pub fn new_push_messages(&mut self, now: u64) -> (Pubkey, HashMap<Pubkey, Vec<CrdsValue>>) {
let push_messages = self.push.new_push_messages(&self.crds, now);
(self.id, push_messages)
@ -147,7 +172,7 @@ impl CrdsGossip {
}
if now > 5 * self.push.msg_timeout {
let min = now - 5 * self.push.msg_timeout;
self.push.purge_old_pushed_once_messages(min);
self.push.purge_old_received_cache(min);
}
if now > self.pull.crds_timeout {
let min = now - self.pull.crds_timeout;

View File

@ -2,7 +2,7 @@
pub enum CrdsGossipError {
NoPeers,
PushMessageTimeout,
PushMessagePrune,
PushMessageAlreadyReceived,
PushMessageOldVersion,
BadPruneDestination,
PruneMessageTimeout,

View File

@ -27,12 +27,13 @@ use solana_sdk::hash::Hash;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::timing::timestamp;
use std::cmp;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
pub const CRDS_GOSSIP_NUM_ACTIVE: usize = 30;
pub const CRDS_GOSSIP_PUSH_FANOUT: usize = 6;
pub const CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS: u64 = 5000;
pub const CRDS_GOSSIP_PRUNE_MSG_TIMEOUT_MS: u64 = 500;
pub const CRDS_GOSSIP_PRUNE_STAKE_THRESHOLD_PCT: f64 = 0.15;
#[derive(Clone)]
pub struct CrdsGossipPush {
@ -42,7 +43,8 @@ pub struct CrdsGossipPush {
active_set: IndexMap<Pubkey, Bloom<Pubkey>>,
/// push message queue
push_messages: HashMap<CrdsValueLabel, Hash>,
pushed_once: HashMap<Hash, u64>,
/// cache that tracks which validators a message was received from
received_cache: HashMap<Hash, (u64, HashSet<Pubkey>)>,
pub num_active: usize,
pub push_fanout: usize,
pub msg_timeout: u64,
@ -55,7 +57,7 @@ impl Default for CrdsGossipPush {
max_bytes: BLOB_DATA_SIZE,
active_set: IndexMap::new(),
push_messages: HashMap::new(),
pushed_once: HashMap::new(),
received_cache: HashMap::new(),
num_active: CRDS_GOSSIP_NUM_ACTIVE,
push_fanout: CRDS_GOSSIP_PUSH_FANOUT,
msg_timeout: CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS,
@ -67,10 +69,69 @@ impl CrdsGossipPush {
pub fn num_pending(&self) -> usize {
self.push_messages.len()
}
fn prune_stake_threshold(self_stake: u64, origin_stake: u64) -> u64 {
let min_path_stake = self_stake.min(origin_stake);
(CRDS_GOSSIP_PRUNE_STAKE_THRESHOLD_PCT * min_path_stake as f64).round() as u64
}
pub fn prune_received_cache(
&mut self,
self_pubkey: &Pubkey,
origin: &Pubkey,
hash: Hash,
stakes: &HashMap<Pubkey, u64>,
) -> Vec<Pubkey> {
let origin_stake = stakes.get(origin).unwrap_or(&0);
let self_stake = stakes.get(self_pubkey).unwrap_or(&0);
let cache = self.received_cache.get(&hash);
if cache.is_none() {
return Vec::new();
}
let peers = &cache.unwrap().1;
let peer_stake_total: u64 = peers.iter().map(|p| stakes.get(p).unwrap_or(&0)).sum();
let prune_stake_threshold = Self::prune_stake_threshold(*self_stake, *origin_stake);
if peer_stake_total < prune_stake_threshold {
return Vec::new();
}
let staked_peers: Vec<(Pubkey, u64)> = peers
.iter()
.filter_map(|p| stakes.get(p).map(|s| (*p, *s)))
.filter(|(_, s)| *s > 0)
.collect();
let mut seed = [0; 32];
seed[0..8].copy_from_slice(&thread_rng().next_u64().to_le_bytes());
let shuffle = weighted_shuffle(
staked_peers.iter().map(|(_, stake)| *stake).collect_vec(),
ChaChaRng::from_seed(seed),
);
let mut keep = HashSet::new();
let mut peer_stake_sum = 0;
for next in shuffle {
let (next_peer, next_stake) = staked_peers[next];
keep.insert(next_peer);
peer_stake_sum += next_stake;
if peer_stake_sum >= prune_stake_threshold {
break;
}
}
peers
.iter()
.filter(|p| !keep.contains(p))
.cloned()
.collect()
}
/// process a push message to the network
pub fn process_push_message(
&mut self,
crds: &mut Crds,
from: &Pubkey,
value: CrdsValue,
now: u64,
) -> Result<Option<VersionedCrdsValue>, CrdsGossipError> {
@ -81,18 +142,20 @@ impl CrdsGossipPush {
return Err(CrdsGossipError::PushMessageTimeout);
}
let label = value.label();
let new_value = crds.new_versioned(now, value);
let value_hash = new_value.value_hash;
if self.pushed_once.get(&value_hash).is_some() {
return Err(CrdsGossipError::PushMessagePrune);
if let Some((_, ref mut received_set)) = self.received_cache.get_mut(&value_hash) {
received_set.insert(from.clone());
return Err(CrdsGossipError::PushMessageAlreadyReceived);
}
let old = crds.insert_versioned(new_value);
if old.is_err() {
return Err(CrdsGossipError::PushMessageOldVersion);
}
let mut received_set = HashSet::new();
received_set.insert(from.clone());
self.push_messages.insert(label, value_hash);
self.pushed_once.insert(value_hash, now);
self.received_cache.insert(value_hash, (now, received_set));
Ok(old.ok().and_then(|opt| opt))
}
@ -258,16 +321,17 @@ impl CrdsGossipPush {
self.push_messages.remove(&k);
}
}
/// purge old pushed_once messages
pub fn purge_old_pushed_once_messages(&mut self, min_time: u64) {
/// purge received push message cache
pub fn purge_old_received_cache(&mut self, min_time: u64) {
let old_msgs: Vec<Hash> = self
.pushed_once
.received_cache
.iter()
.filter_map(|(k, v)| if *v < min_time { Some(k) } else { None })
.filter_map(|(k, (rcvd_time, _))| if *rcvd_time < min_time { Some(k) } else { None })
.cloned()
.collect();
for k in old_msgs {
self.pushed_once.remove(&k);
self.received_cache.remove(&k);
}
}
}
@ -278,6 +342,55 @@ mod test {
use crate::contact_info::ContactInfo;
use solana_sdk::signature::Signable;
#[test]
fn test_prune() {
let mut crds = Crds::default();
let mut push = CrdsGossipPush::default();
let mut stakes = HashMap::new();
let self_id = Pubkey::new_rand();
let origin = Pubkey::new_rand();
stakes.insert(self_id, 100);
stakes.insert(origin, 100);
let value = CrdsValue::ContactInfo(ContactInfo::new_localhost(&origin, 0));
let label = value.label();
let low_staked_peers = (0..10).map(|_| Pubkey::new_rand());
let mut low_staked_set = HashSet::new();
low_staked_peers.for_each(|p| {
let _ = push.process_push_message(&mut crds, &p, value.clone(), 0);
low_staked_set.insert(p);
stakes.insert(p, 1);
});
let versioned = crds
.lookup_versioned(&label)
.expect("versioned value should exist");
let hash = versioned.value_hash;
let pruned = push.prune_received_cache(&self_id, &origin, hash, &stakes);
assert!(
pruned.is_empty(),
"should not prune if min threshold has not been reached"
);
let high_staked_peer = Pubkey::new_rand();
let high_stake = CrdsGossipPush::prune_stake_threshold(100, 100) + 10;
stakes.insert(high_staked_peer, high_stake);
let _ = push.process_push_message(&mut crds, &high_staked_peer, value.clone(), 0);
let pruned = push.prune_received_cache(&self_id, &origin, hash, &stakes);
assert!(
pruned.len() < low_staked_set.len() + 1,
"should not prune all peers"
);
pruned.iter().for_each(|p| {
assert!(
low_staked_set.contains(p),
"only low staked peers should be pruned"
);
});
}
#[test]
fn test_process_push() {
let mut crds = Crds::default();
@ -286,15 +399,15 @@ mod test {
let label = value.label();
// push a new message
assert_eq!(
push.process_push_message(&mut crds, value.clone(), 0),
push.process_push_message(&mut crds, &Pubkey::default(), value.clone(), 0),
Ok(None)
);
assert_eq!(crds.lookup(&label), Some(&value));
// push it again
assert_eq!(
push.process_push_message(&mut crds, value.clone(), 0),
Err(CrdsGossipError::PushMessagePrune)
push.process_push_message(&mut crds, &Pubkey::default(), value.clone(), 0),
Err(CrdsGossipError::PushMessageAlreadyReceived)
);
}
#[test]
@ -306,13 +419,16 @@ mod test {
let value = CrdsValue::ContactInfo(ci.clone());
// push a new message
assert_eq!(push.process_push_message(&mut crds, value, 0), Ok(None));
assert_eq!(
push.process_push_message(&mut crds, &Pubkey::default(), value, 0),
Ok(None)
);
// push an old version
ci.wallclock = 0;
let value = CrdsValue::ContactInfo(ci.clone());
assert_eq!(
push.process_push_message(&mut crds, value, 0),
push.process_push_message(&mut crds, &Pubkey::default(), value, 0),
Err(CrdsGossipError::PushMessageOldVersion)
);
}
@ -327,7 +443,7 @@ mod test {
ci.wallclock = timeout + 1;
let value = CrdsValue::ContactInfo(ci.clone());
assert_eq!(
push.process_push_message(&mut crds, value, 0),
push.process_push_message(&mut crds, &Pubkey::default(), value, 0),
Err(CrdsGossipError::PushMessageTimeout)
);
@ -335,7 +451,7 @@ mod test {
ci.wallclock = 0;
let value = CrdsValue::ContactInfo(ci.clone());
assert_eq!(
push.process_push_message(&mut crds, value, timeout + 1),
push.process_push_message(&mut crds, &Pubkey::default(), value, timeout + 1),
Err(CrdsGossipError::PushMessageTimeout)
);
}
@ -349,7 +465,7 @@ mod test {
// push a new message
assert_eq!(
push.process_push_message(&mut crds, value_old.clone(), 0),
push.process_push_message(&mut crds, &Pubkey::default(), value_old.clone(), 0),
Ok(None)
);
@ -357,7 +473,7 @@ mod test {
ci.wallclock = 1;
let value = CrdsValue::ContactInfo(ci.clone());
assert_eq!(
push.process_push_message(&mut crds, value, 0)
push.process_push_message(&mut crds, &Pubkey::default(), value, 0)
.unwrap()
.unwrap()
.value,
@ -433,7 +549,10 @@ mod test {
let new_msg = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0));
let mut expected = HashMap::new();
expected.insert(peer.label().pubkey(), vec![new_msg.clone()]);
assert_eq!(push.process_push_message(&mut crds, new_msg, 0), Ok(None));
assert_eq!(
push.process_push_message(&mut crds, &Pubkey::default(), new_msg, 0),
Ok(None)
);
assert_eq!(push.active_set.len(), 1);
assert_eq!(push.new_push_messages(&crds, 0), expected);
}
@ -447,7 +566,7 @@ mod test {
assert_eq!(crds.insert(peer_2.clone(), 0), Ok(None));
let peer_3 = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0));
assert_eq!(
push.process_push_message(&mut crds, peer_3.clone(), 0),
push.process_push_message(&mut crds, &Pubkey::default(), peer_3.clone(), 0),
Ok(None)
);
push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 1, 1);
@ -471,7 +590,7 @@ mod test {
let new_msg = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0));
let expected = HashMap::new();
assert_eq!(
push.process_push_message(&mut crds, new_msg.clone(), 0),
push.process_push_message(&mut crds, &Pubkey::default(), new_msg.clone(), 0),
Ok(None)
);
push.process_prune_msg(&peer.label().pubkey(), &[new_msg.label().pubkey()]);
@ -490,7 +609,7 @@ mod test {
let new_msg = CrdsValue::ContactInfo(ci.clone());
let expected = HashMap::new();
assert_eq!(
push.process_push_message(&mut crds, new_msg.clone(), 1),
push.process_push_message(&mut crds, &Pubkey::default(), new_msg.clone(), 1),
Ok(None)
);
push.purge_old_pending_push_messages(&crds, 0);
@ -498,7 +617,7 @@ mod test {
}
#[test]
fn test_purge_old_pushed_once_messages() {
fn test_purge_old_received_cache() {
let mut crds = Crds::default();
let mut push = CrdsGossipPush::default();
let mut ci = ContactInfo::new_localhost(&Pubkey::new_rand(), 0);
@ -507,23 +626,23 @@ mod test {
let label = value.label();
// push a new message
assert_eq!(
push.process_push_message(&mut crds, value.clone(), 0),
push.process_push_message(&mut crds, &Pubkey::default(), value.clone(), 0),
Ok(None)
);
assert_eq!(crds.lookup(&label), Some(&value));
// push it again
assert_eq!(
push.process_push_message(&mut crds, value.clone(), 0),
Err(CrdsGossipError::PushMessagePrune)
push.process_push_message(&mut crds, &Pubkey::default(), value.clone(), 0),
Err(CrdsGossipError::PushMessageAlreadyReceived)
);
// purge the old pushed
push.purge_old_pushed_once_messages(1);
push.purge_old_received_cache(1);
// push it again
assert_eq!(
push.process_push_message(&mut crds, value.clone(), 0),
push.process_push_message(&mut crds, &Pubkey::default(), value.clone(), 0),
Err(CrdsGossipError::PushMessageOldVersion)
);
}

View File

@ -45,6 +45,7 @@ impl GossipService {
let t_listen = ClusterInfo::listen(
cluster_info.clone(),
blocktree,
bank_forks.clone(),
request_receiver,
response_sender.clone(),
exit,

View File

@ -11,10 +11,65 @@ use solana_sdk::hash::hash;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::timing::timestamp;
use std::collections::HashMap;
use std::ops::Deref;
use std::sync::{Arc, Mutex};
type Node = Arc<Mutex<CrdsGossip>>;
type Network = HashMap<Pubkey, Node>;
#[derive(Clone)]
struct Node {
gossip: Arc<Mutex<CrdsGossip>>,
stake: u64,
}
impl Node {
fn new(gossip: Arc<Mutex<CrdsGossip>>) -> Self {
Node { gossip, stake: 0 }
}
fn staked(gossip: Arc<Mutex<CrdsGossip>>, stake: u64) -> Self {
Node { gossip, stake }
}
}
impl Deref for Node {
type Target = Arc<Mutex<CrdsGossip>>;
fn deref(&self) -> &Self::Target {
&self.gossip
}
}
struct Network {
nodes: HashMap<Pubkey, Node>,
pruned_count: usize,
stake_pruned: u64,
}
impl Network {
fn new(nodes: HashMap<Pubkey, Node>) -> Self {
Network {
nodes,
pruned_count: 0,
stake_pruned: 0,
}
}
}
impl Deref for Network {
type Target = HashMap<Pubkey, Node>;
fn deref(&self) -> &Self::Target {
&self.nodes
}
}
fn stakes(network: &Network) -> HashMap<Pubkey, u64> {
let mut stakes = HashMap::new();
for (key, Node { stake, .. }) in network.iter() {
stakes.insert(*key, *stake);
}
stakes
}
fn star_network_create(num: usize) -> Network {
let entry = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0));
let mut network: HashMap<_, _> = (1..num)
@ -25,15 +80,15 @@ fn star_network_create(num: usize) -> Network {
node.crds.insert(new.clone(), 0).unwrap();
node.crds.insert(entry.clone(), 0).unwrap();
node.set_self(&id);
(new.label().pubkey(), Arc::new(Mutex::new(node)))
(new.label().pubkey(), Node::new(Arc::new(Mutex::new(node))))
})
.collect();
let mut node = CrdsGossip::default();
let id = entry.label().pubkey();
node.crds.insert(entry.clone(), 0).unwrap();
node.set_self(&id);
network.insert(id, Arc::new(Mutex::new(node)));
network
network.insert(id, Node::new(Arc::new(Mutex::new(node))));
Network::new(network)
}
fn rstar_network_create(num: usize) -> Network {
@ -50,11 +105,11 @@ fn rstar_network_create(num: usize) -> Network {
node.crds.insert(new.clone(), 0).unwrap();
origin.crds.insert(new.clone(), 0).unwrap();
node.set_self(&id);
(new.label().pubkey(), Arc::new(Mutex::new(node)))
(new.label().pubkey(), Node::new(Arc::new(Mutex::new(node))))
})
.collect();
network.insert(id, Arc::new(Mutex::new(origin)));
network
network.insert(id, Node::new(Arc::new(Mutex::new(origin))));
Network::new(network)
}
fn ring_network_create(num: usize) -> Network {
@ -65,7 +120,7 @@ fn ring_network_create(num: usize) -> Network {
let mut node = CrdsGossip::default();
node.crds.insert(new.clone(), 0).unwrap();
node.set_self(&id);
(new.label().pubkey(), Arc::new(Mutex::new(node)))
(new.label().pubkey(), Node::new(Arc::new(Mutex::new(node))))
})
.collect();
let keys: Vec<Pubkey> = network.keys().cloned().collect();
@ -84,7 +139,45 @@ fn ring_network_create(num: usize) -> Network {
let end = network.get_mut(&keys[(k + 1) % keys.len()]).unwrap();
end.lock().unwrap().crds.insert(start_info, 0).unwrap();
}
network
Network::new(network)
}
fn connected_staked_network_create(stakes: &[u64]) -> Network {
let num = stakes.len();
let mut network: HashMap<_, _> = (0..num)
.map(|n| {
let new = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0));
let id = new.label().pubkey();
let mut node = CrdsGossip::default();
node.crds.insert(new.clone(), 0).unwrap();
node.set_self(&id);
(
new.label().pubkey(),
Node::staked(Arc::new(Mutex::new(node)), stakes[n]),
)
})
.collect();
let keys: Vec<Pubkey> = network.keys().cloned().collect();
let start_entries: Vec<_> = keys
.iter()
.map(|k| {
let start = &network[k].lock().unwrap();
let start_id = start.id.clone();
let start_label = CrdsValueLabel::ContactInfo(start_id);
start.crds.lookup(&start_label).unwrap().clone()
})
.collect();
for end in network.values_mut() {
for k in 0..keys.len() {
let mut end = end.lock().unwrap();
if keys[k] != end.id {
let start_info = start_entries[k].clone();
end.crds.insert(start_info, 0).unwrap();
}
}
}
Network::new(network)
}
fn network_simulator_pull_only(network: &mut Network) {
@ -125,7 +218,7 @@ fn network_simulator(network: &mut Network) {
.and_then(|v| v.contact_info().cloned())
.unwrap();
m.wallclock = now;
node.process_push_message(vec![CrdsValue::ContactInfo(m)], now);
node.process_push_message(&Pubkey::default(), vec![CrdsValue::ContactInfo(m)], now);
});
// push for a bit
let (queue_size, bytes_tx) = network_run_push(network, start, end);
@ -159,7 +252,9 @@ fn network_run_push(network: &mut Network, start: usize, end: usize) -> (usize,
let num = network.len();
let mut prunes: usize = 0;
let mut delivered: usize = 0;
let mut stake_pruned: u64 = 0;
let network_values: Vec<Node> = network.values().cloned().collect();
let stakes = stakes(network);
for t in start..end {
let now = t as u64 * 100;
let requests: Vec<_> = network_values
@ -176,35 +271,62 @@ fn network_run_push(network: &mut Network, start: usize, end: usize) -> (usize,
let mut delivered: usize = 0;
let mut num_msgs: usize = 0;
let mut prunes: usize = 0;
let mut stake_pruned: u64 = 0;
for (to, msgs) in push_messages {
bytes += serialized_size(&msgs).unwrap() as usize;
num_msgs += 1;
let rsps = network
let updated = network
.get(&to)
.map(|node| node.lock().unwrap().process_push_message(msgs.clone(), now))
.map(|node| {
node.lock()
.unwrap()
.process_push_message(&from, msgs.clone(), now)
})
.unwrap();
bytes += serialized_size(&rsps).unwrap() as usize;
prunes += rsps.len();
let updated_labels: Vec<_> =
updated.into_iter().map(|u| u.value.label()).collect();
let prunes_map = network
.get(&to)
.map(|node| {
node.lock()
.unwrap()
.prune_received_cache(updated_labels, &stakes)
})
.unwrap();
for (from, prune_set) in prunes_map {
let prune_keys: Vec<_> = prune_set.into_iter().collect();
bytes += serialized_size(&prune_keys).unwrap() as usize;
delivered += 1;
prunes += prune_keys.len();
let stake_pruned_sum = stakes.get(&from).unwrap() * prune_keys.len() as u64;
stake_pruned += stake_pruned_sum;
network
.get(&from)
.map(|node| {
let mut node = node.lock().unwrap();
let destination = node.id;
let now = timestamp();
node.process_prune_msg(&to, &destination, &rsps, now, now)
node.process_prune_msg(&to, &destination, &prune_keys, now, now)
.unwrap()
})
.unwrap();
delivered += rsps.is_empty() as usize;
}
(bytes, delivered, num_msgs, prunes)
}
(bytes, delivered, num_msgs, prunes, stake_pruned)
})
.collect();
for (b, d, m, p) in transfered {
for (b, d, m, p, s) in transfered {
bytes += b;
delivered += d;
num_msgs += m;
prunes += p;
stake_pruned += s;
}
if now % CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS == 0 && now > 0 {
network_values.par_iter().for_each(|node| {
@ -218,16 +340,19 @@ fn network_run_push(network: &mut Network, start: usize, end: usize) -> (usize,
.map(|v| v.lock().unwrap().push.num_pending())
.sum();
trace!(
"network_run_push_{}: now: {} queue: {} bytes: {} num_msgs: {} prunes: {} delivered: {}",
"network_run_push_{}: now: {} queue: {} bytes: {} num_msgs: {} prunes: {} stake_pruned: {} delivered: {}",
num,
now,
total,
bytes,
num_msgs,
prunes,
stake_pruned,
delivered,
);
}
network.pruned_count = prunes;
network.stake_pruned = stake_pruned;
(total, bytes)
}
@ -338,6 +463,32 @@ fn test_star_network_push_ring_200() {
network_simulator(&mut network);
}
#[test]
fn test_connected_staked_network() {
solana_logger::setup();
let stakes = [
[1000; 5].to_vec(),
[100; 20].to_vec(),
[10; 50].to_vec(),
[1; 125].to_vec(),
]
.concat();
let mut network = connected_staked_network_create(&stakes);
network_simulator(&mut network);
let stake_sum: u64 = stakes.iter().sum();
let avg_stake: u64 = stake_sum / stakes.len() as u64;
let avg_stake_pruned = network.stake_pruned / network.pruned_count as u64;
trace!(
"connected staked network, avg_stake: {}, avg_stake_pruned: {}",
avg_stake,
avg_stake_pruned
);
assert!(
avg_stake_pruned < avg_stake,
"network should prune lower stakes more often"
)
}
#[test]
#[ignore]
fn test_star_network_large_pull() {
solana_logger::setup();