sends only the latest vote of each validator to the banking stage (#15629)

This commit is contained in:
behzad nouri 2021-03-03 19:07:16 +00:00 committed by GitHub
parent aacb28c453
commit 658951e680
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 52 additions and 29 deletions

View File

@ -44,8 +44,8 @@ use std::{
};
// Map from a vote account to the authorized voter for an epoch
pub type VerifiedLabelVotePacketsSender = CrossbeamSender<Vec<(CrdsValueLabel, Packets)>>;
pub type VerifiedLabelVotePacketsReceiver = CrossbeamReceiver<Vec<(CrdsValueLabel, Packets)>>;
pub type VerifiedLabelVotePacketsSender = CrossbeamSender<Vec<(CrdsValueLabel, Slot, Packets)>>;
pub type VerifiedLabelVotePacketsReceiver = CrossbeamReceiver<Vec<(CrdsValueLabel, Slot, Packets)>>;
pub type VerifiedVoteTransactionsSender = CrossbeamSender<Vec<Transaction>>;
pub type VerifiedVoteTransactionsReceiver = CrossbeamReceiver<Vec<Transaction>>;
pub type VerifiedVoteSender = CrossbeamSender<(Pubkey, Vec<Slot>)>;
@ -332,10 +332,11 @@ impl ClusterInfoVoteListener {
}
}
#[allow(clippy::type_complexity)]
fn verify_votes(
votes: Vec<Transaction>,
labels: Vec<CrdsValueLabel>,
) -> (Vec<Transaction>, Vec<(CrdsValueLabel, Packets)>) {
) -> (Vec<Transaction>, Vec<(CrdsValueLabel, Slot, Packets)>) {
let msgs = packet::to_packets_chunked(&votes, 1);
let r = sigverify::ed25519_verify_cpu(&msgs);
@ -353,8 +354,10 @@ impl ClusterInfoVoteListener {
msgs,
)
.filter_map(|(label, vote, verify_result, packet)| {
let slot = vote_transaction::parse_vote_transaction(&vote)
.and_then(|(_, vote, _)| vote.slots.last().copied())?;
if *verify_result != 0 {
Some((vote, (label, packet)))
Some((vote, (label, slot, packet)))
} else {
None
}
@ -1602,8 +1605,8 @@ mod tests {
assert!(packets.is_empty());
}
fn verify_packets_len(packets: &[(CrdsValueLabel, Packets)], ref_value: usize) {
let num_packets: usize = packets.iter().map(|p| p.1.packets.len()).sum();
fn verify_packets_len(packets: &[(CrdsValueLabel, Slot, Packets)], ref_value: usize) {
let num_packets: usize = packets.iter().map(|(_, _, p)| p.packets.len()).sum();
assert_eq!(num_packets, ref_value);
}

View File

@ -3,10 +3,14 @@ use crate::{
result::Result,
};
use solana_perf::packet::Packets;
use std::{collections::HashMap, time::Duration};
use solana_sdk::clock::Slot;
use std::{
collections::{hash_map::Entry, HashMap},
time::Duration,
};
#[derive(Default)]
pub struct VerifiedVotePackets(HashMap<CrdsValueLabel, (u64, Packets)>);
pub struct VerifiedVotePackets(HashMap<CrdsValueLabel, (u64, Slot, Packets)>);
impl VerifiedVotePackets {
pub fn receive_and_process_vote_packets(
@ -17,32 +21,45 @@ impl VerifiedVotePackets {
let timer = Duration::from_millis(200);
let vote_packets = vote_packets_receiver.recv_timeout(timer)?;
*last_update_version += 1;
for (label, packet) in vote_packets {
self.0.insert(label, (*last_update_version, packet));
for (label, slot, packet) in vote_packets {
self.0.insert(label, (*last_update_version, slot, packet));
}
while let Ok(vote_packets) = vote_packets_receiver.try_recv() {
for (label, packet) in vote_packets {
self.0.insert(label, (*last_update_version, packet));
for (label, slot, packet) in vote_packets {
self.0.insert(label, (*last_update_version, slot, packet));
}
}
Ok(())
}
#[cfg(test)]
fn get_vote_packets(&self, key: &CrdsValueLabel) -> Option<&(u64, Packets)> {
fn get_vote_packets(&self, key: &CrdsValueLabel) -> Option<&(u64, Slot, Packets)> {
self.0.get(key)
}
pub fn get_latest_votes(&self, last_update_version: u64) -> (u64, Packets) {
let mut new_update_version = last_update_version;
let packets = self
.0
.values()
.filter(|(v, _)| *v > last_update_version)
.flat_map(|(v, packets)| {
new_update_version = std::cmp::max(*v, new_update_version);
packets.packets.clone()
})
let mut votes = HashMap::new();
for (label, (version, slot, packets)) in &self.0 {
new_update_version = std::cmp::max(*version, new_update_version);
if *version <= last_update_version {
continue;
}
match votes.entry(label.pubkey()) {
Entry::Vacant(entry) => {
entry.insert((slot, packets));
}
Entry::Occupied(mut entry) => {
let (entry_slot, _) = entry.get();
if *entry_slot < slot {
*entry.get_mut() = (slot, packets);
}
}
}
}
let packets = votes
.into_iter()
.flat_map(|(_, (_, packets))| packets.packets.clone())
.collect();
(new_update_version, Packets::new(packets))
}
@ -74,10 +91,10 @@ mod tests {
verified_vote_packets
.0
.insert(label1, (2, none_empty_packets));
.insert(label1, (2, 42, none_empty_packets));
verified_vote_packets
.0
.insert(label2, (1, Packets::default()));
.insert(label2, (1, 23, Packets::default()));
// Both updates have timestamps greater than 0, so both should be returned
let (new_update_version, updates) = verified_vote_packets.get_latest_votes(0);
@ -104,8 +121,10 @@ mod tests {
let label1 = CrdsValueLabel::Vote(0, pubkey);
let label2 = CrdsValueLabel::Vote(1, pubkey);
let mut update_version = 0;
s.send(vec![(label1.clone(), Packets::default())]).unwrap();
s.send(vec![(label2.clone(), Packets::default())]).unwrap();
s.send(vec![(label1.clone(), 17, Packets::default())])
.unwrap();
s.send(vec![(label2.clone(), 23, Packets::default())])
.unwrap();
let data = Packet {
meta: Meta {
@ -116,7 +135,7 @@ mod tests {
};
let later_packets = Packets::new(vec![data, Packet::default()]);
s.send(vec![(label1.clone(), later_packets)]).unwrap();
s.send(vec![(label1.clone(), 42, later_packets)]).unwrap();
let mut verified_vote_packets = VerifiedVotePackets(HashMap::new());
verified_vote_packets
.receive_and_process_vote_packets(&r, &mut update_version)
@ -134,7 +153,7 @@ mod tests {
verified_vote_packets
.get_vote_packets(&label1)
.unwrap()
.1
.2
.packets
.len()
> 1
@ -143,14 +162,15 @@ mod tests {
verified_vote_packets
.get_vote_packets(&label2)
.unwrap()
.1
.2
.packets
.len(),
0
);
// Test timestamp for next batch overwrites the original
s.send(vec![(label2.clone(), Packets::default())]).unwrap();
s.send(vec![(label2.clone(), 51, Packets::default())])
.unwrap();
verified_vote_packets
.receive_and_process_vote_packets(&r, &mut update_version)
.unwrap();