diff --git a/book/src/gossip.md b/book/src/gossip.md index 41a501364..4d01e7c0b 100644 --- a/book/src/gossip.md +++ b/book/src/gossip.md @@ -34,8 +34,8 @@ Nodes send push messages to `PUSH_FANOUT` push peers. Upon receiving a push message, a node examines the message for: -1. Duplication: if the message has been seen before, the node responds with - `PushMessagePrune` and drops the message +1. Duplication: if the message has been seen before, the node drops the message + and may respond with `PushMessagePrune` if forwarded from a low staked node 2. New data: if the message is new to the node * Stores the new information with an updated version in its cluster info and @@ -51,7 +51,7 @@ Upon receiving a push message, a node examines the message for: A nodes selects its push peers at random from the active set of known peers. The node keeps this selection for a relatively long time. When a prune message is received, the node drops the push peer that sent the prune. Prune is an -indication that there is another, faster path to that node than direct push. +indication that there is another, higher stake weighted path to that node than direct push. The set of push peers is kept fresh by rotating a new node into the set every `PUSH_MSG_TIMEOUT/2` milliseconds. diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index a393511ba..c58e86922 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -47,7 +47,7 @@ use solana_sdk::transaction::Transaction; use std::borrow::Borrow; use std::borrow::Cow; use std::cmp::min; -use std::collections::{BTreeSet, HashMap}; +use std::collections::{BTreeSet, HashMap, HashSet}; use std::fmt; use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}; use std::sync::atomic::{AtomicBool, Ordering}; @@ -206,7 +206,8 @@ impl ClusterInfo { let mut entry = CrdsValue::ContactInfo(my_data); entry.sign(&self.keypair); self.gossip.refresh_push_active_set(stakes); - self.gossip.process_push_message(vec![entry], now); + self.gossip + .process_push_message(&self.id(), vec![entry], now); } // TODO kill insert_info, only used by tests @@ -316,7 +317,8 @@ impl ClusterInfo { let now = timestamp(); let mut entry = CrdsValue::EpochSlots(EpochSlots::new(id, root, slots, now)); entry.sign(&self.keypair); - self.gossip.process_push_message(vec![entry], now); + self.gossip + .process_push_message(&self.id(), vec![entry], now); } pub fn push_vote(&mut self, vote: Transaction) { @@ -324,7 +326,8 @@ impl ClusterInfo { let vote = Vote::new(&self.id(), vote, now); let mut entry = CrdsValue::Vote(vote); entry.sign(&self.keypair); - self.gossip.process_push_message(vec![entry], now); + self.gossip + .process_push_message(&self.id(), vec![entry], now); } /// Get votes in the crds @@ -1071,12 +1074,13 @@ impl ClusterInfo { fn handle_blob( obj: &Arc>, blocktree: Option<&Arc>, + stakes: &HashMap, blob: &Blob, ) -> Vec { deserialize(&blob.data[..blob.meta.size]) .into_iter() .flat_map(|request| { - ClusterInfo::handle_protocol(obj, &blob.meta.addr(), blocktree, request) + ClusterInfo::handle_protocol(obj, &blob.meta.addr(), blocktree, stakes, request) }) .collect() } @@ -1120,6 +1124,7 @@ impl ClusterInfo { inc_new_counter_debug!("cluster_info-pull_request-rsp", len); to_shared_blob(rsp, from.gossip).ok().into_iter().collect() } + fn handle_pull_response(me: &Arc>, from: &Pubkey, data: Vec) { let len = data.len(); let now = Instant::now(); @@ -1134,40 +1139,52 @@ impl ClusterInfo { report_time_spent("ReceiveUpdates", &now.elapsed(), &format!(" len: {}", len)); } + fn handle_push_message( me: &Arc>, from: &Pubkey, data: Vec, + stakes: &HashMap, ) -> Vec { let self_id = me.read().unwrap().gossip.id; inc_new_counter_debug!("cluster_info-push_message", 1, 0, 1000); - let prunes: Vec<_> = me + let updated: Vec<_> = + me.write() + .unwrap() + .gossip + .process_push_message(from, data, timestamp()); + + let updated_labels: Vec<_> = updated.into_iter().map(|u| u.value.label()).collect(); + let prunes_map: HashMap> = me .write() .unwrap() .gossip - .process_push_message(data, timestamp()); + .prune_received_cache(updated_labels, stakes); - if !prunes.is_empty() { - inc_new_counter_debug!("cluster_info-push_message-prunes", prunes.len()); - let ci = me.read().unwrap().lookup(from).cloned(); - let pushes: Vec<_> = me.write().unwrap().new_push_requests(); - inc_new_counter_debug!("cluster_info-push_message-pushes", pushes.len()); - let mut rsp: Vec<_> = ci - .and_then(|ci| { + let mut rsp: Vec<_> = prunes_map + .into_iter() + .map(|(from, prune_set)| { + inc_new_counter_debug!("cluster_info-push_message-prunes", prune_set.len()); + me.read().unwrap().lookup(&from).cloned().and_then(|ci| { let mut prune_msg = PruneData { pubkey: self_id, - prunes, + prunes: prune_set.into_iter().collect(), signature: Signature::default(), - destination: *from, + destination: from, wallclock: timestamp(), }; prune_msg.sign(&me.read().unwrap().keypair); let rsp = Protocol::PruneMessage(self_id, prune_msg); to_shared_blob(rsp, ci.gossip).ok() }) - .into_iter() - .collect(); + }) + .flatten() + .collect(); + + if !rsp.is_empty() { + let pushes: Vec<_> = me.write().unwrap().new_push_requests(); + inc_new_counter_debug!("cluster_info-push_message-pushes", pushes.len()); let mut blobs: Vec<_> = pushes .into_iter() .filter_map(|(remote_gossip_addr, req)| { @@ -1269,6 +1286,7 @@ impl ClusterInfo { me: &Arc>, from_addr: &SocketAddr, blocktree: Option<&Arc>, + stakes: &HashMap, request: Protocol, ) -> Vec { match request { @@ -1300,7 +1318,7 @@ impl ClusterInfo { } ret }); - Self::handle_push_message(me, &from, data) + Self::handle_push_message(me, &from, data, stakes) } Protocol::PruneMessage(from, data) => { if data.verify() { @@ -1335,6 +1353,7 @@ impl ClusterInfo { fn run_listen( obj: &Arc>, blocktree: Option<&Arc>, + bank_forks: Option<&Arc>>, requests_receiver: &BlobReceiver, response_sender: &BlobSender, ) -> Result<()> { @@ -1345,8 +1364,16 @@ impl ClusterInfo { reqs.append(&mut more); } let mut resps = Vec::new(); + + let stakes: HashMap<_, _> = match bank_forks { + Some(ref bank_forks) => { + staking_utils::staked_nodes(&bank_forks.read().unwrap().working_bank()) + } + None => HashMap::new(), + }; + for req in reqs { - let mut resp = Self::handle_blob(obj, blocktree, &req.read().unwrap()); + let mut resp = Self::handle_blob(obj, blocktree, &stakes, &req.read().unwrap()); resps.append(&mut resp); } response_sender.send(resps)?; @@ -1355,6 +1382,7 @@ impl ClusterInfo { pub fn listen( me: Arc>, blocktree: Option>, + bank_forks: Option>>, requests_receiver: BlobReceiver, response_sender: BlobSender, exit: &Arc, @@ -1366,6 +1394,7 @@ impl ClusterInfo { let e = Self::run_listen( &me, blocktree.as_ref(), + bank_forks.as_ref(), &requests_receiver, &response_sender, ); diff --git a/core/src/crds_gossip.rs b/core/src/crds_gossip.rs index c860b905c..4617b6fcb 100644 --- a/core/src/crds_gossip.rs +++ b/core/src/crds_gossip.rs @@ -3,15 +3,16 @@ //! designed to run with a simulator or over a UDP network connection with messages up to a //! packet::BLOB_DATA_SIZE size. -use crate::crds::Crds; +use crate::crds::{Crds, VersionedCrdsValue}; use crate::crds_gossip_error::CrdsGossipError; use crate::crds_gossip_pull::CrdsGossipPull; use crate::crds_gossip_push::{CrdsGossipPush, CRDS_GOSSIP_NUM_ACTIVE}; -use crate::crds_value::CrdsValue; +use crate::crds_value::{CrdsValue, CrdsValueLabel}; use solana_runtime::bloom::Bloom; use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; -use std::collections::HashMap; +use solana_sdk::signature::Signable; +use std::collections::{HashMap, HashSet}; ///The min size for bloom filters pub const CRDS_GOSSIP_BLOOM_SIZE: usize = 1000; @@ -39,25 +40,24 @@ impl CrdsGossip { pub fn set_self(&mut self, id: &Pubkey) { self.id = *id; } + /// process a push message to the network - pub fn process_push_message(&mut self, values: Vec, now: u64) -> Vec { - let labels: Vec<_> = values.iter().map(CrdsValue::label).collect(); - - let results: Vec<_> = values + pub fn process_push_message( + &mut self, + from: &Pubkey, + values: Vec, + now: u64, + ) -> Vec { + values .into_iter() - .map(|val| self.push.process_push_message(&mut self.crds, val, now)) - .collect(); - - results - .into_iter() - .zip(labels) - .filter_map(|(r, d)| { - if r == Err(CrdsGossipError::PushMessagePrune) { - Some(d.pubkey()) - } else if let Ok(Some(val)) = r { + .filter_map(|val| { + let res = self + .push + .process_push_message(&mut self.crds, from, val, now); + if let Ok(Some(val)) = res { self.pull .record_old_hash(val.value_hash, val.local_timestamp); - None + Some(val) } else { None } @@ -65,6 +65,31 @@ impl CrdsGossip { .collect() } + /// remove redundant paths in the network + pub fn prune_received_cache( + &mut self, + labels: Vec, + stakes: &HashMap, + ) -> HashMap> { + let id = &self.id; + let crds = &self.crds; + let push = &mut self.push; + let versioned = labels + .into_iter() + .filter_map(|label| crds.lookup_versioned(&label)); + + let mut prune_map: HashMap> = HashMap::new(); + for val in versioned { + let origin = val.value.pubkey(); + let hash = val.value_hash; + let peers = push.prune_received_cache(id, &origin, hash, stakes); + for from in peers { + prune_map.entry(from).or_default().insert(origin); + } + } + prune_map + } + pub fn new_push_messages(&mut self, now: u64) -> (Pubkey, HashMap>) { let push_messages = self.push.new_push_messages(&self.crds, now); (self.id, push_messages) @@ -147,7 +172,7 @@ impl CrdsGossip { } if now > 5 * self.push.msg_timeout { let min = now - 5 * self.push.msg_timeout; - self.push.purge_old_pushed_once_messages(min); + self.push.purge_old_received_cache(min); } if now > self.pull.crds_timeout { let min = now - self.pull.crds_timeout; diff --git a/core/src/crds_gossip_error.rs b/core/src/crds_gossip_error.rs index 2c3cb5176..e99ae611e 100644 --- a/core/src/crds_gossip_error.rs +++ b/core/src/crds_gossip_error.rs @@ -2,7 +2,7 @@ pub enum CrdsGossipError { NoPeers, PushMessageTimeout, - PushMessagePrune, + PushMessageAlreadyReceived, PushMessageOldVersion, BadPruneDestination, PruneMessageTimeout, diff --git a/core/src/crds_gossip_push.rs b/core/src/crds_gossip_push.rs index 1de735371..dc1c9b435 100644 --- a/core/src/crds_gossip_push.rs +++ b/core/src/crds_gossip_push.rs @@ -27,12 +27,13 @@ use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; use solana_sdk::timing::timestamp; use std::cmp; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; pub const CRDS_GOSSIP_NUM_ACTIVE: usize = 30; pub const CRDS_GOSSIP_PUSH_FANOUT: usize = 6; pub const CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS: u64 = 5000; pub const CRDS_GOSSIP_PRUNE_MSG_TIMEOUT_MS: u64 = 500; +pub const CRDS_GOSSIP_PRUNE_STAKE_THRESHOLD_PCT: f64 = 0.15; #[derive(Clone)] pub struct CrdsGossipPush { @@ -42,7 +43,8 @@ pub struct CrdsGossipPush { active_set: IndexMap>, /// push message queue push_messages: HashMap, - pushed_once: HashMap, + /// cache that tracks which validators a message was received from + received_cache: HashMap)>, pub num_active: usize, pub push_fanout: usize, pub msg_timeout: u64, @@ -55,7 +57,7 @@ impl Default for CrdsGossipPush { max_bytes: BLOB_DATA_SIZE, active_set: IndexMap::new(), push_messages: HashMap::new(), - pushed_once: HashMap::new(), + received_cache: HashMap::new(), num_active: CRDS_GOSSIP_NUM_ACTIVE, push_fanout: CRDS_GOSSIP_PUSH_FANOUT, msg_timeout: CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS, @@ -67,10 +69,69 @@ impl CrdsGossipPush { pub fn num_pending(&self) -> usize { self.push_messages.len() } + + fn prune_stake_threshold(self_stake: u64, origin_stake: u64) -> u64 { + let min_path_stake = self_stake.min(origin_stake); + (CRDS_GOSSIP_PRUNE_STAKE_THRESHOLD_PCT * min_path_stake as f64).round() as u64 + } + + pub fn prune_received_cache( + &mut self, + self_pubkey: &Pubkey, + origin: &Pubkey, + hash: Hash, + stakes: &HashMap, + ) -> Vec { + let origin_stake = stakes.get(origin).unwrap_or(&0); + let self_stake = stakes.get(self_pubkey).unwrap_or(&0); + let cache = self.received_cache.get(&hash); + if cache.is_none() { + return Vec::new(); + } + + let peers = &cache.unwrap().1; + let peer_stake_total: u64 = peers.iter().map(|p| stakes.get(p).unwrap_or(&0)).sum(); + let prune_stake_threshold = Self::prune_stake_threshold(*self_stake, *origin_stake); + if peer_stake_total < prune_stake_threshold { + return Vec::new(); + } + + let staked_peers: Vec<(Pubkey, u64)> = peers + .iter() + .filter_map(|p| stakes.get(p).map(|s| (*p, *s))) + .filter(|(_, s)| *s > 0) + .collect(); + + let mut seed = [0; 32]; + seed[0..8].copy_from_slice(&thread_rng().next_u64().to_le_bytes()); + let shuffle = weighted_shuffle( + staked_peers.iter().map(|(_, stake)| *stake).collect_vec(), + ChaChaRng::from_seed(seed), + ); + + let mut keep = HashSet::new(); + let mut peer_stake_sum = 0; + for next in shuffle { + let (next_peer, next_stake) = staked_peers[next]; + keep.insert(next_peer); + peer_stake_sum += next_stake; + if peer_stake_sum >= prune_stake_threshold { + break; + } + } + + peers + .iter() + .filter(|p| !keep.contains(p)) + .cloned() + .collect() + } + /// process a push message to the network pub fn process_push_message( &mut self, crds: &mut Crds, + from: &Pubkey, value: CrdsValue, now: u64, ) -> Result, CrdsGossipError> { @@ -81,18 +142,20 @@ impl CrdsGossipPush { return Err(CrdsGossipError::PushMessageTimeout); } let label = value.label(); - let new_value = crds.new_versioned(now, value); let value_hash = new_value.value_hash; - if self.pushed_once.get(&value_hash).is_some() { - return Err(CrdsGossipError::PushMessagePrune); + if let Some((_, ref mut received_set)) = self.received_cache.get_mut(&value_hash) { + received_set.insert(from.clone()); + return Err(CrdsGossipError::PushMessageAlreadyReceived); } let old = crds.insert_versioned(new_value); if old.is_err() { return Err(CrdsGossipError::PushMessageOldVersion); } + let mut received_set = HashSet::new(); + received_set.insert(from.clone()); self.push_messages.insert(label, value_hash); - self.pushed_once.insert(value_hash, now); + self.received_cache.insert(value_hash, (now, received_set)); Ok(old.ok().and_then(|opt| opt)) } @@ -258,16 +321,17 @@ impl CrdsGossipPush { self.push_messages.remove(&k); } } - /// purge old pushed_once messages - pub fn purge_old_pushed_once_messages(&mut self, min_time: u64) { + + /// purge received push message cache + pub fn purge_old_received_cache(&mut self, min_time: u64) { let old_msgs: Vec = self - .pushed_once + .received_cache .iter() - .filter_map(|(k, v)| if *v < min_time { Some(k) } else { None }) + .filter_map(|(k, (rcvd_time, _))| if *rcvd_time < min_time { Some(k) } else { None }) .cloned() .collect(); for k in old_msgs { - self.pushed_once.remove(&k); + self.received_cache.remove(&k); } } } @@ -278,6 +342,55 @@ mod test { use crate::contact_info::ContactInfo; use solana_sdk::signature::Signable; + #[test] + fn test_prune() { + let mut crds = Crds::default(); + let mut push = CrdsGossipPush::default(); + let mut stakes = HashMap::new(); + + let self_id = Pubkey::new_rand(); + let origin = Pubkey::new_rand(); + stakes.insert(self_id, 100); + stakes.insert(origin, 100); + + let value = CrdsValue::ContactInfo(ContactInfo::new_localhost(&origin, 0)); + let label = value.label(); + let low_staked_peers = (0..10).map(|_| Pubkey::new_rand()); + let mut low_staked_set = HashSet::new(); + low_staked_peers.for_each(|p| { + let _ = push.process_push_message(&mut crds, &p, value.clone(), 0); + low_staked_set.insert(p); + stakes.insert(p, 1); + }); + + let versioned = crds + .lookup_versioned(&label) + .expect("versioned value should exist"); + let hash = versioned.value_hash; + let pruned = push.prune_received_cache(&self_id, &origin, hash, &stakes); + assert!( + pruned.is_empty(), + "should not prune if min threshold has not been reached" + ); + + let high_staked_peer = Pubkey::new_rand(); + let high_stake = CrdsGossipPush::prune_stake_threshold(100, 100) + 10; + stakes.insert(high_staked_peer, high_stake); + let _ = push.process_push_message(&mut crds, &high_staked_peer, value.clone(), 0); + + let pruned = push.prune_received_cache(&self_id, &origin, hash, &stakes); + assert!( + pruned.len() < low_staked_set.len() + 1, + "should not prune all peers" + ); + pruned.iter().for_each(|p| { + assert!( + low_staked_set.contains(p), + "only low staked peers should be pruned" + ); + }); + } + #[test] fn test_process_push() { let mut crds = Crds::default(); @@ -286,15 +399,15 @@ mod test { let label = value.label(); // push a new message assert_eq!( - push.process_push_message(&mut crds, value.clone(), 0), + push.process_push_message(&mut crds, &Pubkey::default(), value.clone(), 0), Ok(None) ); assert_eq!(crds.lookup(&label), Some(&value)); // push it again assert_eq!( - push.process_push_message(&mut crds, value.clone(), 0), - Err(CrdsGossipError::PushMessagePrune) + push.process_push_message(&mut crds, &Pubkey::default(), value.clone(), 0), + Err(CrdsGossipError::PushMessageAlreadyReceived) ); } #[test] @@ -306,13 +419,16 @@ mod test { let value = CrdsValue::ContactInfo(ci.clone()); // push a new message - assert_eq!(push.process_push_message(&mut crds, value, 0), Ok(None)); + assert_eq!( + push.process_push_message(&mut crds, &Pubkey::default(), value, 0), + Ok(None) + ); // push an old version ci.wallclock = 0; let value = CrdsValue::ContactInfo(ci.clone()); assert_eq!( - push.process_push_message(&mut crds, value, 0), + push.process_push_message(&mut crds, &Pubkey::default(), value, 0), Err(CrdsGossipError::PushMessageOldVersion) ); } @@ -327,7 +443,7 @@ mod test { ci.wallclock = timeout + 1; let value = CrdsValue::ContactInfo(ci.clone()); assert_eq!( - push.process_push_message(&mut crds, value, 0), + push.process_push_message(&mut crds, &Pubkey::default(), value, 0), Err(CrdsGossipError::PushMessageTimeout) ); @@ -335,7 +451,7 @@ mod test { ci.wallclock = 0; let value = CrdsValue::ContactInfo(ci.clone()); assert_eq!( - push.process_push_message(&mut crds, value, timeout + 1), + push.process_push_message(&mut crds, &Pubkey::default(), value, timeout + 1), Err(CrdsGossipError::PushMessageTimeout) ); } @@ -349,7 +465,7 @@ mod test { // push a new message assert_eq!( - push.process_push_message(&mut crds, value_old.clone(), 0), + push.process_push_message(&mut crds, &Pubkey::default(), value_old.clone(), 0), Ok(None) ); @@ -357,7 +473,7 @@ mod test { ci.wallclock = 1; let value = CrdsValue::ContactInfo(ci.clone()); assert_eq!( - push.process_push_message(&mut crds, value, 0) + push.process_push_message(&mut crds, &Pubkey::default(), value, 0) .unwrap() .unwrap() .value, @@ -433,7 +549,10 @@ mod test { let new_msg = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0)); let mut expected = HashMap::new(); expected.insert(peer.label().pubkey(), vec![new_msg.clone()]); - assert_eq!(push.process_push_message(&mut crds, new_msg, 0), Ok(None)); + assert_eq!( + push.process_push_message(&mut crds, &Pubkey::default(), new_msg, 0), + Ok(None) + ); assert_eq!(push.active_set.len(), 1); assert_eq!(push.new_push_messages(&crds, 0), expected); } @@ -447,7 +566,7 @@ mod test { assert_eq!(crds.insert(peer_2.clone(), 0), Ok(None)); let peer_3 = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0)); assert_eq!( - push.process_push_message(&mut crds, peer_3.clone(), 0), + push.process_push_message(&mut crds, &Pubkey::default(), peer_3.clone(), 0), Ok(None) ); push.refresh_push_active_set(&crds, &HashMap::new(), &Pubkey::default(), 1, 1); @@ -471,7 +590,7 @@ mod test { let new_msg = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0)); let expected = HashMap::new(); assert_eq!( - push.process_push_message(&mut crds, new_msg.clone(), 0), + push.process_push_message(&mut crds, &Pubkey::default(), new_msg.clone(), 0), Ok(None) ); push.process_prune_msg(&peer.label().pubkey(), &[new_msg.label().pubkey()]); @@ -490,7 +609,7 @@ mod test { let new_msg = CrdsValue::ContactInfo(ci.clone()); let expected = HashMap::new(); assert_eq!( - push.process_push_message(&mut crds, new_msg.clone(), 1), + push.process_push_message(&mut crds, &Pubkey::default(), new_msg.clone(), 1), Ok(None) ); push.purge_old_pending_push_messages(&crds, 0); @@ -498,7 +617,7 @@ mod test { } #[test] - fn test_purge_old_pushed_once_messages() { + fn test_purge_old_received_cache() { let mut crds = Crds::default(); let mut push = CrdsGossipPush::default(); let mut ci = ContactInfo::new_localhost(&Pubkey::new_rand(), 0); @@ -507,23 +626,23 @@ mod test { let label = value.label(); // push a new message assert_eq!( - push.process_push_message(&mut crds, value.clone(), 0), + push.process_push_message(&mut crds, &Pubkey::default(), value.clone(), 0), Ok(None) ); assert_eq!(crds.lookup(&label), Some(&value)); // push it again assert_eq!( - push.process_push_message(&mut crds, value.clone(), 0), - Err(CrdsGossipError::PushMessagePrune) + push.process_push_message(&mut crds, &Pubkey::default(), value.clone(), 0), + Err(CrdsGossipError::PushMessageAlreadyReceived) ); // purge the old pushed - push.purge_old_pushed_once_messages(1); + push.purge_old_received_cache(1); // push it again assert_eq!( - push.process_push_message(&mut crds, value.clone(), 0), + push.process_push_message(&mut crds, &Pubkey::default(), value.clone(), 0), Err(CrdsGossipError::PushMessageOldVersion) ); } diff --git a/core/src/gossip_service.rs b/core/src/gossip_service.rs index 35e1caadd..d0ba4be36 100644 --- a/core/src/gossip_service.rs +++ b/core/src/gossip_service.rs @@ -45,6 +45,7 @@ impl GossipService { let t_listen = ClusterInfo::listen( cluster_info.clone(), blocktree, + bank_forks.clone(), request_receiver, response_sender.clone(), exit, diff --git a/core/tests/crds_gossip.rs b/core/tests/crds_gossip.rs index 9d7981bff..804cc9ffc 100644 --- a/core/tests/crds_gossip.rs +++ b/core/tests/crds_gossip.rs @@ -11,10 +11,65 @@ use solana_sdk::hash::hash; use solana_sdk::pubkey::Pubkey; use solana_sdk::timing::timestamp; use std::collections::HashMap; +use std::ops::Deref; use std::sync::{Arc, Mutex}; -type Node = Arc>; -type Network = HashMap; +#[derive(Clone)] +struct Node { + gossip: Arc>, + stake: u64, +} + +impl Node { + fn new(gossip: Arc>) -> Self { + Node { gossip, stake: 0 } + } + + fn staked(gossip: Arc>, stake: u64) -> Self { + Node { gossip, stake } + } +} + +impl Deref for Node { + type Target = Arc>; + + fn deref(&self) -> &Self::Target { + &self.gossip + } +} + +struct Network { + nodes: HashMap, + pruned_count: usize, + stake_pruned: u64, +} + +impl Network { + fn new(nodes: HashMap) -> Self { + Network { + nodes, + pruned_count: 0, + stake_pruned: 0, + } + } +} + +impl Deref for Network { + type Target = HashMap; + + fn deref(&self) -> &Self::Target { + &self.nodes + } +} + +fn stakes(network: &Network) -> HashMap { + let mut stakes = HashMap::new(); + for (key, Node { stake, .. }) in network.iter() { + stakes.insert(*key, *stake); + } + stakes +} + fn star_network_create(num: usize) -> Network { let entry = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0)); let mut network: HashMap<_, _> = (1..num) @@ -25,15 +80,15 @@ fn star_network_create(num: usize) -> Network { node.crds.insert(new.clone(), 0).unwrap(); node.crds.insert(entry.clone(), 0).unwrap(); node.set_self(&id); - (new.label().pubkey(), Arc::new(Mutex::new(node))) + (new.label().pubkey(), Node::new(Arc::new(Mutex::new(node)))) }) .collect(); let mut node = CrdsGossip::default(); let id = entry.label().pubkey(); node.crds.insert(entry.clone(), 0).unwrap(); node.set_self(&id); - network.insert(id, Arc::new(Mutex::new(node))); - network + network.insert(id, Node::new(Arc::new(Mutex::new(node)))); + Network::new(network) } fn rstar_network_create(num: usize) -> Network { @@ -50,11 +105,11 @@ fn rstar_network_create(num: usize) -> Network { node.crds.insert(new.clone(), 0).unwrap(); origin.crds.insert(new.clone(), 0).unwrap(); node.set_self(&id); - (new.label().pubkey(), Arc::new(Mutex::new(node))) + (new.label().pubkey(), Node::new(Arc::new(Mutex::new(node)))) }) .collect(); - network.insert(id, Arc::new(Mutex::new(origin))); - network + network.insert(id, Node::new(Arc::new(Mutex::new(origin)))); + Network::new(network) } fn ring_network_create(num: usize) -> Network { @@ -65,7 +120,7 @@ fn ring_network_create(num: usize) -> Network { let mut node = CrdsGossip::default(); node.crds.insert(new.clone(), 0).unwrap(); node.set_self(&id); - (new.label().pubkey(), Arc::new(Mutex::new(node))) + (new.label().pubkey(), Node::new(Arc::new(Mutex::new(node)))) }) .collect(); let keys: Vec = network.keys().cloned().collect(); @@ -84,7 +139,45 @@ fn ring_network_create(num: usize) -> Network { let end = network.get_mut(&keys[(k + 1) % keys.len()]).unwrap(); end.lock().unwrap().crds.insert(start_info, 0).unwrap(); } - network + Network::new(network) +} + +fn connected_staked_network_create(stakes: &[u64]) -> Network { + let num = stakes.len(); + let mut network: HashMap<_, _> = (0..num) + .map(|n| { + let new = CrdsValue::ContactInfo(ContactInfo::new_localhost(&Pubkey::new_rand(), 0)); + let id = new.label().pubkey(); + let mut node = CrdsGossip::default(); + node.crds.insert(new.clone(), 0).unwrap(); + node.set_self(&id); + ( + new.label().pubkey(), + Node::staked(Arc::new(Mutex::new(node)), stakes[n]), + ) + }) + .collect(); + + let keys: Vec = network.keys().cloned().collect(); + let start_entries: Vec<_> = keys + .iter() + .map(|k| { + let start = &network[k].lock().unwrap(); + let start_id = start.id.clone(); + let start_label = CrdsValueLabel::ContactInfo(start_id); + start.crds.lookup(&start_label).unwrap().clone() + }) + .collect(); + for end in network.values_mut() { + for k in 0..keys.len() { + let mut end = end.lock().unwrap(); + if keys[k] != end.id { + let start_info = start_entries[k].clone(); + end.crds.insert(start_info, 0).unwrap(); + } + } + } + Network::new(network) } fn network_simulator_pull_only(network: &mut Network) { @@ -125,7 +218,7 @@ fn network_simulator(network: &mut Network) { .and_then(|v| v.contact_info().cloned()) .unwrap(); m.wallclock = now; - node.process_push_message(vec![CrdsValue::ContactInfo(m)], now); + node.process_push_message(&Pubkey::default(), vec![CrdsValue::ContactInfo(m)], now); }); // push for a bit let (queue_size, bytes_tx) = network_run_push(network, start, end); @@ -159,7 +252,9 @@ fn network_run_push(network: &mut Network, start: usize, end: usize) -> (usize, let num = network.len(); let mut prunes: usize = 0; let mut delivered: usize = 0; + let mut stake_pruned: u64 = 0; let network_values: Vec = network.values().cloned().collect(); + let stakes = stakes(network); for t in start..end { let now = t as u64 * 100; let requests: Vec<_> = network_values @@ -176,35 +271,62 @@ fn network_run_push(network: &mut Network, start: usize, end: usize) -> (usize, let mut delivered: usize = 0; let mut num_msgs: usize = 0; let mut prunes: usize = 0; + let mut stake_pruned: u64 = 0; for (to, msgs) in push_messages { bytes += serialized_size(&msgs).unwrap() as usize; num_msgs += 1; - let rsps = network + let updated = network .get(&to) - .map(|node| node.lock().unwrap().process_push_message(msgs.clone(), now)) - .unwrap(); - bytes += serialized_size(&rsps).unwrap() as usize; - prunes += rsps.len(); - network - .get(&from) .map(|node| { - let mut node = node.lock().unwrap(); - let destination = node.id; - let now = timestamp(); - node.process_prune_msg(&to, &destination, &rsps, now, now) + node.lock() .unwrap() + .process_push_message(&from, msgs.clone(), now) }) .unwrap(); - delivered += rsps.is_empty() as usize; + + let updated_labels: Vec<_> = + updated.into_iter().map(|u| u.value.label()).collect(); + let prunes_map = network + .get(&to) + .map(|node| { + node.lock() + .unwrap() + .prune_received_cache(updated_labels, &stakes) + }) + .unwrap(); + + for (from, prune_set) in prunes_map { + let prune_keys: Vec<_> = prune_set.into_iter().collect(); + + bytes += serialized_size(&prune_keys).unwrap() as usize; + delivered += 1; + prunes += prune_keys.len(); + + let stake_pruned_sum = stakes.get(&from).unwrap() * prune_keys.len() as u64; + stake_pruned += stake_pruned_sum; + + network + .get(&from) + .map(|node| { + let mut node = node.lock().unwrap(); + let destination = node.id; + let now = timestamp(); + node.process_prune_msg(&to, &destination, &prune_keys, now, now) + .unwrap() + }) + .unwrap(); + } } - (bytes, delivered, num_msgs, prunes) + (bytes, delivered, num_msgs, prunes, stake_pruned) }) .collect(); - for (b, d, m, p) in transfered { + + for (b, d, m, p, s) in transfered { bytes += b; delivered += d; num_msgs += m; prunes += p; + stake_pruned += s; } if now % CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS == 0 && now > 0 { network_values.par_iter().for_each(|node| { @@ -218,16 +340,19 @@ fn network_run_push(network: &mut Network, start: usize, end: usize) -> (usize, .map(|v| v.lock().unwrap().push.num_pending()) .sum(); trace!( - "network_run_push_{}: now: {} queue: {} bytes: {} num_msgs: {} prunes: {} delivered: {}", + "network_run_push_{}: now: {} queue: {} bytes: {} num_msgs: {} prunes: {} stake_pruned: {} delivered: {}", num, now, total, bytes, num_msgs, prunes, + stake_pruned, delivered, ); } + network.pruned_count = prunes; + network.stake_pruned = stake_pruned; (total, bytes) } @@ -338,6 +463,32 @@ fn test_star_network_push_ring_200() { network_simulator(&mut network); } #[test] +fn test_connected_staked_network() { + solana_logger::setup(); + let stakes = [ + [1000; 5].to_vec(), + [100; 20].to_vec(), + [10; 50].to_vec(), + [1; 125].to_vec(), + ] + .concat(); + let mut network = connected_staked_network_create(&stakes); + network_simulator(&mut network); + + let stake_sum: u64 = stakes.iter().sum(); + let avg_stake: u64 = stake_sum / stakes.len() as u64; + let avg_stake_pruned = network.stake_pruned / network.pruned_count as u64; + trace!( + "connected staked network, avg_stake: {}, avg_stake_pruned: {}", + avg_stake, + avg_stake_pruned + ); + assert!( + avg_stake_pruned < avg_stake, + "network should prune lower stakes more often" + ) +} +#[test] #[ignore] fn test_star_network_large_pull() { solana_logger::setup();