encapsulates turbine peer computations of broadcast & retransmit stages (#18238)

Broadcast stage and retransmit stage should arrange nodes on the turbine
broadcast tree in exactly the same order. Additionally, any change to this
ordering (e.g. updating how unstaked nodes are handled) requires feature
gating to keep the cluster in sync.

The current implementation is scattered across several public methods and
exposes too many implementation details (e.g. usize indices into the
peers vector), which makes code changes and checking for feature
activations more difficult.

This commit encapsulates turbine peer computations into a new struct
and exposes only two public methods, get_broadcast_peer and
get_retransmit_peers, to call-sites.
behzad nouri 2021-07-07 00:35:25 +00:00 committed by GitHub
parent 77f61a5e2e
commit 04787be8b1
12 changed files with 589 additions and 273 deletions
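
As a quick illustration of the new interface described in the commit message, here is a minimal sketch (not part of the diff; the wrapper function and its arguments are hypothetical) of how the two stages now obtain their turbine peers:

use std::collections::HashMap;
use solana_core::{
    broadcast_stage::BroadcastStage, cluster_nodes::ClusterNodes,
    retransmit_stage::RetransmitStage,
};
use solana_gossip::cluster_info::ClusterInfo;
use solana_sdk::pubkey::Pubkey;

// Hypothetical call-site; in the codebase these calls live in
// broadcast_stage and retransmit_stage respectively.
fn example(
    cluster_info: &ClusterInfo,
    stakes: &HashMap<Pubkey, u64>,
    shred_seed: [u8; 32],
    fanout: usize,
    slot_leader: Option<Pubkey>,
) {
    // Leader side: pick the root of the broadcast tree for this shred.
    let cluster_nodes = ClusterNodes::<BroadcastStage>::new(cluster_info, stakes);
    if let Some(peer) = cluster_nodes.get_broadcast_peer(shred_seed) {
        let _addr = peer.tvu; // the shred payload is sent to this address
    }
    // Validator side: compute neighbors and children for retransmit.
    let cluster_nodes = ClusterNodes::<RetransmitStage>::new(cluster_info, stakes);
    let (_neighbors, _children) =
        cluster_nodes.get_retransmit_peers(shred_seed, fanout, slot_leader);
}

Both constructors derive the same node ordering, so the broadcast and retransmit trees agree across the cluster, and any future change to that ordering only needs to happen (behind a feature gate) in cluster_nodes.rs.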


@@ -3,10 +3,14 @@
extern crate test;
use rand::{thread_rng, Rng};
use solana_core::broadcast_stage::broadcast_metrics::TransmitShredsStats;
use solana_core::broadcast_stage::{broadcast_shreds, get_broadcast_peers};
use solana_gossip::cluster_info::{ClusterInfo, Node};
use solana_gossip::contact_info::ContactInfo;
use solana_core::{
broadcast_stage::{broadcast_metrics::TransmitShredsStats, broadcast_shreds, BroadcastStage},
cluster_nodes::ClusterNodes,
};
use solana_gossip::{
cluster_info::{ClusterInfo, Node},
contact_info::ContactInfo,
};
use solana_ledger::shred::Shred;
use solana_sdk::pubkey;
use solana_sdk::timing::timestamp;
@@ -36,7 +40,7 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) {
stakes.insert(id, thread_rng().gen_range(1, NUM_PEERS) as u64);
}
let cluster_info = Arc::new(cluster_info);
let (peers, peers_and_stakes) = get_broadcast_peers(&cluster_info, Some(&stakes));
let cluster_nodes = ClusterNodes::<BroadcastStage>::new(&cluster_info, &stakes);
let shreds = Arc::new(shreds);
let last_datapoint = Arc::new(AtomicU64::new(0));
bencher.iter(move || {
@@ -44,8 +48,7 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) {
broadcast_shreds(
&socket,
&shreds,
&peers_and_stakes,
&peers,
&cluster_nodes,
&last_datapoint,
&mut TransmitShredsStats::default(),
)


@@ -6,17 +6,15 @@ use self::{
fail_entry_verification_broadcast_run::FailEntryVerificationBroadcastRun,
standard_broadcast_run::StandardBroadcastRun,
};
use crate::result::{Error, Result};
use crate::{
cluster_nodes::ClusterNodes,
result::{Error, Result},
};
use crossbeam_channel::{
Receiver as CrossbeamReceiver, RecvTimeoutError as CrossbeamRecvTimeoutError,
Sender as CrossbeamSender,
};
use solana_gossip::{
cluster_info::{self, ClusterInfo, ClusterInfoError},
contact_info::ContactInfo,
crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
weighted_shuffle::weighted_best,
};
use solana_gossip::cluster_info::{ClusterInfo, ClusterInfoError};
use solana_ledger::{blockstore::Blockstore, shred::Shred};
use solana_measure::measure::Measure;
use solana_metrics::{inc_new_counter_error, inc_new_counter_info};
@@ -390,26 +388,16 @@ fn update_peer_stats(
}
}
pub fn get_broadcast_peers(
cluster_info: &ClusterInfo,
stakes: Option<&HashMap<Pubkey, u64>>,
) -> (Vec<ContactInfo>, Vec<(u64, usize)>) {
let mut peers = cluster_info.tvu_peers();
let peers_and_stakes = cluster_info::stake_weight_peers(&mut peers, stakes);
(peers, peers_and_stakes)
}
/// broadcast messages from the leader to layer 1 nodes
/// # Remarks
pub fn broadcast_shreds(
s: &UdpSocket,
shreds: &[Shred],
peers_and_stakes: &[(u64, usize)],
peers: &[ContactInfo],
cluster_nodes: &ClusterNodes<BroadcastStage>,
last_datapoint_submit: &Arc<AtomicU64>,
transmit_stats: &mut TransmitShredsStats,
) -> Result<()> {
let broadcast_len = peers_and_stakes.len();
let broadcast_len = cluster_nodes.num_peers();
if broadcast_len == 0 {
update_peer_stats(1, 1, last_datapoint_submit);
return Ok(());
@@ -417,10 +405,9 @@ pub fn broadcast_shreds(
let mut shred_select = Measure::start("shred_select");
let packets: Vec<_> = shreds
.iter()
.map(|shred| {
let broadcast_index = weighted_best(peers_and_stakes, shred.seed());
(&shred.payload, &peers[broadcast_index].tvu)
.filter_map(|shred| {
let node = cluster_nodes.get_broadcast_peer(shred.seed())?;
Some((&shred.payload, &node.tvu))
})
.collect();
shred_select.stop();
@@ -439,7 +426,7 @@ pub fn broadcast_shreds(
send_mmsg_time.stop();
transmit_stats.send_mmsg_elapsed += send_mmsg_time.as_us();
let num_live_peers = num_live_peers(peers);
let num_live_peers = cluster_nodes.num_peers_live(timestamp()) as i64;
update_peer_stats(
num_live_peers,
broadcast_len as i64 + 1,
@@ -448,25 +435,6 @@ pub fn broadcast_shreds(
Ok(())
}
fn distance(a: u64, b: u64) -> u64 {
if a > b {
a - b
} else {
b - a
}
}
fn num_live_peers(peers: &[ContactInfo]) -> i64 {
let mut num_live_peers = 1i64;
peers.iter().for_each(|p| {
// A peer is considered live if they generated their contact info recently
if distance(timestamp(), p.wallclock) <= CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS {
num_live_peers += 1;
}
});
num_live_peers
}
#[cfg(test)]
pub mod test {
use super::*;
@@ -550,19 +518,6 @@ pub mod test {
assert_eq!(num_expected_coding_shreds, coding_index);
}
#[test]
fn test_num_live_peers() {
let mut ci = ContactInfo {
wallclock: std::u64::MAX,
..ContactInfo::default()
};
assert_eq!(num_live_peers(&[ci.clone()]), 1);
ci.wallclock = timestamp() - 1;
assert_eq!(num_live_peers(&[ci.clone()]), 2);
ci.wallclock = timestamp() - CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS - 1;
assert_eq!(num_live_peers(&[ci]), 1);
}
#[test]
fn test_duplicate_retransmit_signal() {
// Setup


@@ -1,4 +1,5 @@
use super::*;
use crate::cluster_nodes::ClusterNodes;
use solana_ledger::shred::Shredder;
use solana_sdk::hash::Hash;
use solana_sdk::signature::Keypair;
@@ -133,13 +134,14 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
) -> Result<()> {
let ((stakes, shreds), _) = receiver.lock().unwrap().recv()?;
// Broadcast data
let (peers, peers_and_stakes) = get_broadcast_peers(cluster_info, stakes.as_deref());
let cluster_nodes = ClusterNodes::<BroadcastStage>::new(
cluster_info,
stakes.as_deref().unwrap_or(&HashMap::default()),
);
broadcast_shreds(
sock,
&shreds,
&peers_and_stakes,
&peers,
&cluster_nodes,
&Arc::new(AtomicU64::new(0)),
&mut TransmitShredsStats::default(),
)?;


@@ -4,7 +4,7 @@ use super::{
broadcast_utils::{self, ReceiveResults},
*,
};
use crate::broadcast_stage::broadcast_utils::UnfinishedSlotInfo;
use crate::{broadcast_stage::broadcast_utils::UnfinishedSlotInfo, cluster_nodes::ClusterNodes};
use solana_ledger::{
entry::Entry,
shred::{
@@ -26,16 +26,10 @@ pub struct StandardBroadcastRun {
shred_version: u16,
last_datapoint_submit: Arc<AtomicU64>,
num_batches: usize,
broadcast_peer_cache: Arc<RwLock<BroadcastPeerCache>>,
cluster_nodes: Arc<RwLock<ClusterNodes<BroadcastStage>>>,
last_peer_update: Arc<AtomicU64>,
}
#[derive(Default)]
struct BroadcastPeerCache {
peers: Vec<ContactInfo>,
peers_and_stakes: Vec<(u64, usize)>,
}
impl StandardBroadcastRun {
pub(super) fn new(shred_version: u16) -> Self {
Self {
@@ -48,7 +42,7 @@ impl StandardBroadcastRun {
shred_version,
last_datapoint_submit: Arc::default(),
num_batches: 0,
broadcast_peer_cache: Arc::default(),
cluster_nodes: Arc::default(),
last_peer_update: Arc::default(),
}
}
@@ -353,13 +347,13 @@ impl StandardBroadcastRun {
.compare_and_swap(now, last, Ordering::Relaxed)
== last
{
let mut w_broadcast_peer_cache = self.broadcast_peer_cache.write().unwrap();
let (peers, peers_and_stakes) = get_broadcast_peers(cluster_info, stakes);
w_broadcast_peer_cache.peers = peers;
w_broadcast_peer_cache.peers_and_stakes = peers_and_stakes;
*self.cluster_nodes.write().unwrap() = ClusterNodes::<BroadcastStage>::new(
cluster_info,
stakes.unwrap_or(&HashMap::default()),
);
}
get_peers_time.stop();
let r_broadcast_peer_cache = self.broadcast_peer_cache.read().unwrap();
let cluster_nodes = self.cluster_nodes.read().unwrap();
let mut transmit_stats = TransmitShredsStats::default();
// Broadcast the shreds
@@ -367,12 +361,11 @@ impl StandardBroadcastRun {
broadcast_shreds(
sock,
&shreds,
&r_broadcast_peer_cache.peers_and_stakes,
&r_broadcast_peer_cache.peers,
&cluster_nodes,
&self.last_datapoint_submit,
&mut transmit_stats,
)?;
drop(r_broadcast_peer_cache);
drop(cluster_nodes);
transmit_time.stop();
transmit_stats.transmit_elapsed = transmit_time.as_us();

core/src/cluster_nodes.rs (new file, 441 lines)

@@ -0,0 +1,441 @@
use {
crate::{broadcast_stage::BroadcastStage, retransmit_stage::RetransmitStage},
itertools::Itertools,
solana_gossip::{
cluster_info::{compute_retransmit_peers, ClusterInfo},
contact_info::ContactInfo,
crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
weighted_shuffle::{weighted_best, weighted_shuffle},
},
solana_sdk::pubkey::Pubkey,
std::{any::TypeId, cmp::Reverse, collections::HashMap, marker::PhantomData},
};
enum NodeId {
// TVU node obtained through gossip (staked or not).
ContactInfo(ContactInfo),
// Staked node with no contact-info in gossip table.
Pubkey(Pubkey),
}
struct Node {
node: NodeId,
stake: u64,
}
pub struct ClusterNodes<T> {
pubkey: Pubkey, // The local node itself.
// All staked nodes + other known tvu-peers + the node itself;
// sorted by (stake, pubkey) in descending order.
nodes: Vec<Node>,
// Weights and indices for sampling peers. weighted_{shuffle,best} expect
// weights >= 1. For backward compatibility we use max(1, stake) for
// weights and exclude nodes with no contact-info.
index: Vec<(/*weight:*/ u64, /*index:*/ usize)>,
_phantom: PhantomData<T>,
}
impl Node {
#[inline]
fn pubkey(&self) -> Pubkey {
match &self.node {
NodeId::Pubkey(pubkey) => *pubkey,
NodeId::ContactInfo(node) => node.id,
}
}
#[inline]
fn contact_info(&self) -> Option<&ContactInfo> {
match &self.node {
NodeId::Pubkey(_) => None,
NodeId::ContactInfo(node) => Some(node),
}
}
}
impl<T> ClusterNodes<T> {
pub fn num_peers(&self) -> usize {
self.index.len()
}
// A peer is considered live if they generated their contact info recently.
pub fn num_peers_live(&self, now: u64) -> usize {
self.index
.iter()
.filter_map(|(_, index)| self.nodes[*index].contact_info())
.filter(|node| {
let elapsed = if node.wallclock < now {
now - node.wallclock
} else {
node.wallclock - now
};
elapsed < CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS
})
.count()
}
}
impl ClusterNodes<BroadcastStage> {
pub fn new(cluster_info: &ClusterInfo, stakes: &HashMap<Pubkey, u64>) -> Self {
new_cluster_nodes(cluster_info, stakes)
}
/// Returns the root of turbine broadcast tree, which the leader sends the
/// shred to.
pub fn get_broadcast_peer(&self, shred_seed: [u8; 32]) -> Option<&ContactInfo> {
if self.index.is_empty() {
None
} else {
let index = weighted_best(&self.index, shred_seed);
match &self.nodes[index].node {
NodeId::ContactInfo(node) => Some(node),
NodeId::Pubkey(_) => panic!("this should not happen!"),
}
}
}
}
impl ClusterNodes<RetransmitStage> {
pub fn new(cluster_info: &ClusterInfo, stakes: &HashMap<Pubkey, u64>) -> Self {
new_cluster_nodes(cluster_info, stakes)
}
pub fn get_retransmit_peers(
&self,
shred_seed: [u8; 32],
fanout: usize,
slot_leader: Option<Pubkey>,
) -> (
Vec<&ContactInfo>, // neighbors
Vec<&ContactInfo>, // children
) {
// Exclude leader from list of nodes.
let index = self.index.iter().copied();
let (weights, index): (Vec<u64>, Vec<usize>) = match slot_leader {
None => {
error!("unknown leader for shred slot");
index.unzip()
}
Some(slot_leader) if slot_leader == self.pubkey => {
error!("retransmit from slot leader: {}", slot_leader);
index.unzip()
}
Some(slot_leader) => index
.filter(|(_, i)| self.nodes[*i].pubkey() != slot_leader)
.unzip(),
};
let index: Vec<_> = {
let shuffle = weighted_shuffle(&weights, shred_seed);
shuffle.into_iter().map(|i| index[i]).collect()
};
let self_index = index
.iter()
.position(|i| self.nodes[*i].pubkey() == self.pubkey)
.unwrap();
let (neighbors, children) = compute_retransmit_peers(fanout, self_index, &index);
// Assert that the node itself is included in the set of neighbors, at
// the right offset.
debug_assert_eq!(
self.nodes[neighbors[self_index % fanout]].pubkey(),
self.pubkey
);
let get_contact_infos = |index: Vec<usize>| -> Vec<&ContactInfo> {
index
.into_iter()
.map(|i| self.nodes[i].contact_info().unwrap())
.collect()
};
(get_contact_infos(neighbors), get_contact_infos(children))
}
}
fn new_cluster_nodes<T: 'static>(
cluster_info: &ClusterInfo,
stakes: &HashMap<Pubkey, u64>,
) -> ClusterNodes<T> {
let self_pubkey = cluster_info.id();
let nodes = get_nodes(cluster_info, stakes);
let broadcast = TypeId::of::<T>() == TypeId::of::<BroadcastStage>();
// For backward compatibility:
// * nodes which do not have contact-info are excluded.
// * stakes are floored at 1.
// The sorting key here should be equivalent to
// solana_gossip::deprecated::sorted_stakes_with_index.
// Leader itself is excluded when sampling broadcast peers.
let index = nodes
.iter()
.enumerate()
.filter(|(_, node)| node.contact_info().is_some())
.filter(|(_, node)| !broadcast || node.pubkey() != self_pubkey)
.sorted_by_key(|(_, node)| Reverse((node.stake.max(1), node.pubkey())))
.map(|(index, node)| (node.stake.max(1), index))
.collect();
ClusterNodes {
pubkey: self_pubkey,
nodes,
index,
_phantom: PhantomData::default(),
}
}
// All staked nodes + other known tvu-peers + the node itself;
// sorted by (stake, pubkey) in descending order.
fn get_nodes(cluster_info: &ClusterInfo, stakes: &HashMap<Pubkey, u64>) -> Vec<Node> {
let self_pubkey = cluster_info.id();
// The local node itself.
std::iter::once({
let stake = stakes.get(&self_pubkey).copied().unwrap_or_default();
let node = NodeId::from(cluster_info.my_contact_info());
Node { node, stake }
})
// All known tvu-peers from gossip.
.chain(cluster_info.tvu_peers().into_iter().map(|node| {
let stake = stakes.get(&node.id).copied().unwrap_or_default();
let node = NodeId::from(node);
Node { node, stake }
}))
// All staked nodes.
.chain(
stakes
.iter()
.filter(|(_, stake)| **stake > 0)
.map(|(&pubkey, &stake)| Node {
node: NodeId::from(pubkey),
stake,
}),
)
.sorted_by_key(|node| Reverse((node.stake, node.pubkey())))
// Since sorted_by_key is stable, in case of duplicates, this
// will keep nodes with contact-info.
.dedup_by(|a, b| a.pubkey() == b.pubkey())
.collect()
}
impl From<ContactInfo> for NodeId {
fn from(node: ContactInfo) -> Self {
NodeId::ContactInfo(node)
}
}
impl From<Pubkey> for NodeId {
fn from(pubkey: Pubkey) -> Self {
NodeId::Pubkey(pubkey)
}
}
impl<T> Default for ClusterNodes<T> {
fn default() -> Self {
Self {
pubkey: Pubkey::default(),
nodes: Vec::default(),
index: Vec::default(),
_phantom: PhantomData::default(),
}
}
}
#[cfg(test)]
mod tests {
use {
super::*,
rand::{seq::SliceRandom, Rng},
solana_gossip::{
crds_value::{CrdsData, CrdsValue},
deprecated::{
shuffle_peers_and_index, sorted_retransmit_peers_and_stakes,
sorted_stakes_with_index,
},
},
solana_sdk::timing::timestamp,
std::iter::repeat_with,
};
// Legacy methods copied for testing backward compatibility.
fn get_broadcast_peers(
cluster_info: &ClusterInfo,
stakes: Option<&HashMap<Pubkey, u64>>,
) -> (Vec<ContactInfo>, Vec<(u64, usize)>) {
let mut peers = cluster_info.tvu_peers();
let peers_and_stakes = stake_weight_peers(&mut peers, stakes);
(peers, peers_and_stakes)
}
fn stake_weight_peers(
peers: &mut Vec<ContactInfo>,
stakes: Option<&HashMap<Pubkey, u64>>,
) -> Vec<(u64, usize)> {
peers.dedup();
sorted_stakes_with_index(peers, stakes)
}
fn make_cluster<R: Rng>(
rng: &mut R,
) -> (
Vec<ContactInfo>,
HashMap<Pubkey, u64>, // stakes
ClusterInfo,
) {
let mut nodes: Vec<_> = repeat_with(|| ContactInfo::new_rand(rng, None))
.take(1000)
.collect();
nodes.shuffle(rng);
let this_node = nodes[0].clone();
let mut stakes: HashMap<Pubkey, u64> = nodes
.iter()
.filter_map(|node| {
if rng.gen_ratio(1, 7) {
None // No stake for some of the nodes.
} else {
Some((node.id, rng.gen_range(0, 20)))
}
})
.collect();
// Add some staked nodes with no contact-info.
stakes.extend(repeat_with(|| (Pubkey::new_unique(), rng.gen_range(0, 20))).take(100));
let cluster_info = ClusterInfo::new_with_invalid_keypair(this_node);
{
let now = timestamp();
let mut gossip = cluster_info.gossip.write().unwrap();
// First node is pushed to crds table by ClusterInfo constructor.
for node in nodes.iter().skip(1) {
let node = CrdsData::ContactInfo(node.clone());
let node = CrdsValue::new_unsigned(node);
assert_eq!(gossip.crds.insert(node, now), Ok(()));
}
}
(nodes, stakes, cluster_info)
}
#[test]
fn test_cluster_nodes_retransmit() {
let mut rng = rand::thread_rng();
let (nodes, stakes, cluster_info) = make_cluster(&mut rng);
let this_node = cluster_info.my_contact_info();
// ClusterInfo::tvu_peers excludes the node itself.
assert_eq!(cluster_info.tvu_peers().len(), nodes.len() - 1);
let cluster_nodes = ClusterNodes::<RetransmitStage>::new(&cluster_info, &stakes);
// All nodes with contact-info should be in the index.
assert_eq!(cluster_nodes.index.len(), nodes.len());
// Staked nodes with no contact-info should be included.
assert!(cluster_nodes.nodes.len() > nodes.len());
// Assert that all nodes keep their contact-info.
// and, all staked nodes are also included.
{
let cluster_nodes: HashMap<_, _> = cluster_nodes
.nodes
.iter()
.map(|node| (node.pubkey(), node))
.collect();
for node in &nodes {
assert_eq!(cluster_nodes[&node.id].contact_info().unwrap().id, node.id);
}
for (pubkey, stake) in &stakes {
if *stake > 0 {
assert_eq!(cluster_nodes[pubkey].stake, *stake);
}
}
}
let (peers, stakes_and_index) =
sorted_retransmit_peers_and_stakes(&cluster_info, Some(&stakes));
assert_eq!(stakes_and_index.len(), peers.len());
assert_eq!(cluster_nodes.index.len(), peers.len());
for (i, node) in cluster_nodes
.index
.iter()
.map(|(_, i)| &cluster_nodes.nodes[*i])
.enumerate()
{
let (stake, index) = stakes_and_index[i];
// Wallclock may be updated by ClusterInfo::push_self.
if node.pubkey() == this_node.id {
assert_eq!(this_node.id, peers[index].id)
} else {
assert_eq!(node.contact_info().unwrap(), &peers[index]);
}
assert_eq!(node.stake.max(1), stake);
}
let slot_leader = nodes[1..].choose(&mut rng).unwrap().id;
// Remove slot leader from peers indices.
let stakes_and_index: Vec<_> = stakes_and_index
.into_iter()
.filter(|(_stake, index)| peers[*index].id != slot_leader)
.collect();
assert_eq!(peers.len(), stakes_and_index.len() + 1);
let mut shred_seed = [0u8; 32];
rng.fill(&mut shred_seed[..]);
let (self_index, shuffled_peers_and_stakes) =
shuffle_peers_and_index(&this_node.id, &peers, &stakes_and_index, shred_seed);
let shuffled_index: Vec<_> = shuffled_peers_and_stakes
.into_iter()
.map(|(_, index)| index)
.collect();
assert_eq!(this_node.id, peers[shuffled_index[self_index]].id);
for fanout in 1..200 {
let (neighbors_indices, children_indices) =
compute_retransmit_peers(fanout, self_index, &shuffled_index);
let (neighbors, children) =
cluster_nodes.get_retransmit_peers(shred_seed, fanout, Some(slot_leader));
assert_eq!(children.len(), children_indices.len());
for (node, index) in children.into_iter().zip(children_indices) {
assert_eq!(*node, peers[index]);
}
assert_eq!(neighbors.len(), neighbors_indices.len());
assert_eq!(neighbors[0].id, peers[neighbors_indices[0]].id);
for (node, index) in neighbors.into_iter().zip(neighbors_indices).skip(1) {
assert_eq!(*node, peers[index]);
}
}
}
#[test]
fn test_cluster_nodes_broadcast() {
let mut rng = rand::thread_rng();
let (nodes, stakes, cluster_info) = make_cluster(&mut rng);
// ClusterInfo::tvu_peers excludes the node itself.
assert_eq!(cluster_info.tvu_peers().len(), nodes.len() - 1);
let cluster_nodes = ClusterNodes::<BroadcastStage>::new(&cluster_info, &stakes);
// All nodes with contact-info should be in the index.
// Excluding this node itself.
assert_eq!(cluster_nodes.index.len() + 1, nodes.len());
// Staked nodes with no contact-info should be included.
assert!(cluster_nodes.nodes.len() > nodes.len());
// Assert that all nodes keep their contact-info.
// and, all staked nodes are also included.
{
let cluster_nodes: HashMap<_, _> = cluster_nodes
.nodes
.iter()
.map(|node| (node.pubkey(), node))
.collect();
for node in &nodes {
assert_eq!(cluster_nodes[&node.id].contact_info().unwrap().id, node.id);
}
for (pubkey, stake) in &stakes {
if *stake > 0 {
assert_eq!(cluster_nodes[pubkey].stake, *stake);
}
}
}
let (peers, peers_and_stakes) = get_broadcast_peers(&cluster_info, Some(&stakes));
assert_eq!(peers_and_stakes.len(), peers.len());
assert_eq!(cluster_nodes.index.len(), peers.len());
for (i, node) in cluster_nodes
.index
.iter()
.map(|(_, i)| &cluster_nodes.nodes[*i])
.enumerate()
{
let (stake, index) = peers_and_stakes[i];
assert_eq!(node.contact_info().unwrap(), &peers[index]);
assert_eq!(node.stake.max(1), stake);
}
for _ in 0..100 {
let mut shred_seed = [0u8; 32];
rng.fill(&mut shred_seed[..]);
let index = weighted_best(&peers_and_stakes, shred_seed);
let peer = cluster_nodes.get_broadcast_peer(shred_seed).unwrap();
assert_eq!(*peer, peers[index]);
}
}
}
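
Both ClusterNodes::<BroadcastStage> and ClusterNodes::<RetransmitStage> go through new_cluster_nodes above, so they share one ordering. A standalone sketch with made-up (stake, pubkey) values of the sort key and weights that new_cluster_nodes builds (pubkeys shortened to plain integers; an illustration, not code from the commit):

use std::cmp::Reverse;

fn main() {
    // Hypothetical (stake, pubkey) pairs.
    let mut nodes: Vec<(u64, u64)> = vec![(5, 3), (7, 1), (5, 9), (0, 4)];
    // Same key as new_cluster_nodes: descending by (stake, pubkey),
    // with stakes floored at 1.
    nodes.sort_by_key(|&(stake, pubkey)| Reverse((stake.max(1), pubkey)));
    assert_eq!(nodes, vec![(7, 1), (5, 9), (5, 3), (0, 4)]);
    // Weights handed to weighted_shuffle / weighted_best: max(1, stake).
    let weights: Vec<u64> = nodes.iter().map(|&(stake, _)| stake.max(1)).collect();
    assert_eq!(weights, vec![7, 5, 5, 1]);
}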


@@ -12,6 +12,7 @@ pub mod banking_stage;
pub mod broadcast_stage;
pub mod cache_block_meta_service;
pub mod cluster_info_vote_listener;
pub mod cluster_nodes;
pub mod cluster_slot_state_verifier;
pub mod cluster_slots;
pub mod cluster_slots_service;


@@ -3,6 +3,7 @@
use crate::{
cluster_info_vote_listener::VerifiedVoteReceiver,
cluster_nodes::ClusterNodes,
cluster_slots::ClusterSlots,
cluster_slots_service::{ClusterSlotsService, ClusterSlotsUpdateReceiver},
completed_data_sets_service::CompletedDataSetsSender,
@@ -13,10 +14,7 @@ use crate::{
use crossbeam_channel::{Receiver, Sender};
use lru::LruCache;
use solana_client::rpc_response::SlotUpdate;
use solana_gossip::{
cluster_info::{compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT},
contact_info::ContactInfo,
};
use solana_gossip::cluster_info::{ClusterInfo, DATA_PLANE_FANOUT};
use solana_ledger::shred::{get_shred_slot_index_type, ShredFetchStats};
use solana_ledger::{blockstore::Blockstore, leader_schedule_cache::LeaderScheduleCache};
use solana_measure::measure::Measure;
@@ -27,7 +25,6 @@ use solana_runtime::{bank::Bank, bank_forks::BankForks};
use solana_sdk::{clock::Slot, epoch_schedule::EpochSchedule, pubkey::Pubkey, timing::timestamp};
use solana_streamer::streamer::PacketReceiver;
use std::{
cmp,
collections::hash_set::HashSet,
collections::{BTreeMap, BTreeSet, HashMap},
net::UdpSocket,
@@ -211,12 +208,6 @@ fn update_retransmit_stats(
}
}
#[derive(Default)]
struct EpochStakesCache {
peers: Vec<ContactInfo>,
stakes_and_index: Vec<(u64, usize)>,
}
use crate::packet_hasher::PacketHasher;
// Map of shred (slot, index, is_data) => list of hash values seen for that key.
pub type ShredFilter = LruCache<(Slot, u32, bool), Vec<u64>>;
@@ -277,33 +268,6 @@ fn check_if_first_shred_received(
}
}
// Drops shred slot leader from retransmit peers.
// TODO: decide which bank should be used here.
fn get_retransmit_peers(
self_pubkey: Pubkey,
shred_slot: Slot,
leader_schedule_cache: &LeaderScheduleCache,
bank: &Bank,
stakes_cache: &EpochStakesCache,
) -> Vec<(u64 /*stakes*/, usize /*index*/)> {
match leader_schedule_cache.slot_leader_at(shred_slot, Some(bank)) {
None => {
error!("unknown leader for shred slot");
stakes_cache.stakes_and_index.clone()
}
Some(pubkey) if pubkey == self_pubkey => {
error!("retransmit from slot leader: {}", pubkey);
stakes_cache.stakes_and_index.clone()
}
Some(pubkey) => stakes_cache
.stakes_and_index
.iter()
.filter(|(_, i)| stakes_cache.peers[*i].id != pubkey)
.copied()
.collect(),
}
}
#[allow(clippy::too_many_arguments)]
fn retransmit(
bank_forks: &RwLock<BankForks>,
@@ -313,7 +277,7 @@ fn retransmit(
sock: &UdpSocket,
id: u32,
stats: &RetransmitStats,
epoch_stakes_cache: &RwLock<EpochStakesCache>,
cluster_nodes: &RwLock<ClusterNodes<RetransmitStage>>,
last_peer_update: &AtomicU64,
shreds_received: &Mutex<ShredFilterAndHasher>,
max_slots: &MaxSlots,
@@ -351,20 +315,17 @@ fn retransmit(
&& last_peer_update.compare_and_swap(last, now, Ordering::Relaxed) == last
{
let epoch_staked_nodes = r_bank.epoch_staked_nodes(bank_epoch);
let (peers, stakes_and_index) =
cluster_info.sorted_retransmit_peers_and_stakes(epoch_staked_nodes.as_ref());
{
let mut epoch_stakes_cache = epoch_stakes_cache.write().unwrap();
epoch_stakes_cache.peers = peers;
epoch_stakes_cache.stakes_and_index = stakes_and_index;
}
*cluster_nodes.write().unwrap() = ClusterNodes::<RetransmitStage>::new(
cluster_info,
&epoch_staked_nodes.unwrap_or_default(),
);
{
let mut sr = shreds_received.lock().unwrap();
sr.0.clear();
sr.1.reset();
}
}
let r_epoch_stakes_cache = epoch_stakes_cache.read().unwrap();
let cluster_nodes = cluster_nodes.read().unwrap();
let mut peers_len = 0;
epoch_cache_update.stop();
@@ -405,52 +366,19 @@ fn retransmit(
}
let mut compute_turbine_peers = Measure::start("turbine_start");
let stakes_and_index = get_retransmit_peers(
my_id,
shred_slot,
leader_schedule_cache,
r_bank.deref(),
r_epoch_stakes_cache.deref(),
);
let (my_index, shuffled_stakes_and_index) = ClusterInfo::shuffle_peers_and_index(
&my_id,
&r_epoch_stakes_cache.peers,
&stakes_and_index,
packet.meta.seed,
);
let slot_leader = leader_schedule_cache.slot_leader_at(shred_slot, Some(r_bank.deref()));
let (neighbors, children) =
cluster_nodes.get_retransmit_peers(packet.meta.seed, DATA_PLANE_FANOUT, slot_leader);
// If the node is on the critical path (i.e. the first node in each
// neighborhood), then we expect that the packet arrives at tvu socket
// as opposed to tvu-forwards. If this is not the case, then the
// turbine broadcast/retransmit tree is mismatched across nodes.
let anchor_node = my_index % DATA_PLANE_FANOUT == 0;
let anchor_node = neighbors[0].id == my_id;
if packet.meta.forward == anchor_node {
// TODO: Consider forwarding the packet to the root node here.
retransmit_tree_mismatch += 1;
}
peers_len = cmp::max(peers_len, shuffled_stakes_and_index.len());
// split off the indexes, we don't need the stakes anymore
let indexes: Vec<_> = shuffled_stakes_and_index
.into_iter()
.map(|(_, index)| index)
.collect();
debug_assert_eq!(my_id, r_epoch_stakes_cache.peers[indexes[my_index]].id);
let (neighbors, children) = compute_retransmit_peers(DATA_PLANE_FANOUT, my_index, &indexes);
let neighbors: Vec<_> = neighbors
.into_iter()
.filter_map(|index| {
let peer = &r_epoch_stakes_cache.peers[index];
if peer.id == my_id {
None
} else {
Some(peer)
}
})
.collect();
let children: Vec<_> = children
.into_iter()
.map(|index| &r_epoch_stakes_cache.peers[index])
.collect();
peers_len = peers_len.max(cluster_nodes.num_peers());
compute_turbine_peers.stop();
compute_turbine_peers_total += compute_turbine_peers.as_us();
@@ -465,7 +393,13 @@ fn retransmit(
// children and also tvu_forward socket of its neighbors. Otherwise it
// should only forward to tvu_forward socket of its children.
if anchor_node {
ClusterInfo::retransmit_to(&neighbors, packet, sock, /*forward socket=*/ true);
// First neighbor is this node itself, so skip it.
ClusterInfo::retransmit_to(
&neighbors[1..],
packet,
sock,
/*forward socket=*/ true,
);
}
ClusterInfo::retransmit_to(
&children,
@@ -535,7 +469,7 @@ pub fn retransmitter(
let r = r.clone();
let cluster_info = cluster_info.clone();
let stats = stats.clone();
let epoch_stakes_cache = Arc::new(RwLock::new(EpochStakesCache::default()));
let cluster_nodes = Arc::default();
let last_peer_update = Arc::new(AtomicU64::new(0));
let shreds_received = shreds_received.clone();
let max_slots = max_slots.clone();
@@ -555,7 +489,7 @@ pub fn retransmitter(
&sockets[s],
s as u32,
&stats,
&epoch_stakes_cache,
&cluster_nodes,
&last_peer_update,
&shreds_received,
&max_slots,


@@ -1325,80 +1325,6 @@ impl ClusterInfo {
|| !ContactInfo::is_valid_address(&contact_info.tvu)
}
fn sorted_stakes_with_index(
peers: &[ContactInfo],
stakes: Option<&HashMap<Pubkey, u64>>,
) -> Vec<(u64, usize)> {
let stakes_and_index: Vec<_> = peers
.iter()
.enumerate()
.map(|(i, c)| {
// For stake weighted shuffle a valid weight is at least 1. Weight 0 is
// assumed to be a missing entry. So let's make sure stake weights are at least 1
let stake = 1.max(
stakes
.as_ref()
.map_or(1, |stakes| *stakes.get(&c.id).unwrap_or(&1)),
);
(stake, i)
})
.sorted_by(|(l_stake, l_info), (r_stake, r_info)| {
if r_stake == l_stake {
peers[*r_info].id.cmp(&peers[*l_info].id)
} else {
r_stake.cmp(l_stake)
}
})
.collect();
stakes_and_index
}
fn stake_weighted_shuffle(
stakes_and_index: &[(u64, usize)],
seed: [u8; 32],
) -> Vec<(u64, usize)> {
let stake_weights: Vec<_> = stakes_and_index.iter().map(|(w, _)| *w).collect();
let shuffle = weighted_shuffle(&stake_weights, seed);
shuffle.iter().map(|x| stakes_and_index[*x]).collect()
}
// Return sorted_retransmit_peers(including self) and their stakes
pub fn sorted_retransmit_peers_and_stakes(
&self,
stakes: Option<&HashMap<Pubkey, u64>>,
) -> (Vec<ContactInfo>, Vec<(u64, usize)>) {
let mut peers = self.tvu_peers();
// insert "self" into this list for the layer and neighborhood computation
peers.push(self.my_contact_info());
let stakes_and_index = ClusterInfo::sorted_stakes_with_index(&peers, stakes);
(peers, stakes_and_index)
}
/// Return sorted Retransmit peers and index of `Self.id()` as if it were in that list
pub fn shuffle_peers_and_index(
id: &Pubkey,
peers: &[ContactInfo],
stakes_and_index: &[(u64, usize)],
seed: [u8; 32],
) -> (usize, Vec<(u64, usize)>) {
let shuffled_stakes_and_index = ClusterInfo::stake_weighted_shuffle(stakes_and_index, seed);
let self_index = shuffled_stakes_and_index
.iter()
.enumerate()
.find_map(|(i, (_stake, index))| {
if peers[*index].id == *id {
Some(i)
} else {
None
}
})
.unwrap();
(self_index, shuffled_stakes_and_index)
}
/// compute broadcast table
pub fn tpu_peers(&self) -> Vec<ContactInfo> {
let self_pubkey = self.id();
@@ -3071,14 +2997,6 @@ pub fn push_messages_to_peer(
Ok(())
}
pub fn stake_weight_peers(
peers: &mut Vec<ContactInfo>,
stakes: Option<&HashMap<Pubkey, u64>>,
) -> Vec<(u64, usize)> {
peers.dedup();
ClusterInfo::sorted_stakes_with_index(peers, stakes)
}
// Filters out values from nodes with different shred-version.
fn filter_on_shred_version(
mut msg: Protocol,
@@ -4061,15 +3979,6 @@ mod tests {
assert_ne!(contact_info.shred_version, d.shred_version);
cluster_info.insert_info(contact_info);
stakes.insert(id4, 10);
let mut peers = cluster_info.tvu_peers();
let peers_and_stakes = stake_weight_peers(&mut peers, Some(&stakes));
assert_eq!(peers.len(), 2);
assert_eq!(peers[0].id, id);
assert_eq!(peers[1].id, id2);
assert_eq!(peers_and_stakes.len(), 2);
assert_eq!(peers_and_stakes[0].0, 10);
assert_eq!(peers_and_stakes[1].0, 1);
}
#[test]


@@ -105,7 +105,7 @@ impl ContactInfo {
}
/// New random ContactInfo for tests and simulations.
pub(crate) fn new_rand<R: rand::Rng>(rng: &mut R, pubkey: Option<Pubkey>) -> Self {
pub fn new_rand<R: rand::Rng>(rng: &mut R, pubkey: Option<Pubkey>) -> Self {
let delay = 10 * 60 * 1000; // 10 minutes
let now = timestamp() - delay + rng.gen_range(0, 2 * delay);
let pubkey = pubkey.unwrap_or_else(solana_sdk::pubkey::new_rand);


@@ -1,4 +1,11 @@
use solana_sdk::clock::Slot;
use {
crate::{
cluster_info::ClusterInfo, contact_info::ContactInfo, weighted_shuffle::weighted_shuffle,
},
itertools::Itertools,
solana_sdk::{clock::Slot, pubkey::Pubkey},
std::collections::HashMap,
};
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, AbiExample, AbiEnumVisitor)]
enum CompressionType {
@@ -19,3 +26,74 @@ pub(crate) struct EpochIncompleteSlots {
compression: CompressionType,
compressed_list: Vec<u8>,
}
// Legacy methods copied for testing backward compatibility.
pub fn sorted_retransmit_peers_and_stakes(
cluster_info: &ClusterInfo,
stakes: Option<&HashMap<Pubkey, u64>>,
) -> (Vec<ContactInfo>, Vec<(u64, usize)>) {
let mut peers = cluster_info.tvu_peers();
// insert "self" into this list for the layer and neighborhood computation
peers.push(cluster_info.my_contact_info());
let stakes_and_index = sorted_stakes_with_index(&peers, stakes);
(peers, stakes_and_index)
}
pub fn sorted_stakes_with_index(
peers: &[ContactInfo],
stakes: Option<&HashMap<Pubkey, u64>>,
) -> Vec<(u64, usize)> {
let stakes_and_index: Vec<_> = peers
.iter()
.enumerate()
.map(|(i, c)| {
// For stake weighted shuffle a valid weight is at least 1. Weight 0 is
// assumed to be a missing entry. So let's make sure stake weights are at least 1
let stake = 1.max(
stakes
.as_ref()
.map_or(1, |stakes| *stakes.get(&c.id).unwrap_or(&1)),
);
(stake, i)
})
.sorted_by(|(l_stake, l_info), (r_stake, r_info)| {
if r_stake == l_stake {
peers[*r_info].id.cmp(&peers[*l_info].id)
} else {
r_stake.cmp(l_stake)
}
})
.collect();
stakes_and_index
}
pub fn shuffle_peers_and_index(
id: &Pubkey,
peers: &[ContactInfo],
stakes_and_index: &[(u64, usize)],
seed: [u8; 32],
) -> (usize, Vec<(u64, usize)>) {
let shuffled_stakes_and_index = stake_weighted_shuffle(stakes_and_index, seed);
let self_index = shuffled_stakes_and_index
.iter()
.enumerate()
.find_map(|(i, (_stake, index))| {
if peers[*index].id == *id {
Some(i)
} else {
None
}
})
.unwrap();
(self_index, shuffled_stakes_and_index)
}
fn stake_weighted_shuffle(stakes_and_index: &[(u64, usize)], seed: [u8; 32]) -> Vec<(u64, usize)> {
let stake_weights: Vec<_> = stakes_and_index.iter().map(|(w, _)| *w).collect();
let shuffle = weighted_shuffle(&stake_weights, seed);
shuffle.iter().map(|x| stakes_and_index[*x]).collect()
}


@@ -13,7 +13,7 @@ pub mod crds_gossip_push;
pub mod crds_shards;
pub mod crds_value;
pub mod data_budget;
mod deprecated;
pub mod deprecated;
pub mod duplicate_shred;
pub mod epoch_slots;
pub mod gossip_error;


@@ -5,6 +5,7 @@ use {
solana_gossip::{
cluster_info::{compute_retransmit_peers, ClusterInfo},
contact_info::ContactInfo,
deprecated::{shuffle_peers_and_index, sorted_retransmit_peers_and_stakes},
},
solana_sdk::pubkey::Pubkey,
std::{
@@ -118,14 +119,13 @@ fn run_simulation(stakes: &[u64], fanout: usize) {
.map(|i| {
let mut seed = [0; 32];
seed[0..4].copy_from_slice(&i.to_le_bytes());
// TODO: Ideally these should use the new methods in
// solana_core::cluster_nodes, however that would add build
// dependency on solana_core which is not desired.
let (peers, stakes_and_index) =
cluster_info.sorted_retransmit_peers_and_stakes(Some(&staked_nodes));
let (_, shuffled_stakes_and_indexes) = ClusterInfo::shuffle_peers_and_index(
&cluster_info.id(),
&peers,
&stakes_and_index,
seed,
);
sorted_retransmit_peers_and_stakes(&cluster_info, Some(&staked_nodes));
let (_, shuffled_stakes_and_indexes) =
shuffle_peers_and_index(&cluster_info.id(), &peers, &stakes_and_index, seed);
shuffled_stakes_and_indexes
.into_iter()
.map(|(_, i)| peers[i].clone())