diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 8a0112251..8f36eb6d2 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -12,6 +12,7 @@ //! * layer 2 - Everyone else, if layer 1 is `2^10`, layer 2 should be able to fit `2^20` number of nodes. //! //! Bank needs to provide an interface for us to query the stake weight +use crate::crds_value::EpochIncompleteSlots; use crate::packet::limited_deserialize; use crate::streamer::{PacketReceiver, PacketSender}; use crate::{ @@ -77,6 +78,8 @@ const MAX_PROTOCOL_HEADER_SIZE: u64 = 214; /// 128MB/PACKET_DATA_SIZE const MAX_GOSSIP_TRAFFIC: usize = 128_000_000 / PACKET_DATA_SIZE; +const NUM_BITS_PER_BYTE: u64 = 8; + #[derive(Debug, PartialEq, Eq)] pub enum ClusterInfoError { NoPeers, @@ -316,7 +319,7 @@ impl ClusterInfo { ) } - pub fn compress_incomplete_slots(incomplete_slots: &BTreeSet) -> (Slot, Vec) { + pub fn compress_incomplete_slots(incomplete_slots: &BTreeSet) -> EpochIncompleteSlots { if !incomplete_slots.is_empty() { let first_slot = incomplete_slots .iter() @@ -326,9 +329,18 @@ impl ClusterInfo { .iter() .next_back() .expect("expected to find last slot"); - let mut uncompressed = vec![0u8; (last_slot.saturating_sub(*first_slot) + 1) as usize]; + let num_uncompressed_bits = last_slot.saturating_sub(*first_slot) + 1; + let num_uncompressed_bytes = if num_uncompressed_bits % NUM_BITS_PER_BYTE > 0 { + 1 + } else { + 0 + } + num_uncompressed_bits / NUM_BITS_PER_BYTE; + let mut uncompressed = vec![0u8; num_uncompressed_bytes as usize]; incomplete_slots.iter().for_each(|slot| { - uncompressed[slot.saturating_sub(*first_slot) as usize] = 1; + let offset_from_first_slot = slot.saturating_sub(*first_slot); + let index = offset_from_first_slot / NUM_BITS_PER_BYTE; + let bit_index = offset_from_first_slot % NUM_BITS_PER_BYTE; + uncompressed[index as usize] |= 1 << bit_index; }); if let Ok(compressed) = uncompressed .iter() @@ -336,27 +348,33 @@ impl ClusterInfo { .encode(&mut GZipEncoder::new(), Action::Finish) .collect::, _>>() { - (*first_slot, compressed) - } else { - (0, vec![]) + return EpochIncompleteSlots { + first: *first_slot, + compressed_list: compressed, + }; } - } else { - (0, vec![]) } + EpochIncompleteSlots::default() } - pub fn decompress_incomplete_slots(first_slot: u64, compressed: &[u8]) -> BTreeSet { + pub fn decompress_incomplete_slots(slots: &EpochIncompleteSlots) -> BTreeSet { let mut old_incomplete_slots: BTreeSet = BTreeSet::new(); - if let Ok(decompressed) = compressed + if let Ok(decompressed) = slots + .compressed_list .iter() .cloned() .decode(&mut GZipDecoder::new()) .collect::, _>>() { decompressed.iter().enumerate().for_each(|(i, val)| { - if *val == 1 { - old_incomplete_slots.insert(first_slot + i as u64); + if *val != 0 { + (0..8).for_each(|bit_index| { + if (1 << bit_index & *val) != 0 { + let slot = slots.first + i as u64 * NUM_BITS_PER_BYTE + bit_index; + old_incomplete_slots.insert(slot as u64); + } + }) } }) } @@ -372,19 +390,13 @@ impl ClusterInfo { slots: BTreeSet, incomplete_slots: &BTreeSet, ) { - let (first_missing_slot, compressed_map) = - Self::compress_incomplete_slots(incomplete_slots); + let compressed = Self::compress_incomplete_slots(incomplete_slots); let now = timestamp(); let entry = CrdsValue::new_signed( - CrdsData::EpochSlots(EpochSlots::new( - id, - root, - min, - slots, - first_missing_slot, - compressed_map, - now, - )), + CrdsData::EpochSlots( + 0, + EpochSlots::new(id, root, min, slots, vec![compressed], now), + ), &self.keypair, ); self.gossip @@ -2221,15 +2233,17 @@ mod tests { for i in 0..128 { btree_slots.insert(i); } - let value = CrdsValue::new_unsigned(CrdsData::EpochSlots(EpochSlots { - from: Pubkey::default(), - root: 0, - lowest: 0, - slots: btree_slots, - first_missing: 0, - stash: vec![], - wallclock: 0, - })); + let value = CrdsValue::new_unsigned(CrdsData::EpochSlots( + 0, + EpochSlots { + from: Pubkey::default(), + root: 0, + lowest: 0, + slots: btree_slots, + stash: vec![], + wallclock: 0, + }, + )); test_split_messages(value); } @@ -2240,15 +2254,17 @@ mod tests { let payload: Vec = vec![]; let vec_size = serialized_size(&payload).unwrap(); let desired_size = MAX_PROTOCOL_PAYLOAD_SIZE - vec_size; - let mut value = CrdsValue::new_unsigned(CrdsData::EpochSlots(EpochSlots { - from: Pubkey::default(), - root: 0, - lowest: 0, - slots: BTreeSet::new(), - first_missing: 0, - stash: vec![], - wallclock: 0, - })); + let mut value = CrdsValue::new_unsigned(CrdsData::EpochSlots( + 0, + EpochSlots { + from: Pubkey::default(), + root: 0, + lowest: 0, + slots: BTreeSet::new(), + stash: vec![], + wallclock: 0, + }, + )); let mut i = 0; while value.size() <= desired_size { @@ -2260,15 +2276,17 @@ mod tests { desired_size ); } - value.data = CrdsData::EpochSlots(EpochSlots { - from: Pubkey::default(), - root: 0, - lowest: 0, - slots, - first_missing: 0, - stash: vec![], - wallclock: 0, - }); + value.data = CrdsData::EpochSlots( + 0, + EpochSlots { + from: Pubkey::default(), + root: 0, + lowest: 0, + slots, + stash: vec![], + wallclock: 0, + }, + ); i += 1; } let split = ClusterInfo::split_gossip_messages(vec![value.clone()]); @@ -2408,15 +2426,17 @@ mod tests { let other_node_pubkey = Pubkey::new_rand(); let other_node = ContactInfo::new_localhost(&other_node_pubkey, timestamp()); cluster_info.insert_info(other_node.clone()); - let value = CrdsValue::new_unsigned(CrdsData::EpochSlots(EpochSlots::new( - other_node_pubkey, - peer_root, - peer_lowest, - BTreeSet::new(), + let value = CrdsValue::new_unsigned(CrdsData::EpochSlots( 0, - vec![], - timestamp(), - ))); + EpochSlots::new( + other_node_pubkey, + peer_root, + peer_lowest, + BTreeSet::new(), + vec![], + timestamp(), + ), + )); let _ = cluster_info.gossip.crds.insert(value, timestamp()); } // only half the visible peers should be eligible to serve this repair @@ -2482,26 +2502,26 @@ mod tests { let mut incomplete_slots: BTreeSet = BTreeSet::new(); assert_eq!( - (0, vec![]), + EpochIncompleteSlots::default(), ClusterInfo::compress_incomplete_slots(&incomplete_slots) ); incomplete_slots.insert(100); - let (first, compressed) = ClusterInfo::compress_incomplete_slots(&incomplete_slots); - assert_eq!(100, first); - let decompressed = ClusterInfo::decompress_incomplete_slots(first, &compressed); + let compressed = ClusterInfo::compress_incomplete_slots(&incomplete_slots); + assert_eq!(100, compressed.first); + let decompressed = ClusterInfo::decompress_incomplete_slots(&compressed); assert_eq!(incomplete_slots, decompressed); incomplete_slots.insert(104); - let (first, compressed) = ClusterInfo::compress_incomplete_slots(&incomplete_slots); - assert_eq!(100, first); - let decompressed = ClusterInfo::decompress_incomplete_slots(first, &compressed); + let compressed = ClusterInfo::compress_incomplete_slots(&incomplete_slots); + assert_eq!(100, compressed.first); + let decompressed = ClusterInfo::decompress_incomplete_slots(&compressed); assert_eq!(incomplete_slots, decompressed); incomplete_slots.insert(80); - let (first, compressed) = ClusterInfo::compress_incomplete_slots(&incomplete_slots); - assert_eq!(80, first); - let decompressed = ClusterInfo::decompress_incomplete_slots(first, &compressed); + let compressed = ClusterInfo::compress_incomplete_slots(&incomplete_slots); + assert_eq!(80, compressed.first); + let decompressed = ClusterInfo::decompress_incomplete_slots(&compressed); assert_eq!(incomplete_slots, decompressed); } } diff --git a/core/src/crds_value.rs b/core/src/crds_value.rs index 848c12d67..8b9017aa2 100644 --- a/core/src/crds_value.rs +++ b/core/src/crds_value.rs @@ -15,6 +15,8 @@ use std::{ pub type VoteIndex = u8; pub const MAX_VOTES: VoteIndex = 32; +pub type EpochSlotIndex = u8; + /// CrdsValue that is replicated across the cluster #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] pub struct CrdsValue { @@ -58,7 +60,13 @@ impl Signable for CrdsValue { pub enum CrdsData { ContactInfo(ContactInfo), Vote(VoteIndex, Vote), - EpochSlots(EpochSlots), + EpochSlots(EpochSlotIndex, EpochSlots), +} + +#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq)] +pub struct EpochIncompleteSlots { + pub first: Slot, + pub compressed_list: Vec, } #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] @@ -67,8 +75,7 @@ pub struct EpochSlots { pub root: Slot, pub lowest: Slot, pub slots: BTreeSet, - pub first_missing: Slot, - pub stash: Vec, + pub stash: Vec, pub wallclock: u64, } @@ -78,8 +85,7 @@ impl EpochSlots { root: Slot, lowest: Slot, slots: BTreeSet, - first_missing: Slot, - stash: Vec, + stash: Vec, wallclock: u64, ) -> Self { Self { @@ -87,7 +93,6 @@ impl EpochSlots { root, lowest, slots, - first_missing, stash, wallclock, } @@ -160,21 +165,21 @@ impl CrdsValue { match &self.data { CrdsData::ContactInfo(contact_info) => contact_info.wallclock, CrdsData::Vote(_, vote) => vote.wallclock, - CrdsData::EpochSlots(vote) => vote.wallclock, + CrdsData::EpochSlots(_, vote) => vote.wallclock, } } pub fn pubkey(&self) -> Pubkey { match &self.data { CrdsData::ContactInfo(contact_info) => contact_info.id, CrdsData::Vote(_, vote) => vote.from, - CrdsData::EpochSlots(slots) => slots.from, + CrdsData::EpochSlots(_, slots) => slots.from, } } pub fn label(&self) -> CrdsValueLabel { match &self.data { CrdsData::ContactInfo(_) => CrdsValueLabel::ContactInfo(self.pubkey()), CrdsData::Vote(ix, _) => CrdsValueLabel::Vote(*ix, self.pubkey()), - CrdsData::EpochSlots(_) => CrdsValueLabel::EpochSlots(self.pubkey()), + CrdsData::EpochSlots(_, _) => CrdsValueLabel::EpochSlots(self.pubkey()), } } pub fn contact_info(&self) -> Option<&ContactInfo> { @@ -199,7 +204,7 @@ impl CrdsValue { pub fn epoch_slots(&self) -> Option<&EpochSlots> { match &self.data { - CrdsData::EpochSlots(slots) => Some(slots), + CrdsData::EpochSlots(_, slots) => Some(slots), _ => None, } } @@ -283,15 +288,10 @@ mod test { let key = v.clone().vote().unwrap().from; assert_eq!(v.label(), CrdsValueLabel::Vote(0, key)); - let v = CrdsValue::new_unsigned(CrdsData::EpochSlots(EpochSlots::new( - Pubkey::default(), + let v = CrdsValue::new_unsigned(CrdsData::EpochSlots( 0, - 0, - BTreeSet::new(), - 0, - vec![], - 0, - ))); + EpochSlots::new(Pubkey::default(), 0, 0, BTreeSet::new(), vec![], 0), + )); assert_eq!(v.wallclock(), 0); let key = v.clone().epoch_slots().unwrap().from; assert_eq!(v.label(), CrdsValueLabel::EpochSlots(key)); @@ -312,15 +312,10 @@ mod test { )); verify_signatures(&mut v, &keypair, &wrong_keypair); let btreeset: BTreeSet = vec![1, 2, 3, 6, 8].into_iter().collect(); - v = CrdsValue::new_unsigned(CrdsData::EpochSlots(EpochSlots::new( - keypair.pubkey(), + v = CrdsValue::new_unsigned(CrdsData::EpochSlots( 0, - 0, - btreeset, - 0, - vec![], - timestamp(), - ))); + EpochSlots::new(keypair.pubkey(), 0, 0, btreeset, vec![], timestamp()), + )); verify_signatures(&mut v, &keypair, &wrong_keypair); }