Fix vote polling (#8829)

Co-authored-by: Carl <carl@solana.com>
This commit is contained in:
carllin 2020-03-15 20:31:05 -07:00 committed by GitHub
parent 49706172f3
commit 9afc5da2e1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 323 additions and 72 deletions

View File

@ -450,23 +450,23 @@ impl ClusterInfo {
/// since. This allows the bank to query for new votes only.
///
/// * return - The votes, and the max timestamp from the new set.
pub fn get_votes(&self, since: u64) -> (Vec<Transaction>, u64) {
let votes: Vec<_> = self
pub fn get_votes(&self, since: u64) -> (Vec<CrdsValueLabel>, Vec<Transaction>, u64) {
let mut max_ts = since;
let (labels, txs): (Vec<CrdsValueLabel>, Vec<Transaction>) = self
.gossip
.crds
.table
.values()
.filter(|x| x.insert_timestamp > since)
.filter_map(|x| {
.iter()
.filter(|(_, x)| x.insert_timestamp > since)
.filter_map(|(label, x)| {
max_ts = std::cmp::max(x.insert_timestamp, max_ts);
x.value
.vote()
.map(|v| (x.insert_timestamp, v.transaction.clone()))
.map(|v| (label.clone(), v.transaction.clone()))
})
.collect();
let max_ts = votes.iter().map(|x| x.0).max().unwrap_or(since);
let txs: Vec<Transaction> = votes.into_iter().map(|x| x.1).collect();
.unzip();
inc_new_counter_info!("cluster_info-get_votes-count", txs.len());
(txs, max_ts)
(labels, txs, max_ts)
}
pub fn get_snapshot_hash(&self, slot: Slot) -> Vec<(Pubkey, Hash)> {
@ -2242,26 +2242,35 @@ mod tests {
#[test]
fn test_push_vote() {
let keys = Keypair::new();
let now = timestamp();
let contact_info = ContactInfo::new_localhost(&keys.pubkey(), 0);
let mut cluster_info = ClusterInfo::new_with_invalid_keypair(contact_info);
// make sure empty crds is handled correctly
let (votes, max_ts) = cluster_info.get_votes(now);
let now = timestamp();
let (_, votes, max_ts) = cluster_info.get_votes(now);
assert_eq!(votes, vec![]);
assert_eq!(max_ts, now);
// add a vote
let tx = test_tx();
cluster_info.push_vote(0, tx.clone());
let index = 1;
cluster_info.push_vote(index, tx.clone());
// -1 to make sure that the clock is strictly lower then when insert occurred
let (votes, max_ts) = cluster_info.get_votes(now - 1);
let (labels, votes, max_ts) = cluster_info.get_votes(now - 1);
assert_eq!(votes, vec![tx]);
assert_eq!(labels.len(), 1);
match labels[0] {
CrdsValueLabel::Vote(_, pubkey) => {
assert_eq!(pubkey, keys.pubkey());
}
_ => panic!("Bad match"),
}
assert!(max_ts >= now - 1);
// make sure timestamp filter works
let (votes, new_max_ts) = cluster_info.get_votes(max_ts);
let (_, votes, new_max_ts) = cluster_info.get_votes(max_ts);
assert_eq!(votes, vec![]);
assert_eq!(max_ts, new_max_ts);
}

View File

@ -1,11 +1,16 @@
use crate::cluster_info::{ClusterInfo, GOSSIP_SLEEP_MILLIS};
use crate::packet::Packets;
use crate::poh_recorder::PohRecorder;
use crate::result::{Error, Result};
use crate::{packet, sigverify};
use crate::{
cluster_info::{ClusterInfo, GOSSIP_SLEEP_MILLIS},
crds_value::CrdsValueLabel,
packet::{self, Packets},
poh_recorder::PohRecorder,
result::{Error, Result},
sigverify,
verified_vote_packets::VerifiedVotePackets,
};
use crossbeam_channel::{
unbounded, Receiver as CrossbeamReceiver, RecvTimeoutError, Sender as CrossbeamSender,
};
use itertools::izip;
use log::*;
use solana_ledger::bank_forks::BankForks;
use solana_metrics::inc_new_counter_debug;
@ -19,15 +24,23 @@ use solana_sdk::{
transaction::Transaction,
};
use solana_vote_program::{vote_instruction::VoteInstruction, vote_state::VoteState};
use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::thread::{self, sleep, Builder, JoinHandle};
use std::time::Duration;
use std::{
collections::{HashMap, HashSet},
sync::{
atomic::{AtomicBool, Ordering},
{Arc, Mutex, RwLock},
},
thread::{self, sleep, Builder, JoinHandle},
time::{Duration, Instant},
};
// Map from a vote account to the authorized voter for an epoch
pub type EpochAuthorizedVoters = HashMap<Arc<Pubkey>, Arc<Pubkey>>;
pub type NodeIdToVoteAccounts = HashMap<Pubkey, Vec<Arc<Pubkey>>>;
pub type VerifiedVotePacketsSender = CrossbeamSender<Vec<(CrdsValueLabel, Packets)>>;
pub type VerifiedVotePacketsReceiver = CrossbeamReceiver<Vec<(CrdsValueLabel, Packets)>>;
pub type VerifiedVoteTransactionsSender = CrossbeamSender<Vec<Transaction>>;
pub type VerifiedVoteTransactionsReceiver = CrossbeamReceiver<Vec<Transaction>>;
pub struct SlotVoteTracker {
voted: HashSet<Arc<Pubkey>>,
@ -276,8 +289,9 @@ impl ClusterInfoVoteListener {
bank_forks: Arc<RwLock<BankForks>>,
) -> Self {
let exit_ = exit.clone();
let poh_recorder = poh_recorder.clone();
let (vote_txs_sender, vote_txs_receiver) = unbounded();
let (verified_vote_packets_sender, verified_vote_packets_receiver) = unbounded();
let (verified_vote_transactions_sender, verified_vote_transactions_receiver) = unbounded();
let listen_thread = Builder::new()
.name("solana-cluster_info_vote_listener".to_string())
.spawn(move || {
@ -285,9 +299,22 @@ impl ClusterInfoVoteListener {
exit_,
&cluster_info,
sigverify_disabled,
&sender,
vote_txs_sender,
verified_vote_packets_sender,
verified_vote_transactions_sender,
);
})
.unwrap();
let exit_ = exit.clone();
let poh_recorder = poh_recorder.clone();
let bank_send_thread = Builder::new()
.name("solana-cluster_info_bank_send".to_string())
.spawn(move || {
let _ = Self::bank_send_loop(
exit_,
verified_vote_packets_receiver,
poh_recorder,
&sender,
);
})
.unwrap();
@ -296,13 +323,17 @@ impl ClusterInfoVoteListener {
let send_thread = Builder::new()
.name("solana-cluster_info_process_votes".to_string())
.spawn(move || {
let _ =
Self::process_votes_loop(exit_, vote_txs_receiver, vote_tracker, &bank_forks);
let _ = Self::process_votes_loop(
exit_,
verified_vote_transactions_receiver,
vote_tracker,
&bank_forks,
);
})
.unwrap();
Self {
thread_hdls: vec![listen_thread, send_thread],
thread_hdls: vec![listen_thread, send_thread, bank_send_thread],
}
}
@ -317,57 +348,104 @@ impl ClusterInfoVoteListener {
exit: Arc<AtomicBool>,
cluster_info: &Arc<RwLock<ClusterInfo>>,
sigverify_disabled: bool,
packets_sender: &CrossbeamSender<Vec<Packets>>,
vote_txs_sender: CrossbeamSender<Vec<Transaction>>,
poh_recorder: Arc<Mutex<PohRecorder>>,
verified_vote_packets_sender: VerifiedVotePacketsSender,
verified_vote_transactions_sender: VerifiedVoteTransactionsSender,
) -> Result<()> {
let mut last_ts = 0;
loop {
if exit.load(Ordering::Relaxed) {
return Ok(());
}
let poh_bank = poh_recorder.lock().unwrap().bank();
if let Some(bank) = poh_bank {
let last_ts = bank.last_vote_sync.load(Ordering::Relaxed);
let (votes, new_ts) = cluster_info.read().unwrap().get_votes(last_ts);
bank.last_vote_sync
.compare_and_swap(last_ts, new_ts, Ordering::Relaxed);
inc_new_counter_debug!("cluster_info_vote_listener-recv_count", votes.len());
let mut msgs = packet::to_packets(&votes);
if !msgs.is_empty() {
let r = if sigverify_disabled {
sigverify::ed25519_verify_disabled(&msgs)
let (labels, votes, new_ts) = cluster_info.read().unwrap().get_votes(last_ts);
inc_new_counter_debug!("cluster_info_vote_listener-recv_count", votes.len());
last_ts = new_ts;
let msgs = packet::to_packets(&votes);
if !msgs.is_empty() {
let r = if sigverify_disabled {
sigverify::ed25519_verify_disabled(&msgs)
} else {
sigverify::ed25519_verify_cpu(&msgs)
};
assert_eq!(
r.iter()
.map(|packets_results| packets_results.len())
.sum::<usize>(),
votes.len()
);
let (vote_txs, packets) = izip!(
labels.into_iter(),
votes.into_iter(),
r.iter().flatten(),
msgs
)
.filter_map(|(label, vote, verify_result, packet)| {
if *verify_result != 0 {
Some((vote, (label, packet)))
} else {
sigverify::ed25519_verify_cpu(&msgs)
};
assert_eq!(
r.iter()
.map(|packets_results| packets_results.len())
.sum::<usize>(),
votes.len()
);
let valid_votes: Vec<_> = votes
.into_iter()
.zip(r.iter().flatten())
.filter_map(|(vote, verify_result)| {
if *verify_result != 0 {
Some(vote)
} else {
None
}
})
.collect();
vote_txs_sender.send(valid_votes)?;
sigverify::mark_disabled(&mut msgs, &r);
packets_sender.send(msgs)?;
None
}
})
.unzip();
verified_vote_transactions_sender.send(vote_txs)?;
verified_vote_packets_sender.send(packets)?;
}
sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS));
}
}
fn bank_send_loop(
exit: Arc<AtomicBool>,
verified_vote_packets_receiver: VerifiedVotePacketsReceiver,
poh_recorder: Arc<Mutex<PohRecorder>>,
packets_sender: &CrossbeamSender<Vec<Packets>>,
) -> Result<()> {
let mut verified_vote_packets = VerifiedVotePackets::default();
let mut time_since_lock = Instant::now();
let mut update_version = 0;
loop {
if exit.load(Ordering::Relaxed) {
return Ok(());
}
if let Err(e) = verified_vote_packets
.get_and_process_vote_packets(&verified_vote_packets_receiver, &mut update_version)
{
match e {
Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Disconnected) => {
return Ok(());
}
Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Timeout) => (),
_ => {
error!("thread {:?} error {:?}", thread::current().name(), e);
}
}
}
if time_since_lock.elapsed().as_millis() > GOSSIP_SLEEP_MILLIS as u128 {
let bank = poh_recorder.lock().unwrap().bank();
if let Some(bank) = bank {
let last_version = bank.last_vote_sync.load(Ordering::Relaxed);
let (new_version, msgs) = verified_vote_packets.get_latest_votes(last_version);
packets_sender.send(msgs)?;
bank.last_vote_sync.compare_and_swap(
last_version,
new_version,
Ordering::Relaxed,
);
time_since_lock = Instant::now();
}
}
sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS));
}
}
fn process_votes_loop(
exit: Arc<AtomicBool>,
vote_txs_receiver: CrossbeamReceiver<Vec<Transaction>>,
vote_txs_receiver: VerifiedVoteTransactionsReceiver,
vote_tracker: Arc<VoteTracker>,
bank_forks: &RwLock<BankForks>,
) -> Result<()> {
@ -425,7 +503,7 @@ impl ClusterInfoVoteListener {
}
fn get_and_process_votes(
vote_txs_receiver: &CrossbeamReceiver<Vec<Transaction>>,
vote_txs_receiver: &VerifiedVoteTransactionsReceiver,
vote_tracker: &Arc<VoteTracker>,
last_root: Slot,
) -> Result<()> {
@ -434,7 +512,6 @@ impl ClusterInfoVoteListener {
while let Ok(new_txs) = vote_txs_receiver.try_recv() {
vote_txs.extend(new_txs);
}
Self::process_votes(vote_tracker, vote_txs, last_root);
Ok(())
}

View File

@ -58,6 +58,7 @@ pub mod tpu;
pub mod transaction_status_service;
pub mod tvu;
pub mod validator;
pub mod verified_vote_packets;
pub mod weighted_shuffle;
pub mod window_service;

View File

@ -0,0 +1,164 @@
use crate::{
cluster_info_vote_listener::VerifiedVotePacketsReceiver, crds_value::CrdsValueLabel,
packet::Packets, result::Result,
};
use std::{collections::HashMap, ops::Deref, time::Duration};
#[derive(Default)]
pub struct VerifiedVotePackets(HashMap<CrdsValueLabel, (u64, Packets)>);
impl Deref for VerifiedVotePackets {
type Target = HashMap<CrdsValueLabel, (u64, Packets)>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl VerifiedVotePackets {
pub fn get_and_process_vote_packets(
&mut self,
vote_packets_receiver: &VerifiedVotePacketsReceiver,
last_update_version: &mut u64,
) -> Result<()> {
let timer = Duration::from_millis(200);
let vote_packets = vote_packets_receiver.recv_timeout(timer)?;
*last_update_version += 1;
for (label, packet) in vote_packets {
self.0.insert(label, (*last_update_version, packet));
}
while let Ok(vote_packets) = vote_packets_receiver.try_recv() {
for (label, packet) in vote_packets {
self.0.insert(label, (*last_update_version, packet));
}
}
Ok(())
}
pub fn get_latest_votes(&self, last_update_version: u64) -> (u64, Vec<Packets>) {
let mut new_update_version = last_update_version;
let msgs: Vec<_> = self
.iter()
.filter_map(|(_, (msg_update_version, msg))| {
if *msg_update_version > last_update_version {
new_update_version = std::cmp::max(*msg_update_version, new_update_version);
Some(msg)
} else {
None
}
})
.cloned()
.collect();
(new_update_version, msgs)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
packet::{Meta, Packet},
result::Error,
};
use crossbeam_channel::{unbounded, RecvTimeoutError};
use solana_sdk::pubkey::Pubkey;
#[test]
fn test_get_latest_votes() {
let pubkey = Pubkey::new_rand();
let label1 = CrdsValueLabel::Vote(0 as u8, pubkey);
let label2 = CrdsValueLabel::Vote(1 as u8, pubkey);
let mut verified_vote_packets = VerifiedVotePackets(HashMap::new());
let data = Packet {
meta: Meta {
repair: true,
..Meta::default()
},
..Packet::default()
};
let none_empty_packets = Packets::new(vec![data, Packet::default()]);
verified_vote_packets
.0
.insert(label1, (2, none_empty_packets));
verified_vote_packets
.0
.insert(label2, (1, Packets::default()));
// Both updates have timestamps greater than 0, so both should be returned
let (new_update_version, updates) = verified_vote_packets.get_latest_votes(0);
assert_eq!(new_update_version, 2);
assert_eq!(updates.len(), 2);
// Only the nonempty packet had a timestamp greater than 1
let (new_update_version, updates) = verified_vote_packets.get_latest_votes(1);
assert_eq!(new_update_version, 2);
assert_eq!(updates.len(), 1);
assert!(updates[0].packets.len() > 0);
// If the given timestamp is greater than all timestamps in any update,
// returned timestamp should be the same as the given timestamp, and
// no updates should be returned
let (new_update_version, updates) = verified_vote_packets.get_latest_votes(3);
assert_eq!(new_update_version, 3);
assert!(updates.is_empty());
}
#[test]
fn test_get_and_process_vote_packets() {
let (s, r) = unbounded();
let pubkey = Pubkey::new_rand();
let label1 = CrdsValueLabel::Vote(0 as u8, pubkey);
let label2 = CrdsValueLabel::Vote(1 as u8, pubkey);
let mut update_version = 0;
s.send(vec![(label1.clone(), Packets::default())]).unwrap();
s.send(vec![(label2.clone(), Packets::default())]).unwrap();
let data = Packet {
meta: Meta {
repair: true,
..Meta::default()
},
..Packet::default()
};
let later_packets = Packets::new(vec![data, Packet::default()]);
s.send(vec![(label1.clone(), later_packets.clone())])
.unwrap();
let mut verified_vote_packets = VerifiedVotePackets(HashMap::new());
verified_vote_packets
.get_and_process_vote_packets(&r, &mut update_version)
.unwrap();
// Test timestamps for same batch are the same
let update_version1 = verified_vote_packets.get(&label1).unwrap().0;
assert_eq!(
update_version1,
verified_vote_packets.get(&label2).unwrap().0
);
// Test the later value overwrote the earlier one for this label
assert!(verified_vote_packets.get(&label1).unwrap().1.packets.len() > 1);
assert_eq!(
verified_vote_packets.get(&label2).unwrap().1.packets.len(),
0
);
// Test timestamp for next batch overwrites the original
s.send(vec![(label2.clone(), Packets::default())]).unwrap();
verified_vote_packets
.get_and_process_vote_packets(&r, &mut update_version)
.unwrap();
let update_version2 = verified_vote_packets.get(&label2).unwrap().0;
assert!(update_version2 > update_version1);
// Test empty doesn't bump the version
let before = update_version;
assert_matches!(
verified_vote_packets.get_and_process_vote_packets(&r, &mut update_version),
Err(Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Timeout))
);
assert_eq!(before, update_version);
}
}