Only send earliest mismatched ancestor to replay (#31842)

Ashwin Sekar 2023-07-06 21:31:12 -07:00 committed by GitHub
parent 282e043177
commit c172dfd268
5 changed files with 261 additions and 198 deletions

View File

@ -487,18 +487,16 @@ impl AncestorHashesService {
// another ancestor repair request from the earliest returned
// ancestor from this search.
let potential_slots_to_repair = ancestor_request_decision.slots_to_repair();
let potential_slot_to_repair = ancestor_request_decision.slot_to_repair();
// Now signal ReplayStage about the new updated slots. It's important to do this
// Now signal ReplayStage about the new updated slot. It's important to do this
// AFTER we've removed the ancestor_hashes_status_ref in case replay
// then sends us another dead slot signal based on the updates we are
// about to send.
if let Some(slots_to_repair) = potential_slots_to_repair {
if !slots_to_repair.is_empty() {
// Signal ReplayStage to dump the fork that is descended from
// `earliest_mismatched_slot_to_dump`.
let _ = ancestor_duplicate_slots_sender.send(slots_to_repair);
}
if let Some(slot_to_repair) = potential_slot_to_repair {
// Signal ReplayStage to dump the fork that is descended from
// `earliest_mismatched_slot_to_dump`.
let _ = ancestor_duplicate_slots_sender.send(slot_to_repair);
}
}
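
The hunk above changes the signal sent to ReplayStage from a batch of slots to exactly one slot per message. Below is a minimal, standalone sketch of that flow, using std's mpsc channel and simplified stand-in types in place of the crossbeam channel and the crate's real definitions; the names mirror the diff, but nothing here is the actual API.

use std::sync::mpsc::channel;

type Slot = u64;

// Stand-in for the real hash type, only to keep the sketch self-contained.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
struct Hash([u8; 32]);

#[derive(Debug)]
#[allow(dead_code)]
enum AncestorRequestType {
    DeadDuplicateConfirmed,
    PopularPruned,
}

// Mirrors the renamed payload: exactly one (slot, hash) per message.
#[derive(Debug)]
struct AncestorDuplicateSlotToRepair {
    slot_to_repair: (Slot, Hash),
    request_type: AncestorRequestType,
}

fn main() {
    let (ancestor_duplicate_slots_sender, ancestor_duplicate_slots_receiver) = channel();

    // Ancestor hashes service side: send the earliest mismatched ancestor, if any.
    let potential_slot_to_repair = Some(AncestorDuplicateSlotToRepair {
        slot_to_repair: (5, Hash::default()),
        request_type: AncestorRequestType::DeadDuplicateConfirmed,
    });
    if let Some(slot_to_repair) = potential_slot_to_repair {
        let _ = ancestor_duplicate_slots_sender.send(slot_to_repair);
    }

    // Replay side: each message now carries a single slot to dump and repair, matching
    // the destructuring loop in the replay_stage hunk later in this diff.
    for AncestorDuplicateSlotToRepair {
        slot_to_repair: (slot, hash),
        request_type,
    } in ancestor_duplicate_slots_receiver.try_iter()
    {
        println!("dump and repair {slot} ({hash:?}), observed as {request_type:?}");
    }
}
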
@ -1233,9 +1231,12 @@ mod test {
// Set up blockstore for responses
let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
// Create slots [slot, slot + MAX_ANCESTOR_RESPONSES) with 5 shreds apiece
let (shreds, _) =
make_many_slot_entries(slot_to_query, MAX_ANCESTOR_RESPONSES as u64, 5);
// Create slots [slot - MAX_ANCESTOR_RESPONSES, slot) with 5 shreds apiece
let (shreds, _) = make_many_slot_entries(
slot_to_query - MAX_ANCESTOR_RESPONSES as Slot + 1,
MAX_ANCESTOR_RESPONSES as u64,
5,
);
blockstore
.insert_shreds(shreds, None, false)
.expect("Expect successful ledger write");
@ -1346,6 +1347,9 @@ mod test {
}
}
/// Creates a valid fork up to `dead_slot - 1`.
/// For `dead_slot - 1`, inserts the wrong `bank_hash`.
/// Marks `dead_slot` as dead.
fn setup_dead_slot(
dead_slot: Slot,
correct_bank_hashes: &HashMap<Slot, Hash>,
@ -1377,13 +1381,15 @@ mod test {
blockstore
.insert_shreds(shreds, None, false)
.expect("Expect successful ledger write");
for duplicate_confirmed_slot in 0..dead_slot {
for duplicate_confirmed_slot in 0..(dead_slot - 1) {
let bank_hash = correct_bank_hashes
.get(&duplicate_confirmed_slot)
.cloned()
.unwrap_or_else(Hash::new_unique);
blockstore.insert_bank_hash(duplicate_confirmed_slot, bank_hash, true);
}
// Insert wrong hash for `dead_slot - 1`
blockstore.insert_bank_hash(dead_slot - 1, Hash::new_unique(), false);
blockstore.set_dead_slot(dead_slot).unwrap();
replay_blockstore_components
}
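
Putting the fixture change in concrete terms: after this hunk, `setup_dead_slot` leaves correct, duplicate-confirmed hashes for slots up to `dead_slot - 2`, a deliberately wrong and unconfirmed hash at `dead_slot - 1`, and `dead_slot` itself marked dead. A small self-contained sketch of that layout with plain std types; the names here are illustrative, not the test's API.

use std::collections::HashMap;

type Slot = u64;
type BankHash = [u8; 32]; // stand-in for the real hash type

// (hash, duplicate_confirmed) per slot, plus the set of dead slots.
struct FixtureLayout {
    bank_hashes: HashMap<Slot, (BankHash, bool)>,
    dead_slots: Vec<Slot>,
}

fn layout_for(dead_slot: Slot, correct_bank_hashes: &HashMap<Slot, BankHash>) -> FixtureLayout {
    assert!(dead_slot >= 2, "sketch assumes at least two ancestors");
    let mut bank_hashes = HashMap::new();
    // Correct, duplicate-confirmed hashes for everything strictly below `dead_slot - 1`.
    for slot in 0..(dead_slot - 1) {
        let hash = correct_bank_hashes.get(&slot).copied().unwrap_or([0xAA; 32]);
        bank_hashes.insert(slot, (hash, true));
    }
    // A deliberately wrong, unconfirmed hash for `dead_slot - 1`...
    bank_hashes.insert(dead_slot - 1, ([0xFF; 32], false));
    // ...while `dead_slot` itself has no frozen hash at all: it is simply marked dead.
    FixtureLayout {
        bank_hashes,
        dead_slots: vec![dead_slot],
    }
}

fn main() {
    let correct: HashMap<Slot, BankHash> = (0..10).map(|s| (s, [s as u8; 32])).collect();
    let layout = layout_for(10, &correct);
    assert_eq!(layout.bank_hashes[&8], ([8; 32], true)); // still correct and confirmed
    assert_eq!(layout.bank_hashes[&9].1, false); // dead_slot - 1 carries the wrong hash
    assert_eq!(layout.dead_slots, vec![10]);
}
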
@ -1532,16 +1538,16 @@ mod test {
assert_eq!(slot, dead_slot);
assert_eq!(
decision
.repair_status()
.unwrap()
.correct_ancestors_to_repair,
vec![(dead_slot, *correct_bank_hashes.get(&dead_slot).unwrap())]
decision.repair_status().unwrap().correct_ancestor_to_repair,
(
dead_slot - 1,
*correct_bank_hashes.get(&(dead_slot - 1)).unwrap()
)
);
assert_matches!(
(decision, request_type),
(
DuplicateAncestorDecision::EarliestAncestorNotFrozen(_),
DuplicateAncestorDecision::EarliestMismatchFound(_),
AncestorRequestType::DeadDuplicateConfirmed,
)
);
@ -1567,7 +1573,7 @@ mod test {
assert_eq!(ancestor_hashes_request_statuses.len(), 1);
assert!(ancestor_hashes_request_statuses.contains_key(&dead_slot));
// Should have received valid response since pruned doesn't check hashes
// Should have received valid response
let mut response_packet = response_receiver
.recv_timeout(Duration::from_millis(10_000))
.unwrap();
@ -1591,10 +1597,17 @@ mod test {
.unwrap();
assert_eq!(slot, dead_slot);
assert_eq!(
decision.repair_status().unwrap().correct_ancestor_to_repair,
(
dead_slot - 1,
*correct_bank_hashes.get(&(dead_slot - 1)).unwrap()
)
);
assert_matches!(
(decision, request_type),
(
DuplicateAncestorDecision::AncestorsAllMatch,
DuplicateAncestorDecision::EarliestMismatchFound(_),
AncestorRequestType::PopularPruned,
)
);
@ -2024,16 +2037,16 @@ mod test {
assert_eq!(slot, dead_slot);
assert_eq!(
decision
.repair_status()
.unwrap()
.correct_ancestors_to_repair,
vec![(dead_slot, *correct_bank_hashes.get(&dead_slot).unwrap())]
decision.repair_status().unwrap().correct_ancestor_to_repair,
(
dead_slot - 1,
*correct_bank_hashes.get(&(dead_slot - 1)).unwrap()
)
);
assert_matches!(
(decision, request_type),
(
DuplicateAncestorDecision::EarliestAncestorNotFrozen(_),
DuplicateAncestorDecision::EarliestMismatchFound(_),
AncestorRequestType::DeadDuplicateConfirmed
)
);

View File

@ -35,10 +35,8 @@ const RETRY_INTERVAL_SECONDS: usize = 5;
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum DuplicateAncestorDecision {
InvalidSample,
AncestorsAllMatch,
SampleNotDuplicateConfirmed,
ContinueSearch(DuplicateSlotRepairStatus),
EarliestAncestorNotFrozen(DuplicateSlotRepairStatus),
EarliestMismatchFound(DuplicateSlotRepairStatus),
EarliestPrunedMismatchFound(DuplicateSlotRepairStatus),
}
@ -52,10 +50,7 @@ impl DuplicateAncestorDecision {
// so retry
| DuplicateAncestorDecision::SampleNotDuplicateConfirmed => true,
DuplicateAncestorDecision::AncestorsAllMatch => false,
DuplicateAncestorDecision::ContinueSearch(_status)
| DuplicateAncestorDecision::EarliestAncestorNotFrozen(_status)
| DuplicateAncestorDecision::EarliestMismatchFound(_status)
| DuplicateAncestorDecision::EarliestPrunedMismatchFound(_status) => false,
}
@ -64,11 +59,9 @@ impl DuplicateAncestorDecision {
pub fn repair_status(&self) -> Option<&DuplicateSlotRepairStatus> {
match self {
DuplicateAncestorDecision::InvalidSample
| DuplicateAncestorDecision::AncestorsAllMatch
| DuplicateAncestorDecision::SampleNotDuplicateConfirmed => None,
DuplicateAncestorDecision::ContinueSearch(status)
| DuplicateAncestorDecision::EarliestAncestorNotFrozen(status)
| DuplicateAncestorDecision::EarliestMismatchFound(status)
| DuplicateAncestorDecision::EarliestPrunedMismatchFound(status) => Some(status),
}
@ -77,11 +70,9 @@ impl DuplicateAncestorDecision {
pub fn repair_status_mut(&mut self) -> Option<&mut DuplicateSlotRepairStatus> {
match self {
DuplicateAncestorDecision::InvalidSample
| DuplicateAncestorDecision::AncestorsAllMatch
| DuplicateAncestorDecision::SampleNotDuplicateConfirmed => None,
DuplicateAncestorDecision::ContinueSearch(status)
| DuplicateAncestorDecision::EarliestAncestorNotFrozen(status)
| DuplicateAncestorDecision::EarliestMismatchFound(status)
| DuplicateAncestorDecision::EarliestPrunedMismatchFound(status) => Some(status),
}
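
Read together, the hunks above leave the decision enum with five variants, and every variant that carries a status now holds a single (slot, hash) rather than a Vec. A consolidated, standalone sketch of the post-change shape follows; the status struct is reduced to a placeholder and the retry predicate's name is assumed rather than taken from this excerpt.

type Slot = u64;
type Hash = [u8; 32]; // placeholder for the real hash type

#[derive(Debug, Default, Clone, PartialEq, Eq)]
struct DuplicateSlotRepairStatus {
    correct_ancestor_to_repair: (Slot, Hash),
    // repair_pubkey_and_addr and start_ts omitted for brevity
}

// `AncestorsAllMatch` and `EarliestAncestorNotFrozen` are gone; each remaining
// status-carrying variant names exactly one slot to act on.
#[derive(Debug, Clone, PartialEq, Eq)]
#[allow(dead_code)]
enum DuplicateAncestorDecision {
    InvalidSample,
    SampleNotDuplicateConfirmed,
    ContinueSearch(DuplicateSlotRepairStatus),
    EarliestMismatchFound(DuplicateSlotRepairStatus),
    EarliestPrunedMismatchFound(DuplicateSlotRepairStatus),
}

impl DuplicateAncestorDecision {
    // Only the two sampling failures are worth retrying.
    fn is_retryable(&self) -> bool {
        matches!(
            self,
            Self::InvalidSample | Self::SampleNotDuplicateConfirmed
        )
    }

    // Every other variant carries the single ancestor to dump and repair.
    fn repair_status(&self) -> Option<&DuplicateSlotRepairStatus> {
        match self {
            Self::InvalidSample | Self::SampleNotDuplicateConfirmed => None,
            Self::ContinueSearch(status)
            | Self::EarliestMismatchFound(status)
            | Self::EarliestPrunedMismatchFound(status) => Some(status),
        }
    }
}

fn main() {
    let decision = DuplicateAncestorDecision::EarliestMismatchFound(DuplicateSlotRepairStatus {
        correct_ancestor_to_repair: (5, [0u8; 32]),
    });
    assert!(!decision.is_retryable());
    assert_eq!(decision.repair_status().unwrap().correct_ancestor_to_repair.0, 5);
}
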
@ -90,25 +81,21 @@ impl DuplicateAncestorDecision {
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct DuplicateSlotRepairStatus {
// Any ancestor slots that are either missing or are mismatched.
// The first ancestor slot that is mismatched.
// A mismatched ancestor slot is one that has been replayed (frozen)
// that has a different hash than the one agreed upon by the sampled peers.
//
//
// These are the slots that need to be dumped in order to repair the correct
// versions. The hash is None if the slot is not frozen, because it's:
// 1) Dead
// 2) Hasn't been replayed
// 3) We don't have the slot in our Ledger
pub correct_ancestors_to_repair: Vec<(Slot, Hash)>,
// This is the slot that needs to be dumped in order to replay the originally requested slot.
pub correct_ancestor_to_repair: (Slot, Hash),
pub repair_pubkey_and_addr: Option<(Pubkey, SocketAddr)>,
pub start_ts: u64,
}
impl DuplicateSlotRepairStatus {
fn new(correct_ancestors_to_repair: Vec<(Slot, Hash)>) -> Self {
fn new(correct_ancestor_to_repair: (Slot, Hash)) -> Self {
Self {
correct_ancestors_to_repair,
correct_ancestor_to_repair,
repair_pubkey_and_addr: None,
start_ts: timestamp(),
}
@ -128,19 +115,13 @@ impl AncestorRequestType {
}
}
pub struct AncestorDuplicateSlotsToRepair {
// Slots that `ancestor_hashes_service` found that need to be repaired
pub slots_to_repair: Vec<(Slot, Hash)>,
pub struct AncestorDuplicateSlotToRepair {
// Slot that `ancestor_hashes_service` found that needs to be repaired
pub slot_to_repair: (Slot, Hash),
// Condition that initiated this request
pub request_type: AncestorRequestType,
}
impl AncestorDuplicateSlotsToRepair {
pub fn is_empty(&self) -> bool {
self.slots_to_repair.is_empty()
}
}
#[derive(Debug, PartialEq, Eq)]
pub struct AncestorRequestDecision {
// The slot that initiated this request
@ -152,7 +133,7 @@ pub struct AncestorRequestDecision {
}
impl AncestorRequestDecision {
pub fn slots_to_repair(self) -> Option<AncestorDuplicateSlotsToRepair> {
pub fn slot_to_repair(self) -> Option<AncestorDuplicateSlotToRepair> {
let Self {
request_type,
mut decision,
@ -160,8 +141,8 @@ impl AncestorRequestDecision {
} = self;
decision
.repair_status_mut()
.map(|status| AncestorDuplicateSlotsToRepair {
slots_to_repair: std::mem::take(&mut status.correct_ancestors_to_repair),
.map(|status| AncestorDuplicateSlotToRepair {
slot_to_repair: std::mem::take(&mut status.correct_ancestor_to_repair),
request_type,
})
}
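
One small detail in `slot_to_repair` above: `std::mem::take` keeps working after the `Vec` becomes a single `(Slot, Hash)` tuple, because a tuple is `Default` whenever all of its elements are. A tiny standalone illustration, with `Hash` as a stand-in type:

type Slot = u64;

// A stand-in hash type; like the real one, it implements `Default`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
struct Hash([u8; 32]);

fn main() {
    let mut correct_ancestor_to_repair: (Slot, Hash) = (42, Hash([7; 32]));

    // `mem::take` moves the value out and leaves `(0, Hash::default())` behind,
    // much as it previously left an empty Vec behind.
    let taken = std::mem::take(&mut correct_ancestor_to_repair);

    assert_eq!(taken, (42, Hash([7; 32])));
    assert_eq!(correct_ancestor_to_repair, (0, Hash::default()));
}
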
@ -276,7 +257,7 @@ impl AncestorRequestStatus {
fn handle_sampled_validators_reached_agreement(
&mut self,
blockstore: &Blockstore,
mut agreed_response: Vec<(Slot, Hash)>,
agreed_response: Vec<(Slot, Hash)>,
) -> DuplicateAncestorDecision {
if agreed_response.is_empty() {
info!(
@ -350,15 +331,18 @@ impl AncestorRequestStatus {
return DuplicateAncestorDecision::InvalidSample;
}
(
Some((mismatch_i, DuplicateAncestorDecision::EarliestAncestorNotFrozen(_))),
Some((
mismatch_i,
DuplicateAncestorDecision::EarliestPrunedMismatchFound(_),
)),
true,
) => {
// In this case an earlier ancestor was not frozen in our blockstore,
// however this later ancestor is matching with our version. This most
// likely should never happen however it could happen if we initiate an
// ancestor_hashes request immediately after startup from snapshot, we will
// have a match for the snapshot bank, however we don't have any of the
// ancestors frozen
// ancestor_hashes request immediately after startup from snapshot on a
// pruned branch, we will have a match for the snapshot bank, however we
// don't have any of the ancestors frozen
let (mismatch_slot, mismatch_agreed_upon_hash) =
agreed_response[*mismatch_i];
info!(
@ -366,8 +350,9 @@ impl AncestorRequestStatus {
was agreed upon by the cluster with hash {mismatch_agreed_upon_hash} but not
frozen in our blockstore. However for a later ancestor {ancestor_slot} we have
agreement on {our_frozen_hash} as the bank hash. This should only be possible if
we have just started from snapshot and immediately encountered a duplicate block,
otherwise something is seriously wrong. Continuing with the repair",
we have just started from snapshot and immediately encountered a duplicate block on
a popular pruned fork, otherwise something is seriously wrong. Continuing with the
repair",
self.requested_mismatched_slot
);
}
@ -425,14 +410,14 @@ impl AncestorRequestStatus {
// ancestors of our version of `self.requested_mismatched_slot`
//
// ```
// 1 - 2 - 3 - 5' - 6' (our current fork)
// 1 - 2 - 3 - 5' - 6 (our current fork)
// /
// 0
// \
// 1 - 2 - 4 - 5 - 6 (cluster agreed fork)
// ```
//
// In this case, if we make a AncestorsHashes(6) request for our dead slot 6', we may
// In this case, if we make an AncestorsHashes(6) request for our dead slot 6, we may
// get a response with slot `4` in it, which is a slot that doesn't have a frozen
// hash in blockstore yet because either:
//
@ -451,25 +436,47 @@ impl AncestorRequestStatus {
// There are two cases:
// 1) The first such mismatch `first_mismatch` appears somewhere BEFORE the slot `4` that is
// missing from our blockstore.
// 2) The first such mismatch `first_mismatch` appears immediately AFTER the slot `4` that is
// 2) The first such mismatch `first_mismatch` appears AFTER the slot `4` that is
// missing from our blockstore.
//
// Because we know any mismatches will also trigger the mismatch casing earlier in
// the function, we will return`EarliestMismatchFound(first_mismatch)`. This will
// cause us to dump and repair `first_mismatch` and all its descendants, which should
// be the right behavior in both above cases.
// For (1), the earlier cases in this function will cause us to detect the
// mismatch, and stop until that slot is dumped and repaired.
// For (2), we continue searching until we find the mismatch. There must be a
// mismatch for us to have played the requested slot; either that mismatch will be
// found, or, if there is no mismatch, the last slot (the requested slot) will be
// dumped and repaired.
//
// In the rare case of multiple incorrect ancestors, where multiple cases of (1) and
// (2) are present, the accompanying `dump_then_repair` code in replay dumps the
// descendants of the earliest mismatch that also have frozen hashes. Failing that,
// in extreme cases another round of ancestor or replay dump then repair will be
// necessary to fix the fork.
//
// One example of an extreme case requiring multiple rounds of dump then repair is as follows:
//
// ```
// 1 - 2 - 3 - 5' - 6' (our current fork)
// /
// 0
// \
// 1 - 2 - 4 - 5 - 6 (cluster agreed fork)
// ```
//
// In this case, suppose we have the wrong version of 5 (the correct version is
// supposed to chain to 4), and we also have the wrong version of 6; both versions
// chain to 5, however ours is the duplicate.
//
// The first round of ancestor repair will detect 5' using case (2) above, and
// replay will dump then repair it. Upon successful replay of 5, we see that 6 is
// still dead or has an incorrect hash. This will require another round of ancestor or
// replay dump then repair to fix.
warn!(
"Blockstore is missing frozen hash for slot {},
which the cluster claims is an ancestor of dead slot {}. Potentially
our version of the dead slot chains to the wrong fork!",
ancestor_slot, self.requested_mismatched_slot
);
earliest_erroring_ancestor = Some((
agreed_response.len() - i - 1,
DuplicateAncestorDecision::EarliestAncestorNotFrozen(
DuplicateSlotRepairStatus::default(),
),
));
}
last_ancestor = *ancestor_slot;
}
@ -481,20 +488,26 @@ impl AncestorRequestStatus {
if earliest_erroring_ancestor_index == agreed_response.len() - 1 {
// If the earliest ancestor is missing or a mismatch, then we need to keep searching
// for earlier mismatches
let repair_status = DuplicateSlotRepairStatus::new(agreed_response);
let repair_status =
DuplicateSlotRepairStatus::new(*agreed_response.last().unwrap());
DuplicateAncestorDecision::ContinueSearch(repair_status)
} else {
// We only need to look through the first `earliest_erroring_ancestor_index + 1`
// elements and dump/repair any mismatches.
agreed_response.truncate(earliest_erroring_ancestor_index + 1);
// We only need to dump and repair the earliest mismatching ancestor.
let repair_status = decision.repair_status_mut().unwrap();
repair_status.correct_ancestors_to_repair = agreed_response;
repair_status.correct_ancestor_to_repair =
agreed_response[earliest_erroring_ancestor_index];
decision
}
} else {
// If we haven't returned by now, this implies all the ancestors matched our versions
// of those ancestors. Only slot to dump and repair is `self.requested_mismatched_slot`
DuplicateAncestorDecision::AncestorsAllMatch
// of those ancestors, or are missing or incomplete. Only slot to dump and repair is
// `self.requested_mismatched_slot`
let repair_status = DuplicateSlotRepairStatus::new(*agreed_response.first().unwrap());
if self.request_type.is_pruned() {
DuplicateAncestorDecision::EarliestPrunedMismatchFound(repair_status)
} else {
DuplicateAncestorDecision::EarliestMismatchFound(repair_status)
}
}
}
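
Below is a minimal, standalone sketch of the decision rule the comments above describe, under simplifying assumptions: stand-in types rather than the crate's, and none of the pruned bookkeeping, invalid-sample handling, or the extra checks the real scan applies to unfrozen slots. Ancestors arrive ordered from the requested slot downward; the scan reports the single earliest frozen mismatch, continues the search only when even the earliest returned ancestor mismatches, and otherwise falls back to the requested slot itself.

type Slot = u64;
type Hash = [u8; 32]; // stand-in for the real hash type

#[derive(Debug, PartialEq)]
enum Decision {
    // Even the earliest returned ancestor is wrong: request its ancestors next.
    ContinueSearch { earliest_ancestor: (Slot, Hash) },
    // Exactly one slot to dump and repair.
    EarliestMismatchFound { slot_to_repair: (Slot, Hash) },
}

/// `agreed_response` is ordered from the requested slot downward (descending slots),
/// mirroring how the sampled ancestor hashes are stored.
fn decide(
    agreed_response: &[(Slot, Hash)],
    local_frozen_hash: impl Fn(Slot) -> Option<Hash>,
) -> Decision {
    // The real code treats an empty sample as an invalid sample; the sketch just requires one.
    assert!(!agreed_response.is_empty());

    // Scan from the earliest ancestor (last element) toward the requested slot, looking for
    // the first slot we froze with a hash that disagrees with the cluster. Slots we never
    // froze are skipped here; the real code logs them and handles more corner cases.
    let earliest_mismatch = agreed_response.iter().rposition(|&(slot, agreed_hash)| {
        matches!(local_frozen_hash(slot), Some(ours) if ours != agreed_hash)
    });

    match earliest_mismatch {
        // The earliest returned ancestor already disagrees, so we cannot yet tell where the
        // forks diverge: continue the search with another request from that slot.
        Some(index) if index == agreed_response.len() - 1 => Decision::ContinueSearch {
            earliest_ancestor: agreed_response[index],
        },
        // Found the single earliest mismatched ancestor: dump and repair only that slot;
        // replay's dump-then-repair handles its frozen descendants.
        Some(index) => Decision::EarliestMismatchFound {
            slot_to_repair: agreed_response[index],
        },
        // Every returned ancestor matches (or was never frozen locally), so the only slot
        // to dump and repair is the requested slot itself, the first element.
        None => Decision::EarliestMismatchFound {
            slot_to_repair: agreed_response[0],
        },
    }
}

fn main() {
    // Fork diagram from the comments: we froze 5' (and 6' is dead) while the cluster agreed
    // on 1 - 2 - 4 - 5 - 6. Slot 4 was never replayed locally and slots 1..=2 match.
    let agreed: Vec<(Slot, Hash)> =
        vec![(6, [6; 32]), (5, [5; 32]), (4, [4; 32]), (2, [2; 32]), (1, [1; 32])];
    let ours = |slot: Slot| match slot {
        1 => Some([1; 32]),
        2 => Some([2; 32]),
        5 => Some([0xEE; 32]), // our 5' froze a different hash
        _ => None,             // 4 was never replayed, 6' is dead (not frozen)
    };
    // Only the earliest frozen mismatch, slot 5, is reported for dump-then-repair.
    assert_eq!(
        decide(&agreed, ours),
        Decision::EarliestMismatchFound {
            slot_to_repair: (5, [5; 32])
        }
    );
}
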
@ -869,18 +882,44 @@ pub mod tests {
.insert_bank_hash(slot, correct_hash, false);
}
// We don't have the earlier ancestors because we just started up, however sample should
// not be rejected as invalid.
let DuplicateAncestorDecision::EarliestMismatchFound(repair_status) = run_add_multiple_correct_and_incorrect_responses(
vec![],
&mut test_setup,
) else {
panic!("Incorrect decision")
};
assert_eq!(
repair_status.correct_ancestor_to_repair,
*test_setup.correct_ancestors_response.first().unwrap()
);
}
#[test]
fn test_add_multiple_responses_start_from_snapshot_missing_then_mismatch() {
let request_slot = 100;
let mut test_setup = setup_add_response_test(request_slot, 10);
// Only insert the later half in our blockstore, however make mistakes
for &(slot, _) in test_setup.correct_ancestors_response.iter().take(5) {
test_setup
.blockstore
.insert_bank_hash(slot, Hash::new_unique(), false);
}
// We don't have the earlier ancestors because we just started up, however sample should
// not be rejected as invalid.
let repair_status =
match run_add_multiple_correct_and_incorrect_responses(vec![], &mut test_setup) {
DuplicateAncestorDecision::ContinueSearch(repair_status) => repair_status,
DuplicateAncestorDecision::EarliestMismatchFound(repair_status) => repair_status,
x => panic!("Incorrect decision {x:?}"),
};
// Expect to find everything in the `correct_ancestors_to_repair`.
// Expect to find the first mismatch that is present
assert_eq!(
repair_status.correct_ancestors_to_repair,
test_setup.correct_ancestors_response
repair_status.correct_ancestor_to_repair,
test_setup.correct_ancestors_response[4]
);
}
@ -899,18 +938,16 @@ pub mod tests {
)];
// We have no entries in the blockstore, so all the ancestors will be missing
match run_add_multiple_correct_and_incorrect_responses(
let DuplicateAncestorDecision::EarliestMismatchFound(repair_status) = run_add_multiple_correct_and_incorrect_responses(
desired_incorrect_responses,
&mut test_setup,
) {
DuplicateAncestorDecision::ContinueSearch(repair_status) => {
assert_eq!(
repair_status.correct_ancestors_to_repair,
test_setup.correct_ancestors_response
);
}
x => panic!("Incorrect decision {x:?}"),
) else {
panic!("Incorrect decision")
};
assert_eq!(
repair_status.correct_ancestor_to_repair,
*test_setup.correct_ancestors_response.first().unwrap()
);
}
#[test]
@ -939,11 +976,11 @@ pub mod tests {
//
// 1) If we skip slot 93, and insert mismatched slot 94 we're testing the order of
// events `Not frozen -> Mismatched hash` which should return
// `EarliestAncestorNotFrozen`
// `EarliestMismatchFound(94)`
//
// 2) If we insert mismatched slot 93, and skip slot 94 we're testing the order of
// events `Mismatched hash -> Not frozen`, which should return
// `EarliestMismatchFound`
// `EarliestMismatchFound(93)`
test_setup
.blockstore
.insert_bank_hash(slot, Hash::new_unique(), false);
@ -952,28 +989,19 @@ pub mod tests {
let repair_status =
match run_add_multiple_correct_and_incorrect_responses(vec![], &mut test_setup) {
DuplicateAncestorDecision::EarliestMismatchFound(repair_status)
if insert_even_or_odds == 1 =>
{
repair_status
}
DuplicateAncestorDecision::EarliestAncestorNotFrozen(repair_status)
if insert_even_or_odds == 0 =>
{
repair_status
}
DuplicateAncestorDecision::EarliestMismatchFound(repair_status) => repair_status,
x => panic!("Incorrect decision {x:?}"),
};
// Expect to find everything after 92 in the `correct_ancestors_to_repair`.
let expected_mismatched_slots: Vec<(Slot, Hash)> = test_setup
// Expect to find 93 or 94 (see comment above)
let expected_mismatched = test_setup
.correct_ancestors_response
.into_iter()
.filter(|(slot, _)| *slot > 92)
.collect();
.find(|(slot, _)| *slot == if insert_even_or_odds == 0 { 94 } else { 93 })
.unwrap();
assert_eq!(
repair_status.correct_ancestors_to_repair,
expected_mismatched_slots
repair_status.correct_ancestor_to_repair,
expected_mismatched
);
}
@ -993,8 +1021,8 @@ pub mod tests {
match run_add_multiple_correct_and_incorrect_responses(vec![], &mut test_setup) {
DuplicateAncestorDecision::ContinueSearch(repair_status) => {
assert_eq!(
repair_status.correct_ancestors_to_repair,
test_setup.correct_ancestors_response
repair_status.correct_ancestor_to_repair,
*test_setup.correct_ancestors_response.last().unwrap()
);
}
x => panic!("Incorrect decision {x:?}"),
@ -1032,15 +1060,16 @@ pub mod tests {
// All the ancestors are mismatched, so we need to continue the search
match run_add_multiple_correct_and_incorrect_responses(vec![], &mut test_setup) {
DuplicateAncestorDecision::EarliestMismatchFound(repair_status) => {
// Expect to find everything after 92 in the `correct_ancestors_to_repair`.
let expected_mismatched_slots: Vec<(Slot, Hash)> = test_setup
// Expect to find the first slot after 92 in the `correct_ancestor_to_repair`.
let expected_mismatched = test_setup
.correct_ancestors_response
.into_iter()
.filter(|(slot, _)| *slot > 92)
.collect();
.rev()
.find(|(slot, _)| *slot > 92)
.unwrap();
assert_eq!(
repair_status.correct_ancestors_to_repair,
expected_mismatched_slots
repair_status.correct_ancestor_to_repair,
expected_mismatched,
);
}
x => panic!("Incorrect decision {x:?}"),
@ -1059,10 +1088,16 @@ pub mod tests {
.insert_bank_hash(slot, correct_hash, false);
}
// All the ancestors matched
// All the ancestors matched, only the requested slot should be dumped
let DuplicateAncestorDecision::EarliestMismatchFound(repair_status) = run_add_multiple_correct_and_incorrect_responses(
vec![],
&mut test_setup,
) else {
panic!("Incorrect decision")
};
assert_eq!(
run_add_multiple_correct_and_incorrect_responses(vec![], &mut test_setup),
DuplicateAncestorDecision::AncestorsAllMatch
repair_status.correct_ancestor_to_repair,
*test_setup.correct_ancestors_response.first().unwrap()
);
}
@ -1075,8 +1110,8 @@ pub mod tests {
match run_add_multiple_correct_and_incorrect_responses(vec![], &mut test_setup) {
DuplicateAncestorDecision::ContinueSearch(repair_status) => {
assert_eq!(
repair_status.correct_ancestors_to_repair,
test_setup.correct_ancestors_response
repair_status.correct_ancestor_to_repair,
*test_setup.correct_ancestors_response.last().unwrap()
);
}
x => panic!("Incorrect decision {x:?}"),
@ -1097,10 +1132,16 @@ pub mod tests {
.blockstore
.add_tree(tree, true, true, 2, Hash::default());
// All the ancestors matched
// All the ancestors matched, only the requested slot should be dumped
let DuplicateAncestorDecision::EarliestPrunedMismatchFound(repair_status) = run_add_multiple_correct_and_incorrect_responses(
vec![],
&mut test_setup,
) else {
panic!("Incorrect decision")
};
assert_eq!(
run_add_multiple_correct_and_incorrect_responses(vec![], &mut test_setup),
DuplicateAncestorDecision::AncestorsAllMatch
repair_status.correct_ancestor_to_repair,
*test_setup.correct_ancestors_response.first().unwrap()
);
}
@ -1135,15 +1176,16 @@ pub mod tests {
x => panic!("Incorrect decision {x:?}"),
};
// Expect to find everything after 93 in the `correct_ancestors_to_repair`.
let expected_mismatched_slots: Vec<(Slot, Hash)> = test_setup
// Expect to find the first slot after 93 in the `correct_ancestor_to_repair`.
let expected_mismatched = test_setup
.correct_ancestors_response
.into_iter()
.filter(|(slot, _)| *slot > 93)
.collect();
.rev()
.find(|(slot, _)| *slot > 93)
.unwrap();
assert_eq!(
repair_status.correct_ancestors_to_repair,
expected_mismatched_slots
repair_status.correct_ancestor_to_repair,
expected_mismatched,
);
}
@ -1183,15 +1225,16 @@ pub mod tests {
x => panic!("Incorrect decision {x:?}"),
};
// Expect to find everything after 92 in the `correct_ancestors_to_repair`.
let expected_mismatched_slots: Vec<(Slot, Hash)> = test_setup
// Expect to find the first slot after 92 in the `correct_ancestor_to_repair`.
let expected_mismatched = test_setup
.correct_ancestors_response
.into_iter()
.filter(|(slot, _)| *slot >= 93)
.collect();
.rev()
.find(|(slot, _)| *slot >= 93)
.unwrap();
assert_eq!(
repair_status.correct_ancestors_to_repair,
expected_mismatched_slots
repair_status.correct_ancestor_to_repair,
expected_mismatched,
);
}
}

View File

@ -11,7 +11,7 @@ use {
cluster_slots_service::cluster_slots::ClusterSlots,
repair::{
ancestor_hashes_service::{AncestorHashesReplayUpdateReceiver, AncestorHashesService},
duplicate_repair_status::AncestorDuplicateSlotsToRepair,
duplicate_repair_status::AncestorDuplicateSlotToRepair,
outstanding_requests::OutstandingRequests,
repair_weight::RepairWeight,
serve_repair::{ServeRepair, ShredRepairType, REPAIR_PEERS_CACHE_CAPACITY},
@ -52,8 +52,8 @@ use {
const DEFER_REPAIR_THRESHOLD: Duration = Duration::from_millis(200);
const DEFER_REPAIR_THRESHOLD_TICKS: u64 = DEFER_REPAIR_THRESHOLD.as_millis() as u64 / MS_PER_TICK;
pub type AncestorDuplicateSlotsSender = CrossbeamSender<AncestorDuplicateSlotsToRepair>;
pub type AncestorDuplicateSlotsReceiver = CrossbeamReceiver<AncestorDuplicateSlotsToRepair>;
pub type AncestorDuplicateSlotsSender = CrossbeamSender<AncestorDuplicateSlotToRepair>;
pub type AncestorDuplicateSlotsReceiver = CrossbeamReceiver<AncestorDuplicateSlotToRepair>;
pub type ConfirmedSlotsSender = CrossbeamSender<Vec<Slot>>;
pub type ConfirmedSlotsReceiver = CrossbeamReceiver<Vec<Slot>>;
pub type DumpedSlotsSender = CrossbeamSender<Vec<(Slot, Hash)>>;
@ -822,7 +822,7 @@ impl RepairService {
.repair_request_duplicate_compute_best_peer(slot, cluster_slots, repair_validators)
.ok();
let new_duplicate_slot_repair_status = DuplicateSlotRepairStatus {
correct_ancestors_to_repair: vec![(slot, Hash::default())],
correct_ancestor_to_repair: (slot, Hash::default()),
repair_pubkey_and_addr,
start_ts: timestamp(),
};
@ -1206,7 +1206,7 @@ mod test {
let dead_slot = 9;
let receive_socket = &UdpSocket::bind("0.0.0.0:0").unwrap();
let duplicate_status = DuplicateSlotRepairStatus {
correct_ancestors_to_repair: vec![(dead_slot, Hash::default())],
correct_ancestor_to_repair: (dead_slot, Hash::default()),
start_ts: std::u64::MAX,
repair_pubkey_and_addr: None,
};
@ -1313,7 +1313,7 @@ mod test {
// Not enough time has passed, should not update the
// address
let mut duplicate_status = DuplicateSlotRepairStatus {
correct_ancestors_to_repair: vec![(dead_slot, Hash::default())],
correct_ancestor_to_repair: (dead_slot, Hash::default()),
start_ts: std::u64::MAX,
repair_pubkey_and_addr: dummy_addr,
};
@ -1328,7 +1328,7 @@ mod test {
// If the repair address is None, should try to update
let mut duplicate_status = DuplicateSlotRepairStatus {
correct_ancestors_to_repair: vec![(dead_slot, Hash::default())],
correct_ancestor_to_repair: (dead_slot, Hash::default()),
start_ts: std::u64::MAX,
repair_pubkey_and_addr: None,
};
@ -1343,7 +1343,7 @@ mod test {
// If sufficient time has passed, should try to update
let mut duplicate_status = DuplicateSlotRepairStatus {
correct_ancestors_to_repair: vec![(dead_slot, Hash::default())],
correct_ancestor_to_repair: (dead_slot, Hash::default()),
start_ts: timestamp() - MAX_DUPLICATE_WAIT_MS as u64,
repair_pubkey_and_addr: dummy_addr,
};

View File

@ -22,7 +22,7 @@ use {
repair::{
ancestor_hashes_service::AncestorHashesReplayUpdateSender,
cluster_slot_state_verifier::*,
duplicate_repair_status::AncestorDuplicateSlotsToRepair,
duplicate_repair_status::AncestorDuplicateSlotToRepair,
repair_service::{
AncestorDuplicateSlotsReceiver, DumpedSlotsSender, PopularPrunedForksReceiver,
},
@ -1416,46 +1416,42 @@ impl ReplayStage {
purge_repair_slot_counter: &mut PurgeRepairSlotCounter,
) {
let root = bank_forks.read().unwrap().root();
for AncestorDuplicateSlotsToRepair {
slots_to_repair: maybe_repairable_duplicate_slots,
for AncestorDuplicateSlotToRepair {
slot_to_repair: (epoch_slots_frozen_slot, epoch_slots_frozen_hash),
request_type,
} in ancestor_duplicate_slots_receiver.try_iter()
{
warn!(
"{} ReplayStage notified of duplicate slots from ancestor hashes service but we observed as {}: {:?}",
pubkey, if request_type.is_pruned() {"pruned"} else {"dead"}, maybe_repairable_duplicate_slots,
"{} ReplayStage notified of duplicate slot from ancestor hashes service but we observed as {}: {:?}",
pubkey, if request_type.is_pruned() {"pruned"} else {"dead"}, (epoch_slots_frozen_slot, epoch_slots_frozen_hash),
);
let epoch_slots_frozen_state = EpochSlotsFrozenState::new_from_state(
epoch_slots_frozen_slot,
epoch_slots_frozen_hash,
gossip_duplicate_confirmed_slots,
fork_choice,
|| progress.is_dead(epoch_slots_frozen_slot).unwrap_or(false),
|| {
bank_forks
.read()
.unwrap()
.get(epoch_slots_frozen_slot)
.map(|b| b.hash())
},
request_type.is_pruned(),
);
check_slot_agrees_with_cluster(
epoch_slots_frozen_slot,
root,
blockstore,
duplicate_slots_tracker,
epoch_slots_frozen_slots,
fork_choice,
duplicate_slots_to_repair,
ancestor_hashes_replay_update_sender,
purge_repair_slot_counter,
SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state),
);
for (epoch_slots_frozen_slot, epoch_slots_frozen_hash) in
maybe_repairable_duplicate_slots.into_iter()
{
let epoch_slots_frozen_state = EpochSlotsFrozenState::new_from_state(
epoch_slots_frozen_slot,
epoch_slots_frozen_hash,
gossip_duplicate_confirmed_slots,
fork_choice,
|| progress.is_dead(epoch_slots_frozen_slot).unwrap_or(false),
|| {
bank_forks
.read()
.unwrap()
.get(epoch_slots_frozen_slot)
.map(|b| b.hash())
},
request_type.is_pruned(),
);
check_slot_agrees_with_cluster(
epoch_slots_frozen_slot,
root,
blockstore,
duplicate_slots_tracker,
epoch_slots_frozen_slots,
fork_choice,
duplicate_slots_to_repair,
ancestor_hashes_replay_update_sender,
purge_repair_slot_counter,
SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state),
);
}
}
}

View File

@ -16,10 +16,14 @@ use {
validator::ValidatorConfig,
},
solana_download_utils::download_snapshot_archive,
solana_entry::entry::create_ticks,
solana_gossip::{contact_info::LegacyContactInfo, gossip_service::discover_cluster},
solana_ledger::{
ancestor_iterator::AncestorIterator, bank_forks_utils, blockstore::Blockstore,
blockstore_processor::ProcessOptions, leader_schedule::FixedSchedule,
ancestor_iterator::AncestorIterator,
bank_forks_utils,
blockstore::{entries_to_test_shreds, Blockstore},
blockstore_processor::ProcessOptions,
leader_schedule::FixedSchedule,
use_snapshot_archives_at_startup::UseSnapshotArchivesAtStartup,
},
solana_local_cluster::{
@ -80,7 +84,6 @@ use {
thread::{sleep, Builder, JoinHandle},
time::{Duration, Instant},
},
trees::tr,
};
mod common;
@ -4542,7 +4545,6 @@ fn test_duplicate_with_pruned_ancestor() {
"28bN3xyvrP4E8LwEgtLjhnkb7cY4amQb6DrYAbAYjgRV4GAGgkVM2K7wnxnAS7WDneuavza7x21MiafLu1HkwQt4",
"2saHBBoTkLMmttmPQP8KfBkcCw45S5cwtV3wTdGCscRC8uxdgvHxpHiWXKx4LvJjNJtnNcbSv5NdheokFFqnNDt8",
"4mx9yoFBeYasDKBGDWCTWGJdWuJCKbgqmuP8bN9umybCh5Jzngw7KQxe99Rf5uzfyzgba1i65rJW4Wqk7Ab5S8ye",
"3zsEPEDsjfEay7te9XqNjRTCE7vwuT6u4DHzBJC19yp7GS8BuNRMRjnpVrKCBzb3d44kxc4KPGSHkCmk6tEfswCg",
]
.iter()
.map(|s| (Arc::new(Keypair::from_base58_string(s)), true))
@ -4568,7 +4570,6 @@ fn test_duplicate_with_pruned_ancestor() {
});
let mut validator_configs = make_identical_validator_configs(&default_config, num_nodes);
validator_configs[3].voting_disabled = true;
// Don't let majority produce anything past the fork by tricking its leader schedule
validator_configs[0].fixed_leader_schedule = Some(FixedSchedule {
leader_schedule: Arc::new(create_custom_leader_schedule(
@ -4704,7 +4705,10 @@ fn test_duplicate_with_pruned_ancestor() {
let last_majority_vote =
wait_for_last_vote_in_tower_to_land_in_ledger(&majority_ledger_path, &majority_pubkey);
info!("Creating duplicate block built off of pruned branch for our node. Last majority vote {last_majority_vote}, Last minority vote {last_minority_vote}");
info!(
"Creating duplicate block built off of pruned branch for our node.
Last majority vote {last_majority_vote}, Last minority vote {last_minority_vote}"
);
{
{
// Copy majority fork
@ -4723,14 +4727,21 @@ fn test_duplicate_with_pruned_ancestor() {
// Change last block parent to chain off of (purged) minority fork
info!("For our node, changing parent of {last_majority_vote} to {last_minority_vote}");
purge_slots_with_count(&our_blockstore, last_majority_vote, 1);
our_blockstore.add_tree(
tr(last_minority_vote) / tr(last_majority_vote),
false,
true,
64,
our_blockstore.clear_unconfirmed_slot(last_majority_vote);
let entries = create_ticks(
64 * (std::cmp::max(1, last_majority_vote - last_minority_vote)),
0,
Hash::default(),
);
let shreds = entries_to_test_shreds(
&entries,
last_majority_vote,
last_minority_vote,
true,
0,
true, // merkle_variant
);
our_blockstore.insert_shreds(shreds, None, false).unwrap();
// Update the root to set minority fork back as pruned
our_blockstore.set_last_root(fork_slot + fork_length);