diff --git a/src/cluster_info.rs b/src/cluster_info.rs index 8c6e2f600..c44b4963b 100644 --- a/src/cluster_info.rs +++ b/src/cluster_info.rs @@ -26,6 +26,7 @@ use crate::result::Result; use crate::rpc_service::RPC_PORT; use crate::streamer::{BlobReceiver, BlobSender}; use bincode::{deserialize, serialize}; +use core::cmp; use hashbrown::HashMap; use log::Level; use rand::{thread_rng, Rng}; @@ -1259,6 +1260,55 @@ impl ClusterInfo { } } +/// Avalanche logic +/// 1 - For the current node find out if it is in layer 1 +/// 1.1 - If yes, then broadcast to all layer 1 nodes +/// 1 - using the layer 1 index, broadcast to all layer 2 nodes assuming you know neighborhood size +/// 1.2 - If no, then figure out what layer the node is in and who the neighbors are and only broadcast to them +/// 1 - also check if there are nodes in lower layers and repeat the layer 1 to layer 2 logic + +/// Returns Neighbor Nodes and Children Nodes `(neighbors, children)` for a given node based on its stake (Bank Balance) +pub fn compute_retransmit_peers( + bank: &Arc, + cluster_info: &Arc>, + fanout: usize, + hood_size: usize, + grow: bool, +) -> (Vec, Vec) { + let peers = cluster_info.read().unwrap().sorted_retransmit_peers(bank); + let my_id = cluster_info.read().unwrap().id(); + //calc num_layers and num_neighborhoods using the total number of nodes + let (num_layers, layer_indices) = + ClusterInfo::describe_data_plane(peers.len(), fanout, hood_size, grow); + + if num_layers <= 1 { + /* single layer data plane */ + (peers, vec![]) + } else { + //find my index (my ix is the same as the first node with smaller stake) + let my_index = peers + .iter() + .position(|ci| bank.get_balance(&ci.id) <= bank.get_balance(&my_id)); + //find my layer + let locality = ClusterInfo::localize( + &layer_indices, + hood_size, + my_index.unwrap_or(peers.len() - 1), + ); + let upper_bound = cmp::min(locality.neighbor_bounds.1, peers.len()); + let neighbors = peers[locality.neighbor_bounds.0..upper_bound].to_vec(); + let mut children = Vec::new(); + for ix in locality.child_layer_peers { + if let Some(peer) = peers.get(ix) { + children.push(peer.clone()); + continue; + } + break; + } + (neighbors, children) + } +} + #[derive(Debug)] pub struct Sockets { pub gossip: UdpSocket, diff --git a/src/retransmit_stage.rs b/src/retransmit_stage.rs index 5e65d70df..b44f91f10 100644 --- a/src/retransmit_stage.rs +++ b/src/retransmit_stage.rs @@ -3,7 +3,8 @@ use crate::bank::Bank; use crate::blocktree::Blocktree; use crate::cluster_info::{ - ClusterInfo, NodeInfo, DATA_PLANE_FANOUT, GROW_LAYER_CAPACITY, NEIGHBORHOOD_SIZE, + compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT, GROW_LAYER_CAPACITY, + NEIGHBORHOOD_SIZE, }; use crate::counter::Counter; use crate::leader_scheduler::LeaderScheduler; @@ -12,7 +13,6 @@ use crate::result::{Error, Result}; use crate::service::Service; use crate::streamer::BlobReceiver; use crate::window_service::WindowService; -use core::cmp; use log::Level; use solana_metrics::{influxdb, submit}; use std::net::UdpSocket; @@ -23,55 +23,6 @@ use std::sync::{Arc, RwLock}; use std::thread::{self, Builder, JoinHandle}; use std::time::Duration; -/// Avalanche logic -/// 1 - For the current node find out if it is in layer 1 -/// 1.1 - If yes, then broadcast to all layer 1 nodes -/// 1 - using the layer 1 index, broadcast to all layer 2 nodes assuming you know neighborhood size -/// 1.2 - If no, then figure out what layer the node is in and who the neighbors are and only broadcast to them -/// 1 - also check if there are nodes in lower layers and repeat the layer 1 to layer 2 logic - -/// Returns Neighbor Nodes and Children Nodes `(neighbors, children)` for a given node based on its stake (Bank Balance) -pub fn compute_retransmit_peers( - bank: &Arc, - cluster_info: &Arc>, - fanout: usize, - hood_size: usize, - grow: bool, -) -> (Vec, Vec) { - let peers = cluster_info.read().unwrap().sorted_retransmit_peers(bank); - let my_id = cluster_info.read().unwrap().id(); - //calc num_layers and num_neighborhoods using the total number of nodes - let (num_layers, layer_indices) = - ClusterInfo::describe_data_plane(peers.len(), fanout, hood_size, grow); - - if num_layers <= 1 { - /* single layer data plane */ - (peers, vec![]) - } else { - //find my index (my ix is the same as the first node with smaller stake) - let my_index = peers - .iter() - .position(|ci| bank.get_balance(&ci.id) <= bank.get_balance(&my_id)); - //find my layer - let locality = ClusterInfo::localize( - &layer_indices, - hood_size, - my_index.unwrap_or(peers.len() - 1), - ); - let upper_bound = cmp::min(locality.neighbor_bounds.1, peers.len()); - let neighbors = peers[locality.neighbor_bounds.0..upper_bound].to_vec(); - let mut children = Vec::new(); - for ix in locality.child_layer_peers { - if let Some(peer) = peers.get(ix) { - children.push(peer.clone()); - continue; - } - break; - } - (neighbors, children) - } -} - fn retransmit( bank: &Arc, cluster_info: &Arc>, diff --git a/tests/cluster_info.rs b/tests/cluster_info.rs index 42cf7008a..939716643 100644 --- a/tests/cluster_info.rs +++ b/tests/cluster_info.rs @@ -2,11 +2,11 @@ use rayon::iter::{IntoParallelIterator, ParallelIterator}; use rayon::prelude::*; use solana::bank::Bank; use solana::cluster_info::{ - ClusterInfo, DATA_PLANE_FANOUT, GROW_LAYER_CAPACITY, NEIGHBORHOOD_SIZE, + compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT, GROW_LAYER_CAPACITY, + NEIGHBORHOOD_SIZE, }; use solana::contact_info::ContactInfo; use solana::genesis_block::GenesisBlock; -use solana::retransmit_stage::compute_retransmit_peers; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil}; use std::collections::{HashMap, HashSet};