Add replay votes to gossip vote tracking (#11119)

* Plumb replay vote channel for notifying vote listener of replay votes

* Keep gossip only notification for debugging gossip in the future

Co-authored-by: Carl <carl@solana.com>
This commit is contained in:
carllin 2020-07-20 17:29:07 -07:00 committed by GitHub
parent 23c2e55cbf
commit 73f3d04798
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 439 additions and 80 deletions

View File

@ -1,15 +1,17 @@
use crate::{ use crate::{
cluster_info::{ClusterInfo, GOSSIP_SLEEP_MILLIS}, cluster_info::{ClusterInfo, GOSSIP_SLEEP_MILLIS},
consensus::PubkeyVotes,
crds_value::CrdsValueLabel, crds_value::CrdsValueLabel,
poh_recorder::PohRecorder, poh_recorder::PohRecorder,
pubkey_references::LockedPubkeyReferences, pubkey_references::LockedPubkeyReferences,
replay_stage::ReplayVotesReceiver,
result::{Error, Result}, result::{Error, Result},
rpc_subscriptions::RpcSubscriptions, rpc_subscriptions::RpcSubscriptions,
sigverify, sigverify,
verified_vote_packets::VerifiedVotePackets, verified_vote_packets::VerifiedVotePackets,
}; };
use crossbeam_channel::{ use crossbeam_channel::{
unbounded, Receiver as CrossbeamReceiver, RecvTimeoutError, Sender as CrossbeamSender, unbounded, Receiver as CrossbeamReceiver, RecvTimeoutError, Select, Sender as CrossbeamSender,
}; };
use itertools::izip; use itertools::izip;
use log::*; use log::*;
@ -28,9 +30,9 @@ use solana_sdk::{
pubkey::Pubkey, pubkey::Pubkey,
transaction::Transaction, transaction::Transaction,
}; };
use solana_vote_program::{vote_instruction::VoteInstruction, vote_state::Vote}; use solana_vote_program::vote_instruction::VoteInstruction;
use std::{ use std::{
collections::{HashMap, HashSet}, collections::HashMap,
sync::{ sync::{
atomic::{AtomicBool, Ordering}, atomic::{AtomicBool, Ordering},
{Arc, Mutex, RwLock}, {Arc, Mutex, RwLock},
@ -44,14 +46,18 @@ pub type VerifiedLabelVotePacketsSender = CrossbeamSender<Vec<(CrdsValueLabel, P
pub type VerifiedLabelVotePacketsReceiver = CrossbeamReceiver<Vec<(CrdsValueLabel, Packets)>>; pub type VerifiedLabelVotePacketsReceiver = CrossbeamReceiver<Vec<(CrdsValueLabel, Packets)>>;
pub type VerifiedVoteTransactionsSender = CrossbeamSender<Vec<Transaction>>; pub type VerifiedVoteTransactionsSender = CrossbeamSender<Vec<Transaction>>;
pub type VerifiedVoteTransactionsReceiver = CrossbeamReceiver<Vec<Transaction>>; pub type VerifiedVoteTransactionsReceiver = CrossbeamReceiver<Vec<Transaction>>;
pub type VerifiedVoteSender = CrossbeamSender<(Pubkey, Vote)>; pub type VerifiedVoteSender = CrossbeamSender<(Pubkey, Vec<Slot>)>;
pub type VerifiedVoteReceiver = CrossbeamReceiver<(Pubkey, Vote)>; pub type VerifiedVoteReceiver = CrossbeamReceiver<(Pubkey, Vec<Slot>)>;
#[derive(Default)] #[derive(Default)]
pub struct SlotVoteTracker { pub struct SlotVoteTracker {
voted: HashSet<Arc<Pubkey>>, // Maps pubkeys that have voted for this slot
// to whether or not we've seen the vote on gossip.
// True if seen on gossip, false if only seen in replay.
voted: HashMap<Arc<Pubkey>, bool>,
updates: Option<Vec<Arc<Pubkey>>>, updates: Option<Vec<Arc<Pubkey>>>,
total_stake: u64, total_stake: u64,
gossip_only_stake: u64,
} }
impl SlotVoteTracker { impl SlotVoteTracker {
@ -128,7 +134,7 @@ impl VoteTracker {
let mut w_slot_vote_tracker = slot_vote_tracker.write().unwrap(); let mut w_slot_vote_tracker = slot_vote_tracker.write().unwrap();
w_slot_vote_tracker.voted.insert(pubkey.clone()); w_slot_vote_tracker.voted.insert(pubkey.clone(), true);
if let Some(ref mut updates) = w_slot_vote_tracker.updates { if let Some(ref mut updates) = w_slot_vote_tracker.updates {
updates.push(pubkey.clone()) updates.push(pubkey.clone())
} else { } else {
@ -210,6 +216,7 @@ impl ClusterInfoVoteListener {
bank_forks: Arc<RwLock<BankForks>>, bank_forks: Arc<RwLock<BankForks>>,
subscriptions: Arc<RpcSubscriptions>, subscriptions: Arc<RpcSubscriptions>,
verified_vote_sender: VerifiedVoteSender, verified_vote_sender: VerifiedVoteSender,
replay_votes_receiver: ReplayVotesReceiver,
) -> Self { ) -> Self {
let exit_ = exit.clone(); let exit_ = exit.clone();
@ -253,6 +260,7 @@ impl ClusterInfoVoteListener {
&bank_forks, &bank_forks,
subscriptions, subscriptions,
verified_vote_sender, verified_vote_sender,
replay_votes_receiver,
); );
}) })
.unwrap(); .unwrap();
@ -378,6 +386,7 @@ impl ClusterInfoVoteListener {
bank_forks: &RwLock<BankForks>, bank_forks: &RwLock<BankForks>,
subscriptions: Arc<RpcSubscriptions>, subscriptions: Arc<RpcSubscriptions>,
verified_vote_sender: VerifiedVoteSender, verified_vote_sender: VerifiedVoteSender,
replay_votes_receiver: ReplayVotesReceiver,
) -> Result<()> { ) -> Result<()> {
loop { loop {
if exit.load(Ordering::Relaxed) { if exit.load(Ordering::Relaxed) {
@ -387,7 +396,6 @@ impl ClusterInfoVoteListener {
let root_bank = bank_forks.read().unwrap().root_bank().clone(); let root_bank = bank_forks.read().unwrap().root_bank().clone();
vote_tracker.process_new_root_bank(&root_bank); vote_tracker.process_new_root_bank(&root_bank);
let epoch_stakes = root_bank.epoch_stakes(root_bank.epoch()); let epoch_stakes = root_bank.epoch_stakes(root_bank.epoch());
if let Err(e) = Self::get_and_process_votes( if let Err(e) = Self::get_and_process_votes(
&vote_txs_receiver, &vote_txs_receiver,
&vote_tracker, &vote_tracker,
@ -395,12 +403,11 @@ impl ClusterInfoVoteListener {
&subscriptions, &subscriptions,
epoch_stakes, epoch_stakes,
&verified_vote_sender, &verified_vote_sender,
&replay_votes_receiver,
) { ) {
match e { match e {
Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Disconnected) => { Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Timeout)
return Ok(()); | Error::ReadyTimeoutError => (),
}
Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Timeout) => (),
_ => { _ => {
error!("thread {:?} error {:?}", thread::current().name(), e); error!("thread {:?} error {:?}", thread::current().name(), e);
} }
@ -416,6 +423,7 @@ impl ClusterInfoVoteListener {
last_root: Slot, last_root: Slot,
subscriptions: &RpcSubscriptions, subscriptions: &RpcSubscriptions,
verified_vote_sender: &VerifiedVoteSender, verified_vote_sender: &VerifiedVoteSender,
replay_votes_receiver: &ReplayVotesReceiver,
) -> Result<()> { ) -> Result<()> {
Self::get_and_process_votes( Self::get_and_process_votes(
vote_txs_receiver, vote_txs_receiver,
@ -424,6 +432,7 @@ impl ClusterInfoVoteListener {
subscriptions, subscriptions,
None, None,
verified_vote_sender, verified_vote_sender,
replay_votes_receiver,
) )
} }
@ -434,20 +443,40 @@ impl ClusterInfoVoteListener {
subscriptions: &RpcSubscriptions, subscriptions: &RpcSubscriptions,
epoch_stakes: Option<&EpochStakes>, epoch_stakes: Option<&EpochStakes>,
verified_vote_sender: &VerifiedVoteSender, verified_vote_sender: &VerifiedVoteSender,
replay_votes_receiver: &ReplayVotesReceiver,
) -> Result<()> { ) -> Result<()> {
let timer = Duration::from_millis(200); let mut sel = Select::new();
let mut vote_txs = vote_txs_receiver.recv_timeout(timer)?; sel.recv(vote_txs_receiver);
while let Ok(new_txs) = vote_txs_receiver.try_recv() { sel.recv(replay_votes_receiver);
vote_txs.extend(new_txs); let mut remaining_wait_time = 200;
loop {
if remaining_wait_time == 0 {
break;
}
let start = Instant::now();
// Wait for one of the receivers to be ready. `ready_timeout`
// will return if channels either have something, or are
// disconnected. `ready_timeout` can wake up spuriously,
// hence the loop
let _ = sel.ready_timeout(Duration::from_millis(remaining_wait_time))?;
let vote_txs: Vec<_> = vote_txs_receiver.try_iter().flatten().collect();
let replay_votes: Vec<_> = replay_votes_receiver.try_iter().collect();
if !vote_txs.is_empty() || !replay_votes.is_empty() {
Self::process_votes(
vote_tracker,
vote_txs,
last_root,
subscriptions,
epoch_stakes,
verified_vote_sender,
&replay_votes,
);
break;
} else {
remaining_wait_time = remaining_wait_time
.saturating_sub(std::cmp::max(start.elapsed().as_millis() as u64, 1));
}
} }
Self::process_votes(
vote_tracker,
vote_txs,
last_root,
subscriptions,
epoch_stakes,
verified_vote_sender,
);
Ok(()) Ok(())
} }
@ -458,10 +487,10 @@ impl ClusterInfoVoteListener {
subscriptions: &RpcSubscriptions, subscriptions: &RpcSubscriptions,
epoch_stakes: Option<&EpochStakes>, epoch_stakes: Option<&EpochStakes>,
verified_vote_sender: &VerifiedVoteSender, verified_vote_sender: &VerifiedVoteSender,
replay_votes: &[Arc<PubkeyVotes>],
) { ) {
let mut diff: HashMap<Slot, HashSet<Arc<Pubkey>>> = HashMap::new(); let mut diff: HashMap<Slot, HashMap<Arc<Pubkey>, bool>> = HashMap::new();
{ {
let all_slot_trackers = &vote_tracker.slot_vote_trackers;
for tx in vote_txs { for tx in vote_txs {
if let (Some(vote_pubkey), Some(vote_instruction)) = tx if let (Some(vote_pubkey), Some(vote_instruction)) = tx
.message .message
@ -515,26 +544,33 @@ impl ClusterInfoVoteListener {
continue; continue;
} }
// Don't insert if we already have marked down this pubkey
// voting for this slot
let maybe_slot_tracker =
all_slot_trackers.read().unwrap().get(&slot).cloned();
if let Some(slot_tracker) = maybe_slot_tracker {
if slot_tracker.read().unwrap().voted.contains(vote_pubkey) {
continue;
}
}
let unduplicated_pubkey = vote_tracker.keys.get_or_insert(vote_pubkey); let unduplicated_pubkey = vote_tracker.keys.get_or_insert(vote_pubkey);
diff.entry(slot).or_default().insert(unduplicated_pubkey); diff.entry(slot)
.or_default()
.insert(unduplicated_pubkey, true);
} }
subscriptions.notify_vote(&vote); subscriptions.notify_vote(&vote);
let _ = verified_vote_sender.send((*vote_pubkey, vote)); let _ = verified_vote_sender.send((*vote_pubkey, vote.slots));
} }
} }
} }
for (slot, slot_diff) in diff { // Process the replay votes
for votes in replay_votes {
for (pubkey, slot) in votes.iter() {
if *slot <= root {
continue;
}
let unduplicated_pubkey = vote_tracker.keys.get_or_insert(pubkey);
diff.entry(*slot)
.or_default()
.entry(unduplicated_pubkey)
.or_default();
}
}
for (slot, mut slot_diff) in diff {
let slot_tracker = vote_tracker let slot_tracker = vote_tracker
.slot_vote_trackers .slot_vote_trackers
.read() .read()
@ -542,15 +578,55 @@ impl ClusterInfoVoteListener {
.get(&slot) .get(&slot)
.cloned(); .cloned();
if let Some(slot_tracker) = slot_tracker { if let Some(slot_tracker) = slot_tracker {
{
let r_slot_tracker = slot_tracker.read().unwrap();
// Only keep the pubkeys we haven't seen voting for this slot
slot_diff.retain(|pubkey, seen_in_gossip_above| {
let seen_in_gossip_previously = r_slot_tracker.voted.get(pubkey);
let is_new = seen_in_gossip_previously.is_none();
if is_new && !*seen_in_gossip_above {
// If this vote wasn't seen in gossip, then it must be a
// replay vote, and we haven't sent a notification for
// those yet
let _ = verified_vote_sender.send((**pubkey, vec![slot]));
}
// `is_new_from_gossip` means we observed a vote for this slot
// for the first time in gossip
let is_new_from_gossip =
!seen_in_gossip_previously.cloned().unwrap_or(false)
&& *seen_in_gossip_above;
is_new || is_new_from_gossip
});
}
let mut w_slot_tracker = slot_tracker.write().unwrap(); let mut w_slot_tracker = slot_tracker.write().unwrap();
if w_slot_tracker.updates.is_none() { if w_slot_tracker.updates.is_none() {
w_slot_tracker.updates = Some(vec![]); w_slot_tracker.updates = Some(vec![]);
} }
let mut current_stake = 0; let mut current_stake = 0;
for pubkey in slot_diff { let mut gossip_only_stake = 0;
Self::sum_stake(&mut current_stake, epoch_stakes, &pubkey); for (pubkey, seen_in_gossip_above) in slot_diff {
let is_new = !w_slot_tracker.voted.contains_key(&pubkey);
Self::sum_stake(
&mut current_stake,
&mut gossip_only_stake,
epoch_stakes,
&pubkey,
// By this point we know if the vote was seen in gossip above,
// it was not seen in gossip at any point in the past, so it's
// safe to pass this in here as an overall indicator of whether
// this vote is new
seen_in_gossip_above,
is_new,
);
w_slot_tracker.voted.insert(pubkey.clone()); // From the `slot_diff.retain` earlier, we know because there are
// no other writers to `slot_vote_tracker` that
// `is_new || is_new_from_gossip`. In both cases we want to record
// `is_new_from_gossip` for the `pubkey` entry.
w_slot_tracker
.voted
.insert(pubkey.clone(), seen_in_gossip_above);
w_slot_tracker.updates.as_mut().unwrap().push(pubkey); w_slot_tracker.updates.as_mut().unwrap().push(pubkey);
} }
Self::notify_for_stake_change( Self::notify_for_stake_change(
@ -561,20 +637,33 @@ impl ClusterInfoVoteListener {
slot, slot,
); );
w_slot_tracker.total_stake += current_stake; w_slot_tracker.total_stake += current_stake;
w_slot_tracker.gossip_only_stake += gossip_only_stake
} else { } else {
let mut total_stake = 0; let mut total_stake = 0;
let voted: HashSet<_> = slot_diff let mut gossip_only_stake = 0;
let voted: HashMap<_, _> = slot_diff
.into_iter() .into_iter()
.map(|pubkey| { .map(|(pubkey, seen_in_gossip_above)| {
Self::sum_stake(&mut total_stake, epoch_stakes, &pubkey); if !seen_in_gossip_above {
pubkey let _ = verified_vote_sender.send((*pubkey, vec![slot]));
}
Self::sum_stake(
&mut total_stake,
&mut gossip_only_stake,
epoch_stakes,
&pubkey,
seen_in_gossip_above,
true,
);
(pubkey, seen_in_gossip_above)
}) })
.collect(); .collect();
Self::notify_for_stake_change(total_stake, 0, &subscriptions, epoch_stakes, slot); Self::notify_for_stake_change(total_stake, 0, &subscriptions, epoch_stakes, slot);
let new_slot_tracker = SlotVoteTracker { let new_slot_tracker = SlotVoteTracker {
voted: voted.clone(), updates: Some(voted.keys().cloned().collect()),
updates: Some(voted.into_iter().collect()), voted,
total_stake, total_stake,
gossip_only_stake,
}; };
vote_tracker vote_tracker
.slot_vote_trackers .slot_vote_trackers
@ -602,10 +691,26 @@ impl ClusterInfoVoteListener {
} }
} }
fn sum_stake(sum: &mut u64, epoch_stakes: Option<&EpochStakes>, pubkey: &Pubkey) { fn sum_stake(
sum: &mut u64,
gossip_only_stake: &mut u64,
epoch_stakes: Option<&EpochStakes>,
pubkey: &Pubkey,
is_new_from_gossip: bool,
is_new: bool,
) {
if !is_new_from_gossip && !is_new {
return;
}
if let Some(stakes) = epoch_stakes { if let Some(stakes) = epoch_stakes {
if let Some(vote_account) = stakes.stakes().vote_accounts().get(pubkey) { if let Some(vote_account) = stakes.stakes().vote_accounts().get(pubkey) {
*sum += vote_account.0; if is_new {
*sum += vote_account.0;
}
if is_new_from_gossip {
*gossip_only_stake += vote_account.0;
}
} }
} }
} }
@ -624,6 +729,7 @@ mod tests {
use solana_sdk::signature::Signature; use solana_sdk::signature::Signature;
use solana_sdk::signature::{Keypair, Signer}; use solana_sdk::signature::{Keypair, Signer};
use solana_vote_program::vote_transaction; use solana_vote_program::vote_transaction;
use std::collections::BTreeSet;
#[test] #[test]
fn test_max_vote_tx_fits() { fn test_max_vote_tx_fits() {
@ -797,8 +903,10 @@ mod tests {
let (vote_tracker, _, validator_voting_keypairs, subscriptions) = setup(); let (vote_tracker, _, validator_voting_keypairs, subscriptions) = setup();
let (votes_sender, votes_receiver) = unbounded(); let (votes_sender, votes_receiver) = unbounded();
let (verified_vote_sender, verified_vote_receiver) = unbounded(); let (verified_vote_sender, verified_vote_receiver) = unbounded();
let (replay_votes_sender, replay_votes_receiver) = unbounded();
let vote_slots = vec![1, 2]; let vote_slots = vec![1, 2];
let replay_vote_slots = vec![3, 4];
validator_voting_keypairs.iter().for_each(|keypairs| { validator_voting_keypairs.iter().for_each(|keypairs| {
let node_keypair = &keypairs.node_keypair; let node_keypair = &keypairs.node_keypair;
let vote_keypair = &keypairs.vote_keypair; let vote_keypair = &keypairs.vote_keypair;
@ -811,6 +919,15 @@ mod tests {
vote_keypair, vote_keypair,
); );
votes_sender.send(vec![vote_tx]).unwrap(); votes_sender.send(vec![vote_tx]).unwrap();
for vote_slot in &replay_vote_slots {
// Send twice, should only expect to be notified once later
replay_votes_sender
.send(Arc::new(vec![(vote_keypair.pubkey(), *vote_slot)]))
.unwrap();
replay_votes_sender
.send(Arc::new(vec![(vote_keypair.pubkey(), *vote_slot)]))
.unwrap();
}
}); });
// Check that all the votes were registered for each validator correctly // Check that all the votes were registered for each validator correctly
@ -821,25 +938,41 @@ mod tests {
&subscriptions, &subscriptions,
None, None,
&verified_vote_sender, &verified_vote_sender,
&replay_votes_receiver,
) )
.unwrap(); .unwrap();
// Check that the received votes were pushed to other commponents // Check that the received votes were pushed to other commponents
// subscribing via a channel // subscribing via `verified_vote_receiver`
let received_votes: Vec<_> = verified_vote_receiver.try_iter().collect(); let all_expected_slots: BTreeSet<_> = vote_slots
assert_eq!(received_votes.len(), validator_voting_keypairs.len()); .into_iter()
for (voting_keypair, (received_pubkey, received_vote)) in .chain(replay_vote_slots.into_iter())
validator_voting_keypairs.iter().zip(received_votes.iter()) .collect();
{ let mut pubkey_to_votes: HashMap<Pubkey, BTreeSet<Slot>> = HashMap::new();
assert_eq!(voting_keypair.vote_keypair.pubkey(), *received_pubkey); for (received_pubkey, new_votes) in verified_vote_receiver.try_iter() {
assert_eq!(received_vote.slots, vote_slots); let already_received_votes = pubkey_to_votes.entry(received_pubkey).or_default();
for new_vote in new_votes {
// `new_vote` should only be received once
assert!(already_received_votes.insert(new_vote));
}
} }
for vote_slot in vote_slots { assert_eq!(pubkey_to_votes.len(), validator_voting_keypairs.len());
for keypairs in &validator_voting_keypairs {
assert_eq!(
*pubkey_to_votes
.get(&keypairs.vote_keypair.pubkey())
.unwrap(),
all_expected_slots
);
}
// Check the vote trackers were updated correctly
for vote_slot in all_expected_slots {
let slot_vote_tracker = vote_tracker.get_slot_vote_tracker(vote_slot).unwrap(); let slot_vote_tracker = vote_tracker.get_slot_vote_tracker(vote_slot).unwrap();
let r_slot_vote_tracker = slot_vote_tracker.read().unwrap(); let r_slot_vote_tracker = slot_vote_tracker.read().unwrap();
for voting_keypairs in &validator_voting_keypairs { for voting_keypairs in &validator_voting_keypairs {
let pubkey = voting_keypairs.vote_keypair.pubkey(); let pubkey = voting_keypairs.vote_keypair.pubkey();
assert!(r_slot_vote_tracker.voted.contains(&pubkey)); assert!(r_slot_vote_tracker.voted.contains_key(&pubkey));
assert!(r_slot_vote_tracker assert!(r_slot_vote_tracker
.updates .updates
.as_ref() .as_ref()
@ -856,6 +989,7 @@ mod tests {
// Send some votes to process // Send some votes to process
let (votes_txs_sender, votes_txs_receiver) = unbounded(); let (votes_txs_sender, votes_txs_receiver) = unbounded();
let (verified_vote_sender, verified_vote_receiver) = unbounded(); let (verified_vote_sender, verified_vote_receiver) = unbounded();
let (_replay_votes_sender, replay_votes_receiver) = unbounded();
let mut expected_votes = vec![]; let mut expected_votes = vec![];
for (i, keyset) in validator_voting_keypairs.chunks(2).enumerate() { for (i, keyset) in validator_voting_keypairs.chunks(2).enumerate() {
@ -886,15 +1020,13 @@ mod tests {
&subscriptions, &subscriptions,
None, None,
&verified_vote_sender, &verified_vote_sender,
&replay_votes_receiver,
) )
.unwrap(); .unwrap();
// Check that the received votes were pushed to other commponents // Check that the received votes were pushed to other commponents
// subscribing via a channel // subscribing via a channel
let received_votes: Vec<_> = verified_vote_receiver let received_votes: Vec<_> = verified_vote_receiver.try_iter().collect();
.try_iter()
.map(|(pubkey, vote)| (pubkey, vote.slots))
.collect();
assert_eq!(received_votes.len(), validator_voting_keypairs.len()); assert_eq!(received_votes.len(), validator_voting_keypairs.len());
for (expected_pubkey_vote, received_pubkey_vote) in for (expected_pubkey_vote, received_pubkey_vote) in
expected_votes.iter().zip(received_votes.iter()) expected_votes.iter().zip(received_votes.iter())
@ -908,7 +1040,7 @@ mod tests {
let r_slot_vote_tracker = &slot_vote_tracker.read().unwrap(); let r_slot_vote_tracker = &slot_vote_tracker.read().unwrap();
for voting_keypairs in keyset { for voting_keypairs in keyset {
let pubkey = voting_keypairs.vote_keypair.pubkey(); let pubkey = voting_keypairs.vote_keypair.pubkey();
assert!(r_slot_vote_tracker.voted.contains(&pubkey)); assert!(r_slot_vote_tracker.voted.contains_key(&pubkey));
assert!(r_slot_vote_tracker assert!(r_slot_vote_tracker
.updates .updates
.as_ref() .as_ref()
@ -918,6 +1050,79 @@ mod tests {
} }
} }
#[test]
fn test_process_votes3() {
let (votes_sender, votes_receiver) = unbounded();
let (verified_vote_sender, _verified_vote_receiver) = unbounded();
let (replay_votes_sender, replay_votes_receiver) = unbounded();
let vote_slot = 1;
// Events:
// 0: Send gossip vote
// 1: Send replay vote
// 2: Send both
let ordered_events = vec![
vec![0],
vec![1],
vec![0, 1],
vec![1, 0],
vec![2],
vec![0, 1, 2],
vec![1, 0, 2],
];
for events in ordered_events {
let (vote_tracker, bank, validator_voting_keypairs, subscriptions) = setup();
let node_keypair = &validator_voting_keypairs[0].node_keypair;
let vote_keypair = &validator_voting_keypairs[0].vote_keypair;
for &e in &events {
if e == 0 || e == 2 {
// Create vote transaction
let vote_tx = vote_transaction::new_vote_transaction(
vec![vote_slot],
Hash::default(),
Hash::default(),
node_keypair,
vote_keypair,
vote_keypair,
);
votes_sender.send(vec![vote_tx.clone()]).unwrap();
}
if e == 1 || e == 2 {
replay_votes_sender
.send(Arc::new(vec![(vote_keypair.pubkey(), vote_slot)]))
.unwrap();
}
let _ = ClusterInfoVoteListener::get_and_process_votes(
&votes_receiver,
&vote_tracker,
0,
&subscriptions,
Some(
// Make sure `epoch_stakes` exists for this slot by unwrapping
bank.epoch_stakes(bank.epoch_schedule().get_epoch(vote_slot))
.unwrap(),
),
&verified_vote_sender,
&replay_votes_receiver,
);
}
let slot_vote_tracker = vote_tracker.get_slot_vote_tracker(vote_slot).unwrap();
let r_slot_vote_tracker = &slot_vote_tracker.read().unwrap();
if events == vec![1] {
// Check `gossip_only_stake` is not incremented
assert_eq!(r_slot_vote_tracker.total_stake, 100);
assert_eq!(r_slot_vote_tracker.gossip_only_stake, 0);
} else {
// Check that both the `gossip_only_stake` and `total_stake` both
// increased
assert_eq!(r_slot_vote_tracker.total_stake, 100);
assert_eq!(r_slot_vote_tracker.gossip_only_stake, 100);
}
}
}
#[test] #[test]
fn test_get_voters_by_epoch() { fn test_get_voters_by_epoch() {
// Create some voters at genesis // Create some voters at genesis
@ -981,14 +1186,14 @@ mod tests {
let ref_count_per_vote = 2; let ref_count_per_vote = 2;
// Create some voters at genesis // Create some voters at genesis
let validator_voting_keypairs: Vec<_> = (0..2) let validator_keypairs: Vec<_> = (0..2)
.map(|_| ValidatorVoteKeypairs::new(Keypair::new(), Keypair::new(), Keypair::new())) .map(|_| ValidatorVoteKeypairs::new(Keypair::new(), Keypair::new(), Keypair::new()))
.collect(); .collect();
let GenesisConfigInfo { genesis_config, .. } = let GenesisConfigInfo { genesis_config, .. } =
genesis_utils::create_genesis_config_with_vote_accounts( genesis_utils::create_genesis_config_with_vote_accounts(
10_000, 10_000,
&validator_voting_keypairs, &validator_keypairs,
100, 100,
); );
let bank = Bank::new(&genesis_config); let bank = Bank::new(&genesis_config);
@ -1004,10 +1209,11 @@ mod tests {
// Send a vote to process, should add a reference to the pubkey for that voter // Send a vote to process, should add a reference to the pubkey for that voter
// in the tracker // in the tracker
let validator0_keypairs = &validator_voting_keypairs[0]; let validator0_keypairs = &validator_keypairs[0];
let voted_slot = bank.slot() + 1;
let vote_tx = vec![vote_transaction::new_vote_transaction( let vote_tx = vec![vote_transaction::new_vote_transaction(
// Must vote > root to be processed // Must vote > root to be processed
vec![bank.slot() + 1], vec![voted_slot],
Hash::default(), Hash::default(),
Hash::default(), Hash::default(),
&validator0_keypairs.node_keypair, &validator0_keypairs.node_keypair,
@ -1023,6 +1229,11 @@ mod tests {
&subscriptions, &subscriptions,
None, None,
&verified_vote_sender, &verified_vote_sender,
// Add vote for same slot, should not affect outcome
&[Arc::new(vec![(
validator0_keypairs.vote_keypair.pubkey(),
voted_slot,
)])],
); );
let ref_count = Arc::strong_count( let ref_count = Arc::strong_count(
&vote_tracker &vote_tracker
@ -1057,8 +1268,9 @@ mod tests {
// Test with votes across two epochs // Test with votes across two epochs
let first_slot_in_new_epoch = bank.epoch_schedule().get_first_slot_in_epoch(new_epoch); let first_slot_in_new_epoch = bank.epoch_schedule().get_first_slot_in_epoch(new_epoch);
// Make 2 new votes in two different epochs, ref count should go up // Make 2 new votes in two different epochs for the same pubkey,
// by 2 * ref_count_per_vote // the ref count should go up by 3 * ref_count_per_vote
// Add 1 vote through the replay channel, ref count should
let vote_txs: Vec<_> = [bank.slot() + 2, first_slot_in_new_epoch] let vote_txs: Vec<_> = [bank.slot() + 2, first_slot_in_new_epoch]
.iter() .iter()
.map(|slot| { .map(|slot| {
@ -1081,8 +1293,25 @@ mod tests {
&subscriptions, &subscriptions,
None, None,
&verified_vote_sender, &verified_vote_sender,
&[Arc::new(vec![(
validator_keypairs[1].vote_keypair.pubkey(),
first_slot_in_new_epoch,
)])],
); );
// Check new replay vote pubkey first
let ref_count = Arc::strong_count(
&vote_tracker
.keys
.0
.read()
.unwrap()
.get(&validator_keypairs[1].vote_keypair.pubkey())
.unwrap(),
);
assert_eq!(ref_count, current_ref_count);
// Check the existing pubkey
let ref_count = Arc::strong_count( let ref_count = Arc::strong_count(
&vote_tracker &vote_tracker
.keys .keys
@ -1204,4 +1433,78 @@ mod tests {
assert_eq!(vote_txs.len(), 2); assert_eq!(vote_txs.len(), 2);
verify_packets_len(&packets, 2); verify_packets_len(&packets, 2);
} }
#[test]
fn test_sum_stake() {
let (_, bank, validator_voting_keypairs, _) = setup();
let vote_keypair = &validator_voting_keypairs[0].vote_keypair;
let epoch_stakes = bank.epoch_stakes(bank.epoch()).unwrap();
// If `is_new_from_gossip` and `is_new` are both true, both fields
// should increase
let mut total_stake = 0;
let mut gossip_only_stake = 0;
let is_new_from_gossip = true;
let is_new = true;
ClusterInfoVoteListener::sum_stake(
&mut total_stake,
&mut gossip_only_stake,
Some(epoch_stakes),
&vote_keypair.pubkey(),
is_new_from_gossip,
is_new,
);
assert_eq!(total_stake, 100);
assert_eq!(gossip_only_stake, 100);
// If `is_new_from_gossip` and `is_new` are both false, none should increase
let mut total_stake = 0;
let mut gossip_only_stake = 0;
let is_new_from_gossip = false;
let is_new = false;
ClusterInfoVoteListener::sum_stake(
&mut total_stake,
&mut gossip_only_stake,
Some(epoch_stakes),
&vote_keypair.pubkey(),
is_new_from_gossip,
is_new,
);
assert_eq!(total_stake, 0);
assert_eq!(gossip_only_stake, 0);
// If only `is_new`, but not `is_new_from_gossip` then
// `total_stake` will increase, but `gossip_only_stake` won't
let mut total_stake = 0;
let mut gossip_only_stake = 0;
let is_new_from_gossip = false;
let is_new = true;
ClusterInfoVoteListener::sum_stake(
&mut total_stake,
&mut gossip_only_stake,
Some(epoch_stakes),
&vote_keypair.pubkey(),
is_new_from_gossip,
is_new,
);
assert_eq!(total_stake, 100);
assert_eq!(gossip_only_stake, 0);
// If only `is_new_from_gossip`, but not `is_new` then
// `gossip_only_stake` will increase, but `total_stake` won't
let mut total_stake = 0;
let mut gossip_only_stake = 0;
let is_new_from_gossip = true;
let is_new = false;
ClusterInfoVoteListener::sum_stake(
&mut total_stake,
&mut gossip_only_stake,
Some(epoch_stakes),
&vote_keypair.pubkey(),
is_new_from_gossip,
is_new,
);
assert_eq!(total_stake, 0);
assert_eq!(gossip_only_stake, 100);
}
} }

View File

@ -61,6 +61,7 @@ pub const SWITCH_FORK_THRESHOLD: f64 = 0.38;
pub type Stake = u64; pub type Stake = u64;
pub type VotedStakes = HashMap<Slot, Stake>; pub type VotedStakes = HashMap<Slot, Stake>;
pub type PubkeyVotes = Vec<(Pubkey, Slot)>;
pub(crate) struct ComputedBankState { pub(crate) struct ComputedBankState {
pub voted_stakes: VotedStakes, pub voted_stakes: VotedStakes,
@ -69,7 +70,7 @@ pub(crate) struct ComputedBankState {
// Tree of intervals of lockouts of the form [slot, slot + slot.lockout], // Tree of intervals of lockouts of the form [slot, slot + slot.lockout],
// keyed by end of the range // keyed by end of the range
pub lockout_intervals: LockoutIntervals, pub lockout_intervals: LockoutIntervals,
pub pubkey_votes: Vec<(Pubkey, Slot)>, pub pubkey_votes: Arc<PubkeyVotes>,
} }
#[frozen_abi(digest = "2ZUeCLMVQxmHYbeqMH7M97ifVSKoVErGvRHzyxcQRjgU")] #[frozen_abi(digest = "2ZUeCLMVQxmHYbeqMH7M97ifVSKoVErGvRHzyxcQRjgU")]
@ -250,7 +251,7 @@ impl Tower {
total_stake, total_stake,
bank_weight, bank_weight,
lockout_intervals, lockout_intervals,
pubkey_votes, pubkey_votes: Arc::new(pubkey_votes),
} }
} }
@ -665,6 +666,7 @@ pub mod test {
progress_map::ForkProgress, progress_map::ForkProgress,
replay_stage::{HeaviestForkFailures, ReplayStage}, replay_stage::{HeaviestForkFailures, ReplayStage},
}; };
use crossbeam_channel::unbounded;
use solana_runtime::{ use solana_runtime::{
bank::Bank, bank::Bank,
bank_forks::BankForks, bank_forks::BankForks,
@ -784,6 +786,7 @@ pub mod test {
.cloned() .cloned()
.collect(); .collect();
let (replay_slot_sender, _replay_slot_receiver) = unbounded();
let _ = ReplayStage::compute_bank_stats( let _ = ReplayStage::compute_bank_stats(
&my_pubkey, &my_pubkey,
&ancestors, &ancestors,
@ -796,6 +799,7 @@ pub mod test {
&mut PubkeyReferences::default(), &mut PubkeyReferences::default(),
&mut self.heaviest_subtree_fork_choice, &mut self.heaviest_subtree_fork_choice,
&mut BankWeightForkChoice::default(), &mut BankWeightForkChoice::default(),
&replay_slot_sender,
); );
let vote_bank = self let vote_bank = self
@ -1356,7 +1360,7 @@ pub mod test {
//two accounts voting for slot 0 with 1 token staked //two accounts voting for slot 0 with 1 token staked
let mut accounts = gen_stakes(&[(1, &[0]), (1, &[0])]); let mut accounts = gen_stakes(&[(1, &[0]), (1, &[0])]);
accounts.sort_by_key(|(pk, _)| *pk); accounts.sort_by_key(|(pk, _)| *pk);
let account_latest_votes: Vec<(Pubkey, Slot)> = let account_latest_votes: PubkeyVotes =
accounts.iter().map(|(pubkey, _)| (*pubkey, 0)).collect(); accounts.iter().map(|(pubkey, _)| (*pubkey, 0)).collect();
let ancestors = vec![(1, vec![0].into_iter().collect()), (0, HashSet::new())] let ancestors = vec![(1, vec![0].into_iter().collect()), (0, HashSet::new())]
@ -1366,7 +1370,7 @@ pub mod test {
voted_stakes, voted_stakes,
total_stake, total_stake,
bank_weight, bank_weight,
mut pubkey_votes, pubkey_votes,
.. ..
} = Tower::collect_vote_lockouts( } = Tower::collect_vote_lockouts(
&Pubkey::default(), &Pubkey::default(),
@ -1377,6 +1381,7 @@ pub mod test {
); );
assert_eq!(voted_stakes[&0], 2); assert_eq!(voted_stakes[&0], 2);
assert_eq!(total_stake, 2); assert_eq!(total_stake, 2);
let mut pubkey_votes = Arc::try_unwrap(pubkey_votes).unwrap();
pubkey_votes.sort(); pubkey_votes.sort();
assert_eq!(pubkey_votes, account_latest_votes); assert_eq!(pubkey_votes, account_latest_votes);
@ -1391,7 +1396,7 @@ pub mod test {
//two accounts voting for slots 0..MAX_LOCKOUT_HISTORY with 1 token staked //two accounts voting for slots 0..MAX_LOCKOUT_HISTORY with 1 token staked
let mut accounts = gen_stakes(&[(1, &votes), (1, &votes)]); let mut accounts = gen_stakes(&[(1, &votes), (1, &votes)]);
accounts.sort_by_key(|(pk, _)| *pk); accounts.sort_by_key(|(pk, _)| *pk);
let account_latest_votes: Vec<(Pubkey, Slot)> = accounts let account_latest_votes: PubkeyVotes = accounts
.iter() .iter()
.map(|(pubkey, _)| (*pubkey, (MAX_LOCKOUT_HISTORY - 1) as Slot)) .map(|(pubkey, _)| (*pubkey, (MAX_LOCKOUT_HISTORY - 1) as Slot))
.collect(); .collect();
@ -1418,7 +1423,7 @@ pub mod test {
let ComputedBankState { let ComputedBankState {
voted_stakes, voted_stakes,
bank_weight, bank_weight,
mut pubkey_votes, pubkey_votes,
.. ..
} = Tower::collect_vote_lockouts( } = Tower::collect_vote_lockouts(
&Pubkey::default(), &Pubkey::default(),
@ -1433,6 +1438,7 @@ pub mod test {
// should be the sum of all the weights for root // should be the sum of all the weights for root
assert_eq!(bank_weight, expected_bank_weight); assert_eq!(bank_weight, expected_bank_weight);
let mut pubkey_votes = Arc::try_unwrap(pubkey_votes).unwrap();
pubkey_votes.sort(); pubkey_votes.sort();
assert_eq!(pubkey_votes, account_latest_votes); assert_eq!(pubkey_votes, account_latest_votes);
} }

View File

@ -217,8 +217,8 @@ impl RepairService {
let mut slot_to_vote_pubkeys: HashMap<Slot, Vec<Pubkey>> = HashMap::new(); let mut slot_to_vote_pubkeys: HashMap<Slot, Vec<Pubkey>> = HashMap::new();
verified_vote_receiver verified_vote_receiver
.try_iter() .try_iter()
.for_each(|(vote_pubkey, vote)| { .for_each(|(vote_pubkey, vote_slots)| {
for slot in vote.slots { for slot in vote_slots {
slot_to_vote_pubkeys slot_to_vote_pubkeys
.entry(slot) .entry(slot)
.or_default() .or_default()

View File

@ -7,7 +7,7 @@ use crate::{
cluster_info_vote_listener::VoteTracker, cluster_info_vote_listener::VoteTracker,
cluster_slots::ClusterSlots, cluster_slots::ClusterSlots,
commitment_service::{AggregateCommitmentService, CommitmentAggregationData}, commitment_service::{AggregateCommitmentService, CommitmentAggregationData},
consensus::{ComputedBankState, Stake, SwitchForkDecision, Tower, VotedStakes}, consensus::{ComputedBankState, PubkeyVotes, Stake, SwitchForkDecision, Tower, VotedStakes},
fork_choice::{ForkChoice, SelectVoteAndResetForkResult}, fork_choice::{ForkChoice, SelectVoteAndResetForkResult},
heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice, heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice,
poh_recorder::{PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS}, poh_recorder::{PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS},
@ -18,6 +18,7 @@ use crate::{
rewards_recorder_service::RewardsRecorderSender, rewards_recorder_service::RewardsRecorderSender,
rpc_subscriptions::RpcSubscriptions, rpc_subscriptions::RpcSubscriptions,
}; };
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use solana_ledger::{ use solana_ledger::{
block_error::BlockError, block_error::BlockError,
blockstore::Blockstore, blockstore::Blockstore,
@ -61,6 +62,9 @@ pub const MAX_ENTRY_RECV_PER_ITER: usize = 512;
pub const SUPERMINORITY_THRESHOLD: f64 = 1f64 / 3f64; pub const SUPERMINORITY_THRESHOLD: f64 = 1f64 / 3f64;
pub const MAX_UNCONFIRMED_SLOTS: usize = 5; pub const MAX_UNCONFIRMED_SLOTS: usize = 5;
pub type ReplayVotesSender = CrossbeamSender<Arc<PubkeyVotes>>;
pub type ReplayVotesReceiver = CrossbeamReceiver<Arc<PubkeyVotes>>;
#[derive(PartialEq, Debug)] #[derive(PartialEq, Debug)]
pub(crate) enum HeaviestForkFailures { pub(crate) enum HeaviestForkFailures {
LockedOut(u64), LockedOut(u64),
@ -221,6 +225,7 @@ impl ReplayStage {
cluster_slots: Arc<ClusterSlots>, cluster_slots: Arc<ClusterSlots>,
retransmit_slots_sender: RetransmitSlotsSender, retransmit_slots_sender: RetransmitSlotsSender,
duplicate_slots_reset_receiver: DuplicateSlotsResetReceiver, duplicate_slots_reset_receiver: DuplicateSlotsResetReceiver,
replay_votes_sender: ReplayVotesSender,
) -> Self { ) -> Self {
let ReplayStageConfig { let ReplayStageConfig {
my_pubkey, my_pubkey,
@ -387,6 +392,7 @@ impl ReplayStage {
&mut all_pubkeys, &mut all_pubkeys,
&mut heaviest_subtree_fork_choice, &mut heaviest_subtree_fork_choice,
&mut bank_weight_fork_choice, &mut bank_weight_fork_choice,
&replay_votes_sender,
); );
compute_bank_stats_time.stop(); compute_bank_stats_time.stop();
@ -1303,6 +1309,7 @@ impl ReplayStage {
all_pubkeys: &mut PubkeyReferences, all_pubkeys: &mut PubkeyReferences,
heaviest_subtree_fork_choice: &mut dyn ForkChoice, heaviest_subtree_fork_choice: &mut dyn ForkChoice,
bank_weight_fork_choice: &mut dyn ForkChoice, bank_weight_fork_choice: &mut dyn ForkChoice,
replay_votes_sender: &ReplayVotesSender,
) -> Vec<Slot> { ) -> Vec<Slot> {
frozen_banks.sort_by_key(|bank| bank.slot()); frozen_banks.sort_by_key(|bank| bank.slot());
let mut new_stats = vec![]; let mut new_stats = vec![];
@ -1324,6 +1331,9 @@ impl ReplayStage {
&ancestors, &ancestors,
all_pubkeys, all_pubkeys,
); );
// Notify any listeners of the votes found in this newly computed
// bank
let _ = replay_votes_sender.send(computed_bank_state.pubkey_votes.clone());
heaviest_subtree_fork_choice.compute_bank_stats( heaviest_subtree_fork_choice.compute_bank_stats(
&bank, &bank,
tower, tower,
@ -2686,6 +2696,7 @@ pub(crate) mod tests {
.cloned() .cloned()
.collect(); .collect();
let tower = Tower::new_for_tests(0, 0.67); let tower = Tower::new_for_tests(0, 0.67);
let (replay_votes_sender, replay_votes_receiver) = unbounded();
let newly_computed = ReplayStage::compute_bank_stats( let newly_computed = ReplayStage::compute_bank_stats(
&node_pubkey, &node_pubkey,
&ancestors, &ancestors,
@ -2698,8 +2709,13 @@ pub(crate) mod tests {
&mut PubkeyReferences::default(), &mut PubkeyReferences::default(),
&mut heaviest_subtree_fork_choice, &mut heaviest_subtree_fork_choice,
&mut BankWeightForkChoice::default(), &mut BankWeightForkChoice::default(),
&replay_votes_sender,
); );
// bank 0 has no votes, should not send any votes on the channel
assert_eq!(replay_votes_receiver.try_recv().unwrap(), Arc::new(vec![]));
assert_eq!(newly_computed, vec![0]); assert_eq!(newly_computed, vec![0]);
// The only vote is in bank 1, and bank_forks does not currently contain // The only vote is in bank 1, and bank_forks does not currently contain
// bank 1, so no slot should be confirmed. // bank 1, so no slot should be confirmed.
{ {
@ -2741,8 +2757,15 @@ pub(crate) mod tests {
&mut PubkeyReferences::default(), &mut PubkeyReferences::default(),
&mut heaviest_subtree_fork_choice, &mut heaviest_subtree_fork_choice,
&mut BankWeightForkChoice::default(), &mut BankWeightForkChoice::default(),
&replay_votes_sender,
); );
// Bank 1 had one vote, ensure that `compute_bank_stats` notifies listeners
// via `replay_votes_receiver`.
assert_eq!(
replay_votes_receiver.try_recv().unwrap(),
Arc::new(vec![(my_keypairs.vote_keypair.pubkey(), 0)])
);
assert_eq!(newly_computed, vec![1]); assert_eq!(newly_computed, vec![1]);
{ {
let fork_progress = progress.get(&1).unwrap(); let fork_progress = progress.get(&1).unwrap();
@ -2776,8 +2799,10 @@ pub(crate) mod tests {
&mut PubkeyReferences::default(), &mut PubkeyReferences::default(),
&mut heaviest_subtree_fork_choice, &mut heaviest_subtree_fork_choice,
&mut BankWeightForkChoice::default(), &mut BankWeightForkChoice::default(),
&replay_votes_sender,
); );
// No new stats should have been computed // No new stats should have been computed
assert!(replay_votes_receiver.try_iter().next().is_none());
assert!(newly_computed.is_empty()); assert!(newly_computed.is_empty());
} }
@ -2802,6 +2827,7 @@ pub(crate) mod tests {
.collect(); .collect();
let ancestors = vote_simulator.bank_forks.read().unwrap().ancestors(); let ancestors = vote_simulator.bank_forks.read().unwrap().ancestors();
let (replay_votes_sender, _replay_votes_receiver) = unbounded();
ReplayStage::compute_bank_stats( ReplayStage::compute_bank_stats(
&node_pubkey, &node_pubkey,
&ancestors, &ancestors,
@ -2814,6 +2840,7 @@ pub(crate) mod tests {
&mut PubkeyReferences::default(), &mut PubkeyReferences::default(),
&mut heaviest_subtree_fork_choice, &mut heaviest_subtree_fork_choice,
&mut BankWeightForkChoice::default(), &mut BankWeightForkChoice::default(),
&replay_votes_sender,
); );
assert_eq!( assert_eq!(
@ -2876,6 +2903,7 @@ pub(crate) mod tests {
.cloned() .cloned()
.collect(); .collect();
let (replay_votes_sender, _replay_votes_receiver) = unbounded();
ReplayStage::compute_bank_stats( ReplayStage::compute_bank_stats(
&node_pubkey, &node_pubkey,
&vote_simulator.bank_forks.read().unwrap().ancestors(), &vote_simulator.bank_forks.read().unwrap().ancestors(),
@ -2888,6 +2916,7 @@ pub(crate) mod tests {
&mut PubkeyReferences::default(), &mut PubkeyReferences::default(),
&mut vote_simulator.heaviest_subtree_fork_choice, &mut vote_simulator.heaviest_subtree_fork_choice,
&mut BankWeightForkChoice::default(), &mut BankWeightForkChoice::default(),
&replay_votes_sender,
); );
frozen_banks.sort_by_key(|bank| bank.slot()); frozen_banks.sort_by_key(|bank| bank.slot());

View File

@ -17,6 +17,7 @@ pub enum Error {
RecvError(std::sync::mpsc::RecvError), RecvError(std::sync::mpsc::RecvError),
TryCrossbeamRecvError(crossbeam_channel::TryRecvError), TryCrossbeamRecvError(crossbeam_channel::TryRecvError),
CrossbeamRecvTimeoutError(crossbeam_channel::RecvTimeoutError), CrossbeamRecvTimeoutError(crossbeam_channel::RecvTimeoutError),
ReadyTimeoutError,
RecvTimeoutError(std::sync::mpsc::RecvTimeoutError), RecvTimeoutError(std::sync::mpsc::RecvTimeoutError),
CrossbeamSendError, CrossbeamSendError,
TryRecvError(std::sync::mpsc::TryRecvError), TryRecvError(std::sync::mpsc::TryRecvError),
@ -61,6 +62,11 @@ impl std::convert::From<crossbeam_channel::RecvTimeoutError> for Error {
Error::CrossbeamRecvTimeoutError(e) Error::CrossbeamRecvTimeoutError(e)
} }
} }
impl std::convert::From<crossbeam_channel::ReadyTimeoutError> for Error {
fn from(_e: crossbeam_channel::ReadyTimeoutError) -> Error {
Error::ReadyTimeoutError
}
}
impl std::convert::From<std::sync::mpsc::RecvTimeoutError> for Error { impl std::convert::From<std::sync::mpsc::RecvTimeoutError> for Error {
fn from(e: std::sync::mpsc::RecvTimeoutError) -> Error { fn from(e: std::sync::mpsc::RecvTimeoutError) -> Error {
Error::RecvTimeoutError(e) Error::RecvTimeoutError(e)

View File

@ -877,12 +877,14 @@ mod tests {
// Process votes and check they were notified. // Process votes and check they were notified.
let (s, _r) = unbounded(); let (s, _r) = unbounded();
let (_replay_votes_sender, replay_votes_receiver) = unbounded();
ClusterInfoVoteListener::get_and_process_votes_for_tests( ClusterInfoVoteListener::get_and_process_votes_for_tests(
&votes_receiver, &votes_receiver,
&vote_tracker, &vote_tracker,
0, 0,
&rpc.subscriptions, &rpc.subscriptions,
&s, &s,
&replay_votes_receiver,
) )
.unwrap(); .unwrap();

View File

@ -705,6 +705,9 @@ impl RpcSubscriptions {
notifier.notify(slot_info, sink); notifier.notify(slot_info, sink);
} }
} }
// These notifications are only triggered by votes observed on gossip,
// unlike `NotificationEntry::Gossip`, which also accounts for slots seen
// in VoteState's from bank states built in ReplayStage.
NotificationEntry::Vote(ref vote_info) => { NotificationEntry::Vote(ref vote_info) => {
let subscriptions = subscriptions.vote_subscriptions.read().unwrap(); let subscriptions = subscriptions.vote_subscriptions.read().unwrap();
for (_, sink) in subscriptions.iter() { for (_, sink) in subscriptions.iter() {

View File

@ -8,6 +8,7 @@ use crate::{
cluster_info_vote_listener::{ClusterInfoVoteListener, VerifiedVoteSender, VoteTracker}, cluster_info_vote_listener::{ClusterInfoVoteListener, VerifiedVoteSender, VoteTracker},
fetch_stage::FetchStage, fetch_stage::FetchStage,
poh_recorder::{PohRecorder, WorkingBankEntry}, poh_recorder::{PohRecorder, WorkingBankEntry},
replay_stage::ReplayVotesReceiver,
rpc_subscriptions::RpcSubscriptions, rpc_subscriptions::RpcSubscriptions,
sigverify::TransactionSigVerifier, sigverify::TransactionSigVerifier,
sigverify_stage::SigVerifyStage, sigverify_stage::SigVerifyStage,
@ -52,6 +53,7 @@ impl Tpu {
vote_tracker: Arc<VoteTracker>, vote_tracker: Arc<VoteTracker>,
bank_forks: Arc<RwLock<BankForks>>, bank_forks: Arc<RwLock<BankForks>>,
verified_vote_sender: VerifiedVoteSender, verified_vote_sender: VerifiedVoteSender,
replay_votes_receiver: ReplayVotesReceiver,
) -> Self { ) -> Self {
let (packet_sender, packet_receiver) = channel(); let (packet_sender, packet_receiver) = channel();
let fetch_stage = FetchStage::new_with_sender( let fetch_stage = FetchStage::new_with_sender(
@ -78,6 +80,7 @@ impl Tpu {
bank_forks, bank_forks,
subscriptions.clone(), subscriptions.clone(),
verified_vote_sender, verified_vote_sender,
replay_votes_receiver,
); );
let banking_stage = BankingStage::new( let banking_stage = BankingStage::new(

View File

@ -10,7 +10,7 @@ use crate::{
cluster_slots::ClusterSlots, cluster_slots::ClusterSlots,
ledger_cleanup_service::LedgerCleanupService, ledger_cleanup_service::LedgerCleanupService,
poh_recorder::PohRecorder, poh_recorder::PohRecorder,
replay_stage::{ReplayStage, ReplayStageConfig}, replay_stage::{ReplayStage, ReplayStageConfig, ReplayVotesSender},
retransmit_stage::RetransmitStage, retransmit_stage::RetransmitStage,
rewards_recorder_service::RewardsRecorderSender, rewards_recorder_service::RewardsRecorderSender,
rpc_subscriptions::RpcSubscriptions, rpc_subscriptions::RpcSubscriptions,
@ -98,6 +98,7 @@ impl Tvu {
vote_tracker: Arc<VoteTracker>, vote_tracker: Arc<VoteTracker>,
retransmit_slots_sender: RetransmitSlotsSender, retransmit_slots_sender: RetransmitSlotsSender,
verified_vote_receiver: VerifiedVoteReceiver, verified_vote_receiver: VerifiedVoteReceiver,
replay_votes_sender: ReplayVotesSender,
tvu_config: TvuConfig, tvu_config: TvuConfig,
) -> Self { ) -> Self {
let keypair: Arc<Keypair> = cluster_info.keypair.clone(); let keypair: Arc<Keypair> = cluster_info.keypair.clone();
@ -198,6 +199,7 @@ impl Tvu {
cluster_slots, cluster_slots,
retransmit_slots_sender, retransmit_slots_sender,
duplicate_slots_reset_receiver, duplicate_slots_reset_receiver,
replay_votes_sender,
); );
let ledger_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| { let ledger_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| {
@ -283,6 +285,7 @@ pub mod tests {
let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default())); let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));
let (retransmit_slots_sender, _retransmit_slots_receiver) = unbounded(); let (retransmit_slots_sender, _retransmit_slots_receiver) = unbounded();
let (_verified_vote_sender, verified_vote_receiver) = unbounded(); let (_verified_vote_sender, verified_vote_receiver) = unbounded();
let (replay_votes_sender, _replay_votes_receiver) = unbounded();
let bank_forks = Arc::new(RwLock::new(bank_forks)); let bank_forks = Arc::new(RwLock::new(bank_forks));
let tvu = Tvu::new( let tvu = Tvu::new(
&vote_keypair.pubkey(), &vote_keypair.pubkey(),
@ -316,6 +319,7 @@ pub mod tests {
Arc::new(VoteTracker::new(&bank)), Arc::new(VoteTracker::new(&bank)),
retransmit_slots_sender, retransmit_slots_sender,
verified_vote_receiver, verified_vote_receiver,
replay_votes_sender,
TvuConfig::default(), TvuConfig::default(),
); );
exit.store(true, Ordering::Relaxed); exit.store(true, Ordering::Relaxed);

View File

@ -405,6 +405,7 @@ impl Validator {
let (retransmit_slots_sender, retransmit_slots_receiver) = unbounded(); let (retransmit_slots_sender, retransmit_slots_receiver) = unbounded();
let (verified_vote_sender, verified_vote_receiver) = unbounded(); let (verified_vote_sender, verified_vote_receiver) = unbounded();
let (replay_votes_sender, replay_votes_receiver) = unbounded();
let tvu = Tvu::new( let tvu = Tvu::new(
vote_account, vote_account,
authorized_voter_keypairs, authorized_voter_keypairs,
@ -450,6 +451,7 @@ impl Validator {
vote_tracker.clone(), vote_tracker.clone(),
retransmit_slots_sender, retransmit_slots_sender,
verified_vote_receiver, verified_vote_receiver,
replay_votes_sender,
TvuConfig { TvuConfig {
max_ledger_shreds: config.max_ledger_shreds, max_ledger_shreds: config.max_ledger_shreds,
halt_on_trusted_validators_accounts_hash_mismatch: config halt_on_trusted_validators_accounts_hash_mismatch: config
@ -477,6 +479,7 @@ impl Validator {
vote_tracker, vote_tracker,
bank_forks, bank_forks,
verified_vote_sender, verified_vote_sender,
replay_votes_receiver,
); );
datapoint_info!("validator-new", ("id", id.to_string(), String)); datapoint_info!("validator-new", ("id", id.to_string(), String));