Add RestartLastVotedForkSlots for wen_restart. (#33239)
* Add RestartLastVotedForkSlots and RestartHeaviestFork for wen_restart. * Fix linter errors. * Revert RestartHeaviestFork, it will be added in another PR. * Update frozen abi message. * Fix wrong number in test generation, change to pub(crate) to limit scope. * Separate push_epoch_slots and push_restart_last_voted_fork_slots. * Add RestartLastVotedForkSlots data structure. * Remove unused parts to make PR smaller. * Remove unused clone. * Use CompressedSlotsVec to share code between EpochSlots and RestartLastVotedForkSlots. * Add total_messages to show how many messages are there. * Reduce RestartLastVotedForkSlots to one packet (16k slots). * Replace last_vote_slot with shred_version, revert CompressedSlotsVec.
This commit is contained in:
parent
55f3f203c6
commit
0a3810854f
|
@ -267,7 +267,7 @@ pub fn make_accounts_hashes_message(
|
|||
pub(crate) type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>;
|
||||
|
||||
// TODO These messages should go through the gpu pipeline for spam filtering
|
||||
#[frozen_abi(digest = "EnbW8mYTsPMndq9NkHLTkHJgduXvWSfSD6bBdmqQ8TiF")]
|
||||
#[frozen_abi(digest = "CVvKB495YW6JN4w1rWwajyZmG5wvNhmD97V99rSv9fGw")]
|
||||
#[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)]
|
||||
#[allow(clippy::large_enum_variant)]
|
||||
pub(crate) enum Protocol {
|
||||
|
@ -393,7 +393,8 @@ fn retain_staked(values: &mut Vec<CrdsValue>, stakes: &HashMap<Pubkey, u64>) {
|
|||
CrdsData::AccountsHashes(_) => true,
|
||||
CrdsData::LowestSlot(_, _)
|
||||
| CrdsData::LegacyVersion(_)
|
||||
| CrdsData::DuplicateShred(_, _) => {
|
||||
| CrdsData::DuplicateShred(_, _)
|
||||
| CrdsData::RestartLastVotedForkSlots(_) => {
|
||||
let stake = stakes.get(&value.pubkey()).copied();
|
||||
stake.unwrap_or_default() >= MIN_STAKE_FOR_GOSSIP
|
||||
}
|
||||
|
@ -4020,7 +4021,7 @@ mod tests {
|
|||
ClusterInfo::split_gossip_messages(PUSH_MESSAGE_MAX_PAYLOAD_SIZE, values.clone())
|
||||
.collect();
|
||||
let self_pubkey = solana_sdk::pubkey::new_rand();
|
||||
assert!(splits.len() * 3 < NUM_CRDS_VALUES);
|
||||
assert!(splits.len() * 2 < NUM_CRDS_VALUES);
|
||||
// Assert that all messages are included in the splits.
|
||||
assert_eq!(NUM_CRDS_VALUES, splits.iter().map(Vec::len).sum::<usize>());
|
||||
splits
|
||||
|
|
|
@ -627,6 +627,16 @@ pub(crate) fn submit_gossip_stats(
|
|||
("SnapshotHashes-pull", crds_stats.pull.counts[10], i64),
|
||||
("ContactInfo-push", crds_stats.push.counts[11], i64),
|
||||
("ContactInfo-pull", crds_stats.pull.counts[11], i64),
|
||||
(
|
||||
"RestartLastVotedForkSlots-push",
|
||||
crds_stats.push.counts[12],
|
||||
i64
|
||||
),
|
||||
(
|
||||
"RestartLastVotedForkSlots-pull",
|
||||
crds_stats.pull.counts[12],
|
||||
i64
|
||||
),
|
||||
(
|
||||
"all-push",
|
||||
crds_stats.push.counts.iter().sum::<usize>(),
|
||||
|
@ -664,6 +674,16 @@ pub(crate) fn submit_gossip_stats(
|
|||
("SnapshotHashes-pull", crds_stats.pull.fails[10], i64),
|
||||
("ContactInfo-push", crds_stats.push.fails[11], i64),
|
||||
("ContactInfo-pull", crds_stats.pull.fails[11], i64),
|
||||
(
|
||||
"RestartLastVotedForkSlots-push",
|
||||
crds_stats.push.fails[12],
|
||||
i64
|
||||
),
|
||||
(
|
||||
"RestartLastVotedForkSlots-pull",
|
||||
crds_stats.pull.fails[12],
|
||||
i64
|
||||
),
|
||||
("all-push", crds_stats.push.fails.iter().sum::<usize>(), i64),
|
||||
("all-pull", crds_stats.pull.fails.iter().sum::<usize>(), i64),
|
||||
);
|
||||
|
|
|
@ -103,7 +103,7 @@ pub enum GossipRoute<'a> {
|
|||
PushMessage(/*from:*/ &'a Pubkey),
|
||||
}
|
||||
|
||||
type CrdsCountsArray = [usize; 12];
|
||||
type CrdsCountsArray = [usize; 13];
|
||||
|
||||
pub(crate) struct CrdsDataStats {
|
||||
pub(crate) counts: CrdsCountsArray,
|
||||
|
@ -721,6 +721,7 @@ impl CrdsDataStats {
|
|||
CrdsData::DuplicateShred(_, _) => 9,
|
||||
CrdsData::SnapshotHashes(_) => 10,
|
||||
CrdsData::ContactInfo(_) => 11,
|
||||
CrdsData::RestartLastVotedForkSlots(_) => 12,
|
||||
// Update CrdsCountsArray if new items are added here.
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,10 +1,10 @@
|
|||
use {
|
||||
crate::{
|
||||
cluster_info::MAX_ACCOUNTS_HASHES,
|
||||
cluster_info::{MAX_ACCOUNTS_HASHES, MAX_CRDS_OBJECT_SIZE},
|
||||
contact_info::ContactInfo,
|
||||
deprecated,
|
||||
duplicate_shred::{DuplicateShred, DuplicateShredIndex, MAX_DUPLICATE_SHREDS},
|
||||
epoch_slots::EpochSlots,
|
||||
epoch_slots::{CompressedSlots, EpochSlots, MAX_SLOTS_PER_ENTRY},
|
||||
legacy_contact_info::LegacyContactInfo,
|
||||
},
|
||||
bincode::{serialize, serialized_size},
|
||||
|
@ -94,6 +94,7 @@ pub enum CrdsData {
|
|||
DuplicateShred(DuplicateShredIndex, DuplicateShred),
|
||||
SnapshotHashes(SnapshotHashes),
|
||||
ContactInfo(ContactInfo),
|
||||
RestartLastVotedForkSlots(RestartLastVotedForkSlots),
|
||||
}
|
||||
|
||||
impl Sanitize for CrdsData {
|
||||
|
@ -132,6 +133,7 @@ impl Sanitize for CrdsData {
|
|||
}
|
||||
CrdsData::SnapshotHashes(val) => val.sanitize(),
|
||||
CrdsData::ContactInfo(node) => node.sanitize(),
|
||||
CrdsData::RestartLastVotedForkSlots(slots) => slots.sanitize(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -145,7 +147,7 @@ pub(crate) fn new_rand_timestamp<R: Rng>(rng: &mut R) -> u64 {
|
|||
impl CrdsData {
|
||||
/// New random CrdsData for tests and benchmarks.
|
||||
fn new_rand<R: Rng>(rng: &mut R, pubkey: Option<Pubkey>) -> CrdsData {
|
||||
let kind = rng.gen_range(0..7);
|
||||
let kind = rng.gen_range(0..8);
|
||||
// TODO: Implement other kinds of CrdsData here.
|
||||
// TODO: Assign ranges to each arm proportional to their frequency in
|
||||
// the mainnet crds table.
|
||||
|
@ -157,6 +159,9 @@ impl CrdsData {
|
|||
3 => CrdsData::AccountsHashes(AccountsHashes::new_rand(rng, pubkey)),
|
||||
4 => CrdsData::Version(Version::new_rand(rng, pubkey)),
|
||||
5 => CrdsData::Vote(rng.gen_range(0..MAX_VOTES), Vote::new_rand(rng, pubkey)),
|
||||
6 => CrdsData::RestartLastVotedForkSlots(RestartLastVotedForkSlots::new_rand(
|
||||
rng, pubkey,
|
||||
)),
|
||||
_ => CrdsData::EpochSlots(
|
||||
rng.gen_range(0..MAX_EPOCH_SLOTS),
|
||||
EpochSlots::new_rand(rng, pubkey),
|
||||
|
@ -485,6 +490,87 @@ impl Sanitize for NodeInstance {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Default, PartialEq, Eq, AbiExample, Debug)]
|
||||
pub struct RestartLastVotedForkSlots {
|
||||
pub from: Pubkey,
|
||||
pub wallclock: u64,
|
||||
pub slots: Vec<CompressedSlots>,
|
||||
pub last_voted_hash: Hash,
|
||||
pub shred_version: u16,
|
||||
}
|
||||
|
||||
impl Sanitize for RestartLastVotedForkSlots {
|
||||
fn sanitize(&self) -> std::result::Result<(), SanitizeError> {
|
||||
if self.slots.is_empty() {
|
||||
return Err(SanitizeError::InvalidValue);
|
||||
}
|
||||
self.slots.sanitize()?;
|
||||
self.last_voted_hash.sanitize()
|
||||
}
|
||||
}
|
||||
|
||||
impl RestartLastVotedForkSlots {
|
||||
pub fn new(from: Pubkey, now: u64, last_voted_hash: Hash, shred_version: u16) -> Self {
|
||||
Self {
|
||||
from,
|
||||
wallclock: now,
|
||||
slots: Vec::new(),
|
||||
last_voted_hash,
|
||||
shred_version,
|
||||
}
|
||||
}
|
||||
|
||||
/// New random Version for tests and benchmarks.
|
||||
pub fn new_rand<R: Rng>(rng: &mut R, pubkey: Option<Pubkey>) -> Self {
|
||||
let pubkey = pubkey.unwrap_or_else(solana_sdk::pubkey::new_rand);
|
||||
let mut result =
|
||||
RestartLastVotedForkSlots::new(pubkey, new_rand_timestamp(rng), Hash::new_unique(), 1);
|
||||
let num_slots = rng.gen_range(2..20);
|
||||
let mut slots = std::iter::repeat_with(|| 47825632 + rng.gen_range(0..512))
|
||||
.take(num_slots)
|
||||
.collect::<Vec<Slot>>();
|
||||
slots.sort();
|
||||
result.fill(&slots);
|
||||
result
|
||||
}
|
||||
|
||||
pub fn fill(&mut self, slots: &[Slot]) -> usize {
|
||||
let slots = &slots[slots.len().saturating_sub(MAX_SLOTS_PER_ENTRY)..];
|
||||
let mut num = 0;
|
||||
let space = self.max_compressed_slot_size();
|
||||
if space == 0 {
|
||||
return 0;
|
||||
}
|
||||
while num < slots.len() {
|
||||
let mut cslot = CompressedSlots::new(space as usize);
|
||||
num += cslot.add(&slots[num..]);
|
||||
self.slots.push(cslot);
|
||||
}
|
||||
num
|
||||
}
|
||||
|
||||
pub fn deflate(&mut self) {
|
||||
for s in self.slots.iter_mut() {
|
||||
let _ = s.deflate();
|
||||
}
|
||||
}
|
||||
|
||||
pub fn max_compressed_slot_size(&self) -> isize {
|
||||
let len_header = serialized_size(self).unwrap();
|
||||
let len_slot = serialized_size(&CompressedSlots::default()).unwrap();
|
||||
MAX_CRDS_OBJECT_SIZE as isize - (len_header + len_slot) as isize
|
||||
}
|
||||
|
||||
pub fn to_slots(&self, min_slot: Slot) -> Vec<Slot> {
|
||||
self.slots
|
||||
.iter()
|
||||
.filter(|s| min_slot < s.first_slot() + s.num_slots() as u64)
|
||||
.filter_map(|s| s.to_slots(min_slot).ok())
|
||||
.flatten()
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
|
||||
/// Type of the replicated value
|
||||
/// These are labels for values in a record that is associated with `Pubkey`
|
||||
#[derive(PartialEq, Hash, Eq, Clone, Debug)]
|
||||
|
@ -501,6 +587,7 @@ pub enum CrdsValueLabel {
|
|||
DuplicateShred(DuplicateShredIndex, Pubkey),
|
||||
SnapshotHashes(Pubkey),
|
||||
ContactInfo(Pubkey),
|
||||
RestartLastVotedForkSlots(Pubkey),
|
||||
}
|
||||
|
||||
impl fmt::Display for CrdsValueLabel {
|
||||
|
@ -524,6 +611,9 @@ impl fmt::Display for CrdsValueLabel {
|
|||
write!(f, "SnapshotHashes({})", self.pubkey())
|
||||
}
|
||||
CrdsValueLabel::ContactInfo(_) => write!(f, "ContactInfo({})", self.pubkey()),
|
||||
CrdsValueLabel::RestartLastVotedForkSlots(_) => {
|
||||
write!(f, "RestartLastVotedForkSlots({})", self.pubkey())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -543,6 +633,7 @@ impl CrdsValueLabel {
|
|||
CrdsValueLabel::DuplicateShred(_, p) => *p,
|
||||
CrdsValueLabel::SnapshotHashes(p) => *p,
|
||||
CrdsValueLabel::ContactInfo(pubkey) => *pubkey,
|
||||
CrdsValueLabel::RestartLastVotedForkSlots(p) => *p,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -593,6 +684,7 @@ impl CrdsValue {
|
|||
CrdsData::DuplicateShred(_, shred) => shred.wallclock,
|
||||
CrdsData::SnapshotHashes(hash) => hash.wallclock,
|
||||
CrdsData::ContactInfo(node) => node.wallclock(),
|
||||
CrdsData::RestartLastVotedForkSlots(slots) => slots.wallclock,
|
||||
}
|
||||
}
|
||||
pub fn pubkey(&self) -> Pubkey {
|
||||
|
@ -609,6 +701,7 @@ impl CrdsValue {
|
|||
CrdsData::DuplicateShred(_, shred) => shred.from,
|
||||
CrdsData::SnapshotHashes(hash) => hash.from,
|
||||
CrdsData::ContactInfo(node) => *node.pubkey(),
|
||||
CrdsData::RestartLastVotedForkSlots(slots) => slots.from,
|
||||
}
|
||||
}
|
||||
pub fn label(&self) -> CrdsValueLabel {
|
||||
|
@ -627,6 +720,9 @@ impl CrdsValue {
|
|||
CrdsData::DuplicateShred(ix, shred) => CrdsValueLabel::DuplicateShred(*ix, shred.from),
|
||||
CrdsData::SnapshotHashes(_) => CrdsValueLabel::SnapshotHashes(self.pubkey()),
|
||||
CrdsData::ContactInfo(node) => CrdsValueLabel::ContactInfo(*node.pubkey()),
|
||||
CrdsData::RestartLastVotedForkSlots(_) => {
|
||||
CrdsValueLabel::RestartLastVotedForkSlots(self.pubkey())
|
||||
}
|
||||
}
|
||||
}
|
||||
pub fn contact_info(&self) -> Option<&LegacyContactInfo> {
|
||||
|
@ -1073,4 +1169,58 @@ mod test {
|
|||
assert!(node.should_force_push(&pubkey));
|
||||
assert!(!node.should_force_push(&Pubkey::new_unique()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_restart_last_voted_fork_slots() {
|
||||
let keypair = Keypair::new();
|
||||
let slot = 53;
|
||||
let slot_parent = slot - 5;
|
||||
let shred_version = 21;
|
||||
let mut slots = RestartLastVotedForkSlots::new(
|
||||
keypair.pubkey(),
|
||||
timestamp(),
|
||||
Hash::default(),
|
||||
shred_version,
|
||||
);
|
||||
let original_slots_vec = [slot_parent, slot];
|
||||
slots.fill(&original_slots_vec);
|
||||
let value =
|
||||
CrdsValue::new_signed(CrdsData::RestartLastVotedForkSlots(slots.clone()), &keypair);
|
||||
assert_eq!(value.sanitize(), Ok(()));
|
||||
let label = value.label();
|
||||
assert_eq!(
|
||||
label,
|
||||
CrdsValueLabel::RestartLastVotedForkSlots(keypair.pubkey())
|
||||
);
|
||||
assert_eq!(label.pubkey(), keypair.pubkey());
|
||||
assert_eq!(value.wallclock(), slots.wallclock);
|
||||
let retrived_slots = slots.to_slots(0);
|
||||
assert_eq!(retrived_slots.len(), 2);
|
||||
assert_eq!(retrived_slots[0], slot_parent);
|
||||
assert_eq!(retrived_slots[1], slot);
|
||||
|
||||
let empty_slots = RestartLastVotedForkSlots::new(
|
||||
keypair.pubkey(),
|
||||
timestamp(),
|
||||
Hash::default(),
|
||||
shred_version,
|
||||
);
|
||||
let bad_value =
|
||||
CrdsValue::new_signed(CrdsData::RestartLastVotedForkSlots(empty_slots), &keypair);
|
||||
assert_eq!(bad_value.sanitize(), Err(SanitizeError::InvalidValue));
|
||||
|
||||
let last_slot: Slot = (MAX_SLOTS_PER_ENTRY + 10).try_into().unwrap();
|
||||
let mut large_slots = RestartLastVotedForkSlots::new(
|
||||
keypair.pubkey(),
|
||||
timestamp(),
|
||||
Hash::default(),
|
||||
shred_version,
|
||||
);
|
||||
let large_slots_vec: Vec<Slot> = (0..last_slot + 1).collect();
|
||||
large_slots.fill(&large_slots_vec);
|
||||
let retrived_slots = large_slots.to_slots(0);
|
||||
assert_eq!(retrived_slots.len(), MAX_SLOTS_PER_ENTRY);
|
||||
assert_eq!(retrived_slots.first(), Some(&11));
|
||||
assert_eq!(retrived_slots.last(), Some(&last_slot));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -178,7 +178,7 @@ impl Default for CompressedSlots {
|
|||
}
|
||||
|
||||
impl CompressedSlots {
|
||||
fn new(max_size: usize) -> Self {
|
||||
pub(crate) fn new(max_size: usize) -> Self {
|
||||
CompressedSlots::Uncompressed(Uncompressed::new(max_size))
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue