diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index b14b375d6d..c72f049301 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -1,6 +1,6 @@ use crate::{ cluster_info::{ClusterInfo, GOSSIP_SLEEP_MILLIS}, - consensus::VOTE_THRESHOLD_SIZE, + commitment::VOTE_THRESHOLD_SIZE, crds_value::CrdsValueLabel, poh_recorder::PohRecorder, pubkey_references::LockedPubkeyReferences, diff --git a/core/src/commitment.rs b/core/src/commitment.rs index a26be76655..5717e720ce 100644 --- a/core/src/commitment.rs +++ b/core/src/commitment.rs @@ -1,26 +1,10 @@ -use crate::{consensus::VOTE_THRESHOLD_SIZE, rpc_subscriptions::RpcSubscriptions}; use solana_ledger::blockstore::Blockstore; -use solana_measure::measure::Measure; -use solana_metrics::datapoint_info; use solana_runtime::bank::Bank; use solana_sdk::clock::Slot; -use solana_vote_program::{vote_state::VoteState, vote_state::MAX_LOCKOUT_HISTORY}; -use std::{ - collections::HashMap, - sync::atomic::{AtomicBool, Ordering}, - sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender}, - sync::{Arc, RwLock}, - thread::{self, Builder, JoinHandle}, - time::Duration, -}; +use solana_vote_program::vote_state::MAX_LOCKOUT_HISTORY; +use std::{collections::HashMap, sync::Arc}; -#[derive(Default)] -pub struct CacheSlotInfo { - pub current_slot: Slot, - pub node_root: Slot, - pub largest_confirmed_root: Slot, - pub highest_confirmed_slot: Slot, -} +pub const VOTE_THRESHOLD_SIZE: f64 = 2f64 / 3f64; pub type BlockCommitmentArray = [u64; MAX_LOCKOUT_HISTORY + 1]; @@ -59,9 +43,9 @@ pub struct BlockCommitmentCache { largest_confirmed_root: Slot, total_stake: u64, bank: Arc, - blockstore: Arc, + pub blockstore: Arc, root: Slot, - highest_confirmed_slot: Slot, + pub highest_confirmed_slot: Slot, } impl std::fmt::Debug for BlockCommitmentCache { @@ -151,7 +135,7 @@ impl BlockCommitmentCache { self.root } - fn calculate_highest_confirmed_slot(&self) -> Slot { + pub fn calculate_highest_confirmed_slot(&self) -> Slot { self.highest_slot_with_confirmation_count(1) } @@ -219,222 +203,11 @@ impl BlockCommitmentCache { } } -pub struct CommitmentAggregationData { - bank: Arc, - root: Slot, - total_staked: u64, -} - -impl CommitmentAggregationData { - pub fn new(bank: Arc, root: Slot, total_staked: u64) -> Self { - Self { - bank, - root, - total_staked, - } - } -} - -fn get_largest_confirmed_root(mut rooted_stake: Vec<(Slot, u64)>, total_stake: u64) -> Slot { - rooted_stake.sort_by(|a, b| a.0.cmp(&b.0).reverse()); - let mut stake_sum = 0; - for (root, stake) in rooted_stake { - stake_sum += stake; - if (stake_sum as f64 / total_stake as f64) > VOTE_THRESHOLD_SIZE { - return root; - } - } - 0 -} - -pub struct AggregateCommitmentService { - t_commitment: JoinHandle<()>, -} - -impl AggregateCommitmentService { - pub fn new( - exit: &Arc, - block_commitment_cache: Arc>, - subscriptions: Arc, - ) -> (Sender, Self) { - let (sender, receiver): ( - Sender, - Receiver, - ) = channel(); - let exit_ = exit.clone(); - ( - sender, - Self { - t_commitment: Builder::new() - .name("solana-aggregate-stake-lockouts".to_string()) - .spawn(move || loop { - if exit_.load(Ordering::Relaxed) { - break; - } - - if let Err(RecvTimeoutError::Disconnected) = - Self::run(&receiver, &block_commitment_cache, &subscriptions, &exit_) - { - break; - } - }) - .unwrap(), - }, - ) - } - - fn run( - receiver: &Receiver, - block_commitment_cache: &RwLock, - subscriptions: &Arc, - exit: &Arc, - ) -> Result<(), RecvTimeoutError> { - loop { - if exit.load(Ordering::Relaxed) { - return Ok(()); - } - - let mut aggregation_data = receiver.recv_timeout(Duration::from_secs(1))?; - - while let Ok(new_data) = receiver.try_recv() { - aggregation_data = new_data; - } - - let ancestors = aggregation_data.bank.status_cache_ancestors(); - if ancestors.is_empty() { - continue; - } - - let mut aggregate_commitment_time = Measure::start("aggregate-commitment-ms"); - let (block_commitment, rooted_stake) = - Self::aggregate_commitment(&ancestors, &aggregation_data.bank); - - let largest_confirmed_root = - get_largest_confirmed_root(rooted_stake, aggregation_data.total_staked); - - let mut new_block_commitment = BlockCommitmentCache::new( - block_commitment, - largest_confirmed_root, - aggregation_data.total_staked, - aggregation_data.bank, - block_commitment_cache.read().unwrap().blockstore.clone(), - aggregation_data.root, - aggregation_data.root, - ); - new_block_commitment.highest_confirmed_slot = - new_block_commitment.calculate_highest_confirmed_slot(); - - let mut w_block_commitment_cache = block_commitment_cache.write().unwrap(); - - std::mem::swap(&mut *w_block_commitment_cache, &mut new_block_commitment); - aggregate_commitment_time.stop(); - datapoint_info!( - "block-commitment-cache", - ( - "aggregate-commitment-ms", - aggregate_commitment_time.as_ms() as i64, - i64 - ) - ); - - subscriptions.notify_subscribers(CacheSlotInfo { - current_slot: w_block_commitment_cache.slot(), - node_root: w_block_commitment_cache.root(), - largest_confirmed_root: w_block_commitment_cache.largest_confirmed_root(), - highest_confirmed_slot: w_block_commitment_cache.highest_confirmed_slot(), - }); - } - } - - pub fn aggregate_commitment( - ancestors: &[Slot], - bank: &Bank, - ) -> (HashMap, Vec<(Slot, u64)>) { - assert!(!ancestors.is_empty()); - - // Check ancestors is sorted - for a in ancestors.windows(2) { - assert!(a[0] < a[1]); - } - - let mut commitment = HashMap::new(); - let mut rooted_stake: Vec<(Slot, u64)> = Vec::new(); - for (_, (lamports, account)) in bank.vote_accounts().into_iter() { - if lamports == 0 { - continue; - } - let vote_state = VoteState::from(&account); - if vote_state.is_none() { - continue; - } - - let vote_state = vote_state.unwrap(); - Self::aggregate_commitment_for_vote_account( - &mut commitment, - &mut rooted_stake, - &vote_state, - ancestors, - lamports, - ); - } - - (commitment, rooted_stake) - } - - fn aggregate_commitment_for_vote_account( - commitment: &mut HashMap, - rooted_stake: &mut Vec<(Slot, u64)>, - vote_state: &VoteState, - ancestors: &[Slot], - lamports: u64, - ) { - assert!(!ancestors.is_empty()); - let mut ancestors_index = 0; - if let Some(root) = vote_state.root_slot { - for (i, a) in ancestors.iter().enumerate() { - if *a <= root { - commitment - .entry(*a) - .or_insert_with(BlockCommitment::default) - .increase_rooted_stake(lamports); - } else { - ancestors_index = i; - break; - } - } - rooted_stake.push((root, lamports)); - } - - for vote in &vote_state.votes { - while ancestors[ancestors_index] <= vote.slot { - commitment - .entry(ancestors[ancestors_index]) - .or_insert_with(BlockCommitment::default) - .increase_confirmation_stake(vote.confirmation_count as usize, lamports); - ancestors_index += 1; - - if ancestors_index == ancestors.len() { - return; - } - } - } - } - - pub fn join(self) -> thread::Result<()> { - self.t_commitment.join() - } -} - #[cfg(test)] mod tests { use super::*; - use solana_ledger::{ - genesis_utils::{create_genesis_config, GenesisConfigInfo}, - get_tmp_ledger_path, - }; + use solana_ledger::get_tmp_ledger_path; use solana_sdk::{genesis_config::GenesisConfig, pubkey::Pubkey}; - use solana_stake_program::stake_state; - use solana_vote_program::vote_state::{self, VoteStateVersions}; #[test] fn test_block_commitment() { @@ -512,21 +285,6 @@ mod tests { assert!(!block_commitment_cache.is_confirmed_rooted(3)); } - #[test] - fn test_get_largest_confirmed_root() { - assert_eq!(get_largest_confirmed_root(vec![], 10), 0); - let mut rooted_stake = vec![]; - rooted_stake.push((0, 5)); - rooted_stake.push((1, 5)); - assert_eq!(get_largest_confirmed_root(rooted_stake, 10), 0); - let mut rooted_stake = vec![]; - rooted_stake.push((1, 5)); - rooted_stake.push((0, 10)); - rooted_stake.push((2, 5)); - rooted_stake.push((1, 4)); - assert_eq!(get_largest_confirmed_root(rooted_stake, 10), 1); - } - #[test] fn test_highest_confirmed_slot() { let bank = Arc::new(Bank::new(&GenesisConfig::default())); @@ -634,211 +392,4 @@ mod tests { assert_eq!(block_commitment_cache.calculate_highest_confirmed_slot(), 0); } - - #[test] - fn test_aggregate_commitment_for_vote_account_1() { - let ancestors = vec![3, 4, 5, 7, 9, 11]; - let mut commitment = HashMap::new(); - let mut rooted_stake = vec![]; - let lamports = 5; - let mut vote_state = VoteState::default(); - - let root = *ancestors.last().unwrap(); - vote_state.root_slot = Some(root); - AggregateCommitmentService::aggregate_commitment_for_vote_account( - &mut commitment, - &mut rooted_stake, - &vote_state, - &ancestors, - lamports, - ); - - for a in ancestors { - let mut expected = BlockCommitment::default(); - expected.increase_rooted_stake(lamports); - assert_eq!(*commitment.get(&a).unwrap(), expected); - } - assert_eq!(rooted_stake[0], (root, lamports)); - } - - #[test] - fn test_aggregate_commitment_for_vote_account_2() { - let ancestors = vec![3, 4, 5, 7, 9, 11]; - let mut commitment = HashMap::new(); - let mut rooted_stake = vec![]; - let lamports = 5; - let mut vote_state = VoteState::default(); - - let root = ancestors[2]; - vote_state.root_slot = Some(root); - vote_state.process_slot_vote_unchecked(*ancestors.last().unwrap()); - AggregateCommitmentService::aggregate_commitment_for_vote_account( - &mut commitment, - &mut rooted_stake, - &vote_state, - &ancestors, - lamports, - ); - - for a in ancestors { - if a <= root { - let mut expected = BlockCommitment::default(); - expected.increase_rooted_stake(lamports); - assert_eq!(*commitment.get(&a).unwrap(), expected); - } else { - let mut expected = BlockCommitment::default(); - expected.increase_confirmation_stake(1, lamports); - assert_eq!(*commitment.get(&a).unwrap(), expected); - } - } - assert_eq!(rooted_stake[0], (root, lamports)); - } - - #[test] - fn test_aggregate_commitment_for_vote_account_3() { - let ancestors = vec![3, 4, 5, 7, 9, 10, 11]; - let mut commitment = HashMap::new(); - let mut rooted_stake = vec![]; - let lamports = 5; - let mut vote_state = VoteState::default(); - - let root = ancestors[2]; - vote_state.root_slot = Some(root); - assert!(ancestors[4] + 2 >= ancestors[6]); - vote_state.process_slot_vote_unchecked(ancestors[4]); - vote_state.process_slot_vote_unchecked(ancestors[6]); - AggregateCommitmentService::aggregate_commitment_for_vote_account( - &mut commitment, - &mut rooted_stake, - &vote_state, - &ancestors, - lamports, - ); - - for (i, a) in ancestors.iter().enumerate() { - if *a <= root { - let mut expected = BlockCommitment::default(); - expected.increase_rooted_stake(lamports); - assert_eq!(*commitment.get(&a).unwrap(), expected); - } else if i <= 4 { - let mut expected = BlockCommitment::default(); - expected.increase_confirmation_stake(2, lamports); - assert_eq!(*commitment.get(&a).unwrap(), expected); - } else if i <= 6 { - let mut expected = BlockCommitment::default(); - expected.increase_confirmation_stake(1, lamports); - assert_eq!(*commitment.get(&a).unwrap(), expected); - } - } - assert_eq!(rooted_stake[0], (root, lamports)); - } - - #[test] - fn test_aggregate_commitment_validity() { - let ancestors = vec![3, 4, 5, 7, 9, 10, 11]; - let GenesisConfigInfo { - mut genesis_config, .. - } = create_genesis_config(10_000); - - let rooted_stake_amount = 40; - - let sk1 = Pubkey::new_rand(); - let pk1 = Pubkey::new_rand(); - let mut vote_account1 = vote_state::create_account(&pk1, &Pubkey::new_rand(), 0, 100); - let stake_account1 = - stake_state::create_account(&sk1, &pk1, &vote_account1, &genesis_config.rent, 100); - let sk2 = Pubkey::new_rand(); - let pk2 = Pubkey::new_rand(); - let mut vote_account2 = vote_state::create_account(&pk2, &Pubkey::new_rand(), 0, 50); - let stake_account2 = - stake_state::create_account(&sk2, &pk2, &vote_account2, &genesis_config.rent, 50); - let sk3 = Pubkey::new_rand(); - let pk3 = Pubkey::new_rand(); - let mut vote_account3 = vote_state::create_account(&pk3, &Pubkey::new_rand(), 0, 1); - let stake_account3 = stake_state::create_account( - &sk3, - &pk3, - &vote_account3, - &genesis_config.rent, - rooted_stake_amount, - ); - let sk4 = Pubkey::new_rand(); - let pk4 = Pubkey::new_rand(); - let mut vote_account4 = vote_state::create_account(&pk4, &Pubkey::new_rand(), 0, 1); - let stake_account4 = stake_state::create_account( - &sk4, - &pk4, - &vote_account4, - &genesis_config.rent, - rooted_stake_amount, - ); - - genesis_config.accounts.extend(vec![ - (pk1, vote_account1.clone()), - (sk1, stake_account1), - (pk2, vote_account2.clone()), - (sk2, stake_account2), - (pk3, vote_account3.clone()), - (sk3, stake_account3), - (pk4, vote_account4.clone()), - (sk4, stake_account4), - ]); - - // Create bank - let bank = Arc::new(Bank::new(&genesis_config)); - - let mut vote_state1 = VoteState::from(&vote_account1).unwrap(); - vote_state1.process_slot_vote_unchecked(3); - vote_state1.process_slot_vote_unchecked(5); - let versioned = VoteStateVersions::Current(Box::new(vote_state1)); - VoteState::to(&versioned, &mut vote_account1).unwrap(); - bank.store_account(&pk1, &vote_account1); - - let mut vote_state2 = VoteState::from(&vote_account2).unwrap(); - vote_state2.process_slot_vote_unchecked(9); - vote_state2.process_slot_vote_unchecked(10); - let versioned = VoteStateVersions::Current(Box::new(vote_state2)); - VoteState::to(&versioned, &mut vote_account2).unwrap(); - bank.store_account(&pk2, &vote_account2); - - let mut vote_state3 = VoteState::from(&vote_account3).unwrap(); - vote_state3.root_slot = Some(1); - let versioned = VoteStateVersions::Current(Box::new(vote_state3)); - VoteState::to(&versioned, &mut vote_account3).unwrap(); - bank.store_account(&pk3, &vote_account3); - - let mut vote_state4 = VoteState::from(&vote_account4).unwrap(); - vote_state4.root_slot = Some(2); - let versioned = VoteStateVersions::Current(Box::new(vote_state4)); - VoteState::to(&versioned, &mut vote_account4).unwrap(); - bank.store_account(&pk4, &vote_account4); - - let (commitment, rooted_stake) = - AggregateCommitmentService::aggregate_commitment(&ancestors, &bank); - - for a in ancestors { - if a <= 3 { - let mut expected = BlockCommitment::default(); - expected.increase_confirmation_stake(2, 150); - assert_eq!(*commitment.get(&a).unwrap(), expected); - } else if a <= 5 { - let mut expected = BlockCommitment::default(); - expected.increase_confirmation_stake(1, 100); - expected.increase_confirmation_stake(2, 50); - assert_eq!(*commitment.get(&a).unwrap(), expected); - } else if a <= 9 { - let mut expected = BlockCommitment::default(); - expected.increase_confirmation_stake(2, 50); - assert_eq!(*commitment.get(&a).unwrap(), expected); - } else if a <= 10 { - let mut expected = BlockCommitment::default(); - expected.increase_confirmation_stake(1, 50); - assert_eq!(*commitment.get(&a).unwrap(), expected); - } else { - assert!(commitment.get(&a).is_none()); - } - } - assert_eq!(rooted_stake.len(), 2); - assert_eq!(get_largest_confirmed_root(rooted_stake, 100), 1) - } } diff --git a/core/src/commitment_service.rs b/core/src/commitment_service.rs new file mode 100644 index 0000000000..6eda84ae5b --- /dev/null +++ b/core/src/commitment_service.rs @@ -0,0 +1,454 @@ +use crate::{ + commitment::{BlockCommitment, BlockCommitmentCache, VOTE_THRESHOLD_SIZE}, + rpc_subscriptions::{CacheSlotInfo, RpcSubscriptions}, +}; +use solana_measure::measure::Measure; +use solana_metrics::datapoint_info; +use solana_runtime::bank::Bank; +use solana_sdk::clock::Slot; +use solana_vote_program::vote_state::VoteState; +use std::{ + collections::HashMap, + sync::atomic::{AtomicBool, Ordering}, + sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender}, + sync::{Arc, RwLock}, + thread::{self, Builder, JoinHandle}, + time::Duration, +}; + +pub struct CommitmentAggregationData { + bank: Arc, + root: Slot, + total_staked: u64, +} + +impl CommitmentAggregationData { + pub fn new(bank: Arc, root: Slot, total_staked: u64) -> Self { + Self { + bank, + root, + total_staked, + } + } +} + +fn get_largest_confirmed_root(mut rooted_stake: Vec<(Slot, u64)>, total_stake: u64) -> Slot { + rooted_stake.sort_by(|a, b| a.0.cmp(&b.0).reverse()); + let mut stake_sum = 0; + for (root, stake) in rooted_stake { + stake_sum += stake; + if (stake_sum as f64 / total_stake as f64) > VOTE_THRESHOLD_SIZE { + return root; + } + } + 0 +} + +pub struct AggregateCommitmentService { + t_commitment: JoinHandle<()>, +} + +impl AggregateCommitmentService { + pub fn new( + exit: &Arc, + block_commitment_cache: Arc>, + subscriptions: Arc, + ) -> (Sender, Self) { + let (sender, receiver): ( + Sender, + Receiver, + ) = channel(); + let exit_ = exit.clone(); + ( + sender, + Self { + t_commitment: Builder::new() + .name("solana-aggregate-stake-lockouts".to_string()) + .spawn(move || loop { + if exit_.load(Ordering::Relaxed) { + break; + } + + if let Err(RecvTimeoutError::Disconnected) = + Self::run(&receiver, &block_commitment_cache, &subscriptions, &exit_) + { + break; + } + }) + .unwrap(), + }, + ) + } + + fn run( + receiver: &Receiver, + block_commitment_cache: &RwLock, + subscriptions: &Arc, + exit: &Arc, + ) -> Result<(), RecvTimeoutError> { + loop { + if exit.load(Ordering::Relaxed) { + return Ok(()); + } + + let mut aggregation_data = receiver.recv_timeout(Duration::from_secs(1))?; + + while let Ok(new_data) = receiver.try_recv() { + aggregation_data = new_data; + } + + let ancestors = aggregation_data.bank.status_cache_ancestors(); + if ancestors.is_empty() { + continue; + } + + let mut aggregate_commitment_time = Measure::start("aggregate-commitment-ms"); + let (block_commitment, rooted_stake) = + Self::aggregate_commitment(&ancestors, &aggregation_data.bank); + + let largest_confirmed_root = + get_largest_confirmed_root(rooted_stake, aggregation_data.total_staked); + + let mut new_block_commitment = BlockCommitmentCache::new( + block_commitment, + largest_confirmed_root, + aggregation_data.total_staked, + aggregation_data.bank, + block_commitment_cache.read().unwrap().blockstore.clone(), + aggregation_data.root, + aggregation_data.root, + ); + new_block_commitment.highest_confirmed_slot = + new_block_commitment.calculate_highest_confirmed_slot(); + + let mut w_block_commitment_cache = block_commitment_cache.write().unwrap(); + + std::mem::swap(&mut *w_block_commitment_cache, &mut new_block_commitment); + aggregate_commitment_time.stop(); + datapoint_info!( + "block-commitment-cache", + ( + "aggregate-commitment-ms", + aggregate_commitment_time.as_ms() as i64, + i64 + ) + ); + + subscriptions.notify_subscribers(CacheSlotInfo { + current_slot: w_block_commitment_cache.slot(), + node_root: w_block_commitment_cache.root(), + largest_confirmed_root: w_block_commitment_cache.largest_confirmed_root(), + highest_confirmed_slot: w_block_commitment_cache.highest_confirmed_slot(), + }); + } + } + + pub fn aggregate_commitment( + ancestors: &[Slot], + bank: &Bank, + ) -> (HashMap, Vec<(Slot, u64)>) { + assert!(!ancestors.is_empty()); + + // Check ancestors is sorted + for a in ancestors.windows(2) { + assert!(a[0] < a[1]); + } + + let mut commitment = HashMap::new(); + let mut rooted_stake: Vec<(Slot, u64)> = Vec::new(); + for (_, (lamports, account)) in bank.vote_accounts().into_iter() { + if lamports == 0 { + continue; + } + let vote_state = VoteState::from(&account); + if vote_state.is_none() { + continue; + } + + let vote_state = vote_state.unwrap(); + Self::aggregate_commitment_for_vote_account( + &mut commitment, + &mut rooted_stake, + &vote_state, + ancestors, + lamports, + ); + } + + (commitment, rooted_stake) + } + + fn aggregate_commitment_for_vote_account( + commitment: &mut HashMap, + rooted_stake: &mut Vec<(Slot, u64)>, + vote_state: &VoteState, + ancestors: &[Slot], + lamports: u64, + ) { + assert!(!ancestors.is_empty()); + let mut ancestors_index = 0; + if let Some(root) = vote_state.root_slot { + for (i, a) in ancestors.iter().enumerate() { + if *a <= root { + commitment + .entry(*a) + .or_insert_with(BlockCommitment::default) + .increase_rooted_stake(lamports); + } else { + ancestors_index = i; + break; + } + } + rooted_stake.push((root, lamports)); + } + + for vote in &vote_state.votes { + while ancestors[ancestors_index] <= vote.slot { + commitment + .entry(ancestors[ancestors_index]) + .or_insert_with(BlockCommitment::default) + .increase_confirmation_stake(vote.confirmation_count as usize, lamports); + ancestors_index += 1; + + if ancestors_index == ancestors.len() { + return; + } + } + } + } + + pub fn join(self) -> thread::Result<()> { + self.t_commitment.join() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo}; + use solana_sdk::pubkey::Pubkey; + use solana_stake_program::stake_state; + use solana_vote_program::vote_state::{self, VoteStateVersions}; + + #[test] + fn test_get_largest_confirmed_root() { + assert_eq!(get_largest_confirmed_root(vec![], 10), 0); + let mut rooted_stake = vec![]; + rooted_stake.push((0, 5)); + rooted_stake.push((1, 5)); + assert_eq!(get_largest_confirmed_root(rooted_stake, 10), 0); + let mut rooted_stake = vec![]; + rooted_stake.push((1, 5)); + rooted_stake.push((0, 10)); + rooted_stake.push((2, 5)); + rooted_stake.push((1, 4)); + assert_eq!(get_largest_confirmed_root(rooted_stake, 10), 1); + } + + #[test] + fn test_aggregate_commitment_for_vote_account_1() { + let ancestors = vec![3, 4, 5, 7, 9, 11]; + let mut commitment = HashMap::new(); + let mut rooted_stake = vec![]; + let lamports = 5; + let mut vote_state = VoteState::default(); + + let root = *ancestors.last().unwrap(); + vote_state.root_slot = Some(root); + AggregateCommitmentService::aggregate_commitment_for_vote_account( + &mut commitment, + &mut rooted_stake, + &vote_state, + &ancestors, + lamports, + ); + + for a in ancestors { + let mut expected = BlockCommitment::default(); + expected.increase_rooted_stake(lamports); + assert_eq!(*commitment.get(&a).unwrap(), expected); + } + assert_eq!(rooted_stake[0], (root, lamports)); + } + + #[test] + fn test_aggregate_commitment_for_vote_account_2() { + let ancestors = vec![3, 4, 5, 7, 9, 11]; + let mut commitment = HashMap::new(); + let mut rooted_stake = vec![]; + let lamports = 5; + let mut vote_state = VoteState::default(); + + let root = ancestors[2]; + vote_state.root_slot = Some(root); + vote_state.process_slot_vote_unchecked(*ancestors.last().unwrap()); + AggregateCommitmentService::aggregate_commitment_for_vote_account( + &mut commitment, + &mut rooted_stake, + &vote_state, + &ancestors, + lamports, + ); + + for a in ancestors { + if a <= root { + let mut expected = BlockCommitment::default(); + expected.increase_rooted_stake(lamports); + assert_eq!(*commitment.get(&a).unwrap(), expected); + } else { + let mut expected = BlockCommitment::default(); + expected.increase_confirmation_stake(1, lamports); + assert_eq!(*commitment.get(&a).unwrap(), expected); + } + } + assert_eq!(rooted_stake[0], (root, lamports)); + } + + #[test] + fn test_aggregate_commitment_for_vote_account_3() { + let ancestors = vec![3, 4, 5, 7, 9, 10, 11]; + let mut commitment = HashMap::new(); + let mut rooted_stake = vec![]; + let lamports = 5; + let mut vote_state = VoteState::default(); + + let root = ancestors[2]; + vote_state.root_slot = Some(root); + assert!(ancestors[4] + 2 >= ancestors[6]); + vote_state.process_slot_vote_unchecked(ancestors[4]); + vote_state.process_slot_vote_unchecked(ancestors[6]); + AggregateCommitmentService::aggregate_commitment_for_vote_account( + &mut commitment, + &mut rooted_stake, + &vote_state, + &ancestors, + lamports, + ); + + for (i, a) in ancestors.iter().enumerate() { + if *a <= root { + let mut expected = BlockCommitment::default(); + expected.increase_rooted_stake(lamports); + assert_eq!(*commitment.get(&a).unwrap(), expected); + } else if i <= 4 { + let mut expected = BlockCommitment::default(); + expected.increase_confirmation_stake(2, lamports); + assert_eq!(*commitment.get(&a).unwrap(), expected); + } else if i <= 6 { + let mut expected = BlockCommitment::default(); + expected.increase_confirmation_stake(1, lamports); + assert_eq!(*commitment.get(&a).unwrap(), expected); + } + } + assert_eq!(rooted_stake[0], (root, lamports)); + } + + #[test] + fn test_aggregate_commitment_validity() { + let ancestors = vec![3, 4, 5, 7, 9, 10, 11]; + let GenesisConfigInfo { + mut genesis_config, .. + } = create_genesis_config(10_000); + + let rooted_stake_amount = 40; + + let sk1 = Pubkey::new_rand(); + let pk1 = Pubkey::new_rand(); + let mut vote_account1 = vote_state::create_account(&pk1, &Pubkey::new_rand(), 0, 100); + let stake_account1 = + stake_state::create_account(&sk1, &pk1, &vote_account1, &genesis_config.rent, 100); + let sk2 = Pubkey::new_rand(); + let pk2 = Pubkey::new_rand(); + let mut vote_account2 = vote_state::create_account(&pk2, &Pubkey::new_rand(), 0, 50); + let stake_account2 = + stake_state::create_account(&sk2, &pk2, &vote_account2, &genesis_config.rent, 50); + let sk3 = Pubkey::new_rand(); + let pk3 = Pubkey::new_rand(); + let mut vote_account3 = vote_state::create_account(&pk3, &Pubkey::new_rand(), 0, 1); + let stake_account3 = stake_state::create_account( + &sk3, + &pk3, + &vote_account3, + &genesis_config.rent, + rooted_stake_amount, + ); + let sk4 = Pubkey::new_rand(); + let pk4 = Pubkey::new_rand(); + let mut vote_account4 = vote_state::create_account(&pk4, &Pubkey::new_rand(), 0, 1); + let stake_account4 = stake_state::create_account( + &sk4, + &pk4, + &vote_account4, + &genesis_config.rent, + rooted_stake_amount, + ); + + genesis_config.accounts.extend(vec![ + (pk1, vote_account1.clone()), + (sk1, stake_account1), + (pk2, vote_account2.clone()), + (sk2, stake_account2), + (pk3, vote_account3.clone()), + (sk3, stake_account3), + (pk4, vote_account4.clone()), + (sk4, stake_account4), + ]); + + // Create bank + let bank = Arc::new(Bank::new(&genesis_config)); + + let mut vote_state1 = VoteState::from(&vote_account1).unwrap(); + vote_state1.process_slot_vote_unchecked(3); + vote_state1.process_slot_vote_unchecked(5); + let versioned = VoteStateVersions::Current(Box::new(vote_state1)); + VoteState::to(&versioned, &mut vote_account1).unwrap(); + bank.store_account(&pk1, &vote_account1); + + let mut vote_state2 = VoteState::from(&vote_account2).unwrap(); + vote_state2.process_slot_vote_unchecked(9); + vote_state2.process_slot_vote_unchecked(10); + let versioned = VoteStateVersions::Current(Box::new(vote_state2)); + VoteState::to(&versioned, &mut vote_account2).unwrap(); + bank.store_account(&pk2, &vote_account2); + + let mut vote_state3 = VoteState::from(&vote_account3).unwrap(); + vote_state3.root_slot = Some(1); + let versioned = VoteStateVersions::Current(Box::new(vote_state3)); + VoteState::to(&versioned, &mut vote_account3).unwrap(); + bank.store_account(&pk3, &vote_account3); + + let mut vote_state4 = VoteState::from(&vote_account4).unwrap(); + vote_state4.root_slot = Some(2); + let versioned = VoteStateVersions::Current(Box::new(vote_state4)); + VoteState::to(&versioned, &mut vote_account4).unwrap(); + bank.store_account(&pk4, &vote_account4); + + let (commitment, rooted_stake) = + AggregateCommitmentService::aggregate_commitment(&ancestors, &bank); + + for a in ancestors { + if a <= 3 { + let mut expected = BlockCommitment::default(); + expected.increase_confirmation_stake(2, 150); + assert_eq!(*commitment.get(&a).unwrap(), expected); + } else if a <= 5 { + let mut expected = BlockCommitment::default(); + expected.increase_confirmation_stake(1, 100); + expected.increase_confirmation_stake(2, 50); + assert_eq!(*commitment.get(&a).unwrap(), expected); + } else if a <= 9 { + let mut expected = BlockCommitment::default(); + expected.increase_confirmation_stake(2, 50); + assert_eq!(*commitment.get(&a).unwrap(), expected); + } else if a <= 10 { + let mut expected = BlockCommitment::default(); + expected.increase_confirmation_stake(1, 50); + assert_eq!(*commitment.get(&a).unwrap(), expected); + } else { + assert!(commitment.get(&a).is_none()); + } + } + assert_eq!(rooted_stake.len(), 2); + assert_eq!(get_largest_confirmed_root(rooted_stake, 100), 1) + } +} diff --git a/core/src/consensus.rs b/core/src/consensus.rs index c8f40c7bca..17359e6436 100644 --- a/core/src/consensus.rs +++ b/core/src/consensus.rs @@ -1,4 +1,5 @@ use crate::{ + commitment::VOTE_THRESHOLD_SIZE, progress_map::{LockoutIntervals, ProgressMap}, pubkey_references::PubkeyReferences, }; @@ -58,7 +59,6 @@ impl SwitchForkDecision { } pub const VOTE_THRESHOLD_DEPTH: usize = 8; -pub const VOTE_THRESHOLD_SIZE: f64 = 2f64 / 3f64; pub const SWITCH_FORK_THRESHOLD: f64 = 0.38; #[derive(Default, Debug, Clone)] diff --git a/core/src/lib.rs b/core/src/lib.rs index e67e33be88..aefff68d69 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -11,6 +11,7 @@ pub mod banking_stage; pub mod broadcast_stage; pub mod cluster_info_vote_listener; pub mod commitment; +pub mod commitment_service; mod deprecated; pub mod shred_fetch_stage; #[macro_use] diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs index 01ec5cb705..6847196412 100644 --- a/core/src/repair_service.rs +++ b/core/src/repair_service.rs @@ -3,7 +3,7 @@ use crate::{ cluster_info::ClusterInfo, cluster_slots::ClusterSlots, - consensus::VOTE_THRESHOLD_SIZE, + commitment::VOTE_THRESHOLD_SIZE, result::Result, serve_repair::{RepairType, ServeRepair, DEFAULT_NONCE}, }; diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index c15d2708fa..12a0a190c4 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -6,7 +6,8 @@ use crate::{ cluster_info::ClusterInfo, cluster_info_vote_listener::VoteTracker, cluster_slots::ClusterSlots, - commitment::{AggregateCommitmentService, BlockCommitmentCache, CommitmentAggregationData}, + commitment::BlockCommitmentCache, + commitment_service::{AggregateCommitmentService, CommitmentAggregationData}, consensus::{ComputedBankState, StakeLockout, SwitchForkDecision, Tower}, fork_choice::{ForkChoice, SelectVoteAndResetForkResult}, heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice, diff --git a/core/src/rpc_pubsub.rs b/core/src/rpc_pubsub.rs index 7e30f94bc5..53e2cb88bf 100644 --- a/core/src/rpc_pubsub.rs +++ b/core/src/rpc_pubsub.rs @@ -355,8 +355,8 @@ mod tests { use super::*; use crate::{ cluster_info_vote_listener::{ClusterInfoVoteListener, VoteTracker}, - commitment::{BlockCommitmentCache, CacheSlotInfo}, - rpc_subscriptions::tests::robust_poll_or_panic, + commitment::BlockCommitmentCache, + rpc_subscriptions::{tests::robust_poll_or_panic, CacheSlotInfo}, }; use crossbeam_channel::unbounded; use jsonrpc_core::{futures::sync::mpsc, Response}; diff --git a/core/src/rpc_subscriptions.rs b/core/src/rpc_subscriptions.rs index 213ad776a2..01af82ab29 100644 --- a/core/src/rpc_subscriptions.rs +++ b/core/src/rpc_subscriptions.rs @@ -1,6 +1,6 @@ //! The `pubsub` module implements a threaded subscription service on client RPC request -use crate::commitment::{BlockCommitmentCache, CacheSlotInfo}; +use crate::commitment::BlockCommitmentCache; use core::hash::Hash; use jsonrpc_core::futures::Future; use jsonrpc_pubsub::{ @@ -44,6 +44,14 @@ pub struct SlotInfo { pub root: Slot, } +#[derive(Default)] +pub struct CacheSlotInfo { + pub current_slot: Slot, + pub node_root: Slot, + pub largest_confirmed_root: Slot, + pub highest_confirmed_slot: Slot, +} + // A more human-friendly version of Vote, with the bank state signature base58 encoded. #[derive(Serialize, Deserialize, Debug)] pub struct RpcVote {