breaks prunes data into chunks to fit into packets (#13613)

Validator logs show that prune messages are dropped because they exceed
packet data size:
https://github.com/solana-labs/solana/blob/f25c969ad/perf/src/packet.rs#L90-L92
This can exacerbate gossip traffic by redundantly increasing push
messages across the network. The workaround is to break the prune data into
smaller chunks and send it over multiple messages.
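
As a rough sketch of the chunking idea (illustrative only: chunk_prunes and the free-standing constant below are not the actual ClusterInfo code, which does the equivalent with itertools' chunks inside a flat_map, as the diff shows):

use solana_sdk::pubkey::Pubkey;

const MAX_PRUNE_DATA_NODES: usize = 32;

/// Splits the origins pruned for one peer into packet-sized groups so that
/// each resulting (peer, origins) pair can be sent as its own prune message.
fn chunk_prunes(from: Pubkey, origins: &[Pubkey]) -> Vec<(Pubkey, Vec<Pubkey>)> {
    origins
        .chunks(MAX_PRUNE_DATA_NODES)
        .map(|chunk| (from, chunk.to_vec()))
        .collect()
}
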
behzad nouri 2020-11-19 16:38:01 +00:00 committed by GitHub
parent 83799356dd
commit 1ffab5de77
3 changed files with 91 additions and 22 deletions

core/src/cluster_info.rs

@@ -69,7 +69,6 @@ use std::{
cmp::min,
collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
fmt::{self, Debug},
iter::FromIterator,
net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, UdpSocket},
ops::{Deref, DerefMut},
sync::atomic::{AtomicBool, AtomicU64, Ordering},
@@ -97,10 +96,13 @@ const MAX_GOSSIP_TRAFFIC: usize = 128_000_000 / PACKET_DATA_SIZE;
/// message: Protocol::PushMessage(Pubkey::default(), Vec::default())
const PUSH_MESSAGE_MAX_PAYLOAD_SIZE: usize = PACKET_DATA_SIZE - 44;
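// (The 44 subtracted above is, under bincode's default encoding, the
// serialized size of that empty push-message header: 4-byte enum tag +
// 32-byte pubkey + 8-byte Vec length prefix.)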
/// Maximum number of hashes in SnapshotHashes/AccountsHashes a node publishes
/// such that the serialized size of the push/pull message stays bellow
/// such that the serialized size of the push/pull message stays below
/// PACKET_DATA_SIZE.
// TODO: Update this to 26 once payload sizes are upgraded across fleet.
pub const MAX_SNAPSHOT_HASHES: usize = 16;
/// Maximum number of origin nodes that a PruneData may contain, such that the
/// serialized size of the PruneMessage stays below PACKET_DATA_SIZE.
const MAX_PRUNE_DATA_NODES: usize = 32;
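// (Rough size check, assuming bincode's default encoding: the PruneMessage
// enum tag (4) + destination pubkey (32) + PruneData's pubkey (32), Vec length
// prefix (8), signature (64), destination (32) and wallclock (8) total 180
// bytes of overhead, plus 32 bytes per pruned origin; 32 origins give 1204
// bytes, just under PACKET_DATA_SIZE = 1232, while a 33rd origin would not fit.)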
/// Number of bytes in the randomly generated token sent with ping messages.
const GOSSIP_PING_TOKEN_SIZE: usize = 32;
const GOSSIP_PING_CACHE_CAPACITY: usize = 16384;
@@ -342,6 +344,27 @@ pub struct PruneData {
pub wallclock: u64,
}
impl PruneData {
/// New random PruneData for tests and benchmarks.
#[cfg(test)]
fn new_rand<R: Rng>(rng: &mut R, self_keypair: &Keypair, num_nodes: Option<usize>) -> Self {
let wallclock = crds_value::new_rand_timestamp(rng);
let num_nodes = num_nodes.unwrap_or_else(|| rng.gen_range(0, MAX_PRUNE_DATA_NODES + 1));
let prunes = std::iter::repeat_with(Pubkey::new_unique)
.take(num_nodes)
.collect();
let mut prune_data = PruneData {
pubkey: self_keypair.pubkey(),
prunes,
signature: Signature::default(),
destination: Pubkey::new_unique(),
wallclock,
};
prune_data.sign(&self_keypair);
prune_data
}
}
impl Sanitize for PruneData {
fn sanitize(&self) -> std::result::Result<(), SanitizeError> {
if self.wallclock >= MAX_WALLCLOCK {
@@ -2373,22 +2396,36 @@ impl ClusterInfo {
.collect()
};
// Generate prune messages.
let prunes_map = self
let prunes = self
.time_gossip_write_lock("prune_received_cache", &self.stats.prune_received_cache)
.prune_received_cache(updated_labels, stakes);
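// Split each peer's pruned origins into chunks of at most MAX_PRUNE_DATA_NODES
// so that every prune message built below serializes under PACKET_DATA_SIZE.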
let prunes: Vec<(Pubkey /*from*/, Vec<Pubkey> /*origins*/)> = prunes
.into_iter()
.flat_map(|(from, prunes)| {
std::iter::repeat(from).zip(
prunes
.into_iter()
.chunks(MAX_PRUNE_DATA_NODES)
.into_iter()
.map(Iterator::collect)
.collect::<Vec<_>>(),
)
})
.collect();
let prune_messages: Vec<_> = {
let gossip = self.gossip.read().unwrap();
let wallclock = timestamp();
let self_pubkey = self.id();
thread_pool.install(|| {
Vec::from_iter(prunes_map)
prunes
.into_par_iter()
.with_min_len(256)
.filter_map(|(from, prune_set)| {
.filter_map(|(from, prunes)| {
let peer = gossip.crds.get_contact_info(&from)?;
let mut prune_data = PruneData {
pubkey: self_pubkey,
prunes: Vec::from_iter(prune_set),
prunes,
signature: Signature::default(),
destination: from,
wallclock,
@@ -3101,7 +3138,7 @@ mod tests {
use solana_vote_program::{vote_instruction, vote_state::Vote};
use std::collections::HashSet;
use std::iter::repeat_with;
use std::net::{IpAddr, Ipv4Addr, SocketAddrV4};
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddrV4};
use std::sync::Arc;
#[test]
@@ -3143,15 +3180,30 @@ mod tests {
);
}
fn new_rand_socket_addr<R: Rng>(rng: &mut R) -> SocketAddr {
let addr = if rng.gen_bool(0.5) {
IpAddr::V4(Ipv4Addr::new(rng.gen(), rng.gen(), rng.gen(), rng.gen()))
} else {
IpAddr::V6(Ipv6Addr::new(
rng.gen(),
rng.gen(),
rng.gen(),
rng.gen(),
rng.gen(),
rng.gen(),
rng.gen(),
rng.gen(),
))
};
SocketAddr::new(addr, /*port=*/ rng.gen())
}
fn new_rand_remote_node<R>(rng: &mut R) -> (Keypair, SocketAddr)
where
R: Rng,
{
let keypair = Keypair::new();
let socket = SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::new(rng.gen(), rng.gen(), rng.gen(), rng.gen()),
rng.gen(),
));
let socket = new_rand_socket_addr(rng);
(keypair, socket)
}
@@ -3322,10 +3374,7 @@ mod tests {
let crds_value =
CrdsValue::new_signed(CrdsData::SnapshotHashes(snapshot_hash), &Keypair::new());
let message = Protocol::PushMessage(Pubkey::new_unique(), vec![crds_value]);
let socket = SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::new(rng.gen(), rng.gen(), rng.gen(), rng.gen()),
rng.gen(),
));
let socket = new_rand_socket_addr(&mut rng);
assert!(Packet::from_data(&socket, message).is_ok());
}
}
@@ -3338,14 +3387,31 @@ mod tests {
let crds_value =
CrdsValue::new_signed(CrdsData::AccountsHashes(snapshot_hash), &Keypair::new());
let response = Protocol::PullResponse(Pubkey::new_unique(), vec![crds_value]);
let socket = SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::new(rng.gen(), rng.gen(), rng.gen(), rng.gen()),
rng.gen(),
));
let socket = new_rand_socket_addr(&mut rng);
assert!(Packet::from_data(&socket, response).is_ok());
}
}
#[test]
fn test_max_prune_data_pubkeys() {
let mut rng = rand::thread_rng();
for _ in 0..64 {
let self_keypair = Keypair::new();
let prune_data =
PruneData::new_rand(&mut rng, &self_keypair, Some(MAX_PRUNE_DATA_NODES));
let prune_message = Protocol::PruneMessage(self_keypair.pubkey(), prune_data);
let socket = new_rand_socket_addr(&mut rng);
assert!(Packet::from_data(&socket, prune_message).is_ok());
}
// Assert that MAX_PRUNE_DATA_NODES is highest possible.
let self_keypair = Keypair::new();
let prune_data =
PruneData::new_rand(&mut rng, &self_keypair, Some(MAX_PRUNE_DATA_NODES + 1));
let prune_message = Protocol::PruneMessage(self_keypair.pubkey(), prune_data);
let socket = new_rand_socket_addr(&mut rng);
assert!(Packet::from_data(&socket, prune_message).is_err());
}
#[test]
fn test_push_message_max_payload_size() {
let header = Protocol::PushMessage(Pubkey::default(), Vec::default());

core/src/crds_value.rs

@@ -112,9 +112,9 @@ impl Sanitize for CrdsData {
}
/// Random timestamp for tests and benchmarks.
fn new_rand_timestamp<R: Rng>(rng: &mut R) -> u64 {
let delay = 10 * 60 * 1000; // 10 minutes
timestamp() - delay + rng.gen_range(0, 2 * delay)
pub(crate) fn new_rand_timestamp<R: Rng>(rng: &mut R) -> u64 {
const DELAY: u64 = 10 * 60 * 1000; // 10 minutes
timestamp() - DELAY + rng.gen_range(0, 2 * DELAY)
}
impl CrdsData {

perf/src/packet.rs

@@ -88,6 +88,9 @@ pub fn to_packets_with_destination<T: Serialize>(
for (dest_and_data, o) in dests_and_data.iter().zip(out.packets.iter_mut()) {
if !dest_and_data.0.ip().is_unspecified() && dest_and_data.0.port() != 0 {
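// populate_packet bincode-serializes the payload into the packet's fixed
// PACKET_DATA_SIZE buffer, so an oversized gossip message (e.g. prune data
// that was not chunked) fails here and gets dropped.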
if let Err(e) = Packet::populate_packet(o, Some(&dest_and_data.0), &dest_and_data.1) {
// TODO: This should never happen. Instead the caller should
// break the payload into smaller messages, and here any errors
// should be propagated.
error!("Couldn't write to packet {:?}. Data skipped.", e);
}
} else {