Fix stake selection for the Data Plane (#2863)

* Update data-plane to use stakes instead of a bank directly

* Rename get_stakes to staked_nodes
Sagar Dhawan 2019-02-20 21:38:16 -08:00 committed by GitHub
parent 59ee2b8892
commit 13d018e3e1
5 changed files with 33 additions and 37 deletions
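
The shape of the change, before walking the diffs: data-plane consumers now take a point-in-time stake snapshot as a plain HashMap<Pubkey, u64> instead of threading an Arc<Bank> through every call. A minimal sketch of the new calling convention, under simplified stand-ins (Pubkey, Bank, and broadcast_targets here are illustrative, not the real solana types):

    use std::collections::HashMap;

    // Illustrative stand-ins for the real solana_sdk / solana_runtime types.
    #[derive(Clone, Copy, PartialEq, Eq, Hash)]
    struct Pubkey([u8; 32]);

    struct Bank;
    impl Bank {
        // Mirrors the renamed Bank::staked_nodes in this commit; the real
        // implementation walks vote states (see the first diff below).
        fn staked_nodes(&self) -> HashMap<Pubkey, u64> {
            HashMap::new()
        }
    }

    // Before: fn broadcast_targets(bank: &Arc<Bank>) -> Vec<Pubkey>
    // After: only the stake map is needed, decoupling the data plane from Bank.
    fn broadcast_targets(stakes: &HashMap<Pubkey, u64>) -> Vec<Pubkey> {
        let mut ids: Vec<Pubkey> = stakes.keys().copied().collect();
        ids.sort_by_key(|id| std::cmp::Reverse(*stakes.get(id).unwrap_or(&0)));
        ids
    }

    fn main() {
        let bank = Bank;
        let stakes = bank.staked_nodes(); // snapshot once...
        let _targets = broadcast_targets(&stakes); // ...then pass the map around
    }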


@@ -597,14 +597,13 @@ impl Bank {
             .collect()
     }
 
-    /// Collect all the stakes into a Map keyed on the Node id.
-    pub fn get_stakes(&self) -> HashMap<Pubkey, u64> {
-        let map: HashMap<_, _> = self
-            .vote_states(|_| true)
+    /// Collect the node Pubkey and staker account balance for nodes
+    /// that have non-zero balance in their corresponding staker accounts
+    pub fn staked_nodes(&self) -> HashMap<Pubkey, u64> {
+        self.vote_states(|state| self.get_balance(&state.staker_id) > 0)
             .iter()
             .map(|state| (state.node_id, self.get_balance(&state.staker_id)))
-            .collect();
-        map
+            .collect()
     }
 
     pub fn tick_height(&self) -> u64 {
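
A self-contained sketch of the filter-then-collect pattern above, with VoteState and the balance table reduced to plain integers (both are stand-ins for the real Bank state):

    use std::collections::HashMap;

    // Simplified stand-ins for the fields staked_nodes actually reads.
    struct VoteState {
        node_id: u64,   // Pubkey in the real code
        staker_id: u64, // account whose balance is the node's stake
    }

    fn staked_nodes(votes: &[VoteState], balances: &HashMap<u64, u64>) -> HashMap<u64, u64> {
        votes
            .iter()
            // keep only nodes whose staker account has a non-zero balance
            .filter(|state| *balances.get(&state.staker_id).unwrap_or(&0) > 0)
            // key on the node id, value is the staker balance
            .map(|state| (state.node_id, balances[&state.staker_id]))
            .collect()
    }

    fn main() {
        let balances: HashMap<u64, u64> = [(10, 500), (11, 0)].iter().cloned().collect();
        let votes = vec![
            VoteState { node_id: 1, staker_id: 10 },
            VoteState { node_id: 2, staker_id: 11 }, // zero balance: filtered out
        ];
        let stakes = staked_nodes(&votes, &balances);
        assert_eq!(stakes.get(&1), Some(&500));
        assert_eq!(stakes.get(&2), None);
    }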


@@ -227,7 +227,10 @@ impl BroadcastService {
             if exit_signal.load(Ordering::Relaxed) {
                 return BroadcastServiceReturnType::ExitSignal;
             }
-            let mut broadcast_table = cluster_info.read().unwrap().sorted_tvu_peers(&bank);
+            let mut broadcast_table = cluster_info
+                .read()
+                .unwrap()
+                .sorted_tvu_peers(&bank.staked_nodes());
             // Layer 1, leader nodes are limited to the fanout size.
             broadcast_table.truncate(DATA_PLANE_FANOUT);
             inc_new_counter_info!("broadcast_service-num_peers", broadcast_table.len() + 1);
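
The layer-1 selection above in miniature: rank peers by stake, then keep at most DATA_PLANE_FANOUT of them. A hedged sketch with peers reduced to (stake, name) tuples and an illustrative fanout value (the real constant is imported from cluster_info):

    const DATA_PLANE_FANOUT: usize = 2; // illustrative value for the sketch

    fn main() {
        // Already sorted by descending stake, as sorted_tvu_peers returns them.
        let mut broadcast_table = vec![(300u64, "a"), (200, "b"), (100, "c")];
        broadcast_table.truncate(DATA_PLANE_FANOUT);
        assert_eq!(broadcast_table, vec![(300, "a"), (200, "b")]);
    }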


@@ -365,28 +365,34 @@ impl ClusterInfo {
             .collect()
     }
 
-    fn sort_by_stake(peers: &[NodeInfo], bank: &Arc<Bank>) -> Vec<(u64, NodeInfo)> {
+    fn sort_by_stake<S: std::hash::BuildHasher>(
+        peers: &[NodeInfo],
+        stakes: &HashMap<Pubkey, u64, S>,
+    ) -> Vec<(u64, NodeInfo)> {
         let mut peers_with_stakes: Vec<_> = peers
             .iter()
-            .map(|c| (bank.get_balance(&c.id), c.clone()))
+            .map(|c| (*stakes.get(&c.id).unwrap_or(&0), c.clone()))
             .collect();
         peers_with_stakes.sort_unstable();
         peers_with_stakes.reverse();
         peers_with_stakes
     }
 
-    pub fn sorted_retransmit_peers(&self, bank: &Arc<Bank>) -> Vec<NodeInfo> {
+    pub fn sorted_retransmit_peers<S: std::hash::BuildHasher>(
+        &self,
+        stakes: &HashMap<Pubkey, u64, S>,
+    ) -> Vec<NodeInfo> {
         let peers = self.retransmit_peers();
-        let peers_with_stakes: Vec<_> = ClusterInfo::sort_by_stake(&peers, bank);
+        let peers_with_stakes: Vec<_> = ClusterInfo::sort_by_stake(&peers, stakes);
         peers_with_stakes
             .iter()
             .map(|(_, peer)| (*peer).clone())
             .collect()
     }
 
-    pub fn sorted_tvu_peers(&self, bank: &Arc<Bank>) -> Vec<NodeInfo> {
+    pub fn sorted_tvu_peers(&self, stakes: &HashMap<Pubkey, u64>) -> Vec<NodeInfo> {
         let peers = self.tvu_peers();
-        let peers_with_stakes: Vec<_> = ClusterInfo::sort_by_stake(&peers, bank);
+        let peers_with_stakes: Vec<_> = ClusterInfo::sort_by_stake(&peers, stakes);
         peers_with_stakes
             .iter()
             .map(|(_, peer)| (*peer).clone())
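
The sorting contract of sort_by_stake, standalone: peers missing from the map count as zero stake via unwrap_or(&0), and sorting the (stake, peer) tuples ascending then reversing puts the highest stake first. A sketch with peers reduced to integer ids (NodeInfo stands in as u64 here):

    use std::collections::HashMap;

    fn sort_by_stake(peers: &[u64], stakes: &HashMap<u64, u64>) -> Vec<(u64, u64)> {
        let mut peers_with_stakes: Vec<(u64, u64)> = peers
            .iter()
            // absent entries are zero stake rather than a panic
            .map(|p| (*stakes.get(p).unwrap_or(&0), *p))
            .collect();
        peers_with_stakes.sort_unstable(); // ascending by (stake, peer)
        peers_with_stakes.reverse(); // highest stake first
        peers_with_stakes
    }

    fn main() {
        let stakes: HashMap<u64, u64> = [(1, 100), (2, 300)].iter().cloned().collect();
        // peer 3 has no stake entry and sorts last
        assert_eq!(
            sort_by_stake(&[1, 2, 3], &stakes),
            vec![(300, 2), (100, 1), (0, 3)]
        );
    }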
@@ -865,7 +871,7 @@ impl ClusterInfo {
             loop {
                 let start = timestamp();
                 let stakes: HashMap<_, _> = match bank {
-                    Some(ref bank) => bank.get_stakes(),
+                    Some(ref bank) => bank.staked_nodes(),
                     None => HashMap::new(),
                 };
                 let _ = Self::run_gossip(&obj, &stakes, &blob_sender);
@@ -1282,14 +1288,14 @@ impl ClusterInfo {
 /// 1 - also check if there are nodes in lower layers and repeat the layer 1 to layer 2 logic
 /// Returns Neighbor Nodes and Children Nodes `(neighbors, children)` for a given node based on its stake (Bank Balance)
-pub fn compute_retransmit_peers(
-    bank: &Arc<Bank>,
+pub fn compute_retransmit_peers<S: std::hash::BuildHasher>(
+    stakes: &HashMap<Pubkey, u64, S>,
     cluster_info: &Arc<RwLock<ClusterInfo>>,
     fanout: usize,
     hood_size: usize,
     grow: bool,
 ) -> (Vec<NodeInfo>, Vec<NodeInfo>) {
-    let peers = cluster_info.read().unwrap().sorted_retransmit_peers(bank);
+    let peers = cluster_info.read().unwrap().sorted_retransmit_peers(stakes);
     let my_id = cluster_info.read().unwrap().id();
     //calc num_layers and num_neighborhoods using the total number of nodes
     let (num_layers, layer_indices) =
@@ -1302,7 +1308,7 @@ pub fn compute_retransmit_peers(
     //find my index (my ix is the same as the first node with smaller stake)
     let my_index = peers
         .iter()
-        .position(|ci| bank.get_balance(&ci.id) <= bank.get_balance(&my_id));
+        .position(|ci| *stakes.get(&ci.id).unwrap_or(&0) <= *stakes.get(&my_id).unwrap_or(&0));
     //find my layer
     let locality = ClusterInfo::localize(
         &layer_indices,
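
One subtlety in the hunk above: since sorted_retransmit_peers returns peers in descending stake order, position() finds the first peer whose stake is less than or equal to this node's, which is exactly the slot this node would occupy. A standalone sketch with integer ids standing in for Pubkeys:

    use std::collections::HashMap;

    fn my_index(sorted_peers: &[u64], stakes: &HashMap<u64, u64>, my_id: u64) -> Option<usize> {
        let my_stake = *stakes.get(&my_id).unwrap_or(&0);
        // peers are sorted by descending stake, so the first peer at or
        // below our stake marks where we would appear in the list
        sorted_peers
            .iter()
            .position(|id| *stakes.get(id).unwrap_or(&0) <= my_stake)
    }

    fn main() {
        let stakes: HashMap<u64, u64> = [(1, 300), (2, 200), (3, 100)].iter().cloned().collect();
        assert_eq!(my_index(&[1, 2, 3], &stakes, 2), Some(1));
    }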


@@ -41,7 +41,7 @@ fn retransmit(
             .to_owned(),
     );
     let (neighbors, children) = compute_retransmit_peers(
-        &bank,
+        &bank.staked_nodes(),
         cluster_info,
         DATA_PLANE_FANOUT,
         NEIGHBORHOOD_SIZE,


@@ -1,3 +1,4 @@
+use hashbrown::{HashMap, HashSet};
 use rayon::iter::{IntoParallelIterator, ParallelIterator};
 use rayon::prelude::*;
 use solana::cluster_info::{
@@ -5,11 +6,8 @@ use solana::cluster_info::{
     NEIGHBORHOOD_SIZE,
 };
 use solana::contact_info::ContactInfo;
-use solana_runtime::bank::Bank;
-use solana_sdk::genesis_block::GenesisBlock;
 use solana_sdk::pubkey::Pubkey;
 use solana_sdk::signature::{Keypair, KeypairUtil};
-use std::collections::{HashMap, HashSet};
 use std::sync::mpsc::channel;
 use std::sync::mpsc::TryRecvError;
 use std::sync::mpsc::{Receiver, Sender};
@@ -37,19 +35,13 @@ fn run_simulation(num_nodes: u64, fanout: usize, hood_size: usize) {
     // set timeout to 5 minutes
     let timeout = 60 * 5;
-    // math yo
-    let required_balance = num_nodes * (num_nodes + 1) / 2;
-    // create a genesis block
-    let (genesis_block, mint_keypair) = GenesisBlock::new(required_balance);
 
     // describe the leader
     let leader_info = ContactInfo::new_localhost(Keypair::new().pubkey(), 0);
     let mut cluster_info = ClusterInfo::new(leader_info.clone());
     cluster_info.set_leader(leader_info.id);
-    // create a bank
-    let bank = Arc::new(Bank::new(&genesis_block));
+
+    // setup stakes
+    let mut stakes = HashMap::new();
 
     // setup accounts for all nodes (leader has 0 bal)
     let (s, r) = channel();
@@ -69,8 +61,7 @@ fn run_simulation(num_nodes: u64, fanout: usize, hood_size: usize) {
             //distribute neighbors across threads to maximize parallel compute
             let batch_ix = *i as usize % batches.len();
             let node = ContactInfo::new_localhost(Keypair::new().pubkey(), 0);
-            bank.transfer(*i, &mint_keypair, node.id, bank.last_id())
-                .unwrap();
+            stakes.insert(node.id, *i);
             cluster_info.insert_info(node.clone());
             let (s, r) = channel();
             batches
@@ -82,14 +73,11 @@ fn run_simulation(num_nodes: u64, fanout: usize, hood_size: usize) {
     });
 
     let c_info = cluster_info.clone();
 
-    // check that all tokens have been exhausted
-    assert_eq!(bank.get_balance(&mint_keypair.pubkey()), 0);
-
     // create some "blobs".
     let blobs: Vec<(_, _)> = (0..100).into_par_iter().map(|i| (i as i32, true)).collect();
 
     // pretend to broadcast from leader - cluster_info::create_broadcast_orders
-    let mut broadcast_table = cluster_info.sorted_tvu_peers(&bank);
+    let mut broadcast_table = cluster_info.sorted_tvu_peers(&stakes);
     broadcast_table.truncate(fanout);
     let orders = ClusterInfo::create_broadcast_orders(false, &blobs, &broadcast_table);
@@ -119,7 +107,7 @@ fn run_simulation(num_nodes: u64, fanout: usize, hood_size: usize) {
                 cluster.gossip.set_self(*id);
                 if !mapped_peers.contains_key(id) {
                     let (neighbors, children) = compute_retransmit_peers(
-                        &bank,
+                        &stakes,
                         &Arc::new(RwLock::new(cluster.clone())),
                         fanout,
                         hood_size,
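
The simulation change in miniature, hedged (integer ids stand in for Pubkeys): rather than minting required_balance tokens and transferring i to node i through a Bank, the test now writes stake i into the map directly, and the old mint-exhaustion assert becomes a simple sum.

    use std::collections::HashMap;

    fn main() {
        let num_nodes: u64 = 5;
        let mut stakes: HashMap<u64, u64> = HashMap::new();
        // node i gets a stake of i, mirroring the old bank.transfer(*i, ...) loop
        for i in 1..=num_nodes {
            stakes.insert(i, i);
        }
        // the old test asserted the mint hit zero; the direct map makes the
        // total trivially num_nodes * (num_nodes + 1) / 2
        let total: u64 = stakes.values().sum();
        assert_eq!(total, num_nodes * (num_nodes + 1) / 2);
    }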