diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 38b150ee99..869bcaa759 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -217,16 +217,18 @@ impl ReplayStage { ); Self::report_memory(&allocated, "replay_active_banks", start); - let ancestors = Arc::new(bank_forks.read().unwrap().ancestors()); - let descendants = bank_forks.read().unwrap().descendants(); + let mut ancestors = bank_forks.read().unwrap().ancestors(); + let mut descendants = bank_forks.read().unwrap().descendants(); let forks_root = bank_forks.read().unwrap().root(); let start = allocated.get(); + // Reset any duplicate slots that have been confirmed // by the network in anticipation of the confirmed version of // the slot Self::reset_duplicate_slots( &duplicate_slots_reset_receiver, - &descendants, + &mut ancestors, + &mut descendants, &mut progress, &bank_forks, ); @@ -473,13 +475,15 @@ impl ReplayStage { fn reset_duplicate_slots( duplicate_slots_reset_receiver: &DuplicateSlotsResetReceiver, - descendants: &HashMap>, + ancestors: &mut HashMap>, + descendants: &mut HashMap>, progress: &mut ProgressMap, bank_forks: &RwLock, ) { for duplicate_slot in duplicate_slots_reset_receiver.try_iter() { Self::purge_unconfirmed_duplicate_slot( duplicate_slot, + ancestors, descendants, progress, bank_forks, @@ -489,19 +493,29 @@ impl ReplayStage { fn purge_unconfirmed_duplicate_slot( duplicate_slot: Slot, - descendants: &HashMap>, + ancestors: &mut HashMap>, + descendants: &mut HashMap>, progress: &mut ProgressMap, bank_forks: &RwLock, ) { - error!("purging slot {}", duplicate_slot); - let slot_descendants = descendants.get(&duplicate_slot); + warn!("purging slot {}", duplicate_slot); + let slot_descendants = descendants.get(&duplicate_slot).cloned(); if slot_descendants.is_none() { // Root has already moved past this slot, no need to purge it return; } + // Clear the ancestors/descendants map to keep them + // consistent + let slot_descendants = slot_descendants.unwrap(); + Self::purge_ancestors_descendants( + duplicate_slot, + &slot_descendants, + ancestors, + descendants, + ); + for d in slot_descendants - .unwrap() .iter() .chain(std::iter::once(&duplicate_slot)) { @@ -514,18 +528,54 @@ impl ReplayStage { // Purging should have already been taken care of by logic // in repair_service, so make sure drop implementation doesn't // run - w_bank_forks - .get(*d) - .expect("Bank in descendants map must exist in BankForks") - .skip_drop - .store(true, Ordering::Relaxed); - w_bank_forks - .remove(*d) - .expect("Bank in descendants map must exist in BankForks"); + if let Some(b) = w_bank_forks.get(*d) { + b.skip_drop.store(true, Ordering::Relaxed) + } + w_bank_forks.remove(*d); } } } + // Purge given slot and all its descendants from the `ancestors` and + // `descendants` structures so that they're consistent with `BankForks` + // and the `progress` map. + fn purge_ancestors_descendants( + slot: Slot, + slot_descendants: &HashSet, + ancestors: &mut HashMap>, + descendants: &mut HashMap>, + ) { + if !ancestors.contains_key(&slot) { + // Slot has already been purged + return; + } + + // Purge this slot from each of its ancestors' `descendants` maps + for a in ancestors + .get(&slot) + .expect("must exist based on earlier check") + { + descendants + .get_mut(&a) + .expect("If exists in ancestor map must exist in descendants map") + .retain(|d| *d != slot && !slot_descendants.contains(d)); + } + ancestors + .remove(&slot) + .expect("must exist based on earlier check"); + + // Purge all the descendants of this slot from both maps + for descendant in slot_descendants { + ancestors.remove(&descendant).expect("must exist"); + descendants + .remove(&descendant) + .expect("must exist based on earlier check"); + } + descendants + .remove(&slot) + .expect("must exist based on earlier check"); + } + fn log_leader_change( my_pubkey: &Pubkey, bank_slot: Slot, @@ -3610,10 +3660,17 @@ pub(crate) mod tests { #[test] fn test_purge_unconfirmed_duplicate_slot() { let (bank_forks, mut progress) = setup_forks(); - let descendants = bank_forks.read().unwrap().descendants(); + let mut descendants = bank_forks.read().unwrap().descendants(); + let mut ancestors = bank_forks.read().unwrap().ancestors(); // Purging slot 5 should purge only slots 5 and its descendant 6 - ReplayStage::purge_unconfirmed_duplicate_slot(5, &descendants, &mut progress, &bank_forks); + ReplayStage::purge_unconfirmed_duplicate_slot( + 5, + &mut ancestors, + &mut descendants, + &mut progress, + &bank_forks, + ); for i in 5..=6 { assert!(bank_forks.read().unwrap().get(i).is_none()); assert!(progress.get(&i).is_none()); @@ -3624,8 +3681,15 @@ pub(crate) mod tests { } // Purging slot 4 should purge only slot 4 - let descendants = bank_forks.read().unwrap().descendants(); - ReplayStage::purge_unconfirmed_duplicate_slot(4, &descendants, &mut progress, &bank_forks); + let mut descendants = bank_forks.read().unwrap().descendants(); + let mut ancestors = bank_forks.read().unwrap().ancestors(); + ReplayStage::purge_unconfirmed_duplicate_slot( + 4, + &mut ancestors, + &mut descendants, + &mut progress, + &bank_forks, + ); for i in 4..=6 { assert!(bank_forks.read().unwrap().get(i).is_none()); assert!(progress.get(&i).is_none()); @@ -3636,8 +3700,15 @@ pub(crate) mod tests { } // Purging slot 1 should purge both forks 2 and 3 - let descendants = bank_forks.read().unwrap().descendants(); - ReplayStage::purge_unconfirmed_duplicate_slot(1, &descendants, &mut progress, &bank_forks); + let mut descendants = bank_forks.read().unwrap().descendants(); + let mut ancestors = bank_forks.read().unwrap().ancestors(); + ReplayStage::purge_unconfirmed_duplicate_slot( + 1, + &mut ancestors, + &mut descendants, + &mut progress, + &bank_forks, + ); for i in 1..=6 { assert!(bank_forks.read().unwrap().get(i).is_none()); assert!(progress.get(&i).is_none()); @@ -3646,6 +3717,55 @@ pub(crate) mod tests { assert!(progress.get(&0).is_some()); } + #[test] + fn test_purge_ancestors_descendants() { + let (bank_forks, _) = setup_forks(); + + // Purge branch rooted at slot 2 + let mut descendants = bank_forks.read().unwrap().descendants(); + let mut ancestors = bank_forks.read().unwrap().ancestors(); + let slot_2_descendants = descendants.get(&2).unwrap().clone(); + ReplayStage::purge_ancestors_descendants( + 2, + &slot_2_descendants, + &mut ancestors, + &mut descendants, + ); + + // Result should be equivalent to removing slot from BankForks + // and regeneratinig the `ancestor` `descendant` maps + for d in slot_2_descendants { + bank_forks.write().unwrap().remove(d); + } + bank_forks.write().unwrap().remove(2); + assert!(check_map_eq( + &ancestors, + &bank_forks.read().unwrap().ancestors() + )); + assert!(check_map_eq( + &descendants, + &bank_forks.read().unwrap().descendants() + )); + + // Try to purge the root + bank_forks.write().unwrap().set_root(3, &None, None); + let mut descendants = bank_forks.read().unwrap().descendants(); + let mut ancestors = bank_forks.read().unwrap().ancestors(); + let slot_3_descendants = descendants.get(&3).unwrap().clone(); + ReplayStage::purge_ancestors_descendants( + 3, + &slot_3_descendants, + &mut ancestors, + &mut descendants, + ); + + assert!(ancestors.is_empty()); + // Only remaining keys should be ones < root + for k in descendants.keys() { + assert!(*k < 3); + } + } + fn setup_forks() -> (RwLock, ProgressMap) { /* Build fork structure: @@ -3667,4 +3787,11 @@ pub(crate) mod tests { (vote_simulator.bank_forks, vote_simulator.progress) } + + fn check_map_eq( + map1: &HashMap, + map2: &HashMap, + ) -> bool { + map1.len() == map2.len() && map1.iter().all(|(k, v)| map2.get(k).unwrap() == v) + } }