diff --git a/Cargo.lock b/Cargo.lock index 6ee6541a1..44cddfe39 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -560,6 +560,17 @@ dependencies = [ "byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "compression" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "cfg-if 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", + "num-traits 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "console" version = "0.9.2" @@ -3829,6 +3840,7 @@ dependencies = [ "bs58 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "chrono 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)", + "compression 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "core_affinity 0.5.10 (registry+https://github.com/rust-lang/crates.io-index)", "crossbeam-channel 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", "ed25519-dalek 1.0.0-pre.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -6085,6 +6097,7 @@ dependencies = [ "checksum codespan-reporting 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "ab081a14ab8f9598ce826890fe896d0addee68c7a58ab49008369ccbb51510a8" "checksum colored 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6cdb90b60f2927f8d76139c72dbde7e10c3a2bc47c8594c9c7a66529f2687c03" "checksum combine 2.5.2 (registry+https://github.com/rust-lang/crates.io-index)" = "1645a65a99c7c8d345761f4b75a6ffe5be3b3b27a93ee731fccc5050ba6be97c" +"checksum compression 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "3a82b366ae14633c67a1cbb4aa3738210a23f77d2868a0fd50faa23a956f9ec4" "checksum console 0.9.2 
(registry+https://github.com/rust-lang/crates.io-index)" = "45e0f3986890b3acbc782009e2629dfe2baa430ac091519ce3be26164a2ae6c0" "checksum constant_time_eq 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "8ff012e225ce166d4422e0e78419d901719760f62ae2b7969ca6b564d1b54a9e" "checksum cookie 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)" = "888604f00b3db336d2af898ec3c1d5d0ddf5e6d462220f2ededc33a87ac4bbd5" diff --git a/core/Cargo.toml b/core/Cargo.toml index fb0a7a92d..ab41dde0f 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -18,6 +18,7 @@ bincode = "1.2.1" bs58 = "0.3.0" byteorder = "1.3.2" chrono = { version = "0.4.10", features = ["serde"] } +compression = "0.1.5" core_affinity = "0.5.10" crossbeam-channel = "0.3" fs_extra = "1.1.0" diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 18ff3ea4a..beefe8ce6 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -26,6 +26,7 @@ use crate::{ weighted_shuffle::{weighted_best, weighted_shuffle}, }; use bincode::{serialize, serialized_size}; +use compression::prelude::*; use core::cmp; use itertools::Itertools; use solana_ledger::{bank_forks::BankForks, staking_utils}; @@ -307,10 +308,75 @@ impl ClusterInfo { ) } - pub fn push_epoch_slots(&mut self, id: Pubkey, root: Slot, min: Slot, slots: BTreeSet) { + pub fn compress_incomplete_slots(incomplete_slots: &BTreeSet) -> (Slot, Vec) { + if !incomplete_slots.is_empty() { + let first_slot = incomplete_slots + .iter() + .next() + .expect("expected to find at least one slot"); + let last_slot = incomplete_slots + .iter() + .next_back() + .expect("expected to find last slot"); + let mut uncompressed = vec![0u8; (last_slot.saturating_sub(*first_slot) + 1) as usize]; + incomplete_slots.iter().for_each(|slot| { + uncompressed[slot.saturating_sub(*first_slot) as usize] = 1; + }); + if let Ok(compressed) = uncompressed + .iter() + .cloned() + .encode(&mut GZipEncoder::new(), Action::Finish) + .collect::, _>>() + { + 
(*first_slot, compressed) + } else { + (0, vec![]) + } + } else { + (0, vec![]) + } + } + + pub fn decompress_incomplete_slots(first_slot: u64, compressed: &[u8]) -> BTreeSet { + let mut old_incomplete_slots: BTreeSet = BTreeSet::new(); + + if let Ok(decompressed) = compressed + .iter() + .cloned() + .decode(&mut GZipDecoder::new()) + .collect::, _>>() + { + decompressed.iter().enumerate().for_each(|(i, val)| { + if *val == 1 { + old_incomplete_slots.insert(first_slot + i as u64); + } + }) + } + + old_incomplete_slots + } + + pub fn push_epoch_slots( + &mut self, + id: Pubkey, + root: Slot, + min: Slot, + slots: BTreeSet, + incomplete_slots: &BTreeSet, + ) { + let (first_missing_slot, compressed_map) = + Self::compress_incomplete_slots(incomplete_slots); let now = timestamp(); let entry = CrdsValue::new_signed( - CrdsData::EpochSlots(EpochSlots::new(id, root, min, slots, now)), + CrdsData::EpochSlots(EpochSlots::new( + id, + root, + min, + slots, + first_missing_slot, + compressed_map, + now, + )), &self.keypair, ); self.gossip @@ -2133,6 +2199,8 @@ mod tests { root: 0, lowest: 0, slots: btree_slots, + first_missing: 0, + stash: vec![], wallclock: 0, })); test_split_messages(value); @@ -2150,6 +2218,8 @@ mod tests { root: 0, lowest: 0, slots: BTreeSet::new(), + first_missing: 0, + stash: vec![], wallclock: 0, })); @@ -2168,6 +2238,8 @@ mod tests { root: 0, lowest: 0, slots, + first_missing: 0, + stash: vec![], wallclock: 0, }); i += 1; @@ -2314,6 +2386,8 @@ mod tests { peer_root, peer_lowest, BTreeSet::new(), + 0, + vec![], timestamp(), ))); let _ = cluster_info.gossip.crds.insert(value, timestamp()); @@ -2375,4 +2449,32 @@ mod tests { serialized_size(&protocol).expect("unable to serialize gossip protocol") as usize; PACKET_DATA_SIZE - (protocol_size - filter_size) } + + #[test] + fn test_compress_incomplete_slots() { + let mut incomplete_slots: BTreeSet = BTreeSet::new(); + + assert_eq!( + (0, vec![]), + ClusterInfo::compress_incomplete_slots(&incomplete_slots) + 
); + + incomplete_slots.insert(100); + let (first, compressed) = ClusterInfo::compress_incomplete_slots(&incomplete_slots); + assert_eq!(100, first); + let decompressed = ClusterInfo::decompress_incomplete_slots(first, &compressed); + assert_eq!(incomplete_slots, decompressed); + + incomplete_slots.insert(104); + let (first, compressed) = ClusterInfo::compress_incomplete_slots(&incomplete_slots); + assert_eq!(100, first); + let decompressed = ClusterInfo::decompress_incomplete_slots(first, &compressed); + assert_eq!(incomplete_slots, decompressed); + + incomplete_slots.insert(80); + let (first, compressed) = ClusterInfo::compress_incomplete_slots(&incomplete_slots); + assert_eq!(80, first); + let decompressed = ClusterInfo::decompress_incomplete_slots(first, &compressed); + assert_eq!(incomplete_slots, decompressed); + } } diff --git a/core/src/crds_value.rs b/core/src/crds_value.rs index 06d44e35e..93a5d4d11 100644 --- a/core/src/crds_value.rs +++ b/core/src/crds_value.rs @@ -67,6 +67,8 @@ pub struct EpochSlots { pub root: Slot, pub lowest: Slot, pub slots: BTreeSet, + pub first_missing: Slot, + pub stash: Vec, pub wallclock: u64, } @@ -76,6 +78,8 @@ impl EpochSlots { root: Slot, lowest: Slot, slots: BTreeSet, + first_missing: Slot, + stash: Vec, wallclock: u64, ) -> Self { Self { @@ -83,6 +87,8 @@ impl EpochSlots { root, lowest, slots, + first_missing, + stash, wallclock, } } @@ -283,6 +289,8 @@ mod test { 0, BTreeSet::new(), 0, + vec![], + 0, ))); assert_eq!(v.wallclock(), 0); let key = v.clone().epoch_slots().unwrap().from; @@ -309,6 +317,8 @@ mod test { 0, 0, btreeset, + 0, + vec![], timestamp(), ))); verify_signatures(&mut v, &keypair, &wrong_keypair); diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs index d6a849964..a31f22de6 100644 --- a/core/src/repair_service.rs +++ b/core/src/repair_service.rs @@ -9,11 +9,12 @@ use solana_ledger::{ bank_forks::BankForks, blockstore::{Blockstore, CompletedSlotsReceiver, SlotMeta}, }; +use 
solana_sdk::clock::DEFAULT_SLOTS_PER_EPOCH; use solana_sdk::{clock::Slot, epoch_schedule::EpochSchedule, pubkey::Pubkey}; use std::{ collections::BTreeSet, net::UdpSocket, - ops::Bound::{Excluded, Unbounded}, + ops::Bound::{Included, Unbounded}, sync::atomic::{AtomicBool, Ordering}, sync::{Arc, RwLock}, thread::sleep, @@ -25,6 +26,9 @@ pub const MAX_REPAIR_LENGTH: usize = 512; pub const REPAIR_MS: u64 = 100; pub const MAX_ORPHANS: usize = 5; +const MAX_COMPLETED_SLOT_CACHE_LEN: usize = 256; +const COMPLETED_SLOT_CACHE_FLUSH_TRIGGER: usize = 512; + pub enum RepairStrategy { RepairRange(RepairSlotRange), RepairAll { @@ -85,17 +89,18 @@ impl RepairService { ) { let serve_repair = ServeRepair::new(cluster_info.clone()); let mut epoch_slots: BTreeSet = BTreeSet::new(); + let mut old_incomplete_slots: BTreeSet = BTreeSet::new(); let id = cluster_info.read().unwrap().id(); - let mut current_root = 0; if let RepairStrategy::RepairAll { ref epoch_schedule, .. } = repair_strategy { - current_root = blockstore.last_root(); + let current_root = blockstore.last_root(); Self::initialize_epoch_slots( id, blockstore, &mut epoch_slots, + &old_incomplete_slots, current_root, epoch_schedule, cluster_info, @@ -127,8 +132,8 @@ impl RepairService { id, new_root, lowest_slot, - &mut current_root, &mut epoch_slots, + &mut old_incomplete_slots, &cluster_info, completed_slots_receiver, ); @@ -292,6 +297,7 @@ impl RepairService { id: Pubkey, blockstore: &Blockstore, slots_in_gossip: &mut BTreeSet, + old_incomplete_slots: &BTreeSet, root: Slot, epoch_schedule: &EpochSchedule, cluster_info: &RwLock, @@ -307,6 +313,7 @@ impl RepairService { root, blockstore.lowest_slot(), slots_in_gossip.clone(), + old_incomplete_slots, ); } @@ -316,44 +323,83 @@ impl RepairService { id: Pubkey, latest_known_root: Slot, lowest_slot: Slot, - prev_root: &mut Slot, - slots_in_gossip: &mut BTreeSet, + completed_slot_cache: &mut BTreeSet, + incomplete_slot_stash: &mut BTreeSet, cluster_info: &RwLock, 
completed_slots_receiver: &CompletedSlotsReceiver, ) { - // If the latest known root is different, update gossip. - let mut should_update = latest_known_root != *prev_root; + let mut should_update = false; while let Ok(completed_slots) = completed_slots_receiver.try_recv() { for slot in completed_slots { - // If the newly completed slot > root, and the set did not contain this value - // before, we should update gossip. - if slot > latest_known_root { - should_update |= slots_in_gossip.insert(slot); + let last_slot_in_stash = *incomplete_slot_stash.iter().next_back().unwrap_or(&0); + let removed_from_stash = incomplete_slot_stash.remove(&slot); + // If the newly completed slot was not being tracked in stash, and is > last + // slot being tracked in stash, add it to cache. Also, update gossip + if !removed_from_stash && slot >= last_slot_in_stash { + should_update |= completed_slot_cache.insert(slot); } + // If the slot was removed from stash, update gossip + should_update |= removed_from_stash; } } if should_update { - // Filter out everything <= root - if latest_known_root != *prev_root { - *prev_root = latest_known_root; - Self::retain_slots_greater_than_root(slots_in_gossip, latest_known_root); + if completed_slot_cache.len() >= COMPLETED_SLOT_CACHE_FLUSH_TRIGGER { + Self::stash_old_incomplete_slots(completed_slot_cache, incomplete_slot_stash); + let lowest_completed_slot_in_cache = + *completed_slot_cache.iter().next().unwrap_or(&0); + Self::prune_incomplete_slot_stash( + incomplete_slot_stash, + lowest_completed_slot_in_cache, + ); } cluster_info.write().unwrap().push_epoch_slots( id, latest_known_root, lowest_slot, - slots_in_gossip.clone(), + completed_slot_cache.clone(), + incomplete_slot_stash, ); } } - fn retain_slots_greater_than_root(slot_set: &mut BTreeSet, root: Slot) { - *slot_set = slot_set - .range((Excluded(&root), Unbounded)) - .cloned() - .collect(); + fn stash_old_incomplete_slots(cache: &mut BTreeSet, stash: &mut BTreeSet) { + if cache.len() > 
MAX_COMPLETED_SLOT_CACHE_LEN { + let mut prev = *cache.iter().next().expect("Expected to find some slot"); + cache.remove(&prev); + while cache.len() >= MAX_COMPLETED_SLOT_CACHE_LEN { + let next = *cache.iter().next().expect("Expected to find some slot"); + cache.remove(&next); + // Prev slot and next slot are not included in incomplete slot list. + (prev + 1..next).for_each(|slot| { + stash.insert(slot); + }); + prev = next; + } + } + } + + fn prune_incomplete_slot_stash( + stash: &mut BTreeSet, + lowest_completed_slot_in_cache: Slot, + ) { + if let Some(oldest_incomplete_slot) = stash.iter().next() { + // Prune old slots + // Prune in batches to reduce overhead. Pruning starts when oldest slot is 1.5 epochs + // earlier than the new root. But, we prune all the slots that are older than 1 epoch. + // So slots in a batch of half epoch are getting pruned + if oldest_incomplete_slot + DEFAULT_SLOTS_PER_EPOCH + DEFAULT_SLOTS_PER_EPOCH / 2 + < lowest_completed_slot_in_cache + { + let oldest_slot_to_retain = + lowest_completed_slot_in_cache.saturating_sub(DEFAULT_SLOTS_PER_EPOCH); + *stash = stash + .range((Included(&oldest_slot_to_retain), Unbounded)) + .cloned() + .collect(); + } + } } pub fn join(self) -> thread::Result<()> { @@ -373,7 +419,6 @@ mod test { }; use solana_ledger::shred::max_ticks_per_n_shreds; use solana_ledger::{blockstore::Blockstore, get_tmp_ledger_path}; - use std::sync::mpsc::channel; use std::thread::Builder; #[test] @@ -703,13 +748,14 @@ mod test { node_info.info.clone(), )); + let mut old_incomplete_slots: BTreeSet = BTreeSet::new(); while completed_slots.len() < num_slots as usize { RepairService::update_epoch_slots( Pubkey::default(), root, blockstore.lowest_slot(), - &mut root.clone(), &mut completed_slots, + &mut old_incomplete_slots, &cluster_info, &completed_slots_receiver, ); @@ -726,13 +772,12 @@ mod test { Pubkey::default(), root, 0, - &mut 0, &mut completed_slots, + &mut old_incomplete_slots, &cluster_info, &completed_slots_receiver, 
// repair_service.rs — interior of `mod tests`, reconstructed from a collapsed
// diff (stripped `<...>` type parameters restored; redundant `.into_iter()` on
// ranges removed — a range is already an iterator).

    /// Exercises cache → stash overflow handling across the size threshold,
    /// with and without gaps in the cached slot sequence.
    #[test]
    fn test_stash_old_incomplete_slots() {
        let mut cache: BTreeSet<Slot> = BTreeSet::new();
        let mut stash: BTreeSet<Slot> = BTreeSet::new();

        // When cache is empty: nothing to stash.
        RepairService::stash_old_incomplete_slots(&mut cache, &mut stash);
        assert_eq!(stash.len(), 0);

        // Insert some slots in cache ( < MAX_COMPLETED_SLOT_CACHE_LEN + 1).
        cache.insert(101);
        cache.insert(102);
        cache.insert(104);
        cache.insert(105);

        // Not enough slots in cache, so stash should remain empty.
        RepairService::stash_old_incomplete_slots(&mut cache, &mut stash);
        assert_eq!(stash.len(), 0);
        assert_eq!(cache.len(), 4);

        // Insert slots in cache ( = MAX_COMPLETED_SLOT_CACHE_LEN).
        let mut cache: BTreeSet<Slot> = BTreeSet::new();
        (0..MAX_COMPLETED_SLOT_CACHE_LEN as u64).for_each(|slot| {
            cache.insert(slot);
        });

        // Exactly at the cap is still not over it; stash stays empty.
        RepairService::stash_old_incomplete_slots(&mut cache, &mut stash);
        assert_eq!(stash.len(), 0);
        assert_eq!(cache.len(), MAX_COMPLETED_SLOT_CACHE_LEN);

        // Insert 1 more to cross the threshold.
        cache.insert(MAX_COMPLETED_SLOT_CACHE_LEN as u64);
        RepairService::stash_old_incomplete_slots(&mut cache, &mut stash);
        // Stash is still empty, as the evicted slots were contiguous (no gaps).
        assert_eq!(stash.len(), 0);
        // It removed some entries from cache.
        assert_eq!(cache.len(), MAX_COMPLETED_SLOT_CACHE_LEN - 1);

        // Insert more slots to create a missing slot (1 is absent).
        let mut cache: BTreeSet<Slot> = BTreeSet::new();
        cache.insert(0);
        (2..=MAX_COMPLETED_SLOT_CACHE_LEN as u64 + 2).for_each(|slot| {
            cache.insert(slot);
        });
        RepairService::stash_old_incomplete_slots(&mut cache, &mut stash);

        // The gap was stashed as incomplete.
        assert!(stash.contains(&1));
        // It removed some entries from cache.
        assert_eq!(cache.len(), MAX_COMPLETED_SLOT_CACHE_LEN - 1);

        // Test multiple missing slots at dispersed locations.
        let mut cache: BTreeSet<Slot> = BTreeSet::new();
        (0..MAX_COMPLETED_SLOT_CACHE_LEN as u64 * 2).for_each(|slot| {
            cache.insert(slot);
        });

        cache.remove(&10);
        cache.remove(&11);

        cache.remove(&28);
        cache.remove(&29);

        cache.remove(&148);
        cache.remove(&149);
        cache.remove(&150);
        cache.remove(&151);

        RepairService::stash_old_incomplete_slots(&mut cache, &mut stash);

        // Every removed slot — and only those — ends up stashed.
        assert!(stash.contains(&10));
        assert!(stash.contains(&11));
        assert!(stash.contains(&28));
        assert!(stash.contains(&29));
        assert!(stash.contains(&148));
        assert!(stash.contains(&149));
        assert!(stash.contains(&150));
        assert!(stash.contains(&151));

        assert!(!stash.contains(&147));
        assert!(!stash.contains(&152));
        // Cache shrank back below the cap and retained the newest slots.
        assert_eq!(cache.len(), MAX_COMPLETED_SLOT_CACHE_LEN - 1);
        (MAX_COMPLETED_SLOT_CACHE_LEN + 1..MAX_COMPLETED_SLOT_CACHE_LEN * 2).for_each(|slot| {
            let slot: u64 = slot as u64;
            assert!(cache.contains(&slot));
        });
    }

    /// Exercises the epoch-relative pruning thresholds of the incomplete-slot
    /// stash: no pruning below 1.5 epochs of age, then pruning down to 1 epoch.
    #[test]
    fn test_prune_incomplete_slot_stash() {
        // Prune empty stash: no-op.
        let mut stash: BTreeSet<Slot> = BTreeSet::new();
        RepairService::prune_incomplete_slot_stash(&mut stash, 0);
        assert!(stash.is_empty());

        // Prune stash with slots < DEFAULT_SLOTS_PER_EPOCH old: nothing removed.
        stash.insert(0);
        stash.insert(10);
        stash.insert(11);
        stash.insert(50);
        assert_eq!(stash.len(), 4);
        RepairService::prune_incomplete_slot_stash(&mut stash, 100);
        assert_eq!(stash.len(), 4);

        // Slots > DEFAULT_SLOTS_PER_EPOCH but < 1.5x old: still nothing removed.
        stash.insert(DEFAULT_SLOTS_PER_EPOCH + 50);
        assert_eq!(stash.len(), 5);
        RepairService::prune_incomplete_slot_stash(&mut stash, DEFAULT_SLOTS_PER_EPOCH + 100);
        assert_eq!(stash.len(), 5);

        // Crossing 1.5 epochs triggers pruning of everything older than 1 epoch.
        stash.insert(DEFAULT_SLOTS_PER_EPOCH + DEFAULT_SLOTS_PER_EPOCH / 2);
        assert_eq!(stash.len(), 6);
        RepairService::prune_incomplete_slot_stash(
            &mut stash,
            DEFAULT_SLOTS_PER_EPOCH + DEFAULT_SLOTS_PER_EPOCH / 2 + 1,
        );
        assert_eq!(stash.len(), 2);
    }