//! Crds Gossip //! This module ties together Crds and the push and pull gossip overlays. The interface is //! designed to run with a simulator or over a UDP network connection with messages up to a //! packet::PACKET_DATA_SIZE size. use crate::{ cluster_info::Ping, contact_info::ContactInfo, crds::{Crds, VersionedCrdsValue}, crds_gossip_error::CrdsGossipError, crds_gossip_pull::{CrdsFilter, CrdsGossipPull, ProcessPullStats}, crds_gossip_push::{CrdsGossipPush, CRDS_GOSSIP_NUM_ACTIVE}, crds_value::{CrdsData, CrdsValue}, duplicate_shred::{self, DuplicateShredIndex, LeaderScheduleFn, MAX_DUPLICATE_SHREDS}, ping_pong::PingCache, }; use itertools::Itertools; use rayon::ThreadPool; use solana_ledger::shred::Shred; use solana_sdk::{ hash::Hash, pubkey::Pubkey, signature::{Keypair, Signer}, timing::timestamp, }; use std::{ collections::{HashMap, HashSet}, net::SocketAddr, sync::Mutex, }; pub struct CrdsGossip { pub crds: Crds, pub id: Pubkey, pub shred_version: u16, pub push: CrdsGossipPush, pub pull: CrdsGossipPull, } impl Default for CrdsGossip { fn default() -> Self { CrdsGossip { crds: Crds::default(), id: Pubkey::default(), shred_version: 0, push: CrdsGossipPush::default(), pull: CrdsGossipPull::default(), } } } impl CrdsGossip { pub fn set_self(&mut self, id: &Pubkey) { self.id = *id; } pub fn set_shred_version(&mut self, shred_version: u16) { self.shred_version = shred_version; } /// process a push message to the network pub fn process_push_message( &mut self, from: &Pubkey, values: Vec, now: u64, ) -> Vec { values .into_iter() .filter_map(|val| { let old = self .push .process_push_message(&mut self.crds, from, val, now) .ok()?; self.pull.record_old_hash(old.as_ref()?.value_hash, now); old }) .collect() } /// remove redundant paths in the network pub fn prune_received_cache( &mut self, origins: I, // Unique pubkeys of crds values' owners. stakes: &HashMap, ) -> HashMap> where I: IntoIterator, { let self_pubkey = self.id; origins .into_iter() .flat_map(|origin| { self.push .prune_received_cache(&self_pubkey, &origin, stakes) .into_iter() .zip(std::iter::repeat(origin)) }) .into_group_map() } pub fn new_push_messages( &mut self, pending_push_messages: Vec, now: u64, ) -> HashMap> { let self_pubkey = self.id; self.process_push_message(&self_pubkey, pending_push_messages, now); self.push.new_push_messages(&self.crds, now) } pub(crate) fn push_duplicate_shred( &mut self, keypair: &Keypair, shred: &Shred, other_payload: &[u8], leader_schedule: Option, // Maximum serialized size of each DuplicateShred chunk payload. max_payload_size: usize, ) -> Result<(), duplicate_shred::Error> { let pubkey = keypair.pubkey(); // Skip if there are already records of duplicate shreds for this slot. let shred_slot = shred.slot(); if self .crds .get_records(&pubkey) .any(|value| match &value.value.data { CrdsData::DuplicateShred(_, value) => value.slot == shred_slot, _ => false, }) { return Ok(()); } let chunks = duplicate_shred::from_shred( shred.clone(), pubkey, Vec::from(other_payload), leader_schedule, timestamp(), max_payload_size, )?; // Find the index of oldest duplicate shred. let mut num_dup_shreds = 0; let offset = self .crds .get_records(&pubkey) .filter_map(|value| match &value.value.data { CrdsData::DuplicateShred(ix, value) => { num_dup_shreds += 1; Some((value.wallclock, *ix)) } _ => None, }) .min() // Override the oldest records. .map(|(_ /*wallclock*/, ix)| ix) .unwrap_or(0); let offset = if num_dup_shreds < MAX_DUPLICATE_SHREDS { num_dup_shreds } else { offset }; let entries = chunks .enumerate() .map(|(k, chunk)| { let index = (offset + k as DuplicateShredIndex) % MAX_DUPLICATE_SHREDS; let data = CrdsData::DuplicateShred(index, chunk); CrdsValue::new_signed(data, keypair) }) .collect(); self.process_push_message(&pubkey, entries, timestamp()); Ok(()) } /// add the `from` to the peer's filter of nodes pub fn process_prune_msg( &self, peer: &Pubkey, destination: &Pubkey, origin: &[Pubkey], wallclock: u64, now: u64, ) -> Result<(), CrdsGossipError> { let expired = now > wallclock + self.push.prune_timeout; if expired { return Err(CrdsGossipError::PruneMessageTimeout); } if self.id == *destination { self.push.process_prune_msg(&self.id, peer, origin); Ok(()) } else { Err(CrdsGossipError::BadPruneDestination) } } /// refresh the push active set /// * ratio - number of actives to rotate pub fn refresh_push_active_set( &mut self, stakes: &HashMap, gossip_validators: Option<&HashSet>, ) { self.push.refresh_push_active_set( &self.crds, stakes, gossip_validators, &self.id, self.shred_version, self.crds.num_nodes(), CRDS_GOSSIP_NUM_ACTIVE, ) } /// generate a random request pub fn new_pull_request( &self, thread_pool: &ThreadPool, self_keypair: &Keypair, now: u64, gossip_validators: Option<&HashSet>, stakes: &HashMap, bloom_size: usize, ping_cache: &Mutex, pings: &mut Vec<(SocketAddr, Ping)>, ) -> Result<(ContactInfo, Vec), CrdsGossipError> { self.pull.new_pull_request( thread_pool, &self.crds, self_keypair, self.shred_version, now, gossip_validators, stakes, bloom_size, ping_cache, pings, ) } /// time when a request to `from` was initiated /// This is used for weighted random selection during `new_pull_request` /// It's important to use the local nodes request creation time as the weight /// instead of the response received time otherwise failed nodes will increase their weight. pub fn mark_pull_request_creation_time(&mut self, from: Pubkey, now: u64) { self.pull.mark_pull_request_creation_time(from, now) } /// process a pull request and create a response pub fn process_pull_requests(&mut self, callers: I, now: u64) where I: IntoIterator, { self.pull .process_pull_requests(&mut self.crds, callers, now); } pub fn generate_pull_responses( &self, filters: &[(CrdsValue, CrdsFilter)], output_size_limit: usize, // Limit number of crds values returned. now: u64, ) -> Vec> { self.pull .generate_pull_responses(&self.crds, filters, output_size_limit, now) } pub fn filter_pull_responses( &self, timeouts: &HashMap, response: Vec, now: u64, process_pull_stats: &mut ProcessPullStats, ) -> ( Vec, // valid responses. Vec, // responses with expired timestamps. Vec, // hash of outdated values. ) { self.pull .filter_pull_responses(&self.crds, timeouts, response, now, process_pull_stats) } /// process a pull response pub fn process_pull_responses( &mut self, from: &Pubkey, responses: Vec, responses_expired_timeout: Vec, failed_inserts: Vec, now: u64, process_pull_stats: &mut ProcessPullStats, ) { self.pull.process_pull_responses( &mut self.crds, from, responses, responses_expired_timeout, failed_inserts, now, process_pull_stats, ); } pub fn make_timeouts_test(&self) -> HashMap { self.make_timeouts(&HashMap::new(), self.pull.crds_timeout) } pub fn make_timeouts( &self, stakes: &HashMap, epoch_ms: u64, ) -> HashMap { self.pull.make_timeouts(&self.id, stakes, epoch_ms) } pub fn purge( &mut self, thread_pool: &ThreadPool, now: u64, timeouts: &HashMap, ) -> usize { let mut rv = 0; if now > 5 * self.push.msg_timeout { let min = now - 5 * self.push.msg_timeout; self.push.purge_old_received_cache(min); } if now > self.pull.crds_timeout { //sanity check let min = self.pull.crds_timeout; assert_eq!(timeouts[&self.id], std::u64::MAX); assert_eq!(timeouts[&Pubkey::default()], min); rv = self .pull .purge_active(thread_pool, &mut self.crds, now, &timeouts); } if now > 5 * self.pull.crds_timeout { let min = now - 5 * self.pull.crds_timeout; self.pull.purge_purged(min); } self.pull.purge_failed_inserts(now); rv } // Only for tests and simulations. pub(crate) fn mock_clone(&self) -> Self { Self { crds: self.crds.clone(), push: self.push.mock_clone(), pull: self.pull.mock_clone(), ..*self } } } /// Computes a normalized(log of actual stake) stake pub fn get_stake(id: &Pubkey, stakes: &HashMap) -> f32 { // cap the max balance to u32 max (it should be plenty) let bal = f64::from(u32::max_value()).min(*stakes.get(id).unwrap_or(&0) as f64); 1_f32.max((bal as f32).ln()) } /// Computes bounded weight given some max, a time since last selected, and a stake value /// The minimum stake is 1 and not 0 to allow 'time since last' picked to factor in. pub fn get_weight(max_weight: f32, time_since_last_selected: u32, stake: f32) -> f32 { let mut weight = time_since_last_selected as f32 * stake; if weight.is_infinite() { weight = max_weight; } 1.0_f32.max(weight.min(max_weight)) } #[cfg(test)] mod test { use super::*; use crate::contact_info::ContactInfo; use crate::crds_value::CrdsData; use solana_sdk::hash::hash; use solana_sdk::timing::timestamp; #[test] fn test_prune_errors() { let mut crds_gossip = CrdsGossip { id: Pubkey::new(&[0; 32]), ..CrdsGossip::default() }; let id = crds_gossip.id; let ci = ContactInfo::new_localhost(&Pubkey::new(&[1; 32]), 0); let prune_pubkey = Pubkey::new(&[2; 32]); crds_gossip .crds .insert( CrdsValue::new_unsigned(CrdsData::ContactInfo(ci.clone())), 0, ) .unwrap(); crds_gossip.refresh_push_active_set(&HashMap::new(), None); let now = timestamp(); //incorrect dest let mut res = crds_gossip.process_prune_msg( &ci.id, &Pubkey::new(hash(&[1; 32]).as_ref()), &[prune_pubkey], now, now, ); assert_eq!(res.err(), Some(CrdsGossipError::BadPruneDestination)); //correct dest res = crds_gossip.process_prune_msg(&ci.id, &id, &[prune_pubkey], now, now); res.unwrap(); //test timeout let timeout = now + crds_gossip.push.prune_timeout * 2; res = crds_gossip.process_prune_msg(&ci.id, &id, &[prune_pubkey], now, timeout); assert_eq!(res.err(), Some(CrdsGossipError::PruneMessageTimeout)); } }