Move avalanche logic to ClusterInfo

The simulator doesn't depend on RetransmitStage. It depends on
just one function, which is similar in spirit to many of the
methods in ClusterInfo.
This commit is contained in:
Greg Fitzgerald 2019-02-18 09:51:00 -07:00 committed by Grimes
parent 1c3f2bba6d
commit 0317583489
3 changed files with 54 additions and 53 deletions

View File

@ -26,6 +26,7 @@ use crate::result::Result;
use crate::rpc_service::RPC_PORT;
use crate::streamer::{BlobReceiver, BlobSender};
use bincode::{deserialize, serialize};
use core::cmp;
use hashbrown::HashMap;
use log::Level;
use rand::{thread_rng, Rng};
@ -1259,6 +1260,55 @@ impl ClusterInfo {
}
}
/// Avalanche logic
/// 1 - For the current node find out if it is in layer 1
/// 1.1 - If yes, then broadcast to all layer 1 nodes
/// 1 - using the layer 1 index, broadcast to all layer 2 nodes assuming you know neighborhood size
/// 1.2 - If no, then figure out what layer the node is in and who the neighbors are and only broadcast to them
/// 1 - also check if there are nodes in lower layers and repeat the layer 1 to layer 2 logic
/// Returns Neighbor Nodes and Children Nodes `(neighbors, children)` for a given node based on its stake (Bank Balance)
pub fn compute_retransmit_peers(
bank: &Arc<Bank>,
cluster_info: &Arc<RwLock<ClusterInfo>>,
fanout: usize,
hood_size: usize,
grow: bool,
) -> (Vec<NodeInfo>, Vec<NodeInfo>) {
let peers = cluster_info.read().unwrap().sorted_retransmit_peers(bank);
let my_id = cluster_info.read().unwrap().id();
//calc num_layers and num_neighborhoods using the total number of nodes
let (num_layers, layer_indices) =
ClusterInfo::describe_data_plane(peers.len(), fanout, hood_size, grow);
if num_layers <= 1 {
/* single layer data plane */
(peers, vec![])
} else {
//find my index (my ix is the same as the first node with smaller stake)
let my_index = peers
.iter()
.position(|ci| bank.get_balance(&ci.id) <= bank.get_balance(&my_id));
//find my layer
let locality = ClusterInfo::localize(
&layer_indices,
hood_size,
my_index.unwrap_or(peers.len() - 1),
);
let upper_bound = cmp::min(locality.neighbor_bounds.1, peers.len());
let neighbors = peers[locality.neighbor_bounds.0..upper_bound].to_vec();
let mut children = Vec::new();
for ix in locality.child_layer_peers {
if let Some(peer) = peers.get(ix) {
children.push(peer.clone());
continue;
}
break;
}
(neighbors, children)
}
}
#[derive(Debug)]
pub struct Sockets {
pub gossip: UdpSocket,

View File

@ -3,7 +3,8 @@
use crate::bank::Bank;
use crate::blocktree::Blocktree;
use crate::cluster_info::{
ClusterInfo, NodeInfo, DATA_PLANE_FANOUT, GROW_LAYER_CAPACITY, NEIGHBORHOOD_SIZE,
compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT, GROW_LAYER_CAPACITY,
NEIGHBORHOOD_SIZE,
};
use crate::counter::Counter;
use crate::leader_scheduler::LeaderScheduler;
@ -12,7 +13,6 @@ use crate::result::{Error, Result};
use crate::service::Service;
use crate::streamer::BlobReceiver;
use crate::window_service::WindowService;
use core::cmp;
use log::Level;
use solana_metrics::{influxdb, submit};
use std::net::UdpSocket;
@ -23,55 +23,6 @@ use std::sync::{Arc, RwLock};
use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
/// Avalanche logic
/// 1 - For the current node find out if it is in layer 1
/// 1.1 - If yes, then broadcast to all layer 1 nodes
/// 1 - using the layer 1 index, broadcast to all layer 2 nodes assuming you know neighborhood size
/// 1.2 - If no, then figure out what layer the node is in and who the neighbors are and only broadcast to them
/// 1 - also check if there are nodes in lower layers and repeat the layer 1 to layer 2 logic
/// Returns Neighbor Nodes and Children Nodes `(neighbors, children)` for a given node based on its stake (Bank Balance)
pub fn compute_retransmit_peers(
bank: &Arc<Bank>,
cluster_info: &Arc<RwLock<ClusterInfo>>,
fanout: usize,
hood_size: usize,
grow: bool,
) -> (Vec<NodeInfo>, Vec<NodeInfo>) {
let peers = cluster_info.read().unwrap().sorted_retransmit_peers(bank);
let my_id = cluster_info.read().unwrap().id();
//calc num_layers and num_neighborhoods using the total number of nodes
let (num_layers, layer_indices) =
ClusterInfo::describe_data_plane(peers.len(), fanout, hood_size, grow);
if num_layers <= 1 {
/* single layer data plane */
(peers, vec![])
} else {
//find my index (my ix is the same as the first node with smaller stake)
let my_index = peers
.iter()
.position(|ci| bank.get_balance(&ci.id) <= bank.get_balance(&my_id));
//find my layer
let locality = ClusterInfo::localize(
&layer_indices,
hood_size,
my_index.unwrap_or(peers.len() - 1),
);
let upper_bound = cmp::min(locality.neighbor_bounds.1, peers.len());
let neighbors = peers[locality.neighbor_bounds.0..upper_bound].to_vec();
let mut children = Vec::new();
for ix in locality.child_layer_peers {
if let Some(peer) = peers.get(ix) {
children.push(peer.clone());
continue;
}
break;
}
(neighbors, children)
}
}
fn retransmit(
bank: &Arc<Bank>,
cluster_info: &Arc<RwLock<ClusterInfo>>,

View File

@ -2,11 +2,11 @@ use rayon::iter::{IntoParallelIterator, ParallelIterator};
use rayon::prelude::*;
use solana::bank::Bank;
use solana::cluster_info::{
ClusterInfo, DATA_PLANE_FANOUT, GROW_LAYER_CAPACITY, NEIGHBORHOOD_SIZE,
compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT, GROW_LAYER_CAPACITY,
NEIGHBORHOOD_SIZE,
};
use solana::contact_info::ContactInfo;
use solana::genesis_block::GenesisBlock;
use solana::retransmit_stage::compute_retransmit_peers;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, KeypairUtil};
use std::collections::{HashMap, HashSet};