Do not compress small incomplete slot list (#8355)

automerge
This commit is contained in:
Pankaj Garg 2020-02-20 09:48:39 -08:00 committed by GitHub
parent 45774dc4aa
commit e50bc0d34b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 81 additions and 28 deletions

View File

@ -12,6 +12,7 @@
//! * layer 2 - Everyone else, if layer 1 is `2^10`, layer 2 should be able to fit `2^20` number of nodes. //! * layer 2 - Everyone else, if layer 1 is `2^10`, layer 2 should be able to fit `2^20` number of nodes.
//! //!
//! Bank needs to provide an interface for us to query the stake weight //! Bank needs to provide an interface for us to query the stake weight
use crate::crds_value::CompressionType::*;
use crate::crds_value::EpochIncompleteSlots; use crate::crds_value::EpochIncompleteSlots;
use crate::packet::limited_deserialize; use crate::packet::limited_deserialize;
use crate::streamer::{PacketReceiver, PacketSender}; use crate::streamer::{PacketReceiver, PacketSender};
@ -79,6 +80,7 @@ const MAX_PROTOCOL_HEADER_SIZE: u64 = 214;
const MAX_GOSSIP_TRAFFIC: usize = 128_000_000 / PACKET_DATA_SIZE; const MAX_GOSSIP_TRAFFIC: usize = 128_000_000 / PACKET_DATA_SIZE;
const NUM_BITS_PER_BYTE: u64 = 8; const NUM_BITS_PER_BYTE: u64 = 8;
const MIN_SIZE_TO_COMPRESS_GZIP: u64 = 64;
#[derive(Debug, PartialEq, Eq)] #[derive(Debug, PartialEq, Eq)]
pub enum ClusterInfoError { pub enum ClusterInfoError {
@ -342,6 +344,7 @@ impl ClusterInfo {
let bit_index = offset_from_first_slot % NUM_BITS_PER_BYTE; let bit_index = offset_from_first_slot % NUM_BITS_PER_BYTE;
uncompressed[index as usize] |= 1 << bit_index; uncompressed[index as usize] |= 1 << bit_index;
}); });
if num_uncompressed_bytes >= MIN_SIZE_TO_COMPRESS_GZIP {
if let Ok(compressed) = uncompressed if let Ok(compressed) = uncompressed
.iter() .iter()
.cloned() .cloned()
@ -350,16 +353,40 @@ impl ClusterInfo {
{ {
return EpochIncompleteSlots { return EpochIncompleteSlots {
first: *first_slot, first: *first_slot,
compression: GZip,
compressed_list: compressed, compressed_list: compressed,
}; };
} }
} else {
return EpochIncompleteSlots {
first: *first_slot,
compression: Uncompressed,
compressed_list: uncompressed,
};
}
} }
EpochIncompleteSlots::default() EpochIncompleteSlots::default()
} }
pub fn decompress_incomplete_slots(slots: &EpochIncompleteSlots) -> BTreeSet<Slot> { fn bitmap_to_slot_list(first: Slot, bitmap: &[u8]) -> BTreeSet<Slot> {
let mut old_incomplete_slots: BTreeSet<Slot> = BTreeSet::new(); let mut old_incomplete_slots: BTreeSet<Slot> = BTreeSet::new();
bitmap.iter().enumerate().for_each(|(i, val)| {
if *val != 0 {
(0..8).for_each(|bit_index| {
if (1 << bit_index & *val) != 0 {
let slot = first + i as u64 * NUM_BITS_PER_BYTE + bit_index as u64;
old_incomplete_slots.insert(slot);
}
})
}
});
old_incomplete_slots
}
pub fn decompress_incomplete_slots(slots: &EpochIncompleteSlots) -> BTreeSet<Slot> {
match slots.compression {
Uncompressed => Self::bitmap_to_slot_list(slots.first, &slots.compressed_list),
GZip => {
if let Ok(decompressed) = slots if let Ok(decompressed) = slots
.compressed_list .compressed_list
.iter() .iter()
@ -367,19 +394,25 @@ impl ClusterInfo {
.decode(&mut GZipDecoder::new()) .decode(&mut GZipDecoder::new())
.collect::<std::result::Result<Vec<u8>, _>>() .collect::<std::result::Result<Vec<u8>, _>>()
{ {
decompressed.iter().enumerate().for_each(|(i, val)| { Self::bitmap_to_slot_list(slots.first, &decompressed)
if *val != 0 { } else {
(0..8).for_each(|bit_index| { BTreeSet::new()
if (1 << bit_index & *val) != 0 { }
let slot = slots.first + i as u64 * NUM_BITS_PER_BYTE + bit_index; }
old_incomplete_slots.insert(slot as u64); BZip2 => {
if let Ok(decompressed) = slots
.compressed_list
.iter()
.cloned()
.decode(&mut BZip2Decoder::new())
.collect::<std::result::Result<Vec<u8>, _>>()
{
Self::bitmap_to_slot_list(slots.first, &decompressed)
} else {
BTreeSet::new()
} }
})
} }
})
} }
old_incomplete_slots
} }
pub fn push_epoch_slots( pub fn push_epoch_slots(
@ -2523,5 +2556,11 @@ mod tests {
assert_eq!(80, compressed.first); assert_eq!(80, compressed.first);
let decompressed = ClusterInfo::decompress_incomplete_slots(&compressed); let decompressed = ClusterInfo::decompress_incomplete_slots(&compressed);
assert_eq!(incomplete_slots, decompressed); assert_eq!(incomplete_slots, decompressed);
incomplete_slots.insert(10000);
let compressed = ClusterInfo::compress_incomplete_slots(&incomplete_slots);
assert_eq!(80, compressed.first);
let decompressed = ClusterInfo::decompress_incomplete_slots(&compressed);
assert_eq!(incomplete_slots, decompressed);
} }
} }

View File

@ -63,9 +63,23 @@ pub enum CrdsData {
EpochSlots(EpochSlotIndex, EpochSlots), EpochSlots(EpochSlotIndex, EpochSlots),
} }
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub enum CompressionType {
Uncompressed,
GZip,
BZip2,
}
impl Default for CompressionType {
fn default() -> Self {
Self::Uncompressed
}
}
#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq)] #[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq)]
pub struct EpochIncompleteSlots { pub struct EpochIncompleteSlots {
pub first: Slot, pub first: Slot,
pub compression: CompressionType,
pub compressed_list: Vec<u8>, pub compressed_list: Vec<u8>,
} }