diff --git a/core/src/repair/ancestor_hashes_service.rs b/core/src/repair/ancestor_hashes_service.rs index fdedecd18..a66544c28 100644 --- a/core/src/repair/ancestor_hashes_service.rs +++ b/core/src/repair/ancestor_hashes_service.rs @@ -487,18 +487,16 @@ impl AncestorHashesService { // another ancestor repair request from the earliest returned // ancestor from this search. - let potential_slots_to_repair = ancestor_request_decision.slots_to_repair(); + let potential_slot_to_repair = ancestor_request_decision.slot_to_repair(); - // Now signal ReplayStage about the new updated slots. It's important to do this + // Now signal ReplayStage about the new updated slot. It's important to do this // AFTER we've removed the ancestor_hashes_status_ref in case replay // then sends us another dead slot signal based on the updates we are // about to send. - if let Some(slots_to_repair) = potential_slots_to_repair { - if !slots_to_repair.is_empty() { - // Signal ReplayStage to dump the fork that is descended from - // `earliest_mismatched_slot_to_dump`. - let _ = ancestor_duplicate_slots_sender.send(slots_to_repair); - } + if let Some(slot_to_repair) = potential_slot_to_repair { + // Signal ReplayStage to dump the fork that is descended from + // `earliest_mismatched_slot_to_dump`. + let _ = ancestor_duplicate_slots_sender.send(slot_to_repair); } } @@ -1233,9 +1231,12 @@ mod test { // Set up blockstore for responses let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); - // Create slots [slot, slot + MAX_ANCESTOR_RESPONSES) with 5 shreds apiece - let (shreds, _) = - make_many_slot_entries(slot_to_query, MAX_ANCESTOR_RESPONSES as u64, 5); + // Create slots (slot - MAX_ANCESTOR_RESPONSES, slot] with 5 shreds apiece + let (shreds, _) = make_many_slot_entries( + slot_to_query - MAX_ANCESTOR_RESPONSES as Slot + 1, + MAX_ANCESTOR_RESPONSES as u64, + 5, + ); blockstore .insert_shreds(shreds, None, false) .expect("Expect successful ledger write"); @@ -1346,6 +1347,9 @@ mod test { } } + /// Creates a valid fork up to `dead_slot - 1`, + /// inserts the wrong `bank_hash` for `dead_slot - 1`, + /// and marks `dead_slot` as dead fn setup_dead_slot( dead_slot: Slot, correct_bank_hashes: &HashMap<Slot, Hash>, @@ -1377,13 +1381,15 @@ mod test { blockstore .insert_shreds(shreds, None, false) .expect("Expect successful ledger write"); - for duplicate_confirmed_slot in 0..dead_slot { + for duplicate_confirmed_slot in 0..(dead_slot - 1) { let bank_hash = correct_bank_hashes .get(&duplicate_confirmed_slot) .cloned() .unwrap_or_else(Hash::new_unique); blockstore.insert_bank_hash(duplicate_confirmed_slot, bank_hash, true); } + // Insert wrong hash for `dead_slot - 1` + blockstore.insert_bank_hash(dead_slot - 1, Hash::new_unique(), false); blockstore.set_dead_slot(dead_slot).unwrap(); replay_blockstore_components } @@ -1532,16 +1538,16 @@ mod test { assert_eq!(slot, dead_slot); assert_eq!( - decision - .repair_status() - .unwrap() - .correct_ancestors_to_repair, - vec![(dead_slot, *correct_bank_hashes.get(&dead_slot).unwrap())] + decision.repair_status().unwrap().correct_ancestor_to_repair, + ( + dead_slot - 1, + *correct_bank_hashes.get(&(dead_slot - 1)).unwrap() + ) ); assert_matches!( (decision, request_type), ( - DuplicateAncestorDecision::EarliestAncestorNotFrozen(_), + DuplicateAncestorDecision::EarliestMismatchFound(_), AncestorRequestType::DeadDuplicateConfirmed, ) ); @@ -1567,7 +1573,7 @@ mod test { assert_eq!(ancestor_hashes_request_statuses.len(), 1); assert!(ancestor_hashes_request_statuses.contains_key(&dead_slot)); - //
Should have received valid response since pruned doesn't check hashes + // Should have received valid response let mut response_packet = response_receiver .recv_timeout(Duration::from_millis(10_000)) .unwrap(); @@ -1591,10 +1597,17 @@ mod test { .unwrap(); assert_eq!(slot, dead_slot); + assert_eq!( + decision.repair_status().unwrap().correct_ancestor_to_repair, + ( + dead_slot - 1, + *correct_bank_hashes.get(&(dead_slot - 1)).unwrap() + ) + ); assert_matches!( (decision, request_type), ( - DuplicateAncestorDecision::AncestorsAllMatch, + DuplicateAncestorDecision::EarliestMismatchFound(_), AncestorRequestType::PopularPruned, ) ); @@ -2024,16 +2037,16 @@ mod test { assert_eq!(slot, dead_slot); assert_eq!( - decision - .repair_status() - .unwrap() - .correct_ancestors_to_repair, - vec![(dead_slot, *correct_bank_hashes.get(&dead_slot).unwrap())] + decision.repair_status().unwrap().correct_ancestor_to_repair, + ( + dead_slot - 1, + *correct_bank_hashes.get(&(dead_slot - 1)).unwrap() + ) ); assert_matches!( (decision, request_type), ( - DuplicateAncestorDecision::EarliestAncestorNotFrozen(_), + DuplicateAncestorDecision::EarliestMismatchFound(_), AncestorRequestType::DeadDuplicateConfirmed ) ); diff --git a/core/src/repair/duplicate_repair_status.rs b/core/src/repair/duplicate_repair_status.rs index 4e16dd12b..69b9b1f2e 100644 --- a/core/src/repair/duplicate_repair_status.rs +++ b/core/src/repair/duplicate_repair_status.rs @@ -35,10 +35,8 @@ const RETRY_INTERVAL_SECONDS: usize = 5; #[derive(Debug, PartialEq, Eq, Clone)] pub enum DuplicateAncestorDecision { InvalidSample, - AncestorsAllMatch, SampleNotDuplicateConfirmed, ContinueSearch(DuplicateSlotRepairStatus), - EarliestAncestorNotFrozen(DuplicateSlotRepairStatus), EarliestMismatchFound(DuplicateSlotRepairStatus), EarliestPrunedMismatchFound(DuplicateSlotRepairStatus), } @@ -52,10 +50,7 @@ impl DuplicateAncestorDecision { // so retry | DuplicateAncestorDecision::SampleNotDuplicateConfirmed => true, - DuplicateAncestorDecision::AncestorsAllMatch => false, - DuplicateAncestorDecision::ContinueSearch(_status) - | DuplicateAncestorDecision::EarliestAncestorNotFrozen(_status) | DuplicateAncestorDecision::EarliestMismatchFound(_status) | DuplicateAncestorDecision::EarliestPrunedMismatchFound(_status) => false, } @@ -64,11 +59,9 @@ impl DuplicateAncestorDecision { pub fn repair_status(&self) -> Option<&DuplicateSlotRepairStatus> { match self { DuplicateAncestorDecision::InvalidSample - | DuplicateAncestorDecision::AncestorsAllMatch | DuplicateAncestorDecision::SampleNotDuplicateConfirmed => None, DuplicateAncestorDecision::ContinueSearch(status) - | DuplicateAncestorDecision::EarliestAncestorNotFrozen(status) | DuplicateAncestorDecision::EarliestMismatchFound(status) | DuplicateAncestorDecision::EarliestPrunedMismatchFound(status) => Some(status), } @@ -77,11 +70,9 @@ impl DuplicateAncestorDecision { pub fn repair_status_mut(&mut self) -> Option<&mut DuplicateSlotRepairStatus> { match self { DuplicateAncestorDecision::InvalidSample - | DuplicateAncestorDecision::AncestorsAllMatch | DuplicateAncestorDecision::SampleNotDuplicateConfirmed => None, DuplicateAncestorDecision::ContinueSearch(status) - | DuplicateAncestorDecision::EarliestAncestorNotFrozen(status) | DuplicateAncestorDecision::EarliestMismatchFound(status) | DuplicateAncestorDecision::EarliestPrunedMismatchFound(status) => Some(status), } @@ -90,25 +81,21 @@ impl DuplicateAncestorDecision { #[derive(Debug, Default, Clone, PartialEq, Eq)] pub struct DuplicateSlotRepairStatus { - // Any 
ancestor slots that are either missing or are mismatched. + // The first ancestor slot that is mismatched. // A mismatched ancestor slot is one that has been replayed (frozen) // that has a different hash than the one agreed upon by the sampled peers. // // - // These are the slots that need to be dumped in order to repair the correct - // versions. The hash is None if the slot is not frozen, because it's: - // 1) Dead - // 2) Hasn't been replayed - // 3) We don't have the slot in our Ledger - pub correct_ancestors_to_repair: Vec<(Slot, Hash)>, + // This is the slot that needs to be dumped in order to replay the originally requested slot. + pub correct_ancestor_to_repair: (Slot, Hash), pub repair_pubkey_and_addr: Option<(Pubkey, SocketAddr)>, pub start_ts: u64, } impl DuplicateSlotRepairStatus { - fn new(correct_ancestors_to_repair: Vec<(Slot, Hash)>) -> Self { + fn new(correct_ancestor_to_repair: (Slot, Hash)) -> Self { Self { - correct_ancestors_to_repair, + correct_ancestor_to_repair, repair_pubkey_and_addr: None, start_ts: timestamp(), } @@ -128,19 +115,13 @@ impl AncestorRequestType { } } -pub struct AncestorDuplicateSlotsToRepair { - // Slots that `ancestor_hashes_service` found that need to be repaired - pub slots_to_repair: Vec<(Slot, Hash)>, +pub struct AncestorDuplicateSlotToRepair { + // Slot that `ancestor_hashes_service` found that needs to be repaired + pub slot_to_repair: (Slot, Hash), // Condition that initiated this request pub request_type: AncestorRequestType, } -impl AncestorDuplicateSlotsToRepair { - pub fn is_empty(&self) -> bool { - self.slots_to_repair.is_empty() - } -} - #[derive(Debug, PartialEq, Eq)] pub struct AncestorRequestDecision { // The slot that initiated this request @@ -152,7 +133,7 @@ pub struct AncestorRequestDecision { } impl AncestorRequestDecision { - pub fn slots_to_repair(self) -> Option<AncestorDuplicateSlotsToRepair> { + pub fn slot_to_repair(self) -> Option<AncestorDuplicateSlotToRepair> { let Self { request_type, mut decision, .. } = self; decision .repair_status_mut() - .map(|status| AncestorDuplicateSlotsToRepair { - slots_to_repair: std::mem::take(&mut status.correct_ancestors_to_repair), + .map(|status| AncestorDuplicateSlotToRepair { + slot_to_repair: std::mem::take(&mut status.correct_ancestor_to_repair), request_type, }) } @@ -276,7 +257,7 @@ impl AncestorRequestStatus { fn handle_sampled_validators_reached_agreement( &mut self, blockstore: &Blockstore, - mut agreed_response: Vec<(Slot, Hash)>, + agreed_response: Vec<(Slot, Hash)>, ) -> DuplicateAncestorDecision { if agreed_response.is_empty() { info!( @@ -350,15 +331,18 @@ impl AncestorRequestStatus { return DuplicateAncestorDecision::InvalidSample; } ( - Some((mismatch_i, DuplicateAncestorDecision::EarliestAncestorNotFrozen(_))), + Some(( + mismatch_i, + DuplicateAncestorDecision::EarliestPrunedMismatchFound(_), + )), true, ) => { // In this case an earlier ancestor was not frozen in our blockstore, // however this later ancestor is matching with our version.
This most // likely should never happen however it could happen if we initiate an - // ancestor_hashes request immediately after startup from snapshot, we will - // have a match for the snapshot bank, however we don't have any of the - // ancestors frozen + // ancestor_hashes request immediately after startup from snapshot on a + // pruned branch, we will have a match for the snapshot bank, however we + // don't have any of the ancestors frozen let (mismatch_slot, mismatch_agreed_upon_hash) = agreed_response[*mismatch_i]; info!( @@ -366,8 +350,9 @@ was agreed upon by the cluster with hash {mismatch_agreed_upon_hash} but not frozen in our blockstore. However for a later ancestor {ancestor_slot} we have agreement on {our_frozen_hash} as the bank hash. This should only be possible if - we have just started from snapshot and immediately encountered a duplicate block, - otherwise something is seriously wrong. Continuing with the repair", + we have just started from snapshot and immediately encountered a duplicate block on + a popular pruned fork, otherwise something is seriously wrong. Continuing with the + repair", self.requested_mismatched_slot ); } @@ -425,14 +410,14 @@ impl AncestorRequestStatus { // ancestors of our version of `self.requested_mismatched_slot` // // ``` - // 1 - 2 - 3 - 5' - 6' (our current fork) + // 1 - 2 - 3 - 5' - 6 (our current fork) // / // 0 // \ // 1 - 2 - 4 - 5 - 6 (cluster agreed fork) // ``` // - // In this case, if we make a AncestorsHashes(6) request for our dead slot 6', we may + // In this case, if we make an AncestorsHashes(6) request for our dead slot 6, we may // get a response with slot `4` in it, which is a slot that doesn't have a frozen // hash in blockstore yet because either: // // @@ -451,25 +436,47 @@ // There are two cases: // 1) The first such mismatch `first_mismatch` appears somewhere BEFORE the slot `4` that is // missing from our blockstore. - // 2) The first such mismatch `first_mismatch` appears immediately AFTER the slot `4` that is + // 2) The first such mismatch `first_mismatch` appears AFTER the slot `4` that is // missing from our blockstore. // - // Because we know any mismatches will also trigger the mismatch casing earlier in - // the function, we will return`EarliestMismatchFound(first_mismatch)`. This will - // cause us to dump and repair `first_mismatch` and all its descendants, which should - // be the right behavior in both above cases. + // For (1), the earlier cases in this function will cause us to detect the + // mismatch, and we stop there until that slot is dumped and repaired. + // For (2), we continue searching until we find the mismatch. There must be a + // mismatch for us to have played the requested slot; that mismatch will be + // found, or, in case of no mismatch, the last slot (the requested slot) will be + // dumped and repaired. + // + // In the rare case of multiple incorrect ancestors, where multiple instances of + // (1) and (2) are present, the accompanying `dump_then_repair` code in replay + // dumps those descendants of the earliest mismatch that also have frozen hashes. + // Failing that, in extreme cases another round of ancestor or replay dump then + // repair will be necessary to fix the fork.
+ // + // One example of an extreme case requiring multiple rounds of dump then repair is as follows: + // + // ``` + // 1 - 2 - 3 - 5' - 6' (our current fork) + // / + // 0 + // \ + // 1 - 2 - 4 - 5 - 6 (cluster agreed fork) + // ``` + // + // In this case suppose we have the wrong version of 5 (the correct version is + // supposed to chain to 4), and we also have the wrong version of 6 (both versions + // chain to 5, but ours is the duplicate). + // + // The first round of ancestor repair will detect 5' using case (2) above, and + // replay will dump then repair it. Upon successful replay of 5, we see that 6 is + // still dead or has an incorrect hash. This will require another round of ancestor + // or replay dump then repair to fix. + warn!( "Blockstore is missing frozen hash for slot {}, which the cluster claims is an ancestor of dead slot {}. Potentially our version of the dead slot chains to the wrong fork!", ancestor_slot, self.requested_mismatched_slot ); - earliest_erroring_ancestor = Some(( - agreed_response.len() - i - 1, - DuplicateAncestorDecision::EarliestAncestorNotFrozen( - DuplicateSlotRepairStatus::default(), - ), - )); } last_ancestor = *ancestor_slot; } @@ -481,20 +488,26 @@ if earliest_erroring_ancestor_index == agreed_response.len() - 1 { // If the earliest ancestor is missing or a mismatch, then we need to keep searching // for earlier mismatches - let repair_status = DuplicateSlotRepairStatus::new(agreed_response); + let repair_status = + DuplicateSlotRepairStatus::new(*agreed_response.last().unwrap()); DuplicateAncestorDecision::ContinueSearch(repair_status) } else { - // We only need to look through the first `earliest_erroring_ancestor_index + 1` - // elements and dump/repair any mismatches. - agreed_response.truncate(earliest_erroring_ancestor_index + 1); + // We only need to dump and repair the earliest mismatching ancestor. let repair_status = decision.repair_status_mut().unwrap(); - repair_status.correct_ancestors_to_repair = agreed_response; + repair_status.correct_ancestor_to_repair = + agreed_response[earliest_erroring_ancestor_index]; decision } } else { // If we haven't returned by now, this implies all the ancestors matched our versions - // of those ancestors. Only slot to dump and repair is `self.requested_mismatched_slot` - DuplicateAncestorDecision::AncestorsAllMatch + // of those ancestors, or are missing or incomplete. The only slot to dump and repair is + // `self.requested_mismatched_slot` + let repair_status = DuplicateSlotRepairStatus::new(*agreed_response.first().unwrap()); + if self.request_type.is_pruned() { + DuplicateAncestorDecision::EarliestPrunedMismatchFound(repair_status) + } else { + DuplicateAncestorDecision::EarliestMismatchFound(repair_status) + } } } @@ -869,18 +882,44 @@ pub mod tests { .insert_bank_hash(slot, correct_hash, false); } + // We don't have the earlier ancestors because we just started up, however the sample + // should not be rejected as invalid.
+ let DuplicateAncestorDecision::EarliestMismatchFound(repair_status) = run_add_multiple_correct_and_incorrect_responses( + vec![], + &mut test_setup, + ) else { + panic!("Incorrect decision") + }; + assert_eq!( + repair_status.correct_ancestor_to_repair, + *test_setup.correct_ancestors_response.first().unwrap() + ); + } + + #[test] + fn test_add_multiple_responses_start_from_snapshot_missing_then_mismatch() { + let request_slot = 100; + let mut test_setup = setup_add_response_test(request_slot, 10); + + // Only insert the later half in our blockstore, however make mistakes + for &(slot, _) in test_setup.correct_ancestors_response.iter().take(5) { + test_setup + .blockstore + .insert_bank_hash(slot, Hash::new_unique(), false); + } + // We don't have the earlier ancestors because we just started up, however sample should // not be rejected as invalid. let repair_status = match run_add_multiple_correct_and_incorrect_responses(vec![], &mut test_setup) { - DuplicateAncestorDecision::ContinueSearch(repair_status) => repair_status, + DuplicateAncestorDecision::EarliestMismatchFound(repair_status) => repair_status, x => panic!("Incorrect decision {x:?}"), }; - // Expect to find everything in the `correct_ancestors_to_repair`. + // Expect to find the first mismatch that is present assert_eq!( - repair_status.correct_ancestors_to_repair, - test_setup.correct_ancestors_response + repair_status.correct_ancestor_to_repair, + test_setup.correct_ancestors_response[4] ); } @@ -899,18 +938,16 @@ pub mod tests { )]; // We have no entries in the blockstore, so all the ancestors will be missing - match run_add_multiple_correct_and_incorrect_responses( + let DuplicateAncestorDecision::EarliestMismatchFound(repair_status) = run_add_multiple_correct_and_incorrect_responses( desired_incorrect_responses, &mut test_setup, - ) { - DuplicateAncestorDecision::ContinueSearch(repair_status) => { - assert_eq!( - repair_status.correct_ancestors_to_repair, - test_setup.correct_ancestors_response - ); - } - x => panic!("Incorrect decision {x:?}"), + ) else { + panic!("Incorrect decision") }; + assert_eq!( + repair_status.correct_ancestor_to_repair, + *test_setup.correct_ancestors_response.first().unwrap() + ); } #[test] @@ -939,11 +976,11 @@ pub mod tests { // // 1) If we skip slot 93, and insert mismatched slot 94 we're testing the order of // events `Not frozen -> Mismatched hash` which should return - // `EarliestAncestorNotFrozen` + // `EarliestMismatchFound(94)` // // 2) If we insert mismatched slot 93, and skip slot 94 we're testing the order of // events `Mismatched hash -> Not frozen`, which should return - // `EarliestMismatchFound` + // `EarliestMismatchFound(93)` test_setup .blockstore .insert_bank_hash(slot, Hash::new_unique(), false); @@ -952,28 +989,19 @@ pub mod tests { let repair_status = match run_add_multiple_correct_and_incorrect_responses(vec![], &mut test_setup) { - DuplicateAncestorDecision::EarliestMismatchFound(repair_status) - if insert_even_or_odds == 1 => - { - repair_status - } - DuplicateAncestorDecision::EarliestAncestorNotFrozen(repair_status) - if insert_even_or_odds == 0 => - { - repair_status - } + DuplicateAncestorDecision::EarliestMismatchFound(repair_status) => repair_status, x => panic!("Incorrect decision {x:?}"), }; - // Expect to find everything after 92 in the `correct_ancestors_to_repair`. 
- let expected_mismatched_slots: Vec<(Slot, Hash)> = test_setup + // Expect to find 93 or 94 (see comment above) + let expected_mismatched = test_setup .correct_ancestors_response .into_iter() - .filter(|(slot, _)| *slot > 92) - .collect(); + .find(|(slot, _)| *slot == if insert_even_or_odds == 0 { 94 } else { 93 }) + .unwrap(); assert_eq!( - repair_status.correct_ancestors_to_repair, - expected_mismatched_slots + repair_status.correct_ancestor_to_repair, + expected_mismatched ); } @@ -993,8 +1021,8 @@ pub mod tests { match run_add_multiple_correct_and_incorrect_responses(vec![], &mut test_setup) { DuplicateAncestorDecision::ContinueSearch(repair_status) => { assert_eq!( - repair_status.correct_ancestors_to_repair, - test_setup.correct_ancestors_response + repair_status.correct_ancestor_to_repair, + *test_setup.correct_ancestors_response.last().unwrap() ); } x => panic!("Incorrect decision {x:?}"), @@ -1032,15 +1060,16 @@ pub mod tests { // All the ancestors are mismatched, so we need to continue the search match run_add_multiple_correct_and_incorrect_responses(vec![], &mut test_setup) { DuplicateAncestorDecision::EarliestMismatchFound(repair_status) => { - // Expect to find everything after 92 in the `correct_ancestors_to_repair`. - let expected_mismatched_slots: Vec<(Slot, Hash)> = test_setup + // Expect to find the first slot after 92 in the `correct_ancestor_to_repair`. + let expected_mismatched = test_setup .correct_ancestors_response .into_iter() - .filter(|(slot, _)| *slot > 92) - .collect(); + .rev() + .find(|(slot, _)| *slot > 92) + .unwrap(); assert_eq!( - repair_status.correct_ancestors_to_repair, - expected_mismatched_slots + repair_status.correct_ancestor_to_repair, + expected_mismatched, ); } x => panic!("Incorrect decision {x:?}"), @@ -1059,10 +1088,16 @@ pub mod tests { .insert_bank_hash(slot, correct_hash, false); } - // All the ancestors matched + // All the ancestors matched, only the requested slot should be dumped + let DuplicateAncestorDecision::EarliestMismatchFound(repair_status) = run_add_multiple_correct_and_incorrect_responses( + vec![], + &mut test_setup, + ) else { + panic!("Incorrect decision") + }; assert_eq!( - run_add_multiple_correct_and_incorrect_responses(vec![], &mut test_setup), - DuplicateAncestorDecision::AncestorsAllMatch + repair_status.correct_ancestor_to_repair, + *test_setup.correct_ancestors_response.first().unwrap() ); } @@ -1075,8 +1110,8 @@ pub mod tests { match run_add_multiple_correct_and_incorrect_responses(vec![], &mut test_setup) { DuplicateAncestorDecision::ContinueSearch(repair_status) => { assert_eq!( - repair_status.correct_ancestors_to_repair, - test_setup.correct_ancestors_response + repair_status.correct_ancestor_to_repair, + *test_setup.correct_ancestors_response.last().unwrap() ); } x => panic!("Incorrect decision {x:?}"), @@ -1097,10 +1132,16 @@ pub mod tests { .blockstore .add_tree(tree, true, true, 2, Hash::default()); - // All the ancestors matched + // All the ancestors matched, only the requested slot should be dumped + let DuplicateAncestorDecision::EarliestPrunedMismatchFound(repair_status) = run_add_multiple_correct_and_incorrect_responses( + vec![], + &mut test_setup, + ) else { + panic!("Incorrect decision") + }; assert_eq!( - run_add_multiple_correct_and_incorrect_responses(vec![], &mut test_setup), - DuplicateAncestorDecision::AncestorsAllMatch + repair_status.correct_ancestor_to_repair, + *test_setup.correct_ancestors_response.first().unwrap() ); } @@ -1135,15 +1176,16 @@ pub mod tests { x => panic!("Incorrect 
decision {x:?}"), }; - // Expect to find everything after 93 in the `correct_ancestors_to_repair`. - let expected_mismatched_slots: Vec<(Slot, Hash)> = test_setup + // Expect to find first slot after 93 in the `correct_ancestor_to_repair`. + let expected_mismatched = test_setup .correct_ancestors_response .into_iter() - .filter(|(slot, _)| *slot > 93) - .collect(); + .rev() + .find(|(slot, _)| *slot > 93) + .unwrap(); assert_eq!( - repair_status.correct_ancestors_to_repair, - expected_mismatched_slots + repair_status.correct_ancestor_to_repair, + expected_mismatched, ); } @@ -1183,15 +1225,16 @@ x => panic!("Incorrect decision {x:?}"), }; - // Expect to find everything after 92 in the `correct_ancestors_to_repair`. - let expected_mismatched_slots: Vec<(Slot, Hash)> = test_setup + // Expect to find first slot after 92 in the `correct_ancestor_to_repair`. + let expected_mismatched = test_setup .correct_ancestors_response .into_iter() - .filter(|(slot, _)| *slot >= 93) - .collect(); + .rev() + .find(|(slot, _)| *slot >= 93) + .unwrap(); assert_eq!( - repair_status.correct_ancestors_to_repair, - expected_mismatched_slots + repair_status.correct_ancestor_to_repair, + expected_mismatched, ); } } diff --git a/core/src/repair/repair_service.rs b/core/src/repair/repair_service.rs index 4cdf2f022..fda0931a4 100644 --- a/core/src/repair/repair_service.rs +++ b/core/src/repair/repair_service.rs @@ -11,7 +11,7 @@ use { cluster_slots_service::cluster_slots::ClusterSlots, repair::{ ancestor_hashes_service::{AncestorHashesReplayUpdateReceiver, AncestorHashesService}, - duplicate_repair_status::AncestorDuplicateSlotsToRepair, + duplicate_repair_status::AncestorDuplicateSlotToRepair, outstanding_requests::OutstandingRequests, repair_weight::RepairWeight, serve_repair::{ServeRepair, ShredRepairType, REPAIR_PEERS_CACHE_CAPACITY}, @@ -52,8 +52,8 @@ use { const DEFER_REPAIR_THRESHOLD: Duration = Duration::from_millis(200); const DEFER_REPAIR_THRESHOLD_TICKS: u64 = DEFER_REPAIR_THRESHOLD.as_millis() as u64 / MS_PER_TICK; -pub type AncestorDuplicateSlotsSender = CrossbeamSender<AncestorDuplicateSlotsToRepair>; -pub type AncestorDuplicateSlotsReceiver = CrossbeamReceiver<AncestorDuplicateSlotsToRepair>; +pub type AncestorDuplicateSlotsSender = CrossbeamSender<AncestorDuplicateSlotToRepair>; +pub type AncestorDuplicateSlotsReceiver = CrossbeamReceiver<AncestorDuplicateSlotToRepair>; pub type ConfirmedSlotsSender = CrossbeamSender<Vec<Slot>>; pub type ConfirmedSlotsReceiver = CrossbeamReceiver<Vec<Slot>>; pub type DumpedSlotsSender = CrossbeamSender<Vec<(Slot, Hash)>>; @@ -822,7 +822,7 @@ impl RepairService { .repair_request_duplicate_compute_best_peer(slot, cluster_slots, repair_validators) .ok(); let new_duplicate_slot_repair_status = DuplicateSlotRepairStatus { - correct_ancestors_to_repair: vec![(slot, Hash::default())], + correct_ancestor_to_repair: (slot, Hash::default()), repair_pubkey_and_addr, start_ts: timestamp(), }; @@ -1206,7 +1206,7 @@ mod test { let dead_slot = 9; let receive_socket = &UdpSocket::bind("0.0.0.0:0").unwrap(); let duplicate_status = DuplicateSlotRepairStatus { - correct_ancestors_to_repair: vec![(dead_slot, Hash::default())], + correct_ancestor_to_repair: (dead_slot, Hash::default()), start_ts: std::u64::MAX, repair_pubkey_and_addr: None, }; @@ -1313,7 +1313,7 @@ mod test { // Not enough time has passed, should not update the // address let mut duplicate_status = DuplicateSlotRepairStatus { - correct_ancestors_to_repair: vec![(dead_slot, Hash::default())], + correct_ancestor_to_repair: (dead_slot, Hash::default()), start_ts: std::u64::MAX, repair_pubkey_and_addr: dummy_addr, }; @@ -1328,7 +1328,7 @@ mod test { // If the repair
address is None, should try to update let mut duplicate_status = DuplicateSlotRepairStatus { - correct_ancestors_to_repair: vec![(dead_slot, Hash::default())], + correct_ancestor_to_repair: (dead_slot, Hash::default()), start_ts: std::u64::MAX, repair_pubkey_and_addr: None, }; @@ -1343,7 +1343,7 @@ mod test { // If sufficient time has passed, should try to update let mut duplicate_status = DuplicateSlotRepairStatus { - correct_ancestors_to_repair: vec![(dead_slot, Hash::default())], + correct_ancestor_to_repair: (dead_slot, Hash::default()), start_ts: timestamp() - MAX_DUPLICATE_WAIT_MS as u64, repair_pubkey_and_addr: dummy_addr, }; diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 13db775a6..a03bb69b1 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -22,7 +22,7 @@ use { repair::{ ancestor_hashes_service::AncestorHashesReplayUpdateSender, cluster_slot_state_verifier::*, - duplicate_repair_status::AncestorDuplicateSlotsToRepair, + duplicate_repair_status::AncestorDuplicateSlotToRepair, repair_service::{ AncestorDuplicateSlotsReceiver, DumpedSlotsSender, PopularPrunedForksReceiver, }, @@ -1416,46 +1416,42 @@ impl ReplayStage { purge_repair_slot_counter: &mut PurgeRepairSlotCounter, ) { let root = bank_forks.read().unwrap().root(); - for AncestorDuplicateSlotsToRepair { - slots_to_repair: maybe_repairable_duplicate_slots, + for AncestorDuplicateSlotToRepair { + slot_to_repair: (epoch_slots_frozen_slot, epoch_slots_frozen_hash), request_type, } in ancestor_duplicate_slots_receiver.try_iter() { warn!( - "{} ReplayStage notified of duplicate slots from ancestor hashes service but we observed as {}: {:?}", - pubkey, if request_type.is_pruned() {"pruned"} else {"dead"}, maybe_repairable_duplicate_slots, + "{} ReplayStage notified of duplicate slot from ancestor hashes service but we observed as {}: {:?}", + pubkey, if request_type.is_pruned() {"pruned"} else {"dead"}, (epoch_slots_frozen_slot, epoch_slots_frozen_hash), + ); + let epoch_slots_frozen_state = EpochSlotsFrozenState::new_from_state( + epoch_slots_frozen_slot, + epoch_slots_frozen_hash, + gossip_duplicate_confirmed_slots, + fork_choice, + || progress.is_dead(epoch_slots_frozen_slot).unwrap_or(false), + || { + bank_forks + .read() + .unwrap() + .get(epoch_slots_frozen_slot) + .map(|b| b.hash()) + }, + request_type.is_pruned(), + ); + check_slot_agrees_with_cluster( + epoch_slots_frozen_slot, + root, + blockstore, + duplicate_slots_tracker, + epoch_slots_frozen_slots, + fork_choice, + duplicate_slots_to_repair, + ancestor_hashes_replay_update_sender, + purge_repair_slot_counter, + SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state), ); - for (epoch_slots_frozen_slot, epoch_slots_frozen_hash) in - maybe_repairable_duplicate_slots.into_iter() - { - let epoch_slots_frozen_state = EpochSlotsFrozenState::new_from_state( - epoch_slots_frozen_slot, - epoch_slots_frozen_hash, - gossip_duplicate_confirmed_slots, - fork_choice, - || progress.is_dead(epoch_slots_frozen_slot).unwrap_or(false), - || { - bank_forks - .read() - .unwrap() - .get(epoch_slots_frozen_slot) - .map(|b| b.hash()) - }, - request_type.is_pruned(), - ); - check_slot_agrees_with_cluster( - epoch_slots_frozen_slot, - root, - blockstore, - duplicate_slots_tracker, - epoch_slots_frozen_slots, - fork_choice, - duplicate_slots_to_repair, - ancestor_hashes_replay_update_sender, - purge_repair_slot_counter, - SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state), - ); - } } } diff --git a/local-cluster/tests/local_cluster.rs 
b/local-cluster/tests/local_cluster.rs index e089fee7e..efc385d45 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -16,10 +16,14 @@ use { validator::ValidatorConfig, }, solana_download_utils::download_snapshot_archive, + solana_entry::entry::create_ticks, solana_gossip::{contact_info::LegacyContactInfo, gossip_service::discover_cluster}, solana_ledger::{ - ancestor_iterator::AncestorIterator, bank_forks_utils, blockstore::Blockstore, - blockstore_processor::ProcessOptions, leader_schedule::FixedSchedule, + ancestor_iterator::AncestorIterator, + bank_forks_utils, + blockstore::{entries_to_test_shreds, Blockstore}, + blockstore_processor::ProcessOptions, + leader_schedule::FixedSchedule, use_snapshot_archives_at_startup::UseSnapshotArchivesAtStartup, }, solana_local_cluster::{ @@ -80,7 +84,6 @@ use { thread::{sleep, Builder, JoinHandle}, time::{Duration, Instant}, }, - trees::tr, }; mod common; @@ -4542,7 +4545,6 @@ fn test_duplicate_with_pruned_ancestor() { "28bN3xyvrP4E8LwEgtLjhnkb7cY4amQb6DrYAbAYjgRV4GAGgkVM2K7wnxnAS7WDneuavza7x21MiafLu1HkwQt4", "2saHBBoTkLMmttmPQP8KfBkcCw45S5cwtV3wTdGCscRC8uxdgvHxpHiWXKx4LvJjNJtnNcbSv5NdheokFFqnNDt8", "4mx9yoFBeYasDKBGDWCTWGJdWuJCKbgqmuP8bN9umybCh5Jzngw7KQxe99Rf5uzfyzgba1i65rJW4Wqk7Ab5S8ye", - "3zsEPEDsjfEay7te9XqNjRTCE7vwuT6u4DHzBJC19yp7GS8BuNRMRjnpVrKCBzb3d44kxc4KPGSHkCmk6tEfswCg", ] .iter() .map(|s| (Arc::new(Keypair::from_base58_string(s)), true)) @@ -4568,7 +4570,6 @@ fn test_duplicate_with_pruned_ancestor() { }); let mut validator_configs = make_identical_validator_configs(&default_config, num_nodes); - validator_configs[3].voting_disabled = true; // Don't let majority produce anything past the fork by tricking its leader schedule validator_configs[0].fixed_leader_schedule = Some(FixedSchedule { leader_schedule: Arc::new(create_custom_leader_schedule( @@ -4704,7 +4705,10 @@ fn test_duplicate_with_pruned_ancestor() { let last_majority_vote = wait_for_last_vote_in_tower_to_land_in_ledger(&majority_ledger_path, &majority_pubkey); - info!("Creating duplicate block built off of pruned branch for our node. Last majority vote {last_majority_vote}, Last minority vote {last_minority_vote}"); + info!( + "Creating duplicate block built off of pruned branch for our node. + Last majority vote {last_majority_vote}, Last minority vote {last_minority_vote}" + ); { { // Copy majority fork @@ -4723,14 +4727,21 @@ fn test_duplicate_with_pruned_ancestor() { // Change last block parent to chain off of (purged) minority fork info!("For our node, changing parent of {last_majority_vote} to {last_minority_vote}"); - purge_slots_with_count(&our_blockstore, last_majority_vote, 1); - our_blockstore.add_tree( - tr(last_minority_vote) / tr(last_majority_vote), - false, - true, - 64, + our_blockstore.clear_unconfirmed_slot(last_majority_vote); + let entries = create_ticks( + 64 * (std::cmp::max(1, last_majority_vote - last_minority_vote)), + 0, Hash::default(), ); + let shreds = entries_to_test_shreds( + &entries, + last_majority_vote, + last_minority_vote, + true, + 0, + true, // merkle_variant + ); + our_blockstore.insert_shreds(shreds, None, false).unwrap(); // Update the root to set minority fork back as pruned our_blockstore.set_last_root(fork_slot + fork_length);
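Note (reviewer sketch, not part of the patch): the hand-off these changes converge on — `AncestorRequestDecision::slot_to_repair()` yielding at most one `AncestorDuplicateSlotToRepair`, so the sender drops the old `is_empty()` check and ReplayStage handles exactly one `(Slot, Hash)` per message — can be modeled in a few self-contained lines. The sketch below uses simplified stand-in types and `std::sync::mpsc` in place of the crossbeam channel and the real solana-core definitions; it illustrates the shape of the new flow under those assumptions, not the actual implementation.

```rust
// Minimal sketch of the single-slot repair hand-off (assumed simplification;
// not the real solana-core types). `Slot`, `Hash`, the decision enum, and the
// channel are stand-ins: std::sync::mpsc is used here in place of the
// crossbeam channel from repair_service.rs.

use std::sync::mpsc::{channel, Receiver, Sender};

type Slot = u64;
type Hash = [u8; 32];

#[derive(Debug, Clone, Copy)]
enum AncestorRequestType {
    DeadDuplicateConfirmed,
    PopularPruned,
}

#[derive(Debug)]
struct DuplicateSlotRepairStatus {
    // A single (Slot, Hash), where the pre-patch code carried a Vec<(Slot, Hash)>.
    correct_ancestor_to_repair: (Slot, Hash),
}

#[derive(Debug)]
enum DuplicateAncestorDecision {
    InvalidSample,
    EarliestMismatchFound(DuplicateSlotRepairStatus),
}

struct AncestorDuplicateSlotToRepair {
    slot_to_repair: (Slot, Hash),
    request_type: AncestorRequestType,
}

struct AncestorRequestDecision {
    request_type: AncestorRequestType,
    decision: DuplicateAncestorDecision,
}

impl AncestorRequestDecision {
    // Mirrors the renamed accessor: at most one slot comes back per request.
    fn slot_to_repair(self) -> Option<AncestorDuplicateSlotToRepair> {
        match self.decision {
            DuplicateAncestorDecision::EarliestMismatchFound(status) => {
                Some(AncestorDuplicateSlotToRepair {
                    slot_to_repair: status.correct_ancestor_to_repair,
                    request_type: self.request_type,
                })
            }
            DuplicateAncestorDecision::InvalidSample => None,
        }
    }
}

fn main() {
    let (sender, receiver): (
        Sender<AncestorDuplicateSlotToRepair>,
        Receiver<AncestorDuplicateSlotToRepair>,
    ) = channel();

    // The ancestor hashes service side: one decision per completed request.
    let decision = AncestorRequestDecision {
        request_type: AncestorRequestType::DeadDuplicateConfirmed,
        decision: DuplicateAncestorDecision::EarliestMismatchFound(DuplicateSlotRepairStatus {
            correct_ancestor_to_repair: (5, [0u8; 32]),
        }),
    };

    // No `is_empty()` check any more: either there is exactly one slot to
    // dump and repair, or nothing is sent.
    if let Some(to_repair) = decision.slot_to_repair() {
        let _ = sender.send(to_repair);
    }

    // The ReplayStage side: each message is a single (Slot, Hash), so the
    // inner loop over a Vec of slots is gone.
    while let Ok(AncestorDuplicateSlotToRepair {
        slot_to_repair: (slot, _hash),
        request_type,
    }) = receiver.try_recv()
    {
        println!("dump and repair slot {slot} ({request_type:?})");
    }
}
```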