From 3081b4378d526573a512837edd010436044d1e82 Mon Sep 17 00:00:00 2001 From: Wen <113942165+wen-coding@users.noreply.github.com> Date: Thu, 16 Nov 2023 12:35:34 -0800 Subject: [PATCH] Add push and get methods for RestartLastVotedForkSlots (#33613) * Add push and get methods for RestartLastVotedForkSlots * Improve expression format. * Remove fill() from RestartLastVotedForkSlots and move into constructor. * Update ABI signature. * Use flate2 compress directly instead of relying on CompressedSlots. * Make constructor of RestartLastVotedForkSlots return error if necessary. * Use minmax and remove unnecessary code. * Replace flate2 with run-length encoding in RestartLastVotedForkSlots. * Remove accidentally added file. * The passed in last_voted_fork don't need to be mutable any more. * Switch to different type of run-length encoding. * Fix typo. * Move constant into RestartLastVotedForkSlots. * Use BitVec in RawOffsets. * Remove the unnecessary clone. * Use iter functions for RLE. * Use take_while instead of loop. * Change Run length encoding to iterator implementation. * Allow one slot in RestartLastVotedForkSlots. * Various simplifications. * Fix various errors and use customized error type. * Various simplifications. * Return error from push_get_restart_last_voted_fork_slots and remove unnecessary constraints in to_slots. * Allow 81k slots on RestartLastVotedForkSlots. * Limit MAX_SLOTS to 65535 so we can go back to u16. * Use u16::MAX instead of 65535. --- gossip/src/cluster_info.rs | 112 ++++++++++++- gossip/src/crds_value.rs | 315 ++++++++++++++++++++++++++++--------- 2 files changed, 350 insertions(+), 77 deletions(-) diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 67f713676..353d1e13d 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -33,7 +33,8 @@ use { }, crds_value::{ self, AccountsHashes, CrdsData, CrdsValue, CrdsValueLabel, EpochSlotsIndex, LowestSlot, - NodeInstance, SnapshotHashes, Version, Vote, MAX_WALLCLOCK, + NodeInstance, RestartLastVotedForkSlots, RestartLastVotedForkSlotsError, + SnapshotHashes, Version, Vote, MAX_WALLCLOCK, }, duplicate_shred::DuplicateShred, epoch_slots::EpochSlots, @@ -267,7 +268,7 @@ pub fn make_accounts_hashes_message( pub(crate) type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>; // TODO These messages should go through the gpu pipeline for spam filtering -#[frozen_abi(digest = "CVvKB495YW6JN4w1rWwajyZmG5wvNhmD97V99rSv9fGw")] +#[frozen_abi(digest = "HvA9JnnQrJnmkcGxrp8SmTB1b4iSyQ4VK2p6LpSBaoWR")] #[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)] #[allow(clippy::large_enum_variant)] pub(crate) enum Protocol { @@ -962,6 +963,26 @@ impl ClusterInfo { } } + pub fn push_restart_last_voted_fork_slots( + &self, + fork: &[Slot], + last_vote_bankhash: Hash, + ) -> Result<(), RestartLastVotedForkSlotsError> { + let now = timestamp(); + let last_voted_fork_slots = RestartLastVotedForkSlots::new( + self.id(), + now, + fork, + last_vote_bankhash, + self.my_shred_version(), + )?; + self.push_message(CrdsValue::new_signed( + CrdsData::RestartLastVotedForkSlots(last_voted_fork_slots), + &self.keypair(), + )); + Ok(()) + } + fn time_gossip_read_lock<'a>( &'a self, label: &'static str, @@ -1214,6 +1235,24 @@ impl ClusterInfo { .collect() } + pub fn get_restart_last_voted_fork_slots( + &self, + cursor: &mut Cursor, + ) -> Vec { + let self_shred_version = self.my_shred_version(); + let gossip_crds = self.gossip.crds.read().unwrap(); + gossip_crds + .get_entries(cursor) + .filter_map(|entry| { + let CrdsData::RestartLastVotedForkSlots(slots) = &entry.value.data else { + return None; + }; + (slots.shred_version == self_shred_version).then_some(slots) + }) + .cloned() + .collect() + } + /// Returns duplicate-shreds inserted since the given cursor. pub(crate) fn get_duplicate_shreds(&self, cursor: &mut Cursor) -> Vec { let gossip_crds = self.gossip.crds.read().unwrap(); @@ -4487,4 +4526,73 @@ mod tests { assert_eq!(shred_data.chunk_index() as usize, i); } } + + #[test] + fn test_push_restart_last_voted_fork_slots() { + let keypair = Arc::new(Keypair::new()); + let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), 0); + let cluster_info = ClusterInfo::new(contact_info, keypair, SocketAddrSpace::Unspecified); + let slots = cluster_info.get_restart_last_voted_fork_slots(&mut Cursor::default()); + assert!(slots.is_empty()); + let mut update: Vec = vec![0]; + for i in 0..81 { + for j in 0..1000 { + update.push(i * 1050 + j); + } + } + assert!(cluster_info + .push_restart_last_voted_fork_slots(&update, Hash::default()) + .is_ok()); + cluster_info.flush_push_queue(); + + let mut cursor = Cursor::default(); + let slots = cluster_info.get_restart_last_voted_fork_slots(&mut cursor); + assert_eq!(slots.len(), 1); + let retrieved_slots = slots[0].to_slots(0); + assert!(retrieved_slots[0] < 69000); + assert_eq!(retrieved_slots.last(), Some(84999).as_ref()); + + let slots = cluster_info.get_restart_last_voted_fork_slots(&mut cursor); + assert!(slots.is_empty()); + + // Test with different shred versions. + let mut rng = rand::thread_rng(); + let node_pubkey = Pubkey::new_unique(); + let mut node = LegacyContactInfo::new_rand(&mut rng, Some(node_pubkey)); + node.set_shred_version(42); + let mut slots = RestartLastVotedForkSlots::new_rand(&mut rng, Some(node_pubkey)); + slots.shred_version = 42; + let entries = vec![ + CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(node)), + CrdsValue::new_unsigned(CrdsData::RestartLastVotedForkSlots(slots)), + ]; + { + let mut gossip_crds = cluster_info.gossip.crds.write().unwrap(); + for entry in entries { + assert!(gossip_crds + .insert(entry, /*now=*/ 0, GossipRoute::LocalMessage) + .is_ok()); + } + } + // Should exclude other node's last-voted-fork-slot because of different + // shred-version. + let slots = cluster_info.get_restart_last_voted_fork_slots(&mut Cursor::default()); + assert_eq!(slots.len(), 1); + assert_eq!(slots[0].from, cluster_info.id()); + + // Match shred versions. + { + let mut node = cluster_info.my_contact_info.write().unwrap(); + node.set_shred_version(42); + } + assert!(cluster_info + .push_restart_last_voted_fork_slots(&update, Hash::default()) + .is_ok()); + cluster_info.flush_push_queue(); + // Should now include both slots. + let slots = cluster_info.get_restart_last_voted_fork_slots(&mut Cursor::default()); + assert_eq!(slots.len(), 2); + assert_eq!(slots[0].from, node_pubkey); + assert_eq!(slots[1].from, cluster_info.id()); + } } diff --git a/gossip/src/crds_value.rs b/gossip/src/crds_value.rs index 63efa141b..82b22f659 100644 --- a/gossip/src/crds_value.rs +++ b/gossip/src/crds_value.rs @@ -1,13 +1,15 @@ use { crate::{ - cluster_info::{MAX_ACCOUNTS_HASHES, MAX_CRDS_OBJECT_SIZE}, + cluster_info::MAX_ACCOUNTS_HASHES, contact_info::ContactInfo, deprecated, duplicate_shred::{DuplicateShred, DuplicateShredIndex, MAX_DUPLICATE_SHREDS}, - epoch_slots::{CompressedSlots, EpochSlots, MAX_SLOTS_PER_ENTRY}, + epoch_slots::EpochSlots, legacy_contact_info::LegacyContactInfo, }, bincode::{serialize, serialized_size}, + bv::BitVec, + itertools::Itertools, rand::{CryptoRng, Rng}, serde::de::{Deserialize, Deserializer}, solana_sdk::{ @@ -15,6 +17,7 @@ use { hash::Hash, pubkey::{self, Pubkey}, sanitize::{Sanitize, SanitizeError}, + serde_varint, signature::{Keypair, Signable, Signature, Signer}, timing::timestamp, transaction::Transaction, @@ -26,6 +29,7 @@ use { collections::{hash_map::Entry, BTreeSet, HashMap}, fmt, }, + thiserror::Error, }; pub const MAX_WALLCLOCK: u64 = 1_000_000_000_000_000; @@ -490,84 +494,172 @@ impl Sanitize for NodeInstance { } } -#[derive(Serialize, Deserialize, Clone, Default, PartialEq, Eq, AbiExample, Debug)] +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, AbiExample, AbiEnumVisitor)] +enum SlotsOffsets { + RunLengthEncoding(RunLengthEncoding), + RawOffsets(RawOffsets), +} + +#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Eq, AbiExample)] +struct U16(#[serde(with = "serde_varint")] u16); + +// The vector always starts with 1. Encode number of 1's and 0's consecutively. +// For example, 110000111 is [2, 4, 3]. +#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Eq, AbiExample)] +struct RunLengthEncoding(Vec); + +impl RunLengthEncoding { + fn new(bits: &BitVec) -> Self { + let encoded = (0..bits.len()) + .map(|i| bits.get(i)) + .dedup_with_count() + .map_while(|(count, _)| u16::try_from(count).ok()) + .scan(0, |current_bytes, count| { + *current_bytes += ((u16::BITS - count.leading_zeros() + 6) / 7).max(1) as usize; + (*current_bytes <= RestartLastVotedForkSlots::MAX_BYTES).then_some(U16(count)) + }) + .collect(); + Self(encoded) + } + + fn num_encoded_slots(&self) -> usize { + self.0 + .iter() + .map(|x| usize::try_from(x.0).unwrap()) + .sum::() + } + + fn to_slots(&self, last_slot: Slot, min_slot: Slot) -> Vec { + let mut slots: Vec = self + .0 + .iter() + .map_while(|bit_count| usize::try_from(bit_count.0).ok()) + .zip([1, 0].iter().cycle()) + .flat_map(|(bit_count, bit)| std::iter::repeat(bit).take(bit_count)) + .enumerate() + .filter(|(_, bit)| **bit == 1) + .map_while(|(offset, _)| { + let offset = Slot::try_from(offset).ok()?; + last_slot.checked_sub(offset) + }) + .take(RestartLastVotedForkSlots::MAX_SLOTS) + .take_while(|slot| *slot >= min_slot) + .collect(); + slots.reverse(); + slots + } +} + +#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Eq, AbiExample)] +struct RawOffsets(BitVec); + +impl RawOffsets { + fn new(mut bits: BitVec) -> Self { + bits.truncate(RestartLastVotedForkSlots::MAX_BYTES as u64 * 8); + bits.shrink_to_fit(); + Self(bits) + } + + fn to_slots(&self, last_slot: Slot, min_slot: Slot) -> Vec { + let mut slots: Vec = (0..self.0.len()) + .filter(|index| self.0.get(*index)) + .map_while(|offset| last_slot.checked_sub(offset)) + .take_while(|slot| *slot >= min_slot) + .collect(); + slots.reverse(); + slots + } +} + +#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, AbiExample, Debug)] pub struct RestartLastVotedForkSlots { pub from: Pubkey, pub wallclock: u64, - pub slots: Vec, + offsets: SlotsOffsets, + pub last_voted_slot: Slot, pub last_voted_hash: Hash, pub shred_version: u16, } impl Sanitize for RestartLastVotedForkSlots { fn sanitize(&self) -> std::result::Result<(), SanitizeError> { - if self.slots.is_empty() { - return Err(SanitizeError::InvalidValue); - } - self.slots.sanitize()?; self.last_voted_hash.sanitize() } } +#[derive(Debug, Error)] +pub enum RestartLastVotedForkSlotsError { + #[error("Last voted fork cannot be empty")] + LastVotedForkEmpty, +} + impl RestartLastVotedForkSlots { - pub fn new(from: Pubkey, now: u64, last_voted_hash: Hash, shred_version: u16) -> Self { - Self { + // This number is MAX_CRDS_OBJECT_SIZE - empty serialized RestartLastVotedForkSlots. + const MAX_BYTES: usize = 824; + + // Per design doc, we should start wen_restart within 7 hours. + pub const MAX_SLOTS: usize = u16::MAX as usize; + + pub fn new( + from: Pubkey, + now: u64, + last_voted_fork: &[Slot], + last_voted_hash: Hash, + shred_version: u16, + ) -> Result { + let Some((&first_voted_slot, &last_voted_slot)) = + last_voted_fork.iter().minmax().into_option() + else { + return Err(RestartLastVotedForkSlotsError::LastVotedForkEmpty); + }; + let max_size = last_voted_slot.saturating_sub(first_voted_slot) + 1; + let mut uncompressed_bitvec = BitVec::new_fill(false, max_size); + for slot in last_voted_fork { + uncompressed_bitvec.set(last_voted_slot - *slot, true); + } + let run_length_encoding = RunLengthEncoding::new(&uncompressed_bitvec); + let offsets = + if run_length_encoding.num_encoded_slots() > RestartLastVotedForkSlots::MAX_BYTES * 8 { + SlotsOffsets::RunLengthEncoding(run_length_encoding) + } else { + SlotsOffsets::RawOffsets(RawOffsets::new(uncompressed_bitvec)) + }; + Ok(Self { from, wallclock: now, - slots: Vec::new(), + offsets, + last_voted_slot, last_voted_hash, shred_version, - } + }) } /// New random Version for tests and benchmarks. pub fn new_rand(rng: &mut R, pubkey: Option) -> Self { let pubkey = pubkey.unwrap_or_else(solana_sdk::pubkey::new_rand); - let mut result = - RestartLastVotedForkSlots::new(pubkey, new_rand_timestamp(rng), Hash::new_unique(), 1); let num_slots = rng.gen_range(2..20); - let mut slots = std::iter::repeat_with(|| 47825632 + rng.gen_range(0..512)) + let slots = std::iter::repeat_with(|| 47825632 + rng.gen_range(0..512)) .take(num_slots) .collect::>(); - slots.sort(); - result.fill(&slots); - result - } - - pub fn fill(&mut self, slots: &[Slot]) -> usize { - let slots = &slots[slots.len().saturating_sub(MAX_SLOTS_PER_ENTRY)..]; - let mut num = 0; - let space = self.max_compressed_slot_size(); - if space == 0 { - return 0; - } - while num < slots.len() { - let mut cslot = CompressedSlots::new(space as usize); - num += cslot.add(&slots[num..]); - self.slots.push(cslot); - } - num - } - - pub fn deflate(&mut self) { - for s in self.slots.iter_mut() { - let _ = s.deflate(); - } - } - - pub fn max_compressed_slot_size(&self) -> isize { - let len_header = serialized_size(self).unwrap(); - let len_slot = serialized_size(&CompressedSlots::default()).unwrap(); - MAX_CRDS_OBJECT_SIZE as isize - (len_header + len_slot) as isize + RestartLastVotedForkSlots::new( + pubkey, + new_rand_timestamp(rng), + &slots, + Hash::new_unique(), + 1, + ) + .unwrap() } pub fn to_slots(&self, min_slot: Slot) -> Vec { - self.slots - .iter() - .filter(|s| min_slot < s.first_slot() + s.num_slots() as u64) - .filter_map(|s| s.to_slots(min_slot).ok()) - .flatten() - .collect() + match &self.offsets { + SlotsOffsets::RunLengthEncoding(run_length_encoding) => { + run_length_encoding.to_slots(self.last_voted_slot, min_slot) + } + SlotsOffsets::RawOffsets(raw_offsets) => { + raw_offsets.to_slots(self.last_voted_slot, min_slot) + } + } } } @@ -797,6 +889,7 @@ pub(crate) fn sanitize_wallclock(wallclock: u64) -> Result<(), SanitizeError> { mod test { use { super::*, + crate::cluster_info::MAX_CRDS_OBJECT_SIZE, bincode::{deserialize, Options}, rand::SeedableRng, rand_chacha::ChaChaRng, @@ -1170,20 +1263,63 @@ mod test { assert!(!node.should_force_push(&Pubkey::new_unique())); } + fn make_rand_slots(rng: &mut R) -> impl Iterator + '_ { + repeat_with(|| rng.gen_range(1..5)).scan(0, |slot, step| { + *slot += step; + Some(*slot) + }) + } + + #[test] + fn test_restart_last_voted_fork_slots_max_bytes() { + let keypair = Keypair::new(); + let header = RestartLastVotedForkSlots::new( + keypair.pubkey(), + timestamp(), + &[1, 2], + Hash::default(), + 0, + ) + .unwrap(); + // If the following assert fails, please update RestartLastVotedForkSlots::MAX_BYTES + assert_eq!( + RestartLastVotedForkSlots::MAX_BYTES, + MAX_CRDS_OBJECT_SIZE - serialized_size(&header).unwrap() as usize + ); + + // Create large enough slots to make sure we are discarding some to make slots fit. + let mut rng = rand::thread_rng(); + let large_length = 8000; + let range: Vec = make_rand_slots(&mut rng).take(large_length).collect(); + let large_slots = RestartLastVotedForkSlots::new( + keypair.pubkey(), + timestamp(), + &range, + Hash::default(), + 0, + ) + .unwrap(); + assert!(serialized_size(&large_slots).unwrap() <= MAX_CRDS_OBJECT_SIZE as u64); + let retrieved_slots = large_slots.to_slots(0); + assert!(retrieved_slots.len() <= range.len()); + assert!(retrieved_slots.last().unwrap() - retrieved_slots.first().unwrap() > 5000); + } + #[test] fn test_restart_last_voted_fork_slots() { let keypair = Keypair::new(); let slot = 53; let slot_parent = slot - 5; let shred_version = 21; - let mut slots = RestartLastVotedForkSlots::new( + let original_slots_vec = [slot_parent, slot]; + let slots = RestartLastVotedForkSlots::new( keypair.pubkey(), timestamp(), + &original_slots_vec, Hash::default(), shred_version, - ); - let original_slots_vec = [slot_parent, slot]; - slots.fill(&original_slots_vec); + ) + .unwrap(); let value = CrdsValue::new_signed(CrdsData::RestartLastVotedForkSlots(slots.clone()), &keypair); assert_eq!(value.sanitize(), Ok(())); @@ -1194,33 +1330,62 @@ mod test { ); assert_eq!(label.pubkey(), keypair.pubkey()); assert_eq!(value.wallclock(), slots.wallclock); - let retrived_slots = slots.to_slots(0); - assert_eq!(retrived_slots.len(), 2); - assert_eq!(retrived_slots[0], slot_parent); - assert_eq!(retrived_slots[1], slot); + let retrieved_slots = slots.to_slots(0); + assert_eq!(retrieved_slots.len(), 2); + assert_eq!(retrieved_slots[0], slot_parent); + assert_eq!(retrieved_slots[1], slot); - let empty_slots = RestartLastVotedForkSlots::new( + let bad_value = RestartLastVotedForkSlots::new( keypair.pubkey(), timestamp(), + &[], Hash::default(), shred_version, ); - let bad_value = - CrdsValue::new_signed(CrdsData::RestartLastVotedForkSlots(empty_slots), &keypair); - assert_eq!(bad_value.sanitize(), Err(SanitizeError::InvalidValue)); + assert!(bad_value.is_err()); - let last_slot: Slot = (MAX_SLOTS_PER_ENTRY + 10).try_into().unwrap(); - let mut large_slots = RestartLastVotedForkSlots::new( - keypair.pubkey(), - timestamp(), - Hash::default(), - shred_version, - ); + let last_slot: Slot = 8000; let large_slots_vec: Vec = (0..last_slot + 1).collect(); - large_slots.fill(&large_slots_vec); - let retrived_slots = large_slots.to_slots(0); - assert_eq!(retrived_slots.len(), MAX_SLOTS_PER_ENTRY); - assert_eq!(retrived_slots.first(), Some(&11)); - assert_eq!(retrived_slots.last(), Some(&last_slot)); + let large_slots = RestartLastVotedForkSlots::new( + keypair.pubkey(), + timestamp(), + &large_slots_vec, + Hash::default(), + shred_version, + ) + .unwrap(); + assert!(serialized_size(&large_slots).unwrap() < MAX_CRDS_OBJECT_SIZE as u64); + let retrieved_slots = large_slots.to_slots(0); + assert_eq!(retrieved_slots, large_slots_vec); + } + + fn check_run_length_encoding(slots: Vec) { + let last_voted_slot = slots[slots.len() - 1]; + let mut bitvec = BitVec::new_fill(false, last_voted_slot - slots[0] + 1); + for slot in &slots { + bitvec.set(last_voted_slot - slot, true); + } + let rle = RunLengthEncoding::new(&bitvec); + let retrieved_slots = rle.to_slots(last_voted_slot, 0); + assert_eq!(retrieved_slots, slots); + } + + #[test] + fn test_run_length_encoding() { + check_run_length_encoding((1000..16384 + 1000).map(|x| x as Slot).collect_vec()); + check_run_length_encoding([1000 as Slot].into()); + check_run_length_encoding( + [ + 1000 as Slot, + RestartLastVotedForkSlots::MAX_SLOTS as Slot + 999, + ] + .into(), + ); + check_run_length_encoding((1000..1800).step_by(2).map(|x| x as Slot).collect_vec()); + + let mut rng = rand::thread_rng(); + let large_length = 500; + let range: Vec = make_rand_slots(&mut rng).take(large_length).collect(); + check_run_length_encoding(range); } }