diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index c12431dba..0dc8975b1 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -69,7 +69,6 @@ use std::{ cmp::min, collections::{hash_map::Entry, HashMap, HashSet, VecDeque}, fmt::{self, Debug}, - iter::FromIterator, net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, UdpSocket}, ops::{Deref, DerefMut}, sync::atomic::{AtomicBool, AtomicU64, Ordering}, @@ -97,10 +96,13 @@ const MAX_GOSSIP_TRAFFIC: usize = 128_000_000 / PACKET_DATA_SIZE; /// message: Protocol::PushMessage(Pubkey::default(), Vec::default()) const PUSH_MESSAGE_MAX_PAYLOAD_SIZE: usize = PACKET_DATA_SIZE - 44; /// Maximum number of hashes in SnapshotHashes/AccountsHashes a node publishes -/// such that the serialized size of the push/pull message stays bellow +/// such that the serialized size of the push/pull message stays below /// PACKET_DATA_SIZE. // TODO: Update this to 26 once payload sizes are upgraded across fleet. pub const MAX_SNAPSHOT_HASHES: usize = 16; +/// Maximum number of origin nodes that a PruneData may contain, such that the +/// serialized size of the PruneMessage stays below PACKET_DATA_SIZE. +const MAX_PRUNE_DATA_NODES: usize = 32; /// Number of bytes in the randomly generated token sent with ping messages. const GOSSIP_PING_TOKEN_SIZE: usize = 32; const GOSSIP_PING_CACHE_CAPACITY: usize = 16384; @@ -342,6 +344,27 @@ pub struct PruneData { pub wallclock: u64, } +impl PruneData { + /// New random PruneData for tests and benchmarks. + #[cfg(test)] + fn new_rand(rng: &mut R, self_keypair: &Keypair, num_nodes: Option) -> Self { + let wallclock = crds_value::new_rand_timestamp(rng); + let num_nodes = num_nodes.unwrap_or_else(|| rng.gen_range(0, MAX_PRUNE_DATA_NODES + 1)); + let prunes = std::iter::repeat_with(Pubkey::new_unique) + .take(num_nodes) + .collect(); + let mut prune_data = PruneData { + pubkey: self_keypair.pubkey(), + prunes, + signature: Signature::default(), + destination: Pubkey::new_unique(), + wallclock, + }; + prune_data.sign(&self_keypair); + prune_data + } +} + impl Sanitize for PruneData { fn sanitize(&self) -> std::result::Result<(), SanitizeError> { if self.wallclock >= MAX_WALLCLOCK { @@ -2373,22 +2396,36 @@ impl ClusterInfo { .collect() }; // Generate prune messages. - let prunes_map = self + let prunes = self .time_gossip_write_lock("prune_received_cache", &self.stats.prune_received_cache) .prune_received_cache(updated_labels, stakes); + let prunes: Vec<(Pubkey /*from*/, Vec /*origins*/)> = prunes + .into_iter() + .flat_map(|(from, prunes)| { + std::iter::repeat(from).zip( + prunes + .into_iter() + .chunks(MAX_PRUNE_DATA_NODES) + .into_iter() + .map(Iterator::collect) + .collect::>(), + ) + }) + .collect(); + let prune_messages: Vec<_> = { let gossip = self.gossip.read().unwrap(); let wallclock = timestamp(); let self_pubkey = self.id(); thread_pool.install(|| { - Vec::from_iter(prunes_map) + prunes .into_par_iter() .with_min_len(256) - .filter_map(|(from, prune_set)| { + .filter_map(|(from, prunes)| { let peer = gossip.crds.get_contact_info(&from)?; let mut prune_data = PruneData { pubkey: self_pubkey, - prunes: Vec::from_iter(prune_set), + prunes, signature: Signature::default(), destination: from, wallclock, @@ -3101,7 +3138,7 @@ mod tests { use solana_vote_program::{vote_instruction, vote_state::Vote}; use std::collections::HashSet; use std::iter::repeat_with; - use std::net::{IpAddr, Ipv4Addr, SocketAddrV4}; + use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddrV4}; use std::sync::Arc; #[test] @@ -3143,15 +3180,30 @@ mod tests { ); } + fn new_rand_socket_addr(rng: &mut R) -> SocketAddr { + let addr = if rng.gen_bool(0.5) { + IpAddr::V4(Ipv4Addr::new(rng.gen(), rng.gen(), rng.gen(), rng.gen())) + } else { + IpAddr::V6(Ipv6Addr::new( + rng.gen(), + rng.gen(), + rng.gen(), + rng.gen(), + rng.gen(), + rng.gen(), + rng.gen(), + rng.gen(), + )) + }; + SocketAddr::new(addr, /*port=*/ rng.gen()) + } + fn new_rand_remote_node(rng: &mut R) -> (Keypair, SocketAddr) where R: Rng, { let keypair = Keypair::new(); - let socket = SocketAddr::V4(SocketAddrV4::new( - Ipv4Addr::new(rng.gen(), rng.gen(), rng.gen(), rng.gen()), - rng.gen(), - )); + let socket = new_rand_socket_addr(rng); (keypair, socket) } @@ -3322,10 +3374,7 @@ mod tests { let crds_value = CrdsValue::new_signed(CrdsData::SnapshotHashes(snapshot_hash), &Keypair::new()); let message = Protocol::PushMessage(Pubkey::new_unique(), vec![crds_value]); - let socket = SocketAddr::V4(SocketAddrV4::new( - Ipv4Addr::new(rng.gen(), rng.gen(), rng.gen(), rng.gen()), - rng.gen(), - )); + let socket = new_rand_socket_addr(&mut rng); assert!(Packet::from_data(&socket, message).is_ok()); } } @@ -3338,14 +3387,31 @@ mod tests { let crds_value = CrdsValue::new_signed(CrdsData::AccountsHashes(snapshot_hash), &Keypair::new()); let response = Protocol::PullResponse(Pubkey::new_unique(), vec![crds_value]); - let socket = SocketAddr::V4(SocketAddrV4::new( - Ipv4Addr::new(rng.gen(), rng.gen(), rng.gen(), rng.gen()), - rng.gen(), - )); + let socket = new_rand_socket_addr(&mut rng); assert!(Packet::from_data(&socket, response).is_ok()); } } + #[test] + fn test_max_prune_data_pubkeys() { + let mut rng = rand::thread_rng(); + for _ in 0..64 { + let self_keypair = Keypair::new(); + let prune_data = + PruneData::new_rand(&mut rng, &self_keypair, Some(MAX_PRUNE_DATA_NODES)); + let prune_message = Protocol::PruneMessage(self_keypair.pubkey(), prune_data); + let socket = new_rand_socket_addr(&mut rng); + assert!(Packet::from_data(&socket, prune_message).is_ok()); + } + // Assert that MAX_PRUNE_DATA_NODES is highest possible. + let self_keypair = Keypair::new(); + let prune_data = + PruneData::new_rand(&mut rng, &self_keypair, Some(MAX_PRUNE_DATA_NODES + 1)); + let prune_message = Protocol::PruneMessage(self_keypair.pubkey(), prune_data); + let socket = new_rand_socket_addr(&mut rng); + assert!(Packet::from_data(&socket, prune_message).is_err()); + } + #[test] fn test_push_message_max_payload_size() { let header = Protocol::PushMessage(Pubkey::default(), Vec::default()); diff --git a/core/src/crds_value.rs b/core/src/crds_value.rs index 195c3004d..14caa3c01 100644 --- a/core/src/crds_value.rs +++ b/core/src/crds_value.rs @@ -112,9 +112,9 @@ impl Sanitize for CrdsData { } /// Random timestamp for tests and benchmarks. -fn new_rand_timestamp(rng: &mut R) -> u64 { - let delay = 10 * 60 * 1000; // 10 minutes - timestamp() - delay + rng.gen_range(0, 2 * delay) +pub(crate) fn new_rand_timestamp(rng: &mut R) -> u64 { + const DELAY: u64 = 10 * 60 * 1000; // 10 minutes + timestamp() - DELAY + rng.gen_range(0, 2 * DELAY) } impl CrdsData { diff --git a/perf/src/packet.rs b/perf/src/packet.rs index 981bb756f..d257f2e7c 100644 --- a/perf/src/packet.rs +++ b/perf/src/packet.rs @@ -88,6 +88,9 @@ pub fn to_packets_with_destination( for (dest_and_data, o) in dests_and_data.iter().zip(out.packets.iter_mut()) { if !dest_and_data.0.ip().is_unspecified() && dest_and_data.0.port() != 0 { if let Err(e) = Packet::populate_packet(o, Some(&dest_and_data.0), &dest_and_data.1) { + // TODO: This should never happen. Instead the caller should + // break the payload into smaller messages, and here any errors + // should be propagated. error!("Couldn't write to packet {:?}. Data skipped.", e); } } else {