diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs index 08c31aae1d..a18781b85a 100644 --- a/core/src/repair_service.rs +++ b/core/src/repair_service.rs @@ -117,6 +117,13 @@ impl RepairService { } let mut repair_stats = RepairStats::default(); let mut last_stats = Instant::now(); + if let RepairStrategy::RepairAll { + ref completed_slots_receiver, + .. + } = repair_strategy + { + Self::initialize_epoch_slots(blockstore, cluster_info, completed_slots_receiver); + } loop { if exit.load(Ordering::Relaxed) { break; @@ -141,14 +148,7 @@ impl RepairService { let new_root = blockstore.last_root(); let lowest_slot = blockstore.lowest_slot(); Self::update_lowest_slot(&id, lowest_slot, &cluster_info); - Self::update_completed_slots( - &id, - new_root, - &cluster_slots, - blockstore, - completed_slots_receiver, - &cluster_info, - ); + Self::update_completed_slots(completed_slots_receiver, &cluster_info); cluster_slots.update(new_root, cluster_info, bank_forks); Self::generate_repairs(blockstore, new_root, MAX_REPAIR_LENGTH) } @@ -324,28 +324,14 @@ impl RepairService { } fn update_completed_slots( - id: &Pubkey, - root: Slot, - cluster_slots: &ClusterSlots, - blockstore: &Blockstore, completed_slots_receiver: &CompletedSlotsReceiver, cluster_info: &RwLock, ) { - let mine = cluster_slots.collect(id); let mut slots: Vec = vec![]; while let Ok(mut more) = completed_slots_receiver.try_recv() { - more.retain(|x| !mine.contains(x)); slots.append(&mut more); } - blockstore - .live_slots_iterator(root) - .for_each(|(slot, slot_meta)| { - if slot_meta.is_full() && !mine.contains(&slot) { - slots.push(slot) - } - }); slots.sort(); - slots.dedup(); if !slots.is_empty() { cluster_info.write().unwrap().push_epoch_slots(&slots); } @@ -358,6 +344,33 @@ impl RepairService { .push_lowest_slot(*id, lowest_slot); } + fn initialize_epoch_slots( + blockstore: &Blockstore, + cluster_info: &RwLock, + completed_slots_receiver: &CompletedSlotsReceiver, + ) { + let root = blockstore.last_root(); + let mut slots: Vec<_> = blockstore + .live_slots_iterator(root) + .filter_map(|(slot, slot_meta)| { + if slot_meta.is_full() { + Some(slot) + } else { + None + } + }) + .collect(); + + while let Ok(mut more) = completed_slots_receiver.try_recv() { + slots.append(&mut more); + } + slots.sort(); + slots.dedup(); + if !slots.is_empty() { + cluster_info.write().unwrap().push_epoch_slots(&slots); + } + } + pub fn join(self) -> thread::Result<()> { self.t_repair.join() }