2018-11-15 13:23:26 -08:00
|
|
|
//! Crds Gossip
|
|
|
|
//! This module ties together Crds and the push and pull gossip overlays. The interface is
|
|
|
|
//! designed to run with a simulator or over a UDP network connection with messages up to a
|
2019-11-14 11:49:31 -08:00
|
|
|
//! packet::PACKET_DATA_SIZE size.
|
2018-11-15 13:23:26 -08:00
|
|
|
|
2020-02-18 08:46:11 -08:00
|
|
|
use crate::{
|
|
|
|
crds::{Crds, VersionedCrdsValue},
|
|
|
|
crds_gossip_error::CrdsGossipError,
|
|
|
|
crds_gossip_pull::{CrdsFilter, CrdsGossipPull},
|
|
|
|
crds_gossip_push::{CrdsGossipPush, CRDS_GOSSIP_NUM_ACTIVE},
|
|
|
|
crds_value::{CrdsValue, CrdsValueLabel},
|
|
|
|
};
|
2018-11-15 13:23:26 -08:00
|
|
|
use solana_sdk::pubkey::Pubkey;
|
2019-06-26 00:30:16 -07:00
|
|
|
use std::collections::{HashMap, HashSet};
|
2018-11-15 13:23:26 -08:00
|
|
|
|
2018-12-01 12:00:30 -08:00
|
|
|
/// The minimum number of items for bloom filters used by pull requests.
pub const CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS: usize = 500;
|
2018-12-01 12:00:30 -08:00
|
|
|
|
2019-02-11 16:20:31 -08:00
|
|
|
#[derive(Clone)]
pub struct CrdsGossip {
    /// The replicated data store shared by the push and pull overlays.
    pub crds: Crds,
    /// Public key identifying the local node.
    pub id: Pubkey,
    /// Push-protocol state (active set, received cache, pending push messages).
    pub push: CrdsGossipPush,
    /// Pull-protocol state (filters, request timing, purged-value hashes).
    pub pull: CrdsGossipPull,
}
|
|
|
|
|
|
|
|
impl Default for CrdsGossip {
|
|
|
|
fn default() -> Self {
|
|
|
|
CrdsGossip {
|
|
|
|
crds: Crds::default(),
|
|
|
|
id: Pubkey::default(),
|
|
|
|
push: CrdsGossipPush::default(),
|
|
|
|
pull: CrdsGossipPull::default(),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl CrdsGossip {
|
2019-03-09 19:28:43 -08:00
|
|
|
pub fn set_self(&mut self, id: &Pubkey) {
|
|
|
|
self.id = *id;
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|
2019-05-08 13:50:32 -07:00
|
|
|
|
2019-06-26 00:30:16 -07:00
|
|
|
/// process a push message to the network
|
|
|
|
pub fn process_push_message(
|
|
|
|
&mut self,
|
|
|
|
from: &Pubkey,
|
|
|
|
values: Vec<CrdsValue>,
|
|
|
|
now: u64,
|
|
|
|
) -> Vec<VersionedCrdsValue> {
|
|
|
|
values
|
2018-11-15 13:23:26 -08:00
|
|
|
.into_iter()
|
2019-06-26 00:30:16 -07:00
|
|
|
.filter_map(|val| {
|
|
|
|
let res = self
|
|
|
|
.push
|
|
|
|
.process_push_message(&mut self.crds, from, val, now);
|
|
|
|
if let Ok(Some(val)) = res {
|
2018-11-15 13:23:26 -08:00
|
|
|
self.pull
|
|
|
|
.record_old_hash(val.value_hash, val.local_timestamp);
|
2019-06-26 00:30:16 -07:00
|
|
|
Some(val)
|
2018-11-15 13:23:26 -08:00
|
|
|
} else {
|
|
|
|
None
|
|
|
|
}
|
2018-12-07 19:01:28 -08:00
|
|
|
})
|
|
|
|
.collect()
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|
|
|
|
|
2019-06-26 00:30:16 -07:00
|
|
|
/// remove redundant paths in the network
|
|
|
|
pub fn prune_received_cache(
|
|
|
|
&mut self,
|
|
|
|
labels: Vec<CrdsValueLabel>,
|
|
|
|
stakes: &HashMap<Pubkey, u64>,
|
|
|
|
) -> HashMap<Pubkey, HashSet<Pubkey>> {
|
|
|
|
let id = &self.id;
|
|
|
|
let crds = &self.crds;
|
|
|
|
let push = &mut self.push;
|
|
|
|
let versioned = labels
|
|
|
|
.into_iter()
|
|
|
|
.filter_map(|label| crds.lookup_versioned(&label));
|
|
|
|
|
|
|
|
let mut prune_map: HashMap<Pubkey, HashSet<_>> = HashMap::new();
|
|
|
|
for val in versioned {
|
|
|
|
let origin = val.value.pubkey();
|
|
|
|
let hash = val.value_hash;
|
|
|
|
let peers = push.prune_received_cache(id, &origin, hash, stakes);
|
|
|
|
for from in peers {
|
|
|
|
prune_map.entry(from).or_default().insert(origin);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
prune_map
|
|
|
|
}
|
|
|
|
|
2019-05-28 18:39:40 -07:00
|
|
|
pub fn new_push_messages(&mut self, now: u64) -> (Pubkey, HashMap<Pubkey, Vec<CrdsValue>>) {
|
|
|
|
let push_messages = self.push.new_push_messages(&self.crds, now);
|
|
|
|
(self.id, push_messages)
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
/// add the `from` to the peer's filter of nodes
|
2018-12-01 12:00:30 -08:00
|
|
|
pub fn process_prune_msg(
|
|
|
|
&mut self,
|
2019-03-09 19:28:43 -08:00
|
|
|
peer: &Pubkey,
|
|
|
|
destination: &Pubkey,
|
2018-12-01 12:00:30 -08:00
|
|
|
origin: &[Pubkey],
|
|
|
|
wallclock: u64,
|
|
|
|
now: u64,
|
|
|
|
) -> Result<(), CrdsGossipError> {
|
|
|
|
let expired = now > wallclock + self.push.prune_timeout;
|
|
|
|
if expired {
|
|
|
|
return Err(CrdsGossipError::PruneMessageTimeout);
|
|
|
|
}
|
2019-03-09 19:28:43 -08:00
|
|
|
if self.id == *destination {
|
2018-12-01 12:00:30 -08:00
|
|
|
self.push.process_prune_msg(peer, origin);
|
|
|
|
Ok(())
|
|
|
|
} else {
|
|
|
|
Err(CrdsGossipError::BadPruneDestination)
|
|
|
|
}
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
/// refresh the push active set
|
|
|
|
/// * ratio - number of actives to rotate
|
2019-02-20 20:02:47 -08:00
|
|
|
pub fn refresh_push_active_set(&mut self, stakes: &HashMap<Pubkey, u64>) {
|
2018-11-15 13:23:26 -08:00
|
|
|
self.push.refresh_push_active_set(
|
|
|
|
&self.crds,
|
2019-02-20 20:02:47 -08:00
|
|
|
stakes,
|
2019-03-09 19:28:43 -08:00
|
|
|
&self.id,
|
2018-11-15 13:23:26 -08:00
|
|
|
self.pull.pull_request_time.len(),
|
|
|
|
CRDS_GOSSIP_NUM_ACTIVE,
|
|
|
|
)
|
|
|
|
}
|
|
|
|
|
|
|
|
/// generate a random request
|
|
|
|
pub fn new_pull_request(
|
|
|
|
&self,
|
|
|
|
now: u64,
|
2019-02-20 20:02:47 -08:00
|
|
|
stakes: &HashMap<Pubkey, u64>,
|
2019-08-13 18:04:14 -07:00
|
|
|
bloom_size: usize,
|
|
|
|
) -> Result<(Pubkey, Vec<CrdsFilter>, CrdsValue), CrdsGossipError> {
|
2019-03-09 19:28:43 -08:00
|
|
|
self.pull
|
2019-08-13 18:04:14 -07:00
|
|
|
.new_pull_request(&self.crds, &self.id, now, stakes, bloom_size)
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
/// time when a request to `from` was initiated
|
2019-03-07 17:47:23 -08:00
|
|
|
/// This is used for weighted random selection during `new_pull_request`
|
2018-11-15 13:23:26 -08:00
|
|
|
/// It's important to use the local nodes request creation time as the weight
|
2019-03-07 17:47:23 -08:00
|
|
|
/// instead of the response received time otherwise failed nodes will increase their weight.
|
2019-03-09 19:28:43 -08:00
|
|
|
pub fn mark_pull_request_creation_time(&mut self, from: &Pubkey, now: u64) {
|
2018-11-15 13:23:26 -08:00
|
|
|
self.pull.mark_pull_request_creation_time(from, now)
|
|
|
|
}
|
|
|
|
/// process a pull request and create a response
|
2019-08-15 17:04:45 -07:00
|
|
|
pub fn process_pull_requests(
|
2018-11-15 13:23:26 -08:00
|
|
|
&mut self,
|
2019-08-15 17:04:45 -07:00
|
|
|
filters: Vec<(CrdsValue, CrdsFilter)>,
|
2018-11-15 13:23:26 -08:00
|
|
|
now: u64,
|
2019-08-15 17:04:45 -07:00
|
|
|
) -> Vec<Vec<CrdsValue>> {
|
2018-11-15 13:23:26 -08:00
|
|
|
self.pull
|
2019-08-15 17:04:45 -07:00
|
|
|
.process_pull_requests(&mut self.crds, filters, now)
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|
|
|
|
/// process a pull response
|
|
|
|
pub fn process_pull_response(
|
|
|
|
&mut self,
|
2019-03-09 19:28:43 -08:00
|
|
|
from: &Pubkey,
|
2020-02-07 12:38:24 -08:00
|
|
|
timeouts: &HashMap<Pubkey, u64>,
|
2018-11-15 13:23:26 -08:00
|
|
|
response: Vec<CrdsValue>,
|
|
|
|
now: u64,
|
|
|
|
) -> usize {
|
|
|
|
self.pull
|
2020-02-07 12:38:24 -08:00
|
|
|
.process_pull_response(&mut self.crds, from, timeouts, response, now)
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|
2019-11-20 11:25:18 -08:00
|
|
|
|
|
|
|
pub fn make_timeouts_test(&self) -> HashMap<Pubkey, u64> {
|
|
|
|
self.make_timeouts(&HashMap::new(), self.pull.crds_timeout)
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn make_timeouts(
|
|
|
|
&self,
|
|
|
|
stakes: &HashMap<Pubkey, u64>,
|
|
|
|
epoch_ms: u64,
|
|
|
|
) -> HashMap<Pubkey, u64> {
|
|
|
|
self.pull.make_timeouts(&self.id, stakes, epoch_ms)
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn purge(&mut self, now: u64, timeouts: &HashMap<Pubkey, u64>) -> usize {
|
|
|
|
let mut rv = 0;
|
2018-11-15 13:23:26 -08:00
|
|
|
if now > self.push.msg_timeout {
|
|
|
|
let min = now - self.push.msg_timeout;
|
|
|
|
self.push.purge_old_pending_push_messages(&self.crds, min);
|
|
|
|
}
|
|
|
|
if now > 5 * self.push.msg_timeout {
|
|
|
|
let min = now - 5 * self.push.msg_timeout;
|
2019-06-26 00:30:16 -07:00
|
|
|
self.push.purge_old_received_cache(min);
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|
|
|
|
if now > self.pull.crds_timeout {
|
2019-11-20 11:25:18 -08:00
|
|
|
//sanity check
|
|
|
|
let min = self.pull.crds_timeout;
|
|
|
|
assert_eq!(timeouts[&self.id], std::u64::MAX);
|
|
|
|
assert_eq!(timeouts[&Pubkey::default()], min);
|
|
|
|
rv = self.pull.purge_active(&mut self.crds, now, &timeouts);
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|
|
|
|
if now > 5 * self.pull.crds_timeout {
|
|
|
|
let min = now - 5 * self.pull.crds_timeout;
|
|
|
|
self.pull.purge_purged(min);
|
|
|
|
}
|
2019-11-20 11:25:18 -08:00
|
|
|
rv
|
2018-11-15 13:23:26 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-02-20 20:02:47 -08:00
|
|
|
/// Computes a normalized(log of actual stake) stake
|
|
|
|
pub fn get_stake<S: std::hash::BuildHasher>(id: &Pubkey, stakes: &HashMap<Pubkey, u64, S>) -> f32 {
|
|
|
|
// cap the max balance to u32 max (it should be plenty)
|
|
|
|
let bal = f64::from(u32::max_value()).min(*stakes.get(id).unwrap_or(&0) as f64);
|
|
|
|
1_f32.max((bal as f32).ln())
|
2019-02-20 17:08:56 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Computes bounded weight given some max, a time since last selected, and a stake value.
/// The minimum weight is 1 and not 0 so that 'time since last picked' can still factor in.
pub fn get_weight(max_weight: f32, time_since_last_selected: u32, stake: f32) -> f32 {
    let raw = stake * time_since_last_selected as f32;
    // An overflowing product saturates at the caller-supplied maximum.
    let bounded = if raw.is_infinite() { max_weight } else { raw };
    1.0_f32.max(bounded.min(max_weight))
}
|
|
|
|
|
2018-11-15 13:23:26 -08:00
|
|
|
#[cfg(test)]
mod test {
    use super::*;
    use crate::contact_info::ContactInfo;
    use crate::crds_value::CrdsData;
    use solana_sdk::hash::hash;
    use solana_sdk::timing::timestamp;

    /// Exercises the three outcomes of `process_prune_msg`: wrong destination,
    /// success, and prune-timeout rejection.
    #[test]
    fn test_prune_errors() {
        let mut crds_gossip = CrdsGossip::default();
        crds_gossip.id = Pubkey::new(&[0; 32]);
        let id = crds_gossip.id;
        let ci = ContactInfo::new_localhost(&Pubkey::new(&[1; 32]), 0);
        let prune_pubkey = Pubkey::new(&[2; 32]);
        crds_gossip
            .crds
            .insert(
                CrdsValue::new_unsigned(CrdsData::ContactInfo(ci.clone())),
                0,
            )
            .unwrap();
        crds_gossip.refresh_push_active_set(&HashMap::new());
        let now = timestamp();
        // Incorrect destination: a hash-derived pubkey that is not our id.
        let mut res = crds_gossip.process_prune_msg(
            &ci.id,
            &Pubkey::new(hash(&[1; 32]).as_ref()),
            &[prune_pubkey],
            now,
            now,
        );
        assert_eq!(res.err(), Some(CrdsGossipError::BadPruneDestination));
        // Correct destination succeeds.
        res = crds_gossip.process_prune_msg(&ci.id, &id, &[prune_pubkey], now, now);
        res.unwrap();
        // Timeout: `now` advanced past wallclock + prune_timeout is rejected.
        let timeout = now + crds_gossip.push.prune_timeout * 2;
        res = crds_gossip.process_prune_msg(&ci.id, &id, &[prune_pubkey], now, timeout);
        assert_eq!(res.err(), Some(CrdsGossipError::PruneMessageTimeout));
    }
}
|