rework staking_utils (#4283)

This commit is contained in:
Rob Walker 2019-05-14 16:15:51 -07:00 committed by GitHub
parent 18b386cd10
commit 0c1191c3ee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 68 additions and 80 deletions

View File

@ -72,9 +72,11 @@ impl Broadcast {
} }
let bank_epoch = bank.get_stakers_epoch(bank.slot()); let bank_epoch = bank.get_stakers_epoch(bank.slot());
let mut broadcast_table = cluster_info.read().unwrap().sorted_tvu_peers( let mut broadcast_table = cluster_info
&staking_utils::delegated_stakes_at_epoch(&bank, bank_epoch).unwrap(), .read()
); .unwrap()
.sorted_tvu_peers(staking_utils::staked_nodes_at_epoch(&bank, bank_epoch).as_ref());
inc_new_counter_info!("broadcast_service-num_peers", broadcast_table.len() + 1); inc_new_counter_info!("broadcast_service-num_peers", broadcast_table.len() + 1);
// Layer 1, leader nodes are limited to the fanout size. // Layer 1, leader nodes are limited to the fanout size.
broadcast_table.truncate(DATA_PLANE_FANOUT); broadcast_table.truncate(DATA_PLANE_FANOUT);

View File

@ -424,11 +424,16 @@ impl ClusterInfo {
fn sort_by_stake<S: std::hash::BuildHasher>( fn sort_by_stake<S: std::hash::BuildHasher>(
peers: &[ContactInfo], peers: &[ContactInfo],
stakes: &HashMap<Pubkey, u64, S>, stakes: Option<&HashMap<Pubkey, u64, S>>,
) -> Vec<(u64, ContactInfo)> { ) -> Vec<(u64, ContactInfo)> {
let mut peers_with_stakes: Vec<_> = peers let mut peers_with_stakes: Vec<_> = peers
.iter() .iter()
.map(|c| (*stakes.get(&c.id).unwrap_or(&0), c.clone())) .map(|c| {
(
stakes.map_or(0, |stakes| *stakes.get(&c.id).unwrap_or(&0)),
c.clone(),
)
})
.collect(); .collect();
peers_with_stakes.sort_unstable_by(|(l_stake, l_info), (r_stake, r_info)| { peers_with_stakes.sort_unstable_by(|(l_stake, l_info), (r_stake, r_info)| {
if r_stake == l_stake { if r_stake == l_stake {
@ -444,7 +449,7 @@ impl ClusterInfo {
/// Return sorted Retransmit peers and index of `Self.id()` as if it were in that list /// Return sorted Retransmit peers and index of `Self.id()` as if it were in that list
fn sorted_peers_and_index<S: std::hash::BuildHasher>( fn sorted_peers_and_index<S: std::hash::BuildHasher>(
&self, &self,
stakes: &HashMap<Pubkey, u64, S>, stakes: Option<&HashMap<Pubkey, u64, S>>,
) -> (usize, Vec<ContactInfo>) { ) -> (usize, Vec<ContactInfo>) {
let mut peers = self.retransmit_peers(); let mut peers = self.retransmit_peers();
peers.push(self.lookup(&self.id()).unwrap().clone()); peers.push(self.lookup(&self.id()).unwrap().clone());
@ -465,7 +470,7 @@ impl ClusterInfo {
(index, peers) (index, peers)
} }
pub fn sorted_tvu_peers(&self, stakes: &HashMap<Pubkey, u64>) -> Vec<ContactInfo> { pub fn sorted_tvu_peers(&self, stakes: Option<&HashMap<Pubkey, u64>>) -> Vec<ContactInfo> {
let peers = self.tvu_peers(); let peers = self.tvu_peers();
let peers_with_stakes: Vec<_> = ClusterInfo::sort_by_stake(&peers, stakes); let peers_with_stakes: Vec<_> = ClusterInfo::sort_by_stake(&peers, stakes);
peers_with_stakes peers_with_stakes
@ -945,9 +950,9 @@ impl ClusterInfo {
loop { loop {
let start = timestamp(); let start = timestamp();
let stakes: HashMap<_, _> = match bank_forks { let stakes: HashMap<_, _> = match bank_forks {
Some(ref bank_forks) => staking_utils::delegated_stakes( Some(ref bank_forks) => {
&bank_forks.read().unwrap().working_bank(), staking_utils::staked_nodes(&bank_forks.read().unwrap().working_bank())
), }
None => HashMap::new(), None => HashMap::new(),
}; };
let _ = Self::run_gossip(&obj, &stakes, &blob_sender); let _ = Self::run_gossip(&obj, &stakes, &blob_sender);
@ -1421,7 +1426,7 @@ impl ClusterInfo {
/// Returns Neighbor Nodes and Children Nodes `(neighbors, children)` for a given node based on its stake (Bank Balance) /// Returns Neighbor Nodes and Children Nodes `(neighbors, children)` for a given node based on its stake (Bank Balance)
pub fn compute_retransmit_peers<S: std::hash::BuildHasher>( pub fn compute_retransmit_peers<S: std::hash::BuildHasher>(
stakes: &HashMap<Pubkey, u64, S>, stakes: Option<&HashMap<Pubkey, u64, S>>,
cluster_info: &Arc<RwLock<ClusterInfo>>, cluster_info: &Arc<RwLock<ClusterInfo>>,
fanout: usize, fanout: usize,
) -> (Vec<ContactInfo>, Vec<ContactInfo>) { ) -> (Vec<ContactInfo>, Vec<ContactInfo>) {

View File

@ -6,7 +6,7 @@ use solana_sdk::timing::NUM_CONSECUTIVE_LEADER_SLOTS;
/// Return the leader schedule for the given epoch. /// Return the leader schedule for the given epoch.
pub fn leader_schedule(epoch_height: u64, bank: &Bank) -> Option<LeaderSchedule> { pub fn leader_schedule(epoch_height: u64, bank: &Bank) -> Option<LeaderSchedule> {
staking_utils::delegated_stakes_at_epoch(bank, epoch_height).map(|stakes| { staking_utils::staked_nodes_at_epoch(bank, epoch_height).map(|stakes| {
let mut seed = [0u8; 32]; let mut seed = [0u8; 32];
seed[0..8].copy_from_slice(&epoch_height.to_le_bytes()); seed[0..8].copy_from_slice(&epoch_height.to_le_bytes());
let mut stakes: Vec<_> = stakes.into_iter().collect(); let mut stakes: Vec<_> = stakes.into_iter().collect();
@ -68,7 +68,7 @@ mod tests {
create_genesis_block_with_leader(0, &pubkey, BOOTSTRAP_LEADER_LAMPORTS).0; create_genesis_block_with_leader(0, &pubkey, BOOTSTRAP_LEADER_LAMPORTS).0;
let bank = Bank::new(&genesis_block); let bank = Bank::new(&genesis_block);
let ids_and_stakes: Vec<_> = staking_utils::delegated_stakes(&bank).into_iter().collect(); let ids_and_stakes: Vec<_> = staking_utils::staked_nodes(&bank).into_iter().collect();
let seed = [0u8; 32]; let seed = [0u8; 32];
let leader_schedule = LeaderSchedule::new( let leader_schedule = LeaderSchedule::new(
&ids_and_stakes, &ids_and_stakes,

View File

@ -360,7 +360,7 @@ impl Locktower {
fn initialize_lockouts_from_bank(bank: &Bank, current_epoch: u64) -> VoteState { fn initialize_lockouts_from_bank(bank: &Bank, current_epoch: u64) -> VoteState {
let mut lockouts = VoteState::default(); let mut lockouts = VoteState::default();
if let Some(iter) = staking_utils::node_staked_accounts_at_epoch(&bank, current_epoch) { if let Some(iter) = bank.epoch_vote_accounts(current_epoch) {
for (delegate_id, (_, account)) in iter { for (delegate_id, (_, account)) in iter {
if *delegate_id == bank.collector_id() { if *delegate_id == bank.collector_id() {
let state = VoteState::deserialize(&account.data).expect("votes"); let state = VoteState::deserialize(&account.data).expect("votes");

View File

@ -39,7 +39,7 @@ fn retransmit(
let r_bank = bank_forks.read().unwrap().working_bank(); let r_bank = bank_forks.read().unwrap().working_bank();
let bank_epoch = r_bank.get_stakers_epoch(r_bank.slot()); let bank_epoch = r_bank.get_stakers_epoch(r_bank.slot());
let (neighbors, children) = compute_retransmit_peers( let (neighbors, children) = compute_retransmit_peers(
&staking_utils::delegated_stakes_at_epoch(&r_bank, bank_epoch).unwrap(), staking_utils::staked_nodes_at_epoch(&r_bank, bank_epoch).as_ref(),
cluster_info, cluster_info,
DATA_PLANE_FANOUT, DATA_PLANE_FANOUT,
); );

View File

@ -18,18 +18,15 @@ pub fn get_supermajority_slot(bank: &Bank, epoch_height: u64) -> Option<u64> {
} }
pub fn vote_account_stakes(bank: &Bank) -> HashMap<Pubkey, u64> { pub fn vote_account_stakes(bank: &Bank) -> HashMap<Pubkey, u64> {
let node_staked_accounts = node_staked_accounts(bank); bank.vote_accounts()
node_staked_accounts .into_iter()
.map(|(id, (stake, _))| (id, stake)) .map(|(id, (stake, _))| (id, stake))
.collect() .collect()
} }
/// Collect the delegate account balance and vote states for delegates have non-zero balance in /// Collect the staked nodes, as named by staked vote accounts from the given bank
/// any of their managed staking accounts pub fn staked_nodes(bank: &Bank) -> HashMap<Pubkey, u64> {
pub fn delegated_stakes(bank: &Bank) -> HashMap<Pubkey, u64> { to_staked_nodes(to_vote_states(bank.vote_accounts().into_iter()))
let node_staked_accounts = node_staked_accounts(bank);
let node_staked_vote_states = to_vote_state(node_staked_accounts);
to_delegated_stakes(node_staked_vote_states)
} }
/// At the specified epoch, collect the node account balance and vote states for nodes that /// At the specified epoch, collect the node account balance and vote states for nodes that
@ -38,43 +35,23 @@ pub fn vote_account_stakes_at_epoch(
bank: &Bank, bank: &Bank,
epoch_height: u64, epoch_height: u64,
) -> Option<HashMap<Pubkey, u64>> { ) -> Option<HashMap<Pubkey, u64>> {
let node_staked_accounts = node_staked_accounts_at_epoch(bank, epoch_height); bank.epoch_vote_accounts(epoch_height).map(|accounts| {
node_staked_accounts accounts
.map(|epoch_state| epoch_state.map(|(id, (stake, _))| (*id, *stake)).collect()) .iter()
.map(|(id, (stake, _))| (*id, *stake))
.collect()
})
} }
/// At the specified epoch, collect the delegate account balance and vote states for delegates /// At the specified epoch, collect the delegate account balance and vote states for delegates
/// that have non-zero balance in any of their managed staking accounts /// that have non-zero balance in any of their managed staking accounts
pub fn delegated_stakes_at_epoch(bank: &Bank, epoch_height: u64) -> Option<HashMap<Pubkey, u64>> { pub fn staked_nodes_at_epoch(bank: &Bank, epoch_height: u64) -> Option<HashMap<Pubkey, u64>> {
let node_staked_accounts = node_staked_accounts_at_epoch(bank, epoch_height); bank.epoch_vote_accounts(epoch_height)
let node_staked_vote_states = node_staked_accounts.map(to_vote_state); .map(|vote_accounts| to_staked_nodes(to_vote_states(vote_accounts.into_iter())))
node_staked_vote_states.map(to_delegated_stakes)
} }
/// Collect the node account balance and vote states for nodes have non-zero balance in // input (vote_id, (stake, vote_account)) => (stake, vote_state)
/// their corresponding staking accounts fn to_vote_states(
fn node_staked_accounts(bank: &Bank) -> impl Iterator<Item = (Pubkey, (u64, Account))> {
bank.vote_accounts().into_iter()
}
pub fn node_staked_accounts_at_epoch(
bank: &Bank,
epoch_height: u64,
) -> Option<impl Iterator<Item = (&Pubkey, &(u64, Account))>> {
bank.epoch_vote_accounts(epoch_height).map(|vote_accounts| {
vote_accounts
.into_iter()
.filter(|(account_id, (_, account))| filter_no_delegate(account_id, account))
})
}
fn filter_no_delegate(account_id: &Pubkey, account: &Account) -> bool {
VoteState::deserialize(&account.data)
.map(|vote_state| vote_state.node_id != *account_id)
.unwrap_or(false)
}
fn to_vote_state(
node_staked_accounts: impl Iterator<Item = (impl Borrow<Pubkey>, impl Borrow<(u64, Account)>)>, node_staked_accounts: impl Iterator<Item = (impl Borrow<Pubkey>, impl Borrow<(u64, Account)>)>,
) -> impl Iterator<Item = (u64, VoteState)> { ) -> impl Iterator<Item = (u64, VoteState)> {
node_staked_accounts.filter_map(|(_, stake_account)| { node_staked_accounts.filter_map(|(_, stake_account)| {
@ -84,13 +61,13 @@ fn to_vote_state(
}) })
} }
fn to_delegated_stakes( // (stake, vote_state) => (node, stake)
fn to_staked_nodes(
node_staked_accounts: impl Iterator<Item = (u64, VoteState)>, node_staked_accounts: impl Iterator<Item = (u64, VoteState)>,
) -> HashMap<Pubkey, u64> { ) -> HashMap<Pubkey, u64> {
let mut map: HashMap<Pubkey, u64> = HashMap::new(); let mut map: HashMap<Pubkey, u64> = HashMap::new();
node_staked_accounts.for_each(|(stake, state)| { node_staked_accounts.for_each(|(stake, state)| {
let delegate = &state.node_id; map.entry(state.node_id)
map.entry(*delegate)
.and_modify(|s| *s += stake) .and_modify(|s| *s += stake)
.or_insert(stake); .or_insert(stake);
}); });
@ -98,10 +75,12 @@ fn to_delegated_stakes(
} }
fn epoch_stakes_and_lockouts(bank: &Bank, epoch_height: u64) -> Vec<(u64, Option<u64>)> { fn epoch_stakes_and_lockouts(bank: &Bank, epoch_height: u64) -> Vec<(u64, Option<u64>)> {
let node_staked_accounts = let node_staked_accounts = bank
node_staked_accounts_at_epoch(bank, epoch_height).expect("Bank state for epoch is missing"); .epoch_vote_accounts(epoch_height)
let node_staked_vote_states = to_vote_state(node_staked_accounts); .expect("Bank state for epoch is missing")
node_staked_vote_states .into_iter();
to_vote_states(node_staked_accounts)
.map(|(stake, states)| (stake, states.root_slot)) .map(|(stake, states)| (stake, states.root_slot))
.collect() .collect()
} }
@ -148,7 +127,7 @@ pub mod tests {
} }
#[test] #[test]
fn test_bank_staked_nodes_at_epoch() { fn test_vote_account_stakes_at_epoch() {
let (genesis_block, _mint_keypair, voting_keypair) = let (genesis_block, _mint_keypair, voting_keypair) =
create_genesis_block_with_leader(1, &Pubkey::new_rand(), BOOTSTRAP_LEADER_LAMPORTS); create_genesis_block_with_leader(1, &Pubkey::new_rand(), BOOTSTRAP_LEADER_LAMPORTS);
@ -263,22 +242,22 @@ pub mod tests {
} }
#[test] #[test]
fn test_to_delegated_stakes() { fn test_to_staked_nodes() {
let mut stakes = Vec::new(); let mut stakes = Vec::new();
let delegate1 = Pubkey::new_rand(); let node1 = Pubkey::new_rand();
let delegate2 = Pubkey::new_rand(); let node2 = Pubkey::new_rand();
// Delegate 1 has stake of 3 // Node 1 has stake of 3
for i in 0..3 { for i in 0..3 {
stakes.push((i, VoteState::new(&Pubkey::new_rand(), &delegate1, 0))); stakes.push((i, VoteState::new(&Pubkey::new_rand(), &node1, 0)));
} }
// Delegate 1 has stake of 5 // Node 1 has stake of 5
stakes.push((5, VoteState::new(&Pubkey::new_rand(), &delegate2, 0))); stakes.push((5, VoteState::new(&Pubkey::new_rand(), &node2, 0)));
let result = to_delegated_stakes(stakes.into_iter()); let result = to_staked_nodes(stakes.into_iter());
assert_eq!(result.len(), 2); assert_eq!(result.len(), 2);
assert_eq!(result[&delegate1], 3); assert_eq!(result[&node1], 3);
assert_eq!(result[&delegate2], 5); assert_eq!(result[&node2], 5);
} }
} }

View File

@ -72,7 +72,7 @@ fn run_simulation(stakes: &[u64], fanout: usize) {
let blobs: Vec<(_, _)> = (0..100).into_par_iter().map(|i| (i as i32, true)).collect(); let blobs: Vec<(_, _)> = (0..100).into_par_iter().map(|i| (i as i32, true)).collect();
// pretend to broadcast from leader - cluster_info::create_broadcast_orders // pretend to broadcast from leader - cluster_info::create_broadcast_orders
let mut broadcast_table = cluster_info.sorted_tvu_peers(&staked_nodes); let mut broadcast_table = cluster_info.sorted_tvu_peers(Some(&staked_nodes));
broadcast_table.truncate(fanout); broadcast_table.truncate(fanout);
let orders = ClusterInfo::create_broadcast_orders(false, &blobs, &broadcast_table); let orders = ClusterInfo::create_broadcast_orders(false, &blobs, &broadcast_table);
@ -106,7 +106,7 @@ fn run_simulation(stakes: &[u64], fanout: usize) {
cluster.gossip.set_self(&*id); cluster.gossip.set_self(&*id);
if !mapped_peers.contains_key(id) { if !mapped_peers.contains_key(id) {
let (neighbors, children) = compute_retransmit_peers( let (neighbors, children) = compute_retransmit_peers(
&staked_nodes, Some(&staked_nodes),
&Arc::new(RwLock::new(cluster.clone())), &Arc::new(RwLock::new(cluster.clone())),
fanout, fanout,
); );

View File

@ -34,7 +34,7 @@ use std::time::Instant;
#[derive(Default, Clone)] #[derive(Default, Clone)]
pub struct Stakes { pub struct Stakes {
/// vote accounts /// vote accounts
pub vote_accounts: HashMap<Pubkey, (u64, Account)>, vote_accounts: HashMap<Pubkey, (u64, Account)>,
/// stake_accounts /// stake_accounts
stake_accounts: HashMap<Pubkey, Account>, stake_accounts: HashMap<Pubkey, Account>,
@ -45,7 +45,7 @@ impl Stakes {
solana_vote_api::check_id(&account.owner) || solana_stake_api::check_id(&account.owner) solana_vote_api::check_id(&account.owner) || solana_stake_api::check_id(&account.owner)
} }
pub fn update(&mut self, pubkey: &Pubkey, account: &Account) { pub fn store(&mut self, pubkey: &Pubkey, account: &Account) {
if solana_vote_api::check_id(&account.owner) { if solana_vote_api::check_id(&account.owner) {
if account.lamports != 0 { if account.lamports != 0 {
self.vote_accounts self.vote_accounts
@ -805,7 +805,7 @@ impl Bank {
self.accounts.store_slow(self.slot(), pubkey, account); self.accounts.store_slow(self.slot(), pubkey, account);
if Stakes::is_stake(account) { if Stakes::is_stake(account) {
self.stakes.write().unwrap().update(pubkey, account); self.stakes.write().unwrap().store(pubkey, account);
} }
} }
@ -937,17 +937,19 @@ impl Bank {
.zip(acc.0.iter()) .zip(acc.0.iter())
.filter(|(_, account)| Stakes::is_stake(account)) .filter(|(_, account)| Stakes::is_stake(account))
{ {
self.stakes.write().unwrap().update(pubkey, account); self.stakes.write().unwrap().store(pubkey, account);
} }
} }
} }
/// current vote accounts for this bank /// current vote accounts for this bank along with the stake
/// attributed to each account
pub fn vote_accounts(&self) -> HashMap<Pubkey, (u64, Account)> { pub fn vote_accounts(&self) -> HashMap<Pubkey, (u64, Account)> {
self.stakes.read().unwrap().vote_accounts.clone() self.stakes.read().unwrap().vote_accounts.clone()
} }
/// vote accounts for the specific epoch /// vote accounts for the specific epoch along with the stake
/// attributed to each account
pub fn epoch_vote_accounts(&self, epoch: u64) -> Option<&HashMap<Pubkey, (u64, Account)>> { pub fn epoch_vote_accounts(&self, epoch: u64) -> Option<&HashMap<Pubkey, (u64, Account)>> {
self.epoch_stakes self.epoch_stakes
.get(&epoch) .get(&epoch)