From 13d018e3e12a6e2dab47e3a625f8ecdc81dfa920 Mon Sep 17 00:00:00 2001
From: Sagar Dhawan
Date: Wed, 20 Feb 2019 21:38:16 -0800
Subject: [PATCH] Fix stake selection for the Data Plane (#2863)

* Update data-plane to use stakes instead of a bank directly

* Rename get_stakes to staked_nodes
---
 runtime/src/bank.rs      | 11 +++++------
 src/broadcast_service.rs |  5 ++++-
 src/cluster_info.rs      | 28 +++++++++++++++++-----------
 src/retransmit_stage.rs  |  2 +-
 tests/cluster_info.rs    | 24 ++++++------------------
 5 files changed, 33 insertions(+), 37 deletions(-)

diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs
index 1a1c4ae37f..ce22f6c3ca 100644
--- a/runtime/src/bank.rs
+++ b/runtime/src/bank.rs
@@ -597,14 +597,13 @@ impl Bank {
             .collect()
     }
 
-    /// Collect all the stakes into a Map keyed on the Node id.
-    pub fn get_stakes(&self) -> HashMap<Pubkey, u64> {
-        let map: HashMap<_, _> = self
-            .vote_states(|_| true)
+    /// Collect the node Pubkey and staker account balance for nodes
+    /// that have non-zero balance in their corresponding staker accounts
+    pub fn staked_nodes(&self) -> HashMap<Pubkey, u64> {
+        self.vote_states(|state| self.get_balance(&state.staker_id) > 0)
             .iter()
             .map(|state| (state.node_id, self.get_balance(&state.staker_id)))
-            .collect();
-        map
+            .collect()
     }
 
     pub fn tick_height(&self) -> u64 {
diff --git a/src/broadcast_service.rs b/src/broadcast_service.rs
index a3f43a29c7..057e86ae93 100644
--- a/src/broadcast_service.rs
+++ b/src/broadcast_service.rs
@@ -227,7 +227,10 @@ impl BroadcastService {
             if exit_signal.load(Ordering::Relaxed) {
                 return BroadcastServiceReturnType::ExitSignal;
             }
-            let mut broadcast_table = cluster_info.read().unwrap().sorted_tvu_peers(&bank);
+            let mut broadcast_table = cluster_info
+                .read()
+                .unwrap()
+                .sorted_tvu_peers(&bank.staked_nodes());
             // Layer 1, leader nodes are limited to the fanout size.
             broadcast_table.truncate(DATA_PLANE_FANOUT);
             inc_new_counter_info!("broadcast_service-num_peers", broadcast_table.len() + 1);
diff --git a/src/cluster_info.rs b/src/cluster_info.rs
index bedcbf4cd5..92955975e4 100644
--- a/src/cluster_info.rs
+++ b/src/cluster_info.rs
@@ -365,28 +365,34 @@ impl ClusterInfo {
             .collect()
     }
 
-    fn sort_by_stake(peers: &[NodeInfo], bank: &Arc<Bank>) -> Vec<(u64, NodeInfo)> {
+    fn sort_by_stake<S: std::hash::BuildHasher>(
+        peers: &[NodeInfo],
+        stakes: &HashMap<Pubkey, u64, S>,
+    ) -> Vec<(u64, NodeInfo)> {
         let mut peers_with_stakes: Vec<_> = peers
             .iter()
-            .map(|c| (bank.get_balance(&c.id), c.clone()))
+            .map(|c| (*stakes.get(&c.id).unwrap_or(&0), c.clone()))
             .collect();
         peers_with_stakes.sort_unstable();
         peers_with_stakes.reverse();
         peers_with_stakes
     }
 
-    pub fn sorted_retransmit_peers(&self, bank: &Arc<Bank>) -> Vec<NodeInfo> {
+    pub fn sorted_retransmit_peers<S: std::hash::BuildHasher>(
+        &self,
+        stakes: &HashMap<Pubkey, u64, S>,
+    ) -> Vec<NodeInfo> {
         let peers = self.retransmit_peers();
-        let peers_with_stakes: Vec<_> = ClusterInfo::sort_by_stake(&peers, bank);
+        let peers_with_stakes: Vec<_> = ClusterInfo::sort_by_stake(&peers, stakes);
         peers_with_stakes
             .iter()
             .map(|(_, peer)| (*peer).clone())
             .collect()
     }
 
-    pub fn sorted_tvu_peers(&self, bank: &Arc<Bank>) -> Vec<NodeInfo> {
+    pub fn sorted_tvu_peers(&self, stakes: &HashMap<Pubkey, u64>) -> Vec<NodeInfo> {
         let peers = self.tvu_peers();
-        let peers_with_stakes: Vec<_> = ClusterInfo::sort_by_stake(&peers, bank);
+        let peers_with_stakes: Vec<_> = ClusterInfo::sort_by_stake(&peers, stakes);
         peers_with_stakes
             .iter()
             .map(|(_, peer)| (*peer).clone())
@@ -865,7 +871,7 @@ impl ClusterInfo {
         loop {
             let start = timestamp();
             let stakes: HashMap<_, _> = match bank {
-                Some(ref bank) => bank.get_stakes(),
+                Some(ref bank) => bank.staked_nodes(),
                 None => HashMap::new(),
             };
             let _ = Self::run_gossip(&obj, &stakes, &blob_sender);
@@ -1282,14 +1288,14 @@ impl ClusterInfo {
 
 /// 1 - also check if there are nodes in lower layers and repeat the layer 1 to layer 2 logic
 /// Returns Neighbor Nodes and Children Nodes `(neighbors, children)` for a given node based on its stake (Bank Balance)
-pub fn compute_retransmit_peers(
-    bank: &Arc<Bank>,
+pub fn compute_retransmit_peers<S: std::hash::BuildHasher>(
+    stakes: &HashMap<Pubkey, u64, S>,
     cluster_info: &Arc<RwLock<ClusterInfo>>,
     fanout: usize,
     hood_size: usize,
     grow: bool,
 ) -> (Vec<NodeInfo>, Vec<NodeInfo>) {
-    let peers = cluster_info.read().unwrap().sorted_retransmit_peers(bank);
+    let peers = cluster_info.read().unwrap().sorted_retransmit_peers(stakes);
     let my_id = cluster_info.read().unwrap().id();
     //calc num_layers and num_neighborhoods using the total number of nodes
     let (num_layers, layer_indices) =
@@ -1302,7 +1308,7 @@ pub fn compute_retransmit_peers(
     //find my index (my ix is the same as the first node with smaller stake)
     let my_index = peers
         .iter()
-        .position(|ci| bank.get_balance(&ci.id) <= bank.get_balance(&my_id));
+        .position(|ci| *stakes.get(&ci.id).unwrap_or(&0) <= *stakes.get(&my_id).unwrap_or(&0));
     //find my layer
     let locality = ClusterInfo::localize(
         &layer_indices,
diff --git a/src/retransmit_stage.rs b/src/retransmit_stage.rs
index 74ac89eb95..78a8277c09 100644
--- a/src/retransmit_stage.rs
+++ b/src/retransmit_stage.rs
@@ -41,7 +41,7 @@ fn retransmit(
             .to_owned(),
     );
     let (neighbors, children) = compute_retransmit_peers(
-        &bank,
+        &bank.staked_nodes(),
         cluster_info,
         DATA_PLANE_FANOUT,
         NEIGHBORHOOD_SIZE,
diff --git a/tests/cluster_info.rs b/tests/cluster_info.rs
index b9e434f6ef..35ea99d4c9 100644
--- a/tests/cluster_info.rs
+++ b/tests/cluster_info.rs
@@ -1,3 +1,4 @@
+use hashbrown::{HashMap, HashSet};
 use rayon::iter::{IntoParallelIterator, ParallelIterator};
 use rayon::prelude::*;
 use solana::cluster_info::{
@@ -5,11 +6,8 @@ use solana::cluster_info::{
     NEIGHBORHOOD_SIZE,
 };
 use solana::contact_info::ContactInfo;
-use solana_runtime::bank::Bank;
-use solana_sdk::genesis_block::GenesisBlock;
 use solana_sdk::pubkey::Pubkey;
 use solana_sdk::signature::{Keypair, KeypairUtil};
-use std::collections::{HashMap, HashSet};
 use std::sync::mpsc::channel;
 use std::sync::mpsc::TryRecvError;
 use std::sync::mpsc::{Receiver, Sender};
@@ -37,19 +35,13 @@ fn run_simulation(num_nodes: u64, fanout: usize, hood_size: usize) {
     // set timeout to 5 minutes
     let timeout = 60 * 5;
 
-    // math yo
-    let required_balance = num_nodes * (num_nodes + 1) / 2;
-
-    // create a genesis block
-    let (genesis_block, mint_keypair) = GenesisBlock::new(required_balance);
-
     // describe the leader
     let leader_info = ContactInfo::new_localhost(Keypair::new().pubkey(), 0);
     let mut cluster_info = ClusterInfo::new(leader_info.clone());
     cluster_info.set_leader(leader_info.id);
 
-    // create a bank
-    let bank = Arc::new(Bank::new(&genesis_block));
+    // setup stakes
+    let mut stakes = HashMap::new();
 
     // setup accounts for all nodes (leader has 0 bal)
     let (s, r) = channel();
@@ -69,8 +61,7 @@ fn run_simulation(num_nodes: u64, fanout: usize, hood_size: usize) {
         //distribute neighbors across threads to maximize parallel compute
         let batch_ix = *i as usize % batches.len();
         let node = ContactInfo::new_localhost(Keypair::new().pubkey(), 0);
-        bank.transfer(*i, &mint_keypair, node.id, bank.last_id())
-            .unwrap();
+        stakes.insert(node.id, *i);
         cluster_info.insert_info(node.clone());
         let (s, r) = channel();
         batches
@@ -82,14 +73,11 @@ fn run_simulation(num_nodes: u64, fanout: usize, hood_size: usize) {
     });
     let c_info = cluster_info.clone();
 
-    // check that all tokens have been exhausted
-    assert_eq!(bank.get_balance(&mint_keypair.pubkey()), 0);
-
     // create some "blobs".
    let blobs: Vec<(_, _)> = (0..100).into_par_iter().map(|i| (i as i32, true)).collect();
 
     // pretend to broadcast from leader - cluster_info::create_broadcast_orders
-    let mut broadcast_table = cluster_info.sorted_tvu_peers(&bank);
+    let mut broadcast_table = cluster_info.sorted_tvu_peers(&stakes);
     broadcast_table.truncate(fanout);
 
     let orders = ClusterInfo::create_broadcast_orders(false, &blobs, &broadcast_table);
@@ -119,7 +107,7 @@ fn run_simulation(num_nodes: u64, fanout: usize, hood_size: usize) {
             cluster.gossip.set_self(*id);
             if !mapped_peers.contains_key(id) {
                 let (neighbors, children) = compute_retransmit_peers(
-                    &bank,
+                    &stakes,
                     &Arc::new(RwLock::new(cluster.clone())),
                     fanout,
                     hood_size,
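
For reference, a minimal self-contained sketch of the stake-ordering rule this patch moves to: peers are ranked against a precomputed stakes map instead of per-peer bank lookups, and a peer missing from the map counts as having zero stake. `Peer` and the `u64` ids below are illustrative stand-ins for the crate's `NodeInfo` and `Pubkey` types, and `std::collections::HashMap` stands in for the hashbrown map the patch uses; this is a sketch, not the crate's code.

    use std::collections::HashMap;

    // Illustrative stand-in for the crate's NodeInfo; `id` stands in for a Pubkey.
    #[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
    struct Peer {
        id: u64,
    }

    // Mirrors the sort_by_stake logic above: look each peer up in the stakes
    // map, default missing peers to zero, then order from highest stake to lowest.
    fn sort_by_stake(peers: &[Peer], stakes: &HashMap<u64, u64>) -> Vec<(u64, Peer)> {
        let mut peers_with_stakes: Vec<_> = peers
            .iter()
            .map(|p| (*stakes.get(&p.id).unwrap_or(&0), p.clone()))
            .collect();
        peers_with_stakes.sort_unstable();
        peers_with_stakes.reverse();
        peers_with_stakes
    }

    fn main() {
        let peers = vec![Peer { id: 1 }, Peer { id: 2 }, Peer { id: 3 }];
        let mut stakes = HashMap::new();
        stakes.insert(1u64, 10u64);
        stakes.insert(3u64, 50u64);
        // Peer 2 has no stake entry, so it sorts last with a stake of 0.
        let sorted = sort_by_stake(&peers, &stakes);
        assert_eq!(sorted[0], (50, Peer { id: 3 }));
        assert_eq!(sorted[2], (0, Peer { id: 2 }));
    }

Taking the map as a parameter also lets `Bank::staked_nodes` filter out zero-balance stakers once, up front, rather than each consumer querying the bank balance per peer.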