Add RestartHeaviestFork to Gossip (#34161)

* Add RestartHeaviestFork to Gossip.

* Add a test for out of bound value.

* Send observed_stake and total_epoch_stake in ResatartHeaviestFork.

* Remove total_epoch_stake from RestartHeaviestFork.

* Forgot to update ABI digest.

* Remove checking of whether stake is zero.

* Remove unnecessary new function and make new_rand pub(crate).
This commit is contained in:
Wen 2024-01-19 13:59:25 -08:00 committed by GitHub
parent 3eb06b4b37
commit 4a2871f384
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 73 additions and 6 deletions

View File

@ -268,7 +268,7 @@ pub fn make_accounts_hashes_message(
pub(crate) type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>;
// TODO These messages should go through the gpu pipeline for spam filtering
#[frozen_abi(digest = "7a2P1GeQjyqCHMyBrhNPTKfPfG4iv32vki7XHahoN55z")]
#[frozen_abi(digest = "ogEqvffeEkPpojAaSiUbCv2HdJcdXDQ1ykgYyvKvLo2")]
#[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)]
#[allow(clippy::large_enum_variant)]
pub(crate) enum Protocol {
@ -395,6 +395,7 @@ fn retain_staked(values: &mut Vec<CrdsValue>, stakes: &HashMap<Pubkey, u64>) {
CrdsData::LowestSlot(_, _)
| CrdsData::LegacyVersion(_)
| CrdsData::DuplicateShred(_, _)
| CrdsData::RestartHeaviestFork(_)
| CrdsData::RestartLastVotedForkSlots(_) => {
let stake = stakes.get(&value.pubkey()).copied();
stake.unwrap_or_default() >= MIN_STAKE_FOR_GOSSIP

View File

@ -637,6 +637,8 @@ pub(crate) fn submit_gossip_stats(
crds_stats.pull.counts[12],
i64
),
("RestartHeaviestFork-push", crds_stats.push.counts[13], i64),
("RestartHeaviestFork-pull", crds_stats.pull.counts[13], i64),
(
"all-push",
crds_stats.push.counts.iter().sum::<usize>(),
@ -684,6 +686,8 @@ pub(crate) fn submit_gossip_stats(
crds_stats.pull.fails[12],
i64
),
("RestartHeaviestFork-push", crds_stats.push.fails[13], i64),
("RestartHeaviestFork-pull", crds_stats.pull.fails[13], i64),
("all-push", crds_stats.push.fails.iter().sum::<usize>(), i64),
("all-pull", crds_stats.pull.fails.iter().sum::<usize>(), i64),
);

View File

@ -103,7 +103,7 @@ pub enum GossipRoute<'a> {
PushMessage(/*from:*/ &'a Pubkey),
}
type CrdsCountsArray = [usize; 13];
type CrdsCountsArray = [usize; 14];
pub(crate) struct CrdsDataStats {
pub(crate) counts: CrdsCountsArray,
@ -722,6 +722,7 @@ impl CrdsDataStats {
CrdsData::SnapshotHashes(_) => 10,
CrdsData::ContactInfo(_) => 11,
CrdsData::RestartLastVotedForkSlots(_) => 12,
CrdsData::RestartHeaviestFork(_) => 13,
// Update CrdsCountsArray if new items are added here.
}
}

View File

@ -6,7 +6,7 @@ use {
duplicate_shred::{DuplicateShred, DuplicateShredIndex, MAX_DUPLICATE_SHREDS},
epoch_slots::EpochSlots,
legacy_contact_info::LegacyContactInfo,
restart_crds_values::RestartLastVotedForkSlots,
restart_crds_values::{RestartHeaviestFork, RestartLastVotedForkSlots},
},
bincode::{serialize, serialized_size},
rand::{CryptoRng, Rng},
@ -96,6 +96,7 @@ pub enum CrdsData {
SnapshotHashes(SnapshotHashes),
ContactInfo(ContactInfo),
RestartLastVotedForkSlots(RestartLastVotedForkSlots),
RestartHeaviestFork(RestartHeaviestFork),
}
impl Sanitize for CrdsData {
@ -135,6 +136,7 @@ impl Sanitize for CrdsData {
CrdsData::SnapshotHashes(val) => val.sanitize(),
CrdsData::ContactInfo(node) => node.sanitize(),
CrdsData::RestartLastVotedForkSlots(slots) => slots.sanitize(),
CrdsData::RestartHeaviestFork(fork) => fork.sanitize(),
}
}
}
@ -148,7 +150,7 @@ pub(crate) fn new_rand_timestamp<R: Rng>(rng: &mut R) -> u64 {
impl CrdsData {
/// New random CrdsData for tests and benchmarks.
fn new_rand<R: Rng>(rng: &mut R, pubkey: Option<Pubkey>) -> CrdsData {
let kind = rng.gen_range(0..8);
let kind = rng.gen_range(0..9);
// TODO: Implement other kinds of CrdsData here.
// TODO: Assign ranges to each arm proportional to their frequency in
// the mainnet crds table.
@ -163,6 +165,7 @@ impl CrdsData {
6 => CrdsData::RestartLastVotedForkSlots(RestartLastVotedForkSlots::new_rand(
rng, pubkey,
)),
7 => CrdsData::RestartHeaviestFork(RestartHeaviestFork::new_rand(rng, pubkey)),
_ => CrdsData::EpochSlots(
rng.gen_range(0..MAX_EPOCH_SLOTS),
EpochSlots::new_rand(rng, pubkey),
@ -508,6 +511,7 @@ pub enum CrdsValueLabel {
SnapshotHashes(Pubkey),
ContactInfo(Pubkey),
RestartLastVotedForkSlots(Pubkey),
RestartHeaviestFork(Pubkey),
}
impl fmt::Display for CrdsValueLabel {
@ -534,6 +538,9 @@ impl fmt::Display for CrdsValueLabel {
CrdsValueLabel::RestartLastVotedForkSlots(_) => {
write!(f, "RestartLastVotedForkSlots({})", self.pubkey())
}
CrdsValueLabel::RestartHeaviestFork(_) => {
write!(f, "RestartHeaviestFork({})", self.pubkey())
}
}
}
}
@ -554,6 +561,7 @@ impl CrdsValueLabel {
CrdsValueLabel::SnapshotHashes(p) => *p,
CrdsValueLabel::ContactInfo(pubkey) => *pubkey,
CrdsValueLabel::RestartLastVotedForkSlots(p) => *p,
CrdsValueLabel::RestartHeaviestFork(p) => *p,
}
}
}
@ -605,6 +613,7 @@ impl CrdsValue {
CrdsData::SnapshotHashes(hash) => hash.wallclock,
CrdsData::ContactInfo(node) => node.wallclock(),
CrdsData::RestartLastVotedForkSlots(slots) => slots.wallclock,
CrdsData::RestartHeaviestFork(fork) => fork.wallclock,
}
}
pub fn pubkey(&self) -> Pubkey {
@ -622,6 +631,7 @@ impl CrdsValue {
CrdsData::SnapshotHashes(hash) => hash.from,
CrdsData::ContactInfo(node) => *node.pubkey(),
CrdsData::RestartLastVotedForkSlots(slots) => slots.from,
CrdsData::RestartHeaviestFork(fork) => fork.from,
}
}
pub fn label(&self) -> CrdsValueLabel {
@ -643,6 +653,7 @@ impl CrdsValue {
CrdsData::RestartLastVotedForkSlots(_) => {
CrdsValueLabel::RestartLastVotedForkSlots(self.pubkey())
}
CrdsData::RestartHeaviestFork(_) => CrdsValueLabel::RestartHeaviestFork(self.pubkey()),
}
}
pub fn contact_info(&self) -> Option<&LegacyContactInfo> {

View File

@ -1,5 +1,5 @@
use {
crate::crds_value::new_rand_timestamp,
crate::crds_value::{new_rand_timestamp, sanitize_wallclock},
bv::BitVec,
itertools::Itertools,
rand::Rng,
@ -29,6 +29,16 @@ pub enum RestartLastVotedForkSlotsError {
LastVotedForkEmpty,
}
#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, AbiExample, Debug)]
pub struct RestartHeaviestFork {
pub from: Pubkey,
pub wallclock: u64,
pub last_slot: Slot,
pub last_slot_hash: Hash,
pub observed_stake: u64,
pub shred_version: u16,
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, AbiExample, AbiEnumVisitor)]
enum SlotsOffsets {
RunLengthEncoding(RunLengthEncoding),
@ -48,6 +58,7 @@ struct RawOffsets(BitVec<u8>);
impl Sanitize for RestartLastVotedForkSlots {
fn sanitize(&self) -> std::result::Result<(), SanitizeError> {
sanitize_wallclock(self.wallclock)?;
self.last_voted_hash.sanitize()
}
}
@ -94,7 +105,7 @@ impl RestartLastVotedForkSlots {
}
/// New random Version for tests and benchmarks.
pub fn new_rand<R: Rng>(rng: &mut R, pubkey: Option<Pubkey>) -> Self {
pub(crate) fn new_rand<R: Rng>(rng: &mut R, pubkey: Option<Pubkey>) -> Self {
let pubkey = pubkey.unwrap_or_else(solana_sdk::pubkey::new_rand);
let num_slots = rng.gen_range(2..20);
let slots = std::iter::repeat_with(|| 47825632 + rng.gen_range(0..512))
@ -122,6 +133,27 @@ impl RestartLastVotedForkSlots {
}
}
impl Sanitize for RestartHeaviestFork {
fn sanitize(&self) -> Result<(), SanitizeError> {
sanitize_wallclock(self.wallclock)?;
self.last_slot_hash.sanitize()
}
}
impl RestartHeaviestFork {
pub(crate) fn new_rand<R: Rng>(rng: &mut R, from: Option<Pubkey>) -> Self {
let from = from.unwrap_or_else(solana_sdk::pubkey::new_rand);
Self {
from,
wallclock: new_rand_timestamp(rng),
last_slot: rng.gen_range(0..1000),
last_slot_hash: Hash::new_unique(),
observed_stake: rng.gen_range(1..u64::MAX),
shred_version: 1,
}
}
}
impl RunLengthEncoding {
fn new(bits: &BitVec<u8>) -> Self {
let encoded = (0..bits.len())
@ -317,4 +349,22 @@ mod test {
let range: Vec<Slot> = make_rand_slots(&mut rng).take(large_length).collect();
check_run_length_encoding(range);
}
#[test]
fn test_restart_heaviest_fork() {
let keypair = Keypair::new();
let slot = 53;
let mut fork = RestartHeaviestFork {
from: keypair.pubkey(),
wallclock: timestamp(),
last_slot: slot,
last_slot_hash: Hash::default(),
observed_stake: 800_000,
shred_version: 1,
};
assert_eq!(fork.sanitize(), Ok(()));
assert_eq!(fork.observed_stake, 800_000);
fork.wallclock = crate::crds_value::MAX_WALLCLOCK;
assert_eq!(fork.sanitize(), Err(SanitizeError::ValueOutOfBounds));
}
}