diff --git a/core/src/cluster_nodes.rs b/core/src/cluster_nodes.rs index 89bd077f4..650027208 100644 --- a/core/src/cluster_nodes.rs +++ b/core/src/cluster_nodes.rs @@ -125,7 +125,7 @@ impl ClusterNodes { .unzip(), }; let index: Vec<_> = { - let shuffle = weighted_shuffle(&weights, shred_seed); + let shuffle = weighted_shuffle(weights.into_iter(), shred_seed); shuffle.into_iter().map(|i| index[i]).collect() }; let self_index = index diff --git a/core/src/cluster_slots.rs b/core/src/cluster_slots.rs index 0e47cc34f..cdac78255 100644 --- a/core/src/cluster_slots.rs +++ b/core/src/cluster_slots.rs @@ -165,7 +165,7 @@ impl ClusterSlots { .collect() } - pub fn compute_weights_exclude_noncomplete( + pub fn compute_weights_exclude_nonfrozen( &self, slot: Slot, repair_peers: &[ContactInfo], @@ -325,7 +325,7 @@ mod tests { // None of these validators have completed slot 9, so should // return nothing assert!(cs - .compute_weights_exclude_noncomplete(slot, &contact_infos) + .compute_weights_exclude_nonfrozen(slot, &contact_infos) .is_empty()); // Give second validator max stake @@ -345,7 +345,7 @@ mod tests { // max stake cs.insert_node_id(slot, contact_infos[0].id); assert_eq!( - cs.compute_weights_exclude_noncomplete(slot, &contact_infos), + cs.compute_weights_exclude_nonfrozen(slot, &contact_infos), vec![(1, 0)] ); } diff --git a/core/src/duplicate_repair_status.rs b/core/src/duplicate_repair_status.rs new file mode 100644 index 000000000..4e89a3634 --- /dev/null +++ b/core/src/duplicate_repair_status.rs @@ -0,0 +1,829 @@ +use solana_ledger::blockstore::Blockstore; +use solana_sdk::{clock::Slot, hash::Hash, pubkey::Pubkey, timing::timestamp}; +use std::{collections::HashMap, net::SocketAddr}; + +// Number of validators to sample for the ancestor repair +pub const ANCESTOR_HASH_REPAIR_SAMPLE_SIZE: usize = 21; + +// Even assuming 20% of validators malicious, the chance that >= 11 of the +// ANCESTOR_HASH_REPAIR_SAMPLE_SIZE = 21 validators is malicious is roughly 1/1000. +// Assuming we send a separate sample every 5 seconds, that's once every hour. + +// On the other hand with a 52-48 split of validators with one version of the block vs +// another, the chance of >= 11 of the 21 sampled being from the 52% portion is +// about 57%, so we should be able to find a correct sample in a reasonable amount of time. 
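(Editorial note, not part of the patch.) Both figures quoted in the comment above are binomial tail probabilities, P(X >= 11) for X ~ Binomial(21, p) with p = 0.20 and p = 0.52 respectively. A minimal standalone sketch that reproduces them:

```rust
/// P(X >= k) for X ~ Binomial(n, p), computed with plain f64 arithmetic.
fn binomial_tail(n: u64, k: u64, p: f64) -> f64 {
    (k..=n)
        .map(|i| {
            // C(n, i), built up iteratively; stays well within f64 range for n = 21.
            let mut choose = 1f64;
            for j in 0..i {
                choose = choose * (n - j) as f64 / (j + 1) as f64;
            }
            choose * p.powi(i as i32) * (1.0 - p).powi((n - i) as i32)
        })
        .sum()
}

fn main() {
    // ~0.001: >= 11 of the 21 sampled validators are malicious, assuming 20% are malicious.
    println!("{:.4}", binomial_tail(21, 11, 0.20));
    // ~0.57: >= 11 of the 21 sampled validators come from the 52% side of a 52-48 split.
    println!("{:.2}", binomial_tail(21, 11, 0.52));
}
```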
+const MINIMUM_ANCESTOR_AGREEMENT_SIZE: usize = (ANCESTOR_HASH_REPAIR_SAMPLE_SIZE + 1) / 2; +const RETRY_INTERVAL_SECONDS: usize = 5; + +#[derive(Debug, PartialEq)] +pub enum DuplicateAncestorDecision { + InvalidSample, + AncestorsAllMatch, + SampleNotDuplicateConfirmed, + ContinueSearch(DuplicateSlotRepairStatus), + EarliestAncestorNotFrozen(DuplicateSlotRepairStatus), + EarliestMismatchFound(DuplicateSlotRepairStatus), +} + +impl DuplicateAncestorDecision { + pub fn repair_status(&self) -> Option<&DuplicateSlotRepairStatus> { + match self { + DuplicateAncestorDecision::InvalidSample + | DuplicateAncestorDecision::AncestorsAllMatch + | DuplicateAncestorDecision::SampleNotDuplicateConfirmed => None, + DuplicateAncestorDecision::ContinueSearch(status) => Some(status), + DuplicateAncestorDecision::EarliestAncestorNotFrozen(status) => Some(status), + DuplicateAncestorDecision::EarliestMismatchFound(status) => Some(status), + } + } + + fn repair_status_mut(&mut self) -> Option<&mut DuplicateSlotRepairStatus> { + match self { + DuplicateAncestorDecision::InvalidSample + | DuplicateAncestorDecision::AncestorsAllMatch + | DuplicateAncestorDecision::SampleNotDuplicateConfirmed => None, + DuplicateAncestorDecision::ContinueSearch(status) => Some(status), + DuplicateAncestorDecision::EarliestAncestorNotFrozen(status) => Some(status), + DuplicateAncestorDecision::EarliestMismatchFound(status) => Some(status), + } + } +} + +#[derive(Debug, Default, Clone, PartialEq)] +pub struct DuplicateSlotRepairStatus { + // Any ancestor slots that are either missing or mismatched. + // A mismatched ancestor slot is one that has been replayed (frozen) + // but has a different hash than the one agreed upon by the sampled peers. + // + // These are the slots that need to be dumped in order to repair the correct + // versions. The hash is None if the slot is not frozen, because it's: + // 1) Dead + // 2) Hasn't been replayed + // 3) We don't have the slot in our Ledger + pub correct_ancestors_to_repair: Vec<(Slot, Hash)>, + pub repair_pubkey_and_addr: Option<(Pubkey, SocketAddr)>, + pub start_ts: u64, +} + +impl DuplicateSlotRepairStatus { + fn new(correct_ancestors_to_repair: Vec<(Slot, Hash)>) -> Self { + Self { + correct_ancestors_to_repair, + repair_pubkey_and_addr: None, + start_ts: timestamp(), + } + } +} + +#[derive(Default, Clone)] +pub struct DeadSlotAncestorRequestStatus { + // The mismatched slot that was the subject of the AncestorHashes(requested_mismatched_slot) + // repair request. All responses to this request should be for ancestors of this slot. + requested_mismatched_slot: Slot, + // Timestamp at which we sent out the requests + start_ts: u64, + // The addresses of the validators we asked for a response; a response is only acceptable + // from these validators. The boolean represents whether the validator + // has responded. + sampled_validators: HashMap<SocketAddr, bool>, + // The number of sampled validators that have responded + num_responses: usize, + // Validators who have responded to our ancestor repair requests. An entry + // Vec<(Slot, Hash)> -> Vec<SocketAddr> tells us which validators have + // responded with the same Vec<(Slot, Hash)> set of ancestors.
+ // + // TODO: Trie may be more efficient + ancestor_request_responses: HashMap<Vec<(Slot, Hash)>, Vec<SocketAddr>>, +} + +impl DeadSlotAncestorRequestStatus { + pub fn new( + sampled_validators: impl Iterator<Item = SocketAddr>, + requested_mismatched_slot: Slot, + ) -> Self { + DeadSlotAncestorRequestStatus { + requested_mismatched_slot, + start_ts: timestamp(), + sampled_validators: sampled_validators.map(|p| (p, false)).collect(), + ..DeadSlotAncestorRequestStatus::default() + } + } + + /// Record the response from `from_addr`. Returns Some(DuplicateAncestorDecision) + /// if we have finalized a decision based on the responses. We can finalize a decision when + /// one of the following conditions is met: + /// 1) We have heard from all the validators, OR + /// 2) >= MINIMUM_ANCESTOR_AGREEMENT_SIZE have agreed that we have the correct versions + /// of the nth ancestor, for some `n > 0`, AND >= MINIMUM_ANCESTOR_AGREEMENT_SIZE have + /// agreed we have the wrong version of the `n-1` ancestor. + pub fn add_response( + &mut self, + from_addr: &SocketAddr, + response_slot_hashes: Vec<(Slot, Hash)>, + blockstore: &Blockstore, + ) -> Option<DuplicateAncestorDecision> { + if let Some(did_get_response) = self.sampled_validators.get_mut(from_addr) { + if *did_get_response { + // If we've already received a response from this validator, return. + return None; + } + // Mark that we already got a response from this validator + *did_get_response = true; + self.num_responses += 1; + } else { + // If this is not a response from one of the sampled validators, return. + return None; + } + + let validators_with_same_response = self + .ancestor_request_responses + .entry(response_slot_hashes.clone()) + .or_default(); + validators_with_same_response.push(*from_addr); + + // If enough of the sampled validators agree on this response, we are confident + // it is the correct set of ancestors + if validators_with_same_response.len() + == MINIMUM_ANCESTOR_AGREEMENT_SIZE.min(self.sampled_validators.len()) + { + // When we reach MINIMUM_ANCESTOR_AGREEMENT_SIZE of the same responses, + // check for mismatches. + return Some( + self.handle_sampled_validators_reached_agreement(blockstore, response_slot_hashes), + ); + } + + // If everyone responded and we still haven't agreed upon a set of + // ancestors, that means there was a lot of disagreement and we sampled + // a bad set of validators.
+ if self.num_responses == ANCESTOR_HASH_REPAIR_SAMPLE_SIZE.min(self.sampled_validators.len()) + { + info!( + "{} return invalid sample no agreement", + self.requested_mismatched_slot + ); + return Some(DuplicateAncestorDecision::InvalidSample); + } + + None + } + + fn handle_sampled_validators_reached_agreement( + &mut self, + blockstore: &Blockstore, + mut agreed_response: Vec<(Slot, Hash)>, + ) -> DuplicateAncestorDecision { + if agreed_response.is_empty() { + info!( + "{} return invalid sample not duplicate confirmed", + self.requested_mismatched_slot + ); + return DuplicateAncestorDecision::SampleNotDuplicateConfirmed; + } + + if agreed_response.first().unwrap().0 != self.requested_mismatched_slot { + return DuplicateAncestorDecision::InvalidSample; + } + + // Recall: + // 1) *correct* validators only respond to `AncestorHashes(slot)` repair requests IFF they + // saw the ancestors of `slot` get duplicate confirmed, AND + // 2) *correct* validators respond with the ancestors of slot in sequential order + // 3) `slot` should get duplicate confirmed on only one fork in the cluster + // + // From 1) and 3) we can conclude that it is highly likely at least one correct + // validator reported `agreed_response` were the duplicate confirmed ancestors of + // `self.requested_mismatched_slot`. From 2), all the `agreed_response` ancestors + // are ordered such that the ancestor at index `i+1` is the direct descendant of the + // ancestor at `i`. + let mut last_ancestor = 0; + let mut earliest_erroring_ancestor = None; + // Iterate from smallest to largest ancestor, performing integrity checks. + for (i, (ancestor_slot, agreed_upon_hash)) in agreed_response.iter().rev().enumerate() { + if i != 0 && *ancestor_slot <= last_ancestor { + info!( + "{} return invalid sample out of order", + self.requested_mismatched_slot + ); + // Responses were not properly ordered + return DuplicateAncestorDecision::InvalidSample; + } + last_ancestor = *ancestor_slot; + if *ancestor_slot > self.requested_mismatched_slot { + // We should only get ancestors of `self.requested_mismatched_slot` + // in valid responses + info!( + "{} return invalid sample big ancestor", + self.requested_mismatched_slot + ); + return DuplicateAncestorDecision::InvalidSample; + } + let our_frozen_hash = blockstore.get_bank_hash(*ancestor_slot); + if let Some(our_frozen_hash) = our_frozen_hash { + if earliest_erroring_ancestor.is_some() && our_frozen_hash == *agreed_upon_hash { + // It's impossible have a different version of an earlier ancestor, but + // then also have the same version of a later ancestor. + info!("{} mismatches then matches", self.requested_mismatched_slot); + return DuplicateAncestorDecision::InvalidSample; + } else if our_frozen_hash != *agreed_upon_hash + && earliest_erroring_ancestor.is_none() + { + earliest_erroring_ancestor = Some(( + agreed_response.len() - i - 1, + DuplicateAncestorDecision::EarliestMismatchFound( + DuplicateSlotRepairStatus::default(), + ), + )); + } + } else if earliest_erroring_ancestor.is_none() { + // If in our current ledger, `ancestor_slot` is actually on the same fork as + // `self.requested_mismatched_slot`, then the `frozen_hash` should not be None here. + // This is because we had to freeze `ancestor_slot` in order to replay its descendant + // `self.requested_mismatched_slot`. + // + // However, it's possible that we have a version of + // `self.requested_mismatched_slot` that is on the wrong fork with the wrong set of + // ancestors. 
In this case, we could get responses about ancestors that are not + // ancestors of our version of `self.requested_mismatched_slot` + // + // ``` + // 1 - 2 - 3 - 5' - 6' (our current fork) + // / + // 0 + // \ + // 1 - 2 - 4 - 5 - 6 (cluster agreed fork) + // ``` + // + // In this case, if we make a AncestorsHashes(6) request for our dead slot 6', we may + // get a response with slot `4` in it, which is a slot that doesn't have a frozen + // hash in blockstore yet because either: + // + // 1) We haven't replayed that slot yet (it's on a different fork). + // 2) We don't have that slot yet in our ledger. + // 3) We have the correct/incorrect version of `4`, but we may have replayed + // it on the wrong branch and it's dead. + // + // We ignore such ancestors in this loop. + // + // Note also that besides the missing slot `4`, there are also duplicates between + // both the forks, namely `1, 2, 5` for which we have different versions of these slots + // in our ledger. So how do we handle such cases where there are both missing and mismatched + // ancestors? + // + // There are two cases: + // 1) The first such mismatch `first_mismatch` appears BEFORE the slot `4` that is + // missing from our blockstore. + // 2) The first such mismatch `first_mismatch` appears AFTER the slot `4` that is + // missing from our blockstore. + // + // Because we know any mismatches will also trigger the mismatch casing earlier in + // the function, we will return`EarliestMismatchFound(first_mismatch)`. This will + // cause us to dump and repair `first_mismatch` and all its descendants, which should + // be the right behavior in both above cases. + warn!( + "Blockstore is missing frozen hash for slot {}, + which the cluster claims is an ancestor of dead slot {}. Potentially + our version of the dead slot chains to the wrong fork!", + ancestor_slot, self.requested_mismatched_slot + ); + earliest_erroring_ancestor = Some(( + agreed_response.len() - i - 1, + DuplicateAncestorDecision::EarliestAncestorNotFrozen( + DuplicateSlotRepairStatus::default(), + ), + )); + } + } + + if let Some((earliest_erroring_ancestor_index, mut decision)) = earliest_erroring_ancestor { + // We found the earliest mismatch `earliest_erroring_ancestor_index`. + // We know all slots for indexes > `earliest_erroring_ancestor_index` in + // `agreed_response` match the version we have replayed. + if earliest_erroring_ancestor_index == agreed_response.len() - 1 { + // If the earliest ancestor is missing or a mismatch, then we need to keep searching + // for earlier mismatches + let repair_status = DuplicateSlotRepairStatus::new(agreed_response); + DuplicateAncestorDecision::ContinueSearch(repair_status) + } else { + // We only need to look through the first `earliest_erroring_ancestor_index + 1` + // elements and dump/repair any mismatches. + agreed_response.truncate(earliest_erroring_ancestor_index + 1); + let repair_status = decision.repair_status_mut().unwrap(); + repair_status.correct_ancestors_to_repair = agreed_response; + decision + } + } else { + // If we haven't returned by now, this implies all the ancestors matched our versions + // of those ancestors. 
Only slot to dump and repair is `self.requested_mismatched_slot` + DuplicateAncestorDecision::AncestorsAllMatch + } + } + + /// Given a timestamp in milliseconds, return if we should retry with another sample batch + /// due to timeout + pub fn is_expired(&self) -> bool { + timestamp() - self.start_ts > RETRY_INTERVAL_SECONDS as u64 * 1000 + } +} + +#[cfg(test)] +pub mod tests { + use super::*; + use rand::{self, seq::SliceRandom, thread_rng}; + use solana_ledger::get_tmp_ledger_path_auto_delete; + use std::{collections::BTreeMap, net::IpAddr}; + use tempfile::TempDir; + + struct TestSetup { + sampled_addresses: Vec, + correct_ancestors_response: Vec<(Slot, Hash)>, + _blockstore_temp_dir: TempDir, + blockstore: Blockstore, + status: DeadSlotAncestorRequestStatus, + } + + fn create_rand_socket_addr() -> SocketAddr { + let bytes: [u16; 8] = rand::random(); + let ip = IpAddr::from(bytes); + SocketAddr::new(ip, 8080) + } + + fn setup_add_response_test(request_slot: Slot, num_ancestors_in_response: usize) -> TestSetup { + assert!(request_slot >= num_ancestors_in_response as u64); + let sampled_addresses: Vec = std::iter::repeat_with(create_rand_socket_addr) + .take(ANCESTOR_HASH_REPAIR_SAMPLE_SIZE) + .collect(); + + let status = + DeadSlotAncestorRequestStatus::new(sampled_addresses.iter().cloned(), request_slot); + let blockstore_temp_dir = get_tmp_ledger_path_auto_delete!(); + let blockstore = Blockstore::open(blockstore_temp_dir.path()).unwrap(); + + let correct_ancestors_response: Vec<(Slot, Hash)> = + (request_slot - num_ancestors_in_response as u64..=request_slot) + .map(|ancestor| (ancestor, Hash::new_unique())) + .rev() + .collect(); + + TestSetup { + sampled_addresses, + correct_ancestors_response, + _blockstore_temp_dir: blockstore_temp_dir, + blockstore, + status, + } + } + + #[test] + fn test_add_response_invalid_peer() { + let request_slot = 100; + let TestSetup { + blockstore, + mut status, + .. + } = setup_add_response_test(request_slot, 10); + + // Try adding a response from an invalid peer, should not be registered + let rand_addr = create_rand_socket_addr(); + assert!(status + .add_response(&rand_addr, vec![(99, Hash::new_unique())], &blockstore) + .is_none()); + assert_eq!(status.num_responses, 0); + assert!(status.ancestor_request_responses.is_empty()); + } + + #[test] + fn test_add_multiple_responses_same_peer() { + let request_slot = 100; + let TestSetup { + sampled_addresses, + correct_ancestors_response, + blockstore, + mut status, + .. + } = setup_add_response_test(request_slot, 10); + + // Create an incorrect response + let mut incorrect_ancestors_response = correct_ancestors_response.clone(); + incorrect_ancestors_response.pop().unwrap(); + + // Add a mixture of correct and incorrect responses from the same `responder_addr`. + let num_repeated_responses = ANCESTOR_HASH_REPAIR_SAMPLE_SIZE; + let responder_addr = &sampled_addresses[0]; + for i in 0..num_repeated_responses { + let response = if i % 2 == 0 { + // This is the first response when i == 0, so it should be the only response that + // persists. 
All later responses, both correct and incorrect, should be ignored + correct_ancestors_response.clone() + } else { + incorrect_ancestors_response.clone() + }; + assert!(status + .add_response(responder_addr, response, &blockstore) + .is_none()); + assert_eq!(status.num_responses, 1); + assert_eq!(status.ancestor_request_responses.len(), 1); + let correct_responses = status + .ancestor_request_responses + .get(&correct_ancestors_response) + .unwrap(); + assert!(correct_responses.contains(responder_addr)); + assert_eq!(correct_responses.len(), 1); + } + } + + /// Add the given `incorrect_responses` from a subset of the sampled validators, and + /// then add correct responses from the remaining validators. + fn run_add_multiple_correct_and_incorrect_responses( + incorrect_responses: Vec<(Vec<(Slot, Hash)>, usize)>, + test_setup: &mut TestSetup, + ) -> DuplicateAncestorDecision { + let &mut TestSetup { + ref sampled_addresses, + ref correct_ancestors_response, + ref blockstore, + ref mut status, + .. + } = test_setup; + + // Generate an event order of adding correct/incorrect responses + let events: BTreeMap<usize, Vec<(Slot, Hash)>> = incorrect_responses + .into_iter() + .scan( + 0, + |total_count, /*accumulated state*/ + ( + incorrect_response, + num_responses, /*number of validators returning this response*/ + )| { + assert!(num_responses > 0); + *total_count += num_responses; + Some((*total_count, incorrect_response)) + }, + ) + .collect(); + + let total_incorrect_responses = events.iter().last().map(|(count, _)| *count).unwrap_or(0); + assert!(total_incorrect_responses <= ANCESTOR_HASH_REPAIR_SAMPLE_SIZE); + + let mut event_order: Vec<usize> = (0..sampled_addresses.len()).collect(); + event_order.shuffle(&mut thread_rng()); + + for (event, responder_addr) in event_order.iter().zip(sampled_addresses.iter()) { + let response = events + .range((event + 1)..)
+ .next() + .map(|(_count, response)| response) + .unwrap_or_else(|| correct_ancestors_response) + .clone(); + + if let Some(decision) = status.add_response(responder_addr, response, blockstore) { + // Note we may get a decision before we've heard back from all the + // sampled validators + return decision; + } + } + + // Should never get here + panic!("Decision must be made after hearing back from all the sampled validators"); + } + + #[test] + fn test_add_multiple_responses_invalid_sample_no_agreement() { + let request_slot = 100; + let mut test_setup = setup_add_response_test(request_slot, 10); + + // Create an incorrect response + let mut incorrect_ancestors_response_0 = test_setup.correct_ancestors_response.clone(); + incorrect_ancestors_response_0.pop().unwrap(); + + // Create another incorrect response + let mut incorrect_ancestors_response_1 = incorrect_ancestors_response_0.clone(); + incorrect_ancestors_response_1.pop().unwrap(); + let desired_incorrect_responses = vec![ + ( + incorrect_ancestors_response_0, + MINIMUM_ANCESTOR_AGREEMENT_SIZE - 1, + ), + (incorrect_ancestors_response_1, 2), + ]; + + // Ensure that no response gets >= MINIMUM_ANCESTOR_AGREEMENT_SIZE responses + let total_invalid_responses: usize = desired_incorrect_responses + .iter() + .map(|(_, count)| count) + .sum(); + assert!( + ANCESTOR_HASH_REPAIR_SAMPLE_SIZE - total_invalid_responses + < MINIMUM_ANCESTOR_AGREEMENT_SIZE + ); + + assert_eq!( + run_add_multiple_correct_and_incorrect_responses( + desired_incorrect_responses, + &mut test_setup + ), + DuplicateAncestorDecision::InvalidSample + ); + } + + #[test] + fn test_add_multiple_responses_not_duplicate_confirmed() { + let request_slot = 100; + let mut test_setup = setup_add_response_test(request_slot, 10); + + // Create an incorrect response that is empty + let incorrect_ancestors_response = vec![]; + let desired_incorrect_responses = vec![( + incorrect_ancestors_response, + MINIMUM_ANCESTOR_AGREEMENT_SIZE, + )]; + + assert_eq!( + run_add_multiple_correct_and_incorrect_responses( + desired_incorrect_responses, + &mut test_setup + ), + DuplicateAncestorDecision::SampleNotDuplicateConfirmed + ); + } + + #[test] + fn test_add_multiple_responses_invalid_sample_missing_requested_slot() { + let request_slot = 100; + let mut test_setup = setup_add_response_test(request_slot, 10); + + // Create an incorrect response that is missing `request_slot` + let incorrect_ancestors_response = vec![(request_slot - 1, Hash::new_unique())]; + let desired_incorrect_responses = vec![( + incorrect_ancestors_response, + MINIMUM_ANCESTOR_AGREEMENT_SIZE, + )]; + + assert_eq!( + run_add_multiple_correct_and_incorrect_responses( + desired_incorrect_responses, + &mut test_setup + ), + DuplicateAncestorDecision::InvalidSample + ); + } + + #[test] + fn test_add_multiple_responses_invalid_sample_responses_not_ancestors() { + let request_slot = 100; + let mut test_setup = setup_add_response_test(request_slot, 10); + + // Create an incorrect response. 
If the agreed upon response contains + // slots >= request_slot, we still mark the responses as invalid + let mut incorrect_ancestors_response = test_setup.correct_ancestors_response.clone(); + incorrect_ancestors_response.push((request_slot + 1, Hash::new_unique())); + let desired_incorrect_responses = vec![( + incorrect_ancestors_response, + MINIMUM_ANCESTOR_AGREEMENT_SIZE, + )]; + + assert_eq!( + run_add_multiple_correct_and_incorrect_responses( + desired_incorrect_responses, + &mut test_setup + ), + DuplicateAncestorDecision::InvalidSample + ); + } + + #[test] + fn test_add_multiple_responses_invalid_sample_responses_out_of_order() { + let request_slot = 100; + let mut test_setup = setup_add_response_test(request_slot, 10); + + // Create an incorrect response that is out of order + let mut incorrect_ancestors_response = test_setup.correct_ancestors_response.clone(); + incorrect_ancestors_response.swap_remove(0); + let desired_incorrect_responses = vec![( + incorrect_ancestors_response, + MINIMUM_ANCESTOR_AGREEMENT_SIZE, + )]; + + assert_eq!( + run_add_multiple_correct_and_incorrect_responses( + desired_incorrect_responses, + &mut test_setup + ), + DuplicateAncestorDecision::InvalidSample + ); + } + + #[test] + fn test_add_multiple_responses_invalid_sample_matches_then_mismatches() { + let request_slot = 100; + let mut test_setup = setup_add_response_test(request_slot, 10); + + // Insert all the correct frozen ancestors + for &(slot, correct_hash) in &test_setup.correct_ancestors_response { + test_setup + .blockstore + .insert_bank_hash(slot, correct_hash, false); + } + + // Create an incorrect response where there is a mismatched ancestor `X`, then + // a matching ancestor `Y > X` + let mut incorrect_ancestors_response = test_setup.correct_ancestors_response.clone(); + incorrect_ancestors_response[5].1 = Hash::new_unique(); + let desired_incorrect_responses = vec![( + incorrect_ancestors_response, + MINIMUM_ANCESTOR_AGREEMENT_SIZE, + )]; + + assert_eq!( + run_add_multiple_correct_and_incorrect_responses( + desired_incorrect_responses, + &mut test_setup + ), + DuplicateAncestorDecision::InvalidSample + ); + } + + #[test] + fn test_add_multiple_responses_ancestors_all_not_frozen() { + let request_slot = 100; + let mut test_setup = setup_add_response_test(request_slot, 10); + + // Create an incorrect response, but the agreed upon response will be the correct + // one. 
+ let mut incorrect_ancestors_response = test_setup.correct_ancestors_response.clone(); + incorrect_ancestors_response.push((request_slot, Hash::new_unique())); + let desired_incorrect_responses = vec![( + incorrect_ancestors_response, + MINIMUM_ANCESTOR_AGREEMENT_SIZE - 1, + )]; + + // We have no entries in the blockstore, so all the ancestors will be missing + match run_add_multiple_correct_and_incorrect_responses( + desired_incorrect_responses, + &mut test_setup, + ) { + DuplicateAncestorDecision::ContinueSearch(repair_status) => { + assert_eq!( + repair_status.correct_ancestors_to_repair, + test_setup.correct_ancestors_response + ); + } + x => panic!("Incorrect decision {:?}", x), + }; + } + + #[test] + fn test_add_multiple_responses_ancestors_some_not_frozen() { + let request_slot = 100; + let mut test_setup = setup_add_response_test(request_slot, 10); + + // Set up a situation where some of our ancestors are correct, + // but then we fork off and are missing some ancestors like so: + // ``` + // 93 - 95 - 97 - 99 - 100 (our current fork, missing some slots like 98) + // / + // 90 - 91 - 92 (all correct) + // \ + // 93 - 94 - 95 - 96 - 97 - 98 - 99 - 100 (correct fork) + // ``` + let rand_num: u64 = rand::random(); + let insert_even_or_odds: u64 = rand_num % 2; + for &(slot, correct_hash) in &test_setup.correct_ancestors_response { + if slot <= 92 { + test_setup + .blockstore + .insert_bank_hash(slot, correct_hash, false); + } else if slot % 2 == insert_even_or_odds { + // Here we either skip slot 93 or 94. + // + // 1) If we skip slot 93, and insert mismatched slot 94 we're testing the order of + // events `Not frozen -> Mismatched hash` + // + // 2) If we insert mismatched slot 93, and skip slot 94 we're testing the order of + // events `Mismatched hash -> Not frozen` + // + // Both cases should return `EarliestMismatchFound` + test_setup + .blockstore + .insert_bank_hash(slot, Hash::new_unique(), false); + } + } + + let repair_status = + match run_add_multiple_correct_and_incorrect_responses(vec![], &mut test_setup) { + DuplicateAncestorDecision::EarliestMismatchFound(repair_status) + if insert_even_or_odds == 1 => + { + repair_status + } + DuplicateAncestorDecision::EarliestAncestorNotFrozen(repair_status) + if insert_even_or_odds == 0 => + { + repair_status + } + x => panic!("Incorrect decision {:?}", x), + }; + + // Expect to find everything after 92 in the `correct_ancestors_to_repair`. 
+ let expected_mismatched_slots: Vec<(Slot, Hash)> = test_setup + .correct_ancestors_response + .into_iter() + .filter(|(slot, _)| *slot > 92) + .collect(); + assert_eq!( + repair_status.correct_ancestors_to_repair, + expected_mismatched_slots + ); + } + + #[test] + fn test_add_multiple_responses_ancestors_all_mismatched() { + let request_slot = 100; + let mut test_setup = setup_add_response_test(request_slot, 10); + + // Insert all the wrong hashes for the slots + for (slot, _) in &test_setup.correct_ancestors_response { + test_setup + .blockstore + .insert_bank_hash(*slot, Hash::new_unique(), false); + } + + // All the ancestors are mismatched, so we need to continue the search + match run_add_multiple_correct_and_incorrect_responses(vec![], &mut test_setup) { + DuplicateAncestorDecision::ContinueSearch(repair_status) => { + assert_eq!( + repair_status.correct_ancestors_to_repair, + test_setup.correct_ancestors_response + ); + } + x => panic!("Incorrect decision {:?}", x), + }; + } + + #[test] + fn test_add_multiple_responses_ancestors_some_mismatched() { + let request_slot = 100; + let mut test_setup = setup_add_response_test(request_slot, 10); + + // Set up a situation where some of our ancestors are correct, + // but then we fork off with different versions of the correct slots. + // ``` + // 93' - 94' - 95' - 96' - 97' - 98' - 99' - 100' (our current fork, missing some slots like 98) + // / + // 90 - 91 - 92 (all correct) + // \ + // 93 - 94 - 95 - 96 - 97 - 98 - 99 - 100 (correct fork) + // ``` + + // Insert all the wrong hashes for the slots + for &(slot, correct_hash) in &test_setup.correct_ancestors_response { + if slot <= 92 { + test_setup + .blockstore + .insert_bank_hash(slot, correct_hash, false); + } else { + test_setup + .blockstore + .insert_bank_hash(slot, Hash::new_unique(), false); + } + } + + // All the ancestors are mismatched, so we need to continue the search + match run_add_multiple_correct_and_incorrect_responses(vec![], &mut test_setup) { + DuplicateAncestorDecision::EarliestMismatchFound(repair_status) => { + // Expect to find everything after 92 in the `correct_ancestors_to_repair`. 
+ let expected_mismatched_slots: Vec<(Slot, Hash)> = test_setup + .correct_ancestors_response + .into_iter() + .filter(|(slot, _)| *slot > 92) + .collect(); + assert_eq!( + repair_status.correct_ancestors_to_repair, + expected_mismatched_slots + ); + } + x => panic!("Incorrect decision {:?}", x), + }; + } + + #[test] + fn test_add_multiple_responses_ancestors_all_match() { + let request_slot = 100; + let mut test_setup = setup_add_response_test(request_slot, 10); + + // Insert all the correct frozen ancestors + for &(slot, correct_hash) in &test_setup.correct_ancestors_response { + test_setup + .blockstore + .insert_bank_hash(slot, correct_hash, false); + } + + // All the ancestors matched + assert_eq!( + run_add_multiple_correct_and_incorrect_responses(vec![], &mut test_setup), + DuplicateAncestorDecision::AncestorsAllMatch + ); + } +} diff --git a/core/src/lib.rs b/core/src/lib.rs index 3b16ba2dc..ce7e156d9 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -22,6 +22,7 @@ pub mod consensus; pub mod cost_model; pub mod cost_tracker; pub mod cost_update_service; +pub mod duplicate_repair_status; pub mod execute_cost_table; pub mod fetch_stage; pub mod fork_choice; diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs index 8b5e2f1f9..a2f8c4de1 100644 --- a/core/src/repair_service.rs +++ b/core/src/repair_service.rs @@ -3,9 +3,9 @@ use crate::{ cluster_info_vote_listener::VerifiedVoteReceiver, cluster_slots::ClusterSlots, + duplicate_repair_status::DuplicateSlotRepairStatus, outstanding_requests::OutstandingRequests, repair_weight::RepairWeight, - replay_stage::DUPLICATE_THRESHOLD, result::Result, serve_repair::{ServeRepair, ShredRepairType, REPAIR_PEERS_CACHE_CAPACITY}, }; @@ -17,12 +17,9 @@ use solana_ledger::{ shred::Nonce, }; use solana_measure::measure::Measure; -use solana_runtime::{bank::Bank, bank_forks::BankForks, contains::Contains}; +use solana_runtime::{bank_forks::BankForks, contains::Contains}; use solana_sdk::{ - clock::{BankId, Slot}, - epoch_schedule::EpochSchedule, - pubkey::Pubkey, - timing::timestamp, + clock::Slot, epoch_schedule::EpochSchedule, hash::Hash, pubkey::Pubkey, timing::timestamp, }; use std::{ collections::{HashMap, HashSet}, @@ -40,7 +37,7 @@ pub type DuplicateSlotsResetSender = CrossbeamSender; pub type DuplicateSlotsResetReceiver = CrossbeamReceiver; pub type ConfirmedSlotsSender = CrossbeamSender>; pub type ConfirmedSlotsReceiver = CrossbeamReceiver>; -pub type OutstandingRepairs = OutstandingRequests; +pub type OutstandingShredRepairs = OutstandingRequests; #[derive(Default, Debug)] pub struct SlotRepairs { @@ -135,12 +132,6 @@ impl Default for RepairSlotRange { } } -#[derive(Default, Clone)] -pub struct DuplicateSlotRepairStatus { - start: u64, - repair_pubkey_and_addr: Option<(Pubkey, SocketAddr)>, -} - pub struct RepairService { t_repair: JoinHandle<()>, } @@ -154,7 +145,7 @@ impl RepairService { repair_info: RepairInfo, cluster_slots: Arc, verified_vote_receiver: VerifiedVoteReceiver, - outstanding_requests: Arc>, + outstanding_requests: Arc>, ) -> Self { let t_repair = Builder::new() .name("solana-repair-service".to_string()) @@ -183,7 +174,7 @@ impl RepairService { repair_info: RepairInfo, cluster_slots: &ClusterSlots, verified_vote_receiver: VerifiedVoteReceiver, - outstanding_requests: &RwLock, + outstanding_requests: &RwLock, ) { let mut repair_weight = RepairWeight::new(repair_info.bank_forks.read().unwrap().root()); let serve_repair = ServeRepair::new(cluster_info.clone()); @@ -236,31 +227,6 @@ impl RepairService { 
root_bank.epoch_schedule(), ); add_votes_elapsed.stop(); - /*let new_duplicate_slots = Self::find_new_duplicate_slots( - &duplicate_slot_repair_statuses, - blockstore, - cluster_slots, - &root_bank, - ); - Self::process_new_duplicate_slots( - &new_duplicate_slots, - &mut duplicate_slot_repair_statuses, - cluster_slots, - &root_bank, - blockstore, - &serve_repair, - &repair_info.duplicate_slots_reset_sender, - &repair_info.repair_validators, - ); - Self::generate_and_send_duplicate_repairs( - &mut duplicate_slot_repair_statuses, - cluster_slots, - blockstore, - &serve_repair, - &mut repair_stats, - &repair_socket, - &repair_info.repair_validators, - );*/ repair_weight.get_best_weighted_repairs( blockstore, @@ -423,12 +389,12 @@ impl RepairService { repairs: &mut Vec, max_repairs: usize, slot: Slot, - duplicate_slot_repair_statuses: &impl Contains<'a, Slot>, + ancestor_hashes_request_statuses: &impl Contains<'a, Slot>, ) { let mut pending_slots = vec![slot]; while repairs.len() < max_repairs && !pending_slots.is_empty() { let slot = pending_slots.pop().unwrap(); - if duplicate_slot_repair_statuses.contains(&slot) { + if ancestor_hashes_request_statuses.contains(&slot) { // These are repaired through a different path continue; } @@ -482,7 +448,7 @@ impl RepairService { repair_stats: &mut RepairStats, repair_socket: &UdpSocket, repair_validators: &Option>, - outstanding_requests: &RwLock, + outstanding_requests: &RwLock, ) { duplicate_slot_repair_statuses.retain(|slot, status| { Self::update_duplicate_slot_repair_addr( @@ -550,7 +516,7 @@ impl RepairService { ) { let now = timestamp(); if status.repair_pubkey_and_addr.is_none() - || now.saturating_sub(status.start) >= MAX_DUPLICATE_WAIT_MS as u64 + || now.saturating_sub(status.start_ts) >= MAX_DUPLICATE_WAIT_MS as u64 { let repair_pubkey_and_addr = serve_repair.repair_request_duplicate_compute_best_peer( slot, @@ -558,112 +524,33 @@ impl RepairService { repair_validators, ); status.repair_pubkey_and_addr = repair_pubkey_and_addr.ok(); - status.start = timestamp(); + status.start_ts = timestamp(); } } #[allow(dead_code)] - fn process_new_duplicate_slots( - new_duplicate_slots: &[(Slot, BankId)], + fn initiate_repair_for_duplicate_slot( + slot: Slot, duplicate_slot_repair_statuses: &mut HashMap, cluster_slots: &ClusterSlots, - root_bank: &Bank, - blockstore: &Blockstore, serve_repair: &ServeRepair, - duplicate_slots_reset_sender: &DuplicateSlotsResetSender, repair_validators: &Option>, ) { - for (slot, bank_id) in new_duplicate_slots { - warn!( - "Cluster confirmed slot: {}, dumping our current version and repairing", - slot - ); - // Clear the slot signatures from status cache for this slot - root_bank.clear_slot_signatures(*slot); - - // Clear the accounts for this slot - root_bank.remove_unrooted_slots(&[(*slot, *bank_id)]); - - // Clear the slot-related data in blockstore. 
This will: - // 1) Clear old shreds allowing new ones to be inserted - // 2) Clear the "dead" flag allowing ReplayStage to start replaying - // this slot - blockstore.clear_unconfirmed_slot(*slot); - - // Signal ReplayStage to clear its progress map so that a different - // version of this slot can be replayed - let _ = duplicate_slots_reset_sender.send(*slot); - - // Mark this slot as special repair, try to download from single - // validator to avoid corruption - let repair_pubkey_and_addr = serve_repair - .repair_request_duplicate_compute_best_peer(*slot, cluster_slots, repair_validators) - .ok(); - let new_duplicate_slot_repair_status = DuplicateSlotRepairStatus { - start: timestamp(), - repair_pubkey_and_addr, - }; - duplicate_slot_repair_statuses.insert(*slot, new_duplicate_slot_repair_status); + // If we're already in the middle of repairing this, ignore the signal. + if duplicate_slot_repair_statuses.contains_key(&slot) { + return; } - } - - #[allow(dead_code)] - fn find_new_duplicate_slots( - duplicate_slot_repair_statuses: &HashMap, - blockstore: &Blockstore, - cluster_slots: &ClusterSlots, - root_bank: &Bank, - ) -> Vec { - let dead_slots_iter = blockstore - .dead_slots_iterator(root_bank.slot() + 1) - .expect("Couldn't get dead slots iterator from blockstore"); - dead_slots_iter - .filter_map(|dead_slot| { - if let Some(status) = duplicate_slot_repair_statuses.get(&dead_slot) { - // Newly repaired version of this slot has been marked dead again, - // time to purge again - warn!( - "Repaired version of slot {} most recently (but maybe not entirely) - from {:?} has failed again", - dead_slot, status.repair_pubkey_and_addr - ); - } - cluster_slots - .lookup(dead_slot) - .and_then(|completed_dead_slot_pubkeys| { - let epoch = root_bank.get_epoch_and_slot_index(dead_slot).0; - if let Some(epoch_stakes) = root_bank.epoch_stakes(epoch) { - let total_stake = epoch_stakes.total_stake(); - let node_id_to_vote_accounts = epoch_stakes.node_id_to_vote_accounts(); - let total_completed_slot_stake: u64 = completed_dead_slot_pubkeys - .read() - .unwrap() - .iter() - .map(|(node_key, _)| { - node_id_to_vote_accounts - .get(node_key) - .map(|v| v.total_stake) - .unwrap_or(0) - }) - .sum(); - if total_completed_slot_stake as f64 / total_stake as f64 - > DUPLICATE_THRESHOLD - { - Some(dead_slot) - } else { - None - } - } else { - error!( - "Dead slot {} is too far ahead of root bank {}", - dead_slot, - root_bank.slot() - ); - None - } - }) - }) - .collect() + // Mark this slot as special repair, try to download from single + // validator to avoid corruption + let repair_pubkey_and_addr = serve_repair + .repair_request_duplicate_compute_best_peer(slot, cluster_slots, repair_validators) + .ok(); + let new_duplicate_slot_repair_status = DuplicateSlotRepairStatus { + correct_ancestors_to_repair: vec![(slot, Hash::default())], + repair_pubkey_and_addr, + start_ts: timestamp(), + }; + duplicate_slot_repair_statuses.insert(slot, new_duplicate_slot_repair_status); } pub fn join(self) -> thread::Result<()> { @@ -674,16 +561,12 @@ impl RepairService { #[cfg(test)] mod test { use super::*; - use crossbeam_channel::unbounded; use solana_gossip::cluster_info::Node; use solana_ledger::blockstore::{ make_chaining_slot_entries, make_many_slot_entries, make_slot_entries, }; use solana_ledger::shred::max_ticks_per_n_shreds; use solana_ledger::{blockstore::Blockstore, get_tmp_ledger_path}; - use solana_runtime::genesis_utils::{self, GenesisConfigInfo, ValidatorVoteKeypairs}; - use solana_sdk::signature::Signer; - 
use solana_vote_program::vote_transaction; use std::collections::HashSet; #[test] @@ -981,11 +864,12 @@ mod test { let blockstore = Blockstore::open(&blockstore_path).unwrap(); let cluster_slots = ClusterSlots::default(); let serve_repair = ServeRepair::new_with_invalid_keypair(Node::new_localhost().info); - let mut duplicate_slot_repair_statuses = HashMap::new(); + let mut ancestor_hashes_request_statuses = HashMap::new(); let dead_slot = 9; let receive_socket = &UdpSocket::bind("0.0.0.0:0").unwrap(); let duplicate_status = DuplicateSlotRepairStatus { - start: std::u64::MAX, + correct_ancestors_to_repair: vec![(dead_slot, Hash::default())], + start_ts: std::u64::MAX, repair_pubkey_and_addr: None, }; @@ -996,12 +880,12 @@ mod test { .insert_shreds(shreds[..shreds.len() - 1].to_vec(), None, false) .unwrap(); - duplicate_slot_repair_statuses.insert(dead_slot, duplicate_status); + ancestor_hashes_request_statuses.insert(dead_slot, duplicate_status); // There is no repair_addr, so should not get filtered because the timeout // `std::u64::MAX` has not expired RepairService::generate_and_send_duplicate_repairs( - &mut duplicate_slot_repair_statuses, + &mut ancestor_hashes_request_statuses, &cluster_slots, &blockstore, &serve_repair, @@ -1010,23 +894,23 @@ mod test { &None, &RwLock::new(OutstandingRequests::default()), ); - assert!(duplicate_slot_repair_statuses + assert!(ancestor_hashes_request_statuses .get(&dead_slot) .unwrap() .repair_pubkey_and_addr .is_none()); - assert!(duplicate_slot_repair_statuses.get(&dead_slot).is_some()); + assert!(ancestor_hashes_request_statuses.get(&dead_slot).is_some()); // Give the slot a repair address - duplicate_slot_repair_statuses + ancestor_hashes_request_statuses .get_mut(&dead_slot) .unwrap() .repair_pubkey_and_addr = Some((Pubkey::default(), receive_socket.local_addr().unwrap())); - // Slot is not yet full, should not get filtered from `duplicate_slot_repair_statuses` + // Slot is not yet full, should not get filtered from `ancestor_hashes_request_statuses` RepairService::generate_and_send_duplicate_repairs( - &mut duplicate_slot_repair_statuses, + &mut ancestor_hashes_request_statuses, &cluster_slots, &blockstore, &serve_repair, @@ -1035,16 +919,16 @@ mod test { &None, &RwLock::new(OutstandingRequests::default()), ); - assert_eq!(duplicate_slot_repair_statuses.len(), 1); - assert!(duplicate_slot_repair_statuses.get(&dead_slot).is_some()); + assert_eq!(ancestor_hashes_request_statuses.len(), 1); + assert!(ancestor_hashes_request_statuses.get(&dead_slot).is_some()); // Insert rest of shreds. 
Slot is full, should get filtered from - // `duplicate_slot_repair_statuses` + // `ancestor_hashes_request_statuses` blockstore .insert_shreds(vec![shreds.pop().unwrap()], None, false) .unwrap(); RepairService::generate_and_send_duplicate_repairs( - &mut duplicate_slot_repair_statuses, + &mut ancestor_hashes_request_statuses, &cluster_slots, &blockstore, &serve_repair, @@ -1053,7 +937,7 @@ mod test { &None, &RwLock::new(OutstandingRequests::default()), ); - assert!(duplicate_slot_repair_statuses.is_empty()); + assert!(ancestor_hashes_request_statuses.is_empty()); } #[test] @@ -1078,7 +962,8 @@ mod test { // Not enough time has passed, should not update the // address let mut duplicate_status = DuplicateSlotRepairStatus { - start: std::u64::MAX, + correct_ancestors_to_repair: vec![(dead_slot, Hash::default())], + start_ts: std::u64::MAX, repair_pubkey_and_addr: dummy_addr, }; RepairService::update_duplicate_slot_repair_addr( @@ -1092,7 +977,8 @@ mod test { // If the repair address is None, should try to update let mut duplicate_status = DuplicateSlotRepairStatus { - start: std::u64::MAX, + correct_ancestors_to_repair: vec![(dead_slot, Hash::default())], + start_ts: std::u64::MAX, repair_pubkey_and_addr: None, }; RepairService::update_duplicate_slot_repair_addr( @@ -1106,7 +992,8 @@ mod test { // If sufficient time has passed, should try to update let mut duplicate_status = DuplicateSlotRepairStatus { - start: timestamp() - MAX_DUPLICATE_WAIT_MS as u64, + correct_ancestors_to_repair: vec![(dead_slot, Hash::default())], + start_ts: timestamp() - MAX_DUPLICATE_WAIT_MS as u64, repair_pubkey_and_addr: dummy_addr, }; RepairService::update_duplicate_slot_repair_addr( @@ -1118,132 +1005,4 @@ mod test { ); assert_ne!(duplicate_status.repair_pubkey_and_addr, dummy_addr); } - - #[test] - pub fn test_process_new_duplicate_slots() { - let blockstore_path = get_tmp_ledger_path!(); - let blockstore = Blockstore::open(&blockstore_path).unwrap(); - let cluster_slots = ClusterSlots::default(); - let serve_repair = ServeRepair::new_with_invalid_keypair(Node::new_localhost().info); - let mut duplicate_slot_repair_statuses = HashMap::new(); - let duplicate_slot = 9; - - // Fill blockstore for dead slot - blockstore.set_dead_slot(duplicate_slot).unwrap(); - assert!(blockstore.is_dead(duplicate_slot)); - let (shreds, _) = make_slot_entries(duplicate_slot, 0, 1); - blockstore.insert_shreds(shreds, None, false).unwrap(); - - let keypairs = ValidatorVoteKeypairs::new_rand(); - let (reset_sender, reset_receiver) = unbounded(); - let GenesisConfigInfo { - genesis_config, - mint_keypair, - .. 
- } = genesis_utils::create_genesis_config_with_vote_accounts( - 1_000_000_000, - &[&keypairs], - vec![10000], - ); - let bank0 = Arc::new(Bank::new(&genesis_config)); - let bank9 = Bank::new_from_parent(&bank0, &Pubkey::default(), duplicate_slot); - let duplicate_bank_id = bank9.bank_id(); - let old_balance = bank9.get_balance(&keypairs.node_keypair.pubkey()); - bank9 - .transfer(10_000, &mint_keypair, &keypairs.node_keypair.pubkey()) - .unwrap(); - let vote_tx = vote_transaction::new_vote_transaction( - vec![0], - bank0.hash(), - bank0.last_blockhash(), - &keypairs.node_keypair, - &keypairs.vote_keypair, - &keypairs.vote_keypair, - None, - ); - bank9.process_transaction(&vote_tx).unwrap(); - assert!(bank9.get_signature_status(&vote_tx.signatures[0]).is_some()); - - RepairService::process_new_duplicate_slots( - &[(duplicate_slot, duplicate_bank_id)], - &mut duplicate_slot_repair_statuses, - &cluster_slots, - &bank9, - &blockstore, - &serve_repair, - &reset_sender, - &None, - ); - - // Blockstore should have been cleared - assert!(!blockstore.is_dead(duplicate_slot)); - - // Should not be able to find signature for slot 9 for the tx - assert!(bank9.get_signature_status(&vote_tx.signatures[0]).is_none()); - - // Getting balance should return the old balance (accounts were cleared) - assert_eq!( - bank9.get_balance(&keypairs.node_keypair.pubkey()), - old_balance - ); - - // Should add the duplicate slot to the tracker - assert!(duplicate_slot_repair_statuses - .get(&duplicate_slot) - .is_some()); - - // A signal should be sent to clear ReplayStage - assert!(reset_receiver.try_recv().is_ok()); - } - - #[test] - pub fn test_find_new_duplicate_slots() { - let blockstore_path = get_tmp_ledger_path!(); - let blockstore = Blockstore::open(&blockstore_path).unwrap(); - let cluster_slots = ClusterSlots::default(); - let duplicate_slot_repair_statuses = HashMap::new(); - let keypairs = ValidatorVoteKeypairs::new_rand(); - let only_node_id = keypairs.node_keypair.pubkey(); - let GenesisConfigInfo { genesis_config, .. 
} = - genesis_utils::create_genesis_config_with_vote_accounts( - 1_000_000_000, - &[keypairs], - vec![100], - ); - let bank0 = Bank::new(&genesis_config); - - // Empty blockstore should have no duplicates - assert!(RepairService::find_new_duplicate_slots( - &duplicate_slot_repair_statuses, - &blockstore, - &cluster_slots, - &bank0, - ) - .is_empty()); - - // Insert a dead slot, but is not confirmed by network so should not - // be marked as duplicate - let dead_slot = 9; - blockstore.set_dead_slot(dead_slot).unwrap(); - assert!(RepairService::find_new_duplicate_slots( - &duplicate_slot_repair_statuses, - &blockstore, - &cluster_slots, - &bank0, - ) - .is_empty()); - - // If supermajority confirms the slot, then dead slot should be - // marked as a duplicate that needs to be repaired - cluster_slots.insert_node_id(dead_slot, only_node_id); - assert_eq!( - RepairService::find_new_duplicate_slots( - &duplicate_slot_repair_statuses, - &blockstore, - &cluster_slots, - &bank0, - ), - vec![dead_slot] - ); - } } diff --git a/core/src/serve_repair.rs b/core/src/serve_repair.rs index 5de8e5efa..10ec77e28 100644 --- a/core/src/serve_repair.rs +++ b/core/src/serve_repair.rs @@ -1,7 +1,8 @@ use crate::{ cluster_slots::ClusterSlots, + duplicate_repair_status::ANCESTOR_HASH_REPAIR_SAMPLE_SIZE, repair_response, - repair_service::{OutstandingRepairs, RepairStats}, + repair_service::{OutstandingShredRepairs, RepairStats}, request_response::RequestResponse, result::{Error, Result}, }; @@ -14,7 +15,7 @@ use rand::{ use solana_gossip::{ cluster_info::{ClusterInfo, ClusterInfoError}, contact_info::ContactInfo, - weighted_shuffle::weighted_best, + weighted_shuffle::{weighted_best, weighted_shuffle}, }; use solana_ledger::{ ancestor_iterator::{AncestorIterator, AncestorIteratorWithHash}, @@ -487,7 +488,7 @@ impl ServeRepair { peers_cache: &mut LruCache, repair_stats: &mut RepairStats, repair_validators: &Option>, - outstanding_requests: &mut OutstandingRepairs, + outstanding_requests: &mut OutstandingShredRepairs, ) -> Result<(SocketAddr, Vec)> { // find a peer that appears to be accepting replication and has the desired slot, as indicated // by a valid tvu port location @@ -510,6 +511,28 @@ impl ServeRepair { Ok((addr, out)) } + pub fn repair_request_ancestor_hashes_sample_peers( + &self, + slot: Slot, + cluster_slots: &ClusterSlots, + repair_validators: &Option>, + ) -> Result> { + let repair_peers: Vec<_> = self.repair_peers(repair_validators, slot); + if repair_peers.is_empty() { + return Err(ClusterInfoError::NoPeers.into()); + } + let weights = cluster_slots.compute_weights_exclude_nonfrozen(slot, &repair_peers); + let mut sampled_validators = weighted_shuffle( + weights.into_iter().map(|(stake, _i)| stake), + solana_sdk::pubkey::new_rand().to_bytes(), + ); + sampled_validators.truncate(ANCESTOR_HASH_REPAIR_SAMPLE_SIZE); + Ok(sampled_validators + .into_iter() + .map(|i| (repair_peers[i].id, repair_peers[i].serve_repair)) + .collect()) + } + pub fn repair_request_duplicate_compute_best_peer( &self, slot: Slot, @@ -520,7 +543,7 @@ impl ServeRepair { if repair_peers.is_empty() { return Err(ClusterInfoError::NoPeers.into()); } - let weights = cluster_slots.compute_weights_exclude_noncomplete(slot, &repair_peers); + let weights = cluster_slots.compute_weights_exclude_nonfrozen(slot, &repair_peers); let n = weighted_best(&weights, solana_sdk::pubkey::new_rand().to_bytes()); Ok((repair_peers[n].id, repair_peers[n].serve_repair)) } @@ -882,7 +905,7 @@ mod tests { let me = 
ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), timestamp()); let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(me)); let serve_repair = ServeRepair::new(cluster_info.clone()); - let mut outstanding_requests = OutstandingRepairs::default(); + let mut outstanding_requests = OutstandingShredRepairs::default(); let rv = serve_repair.repair_request( &cluster_slots, ShredRepairType::Shred(0, 0), @@ -1215,7 +1238,7 @@ mod tests { &mut LruCache::new(100), &mut RepairStats::default(), &trusted_validators, - &mut OutstandingRepairs::default(), + &mut OutstandingShredRepairs::default(), ) .is_err()); } @@ -1232,7 +1255,7 @@ mod tests { &mut LruCache::new(100), &mut RepairStats::default(), &trusted_validators, - &mut OutstandingRepairs::default(), + &mut OutstandingShredRepairs::default(), ) .is_ok()); @@ -1253,7 +1276,7 @@ mod tests { &mut LruCache::new(100), &mut RepairStats::default(), &None, - &mut OutstandingRepairs::default(), + &mut OutstandingShredRepairs::default(), ) .is_ok()); } diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 0a5621f2b..b4c003959 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -7,7 +7,7 @@ use crate::{ completed_data_sets_service::CompletedDataSetsSender, outstanding_requests::OutstandingRequests, repair_response, - repair_service::{OutstandingRepairs, RepairInfo, RepairService}, + repair_service::{OutstandingShredRepairs, RepairInfo, RepairService}, result::{Error, Result}, }; use crossbeam_channel::{ @@ -125,7 +125,7 @@ fn run_check_duplicate( } fn verify_repair( - outstanding_requests: &mut OutstandingRepairs, + outstanding_requests: &mut OutstandingShredRepairs, shred: &Shred, repair_meta: &Option, ) -> bool { @@ -144,7 +144,7 @@ fn verify_repair( fn prune_shreds_invalid_repair( shreds: &mut Vec, repair_infos: &mut Vec>, - outstanding_requests: &Arc>, + outstanding_requests: &Arc>, ) { assert_eq!(shreds.len(), repair_infos.len()); let mut i = 0; @@ -175,7 +175,7 @@ fn run_insert( handle_duplicate: F, metrics: &mut BlockstoreInsertionMetrics, completed_data_sets_sender: &CompletedDataSetsSender, - outstanding_requests: &Arc>, + outstanding_requests: &Arc>, ) -> Result<()> where F: Fn(Shred), @@ -372,7 +372,7 @@ impl WindowService { + std::marker::Send + std::marker::Sync, { - let outstanding_requests: Arc> = + let outstanding_requests: Arc> = Arc::new(RwLock::new(OutstandingRequests::default())); let bank_forks = repair_info.bank_forks.clone(); @@ -468,7 +468,7 @@ impl WindowService { insert_receiver: CrossbeamReceiver<(Vec, Vec>)>, check_duplicate_sender: CrossbeamSender, completed_data_sets_sender: CompletedDataSetsSender, - outstanding_requests: Arc>, + outstanding_requests: Arc>, ) -> JoinHandle<()> { let exit = exit.clone(); let blockstore = blockstore.clone(); @@ -817,7 +817,7 @@ mod test { _from_addr, nonce: 0, }; - let outstanding_requests = Arc::new(RwLock::new(OutstandingRepairs::default())); + let outstanding_requests = Arc::new(RwLock::new(OutstandingShredRepairs::default())); let repair_type = ShredRepairType::Orphan(9); let nonce = outstanding_requests .write() diff --git a/gossip/benches/weighted_shuffle.rs b/gossip/benches/weighted_shuffle.rs index 37097f3e6..d2b15f97d 100644 --- a/gossip/benches/weighted_shuffle.rs +++ b/gossip/benches/weighted_shuffle.rs @@ -21,7 +21,7 @@ fn bench_weighted_shuffle_old(bencher: &mut Bencher) { let weights = make_weights(&mut rng); bencher.iter(|| { rng.fill(&mut seed[..]); - weighted_shuffle(&weights, seed); + 
weighted_shuffle::<u64, &u64, std::slice::Iter<u64>>(weights.iter(), seed); }); } diff --git a/gossip/src/deprecated.rs b/gossip/src/deprecated.rs index 120c69ffc..37cb349de 100644 --- a/gossip/src/deprecated.rs +++ b/gossip/src/deprecated.rs @@ -91,9 +91,9 @@ pub fn shuffle_peers_and_index( } fn stake_weighted_shuffle(stakes_and_index: &[(u64, usize)], seed: [u8; 32]) -> Vec<(u64, usize)> { - let stake_weights: Vec<_> = stakes_and_index.iter().map(|(w, _)| *w).collect(); + let stake_weights = stakes_and_index.iter().map(|(w, _)| *w); - let shuffle = weighted_shuffle(&stake_weights, seed); + let shuffle = weighted_shuffle(stake_weights, seed); shuffle.iter().map(|x| stakes_and_index[*x]).collect() } diff --git a/gossip/src/weighted_shuffle.rs b/gossip/src/weighted_shuffle.rs index 63ba440f3..25e0658be 100644 --- a/gossip/src/weighted_shuffle.rs +++ b/gossip/src/weighted_shuffle.rs @@ -9,6 +9,7 @@ use { }, rand_chacha::ChaChaRng, std::{ + borrow::Borrow, iter, ops::{AddAssign, Div, Sub, SubAssign}, }, }; @@ -136,18 +137,20 @@ where /// Returns a list of indexes shuffled based on the input weights /// Note - The sum of all weights must not exceed `u64::MAX` -pub fn weighted_shuffle<T>(weights: &[T], seed: [u8; 32]) -> Vec<usize> +pub fn weighted_shuffle<T, B, F>(weights: F, seed: [u8; 32]) -> Vec<usize> where T: Copy + PartialOrd + iter::Sum<T> + Div<T, Output = T> + FromPrimitive + ToPrimitive, + B: Borrow<T>, + F: Iterator<Item = B> + Clone, { - let total_weight: T = weights.iter().copied().sum(); + let total_weight: T = weights.clone().map(|x| *x.borrow()).sum(); let mut rng = ChaChaRng::from_seed(seed); weights - .iter() .enumerate() - .map(|(i, v)| { + .map(|(i, weight)| { + let weight = weight.borrow(); // This generates an "inverse" weight but it avoids floating point math - let x = (total_weight / *v) + let x = (total_weight / *weight) .to_u64() .expect("values > u64::max are not supported"); ( @@ -223,7 +226,7 @@ mod tests { fn test_weighted_shuffle_iterator() { let mut test_set = [0; 6]; let mut count = 0; - let shuffle = weighted_shuffle(&[50, 10, 2, 1, 1, 1], [0x5a; 32]); + let shuffle = weighted_shuffle(vec![50, 10, 2, 1, 1, 1].into_iter(), [0x5a; 32]); shuffle.into_iter().for_each(|x| { assert_eq!(test_set[x], 0); test_set[x] = 1; @@ -238,7 +241,7 @@ let mut test_weights = vec![0; 100]; (0..100).for_each(|i| test_weights[i] = (i + 1) as u64); let mut count = 0; - let shuffle = weighted_shuffle(&test_weights, [0xa5; 32]); + let shuffle = weighted_shuffle(test_weights.into_iter(), [0xa5; 32]); shuffle.into_iter().for_each(|x| { assert_eq!(test_set[x], 0); test_set[x] = 1; @@ -249,9 +252,9 @@ #[test] fn test_weighted_shuffle_compare() { - let shuffle = weighted_shuffle(&[50, 10, 2, 1, 1, 1], [0x5a; 32]); + let shuffle = weighted_shuffle(vec![50, 10, 2, 1, 1, 1].into_iter(), [0x5a; 32]); - let shuffle1 = weighted_shuffle(&[50, 10, 2, 1, 1, 1], [0x5a; 32]); + let shuffle1 = weighted_shuffle(vec![50, 10, 2, 1, 1, 1].into_iter(), [0x5a; 32]); shuffle1 .into_iter() .zip(shuffle.into_iter()) @@ -264,7 +267,7 @@ fn test_weighted_shuffle_imbalanced() { let mut weights = vec![std::u32::MAX as u64; 3]; weights.push(1); - let shuffle = weighted_shuffle(&weights, [0x5a; 32]); + let shuffle = weighted_shuffle(weights.iter().cloned(), [0x5a; 32]); shuffle.into_iter().for_each(|x| { if x == weights.len() - 1 { assert_eq!(weights[x], 1); diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index c24d7aa85..c0863bfde 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -58,6 +58,8 @@ use std::{ }, time::Instant, }; + +use
tempfile::TempDir; use thiserror::Error; use trees::{Tree, TreeWalk}; @@ -3795,6 +3797,19 @@ macro_rules! get_tmp_ledger_path { }; } +#[macro_export] +macro_rules! get_tmp_ledger_path_auto_delete { + () => { + $crate::blockstore::get_ledger_path_from_name_auto_delete($crate::tmp_ledger_name!()) + }; +} + +pub fn get_ledger_path_from_name_auto_delete(name: &str) -> TempDir { + let path = get_ledger_path_from_name(name); + fs::create_dir_all(&path).unwrap(); + TempDir::new_in(path).unwrap() +} + pub fn get_ledger_path_from_name(name: &str) -> PathBuf { use std::env; let out_dir = env::var("FARF_DIR").unwrap_or_else(|_| "farf".to_string());
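(Editorial note, not part of the patch.) The new `get_tmp_ledger_path_auto_delete!` macro returns a `tempfile::TempDir` rather than a bare `PathBuf`, so the ledger directory is removed when that handle is dropped; the tests above keep the `TempDir` alive in `TestSetup` for exactly that reason. A minimal usage sketch, assuming a test in a crate that depends on `solana-ledger`:

```rust
use solana_ledger::{blockstore::Blockstore, get_tmp_ledger_path_auto_delete};

#[test]
fn opens_a_throwaway_blockstore() {
    // Unlike get_tmp_ledger_path!(), the returned TempDir owns the directory:
    // it is deleted when `ledger_path` goes out of scope.
    let ledger_path = get_tmp_ledger_path_auto_delete!();
    let blockstore = Blockstore::open(ledger_path.path()).unwrap();

    // Fresh ledger: no frozen bank hash recorded for slot 0 yet.
    assert!(blockstore.get_bank_hash(0).is_none());
}
```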