From e50bc0d34ba5b23ba3724892a3c45523e46a67a0 Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Thu, 20 Feb 2020 09:48:39 -0800 Subject: [PATCH] Do not compress small incomplete slot list (#8355) automerge --- core/src/cluster_info.rs | 95 ++++++++++++++++++++++++++++------------ core/src/crds_value.rs | 14 ++++++ 2 files changed, 81 insertions(+), 28 deletions(-) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 8f36eb6d24..9e902be276 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -12,6 +12,7 @@ //! * layer 2 - Everyone else, if layer 1 is `2^10`, layer 2 should be able to fit `2^20` number of nodes. //! //! Bank needs to provide an interface for us to query the stake weight +use crate::crds_value::CompressionType::*; use crate::crds_value::EpochIncompleteSlots; use crate::packet::limited_deserialize; use crate::streamer::{PacketReceiver, PacketSender}; @@ -79,6 +80,7 @@ const MAX_PROTOCOL_HEADER_SIZE: u64 = 214; const MAX_GOSSIP_TRAFFIC: usize = 128_000_000 / PACKET_DATA_SIZE; const NUM_BITS_PER_BYTE: u64 = 8; +const MIN_SIZE_TO_COMPRESS_GZIP: u64 = 64; #[derive(Debug, PartialEq, Eq)] pub enum ClusterInfoError { @@ -342,46 +344,77 @@ impl ClusterInfo { let bit_index = offset_from_first_slot % NUM_BITS_PER_BYTE; uncompressed[index as usize] |= 1 << bit_index; }); - if let Ok(compressed) = uncompressed - .iter() - .cloned() - .encode(&mut GZipEncoder::new(), Action::Finish) - .collect::, _>>() - { + if num_uncompressed_bytes >= MIN_SIZE_TO_COMPRESS_GZIP { + if let Ok(compressed) = uncompressed + .iter() + .cloned() + .encode(&mut GZipEncoder::new(), Action::Finish) + .collect::, _>>() + { + return EpochIncompleteSlots { + first: *first_slot, + compression: GZip, + compressed_list: compressed, + }; + } + } else { return EpochIncompleteSlots { first: *first_slot, - compressed_list: compressed, + compression: Uncompressed, + compressed_list: uncompressed, }; } } EpochIncompleteSlots::default() } - pub fn decompress_incomplete_slots(slots: &EpochIncompleteSlots) -> BTreeSet { + fn bitmap_to_slot_list(first: Slot, bitmap: &[u8]) -> BTreeSet { let mut old_incomplete_slots: BTreeSet = BTreeSet::new(); - - if let Ok(decompressed) = slots - .compressed_list - .iter() - .cloned() - .decode(&mut GZipDecoder::new()) - .collect::, _>>() - { - decompressed.iter().enumerate().for_each(|(i, val)| { - if *val != 0 { - (0..8).for_each(|bit_index| { - if (1 << bit_index & *val) != 0 { - let slot = slots.first + i as u64 * NUM_BITS_PER_BYTE + bit_index; - old_incomplete_slots.insert(slot as u64); - } - }) - } - }) - } - + bitmap.iter().enumerate().for_each(|(i, val)| { + if *val != 0 { + (0..8).for_each(|bit_index| { + if (1 << bit_index & *val) != 0 { + let slot = first + i as u64 * NUM_BITS_PER_BYTE + bit_index as u64; + old_incomplete_slots.insert(slot); + } + }) + } + }); old_incomplete_slots } + pub fn decompress_incomplete_slots(slots: &EpochIncompleteSlots) -> BTreeSet { + match slots.compression { + Uncompressed => Self::bitmap_to_slot_list(slots.first, &slots.compressed_list), + GZip => { + if let Ok(decompressed) = slots + .compressed_list + .iter() + .cloned() + .decode(&mut GZipDecoder::new()) + .collect::, _>>() + { + Self::bitmap_to_slot_list(slots.first, &decompressed) + } else { + BTreeSet::new() + } + } + BZip2 => { + if let Ok(decompressed) = slots + .compressed_list + .iter() + .cloned() + .decode(&mut BZip2Decoder::new()) + .collect::, _>>() + { + Self::bitmap_to_slot_list(slots.first, &decompressed) + } else { + BTreeSet::new() + } + } + } + } + pub fn push_epoch_slots( &mut self, id: Pubkey, @@ -2523,5 +2556,11 @@ mod tests { assert_eq!(80, compressed.first); let decompressed = ClusterInfo::decompress_incomplete_slots(&compressed); assert_eq!(incomplete_slots, decompressed); + + incomplete_slots.insert(10000); + let compressed = ClusterInfo::compress_incomplete_slots(&incomplete_slots); + assert_eq!(80, compressed.first); + let decompressed = ClusterInfo::decompress_incomplete_slots(&compressed); + assert_eq!(incomplete_slots, decompressed); } } diff --git a/core/src/crds_value.rs b/core/src/crds_value.rs index 8b9017aa27..df140a45db 100644 --- a/core/src/crds_value.rs +++ b/core/src/crds_value.rs @@ -63,9 +63,23 @@ pub enum CrdsData { EpochSlots(EpochSlotIndex, EpochSlots), } +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +pub enum CompressionType { + Uncompressed, + GZip, + BZip2, +} + +impl Default for CompressionType { + fn default() -> Self { + Self::Uncompressed + } +} + #[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq)] pub struct EpochIncompleteSlots { pub first: Slot, + pub compression: CompressionType, pub compressed_list: Vec, }