diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index 4e38ae532..75d01e312 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -47,12 +47,15 @@ use std::{ // Map from a vote account to the authorized voter for an epoch pub type ThresholdConfirmedSlots = Vec<(Slot, Hash)>; +pub type VotedHashUpdates = HashMap>; pub type VerifiedLabelVotePacketsSender = CrossbeamSender>; pub type VerifiedLabelVotePacketsReceiver = CrossbeamReceiver>; pub type VerifiedVoteTransactionsSender = CrossbeamSender>; pub type VerifiedVoteTransactionsReceiver = CrossbeamReceiver>; pub type VerifiedVoteSender = CrossbeamSender<(Pubkey, Vec)>; pub type VerifiedVoteReceiver = CrossbeamReceiver<(Pubkey, Vec)>; +pub type GossipVerifiedVoteHashSender = CrossbeamSender<(Pubkey, Slot, Hash)>; +pub type GossipVerifiedVoteHashReceiver = CrossbeamReceiver<(Pubkey, Slot, Hash)>; pub type GossipDuplicateConfirmedSlotsSender = CrossbeamSender; pub type GossipDuplicateConfirmedSlotsReceiver = CrossbeamReceiver; @@ -65,14 +68,13 @@ pub struct SlotVoteTracker { // True if seen on gossip, false if only seen in replay. voted: HashMap, optimistic_votes_tracker: HashMap, - updates: Option>, + voted_slot_updates: Option>, gossip_only_stake: u64, } impl SlotVoteTracker { - #[allow(dead_code)] - pub fn get_updates(&mut self) -> Option> { - self.updates.take() + pub fn get_voted_slot_updates(&mut self) -> Option> { + self.voted_slot_updates.take() } pub fn get_or_insert_optimistic_votes_tracker(&mut self, hash: Hash) -> &mut VoteStakeTracker { @@ -119,7 +121,7 @@ impl VoteTracker { let new_slot_tracker = Arc::new(RwLock::new(SlotVoteTracker { voted: HashMap::new(), optimistic_votes_tracker: HashMap::default(), - updates: None, + voted_slot_updates: None, gossip_only_stake: 0, })); self.slot_vote_trackers @@ -170,10 +172,10 @@ impl VoteTracker { let mut w_slot_vote_tracker = slot_vote_tracker.write().unwrap(); w_slot_vote_tracker.voted.insert(pubkey, true); - if let Some(ref mut updates) = w_slot_vote_tracker.updates { - updates.push(pubkey) + if let Some(ref mut voted_slot_updates) = w_slot_vote_tracker.voted_slot_updates { + voted_slot_updates.push(pubkey) } else { - w_slot_vote_tracker.updates = Some(vec![pubkey]); + w_slot_vote_tracker.voted_slot_updates = Some(vec![pubkey]); } } @@ -249,6 +251,7 @@ impl ClusterInfoVoteListener { bank_forks: Arc>, subscriptions: Arc, verified_vote_sender: VerifiedVoteSender, + gossip_verified_vote_hash_sender: GossipVerifiedVoteHashSender, replay_votes_receiver: ReplayVoteReceiver, blockstore: Arc, bank_notification_sender: Option, @@ -295,6 +298,7 @@ impl ClusterInfoVoteListener { vote_tracker, bank_forks, subscriptions, + gossip_verified_vote_hash_sender, verified_vote_sender, replay_votes_receiver, blockstore, @@ -422,6 +426,7 @@ impl ClusterInfoVoteListener { vote_tracker: Arc, bank_forks: Arc>, subscriptions: Arc, + gossip_verified_vote_hash_sender: GossipVerifiedVoteHashSender, verified_vote_sender: VerifiedVoteSender, replay_votes_receiver: ReplayVoteReceiver, blockstore: Arc, @@ -457,6 +462,7 @@ impl ClusterInfoVoteListener { &vote_tracker, &root_bank, &subscriptions, + &gossip_verified_vote_hash_sender, &verified_vote_sender, &replay_votes_receiver, &bank_notification_sender, @@ -484,6 +490,7 @@ impl ClusterInfoVoteListener { vote_tracker: &VoteTracker, root_bank: &Bank, subscriptions: &RpcSubscriptions, + gossip_verified_vote_hash_sender: &GossipVerifiedVoteHashSender, verified_vote_sender: &VerifiedVoteSender, replay_votes_receiver: &ReplayVoteReceiver, ) -> Result { @@ -492,6 +499,7 @@ impl ClusterInfoVoteListener { vote_tracker, root_bank, subscriptions, + gossip_verified_vote_hash_sender, verified_vote_sender, replay_votes_receiver, &None, @@ -504,6 +512,7 @@ impl ClusterInfoVoteListener { vote_tracker: &VoteTracker, root_bank: &Bank, subscriptions: &RpcSubscriptions, + gossip_verified_vote_hash_sender: &GossipVerifiedVoteHashSender, verified_vote_sender: &VerifiedVoteSender, replay_votes_receiver: &ReplayVoteReceiver, bank_notification_sender: &Option, @@ -535,6 +544,7 @@ impl ClusterInfoVoteListener { replay_votes, root_bank, subscriptions, + gossip_verified_vote_hash_sender, verified_vote_sender, bank_notification_sender, cluster_confirmed_slot_sender, @@ -555,6 +565,7 @@ impl ClusterInfoVoteListener { root_bank: &Bank, subscriptions: &RpcSubscriptions, verified_vote_sender: &VerifiedVoteSender, + gossip_verified_vote_hash_sender: &GossipVerifiedVoteHashSender, diff: &mut HashMap>, new_optimistic_confirmed_slots: &mut ThresholdConfirmedSlots, is_gossip_vote: bool, @@ -604,6 +615,14 @@ impl ClusterInfoVoteListener { total_stake, ); + if is_gossip_vote && is_new && stake > 0 { + let _ = gossip_verified_vote_hash_sender.send(( + *vote_pubkey, + last_vote_slot, + last_vote_hash, + )); + } + if reached_threshold_results[0] { if let Some(sender) = cluster_confirmed_slot_sender { let _ = sender.send(vec![(last_vote_slot, last_vote_hash)]); @@ -691,6 +710,7 @@ impl ClusterInfoVoteListener { replayed_votes: Vec, root_bank: &Bank, subscriptions: &RpcSubscriptions, + gossip_verified_vote_hash_sender: &GossipVerifiedVoteHashSender, verified_vote_sender: &VerifiedVoteSender, bank_notification_sender: &Option, cluster_confirmed_slot_sender: &Option, @@ -717,6 +737,7 @@ impl ClusterInfoVoteListener { root_bank, subscriptions, verified_vote_sender, + gossip_verified_vote_hash_sender, &mut diff, &mut new_optimistic_confirmed_slots, is_gossip, @@ -742,8 +763,8 @@ impl ClusterInfoVoteListener { }); } let mut w_slot_tracker = slot_tracker.write().unwrap(); - if w_slot_tracker.updates.is_none() { - w_slot_tracker.updates = Some(vec![]); + if w_slot_tracker.voted_slot_updates.is_none() { + w_slot_tracker.voted_slot_updates = Some(vec![]); } let mut gossip_only_stake = 0; let epoch = root_bank.epoch_schedule().get_epoch(slot); @@ -764,7 +785,11 @@ impl ClusterInfoVoteListener { // `is_new || is_new_from_gossip`. In both cases we want to record // `is_new_from_gossip` for the `pubkey` entry. w_slot_tracker.voted.insert(pubkey, seen_in_gossip_above); - w_slot_tracker.updates.as_mut().unwrap().push(pubkey); + w_slot_tracker + .voted_slot_updates + .as_mut() + .unwrap() + .push(pubkey); } w_slot_tracker.gossip_only_stake += gossip_only_stake @@ -997,6 +1022,7 @@ mod tests { let (vote_tracker, _, validator_voting_keypairs, subscriptions) = setup(); let (votes_sender, votes_receiver) = unbounded(); let (verified_vote_sender, _verified_vote_receiver) = unbounded(); + let (gossip_verified_vote_hash_sender, _gossip_verified_vote_hash_receiver) = unbounded(); let (replay_votes_sender, replay_votes_receiver) = unbounded(); let GenesisConfigInfo { genesis_config, .. } = @@ -1027,6 +1053,7 @@ mod tests { &vote_tracker, &bank3, &subscriptions, + &gossip_verified_vote_hash_sender, &verified_vote_sender, &replay_votes_receiver, &None, @@ -1057,6 +1084,7 @@ mod tests { &vote_tracker, &bank3, &subscriptions, + &gossip_verified_vote_hash_sender, &verified_vote_sender, &replay_votes_receiver, &None, @@ -1109,6 +1137,7 @@ mod tests { let (vote_tracker, _, validator_voting_keypairs, subscriptions) = setup(); let (votes_txs_sender, votes_txs_receiver) = unbounded(); let (replay_votes_sender, replay_votes_receiver) = unbounded(); + let (gossip_verified_vote_hash_sender, gossip_verified_vote_hash_receiver) = unbounded(); let (verified_vote_sender, verified_vote_receiver) = unbounded(); let GenesisConfigInfo { genesis_config, .. } = @@ -1136,6 +1165,7 @@ mod tests { &vote_tracker, &bank0, &subscriptions, + &gossip_verified_vote_hash_sender, &verified_vote_sender, &replay_votes_receiver, &None, @@ -1143,11 +1173,44 @@ mod tests { ) .unwrap(); + let mut gossip_verified_votes: HashMap>> = HashMap::new(); + for (pubkey, slot, hash) in gossip_verified_vote_hash_receiver.try_iter() { + // send_vote_txs() will send each vote twice, but we should only get a notification + // once for each via this channel + let exists = gossip_verified_votes + .get(&slot) + .and_then(|slot_hashes| slot_hashes.get(&hash)) + .map(|slot_hash_voters| slot_hash_voters.contains(&pubkey)) + .unwrap_or(false); + assert!(!exists); + gossip_verified_votes + .entry(slot) + .or_default() + .entry(hash) + .or_default() + .push(pubkey); + } + + // Only the last vote in the `gossip_vote` set should count towards + // the `voted_hash_updates` set. Important to note here that replay votes + // should not count + let last_gossip_vote_slot = *gossip_vote_slots.last().unwrap(); + assert_eq!(gossip_verified_votes.len(), 1); + let slot_hashes = gossip_verified_votes.get(&last_gossip_vote_slot).unwrap(); + assert_eq!(slot_hashes.len(), 1); + let slot_hash_votes = slot_hashes.get(&Hash::default()).unwrap(); + assert_eq!(slot_hash_votes.len(), validator_voting_keypairs.len()); + for voting_keypairs in &validator_voting_keypairs { + let pubkey = voting_keypairs.vote_keypair.pubkey(); + assert!(slot_hash_votes.contains(&pubkey)); + } + // Check that the received votes were pushed to other commponents // subscribing via `verified_vote_receiver` let all_expected_slots: BTreeSet<_> = gossip_vote_slots + .clone() .into_iter() - .chain(replay_vote_slots.into_iter()) + .chain(replay_vote_slots.clone().into_iter()) .collect(); let mut pubkey_to_votes: HashMap> = HashMap::new(); for (received_pubkey, new_votes) in verified_vote_receiver.try_iter() { @@ -1175,15 +1238,17 @@ mod tests { let pubkey = voting_keypairs.vote_keypair.pubkey(); assert!(r_slot_vote_tracker.voted.contains_key(&pubkey)); assert!(r_slot_vote_tracker - .updates + .voted_slot_updates .as_ref() .unwrap() .contains(&Arc::new(pubkey))); - // Only the last vote in the stack of `gossip_votes` should count towards - // the `optimistic` vote set. + // Only the last vote in the stack of `gossip_vote` and `replay_vote_slots` + // should count towards the `optimistic` vote set, let optimistic_votes_tracker = r_slot_vote_tracker.optimistic_votes_tracker(&Hash::default()); - if vote_slot == 2 || vote_slot == 4 { + if vote_slot == *gossip_vote_slots.last().unwrap() + || vote_slot == *replay_vote_slots.last().unwrap() + { let optimistic_votes_tracker = optimistic_votes_tracker.unwrap(); assert!(optimistic_votes_tracker.voted().contains(&pubkey)); assert_eq!( @@ -1220,6 +1285,7 @@ mod tests { // Send some votes to process let (votes_txs_sender, votes_txs_receiver) = unbounded(); + let (gossip_verified_vote_hash_sender, _gossip_verified_vote_hash_receiver) = unbounded(); let (verified_vote_sender, verified_vote_receiver) = unbounded(); let (_replay_votes_sender, replay_votes_receiver) = unbounded(); @@ -1256,6 +1322,7 @@ mod tests { &vote_tracker, &bank0, &subscriptions, + &gossip_verified_vote_hash_sender, &verified_vote_sender, &replay_votes_receiver, &None, @@ -1281,7 +1348,7 @@ mod tests { let pubkey = voting_keypairs.vote_keypair.pubkey(); assert!(r_slot_vote_tracker.voted.contains_key(&pubkey)); assert!(r_slot_vote_tracker - .updates + .voted_slot_updates .as_ref() .unwrap() .contains(&Arc::new(pubkey))); @@ -1302,6 +1369,7 @@ mod tests { fn run_test_process_votes3(switch_proof_hash: Option) { let (votes_sender, votes_receiver) = unbounded(); let (verified_vote_sender, _verified_vote_receiver) = unbounded(); + let (gossip_verified_vote_hash_sender, _gossip_verified_vote_hash_receiver) = unbounded(); let (replay_votes_sender, replay_votes_receiver) = unbounded(); let vote_slot = 1; @@ -1352,6 +1420,7 @@ mod tests { &vote_tracker, &bank, &subscriptions, + &gossip_verified_vote_hash_sender, &verified_vote_sender, &replay_votes_receiver, &None, @@ -1487,6 +1556,7 @@ mod tests { )]; let (verified_vote_sender, _verified_vote_receiver) = unbounded(); + let (gossip_verified_vote_hash_sender, _gossip_verified_vote_hash_receiver) = unbounded(); ClusterInfoVoteListener::filter_and_confirm_with_new_votes( &vote_tracker, vote_tx, @@ -1498,6 +1568,7 @@ mod tests { )], &bank, &subscriptions, + &gossip_verified_vote_hash_sender, &verified_vote_sender, &None, &None, @@ -1553,6 +1624,7 @@ mod tests { )], &new_root_bank, &subscriptions, + &gossip_verified_vote_hash_sender, &verified_vote_sender, &None, &None, diff --git a/core/src/consensus.rs b/core/src/consensus.rs index b352e777b..32a41e3b4 100644 --- a/core/src/consensus.rs +++ b/core/src/consensus.rs @@ -1455,6 +1455,7 @@ pub mod test { None, &mut self.heaviest_subtree_fork_choice, &mut BTreeMap::new(), + &mut BTreeMap::new(), &mut true, &mut Vec::new(), ) diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 002dfc4fc..c3899e0d7 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -4,7 +4,9 @@ use crate::{ broadcast_stage::RetransmitSlotsSender, cache_block_time_service::CacheBlockTimeSender, cluster_info::ClusterInfo, - cluster_info_vote_listener::{GossipDuplicateConfirmedSlotsReceiver, VoteTracker}, + cluster_info_vote_listener::{ + GossipDuplicateConfirmedSlotsReceiver, GossipVerifiedVoteHashReceiver, VoteTracker, + }, cluster_slot_state_verifier::*, cluster_slots::ClusterSlots, commitment_service::{AggregateCommitmentService, CommitmentAggregationData}, @@ -66,6 +68,8 @@ pub const DUPLICATE_LIVENESS_THRESHOLD: f64 = 0.1; pub const DUPLICATE_THRESHOLD: f64 = 1.0 - SWITCH_FORK_THRESHOLD - DUPLICATE_LIVENESS_THRESHOLD; const MAX_VOTE_SIGNATURES: usize = 200; +pub type GossipVerifiedVoteHashes = BTreeMap>>; + #[derive(PartialEq, Debug)] pub(crate) enum HeaviestForkFailures { LockedOut(u64), @@ -134,6 +138,7 @@ pub struct ReplayTiming { bank_count: u64, process_gossip_duplicate_confirmed_slots_elapsed: u64, process_duplicate_slots_elapsed: u64, + process_gossip_verified_vote_hashes_elapsed: u64, } impl ReplayTiming { #[allow(clippy::too_many_arguments)] @@ -153,6 +158,7 @@ impl ReplayTiming { heaviest_fork_failures_elapsed: u64, bank_count: u64, process_gossip_duplicate_confirmed_slots_elapsed: u64, + process_gossip_verified_vote_hashes_elapsed: u64, process_duplicate_slots_elapsed: u64, ) { self.collect_frozen_banks_elapsed += collect_frozen_banks_elapsed; @@ -170,6 +176,8 @@ impl ReplayTiming { self.bank_count += bank_count; self.process_gossip_duplicate_confirmed_slots_elapsed += process_gossip_duplicate_confirmed_slots_elapsed; + self.process_gossip_verified_vote_hashes_elapsed += + process_gossip_verified_vote_hashes_elapsed; self.process_duplicate_slots_elapsed += process_duplicate_slots_elapsed; let now = timestamp(); let elapsed_ms = now - self.last_print; @@ -224,6 +232,11 @@ impl ReplayTiming { self.process_gossip_duplicate_confirmed_slots_elapsed as i64, i64 ), + ( + "process_gossip_verified_vote_hashes_elapsed", + self.process_gossip_verified_vote_hashes_elapsed as i64, + i64 + ), ( "wait_receive_elapsed", self.wait_receive_elapsed as i64, @@ -270,6 +283,7 @@ impl ReplayStage { _duplicate_slots_reset_receiver: DuplicateSlotsResetReceiver, replay_vote_sender: ReplayVoteSender, gossip_duplicate_confirmed_slots_receiver: GossipDuplicateConfirmedSlotsReceiver, + gossip_verified_vote_hash_receiver: GossipVerifiedVoteHashReceiver, ) -> Self { let ReplayStageConfig { my_pubkey, @@ -316,6 +330,7 @@ impl ReplayStage { let mut skipped_slots_info = SkippedSlotsInfo::default(); let mut replay_timing = ReplayTiming::default(); let mut gossip_duplicate_confirmed_slots: GossipDuplicateConfirmedSlots = BTreeMap::new(); + let mut gossip_verified_vote_hashes: GossipVerifiedVoteHashes = BTreeMap::new(); let mut voted_signatures = Vec::new(); let mut has_new_vote_been_rooted = !wait_for_vote_to_start_leader; loop { @@ -386,6 +401,18 @@ impl ReplayStage { ); process_gossip_duplicate_confirmed_slots_time.stop(); + + // Ingest any new verified votes from gossip. Important for fork choice + // and switching proofs because these may be votes that haven't yet been + // included in a block, so we may not have yet observed these votes just + // by replaying blocks. + let mut process_gossip_verified_vote_hashes_time = Measure::start("process_gossip_duplicate_confirmed_slots"); + Self::process_gossip_verified_vote_hashes( + &gossip_verified_vote_hash_receiver, + &mut gossip_verified_vote_hashes, + ); + process_gossip_verified_vote_hashes_time.stop(); + // Check to remove any duplicated slots from fork choice let mut process_duplicate_slots_time = Measure::start("process_duplicate_slots"); if !tpu_has_bank { @@ -517,6 +544,7 @@ impl ReplayStage { &cache_block_time_sender, &bank_notification_sender, &mut gossip_duplicate_confirmed_slots, + &mut gossip_verified_vote_hashes, &mut voted_signatures, &mut has_new_vote_been_rooted, ); @@ -648,6 +676,7 @@ impl ReplayStage { heaviest_fork_failures_time.as_us(), if did_complete_bank {1} else {0}, process_gossip_duplicate_confirmed_slots_time.as_us(), + process_gossip_verified_vote_hashes_time.as_us(), process_duplicate_slots_time.as_us(), ); } @@ -880,6 +909,21 @@ impl ReplayStage { } } + fn process_gossip_verified_vote_hashes( + gossip_verified_vote_hash_receiver: &GossipVerifiedVoteHashReceiver, + gossip_verified_vote_hashes: &mut GossipVerifiedVoteHashes, + ) { + for (pubkey, slot, hash) in gossip_verified_vote_hash_receiver.try_iter() { + // cluster_info_vote_listener will ensure it doesn't push duplicates + gossip_verified_vote_hashes + .entry(slot) + .or_default() + .entry(hash) + .or_default() + .push(pubkey); + } + } + // Checks for and handle forks with duplicate slots. fn process_duplicate_slots( duplicate_slots_receiver: &DuplicateSlotReceiver, @@ -1222,6 +1266,7 @@ impl ReplayStage { cache_block_time_sender: &Option, bank_notification_sender: &Option, gossip_duplicate_confirmed_slots: &mut GossipDuplicateConfirmedSlots, + gossip_verified_vote_hashes: &mut GossipVerifiedVoteHashes, vote_signatures: &mut Vec, has_new_vote_been_rooted: &mut bool, ) { @@ -1276,6 +1321,7 @@ impl ReplayStage { highest_confirmed_root, heaviest_subtree_fork_choice, gossip_duplicate_confirmed_slots, + gossip_verified_vote_hashes, has_new_vote_been_rooted, vote_signatures, ); @@ -1731,7 +1777,9 @@ impl ReplayStage { let newly_voted_pubkeys = slot_vote_tracker .as_ref() - .and_then(|slot_vote_tracker| slot_vote_tracker.write().unwrap().get_updates()) + .and_then(|slot_vote_tracker| { + slot_vote_tracker.write().unwrap().get_voted_slot_updates() + }) .unwrap_or_default(); let cluster_slot_pubkeys = cluster_slot_pubkeys @@ -2122,6 +2170,7 @@ impl ReplayStage { confirmed_forks } + #[allow(clippy::too_many_arguments)] pub(crate) fn handle_new_root( new_root: Slot, bank_forks: &RwLock, @@ -2130,6 +2179,7 @@ impl ReplayStage { highest_confirmed_root: Option, heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice, gossip_duplicate_confirmed_slots: &mut GossipDuplicateConfirmedSlots, + gossip_verified_vote_hashes: &mut GossipVerifiedVoteHashes, has_new_vote_been_rooted: &mut bool, voted_signatures: &mut Vec, ) { @@ -2156,6 +2206,10 @@ impl ReplayStage { let mut slots_ge_root = gossip_duplicate_confirmed_slots.split_off(&new_root); // gossip_duplicate_confirmed_slots now only contains entries >= `new_root` std::mem::swap(gossip_duplicate_confirmed_slots, &mut slots_ge_root); + + let mut slots_ge_root = gossip_verified_vote_hashes.split_off(&new_root); + // gossip_verified_vote_hashes now only contains entries >= `new_root` + std::mem::swap(gossip_verified_vote_hashes, &mut slots_ge_root); } fn generate_new_bank_forks( @@ -2564,6 +2618,11 @@ pub(crate) mod tests { .into_iter() .map(|s| (s, Hash::default())) .collect(); + let mut gossip_verified_vote_hashes: GossipVerifiedVoteHashes = + vec![root - 1, root, root + 1] + .into_iter() + .map(|s| (s, HashMap::new())) + .collect(); ReplayStage::handle_new_root( root, &bank_forks, @@ -2572,6 +2631,7 @@ pub(crate) mod tests { None, &mut heaviest_subtree_fork_choice, &mut gossip_duplicate_confirmed_slots, + &mut gossip_verified_vote_hashes, &mut true, &mut Vec::new(), ); @@ -2586,6 +2646,13 @@ pub(crate) mod tests { .collect::>(), vec![root, root + 1] ); + assert_eq!( + gossip_verified_vote_hashes + .keys() + .cloned() + .collect::>(), + vec![root, root + 1] + ); } #[test] @@ -2630,6 +2697,7 @@ pub(crate) mod tests { Some(confirmed_root), &mut heaviest_subtree_fork_choice, &mut BTreeMap::new(), + &mut BTreeMap::new(), &mut true, &mut Vec::new(), ); @@ -3690,7 +3758,7 @@ pub(crate) mod tests { .unwrap() .write() .unwrap() - .get_updates() + .get_voted_slot_updates() .is_none()); // The voter should be recorded diff --git a/core/src/rpc_pubsub.rs b/core/src/rpc_pubsub.rs index b9416d16e..edc62d6d0 100644 --- a/core/src/rpc_pubsub.rs +++ b/core/src/rpc_pubsub.rs @@ -1272,14 +1272,16 @@ mod tests { }); // Process votes and check they were notified. - let (s, _r) = unbounded(); + let (verified_vote_sender, _verified_vote_receiver) = unbounded(); + let (gossip_verified_vote_hash_sender, _gossip_verified_vote_hash_receiver) = unbounded(); let (_replay_votes_sender, replay_votes_receiver) = unbounded(); ClusterInfoVoteListener::get_and_process_votes_for_tests( &votes_receiver, &vote_tracker, &bank, &rpc.subscriptions, - &s, + &gossip_verified_vote_hash_sender, + &verified_vote_sender, &replay_votes_receiver, ) .unwrap(); diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 23fbfd574..600fb891e 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -6,8 +6,8 @@ use crate::{ broadcast_stage::{BroadcastStage, BroadcastStageType, RetransmitSlotsReceiver}, cluster_info::ClusterInfo, cluster_info_vote_listener::{ - ClusterInfoVoteListener, GossipDuplicateConfirmedSlotsSender, VerifiedVoteSender, - VoteTracker, + ClusterInfoVoteListener, GossipDuplicateConfirmedSlotsSender, GossipVerifiedVoteHashSender, + VerifiedVoteSender, VoteTracker, }, fetch_stage::FetchStage, optimistically_confirmed_bank_tracker::BankNotificationSender, @@ -61,6 +61,7 @@ impl Tpu { vote_tracker: Arc, bank_forks: Arc>, verified_vote_sender: VerifiedVoteSender, + gossip_verified_vote_hash_sender: GossipVerifiedVoteHashSender, replay_vote_receiver: ReplayVoteReceiver, replay_vote_sender: ReplayVoteSender, bank_notification_sender: Option, @@ -96,6 +97,7 @@ impl Tpu { bank_forks, subscriptions.clone(), verified_vote_sender, + gossip_verified_vote_hash_sender, replay_vote_receiver, blockstore.clone(), bank_notification_sender, diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 4899e6bbd..bbe8efadd 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -7,7 +7,8 @@ use crate::{ cache_block_time_service::CacheBlockTimeSender, cluster_info::ClusterInfo, cluster_info_vote_listener::{ - GossipDuplicateConfirmedSlotsReceiver, VerifiedVoteReceiver, VoteTracker, + GossipDuplicateConfirmedSlotsReceiver, GossipVerifiedVoteHashReceiver, + VerifiedVoteReceiver, VoteTracker, }, cluster_slots::ClusterSlots, completed_data_sets_service::CompletedDataSetsSender, @@ -119,6 +120,7 @@ impl Tvu { snapshot_config_and_pending_package: Option<(SnapshotConfig, PendingSnapshotPackage)>, vote_tracker: Arc, retransmit_slots_sender: RetransmitSlotsSender, + gossip_verified_vote_hash_receiver: GossipVerifiedVoteHashReceiver, verified_vote_receiver: VerifiedVoteReceiver, replay_vote_sender: ReplayVoteSender, completed_data_sets_sender: CompletedDataSetsSender, @@ -278,6 +280,7 @@ impl Tvu { duplicate_slots_reset_receiver, replay_vote_sender, gossip_confirmed_slots_receiver, + gossip_verified_vote_hash_receiver, ); let ledger_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| { @@ -377,6 +380,7 @@ pub mod tests { let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank)); let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default())); let (retransmit_slots_sender, _retransmit_slots_receiver) = unbounded(); + let (_gossip_verified_vote_hash_sender, gossip_verified_vote_hash_receiver) = unbounded(); let (_verified_vote_sender, verified_vote_receiver) = unbounded(); let (replay_vote_sender, _replay_vote_receiver) = unbounded(); let (completed_data_sets_sender, _completed_data_sets_receiver) = unbounded(); @@ -417,6 +421,7 @@ pub mod tests { None, Arc::new(VoteTracker::new(&bank)), retransmit_slots_sender, + gossip_verified_vote_hash_receiver, verified_vote_receiver, replay_vote_sender, completed_data_sets_sender, diff --git a/core/src/validator.rs b/core/src/validator.rs index 9bea385f7..6c5682538 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -671,6 +671,7 @@ impl Validator { let (retransmit_slots_sender, retransmit_slots_receiver) = unbounded(); let (verified_vote_sender, verified_vote_receiver) = unbounded(); + let (gossip_verified_vote_hash_sender, gossip_verified_vote_hash_receiver) = unbounded(); let (cluster_confirmed_slot_sender, cluster_confirmed_slot_receiver) = unbounded(); let tvu = Tvu::new( vote_account, @@ -718,6 +719,7 @@ impl Validator { snapshot_config_and_pending_package, vote_tracker.clone(), retransmit_slots_sender, + gossip_verified_vote_hash_receiver, verified_vote_receiver, replay_vote_sender.clone(), completed_data_sets_sender, @@ -758,6 +760,7 @@ impl Validator { vote_tracker, bank_forks, verified_vote_sender, + gossip_verified_vote_hash_sender, replay_vote_receiver, replay_vote_sender, bank_notification_sender,