diff --git a/Cargo.lock b/Cargo.lock index b97fe070b7..45e657919b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6082,6 +6082,7 @@ dependencies = [ "solana-tpu-client", "solana-vote-program", "tempfile", + "trees", ] [[package]] diff --git a/core/src/ancestor_hashes_service.rs b/core/src/ancestor_hashes_service.rs index 5e50b58681..7e421facc2 100644 --- a/core/src/ancestor_hashes_service.rs +++ b/core/src/ancestor_hashes_service.rs @@ -1,10 +1,12 @@ use { crate::{ cluster_slots::ClusterSlots, - duplicate_repair_status::{AncestorRequestStatus, DuplicateAncestorDecision}, + duplicate_repair_status::{ + AncestorRequestDecision, AncestorRequestStatus, AncestorRequestType, + }, outstanding_requests::OutstandingRequests, packet_threshold::DynamicPacketToProcessThreshold, - repair_service::{DuplicateSlotsResetSender, RepairInfo, RepairStatsGroup}, + repair_service::{AncestorDuplicateSlotsSender, RepairInfo, RepairStatsGroup}, replay_stage::DUPLICATE_THRESHOLD, result::{Error, Result}, serve_repair::{ @@ -46,6 +48,14 @@ use { pub enum AncestorHashesReplayUpdate { Dead(Slot), DeadDuplicateConfirmed(Slot), + // `Slot` belongs to a fork we have pruned. We have observed that this fork is "popular" aka + // reached 52+% stake through votes in turbine/gossip including votes for descendants. These + // votes are hash agnostic since we have not replayed `Slot` so we can never say with certainty + // that this fork has reached duplicate confirmation, but it is suspected to have. This + // indicates that there is most likely a block with invalid ancestry present and thus we + // collect an ancestor sample to resolve this issue. `Slot` is the deepest slot in this fork + // that is popular, so any duplicate problems will be for `Slot` or one of its ancestors. 
+ PopularPrunedFork(Slot), } impl AncestorHashesReplayUpdate { @@ -53,6 +63,7 @@ impl AncestorHashesReplayUpdate { match self { AncestorHashesReplayUpdate::Dead(slot) => *slot, AncestorHashesReplayUpdate::DeadDuplicateConfirmed(slot) => *slot, + AncestorHashesReplayUpdate::PopularPrunedFork(slot) => *slot, } } } @@ -62,8 +73,8 @@ pub const MAX_ANCESTOR_HASHES_SLOT_REQUESTS_PER_SECOND: usize = 2; pub type AncestorHashesReplayUpdateSender = Sender; pub type AncestorHashesReplayUpdateReceiver = Receiver; -type RetryableSlotsSender = Sender; -type RetryableSlotsReceiver = Receiver; +type RetryableSlotsSender = Sender<(Slot, AncestorRequestType)>; +type RetryableSlotsReceiver = Receiver<(Slot, AncestorRequestType)>; type OutstandingAncestorHashesRepairs = OutstandingRequests; #[derive(Default)] @@ -175,7 +186,7 @@ impl AncestorHashesService { blockstore, outstanding_requests.clone(), exit.clone(), - repair_info.duplicate_slots_reset_sender.clone(), + repair_info.ancestor_duplicate_slots_sender.clone(), retryable_slots_sender, repair_info.cluster_info.clone(), ancestor_hashes_request_socket.clone(), @@ -209,7 +220,7 @@ impl AncestorHashesService { blockstore: Arc, outstanding_requests: Arc>, exit: Arc, - duplicate_slots_reset_sender: DuplicateSlotsResetSender, + ancestor_duplicate_slots_sender: AncestorDuplicateSlotsSender, retryable_slots_sender: RetryableSlotsSender, cluster_info: Arc, ancestor_socket: Arc, @@ -229,7 +240,7 @@ impl AncestorHashesService { &outstanding_requests, &mut stats, &mut packet_threshold, - &duplicate_slots_reset_sender, + &ancestor_duplicate_slots_sender, &retryable_slots_sender, &keypair, &ancestor_socket, @@ -259,7 +270,7 @@ impl AncestorHashesService { outstanding_requests: &RwLock, stats: &mut AncestorHashesResponsesStats, packet_threshold: &mut DynamicPacketToProcessThreshold, - duplicate_slots_reset_sender: &DuplicateSlotsResetSender, + ancestor_duplicate_slots_sender: &AncestorDuplicateSlotsSender, retryable_slots_sender: 
&RetryableSlotsSender, keypair: &Keypair, ancestor_socket: &UdpSocket, @@ -289,7 +300,7 @@ impl AncestorHashesService { stats, outstanding_requests, blockstore, - duplicate_slots_reset_sender, + ancestor_duplicate_slots_sender, retryable_slots_sender, keypair, ancestor_socket, @@ -305,13 +316,13 @@ impl AncestorHashesService { stats: &mut AncestorHashesResponsesStats, outstanding_requests: &RwLock, blockstore: &Blockstore, - duplicate_slots_reset_sender: &DuplicateSlotsResetSender, + ancestor_duplicate_slots_sender: &AncestorDuplicateSlotsSender, retryable_slots_sender: &RetryableSlotsSender, keypair: &Keypair, ancestor_socket: &UdpSocket, ) { packet_batch.iter().for_each(|packet| { - let decision = Self::verify_and_process_ancestor_response( + let ancestor_request_decision = Self::verify_and_process_ancestor_response( packet, ancestor_hashes_request_statuses, stats, @@ -320,11 +331,10 @@ impl AncestorHashesService { keypair, ancestor_socket, ); - if let Some((slot, decision)) = decision { + if let Some(ancestor_request_decision) = ancestor_request_decision { Self::handle_ancestor_request_decision( - slot, - decision, - duplicate_slots_reset_sender, + ancestor_request_decision, + ancestor_duplicate_slots_sender, retryable_slots_sender, ); } @@ -342,7 +352,7 @@ impl AncestorHashesService { blockstore: &Blockstore, keypair: &Keypair, ancestor_socket: &UdpSocket, - ) -> Option<(Slot, DuplicateAncestorDecision)> { + ) -> Option { let from_addr = packet.meta().socket_addr(); let packet_data = match packet.data(..) { Some(data) => data, @@ -403,6 +413,7 @@ impl AncestorHashesService { hashes.clone(), blockstore, ); + let request_type = ancestor_hashes_status_ref.get().request_type(); if decision.is_some() { // Once a request is completed, remove it from the map so that new // requests for the same slot can be made again if necessary. It's @@ -414,7 +425,11 @@ impl AncestorHashesService { // In which case we wouldn't want to delete the newly inserted entry here. 
ancestor_hashes_status_ref.remove(); } - decision.map(|decision| (request_slot, decision)) + decision.map(|decision| AncestorRequestDecision { + slot: request_slot, + decision, + request_type, + }) } else { None } @@ -442,43 +457,43 @@ impl AncestorHashesService { } fn handle_ancestor_request_decision( - slot: Slot, - decision: DuplicateAncestorDecision, - duplicate_slots_reset_sender: &DuplicateSlotsResetSender, + ancestor_request_decision: AncestorRequestDecision, + ancestor_duplicate_slots_sender: &AncestorDuplicateSlotsSender, retryable_slots_sender: &RetryableSlotsSender, ) { - if decision.is_retryable() { - let _ = retryable_slots_sender.send(slot); + if ancestor_request_decision.is_retryable() { + let _ = retryable_slots_sender.send(( + ancestor_request_decision.slot, + ancestor_request_decision.request_type, + )); } - let potential_slots_to_dump = { - // TODO: In the case of DuplicateAncestorDecision::ContinueSearch - // This means all the ancestors were mismatched, which - // means the earliest mismatched ancestor has yet to be found. - // - // In the best case scenario, this means after ReplayStage dumps - // the earliest known ancestor `A` here, and then repairs `A`, - // because we may still have the incorrect version of some ancestor - // of `A`, we will mark `A` as dead and then continue the search - // protocol through another round of ancestor repairs. - // - // However this process is a bit slow, so in an ideal world, the - // protocol could be extended to keep searching by making - // another ancestor repair request from the earliest returned - // ancestor from this search. - decision - .repair_status() - .map(|status| status.correct_ancestors_to_repair.clone()) - }; + + // TODO: In the case of DuplicateAncestorDecision::ContinueSearch + // This means all the ancestors were mismatched, which + // means the earliest mismatched ancestor has yet to be found. 
+ // + // In the best case scenario, this means after ReplayStage dumps + // the earliest known ancestor `A` here, and then repairs `A`, + // because we may still have the incorrect version of some ancestor + // of `A`, we will mark `A` as dead and then continue the search + // protocol through another round of ancestor repairs. + // + // However this process is a bit slow, so in an ideal world, the + // protocol could be extended to keep searching by making + // another ancestor repair request from the earliest returned + // ancestor from this search. + + let potential_slots_to_repair = ancestor_request_decision.slots_to_repair(); // Now signal ReplayStage about the new updated slots. It's important to do this // AFTER we've removed the ancestor_hashes_status_ref in case replay // then sends us another dead slot signal based on the updates we are // about to send. - if let Some(potential_slots_to_dump) = potential_slots_to_dump { - // Signal ReplayStage to dump the fork that is descended from - // `earliest_mismatched_slot_to_dump`. - if !potential_slots_to_dump.is_empty() { - let _ = duplicate_slots_reset_sender.send(potential_slots_to_dump); + if let Some(slots_to_repair) = potential_slots_to_repair { + if !slots_to_repair.is_empty() { + // Signal ReplayStage to dump the fork that is descended from + // `earliest_mismatched_slot_to_dump`. 
+ let _ = ancestor_duplicate_slots_sender.send(slots_to_repair); } } } @@ -488,6 +503,7 @@ impl AncestorHashesService { ancestor_hashes_request_statuses: &DashMap, dead_slot_pool: &mut HashSet, repairable_dead_slot_pool: &mut HashSet, + popular_pruned_slot_pool: &mut HashSet, root_slot: Slot, ) { for update in ancestor_hashes_replay_update_receiver.try_iter() { @@ -499,14 +515,57 @@ AncestorHashesReplayUpdate::Dead(dead_slot) => { if repairable_dead_slot_pool.contains(&dead_slot) { return; + } else if popular_pruned_slot_pool.contains(&dead_slot) { + // If `dead_slot` is also part of a popular pruned fork, this implies that the slot has + // become `EpochSlotsFrozen` as 52% had to have frozen some version of this slot in order + // to vote on it / its descendants as observed in `repair_weight`. + // This fits the alternate criteria we use in `find_epoch_slots_frozen_dead_slots` + // so we can upgrade it to `repairable_dead_slot_pool`. + popular_pruned_slot_pool.remove(&dead_slot); + repairable_dead_slot_pool.insert(dead_slot); } else { dead_slot_pool.insert(dead_slot); } } AncestorHashesReplayUpdate::DeadDuplicateConfirmed(dead_slot) => { + // If this slot was previously queued as a popular pruned slot, prefer to + // instead process it as dead duplicate confirmed. + // In general we prefer to use the dead duplicate confirmed pathway + // whenever possible as it allows us to compare frozen hashes of ancestors with + // the cluster rather than just comparing ancestry links. + popular_pruned_slot_pool.remove(&dead_slot); + dead_slot_pool.remove(&dead_slot); repairable_dead_slot_pool.insert(dead_slot); } + AncestorHashesReplayUpdate::PopularPrunedFork(pruned_slot) => { + // The `dead_slot_pool` or `repairable_dead_slot_pool` can already contain this slot + // if the below order of events happens: + // + // 1. Slot is marked dead/duplicate confirmed + // 2. 
Slot is pruned + if dead_slot_pool.contains(&pruned_slot) { + // Similar to the above case where `pruned_slot` was first pruned and then marked + // dead, since `pruned_slot` is part of a popular pruned fork it has become + // `EpochSlotsFrozen` as 52% must have frozen a version of this slot in + // order to vote. + // This fits the alternate criteria we use in `find_epoch_slots_frozen_dead_slots` + // so we can upgrade it to `repairable_dead_slot_pool`. + info!("{pruned_slot} is part of a popular pruned fork however we previously marked it as dead. + Upgrading as dead duplicate confirmed"); + dead_slot_pool.remove(&pruned_slot); + repairable_dead_slot_pool.insert(pruned_slot); + } else if repairable_dead_slot_pool.contains(&pruned_slot) { + // If we already observed `pruned_slot` as dead duplicate confirmed, we + // ignore the additional information that `pruned_slot` is popular pruned. + // This is similar to the above case where `pruned_slot` was first pruned + // and then marked dead duplicate confirmed. + info!("Received pruned duplicate confirmed status for {pruned_slot} that was previously marked + dead duplicate confirmed. Ignoring and processing it as dead duplicate confirmed."); + } else { + popular_pruned_slot_pool.insert(pruned_slot); + } + } } } } @@ -529,6 +588,17 @@ impl AncestorHashesService { let mut dead_slot_pool = HashSet::new(); let mut repairable_dead_slot_pool = HashSet::new(); + // We keep a separate pool for slots that are part of popular pruned forks, (reached 52+% + // in repair weight). Since these slots are pruned, we most likely do not have frozen + // hashes for the ancestors. Because of this we process responses differently, using only + // slot number to find the missing/invalid ancestor to dump & repair. + // + // However if slots are pruned and also dead/dead duplicate confirmed we give extra priority + // to the dead pathway, preferring to add the slot to `repairable_dead_slot_pool` instead. 
This is + // because if the slot is dead we must have been able to replay the ancestor. If the ancestors + // have frozen hashes we should compare hashes instead of raw ancestry as hashes give more information + // in finding missing/invalid ancestors. + let mut popular_pruned_slot_pool = HashSet::new(); // Sliding window that limits the number of slots repaired via AncestorRepair // to MAX_ANCESTOR_HASHES_SLOT_REQUESTS_PER_SECOND/second @@ -551,6 +621,7 @@ impl AncestorHashesService { &mut repair_stats, &mut dead_slot_pool, &mut repairable_dead_slot_pool, + &mut popular_pruned_slot_pool, &mut request_throttle, ); @@ -571,12 +642,17 @@ impl AncestorHashesService { repair_stats: &mut AncestorRepairRequestsStats, dead_slot_pool: &mut HashSet, repairable_dead_slot_pool: &mut HashSet, + popular_pruned_slot_pool: &mut HashSet, request_throttle: &mut Vec, ) { let root_bank = repair_info.bank_forks.read().unwrap().root_bank(); - for slot in retryable_slots_receiver.try_iter() { + for (slot, request_type) in retryable_slots_receiver.try_iter() { datapoint_info!("ancestor-repair-retry", ("slot", slot, i64)); - repairable_dead_slot_pool.insert(slot); + if request_type.is_pruned() { + popular_pruned_slot_pool.insert(slot); + } else { + repairable_dead_slot_pool.insert(slot); + } } Self::process_replay_updates( @@ -584,6 +660,7 @@ impl AncestorHashesService { ancestor_hashes_request_statuses, dead_slot_pool, repairable_dead_slot_pool, + popular_pruned_slot_pool, root_bank.slot(), ); @@ -596,13 +673,18 @@ impl AncestorHashesService { dead_slot_pool.retain(|slot| *slot > root_bank.slot()); repairable_dead_slot_pool.retain(|slot| *slot > root_bank.slot()); + popular_pruned_slot_pool.retain(|slot| *slot > root_bank.slot()); ancestor_hashes_request_statuses.retain(|slot, status| { if *slot <= root_bank.slot() { false } else if status.is_expired() { - // Add the slot back to the repairable pool to retry - repairable_dead_slot_pool.insert(*slot); + // Add the slot back to the correct 
pool to retry + if status.request_type().is_pruned() { + popular_pruned_slot_pool.insert(*slot); + } else { + repairable_dead_slot_pool.insert(*slot); + } false } else { true @@ -617,36 +699,55 @@ impl AncestorHashesService { let number_of_allowed_requests = MAX_ANCESTOR_HASHES_SLOT_REQUESTS_PER_SECOND.saturating_sub(request_throttle.len()); - // Find dead slots for which it's worthwhile to ask the network for their - // ancestors - for _ in 0..number_of_allowed_requests { - let slot = repairable_dead_slot_pool.iter().next().cloned(); - if let Some(slot) = slot { - warn!( - "Cluster froze slot: {}, but we marked it as dead. - Initiating protocol to sample cluster for dead slot ancestors.", - slot - ); + // Find dead and pruned slots for which it's worthwhile to ask the network for their + // ancestors, prioritizing dead slots first. + let potential_slot_requests = repairable_dead_slot_pool + .iter() + .copied() + .zip(std::iter::repeat( + AncestorRequestType::DeadDuplicateConfirmed, + )) + .chain( + popular_pruned_slot_pool + .iter() + .copied() + .zip(std::iter::repeat(AncestorRequestType::PopularPruned)), + ) + .collect::>() + .into_iter(); - if Self::initiate_ancestor_hashes_requests_for_duplicate_slot( - ancestor_hashes_request_statuses, - ancestor_hashes_request_socket, - &repair_info.cluster_slots, - serve_repair, - &repair_info.repair_validators, - slot, - repair_stats, - outstanding_requests, - identity_keypair, - ) { - request_throttle.push(timestamp()); + for (slot, request_type) in potential_slot_requests.take(number_of_allowed_requests) { + warn!( + "Cluster froze slot: {}, but we marked it as {}. 
+ Initiating protocol to sample cluster for dead slot ancestors.", + slot, + if request_type.is_pruned() { + "pruned" + } else { + "dead" + }, + ); + + if Self::initiate_ancestor_hashes_requests_for_duplicate_slot( + ancestor_hashes_request_statuses, + ancestor_hashes_request_socket, + &repair_info.cluster_slots, + serve_repair, + &repair_info.repair_validators, + slot, + repair_stats, + outstanding_requests, + identity_keypair, + request_type, + ) { + request_throttle.push(timestamp()); + if request_type.is_pruned() { + popular_pruned_slot_pool.take(&slot).unwrap(); + } else { repairable_dead_slot_pool.take(&slot).unwrap(); } - } else { - break; } } - repair_stats.report(); } @@ -713,6 +814,7 @@ impl AncestorHashesService { repair_stats: &mut AncestorRepairRequestsStats, outstanding_requests: &RwLock, identity_keypair: &Keypair, + request_type: AncestorRequestType, ) -> bool { let sampled_validators = serve_repair.repair_request_ancestor_hashes_sample_peers( duplicate_slot, @@ -745,6 +847,7 @@ impl AncestorHashesService { .into_iter() .map(|(_pk, socket_addr)| socket_addr), duplicate_slot, + request_type, ); assert!(!ancestor_hashes_request_statuses.contains_key(&duplicate_slot)); ancestor_hashes_request_statuses.insert(duplicate_slot, ancestor_request_status); @@ -761,6 +864,7 @@ mod test { super::*, crate::{ cluster_slot_state_verifier::{DuplicateSlotsToRepair, PurgeRepairSlotCounter}, + duplicate_repair_status::DuplicateAncestorDecision, replay_stage::{ tests::{replay_blockstore_components, ReplayBlockstoreComponents}, ReplayStage, @@ -790,6 +894,7 @@ mod test { let ancestor_hashes_request_statuses = DashMap::new(); let mut dead_slot_pool = HashSet::new(); let mut repairable_dead_slot_pool = HashSet::new(); + let mut popular_pruned_slot_pool = HashSet::new(); let slot = 10; let mut root_slot = 0; @@ -802,10 +907,12 @@ mod test { &ancestor_hashes_request_statuses, &mut dead_slot_pool, &mut repairable_dead_slot_pool, + &mut popular_pruned_slot_pool, root_slot, ); 
assert!(dead_slot_pool.contains(&slot)); assert!(repairable_dead_slot_pool.is_empty()); + assert!(popular_pruned_slot_pool.is_empty()); // 2) Getting a duplicate confirmed dead slot should move the slot // from the dead pool to the repairable pool @@ -817,10 +924,12 @@ mod test { &ancestor_hashes_request_statuses, &mut dead_slot_pool, &mut repairable_dead_slot_pool, + &mut popular_pruned_slot_pool, root_slot, ); assert!(dead_slot_pool.is_empty()); assert!(repairable_dead_slot_pool.contains(&slot)); + assert!(popular_pruned_slot_pool.is_empty()); // 3) Getting another dead signal should not add it back to the dead pool ancestor_hashes_replay_update_sender @@ -831,31 +940,68 @@ mod test { &ancestor_hashes_request_statuses, &mut dead_slot_pool, &mut repairable_dead_slot_pool, + &mut popular_pruned_slot_pool, root_slot, ); assert!(dead_slot_pool.is_empty()); assert!(repairable_dead_slot_pool.contains(&slot)); + assert!(popular_pruned_slot_pool.is_empty()); - // 4) If an outstanding request for a slot already exists, should + // 4) If an outstanding request (pruned or regular) for a slot already exists, should // ignore any signals from replay stage ancestor_hashes_request_statuses.insert(slot, AncestorRequestStatus::default()); dead_slot_pool.clear(); repairable_dead_slot_pool.clear(); + popular_pruned_slot_pool.clear(); ancestor_hashes_replay_update_sender .send(AncestorHashesReplayUpdate::Dead(slot)) .unwrap(); ancestor_hashes_replay_update_sender .send(AncestorHashesReplayUpdate::DeadDuplicateConfirmed(slot)) .unwrap(); + ancestor_hashes_replay_update_sender + .send(AncestorHashesReplayUpdate::PopularPrunedFork(slot)) + .unwrap(); AncestorHashesService::process_replay_updates( &ancestor_hashes_replay_update_receiver, &ancestor_hashes_request_statuses, &mut dead_slot_pool, &mut repairable_dead_slot_pool, + &mut popular_pruned_slot_pool, root_slot, ); assert!(dead_slot_pool.is_empty()); assert!(repairable_dead_slot_pool.is_empty()); + 
assert!(popular_pruned_slot_pool.is_empty()); + + ancestor_hashes_request_statuses.insert( + slot, + AncestorRequestStatus::new( + std::iter::empty(), + slot, + AncestorRequestType::PopularPruned, + ), + ); + ancestor_hashes_replay_update_sender + .send(AncestorHashesReplayUpdate::Dead(slot)) + .unwrap(); + ancestor_hashes_replay_update_sender + .send(AncestorHashesReplayUpdate::DeadDuplicateConfirmed(slot)) + .unwrap(); + ancestor_hashes_replay_update_sender + .send(AncestorHashesReplayUpdate::PopularPrunedFork(slot)) + .unwrap(); + AncestorHashesService::process_replay_updates( + &ancestor_hashes_replay_update_receiver, + &ancestor_hashes_request_statuses, + &mut dead_slot_pool, + &mut repairable_dead_slot_pool, + &mut popular_pruned_slot_pool, + root_slot, + ); + assert!(dead_slot_pool.is_empty()); + assert!(repairable_dead_slot_pool.is_empty()); + assert!(popular_pruned_slot_pool.is_empty()); // 5) If we get any signals for slots <= root_slot, they should be ignored root_slot = 15; @@ -875,12 +1021,111 @@ mod test { &ancestor_hashes_request_statuses, &mut dead_slot_pool, &mut repairable_dead_slot_pool, + &mut popular_pruned_slot_pool, root_slot, ); assert!(dead_slot_pool.is_empty()); assert!(repairable_dead_slot_pool.is_empty()); } + #[test] + pub fn test_ancestor_hashes_service_process_pruned_replay_updates() { + let (ancestor_hashes_replay_update_sender, ancestor_hashes_replay_update_receiver) = + unbounded(); + let ancestor_hashes_request_statuses = DashMap::new(); + let mut dead_slot_pool = HashSet::new(); + let mut repairable_dead_slot_pool = HashSet::new(); + let mut popular_pruned_slot_pool = HashSet::new(); + let slot = 10; + let root_slot = 0; + + // 1) Getting a popular pruned signal should add it to the popular pruned pool + ancestor_hashes_replay_update_sender + .send(AncestorHashesReplayUpdate::PopularPrunedFork(slot)) + .unwrap(); + AncestorHashesService::process_replay_updates( + &ancestor_hashes_replay_update_receiver, + 
&ancestor_hashes_request_statuses, + &mut dead_slot_pool, + &mut repairable_dead_slot_pool, + &mut popular_pruned_slot_pool, + root_slot, + ); + assert!(dead_slot_pool.is_empty()); + assert!(repairable_dead_slot_pool.is_empty()); + assert!(popular_pruned_slot_pool.contains(&slot)); + + // 2) Receiving a dead signal afterwards should upgrade it to dead duplicate confirmed + ancestor_hashes_replay_update_sender + .send(AncestorHashesReplayUpdate::Dead(slot)) + .unwrap(); + AncestorHashesService::process_replay_updates( + &ancestor_hashes_replay_update_receiver, + &ancestor_hashes_request_statuses, + &mut dead_slot_pool, + &mut repairable_dead_slot_pool, + &mut popular_pruned_slot_pool, + root_slot, + ); + assert!(dead_slot_pool.is_empty()); + assert!(repairable_dead_slot_pool.contains(&slot)); + assert!(popular_pruned_slot_pool.is_empty()); + + // 3) Instead if we receive a dead duplicate confirmed afterwards it should also be + // upgraded to dead duplicate confirmed + repairable_dead_slot_pool.clear(); + popular_pruned_slot_pool.insert(slot); + ancestor_hashes_replay_update_sender + .send(AncestorHashesReplayUpdate::DeadDuplicateConfirmed(slot)) + .unwrap(); + AncestorHashesService::process_replay_updates( + &ancestor_hashes_replay_update_receiver, + &ancestor_hashes_request_statuses, + &mut dead_slot_pool, + &mut repairable_dead_slot_pool, + &mut popular_pruned_slot_pool, + root_slot, + ); + assert!(dead_slot_pool.is_empty()); + assert!(repairable_dead_slot_pool.contains(&slot)); + assert!(popular_pruned_slot_pool.is_empty()); + + // 4) Receiving a popular pruned after it has been dead duplicate confirmed should do nothing + ancestor_hashes_replay_update_sender + .send(AncestorHashesReplayUpdate::PopularPrunedFork(slot)) + .unwrap(); + AncestorHashesService::process_replay_updates( + &ancestor_hashes_replay_update_receiver, + &ancestor_hashes_request_statuses, + &mut dead_slot_pool, + &mut repairable_dead_slot_pool, + &mut popular_pruned_slot_pool, + root_slot, + 
); + assert!(dead_slot_pool.is_empty()); + assert!(repairable_dead_slot_pool.contains(&slot)); + assert!(popular_pruned_slot_pool.is_empty()); + + // 5) Instead, receiving a popular pruned after it has only been marked dead should upgrade it to dead + // duplicate confirmed + repairable_dead_slot_pool.clear(); + dead_slot_pool.insert(slot); + ancestor_hashes_replay_update_sender + .send(AncestorHashesReplayUpdate::PopularPrunedFork(slot)) + .unwrap(); + AncestorHashesService::process_replay_updates( + &ancestor_hashes_replay_update_receiver, + &ancestor_hashes_request_statuses, + &mut dead_slot_pool, + &mut repairable_dead_slot_pool, + &mut popular_pruned_slot_pool, + root_slot, + ); + assert!(dead_slot_pool.is_empty()); + assert!(repairable_dead_slot_pool.contains(&slot)); + assert!(popular_pruned_slot_pool.is_empty()); + } + #[test] fn test_ancestor_hashes_service_find_epoch_slots_frozen_dead_slots() { let vote_simulator = VoteSimulator::new(3); @@ -1036,6 +1281,7 @@ mod test { outstanding_requests: Arc>, dead_slot_pool: HashSet, repairable_dead_slot_pool: HashSet, + popular_pruned_slot_pool: HashSet, request_throttle: Vec, repair_stats: AncestorRepairRequestsStats, retryable_slots_sender: RetryableSlotsSender, @@ -1061,13 +1307,13 @@ mod test { bank_forks.clone(), repair_whitelist.clone(), ); - let (duplicate_slots_reset_sender, _duplicate_slots_reset_receiver) = unbounded(); + let (ancestor_duplicate_slots_sender, _ancestor_duplicate_slots_receiver) = unbounded(); let repair_info = RepairInfo { bank_forks, cluster_info: requester_cluster_info, cluster_slots: Arc::new(ClusterSlots::default()), epoch_schedule, - duplicate_slots_reset_sender, + ancestor_duplicate_slots_sender, repair_validators: None, repair_whitelist, }; @@ -1085,6 +1331,7 @@ mod test { )), dead_slot_pool: HashSet::new(), repairable_dead_slot_pool: HashSet::new(), + popular_pruned_slot_pool: HashSet::new(), request_throttle: vec![], repair_stats: AncestorRepairRequestsStats::default(), 
ancestor_hashes_replay_update_sender, @@ -1202,6 +1449,7 @@ mod test { &mut repair_stats, &outstanding_requests, &requester_cluster_info.keypair(), + AncestorRequestType::DeadDuplicateConfirmed, ); assert!(ancestor_hashes_request_statuses.is_empty()); @@ -1249,6 +1497,7 @@ mod test { &mut repair_stats, &outstanding_requests, &requester_cluster_info.keypair(), + AncestorRequestType::DeadDuplicateConfirmed, ); assert_eq!(ancestor_hashes_request_statuses.len(), 1); @@ -1262,7 +1511,11 @@ mod test { packet .meta_mut() .set_socket_addr(&responder_info.serve_repair().unwrap()); - let decision = AncestorHashesService::verify_and_process_ancestor_response( + let AncestorRequestDecision { + slot, + request_type, + decision, + } = AncestorHashesService::verify_and_process_ancestor_response( packet, &ancestor_hashes_request_statuses, &mut AncestorHashesResponsesStats::default(), @@ -1273,21 +1526,74 @@ mod test { ) .unwrap(); - assert_matches!( - decision, - ( - _dead_slot, - DuplicateAncestorDecision::EarliestAncestorNotFrozen(_) - ) - ); + assert_eq!(slot, dead_slot); assert_eq!( decision - .1 .repair_status() .unwrap() .correct_ancestors_to_repair, vec![(dead_slot, *correct_bank_hashes.get(&dead_slot).unwrap())] ); + assert_matches!( + (decision, request_type), + ( + DuplicateAncestorDecision::EarliestAncestorNotFrozen(_), + AncestorRequestType::DeadDuplicateConfirmed, + ) + ); + + // Should have removed the ancestor status on successful + // completion + assert!(ancestor_hashes_request_statuses.is_empty()); + + // Now make a pruned request for the same slot + AncestorHashesService::initiate_ancestor_hashes_requests_for_duplicate_slot( + &ancestor_hashes_request_statuses, + &ancestor_hashes_request_socket, + &cluster_slots, + &requester_serve_repair, + &repair_validators, + dead_slot, + &mut repair_stats, + &outstanding_requests, + &requester_cluster_info.keypair(), + AncestorRequestType::PopularPruned, + ); + + assert_eq!(ancestor_hashes_request_statuses.len(), 1); + 
assert!(ancestor_hashes_request_statuses.contains_key(&dead_slot)); + + // Should have received valid response since pruned doesn't check hashes + let mut response_packet = response_receiver + .recv_timeout(Duration::from_millis(10_000)) + .unwrap(); + let packet = &mut response_packet[0]; + packet + .meta_mut() + .set_socket_addr(&responder_info.serve_repair().unwrap()); + let AncestorRequestDecision { + slot, + request_type, + decision, + } = AncestorHashesService::verify_and_process_ancestor_response( + packet, + &ancestor_hashes_request_statuses, + &mut AncestorHashesResponsesStats::default(), + &outstanding_requests, + &requester_blockstore, + &requester_cluster_info.keypair(), + &ancestor_hashes_request_socket, + ) + .unwrap(); + + assert_eq!(slot, dead_slot); + assert_matches!( + (decision, request_type), + ( + DuplicateAncestorDecision::AncestorsAllMatch, + AncestorRequestType::PopularPruned, + ) + ); // Should have removed the ancestor status on successful // completion @@ -1306,6 +1612,7 @@ mod test { outstanding_requests, mut dead_slot_pool, mut repairable_dead_slot_pool, + mut popular_pruned_slot_pool, mut request_throttle, ancestor_hashes_replay_update_sender, ancestor_hashes_replay_update_receiver, @@ -1332,6 +1639,7 @@ mod test { &mut AncestorRepairRequestsStats::default(), &mut dead_slot_pool, &mut repairable_dead_slot_pool, + &mut popular_pruned_slot_pool, &mut request_throttle, ); @@ -1340,9 +1648,10 @@ mod test { assert!(ancestor_hashes_request_statuses.is_empty()); // 2) Simulate signals from ReplayStage, should make a request - // for `dead_duplicate_confirmed_slot` + // for `dead_duplicate_confirmed_slot` and `popular_pruned_slot` let dead_slot = 10; let dead_duplicate_confirmed_slot = 14; + let popular_pruned_slot = 16; ancestor_hashes_replay_update_sender .send(AncestorHashesReplayUpdate::Dead(dead_slot)) .unwrap(); @@ -1356,6 +1665,11 @@ mod test { dead_duplicate_confirmed_slot, )) .unwrap(); + ancestor_hashes_replay_update_sender + 
.send(AncestorHashesReplayUpdate::PopularPrunedFork( + popular_pruned_slot, + )) + .unwrap(); AncestorHashesService::manage_ancestor_requests( &ancestor_hashes_request_statuses, &ancestor_hashes_request_socket, @@ -1367,14 +1681,17 @@ mod test { &mut AncestorRepairRequestsStats::default(), &mut dead_slot_pool, &mut repairable_dead_slot_pool, + &mut popular_pruned_slot_pool, &mut request_throttle, ); assert_eq!(dead_slot_pool.len(), 1); assert!(dead_slot_pool.contains(&dead_slot)); assert!(repairable_dead_slot_pool.is_empty()); - assert_eq!(ancestor_hashes_request_statuses.len(), 1); + assert!(popular_pruned_slot_pool.is_empty()); + assert_eq!(ancestor_hashes_request_statuses.len(), 2); assert!(ancestor_hashes_request_statuses.contains_key(&dead_duplicate_confirmed_slot)); + assert!(ancestor_hashes_request_statuses.contains_key(&popular_pruned_slot)); // 3) Simulate an outstanding request timing out ancestor_hashes_request_statuses @@ -1382,10 +1699,15 @@ mod test { .unwrap() .value_mut() .make_expired(); + ancestor_hashes_request_statuses + .get_mut(&popular_pruned_slot) + .unwrap() + .value_mut() + .make_expired(); // If the request timed out, we should remove the slot from `ancestor_hashes_request_statuses`, - // and add it to `repairable_dead_slot_pool`. Because the request_throttle is at its limit, - // we should not immediately retry the timed request. + // and add it to `repairable_dead_slot_pool` or `popular_pruned_slot_pool`. + // Because the request_throttle is at its limit, we should not immediately retry the timed request. 
request_throttle.resize(MAX_ANCESTOR_HASHES_SLOT_REQUESTS_PER_SECOND, std::u64::MAX); AncestorHashesService::manage_ancestor_requests( &ancestor_hashes_request_statuses, @@ -1398,6 +1720,7 @@ mod test { &mut AncestorRepairRequestsStats::default(), &mut dead_slot_pool, &mut repairable_dead_slot_pool, + &mut popular_pruned_slot_pool, &mut request_throttle, ); @@ -1405,6 +1728,8 @@ mod test { assert!(dead_slot_pool.contains(&dead_slot)); assert_eq!(repairable_dead_slot_pool.len(), 1); assert!(repairable_dead_slot_pool.contains(&dead_duplicate_confirmed_slot)); + assert_eq!(popular_pruned_slot_pool.len(), 1); + assert!(popular_pruned_slot_pool.contains(&popular_pruned_slot)); assert!(ancestor_hashes_request_statuses.is_empty()); // 4) If the throttle only has expired timestamps from more than a second ago, @@ -1426,13 +1751,16 @@ mod test { &mut AncestorRepairRequestsStats::default(), &mut dead_slot_pool, &mut repairable_dead_slot_pool, + &mut popular_pruned_slot_pool, &mut request_throttle, ); assert_eq!(dead_slot_pool.len(), 1); assert!(dead_slot_pool.contains(&dead_slot)); assert!(repairable_dead_slot_pool.is_empty()); - assert_eq!(ancestor_hashes_request_statuses.len(), 1); + assert!(popular_pruned_slot_pool.is_empty()); + assert_eq!(ancestor_hashes_request_statuses.len(), 2); assert!(ancestor_hashes_request_statuses.contains_key(&dead_duplicate_confirmed_slot)); + assert!(ancestor_hashes_request_statuses.contains_key(&popular_pruned_slot)); // Request throttle includes one item for the request we just made assert_eq!( request_throttle.len(), @@ -1460,6 +1788,7 @@ mod test { &mut AncestorRepairRequestsStats::default(), &mut dead_slot_pool, &mut repairable_dead_slot_pool, + &mut popular_pruned_slot_pool, &mut request_throttle, ); @@ -1467,7 +1796,8 @@ mod test { assert!(dead_slot_pool.contains(&dead_slot)); assert_eq!(repairable_dead_slot_pool.len(), 1); assert!(repairable_dead_slot_pool.contains(&dead_duplicate_confirmed_slot_2)); - 
assert_eq!(ancestor_hashes_request_statuses.len(), 1); + assert!(popular_pruned_slot_pool.is_empty()); + assert_eq!(ancestor_hashes_request_statuses.len(), 2); assert!(ancestor_hashes_request_statuses.contains_key(&dead_duplicate_confirmed_slot)); // 6) If root moves past slot, should remove it from all state @@ -1481,8 +1811,10 @@ mod test { w_bank_forks.insert(new_root_bank); w_bank_forks.set_root(new_root_slot, &AbsRequestSender::default(), None); } + popular_pruned_slot_pool.insert(dead_duplicate_confirmed_slot); assert!(!dead_slot_pool.is_empty()); assert!(!repairable_dead_slot_pool.is_empty()); + assert!(!popular_pruned_slot_pool.is_empty()); assert!(!ancestor_hashes_request_statuses.is_empty()); request_throttle.clear(); AncestorHashesService::manage_ancestor_requests( @@ -1496,10 +1828,12 @@ mod test { &mut AncestorRepairRequestsStats::default(), &mut dead_slot_pool, &mut repairable_dead_slot_pool, + &mut popular_pruned_slot_pool, &mut request_throttle, ); assert!(dead_slot_pool.is_empty()); assert!(repairable_dead_slot_pool.is_empty()); + assert!(popular_pruned_slot_pool.is_empty()); assert!(ancestor_hashes_request_statuses.is_empty()); } @@ -1569,6 +1903,7 @@ mod test { outstanding_requests, mut dead_slot_pool, mut repairable_dead_slot_pool, + mut popular_pruned_slot_pool, mut request_throttle, ancestor_hashes_replay_update_sender, ancestor_hashes_replay_update_receiver, @@ -1653,6 +1988,7 @@ mod test { &mut AncestorRepairRequestsStats::default(), &mut dead_slot_pool, &mut repairable_dead_slot_pool, + &mut popular_pruned_slot_pool, &mut request_throttle, ); @@ -1667,7 +2003,11 @@ mod test { packet .meta_mut() .set_socket_addr(&responder_info.serve_repair().unwrap()); - let decision = AncestorHashesService::verify_and_process_ancestor_response( + let AncestorRequestDecision { + slot, + request_type, + decision, + } = AncestorHashesService::verify_and_process_ancestor_response( packet, &ancestor_hashes_request_statuses, &mut 
AncestorHashesResponsesStats::default(), @@ -1678,21 +2018,21 @@ mod test { ) .unwrap(); - assert_matches!( - decision, - ( - _dead_slot, - DuplicateAncestorDecision::EarliestAncestorNotFrozen(_) - ) - ); + assert_eq!(slot, dead_slot); assert_eq!( decision - .1 .repair_status() .unwrap() .correct_ancestors_to_repair, vec![(dead_slot, *correct_bank_hashes.get(&dead_slot).unwrap())] ); + assert_matches!( + (decision, request_type), + ( + DuplicateAncestorDecision::EarliestAncestorNotFrozen(_), + AncestorRequestType::DeadDuplicateConfirmed + ) + ); // Should have removed the ancestor status on successful // completion @@ -1711,6 +2051,7 @@ mod test { outstanding_requests, mut dead_slot_pool, mut repairable_dead_slot_pool, + mut popular_pruned_slot_pool, mut request_throttle, ancestor_hashes_replay_update_receiver, retryable_slots_receiver, @@ -1724,16 +2065,21 @@ mod test { // Simulate network response processing thread reaching a retryable // decision let request_slot = 10; + let ancestor_request_decision = AncestorRequestDecision { + slot: request_slot, + request_type: AncestorRequestType::DeadDuplicateConfirmed, + decision: decision.clone(), + }; AncestorHashesService::handle_ancestor_request_decision( - request_slot, - decision, - &repair_info.duplicate_slots_reset_sender, + ancestor_request_decision, + &repair_info.ancestor_duplicate_slots_sender, &retryable_slots_sender, ); // Simulate ancestor request thread getting the retry signal assert!(dead_slot_pool.is_empty()); assert!(repairable_dead_slot_pool.is_empty()); + assert!(popular_pruned_slot_pool.is_empty()); AncestorHashesService::manage_ancestor_requests( &ancestor_hashes_request_statuses, &ancestor_hashes_request_socket, @@ -1745,10 +2091,50 @@ mod test { &mut AncestorRepairRequestsStats::default(), &mut dead_slot_pool, &mut repairable_dead_slot_pool, + &mut popular_pruned_slot_pool, &mut request_throttle, ); assert!(dead_slot_pool.is_empty()); + assert!(popular_pruned_slot_pool.is_empty()); 
assert!(repairable_dead_slot_pool.contains(&request_slot)); + + // Simulate network response processing thread reaching a retryable decision for a pruned + // slot + let request_slot = 10; + let ancestor_request_decision = AncestorRequestDecision { + slot: request_slot, + request_type: AncestorRequestType::PopularPruned, + decision, + }; + repairable_dead_slot_pool.clear(); + AncestorHashesService::handle_ancestor_request_decision( + ancestor_request_decision, + &repair_info.ancestor_duplicate_slots_sender, + &retryable_slots_sender, + ); + + // Simulate ancestor request thread getting the retry signal + assert!(dead_slot_pool.is_empty()); + assert!(repairable_dead_slot_pool.is_empty()); + assert!(popular_pruned_slot_pool.is_empty()); + AncestorHashesService::manage_ancestor_requests( + &ancestor_hashes_request_statuses, + &ancestor_hashes_request_socket, + &repair_info, + &outstanding_requests, + &ancestor_hashes_replay_update_receiver, + &retryable_slots_receiver, + &requester_serve_repair, + &mut AncestorRepairRequestsStats::default(), + &mut dead_slot_pool, + &mut repairable_dead_slot_pool, + &mut popular_pruned_slot_pool, + &mut request_throttle, + ); + + assert!(dead_slot_pool.is_empty()); + assert!(repairable_dead_slot_pool.is_empty()); + assert!(popular_pruned_slot_pool.contains(&request_slot)); } } diff --git a/core/src/cluster_slot_state_verifier.rs b/core/src/cluster_slot_state_verifier.rs index e2348faa1c..f00b74fa56 100644 --- a/core/src/cluster_slot_state_verifier.rs +++ b/core/src/cluster_slot_state_verifier.rs @@ -219,6 +219,7 @@ pub struct EpochSlotsFrozenState { epoch_slots_frozen_hash: Hash, duplicate_confirmed_hash: Option, bank_status: BankStatus, + is_popular_pruned: bool, } impl EpochSlotsFrozenState { pub fn new_from_state( @@ -228,6 +229,7 @@ impl EpochSlotsFrozenState { fork_choice: &mut HeaviestSubtreeForkChoice, is_dead: impl Fn() -> bool, get_hash: impl Fn() -> Option, + is_popular_pruned: bool, ) -> Self { let bank_status = 
BankStatus::new(is_dead, get_hash); let duplicate_confirmed_hash = get_duplicate_confirmed_hash_from_state( @@ -240,6 +242,7 @@ impl EpochSlotsFrozenState { epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status, + is_popular_pruned, ) } @@ -247,13 +250,19 @@ impl EpochSlotsFrozenState { epoch_slots_frozen_hash: Hash, duplicate_confirmed_hash: Option, bank_status: BankStatus, + is_popular_pruned: bool, ) -> Self { Self { epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status, + is_popular_pruned, } } + + fn is_popular_pruned(&self) -> bool { + self.is_popular_pruned + } } #[derive(PartialEq, Eq, Debug)] @@ -263,14 +272,18 @@ pub enum SlotStateUpdate { Dead(DeadState), Duplicate(DuplicateState), EpochSlotsFrozen(EpochSlotsFrozenState), + // The fork is pruned but has reached `DUPLICATE_THRESHOLD` from votes aggregated across + // descendants and all versions of the slots on this fork. + PopularPrunedFork, } impl SlotStateUpdate { fn into_state_changes(self, slot: Slot) -> Vec { let bank_frozen_hash = self.bank_hash(); - if bank_frozen_hash.is_none() { + if bank_frozen_hash.is_none() && !self.is_popular_pruned() { // If the bank hasn't been frozen yet, then there's nothing to do // since replay of the slot hasn't finished yet. 
+ // However if the bank is pruned, then replay will never finish so we still process now return vec![]; } @@ -286,6 +299,7 @@ impl SlotStateUpdate { SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state) => { on_epoch_slots_frozen(slot, epoch_slots_frozen_state) } + SlotStateUpdate::PopularPrunedFork => on_popular_pruned_fork(slot), } } @@ -300,6 +314,17 @@ impl SlotStateUpdate { SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state) => { epoch_slots_frozen_state.bank_status.bank_hash() } + SlotStateUpdate::PopularPrunedFork => None, + } + } + + fn is_popular_pruned(&self) -> bool { + match self { + SlotStateUpdate::PopularPrunedFork => true, + SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state) => { + epoch_slots_frozen_state.is_popular_pruned() + } + _ => false, } } } @@ -344,6 +369,7 @@ fn check_duplicate_confirmed_hash_against_frozen_hash( slot, duplicate_confirmed_hash, bank_frozen_hash ); } + state_changes.push(ResultingStateChange::MarkSlotDuplicate(bank_frozen_hash)); state_changes.push(ResultingStateChange::RepairDuplicateConfirmedVersion( duplicate_confirmed_hash, @@ -364,6 +390,7 @@ fn check_epoch_slots_hash_against_frozen_hash( epoch_slots_frozen_hash: Hash, bank_frozen_hash: Hash, is_dead: bool, + is_popular_pruned: bool, ) { if epoch_slots_frozen_hash != bank_frozen_hash { if is_dead { @@ -373,6 +400,12 @@ fn check_epoch_slots_hash_against_frozen_hash( "EpochSlots sample returned slot {} with hash {}, but we marked slot dead", slot, epoch_slots_frozen_hash ); + } else if is_popular_pruned { + // The cluster sample found the troublesome slot which caused this fork to be pruned + warn!( + "EpochSlots sample returned slot {slot} with hash {epoch_slots_frozen_hash}, but we + have pruned it due to incorrect ancestry" + ); } else { // The duplicate confirmed slot hash does not match our frozen hash. 
// Modify fork choice rule to exclude our version from being voted @@ -383,7 +416,11 @@ fn check_epoch_slots_hash_against_frozen_hash( slot, epoch_slots_frozen_hash, bank_frozen_hash ); } - state_changes.push(ResultingStateChange::MarkSlotDuplicate(bank_frozen_hash)); + if !is_popular_pruned { + // If the slot is already pruned, it will already be pruned from fork choice so no + // reason to mark as duplicate + state_changes.push(ResultingStateChange::MarkSlotDuplicate(bank_frozen_hash)); + } state_changes.push(ResultingStateChange::RepairDuplicateConfirmedVersion( epoch_slots_frozen_hash, )); @@ -420,12 +457,14 @@ fn on_dead_slot(slot: Slot, dead_state: DeadState) -> Vec // match arm above. let bank_hash = Hash::default(); let is_dead = true; + let is_popular_pruned = false; check_epoch_slots_hash_against_frozen_hash( &mut state_changes, slot, epoch_slots_frozen_hash, bank_hash, is_dead, + is_popular_pruned, ); } } @@ -466,12 +505,14 @@ fn on_frozen_slot(slot: Slot, bank_frozen_state: BankFrozenState) -> Vec (), - // No action to be taken yet - BankStatus::Unprocessed => { + slot, epoch_slots_frozen_hash, duplicate_confirmed_hash + ); + } return vec![]; } } - let frozen_hash = bank_status.bank_hash().expect("bank hash must exist"); + match bank_status { + BankStatus::Dead | BankStatus::Frozen(_) => (), + // No action to be taken yet unless `slot` is pruned in which case it will never be played + BankStatus::Unprocessed => { + if !is_popular_pruned { + return vec![]; + } + } + } + + let frozen_hash = bank_status.bank_hash().unwrap_or_default(); let is_dead = bank_status.is_dead(); let mut state_changes = vec![]; check_epoch_slots_hash_against_frozen_hash( @@ -589,11 +651,21 @@ fn on_epoch_slots_frozen( epoch_slots_frozen_hash, frozen_hash, is_dead, + is_popular_pruned, ); state_changes } +fn on_popular_pruned_fork(slot: Slot) -> Vec { + warn!("{slot} is part of a pruned fork which has reached the DUPLICATE_THRESHOLD aggregating across descendants + and slot 
versions. It is suspected to be duplicate or have an ancestor that is duplicate. + Notifying ancestor_hashes_service"); + vec![ResultingStateChange::SendAncestorHashesReplayUpdate( + AncestorHashesReplayUpdate::PopularPrunedFork(slot), + )] +} + fn get_cluster_confirmed_hash_from_state( slot: Slot, gossip_duplicate_confirmed_slots: &GossipDuplicateConfirmedSlots, @@ -1252,7 +1324,7 @@ mod test { let epoch_slots_frozen_hash = Hash::new_unique(); let duplicate_confirmed_hash = None; let bank_status = BankStatus::Unprocessed; - let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status); + let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status, false); ( SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state), Vec::::new() @@ -1262,7 +1334,7 @@ mod test { let epoch_slots_frozen_hash = Hash::new_unique(); let duplicate_confirmed_hash = Some(Hash::new_unique()); let bank_status = BankStatus::Unprocessed; - let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status); + let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status, false); ( SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state), Vec::::new() @@ -1272,7 +1344,7 @@ mod test { let epoch_slots_frozen_hash = Hash::new_unique(); let duplicate_confirmed_hash = Some(epoch_slots_frozen_hash); let bank_status = BankStatus::Unprocessed; - let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status); + let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status, false); ( SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state), Vec::::new() @@ -1282,7 +1354,7 @@ mod test { let epoch_slots_frozen_hash = Hash::new_unique(); let 
duplicate_confirmed_hash = None; let bank_status = BankStatus::Dead; - let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status); + let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status, false); ( SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state), vec![ @@ -1294,7 +1366,7 @@ mod test { let epoch_slots_frozen_hash = Hash::new_unique(); let duplicate_confirmed_hash = Some(Hash::new_unique()); let bank_status = BankStatus::Dead; - let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status); + let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status, false); ( SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state), Vec::::new() @@ -1304,7 +1376,7 @@ mod test { let epoch_slots_frozen_hash = Hash::new_unique(); let duplicate_confirmed_hash = Some(epoch_slots_frozen_hash); let bank_status = BankStatus::Dead; - let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status); + let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status, false); ( SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state), Vec::::new() @@ -1315,7 +1387,7 @@ mod test { let duplicate_confirmed_hash = None; let frozen_hash = Hash::new_unique(); let bank_status = BankStatus::Frozen(frozen_hash); - let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status); + let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status, false); ( SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state), vec![ @@ -1327,7 +1399,7 @@ mod test { let epoch_slots_frozen_hash = Hash::new_unique(); let 
duplicate_confirmed_hash = None; let bank_status = BankStatus::Frozen(epoch_slots_frozen_hash); - let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status); + let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status, false); ( SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state), Vec::::new() @@ -1337,7 +1409,7 @@ mod test { let epoch_slots_frozen_hash = Hash::new_unique(); let duplicate_confirmed_hash = Some(Hash::new_unique()); let bank_status = BankStatus::Frozen(Hash::new_unique()); - let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status); + let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status, false); ( SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state), Vec::::new() @@ -1347,7 +1419,7 @@ mod test { let epoch_slots_frozen_hash = Hash::new_unique(); let duplicate_confirmed_hash = Some(Hash::new_unique()); let bank_status = BankStatus::Frozen(epoch_slots_frozen_hash); - let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status); + let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status, false); ( SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state), Vec::::new() @@ -1357,12 +1429,121 @@ mod test { let epoch_slots_frozen_hash = Hash::new_unique(); let duplicate_confirmed_hash = Some(Hash::new_unique()); let bank_status = BankStatus::Frozen(duplicate_confirmed_hash.unwrap()); - let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status); + let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status, false); ( 
SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state), Vec::::new() ) }, + epoch_slots_frozen_state_update_11: { + let epoch_slots_frozen_hash = Hash::new_unique(); + let duplicate_confirmed_hash = None; + let bank_status = BankStatus::Unprocessed; + let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status, true); + ( + SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state), + vec![ResultingStateChange::RepairDuplicateConfirmedVersion(epoch_slots_frozen_hash)], + ) + }, + epoch_slots_frozen_state_update_12: { + let epoch_slots_frozen_hash = Hash::new_unique(); + let duplicate_confirmed_hash = Some(Hash::new_unique()); + let bank_status = BankStatus::Unprocessed; + let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status, true); + ( + SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state), + vec![ResultingStateChange::RepairDuplicateConfirmedVersion(epoch_slots_frozen_hash)], + ) + }, + epoch_slots_frozen_state_update_13: { + let epoch_slots_frozen_hash = Hash::new_unique(); + let duplicate_confirmed_hash = Some(epoch_slots_frozen_hash); + let bank_status = BankStatus::Unprocessed; + let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status, true); + ( + SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state), + vec![ResultingStateChange::RepairDuplicateConfirmedVersion(epoch_slots_frozen_hash)], + ) + }, + epoch_slots_frozen_state_update_14: { + let epoch_slots_frozen_hash = Hash::new_unique(); + let duplicate_confirmed_hash = None; + let bank_status = BankStatus::Dead; + let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status, true); + ( + SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state), + vec![ResultingStateChange::RepairDuplicateConfirmedVersion(epoch_slots_frozen_hash)], + ) + 
}, + epoch_slots_frozen_state_update_15: { + let epoch_slots_frozen_hash = Hash::new_unique(); + let duplicate_confirmed_hash = Some(Hash::new_unique()); + let bank_status = BankStatus::Dead; + let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status, true); + ( + SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state), + vec![ResultingStateChange::RepairDuplicateConfirmedVersion(epoch_slots_frozen_hash)], + ) + }, + epoch_slots_frozen_state_update_16: { + let epoch_slots_frozen_hash = Hash::new_unique(); + let duplicate_confirmed_hash = Some(epoch_slots_frozen_hash); + let bank_status = BankStatus::Dead; + let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status, true); + ( + SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state), + vec![ResultingStateChange::RepairDuplicateConfirmedVersion(epoch_slots_frozen_hash)], + ) + }, + epoch_slots_frozen_state_update_17: { + let epoch_slots_frozen_hash = Hash::new_unique(); + let duplicate_confirmed_hash = None; + let frozen_hash = Hash::new_unique(); + let bank_status = BankStatus::Frozen(frozen_hash); + let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status, true); + ( + SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state), + vec![ResultingStateChange::RepairDuplicateConfirmedVersion(epoch_slots_frozen_hash)], + ) + }, + epoch_slots_frozen_state_update_18: { + let epoch_slots_frozen_hash = Hash::new_unique(); + let duplicate_confirmed_hash = None; + let bank_status = BankStatus::Frozen(epoch_slots_frozen_hash); + let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status, true); + ( + SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state), + Vec::::new() + ) + }, + epoch_slots_frozen_state_update_19: { + let epoch_slots_frozen_hash = 
Hash::new_unique(); + let duplicate_confirmed_hash = Some(Hash::new_unique()); + let bank_status = BankStatus::Frozen(Hash::new_unique()); + let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status, true); + ( + SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state), + vec![ResultingStateChange::RepairDuplicateConfirmedVersion(epoch_slots_frozen_hash)], + ) + }, + epoch_slots_frozen_state_update_20: { + let epoch_slots_frozen_hash = Hash::new_unique(); + let duplicate_confirmed_hash = Some(Hash::new_unique()); + let bank_status = BankStatus::Frozen(epoch_slots_frozen_hash); + let epoch_slots_frozen_state = EpochSlotsFrozenState::new(epoch_slots_frozen_hash, duplicate_confirmed_hash, bank_status, true); + ( + SlotStateUpdate::EpochSlotsFrozen(epoch_slots_frozen_state), + Vec::::new() + ) + }, + popular_pruned_fork: { + ( + SlotStateUpdate::PopularPrunedFork, + vec![ResultingStateChange::SendAncestorHashesReplayUpdate( + AncestorHashesReplayUpdate::PopularPrunedFork(10), + )] + ) + }, } struct InitialState { @@ -2072,6 +2253,7 @@ mod test { &mut heaviest_subtree_fork_choice, || progress.is_dead(3).unwrap_or(false), || Some(slot3_hash), + false, ); let (ancestor_hashes_replay_update_sender, _ancestor_hashes_replay_update_receiver) = unbounded(); @@ -2165,6 +2347,7 @@ mod test { &mut heaviest_subtree_fork_choice, || progress.is_dead(3).unwrap_or(false), || Some(slot3_hash), + false, ); let (ancestor_hashes_replay_update_sender, _ancestor_hashes_replay_update_receiver) = unbounded(); diff --git a/core/src/duplicate_repair_status.rs b/core/src/duplicate_repair_status.rs index 9690b6178c..e9ffc2bbae 100644 --- a/core/src/duplicate_repair_status.rs +++ b/core/src/duplicate_repair_status.rs @@ -1,11 +1,24 @@ use { solana_ledger::blockstore::Blockstore, solana_sdk::{clock::Slot, hash::Hash, pubkey::Pubkey, timing::timestamp}, - std::{collections::HashMap, net::SocketAddr}, + std::{ + collections::HashMap, + 
net::SocketAddr, + sync::atomic::{AtomicUsize, Ordering}, + }, }; // Number of validators to sample for the ancestor repair -pub const ANCESTOR_HASH_REPAIR_SAMPLE_SIZE: usize = 21; +// We use static to enable tests from having to spin up 21 validators +static ANCESTOR_HASH_REPAIR_SAMPLE_SIZE: AtomicUsize = AtomicUsize::new(21); + +pub fn get_ancestor_hash_repair_sample_size() -> usize { + ANCESTOR_HASH_REPAIR_SAMPLE_SIZE.load(Ordering::Relaxed) +} + +pub fn set_ancestor_hash_repair_sample_size_for_tests_only(sample_size: usize) { + ANCESTOR_HASH_REPAIR_SAMPLE_SIZE.store(sample_size, Ordering::Relaxed); +} // Even assuming 20% of validators malicious, the chance that >= 11 of the // ANCESTOR_HASH_REPAIR_SAMPLE_SIZE = 21 validators is malicious is roughly 1/1000. @@ -14,10 +27,12 @@ pub const ANCESTOR_HASH_REPAIR_SAMPLE_SIZE: usize = 21; // On the other hand with a 52-48 split of validators with one version of the block vs // another, the chance of >= 11 of the 21 sampled being from the 52% portion is // about 57%, so we should be able to find a correct sample in a reasonable amount of time. 
-const MINIMUM_ANCESTOR_AGREEMENT_SIZE: usize = (ANCESTOR_HASH_REPAIR_SAMPLE_SIZE + 1) / 2; +pub fn get_minimum_ancestor_agreement_size() -> usize { + (get_ancestor_hash_repair_sample_size() + 1) / 2 +} const RETRY_INTERVAL_SECONDS: usize = 5; -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, PartialEq, Eq, Clone)] pub enum DuplicateAncestorDecision { InvalidSample, AncestorsAllMatch, @@ -25,6 +40,7 @@ pub enum DuplicateAncestorDecision { ContinueSearch(DuplicateSlotRepairStatus), EarliestAncestorNotFrozen(DuplicateSlotRepairStatus), EarliestMismatchFound(DuplicateSlotRepairStatus), + EarliestPrunedMismatchFound(DuplicateSlotRepairStatus), } impl DuplicateAncestorDecision { @@ -34,13 +50,14 @@ impl DuplicateAncestorDecision { DuplicateAncestorDecision::InvalidSample // It may be possible the validators have not yet detected duplicate confirmation // so retry - | DuplicateAncestorDecision::SampleNotDuplicateConfirmed => true, + | DuplicateAncestorDecision::SampleNotDuplicateConfirmed => true, DuplicateAncestorDecision::AncestorsAllMatch => false, DuplicateAncestorDecision::ContinueSearch(_status) | DuplicateAncestorDecision::EarliestAncestorNotFrozen(_status) - | DuplicateAncestorDecision::EarliestMismatchFound(_status) => false, + | DuplicateAncestorDecision::EarliestMismatchFound(_status) + | DuplicateAncestorDecision::EarliestPrunedMismatchFound(_status) => false, } } @@ -49,20 +66,24 @@ impl DuplicateAncestorDecision { DuplicateAncestorDecision::InvalidSample | DuplicateAncestorDecision::AncestorsAllMatch | DuplicateAncestorDecision::SampleNotDuplicateConfirmed => None, - DuplicateAncestorDecision::ContinueSearch(status) => Some(status), - DuplicateAncestorDecision::EarliestAncestorNotFrozen(status) => Some(status), - DuplicateAncestorDecision::EarliestMismatchFound(status) => Some(status), + + DuplicateAncestorDecision::ContinueSearch(status) + | DuplicateAncestorDecision::EarliestAncestorNotFrozen(status) + | 
DuplicateAncestorDecision::EarliestMismatchFound(status) + | DuplicateAncestorDecision::EarliestPrunedMismatchFound(status) => Some(status), } } - fn repair_status_mut(&mut self) -> Option<&mut DuplicateSlotRepairStatus> { + pub fn repair_status_mut(&mut self) -> Option<&mut DuplicateSlotRepairStatus> { match self { DuplicateAncestorDecision::InvalidSample | DuplicateAncestorDecision::AncestorsAllMatch | DuplicateAncestorDecision::SampleNotDuplicateConfirmed => None, - DuplicateAncestorDecision::ContinueSearch(status) => Some(status), - DuplicateAncestorDecision::EarliestAncestorNotFrozen(status) => Some(status), - DuplicateAncestorDecision::EarliestMismatchFound(status) => Some(status), + + DuplicateAncestorDecision::ContinueSearch(status) + | DuplicateAncestorDecision::EarliestAncestorNotFrozen(status) + | DuplicateAncestorDecision::EarliestMismatchFound(status) + | DuplicateAncestorDecision::EarliestPrunedMismatchFound(status) => Some(status), } } } @@ -94,11 +115,69 @@ impl DuplicateSlotRepairStatus { } } +#[derive(Default, Clone, Copy, PartialEq, Eq, Debug)] +pub enum AncestorRequestType { + #[default] + DeadDuplicateConfirmed, + PopularPruned, +} + +impl AncestorRequestType { + pub fn is_pruned(&self) -> bool { + matches!(self, Self::PopularPruned) + } +} + +pub struct AncestorDuplicateSlotsToRepair { + // Slots that `ancestor_hashes_service` found that need to be repaired + pub slots_to_repair: Vec<(Slot, Hash)>, + // Condition that initiated this request + pub request_type: AncestorRequestType, +} + +impl AncestorDuplicateSlotsToRepair { + pub fn is_empty(&self) -> bool { + self.slots_to_repair.is_empty() + } +} + +#[derive(Debug, PartialEq, Eq)] +pub struct AncestorRequestDecision { + // The slot that initiated this request + pub slot: Slot, + // Condition which initiated this request + pub request_type: AncestorRequestType, + // Decision + pub decision: DuplicateAncestorDecision, +} + +impl AncestorRequestDecision { + pub fn slots_to_repair(self) -> 
Option { + let Self { + request_type, + mut decision, + .. + } = self; + decision + .repair_status_mut() + .map(|status| AncestorDuplicateSlotsToRepair { + slots_to_repair: std::mem::take(&mut status.correct_ancestors_to_repair), + request_type, + }) + } + + pub fn is_retryable(&self) -> bool { + self.decision.is_retryable() + } +} + #[derive(Default, Clone)] pub struct AncestorRequestStatus { // The mismatched slot that was the subject of the AncestorHashes(requested_mismatched_slot) // repair request. All responses to this request should be for ancestors of this slot. requested_mismatched_slot: Slot, + // Condition which initiated this request + request_type: AncestorRequestType, // Timestamp at which we sent out the requests start_ts: u64, // The addresses of the validators we asked for a response, a response is only acceptable @@ -119,9 +198,11 @@ impl AncestorRequestStatus { pub fn new( sampled_validators: impl Iterator, requested_mismatched_slot: Slot, + request_type: AncestorRequestType, ) -> Self { AncestorRequestStatus { requested_mismatched_slot, + request_type, start_ts: timestamp(), sampled_validators: sampled_validators.map(|p| (p, false)).collect(), ..AncestorRequestStatus::default() @@ -163,7 +244,7 @@ impl AncestorRequestStatus { // If we got enough of the sampled validators to respond, we are confident // this is the correct set of ancestors if validators_with_same_response.len() - == MINIMUM_ANCESTOR_AGREEMENT_SIZE.min(self.sampled_validators.len()) + == get_minimum_ancestor_agreement_size().min(self.sampled_validators.len()) { // When we reach MINIMUM_ANCESTOR_AGREEMENT_SIZE of the same responses, // check for mismatches. @@ -175,7 +256,8 @@ impl AncestorRequestStatus { // If everyone responded and we still haven't agreed upon a set of // ancestors, that means there was a lot of disagreement and we sampled // a bad set of validators. 
- if self.num_responses == ANCESTOR_HASH_REPAIR_SAMPLE_SIZE.min(self.sampled_validators.len()) + if self.num_responses + == get_ancestor_hash_repair_sample_size().min(self.sampled_validators.len()) { info!( "{} return invalid sample no agreement", @@ -187,6 +269,10 @@ impl AncestorRequestStatus { None } + pub fn request_type(&self) -> AncestorRequestType { + self.request_type + } + fn handle_sampled_validators_reached_agreement( &mut self, blockstore: &Blockstore, @@ -227,7 +313,6 @@ impl AncestorRequestStatus { // Responses were not properly ordered return DuplicateAncestorDecision::InvalidSample; } - last_ancestor = *ancestor_slot; if *ancestor_slot > self.requested_mismatched_slot { // We should only get ancestors of `self.requested_mismatched_slot` // in valid responses @@ -254,6 +339,31 @@ impl AncestorRequestStatus { ), )); } + } else if earliest_erroring_ancestor.is_none() && self.request_type.is_pruned() { + // If the slot we are requesting for is pruned, then the slot and many of its + // ancestors may not have a frozen hash (unlike dead slots where all the ancestors + // will have a frozen hash). Thus the best we can do is to compare the slot numbers + // to find the first ancestor that has the wrong parent, or the first missing + // ancestor. + // + // We return the earliest such mismatch. 
+ if let Ok(Some(meta)) = blockstore.meta(*ancestor_slot) { + if i != 0 && meta.parent_slot != Some(last_ancestor) { + earliest_erroring_ancestor = Some(( + agreed_response.len() - i - 1, + DuplicateAncestorDecision::EarliestPrunedMismatchFound( + DuplicateSlotRepairStatus::default(), + ), + )); + } + } else { + earliest_erroring_ancestor = Some(( + agreed_response.len() - i - 1, + DuplicateAncestorDecision::EarliestPrunedMismatchFound( + DuplicateSlotRepairStatus::default(), + ), + )); + } } else if earliest_erroring_ancestor.is_none() { // If in our current ledger, `ancestor_slot` is actually on the same fork as // `self.requested_mismatched_slot`, then the `frozen_hash` should not be None here. @@ -290,9 +400,9 @@ impl AncestorRequestStatus { // ancestors? // // There are two cases: - // 1) The first such mismatch `first_mismatch` appears BEFORE the slot `4` that is + // 1) The first such mismatch `first_mismatch` appears somewhere BEFORE the slot `4` that is // missing from our blockstore. - // 2) The first such mismatch `first_mismatch` appears AFTER the slot `4` that is + // 2) The first such mismatch `first_mismatch` appears immediately AFTER the slot `4` that is // missing from our blockstore. 
// // Because we know any mismatches will also trigger the mismatch casing earlier in @@ -312,6 +422,7 @@ impl AncestorRequestStatus { ), )); } + last_ancestor = *ancestor_slot; } if let Some((earliest_erroring_ancestor_index, mut decision)) = earliest_erroring_ancestor { @@ -358,6 +469,7 @@ pub mod tests { solana_ledger::get_tmp_ledger_path_auto_delete, std::{collections::BTreeMap, net::IpAddr}, tempfile::TempDir, + trees::tr, }; struct TestSetup { @@ -374,13 +486,21 @@ pub mod tests { SocketAddr::new(ip, 8080) } - fn setup_add_response_test(request_slot: Slot, num_ancestors_in_response: usize) -> TestSetup { + fn setup_add_response_test_with_type( + request_slot: Slot, + num_ancestors_in_response: usize, + request_type: AncestorRequestType, + ) -> TestSetup { assert!(request_slot >= num_ancestors_in_response as u64); let sampled_addresses: Vec = std::iter::repeat_with(create_rand_socket_addr) - .take(ANCESTOR_HASH_REPAIR_SAMPLE_SIZE) + .take(get_ancestor_hash_repair_sample_size()) .collect(); - let status = AncestorRequestStatus::new(sampled_addresses.iter().cloned(), request_slot); + let status = AncestorRequestStatus::new( + sampled_addresses.iter().cloned(), + request_slot, + request_type, + ); let blockstore_temp_dir = get_tmp_ledger_path_auto_delete!(); let blockstore = Blockstore::open(blockstore_temp_dir.path()).unwrap(); @@ -399,6 +519,25 @@ pub mod tests { } } + fn setup_add_response_test(request_slot: Slot, num_ancestors_in_response: usize) -> TestSetup { + setup_add_response_test_with_type( + request_slot, + num_ancestors_in_response, + AncestorRequestType::DeadDuplicateConfirmed, + ) + } + + fn setup_add_response_test_pruned( + request_slot: Slot, + num_ancestors_in_response: usize, + ) -> TestSetup { + setup_add_response_test_with_type( + request_slot, + num_ancestors_in_response, + AncestorRequestType::PopularPruned, + ) + } + #[test] fn test_add_response_invalid_peer() { let request_slot = 100; @@ -433,7 +572,7 @@ pub mod tests { 
incorrect_ancestors_response.pop().unwrap(); // Add a mixture of correct and incorrect responses from the same `responder_addr`. - let num_repeated_responses = ANCESTOR_HASH_REPAIR_SAMPLE_SIZE; + let num_repeated_responses = get_ancestor_hash_repair_sample_size(); let responder_addr = &sampled_addresses[0]; for i in 0..num_repeated_responses { let response = if i % 2 == 0 { @@ -489,7 +628,7 @@ pub mod tests { .collect(); let total_incorrect_responses = events.iter().last().map(|(count, _)| *count).unwrap_or(0); - assert!(total_incorrect_responses <= ANCESTOR_HASH_REPAIR_SAMPLE_SIZE); + assert!(total_incorrect_responses <= get_ancestor_hash_repair_sample_size()); let mut event_order: Vec = (0..sampled_addresses.len()).collect(); event_order.shuffle(&mut thread_rng()); @@ -528,7 +667,7 @@ pub mod tests { let desired_incorrect_responses = vec![ ( incorrect_ancestors_response_0, - MINIMUM_ANCESTOR_AGREEMENT_SIZE - 1, + get_minimum_ancestor_agreement_size() - 1, ), (incorrect_ancestors_response_1, 2), ]; @@ -539,8 +678,8 @@ pub mod tests { .map(|(_, count)| count) .sum(); assert!( - ANCESTOR_HASH_REPAIR_SAMPLE_SIZE - total_invalid_responses - < MINIMUM_ANCESTOR_AGREEMENT_SIZE + get_ancestor_hash_repair_sample_size() - total_invalid_responses + < get_minimum_ancestor_agreement_size() ); assert_eq!( @@ -561,7 +700,7 @@ pub mod tests { let incorrect_ancestors_response = vec![]; let desired_incorrect_responses = vec![( incorrect_ancestors_response, - MINIMUM_ANCESTOR_AGREEMENT_SIZE, + get_minimum_ancestor_agreement_size(), )]; assert_eq!( @@ -582,7 +721,7 @@ pub mod tests { let incorrect_ancestors_response = vec![(request_slot - 1, Hash::new_unique())]; let desired_incorrect_responses = vec![( incorrect_ancestors_response, - MINIMUM_ANCESTOR_AGREEMENT_SIZE, + get_minimum_ancestor_agreement_size(), )]; assert_eq!( @@ -605,7 +744,7 @@ pub mod tests { incorrect_ancestors_response.push((request_slot + 1, Hash::new_unique())); let desired_incorrect_responses = vec![( 
incorrect_ancestors_response, - MINIMUM_ANCESTOR_AGREEMENT_SIZE, + get_minimum_ancestor_agreement_size(), )]; assert_eq!( @@ -627,7 +766,7 @@ pub mod tests { incorrect_ancestors_response.swap_remove(0); let desired_incorrect_responses = vec![( incorrect_ancestors_response, - MINIMUM_ANCESTOR_AGREEMENT_SIZE, + get_minimum_ancestor_agreement_size(), )]; assert_eq!( @@ -657,7 +796,7 @@ pub mod tests { incorrect_ancestors_response[5].1 = Hash::new_unique(); let desired_incorrect_responses = vec![( incorrect_ancestors_response, - MINIMUM_ANCESTOR_AGREEMENT_SIZE, + get_minimum_ancestor_agreement_size(), )]; assert_eq!( @@ -680,7 +819,7 @@ pub mod tests { incorrect_ancestors_response.push((request_slot, Hash::new_unique())); let desired_incorrect_responses = vec![( incorrect_ancestors_response, - MINIMUM_ANCESTOR_AGREEMENT_SIZE - 1, + get_minimum_ancestor_agreement_size() - 1, )]; // We have no entries in the blockstore, so all the ancestors will be missing @@ -723,12 +862,12 @@ pub mod tests { // Here we either skip slot 93 or 94. // // 1) If we skip slot 93, and insert mismatched slot 94 we're testing the order of - // events `Not frozen -> Mismatched hash` + // events `Not frozen -> Mismatched hash` which should return + // `EarliestAncestorNotFrozen` // // 2) If we insert mismatched slot 93, and skip slot 94 we're testing the order of - // events `Mismatched hash -> Not frozen` - // - // Both cases should return `EarliestMismatchFound` + // events `Mismatched hash -> Not frozen`, which should return + // `EarliestMismatchFound` test_setup .blockstore .insert_bank_hash(slot, Hash::new_unique(), false); @@ -794,7 +933,7 @@ pub mod tests { // Set up a situation where some of our ancestors are correct, // but then we fork off with different versions of the correct slots. 
// ``` - // 93' - 94' - 95' - 96' - 97' - 98' - 99' - 100' (our current fork, missing some slots like 98) + // 93' - 94' - 95' - 96' - 97' - 98' - 99' - 100' (our current fork) // / // 90 - 91 - 92 (all correct) // \ @@ -850,4 +989,133 @@ pub mod tests { DuplicateAncestorDecision::AncestorsAllMatch ); } + + #[test] + fn test_add_multiple_responses_pruned_all_mismatch() { + let request_slot = 100; + let mut test_setup = setup_add_response_test_pruned(request_slot, 10); + + // We have no entries in the blockstore, so all the ancestors will be missing + match run_add_multiple_correct_and_incorrect_responses(vec![], &mut test_setup) { + DuplicateAncestorDecision::ContinueSearch(repair_status) => { + assert_eq!( + repair_status.correct_ancestors_to_repair, + test_setup.correct_ancestors_response + ); + } + x => panic!("Incorrect decision {x:?}"), + }; + } + + #[test] + fn test_add_multiple_responses_pruned_all_match() { + let request_slot = 100; + let mut test_setup = setup_add_response_test_pruned(request_slot, 10); + + // Insert all the correct ancestry + let tree = test_setup + .correct_ancestors_response + .iter() + .fold(tr(request_slot + 1), |tree, (slot, _)| (tr(*slot) / tree)); + test_setup + .blockstore + .add_tree(tree, true, true, 2, Hash::default()); + + // All the ancestors matched + assert_eq!( + run_add_multiple_correct_and_incorrect_responses(vec![], &mut test_setup), + DuplicateAncestorDecision::AncestorsAllMatch + ); + } + + #[test] + fn test_add_multiple_responses_pruned_some_ancestors_missing() { + let request_slot = 100; + let mut test_setup = setup_add_response_test_pruned(request_slot, 10); + + // Set up a situation where some of our ancestors are correct, + // but then we fork off and are missing some ancestors like so: + // ``` + // 93 - 95 - 97 - 99 - 100 (our current fork, missing some slots like 98) + // / + // 90 - 91 - 92 (all correct) + // \ + // 93 - 94 - 95 - 96 - 97 - 98 - 99 - 100 (correct fork) + // ``` + let tree = test_setup + 
.correct_ancestors_response + .iter() + .filter(|(slot, _)| *slot <= 92 || *slot % 2 == 1) + .fold(tr(request_slot), |tree, (slot, _)| (tr(*slot) / tree)); + test_setup + .blockstore + .add_tree(tree, true, true, 2, Hash::default()); + + let repair_status = + match run_add_multiple_correct_and_incorrect_responses(vec![], &mut test_setup) { + DuplicateAncestorDecision::EarliestPrunedMismatchFound(repair_status) => { + repair_status + } + x => panic!("Incorrect decision {x:?}"), + }; + + // Expect to find everything after 93 in the `correct_ancestors_to_repair`. + let expected_mismatched_slots: Vec<(Slot, Hash)> = test_setup + .correct_ancestors_response + .into_iter() + .filter(|(slot, _)| *slot > 93) + .collect(); + assert_eq!( + repair_status.correct_ancestors_to_repair, + expected_mismatched_slots + ); + } + + #[test] + fn test_add_multiple_responses_pruned_ancestor_is_bad() { + let request_slot = 100; + let mut test_setup = setup_add_response_test_pruned(request_slot, 10); + + // Set up the situation we expect to see, exactly 1 duplicate has caused this branch to + // descend from pruned. + // ``` + // Our fork view: + // 90 - 91 - 92 + // 10 - 11 - 93 - 94 - 95 - 96 - 97 - 98 - 99 - 100 + // + // Correct fork: + // 90 - 91 - 92 - 93 - 94 - 95 - 96 - 97 - 98 - 99 - 100 + // ``` + let root_fork = tr(90) / (tr(91) / tr(92)); + let pruned_fork = [10, 11, 93, 94, 95, 96, 97, 98, 99] + .iter() + .rev() + .fold(tr(100), |tree, slot| (tr(*slot) / tree)); + + test_setup + .blockstore + .add_tree(root_fork, true, true, 2, Hash::default()); + test_setup + .blockstore + .add_tree(pruned_fork, true, true, 2, Hash::default()); + + let repair_status = + match run_add_multiple_correct_and_incorrect_responses(vec![], &mut test_setup) { + DuplicateAncestorDecision::EarliestPrunedMismatchFound(repair_status) => { + repair_status + } + x => panic!("Incorrect decision {x:?}"), + }; + + // Expect to find everything after 92 in the `correct_ancestors_to_repair`. 
+ let expected_mismatched_slots: Vec<(Slot, Hash)> = test_setup + .correct_ancestors_response + .into_iter() + .filter(|(slot, _)| *slot >= 93) + .collect(); + assert_eq!( + repair_status.correct_ancestors_to_repair, + expected_mismatched_slots + ); + } } diff --git a/core/src/heaviest_subtree_fork_choice.rs b/core/src/heaviest_subtree_fork_choice.rs index 7fd12e2167..f624c4849b 100644 --- a/core/src/heaviest_subtree_fork_choice.rs +++ b/core/src/heaviest_subtree_fork_choice.rs @@ -498,6 +498,10 @@ impl HeaviestSubtreeForkChoice { .map(|(slot_hash, fork_info)| (slot_hash, fork_info.stake_voted_subtree)) } + pub fn slots_iter(&self) -> impl Iterator + '_ { + self.fork_infos.iter().map(|((slot, _), _)| slot).copied() + } + /// Split off the node at `slot_hash_key` and propagate the stake subtraction up to the root of the /// tree. /// diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs index d168a7ff96..b2feaf6306 100644 --- a/core/src/repair_service.rs +++ b/core/src/repair_service.rs @@ -10,6 +10,7 @@ use { ancestor_hashes_service::{AncestorHashesReplayUpdateReceiver, AncestorHashesService}, cluster_info_vote_listener::VerifiedVoteReceiver, cluster_slots::ClusterSlots, + duplicate_repair_status::AncestorDuplicateSlotsToRepair, outstanding_requests::OutstandingRequests, repair_weight::RepairWeight, serve_repair::{ServeRepair, ShredRepairType, REPAIR_PEERS_CACHE_CAPACITY}, @@ -49,13 +50,15 @@ use { const DEFER_REPAIR_THRESHOLD: Duration = Duration::from_millis(200); const DEFER_REPAIR_THRESHOLD_TICKS: u64 = DEFER_REPAIR_THRESHOLD.as_millis() as u64 / MS_PER_TICK; -pub type DuplicateSlotsResetSender = CrossbeamSender>; -pub type DuplicateSlotsResetReceiver = CrossbeamReceiver>; +pub type AncestorDuplicateSlotsSender = CrossbeamSender; +pub type AncestorDuplicateSlotsReceiver = CrossbeamReceiver; pub type ConfirmedSlotsSender = CrossbeamSender>; pub type ConfirmedSlotsReceiver = CrossbeamReceiver>; pub type DumpedSlotsSender = CrossbeamSender>; pub 
type DumpedSlotsReceiver = CrossbeamReceiver>; pub type OutstandingShredRepairs = OutstandingRequests; +pub type PopularPrunedForksSender = CrossbeamSender>; +pub type PopularPrunedForksReceiver = CrossbeamReceiver>; #[derive(Default, Debug)] pub struct SlotRepairs { @@ -197,7 +200,7 @@ pub struct RepairInfo { pub cluster_info: Arc, pub cluster_slots: Arc, pub epoch_schedule: EpochSchedule, - pub duplicate_slots_reset_sender: DuplicateSlotsResetSender, + pub ancestor_duplicate_slots_sender: AncestorDuplicateSlotsSender, // Validators from which repairs are requested pub repair_validators: Option>, // Validators which should be given priority when serving @@ -224,6 +227,7 @@ pub struct RepairService { } impl RepairService { + #[allow(clippy::too_many_arguments)] pub fn new( blockstore: Arc, exit: Arc, @@ -234,6 +238,7 @@ impl RepairService { outstanding_requests: Arc>, ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver, dumped_slots_receiver: DumpedSlotsReceiver, + popular_pruned_forks_sender: PopularPrunedForksSender, ) -> Self { let t_repair = { let blockstore = blockstore.clone(); @@ -250,6 +255,7 @@ impl RepairService { verified_vote_receiver, &outstanding_requests, dumped_slots_receiver, + popular_pruned_forks_sender, ) }) .unwrap() @@ -277,6 +283,7 @@ impl RepairService { verified_vote_receiver: VerifiedVoteReceiver, outstanding_requests: &RwLock, dumped_slots_receiver: DumpedSlotsReceiver, + popular_pruned_forks_sender: PopularPrunedForksSender, ) { let mut repair_weight = RepairWeight::new(repair_info.bank_forks.read().unwrap().root()); let serve_repair = ServeRepair::new( @@ -290,6 +297,7 @@ impl RepairService { let mut best_repairs_stats = BestRepairsStats::default(); let mut last_stats = Instant::now(); let mut peers_cache = LruCache::new(REPAIR_PEERS_CACHE_CAPACITY); + let mut popular_pruned_forks_requests = HashSet::new(); loop { if exit.load(Ordering::Relaxed) { @@ -329,7 +337,12 @@ impl RepairService { // question would have 
already been purged in `repair_weight.set_root` // and there is no chance of it being part of the rooted path. if slot >= repair_weight.root() { - repair_weight.split_off(slot); + let dumped_slots = repair_weight.split_off(slot); + // Remove from outstanding ancestor hashes requests. Also clean any + // requests that might have been since fixed + popular_pruned_forks_requests.retain(|slot| { + !dumped_slots.contains(slot) && repair_weight.is_pruned(*slot) + }); } } }); @@ -371,6 +384,32 @@ impl RepairService { &mut best_repairs_stats, ); + let mut popular_pruned_forks = repair_weight.get_popular_pruned_forks( + root_bank.epoch_stakes_map(), + root_bank.epoch_schedule(), + ); + // Check if we've already sent a request along this pruned fork + popular_pruned_forks.retain(|slot| { + if popular_pruned_forks_requests + .iter() + .any(|prev_req_slot| repair_weight.same_tree(*slot, *prev_req_slot)) + { + false + } else { + popular_pruned_forks_requests.insert(*slot); + true + } + }); + if !popular_pruned_forks.is_empty() { + warn!( + "Notifying repair of popular pruned forks {:?}", + popular_pruned_forks + ); + popular_pruned_forks_sender + .send(popular_pruned_forks) + .unwrap_or_else(|err| error!("failed to send popular pruned forks {err}")); + } + repairs }; diff --git a/core/src/repair_weight.rs b/core/src/repair_weight.rs index edaab3d830..dbd8cccc6b 100644 --- a/core/src/repair_weight.rs +++ b/core/src/repair_weight.rs @@ -4,7 +4,9 @@ use { repair_generic_traversal::{get_closest_completion, get_unknown_last_index}, repair_service::{BestRepairsStats, RepairTiming}, repair_weighted_traversal, + replay_stage::DUPLICATE_THRESHOLD, serve_repair::ShredRepairType, + tree_diff::TreeDiff, }, solana_ledger::{ ancestor_iterator::AncestorIterator, blockstore::Blockstore, blockstore_meta::SlotMeta, @@ -33,6 +35,14 @@ impl TreeRoot { pub fn is_pruned(&self) -> bool { matches!(self, Self::PrunedRoot(_)) } + + #[cfg(test)] + pub fn slot(&self) -> Slot { + match self { + 
Self::Root(slot) => *slot, + Self::PrunedRoot(slot) => *slot, + } + } } impl From for Slot { @@ -43,10 +53,16 @@ impl From for Slot { } } } + +#[derive(Clone)] pub struct RepairWeight { // Map from root -> a subtree rooted at that `root` trees: HashMap, // Map from root -> pruned subtree + // In the case of duplicate blocks linking back to a slot which is pruned, it is important to + // hold onto pruned trees so that we can repair / ancestor hashes repair when necessary. Since + // the parent slot is pruned these blocks will never be replayed / marked dead, so the existing + // dead duplicate confirmed pathway will not catch this special case. // We manage the size by removing slots < root pruned_trees: HashMap, @@ -172,7 +188,7 @@ impl RepairWeight { for (tree_root, updates) in all_subtree_updates { let tree = self - .get_tree(&tree_root) + .get_tree_mut(tree_root) .expect("Tree for `tree_root` must exist here"); let updates: Vec<_> = updates.into_iter().collect(); tree.add_votes( @@ -297,17 +313,18 @@ impl RepairWeight { /// This function removes and destroys the original `ST`. /// /// Assumes that `slot` is greater than `self.root`. 
- pub fn split_off(&mut self, slot: Slot) { + /// Returns slots that were orphaned + pub fn split_off(&mut self, slot: Slot) -> HashSet { assert!(slot >= self.root); if slot == self.root { error!("Trying to orphan root of repair tree {}", slot); - return; + return HashSet::new(); } match self.slot_to_tree.get(&slot).copied() { Some(TreeRoot::Root(subtree_root)) => { if subtree_root == slot { info!("{} is already orphan, skipping", slot); - return; + return HashSet::new(); } let subtree = self .trees @@ -316,6 +333,7 @@ impl RepairWeight { let orphaned_tree = subtree.split_off(&(slot, Hash::default())); self.rename_tree_root(&orphaned_tree, TreeRoot::Root(slot)); self.trees.insert(slot, orphaned_tree); + self.trees.get(&slot).unwrap().slots_iter().collect() } Some(TreeRoot::PrunedRoot(subtree_root)) => { // Even if these orphaned slots were previously pruned, they should be added back to @@ -341,11 +359,17 @@ impl RepairWeight { // back into the main set of trees, self.trees self.rename_tree_root(&subtree, TreeRoot::Root(subtree_root)); self.trees.insert(subtree_root, subtree); + self.trees + .get(&subtree_root) + .unwrap() + .slots_iter() + .collect() } else { let orphaned_tree = subtree.split_off(&(slot, Hash::default())); self.pruned_trees.insert(subtree_root, subtree); self.rename_tree_root(&orphaned_tree, TreeRoot::Root(slot)); self.trees.insert(slot, orphaned_tree); + self.trees.get(&slot).unwrap().slots_iter().collect() } } None => { @@ -353,6 +377,7 @@ impl RepairWeight { "Trying to split off slot {} which doesn't currently exist in repair", slot ); + HashSet::new() } } } @@ -710,17 +735,111 @@ impl RepairWeight { Some(orphan_tree_root) } - fn get_tree(&mut self, tree_root: &TreeRoot) -> Option<&mut HeaviestSubtreeForkChoice> { + /// If any pruned trees reach the `DUPLICATE_THRESHOLD`, there is a high chance that they are + /// duplicate confirmed (can't say for sure because we don't differentiate by hash in + /// `repair_weight`). 
+ /// For each such pruned tree, find the deepest child which has reached `DUPLICATE_THRESHOLD` + /// for handling in ancestor hashes repair + /// We refer to such trees as "popular" pruned forks, and the deepest child as the "popular" pruned + /// slot of the fork. + /// + /// `DUPLICATE_THRESHOLD` is expected to be > 50%. + /// It is expected that no two children of a parent could both reach `DUPLICATE_THRESHOLD`. + pub fn get_popular_pruned_forks( + &self, + epoch_stakes: &HashMap, + epoch_schedule: &EpochSchedule, + ) -> Vec { + #[cfg(test)] + static_assertions::const_assert!(DUPLICATE_THRESHOLD > 0.5); + let mut repairs = vec![]; + for (pruned_root, pruned_tree) in self.pruned_trees.iter() { + let mut slot_to_start_repair = (*pruned_root, Hash::default()); + + // This pruned tree *could* span an epoch boundary. To be safe we use the + // minimum DUPLICATE_THRESHOLD across slots in case of stake modification. This + // *could* lead to a false positive. + // + // Additionally, we could have a case where a slot that reached `DUPLICATE_THRESHOLD` + // no longer reaches threshold post epoch boundary due to stake modifications. + // + // Post boundary, we have 2 cases: + // 1) The previously popular slot stays as the majority fork. In this + // case it will eventually reach the new duplicate threshold and + // validators missing the correct version will be able to trigger this pruned + // repair pathway. + // 2) With the stake modifications, this previously popular slot no + // longer holds the majority stake. The remaining stake is now expected to + // reach consensus on a new fork post epoch boundary. Once this consensus is + // reached, validators on the popular pruned fork will be able to switch + // to the new majority fork. + // + // In either case, `HeaviestSubtreeForkChoice` updates the stake only when observing new + // votes leading to a potential mixed bag of stakes being observed. 
It is safest to use + // the minimum threshold from either side of the boundary. + let min_total_stake = pruned_tree + .slots_iter() + .map(|slot| { + epoch_stakes + .get(&epoch_schedule.get_epoch(slot)) + .expect("Pruned tree cannot contain slots more than an epoch behind") + .total_stake() + }) + .min() + .expect("Pruned tree cannot be empty"); + let duplicate_confirmed_threshold = + ((min_total_stake as f64) * DUPLICATE_THRESHOLD) as u64; + + // TODO: `HeaviestSubtreeForkChoice` subtracts and migrates stake as validators switch + // forks within the rooted subtree, however `repair_weight` does not migrate stake + // across subtrees. This could lead to an additional false positive if validators + // switch post prune as stake added to a pruned tree is never removed. + // A further optimization could be to store an additional `latest_votes` + // in `repair_weight` to manage switching across subtrees. + if pruned_tree + .stake_voted_subtree(&slot_to_start_repair) + .expect("Root of tree must exist") + >= duplicate_confirmed_threshold + { + // Search to find the deepest node that still has >= duplicate_confirmed_threshold (could + // just use best slot but this is a slight optimization that will save us some iterations + // in ancestor repair) + while let Some(child) = pruned_tree + .children(&slot_to_start_repair) + .expect("Found earlier, this slot should exist") + .find(|c| { + pruned_tree + .stake_voted_subtree(c) + .expect("Found in children must exist") + >= duplicate_confirmed_threshold + }) + { + slot_to_start_repair = *child; + } + repairs.push(slot_to_start_repair.0); + } + } + repairs + } + + fn get_tree(&self, tree_root: TreeRoot) -> Option<&HeaviestSubtreeForkChoice> { match tree_root { - TreeRoot::Root(r) => self.trees.get_mut(r), - TreeRoot::PrunedRoot(r) => self.pruned_trees.get_mut(r), + TreeRoot::Root(r) => self.trees.get(&r), + TreeRoot::PrunedRoot(r) => self.pruned_trees.get(&r), } } - fn remove_tree(&mut self, tree_root: &TreeRoot) -> Option 
{ + fn get_tree_mut(&mut self, tree_root: TreeRoot) -> Option<&mut HeaviestSubtreeForkChoice> { match tree_root { - TreeRoot::Root(r) => self.trees.remove(r), - TreeRoot::PrunedRoot(r) => self.pruned_trees.remove(r), + TreeRoot::Root(r) => self.trees.get_mut(&r), + TreeRoot::PrunedRoot(r) => self.pruned_trees.get_mut(&r), + } + } + + fn remove_tree(&mut self, tree_root: TreeRoot) -> Option { + match tree_root { + TreeRoot::Root(r) => self.trees.remove(&r), + TreeRoot::PrunedRoot(r) => self.pruned_trees.remove(&r), } } @@ -728,6 +847,22 @@ impl RepairWeight { self.slot_to_tree.get(&slot).copied() } + /// Returns true iff `slot` is currently tracked and in a pruned tree + pub fn is_pruned(&self, slot: Slot) -> bool { + self.get_tree_root(slot) + .as_ref() + .map(TreeRoot::is_pruned) + .unwrap_or(false) + } + + /// Returns true iff `slot1` and `slot2` are both tracked and belong to the same tree + pub fn same_tree(&self, slot1: Slot, slot2: Slot) -> bool { + self.get_tree_root(slot1) + .and_then(|tree_root| self.get_tree(tree_root)) + .map(|tree| tree.contains_block(&(slot2, Hash::default()))) + .unwrap_or(false) + } + /// Assumes that `new_tree_root` does not already exist in `self.trees` fn insert_new_tree(&mut self, new_tree_root: Slot) { assert!(!self.trees.contains_key(&new_tree_root)); @@ -798,12 +933,12 @@ impl RepairWeight { epoch_schedule: &EpochSchedule, ) { // Update self.slot_to_tree to reflect the merge - let tree1 = self.remove_tree(&root1).expect("tree to merge must exist"); + let tree1 = self.remove_tree(root1).expect("tree to merge must exist"); self.rename_tree_root(&tree1, root2); // Merge trees let tree2 = self - .get_tree(&root2) + .get_tree_mut(root2) .expect("tree to be merged into must exist"); tree2.merge( @@ -1158,7 +1293,7 @@ mod test { // Add a vote to a slot chaining to pruned blockstore.add_tree(tr(6) / tr(20), true, true, 2, Hash::default()); - let votes = vec![(23, vote_pubkeys.clone())]; + let votes = vec![(23, 
vote_pubkeys.iter().take(1).copied().collect_vec())]; repair_weight.add_votes( &blockstore, votes.into_iter(), @@ -1184,6 +1319,55 @@ mod test { TreeRoot::PrunedRoot(3) ); + // Pruned tree should now have 1 vote + assert_eq!( + repair_weight + .pruned_trees + .get(&3) + .unwrap() + .stake_voted_subtree(&(3, Hash::default())) + .unwrap(), + stake + ); + assert_eq!( + repair_weight + .trees + .get(&2) + .unwrap() + .stake_voted_subtree(&(2, Hash::default())) + .unwrap(), + 3 * stake + ); + + // Add the rest of the stake + let votes = vec![(23, vote_pubkeys.iter().skip(1).copied().collect_vec())]; + repair_weight.add_votes( + &blockstore, + votes.into_iter(), + bank.epoch_stakes_map(), + bank.epoch_schedule(), + ); + + // Pruned tree should have all the stake as well + assert_eq!( + repair_weight + .pruned_trees + .get(&3) + .unwrap() + .stake_voted_subtree(&(3, Hash::default())) + .unwrap(), + 3 * stake + ); + assert_eq!( + repair_weight + .trees + .get(&2) + .unwrap() + .stake_voted_subtree(&(2, Hash::default())) + .unwrap(), + 3 * stake + ); + // Update root and trim pruned tree repair_weight.set_root(10); // Add a vote to an orphan, where earliest ancestor is unrooted, should still add as pruned @@ -1610,11 +1794,7 @@ mod test { #[test] fn test_set_root_pruned_tree_trim_and_cleanup() { - // Connect orphans to main fork - let blockstore = setup_orphans(); - blockstore.add_tree(tr(2) / tr(8), true, true, 2, Hash::default()); - blockstore.add_tree(tr(3) / (tr(9) / tr(20)), true, true, 2, Hash::default()); - + let blockstore = setup_big_forks(); let stake = 100; let (bank, vote_pubkeys) = bank_utils::setup_bank_and_vote_pubkeys_for_tests(3, stake); let votes = vec![ @@ -1717,11 +1897,7 @@ mod test { #[test] fn test_set_root_pruned_tree_split() { - // Connect orphans to main fork - let blockstore = setup_orphans(); - blockstore.add_tree(tr(2) / tr(8), true, true, 2, Hash::default()); - blockstore.add_tree(tr(3) / (tr(9) / tr(20)), true, true, 2, Hash::default()); - + 
let blockstore = setup_big_forks(); let stake = 100; let (bank, vote_pubkeys) = bank_utils::setup_bank_and_vote_pubkeys_for_tests(3, stake); let votes = vec![ @@ -2045,7 +2221,7 @@ mod test { // Add 22 to `pruned_trees`, ancestor search should now // chain it back to 20 - repair_weight.remove_tree(&TreeRoot::Root(20)).unwrap(); + repair_weight.remove_tree(TreeRoot::Root(20)).unwrap(); repair_weight.insert_new_pruned_tree(20); assert_eq!( repair_weight.find_ancestor_subtree_of_slot(&blockstore, 23), @@ -2222,6 +2398,245 @@ mod test { assert_eq!(orphans, vec![0, 3]); } + #[test] + fn test_get_popular_pruned_forks() { + let blockstore = setup_big_forks(); + let stake = 100; + let (bank, vote_pubkeys) = bank_utils::setup_bank_and_vote_pubkeys_for_tests(10, stake); + let epoch_stakes = bank.epoch_stakes_map(); + let epoch_schedule = bank.epoch_schedule(); + + // Add a little stake for each fork + let votes = vec![ + (4, vec![vote_pubkeys[0]]), + (11, vec![vote_pubkeys[1]]), + (6, vec![vote_pubkeys[2]]), + (23, vec![vote_pubkeys[3]]), + ]; + let mut repair_weight = RepairWeight::new(0); + repair_weight.add_votes( + &blockstore, + votes.into_iter(), + bank.epoch_stakes_map(), + bank.epoch_schedule(), + ); + + // Set root to 4, there should now be 3 pruned trees with `stake` + repair_weight.set_root(4); + assert_eq!(repair_weight.trees.len(), 1); + assert_eq!(repair_weight.pruned_trees.len(), 3); + assert!(repair_weight + .pruned_trees + .iter() + .all( + |(root, pruned_tree)| pruned_tree.stake_voted_subtree(&(*root, Hash::default())) + == Some(stake) + )); + + // No fork has DUPLICATE_THRESHOLD, should not be any popular forks + assert!(repair_weight + .get_popular_pruned_forks(epoch_stakes, epoch_schedule) + .is_empty()); + + // 500 stake, still less than DUPLICATE_THRESHOLD, should not be any popular forks + let five_votes = vote_pubkeys.iter().copied().take(5).collect_vec(); + let votes = vec![(11, five_votes.clone()), (6, five_votes)]; + repair_weight.add_votes( + 
&blockstore, + votes.into_iter(), + bank.epoch_stakes_map(), + bank.epoch_schedule(), + ); + assert!(repair_weight + .get_popular_pruned_forks(epoch_stakes, epoch_schedule) + .is_empty()); + + // 600 stake, since we voted for leaf, leaf should be returned + let votes = vec![(11, vec![vote_pubkeys[5]]), (6, vec![vote_pubkeys[6]])]; + repair_weight.add_votes( + &blockstore, + votes.into_iter(), + bank.epoch_stakes_map(), + bank.epoch_schedule(), + ); + assert_eq!( + vec![6, 11], + repair_weight + .get_popular_pruned_forks(epoch_stakes, epoch_schedule) + .into_iter() + .sorted() + .collect_vec() + ); + + // For the last pruned tree we leave 100 stake on 23 and 22 and put 600 stake on 20. We + // should return 20 and not traverse the tree deeper + let six_votes = vote_pubkeys.iter().copied().take(6).collect_vec(); + let votes = vec![(20, six_votes)]; + repair_weight.add_votes( + &blockstore, + votes.into_iter(), + bank.epoch_stakes_map(), + bank.epoch_schedule(), + ); + assert_eq!( + vec![6, 11, 20], + repair_weight + .get_popular_pruned_forks(epoch_stakes, epoch_schedule) + .into_iter() + .sorted() + .collect_vec() + ); + } + + #[test] + fn test_get_popular_pruned_forks_forks() { + let blockstore = setup_big_forks(); + let stake = 100; + let (bank, vote_pubkeys) = bank_utils::setup_bank_and_vote_pubkeys_for_tests(10, stake); + let epoch_stakes = bank.epoch_stakes_map(); + let epoch_schedule = bank.epoch_schedule(); + + // Add a little stake for each fork + let votes = vec![ + (4, vec![vote_pubkeys[0]]), + (11, vec![vote_pubkeys[1]]), + (6, vec![vote_pubkeys[2]]), + (23, vec![vote_pubkeys[3]]), + ]; + let mut repair_weight = RepairWeight::new(0); + repair_weight.add_votes( + &blockstore, + votes.into_iter(), + bank.epoch_stakes_map(), + bank.epoch_schedule(), + ); + + // Prune the entire tree + std::mem::swap(&mut repair_weight.trees, &mut repair_weight.pruned_trees); + repair_weight + .slot_to_tree + .iter_mut() + .for_each(|(_, s)| *s = 
TreeRoot::PrunedRoot(s.slot())); + + // Traverse to 20 + let mut repair_weight_20 = repair_weight.clone(); + repair_weight_20.add_votes( + &blockstore, + vec![(20, vote_pubkeys.clone())].into_iter(), + bank.epoch_stakes_map(), + bank.epoch_schedule(), + ); + assert_eq!( + vec![20], + repair_weight_20.get_popular_pruned_forks(epoch_stakes, epoch_schedule) + ); + + // 4 and 8 individually do not have enough stake, but 2 is popular + let votes = vec![(10, vote_pubkeys.iter().copied().skip(6).collect_vec())]; + repair_weight.add_votes( + &blockstore, + votes.into_iter(), + bank.epoch_stakes_map(), + bank.epoch_schedule(), + ); + assert_eq!( + vec![2], + repair_weight.get_popular_pruned_forks(epoch_stakes, epoch_schedule) + ); + } + + #[test] + fn test_get_popular_pruned_forks_stake_change_across_epoch_boundary() { + let blockstore = setup_big_forks(); + let stake = 100; + let (bank, vote_pubkeys) = bank_utils::setup_bank_and_vote_pubkeys_for_tests(10, stake); + let mut epoch_stakes = bank.epoch_stakes_map().clone(); + let mut epoch_schedule = *bank.epoch_schedule(); + + // Simulate epoch boundary at slot 10, where half of the stake deactivates + // Additional epoch boundary at slot 20, where 30% of the stake reactivates + let initial_stakes = epoch_stakes + .get(&epoch_schedule.get_epoch(0)) + .unwrap() + .clone(); + let mut dec_stakes = epoch_stakes + .get(&epoch_schedule.get_epoch(0)) + .unwrap() + .clone(); + let mut inc_stakes = epoch_stakes + .get(&epoch_schedule.get_epoch(0)) + .unwrap() + .clone(); + epoch_schedule.first_normal_slot = 0; + epoch_schedule.slots_per_epoch = 10; + assert_eq!( + epoch_schedule.get_epoch(10), + epoch_schedule.get_epoch(9) + 1 + ); + assert_eq!( + epoch_schedule.get_epoch(20), + epoch_schedule.get_epoch(19) + 1 + ); + dec_stakes.set_total_stake(dec_stakes.total_stake() - 5 * stake); + inc_stakes.set_total_stake(dec_stakes.total_stake() + 3 * stake); + epoch_stakes.insert(epoch_schedule.get_epoch(0), initial_stakes); + 
epoch_stakes.insert(epoch_schedule.get_epoch(10), dec_stakes); + epoch_stakes.insert(epoch_schedule.get_epoch(20), inc_stakes); + + // Add a little stake for each fork + let votes = vec![ + (4, vec![vote_pubkeys[0]]), + (11, vec![vote_pubkeys[1]]), + (6, vec![vote_pubkeys[2]]), + (23, vec![vote_pubkeys[3]]), + ]; + let mut repair_weight = RepairWeight::new(0); + repair_weight.add_votes( + &blockstore, + votes.into_iter(), + bank.epoch_stakes_map(), + bank.epoch_schedule(), + ); + + // Set root to 4, there should now be 3 pruned trees with `stake` + repair_weight.set_root(4); + assert_eq!(repair_weight.trees.len(), 1); + assert_eq!(repair_weight.pruned_trees.len(), 3); + assert!(repair_weight + .pruned_trees + .iter() + .all( + |(root, pruned_tree)| pruned_tree.stake_voted_subtree(&(*root, Hash::default())) + == Some(stake) + )); + + // No fork has `DUPLICATE_THRESHOLD`, should not be any popular forks + assert!(repair_weight + .get_popular_pruned_forks(&epoch_stakes, &epoch_schedule) + .is_empty()); + + // 400 stake. For the 6 tree it will be less than `DUPLICATE_THRESHOLD`, however 11 + // has epoch modifications where at some point 400 stake is enough. For 22, although it + // does cross the second epoch where the stake requirement was less, because it doesn't + // have any blocks in that epoch the minimum total stake is still 800 which fails. 
+ let four_votes = vote_pubkeys.iter().copied().take(4).collect_vec(); + let votes = vec![ + (11, four_votes.clone()), + (6, four_votes.clone()), + (22, four_votes), + ]; + repair_weight.add_votes( + &blockstore, + votes.into_iter(), + bank.epoch_stakes_map(), + bank.epoch_schedule(), + ); + assert_eq!( + vec![11], + repair_weight.get_popular_pruned_forks(&epoch_stakes, &epoch_schedule) + ); + } + fn setup_orphan_repair_weight() -> (Blockstore, Bank, RepairWeight) { let blockstore = setup_orphans(); let stake = 100; @@ -2316,6 +2731,32 @@ mod test { blockstore } + fn setup_big_forks() -> Blockstore { + /* + Build fork structure: + slot 0 + | + slot 1 + / \ + /----| |----| + slot 2 | + / \ slot 3 + slot 4 slot 8 / \ + | slot 5 slot 9 + slot 10 | | + | slot 6 slot 20 + slot 11 | + slot 22 + | + slot 23 + */ + let blockstore = setup_orphans(); + // Connect orphans to main fork + blockstore.add_tree(tr(2) / tr(8), true, true, 2, Hash::default()); + blockstore.add_tree(tr(3) / (tr(9) / tr(20)), true, true, 2, Hash::default()); + blockstore + } + fn setup_forks() -> Blockstore { /* Build fork structure: diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index f202cfa26d..f183a0e064 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -18,11 +18,14 @@ use { SWITCH_FORK_THRESHOLD, }, cost_update_service::CostUpdate, + duplicate_repair_status::AncestorDuplicateSlotsToRepair, fork_choice::{ForkChoice, SelectVoteAndResetForkResult}, heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice, latest_validator_votes_for_frozen_banks::LatestValidatorVotesForFrozenBanks, progress_map::{ForkProgress, ProgressMap, PropagatedStats, ReplaySlotStats}, - repair_service::{DumpedSlotsSender, DuplicateSlotsResetReceiver}, + repair_service::{ + AncestorDuplicateSlotsReceiver, DumpedSlotsSender, PopularPrunedForksReceiver, + }, rewards_recorder_service::{RewardsMessage, RewardsRecorderSender}, tower_storage::{SavedTower, SavedTowerVersions, TowerStorage}, 
unfrozen_gossip_verified_vote_hashes::UnfrozenGossipVerifiedVoteHashes, @@ -268,9 +271,11 @@ pub struct ReplayTiming { wait_receive_elapsed: u64, heaviest_fork_failures_elapsed: u64, bank_count: u64, + process_ancestor_hashes_duplicate_slots_elapsed: u64, process_gossip_duplicate_confirmed_slots_elapsed: u64, process_duplicate_slots_elapsed: u64, process_unfrozen_gossip_verified_vote_hashes_elapsed: u64, + process_popular_pruned_forks_elapsed: u64, repair_correct_slots_elapsed: u64, retransmit_not_propagated_elapsed: u64, generate_new_bank_forks_read_lock_us: u64, @@ -296,8 +301,10 @@ impl ReplayTiming { wait_receive_elapsed: u64, heaviest_fork_failures_elapsed: u64, bank_count: u64, + process_ancestor_hashes_duplicate_slots_elapsed: u64, process_gossip_duplicate_confirmed_slots_elapsed: u64, process_unfrozen_gossip_verified_vote_hashes_elapsed: u64, + process_popular_pruned_forks_elapsed: u64, process_duplicate_slots_elapsed: u64, repair_correct_slots_elapsed: u64, retransmit_not_propagated_elapsed: u64, @@ -315,10 +322,13 @@ impl ReplayTiming { self.wait_receive_elapsed += wait_receive_elapsed; self.heaviest_fork_failures_elapsed += heaviest_fork_failures_elapsed; self.bank_count += bank_count; + self.process_ancestor_hashes_duplicate_slots_elapsed += + process_ancestor_hashes_duplicate_slots_elapsed; self.process_gossip_duplicate_confirmed_slots_elapsed += process_gossip_duplicate_confirmed_slots_elapsed; self.process_unfrozen_gossip_verified_vote_hashes_elapsed += process_unfrozen_gossip_verified_vote_hashes_elapsed; + self.process_popular_pruned_forks_elapsed += process_popular_pruned_forks_elapsed; self.process_duplicate_slots_elapsed += process_duplicate_slots_elapsed; self.repair_correct_slots_elapsed += repair_correct_slots_elapsed; self.retransmit_not_propagated_elapsed += retransmit_not_propagated_elapsed; @@ -381,6 +391,11 @@ impl ReplayTiming { self.replay_active_banks_elapsed as i64, i64 ), + ( + "process_ancestor_hashes_duplicate_slots_elapsed", + 
self.process_ancestor_hashes_duplicate_slots_elapsed as i64, + i64 + ), ( "process_gossip_duplicate_confirmed_slots_elapsed", self.process_gossip_duplicate_confirmed_slots_elapsed as i64, @@ -391,6 +406,11 @@ impl ReplayTiming { self.process_unfrozen_gossip_verified_vote_hashes_elapsed as i64, i64 ), + ( + "process_popular_pruned_forks_elapsed", + self.process_popular_pruned_forks_elapsed as i64, + i64 + ), ( "wait_receive_elapsed", self.wait_receive_elapsed as i64, @@ -468,7 +488,7 @@ impl ReplayStage { vote_tracker: Arc, cluster_slots: Arc, retransmit_slots_sender: RetransmitSlotsSender, - epoch_slots_frozen_receiver: DuplicateSlotsResetReceiver, + ancestor_duplicate_slots_receiver: AncestorDuplicateSlotsReceiver, replay_vote_sender: ReplayVoteSender, gossip_duplicate_confirmed_slots_receiver: GossipDuplicateConfirmedSlotsReceiver, gossip_verified_vote_hash_receiver: GossipVerifiedVoteHashReceiver, @@ -481,6 +501,7 @@ impl ReplayStage { prioritization_fee_cache: Arc, dumped_slots_sender: DumpedSlotsSender, banking_tracer: Arc, + popular_pruned_forks_receiver: PopularPrunedForksReceiver, ) -> Result { let mut tower = if let Some(process_blockstore) = maybe_process_blockstore { let tower = process_blockstore.process_to_create_tower()?; @@ -626,13 +647,15 @@ impl ReplayStage { let forks_root = bank_forks.read().unwrap().root(); - // Reset any dead slots that have been frozen by a sufficient portion of - // the network. Signalled by repair_service. - let mut purge_dead_slots_time = Measure::start("purge_dead_slots"); - Self::process_epoch_slots_frozen_dead_slots( + // Process cluster-agreed versions of duplicate slots for which we potentially + // have the wrong version. Our version was dead or pruned. + // Signalled by ancestor_hashes_service. 
+ let mut process_ancestor_hashes_duplicate_slots_time = + Measure::start("process_ancestor_hashes_duplicate_slots"); + Self::process_ancestor_hashes_duplicate_slots( &my_pubkey, &blockstore, - &epoch_slots_frozen_receiver, + &ancestor_duplicate_slots_receiver, &mut duplicate_slots_tracker, &gossip_duplicate_confirmed_slots, &mut epoch_slots_frozen_slots, @@ -643,7 +666,7 @@ impl ReplayStage { &ancestor_hashes_replay_update_sender, &mut purge_repair_slot_counter, ); - purge_dead_slots_time.stop(); + process_ancestor_hashes_duplicate_slots_time.stop(); // Check for any newly confirmed slots detected from gossip. let mut process_gossip_duplicate_confirmed_slots_time = @@ -678,6 +701,24 @@ impl ReplayStage { for _ in gossip_verified_vote_hash_receiver.try_iter() {} process_unfrozen_gossip_verified_vote_hashes_time.stop(); + let mut process_popular_pruned_forks_time = + Measure::start("process_popular_pruned_forks_time"); + // Check for "popular" (52+% stake aggregated across versions/descendants) forks + // that are pruned, which would not be detected by normal means. + // Signalled by `repair_service`. 
+ Self::process_popular_pruned_forks( + &popular_pruned_forks_receiver, + &blockstore, + &mut duplicate_slots_tracker, + &mut epoch_slots_frozen_slots, + &bank_forks, + &mut heaviest_subtree_fork_choice, + &mut duplicate_slots_to_repair, + &ancestor_hashes_replay_update_sender, + &mut purge_repair_slot_counter, + ); + process_popular_pruned_forks_time.stop(); + // Check to remove any duplicated slots from fork choice let mut process_duplicate_slots_time = Measure::start("process_duplicate_slots"); if !tpu_has_bank { @@ -1053,8 +1094,10 @@ impl ReplayStage { wait_receive_time.as_us(), heaviest_fork_failures_time.as_us(), u64::from(did_complete_bank), + process_ancestor_hashes_duplicate_slots_time.as_us(), process_gossip_duplicate_confirmed_slots_time.as_us(), process_unfrozen_gossip_verified_vote_hashes_time.as_us(), + process_popular_pruned_forks_time.as_us(), process_duplicate_slots_time.as_us(), dump_then_repair_correct_slots_time.as_us(), retransmit_not_propagated_time.as_us(), @@ -1292,10 +1335,9 @@ impl ReplayStage { } } else { warn!( - "Trying to dump slot {} which does not exist in bank forks", + "Dumping slot {} which does not exist in bank forks (possibly pruned)", *duplicate_slot ); - return false; } @@ -1357,10 +1399,10 @@ impl ReplayStage { } #[allow(clippy::too_many_arguments)] - fn process_epoch_slots_frozen_dead_slots( + fn process_ancestor_hashes_duplicate_slots( pubkey: &Pubkey, blockstore: &Blockstore, - epoch_slots_frozen_receiver: &DuplicateSlotsResetReceiver, + ancestor_duplicate_slots_receiver: &AncestorDuplicateSlotsReceiver, duplicate_slots_tracker: &mut DuplicateSlotsTracker, gossip_duplicate_confirmed_slots: &GossipDuplicateConfirmedSlots, epoch_slots_frozen_slots: &mut EpochSlotsFrozenSlots, @@ -1372,13 +1414,17 @@ impl ReplayStage { purge_repair_slot_counter: &mut PurgeRepairSlotCounter, ) { let root = bank_forks.read().unwrap().root(); - for maybe_purgeable_duplicate_slots in epoch_slots_frozen_receiver.try_iter() { + for 
AncestorDuplicateSlotsToRepair { + slots_to_repair: maybe_repairable_duplicate_slots, + request_type, + } in ancestor_duplicate_slots_receiver.try_iter() + { warn!( - "{} ReplayStage notified of epoch slots duplicate frozen dead slots: {:?}", - pubkey, maybe_purgeable_duplicate_slots + "{} ReplayStage notified of duplicate slots from ancestor hashes service but we observed as {}: {:?}", + pubkey, if request_type.is_pruned() {"pruned"} else {"dead"}, maybe_repairable_duplicate_slots, ); for (epoch_slots_frozen_slot, epoch_slots_frozen_hash) in - maybe_purgeable_duplicate_slots.into_iter() + maybe_repairable_duplicate_slots.into_iter() { let epoch_slots_frozen_state = EpochSlotsFrozenState::new_from_state( epoch_slots_frozen_slot, @@ -1393,6 +1439,7 @@ impl ReplayStage { .get(epoch_slots_frozen_slot) .map(|b| b.hash()) }, + request_type.is_pruned(), ); check_slot_agrees_with_cluster( epoch_slots_frozen_slot, @@ -1531,6 +1578,40 @@ impl ReplayStage { .expect("must exist based on earlier check"); } + #[allow(clippy::too_many_arguments)] + fn process_popular_pruned_forks( + popular_pruned_forks_receiver: &PopularPrunedForksReceiver, + blockstore: &Blockstore, + duplicate_slots_tracker: &mut DuplicateSlotsTracker, + epoch_slots_frozen_slots: &mut EpochSlotsFrozenSlots, + bank_forks: &RwLock, + fork_choice: &mut HeaviestSubtreeForkChoice, + duplicate_slots_to_repair: &mut DuplicateSlotsToRepair, + ancestor_hashes_replay_update_sender: &AncestorHashesReplayUpdateSender, + purge_repair_slot_counter: &mut PurgeRepairSlotCounter, + ) { + let root = bank_forks.read().unwrap().root(); + for new_popular_pruned_slots in popular_pruned_forks_receiver.try_iter() { + for new_popular_pruned_slot in new_popular_pruned_slots { + if new_popular_pruned_slot <= root { + continue; + } + check_slot_agrees_with_cluster( + new_popular_pruned_slot, + root, + blockstore, + duplicate_slots_tracker, + epoch_slots_frozen_slots, + fork_choice, + duplicate_slots_to_repair, + 
ancestor_hashes_replay_update_sender, + purge_repair_slot_counter, + SlotStateUpdate::PopularPrunedFork, + ); + } + } + } + // Check for any newly confirmed slots by the cluster. This is only detects // optimistic and in the future, duplicate slot confirmations on the exact // single slots and does not account for votes on their descendants. Used solely diff --git a/core/src/serve_repair.rs b/core/src/serve_repair.rs index eca67061bc..9c52596f80 100644 --- a/core/src/serve_repair.rs +++ b/core/src/serve_repair.rs @@ -1,7 +1,7 @@ use { crate::{ cluster_slots::ClusterSlots, - duplicate_repair_status::ANCESTOR_HASH_REPAIR_SAMPLE_SIZE, + duplicate_repair_status::get_ancestor_hash_repair_sample_size, repair_response, repair_service::{OutstandingShredRepairs, RepairStats, REPAIR_MS}, request_response::RequestResponse, @@ -65,11 +65,15 @@ pub const MAX_ORPHAN_REPAIR_RESPONSES: usize = 11; pub(crate) const REPAIR_PEERS_CACHE_CAPACITY: usize = 128; // Limit cache entries ttl in order to avoid re-using outdated data. const REPAIR_PEERS_CACHE_TTL: Duration = Duration::from_secs(10); + +#[cfg(test)] +static_assertions::const_assert_eq!(MAX_ANCESTOR_BYTES_IN_PACKET, 1220); pub const MAX_ANCESTOR_BYTES_IN_PACKET: usize = PACKET_DATA_SIZE - SIZE_OF_NONCE - 4 /*(response version enum discriminator)*/ - 4 /*slot_hash length*/; + pub const MAX_ANCESTOR_RESPONSES: usize = MAX_ANCESTOR_BYTES_IN_PACKET / std::mem::size_of::<(Slot, Hash)>(); /// Number of bytes in the randomly generated token sent with ping messages. 
@@ -1109,7 +1113,7 @@ impl ServeRepair { let addr = repair_peers[i].serve_repair().ok()?; Some((*repair_peers[i].pubkey(), addr)) }) - .take(ANCESTOR_HASH_REPAIR_SAMPLE_SIZE) + .take(get_ancestor_hash_repair_sample_size()) .collect(); Ok(peers) } diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 4ba16d39d2..624acc59c3 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -188,17 +188,18 @@ impl Tvu { ); let cluster_slots = Arc::new(ClusterSlots::default()); - let (duplicate_slots_reset_sender, duplicate_slots_reset_receiver) = unbounded(); + let (ancestor_duplicate_slots_sender, ancestor_duplicate_slots_receiver) = unbounded(); let (duplicate_slots_sender, duplicate_slots_receiver) = unbounded(); let (ancestor_hashes_replay_update_sender, ancestor_hashes_replay_update_receiver) = unbounded(); let (dumped_slots_sender, dumped_slots_receiver) = unbounded(); + let (popular_pruned_forks_sender, popular_pruned_forks_receiver) = unbounded(); let window_service = { let epoch_schedule = *bank_forks.read().unwrap().working_bank().epoch_schedule(); let repair_info = RepairInfo { bank_forks: bank_forks.clone(), epoch_schedule, - duplicate_slots_reset_sender, + ancestor_duplicate_slots_sender, repair_validators: tvu_config.repair_validators, repair_whitelist: tvu_config.repair_whitelist, cluster_info: cluster_info.clone(), @@ -218,6 +219,7 @@ impl Tvu { duplicate_slots_sender, ancestor_hashes_replay_update_receiver, dumped_slots_receiver, + popular_pruned_forks_sender, ) }; @@ -290,7 +292,7 @@ impl Tvu { vote_tracker, cluster_slots, retransmit_slots_sender, - duplicate_slots_reset_receiver, + ancestor_duplicate_slots_receiver, replay_vote_sender, gossip_confirmed_slots_receiver, gossip_verified_vote_hash_receiver, @@ -303,6 +305,7 @@ impl Tvu { prioritization_fee_cache.clone(), dumped_slots_sender, banking_tracer, + popular_pruned_forks_receiver, )?; let ledger_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| { diff --git a/core/src/window_service.rs 
b/core/src/window_service.rs index b41e7a700f..e4d162602e 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -7,7 +7,10 @@ use { cluster_info_vote_listener::VerifiedVoteReceiver, completed_data_sets_service::CompletedDataSetsSender, repair_response, - repair_service::{DumpedSlotsReceiver, OutstandingShredRepairs, RepairInfo, RepairService}, + repair_service::{ + DumpedSlotsReceiver, OutstandingShredRepairs, PopularPrunedForksSender, RepairInfo, + RepairService, + }, result::{Error, Result}, }, crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender}, @@ -316,6 +319,7 @@ impl WindowService { duplicate_slots_sender: DuplicateSlotSender, ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver, dumped_slots_receiver: DumpedSlotsReceiver, + popular_pruned_forks_sender: PopularPrunedForksSender, ) -> WindowService { let outstanding_requests = Arc::>::default(); @@ -331,6 +335,7 @@ impl WindowService { outstanding_requests.clone(), ancestor_hashes_replay_update_receiver, dumped_slots_receiver, + popular_pruned_forks_sender, ); let (duplicate_sender, duplicate_receiver) = unbounded(); diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index bdeeba57a2..608eb1a489 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -3153,6 +3153,11 @@ impl Blockstore { Ok(()) } + /// For tests + pub fn set_last_root(&mut self, root: Slot) { + *self.last_root.write().unwrap() = root; + } + pub fn mark_slots_as_if_rooted_normally_at_startup( &self, slots: Vec<(Slot, Option)>, diff --git a/local-cluster/Cargo.toml b/local-cluster/Cargo.toml index c9222a581c..c3bf01e024 100644 --- a/local-cluster/Cargo.toml +++ b/local-cluster/Cargo.toml @@ -32,6 +32,7 @@ solana-thin-client = { workspace = true } solana-tpu-client = { workspace = true } solana-vote-program = { workspace = true } tempfile = { workspace = true } +trees = { workspace = true } [dev-dependencies] assert_matches = { workspace = true } diff --git 
a/local-cluster/tests/common.rs b/local-cluster/tests/common.rs index 1570d62610..a3e96cb76a 100644 --- a/local-cluster/tests/common.rs +++ b/local-cluster/tests/common.rs @@ -58,6 +58,10 @@ pub fn last_vote_in_tower(tower_path: &Path, node_pubkey: &Pubkey) -> Option<(Sl restore_tower(tower_path, node_pubkey).map(|tower| tower.last_voted_slot_hash().unwrap()) } +pub fn last_root_in_tower(tower_path: &Path, node_pubkey: &Pubkey) -> Option { + restore_tower(tower_path, node_pubkey).map(|tower| tower.root()) +} + pub fn restore_tower(tower_path: &Path, node_pubkey: &Pubkey) -> Option { let file_tower_storage = FileTowerStorage::new(tower_path.to_path_buf()); diff --git a/local-cluster/tests/local_cluster_slow_2.rs b/local-cluster/tests/local_cluster_slow_2.rs index 0db7379b09..5e30f250a4 100644 --- a/local-cluster/tests/local_cluster_slow_2.rs +++ b/local-cluster/tests/local_cluster_slow_2.rs @@ -8,7 +8,9 @@ use { serial_test::serial, solana_core::validator::ValidatorConfig, solana_gossip::gossip_service::discover_cluster, - solana_ledger::{ancestor_iterator::AncestorIterator, blockstore::Blockstore}, + solana_ledger::{ + ancestor_iterator::AncestorIterator, blockstore::Blockstore, leader_schedule::FixedSchedule, + }, solana_local_cluster::{ cluster::Cluster, cluster_tests, @@ -23,7 +25,13 @@ use { signature::{Keypair, Signer}, }, solana_streamer::socket::SocketAddrSpace, - std::{collections::HashSet, sync::Arc, thread::sleep, time::Duration}, + std::{ + collections::HashSet, + sync::Arc, + thread::sleep, + time::{Duration, Instant}, + }, + trees::tr, }; mod common; @@ -453,3 +461,281 @@ fn test_slot_hash_expiry() { "test_slot_hashes_expiry", ); } + +// This test simulates a case where a leader sends a duplicate block with different ancestry. One +// version builds off of the rooted path, however the other version builds off a pruned branch. 
The +// validators that receive the pruned version will need to repair in order to continue, which +// requires an ancestor hashes repair. +// +// We set up 3 validators: +// - majority, will produce the rooted path +// - minority, will produce the pruned path +// - our_node, will be fed the pruned version of the duplicate block and will need to repair +// +// Additionally we set up 3 observer nodes to propagate votes and participate in the ancestor hashes +// sample. +// +// Fork structure: +// +// 0 - 1 - ... - 10 (fork slot) - 30 - ... - 61 (rooted path) - ... +// | +// |- 11 - ... - 29 (pruned path) - 81' +// +// +// Steps: +// 1) Different leader schedule, minority thinks it produces 0-29 and majority rest, majority +// thinks minority produces all blocks. This is to avoid majority accidentally producing blocks +// before it shuts down. +// 2) Start cluster, kill our_node. +// 3) Kill majority cluster after it votes for any slot > fork slot (guarantees that the fork slot is +// reached as minority cannot pass threshold otherwise). +// 4) Let minority produce forks on pruned forks until out of leader slots then kill. +// 5) Truncate majority ledger past fork slot so it starts building off of fork slot. +// 6) Restart majority and wait until it starts producing blocks on main fork and roots something +// past the fork slot. +// 7) Construct our ledger by copying majority ledger and copying blocks from minority for the pruned path. +// 8) In our node's ledger, change the parent of the latest slot in majority fork to be the latest +// slot in the minority fork (simulates duplicate built off of pruned block) +// 9) Start our node which will prune the minority fork on ledger replay and verify that we can make roots. 
+// +#[test] +#[serial] +fn test_duplicate_with_pruned_ancestor() { + solana_logger::setup_with("info,solana_metrics=off"); + solana_core::duplicate_repair_status::set_ancestor_hash_repair_sample_size_for_tests_only(3); + + let majority_leader_stake = 10_000_000 * DEFAULT_NODE_STAKE; + let minority_leader_stake = 2_000_000 * DEFAULT_NODE_STAKE; + let our_node = DEFAULT_NODE_STAKE; + let observer_stake = DEFAULT_NODE_STAKE; + + let slots_per_epoch = 2048; + let fork_slot: u64 = 10; + let fork_length: u64 = 20; + let majority_fork_buffer = 5; + + let mut node_stakes = vec![majority_leader_stake, minority_leader_stake, our_node]; + // We need enough observers to reach `ANCESTOR_HASH_REPAIR_SAMPLE_SIZE` + node_stakes.append(&mut vec![observer_stake; 3]); + + let num_nodes = node_stakes.len(); + + let validator_keys = vec![ + "28bN3xyvrP4E8LwEgtLjhnkb7cY4amQb6DrYAbAYjgRV4GAGgkVM2K7wnxnAS7WDneuavza7x21MiafLu1HkwQt4", + "2saHBBoTkLMmttmPQP8KfBkcCw45S5cwtV3wTdGCscRC8uxdgvHxpHiWXKx4LvJjNJtnNcbSv5NdheokFFqnNDt8", + "4mx9yoFBeYasDKBGDWCTWGJdWuJCKbgqmuP8bN9umybCh5Jzngw7KQxe99Rf5uzfyzgba1i65rJW4Wqk7Ab5S8ye", + "3zsEPEDsjfEay7te9XqNjRTCE7vwuT6u4DHzBJC19yp7GS8BuNRMRjnpVrKCBzb3d44kxc4KPGSHkCmk6tEfswCg", + ] + .iter() + .map(|s| (Arc::new(Keypair::from_base58_string(s)), true)) + .chain(std::iter::repeat_with(|| (Arc::new(Keypair::new()), true))) + .take(node_stakes.len()) + .collect::>(); + let validators = validator_keys + .iter() + .map(|(kp, _)| kp.pubkey()) + .collect::>(); + let (majority_pubkey, minority_pubkey, our_node_pubkey) = + (validators[0], validators[1], validators[2]); + + let mut default_config = ValidatorConfig::default_for_test(); + // Minority fork is leader long enough to create pruned fork + let validator_to_slots = vec![ + (minority_pubkey, (fork_slot + fork_length) as usize), + (majority_pubkey, slots_per_epoch as usize), + ]; + let leader_schedule = create_custom_leader_schedule(validator_to_slots.into_iter()); + default_config.fixed_leader_schedule = 
Some(FixedSchedule { + leader_schedule: Arc::new(leader_schedule), + }); + + let mut validator_configs = make_identical_validator_configs(&default_config, num_nodes); + validator_configs[3].voting_disabled = true; + // Don't let majority produce anything past the fork by tricking its leader schedule + validator_configs[0].fixed_leader_schedule = Some(FixedSchedule { + leader_schedule: Arc::new(create_custom_leader_schedule( + [(minority_pubkey, slots_per_epoch as usize)].into_iter(), + )), + }); + + let mut config = ClusterConfig { + cluster_lamports: DEFAULT_CLUSTER_LAMPORTS + node_stakes.iter().sum::(), + node_stakes, + validator_configs, + validator_keys: Some(validator_keys), + slots_per_epoch, + stakers_slot_offset: slots_per_epoch, + skip_warmup_slots: true, + ..ClusterConfig::default() + }; + let mut cluster = LocalCluster::new(&mut config, SocketAddrSpace::Unspecified); + + let majority_ledger_path = cluster.ledger_path(&majority_pubkey); + let minority_ledger_path = cluster.ledger_path(&minority_pubkey); + let our_node_ledger_path = cluster.ledger_path(&our_node_pubkey); + + info!( + "majority {} ledger path {:?}", + majority_pubkey, majority_ledger_path + ); + info!( + "minority {} ledger path {:?}", + minority_pubkey, minority_ledger_path + ); + info!( + "our_node {} ledger path {:?}", + our_node_pubkey, our_node_ledger_path + ); + + info!("Killing our node"); + let our_node_info = cluster.exit_node(&our_node_pubkey); + + info!("Waiting on majority validator to vote on at least {fork_slot}"); + let now = Instant::now(); + let mut last_majority_vote = 0; + loop { + let elapsed = now.elapsed(); + assert!( + elapsed <= Duration::from_secs(30), + "Majority validator failed to vote on a slot >= {} in {} secs, + majority validator last vote: {}", + fork_slot, + elapsed.as_secs(), + last_majority_vote, + ); + sleep(Duration::from_millis(100)); + + if let Some((last_vote, _)) = last_vote_in_tower(&majority_ledger_path, &majority_pubkey) { + last_majority_vote = 
last_vote; + if last_vote >= fork_slot { + break; + } + } + } + + info!("Killing majority validator, waiting for minority fork to reach a depth of at least 15",); + let mut majority_validator_info = cluster.exit_node(&majority_pubkey); + + let now = Instant::now(); + let mut last_minority_vote = 0; + while last_minority_vote < fork_slot + 15 { + let elapsed = now.elapsed(); + assert!( + elapsed <= Duration::from_secs(30), + "Minority validator failed to create a fork of depth >= {} in {} secs, + last_minority_vote: {}", + 15, + elapsed.as_secs(), + last_minority_vote, + ); + + if let Some((last_vote, _)) = last_vote_in_tower(&minority_ledger_path, &minority_pubkey) { + last_minority_vote = last_vote; + } + } + + info!( + "Killing minority validator, fork created successfully: {:?}", + last_minority_vote + ); + let last_minority_vote = + wait_for_last_vote_in_tower_to_land_in_ledger(&minority_ledger_path, &minority_pubkey); + let minority_validator_info = cluster.exit_node(&minority_pubkey); + + info!("Truncating majority validator ledger to {fork_slot}"); + { + remove_tower(&majority_ledger_path, &majority_pubkey); + let blockstore = open_blockstore(&majority_ledger_path); + purge_slots_with_count(&blockstore, fork_slot + 1, 100); + } + + info!("Restarting majority validator"); + // Make sure we don't send duplicate votes + majority_validator_info.config.wait_to_vote_slot = Some(fork_slot + fork_length); + // Fix the leader schedule so we can produce blocks + majority_validator_info.config.fixed_leader_schedule = + minority_validator_info.config.fixed_leader_schedule.clone(); + cluster.restart_node( + &majority_pubkey, + majority_validator_info, + SocketAddrSpace::Unspecified, + ); + + let mut last_majority_root = 0; + let now = Instant::now(); + info!( + "Waiting for majority validator to root something past {}", + fork_slot + fork_length + majority_fork_buffer + ); + while last_majority_root <= fork_slot + fork_length + majority_fork_buffer { + let elapsed = 
now.elapsed(); + assert!( + elapsed <= Duration::from_secs(60), + "Majority validator failed to root something > {} in {} secs, + last majority validator vote: {},", + fork_slot + fork_length + majority_fork_buffer, + elapsed.as_secs(), + last_majority_vote, + ); + sleep(Duration::from_millis(100)); + + if let Some(last_root) = last_root_in_tower(&majority_ledger_path, &majority_pubkey) { + last_majority_root = last_root; + } + } + + let last_majority_vote = + wait_for_last_vote_in_tower_to_land_in_ledger(&majority_ledger_path, &majority_pubkey); + info!("Creating duplicate block built off of pruned branch for our node. Last majority vote {last_majority_vote}, Last minority vote {last_minority_vote}"); + { + { + // Copy majority fork + std::fs::remove_dir_all(&our_node_info.info.ledger_path).unwrap(); + let mut opt = fs_extra::dir::CopyOptions::new(); + opt.copy_inside = true; + fs_extra::dir::copy(&majority_ledger_path, &our_node_ledger_path, &opt).unwrap(); + remove_tower(&our_node_ledger_path, &majority_pubkey); + } + + // Copy minority fork. 
Rewind our root so that we can copy over the purged bank + let minority_blockstore = open_blockstore(&minority_validator_info.info.ledger_path); + let mut our_blockstore = open_blockstore(&our_node_info.info.ledger_path); + our_blockstore.set_last_root(fork_slot - 1); + copy_blocks(last_minority_vote, &minority_blockstore, &our_blockstore); + + // Change last block parent to chain off of (purged) minority fork + info!("For our node, changing parent of {last_majority_vote} to {last_minority_vote}"); + purge_slots_with_count(&our_blockstore, last_majority_vote, 1); + our_blockstore.add_tree( + tr(last_minority_vote) / tr(last_majority_vote), + false, + true, + 64, + Hash::default(), + ); + + // Update the root to set minority fork back as pruned + our_blockstore.set_last_root(fork_slot + fork_length); + } + + // Actual test, `our_node` will replay the minority fork, then the majority fork which will + // prune the minority fork. Then finally the problematic block will be skipped (not replayed) + // because its parent has been pruned from bank forks. Meanwhile the majority validator has + // continued making blocks and voting, duplicate confirming everything. This will cause the + // pruned fork to become popular triggering an ancestor hashes repair, eventually allowing our + // node to dump & repair & continue making roots. 
+ info!("Restarting our node, verifying that our node is making roots past the duplicate block"); + + cluster.restart_node( + &our_node_pubkey, + our_node_info, + SocketAddrSpace::Unspecified, + ); + + cluster_tests::check_for_new_roots( + 16, + &[cluster.get_contact_info(&our_node_pubkey).unwrap().clone()], + &cluster.connection_cache, + "test_duplicate_with_pruned_ancestor", + ); +} diff --git a/runtime/src/epoch_stakes.rs b/runtime/src/epoch_stakes.rs index 3f1cda0a28..89707e1a5e 100644 --- a/runtime/src/epoch_stakes.rs +++ b/runtime/src/epoch_stakes.rs @@ -44,6 +44,11 @@ impl EpochStakes { self.total_stake } + /// For tests + pub fn set_total_stake(&mut self, total_stake: u64) { + self.total_stake = total_stake; + } + pub fn node_id_to_vote_accounts(&self) -> &Arc<NodeIdToVoteAccounts> { &self.node_id_to_vote_accounts }