diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index ad59280c64..d89e3c580a 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -12,13 +12,12 @@ //! * layer 2 - Everyone else, if layer 1 is `2^10`, layer 2 should be able to fit `2^20` number of nodes. //! //! Bank needs to provide an interface for us to query the stake weight -use crate::crds_value::CrdsValue; use crate::{ contact_info::ContactInfo, crds_gossip::CrdsGossip, crds_gossip_error::CrdsGossipError, crds_gossip_pull::{CrdsFilter, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS}, - crds_value::{CrdsData, CrdsValueLabel, EpochSlots, Vote}, + crds_value::{self, CrdsData, CrdsValue, CrdsValueLabel, EpochSlots, Vote}, packet::{to_shared_blob, Blob, Packet, SharedBlob}, repair_service::RepairType, result::{Error, Result}, @@ -320,10 +319,18 @@ impl ClusterInfo { .process_push_message(&self.id(), vec![entry], now); } - pub fn push_vote(&mut self, vote: Transaction) { + pub fn push_vote(&mut self, tower_index: usize, vote: Transaction) { let now = timestamp(); let vote = Vote::new(&self.id(), vote, now); - let entry = CrdsValue::new_signed(CrdsData::Vote(vote), &self.keypair); + let current_votes: Vec<_> = (0..crds_value::MAX_VOTES) + .filter_map(|ix| { + self.gossip + .crds + .lookup(&CrdsValueLabel::Vote(ix, self.id())) + }) + .collect(); + let vote_ix = CrdsValue::compute_vote_index(tower_index, current_votes); + let entry = CrdsValue::new_signed(CrdsData::Vote(vote_ix, vote), &self.keypair); self.gossip .process_push_message(&self.id(), vec![entry], now); } @@ -2358,7 +2365,7 @@ mod tests { // add a vote let tx = test_tx(); - cluster_info.push_vote(tx.clone()); + cluster_info.push_vote(0, tx.clone()); // -1 to make sure that the clock is strictly lower then when insert occurred let (votes, max_ts) = cluster_info.get_votes(now - 1); diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index 51daab3a5d..ebbee4d24c 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -48,14 +48,15 @@ impl ClusterInfoVoteListener { sender: &CrossbeamSender>, poh_recorder: Arc>, ) -> Result<()> { - let mut last_ts = 0; loop { if exit.load(Ordering::Relaxed) { return Ok(()); } - let (votes, new_ts) = cluster_info.read().unwrap().get_votes(last_ts); - if poh_recorder.lock().unwrap().has_bank() { - last_ts = new_ts; + if let Some(bank) = poh_recorder.lock().unwrap().bank() { + let last_ts = bank.last_vote_sync.load(Ordering::Relaxed); + let (votes, new_ts) = cluster_info.read().unwrap().get_votes(last_ts); + bank.last_vote_sync + .compare_and_swap(last_ts, new_ts, Ordering::Relaxed); inc_new_counter_debug!("cluster_info_vote_listener-recv_count", votes.len()); let mut msgs = packet::to_packets(&votes); if !msgs.is_empty() { diff --git a/core/src/consensus.rs b/core/src/consensus.rs index 22077fca2d..45754b9dab 100644 --- a/core/src/consensus.rs +++ b/core/src/consensus.rs @@ -173,7 +173,7 @@ impl Tower { slot: u64, hash: Hash, last_bank_slot: Option, - ) -> Vote { + ) -> (Vote, usize) { let mut local_vote_state = local_vote_state.clone(); let vote = Vote { slots: vec![slot], @@ -196,7 +196,7 @@ impl Tower { slots, local_vote_state.votes ); - Vote { slots, hash } + (Vote { slots, hash }, local_vote_state.votes.len() - 1) } fn last_bank_vote(bank: &Bank, vote_account_pubkey: &Pubkey) -> Option { @@ -205,7 +205,7 @@ impl Tower { bank_vote_state.votes.iter().map(|v| v.slot).last() } - pub fn new_vote_from_bank(&self, bank: &Bank, vote_account_pubkey: &Pubkey) -> Vote { + pub fn new_vote_from_bank(&self, bank: &Bank, vote_account_pubkey: &Pubkey) -> (Vote, usize) { let last_vote = Self::last_bank_vote(bank, vote_account_pubkey); Self::new_vote(&self.lockouts, bank.slot(), bank.hash(), last_vote) } @@ -794,14 +794,16 @@ mod test { fn test_new_vote() { let local = VoteState::default(); let vote = Tower::new_vote(&local, 0, Hash::default(), None); - assert_eq!(vote.slots, vec![0]); + assert_eq!(local.votes.len(), 0); + assert_eq!(vote.0.slots, vec![0]); + assert_eq!(vote.1, 0); } #[test] fn test_new_vote_dup_vote() { let local = VoteState::default(); let vote = Tower::new_vote(&local, 0, Hash::default(), Some(0)); - assert!(vote.slots.is_empty()); + assert!(vote.0.slots.is_empty()); } #[test] @@ -812,8 +814,25 @@ mod test { hash: Hash::default(), }; local.process_vote_unchecked(&vote); + assert_eq!(local.votes.len(), 1); let vote = Tower::new_vote(&local, 1, Hash::default(), Some(0)); - assert_eq!(vote.slots, vec![1]); + assert_eq!(vote.0.slots, vec![1]); + assert_eq!(vote.1, 1); + } + + #[test] + fn test_new_vote_next_after_expired_vote() { + let mut local = VoteState::default(); + let vote = Vote { + slots: vec![0], + hash: Hash::default(), + }; + local.process_vote_unchecked(&vote); + assert_eq!(local.votes.len(), 1); + let vote = Tower::new_vote(&local, 3, Hash::default(), Some(0)); + //first vote expired, so index should be 0 + assert_eq!(vote.0.slots, vec![3]); + assert_eq!(vote.1, 0); } #[test] diff --git a/core/src/crds_value.rs b/core/src/crds_value.rs index 7adf3b9209..0e45922f22 100644 --- a/core/src/crds_value.rs +++ b/core/src/crds_value.rs @@ -3,10 +3,15 @@ use bincode::{serialize, serialized_size}; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, Signable, Signature}; use solana_sdk::transaction::Transaction; +use std::borrow::Borrow; use std::borrow::Cow; use std::collections::BTreeSet; +use std::collections::HashSet; use std::fmt; +pub type VoteIndex = u8; +pub const MAX_VOTES: VoteIndex = 32; + /// CrdsValue that is replicated across the cluster #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] pub struct CrdsValue { @@ -30,6 +35,17 @@ impl Signable for CrdsValue { fn set_signature(&mut self, signature: Signature) { self.signature = signature } + + fn verify(&self) -> bool { + let sig_check = self + .get_signature() + .verify(&self.pubkey().as_ref(), self.signable_data().borrow()); + let data_check = match &self.data { + CrdsData::Vote(ix, _) => *ix < MAX_VOTES, + _ => true, + }; + sig_check && data_check + } } /// CrdsData that defines the different types of items CrdsValues can hold @@ -39,7 +55,7 @@ pub enum CrdsData { /// * Merge Strategy - Latest wallclock is picked ContactInfo(ContactInfo), /// * Merge Strategy - Latest wallclock is picked - Vote(Vote), + Vote(VoteIndex, Vote), /// * Merge Strategy - Latest wallclock is picked EpochSlots(EpochSlots), } @@ -85,7 +101,7 @@ impl Vote { #[derive(PartialEq, Hash, Eq, Clone, Debug)] pub enum CrdsValueLabel { ContactInfo(Pubkey), - Vote(Pubkey), + Vote(VoteIndex, Pubkey), EpochSlots(Pubkey), } @@ -93,7 +109,7 @@ impl fmt::Display for CrdsValueLabel { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { CrdsValueLabel::ContactInfo(_) => write!(f, "ContactInfo({})", self.pubkey()), - CrdsValueLabel::Vote(_) => write!(f, "Vote({})", self.pubkey()), + CrdsValueLabel::Vote(ix, _) => write!(f, "Vote({}, {})", ix, self.pubkey()), CrdsValueLabel::EpochSlots(_) => write!(f, "EpochSlots({})", self.pubkey()), } } @@ -103,7 +119,7 @@ impl CrdsValueLabel { pub fn pubkey(&self) -> Pubkey { match self { CrdsValueLabel::ContactInfo(p) => *p, - CrdsValueLabel::Vote(p) => *p, + CrdsValueLabel::Vote(_, p) => *p, CrdsValueLabel::EpochSlots(p) => *p, } } @@ -128,21 +144,21 @@ impl CrdsValue { pub fn wallclock(&self) -> u64 { match &self.data { CrdsData::ContactInfo(contact_info) => contact_info.wallclock, - CrdsData::Vote(vote) => vote.wallclock, + CrdsData::Vote(_, vote) => vote.wallclock, CrdsData::EpochSlots(vote) => vote.wallclock, } } pub fn pubkey(&self) -> Pubkey { match &self.data { CrdsData::ContactInfo(contact_info) => contact_info.id, - CrdsData::Vote(vote) => vote.from, + CrdsData::Vote(_, vote) => vote.from, CrdsData::EpochSlots(slots) => slots.from, } } pub fn label(&self) -> CrdsValueLabel { match &self.data { CrdsData::ContactInfo(_) => CrdsValueLabel::ContactInfo(self.pubkey()), - CrdsData::Vote(_) => CrdsValueLabel::Vote(self.pubkey()), + CrdsData::Vote(ix, _) => CrdsValueLabel::Vote(*ix, self.pubkey()), CrdsData::EpochSlots(_) => CrdsValueLabel::EpochSlots(self.pubkey()), } } @@ -154,10 +170,18 @@ impl CrdsValue { } pub fn vote(&self) -> Option<&Vote> { match &self.data { - CrdsData::Vote(vote) => Some(vote), + CrdsData::Vote(_, vote) => Some(vote), _ => None, } } + + pub fn vote_index(&self) -> Option { + match &self.data { + CrdsData::Vote(ix, _) => Some(*ix), + _ => None, + } + } + pub fn epoch_slots(&self) -> Option<&EpochSlots> { match &self.data { CrdsData::EpochSlots(slots) => Some(slots), @@ -165,18 +189,46 @@ impl CrdsValue { } } /// Return all the possible labels for a record identified by Pubkey. - pub fn record_labels(key: &Pubkey) -> [CrdsValueLabel; 3] { - [ + pub fn record_labels(key: &Pubkey) -> Vec { + let mut labels = vec![ CrdsValueLabel::ContactInfo(*key), - CrdsValueLabel::Vote(*key), CrdsValueLabel::EpochSlots(*key), - ] + ]; + labels.extend((0..MAX_VOTES).map(|ix| CrdsValueLabel::Vote(ix, *key))); + labels } /// Returns the size (in bytes) of a CrdsValue pub fn size(&self) -> u64 { serialized_size(&self).expect("unable to serialize contact info") } + + pub fn compute_vote_index(tower_index: usize, mut votes: Vec<&CrdsValue>) -> VoteIndex { + let mut available: HashSet = (0..MAX_VOTES).collect(); + votes.iter().filter_map(|v| v.vote_index()).for_each(|ix| { + available.remove(&ix); + }); + + // free index + if !available.is_empty() { + return *available.iter().next().unwrap(); + } + + assert!(votes.len() == MAX_VOTES as usize); + votes.sort_by_key(|v| v.vote().expect("all values must be votes").wallclock); + + // If Tower is full, oldest removed first + if tower_index + 1 == MAX_VOTES as usize { + return votes[0].vote_index().expect("all values must be votes"); + } + + // If Tower is not full, the early votes have expired + assert!(tower_index < MAX_VOTES as usize); + + votes[tower_index] + .vote_index() + .expect("all values must be votes") + } } #[cfg(test)] @@ -190,13 +242,13 @@ mod test { #[test] fn test_labels() { - let mut hits = [false; 3]; + let mut hits = [false; 2 + MAX_VOTES as usize]; // this method should cover all the possible labels for v in &CrdsValue::record_labels(&Pubkey::default()) { match v { CrdsValueLabel::ContactInfo(_) => hits[0] = true, - CrdsValueLabel::Vote(_) => hits[1] = true, - CrdsValueLabel::EpochSlots(_) => hits[2] = true, + CrdsValueLabel::EpochSlots(_) => hits[1] = true, + CrdsValueLabel::Vote(ix, _) => hits[*ix as usize + 2] = true, } } assert!(hits.iter().all(|x| *x)); @@ -208,11 +260,13 @@ mod test { let key = v.clone().contact_info().unwrap().id; assert_eq!(v.label(), CrdsValueLabel::ContactInfo(key)); - let v = - CrdsValue::new_unsigned(CrdsData::Vote(Vote::new(&Pubkey::default(), test_tx(), 0))); + let v = CrdsValue::new_unsigned(CrdsData::Vote( + 0, + Vote::new(&Pubkey::default(), test_tx(), 0), + )); assert_eq!(v.wallclock(), 0); let key = v.clone().vote().unwrap().from; - assert_eq!(v.label(), CrdsValueLabel::Vote(key)); + assert_eq!(v.label(), CrdsValueLabel::Vote(0, key)); let v = CrdsValue::new_unsigned(CrdsData::EpochSlots(EpochSlots::new( Pubkey::default(), @@ -224,6 +278,7 @@ mod test { let key = v.clone().epoch_slots().unwrap().from; assert_eq!(v.label(), CrdsValueLabel::EpochSlots(key)); } + #[test] fn test_signature() { let keypair = Keypair::new(); @@ -233,11 +288,10 @@ mod test { timestamp(), ))); verify_signatures(&mut v, &keypair, &wrong_keypair); - v = CrdsValue::new_unsigned(CrdsData::Vote(Vote::new( - &keypair.pubkey(), - test_tx(), - timestamp(), - ))); + v = CrdsValue::new_unsigned(CrdsData::Vote( + 0, + Vote::new(&keypair.pubkey(), test_tx(), timestamp()), + )); verify_signatures(&mut v, &keypair, &wrong_keypair); let btreeset: BTreeSet = vec![1, 2, 3, 6, 8].into_iter().collect(); v = CrdsValue::new_unsigned(CrdsData::EpochSlots(EpochSlots::new( @@ -249,7 +303,64 @@ mod test { verify_signatures(&mut v, &keypair, &wrong_keypair); } - fn test_serialize_deserialize_value(value: &mut CrdsValue, keypair: &Keypair) { + #[test] + fn test_max_vote_index() { + let keypair = Keypair::new(); + let vote = CrdsValue::new_signed( + CrdsData::Vote( + MAX_VOTES, + Vote::new(&keypair.pubkey(), test_tx(), timestamp()), + ), + &keypair, + ); + assert!(!vote.verify()); + } + + #[test] + fn test_compute_vote_index_empty() { + for i in 0..MAX_VOTES { + let votes = vec![]; + assert!(CrdsValue::compute_vote_index(i as usize, votes) < MAX_VOTES); + } + } + + #[test] + fn test_compute_vote_index_one() { + let keypair = Keypair::new(); + let vote = CrdsValue::new_unsigned(CrdsData::Vote( + 0, + Vote::new(&keypair.pubkey(), test_tx(), 0), + )); + for i in 0..MAX_VOTES { + let votes = vec![&vote]; + assert!(CrdsValue::compute_vote_index(i as usize, votes) > 0); + let votes = vec![&vote]; + assert!(CrdsValue::compute_vote_index(i as usize, votes) < MAX_VOTES); + } + } + + #[test] + fn test_compute_vote_index_full() { + let keypair = Keypair::new(); + let votes: Vec<_> = (0..MAX_VOTES) + .map(|x| { + CrdsValue::new_unsigned(CrdsData::Vote( + x, + Vote::new(&keypair.pubkey(), test_tx(), x as u64), + )) + }) + .collect(); + let vote_refs = votes.iter().collect(); + //pick the oldest vote when full + assert_eq!(CrdsValue::compute_vote_index(31, vote_refs), 0); + //pick the index + let vote_refs = votes.iter().collect(); + assert_eq!(CrdsValue::compute_vote_index(0, vote_refs), 0); + let vote_refs = votes.iter().collect(); + assert_eq!(CrdsValue::compute_vote_index(30, vote_refs), 30); + } + + fn serialize_deserialize_value(value: &mut CrdsValue, keypair: &Keypair) { let num_tries = 10; value.sign(keypair); let original_signature = value.get_signature(); @@ -276,6 +387,6 @@ mod test { assert!(value.verify()); value.sign(&wrong_keypair); assert!(!value.verify()); - test_serialize_deserialize_value(value, correct_keypair); + serialize_deserialize_value(value, correct_keypair); } } diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 913d987226..010441597b 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -493,7 +493,7 @@ impl ReplayStage { T: 'static + KeypairUtil + Send + Sync, { trace!("handle votable bank {}", bank.slot()); - let vote = tower.new_vote_from_bank(bank, vote_account); + let (vote, tower_index) = tower.new_vote_from_bank(bank, vote_account); if let Some(new_root) = tower.record_bank_vote(vote) { // get the root bank before squash let root_bank = bank_forks @@ -539,7 +539,10 @@ impl ReplayStage { let blockhash = bank.last_blockhash(); vote_tx.partial_sign(&[node_keypair.as_ref()], blockhash); vote_tx.partial_sign(&[voting_keypair.as_ref()], blockhash); - cluster_info.write().unwrap().push_vote(vote_tx); + cluster_info + .write() + .unwrap() + .push_vote(tower_index, vote_tx); } Ok(()) } diff --git a/programs/vote_api/src/vote_state.rs b/programs/vote_api/src/vote_state.rs index 8ecf5ce232..1a1a1aaeba 100644 --- a/programs/vote_api/src/vote_state.rs +++ b/programs/vote_api/src/vote_state.rs @@ -195,7 +195,7 @@ impl VoteState { j -= 1; } if j == slot_hashes.len() { - warn!( + debug!( "{} dropped vote {:?} too old: {:?} ", self.node_pubkey, vote, slot_hashes ); diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index b6fc1cef52..b141f8ad6b 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -262,6 +262,10 @@ pub struct Bank { /// (used to adjust cluster features over time) #[serde(skip)] entered_epoch_callback: Arc>>, + + /// Last time when the cluster info vote listener has synced with this bank + #[serde(skip)] + pub last_vote_sync: AtomicU64, } impl Default for BlockhashQueue { @@ -352,6 +356,7 @@ impl Bank { signature_count: AtomicU64::new(0), message_processor: MessageProcessor::default(), entered_epoch_callback: parent.entered_epoch_callback.clone(), + last_vote_sync: AtomicU64::new(parent.last_vote_sync.load(Ordering::Relaxed)), }; datapoint_debug!( @@ -3347,7 +3352,6 @@ mod tests { // Non-native loader accounts can not be used for instruction processing bank.add_instruction_processor(mint_keypair.pubkey(), mock_ix_processor); } - #[test] fn test_recent_blockhashes_sysvar() { let (genesis_block, _mint_keypair) = create_genesis_block(500); @@ -3365,4 +3369,16 @@ mod tests { bank = Arc::new(new_from_parent(&bank)); } } + #[test] + fn test_bank_inherit_last_vote_sync() { + let (genesis_block, _) = create_genesis_block(500); + let bank0 = Arc::new(Bank::new(&genesis_block)); + let last_ts = bank0.last_vote_sync.load(Ordering::Relaxed); + assert_eq!(last_ts, 0); + bank0.last_vote_sync.store(1, Ordering::Relaxed); + let bank1 = + Bank::new_from_parent(&bank0, &Pubkey::default(), bank0.get_slots_in_epoch(0) - 1); + let last_ts = bank1.last_vote_sync.load(Ordering::Relaxed); + assert_eq!(last_ts, 1); + } }