Properly handle ancestor/descendant maps (#9932)
* Account for descendants < root not existing in BankForks, purge ancestors/descendants map for consistency with BankForks and progress map Co-authored-by: Carl <carl@solana.com>
This commit is contained in:
parent
c970bbea4f
commit
e970c58330
|
@ -217,16 +217,18 @@ impl ReplayStage {
|
||||||
);
|
);
|
||||||
Self::report_memory(&allocated, "replay_active_banks", start);
|
Self::report_memory(&allocated, "replay_active_banks", start);
|
||||||
|
|
||||||
let ancestors = Arc::new(bank_forks.read().unwrap().ancestors());
|
let mut ancestors = bank_forks.read().unwrap().ancestors();
|
||||||
let descendants = bank_forks.read().unwrap().descendants();
|
let mut descendants = bank_forks.read().unwrap().descendants();
|
||||||
let forks_root = bank_forks.read().unwrap().root();
|
let forks_root = bank_forks.read().unwrap().root();
|
||||||
let start = allocated.get();
|
let start = allocated.get();
|
||||||
|
|
||||||
// Reset any duplicate slots that have been confirmed
|
// Reset any duplicate slots that have been confirmed
|
||||||
// by the network in anticipation of the confirmed version of
|
// by the network in anticipation of the confirmed version of
|
||||||
// the slot
|
// the slot
|
||||||
Self::reset_duplicate_slots(
|
Self::reset_duplicate_slots(
|
||||||
&duplicate_slots_reset_receiver,
|
&duplicate_slots_reset_receiver,
|
||||||
&descendants,
|
&mut ancestors,
|
||||||
|
&mut descendants,
|
||||||
&mut progress,
|
&mut progress,
|
||||||
&bank_forks,
|
&bank_forks,
|
||||||
);
|
);
|
||||||
|
@ -473,13 +475,15 @@ impl ReplayStage {
|
||||||
|
|
||||||
fn reset_duplicate_slots(
|
fn reset_duplicate_slots(
|
||||||
duplicate_slots_reset_receiver: &DuplicateSlotsResetReceiver,
|
duplicate_slots_reset_receiver: &DuplicateSlotsResetReceiver,
|
||||||
descendants: &HashMap<Slot, HashSet<Slot>>,
|
ancestors: &mut HashMap<Slot, HashSet<Slot>>,
|
||||||
|
descendants: &mut HashMap<Slot, HashSet<Slot>>,
|
||||||
progress: &mut ProgressMap,
|
progress: &mut ProgressMap,
|
||||||
bank_forks: &RwLock<BankForks>,
|
bank_forks: &RwLock<BankForks>,
|
||||||
) {
|
) {
|
||||||
for duplicate_slot in duplicate_slots_reset_receiver.try_iter() {
|
for duplicate_slot in duplicate_slots_reset_receiver.try_iter() {
|
||||||
Self::purge_unconfirmed_duplicate_slot(
|
Self::purge_unconfirmed_duplicate_slot(
|
||||||
duplicate_slot,
|
duplicate_slot,
|
||||||
|
ancestors,
|
||||||
descendants,
|
descendants,
|
||||||
progress,
|
progress,
|
||||||
bank_forks,
|
bank_forks,
|
||||||
|
@ -489,19 +493,29 @@ impl ReplayStage {
|
||||||
|
|
||||||
fn purge_unconfirmed_duplicate_slot(
|
fn purge_unconfirmed_duplicate_slot(
|
||||||
duplicate_slot: Slot,
|
duplicate_slot: Slot,
|
||||||
descendants: &HashMap<Slot, HashSet<Slot>>,
|
ancestors: &mut HashMap<Slot, HashSet<Slot>>,
|
||||||
|
descendants: &mut HashMap<Slot, HashSet<Slot>>,
|
||||||
progress: &mut ProgressMap,
|
progress: &mut ProgressMap,
|
||||||
bank_forks: &RwLock<BankForks>,
|
bank_forks: &RwLock<BankForks>,
|
||||||
) {
|
) {
|
||||||
error!("purging slot {}", duplicate_slot);
|
warn!("purging slot {}", duplicate_slot);
|
||||||
let slot_descendants = descendants.get(&duplicate_slot);
|
let slot_descendants = descendants.get(&duplicate_slot).cloned();
|
||||||
if slot_descendants.is_none() {
|
if slot_descendants.is_none() {
|
||||||
// Root has already moved past this slot, no need to purge it
|
// Root has already moved past this slot, no need to purge it
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Clear the ancestors/descendants map to keep them
|
||||||
|
// consistent
|
||||||
|
let slot_descendants = slot_descendants.unwrap();
|
||||||
|
Self::purge_ancestors_descendants(
|
||||||
|
duplicate_slot,
|
||||||
|
&slot_descendants,
|
||||||
|
ancestors,
|
||||||
|
descendants,
|
||||||
|
);
|
||||||
|
|
||||||
for d in slot_descendants
|
for d in slot_descendants
|
||||||
.unwrap()
|
|
||||||
.iter()
|
.iter()
|
||||||
.chain(std::iter::once(&duplicate_slot))
|
.chain(std::iter::once(&duplicate_slot))
|
||||||
{
|
{
|
||||||
|
@ -514,18 +528,54 @@ impl ReplayStage {
|
||||||
// Purging should have already been taken care of by logic
|
// Purging should have already been taken care of by logic
|
||||||
// in repair_service, so make sure drop implementation doesn't
|
// in repair_service, so make sure drop implementation doesn't
|
||||||
// run
|
// run
|
||||||
w_bank_forks
|
if let Some(b) = w_bank_forks.get(*d) {
|
||||||
.get(*d)
|
b.skip_drop.store(true, Ordering::Relaxed)
|
||||||
.expect("Bank in descendants map must exist in BankForks")
|
}
|
||||||
.skip_drop
|
w_bank_forks.remove(*d);
|
||||||
.store(true, Ordering::Relaxed);
|
|
||||||
w_bank_forks
|
|
||||||
.remove(*d)
|
|
||||||
.expect("Bank in descendants map must exist in BankForks");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Purge given slot and all its descendants from the `ancestors` and
|
||||||
|
// `descendants` structures so that they're consistent with `BankForks`
|
||||||
|
// and the `progress` map.
|
||||||
|
fn purge_ancestors_descendants(
|
||||||
|
slot: Slot,
|
||||||
|
slot_descendants: &HashSet<Slot>,
|
||||||
|
ancestors: &mut HashMap<Slot, HashSet<Slot>>,
|
||||||
|
descendants: &mut HashMap<Slot, HashSet<Slot>>,
|
||||||
|
) {
|
||||||
|
if !ancestors.contains_key(&slot) {
|
||||||
|
// Slot has already been purged
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Purge this slot from each of its ancestors' `descendants` maps
|
||||||
|
for a in ancestors
|
||||||
|
.get(&slot)
|
||||||
|
.expect("must exist based on earlier check")
|
||||||
|
{
|
||||||
|
descendants
|
||||||
|
.get_mut(&a)
|
||||||
|
.expect("If exists in ancestor map must exist in descendants map")
|
||||||
|
.retain(|d| *d != slot && !slot_descendants.contains(d));
|
||||||
|
}
|
||||||
|
ancestors
|
||||||
|
.remove(&slot)
|
||||||
|
.expect("must exist based on earlier check");
|
||||||
|
|
||||||
|
// Purge all the descendants of this slot from both maps
|
||||||
|
for descendant in slot_descendants {
|
||||||
|
ancestors.remove(&descendant).expect("must exist");
|
||||||
|
descendants
|
||||||
|
.remove(&descendant)
|
||||||
|
.expect("must exist based on earlier check");
|
||||||
|
}
|
||||||
|
descendants
|
||||||
|
.remove(&slot)
|
||||||
|
.expect("must exist based on earlier check");
|
||||||
|
}
|
||||||
|
|
||||||
fn log_leader_change(
|
fn log_leader_change(
|
||||||
my_pubkey: &Pubkey,
|
my_pubkey: &Pubkey,
|
||||||
bank_slot: Slot,
|
bank_slot: Slot,
|
||||||
|
@ -3610,10 +3660,17 @@ pub(crate) mod tests {
|
||||||
#[test]
|
#[test]
|
||||||
fn test_purge_unconfirmed_duplicate_slot() {
|
fn test_purge_unconfirmed_duplicate_slot() {
|
||||||
let (bank_forks, mut progress) = setup_forks();
|
let (bank_forks, mut progress) = setup_forks();
|
||||||
let descendants = bank_forks.read().unwrap().descendants();
|
let mut descendants = bank_forks.read().unwrap().descendants();
|
||||||
|
let mut ancestors = bank_forks.read().unwrap().ancestors();
|
||||||
|
|
||||||
// Purging slot 5 should purge only slots 5 and its descendant 6
|
// Purging slot 5 should purge only slots 5 and its descendant 6
|
||||||
ReplayStage::purge_unconfirmed_duplicate_slot(5, &descendants, &mut progress, &bank_forks);
|
ReplayStage::purge_unconfirmed_duplicate_slot(
|
||||||
|
5,
|
||||||
|
&mut ancestors,
|
||||||
|
&mut descendants,
|
||||||
|
&mut progress,
|
||||||
|
&bank_forks,
|
||||||
|
);
|
||||||
for i in 5..=6 {
|
for i in 5..=6 {
|
||||||
assert!(bank_forks.read().unwrap().get(i).is_none());
|
assert!(bank_forks.read().unwrap().get(i).is_none());
|
||||||
assert!(progress.get(&i).is_none());
|
assert!(progress.get(&i).is_none());
|
||||||
|
@ -3624,8 +3681,15 @@ pub(crate) mod tests {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Purging slot 4 should purge only slot 4
|
// Purging slot 4 should purge only slot 4
|
||||||
let descendants = bank_forks.read().unwrap().descendants();
|
let mut descendants = bank_forks.read().unwrap().descendants();
|
||||||
ReplayStage::purge_unconfirmed_duplicate_slot(4, &descendants, &mut progress, &bank_forks);
|
let mut ancestors = bank_forks.read().unwrap().ancestors();
|
||||||
|
ReplayStage::purge_unconfirmed_duplicate_slot(
|
||||||
|
4,
|
||||||
|
&mut ancestors,
|
||||||
|
&mut descendants,
|
||||||
|
&mut progress,
|
||||||
|
&bank_forks,
|
||||||
|
);
|
||||||
for i in 4..=6 {
|
for i in 4..=6 {
|
||||||
assert!(bank_forks.read().unwrap().get(i).is_none());
|
assert!(bank_forks.read().unwrap().get(i).is_none());
|
||||||
assert!(progress.get(&i).is_none());
|
assert!(progress.get(&i).is_none());
|
||||||
|
@ -3636,8 +3700,15 @@ pub(crate) mod tests {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Purging slot 1 should purge both forks 2 and 3
|
// Purging slot 1 should purge both forks 2 and 3
|
||||||
let descendants = bank_forks.read().unwrap().descendants();
|
let mut descendants = bank_forks.read().unwrap().descendants();
|
||||||
ReplayStage::purge_unconfirmed_duplicate_slot(1, &descendants, &mut progress, &bank_forks);
|
let mut ancestors = bank_forks.read().unwrap().ancestors();
|
||||||
|
ReplayStage::purge_unconfirmed_duplicate_slot(
|
||||||
|
1,
|
||||||
|
&mut ancestors,
|
||||||
|
&mut descendants,
|
||||||
|
&mut progress,
|
||||||
|
&bank_forks,
|
||||||
|
);
|
||||||
for i in 1..=6 {
|
for i in 1..=6 {
|
||||||
assert!(bank_forks.read().unwrap().get(i).is_none());
|
assert!(bank_forks.read().unwrap().get(i).is_none());
|
||||||
assert!(progress.get(&i).is_none());
|
assert!(progress.get(&i).is_none());
|
||||||
|
@ -3646,6 +3717,55 @@ pub(crate) mod tests {
|
||||||
assert!(progress.get(&0).is_some());
|
assert!(progress.get(&0).is_some());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_purge_ancestors_descendants() {
|
||||||
|
let (bank_forks, _) = setup_forks();
|
||||||
|
|
||||||
|
// Purge branch rooted at slot 2
|
||||||
|
let mut descendants = bank_forks.read().unwrap().descendants();
|
||||||
|
let mut ancestors = bank_forks.read().unwrap().ancestors();
|
||||||
|
let slot_2_descendants = descendants.get(&2).unwrap().clone();
|
||||||
|
ReplayStage::purge_ancestors_descendants(
|
||||||
|
2,
|
||||||
|
&slot_2_descendants,
|
||||||
|
&mut ancestors,
|
||||||
|
&mut descendants,
|
||||||
|
);
|
||||||
|
|
||||||
|
// Result should be equivalent to removing slot from BankForks
|
||||||
|
// and regeneratinig the `ancestor` `descendant` maps
|
||||||
|
for d in slot_2_descendants {
|
||||||
|
bank_forks.write().unwrap().remove(d);
|
||||||
|
}
|
||||||
|
bank_forks.write().unwrap().remove(2);
|
||||||
|
assert!(check_map_eq(
|
||||||
|
&ancestors,
|
||||||
|
&bank_forks.read().unwrap().ancestors()
|
||||||
|
));
|
||||||
|
assert!(check_map_eq(
|
||||||
|
&descendants,
|
||||||
|
&bank_forks.read().unwrap().descendants()
|
||||||
|
));
|
||||||
|
|
||||||
|
// Try to purge the root
|
||||||
|
bank_forks.write().unwrap().set_root(3, &None, None);
|
||||||
|
let mut descendants = bank_forks.read().unwrap().descendants();
|
||||||
|
let mut ancestors = bank_forks.read().unwrap().ancestors();
|
||||||
|
let slot_3_descendants = descendants.get(&3).unwrap().clone();
|
||||||
|
ReplayStage::purge_ancestors_descendants(
|
||||||
|
3,
|
||||||
|
&slot_3_descendants,
|
||||||
|
&mut ancestors,
|
||||||
|
&mut descendants,
|
||||||
|
);
|
||||||
|
|
||||||
|
assert!(ancestors.is_empty());
|
||||||
|
// Only remaining keys should be ones < root
|
||||||
|
for k in descendants.keys() {
|
||||||
|
assert!(*k < 3);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn setup_forks() -> (RwLock<BankForks>, ProgressMap) {
|
fn setup_forks() -> (RwLock<BankForks>, ProgressMap) {
|
||||||
/*
|
/*
|
||||||
Build fork structure:
|
Build fork structure:
|
||||||
|
@ -3667,4 +3787,11 @@ pub(crate) mod tests {
|
||||||
|
|
||||||
(vote_simulator.bank_forks, vote_simulator.progress)
|
(vote_simulator.bank_forks, vote_simulator.progress)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn check_map_eq<K: Eq + std::hash::Hash + std::fmt::Debug, T: PartialEq + std::fmt::Debug>(
|
||||||
|
map1: &HashMap<K, T>,
|
||||||
|
map2: &HashMap<K, T>,
|
||||||
|
) -> bool {
|
||||||
|
map1.len() == map2.len() && map1.iter().all(|(k, v)| map2.get(k).unwrap() == v)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue