Clean up VerifiedVotePackets (#14822)
This commit is contained in:
parent
015058e0b7
commit
bd0433c373
|
@ -382,7 +382,7 @@ impl ClusterInfoVoteListener {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Err(e) = verified_vote_packets.get_and_process_vote_packets(
|
if let Err(e) = verified_vote_packets.receive_and_process_vote_packets(
|
||||||
&verified_vote_label_packets_receiver,
|
&verified_vote_label_packets_receiver,
|
||||||
&mut update_version,
|
&mut update_version,
|
||||||
) {
|
) {
|
||||||
|
|
|
@ -3,20 +3,13 @@ use crate::{
|
||||||
result::Result,
|
result::Result,
|
||||||
};
|
};
|
||||||
use solana_perf::packet::Packets;
|
use solana_perf::packet::Packets;
|
||||||
use std::{collections::HashMap, ops::Deref, time::Duration};
|
use std::{collections::HashMap, time::Duration};
|
||||||
|
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
pub struct VerifiedVotePackets(HashMap<CrdsValueLabel, (u64, Packets)>);
|
pub struct VerifiedVotePackets(HashMap<CrdsValueLabel, (u64, Packets)>);
|
||||||
|
|
||||||
impl Deref for VerifiedVotePackets {
|
|
||||||
type Target = HashMap<CrdsValueLabel, (u64, Packets)>;
|
|
||||||
fn deref(&self) -> &Self::Target {
|
|
||||||
&self.0
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl VerifiedVotePackets {
|
impl VerifiedVotePackets {
|
||||||
pub fn get_and_process_vote_packets(
|
pub fn receive_and_process_vote_packets(
|
||||||
&mut self,
|
&mut self,
|
||||||
vote_packets_receiver: &VerifiedLabelVotePacketsReceiver,
|
vote_packets_receiver: &VerifiedLabelVotePacketsReceiver,
|
||||||
last_update_version: &mut u64,
|
last_update_version: &mut u64,
|
||||||
|
@ -35,9 +28,15 @@ impl VerifiedVotePackets {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
fn get_vote_packets(&self, key: &CrdsValueLabel) -> Option<&(u64, Packets)> {
|
||||||
|
self.0.get(key)
|
||||||
|
}
|
||||||
|
|
||||||
pub fn get_latest_votes(&self, last_update_version: u64) -> (u64, Vec<Packets>) {
|
pub fn get_latest_votes(&self, last_update_version: u64) -> (u64, Vec<Packets>) {
|
||||||
let mut new_update_version = last_update_version;
|
let mut new_update_version = last_update_version;
|
||||||
let msgs: Vec<_> = self
|
let msgs: Vec<_> = self
|
||||||
|
.0
|
||||||
.iter()
|
.iter()
|
||||||
.filter_map(|(_, (msg_update_version, msg))| {
|
.filter_map(|(_, (msg_update_version, msg))| {
|
||||||
if *msg_update_version > last_update_version {
|
if *msg_update_version > last_update_version {
|
||||||
|
@ -125,35 +124,48 @@ mod tests {
|
||||||
s.send(vec![(label1.clone(), later_packets)]).unwrap();
|
s.send(vec![(label1.clone(), later_packets)]).unwrap();
|
||||||
let mut verified_vote_packets = VerifiedVotePackets(HashMap::new());
|
let mut verified_vote_packets = VerifiedVotePackets(HashMap::new());
|
||||||
verified_vote_packets
|
verified_vote_packets
|
||||||
.get_and_process_vote_packets(&r, &mut update_version)
|
.receive_and_process_vote_packets(&r, &mut update_version)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
// Test timestamps for same batch are the same
|
// Test timestamps for same batch are the same
|
||||||
let update_version1 = verified_vote_packets.get(&label1).unwrap().0;
|
let update_version1 = verified_vote_packets.get_vote_packets(&label1).unwrap().0;
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
update_version1,
|
update_version1,
|
||||||
verified_vote_packets.get(&label2).unwrap().0
|
verified_vote_packets.get_vote_packets(&label2).unwrap().0
|
||||||
);
|
);
|
||||||
|
|
||||||
// Test the later value overwrote the earlier one for this label
|
// Test the later value overwrote the earlier one for this label
|
||||||
assert!(verified_vote_packets.get(&label1).unwrap().1.packets.len() > 1);
|
assert!(
|
||||||
|
verified_vote_packets
|
||||||
|
.get_vote_packets(&label1)
|
||||||
|
.unwrap()
|
||||||
|
.1
|
||||||
|
.packets
|
||||||
|
.len()
|
||||||
|
> 1
|
||||||
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
verified_vote_packets.get(&label2).unwrap().1.packets.len(),
|
verified_vote_packets
|
||||||
|
.get_vote_packets(&label2)
|
||||||
|
.unwrap()
|
||||||
|
.1
|
||||||
|
.packets
|
||||||
|
.len(),
|
||||||
0
|
0
|
||||||
);
|
);
|
||||||
|
|
||||||
// Test timestamp for next batch overwrites the original
|
// Test timestamp for next batch overwrites the original
|
||||||
s.send(vec![(label2.clone(), Packets::default())]).unwrap();
|
s.send(vec![(label2.clone(), Packets::default())]).unwrap();
|
||||||
verified_vote_packets
|
verified_vote_packets
|
||||||
.get_and_process_vote_packets(&r, &mut update_version)
|
.receive_and_process_vote_packets(&r, &mut update_version)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let update_version2 = verified_vote_packets.get(&label2).unwrap().0;
|
let update_version2 = verified_vote_packets.get_vote_packets(&label2).unwrap().0;
|
||||||
assert!(update_version2 > update_version1);
|
assert!(update_version2 > update_version1);
|
||||||
|
|
||||||
// Test empty doesn't bump the version
|
// Test empty doesn't bump the version
|
||||||
let before = update_version;
|
let before = update_version;
|
||||||
assert_matches!(
|
assert_matches!(
|
||||||
verified_vote_packets.get_and_process_vote_packets(&r, &mut update_version),
|
verified_vote_packets.receive_and_process_vote_packets(&r, &mut update_version),
|
||||||
Err(Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Timeout))
|
Err(Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Timeout))
|
||||||
);
|
);
|
||||||
assert_eq!(before, update_version);
|
assert_eq!(before, update_version);
|
||||||
|
|
Loading…
Reference in New Issue