Refactor select_fork() to avoid clones and for clarity (#8081)

* Refactor select_fork() to avoid clones and for clarity

* Add test that fork weights are increasing
This commit is contained in:
carllin 2020-02-03 16:48:24 -08:00 committed by GitHub
parent 4c0420b884
commit 0c8cee8c4a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 233 additions and 111 deletions

View File

@ -466,7 +466,7 @@ impl Tower {
}
#[cfg(test)]
mod test {
pub mod test {
use super::*;
use crate::replay_stage::{ForkProgress, ReplayStage};
use solana_ledger::bank_forks::BankForks;
@ -489,13 +489,13 @@ mod test {
use std::{thread::sleep, time::Duration};
use trees::{tr, Node, Tree};
struct ValidatorKeypairs {
pub(crate) struct ValidatorKeypairs {
node_keypair: Keypair,
vote_keypair: Keypair,
}
impl ValidatorKeypairs {
fn new(node_keypair: Keypair, vote_keypair: Keypair) -> Self {
pub(crate) fn new(node_keypair: Keypair, vote_keypair: Keypair) -> Self {
Self {
node_keypair,
vote_keypair,
@ -503,19 +503,19 @@ mod test {
}
}
struct VoteSimulator<'a> {
pub(crate) struct VoteSimulator<'a> {
searchable_nodes: HashMap<u64, &'a Node<u64>>,
}
impl<'a> VoteSimulator<'a> {
pub fn new(forks: &'a Tree<u64>) -> Self {
pub(crate) fn new(forks: &'a Tree<u64>) -> Self {
let mut searchable_nodes = HashMap::new();
let root = forks.root();
searchable_nodes.insert(root.data, root);
Self { searchable_nodes }
}
pub fn simulate_vote(
pub(crate) fn simulate_vote(
&mut self,
vote_slot: Slot,
bank_forks: &RwLock<BankForks>,
@ -592,7 +592,21 @@ mod test {
let my_pubkey = my_keypairs.node_keypair.pubkey();
let my_vote_pubkey = my_keypairs.vote_keypair.pubkey();
let ancestors = bank_forks.read().unwrap().ancestors();
ReplayStage::select_fork(&my_pubkey, &ancestors, &bank_forks, tower, progress);
let mut frozen_banks: Vec<_> = bank_forks
.read()
.unwrap()
.frozen_banks()
.values()
.cloned()
.collect();
ReplayStage::compute_bank_stats(
&my_pubkey,
&ancestors,
&mut frozen_banks,
tower,
progress,
);
ReplayStage::select_fork(&frozen_banks, tower, progress);
let bank = bank_forks
.read()
@ -666,21 +680,17 @@ mod test {
}
#[derive(PartialEq, Debug)]
enum VoteResult {
pub(crate) enum VoteResult {
LockedOut(u64),
FailedThreshold(u64),
FailedAllChecks(u64),
Ok,
}
// Setup BankForks with banks including all the votes per validator as
// specified in the input `validator_votes`
fn initialize_state(
validator_votes: &HashMap<Pubkey, Vec<u64>>,
// Setup BankForks with bank 0 and all the validator accounts
pub(crate) fn initialize_state(
validator_keypairs: &HashMap<Pubkey, ValidatorKeypairs>,
) -> (BankForks, HashMap<u64, ForkProgress>) {
assert!(validator_votes.len() < 1_000_000);
let GenesisConfigInfo {
mut genesis_config,
mint_keypair,
@ -790,7 +800,7 @@ mod test {
);
// Initialize BankForks
let (bank_forks, mut progress) = initialize_state(&HashMap::new(), &keypairs);
let (bank_forks, mut progress) = initialize_state(&keypairs);
let bank_forks = RwLock::new(bank_forks);
// Create the tree of banks
@ -870,8 +880,7 @@ mod test {
votes.extend((45..=50).into_iter());
let mut cluster_votes: HashMap<Pubkey, Vec<Slot>> = HashMap::new();
cluster_votes.insert(node_pubkey, votes.clone());
let (bank_forks, mut progress) = initialize_state(&cluster_votes, &keypairs);
let (bank_forks, mut progress) = initialize_state(&keypairs);
let bank_forks = RwLock::new(bank_forks);
// Simulate the votes. Should fail on trying to come back to the main fork

View File

@ -45,8 +45,6 @@ use std::{
pub const MAX_ENTRY_RECV_PER_ITER: usize = 512;
type VoteAndPoHBank = Option<(Arc<Bank>, ForkStats)>;
// Implement a destructor for the ReplayStage thread to signal it exited
// even on panics
struct Finalizer {
@ -251,13 +249,40 @@ impl ReplayStage {
let ancestors = Arc::new(bank_forks.read().unwrap().ancestors());
loop {
let start = allocated.get();
let vote_bank = Self::select_fork(
let mut frozen_banks: Vec<_> = bank_forks
.read()
.unwrap()
.frozen_banks()
.values()
.cloned()
.collect();
let newly_computed_slot_stats = Self::compute_bank_stats(
&my_pubkey,
&ancestors,
&bank_forks,
&mut frozen_banks,
&tower,
&mut progress,
);
for slot in newly_computed_slot_stats {
let fork_stats = &progress.get(&slot).unwrap().fork_stats;
let confirmed_forks = Self::confirm_forks(
&tower,
&fork_stats.stake_lockouts,
fork_stats.total_staked,
&progress,
&bank_forks,
);
for slot in confirmed_forks {
progress
.get_mut(&slot)
.unwrap()
.fork_stats
.confirmation_reported = true;
}
}
let vote_bank = Self::select_fork(&frozen_banks, &tower, &mut progress);
datapoint_debug!(
"replay_stage-memory",
("select_fork", (allocated.get() - start) as i64, i64),
@ -265,12 +290,21 @@ impl ReplayStage {
if vote_bank.is_none() {
break;
}
let (bank, stats) = vote_bank.unwrap();
let bank = vote_bank.unwrap();
let (is_locked_out, vote_threshold, fork_weight, total_staked) = {
let fork_stats = &progress.get(&bank.slot()).unwrap().fork_stats;
(
fork_stats.is_locked_out,
fork_stats.vote_threshold,
fork_stats.weight,
fork_stats.total_staked,
)
};
let mut done = false;
let mut vote_bank_slot = None;
let start = allocated.get();
if !stats.is_locked_out && stats.vote_threshold {
info!("voting: {} {}", bank.slot(), stats.fork_weight);
if !is_locked_out && vote_threshold {
info!("voting: {} {}", bank.slot(), fork_weight);
subscriptions.notify_subscribers(bank.slot(), &bank_forks);
if let Some(votable_leader) =
leader_schedule_cache.slot_leader_at(bank.slot(), Some(&bank))
@ -294,7 +328,7 @@ impl ReplayStage {
&blockstore,
&leader_schedule_cache,
&root_bank_sender,
stats.total_staked,
total_staked,
&lockouts_sender,
&snapshot_package_sender,
&latest_root_senders,
@ -738,23 +772,77 @@ impl ReplayStage {
did_complete_bank
}
pub(crate) fn select_fork(
pub(crate) fn compute_bank_stats(
my_pubkey: &Pubkey,
ancestors: &HashMap<u64, HashSet<u64>>,
bank_forks: &RwLock<BankForks>,
frozen_banks: &mut Vec<Arc<Bank>>,
tower: &Tower,
progress: &mut HashMap<u64, ForkProgress>,
) -> VoteAndPoHBank {
let tower_start = Instant::now();
let mut frozen_banks: Vec<_> = bank_forks
.read()
.unwrap()
.frozen_banks()
.values()
.cloned()
.collect();
) -> Vec<Slot> {
frozen_banks.sort_by_key(|bank| bank.slot());
let mut new_stats = vec![];
for bank in frozen_banks {
// Only time progress map should be missing a bank slot
// is if this node was the leader for this slot as those banks
// are not replayed in replay_active_banks()
let parent_weight = bank
.parent()
.and_then(|b| progress.get(&b.slot()))
.map(|x| x.fork_stats.fork_weight)
.unwrap_or(0);
let stats = &mut progress
.get_mut(&bank.slot())
.expect("All frozen banks must exist in the Progress map")
.fork_stats;
if !stats.computed {
stats.slot = bank.slot();
let (stake_lockouts, total_staked, bank_weight) = tower.collect_vote_lockouts(
bank.slot(),
bank.vote_accounts().into_iter(),
&ancestors,
);
stats.total_staked = total_staked;
stats.weight = bank_weight;
stats.fork_weight = stats.weight + parent_weight;
datapoint_warn!(
"bank_weight",
("slot", bank.slot(), i64),
// u128 too large for influx, convert to hex
("weight", format!("{:X}", stats.weight), String),
);
warn!(
"{} slot_weight: {} {} {} {}",
my_pubkey,
stats.slot,
stats.weight,
stats.fork_weight,
bank.parent().map(|b| b.slot()).unwrap_or(0)
);
stats.stake_lockouts = stake_lockouts;
stats.block_height = bank.block_height();
stats.computed = true;
}
stats.vote_threshold = tower.check_vote_stake_threshold(
bank.slot(),
&stats.stake_lockouts,
stats.total_staked,
);
stats.is_locked_out = tower.is_locked_out(bank.slot(), &ancestors);
stats.has_voted = tower.has_voted(bank.slot());
stats.is_recent = tower.is_recent(bank.slot());
new_stats.push(stats.slot);
}
new_stats
}
pub(crate) fn select_fork(
frozen_banks: &[Arc<Bank>],
tower: &Tower,
progress: &mut HashMap<u64, ForkProgress>,
) -> Option<Arc<Bank>> {
let tower_start = Instant::now();
let num_frozen_banks = frozen_banks.len();
trace!("frozen_banks {}", frozen_banks.len());
@ -763,72 +851,16 @@ impl ReplayStage {
.filter(|b| b.slot() < tower.root().unwrap_or(0))
.count();
let stats: Vec<ForkStats> = frozen_banks
let stats: Vec<&ForkStats> = frozen_banks
.iter()
.map(|bank| {
// Only time progress map should be missing a bank slot
// is if this node was the leader for this slot as those banks
// are not replayed in replay_active_banks()
let mut stats = progress
&progress
.get(&bank.slot())
.expect("All frozen banks must exist in the Progress map")
.fork_stats
.clone();
if !stats.computed {
stats.slot = bank.slot();
let (stake_lockouts, total_staked, bank_weight) = tower.collect_vote_lockouts(
bank.slot(),
bank.vote_accounts().into_iter(),
&ancestors,
);
Self::confirm_forks(
tower,
&stake_lockouts,
total_staked,
progress,
&bank_forks,
);
stats.total_staked = total_staked;
stats.weight = bank_weight;
stats.fork_weight = stats.weight
+ bank
.parent()
.and_then(|b| progress.get(&b.slot()))
.map(|x| x.fork_stats.fork_weight)
.unwrap_or(0);
datapoint_warn!(
"bank_weight",
("slot", bank.slot(), i64),
// u128 too large for influx, convert to hex
("weight", format!("{:X}", stats.weight), String),
);
warn!(
"{} slot_weight: {} {} {} {}",
my_pubkey,
stats.slot,
stats.weight,
stats.fork_weight,
bank.parent().map(|b| b.slot()).unwrap_or(0)
);
stats.stake_lockouts = stake_lockouts;
stats.block_height = bank.block_height();
stats.computed = true;
}
stats.vote_threshold = tower.check_vote_stake_threshold(
bank.slot(),
&stats.stake_lockouts,
stats.total_staked,
);
stats.is_locked_out = tower.is_locked_out(bank.slot(), &ancestors);
stats.has_voted = tower.has_voted(bank.slot());
stats.is_recent = tower.is_recent(bank.slot());
progress
.get_mut(&bank.slot())
.expect("All frozen banks must exist in the Progress map")
.fork_stats = stats.clone();
stats
})
.collect();
let num_not_recent = stats.iter().filter(|s| !s.is_recent).count();
@ -874,30 +906,31 @@ impl ReplayStage {
),
("tower_duration", ms as i64, i64),
);
rv.cloned().map(|x| (x.0.clone(), x.1.clone()))
rv.map(|x| x.0.clone())
}
fn confirm_forks(
tower: &Tower,
stake_lockouts: &HashMap<u64, StakeLockout>,
total_staked: u64,
progress: &mut HashMap<u64, ForkProgress>,
progress: &HashMap<u64, ForkProgress>,
bank_forks: &RwLock<BankForks>,
) {
for (slot, prog) in progress.iter_mut() {
) -> Vec<Slot> {
let mut confirmed_forks = vec![];
for (slot, prog) in progress.iter() {
if !prog.fork_stats.confirmation_reported {
let bank = bank_forks
.read()
.unwrap()
.get(*slot)
.expect("bank in progress must exist in BankForks")
.clone();
let duration = prog.replay_stats.started.elapsed().as_millis();
if tower.is_slot_confirmed(*slot, stake_lockouts, total_staked)
&& bank_forks
.read()
.unwrap()
.get(*slot)
.map(|s| s.is_frozen())
.unwrap_or(true)
if bank.is_frozen() && tower.is_slot_confirmed(*slot, stake_lockouts, total_staked)
{
info!("validator fork confirmed {} {}ms", *slot, duration);
datapoint_warn!("validator-confirmation", ("duration_ms", duration, i64));
prog.fork_stats.confirmation_reported = true;
confirmed_forks.push(*slot);
} else {
debug!(
"validator fork not confirmed {} {}ms {:?}",
@ -908,6 +941,7 @@ impl ReplayStage {
}
}
}
confirmed_forks
}
pub(crate) fn handle_new_root(
@ -1002,6 +1036,7 @@ pub(crate) mod tests {
use super::*;
use crate::{
commitment::BlockCommitment,
consensus::test::{initialize_state, ValidatorKeypairs, VoteResult, VoteSimulator},
consensus::Tower,
genesis_utils::{create_genesis_config, create_genesis_config_with_leader},
replay_stage::ReplayStage,
@ -1039,6 +1074,7 @@ pub(crate) mod tests {
iter,
sync::{Arc, RwLock},
};
use trees::tr;
struct ForkInfo {
leader: usize,
@ -1228,19 +1264,28 @@ pub(crate) mod tests {
(0..validators.len())
.map(|i| {
let response = ReplayStage::select_fork(
let mut frozen_banks: Vec<_> = wrapped_bank_fork
.read()
.unwrap()
.frozen_banks()
.values()
.cloned()
.collect();
ReplayStage::compute_bank_stats(
&validators[i].keypair.pubkey(),
&bank_fork_ancestors,
&wrapped_bank_fork,
&mut frozen_banks,
&towers[i],
&mut fork_progresses[i],
);
let response =
ReplayStage::select_fork(&frozen_banks, &towers[i], &mut fork_progresses[i]);
if response.is_none() {
None
} else {
let (_bank, stats) = response.unwrap();
let bank = response.unwrap();
let stats = &fork_progresses[i].get(&bank.slot()).unwrap().fork_stats;
Some(ForkSelectionResponse {
slot: stats.slot,
is_locked_out: stats.is_locked_out,
@ -1847,4 +1892,72 @@ pub(crate) mod tests {
}
Blockstore::destroy(&ledger_path).unwrap();
}
#[test]
fn test_child_bank_heavier() {
let node_keypair = Keypair::new();
let vote_keypair = Keypair::new();
let node_pubkey = node_keypair.pubkey();
let mut keypairs = HashMap::new();
keypairs.insert(
node_pubkey,
ValidatorKeypairs::new(node_keypair, vote_keypair),
);
let (bank_forks, mut progress) = initialize_state(&keypairs);
let bank_forks = Arc::new(RwLock::new(bank_forks));
let mut tower = Tower::new_with_key(&node_pubkey);
// Create the tree of banks in a BankForks object
let forks = tr(0) / (tr(1) / (tr(2) / (tr(3))));
let mut voting_simulator = VoteSimulator::new(&forks);
let mut cluster_votes: HashMap<Pubkey, Vec<Slot>> = HashMap::new();
let votes: Vec<Slot> = vec![0, 2];
for vote in &votes {
assert_eq!(
voting_simulator.simulate_vote(
*vote,
&bank_forks,
&mut cluster_votes,
&keypairs,
keypairs.get(&node_pubkey).unwrap(),
&mut progress,
&mut tower,
),
VoteResult::Ok
);
}
let mut frozen_banks: Vec<_> = bank_forks
.read()
.unwrap()
.frozen_banks()
.values()
.cloned()
.collect();
ReplayStage::compute_bank_stats(
&Pubkey::default(),
&bank_forks.read().unwrap().ancestors(),
&mut frozen_banks,
&tower,
&mut progress,
);
frozen_banks.sort_by_key(|bank| bank.slot());
for pair in frozen_banks.windows(2) {
let first = progress
.get(&pair[0].slot())
.unwrap()
.fork_stats
.fork_weight;
let second = progress
.get(&pair[1].slot())
.unwrap()
.fork_stats
.fork_weight;
assert!(second >= first);
}
}
}