diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 56b75010f..23038b040 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -39,7 +39,9 @@ use { epoch_slots::EpochSlots, gossip_error::GossipError, ping_pong::{self, PingCache, Pong}, - restart_crds_values::{RestartLastVotedForkSlots, RestartLastVotedForkSlotsError}, + restart_crds_values::{ + RestartHeaviestFork, RestartLastVotedForkSlots, RestartLastVotedForkSlotsError, + }, socketaddr, socketaddr_any, weighted_shuffle::WeightedShuffle, }, @@ -984,6 +986,26 @@ impl ClusterInfo { Ok(()) } + pub fn push_restart_heaviest_fork( + &self, + last_slot: Slot, + last_slot_hash: Hash, + observed_stake: u64, + ) { + let restart_heaviest_fork = RestartHeaviestFork { + from: self.id(), + wallclock: timestamp(), + last_slot, + last_slot_hash, + observed_stake, + shred_version: self.my_shred_version(), + }; + self.push_message(CrdsValue::new_signed( + CrdsData::RestartHeaviestFork(restart_heaviest_fork), + &self.keypair(), + )); + } + fn time_gossip_read_lock<'a>( &'a self, label: &'static str, @@ -1254,6 +1276,21 @@ impl ClusterInfo { .collect() } + pub fn get_restart_heaviest_fork(&self, cursor: &mut Cursor) -> Vec { + let self_shred_version = self.my_shred_version(); + let gossip_crds = self.gossip.crds.read().unwrap(); + gossip_crds + .get_entries(cursor) + .filter_map(|entry| { + let CrdsData::RestartHeaviestFork(fork) = &entry.value.data else { + return None; + }; + (fork.shred_version == self_shred_version).then_some(fork) + }) + .cloned() + .collect() + } + /// Returns duplicate-shreds inserted since the given cursor. pub(crate) fn get_duplicate_shreds(&self, cursor: &mut Cursor) -> Vec { let gossip_crds = self.gossip.crds.read().unwrap(); @@ -4603,4 +4640,78 @@ mod tests { assert_eq!(slots[0].from, node_pubkey); assert_eq!(slots[1].from, cluster_info.id()); } + + #[test] + fn test_push_restart_heaviest_fork() { + solana_logger::setup(); + let keypair = Arc::new(Keypair::new()); + let pubkey = keypair.pubkey(); + let contact_info = ContactInfo::new_localhost(&pubkey, 0); + let cluster_info = ClusterInfo::new(contact_info, keypair, SocketAddrSpace::Unspecified); + + // make sure empty crds is handled correctly + let mut cursor = Cursor::default(); + let heaviest_forks = cluster_info.get_restart_heaviest_fork(&mut cursor); + assert_eq!(heaviest_forks, vec![]); + + // add new message + let slot1 = 53; + let hash1 = Hash::new_unique(); + let stake1 = 15_000_000; + cluster_info.push_restart_heaviest_fork(slot1, hash1, stake1); + cluster_info.flush_push_queue(); + + let heaviest_forks = cluster_info.get_restart_heaviest_fork(&mut cursor); + assert_eq!(heaviest_forks.len(), 1); + let fork = &heaviest_forks[0]; + assert_eq!(fork.last_slot, slot1); + assert_eq!(fork.last_slot_hash, hash1); + assert_eq!(fork.observed_stake, stake1); + assert_eq!(fork.from, pubkey); + + // Test with different shred versions. + let mut rng = rand::thread_rng(); + let pubkey2 = Pubkey::new_unique(); + let mut new_node = LegacyContactInfo::new_rand(&mut rng, Some(pubkey2)); + new_node.set_shred_version(42); + let slot2 = 54; + let hash2 = Hash::new_unique(); + let stake2 = 23_000_000; + let entries = vec![ + CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(new_node)), + CrdsValue::new_unsigned(CrdsData::RestartHeaviestFork(RestartHeaviestFork { + from: pubkey2, + wallclock: timestamp(), + last_slot: slot2, + last_slot_hash: hash2, + observed_stake: stake2, + shred_version: 42, + })), + ]; + { + let mut gossip_crds = cluster_info.gossip.crds.write().unwrap(); + for entry in entries { + assert!(gossip_crds + .insert(entry, /*now=*/ 0, GossipRoute::LocalMessage) + .is_ok()); + } + } + // Should exclude other node's heaviest_fork because of different + // shred-version. + let heaviest_forks = cluster_info.get_restart_heaviest_fork(&mut Cursor::default()); + assert_eq!(heaviest_forks.len(), 1); + assert_eq!(heaviest_forks[0].from, pubkey); + // Match shred versions. + { + let mut node = cluster_info.my_contact_info.write().unwrap(); + node.set_shred_version(42); + } + cluster_info.push_self(); + cluster_info.flush_push_queue(); + + // Should now include the previous heaviest_fork from the other node. + let heaviest_forks = cluster_info.get_restart_heaviest_fork(&mut Cursor::default()); + assert_eq!(heaviest_forks.len(), 1); + assert_eq!(heaviest_forks[0].from, pubkey2); + } }