Simplify EpochSlots update (#9545)

Co-authored-by: Carl <carl@solana.com>
This commit is contained in:
carllin 2020-04-16 19:32:19 -07:00 committed by GitHub
parent 47ae57610a
commit bcfd379f32
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 35 additions and 22 deletions

View File

@ -117,6 +117,13 @@ impl RepairService {
}
let mut repair_stats = RepairStats::default();
let mut last_stats = Instant::now();
if let RepairStrategy::RepairAll {
ref completed_slots_receiver,
..
} = repair_strategy
{
Self::initialize_epoch_slots(blockstore, cluster_info, completed_slots_receiver);
}
loop {
if exit.load(Ordering::Relaxed) {
break;
@ -141,14 +148,7 @@ impl RepairService {
let new_root = blockstore.last_root();
let lowest_slot = blockstore.lowest_slot();
Self::update_lowest_slot(&id, lowest_slot, &cluster_info);
Self::update_completed_slots(
&id,
new_root,
&cluster_slots,
blockstore,
completed_slots_receiver,
&cluster_info,
);
Self::update_completed_slots(completed_slots_receiver, &cluster_info);
cluster_slots.update(new_root, cluster_info, bank_forks);
Self::generate_repairs(blockstore, new_root, MAX_REPAIR_LENGTH)
}
@ -324,28 +324,14 @@ impl RepairService {
}
fn update_completed_slots(
id: &Pubkey,
root: Slot,
cluster_slots: &ClusterSlots,
blockstore: &Blockstore,
completed_slots_receiver: &CompletedSlotsReceiver,
cluster_info: &RwLock<ClusterInfo>,
) {
let mine = cluster_slots.collect(id);
let mut slots: Vec<Slot> = vec![];
while let Ok(mut more) = completed_slots_receiver.try_recv() {
more.retain(|x| !mine.contains(x));
slots.append(&mut more);
}
blockstore
.live_slots_iterator(root)
.for_each(|(slot, slot_meta)| {
if slot_meta.is_full() && !mine.contains(&slot) {
slots.push(slot)
}
});
slots.sort();
slots.dedup();
if !slots.is_empty() {
cluster_info.write().unwrap().push_epoch_slots(&slots);
}
@ -358,6 +344,33 @@ impl RepairService {
.push_lowest_slot(*id, lowest_slot);
}
fn initialize_epoch_slots(
blockstore: &Blockstore,
cluster_info: &RwLock<ClusterInfo>,
completed_slots_receiver: &CompletedSlotsReceiver,
) {
let root = blockstore.last_root();
let mut slots: Vec<_> = blockstore
.live_slots_iterator(root)
.filter_map(|(slot, slot_meta)| {
if slot_meta.is_full() {
Some(slot)
} else {
None
}
})
.collect();
while let Ok(mut more) = completed_slots_receiver.try_recv() {
slots.append(&mut more);
}
slots.sort();
slots.dedup();
if !slots.is_empty() {
cluster_info.write().unwrap().push_epoch_slots(&slots);
}
}
pub fn join(self) -> thread::Result<()> {
self.t_repair.join()
}