Move Gossip values added for wen_retart into restart_crds_values. (#34128)
* HvA9J * Rename file and change orders of definitions. * Use .from() on u16 to usize which shouldn't fail. * Update ABI congest.
This commit is contained in:
parent
45290c4689
commit
ae4b62c6f5
|
@ -33,13 +33,13 @@ use {
|
||||||
},
|
},
|
||||||
crds_value::{
|
crds_value::{
|
||||||
self, AccountsHashes, CrdsData, CrdsValue, CrdsValueLabel, EpochSlotsIndex, LowestSlot,
|
self, AccountsHashes, CrdsData, CrdsValue, CrdsValueLabel, EpochSlotsIndex, LowestSlot,
|
||||||
NodeInstance, RestartLastVotedForkSlots, RestartLastVotedForkSlotsError,
|
NodeInstance, SnapshotHashes, Version, Vote, MAX_WALLCLOCK,
|
||||||
SnapshotHashes, Version, Vote, MAX_WALLCLOCK,
|
|
||||||
},
|
},
|
||||||
duplicate_shred::DuplicateShred,
|
duplicate_shred::DuplicateShred,
|
||||||
epoch_slots::EpochSlots,
|
epoch_slots::EpochSlots,
|
||||||
gossip_error::GossipError,
|
gossip_error::GossipError,
|
||||||
ping_pong::{self, PingCache, Pong},
|
ping_pong::{self, PingCache, Pong},
|
||||||
|
restart_crds_values::{RestartLastVotedForkSlots, RestartLastVotedForkSlotsError},
|
||||||
socketaddr, socketaddr_any,
|
socketaddr, socketaddr_any,
|
||||||
weighted_shuffle::WeightedShuffle,
|
weighted_shuffle::WeightedShuffle,
|
||||||
},
|
},
|
||||||
|
@ -268,7 +268,7 @@ pub fn make_accounts_hashes_message(
|
||||||
pub(crate) type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>;
|
pub(crate) type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>;
|
||||||
|
|
||||||
// TODO These messages should go through the gpu pipeline for spam filtering
|
// TODO These messages should go through the gpu pipeline for spam filtering
|
||||||
#[frozen_abi(digest = "FW5Ycg6GXPsY5Ek9b2VjP69toxRb95bSNQRRWLSdKv2Y")]
|
#[frozen_abi(digest = "7a2P1GeQjyqCHMyBrhNPTKfPfG4iv32vki7XHahoN55z")]
|
||||||
#[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)]
|
#[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)]
|
||||||
#[allow(clippy::large_enum_variant)]
|
#[allow(clippy::large_enum_variant)]
|
||||||
pub(crate) enum Protocol {
|
pub(crate) enum Protocol {
|
||||||
|
|
|
@ -6,10 +6,9 @@ use {
|
||||||
duplicate_shred::{DuplicateShred, DuplicateShredIndex, MAX_DUPLICATE_SHREDS},
|
duplicate_shred::{DuplicateShred, DuplicateShredIndex, MAX_DUPLICATE_SHREDS},
|
||||||
epoch_slots::EpochSlots,
|
epoch_slots::EpochSlots,
|
||||||
legacy_contact_info::LegacyContactInfo,
|
legacy_contact_info::LegacyContactInfo,
|
||||||
|
restart_crds_values::RestartLastVotedForkSlots,
|
||||||
},
|
},
|
||||||
bincode::{serialize, serialized_size},
|
bincode::{serialize, serialized_size},
|
||||||
bv::BitVec,
|
|
||||||
itertools::Itertools,
|
|
||||||
rand::{CryptoRng, Rng},
|
rand::{CryptoRng, Rng},
|
||||||
serde::de::{Deserialize, Deserializer},
|
serde::de::{Deserialize, Deserializer},
|
||||||
solana_sdk::{
|
solana_sdk::{
|
||||||
|
@ -17,7 +16,6 @@ use {
|
||||||
hash::Hash,
|
hash::Hash,
|
||||||
pubkey::{self, Pubkey},
|
pubkey::{self, Pubkey},
|
||||||
sanitize::{Sanitize, SanitizeError},
|
sanitize::{Sanitize, SanitizeError},
|
||||||
serde_varint,
|
|
||||||
signature::{Keypair, Signable, Signature, Signer},
|
signature::{Keypair, Signable, Signature, Signer},
|
||||||
timing::timestamp,
|
timing::timestamp,
|
||||||
transaction::Transaction,
|
transaction::Transaction,
|
||||||
|
@ -29,7 +27,6 @@ use {
|
||||||
collections::{hash_map::Entry, BTreeSet, HashMap},
|
collections::{hash_map::Entry, BTreeSet, HashMap},
|
||||||
fmt,
|
fmt,
|
||||||
},
|
},
|
||||||
thiserror::Error,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
pub const MAX_WALLCLOCK: u64 = 1_000_000_000_000_000;
|
pub const MAX_WALLCLOCK: u64 = 1_000_000_000_000_000;
|
||||||
|
@ -494,175 +491,6 @@ impl Sanitize for NodeInstance {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, AbiExample, AbiEnumVisitor)]
|
|
||||||
enum SlotsOffsets {
|
|
||||||
RunLengthEncoding(RunLengthEncoding),
|
|
||||||
RawOffsets(RawOffsets),
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Eq, AbiExample)]
|
|
||||||
struct U16(#[serde(with = "serde_varint")] u16);
|
|
||||||
|
|
||||||
// The vector always starts with 1. Encode number of 1's and 0's consecutively.
|
|
||||||
// For example, 110000111 is [2, 4, 3].
|
|
||||||
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Eq, AbiExample)]
|
|
||||||
struct RunLengthEncoding(Vec<U16>);
|
|
||||||
|
|
||||||
impl RunLengthEncoding {
|
|
||||||
fn new(bits: &BitVec<u8>) -> Self {
|
|
||||||
let encoded = (0..bits.len())
|
|
||||||
.map(|i| bits.get(i))
|
|
||||||
.dedup_with_count()
|
|
||||||
.map_while(|(count, _)| u16::try_from(count).ok())
|
|
||||||
.scan(0, |current_bytes, count| {
|
|
||||||
*current_bytes += ((u16::BITS - count.leading_zeros() + 6) / 7).max(1) as usize;
|
|
||||||
(*current_bytes <= RestartLastVotedForkSlots::MAX_BYTES).then_some(U16(count))
|
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
Self(encoded)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn num_encoded_slots(&self) -> usize {
|
|
||||||
self.0
|
|
||||||
.iter()
|
|
||||||
.map(|x| usize::try_from(x.0).unwrap())
|
|
||||||
.sum::<usize>()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn to_slots(&self, last_slot: Slot, min_slot: Slot) -> Vec<Slot> {
|
|
||||||
let mut slots: Vec<Slot> = self
|
|
||||||
.0
|
|
||||||
.iter()
|
|
||||||
.map_while(|bit_count| usize::try_from(bit_count.0).ok())
|
|
||||||
.zip([1, 0].iter().cycle())
|
|
||||||
.flat_map(|(bit_count, bit)| std::iter::repeat(bit).take(bit_count))
|
|
||||||
.enumerate()
|
|
||||||
.filter(|(_, bit)| **bit == 1)
|
|
||||||
.map_while(|(offset, _)| {
|
|
||||||
let offset = Slot::try_from(offset).ok()?;
|
|
||||||
last_slot.checked_sub(offset)
|
|
||||||
})
|
|
||||||
.take(RestartLastVotedForkSlots::MAX_SLOTS)
|
|
||||||
.take_while(|slot| *slot >= min_slot)
|
|
||||||
.collect();
|
|
||||||
slots.reverse();
|
|
||||||
slots
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Eq, AbiExample)]
|
|
||||||
struct RawOffsets(BitVec<u8>);
|
|
||||||
|
|
||||||
impl RawOffsets {
|
|
||||||
fn new(mut bits: BitVec<u8>) -> Self {
|
|
||||||
bits.truncate(RestartLastVotedForkSlots::MAX_BYTES as u64 * 8);
|
|
||||||
bits.shrink_to_fit();
|
|
||||||
Self(bits)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn to_slots(&self, last_slot: Slot, min_slot: Slot) -> Vec<Slot> {
|
|
||||||
let mut slots: Vec<Slot> = (0..self.0.len())
|
|
||||||
.filter(|index| self.0.get(*index))
|
|
||||||
.map_while(|offset| last_slot.checked_sub(offset))
|
|
||||||
.take_while(|slot| *slot >= min_slot)
|
|
||||||
.collect();
|
|
||||||
slots.reverse();
|
|
||||||
slots
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, AbiExample, Debug)]
|
|
||||||
pub struct RestartLastVotedForkSlots {
|
|
||||||
pub from: Pubkey,
|
|
||||||
pub wallclock: u64,
|
|
||||||
offsets: SlotsOffsets,
|
|
||||||
pub last_voted_slot: Slot,
|
|
||||||
pub last_voted_hash: Hash,
|
|
||||||
pub shred_version: u16,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Sanitize for RestartLastVotedForkSlots {
|
|
||||||
fn sanitize(&self) -> std::result::Result<(), SanitizeError> {
|
|
||||||
self.last_voted_hash.sanitize()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Error)]
|
|
||||||
pub enum RestartLastVotedForkSlotsError {
|
|
||||||
#[error("Last voted fork cannot be empty")]
|
|
||||||
LastVotedForkEmpty,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl RestartLastVotedForkSlots {
|
|
||||||
// This number is MAX_CRDS_OBJECT_SIZE - empty serialized RestartLastVotedForkSlots.
|
|
||||||
const MAX_BYTES: usize = 824;
|
|
||||||
|
|
||||||
// Per design doc, we should start wen_restart within 7 hours.
|
|
||||||
pub const MAX_SLOTS: usize = u16::MAX as usize;
|
|
||||||
|
|
||||||
pub fn new(
|
|
||||||
from: Pubkey,
|
|
||||||
now: u64,
|
|
||||||
last_voted_fork: &[Slot],
|
|
||||||
last_voted_hash: Hash,
|
|
||||||
shred_version: u16,
|
|
||||||
) -> Result<Self, RestartLastVotedForkSlotsError> {
|
|
||||||
let Some((&first_voted_slot, &last_voted_slot)) =
|
|
||||||
last_voted_fork.iter().minmax().into_option()
|
|
||||||
else {
|
|
||||||
return Err(RestartLastVotedForkSlotsError::LastVotedForkEmpty);
|
|
||||||
};
|
|
||||||
let max_size = last_voted_slot.saturating_sub(first_voted_slot) + 1;
|
|
||||||
let mut uncompressed_bitvec = BitVec::new_fill(false, max_size);
|
|
||||||
for slot in last_voted_fork {
|
|
||||||
uncompressed_bitvec.set(last_voted_slot - *slot, true);
|
|
||||||
}
|
|
||||||
let run_length_encoding = RunLengthEncoding::new(&uncompressed_bitvec);
|
|
||||||
let offsets =
|
|
||||||
if run_length_encoding.num_encoded_slots() > RestartLastVotedForkSlots::MAX_BYTES * 8 {
|
|
||||||
SlotsOffsets::RunLengthEncoding(run_length_encoding)
|
|
||||||
} else {
|
|
||||||
SlotsOffsets::RawOffsets(RawOffsets::new(uncompressed_bitvec))
|
|
||||||
};
|
|
||||||
Ok(Self {
|
|
||||||
from,
|
|
||||||
wallclock: now,
|
|
||||||
offsets,
|
|
||||||
last_voted_slot,
|
|
||||||
last_voted_hash,
|
|
||||||
shred_version,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
/// New random Version for tests and benchmarks.
|
|
||||||
pub fn new_rand<R: Rng>(rng: &mut R, pubkey: Option<Pubkey>) -> Self {
|
|
||||||
let pubkey = pubkey.unwrap_or_else(solana_sdk::pubkey::new_rand);
|
|
||||||
let num_slots = rng.gen_range(2..20);
|
|
||||||
let slots = std::iter::repeat_with(|| 47825632 + rng.gen_range(0..512))
|
|
||||||
.take(num_slots)
|
|
||||||
.collect::<Vec<Slot>>();
|
|
||||||
RestartLastVotedForkSlots::new(
|
|
||||||
pubkey,
|
|
||||||
new_rand_timestamp(rng),
|
|
||||||
&slots,
|
|
||||||
Hash::new_unique(),
|
|
||||||
1,
|
|
||||||
)
|
|
||||||
.unwrap()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn to_slots(&self, min_slot: Slot) -> Vec<Slot> {
|
|
||||||
match &self.offsets {
|
|
||||||
SlotsOffsets::RunLengthEncoding(run_length_encoding) => {
|
|
||||||
run_length_encoding.to_slots(self.last_voted_slot, min_slot)
|
|
||||||
}
|
|
||||||
SlotsOffsets::RawOffsets(raw_offsets) => {
|
|
||||||
raw_offsets.to_slots(self.last_voted_slot, min_slot)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Type of the replicated value
|
/// Type of the replicated value
|
||||||
/// These are labels for values in a record that is associated with `Pubkey`
|
/// These are labels for values in a record that is associated with `Pubkey`
|
||||||
#[derive(PartialEq, Hash, Eq, Clone, Debug)]
|
#[derive(PartialEq, Hash, Eq, Clone, Debug)]
|
||||||
|
@ -889,7 +717,6 @@ pub(crate) fn sanitize_wallclock(wallclock: u64) -> Result<(), SanitizeError> {
|
||||||
mod test {
|
mod test {
|
||||||
use {
|
use {
|
||||||
super::*,
|
super::*,
|
||||||
crate::cluster_info::MAX_CRDS_OBJECT_SIZE,
|
|
||||||
bincode::{deserialize, Options},
|
bincode::{deserialize, Options},
|
||||||
rand::SeedableRng,
|
rand::SeedableRng,
|
||||||
rand_chacha::ChaChaRng,
|
rand_chacha::ChaChaRng,
|
||||||
|
@ -1262,130 +1089,4 @@ mod test {
|
||||||
assert!(node.should_force_push(&pubkey));
|
assert!(node.should_force_push(&pubkey));
|
||||||
assert!(!node.should_force_push(&Pubkey::new_unique()));
|
assert!(!node.should_force_push(&Pubkey::new_unique()));
|
||||||
}
|
}
|
||||||
|
|
||||||
fn make_rand_slots<R: Rng>(rng: &mut R) -> impl Iterator<Item = Slot> + '_ {
|
|
||||||
repeat_with(|| rng.gen_range(1..5)).scan(0, |slot, step| {
|
|
||||||
*slot += step;
|
|
||||||
Some(*slot)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_restart_last_voted_fork_slots_max_bytes() {
|
|
||||||
let keypair = Keypair::new();
|
|
||||||
let header = RestartLastVotedForkSlots::new(
|
|
||||||
keypair.pubkey(),
|
|
||||||
timestamp(),
|
|
||||||
&[1, 2],
|
|
||||||
Hash::default(),
|
|
||||||
0,
|
|
||||||
)
|
|
||||||
.unwrap();
|
|
||||||
// If the following assert fails, please update RestartLastVotedForkSlots::MAX_BYTES
|
|
||||||
assert_eq!(
|
|
||||||
RestartLastVotedForkSlots::MAX_BYTES,
|
|
||||||
MAX_CRDS_OBJECT_SIZE - serialized_size(&header).unwrap() as usize
|
|
||||||
);
|
|
||||||
|
|
||||||
// Create large enough slots to make sure we are discarding some to make slots fit.
|
|
||||||
let mut rng = rand::thread_rng();
|
|
||||||
let large_length = 8000;
|
|
||||||
let range: Vec<Slot> = make_rand_slots(&mut rng).take(large_length).collect();
|
|
||||||
let large_slots = RestartLastVotedForkSlots::new(
|
|
||||||
keypair.pubkey(),
|
|
||||||
timestamp(),
|
|
||||||
&range,
|
|
||||||
Hash::default(),
|
|
||||||
0,
|
|
||||||
)
|
|
||||||
.unwrap();
|
|
||||||
assert!(serialized_size(&large_slots).unwrap() <= MAX_CRDS_OBJECT_SIZE as u64);
|
|
||||||
let retrieved_slots = large_slots.to_slots(0);
|
|
||||||
assert!(retrieved_slots.len() <= range.len());
|
|
||||||
assert!(retrieved_slots.last().unwrap() - retrieved_slots.first().unwrap() > 5000);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_restart_last_voted_fork_slots() {
|
|
||||||
let keypair = Keypair::new();
|
|
||||||
let slot = 53;
|
|
||||||
let slot_parent = slot - 5;
|
|
||||||
let shred_version = 21;
|
|
||||||
let original_slots_vec = [slot_parent, slot];
|
|
||||||
let slots = RestartLastVotedForkSlots::new(
|
|
||||||
keypair.pubkey(),
|
|
||||||
timestamp(),
|
|
||||||
&original_slots_vec,
|
|
||||||
Hash::default(),
|
|
||||||
shred_version,
|
|
||||||
)
|
|
||||||
.unwrap();
|
|
||||||
let value =
|
|
||||||
CrdsValue::new_signed(CrdsData::RestartLastVotedForkSlots(slots.clone()), &keypair);
|
|
||||||
assert_eq!(value.sanitize(), Ok(()));
|
|
||||||
let label = value.label();
|
|
||||||
assert_eq!(
|
|
||||||
label,
|
|
||||||
CrdsValueLabel::RestartLastVotedForkSlots(keypair.pubkey())
|
|
||||||
);
|
|
||||||
assert_eq!(label.pubkey(), keypair.pubkey());
|
|
||||||
assert_eq!(value.wallclock(), slots.wallclock);
|
|
||||||
let retrieved_slots = slots.to_slots(0);
|
|
||||||
assert_eq!(retrieved_slots.len(), 2);
|
|
||||||
assert_eq!(retrieved_slots[0], slot_parent);
|
|
||||||
assert_eq!(retrieved_slots[1], slot);
|
|
||||||
|
|
||||||
let bad_value = RestartLastVotedForkSlots::new(
|
|
||||||
keypair.pubkey(),
|
|
||||||
timestamp(),
|
|
||||||
&[],
|
|
||||||
Hash::default(),
|
|
||||||
shred_version,
|
|
||||||
);
|
|
||||||
assert!(bad_value.is_err());
|
|
||||||
|
|
||||||
let last_slot: Slot = 8000;
|
|
||||||
let large_slots_vec: Vec<Slot> = (0..last_slot + 1).collect();
|
|
||||||
let large_slots = RestartLastVotedForkSlots::new(
|
|
||||||
keypair.pubkey(),
|
|
||||||
timestamp(),
|
|
||||||
&large_slots_vec,
|
|
||||||
Hash::default(),
|
|
||||||
shred_version,
|
|
||||||
)
|
|
||||||
.unwrap();
|
|
||||||
assert!(serialized_size(&large_slots).unwrap() < MAX_CRDS_OBJECT_SIZE as u64);
|
|
||||||
let retrieved_slots = large_slots.to_slots(0);
|
|
||||||
assert_eq!(retrieved_slots, large_slots_vec);
|
|
||||||
}
|
|
||||||
|
|
||||||
fn check_run_length_encoding(slots: Vec<Slot>) {
|
|
||||||
let last_voted_slot = slots[slots.len() - 1];
|
|
||||||
let mut bitvec = BitVec::new_fill(false, last_voted_slot - slots[0] + 1);
|
|
||||||
for slot in &slots {
|
|
||||||
bitvec.set(last_voted_slot - slot, true);
|
|
||||||
}
|
|
||||||
let rle = RunLengthEncoding::new(&bitvec);
|
|
||||||
let retrieved_slots = rle.to_slots(last_voted_slot, 0);
|
|
||||||
assert_eq!(retrieved_slots, slots);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_run_length_encoding() {
|
|
||||||
check_run_length_encoding((1000..16384 + 1000).map(|x| x as Slot).collect_vec());
|
|
||||||
check_run_length_encoding([1000 as Slot].into());
|
|
||||||
check_run_length_encoding(
|
|
||||||
[
|
|
||||||
1000 as Slot,
|
|
||||||
RestartLastVotedForkSlots::MAX_SLOTS as Slot + 999,
|
|
||||||
]
|
|
||||||
.into(),
|
|
||||||
);
|
|
||||||
check_run_length_encoding((1000..1800).step_by(2).map(|x| x as Slot).collect_vec());
|
|
||||||
|
|
||||||
let mut rng = rand::thread_rng();
|
|
||||||
let large_length = 500;
|
|
||||||
let range: Vec<Slot> = make_rand_slots(&mut rng).take(large_length).collect();
|
|
||||||
check_run_length_encoding(range);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,6 +24,7 @@ pub mod legacy_contact_info;
|
||||||
pub mod ping_pong;
|
pub mod ping_pong;
|
||||||
mod push_active_set;
|
mod push_active_set;
|
||||||
mod received_cache;
|
mod received_cache;
|
||||||
|
pub mod restart_crds_values;
|
||||||
pub mod weighted_shuffle;
|
pub mod weighted_shuffle;
|
||||||
|
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
|
|
|
@ -0,0 +1,320 @@
|
||||||
|
use {
|
||||||
|
crate::crds_value::new_rand_timestamp,
|
||||||
|
bv::BitVec,
|
||||||
|
itertools::Itertools,
|
||||||
|
rand::Rng,
|
||||||
|
solana_sdk::{
|
||||||
|
clock::Slot,
|
||||||
|
hash::Hash,
|
||||||
|
pubkey::Pubkey,
|
||||||
|
sanitize::{Sanitize, SanitizeError},
|
||||||
|
serde_varint,
|
||||||
|
},
|
||||||
|
thiserror::Error,
|
||||||
|
};
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, AbiExample, Debug)]
|
||||||
|
pub struct RestartLastVotedForkSlots {
|
||||||
|
pub from: Pubkey,
|
||||||
|
pub wallclock: u64,
|
||||||
|
offsets: SlotsOffsets,
|
||||||
|
pub last_voted_slot: Slot,
|
||||||
|
pub last_voted_hash: Hash,
|
||||||
|
pub shred_version: u16,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Error)]
|
||||||
|
pub enum RestartLastVotedForkSlotsError {
|
||||||
|
#[error("Last voted fork cannot be empty")]
|
||||||
|
LastVotedForkEmpty,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, AbiExample, AbiEnumVisitor)]
|
||||||
|
enum SlotsOffsets {
|
||||||
|
RunLengthEncoding(RunLengthEncoding),
|
||||||
|
RawOffsets(RawOffsets),
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Eq, AbiExample)]
|
||||||
|
struct U16(#[serde(with = "serde_varint")] u16);
|
||||||
|
|
||||||
|
// The vector always starts with 1. Encode number of 1's and 0's consecutively.
|
||||||
|
// For example, 110000111 is [2, 4, 3].
|
||||||
|
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Eq, AbiExample)]
|
||||||
|
struct RunLengthEncoding(Vec<U16>);
|
||||||
|
|
||||||
|
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Eq, AbiExample)]
|
||||||
|
struct RawOffsets(BitVec<u8>);
|
||||||
|
|
||||||
|
impl Sanitize for RestartLastVotedForkSlots {
|
||||||
|
fn sanitize(&self) -> std::result::Result<(), SanitizeError> {
|
||||||
|
self.last_voted_hash.sanitize()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RestartLastVotedForkSlots {
|
||||||
|
// This number is MAX_CRDS_OBJECT_SIZE - empty serialized RestartLastVotedForkSlots.
|
||||||
|
const MAX_BYTES: usize = 824;
|
||||||
|
|
||||||
|
// Per design doc, we should start wen_restart within 7 hours.
|
||||||
|
pub const MAX_SLOTS: usize = u16::MAX as usize;
|
||||||
|
|
||||||
|
pub fn new(
|
||||||
|
from: Pubkey,
|
||||||
|
now: u64,
|
||||||
|
last_voted_fork: &[Slot],
|
||||||
|
last_voted_hash: Hash,
|
||||||
|
shred_version: u16,
|
||||||
|
) -> Result<Self, RestartLastVotedForkSlotsError> {
|
||||||
|
let Some((&first_voted_slot, &last_voted_slot)) =
|
||||||
|
last_voted_fork.iter().minmax().into_option()
|
||||||
|
else {
|
||||||
|
return Err(RestartLastVotedForkSlotsError::LastVotedForkEmpty);
|
||||||
|
};
|
||||||
|
let max_size = last_voted_slot.saturating_sub(first_voted_slot) + 1;
|
||||||
|
let mut uncompressed_bitvec = BitVec::new_fill(false, max_size);
|
||||||
|
for slot in last_voted_fork {
|
||||||
|
uncompressed_bitvec.set(last_voted_slot - *slot, true);
|
||||||
|
}
|
||||||
|
let run_length_encoding = RunLengthEncoding::new(&uncompressed_bitvec);
|
||||||
|
let offsets =
|
||||||
|
if run_length_encoding.num_encoded_slots() > RestartLastVotedForkSlots::MAX_BYTES * 8 {
|
||||||
|
SlotsOffsets::RunLengthEncoding(run_length_encoding)
|
||||||
|
} else {
|
||||||
|
SlotsOffsets::RawOffsets(RawOffsets::new(uncompressed_bitvec))
|
||||||
|
};
|
||||||
|
Ok(Self {
|
||||||
|
from,
|
||||||
|
wallclock: now,
|
||||||
|
offsets,
|
||||||
|
last_voted_slot,
|
||||||
|
last_voted_hash,
|
||||||
|
shred_version,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// New random Version for tests and benchmarks.
|
||||||
|
pub fn new_rand<R: Rng>(rng: &mut R, pubkey: Option<Pubkey>) -> Self {
|
||||||
|
let pubkey = pubkey.unwrap_or_else(solana_sdk::pubkey::new_rand);
|
||||||
|
let num_slots = rng.gen_range(2..20);
|
||||||
|
let slots = std::iter::repeat_with(|| 47825632 + rng.gen_range(0..512))
|
||||||
|
.take(num_slots)
|
||||||
|
.collect::<Vec<Slot>>();
|
||||||
|
RestartLastVotedForkSlots::new(
|
||||||
|
pubkey,
|
||||||
|
new_rand_timestamp(rng),
|
||||||
|
&slots,
|
||||||
|
Hash::new_unique(),
|
||||||
|
1,
|
||||||
|
)
|
||||||
|
.unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn to_slots(&self, min_slot: Slot) -> Vec<Slot> {
|
||||||
|
match &self.offsets {
|
||||||
|
SlotsOffsets::RunLengthEncoding(run_length_encoding) => {
|
||||||
|
run_length_encoding.to_slots(self.last_voted_slot, min_slot)
|
||||||
|
}
|
||||||
|
SlotsOffsets::RawOffsets(raw_offsets) => {
|
||||||
|
raw_offsets.to_slots(self.last_voted_slot, min_slot)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RunLengthEncoding {
|
||||||
|
fn new(bits: &BitVec<u8>) -> Self {
|
||||||
|
let encoded = (0..bits.len())
|
||||||
|
.map(|i| bits.get(i))
|
||||||
|
.dedup_with_count()
|
||||||
|
.map_while(|(count, _)| u16::try_from(count).ok())
|
||||||
|
.scan(0, |current_bytes, count| {
|
||||||
|
*current_bytes += ((u16::BITS - count.leading_zeros() + 6) / 7).max(1) as usize;
|
||||||
|
(*current_bytes <= RestartLastVotedForkSlots::MAX_BYTES).then_some(U16(count))
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
Self(encoded)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn num_encoded_slots(&self) -> usize {
|
||||||
|
self.0.iter().map(|x| usize::from(x.0)).sum()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn to_slots(&self, last_slot: Slot, min_slot: Slot) -> Vec<Slot> {
|
||||||
|
let mut slots: Vec<Slot> = self
|
||||||
|
.0
|
||||||
|
.iter()
|
||||||
|
.map(|bit_count| usize::from(bit_count.0))
|
||||||
|
.zip([1, 0].iter().cycle())
|
||||||
|
.flat_map(|(bit_count, bit)| std::iter::repeat(bit).take(bit_count))
|
||||||
|
.enumerate()
|
||||||
|
.filter(|(_, bit)| **bit == 1)
|
||||||
|
.map_while(|(offset, _)| {
|
||||||
|
let offset = Slot::try_from(offset).ok()?;
|
||||||
|
last_slot.checked_sub(offset)
|
||||||
|
})
|
||||||
|
.take(RestartLastVotedForkSlots::MAX_SLOTS)
|
||||||
|
.take_while(|slot| *slot >= min_slot)
|
||||||
|
.collect();
|
||||||
|
slots.reverse();
|
||||||
|
slots
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RawOffsets {
|
||||||
|
fn new(mut bits: BitVec<u8>) -> Self {
|
||||||
|
bits.truncate(RestartLastVotedForkSlots::MAX_BYTES as u64 * 8);
|
||||||
|
bits.shrink_to_fit();
|
||||||
|
Self(bits)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn to_slots(&self, last_slot: Slot, min_slot: Slot) -> Vec<Slot> {
|
||||||
|
let mut slots: Vec<Slot> = (0..self.0.len())
|
||||||
|
.filter(|index| self.0.get(*index))
|
||||||
|
.map_while(|offset| last_slot.checked_sub(offset))
|
||||||
|
.take_while(|slot| *slot >= min_slot)
|
||||||
|
.collect();
|
||||||
|
slots.reverse();
|
||||||
|
slots
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod test {
|
||||||
|
use {
|
||||||
|
super::*,
|
||||||
|
crate::{
|
||||||
|
cluster_info::MAX_CRDS_OBJECT_SIZE,
|
||||||
|
crds_value::{CrdsData, CrdsValue, CrdsValueLabel},
|
||||||
|
},
|
||||||
|
bincode::serialized_size,
|
||||||
|
solana_sdk::{signature::Signer, signer::keypair::Keypair, timing::timestamp},
|
||||||
|
std::iter::repeat_with,
|
||||||
|
};
|
||||||
|
|
||||||
|
fn make_rand_slots<R: Rng>(rng: &mut R) -> impl Iterator<Item = Slot> + '_ {
|
||||||
|
repeat_with(|| rng.gen_range(1..5)).scan(0, |slot, step| {
|
||||||
|
*slot += step;
|
||||||
|
Some(*slot)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_restart_last_voted_fork_slots_max_bytes() {
|
||||||
|
let keypair = Keypair::new();
|
||||||
|
let header = RestartLastVotedForkSlots::new(
|
||||||
|
keypair.pubkey(),
|
||||||
|
timestamp(),
|
||||||
|
&[1, 2],
|
||||||
|
Hash::default(),
|
||||||
|
0,
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
// If the following assert fails, please update RestartLastVotedForkSlots::MAX_BYTES
|
||||||
|
assert_eq!(
|
||||||
|
RestartLastVotedForkSlots::MAX_BYTES,
|
||||||
|
MAX_CRDS_OBJECT_SIZE - serialized_size(&header).unwrap() as usize
|
||||||
|
);
|
||||||
|
|
||||||
|
// Create large enough slots to make sure we are discarding some to make slots fit.
|
||||||
|
let mut rng = rand::thread_rng();
|
||||||
|
let large_length = 8000;
|
||||||
|
let range: Vec<Slot> = make_rand_slots(&mut rng).take(large_length).collect();
|
||||||
|
let large_slots = RestartLastVotedForkSlots::new(
|
||||||
|
keypair.pubkey(),
|
||||||
|
timestamp(),
|
||||||
|
&range,
|
||||||
|
Hash::default(),
|
||||||
|
0,
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
assert!(serialized_size(&large_slots).unwrap() <= MAX_CRDS_OBJECT_SIZE as u64);
|
||||||
|
let retrieved_slots = large_slots.to_slots(0);
|
||||||
|
assert!(retrieved_slots.len() <= range.len());
|
||||||
|
assert!(retrieved_slots.last().unwrap() - retrieved_slots.first().unwrap() > 5000);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_restart_last_voted_fork_slots() {
|
||||||
|
let keypair = Keypair::new();
|
||||||
|
let slot = 53;
|
||||||
|
let slot_parent = slot - 5;
|
||||||
|
let shred_version = 21;
|
||||||
|
let original_slots_vec = [slot_parent, slot];
|
||||||
|
let slots = RestartLastVotedForkSlots::new(
|
||||||
|
keypair.pubkey(),
|
||||||
|
timestamp(),
|
||||||
|
&original_slots_vec,
|
||||||
|
Hash::default(),
|
||||||
|
shred_version,
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
let value =
|
||||||
|
CrdsValue::new_signed(CrdsData::RestartLastVotedForkSlots(slots.clone()), &keypair);
|
||||||
|
assert_eq!(value.sanitize(), Ok(()));
|
||||||
|
let label = value.label();
|
||||||
|
assert_eq!(
|
||||||
|
label,
|
||||||
|
CrdsValueLabel::RestartLastVotedForkSlots(keypair.pubkey())
|
||||||
|
);
|
||||||
|
assert_eq!(label.pubkey(), keypair.pubkey());
|
||||||
|
assert_eq!(value.wallclock(), slots.wallclock);
|
||||||
|
let retrieved_slots = slots.to_slots(0);
|
||||||
|
assert_eq!(retrieved_slots.len(), 2);
|
||||||
|
assert_eq!(retrieved_slots[0], slot_parent);
|
||||||
|
assert_eq!(retrieved_slots[1], slot);
|
||||||
|
|
||||||
|
let bad_value = RestartLastVotedForkSlots::new(
|
||||||
|
keypair.pubkey(),
|
||||||
|
timestamp(),
|
||||||
|
&[],
|
||||||
|
Hash::default(),
|
||||||
|
shred_version,
|
||||||
|
);
|
||||||
|
assert!(bad_value.is_err());
|
||||||
|
|
||||||
|
let last_slot: Slot = 8000;
|
||||||
|
let large_slots_vec: Vec<Slot> = (0..last_slot + 1).collect();
|
||||||
|
let large_slots = RestartLastVotedForkSlots::new(
|
||||||
|
keypair.pubkey(),
|
||||||
|
timestamp(),
|
||||||
|
&large_slots_vec,
|
||||||
|
Hash::default(),
|
||||||
|
shred_version,
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
assert!(serialized_size(&large_slots).unwrap() < MAX_CRDS_OBJECT_SIZE as u64);
|
||||||
|
let retrieved_slots = large_slots.to_slots(0);
|
||||||
|
assert_eq!(retrieved_slots, large_slots_vec);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn check_run_length_encoding(slots: Vec<Slot>) {
|
||||||
|
let last_voted_slot = slots[slots.len() - 1];
|
||||||
|
let mut bitvec = BitVec::new_fill(false, last_voted_slot - slots[0] + 1);
|
||||||
|
for slot in &slots {
|
||||||
|
bitvec.set(last_voted_slot - slot, true);
|
||||||
|
}
|
||||||
|
let rle = RunLengthEncoding::new(&bitvec);
|
||||||
|
let retrieved_slots = rle.to_slots(last_voted_slot, 0);
|
||||||
|
assert_eq!(retrieved_slots, slots);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_run_length_encoding() {
|
||||||
|
check_run_length_encoding((1000..16384 + 1000).map(|x| x as Slot).collect_vec());
|
||||||
|
check_run_length_encoding([1000 as Slot].into());
|
||||||
|
check_run_length_encoding(
|
||||||
|
[
|
||||||
|
1000 as Slot,
|
||||||
|
RestartLastVotedForkSlots::MAX_SLOTS as Slot + 999,
|
||||||
|
]
|
||||||
|
.into(),
|
||||||
|
);
|
||||||
|
check_run_length_encoding((1000..1800).step_by(2).map(|x| x as Slot).collect_vec());
|
||||||
|
|
||||||
|
let mut rng = rand::thread_rng();
|
||||||
|
let large_length = 500;
|
||||||
|
let range: Vec<Slot> = make_rand_slots(&mut rng).take(large_length).collect();
|
||||||
|
check_run_length_encoding(range);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue