Update epoch slots to include all missing slots (#8276)
* Update epoch slots to include all missing slots * new test for compress/decompress * address review comments * limit cache based on size, instead of comparing roots
This commit is contained in:
parent
027ec71aa9
commit
0d5c1239c6
|
@ -560,6 +560,17 @@ dependencies = [
|
|||
"byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "compression"
|
||||
version = "0.1.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
dependencies = [
|
||||
"cfg-if 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"num-traits 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "console"
|
||||
version = "0.9.2"
|
||||
|
@ -3829,6 +3840,7 @@ dependencies = [
|
|||
"bs58 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"chrono 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"compression 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"core_affinity 0.5.10 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"crossbeam-channel 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"ed25519-dalek 1.0.0-pre.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
|
@ -6085,6 +6097,7 @@ dependencies = [
|
|||
"checksum codespan-reporting 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "ab081a14ab8f9598ce826890fe896d0addee68c7a58ab49008369ccbb51510a8"
|
||||
"checksum colored 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6cdb90b60f2927f8d76139c72dbde7e10c3a2bc47c8594c9c7a66529f2687c03"
|
||||
"checksum combine 2.5.2 (registry+https://github.com/rust-lang/crates.io-index)" = "1645a65a99c7c8d345761f4b75a6ffe5be3b3b27a93ee731fccc5050ba6be97c"
|
||||
"checksum compression 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "3a82b366ae14633c67a1cbb4aa3738210a23f77d2868a0fd50faa23a956f9ec4"
|
||||
"checksum console 0.9.2 (registry+https://github.com/rust-lang/crates.io-index)" = "45e0f3986890b3acbc782009e2629dfe2baa430ac091519ce3be26164a2ae6c0"
|
||||
"checksum constant_time_eq 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "8ff012e225ce166d4422e0e78419d901719760f62ae2b7969ca6b564d1b54a9e"
|
||||
"checksum cookie 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)" = "888604f00b3db336d2af898ec3c1d5d0ddf5e6d462220f2ededc33a87ac4bbd5"
|
||||
|
|
|
@ -18,6 +18,7 @@ bincode = "1.2.1"
|
|||
bs58 = "0.3.0"
|
||||
byteorder = "1.3.2"
|
||||
chrono = { version = "0.4.10", features = ["serde"] }
|
||||
compression = "0.1.5"
|
||||
core_affinity = "0.5.10"
|
||||
crossbeam-channel = "0.3"
|
||||
fs_extra = "1.1.0"
|
||||
|
|
|
@ -26,6 +26,7 @@ use crate::{
|
|||
weighted_shuffle::{weighted_best, weighted_shuffle},
|
||||
};
|
||||
use bincode::{serialize, serialized_size};
|
||||
use compression::prelude::*;
|
||||
use core::cmp;
|
||||
use itertools::Itertools;
|
||||
use solana_ledger::{bank_forks::BankForks, staking_utils};
|
||||
|
@ -307,10 +308,75 @@ impl ClusterInfo {
|
|||
)
|
||||
}
|
||||
|
||||
pub fn push_epoch_slots(&mut self, id: Pubkey, root: Slot, min: Slot, slots: BTreeSet<Slot>) {
|
||||
pub fn compress_incomplete_slots(incomplete_slots: &BTreeSet<Slot>) -> (Slot, Vec<u8>) {
|
||||
if !incomplete_slots.is_empty() {
|
||||
let first_slot = incomplete_slots
|
||||
.iter()
|
||||
.next()
|
||||
.expect("expected to find at least one slot");
|
||||
let last_slot = incomplete_slots
|
||||
.iter()
|
||||
.next_back()
|
||||
.expect("expected to find last slot");
|
||||
let mut uncompressed = vec![0u8; (last_slot.saturating_sub(*first_slot) + 1) as usize];
|
||||
incomplete_slots.iter().for_each(|slot| {
|
||||
uncompressed[slot.saturating_sub(*first_slot) as usize] = 1;
|
||||
});
|
||||
if let Ok(compressed) = uncompressed
|
||||
.iter()
|
||||
.cloned()
|
||||
.encode(&mut GZipEncoder::new(), Action::Finish)
|
||||
.collect::<std::result::Result<Vec<u8>, _>>()
|
||||
{
|
||||
(*first_slot, compressed)
|
||||
} else {
|
||||
(0, vec![])
|
||||
}
|
||||
} else {
|
||||
(0, vec![])
|
||||
}
|
||||
}
|
||||
|
||||
pub fn decompress_incomplete_slots(first_slot: u64, compressed: &[u8]) -> BTreeSet<Slot> {
|
||||
let mut old_incomplete_slots: BTreeSet<Slot> = BTreeSet::new();
|
||||
|
||||
if let Ok(decompressed) = compressed
|
||||
.iter()
|
||||
.cloned()
|
||||
.decode(&mut GZipDecoder::new())
|
||||
.collect::<std::result::Result<Vec<u8>, _>>()
|
||||
{
|
||||
decompressed.iter().enumerate().for_each(|(i, val)| {
|
||||
if *val == 1 {
|
||||
old_incomplete_slots.insert(first_slot + i as u64);
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
old_incomplete_slots
|
||||
}
|
||||
|
||||
pub fn push_epoch_slots(
|
||||
&mut self,
|
||||
id: Pubkey,
|
||||
root: Slot,
|
||||
min: Slot,
|
||||
slots: BTreeSet<Slot>,
|
||||
incomplete_slots: &BTreeSet<Slot>,
|
||||
) {
|
||||
let (first_missing_slot, compressed_map) =
|
||||
Self::compress_incomplete_slots(incomplete_slots);
|
||||
let now = timestamp();
|
||||
let entry = CrdsValue::new_signed(
|
||||
CrdsData::EpochSlots(EpochSlots::new(id, root, min, slots, now)),
|
||||
CrdsData::EpochSlots(EpochSlots::new(
|
||||
id,
|
||||
root,
|
||||
min,
|
||||
slots,
|
||||
first_missing_slot,
|
||||
compressed_map,
|
||||
now,
|
||||
)),
|
||||
&self.keypair,
|
||||
);
|
||||
self.gossip
|
||||
|
@ -2133,6 +2199,8 @@ mod tests {
|
|||
root: 0,
|
||||
lowest: 0,
|
||||
slots: btree_slots,
|
||||
first_missing: 0,
|
||||
stash: vec![],
|
||||
wallclock: 0,
|
||||
}));
|
||||
test_split_messages(value);
|
||||
|
@ -2150,6 +2218,8 @@ mod tests {
|
|||
root: 0,
|
||||
lowest: 0,
|
||||
slots: BTreeSet::new(),
|
||||
first_missing: 0,
|
||||
stash: vec![],
|
||||
wallclock: 0,
|
||||
}));
|
||||
|
||||
|
@ -2168,6 +2238,8 @@ mod tests {
|
|||
root: 0,
|
||||
lowest: 0,
|
||||
slots,
|
||||
first_missing: 0,
|
||||
stash: vec![],
|
||||
wallclock: 0,
|
||||
});
|
||||
i += 1;
|
||||
|
@ -2314,6 +2386,8 @@ mod tests {
|
|||
peer_root,
|
||||
peer_lowest,
|
||||
BTreeSet::new(),
|
||||
0,
|
||||
vec![],
|
||||
timestamp(),
|
||||
)));
|
||||
let _ = cluster_info.gossip.crds.insert(value, timestamp());
|
||||
|
@ -2375,4 +2449,32 @@ mod tests {
|
|||
serialized_size(&protocol).expect("unable to serialize gossip protocol") as usize;
|
||||
PACKET_DATA_SIZE - (protocol_size - filter_size)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_compress_incomplete_slots() {
|
||||
let mut incomplete_slots: BTreeSet<Slot> = BTreeSet::new();
|
||||
|
||||
assert_eq!(
|
||||
(0, vec![]),
|
||||
ClusterInfo::compress_incomplete_slots(&incomplete_slots)
|
||||
);
|
||||
|
||||
incomplete_slots.insert(100);
|
||||
let (first, compressed) = ClusterInfo::compress_incomplete_slots(&incomplete_slots);
|
||||
assert_eq!(100, first);
|
||||
let decompressed = ClusterInfo::decompress_incomplete_slots(first, &compressed);
|
||||
assert_eq!(incomplete_slots, decompressed);
|
||||
|
||||
incomplete_slots.insert(104);
|
||||
let (first, compressed) = ClusterInfo::compress_incomplete_slots(&incomplete_slots);
|
||||
assert_eq!(100, first);
|
||||
let decompressed = ClusterInfo::decompress_incomplete_slots(first, &compressed);
|
||||
assert_eq!(incomplete_slots, decompressed);
|
||||
|
||||
incomplete_slots.insert(80);
|
||||
let (first, compressed) = ClusterInfo::compress_incomplete_slots(&incomplete_slots);
|
||||
assert_eq!(80, first);
|
||||
let decompressed = ClusterInfo::decompress_incomplete_slots(first, &compressed);
|
||||
assert_eq!(incomplete_slots, decompressed);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -67,6 +67,8 @@ pub struct EpochSlots {
|
|||
pub root: Slot,
|
||||
pub lowest: Slot,
|
||||
pub slots: BTreeSet<Slot>,
|
||||
pub first_missing: Slot,
|
||||
pub stash: Vec<u8>,
|
||||
pub wallclock: u64,
|
||||
}
|
||||
|
||||
|
@ -76,6 +78,8 @@ impl EpochSlots {
|
|||
root: Slot,
|
||||
lowest: Slot,
|
||||
slots: BTreeSet<Slot>,
|
||||
first_missing: Slot,
|
||||
stash: Vec<u8>,
|
||||
wallclock: u64,
|
||||
) -> Self {
|
||||
Self {
|
||||
|
@ -83,6 +87,8 @@ impl EpochSlots {
|
|||
root,
|
||||
lowest,
|
||||
slots,
|
||||
first_missing,
|
||||
stash,
|
||||
wallclock,
|
||||
}
|
||||
}
|
||||
|
@ -283,6 +289,8 @@ mod test {
|
|||
0,
|
||||
BTreeSet::new(),
|
||||
0,
|
||||
vec![],
|
||||
0,
|
||||
)));
|
||||
assert_eq!(v.wallclock(), 0);
|
||||
let key = v.clone().epoch_slots().unwrap().from;
|
||||
|
@ -309,6 +317,8 @@ mod test {
|
|||
0,
|
||||
0,
|
||||
btreeset,
|
||||
0,
|
||||
vec![],
|
||||
timestamp(),
|
||||
)));
|
||||
verify_signatures(&mut v, &keypair, &wrong_keypair);
|
||||
|
|
|
@ -9,11 +9,12 @@ use solana_ledger::{
|
|||
bank_forks::BankForks,
|
||||
blockstore::{Blockstore, CompletedSlotsReceiver, SlotMeta},
|
||||
};
|
||||
use solana_sdk::clock::DEFAULT_SLOTS_PER_EPOCH;
|
||||
use solana_sdk::{clock::Slot, epoch_schedule::EpochSchedule, pubkey::Pubkey};
|
||||
use std::{
|
||||
collections::BTreeSet,
|
||||
net::UdpSocket,
|
||||
ops::Bound::{Excluded, Unbounded},
|
||||
ops::Bound::{Included, Unbounded},
|
||||
sync::atomic::{AtomicBool, Ordering},
|
||||
sync::{Arc, RwLock},
|
||||
thread::sleep,
|
||||
|
@ -25,6 +26,9 @@ pub const MAX_REPAIR_LENGTH: usize = 512;
|
|||
pub const REPAIR_MS: u64 = 100;
|
||||
pub const MAX_ORPHANS: usize = 5;
|
||||
|
||||
const MAX_COMPLETED_SLOT_CACHE_LEN: usize = 256;
|
||||
const COMPLETED_SLOT_CACHE_FLUSH_TRIGGER: usize = 512;
|
||||
|
||||
pub enum RepairStrategy {
|
||||
RepairRange(RepairSlotRange),
|
||||
RepairAll {
|
||||
|
@ -85,17 +89,18 @@ impl RepairService {
|
|||
) {
|
||||
let serve_repair = ServeRepair::new(cluster_info.clone());
|
||||
let mut epoch_slots: BTreeSet<Slot> = BTreeSet::new();
|
||||
let mut old_incomplete_slots: BTreeSet<Slot> = BTreeSet::new();
|
||||
let id = cluster_info.read().unwrap().id();
|
||||
let mut current_root = 0;
|
||||
if let RepairStrategy::RepairAll {
|
||||
ref epoch_schedule, ..
|
||||
} = repair_strategy
|
||||
{
|
||||
current_root = blockstore.last_root();
|
||||
let current_root = blockstore.last_root();
|
||||
Self::initialize_epoch_slots(
|
||||
id,
|
||||
blockstore,
|
||||
&mut epoch_slots,
|
||||
&old_incomplete_slots,
|
||||
current_root,
|
||||
epoch_schedule,
|
||||
cluster_info,
|
||||
|
@ -127,8 +132,8 @@ impl RepairService {
|
|||
id,
|
||||
new_root,
|
||||
lowest_slot,
|
||||
&mut current_root,
|
||||
&mut epoch_slots,
|
||||
&mut old_incomplete_slots,
|
||||
&cluster_info,
|
||||
completed_slots_receiver,
|
||||
);
|
||||
|
@ -292,6 +297,7 @@ impl RepairService {
|
|||
id: Pubkey,
|
||||
blockstore: &Blockstore,
|
||||
slots_in_gossip: &mut BTreeSet<Slot>,
|
||||
old_incomplete_slots: &BTreeSet<Slot>,
|
||||
root: Slot,
|
||||
epoch_schedule: &EpochSchedule,
|
||||
cluster_info: &RwLock<ClusterInfo>,
|
||||
|
@ -307,6 +313,7 @@ impl RepairService {
|
|||
root,
|
||||
blockstore.lowest_slot(),
|
||||
slots_in_gossip.clone(),
|
||||
old_incomplete_slots,
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -316,44 +323,83 @@ impl RepairService {
|
|||
id: Pubkey,
|
||||
latest_known_root: Slot,
|
||||
lowest_slot: Slot,
|
||||
prev_root: &mut Slot,
|
||||
slots_in_gossip: &mut BTreeSet<Slot>,
|
||||
completed_slot_cache: &mut BTreeSet<Slot>,
|
||||
incomplete_slot_stash: &mut BTreeSet<Slot>,
|
||||
cluster_info: &RwLock<ClusterInfo>,
|
||||
completed_slots_receiver: &CompletedSlotsReceiver,
|
||||
) {
|
||||
// If the latest known root is different, update gossip.
|
||||
let mut should_update = latest_known_root != *prev_root;
|
||||
let mut should_update = false;
|
||||
while let Ok(completed_slots) = completed_slots_receiver.try_recv() {
|
||||
for slot in completed_slots {
|
||||
// If the newly completed slot > root, and the set did not contain this value
|
||||
// before, we should update gossip.
|
||||
if slot > latest_known_root {
|
||||
should_update |= slots_in_gossip.insert(slot);
|
||||
let last_slot_in_stash = *incomplete_slot_stash.iter().next_back().unwrap_or(&0);
|
||||
let removed_from_stash = incomplete_slot_stash.remove(&slot);
|
||||
// If the newly completed slot was not being tracked in stash, and is > last
|
||||
// slot being tracked in stash, add it to cache. Also, update gossip
|
||||
if !removed_from_stash && slot >= last_slot_in_stash {
|
||||
should_update |= completed_slot_cache.insert(slot);
|
||||
}
|
||||
// If the slot was removed from stash, update gossip
|
||||
should_update |= removed_from_stash;
|
||||
}
|
||||
}
|
||||
|
||||
if should_update {
|
||||
// Filter out everything <= root
|
||||
if latest_known_root != *prev_root {
|
||||
*prev_root = latest_known_root;
|
||||
Self::retain_slots_greater_than_root(slots_in_gossip, latest_known_root);
|
||||
if completed_slot_cache.len() >= COMPLETED_SLOT_CACHE_FLUSH_TRIGGER {
|
||||
Self::stash_old_incomplete_slots(completed_slot_cache, incomplete_slot_stash);
|
||||
let lowest_completed_slot_in_cache =
|
||||
*completed_slot_cache.iter().next().unwrap_or(&0);
|
||||
Self::prune_incomplete_slot_stash(
|
||||
incomplete_slot_stash,
|
||||
lowest_completed_slot_in_cache,
|
||||
);
|
||||
}
|
||||
|
||||
cluster_info.write().unwrap().push_epoch_slots(
|
||||
id,
|
||||
latest_known_root,
|
||||
lowest_slot,
|
||||
slots_in_gossip.clone(),
|
||||
completed_slot_cache.clone(),
|
||||
incomplete_slot_stash,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
fn retain_slots_greater_than_root(slot_set: &mut BTreeSet<Slot>, root: Slot) {
|
||||
*slot_set = slot_set
|
||||
.range((Excluded(&root), Unbounded))
|
||||
.cloned()
|
||||
.collect();
|
||||
fn stash_old_incomplete_slots(cache: &mut BTreeSet<Slot>, stash: &mut BTreeSet<Slot>) {
|
||||
if cache.len() > MAX_COMPLETED_SLOT_CACHE_LEN {
|
||||
let mut prev = *cache.iter().next().expect("Expected to find some slot");
|
||||
cache.remove(&prev);
|
||||
while cache.len() >= MAX_COMPLETED_SLOT_CACHE_LEN {
|
||||
let next = *cache.iter().next().expect("Expected to find some slot");
|
||||
cache.remove(&next);
|
||||
// Prev slot and next slot are not included in incomplete slot list.
|
||||
(prev + 1..next).for_each(|slot| {
|
||||
stash.insert(slot);
|
||||
});
|
||||
prev = next;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn prune_incomplete_slot_stash(
|
||||
stash: &mut BTreeSet<Slot>,
|
||||
lowest_completed_slot_in_cache: Slot,
|
||||
) {
|
||||
if let Some(oldest_incomplete_slot) = stash.iter().next() {
|
||||
// Prune old slots
|
||||
// Prune in batches to reduce overhead. Pruning starts when oldest slot is 1.5 epochs
|
||||
// earlier than the new root. But, we prune all the slots that are older than 1 epoch.
|
||||
// So slots in a batch of half epoch are getting pruned
|
||||
if oldest_incomplete_slot + DEFAULT_SLOTS_PER_EPOCH + DEFAULT_SLOTS_PER_EPOCH / 2
|
||||
< lowest_completed_slot_in_cache
|
||||
{
|
||||
let oldest_slot_to_retain =
|
||||
lowest_completed_slot_in_cache.saturating_sub(DEFAULT_SLOTS_PER_EPOCH);
|
||||
*stash = stash
|
||||
.range((Included(&oldest_slot_to_retain), Unbounded))
|
||||
.cloned()
|
||||
.collect();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn join(self) -> thread::Result<()> {
|
||||
|
@ -373,7 +419,6 @@ mod test {
|
|||
};
|
||||
use solana_ledger::shred::max_ticks_per_n_shreds;
|
||||
use solana_ledger::{blockstore::Blockstore, get_tmp_ledger_path};
|
||||
use std::sync::mpsc::channel;
|
||||
use std::thread::Builder;
|
||||
|
||||
#[test]
|
||||
|
@ -703,13 +748,14 @@ mod test {
|
|||
node_info.info.clone(),
|
||||
));
|
||||
|
||||
let mut old_incomplete_slots: BTreeSet<Slot> = BTreeSet::new();
|
||||
while completed_slots.len() < num_slots as usize {
|
||||
RepairService::update_epoch_slots(
|
||||
Pubkey::default(),
|
||||
root,
|
||||
blockstore.lowest_slot(),
|
||||
&mut root.clone(),
|
||||
&mut completed_slots,
|
||||
&mut old_incomplete_slots,
|
||||
&cluster_info,
|
||||
&completed_slots_receiver,
|
||||
);
|
||||
|
@ -726,13 +772,12 @@ mod test {
|
|||
Pubkey::default(),
|
||||
root,
|
||||
0,
|
||||
&mut 0,
|
||||
&mut completed_slots,
|
||||
&mut old_incomplete_slots,
|
||||
&cluster_info,
|
||||
&completed_slots_receiver,
|
||||
);
|
||||
expected.insert(num_slots + 2);
|
||||
RepairService::retain_slots_greater_than_root(&mut expected, root);
|
||||
assert_eq!(completed_slots, expected);
|
||||
writer.join().unwrap();
|
||||
}
|
||||
|
@ -740,95 +785,133 @@ mod test {
|
|||
}
|
||||
|
||||
#[test]
|
||||
fn test_update_epoch_slots_new_root() {
|
||||
let mut current_root = 0;
|
||||
fn test_stash_old_incomplete_slots() {
|
||||
let mut cache: BTreeSet<Slot> = BTreeSet::new();
|
||||
let mut stash: BTreeSet<Slot> = BTreeSet::new();
|
||||
|
||||
let mut completed_slots = BTreeSet::new();
|
||||
let node_info = Node::new_localhost_with_pubkey(&Pubkey::default());
|
||||
let cluster_info = RwLock::new(ClusterInfo::new_with_invalid_keypair(
|
||||
node_info.info.clone(),
|
||||
));
|
||||
let my_pubkey = Pubkey::new_rand();
|
||||
let (completed_slots_sender, completed_slots_receiver) = channel();
|
||||
// When cache is empty.
|
||||
RepairService::stash_old_incomplete_slots(&mut cache, &mut stash);
|
||||
assert_eq!(stash.len(), 0);
|
||||
|
||||
// Send a new slot before the root is updated
|
||||
let newly_completed_slot = 63;
|
||||
completed_slots_sender
|
||||
.send(vec![newly_completed_slot])
|
||||
.unwrap();
|
||||
RepairService::update_epoch_slots(
|
||||
my_pubkey.clone(),
|
||||
current_root,
|
||||
0,
|
||||
&mut current_root.clone(),
|
||||
&mut completed_slots,
|
||||
&cluster_info,
|
||||
&completed_slots_receiver,
|
||||
// Insert some slots in cache ( < MAX_COMPLETED_SLOT_CACHE_LEN + 1)
|
||||
cache.insert(101);
|
||||
cache.insert(102);
|
||||
cache.insert(104);
|
||||
cache.insert(105);
|
||||
|
||||
// Not enough slots in cache. So stash should remain empty.
|
||||
RepairService::stash_old_incomplete_slots(&mut cache, &mut stash);
|
||||
assert_eq!(stash.len(), 0);
|
||||
assert_eq!(cache.len(), 4);
|
||||
|
||||
// Insert slots in cache ( = MAX_COMPLETED_SLOT_CACHE_LEN)
|
||||
let mut cache: BTreeSet<Slot> = BTreeSet::new();
|
||||
(0..MAX_COMPLETED_SLOT_CACHE_LEN as u64)
|
||||
.into_iter()
|
||||
.for_each(|slot| {
|
||||
cache.insert(slot);
|
||||
});
|
||||
|
||||
// Not enough slots in cache. So stash should remain empty.
|
||||
RepairService::stash_old_incomplete_slots(&mut cache, &mut stash);
|
||||
assert_eq!(stash.len(), 0);
|
||||
assert_eq!(cache.len(), MAX_COMPLETED_SLOT_CACHE_LEN);
|
||||
|
||||
// Insert 1 more to cross the threshold
|
||||
cache.insert(MAX_COMPLETED_SLOT_CACHE_LEN as u64);
|
||||
RepairService::stash_old_incomplete_slots(&mut cache, &mut stash);
|
||||
// Stash is still empty, as no missing slots
|
||||
assert_eq!(stash.len(), 0);
|
||||
// It removed some entries from cache
|
||||
assert_eq!(cache.len(), MAX_COMPLETED_SLOT_CACHE_LEN - 1);
|
||||
|
||||
// Insert more slots to create a missing slot
|
||||
let mut cache: BTreeSet<Slot> = BTreeSet::new();
|
||||
cache.insert(0);
|
||||
(2..=MAX_COMPLETED_SLOT_CACHE_LEN as u64 + 2)
|
||||
.into_iter()
|
||||
.for_each(|slot| {
|
||||
cache.insert(slot);
|
||||
});
|
||||
RepairService::stash_old_incomplete_slots(&mut cache, &mut stash);
|
||||
|
||||
// Stash is not empty
|
||||
assert!(stash.contains(&1));
|
||||
// It removed some entries from cache
|
||||
assert_eq!(cache.len(), MAX_COMPLETED_SLOT_CACHE_LEN - 1);
|
||||
|
||||
// Test multiple missing slots at dispersed locations
|
||||
let mut cache: BTreeSet<Slot> = BTreeSet::new();
|
||||
(0..MAX_COMPLETED_SLOT_CACHE_LEN as u64 * 2)
|
||||
.into_iter()
|
||||
.for_each(|slot| {
|
||||
cache.insert(slot);
|
||||
});
|
||||
|
||||
cache.remove(&10);
|
||||
cache.remove(&11);
|
||||
|
||||
cache.remove(&28);
|
||||
cache.remove(&29);
|
||||
|
||||
cache.remove(&148);
|
||||
cache.remove(&149);
|
||||
cache.remove(&150);
|
||||
cache.remove(&151);
|
||||
|
||||
RepairService::stash_old_incomplete_slots(&mut cache, &mut stash);
|
||||
|
||||
// Stash is not empty
|
||||
assert!(stash.contains(&10));
|
||||
assert!(stash.contains(&11));
|
||||
assert!(stash.contains(&28));
|
||||
assert!(stash.contains(&29));
|
||||
assert!(stash.contains(&148));
|
||||
assert!(stash.contains(&149));
|
||||
assert!(stash.contains(&150));
|
||||
assert!(stash.contains(&151));
|
||||
|
||||
assert!(!stash.contains(&147));
|
||||
assert!(!stash.contains(&152));
|
||||
// It removed some entries from cache
|
||||
assert_eq!(cache.len(), MAX_COMPLETED_SLOT_CACHE_LEN - 1);
|
||||
(MAX_COMPLETED_SLOT_CACHE_LEN + 1..MAX_COMPLETED_SLOT_CACHE_LEN * 2)
|
||||
.into_iter()
|
||||
.for_each(|slot| {
|
||||
let slot: u64 = slot as u64;
|
||||
assert!(cache.contains(&slot));
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_prune_incomplete_slot_stash() {
|
||||
// Prune empty stash
|
||||
let mut stash: BTreeSet<Slot> = BTreeSet::new();
|
||||
RepairService::prune_incomplete_slot_stash(&mut stash, 0);
|
||||
assert!(stash.is_empty());
|
||||
|
||||
// Prune stash with slots < DEFAULT_SLOTS_PER_EPOCH
|
||||
stash.insert(0);
|
||||
stash.insert(10);
|
||||
stash.insert(11);
|
||||
stash.insert(50);
|
||||
assert_eq!(stash.len(), 4);
|
||||
RepairService::prune_incomplete_slot_stash(&mut stash, 100);
|
||||
assert_eq!(stash.len(), 4);
|
||||
|
||||
// Prune stash with slots > DEFAULT_SLOTS_PER_EPOCH, but < 1.5 * DEFAULT_SLOTS_PER_EPOCH
|
||||
stash.insert(DEFAULT_SLOTS_PER_EPOCH + 50);
|
||||
assert_eq!(stash.len(), 5);
|
||||
RepairService::prune_incomplete_slot_stash(&mut stash, DEFAULT_SLOTS_PER_EPOCH + 100);
|
||||
assert_eq!(stash.len(), 5);
|
||||
|
||||
// Prune stash with slots > 1.5 * DEFAULT_SLOTS_PER_EPOCH
|
||||
stash.insert(DEFAULT_SLOTS_PER_EPOCH + DEFAULT_SLOTS_PER_EPOCH / 2);
|
||||
assert_eq!(stash.len(), 6);
|
||||
RepairService::prune_incomplete_slot_stash(
|
||||
&mut stash,
|
||||
DEFAULT_SLOTS_PER_EPOCH + DEFAULT_SLOTS_PER_EPOCH / 2 + 1,
|
||||
);
|
||||
|
||||
// We should see epoch state update
|
||||
let (my_epoch_slots_in_gossip, updated_ts) = {
|
||||
let r_cluster_info = cluster_info.read().unwrap();
|
||||
|
||||
let (my_epoch_slots_in_gossip, updated_ts) = r_cluster_info
|
||||
.get_epoch_state_for_node(&my_pubkey, None)
|
||||
.clone()
|
||||
.unwrap();
|
||||
|
||||
(my_epoch_slots_in_gossip.clone(), updated_ts)
|
||||
};
|
||||
|
||||
assert_eq!(my_epoch_slots_in_gossip.root, 0);
|
||||
assert_eq!(current_root, 0);
|
||||
assert_eq!(my_epoch_slots_in_gossip.slots.len(), 1);
|
||||
assert!(my_epoch_slots_in_gossip
|
||||
.slots
|
||||
.contains(&newly_completed_slot));
|
||||
|
||||
// Calling update again with no updates to either the roots or set of completed slots
|
||||
// should not update gossip
|
||||
RepairService::update_epoch_slots(
|
||||
my_pubkey.clone(),
|
||||
current_root,
|
||||
0,
|
||||
&mut current_root,
|
||||
&mut completed_slots,
|
||||
&cluster_info,
|
||||
&completed_slots_receiver,
|
||||
);
|
||||
|
||||
assert!(cluster_info
|
||||
.read()
|
||||
.unwrap()
|
||||
.get_epoch_state_for_node(&my_pubkey, Some(updated_ts))
|
||||
.is_none());
|
||||
|
||||
sleep(Duration::from_millis(10));
|
||||
// Updating just the root again should update gossip (simulates replay stage updating root
|
||||
// after a slot has been signaled as completed)
|
||||
RepairService::update_epoch_slots(
|
||||
my_pubkey.clone(),
|
||||
current_root + 1,
|
||||
0,
|
||||
&mut current_root,
|
||||
&mut completed_slots,
|
||||
&cluster_info,
|
||||
&completed_slots_receiver,
|
||||
);
|
||||
|
||||
let r_cluster_info = cluster_info.read().unwrap();
|
||||
|
||||
let (my_epoch_slots_in_gossip, _) = r_cluster_info
|
||||
.get_epoch_state_for_node(&my_pubkey, Some(updated_ts))
|
||||
.clone()
|
||||
.unwrap();
|
||||
|
||||
// Check the root was updated correctly
|
||||
assert_eq!(my_epoch_slots_in_gossip.root, 1);
|
||||
assert_eq!(current_root, 1);
|
||||
assert_eq!(my_epoch_slots_in_gossip.slots.len(), 1);
|
||||
assert!(my_epoch_slots_in_gossip
|
||||
.slots
|
||||
.contains(&newly_completed_slot));
|
||||
assert_eq!(stash.len(), 2);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue