Store and persists full stack of tower votes in gossip (#6695)

* vote array

wip

wip

wip

update

gossip index should match tower index

tests build

clippy

test index after expired vote

test

bank specific last vote sync time

* verify

* we are likely to see many more warnings about old votes now
This commit is contained in:
anatoly yakovenko 2019-11-04 16:19:54 -08:00 committed by GitHub
parent 57983980a7
commit f54cfcdb8f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 201 additions and 44 deletions

View File

@ -12,13 +12,12 @@
//! * layer 2 - Everyone else, if layer 1 is `2^10`, layer 2 should be able to fit `2^20` number of nodes.
//!
//! Bank needs to provide an interface for us to query the stake weight
use crate::crds_value::CrdsValue;
use crate::{
contact_info::ContactInfo,
crds_gossip::CrdsGossip,
crds_gossip_error::CrdsGossipError,
crds_gossip_pull::{CrdsFilter, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS},
crds_value::{CrdsData, CrdsValueLabel, EpochSlots, Vote},
crds_value::{self, CrdsData, CrdsValue, CrdsValueLabel, EpochSlots, Vote},
packet::{to_shared_blob, Blob, Packet, SharedBlob},
repair_service::RepairType,
result::{Error, Result},
@ -320,10 +319,18 @@ impl ClusterInfo {
.process_push_message(&self.id(), vec![entry], now);
}
pub fn push_vote(&mut self, vote: Transaction) {
pub fn push_vote(&mut self, tower_index: usize, vote: Transaction) {
let now = timestamp();
let vote = Vote::new(&self.id(), vote, now);
let entry = CrdsValue::new_signed(CrdsData::Vote(vote), &self.keypair);
let current_votes: Vec<_> = (0..crds_value::MAX_VOTES)
.filter_map(|ix| {
self.gossip
.crds
.lookup(&CrdsValueLabel::Vote(ix, self.id()))
})
.collect();
let vote_ix = CrdsValue::compute_vote_index(tower_index, current_votes);
let entry = CrdsValue::new_signed(CrdsData::Vote(vote_ix, vote), &self.keypair);
self.gossip
.process_push_message(&self.id(), vec![entry], now);
}
@ -2358,7 +2365,7 @@ mod tests {
// add a vote
let tx = test_tx();
cluster_info.push_vote(tx.clone());
cluster_info.push_vote(0, tx.clone());
// -1 to make sure that the clock is strictly lower then when insert occurred
let (votes, max_ts) = cluster_info.get_votes(now - 1);

View File

@ -48,14 +48,15 @@ impl ClusterInfoVoteListener {
sender: &CrossbeamSender<Vec<Packets>>,
poh_recorder: Arc<Mutex<PohRecorder>>,
) -> Result<()> {
let mut last_ts = 0;
loop {
if exit.load(Ordering::Relaxed) {
return Ok(());
}
let (votes, new_ts) = cluster_info.read().unwrap().get_votes(last_ts);
if poh_recorder.lock().unwrap().has_bank() {
last_ts = new_ts;
if let Some(bank) = poh_recorder.lock().unwrap().bank() {
let last_ts = bank.last_vote_sync.load(Ordering::Relaxed);
let (votes, new_ts) = cluster_info.read().unwrap().get_votes(last_ts);
bank.last_vote_sync
.compare_and_swap(last_ts, new_ts, Ordering::Relaxed);
inc_new_counter_debug!("cluster_info_vote_listener-recv_count", votes.len());
let mut msgs = packet::to_packets(&votes);
if !msgs.is_empty() {

View File

@ -173,7 +173,7 @@ impl Tower {
slot: u64,
hash: Hash,
last_bank_slot: Option<Slot>,
) -> Vote {
) -> (Vote, usize) {
let mut local_vote_state = local_vote_state.clone();
let vote = Vote {
slots: vec![slot],
@ -196,7 +196,7 @@ impl Tower {
slots,
local_vote_state.votes
);
Vote { slots, hash }
(Vote { slots, hash }, local_vote_state.votes.len() - 1)
}
fn last_bank_vote(bank: &Bank, vote_account_pubkey: &Pubkey) -> Option<Slot> {
@ -205,7 +205,7 @@ impl Tower {
bank_vote_state.votes.iter().map(|v| v.slot).last()
}
pub fn new_vote_from_bank(&self, bank: &Bank, vote_account_pubkey: &Pubkey) -> Vote {
pub fn new_vote_from_bank(&self, bank: &Bank, vote_account_pubkey: &Pubkey) -> (Vote, usize) {
let last_vote = Self::last_bank_vote(bank, vote_account_pubkey);
Self::new_vote(&self.lockouts, bank.slot(), bank.hash(), last_vote)
}
@ -794,14 +794,16 @@ mod test {
fn test_new_vote() {
let local = VoteState::default();
let vote = Tower::new_vote(&local, 0, Hash::default(), None);
assert_eq!(vote.slots, vec![0]);
assert_eq!(local.votes.len(), 0);
assert_eq!(vote.0.slots, vec![0]);
assert_eq!(vote.1, 0);
}
#[test]
fn test_new_vote_dup_vote() {
let local = VoteState::default();
let vote = Tower::new_vote(&local, 0, Hash::default(), Some(0));
assert!(vote.slots.is_empty());
assert!(vote.0.slots.is_empty());
}
#[test]
@ -812,8 +814,25 @@ mod test {
hash: Hash::default(),
};
local.process_vote_unchecked(&vote);
assert_eq!(local.votes.len(), 1);
let vote = Tower::new_vote(&local, 1, Hash::default(), Some(0));
assert_eq!(vote.slots, vec![1]);
assert_eq!(vote.0.slots, vec![1]);
assert_eq!(vote.1, 1);
}
#[test]
fn test_new_vote_next_after_expired_vote() {
let mut local = VoteState::default();
let vote = Vote {
slots: vec![0],
hash: Hash::default(),
};
local.process_vote_unchecked(&vote);
assert_eq!(local.votes.len(), 1);
let vote = Tower::new_vote(&local, 3, Hash::default(), Some(0));
//first vote expired, so index should be 0
assert_eq!(vote.0.slots, vec![3]);
assert_eq!(vote.1, 0);
}
#[test]

View File

@ -3,10 +3,15 @@ use bincode::{serialize, serialized_size};
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, Signable, Signature};
use solana_sdk::transaction::Transaction;
use std::borrow::Borrow;
use std::borrow::Cow;
use std::collections::BTreeSet;
use std::collections::HashSet;
use std::fmt;
pub type VoteIndex = u8;
pub const MAX_VOTES: VoteIndex = 32;
/// CrdsValue that is replicated across the cluster
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct CrdsValue {
@ -30,6 +35,17 @@ impl Signable for CrdsValue {
fn set_signature(&mut self, signature: Signature) {
self.signature = signature
}
fn verify(&self) -> bool {
let sig_check = self
.get_signature()
.verify(&self.pubkey().as_ref(), self.signable_data().borrow());
let data_check = match &self.data {
CrdsData::Vote(ix, _) => *ix < MAX_VOTES,
_ => true,
};
sig_check && data_check
}
}
/// CrdsData that defines the different types of items CrdsValues can hold
@ -39,7 +55,7 @@ pub enum CrdsData {
/// * Merge Strategy - Latest wallclock is picked
ContactInfo(ContactInfo),
/// * Merge Strategy - Latest wallclock is picked
Vote(Vote),
Vote(VoteIndex, Vote),
/// * Merge Strategy - Latest wallclock is picked
EpochSlots(EpochSlots),
}
@ -85,7 +101,7 @@ impl Vote {
#[derive(PartialEq, Hash, Eq, Clone, Debug)]
pub enum CrdsValueLabel {
ContactInfo(Pubkey),
Vote(Pubkey),
Vote(VoteIndex, Pubkey),
EpochSlots(Pubkey),
}
@ -93,7 +109,7 @@ impl fmt::Display for CrdsValueLabel {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
CrdsValueLabel::ContactInfo(_) => write!(f, "ContactInfo({})", self.pubkey()),
CrdsValueLabel::Vote(_) => write!(f, "Vote({})", self.pubkey()),
CrdsValueLabel::Vote(ix, _) => write!(f, "Vote({}, {})", ix, self.pubkey()),
CrdsValueLabel::EpochSlots(_) => write!(f, "EpochSlots({})", self.pubkey()),
}
}
@ -103,7 +119,7 @@ impl CrdsValueLabel {
pub fn pubkey(&self) -> Pubkey {
match self {
CrdsValueLabel::ContactInfo(p) => *p,
CrdsValueLabel::Vote(p) => *p,
CrdsValueLabel::Vote(_, p) => *p,
CrdsValueLabel::EpochSlots(p) => *p,
}
}
@ -128,21 +144,21 @@ impl CrdsValue {
pub fn wallclock(&self) -> u64 {
match &self.data {
CrdsData::ContactInfo(contact_info) => contact_info.wallclock,
CrdsData::Vote(vote) => vote.wallclock,
CrdsData::Vote(_, vote) => vote.wallclock,
CrdsData::EpochSlots(vote) => vote.wallclock,
}
}
pub fn pubkey(&self) -> Pubkey {
match &self.data {
CrdsData::ContactInfo(contact_info) => contact_info.id,
CrdsData::Vote(vote) => vote.from,
CrdsData::Vote(_, vote) => vote.from,
CrdsData::EpochSlots(slots) => slots.from,
}
}
pub fn label(&self) -> CrdsValueLabel {
match &self.data {
CrdsData::ContactInfo(_) => CrdsValueLabel::ContactInfo(self.pubkey()),
CrdsData::Vote(_) => CrdsValueLabel::Vote(self.pubkey()),
CrdsData::Vote(ix, _) => CrdsValueLabel::Vote(*ix, self.pubkey()),
CrdsData::EpochSlots(_) => CrdsValueLabel::EpochSlots(self.pubkey()),
}
}
@ -154,10 +170,18 @@ impl CrdsValue {
}
pub fn vote(&self) -> Option<&Vote> {
match &self.data {
CrdsData::Vote(vote) => Some(vote),
CrdsData::Vote(_, vote) => Some(vote),
_ => None,
}
}
pub fn vote_index(&self) -> Option<VoteIndex> {
match &self.data {
CrdsData::Vote(ix, _) => Some(*ix),
_ => None,
}
}
pub fn epoch_slots(&self) -> Option<&EpochSlots> {
match &self.data {
CrdsData::EpochSlots(slots) => Some(slots),
@ -165,18 +189,46 @@ impl CrdsValue {
}
}
/// Return all the possible labels for a record identified by Pubkey.
pub fn record_labels(key: &Pubkey) -> [CrdsValueLabel; 3] {
[
pub fn record_labels(key: &Pubkey) -> Vec<CrdsValueLabel> {
let mut labels = vec![
CrdsValueLabel::ContactInfo(*key),
CrdsValueLabel::Vote(*key),
CrdsValueLabel::EpochSlots(*key),
]
];
labels.extend((0..MAX_VOTES).map(|ix| CrdsValueLabel::Vote(ix, *key)));
labels
}
/// Returns the size (in bytes) of a CrdsValue
pub fn size(&self) -> u64 {
serialized_size(&self).expect("unable to serialize contact info")
}
pub fn compute_vote_index(tower_index: usize, mut votes: Vec<&CrdsValue>) -> VoteIndex {
let mut available: HashSet<VoteIndex> = (0..MAX_VOTES).collect();
votes.iter().filter_map(|v| v.vote_index()).for_each(|ix| {
available.remove(&ix);
});
// free index
if !available.is_empty() {
return *available.iter().next().unwrap();
}
assert!(votes.len() == MAX_VOTES as usize);
votes.sort_by_key(|v| v.vote().expect("all values must be votes").wallclock);
// If Tower is full, oldest removed first
if tower_index + 1 == MAX_VOTES as usize {
return votes[0].vote_index().expect("all values must be votes");
}
// If Tower is not full, the early votes have expired
assert!(tower_index < MAX_VOTES as usize);
votes[tower_index]
.vote_index()
.expect("all values must be votes")
}
}
#[cfg(test)]
@ -190,13 +242,13 @@ mod test {
#[test]
fn test_labels() {
let mut hits = [false; 3];
let mut hits = [false; 2 + MAX_VOTES as usize];
// this method should cover all the possible labels
for v in &CrdsValue::record_labels(&Pubkey::default()) {
match v {
CrdsValueLabel::ContactInfo(_) => hits[0] = true,
CrdsValueLabel::Vote(_) => hits[1] = true,
CrdsValueLabel::EpochSlots(_) => hits[2] = true,
CrdsValueLabel::EpochSlots(_) => hits[1] = true,
CrdsValueLabel::Vote(ix, _) => hits[*ix as usize + 2] = true,
}
}
assert!(hits.iter().all(|x| *x));
@ -208,11 +260,13 @@ mod test {
let key = v.clone().contact_info().unwrap().id;
assert_eq!(v.label(), CrdsValueLabel::ContactInfo(key));
let v =
CrdsValue::new_unsigned(CrdsData::Vote(Vote::new(&Pubkey::default(), test_tx(), 0)));
let v = CrdsValue::new_unsigned(CrdsData::Vote(
0,
Vote::new(&Pubkey::default(), test_tx(), 0),
));
assert_eq!(v.wallclock(), 0);
let key = v.clone().vote().unwrap().from;
assert_eq!(v.label(), CrdsValueLabel::Vote(key));
assert_eq!(v.label(), CrdsValueLabel::Vote(0, key));
let v = CrdsValue::new_unsigned(CrdsData::EpochSlots(EpochSlots::new(
Pubkey::default(),
@ -224,6 +278,7 @@ mod test {
let key = v.clone().epoch_slots().unwrap().from;
assert_eq!(v.label(), CrdsValueLabel::EpochSlots(key));
}
#[test]
fn test_signature() {
let keypair = Keypair::new();
@ -233,11 +288,10 @@ mod test {
timestamp(),
)));
verify_signatures(&mut v, &keypair, &wrong_keypair);
v = CrdsValue::new_unsigned(CrdsData::Vote(Vote::new(
&keypair.pubkey(),
test_tx(),
timestamp(),
)));
v = CrdsValue::new_unsigned(CrdsData::Vote(
0,
Vote::new(&keypair.pubkey(), test_tx(), timestamp()),
));
verify_signatures(&mut v, &keypair, &wrong_keypair);
let btreeset: BTreeSet<u64> = vec![1, 2, 3, 6, 8].into_iter().collect();
v = CrdsValue::new_unsigned(CrdsData::EpochSlots(EpochSlots::new(
@ -249,7 +303,64 @@ mod test {
verify_signatures(&mut v, &keypair, &wrong_keypair);
}
fn test_serialize_deserialize_value(value: &mut CrdsValue, keypair: &Keypair) {
#[test]
fn test_max_vote_index() {
let keypair = Keypair::new();
let vote = CrdsValue::new_signed(
CrdsData::Vote(
MAX_VOTES,
Vote::new(&keypair.pubkey(), test_tx(), timestamp()),
),
&keypair,
);
assert!(!vote.verify());
}
#[test]
fn test_compute_vote_index_empty() {
for i in 0..MAX_VOTES {
let votes = vec![];
assert!(CrdsValue::compute_vote_index(i as usize, votes) < MAX_VOTES);
}
}
#[test]
fn test_compute_vote_index_one() {
let keypair = Keypair::new();
let vote = CrdsValue::new_unsigned(CrdsData::Vote(
0,
Vote::new(&keypair.pubkey(), test_tx(), 0),
));
for i in 0..MAX_VOTES {
let votes = vec![&vote];
assert!(CrdsValue::compute_vote_index(i as usize, votes) > 0);
let votes = vec![&vote];
assert!(CrdsValue::compute_vote_index(i as usize, votes) < MAX_VOTES);
}
}
#[test]
fn test_compute_vote_index_full() {
let keypair = Keypair::new();
let votes: Vec<_> = (0..MAX_VOTES)
.map(|x| {
CrdsValue::new_unsigned(CrdsData::Vote(
x,
Vote::new(&keypair.pubkey(), test_tx(), x as u64),
))
})
.collect();
let vote_refs = votes.iter().collect();
//pick the oldest vote when full
assert_eq!(CrdsValue::compute_vote_index(31, vote_refs), 0);
//pick the index
let vote_refs = votes.iter().collect();
assert_eq!(CrdsValue::compute_vote_index(0, vote_refs), 0);
let vote_refs = votes.iter().collect();
assert_eq!(CrdsValue::compute_vote_index(30, vote_refs), 30);
}
fn serialize_deserialize_value(value: &mut CrdsValue, keypair: &Keypair) {
let num_tries = 10;
value.sign(keypair);
let original_signature = value.get_signature();
@ -276,6 +387,6 @@ mod test {
assert!(value.verify());
value.sign(&wrong_keypair);
assert!(!value.verify());
test_serialize_deserialize_value(value, correct_keypair);
serialize_deserialize_value(value, correct_keypair);
}
}

View File

@ -493,7 +493,7 @@ impl ReplayStage {
T: 'static + KeypairUtil + Send + Sync,
{
trace!("handle votable bank {}", bank.slot());
let vote = tower.new_vote_from_bank(bank, vote_account);
let (vote, tower_index) = tower.new_vote_from_bank(bank, vote_account);
if let Some(new_root) = tower.record_bank_vote(vote) {
// get the root bank before squash
let root_bank = bank_forks
@ -539,7 +539,10 @@ impl ReplayStage {
let blockhash = bank.last_blockhash();
vote_tx.partial_sign(&[node_keypair.as_ref()], blockhash);
vote_tx.partial_sign(&[voting_keypair.as_ref()], blockhash);
cluster_info.write().unwrap().push_vote(vote_tx);
cluster_info
.write()
.unwrap()
.push_vote(tower_index, vote_tx);
}
Ok(())
}

View File

@ -195,7 +195,7 @@ impl VoteState {
j -= 1;
}
if j == slot_hashes.len() {
warn!(
debug!(
"{} dropped vote {:?} too old: {:?} ",
self.node_pubkey, vote, slot_hashes
);

View File

@ -262,6 +262,10 @@ pub struct Bank {
/// (used to adjust cluster features over time)
#[serde(skip)]
entered_epoch_callback: Arc<RwLock<Option<EnteredEpochCallback>>>,
/// Last time when the cluster info vote listener has synced with this bank
#[serde(skip)]
pub last_vote_sync: AtomicU64,
}
impl Default for BlockhashQueue {
@ -352,6 +356,7 @@ impl Bank {
signature_count: AtomicU64::new(0),
message_processor: MessageProcessor::default(),
entered_epoch_callback: parent.entered_epoch_callback.clone(),
last_vote_sync: AtomicU64::new(parent.last_vote_sync.load(Ordering::Relaxed)),
};
datapoint_debug!(
@ -3347,7 +3352,6 @@ mod tests {
// Non-native loader accounts can not be used for instruction processing
bank.add_instruction_processor(mint_keypair.pubkey(), mock_ix_processor);
}
#[test]
fn test_recent_blockhashes_sysvar() {
let (genesis_block, _mint_keypair) = create_genesis_block(500);
@ -3365,4 +3369,16 @@ mod tests {
bank = Arc::new(new_from_parent(&bank));
}
}
#[test]
fn test_bank_inherit_last_vote_sync() {
let (genesis_block, _) = create_genesis_block(500);
let bank0 = Arc::new(Bank::new(&genesis_block));
let last_ts = bank0.last_vote_sync.load(Ordering::Relaxed);
assert_eq!(last_ts, 0);
bank0.last_vote_sync.store(1, Ordering::Relaxed);
let bank1 =
Bank::new_from_parent(&bank0, &Pubkey::default(), bank0.get_slots_in_epoch(0) - 1);
let last_ts = bank1.last_vote_sync.load(Ordering::Relaxed);
assert_eq!(last_ts, 1);
}
}