Track gossip vote updates per hash for replay stage (#16421)

* Track gossip vote updates per hash for replay stage
This commit is contained in:
carllin 2021-04-10 17:34:45 -07:00 committed by GitHub
parent 91d5f6ab30
commit 99b3aab703
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 178 additions and 25 deletions

View File

@ -47,12 +47,15 @@ use std::{
// Map from a vote account to the authorized voter for an epoch // Map from a vote account to the authorized voter for an epoch
pub type ThresholdConfirmedSlots = Vec<(Slot, Hash)>; pub type ThresholdConfirmedSlots = Vec<(Slot, Hash)>;
pub type VotedHashUpdates = HashMap<Hash, Vec<Pubkey>>;
pub type VerifiedLabelVotePacketsSender = CrossbeamSender<Vec<(CrdsValueLabel, Slot, Packets)>>; pub type VerifiedLabelVotePacketsSender = CrossbeamSender<Vec<(CrdsValueLabel, Slot, Packets)>>;
pub type VerifiedLabelVotePacketsReceiver = CrossbeamReceiver<Vec<(CrdsValueLabel, Slot, Packets)>>; pub type VerifiedLabelVotePacketsReceiver = CrossbeamReceiver<Vec<(CrdsValueLabel, Slot, Packets)>>;
pub type VerifiedVoteTransactionsSender = CrossbeamSender<Vec<Transaction>>; pub type VerifiedVoteTransactionsSender = CrossbeamSender<Vec<Transaction>>;
pub type VerifiedVoteTransactionsReceiver = CrossbeamReceiver<Vec<Transaction>>; pub type VerifiedVoteTransactionsReceiver = CrossbeamReceiver<Vec<Transaction>>;
pub type VerifiedVoteSender = CrossbeamSender<(Pubkey, Vec<Slot>)>; pub type VerifiedVoteSender = CrossbeamSender<(Pubkey, Vec<Slot>)>;
pub type VerifiedVoteReceiver = CrossbeamReceiver<(Pubkey, Vec<Slot>)>; pub type VerifiedVoteReceiver = CrossbeamReceiver<(Pubkey, Vec<Slot>)>;
pub type GossipVerifiedVoteHashSender = CrossbeamSender<(Pubkey, Slot, Hash)>;
pub type GossipVerifiedVoteHashReceiver = CrossbeamReceiver<(Pubkey, Slot, Hash)>;
pub type GossipDuplicateConfirmedSlotsSender = CrossbeamSender<ThresholdConfirmedSlots>; pub type GossipDuplicateConfirmedSlotsSender = CrossbeamSender<ThresholdConfirmedSlots>;
pub type GossipDuplicateConfirmedSlotsReceiver = CrossbeamReceiver<ThresholdConfirmedSlots>; pub type GossipDuplicateConfirmedSlotsReceiver = CrossbeamReceiver<ThresholdConfirmedSlots>;
@ -65,14 +68,13 @@ pub struct SlotVoteTracker {
// True if seen on gossip, false if only seen in replay. // True if seen on gossip, false if only seen in replay.
voted: HashMap<Pubkey, bool>, voted: HashMap<Pubkey, bool>,
optimistic_votes_tracker: HashMap<Hash, VoteStakeTracker>, optimistic_votes_tracker: HashMap<Hash, VoteStakeTracker>,
updates: Option<Vec<Pubkey>>, voted_slot_updates: Option<Vec<Pubkey>>,
gossip_only_stake: u64, gossip_only_stake: u64,
} }
impl SlotVoteTracker { impl SlotVoteTracker {
#[allow(dead_code)] pub fn get_voted_slot_updates(&mut self) -> Option<Vec<Pubkey>> {
pub fn get_updates(&mut self) -> Option<Vec<Pubkey>> { self.voted_slot_updates.take()
self.updates.take()
} }
pub fn get_or_insert_optimistic_votes_tracker(&mut self, hash: Hash) -> &mut VoteStakeTracker { pub fn get_or_insert_optimistic_votes_tracker(&mut self, hash: Hash) -> &mut VoteStakeTracker {
@ -119,7 +121,7 @@ impl VoteTracker {
let new_slot_tracker = Arc::new(RwLock::new(SlotVoteTracker { let new_slot_tracker = Arc::new(RwLock::new(SlotVoteTracker {
voted: HashMap::new(), voted: HashMap::new(),
optimistic_votes_tracker: HashMap::default(), optimistic_votes_tracker: HashMap::default(),
updates: None, voted_slot_updates: None,
gossip_only_stake: 0, gossip_only_stake: 0,
})); }));
self.slot_vote_trackers self.slot_vote_trackers
@ -170,10 +172,10 @@ impl VoteTracker {
let mut w_slot_vote_tracker = slot_vote_tracker.write().unwrap(); let mut w_slot_vote_tracker = slot_vote_tracker.write().unwrap();
w_slot_vote_tracker.voted.insert(pubkey, true); w_slot_vote_tracker.voted.insert(pubkey, true);
if let Some(ref mut updates) = w_slot_vote_tracker.updates { if let Some(ref mut voted_slot_updates) = w_slot_vote_tracker.voted_slot_updates {
updates.push(pubkey) voted_slot_updates.push(pubkey)
} else { } else {
w_slot_vote_tracker.updates = Some(vec![pubkey]); w_slot_vote_tracker.voted_slot_updates = Some(vec![pubkey]);
} }
} }
@ -249,6 +251,7 @@ impl ClusterInfoVoteListener {
bank_forks: Arc<RwLock<BankForks>>, bank_forks: Arc<RwLock<BankForks>>,
subscriptions: Arc<RpcSubscriptions>, subscriptions: Arc<RpcSubscriptions>,
verified_vote_sender: VerifiedVoteSender, verified_vote_sender: VerifiedVoteSender,
gossip_verified_vote_hash_sender: GossipVerifiedVoteHashSender,
replay_votes_receiver: ReplayVoteReceiver, replay_votes_receiver: ReplayVoteReceiver,
blockstore: Arc<Blockstore>, blockstore: Arc<Blockstore>,
bank_notification_sender: Option<BankNotificationSender>, bank_notification_sender: Option<BankNotificationSender>,
@ -295,6 +298,7 @@ impl ClusterInfoVoteListener {
vote_tracker, vote_tracker,
bank_forks, bank_forks,
subscriptions, subscriptions,
gossip_verified_vote_hash_sender,
verified_vote_sender, verified_vote_sender,
replay_votes_receiver, replay_votes_receiver,
blockstore, blockstore,
@ -422,6 +426,7 @@ impl ClusterInfoVoteListener {
vote_tracker: Arc<VoteTracker>, vote_tracker: Arc<VoteTracker>,
bank_forks: Arc<RwLock<BankForks>>, bank_forks: Arc<RwLock<BankForks>>,
subscriptions: Arc<RpcSubscriptions>, subscriptions: Arc<RpcSubscriptions>,
gossip_verified_vote_hash_sender: GossipVerifiedVoteHashSender,
verified_vote_sender: VerifiedVoteSender, verified_vote_sender: VerifiedVoteSender,
replay_votes_receiver: ReplayVoteReceiver, replay_votes_receiver: ReplayVoteReceiver,
blockstore: Arc<Blockstore>, blockstore: Arc<Blockstore>,
@ -457,6 +462,7 @@ impl ClusterInfoVoteListener {
&vote_tracker, &vote_tracker,
&root_bank, &root_bank,
&subscriptions, &subscriptions,
&gossip_verified_vote_hash_sender,
&verified_vote_sender, &verified_vote_sender,
&replay_votes_receiver, &replay_votes_receiver,
&bank_notification_sender, &bank_notification_sender,
@ -484,6 +490,7 @@ impl ClusterInfoVoteListener {
vote_tracker: &VoteTracker, vote_tracker: &VoteTracker,
root_bank: &Bank, root_bank: &Bank,
subscriptions: &RpcSubscriptions, subscriptions: &RpcSubscriptions,
gossip_verified_vote_hash_sender: &GossipVerifiedVoteHashSender,
verified_vote_sender: &VerifiedVoteSender, verified_vote_sender: &VerifiedVoteSender,
replay_votes_receiver: &ReplayVoteReceiver, replay_votes_receiver: &ReplayVoteReceiver,
) -> Result<ThresholdConfirmedSlots> { ) -> Result<ThresholdConfirmedSlots> {
@ -492,6 +499,7 @@ impl ClusterInfoVoteListener {
vote_tracker, vote_tracker,
root_bank, root_bank,
subscriptions, subscriptions,
gossip_verified_vote_hash_sender,
verified_vote_sender, verified_vote_sender,
replay_votes_receiver, replay_votes_receiver,
&None, &None,
@ -504,6 +512,7 @@ impl ClusterInfoVoteListener {
vote_tracker: &VoteTracker, vote_tracker: &VoteTracker,
root_bank: &Bank, root_bank: &Bank,
subscriptions: &RpcSubscriptions, subscriptions: &RpcSubscriptions,
gossip_verified_vote_hash_sender: &GossipVerifiedVoteHashSender,
verified_vote_sender: &VerifiedVoteSender, verified_vote_sender: &VerifiedVoteSender,
replay_votes_receiver: &ReplayVoteReceiver, replay_votes_receiver: &ReplayVoteReceiver,
bank_notification_sender: &Option<BankNotificationSender>, bank_notification_sender: &Option<BankNotificationSender>,
@ -535,6 +544,7 @@ impl ClusterInfoVoteListener {
replay_votes, replay_votes,
root_bank, root_bank,
subscriptions, subscriptions,
gossip_verified_vote_hash_sender,
verified_vote_sender, verified_vote_sender,
bank_notification_sender, bank_notification_sender,
cluster_confirmed_slot_sender, cluster_confirmed_slot_sender,
@ -555,6 +565,7 @@ impl ClusterInfoVoteListener {
root_bank: &Bank, root_bank: &Bank,
subscriptions: &RpcSubscriptions, subscriptions: &RpcSubscriptions,
verified_vote_sender: &VerifiedVoteSender, verified_vote_sender: &VerifiedVoteSender,
gossip_verified_vote_hash_sender: &GossipVerifiedVoteHashSender,
diff: &mut HashMap<Slot, HashMap<Pubkey, bool>>, diff: &mut HashMap<Slot, HashMap<Pubkey, bool>>,
new_optimistic_confirmed_slots: &mut ThresholdConfirmedSlots, new_optimistic_confirmed_slots: &mut ThresholdConfirmedSlots,
is_gossip_vote: bool, is_gossip_vote: bool,
@ -604,6 +615,14 @@ impl ClusterInfoVoteListener {
total_stake, total_stake,
); );
if is_gossip_vote && is_new && stake > 0 {
let _ = gossip_verified_vote_hash_sender.send((
*vote_pubkey,
last_vote_slot,
last_vote_hash,
));
}
if reached_threshold_results[0] { if reached_threshold_results[0] {
if let Some(sender) = cluster_confirmed_slot_sender { if let Some(sender) = cluster_confirmed_slot_sender {
let _ = sender.send(vec![(last_vote_slot, last_vote_hash)]); let _ = sender.send(vec![(last_vote_slot, last_vote_hash)]);
@ -691,6 +710,7 @@ impl ClusterInfoVoteListener {
replayed_votes: Vec<ReplayedVote>, replayed_votes: Vec<ReplayedVote>,
root_bank: &Bank, root_bank: &Bank,
subscriptions: &RpcSubscriptions, subscriptions: &RpcSubscriptions,
gossip_verified_vote_hash_sender: &GossipVerifiedVoteHashSender,
verified_vote_sender: &VerifiedVoteSender, verified_vote_sender: &VerifiedVoteSender,
bank_notification_sender: &Option<BankNotificationSender>, bank_notification_sender: &Option<BankNotificationSender>,
cluster_confirmed_slot_sender: &Option<GossipDuplicateConfirmedSlotsSender>, cluster_confirmed_slot_sender: &Option<GossipDuplicateConfirmedSlotsSender>,
@ -717,6 +737,7 @@ impl ClusterInfoVoteListener {
root_bank, root_bank,
subscriptions, subscriptions,
verified_vote_sender, verified_vote_sender,
gossip_verified_vote_hash_sender,
&mut diff, &mut diff,
&mut new_optimistic_confirmed_slots, &mut new_optimistic_confirmed_slots,
is_gossip, is_gossip,
@ -742,8 +763,8 @@ impl ClusterInfoVoteListener {
}); });
} }
let mut w_slot_tracker = slot_tracker.write().unwrap(); let mut w_slot_tracker = slot_tracker.write().unwrap();
if w_slot_tracker.updates.is_none() { if w_slot_tracker.voted_slot_updates.is_none() {
w_slot_tracker.updates = Some(vec![]); w_slot_tracker.voted_slot_updates = Some(vec![]);
} }
let mut gossip_only_stake = 0; let mut gossip_only_stake = 0;
let epoch = root_bank.epoch_schedule().get_epoch(slot); let epoch = root_bank.epoch_schedule().get_epoch(slot);
@ -764,7 +785,11 @@ impl ClusterInfoVoteListener {
// `is_new || is_new_from_gossip`. In both cases we want to record // `is_new || is_new_from_gossip`. In both cases we want to record
// `is_new_from_gossip` for the `pubkey` entry. // `is_new_from_gossip` for the `pubkey` entry.
w_slot_tracker.voted.insert(pubkey, seen_in_gossip_above); w_slot_tracker.voted.insert(pubkey, seen_in_gossip_above);
w_slot_tracker.updates.as_mut().unwrap().push(pubkey); w_slot_tracker
.voted_slot_updates
.as_mut()
.unwrap()
.push(pubkey);
} }
w_slot_tracker.gossip_only_stake += gossip_only_stake w_slot_tracker.gossip_only_stake += gossip_only_stake
@ -997,6 +1022,7 @@ mod tests {
let (vote_tracker, _, validator_voting_keypairs, subscriptions) = setup(); let (vote_tracker, _, validator_voting_keypairs, subscriptions) = setup();
let (votes_sender, votes_receiver) = unbounded(); let (votes_sender, votes_receiver) = unbounded();
let (verified_vote_sender, _verified_vote_receiver) = unbounded(); let (verified_vote_sender, _verified_vote_receiver) = unbounded();
let (gossip_verified_vote_hash_sender, _gossip_verified_vote_hash_receiver) = unbounded();
let (replay_votes_sender, replay_votes_receiver) = unbounded(); let (replay_votes_sender, replay_votes_receiver) = unbounded();
let GenesisConfigInfo { genesis_config, .. } = let GenesisConfigInfo { genesis_config, .. } =
@ -1027,6 +1053,7 @@ mod tests {
&vote_tracker, &vote_tracker,
&bank3, &bank3,
&subscriptions, &subscriptions,
&gossip_verified_vote_hash_sender,
&verified_vote_sender, &verified_vote_sender,
&replay_votes_receiver, &replay_votes_receiver,
&None, &None,
@ -1057,6 +1084,7 @@ mod tests {
&vote_tracker, &vote_tracker,
&bank3, &bank3,
&subscriptions, &subscriptions,
&gossip_verified_vote_hash_sender,
&verified_vote_sender, &verified_vote_sender,
&replay_votes_receiver, &replay_votes_receiver,
&None, &None,
@ -1109,6 +1137,7 @@ mod tests {
let (vote_tracker, _, validator_voting_keypairs, subscriptions) = setup(); let (vote_tracker, _, validator_voting_keypairs, subscriptions) = setup();
let (votes_txs_sender, votes_txs_receiver) = unbounded(); let (votes_txs_sender, votes_txs_receiver) = unbounded();
let (replay_votes_sender, replay_votes_receiver) = unbounded(); let (replay_votes_sender, replay_votes_receiver) = unbounded();
let (gossip_verified_vote_hash_sender, gossip_verified_vote_hash_receiver) = unbounded();
let (verified_vote_sender, verified_vote_receiver) = unbounded(); let (verified_vote_sender, verified_vote_receiver) = unbounded();
let GenesisConfigInfo { genesis_config, .. } = let GenesisConfigInfo { genesis_config, .. } =
@ -1136,6 +1165,7 @@ mod tests {
&vote_tracker, &vote_tracker,
&bank0, &bank0,
&subscriptions, &subscriptions,
&gossip_verified_vote_hash_sender,
&verified_vote_sender, &verified_vote_sender,
&replay_votes_receiver, &replay_votes_receiver,
&None, &None,
@ -1143,11 +1173,44 @@ mod tests {
) )
.unwrap(); .unwrap();
let mut gossip_verified_votes: HashMap<Slot, HashMap<Hash, Vec<Pubkey>>> = HashMap::new();
for (pubkey, slot, hash) in gossip_verified_vote_hash_receiver.try_iter() {
// send_vote_txs() will send each vote twice, but we should only get a notification
// once for each via this channel
let exists = gossip_verified_votes
.get(&slot)
.and_then(|slot_hashes| slot_hashes.get(&hash))
.map(|slot_hash_voters| slot_hash_voters.contains(&pubkey))
.unwrap_or(false);
assert!(!exists);
gossip_verified_votes
.entry(slot)
.or_default()
.entry(hash)
.or_default()
.push(pubkey);
}
// Only the last vote in the `gossip_vote` set should count towards
// the `voted_hash_updates` set. Important to note here that replay votes
// should not count
let last_gossip_vote_slot = *gossip_vote_slots.last().unwrap();
assert_eq!(gossip_verified_votes.len(), 1);
let slot_hashes = gossip_verified_votes.get(&last_gossip_vote_slot).unwrap();
assert_eq!(slot_hashes.len(), 1);
let slot_hash_votes = slot_hashes.get(&Hash::default()).unwrap();
assert_eq!(slot_hash_votes.len(), validator_voting_keypairs.len());
for voting_keypairs in &validator_voting_keypairs {
let pubkey = voting_keypairs.vote_keypair.pubkey();
assert!(slot_hash_votes.contains(&pubkey));
}
// Check that the received votes were pushed to other commponents // Check that the received votes were pushed to other commponents
// subscribing via `verified_vote_receiver` // subscribing via `verified_vote_receiver`
let all_expected_slots: BTreeSet<_> = gossip_vote_slots let all_expected_slots: BTreeSet<_> = gossip_vote_slots
.clone()
.into_iter() .into_iter()
.chain(replay_vote_slots.into_iter()) .chain(replay_vote_slots.clone().into_iter())
.collect(); .collect();
let mut pubkey_to_votes: HashMap<Pubkey, BTreeSet<Slot>> = HashMap::new(); let mut pubkey_to_votes: HashMap<Pubkey, BTreeSet<Slot>> = HashMap::new();
for (received_pubkey, new_votes) in verified_vote_receiver.try_iter() { for (received_pubkey, new_votes) in verified_vote_receiver.try_iter() {
@ -1175,15 +1238,17 @@ mod tests {
let pubkey = voting_keypairs.vote_keypair.pubkey(); let pubkey = voting_keypairs.vote_keypair.pubkey();
assert!(r_slot_vote_tracker.voted.contains_key(&pubkey)); assert!(r_slot_vote_tracker.voted.contains_key(&pubkey));
assert!(r_slot_vote_tracker assert!(r_slot_vote_tracker
.updates .voted_slot_updates
.as_ref() .as_ref()
.unwrap() .unwrap()
.contains(&Arc::new(pubkey))); .contains(&Arc::new(pubkey)));
// Only the last vote in the stack of `gossip_votes` should count towards // Only the last vote in the stack of `gossip_vote` and `replay_vote_slots`
// the `optimistic` vote set. // should count towards the `optimistic` vote set,
let optimistic_votes_tracker = let optimistic_votes_tracker =
r_slot_vote_tracker.optimistic_votes_tracker(&Hash::default()); r_slot_vote_tracker.optimistic_votes_tracker(&Hash::default());
if vote_slot == 2 || vote_slot == 4 { if vote_slot == *gossip_vote_slots.last().unwrap()
|| vote_slot == *replay_vote_slots.last().unwrap()
{
let optimistic_votes_tracker = optimistic_votes_tracker.unwrap(); let optimistic_votes_tracker = optimistic_votes_tracker.unwrap();
assert!(optimistic_votes_tracker.voted().contains(&pubkey)); assert!(optimistic_votes_tracker.voted().contains(&pubkey));
assert_eq!( assert_eq!(
@ -1220,6 +1285,7 @@ mod tests {
// Send some votes to process // Send some votes to process
let (votes_txs_sender, votes_txs_receiver) = unbounded(); let (votes_txs_sender, votes_txs_receiver) = unbounded();
let (gossip_verified_vote_hash_sender, _gossip_verified_vote_hash_receiver) = unbounded();
let (verified_vote_sender, verified_vote_receiver) = unbounded(); let (verified_vote_sender, verified_vote_receiver) = unbounded();
let (_replay_votes_sender, replay_votes_receiver) = unbounded(); let (_replay_votes_sender, replay_votes_receiver) = unbounded();
@ -1256,6 +1322,7 @@ mod tests {
&vote_tracker, &vote_tracker,
&bank0, &bank0,
&subscriptions, &subscriptions,
&gossip_verified_vote_hash_sender,
&verified_vote_sender, &verified_vote_sender,
&replay_votes_receiver, &replay_votes_receiver,
&None, &None,
@ -1281,7 +1348,7 @@ mod tests {
let pubkey = voting_keypairs.vote_keypair.pubkey(); let pubkey = voting_keypairs.vote_keypair.pubkey();
assert!(r_slot_vote_tracker.voted.contains_key(&pubkey)); assert!(r_slot_vote_tracker.voted.contains_key(&pubkey));
assert!(r_slot_vote_tracker assert!(r_slot_vote_tracker
.updates .voted_slot_updates
.as_ref() .as_ref()
.unwrap() .unwrap()
.contains(&Arc::new(pubkey))); .contains(&Arc::new(pubkey)));
@ -1302,6 +1369,7 @@ mod tests {
fn run_test_process_votes3(switch_proof_hash: Option<Hash>) { fn run_test_process_votes3(switch_proof_hash: Option<Hash>) {
let (votes_sender, votes_receiver) = unbounded(); let (votes_sender, votes_receiver) = unbounded();
let (verified_vote_sender, _verified_vote_receiver) = unbounded(); let (verified_vote_sender, _verified_vote_receiver) = unbounded();
let (gossip_verified_vote_hash_sender, _gossip_verified_vote_hash_receiver) = unbounded();
let (replay_votes_sender, replay_votes_receiver) = unbounded(); let (replay_votes_sender, replay_votes_receiver) = unbounded();
let vote_slot = 1; let vote_slot = 1;
@ -1352,6 +1420,7 @@ mod tests {
&vote_tracker, &vote_tracker,
&bank, &bank,
&subscriptions, &subscriptions,
&gossip_verified_vote_hash_sender,
&verified_vote_sender, &verified_vote_sender,
&replay_votes_receiver, &replay_votes_receiver,
&None, &None,
@ -1487,6 +1556,7 @@ mod tests {
)]; )];
let (verified_vote_sender, _verified_vote_receiver) = unbounded(); let (verified_vote_sender, _verified_vote_receiver) = unbounded();
let (gossip_verified_vote_hash_sender, _gossip_verified_vote_hash_receiver) = unbounded();
ClusterInfoVoteListener::filter_and_confirm_with_new_votes( ClusterInfoVoteListener::filter_and_confirm_with_new_votes(
&vote_tracker, &vote_tracker,
vote_tx, vote_tx,
@ -1498,6 +1568,7 @@ mod tests {
)], )],
&bank, &bank,
&subscriptions, &subscriptions,
&gossip_verified_vote_hash_sender,
&verified_vote_sender, &verified_vote_sender,
&None, &None,
&None, &None,
@ -1553,6 +1624,7 @@ mod tests {
)], )],
&new_root_bank, &new_root_bank,
&subscriptions, &subscriptions,
&gossip_verified_vote_hash_sender,
&verified_vote_sender, &verified_vote_sender,
&None, &None,
&None, &None,

View File

@ -1455,6 +1455,7 @@ pub mod test {
None, None,
&mut self.heaviest_subtree_fork_choice, &mut self.heaviest_subtree_fork_choice,
&mut BTreeMap::new(), &mut BTreeMap::new(),
&mut BTreeMap::new(),
&mut true, &mut true,
&mut Vec::new(), &mut Vec::new(),
) )

View File

@ -4,7 +4,9 @@ use crate::{
broadcast_stage::RetransmitSlotsSender, broadcast_stage::RetransmitSlotsSender,
cache_block_time_service::CacheBlockTimeSender, cache_block_time_service::CacheBlockTimeSender,
cluster_info::ClusterInfo, cluster_info::ClusterInfo,
cluster_info_vote_listener::{GossipDuplicateConfirmedSlotsReceiver, VoteTracker}, cluster_info_vote_listener::{
GossipDuplicateConfirmedSlotsReceiver, GossipVerifiedVoteHashReceiver, VoteTracker,
},
cluster_slot_state_verifier::*, cluster_slot_state_verifier::*,
cluster_slots::ClusterSlots, cluster_slots::ClusterSlots,
commitment_service::{AggregateCommitmentService, CommitmentAggregationData}, commitment_service::{AggregateCommitmentService, CommitmentAggregationData},
@ -66,6 +68,8 @@ pub const DUPLICATE_LIVENESS_THRESHOLD: f64 = 0.1;
pub const DUPLICATE_THRESHOLD: f64 = 1.0 - SWITCH_FORK_THRESHOLD - DUPLICATE_LIVENESS_THRESHOLD; pub const DUPLICATE_THRESHOLD: f64 = 1.0 - SWITCH_FORK_THRESHOLD - DUPLICATE_LIVENESS_THRESHOLD;
const MAX_VOTE_SIGNATURES: usize = 200; const MAX_VOTE_SIGNATURES: usize = 200;
pub type GossipVerifiedVoteHashes = BTreeMap<Slot, HashMap<Hash, Vec<Pubkey>>>;
#[derive(PartialEq, Debug)] #[derive(PartialEq, Debug)]
pub(crate) enum HeaviestForkFailures { pub(crate) enum HeaviestForkFailures {
LockedOut(u64), LockedOut(u64),
@ -134,6 +138,7 @@ pub struct ReplayTiming {
bank_count: u64, bank_count: u64,
process_gossip_duplicate_confirmed_slots_elapsed: u64, process_gossip_duplicate_confirmed_slots_elapsed: u64,
process_duplicate_slots_elapsed: u64, process_duplicate_slots_elapsed: u64,
process_gossip_verified_vote_hashes_elapsed: u64,
} }
impl ReplayTiming { impl ReplayTiming {
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
@ -153,6 +158,7 @@ impl ReplayTiming {
heaviest_fork_failures_elapsed: u64, heaviest_fork_failures_elapsed: u64,
bank_count: u64, bank_count: u64,
process_gossip_duplicate_confirmed_slots_elapsed: u64, process_gossip_duplicate_confirmed_slots_elapsed: u64,
process_gossip_verified_vote_hashes_elapsed: u64,
process_duplicate_slots_elapsed: u64, process_duplicate_slots_elapsed: u64,
) { ) {
self.collect_frozen_banks_elapsed += collect_frozen_banks_elapsed; self.collect_frozen_banks_elapsed += collect_frozen_banks_elapsed;
@ -170,6 +176,8 @@ impl ReplayTiming {
self.bank_count += bank_count; self.bank_count += bank_count;
self.process_gossip_duplicate_confirmed_slots_elapsed += self.process_gossip_duplicate_confirmed_slots_elapsed +=
process_gossip_duplicate_confirmed_slots_elapsed; process_gossip_duplicate_confirmed_slots_elapsed;
self.process_gossip_verified_vote_hashes_elapsed +=
process_gossip_verified_vote_hashes_elapsed;
self.process_duplicate_slots_elapsed += process_duplicate_slots_elapsed; self.process_duplicate_slots_elapsed += process_duplicate_slots_elapsed;
let now = timestamp(); let now = timestamp();
let elapsed_ms = now - self.last_print; let elapsed_ms = now - self.last_print;
@ -224,6 +232,11 @@ impl ReplayTiming {
self.process_gossip_duplicate_confirmed_slots_elapsed as i64, self.process_gossip_duplicate_confirmed_slots_elapsed as i64,
i64 i64
), ),
(
"process_gossip_verified_vote_hashes_elapsed",
self.process_gossip_verified_vote_hashes_elapsed as i64,
i64
),
( (
"wait_receive_elapsed", "wait_receive_elapsed",
self.wait_receive_elapsed as i64, self.wait_receive_elapsed as i64,
@ -270,6 +283,7 @@ impl ReplayStage {
_duplicate_slots_reset_receiver: DuplicateSlotsResetReceiver, _duplicate_slots_reset_receiver: DuplicateSlotsResetReceiver,
replay_vote_sender: ReplayVoteSender, replay_vote_sender: ReplayVoteSender,
gossip_duplicate_confirmed_slots_receiver: GossipDuplicateConfirmedSlotsReceiver, gossip_duplicate_confirmed_slots_receiver: GossipDuplicateConfirmedSlotsReceiver,
gossip_verified_vote_hash_receiver: GossipVerifiedVoteHashReceiver,
) -> Self { ) -> Self {
let ReplayStageConfig { let ReplayStageConfig {
my_pubkey, my_pubkey,
@ -316,6 +330,7 @@ impl ReplayStage {
let mut skipped_slots_info = SkippedSlotsInfo::default(); let mut skipped_slots_info = SkippedSlotsInfo::default();
let mut replay_timing = ReplayTiming::default(); let mut replay_timing = ReplayTiming::default();
let mut gossip_duplicate_confirmed_slots: GossipDuplicateConfirmedSlots = BTreeMap::new(); let mut gossip_duplicate_confirmed_slots: GossipDuplicateConfirmedSlots = BTreeMap::new();
let mut gossip_verified_vote_hashes: GossipVerifiedVoteHashes = BTreeMap::new();
let mut voted_signatures = Vec::new(); let mut voted_signatures = Vec::new();
let mut has_new_vote_been_rooted = !wait_for_vote_to_start_leader; let mut has_new_vote_been_rooted = !wait_for_vote_to_start_leader;
loop { loop {
@ -386,6 +401,18 @@ impl ReplayStage {
); );
process_gossip_duplicate_confirmed_slots_time.stop(); process_gossip_duplicate_confirmed_slots_time.stop();
// Ingest any new verified votes from gossip. Important for fork choice
// and switching proofs because these may be votes that haven't yet been
// included in a block, so we may not have yet observed these votes just
// by replaying blocks.
let mut process_gossip_verified_vote_hashes_time = Measure::start("process_gossip_duplicate_confirmed_slots");
Self::process_gossip_verified_vote_hashes(
&gossip_verified_vote_hash_receiver,
&mut gossip_verified_vote_hashes,
);
process_gossip_verified_vote_hashes_time.stop();
// Check to remove any duplicated slots from fork choice // Check to remove any duplicated slots from fork choice
let mut process_duplicate_slots_time = Measure::start("process_duplicate_slots"); let mut process_duplicate_slots_time = Measure::start("process_duplicate_slots");
if !tpu_has_bank { if !tpu_has_bank {
@ -517,6 +544,7 @@ impl ReplayStage {
&cache_block_time_sender, &cache_block_time_sender,
&bank_notification_sender, &bank_notification_sender,
&mut gossip_duplicate_confirmed_slots, &mut gossip_duplicate_confirmed_slots,
&mut gossip_verified_vote_hashes,
&mut voted_signatures, &mut voted_signatures,
&mut has_new_vote_been_rooted, &mut has_new_vote_been_rooted,
); );
@ -648,6 +676,7 @@ impl ReplayStage {
heaviest_fork_failures_time.as_us(), heaviest_fork_failures_time.as_us(),
if did_complete_bank {1} else {0}, if did_complete_bank {1} else {0},
process_gossip_duplicate_confirmed_slots_time.as_us(), process_gossip_duplicate_confirmed_slots_time.as_us(),
process_gossip_verified_vote_hashes_time.as_us(),
process_duplicate_slots_time.as_us(), process_duplicate_slots_time.as_us(),
); );
} }
@ -880,6 +909,21 @@ impl ReplayStage {
} }
} }
fn process_gossip_verified_vote_hashes(
gossip_verified_vote_hash_receiver: &GossipVerifiedVoteHashReceiver,
gossip_verified_vote_hashes: &mut GossipVerifiedVoteHashes,
) {
for (pubkey, slot, hash) in gossip_verified_vote_hash_receiver.try_iter() {
// cluster_info_vote_listener will ensure it doesn't push duplicates
gossip_verified_vote_hashes
.entry(slot)
.or_default()
.entry(hash)
.or_default()
.push(pubkey);
}
}
// Checks for and handle forks with duplicate slots. // Checks for and handle forks with duplicate slots.
fn process_duplicate_slots( fn process_duplicate_slots(
duplicate_slots_receiver: &DuplicateSlotReceiver, duplicate_slots_receiver: &DuplicateSlotReceiver,
@ -1222,6 +1266,7 @@ impl ReplayStage {
cache_block_time_sender: &Option<CacheBlockTimeSender>, cache_block_time_sender: &Option<CacheBlockTimeSender>,
bank_notification_sender: &Option<BankNotificationSender>, bank_notification_sender: &Option<BankNotificationSender>,
gossip_duplicate_confirmed_slots: &mut GossipDuplicateConfirmedSlots, gossip_duplicate_confirmed_slots: &mut GossipDuplicateConfirmedSlots,
gossip_verified_vote_hashes: &mut GossipVerifiedVoteHashes,
vote_signatures: &mut Vec<Signature>, vote_signatures: &mut Vec<Signature>,
has_new_vote_been_rooted: &mut bool, has_new_vote_been_rooted: &mut bool,
) { ) {
@ -1276,6 +1321,7 @@ impl ReplayStage {
highest_confirmed_root, highest_confirmed_root,
heaviest_subtree_fork_choice, heaviest_subtree_fork_choice,
gossip_duplicate_confirmed_slots, gossip_duplicate_confirmed_slots,
gossip_verified_vote_hashes,
has_new_vote_been_rooted, has_new_vote_been_rooted,
vote_signatures, vote_signatures,
); );
@ -1731,7 +1777,9 @@ impl ReplayStage {
let newly_voted_pubkeys = slot_vote_tracker let newly_voted_pubkeys = slot_vote_tracker
.as_ref() .as_ref()
.and_then(|slot_vote_tracker| slot_vote_tracker.write().unwrap().get_updates()) .and_then(|slot_vote_tracker| {
slot_vote_tracker.write().unwrap().get_voted_slot_updates()
})
.unwrap_or_default(); .unwrap_or_default();
let cluster_slot_pubkeys = cluster_slot_pubkeys let cluster_slot_pubkeys = cluster_slot_pubkeys
@ -2122,6 +2170,7 @@ impl ReplayStage {
confirmed_forks confirmed_forks
} }
#[allow(clippy::too_many_arguments)]
pub(crate) fn handle_new_root( pub(crate) fn handle_new_root(
new_root: Slot, new_root: Slot,
bank_forks: &RwLock<BankForks>, bank_forks: &RwLock<BankForks>,
@ -2130,6 +2179,7 @@ impl ReplayStage {
highest_confirmed_root: Option<Slot>, highest_confirmed_root: Option<Slot>,
heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice, heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice,
gossip_duplicate_confirmed_slots: &mut GossipDuplicateConfirmedSlots, gossip_duplicate_confirmed_slots: &mut GossipDuplicateConfirmedSlots,
gossip_verified_vote_hashes: &mut GossipVerifiedVoteHashes,
has_new_vote_been_rooted: &mut bool, has_new_vote_been_rooted: &mut bool,
voted_signatures: &mut Vec<Signature>, voted_signatures: &mut Vec<Signature>,
) { ) {
@ -2156,6 +2206,10 @@ impl ReplayStage {
let mut slots_ge_root = gossip_duplicate_confirmed_slots.split_off(&new_root); let mut slots_ge_root = gossip_duplicate_confirmed_slots.split_off(&new_root);
// gossip_duplicate_confirmed_slots now only contains entries >= `new_root` // gossip_duplicate_confirmed_slots now only contains entries >= `new_root`
std::mem::swap(gossip_duplicate_confirmed_slots, &mut slots_ge_root); std::mem::swap(gossip_duplicate_confirmed_slots, &mut slots_ge_root);
let mut slots_ge_root = gossip_verified_vote_hashes.split_off(&new_root);
// gossip_verified_vote_hashes now only contains entries >= `new_root`
std::mem::swap(gossip_verified_vote_hashes, &mut slots_ge_root);
} }
fn generate_new_bank_forks( fn generate_new_bank_forks(
@ -2564,6 +2618,11 @@ pub(crate) mod tests {
.into_iter() .into_iter()
.map(|s| (s, Hash::default())) .map(|s| (s, Hash::default()))
.collect(); .collect();
let mut gossip_verified_vote_hashes: GossipVerifiedVoteHashes =
vec![root - 1, root, root + 1]
.into_iter()
.map(|s| (s, HashMap::new()))
.collect();
ReplayStage::handle_new_root( ReplayStage::handle_new_root(
root, root,
&bank_forks, &bank_forks,
@ -2572,6 +2631,7 @@ pub(crate) mod tests {
None, None,
&mut heaviest_subtree_fork_choice, &mut heaviest_subtree_fork_choice,
&mut gossip_duplicate_confirmed_slots, &mut gossip_duplicate_confirmed_slots,
&mut gossip_verified_vote_hashes,
&mut true, &mut true,
&mut Vec::new(), &mut Vec::new(),
); );
@ -2586,6 +2646,13 @@ pub(crate) mod tests {
.collect::<Vec<Slot>>(), .collect::<Vec<Slot>>(),
vec![root, root + 1] vec![root, root + 1]
); );
assert_eq!(
gossip_verified_vote_hashes
.keys()
.cloned()
.collect::<Vec<Slot>>(),
vec![root, root + 1]
);
} }
#[test] #[test]
@ -2630,6 +2697,7 @@ pub(crate) mod tests {
Some(confirmed_root), Some(confirmed_root),
&mut heaviest_subtree_fork_choice, &mut heaviest_subtree_fork_choice,
&mut BTreeMap::new(), &mut BTreeMap::new(),
&mut BTreeMap::new(),
&mut true, &mut true,
&mut Vec::new(), &mut Vec::new(),
); );
@ -3690,7 +3758,7 @@ pub(crate) mod tests {
.unwrap() .unwrap()
.write() .write()
.unwrap() .unwrap()
.get_updates() .get_voted_slot_updates()
.is_none()); .is_none());
// The voter should be recorded // The voter should be recorded

View File

@ -1272,14 +1272,16 @@ mod tests {
}); });
// Process votes and check they were notified. // Process votes and check they were notified.
let (s, _r) = unbounded(); let (verified_vote_sender, _verified_vote_receiver) = unbounded();
let (gossip_verified_vote_hash_sender, _gossip_verified_vote_hash_receiver) = unbounded();
let (_replay_votes_sender, replay_votes_receiver) = unbounded(); let (_replay_votes_sender, replay_votes_receiver) = unbounded();
ClusterInfoVoteListener::get_and_process_votes_for_tests( ClusterInfoVoteListener::get_and_process_votes_for_tests(
&votes_receiver, &votes_receiver,
&vote_tracker, &vote_tracker,
&bank, &bank,
&rpc.subscriptions, &rpc.subscriptions,
&s, &gossip_verified_vote_hash_sender,
&verified_vote_sender,
&replay_votes_receiver, &replay_votes_receiver,
) )
.unwrap(); .unwrap();

View File

@ -6,8 +6,8 @@ use crate::{
broadcast_stage::{BroadcastStage, BroadcastStageType, RetransmitSlotsReceiver}, broadcast_stage::{BroadcastStage, BroadcastStageType, RetransmitSlotsReceiver},
cluster_info::ClusterInfo, cluster_info::ClusterInfo,
cluster_info_vote_listener::{ cluster_info_vote_listener::{
ClusterInfoVoteListener, GossipDuplicateConfirmedSlotsSender, VerifiedVoteSender, ClusterInfoVoteListener, GossipDuplicateConfirmedSlotsSender, GossipVerifiedVoteHashSender,
VoteTracker, VerifiedVoteSender, VoteTracker,
}, },
fetch_stage::FetchStage, fetch_stage::FetchStage,
optimistically_confirmed_bank_tracker::BankNotificationSender, optimistically_confirmed_bank_tracker::BankNotificationSender,
@ -61,6 +61,7 @@ impl Tpu {
vote_tracker: Arc<VoteTracker>, vote_tracker: Arc<VoteTracker>,
bank_forks: Arc<RwLock<BankForks>>, bank_forks: Arc<RwLock<BankForks>>,
verified_vote_sender: VerifiedVoteSender, verified_vote_sender: VerifiedVoteSender,
gossip_verified_vote_hash_sender: GossipVerifiedVoteHashSender,
replay_vote_receiver: ReplayVoteReceiver, replay_vote_receiver: ReplayVoteReceiver,
replay_vote_sender: ReplayVoteSender, replay_vote_sender: ReplayVoteSender,
bank_notification_sender: Option<BankNotificationSender>, bank_notification_sender: Option<BankNotificationSender>,
@ -96,6 +97,7 @@ impl Tpu {
bank_forks, bank_forks,
subscriptions.clone(), subscriptions.clone(),
verified_vote_sender, verified_vote_sender,
gossip_verified_vote_hash_sender,
replay_vote_receiver, replay_vote_receiver,
blockstore.clone(), blockstore.clone(),
bank_notification_sender, bank_notification_sender,

View File

@ -7,7 +7,8 @@ use crate::{
cache_block_time_service::CacheBlockTimeSender, cache_block_time_service::CacheBlockTimeSender,
cluster_info::ClusterInfo, cluster_info::ClusterInfo,
cluster_info_vote_listener::{ cluster_info_vote_listener::{
GossipDuplicateConfirmedSlotsReceiver, VerifiedVoteReceiver, VoteTracker, GossipDuplicateConfirmedSlotsReceiver, GossipVerifiedVoteHashReceiver,
VerifiedVoteReceiver, VoteTracker,
}, },
cluster_slots::ClusterSlots, cluster_slots::ClusterSlots,
completed_data_sets_service::CompletedDataSetsSender, completed_data_sets_service::CompletedDataSetsSender,
@ -119,6 +120,7 @@ impl Tvu {
snapshot_config_and_pending_package: Option<(SnapshotConfig, PendingSnapshotPackage)>, snapshot_config_and_pending_package: Option<(SnapshotConfig, PendingSnapshotPackage)>,
vote_tracker: Arc<VoteTracker>, vote_tracker: Arc<VoteTracker>,
retransmit_slots_sender: RetransmitSlotsSender, retransmit_slots_sender: RetransmitSlotsSender,
gossip_verified_vote_hash_receiver: GossipVerifiedVoteHashReceiver,
verified_vote_receiver: VerifiedVoteReceiver, verified_vote_receiver: VerifiedVoteReceiver,
replay_vote_sender: ReplayVoteSender, replay_vote_sender: ReplayVoteSender,
completed_data_sets_sender: CompletedDataSetsSender, completed_data_sets_sender: CompletedDataSetsSender,
@ -278,6 +280,7 @@ impl Tvu {
duplicate_slots_reset_receiver, duplicate_slots_reset_receiver,
replay_vote_sender, replay_vote_sender,
gossip_confirmed_slots_receiver, gossip_confirmed_slots_receiver,
gossip_verified_vote_hash_receiver,
); );
let ledger_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| { let ledger_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| {
@ -377,6 +380,7 @@ pub mod tests {
let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank)); let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default())); let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));
let (retransmit_slots_sender, _retransmit_slots_receiver) = unbounded(); let (retransmit_slots_sender, _retransmit_slots_receiver) = unbounded();
let (_gossip_verified_vote_hash_sender, gossip_verified_vote_hash_receiver) = unbounded();
let (_verified_vote_sender, verified_vote_receiver) = unbounded(); let (_verified_vote_sender, verified_vote_receiver) = unbounded();
let (replay_vote_sender, _replay_vote_receiver) = unbounded(); let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let (completed_data_sets_sender, _completed_data_sets_receiver) = unbounded(); let (completed_data_sets_sender, _completed_data_sets_receiver) = unbounded();
@ -417,6 +421,7 @@ pub mod tests {
None, None,
Arc::new(VoteTracker::new(&bank)), Arc::new(VoteTracker::new(&bank)),
retransmit_slots_sender, retransmit_slots_sender,
gossip_verified_vote_hash_receiver,
verified_vote_receiver, verified_vote_receiver,
replay_vote_sender, replay_vote_sender,
completed_data_sets_sender, completed_data_sets_sender,

View File

@ -671,6 +671,7 @@ impl Validator {
let (retransmit_slots_sender, retransmit_slots_receiver) = unbounded(); let (retransmit_slots_sender, retransmit_slots_receiver) = unbounded();
let (verified_vote_sender, verified_vote_receiver) = unbounded(); let (verified_vote_sender, verified_vote_receiver) = unbounded();
let (gossip_verified_vote_hash_sender, gossip_verified_vote_hash_receiver) = unbounded();
let (cluster_confirmed_slot_sender, cluster_confirmed_slot_receiver) = unbounded(); let (cluster_confirmed_slot_sender, cluster_confirmed_slot_receiver) = unbounded();
let tvu = Tvu::new( let tvu = Tvu::new(
vote_account, vote_account,
@ -718,6 +719,7 @@ impl Validator {
snapshot_config_and_pending_package, snapshot_config_and_pending_package,
vote_tracker.clone(), vote_tracker.clone(),
retransmit_slots_sender, retransmit_slots_sender,
gossip_verified_vote_hash_receiver,
verified_vote_receiver, verified_vote_receiver,
replay_vote_sender.clone(), replay_vote_sender.clone(),
completed_data_sets_sender, completed_data_sets_sender,
@ -758,6 +760,7 @@ impl Validator {
vote_tracker, vote_tracker,
bank_forks, bank_forks,
verified_vote_sender, verified_vote_sender,
gossip_verified_vote_hash_sender,
replay_vote_receiver, replay_vote_receiver,
replay_vote_sender, replay_vote_sender,
bank_notification_sender, bank_notification_sender,