From f64ab4930706d58dec3a912cfc12c9ad47bacfbf Mon Sep 17 00:00:00 2001 From: anatoly yakovenko Date: Wed, 11 Mar 2020 21:31:50 -0700 Subject: [PATCH] Cluster has no way to know which slots are available (#8732) automerge --- Cargo.lock | 51 +++- core/Cargo.toml | 3 +- core/src/cluster_info.rs | 384 ++++++++++++------------- core/src/cluster_slots.rs | 267 ++++++++++++++++++ core/src/crds_value.rs | 120 ++++---- core/src/deprecated.rs | 21 ++ core/src/epoch_slots.rs | 401 ++++++++++++++++++++++++++ core/src/lib.rs | 3 + core/src/repair_service.rs | 558 ++++--------------------------------- 9 files changed, 1026 insertions(+), 782 deletions(-) create mode 100644 core/src/cluster_slots.rs create mode 100644 core/src/deprecated.rs create mode 100644 core/src/epoch_slots.rs diff --git a/Cargo.lock b/Cargo.lock index 05e50cc29..8fd405e34 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,5 +1,10 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. +[[package]] +name = "adler32" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "aho-corasick" version = "0.5.3" @@ -564,17 +569,6 @@ dependencies = [ "byteorder 1.3.4 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "compression" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "cfg-if 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)", - "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", - "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", - "num-traits 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "console" version = "0.10.0" @@ -634,6 +628,14 @@ dependencies = [ "winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "crc32fast" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "cfg-if 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "criterion-stats" version = "0.3.0" @@ -1148,6 +1150,17 @@ name = "fixedbitset" version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "flate2" +version = "1.0.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "cfg-if 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)", + "crc32fast 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.67 (registry+https://github.com/rust-lang/crates.io-index)", + "miniz_oxide 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "fnv" version = "1.0.6" @@ -2075,6 +2088,14 @@ dependencies = [ "unicase 2.4.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "miniz_oxide" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "adler32 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "mio" version = "0.6.21" @@ -3877,12 +3898,13 @@ version = "1.1.0" dependencies = [ "bincode 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "bs58 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", + "bv 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", "byteorder 1.3.4 (registry+https://github.com/rust-lang/crates.io-index)", "chrono 0.4.11 
(registry+https://github.com/rust-lang/crates.io-index)", - "compression 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "core_affinity 0.5.10 (registry+https://github.com/rust-lang/crates.io-index)", "crossbeam-channel 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", "ed25519-dalek 1.0.0-pre.1 (registry+https://github.com/rust-lang/crates.io-index)", + "flate2 1.0.13 (registry+https://github.com/rust-lang/crates.io-index)", "fs_extra 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "indexmap 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "itertools 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -6139,6 +6161,7 @@ dependencies = [ ] [metadata] +"checksum adler32 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "5d2e7343e7fc9de883d1b0341e0b13970f764c14101234857d2ddafa1cb1cac2" "checksum aho-corasick 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ca972c2ea5f742bfce5687b9aef75506a764f61d37f8f649047846a9686ddb66" "checksum aho-corasick 0.7.6 (registry+https://github.com/rust-lang/crates.io-index)" = "58fb5e95d83b38284460a5fda7d6470aa0b8844d283a0b614b8535e880800d2d" "checksum ansi_term 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ee49baf6cb617b853aa8d93bf420db2383fab46d314482ca2803b40d5fde979b" @@ -6209,7 +6232,6 @@ dependencies = [ "checksum codespan-reporting 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "ab081a14ab8f9598ce826890fe896d0addee68c7a58ab49008369ccbb51510a8" "checksum colored 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6cdb90b60f2927f8d76139c72dbde7e10c3a2bc47c8594c9c7a66529f2687c03" "checksum combine 2.5.2 (registry+https://github.com/rust-lang/crates.io-index)" = "1645a65a99c7c8d345761f4b75a6ffe5be3b3b27a93ee731fccc5050ba6be97c" -"checksum compression 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "3a82b366ae14633c67a1cbb4aa3738210a23f77d2868a0fd50faa23a956f9ec4" "checksum console 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6728a28023f207181b193262711102bfbaf47cc9d13bc71d0736607ef8efe88c" "checksum constant_time_eq 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "8ff012e225ce166d4422e0e78419d901719760f62ae2b7969ca6b564d1b54a9e" "checksum core-foundation 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)" = "25b9e03f145fd4f2bf705e07b900cd41fc636598fe5dc452fd0db1441c3f496d" @@ -6217,6 +6239,7 @@ dependencies = [ "checksum core-foundation-sys 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e7ca8a5221364ef15ce201e8ed2f609fc312682a8f4e0e3d4aa5879764e0fa3b" "checksum core-foundation-sys 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b3a71ab494c0b5b860bdc8407ae08978052417070c2ced38573a9157ad75b8ac" "checksum core_affinity 0.5.10 (registry+https://github.com/rust-lang/crates.io-index)" = "7f8a03115cc34fb0d7c321dd154a3914b3ca082ccc5c11d91bf7117dbbe7171f" +"checksum crc32fast 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ba125de2af0df55319f41944744ad91c71113bf74a4646efff39afe1f6842db1" "checksum criterion-stats 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "387df94cb74ada1b33e10ce034bb0d9360cc73edb5063e7d7d4120a40ee1c9d2" "checksum crossbeam 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)" = "bd66663db5a988098a89599d4857919b3acf7f61402e61365acfd3919857b9be" "checksum crossbeam 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = 
"2d818a4990769aac0c7ff1360e233ef3a41adcb009ebb2036bf6915eb0f6b23c" @@ -6273,6 +6296,7 @@ dependencies = [ "checksum feature-probe 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "835a3dc7d1ec9e75e2b5fb4ba75396837112d2060b03f7d43bc1897c7f7211da" "checksum filetime 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)" = "450537dc346f0c4d738dda31e790da1da5d4bd12145aad4da0d03d713cb3794f" "checksum fixedbitset 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)" = "86d4de0081402f5e88cdac65c8dcdcc73118c1a7a465e2a05f0da05843a8ea33" +"checksum flate2 1.0.13 (registry+https://github.com/rust-lang/crates.io-index)" = "6bd6d6f4752952feb71363cffc9ebac9411b75b87c6ab6058c40c8900cf43c0f" "checksum fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)" = "2fad85553e09a6f881f739c29f0b00b0f01357c743266d478b68951ce23285f3" "checksum foreign-types 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" "checksum foreign-types-shared 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" @@ -6377,6 +6401,7 @@ dependencies = [ "checksum mime 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)" = "ba626b8a6de5da682e1caa06bdb42a335aee5a84db8e5046a3e8ab17ba0a3ae0" "checksum mime 0.3.13 (registry+https://github.com/rust-lang/crates.io-index)" = "3e27ca21f40a310bd06d9031785f4801710d566c184a6e15bad4f1d9b65f9425" "checksum mime_guess 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "1a0ed03949aef72dbdf3116a383d7b38b4768e6f960528cd6a6044aa9ed68599" +"checksum miniz_oxide 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)" = "aa679ff6578b1cddee93d7e82e263b94a575e0bfced07284eb0c037c1d2416a5" "checksum mio 0.6.21 (registry+https://github.com/rust-lang/crates.io-index)" = "302dec22bcf6bae6dfb69c647187f4b4d0fb6f535521f7bc022430ce8e12008f" "checksum mio-extras 2.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "46e73a04c2fa6250b8d802134d56d554a9ec2922bf977777c805ea5def61ce40" "checksum mio-uds 0.6.7 (registry+https://github.com/rust-lang/crates.io-index)" = "966257a94e196b11bb43aca423754d87429960a768de9414f3691d6957abf125" diff --git a/core/Cargo.toml b/core/Cargo.toml index 9632345cc..6d88267a4 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -15,13 +15,14 @@ codecov = { repository = "solana-labs/solana", branch = "master", service = "git [dependencies] bincode = "1.2.1" +bv = { version = "0.11.0", features = ["serde"] } bs58 = "0.3.0" byteorder = "1.3.4" chrono = { version = "0.4.11", features = ["serde"] } -compression = "0.1.5" core_affinity = "0.5.10" crossbeam-channel = "0.4" fs_extra = "1.1.0" +flate2 = "1.0" indexmap = "1.3" itertools = "0.9.0" jsonrpc-core = "14.0.5" diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 79fe76e21..2ccd00dc2 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -12,8 +12,6 @@ //! * layer 2 - Everyone else, if layer 1 is `2^10`, layer 2 should be able to fit `2^20` number of nodes. //! //! 
Bank needs to provide an interface for us to query the stake weight -use crate::crds_value::CompressionType::*; -use crate::crds_value::EpochIncompleteSlots; use crate::packet::limited_deserialize; use crate::streamer::{PacketReceiver, PacketSender}; use crate::{ @@ -21,14 +19,16 @@ use crate::{ crds_gossip::CrdsGossip, crds_gossip_error::CrdsGossipError, crds_gossip_pull::{CrdsFilter, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS}, - crds_value::{self, CrdsData, CrdsValue, CrdsValueLabel, EpochSlots, SnapshotHash, Vote}, + crds_value::{ + self, CrdsData, CrdsValue, CrdsValueLabel, EpochSlotsIndex, LowestSlot, SnapshotHash, Vote, + }, + epoch_slots::EpochSlots, packet::{Packet, PACKET_DATA_SIZE}, result::{Error, Result}, sendmmsg::{multicast, send_mmsg}, weighted_shuffle::{weighted_best, weighted_shuffle}, }; use bincode::{serialize, serialized_size}; -use compression::prelude::*; use core::cmp; use itertools::Itertools; use rayon::iter::IntoParallelIterator; @@ -46,7 +46,7 @@ use solana_rayon_threadlimit::get_thread_count; use solana_sdk::hash::Hash; use solana_sdk::timing::duration_as_s; use solana_sdk::{ - clock::{Slot, DEFAULT_MS_PER_SLOT}, + clock::{Slot, DEFAULT_MS_PER_SLOT, DEFAULT_SLOTS_PER_EPOCH}, pubkey::Pubkey, signature::{Keypair, Signable, Signature}, timing::{duration_as_ms, timestamp}, @@ -55,7 +55,7 @@ use solana_sdk::{ use std::{ borrow::Cow, cmp::min, - collections::{BTreeSet, HashMap, HashSet}, + collections::{HashMap, HashSet}, fmt, net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, UdpSocket}, sync::atomic::{AtomicBool, Ordering}, @@ -71,7 +71,8 @@ pub const DATA_PLANE_FANOUT: usize = 200; /// milliseconds we sleep for between gossip requests pub const GOSSIP_SLEEP_MILLIS: u64 = 100; /// The maximum size of a bloom filter -pub const MAX_BLOOM_SIZE: usize = 1018; +pub const MAX_BLOOM_SIZE: usize = MAX_CRDS_OBJECT_SIZE; +pub const MAX_CRDS_OBJECT_SIZE: usize = 928; /// The maximum size of a protocol payload const MAX_PROTOCOL_PAYLOAD_SIZE: u64 = PACKET_DATA_SIZE as u64 - MAX_PROTOCOL_HEADER_SIZE; /// The largest protocol header size @@ -81,9 +82,6 @@ const MAX_PROTOCOL_HEADER_SIZE: u64 = 214; /// 128MB/PACKET_DATA_SIZE const MAX_GOSSIP_TRAFFIC: usize = 128_000_000 / PACKET_DATA_SIZE; -const NUM_BITS_PER_BYTE: u64 = 8; -const MIN_SIZE_TO_COMPRESS_GZIP: u64 = 64; - /// Keep the number of snapshot hashes a node publishes under MAX_PROTOCOL_PAYLOAD_SIZE pub const MAX_SNAPSHOT_HASHES: usize = 16; @@ -254,6 +252,16 @@ impl ClusterInfo { self.lookup(&self.id()).cloned().unwrap() } + pub fn lookup_epoch_slots(&self, ix: EpochSlotsIndex) -> EpochSlots { + let entry = CrdsValueLabel::EpochSlots(ix, self.id()); + self.gossip + .crds + .lookup(&entry) + .and_then(CrdsValue::epoch_slots) + .cloned() + .unwrap_or_else(|| EpochSlots::new(self.id(), timestamp())) + } + pub fn contact_info_trace(&self) -> String { let now = timestamp(); let mut spy_nodes = 0; @@ -328,121 +336,81 @@ impl ClusterInfo { ) } - pub fn compress_incomplete_slots(incomplete_slots: &BTreeSet) -> EpochIncompleteSlots { - if !incomplete_slots.is_empty() { - let first_slot = incomplete_slots - .iter() - .next() - .expect("expected to find at least one slot"); - let last_slot = incomplete_slots - .iter() - .next_back() - .expect("expected to find last slot"); - let num_uncompressed_bits = last_slot.saturating_sub(*first_slot) + 1; - let num_uncompressed_bytes = if num_uncompressed_bits % NUM_BITS_PER_BYTE > 0 { - 1 - } else { - 0 - } + num_uncompressed_bits / NUM_BITS_PER_BYTE; - let mut uncompressed = vec![0u8; 
num_uncompressed_bytes as usize]; - incomplete_slots.iter().for_each(|slot| { - let offset_from_first_slot = slot.saturating_sub(*first_slot); - let index = offset_from_first_slot / NUM_BITS_PER_BYTE; - let bit_index = offset_from_first_slot % NUM_BITS_PER_BYTE; - uncompressed[index as usize] |= 1 << bit_index; - }); - if num_uncompressed_bytes >= MIN_SIZE_TO_COMPRESS_GZIP { - if let Ok(compressed) = uncompressed - .iter() - .cloned() - .encode(&mut GZipEncoder::new(), Action::Finish) - .collect::, _>>() - { - return EpochIncompleteSlots { - first: *first_slot, - compression: GZip, - compressed_list: compressed, - }; - } - } else { - return EpochIncompleteSlots { - first: *first_slot, - compression: Uncompressed, - compressed_list: uncompressed, - }; - } - } - EpochIncompleteSlots::default() - } - - fn bitmap_to_slot_list(first: Slot, bitmap: &[u8]) -> BTreeSet { - let mut old_incomplete_slots: BTreeSet = BTreeSet::new(); - bitmap.iter().enumerate().for_each(|(i, val)| { - if *val != 0 { - (0..8).for_each(|bit_index| { - if (1 << bit_index & *val) != 0 { - let slot = first + i as u64 * NUM_BITS_PER_BYTE + bit_index as u64; - old_incomplete_slots.insert(slot); - } - }) - } - }); - old_incomplete_slots - } - - pub fn decompress_incomplete_slots(slots: &EpochIncompleteSlots) -> BTreeSet { - match slots.compression { - Uncompressed => Self::bitmap_to_slot_list(slots.first, &slots.compressed_list), - GZip => { - if let Ok(decompressed) = slots - .compressed_list - .iter() - .cloned() - .decode(&mut GZipDecoder::new()) - .collect::, _>>() - { - Self::bitmap_to_slot_list(slots.first, &decompressed) - } else { - BTreeSet::new() - } - } - BZip2 => { - if let Ok(decompressed) = slots - .compressed_list - .iter() - .cloned() - .decode(&mut BZip2Decoder::new()) - .collect::, _>>() - { - Self::bitmap_to_slot_list(slots.first, &decompressed) - } else { - BTreeSet::new() - } - } - } - } - - pub fn push_epoch_slots( - &mut self, - id: Pubkey, - root: Slot, - min: Slot, - slots: BTreeSet, - incomplete_slots: &BTreeSet, - ) { - let compressed = Self::compress_incomplete_slots(incomplete_slots); + pub fn push_lowest_slot(&mut self, id: Pubkey, min: Slot) { let now = timestamp(); - let entry = CrdsValue::new_signed( - CrdsData::EpochSlots( - 0, - EpochSlots::new(id, root, min, slots, vec![compressed], now), - ), - &self.keypair, - ); - self.gossip - .process_push_message(&self.id(), vec![entry], now); + let last = self + .gossip + .crds + .lookup(&CrdsValueLabel::LowestSlot(self.id())) + .and_then(|x| x.lowest_slot()) + .map(|x| x.lowest) + .unwrap_or(0); + if min > last { + let entry = CrdsValue::new_signed( + CrdsData::LowestSlot(0, LowestSlot::new(id, min, now)), + &self.keypair, + ); + self.gossip + .process_push_message(&self.id(), vec![entry], now); + } } + pub fn push_epoch_slots(&mut self, update: &[Slot]) { + let mut num = 0; + let mut current_slots: Vec<_> = (0..crds_value::MAX_EPOCH_SLOTS) + .filter_map(|ix| { + Some(( + self.gossip + .crds + .lookup(&CrdsValueLabel::EpochSlots(ix, self.id())) + .and_then(CrdsValue::epoch_slots) + .and_then(|x| Some((x.wallclock, x.first_slot()?)))?, + ix, + )) + }) + .collect(); + current_slots.sort(); + let min_slot: Slot = current_slots + .iter() + .map(|((_, s), _)| *s) + .min() + .unwrap_or(0); + let max_slot: Slot = update.iter().max().cloned().unwrap_or(0); + let total_slots = max_slot as isize - min_slot as isize; + // WARN if CRDS is not storing at least a full epoch worth of slots + if DEFAULT_SLOTS_PER_EPOCH as isize > total_slots + && 
crds_value::MAX_EPOCH_SLOTS as usize <= current_slots.len() + { + inc_new_counter_warn!("cluster_info-epoch_slots-filled", 1); + warn!( + "EPOCH_SLOTS are filling up FAST {}/{}", + total_slots, + current_slots.len() + ); + } + let mut reset = false; + let mut epoch_slot_index = current_slots.last().map(|(_, x)| *x).unwrap_or(0); + while num < update.len() { + let ix = (epoch_slot_index % crds_value::MAX_EPOCH_SLOTS) as u8; + let now = timestamp(); + let mut slots = if !reset { + self.lookup_epoch_slots(ix) + } else { + EpochSlots::new(self.id(), now) + }; + let n = slots.fill(&update[num..], now); + if n > 0 { + let entry = CrdsValue::new_signed(CrdsData::EpochSlots(ix, slots), &self.keypair); + self.gossip + .process_push_message(&self.id(), vec![entry], now); + } + num += n; + if num < update.len() { + epoch_slot_index += 1; + reset = true; + } + } + } pub fn push_snapshot_hashes(&mut self, snapshot_hashes: Vec<(Slot, Hash)>) { if snapshot_hashes.len() > MAX_SNAPSHOT_HASHES { warn!( @@ -526,21 +494,39 @@ impl ClusterInfo { .map(|x| &x.value.snapshot_hash().unwrap().hashes) } - pub fn get_epoch_state_for_node( + pub fn get_lowest_slot_for_node( &self, pubkey: &Pubkey, since: Option, - ) -> Option<(&EpochSlots, u64)> { + ) -> Option<(&LowestSlot, u64)> { self.gossip .crds .table - .get(&CrdsValueLabel::EpochSlots(*pubkey)) + .get(&CrdsValueLabel::LowestSlot(*pubkey)) .filter(|x| { since .map(|since| x.insert_timestamp > since) .unwrap_or(true) }) - .map(|x| (x.value.epoch_slots().unwrap(), x.insert_timestamp)) + .map(|x| (x.value.lowest_slot().unwrap(), x.insert_timestamp)) + } + + pub fn get_epoch_slots_since(&self, since: Option) -> (Vec, Option) { + let vals: Vec<_> = self + .gossip + .crds + .table + .values() + .filter(|x| { + since + .map(|since| x.insert_timestamp > since) + .unwrap_or(true) + }) + .filter_map(|x| Some((x.value.epoch_slots()?, x.insert_timestamp))) + .collect(); + let max = vals.iter().map(|x| x.1).max().or(since); + let vec = vals.into_iter().map(|x| x.0).cloned().collect(); + (vec, max) } pub fn get_contact_info_for_node(&self, pubkey: &Pubkey) -> Option<&ContactInfo> { @@ -684,8 +670,8 @@ impl ClusterInfo { && x.shred_version == me.shred_version && ContactInfo::is_valid_address(&x.serve_repair) && { - self.get_epoch_state_for_node(&x.id, None) - .map(|(epoch_slots, _)| epoch_slots.lowest <= slot) + self.get_lowest_slot_for_node(&x.id, None) + .map(|(lowest_slot, _)| lowest_slot.lowest <= slot) .unwrap_or_else(|| /* fallback to legacy behavior */ true) } }) @@ -2280,6 +2266,29 @@ mod tests { assert_eq!(max_ts, new_max_ts); } + #[test] + fn test_push_epoch_slots() { + let keys = Keypair::new(); + let contact_info = ContactInfo::new_localhost(&keys.pubkey(), 0); + let mut cluster_info = ClusterInfo::new_with_invalid_keypair(contact_info); + let (slots, since) = cluster_info.get_epoch_slots_since(None); + assert!(slots.is_empty()); + assert!(since.is_none()); + cluster_info.push_epoch_slots(&[0]); + + let (slots, since) = cluster_info.get_epoch_slots_since(Some(std::u64::MAX)); + assert!(slots.is_empty()); + assert_eq!(since, Some(std::u64::MAX)); + + let (slots, since) = cluster_info.get_epoch_slots_since(None); + assert_eq!(slots.len(), 1); + assert!(since.is_some()); + + let (slots, since2) = cluster_info.get_epoch_slots_since(since.clone()); + assert!(slots.is_empty()); + assert_eq!(since2, since); + } + #[test] fn test_add_entrypoint() { let node_keypair = Arc::new(Keypair::new()); @@ -2333,20 +2342,9 @@ mod tests { #[test] fn test_split_messages_large() { - let 
mut btree_slots = BTreeSet::new(); - for i in 0..128 { - btree_slots.insert(i); - } - let value = CrdsValue::new_unsigned(CrdsData::EpochSlots( + let value = CrdsValue::new_unsigned(CrdsData::LowestSlot( 0, - EpochSlots { - from: Pubkey::default(), - root: 0, - lowest: 0, - slots: btree_slots, - stash: vec![], - wallclock: 0, - }, + LowestSlot::new(Pubkey::default(), 0, 0), )); test_split_messages(value); } @@ -2358,39 +2356,19 @@ mod tests { let payload: Vec = vec![]; let vec_size = serialized_size(&payload).unwrap(); let desired_size = MAX_PROTOCOL_PAYLOAD_SIZE - vec_size; - let mut value = CrdsValue::new_unsigned(CrdsData::EpochSlots( - 0, - EpochSlots { - from: Pubkey::default(), - root: 0, - lowest: 0, - slots: BTreeSet::new(), - stash: vec![], - wallclock: 0, - }, - )); + let mut value = CrdsValue::new_unsigned(CrdsData::SnapshotHash(SnapshotHash { + from: Pubkey::default(), + hashes: vec![], + wallclock: 0, + })); let mut i = 0; while value.size() <= desired_size { - let slots = (0..i).collect::>(); - if slots.len() > 200 { - panic!( - "impossible to match size: last {:?} vs desired {:?}", - serialized_size(&value).unwrap(), - desired_size - ); - } - value.data = CrdsData::EpochSlots( - 0, - EpochSlots { - from: Pubkey::default(), - root: 0, - lowest: 0, - slots, - stash: vec![], - wallclock: 0, - }, - ); + value.data = CrdsData::SnapshotHash(SnapshotHash { + from: Pubkey::default(), + hashes: vec![(0, Hash::default()); i], + wallclock: 0, + }); i += 1; } let split = ClusterInfo::split_gossip_messages(vec![value.clone()]); @@ -2520,26 +2498,17 @@ mod tests { node_keypair, ); for i in 0..10 { - let mut peer_root = 5; let mut peer_lowest = 0; if i >= 5 { // make these invalid for the upcoming repair request - peer_root = 15; peer_lowest = 10; } let other_node_pubkey = Pubkey::new_rand(); let other_node = ContactInfo::new_localhost(&other_node_pubkey, timestamp()); cluster_info.insert_info(other_node.clone()); - let value = CrdsValue::new_unsigned(CrdsData::EpochSlots( + let value = CrdsValue::new_unsigned(CrdsData::LowestSlot( 0, - EpochSlots::new( - other_node_pubkey, - peer_root, - peer_lowest, - BTreeSet::new(), - vec![], - timestamp(), - ), + LowestSlot::new(other_node_pubkey, peer_lowest, timestamp()), )); let _ = cluster_info.gossip.crds.insert(value, timestamp()); } @@ -2549,7 +2518,8 @@ mod tests { #[test] fn test_max_bloom_size() { - assert_eq!(MAX_BLOOM_SIZE, max_bloom_size()); + // check that the constant fits into the dynamic size + assert!(MAX_BLOOM_SIZE <= max_bloom_size()); } #[test] @@ -2602,36 +2572,24 @@ mod tests { } #[test] - fn test_compress_incomplete_slots() { - let mut incomplete_slots: BTreeSet = BTreeSet::new(); - - assert_eq!( - EpochIncompleteSlots::default(), - ClusterInfo::compress_incomplete_slots(&incomplete_slots) + fn test_push_epoch_slots_large() { + use rand::Rng; + let node_keypair = Arc::new(Keypair::new()); + let mut cluster_info = ClusterInfo::new( + ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()), + node_keypair, ); - - incomplete_slots.insert(100); - let compressed = ClusterInfo::compress_incomplete_slots(&incomplete_slots); - assert_eq!(100, compressed.first); - let decompressed = ClusterInfo::decompress_incomplete_slots(&compressed); - assert_eq!(incomplete_slots, decompressed); - - incomplete_slots.insert(104); - let compressed = ClusterInfo::compress_incomplete_slots(&incomplete_slots); - assert_eq!(100, compressed.first); - let decompressed = ClusterInfo::decompress_incomplete_slots(&compressed); - 
assert_eq!(incomplete_slots, decompressed); - - incomplete_slots.insert(80); - let compressed = ClusterInfo::compress_incomplete_slots(&incomplete_slots); - assert_eq!(80, compressed.first); - let decompressed = ClusterInfo::decompress_incomplete_slots(&compressed); - assert_eq!(incomplete_slots, decompressed); - - incomplete_slots.insert(10000); - let compressed = ClusterInfo::compress_incomplete_slots(&incomplete_slots); - assert_eq!(80, compressed.first); - let decompressed = ClusterInfo::decompress_incomplete_slots(&compressed); - assert_eq!(incomplete_slots, decompressed); + let mut range: Vec = vec![]; + //random should be hard to compress + for _ in 0..32000 { + let last = *range.last().unwrap_or(&0); + range.push(last + rand::thread_rng().gen_range(1, 32)); + } + cluster_info.push_epoch_slots(&range[..16000]); + cluster_info.push_epoch_slots(&range[16000..]); + let (slots, since) = cluster_info.get_epoch_slots_since(None); + let slots: Vec<_> = slots.iter().flat_map(|x| x.to_slots(0)).collect(); + assert_eq!(slots, range); + assert!(since.is_some()); } } diff --git a/core/src/cluster_slots.rs b/core/src/cluster_slots.rs new file mode 100644 index 000000000..2b188dfd8 --- /dev/null +++ b/core/src/cluster_slots.rs @@ -0,0 +1,267 @@ +use crate::{cluster_info::ClusterInfo, epoch_slots::EpochSlots, serve_repair::RepairType}; + +use solana_ledger::{bank_forks::BankForks, staking_utils}; +use solana_sdk::{clock::Slot, pubkey::Pubkey}; + +use std::{ + collections::{HashMap, HashSet}, + rc::Rc, + sync::RwLock, +}; + +#[derive(Default)] +pub struct ClusterSlots { + cluster_slots: HashMap, u64>>, + keys: HashSet>, + since: Option, + validator_stakes: HashMap, u64>, + epoch: Option, + self_id: Pubkey, +} + +impl ClusterSlots { + pub fn lookup(&self, slot: Slot) -> Option<&HashMap, u64>> { + self.cluster_slots.get(&slot) + } + pub fn update( + &mut self, + root: Slot, + cluster_info: &RwLock, + bank_forks: &RwLock, + ) { + self.update_peers(cluster_info, bank_forks); + let epoch_slots = cluster_info + .read() + .unwrap() + .get_epoch_slots_since(self.since); + self.update_internal(root, epoch_slots); + } + fn update_internal(&mut self, root: Slot, epoch_slots: (Vec, Option)) { + let (epoch_slots_list, since) = epoch_slots; + for epoch_slots in epoch_slots_list { + let slots = epoch_slots.to_slots(root); + for slot in &slots { + if *slot <= root { + continue; + } + let pubkey = Rc::new(epoch_slots.from); + if self.keys.get(&pubkey).is_none() { + self.keys.insert(pubkey.clone()); + } + let from = self.keys.get(&pubkey).unwrap(); + let balance = self.validator_stakes.get(from).cloned().unwrap_or(0); + if self.self_id != **from { + debug!( + "CLUSTER_SLLOTS: {}: insert {} {} {}", + self.self_id, from, *slot, balance + ); + } + self.cluster_slots + .entry(*slot) + .or_insert_with(HashMap::default) + .insert(from.clone(), balance); + } + } + self.cluster_slots.retain(|x, _| *x > root); + self.keys.retain(|x| Rc::strong_count(x) > 1); + self.since = since; + } + pub fn stats(&self) -> (usize, usize, f64) { + let my_slots = self.collect(&self.self_id); + let others: HashMap<_, _> = self + .cluster_slots + .iter() + .filter(|(x, _)| !my_slots.contains(x)) + .flat_map(|(_, x)| x.iter()) + .collect(); + let other_slots: Vec = self + .cluster_slots + .iter() + .filter(|(x, _)| !my_slots.contains(x)) + .map(|(x, _)| *x) + .collect(); + + let weight: u64 = others.values().map(|x| **x).sum(); + let keys: Vec> = others.keys().copied().cloned().collect(); + let total: u64 = 
self.validator_stakes.values().copied().sum::() + 1u64; + if !other_slots.is_empty() { + debug!( + "{}: CLUSTER_SLOTS STATS {} {:?} {:?}", + self.self_id, + weight as f64 / total as f64, + keys, + other_slots + ); + } + ( + my_slots.len(), + self.cluster_slots.len(), + weight as f64 / total as f64, + ) + } + pub fn collect(&self, id: &Pubkey) -> HashSet { + self.cluster_slots + .iter() + .filter(|(_, keys)| keys.get(id).is_some()) + .map(|(slot, _)| slot) + .cloned() + .collect() + } + + pub fn update_peers( + &mut self, + cluster_info: &RwLock, + bank_forks: &RwLock, + ) { + let root = bank_forks.read().unwrap().root(); + let (epoch, _) = bank_forks + .read() + .unwrap() + .working_bank() + .get_epoch_and_slot_index(root); + if Some(epoch) != self.epoch { + let stakes = staking_utils::staked_nodes_at_epoch( + &bank_forks.read().unwrap().working_bank(), + epoch, + ); + if stakes.is_none() { + return; + } + let stakes = stakes.unwrap(); + self.validator_stakes = HashMap::new(); + for (from, bal) in stakes { + let pubkey = Rc::new(from); + if self.keys.get(&pubkey).is_none() { + self.keys.insert(pubkey.clone()); + } + let from = self.keys.get(&pubkey).unwrap(); + self.validator_stakes.insert(from.clone(), bal); + } + self.self_id = cluster_info.read().unwrap().id(); + self.epoch = Some(epoch); + } + } + pub fn peers(&self, slot: Slot) -> Vec<(Rc, u64)> { + let mut peers: HashMap, u64> = self.validator_stakes.clone(); + if let Some(slot_peers) = self.lookup(slot) { + slot_peers + .iter() + .for_each(|(x, y)| *peers.entry(x.clone()).or_insert(0) += *y); + } + peers.into_iter().filter(|x| *x.0 != self.self_id).collect() + } + + pub fn generate_repairs_for_missing_slots( + &self, + self_id: &Pubkey, + root: Slot, + ) -> Vec { + let my_slots = self.collect(self_id); + self.cluster_slots + .keys() + .filter(|x| **x > root) + .filter(|x| !my_slots.contains(*x)) + .map(|x| RepairType::HighestShred(*x, 0)) + .collect() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_default() { + let cs = ClusterSlots::default(); + assert!(cs.cluster_slots.is_empty()); + assert!(cs.since.is_none()); + } + + #[test] + fn test_update_noop() { + let mut cs = ClusterSlots::default(); + cs.update_internal(0, (vec![], None)); + assert!(cs.cluster_slots.is_empty()); + assert!(cs.since.is_none()); + } + + #[test] + fn test_update_empty() { + let mut cs = ClusterSlots::default(); + let epoch_slot = EpochSlots::default(); + cs.update_internal(0, (vec![epoch_slot], Some(0))); + assert_eq!(cs.since, Some(0)); + assert!(cs.lookup(0).is_none()); + } + + #[test] + fn test_update_rooted() { + //root is 0, so it should clear out the slot + let mut cs = ClusterSlots::default(); + let mut epoch_slot = EpochSlots::default(); + epoch_slot.fill(&[0], 0); + cs.update_internal(0, (vec![epoch_slot], Some(0))); + assert_eq!(cs.since, Some(0)); + assert!(cs.lookup(0).is_none()); + } + + #[test] + fn test_update_new_slot() { + let mut cs = ClusterSlots::default(); + let mut epoch_slot = EpochSlots::default(); + epoch_slot.fill(&[1], 0); + cs.update_internal(0, (vec![epoch_slot], Some(0))); + assert_eq!(cs.since, Some(0)); + assert!(cs.lookup(0).is_none()); + assert!(cs.lookup(1).is_some()); + assert_eq!(cs.lookup(1).unwrap().get(&Pubkey::default()), Some(&0)); + } + + #[test] + fn test_update_new_staked_slot() { + let mut cs = ClusterSlots::default(); + let mut epoch_slot = EpochSlots::default(); + epoch_slot.fill(&[1], 0); + let map = vec![(Rc::new(Pubkey::default()), 1)].into_iter().collect(); + cs.validator_stakes = 
map; + cs.update_internal(0, (vec![epoch_slot], None)); + assert!(cs.lookup(1).is_some()); + assert_eq!(cs.lookup(1).unwrap().get(&Pubkey::default()), Some(&1)); + } + + #[test] + fn test_generate_repairs() { + let mut cs = ClusterSlots::default(); + let mut epoch_slot = EpochSlots::default(); + epoch_slot.fill(&[1], 0); + cs.update_internal(0, (vec![epoch_slot], None)); + let self_id = Pubkey::new_rand(); + assert_eq!( + cs.generate_repairs_for_missing_slots(&self_id, 0), + vec![RepairType::HighestShred(1, 0)] + ) + } + + #[test] + fn test_collect_my_slots() { + let mut cs = ClusterSlots::default(); + let mut epoch_slot = EpochSlots::default(); + epoch_slot.fill(&[1], 0); + let self_id = epoch_slot.from; + cs.update_internal(0, (vec![epoch_slot], None)); + let slots: Vec = cs.collect(&self_id).into_iter().collect(); + assert_eq!(slots, vec![1]); + } + + #[test] + fn test_generate_repairs_existing() { + let mut cs = ClusterSlots::default(); + let mut epoch_slot = EpochSlots::default(); + epoch_slot.fill(&[1], 0); + let self_id = epoch_slot.from; + cs.update_internal(0, (vec![epoch_slot], None)); + assert!(cs + .generate_repairs_for_missing_slots(&self_id, 0) + .is_empty()); + } +} diff --git a/core/src/crds_value.rs b/core/src/crds_value.rs index 098db8729..ffae1c751 100644 --- a/core/src/crds_value.rs +++ b/core/src/crds_value.rs @@ -1,4 +1,6 @@ use crate::contact_info::ContactInfo; +use crate::deprecated; +use crate::epoch_slots::EpochSlots; use bincode::{serialize, serialized_size}; use solana_sdk::{ clock::Slot, @@ -16,7 +18,8 @@ use std::{ pub type VoteIndex = u8; pub const MAX_VOTES: VoteIndex = 32; -pub type EpochSlotIndex = u8; +pub type EpochSlotsIndex = u8; +pub const MAX_EPOCH_SLOTS: EpochSlotsIndex = 255; /// CrdsValue that is replicated across the cluster #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] @@ -48,6 +51,7 @@ impl Signable for CrdsValue { .verify(&self.pubkey().as_ref(), self.signable_data().borrow()); let data_check = match &self.data { CrdsData::Vote(ix, _) => *ix < MAX_VOTES, + CrdsData::EpochSlots(ix, _) => *ix < MAX_EPOCH_SLOTS, _ => true, }; sig_check && data_check @@ -56,33 +60,15 @@ impl Signable for CrdsValue { /// CrdsData that defines the different types of items CrdsValues can hold /// * Merge Strategy - Latest wallclock is picked +/// * LowestSlot index is deprecated #[allow(clippy::large_enum_variant)] #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] pub enum CrdsData { ContactInfo(ContactInfo), Vote(VoteIndex, Vote), - EpochSlots(EpochSlotIndex, EpochSlots), + LowestSlot(u8, LowestSlot), SnapshotHash(SnapshotHash), -} - -#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] -pub enum CompressionType { - Uncompressed, - GZip, - BZip2, -} - -impl Default for CompressionType { - fn default() -> Self { - Self::Uncompressed - } -} - -#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq)] -pub struct EpochIncompleteSlots { - pub first: Slot, - pub compression: CompressionType, - pub compressed_list: Vec, + EpochSlots(EpochSlotsIndex, EpochSlots), } #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] @@ -101,32 +87,24 @@ impl SnapshotHash { } } } - #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] -pub struct EpochSlots { +pub struct LowestSlot { pub from: Pubkey, - pub root: Slot, + root: Slot, //deprecated pub lowest: Slot, - pub slots: BTreeSet, - pub stash: Vec, + slots: BTreeSet, //deprecated + stash: Vec, //deprecated pub wallclock: u64, } -impl EpochSlots { - pub fn new( - from: Pubkey, - root: 
Slot, - lowest: Slot, - slots: BTreeSet, - stash: Vec, - wallclock: u64, - ) -> Self { +impl LowestSlot { + pub fn new(from: Pubkey, lowest: Slot, wallclock: u64) -> Self { Self { from, - root, + root: 0, lowest, - slots, - stash, + slots: BTreeSet::new(), + stash: vec![], wallclock, } } @@ -155,8 +133,9 @@ impl Vote { pub enum CrdsValueLabel { ContactInfo(Pubkey), Vote(VoteIndex, Pubkey), - EpochSlots(Pubkey), + LowestSlot(Pubkey), SnapshotHash(Pubkey), + EpochSlots(EpochSlotsIndex, Pubkey), } impl fmt::Display for CrdsValueLabel { @@ -164,8 +143,9 @@ impl fmt::Display for CrdsValueLabel { match self { CrdsValueLabel::ContactInfo(_) => write!(f, "ContactInfo({})", self.pubkey()), CrdsValueLabel::Vote(ix, _) => write!(f, "Vote({}, {})", ix, self.pubkey()), - CrdsValueLabel::EpochSlots(_) => write!(f, "EpochSlots({})", self.pubkey()), + CrdsValueLabel::LowestSlot(_) => write!(f, "LowestSlot({})", self.pubkey()), CrdsValueLabel::SnapshotHash(_) => write!(f, "SnapshotHash({})", self.pubkey()), + CrdsValueLabel::EpochSlots(ix, _) => write!(f, "EpochSlots({}, {})", ix, self.pubkey()), } } } @@ -175,8 +155,9 @@ impl CrdsValueLabel { match self { CrdsValueLabel::ContactInfo(p) => *p, CrdsValueLabel::Vote(_, p) => *p, - CrdsValueLabel::EpochSlots(p) => *p, + CrdsValueLabel::LowestSlot(p) => *p, CrdsValueLabel::SnapshotHash(p) => *p, + CrdsValueLabel::EpochSlots(_, p) => *p, } } } @@ -201,24 +182,27 @@ impl CrdsValue { match &self.data { CrdsData::ContactInfo(contact_info) => contact_info.wallclock, CrdsData::Vote(_, vote) => vote.wallclock, - CrdsData::EpochSlots(_, vote) => vote.wallclock, + CrdsData::LowestSlot(_, obj) => obj.wallclock, CrdsData::SnapshotHash(hash) => hash.wallclock, + CrdsData::EpochSlots(_, p) => p.wallclock, } } pub fn pubkey(&self) -> Pubkey { match &self.data { CrdsData::ContactInfo(contact_info) => contact_info.id, CrdsData::Vote(_, vote) => vote.from, - CrdsData::EpochSlots(_, slots) => slots.from, + CrdsData::LowestSlot(_, slots) => slots.from, CrdsData::SnapshotHash(hash) => hash.from, + CrdsData::EpochSlots(_, p) => p.from, } } pub fn label(&self) -> CrdsValueLabel { match &self.data { CrdsData::ContactInfo(_) => CrdsValueLabel::ContactInfo(self.pubkey()), CrdsData::Vote(ix, _) => CrdsValueLabel::Vote(*ix, self.pubkey()), - CrdsData::EpochSlots(_, _) => CrdsValueLabel::EpochSlots(self.pubkey()), + CrdsData::LowestSlot(_, _) => CrdsValueLabel::LowestSlot(self.pubkey()), CrdsData::SnapshotHash(_) => CrdsValueLabel::SnapshotHash(self.pubkey()), + CrdsData::EpochSlots(ix, _) => CrdsValueLabel::EpochSlots(*ix, self.pubkey()), } } pub fn contact_info(&self) -> Option<&ContactInfo> { @@ -241,9 +225,9 @@ impl CrdsValue { } } - pub fn epoch_slots(&self) -> Option<&EpochSlots> { + pub fn lowest_slot(&self) -> Option<&LowestSlot> { match &self.data { - CrdsData::EpochSlots(_, slots) => Some(slots), + CrdsData::LowestSlot(_, slots) => Some(slots), _ => None, } } @@ -255,14 +239,22 @@ impl CrdsValue { } } + pub fn epoch_slots(&self) -> Option<&EpochSlots> { + match &self.data { + CrdsData::EpochSlots(_, slots) => Some(slots), + _ => None, + } + } + /// Return all the possible labels for a record identified by Pubkey. 
pub fn record_labels(key: &Pubkey) -> Vec { let mut labels = vec![ CrdsValueLabel::ContactInfo(*key), - CrdsValueLabel::EpochSlots(*key), + CrdsValueLabel::LowestSlot(*key), CrdsValueLabel::SnapshotHash(*key), ]; labels.extend((0..MAX_VOTES).map(|ix| CrdsValueLabel::Vote(ix, *key))); + labels.extend((0..MAX_EPOCH_SLOTS).map(|ix| CrdsValueLabel::EpochSlots(ix, *key))); labels } @@ -310,14 +302,17 @@ mod test { #[test] fn test_labels() { - let mut hits = [false; 3 + MAX_VOTES as usize]; + let mut hits = [false; 3 + MAX_VOTES as usize + MAX_EPOCH_SLOTS as usize]; // this method should cover all the possible labels for v in &CrdsValue::record_labels(&Pubkey::default()) { match v { CrdsValueLabel::ContactInfo(_) => hits[0] = true, - CrdsValueLabel::EpochSlots(_) => hits[1] = true, + CrdsValueLabel::LowestSlot(_) => hits[1] = true, CrdsValueLabel::SnapshotHash(_) => hits[2] = true, CrdsValueLabel::Vote(ix, _) => hits[*ix as usize + 3] = true, + CrdsValueLabel::EpochSlots(ix, _) => { + hits[*ix as usize + MAX_VOTES as usize + 3] = true + } } } assert!(hits.iter().all(|x| *x)); @@ -337,13 +332,13 @@ mod test { let key = v.clone().vote().unwrap().from; assert_eq!(v.label(), CrdsValueLabel::Vote(0, key)); - let v = CrdsValue::new_unsigned(CrdsData::EpochSlots( + let v = CrdsValue::new_unsigned(CrdsData::LowestSlot( 0, - EpochSlots::new(Pubkey::default(), 0, 0, BTreeSet::new(), vec![], 0), + LowestSlot::new(Pubkey::default(), 0, 0), )); assert_eq!(v.wallclock(), 0); - let key = v.clone().epoch_slots().unwrap().from; - assert_eq!(v.label(), CrdsValueLabel::EpochSlots(key)); + let key = v.clone().lowest_slot().unwrap().from; + assert_eq!(v.label(), CrdsValueLabel::LowestSlot(key)); } #[test] @@ -360,10 +355,9 @@ mod test { Vote::new(&keypair.pubkey(), test_tx(), timestamp()), )); verify_signatures(&mut v, &keypair, &wrong_keypair); - let btreeset: BTreeSet = vec![1, 2, 3, 6, 8].into_iter().collect(); - v = CrdsValue::new_unsigned(CrdsData::EpochSlots( + v = CrdsValue::new_unsigned(CrdsData::LowestSlot( 0, - EpochSlots::new(keypair.pubkey(), 0, 0, btreeset, vec![], timestamp()), + LowestSlot::new(keypair.pubkey(), 0, timestamp()), )); verify_signatures(&mut v, &keypair, &wrong_keypair); } @@ -381,6 +375,18 @@ mod test { assert!(!vote.verify()); } + #[test] + fn test_max_epoch_slots_index() { + let keypair = Keypair::new(); + let item = CrdsValue::new_signed( + CrdsData::EpochSlots( + MAX_EPOCH_SLOTS, + EpochSlots::new(keypair.pubkey(), timestamp()), + ), + &keypair, + ); + assert!(!item.verify()); + } #[test] fn test_compute_vote_index_empty() { for i in 0..MAX_VOTES { diff --git a/core/src/deprecated.rs b/core/src/deprecated.rs new file mode 100644 index 000000000..06e772505 --- /dev/null +++ b/core/src/deprecated.rs @@ -0,0 +1,21 @@ +use solana_sdk::clock::Slot; + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +enum CompressionType { + Uncompressed, + GZip, + BZip2, +} + +impl Default for CompressionType { + fn default() -> Self { + Self::Uncompressed + } +} + +#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq)] +pub(crate) struct EpochIncompleteSlots { + first: Slot, + compression: CompressionType, + compressed_list: Vec, +} diff --git a/core/src/epoch_slots.rs b/core/src/epoch_slots.rs new file mode 100644 index 000000000..a93bf3d71 --- /dev/null +++ b/core/src/epoch_slots.rs @@ -0,0 +1,401 @@ +use crate::cluster_info::MAX_CRDS_OBJECT_SIZE; +use bincode::serialized_size; +use bv::BitVec; +use flate2::{Compress, Compression, Decompress, FlushCompress, FlushDecompress}; 
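// Overview of the encoding defined in this new module: an `EpochSlots` CRDS value
// carries the sender's pubkey, a wallclock, and a vector of `CompressedSlots` chunks.
// Each chunk is either `Uncompressed` (a base `first_slot` plus a `bv::BitVec` of bit
// offsets) or `Flate2` (the same bitmap run through raw DEFLATE once a chunk fills up
// during `fill`). `MAX_CRDS_OBJECT_SIZE` bounds how large each serialized value may grow.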
+use solana_sdk::clock::Slot; +use solana_sdk::pubkey::Pubkey; + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +pub struct Uncompressed { + pub first_slot: Slot, + pub num: usize, + pub slots: BitVec, +} + +#[derive(Deserialize, Serialize, Clone, Debug, PartialEq)] +pub struct Flate2 { + pub first_slot: Slot, + pub num: usize, + pub compressed: Vec, +} + +#[derive(Debug, PartialEq)] +pub enum Error { + CompressError, + DecompressError, +} + +pub type Result = std::result::Result; + +impl std::convert::From for Error { + fn from(_e: flate2::CompressError) -> Error { + Error::CompressError + } +} +impl std::convert::From for Error { + fn from(_e: flate2::DecompressError) -> Error { + Error::DecompressError + } +} + +impl Flate2 { + fn deflate(mut unc: Uncompressed) -> Result { + let mut compressed = Vec::with_capacity(unc.slots.block_capacity()); + let mut compressor = Compress::new(Compression::best(), false); + let first_slot = unc.first_slot; + let num = unc.num; + unc.slots.shrink_to_fit(); + let bits = unc.slots.into_boxed_slice(); + compressor.compress_vec(&bits, &mut compressed, FlushCompress::Finish)?; + let rv = Self { + first_slot, + num, + compressed, + }; + let _ = rv.inflate()?; + Ok(rv) + } + pub fn inflate(&self) -> Result { + //add some head room for the decompressor which might spill more bits + let mut uncompressed = Vec::with_capacity(32 + (self.num + 4) / 8); + let mut decompress = Decompress::new(false); + decompress.decompress_vec(&self.compressed, &mut uncompressed, FlushDecompress::Finish)?; + Ok(Uncompressed { + first_slot: self.first_slot, + num: self.num, + slots: BitVec::from_bits(&uncompressed), + }) + } +} + +impl Uncompressed { + pub fn new(max_size: usize) -> Self { + Self { + num: 0, + first_slot: 0, + slots: BitVec::new_fill(false, 8 * max_size as u64), + } + } + pub fn to_slots(&self, min_slot: Slot) -> Vec { + let mut rv = vec![]; + let start = if min_slot < self.first_slot { + 0 as usize + } else { + (min_slot - self.first_slot) as usize + }; + for i in start..self.num { + if self.slots.get(i as u64) { + rv.push(self.first_slot + i as Slot); + } + } + rv + } + pub fn add(&mut self, slots: &[Slot]) -> usize { + for (i, s) in slots.iter().enumerate() { + if self.num == 0 { + self.first_slot = *s; + } + if *s < self.first_slot { + return i; + } + if *s - self.first_slot >= self.slots.capacity() { + return i; + } + self.slots.set(*s - self.first_slot, true); + self.num = std::cmp::max(self.num, 1 + (*s - self.first_slot) as usize); + } + slots.len() + } +} + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +pub enum CompressedSlots { + Flate2(Flate2), + Uncompressed(Uncompressed), +} + +impl Default for CompressedSlots { + fn default() -> Self { + CompressedSlots::new(0) + } +} + +impl CompressedSlots { + fn new(max_size: usize) -> Self { + CompressedSlots::Uncompressed(Uncompressed::new(max_size)) + } + + pub fn first_slot(&self) -> Slot { + match self { + CompressedSlots::Uncompressed(a) => a.first_slot, + CompressedSlots::Flate2(b) => b.first_slot, + } + } + + pub fn num_slots(&self) -> usize { + match self { + CompressedSlots::Uncompressed(a) => a.num, + CompressedSlots::Flate2(b) => b.num, + } + } + + pub fn add(&mut self, slots: &[Slot]) -> usize { + match self { + CompressedSlots::Uncompressed(vals) => vals.add(slots), + CompressedSlots::Flate2(_) => 0, + } + } + pub fn to_slots(&self, min_slot: Slot) -> Result> { + match self { + CompressedSlots::Uncompressed(vals) => Ok(vals.to_slots(min_slot)), + 
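// A `Flate2` chunk is inflated back into its bitmap before slots can be listed;
// a corrupted payload surfaces as `Error::DecompressError`.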
CompressedSlots::Flate2(vals) => { + let unc = vals.inflate()?; + Ok(unc.to_slots(min_slot)) + } + } + } + pub fn deflate(&mut self) -> Result<()> { + match self { + CompressedSlots::Uncompressed(vals) => { + let unc = vals.clone(); + let compressed = Flate2::deflate(unc)?; + let mut new = CompressedSlots::Flate2(compressed); + std::mem::swap(self, &mut new); + Ok(()) + } + CompressedSlots::Flate2(_) => Ok(()), + } + } +} + +#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq)] +pub struct EpochSlots { + pub from: Pubkey, + pub slots: Vec, + pub wallclock: u64, +} + +impl EpochSlots { + pub fn new(from: Pubkey, now: u64) -> Self { + Self { + from, + wallclock: now, + slots: vec![], + } + } + pub fn fill(&mut self, slots: &[Slot], now: u64) -> usize { + let mut num = 0; + self.wallclock = std::cmp::max(now, self.wallclock + 1); + while num < slots.len() { + num += self.add(&slots[num..]); + if num < slots.len() { + if self.deflate().is_err() { + return num; + } + let space = self.max_compressed_slot_size(); + if space > 0 { + let cslot = CompressedSlots::new(space as usize); + self.slots.push(cslot); + } else { + return num; + } + } + } + num + } + pub fn add(&mut self, slots: &[Slot]) -> usize { + let mut num = 0; + for s in &mut self.slots { + num += s.add(&slots[num..]); + if num >= slots.len() { + break; + } + } + num + } + pub fn deflate(&mut self) -> Result<()> { + for s in self.slots.iter_mut() { + s.deflate()?; + } + Ok(()) + } + pub fn max_compressed_slot_size(&self) -> isize { + let len_header = serialized_size(self).unwrap(); + let len_slot = serialized_size(&CompressedSlots::default()).unwrap(); + MAX_CRDS_OBJECT_SIZE as isize - (len_header + len_slot) as isize + } + + pub fn first_slot(&self) -> Option { + self.slots.iter().map(|s| s.first_slot()).min() + } + + pub fn to_slots(&self, min_slot: Slot) -> Vec { + self.slots + .iter() + .filter(|s| min_slot < s.first_slot() + s.num_slots() as u64) + .filter_map(|s| s.to_slots(min_slot).ok()) + .flatten() + .collect() + } +} + +#[cfg(test)] +mod tests { + use super::*; + #[test] + fn test_epoch_slots_max_size() { + let epoch_slots = EpochSlots::default(); + assert!(epoch_slots.max_compressed_slot_size() > 0); + } + + #[test] + fn test_epoch_slots_uncompressed_add_1() { + let mut slots = Uncompressed::new(1); + assert_eq!(slots.slots.capacity(), 8); + assert_eq!(slots.add(&[1]), 1); + assert_eq!(slots.to_slots(1), vec![1]); + assert!(slots.to_slots(2).is_empty()); + } + #[test] + fn test_epoch_slots_uncompressed_add_2() { + let mut slots = Uncompressed::new(1); + assert_eq!(slots.add(&[1, 2]), 2); + assert_eq!(slots.to_slots(1), vec![1, 2]); + } + #[test] + fn test_epoch_slots_uncompressed_add_3a() { + let mut slots = Uncompressed::new(1); + assert_eq!(slots.add(&[1, 3, 2]), 3); + assert_eq!(slots.to_slots(1), vec![1, 2, 3]); + } + + #[test] + fn test_epoch_slots_uncompressed_add_3b() { + let mut slots = Uncompressed::new(1); + assert_eq!(slots.add(&[1, 10, 2]), 1); + assert_eq!(slots.to_slots(1), vec![1]); + } + + #[test] + fn test_epoch_slots_uncompressed_add_3c() { + let mut slots = Uncompressed::new(2); + assert_eq!(slots.add(&[1, 10, 2]), 3); + assert_eq!(slots.to_slots(1), vec![1, 2, 10]); + assert_eq!(slots.to_slots(2), vec![2, 10]); + assert_eq!(slots.to_slots(3), vec![10]); + assert_eq!(slots.to_slots(11).is_empty(), true); + } + #[test] + fn test_epoch_slots_compressed() { + let mut slots = Uncompressed::new(100); + slots.add(&[1, 701, 2]); + assert_eq!(slots.num, 701); + let compressed = 
Flate2::deflate(slots).unwrap(); + assert_eq!(compressed.first_slot, 1); + assert_eq!(compressed.num, 701); + assert!(compressed.compressed.len() < 32); + let slots = compressed.inflate().unwrap(); + assert_eq!(slots.first_slot, 1); + assert_eq!(slots.num, 701); + assert_eq!(slots.to_slots(1), vec![1, 2, 701]); + } + #[test] + fn test_epoch_slots_fill_range() { + let range: Vec = (0..5000).into_iter().collect(); + let mut slots = EpochSlots::default(); + assert_eq!(slots.fill(&range, 1), 5000); + assert_eq!(slots.wallclock, 1); + assert_eq!(slots.to_slots(0), range); + assert_eq!(slots.to_slots(4999), vec![4999]); + assert_eq!(slots.to_slots(5000).is_empty(), true); + } + #[test] + fn test_epoch_slots_fill_sparce_range() { + let range: Vec = (0..5000).into_iter().map(|x| x * 3).collect(); + let mut slots = EpochSlots::default(); + assert_eq!(slots.fill(&range, 2), 5000); + assert_eq!(slots.wallclock, 2); + assert_eq!(slots.slots.len(), 3); + assert_eq!(slots.slots[0].first_slot(), 0); + assert_ne!(slots.slots[0].num_slots(), 0); + let next = slots.slots[0].num_slots() as u64 + slots.slots[0].first_slot(); + assert!(slots.slots[1].first_slot() >= next); + assert_ne!(slots.slots[1].num_slots(), 0); + assert_ne!(slots.slots[2].num_slots(), 0); + assert_eq!(slots.to_slots(0), range); + assert_eq!(slots.to_slots(4999 * 3), vec![4999 * 3]); + } + + #[test] + fn test_epoch_slots_fill_large_sparce_range() { + let range: Vec = (0..5000).into_iter().map(|x| x * 7).collect(); + let mut slots = EpochSlots::default(); + assert_eq!(slots.fill(&range, 2), 5000); + assert_eq!(slots.to_slots(0), range); + } + + #[test] + fn test_epoch_slots_fill_uncompressed_random_range() { + use rand::Rng; + for _ in 0..10 { + let mut range: Vec = vec![]; + for _ in 0..5000 { + let last = *range.last().unwrap_or(&0); + range.push(last + rand::thread_rng().gen_range(1, 5)); + } + let sz = EpochSlots::default().max_compressed_slot_size(); + let mut slots = Uncompressed::new(sz as usize); + let sz = slots.add(&range); + let slots = slots.to_slots(0); + assert_eq!(slots.len(), sz); + assert_eq!(slots[..], range[..sz]); + } + } + + #[test] + fn test_epoch_slots_fill_compressed_random_range() { + use rand::Rng; + for _ in 0..10 { + let mut range: Vec = vec![]; + for _ in 0..5000 { + let last = *range.last().unwrap_or(&0); + range.push(last + rand::thread_rng().gen_range(1, 5)); + } + let sz = EpochSlots::default().max_compressed_slot_size(); + let mut slots = Uncompressed::new(sz as usize); + let sz = slots.add(&range); + let mut slots = CompressedSlots::Uncompressed(slots); + slots.deflate().unwrap(); + let slots = slots.to_slots(0).unwrap(); + assert_eq!(slots.len(), sz); + assert_eq!(slots[..], range[..sz]); + } + } + + #[test] + fn test_epoch_slots_fill_random_range() { + use rand::Rng; + for _ in 0..10 { + let mut range: Vec = vec![]; + for _ in 0..5000 { + let last = *range.last().unwrap_or(&0); + range.push(last + rand::thread_rng().gen_range(1, 5)); + } + let mut slots = EpochSlots::default(); + let sz = slots.fill(&range, 1); + let last = range[sz - 1]; + assert_eq!( + last, + slots.slots.last().unwrap().first_slot() + + slots.slots.last().unwrap().num_slots() as u64 + - 1 + ); + for s in &slots.slots { + assert!(s.to_slots(0).is_ok()); + } + let slots = slots.to_slots(0); + assert_eq!(slots[..], range[..slots.len()]); + assert_eq!(sz, slots.len()) + } + } +} diff --git a/core/src/lib.rs b/core/src/lib.rs index d0bd3b53f..1d13b3dcf 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -9,12 +9,14 @@ pub mod 
banking_stage; pub mod broadcast_stage; pub mod cluster_info_vote_listener; pub mod commitment; +mod deprecated; pub mod shred_fetch_stage; #[macro_use] pub mod contact_info; pub mod blockstream; pub mod blockstream_service; pub mod cluster_info; +pub mod cluster_slots; pub mod consensus; pub mod crds; pub mod crds_gossip; @@ -22,6 +24,7 @@ pub mod crds_gossip_error; pub mod crds_gossip_pull; pub mod crds_gossip_push; pub mod crds_value; +pub mod epoch_slots; pub mod fetch_stage; pub mod gen_keys; pub mod genesis_utils; diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs index a1a37c8a0..7174a9f03 100644 --- a/core/src/repair_service.rs +++ b/core/src/repair_service.rs @@ -2,6 +2,7 @@ //! regularly finds missing shreds in the ledger and sends repair requests for those shreds use crate::{ cluster_info::ClusterInfo, + cluster_slots::ClusterSlots, result::Result, serve_repair::{RepairType, ServeRepair}, }; @@ -9,13 +10,11 @@ use solana_ledger::{ bank_forks::BankForks, blockstore::{Blockstore, CompletedSlotsReceiver, SlotMeta}, }; -use solana_sdk::clock::DEFAULT_SLOTS_PER_EPOCH; use solana_sdk::{clock::Slot, epoch_schedule::EpochSchedule, pubkey::Pubkey}; use std::{ - collections::{BTreeSet, HashSet}, iter::Iterator, + net::SocketAddr, net::UdpSocket, - ops::Bound::{Included, Unbounded}, sync::atomic::{AtomicBool, Ordering}, sync::{Arc, RwLock}, thread::sleep, @@ -27,9 +26,6 @@ pub const MAX_REPAIR_LENGTH: usize = 512; pub const REPAIR_MS: u64 = 100; pub const MAX_ORPHANS: usize = 5; -const MAX_COMPLETED_SLOT_CACHE_LEN: usize = 256; -const COMPLETED_SLOT_CACHE_FLUSH_TRIGGER: usize = 512; - pub enum RepairStrategy { RepairRange(RepairSlotRange), RepairAll { @@ -89,23 +85,10 @@ impl RepairService { repair_strategy: RepairStrategy, ) { let serve_repair = ServeRepair::new(cluster_info.clone()); - let mut epoch_slots: BTreeSet = BTreeSet::new(); - let mut old_incomplete_slots: BTreeSet = BTreeSet::new(); let id = cluster_info.read().unwrap().id(); - if let RepairStrategy::RepairAll { - ref epoch_schedule, .. - } = repair_strategy - { - let current_root = blockstore.last_root(); - Self::initialize_epoch_slots( - id, - blockstore, - &mut epoch_slots, - &old_incomplete_slots, - current_root, - epoch_schedule, - cluster_info, - ); + let mut cluster_slots = ClusterSlots::default(); + if let RepairStrategy::RepairAll { .. } = repair_strategy { + Self::initialize_lowest_slot(id, blockstore, cluster_info); } loop { if exit.load(Ordering::Relaxed) { @@ -125,26 +108,28 @@ impl RepairService { RepairStrategy::RepairAll { ref completed_slots_receiver, + ref bank_forks, .. 
} => { let new_root = blockstore.last_root(); let lowest_slot = blockstore.lowest_slot(); - Self::update_epoch_slots( - id, + Self::update_lowest_slot(&id, lowest_slot, &cluster_info); + Self::update_completed_slots( + &id, new_root, - lowest_slot, - &mut epoch_slots, - &mut old_incomplete_slots, - &cluster_info, + &mut cluster_slots, + blockstore, completed_slots_receiver, + &cluster_info, ); + cluster_slots.update(new_root, cluster_info, bank_forks); Self::generate_repairs(blockstore, new_root, MAX_REPAIR_LENGTH) } } }; if let Ok(repairs) = repairs { - let reqs: Vec<_> = repairs + let reqs: Vec<((SocketAddr, Vec), RepairType)> = repairs .into_iter() .filter_map(|repair_request| { serve_repair @@ -272,150 +257,54 @@ impl RepairService { } } - fn get_completed_slots_past_root( - blockstore: &Blockstore, - slots_in_gossip: &mut BTreeSet, - root: Slot, - epoch_schedule: &EpochSchedule, - ) { - let last_confirmed_epoch = epoch_schedule.get_leader_schedule_epoch(root); - let last_epoch_slot = epoch_schedule.get_last_slot_in_epoch(last_confirmed_epoch); - - let meta_iter = blockstore - .slot_meta_iterator(root + 1) - .expect("Couldn't get db iterator"); - - for (current_slot, meta) in meta_iter { - if current_slot > last_epoch_slot { - break; - } - if meta.is_full() { - slots_in_gossip.insert(current_slot); - } - } - } - - fn initialize_epoch_slots( + fn initialize_lowest_slot( id: Pubkey, blockstore: &Blockstore, - slots_in_gossip: &mut BTreeSet, - old_incomplete_slots: &BTreeSet, - root: Slot, - epoch_schedule: &EpochSchedule, cluster_info: &RwLock, ) { - Self::get_completed_slots_past_root(blockstore, slots_in_gossip, root, epoch_schedule); - // Safe to set into gossip because by this time, the leader schedule cache should // also be updated with the latest root (done in blockstore_processor) and thus // will provide a schedule to window_service for any incoming shreds up to the // last_confirmed_epoch. - cluster_info.write().unwrap().push_epoch_slots( - id, - root, - blockstore.lowest_slot(), - slots_in_gossip.clone(), - old_incomplete_slots, - ); + cluster_info + .write() + .unwrap() + .push_lowest_slot(id, blockstore.lowest_slot()); } - // Update the gossiped structure used for the "Repairmen" repair protocol. See docs - // for details. - fn update_epoch_slots( - id: Pubkey, - latest_known_root: Slot, - lowest_slot: Slot, - completed_slot_cache: &mut BTreeSet, - incomplete_slot_stash: &mut BTreeSet, - cluster_info: &RwLock, + fn update_completed_slots( + id: &Pubkey, + root: Slot, + cluster_slots: &mut ClusterSlots, + blockstore: &Blockstore, completed_slots_receiver: &CompletedSlotsReceiver, + cluster_info: &RwLock, ) { - let mut should_update = false; - while let Ok(completed_slots) = completed_slots_receiver.try_recv() { - for slot in completed_slots { - let last_slot_in_stash = *incomplete_slot_stash.iter().next_back().unwrap_or(&0); - let removed_from_stash = incomplete_slot_stash.remove(&slot); - // If the newly completed slot was not being tracked in stash, and is > last - // slot being tracked in stash, add it to cache. 
-                if !removed_from_stash && slot >= last_slot_in_stash {
-                    should_update |= completed_slot_cache.insert(slot);
-                }
-                // If the slot was removed from stash, update gossip
-                should_update |= removed_from_stash;
-            }
+        let mine = cluster_slots.collect(id);
+        let mut slots: Vec<Slot> = vec![];
+        while let Ok(mut more) = completed_slots_receiver.try_recv() {
+            more.retain(|x| !mine.contains(x));
+            slots.append(&mut more);
         }
-
-        if should_update {
-            if completed_slot_cache.len() >= COMPLETED_SLOT_CACHE_FLUSH_TRIGGER {
-                Self::stash_old_incomplete_slots(completed_slot_cache, incomplete_slot_stash);
-                let lowest_completed_slot_in_cache =
-                    *completed_slot_cache.iter().next().unwrap_or(&0);
-                Self::prune_incomplete_slot_stash(
-                    incomplete_slot_stash,
-                    lowest_completed_slot_in_cache,
-                );
-            }
-
-            cluster_info.write().unwrap().push_epoch_slots(
-                id,
-                latest_known_root,
-                lowest_slot,
-                completed_slot_cache.clone(),
-                incomplete_slot_stash,
-            );
-        }
-    }
-
-    fn stash_old_incomplete_slots(cache: &mut BTreeSet<Slot>, stash: &mut BTreeSet<Slot>) {
-        if cache.len() > MAX_COMPLETED_SLOT_CACHE_LEN {
-            let mut prev = *cache.iter().next().expect("Expected to find some slot");
-            cache.remove(&prev);
-            while cache.len() >= MAX_COMPLETED_SLOT_CACHE_LEN {
-                let next = *cache.iter().next().expect("Expected to find some slot");
-                cache.remove(&next);
-                // Prev slot and next slot are not included in incomplete slot list.
-                (prev + 1..next).for_each(|slot| {
-                    stash.insert(slot);
-                });
-                prev = next;
-            }
-        }
-    }
-
-    fn prune_incomplete_slot_stash(
-        stash: &mut BTreeSet<Slot>,
-        lowest_completed_slot_in_cache: Slot,
-    ) {
-        if let Some(oldest_incomplete_slot) = stash.iter().next() {
-            // Prune old slots
-            // Prune in batches to reduce overhead. Pruning starts when oldest slot is 1.5 epochs
-            // earlier than the new root. But, we prune all the slots that are older than 1 epoch.
-            // So slots in a batch of half epoch are getting pruned
-            if oldest_incomplete_slot + DEFAULT_SLOTS_PER_EPOCH + DEFAULT_SLOTS_PER_EPOCH / 2
-                < lowest_completed_slot_in_cache
-            {
-                let oldest_slot_to_retain =
-                    lowest_completed_slot_in_cache.saturating_sub(DEFAULT_SLOTS_PER_EPOCH);
-                *stash = stash
-                    .range((Included(&oldest_slot_to_retain), Unbounded))
-                    .cloned()
-                    .collect();
-            }
-        }
-    }
-
-    #[allow(dead_code)]
-    fn find_incomplete_slots(blockstore: &Blockstore, root: Slot) -> HashSet<Slot> {
         blockstore
             .live_slots_iterator(root)
-            .filter_map(|(slot, slot_meta)| {
-                if !slot_meta.is_full() {
-                    Some(slot)
-                } else {
-                    None
+            .for_each(|(slot, slot_meta)| {
+                if slot_meta.is_full() && !mine.contains(&slot) {
+                    slots.push(slot)
                 }
-            })
-            .collect()
+            });
+        slots.sort();
+        slots.dedup();
+        if !slots.is_empty() {
+            cluster_info.write().unwrap().push_epoch_slots(&slots);
+        }
+    }
+
+    fn update_lowest_slot(id: &Pubkey, lowest_slot: Slot, cluster_info: &RwLock<ClusterInfo>) {
+        cluster_info
+            .write()
+            .unwrap()
+            .push_lowest_slot(*id, lowest_slot);
     }
 
     pub fn join(self) -> thread::Result<()> {
@@ -427,15 +316,11 @@ impl RepairService {
 mod test {
     use super::*;
     use crate::cluster_info::Node;
-    use itertools::Itertools;
-    use rand::seq::SliceRandom;
-    use rand::{thread_rng, Rng};
     use solana_ledger::blockstore::{
         make_chaining_slot_entries, make_many_slot_entries, make_slot_entries,
     };
     use solana_ledger::shred::max_ticks_per_n_shreds;
     use solana_ledger::{blockstore::Blockstore, get_tmp_ledger_path};
-    use std::thread::Builder;
 
     #[test]
     pub fn test_repair_orphan() {
@@ -651,342 +536,19 @@ mod test {
     }
 
     #[test]
-    pub fn test_get_completed_slots_past_root() {
-        let blockstore_path = get_tmp_ledger_path!();
-        {
-            let blockstore = Blockstore::open(&blockstore_path).unwrap();
-            let num_entries_per_slot = 10;
-            let root = 10;
-
-            let fork1 = vec![5, 7, root, 15, 20, 21];
-            let fork1_shreds: Vec<_> = make_chaining_slot_entries(&fork1, num_entries_per_slot)
-                .into_iter()
-                .flat_map(|(shreds, _)| shreds)
-                .collect();
-            let fork2 = vec![8, 12];
-            let fork2_shreds = make_chaining_slot_entries(&fork2, num_entries_per_slot);
-
-            // Remove the last shred from each slot to make an incomplete slot
-            let fork2_incomplete_shreds: Vec<_> = fork2_shreds
-                .into_iter()
-                .flat_map(|(mut shreds, _)| {
-                    shreds.pop();
-                    shreds
-                })
-                .collect();
-            let mut full_slots = BTreeSet::new();
-
-            blockstore.insert_shreds(fork1_shreds, None, false).unwrap();
-            blockstore
-                .insert_shreds(fork2_incomplete_shreds, None, false)
-                .unwrap();
-
-            // Test that only slots > root from fork1 were included
-            let epoch_schedule = EpochSchedule::custom(32, 32, false);
-
-            RepairService::get_completed_slots_past_root(
-                &blockstore,
-                &mut full_slots,
-                root,
-                &epoch_schedule,
-            );
-
-            let mut expected: BTreeSet<_> = fork1.into_iter().filter(|x| *x > root).collect();
-            assert_eq!(full_slots, expected);
-
-            // Test that slots past the last confirmed epoch boundary don't get included
-            let last_epoch = epoch_schedule.get_leader_schedule_epoch(root);
-            let last_slot = epoch_schedule.get_last_slot_in_epoch(last_epoch);
-            let fork3 = vec![last_slot, last_slot + 1];
-            let fork3_shreds: Vec<_> = make_chaining_slot_entries(&fork3, num_entries_per_slot)
-                .into_iter()
-                .flat_map(|(shreds, _)| shreds)
-                .collect();
-            blockstore.insert_shreds(fork3_shreds, None, false).unwrap();
-            RepairService::get_completed_slots_past_root(
-                &blockstore,
-                &mut full_slots,
-                root,
-                &epoch_schedule,
-            );
-            expected.insert(last_slot);
-            assert_eq!(full_slots, expected);
-        }
-        Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
-    }
-
-    #[test]
-    pub fn test_update_epoch_slots() {
-        let blockstore_path = get_tmp_ledger_path!();
-        {
-            // Create blockstore
-            let (blockstore, _, completed_slots_receiver) =
-                Blockstore::open_with_signal(&blockstore_path).unwrap();
-
-            let blockstore = Arc::new(blockstore);
-
-            let mut root = 0;
-            let num_slots = 100;
-            let entries_per_slot = 5;
-            let blockstore_ = blockstore.clone();
-
-            // Spin up thread to write to blockstore
-            let writer = Builder::new()
-                .name("writer".to_string())
-                .spawn(move || {
-                    let slots: Vec<_> = (1..num_slots + 1).collect();
-                    let mut shreds: Vec<_> = make_chaining_slot_entries(&slots, entries_per_slot)
-                        .into_iter()
-                        .flat_map(|(shreds, _)| shreds)
-                        .collect();
-                    shreds.shuffle(&mut thread_rng());
-                    let mut i = 0;
-                    let max_step = entries_per_slot * 4;
-                    let repair_interval_ms = 10;
-                    let mut rng = rand::thread_rng();
-                    let num_shreds = shreds.len();
-                    while i < num_shreds {
-                        let step = rng.gen_range(1, max_step + 1) as usize;
-                        let step = std::cmp::min(step, num_shreds - i);
-                        let shreds_to_insert = shreds.drain(..step).collect_vec();
-                        blockstore_
-                            .insert_shreds(shreds_to_insert, None, false)
-                            .unwrap();
-                        sleep(Duration::from_millis(repair_interval_ms));
-                        i += step;
-                    }
-                })
-                .unwrap();
-
-            let mut completed_slots = BTreeSet::new();
-            let node_info = Node::new_localhost_with_pubkey(&Pubkey::default());
-            let cluster_info = RwLock::new(ClusterInfo::new_with_invalid_keypair(
-                node_info.info.clone(),
-            ));
-
-            let mut old_incomplete_slots: BTreeSet<Slot> = BTreeSet::new();
-            while completed_slots.len() < num_slots as usize {
-                RepairService::update_epoch_slots(
-                    Pubkey::default(),
-                    root,
-                    blockstore.lowest_slot(),
-                    &mut completed_slots,
-                    &mut old_incomplete_slots,
-                    &cluster_info,
-                    &completed_slots_receiver,
-                );
-            }
-
-            let mut expected: BTreeSet<_> = (1..num_slots + 1).collect();
-            assert_eq!(completed_slots, expected);
-
-            // Update with new root, should filter out the slots <= root
-            root = num_slots / 2;
-            let (shreds, _) = make_slot_entries(num_slots + 2, num_slots + 1, entries_per_slot);
-            blockstore.insert_shreds(shreds, None, false).unwrap();
-            RepairService::update_epoch_slots(
-                Pubkey::default(),
-                root,
-                0,
-                &mut completed_slots,
-                &mut old_incomplete_slots,
-                &cluster_info,
-                &completed_slots_receiver,
-            );
-            expected.insert(num_slots + 2);
-            assert_eq!(completed_slots, expected);
-            writer.join().unwrap();
-        }
-        Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
-    }
-
-    #[test]
-    fn test_stash_old_incomplete_slots() {
-        let mut cache: BTreeSet<u64> = BTreeSet::new();
-        let mut stash: BTreeSet<u64> = BTreeSet::new();
-
-        // When cache is empty.
-        RepairService::stash_old_incomplete_slots(&mut cache, &mut stash);
-        assert_eq!(stash.len(), 0);
-
-        // Insert some slots in cache ( < MAX_COMPLETED_SLOT_CACHE_LEN + 1)
-        cache.insert(101);
-        cache.insert(102);
-        cache.insert(104);
-        cache.insert(105);
-
-        // Not enough slots in cache. So stash should remain empty.
-        RepairService::stash_old_incomplete_slots(&mut cache, &mut stash);
-        assert_eq!(stash.len(), 0);
-        assert_eq!(cache.len(), 4);
-
-        // Insert slots in cache ( = MAX_COMPLETED_SLOT_CACHE_LEN)
-        let mut cache: BTreeSet<u64> = BTreeSet::new();
-        (0..MAX_COMPLETED_SLOT_CACHE_LEN as u64)
-            .into_iter()
-            .for_each(|slot| {
-                cache.insert(slot);
-            });
-
-        // Not enough slots in cache. So stash should remain empty.
-        RepairService::stash_old_incomplete_slots(&mut cache, &mut stash);
-        assert_eq!(stash.len(), 0);
-        assert_eq!(cache.len(), MAX_COMPLETED_SLOT_CACHE_LEN);
-
-        // Insert 1 more to cross the threshold
-        cache.insert(MAX_COMPLETED_SLOT_CACHE_LEN as u64);
-        RepairService::stash_old_incomplete_slots(&mut cache, &mut stash);
-        // Stash is still empty, as no missing slots
-        assert_eq!(stash.len(), 0);
-        // It removed some entries from cache
-        assert_eq!(cache.len(), MAX_COMPLETED_SLOT_CACHE_LEN - 1);
-
-        // Insert more slots to create a missing slot
-        let mut cache: BTreeSet<u64> = BTreeSet::new();
-        cache.insert(0);
-        (2..=MAX_COMPLETED_SLOT_CACHE_LEN as u64 + 2)
-            .into_iter()
-            .for_each(|slot| {
-                cache.insert(slot);
-            });
-        RepairService::stash_old_incomplete_slots(&mut cache, &mut stash);
-
-        // Stash is not empty
-        assert!(stash.contains(&1));
-        // It removed some entries from cache
-        assert_eq!(cache.len(), MAX_COMPLETED_SLOT_CACHE_LEN - 1);
-
-        // Test multiple missing slots at dispersed locations
-        let mut cache: BTreeSet<u64> = BTreeSet::new();
-        (0..MAX_COMPLETED_SLOT_CACHE_LEN as u64 * 2)
-            .into_iter()
-            .for_each(|slot| {
-                cache.insert(slot);
-            });
-
-        cache.remove(&10);
-        cache.remove(&11);
-
-        cache.remove(&28);
-        cache.remove(&29);
-
-        cache.remove(&148);
-        cache.remove(&149);
-        cache.remove(&150);
-        cache.remove(&151);
-
-        RepairService::stash_old_incomplete_slots(&mut cache, &mut stash);
-
-        // Stash is not empty
-        assert!(stash.contains(&10));
-        assert!(stash.contains(&11));
-        assert!(stash.contains(&28));
-        assert!(stash.contains(&29));
-        assert!(stash.contains(&148));
-        assert!(stash.contains(&149));
-        assert!(stash.contains(&150));
-        assert!(stash.contains(&151));
-
-        assert!(!stash.contains(&147));
-        assert!(!stash.contains(&152));
-        // It removed some entries from cache
-        assert_eq!(cache.len(), MAX_COMPLETED_SLOT_CACHE_LEN - 1);
-        (MAX_COMPLETED_SLOT_CACHE_LEN + 1..MAX_COMPLETED_SLOT_CACHE_LEN * 2)
-            .into_iter()
-            .for_each(|slot| {
-                let slot: u64 = slot as u64;
-                assert!(cache.contains(&slot));
-            });
-    }
-
-    #[test]
-    fn test_prune_incomplete_slot_stash() {
-        // Prune empty stash
-        let mut stash: BTreeSet<u64> = BTreeSet::new();
-        RepairService::prune_incomplete_slot_stash(&mut stash, 0);
-        assert!(stash.is_empty());
-
-        // Prune stash with slots < DEFAULT_SLOTS_PER_EPOCH
-        stash.insert(0);
-        stash.insert(10);
-        stash.insert(11);
-        stash.insert(50);
-        assert_eq!(stash.len(), 4);
-        RepairService::prune_incomplete_slot_stash(&mut stash, 100);
-        assert_eq!(stash.len(), 4);
-
-        // Prune stash with slots > DEFAULT_SLOTS_PER_EPOCH, but < 1.5 * DEFAULT_SLOTS_PER_EPOCH
-        stash.insert(DEFAULT_SLOTS_PER_EPOCH + 50);
-        assert_eq!(stash.len(), 5);
-        RepairService::prune_incomplete_slot_stash(&mut stash, DEFAULT_SLOTS_PER_EPOCH + 100);
-        assert_eq!(stash.len(), 5);
-
-        // Prune stash with slots > 1.5 * DEFAULT_SLOTS_PER_EPOCH
-        stash.insert(DEFAULT_SLOTS_PER_EPOCH + DEFAULT_SLOTS_PER_EPOCH / 2);
-        assert_eq!(stash.len(), 6);
-        RepairService::prune_incomplete_slot_stash(
-            &mut stash,
-            DEFAULT_SLOTS_PER_EPOCH + DEFAULT_SLOTS_PER_EPOCH / 2 + 1,
-        );
-        assert_eq!(stash.len(), 2);
-    }
-
-    #[test]
-    fn test_find_incomplete_slots() {
-        let blockstore_path = get_tmp_ledger_path!();
-        {
-            let blockstore = Blockstore::open(&blockstore_path).unwrap();
-            let num_entries_per_slot = 100;
-            let (mut shreds, _) = make_slot_entries(0, 0, num_entries_per_slot);
-            assert!(shreds.len() > 1);
-            let (shreds4, _) = make_slot_entries(4, 0, num_entries_per_slot);
-            shreds.extend(shreds4);
-            blockstore.insert_shreds(shreds, None, false).unwrap();
-
-            // Nothing is incomplete
-            assert!(RepairService::find_incomplete_slots(&blockstore, 0).is_empty());
-
-            // Insert a slot 5 that chains to an incomplete orphan slot 3
-            let (shreds5, _) = make_slot_entries(5, 3, num_entries_per_slot);
-            blockstore.insert_shreds(shreds5, None, false).unwrap();
-            assert_eq!(
-                RepairService::find_incomplete_slots(&blockstore, 0),
-                vec![3].into_iter().collect()
-            );
-
-            // Insert another incomplete orphan slot 2 that is the parent of slot 3.
-            // Both should be incomplete
-            let (shreds3, _) = make_slot_entries(3, 2, num_entries_per_slot);
-            blockstore
-                .insert_shreds(shreds3[1..].to_vec(), None, false)
-                .unwrap();
-            assert_eq!(
-                RepairService::find_incomplete_slots(&blockstore, 0),
-                vec![2, 3].into_iter().collect()
-            );
-
-            // Insert a incomplete slot 6 that chains to the root 0,
-            // should also be incomplete
-            let (shreds6, _) = make_slot_entries(6, 0, num_entries_per_slot);
-            blockstore
-                .insert_shreds(shreds6[1..].to_vec(), None, false)
-                .unwrap();
-            assert_eq!(
-                RepairService::find_incomplete_slots(&blockstore, 0),
-                vec![2, 3, 6].into_iter().collect()
-            );
-
-            // Complete slot 3, should no longer be marked incomplete
-            blockstore
-                .insert_shreds(shreds3[..].to_vec(), None, false)
-                .unwrap();
-
-            assert_eq!(
-                RepairService::find_incomplete_slots(&blockstore, 0),
-                vec![2, 6].into_iter().collect()
-            );
-        }
-
-        Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
+    pub fn test_update_lowest_slot() {
+        let node_info = Node::new_localhost_with_pubkey(&Pubkey::default());
+        let cluster_info = RwLock::new(ClusterInfo::new_with_invalid_keypair(
+            node_info.info.clone(),
+        ));
+        RepairService::update_lowest_slot(&Pubkey::default(), 5, &cluster_info);
+        let lowest = cluster_info
+            .read()
+            .unwrap()
+            .get_lowest_slot_for_node(&Pubkey::default(), None)
+            .unwrap()
+            .0
+            .clone();
+        assert_eq!(lowest.lowest, 5);
     }
 }
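
Reviewer note, not part of the patch: the core of the new flow is the slot aggregation in update_completed_slots -- drain newly completed slots from the channel, drop the ones this node already advertises, then sort and dedup before pushing a single epoch-slots update to gossip. Below is a minimal, self-contained sketch of that pattern, using std types (mpsc::Receiver, HashSet, u64 slots) as stand-ins for CompletedSlotsReceiver, ClusterSlots, and ClusterInfo; the function and variable names are illustrative only.

use std::collections::HashSet;
use std::sync::mpsc::{channel, Receiver};

type Slot = u64;

// Drain newly completed slots from the channel, drop slots this node has already
// advertised, then sort and dedup so one compact batch would go out to gossip.
fn collect_new_slots(rx: &Receiver<Vec<Slot>>, mine: &HashSet<Slot>) -> Vec<Slot> {
    let mut slots: Vec<Slot> = vec![];
    while let Ok(mut more) = rx.try_recv() {
        more.retain(|s| !mine.contains(s));
        slots.append(&mut more);
    }
    slots.sort();
    slots.dedup();
    slots
}

fn main() {
    let (tx, rx) = channel();
    let mine: HashSet<Slot> = [3, 4].iter().copied().collect();
    tx.send(vec![5, 3, 6]).unwrap();
    tx.send(vec![6, 7]).unwrap();
    // Slot 3 (already advertised) is filtered out; the duplicate 6 is removed.
    assert_eq!(collect_new_slots(&rx, &mine), vec![5, 6, 7]);
}

Sorting and deduplicating before the push keeps each gossiped EpochSlots update compact and idempotent across repair iterations, which appears to be the point of replacing the old BTreeSet cache and stash bookkeeping.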