validator confirmation

This commit is contained in:
Anatoly Yakovenko 2019-03-27 04:30:26 -07:00 committed by Grimes
parent 3fc09fb23f
commit e27d6d0988
2 changed files with 108 additions and 20 deletions

View File

@ -173,6 +173,15 @@ impl Locktower {
stake_lockouts
}
pub fn is_slot_confirmed(&self, slot: u64, lockouts: &HashMap<u64, StakeLockout>) -> bool {
lockouts
.get(&slot)
.map(|lockout| {
(lockout.stake as f64 / self.epoch_stakes.total_staked as f64) > self.threshold_size
})
.unwrap_or(false)
}
pub fn is_recent_epoch(&self, bank: &Bank) -> bool {
let bank_epoch = bank.get_epoch_and_slot_index(bank.slot()).0;
bank_epoch >= self.epoch_stakes.slot
@ -491,6 +500,43 @@ mod test {
assert!(locktower.check_vote_stake_threshold(0, &stakes));
}
#[test]
fn test_is_slot_confirmed_not_enough_stake_failure() {
let locktower = Locktower::new(EpochStakes::new_for_tests(2), 1, 0.67);
let stakes = vec![(
0,
StakeLockout {
stake: 1,
lockout: 8,
},
)]
.into_iter()
.collect();
assert!(!locktower.is_slot_confirmed(0, &stakes));
}
#[test]
fn test_is_slot_confirmed_unknown_slot() {
let locktower = Locktower::new(EpochStakes::new_for_tests(2), 1, 0.67);
let stakes = HashMap::new();
assert!(!locktower.is_slot_confirmed(0, &stakes));
}
#[test]
fn test_is_slot_confirmed_pass() {
let locktower = Locktower::new(EpochStakes::new_for_tests(2), 1, 0.67);
let stakes = vec![(
0,
StakeLockout {
stake: 2,
lockout: 8,
},
)]
.into_iter()
.collect();
assert!(locktower.is_slot_confirmed(0, &stakes));
}
#[test]
fn test_is_locked_out_empty() {
let locktower = Locktower::new(EpochStakes::new_for_tests(2), 0, 0.67);

View File

@ -6,12 +6,13 @@ use crate::blocktree_processor;
use crate::cluster_info::ClusterInfo;
use crate::entry::{Entry, EntryReceiver, EntrySender, EntrySlice};
use crate::leader_schedule_utils;
use crate::locktower::Locktower;
use crate::locktower::{Locktower, StakeLockout};
use crate::packet::BlobError;
use crate::poh_recorder::PohRecorder;
use crate::result;
use crate::rpc_subscriptions::RpcSubscriptions;
use crate::service::Service;
use hashbrown::HashMap;
use solana_metrics::counter::Counter;
use solana_metrics::influxdb;
use solana_runtime::bank::Bank;
@ -21,7 +22,6 @@ use solana_sdk::signature::KeypairUtil;
use solana_sdk::timing::{self, duration_as_ms};
use solana_sdk::transaction::Transaction;
use solana_vote_api::vote_instruction::{Vote, VoteInstruction};
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
use std::sync::{Arc, Mutex, RwLock};
@ -53,6 +53,24 @@ pub struct ReplayStage {
t_replay: JoinHandle<result::Result<()>>,
}
#[derive(Default)]
struct ForkProgress {
last_entry: Hash,
num_blobs: usize,
started_ms: u64,
supermajority_confirmed_ms: u64,
}
impl ForkProgress {
pub fn new(last_entry: Hash) -> Self {
Self {
last_entry,
num_blobs: 0,
started_ms: timing::timestamp(),
supermajority_confirmed_ms: 0,
}
}
}
impl ReplayStage {
#[allow(clippy::new_ret_no_self, clippy::too_many_arguments)]
pub fn new<T>(
@ -115,7 +133,8 @@ impl ReplayStage {
ticks_per_slot = bank.ticks_per_slot();
}
let votable = Self::generate_votable_banks(&bank_forks, &locktower);
let votable =
Self::generate_votable_banks(&bank_forks, &locktower, &mut progress);
if let Some((_, bank)) = votable.last() {
subscriptions.notify_subscribers(&bank);
@ -265,10 +284,10 @@ impl ReplayStage {
});
}
}
pub fn replay_blocktree_into_bank(
fn replay_blocktree_into_bank(
bank: &Bank,
blocktree: &Blocktree,
progress: &mut HashMap<u64, (Hash, usize)>,
progress: &mut HashMap<u64, ForkProgress>,
forward_entry_sender: &EntrySender,
) -> result::Result<()> {
let (entries, num) = Self::load_blocktree_entries(bank, blocktree, progress)?;
@ -290,7 +309,7 @@ impl ReplayStage {
bank: &Arc<Bank>,
bank_forks: &Arc<RwLock<BankForks>>,
locktower: &mut Locktower,
progress: &mut HashMap<u64, (Hash, usize)>,
progress: &mut HashMap<u64, ForkProgress>,
voting_keypair: &Option<Arc<T>>,
vote_account: &Pubkey,
cluster_info: &Arc<RwLock<ClusterInfo>>,
@ -341,7 +360,7 @@ impl ReplayStage {
bank_forks: &Arc<RwLock<BankForks>>,
my_id: &Pubkey,
ticks_per_slot: &mut u64,
progress: &mut HashMap<u64, (Hash, usize)>,
progress: &mut HashMap<u64, ForkProgress>,
forward_entry_sender: &EntrySender,
slot_full_sender: &Sender<(u64, Pubkey)>,
) -> result::Result<()> {
@ -370,6 +389,7 @@ impl ReplayStage {
fn generate_votable_banks(
bank_forks: &Arc<RwLock<BankForks>>,
locktower: &Locktower,
progress: &mut HashMap<u64, ForkProgress>,
) -> Vec<(u128, Arc<Bank>)> {
let locktower_start = Instant::now();
// Locktower voting
@ -409,6 +429,7 @@ impl ReplayStage {
.filter(|(b, stake_lockouts)| {
let vote_threshold =
locktower.check_vote_stake_threshold(b.slot(), &stake_lockouts);
Self::confirm_forks(locktower, stake_lockouts, progress);
debug!("bank vote_threshold: {} {}", b.slot(), vote_threshold);
vote_threshold
})
@ -434,32 +455,53 @@ impl ReplayStage {
votable
}
pub fn load_blocktree_entries(
fn confirm_forks(
locktower: &Locktower,
stake_lockouts: &HashMap<u64, StakeLockout>,
progress: &mut HashMap<u64, ForkProgress>,
) {
for (slot, prog) in progress.iter_mut() {
if prog.supermajority_confirmed_ms == 0
&& locktower.is_slot_confirmed(*slot, stake_lockouts)
{
prog.supermajority_confirmed_ms = timing::timestamp();
let duration = prog.supermajority_confirmed_ms - prog.started_ms;
info!("fork confirmed {} {}", *slot, duration);
solana_metrics::submit(
influxdb::Point::new(&"validator-confirmation")
.add_field("duration_ms", influxdb::Value::Integer(duration as i64))
.to_owned(),
);
}
}
}
fn load_blocktree_entries(
bank: &Bank,
blocktree: &Blocktree,
progress: &mut HashMap<u64, (Hash, usize)>,
progress: &mut HashMap<u64, ForkProgress>,
) -> result::Result<(Vec<Entry>, usize)> {
let bank_slot = bank.slot();
let bank_progress = &mut progress
.entry(bank_slot)
.or_insert((bank.last_blockhash(), 0));
blocktree.get_slot_entries_with_blob_count(bank_slot, bank_progress.1 as u64, None)
.or_insert(ForkProgress::new(bank.last_blockhash()));
blocktree.get_slot_entries_with_blob_count(bank_slot, bank_progress.num_blobs as u64, None)
}
pub fn replay_entries_into_bank(
fn replay_entries_into_bank(
bank: &Bank,
entries: Vec<Entry>,
progress: &mut HashMap<u64, (Hash, usize)>,
progress: &mut HashMap<u64, ForkProgress>,
forward_entry_sender: &EntrySender,
num: usize,
) -> result::Result<()> {
let bank_progress = &mut progress
.entry(bank.slot())
.or_insert((bank.last_blockhash(), 0));
let result = Self::verify_and_process_entries(&bank, &entries, &bank_progress.0);
bank_progress.1 += num;
.or_insert(ForkProgress::new(bank.last_blockhash()));
let result = Self::verify_and_process_entries(&bank, &entries, &bank_progress.last_entry);
bank_progress.num_blobs += num;
if let Some(last_entry) = entries.last() {
bank_progress.0 = last_entry.hash;
bank_progress.last_entry = last_entry.hash;
}
if result.is_ok() {
forward_entry_sender.send(entries)?;
@ -489,7 +531,7 @@ impl ReplayStage {
fn handle_new_root(
bank_forks: &Arc<RwLock<BankForks>>,
progress: &mut HashMap<u64, (Hash, usize)>,
progress: &mut HashMap<u64, ForkProgress>,
) {
let r_bank_forks = bank_forks.read().unwrap();
progress.retain(|k, _| r_bank_forks.get(*k).is_some());
@ -498,7 +540,7 @@ impl ReplayStage {
fn process_completed_bank(
my_id: &Pubkey,
bank: Arc<Bank>,
progress: &mut HashMap<u64, (Hash, usize)>,
progress: &mut HashMap<u64, ForkProgress>,
slot_full_sender: &Sender<(u64, Pubkey)>,
) {
bank.freeze();
@ -736,7 +778,7 @@ mod test {
let bank0 = Bank::new(&genesis_block);
let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank0)));
let mut progress = HashMap::new();
progress.insert(5, (Hash::default(), 0));
progress.insert(5, ForkProgress::new(Hash::default()));
ReplayStage::handle_new_root(&bank_forks, &mut progress);
assert!(progress.is_empty());
}