Add push_restart_heaviest_fork and get_restart_heaviest_fork. (#34892)

Add push_restart_heaviest_fork and get_restart_heaviest_fork.
Wen 2024-01-24 08:57:50 -08:00 committed by GitHub
parent ef233eaaa7
commit 0d92254736
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed file with 112 additions and 1 deletion


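Below is a minimal usage sketch of the two new methods (not part of the diff); the wrapper function, its arguments, and anything outside this commit's code are illustrative assumptions.

// Hedged sketch: how restart logic might exchange heaviest-fork picks over
// gossip using the two methods added in this commit. The wrapper name and
// caller-supplied values are assumptions, not code from the repository.
use solana_gossip::{cluster_info::ClusterInfo, crds::Cursor};
use solana_sdk::{clock::Slot, hash::Hash};

fn exchange_restart_heaviest_fork(
    cluster_info: &ClusterInfo,
    heaviest_slot: Slot,
    heaviest_hash: Hash,
    observed_stake: u64,
) {
    // Advertise our pick; it is signed with our keypair and tagged with our
    // shred version before being pushed into gossip.
    cluster_info.push_restart_heaviest_fork(heaviest_slot, heaviest_hash, observed_stake);

    // Drain picks inserted since the cursor; only entries whose shred version
    // matches ours are returned.
    let mut cursor = Cursor::default();
    for fork in cluster_info.get_restart_heaviest_fork(&mut cursor) {
        println!(
            "{} picked slot {} ({}) with {} observed stake",
            fork.from, fork.last_slot, fork.last_slot_hash, fork.observed_stake
        );
    }
}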
@@ -39,7 +39,9 @@ use {
epoch_slots::EpochSlots,
gossip_error::GossipError,
ping_pong::{self, PingCache, Pong},
restart_crds_values::{RestartLastVotedForkSlots, RestartLastVotedForkSlotsError},
restart_crds_values::{
RestartHeaviestFork, RestartLastVotedForkSlots, RestartLastVotedForkSlotsError,
},
socketaddr, socketaddr_any,
weighted_shuffle::WeightedShuffle,
},
@@ -984,6 +986,26 @@ impl ClusterInfo {
Ok(())
}
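/// Builds a RestartHeaviestFork value for this node (tagged with our pubkey,
/// wallclock, and shred version), signs it, and pushes it into gossip.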
pub fn push_restart_heaviest_fork(
&self,
last_slot: Slot,
last_slot_hash: Hash,
observed_stake: u64,
) {
let restart_heaviest_fork = RestartHeaviestFork {
from: self.id(),
wallclock: timestamp(),
last_slot,
last_slot_hash,
observed_stake,
shred_version: self.my_shred_version(),
};
self.push_message(CrdsValue::new_signed(
CrdsData::RestartHeaviestFork(restart_heaviest_fork),
&self.keypair(),
));
}
fn time_gossip_read_lock<'a>(
&'a self,
label: &'static str,
@@ -1254,6 +1276,21 @@ impl ClusterInfo {
.collect()
}
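/// Returns restart-heaviest-fork values inserted since the given cursor,
/// keeping only entries whose shred version matches our own.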
pub fn get_restart_heaviest_fork(&self, cursor: &mut Cursor) -> Vec<RestartHeaviestFork> {
let self_shred_version = self.my_shred_version();
let gossip_crds = self.gossip.crds.read().unwrap();
gossip_crds
.get_entries(cursor)
.filter_map(|entry| {
let CrdsData::RestartHeaviestFork(fork) = &entry.value.data else {
return None;
};
(fork.shred_version == self_shred_version).then_some(fork)
})
.cloned()
.collect()
}
/// Returns duplicate-shreds inserted since the given cursor.
pub(crate) fn get_duplicate_shreds(&self, cursor: &mut Cursor) -> Vec<DuplicateShred> {
let gossip_crds = self.gossip.crds.read().unwrap();
@@ -4603,4 +4640,78 @@ mod tests {
assert_eq!(slots[0].from, node_pubkey);
assert_eq!(slots[1].from, cluster_info.id());
}
#[test]
fn test_push_restart_heaviest_fork() {
solana_logger::setup();
let keypair = Arc::new(Keypair::new());
let pubkey = keypair.pubkey();
let contact_info = ContactInfo::new_localhost(&pubkey, 0);
let cluster_info = ClusterInfo::new(contact_info, keypair, SocketAddrSpace::Unspecified);
// make sure empty crds is handled correctly
let mut cursor = Cursor::default();
let heaviest_forks = cluster_info.get_restart_heaviest_fork(&mut cursor);
assert_eq!(heaviest_forks, vec![]);
// add new message
let slot1 = 53;
let hash1 = Hash::new_unique();
let stake1 = 15_000_000;
cluster_info.push_restart_heaviest_fork(slot1, hash1, stake1);
cluster_info.flush_push_queue();
let heaviest_forks = cluster_info.get_restart_heaviest_fork(&mut cursor);
assert_eq!(heaviest_forks.len(), 1);
let fork = &heaviest_forks[0];
assert_eq!(fork.last_slot, slot1);
assert_eq!(fork.last_slot_hash, hash1);
assert_eq!(fork.observed_stake, stake1);
assert_eq!(fork.from, pubkey);
// Test with different shred versions.
let mut rng = rand::thread_rng();
let pubkey2 = Pubkey::new_unique();
let mut new_node = LegacyContactInfo::new_rand(&mut rng, Some(pubkey2));
new_node.set_shred_version(42);
let slot2 = 54;
let hash2 = Hash::new_unique();
let stake2 = 23_000_000;
let entries = vec![
CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(new_node)),
CrdsValue::new_unsigned(CrdsData::RestartHeaviestFork(RestartHeaviestFork {
from: pubkey2,
wallclock: timestamp(),
last_slot: slot2,
last_slot_hash: hash2,
observed_stake: stake2,
shred_version: 42,
})),
];
{
let mut gossip_crds = cluster_info.gossip.crds.write().unwrap();
for entry in entries {
assert!(gossip_crds
.insert(entry, /*now=*/ 0, GossipRoute::LocalMessage)
.is_ok());
}
}
// Should exclude other node's heaviest_fork because of different
// shred-version.
let heaviest_forks = cluster_info.get_restart_heaviest_fork(&mut Cursor::default());
assert_eq!(heaviest_forks.len(), 1);
assert_eq!(heaviest_forks[0].from, pubkey);
// Match shred versions.
{
let mut node = cluster_info.my_contact_info.write().unwrap();
node.set_shred_version(42);
}
cluster_info.push_self();
cluster_info.flush_push_queue();
// Should now include the previous heaviest_fork from the other node.
let heaviest_forks = cluster_info.get_restart_heaviest_fork(&mut Cursor::default());
assert_eq!(heaviest_forks.len(), 1);
assert_eq!(heaviest_forks[0].from, pubkey2);
}
}