Fix getting the golden snapshot hashes during bootstrap with Incremental Snapshots (#23518)

This commit is contained in:
Brooks Prumo 2022-03-08 15:16:53 -06:00 committed by GitHub
parent c2ec294401
commit 9b80452c7c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 317 additions and 76 deletions

View File

@ -1108,7 +1108,8 @@ mod with_incremental_snapshots {
bootstrap_config: &RpcBootstrapConfig,
rpc_peers: &[ContactInfo],
) -> Vec<PeerSnapshotHash> {
let known_snapshot_hashes = get_known_snapshot_hashes(cluster_info, validator_config);
let known_snapshot_hashes =
get_snapshot_hashes_from_known_validators(cluster_info, validator_config);
let mut peer_snapshot_hashes = get_eligible_peer_snapshot_hashes(
cluster_info,
@ -1116,7 +1117,10 @@ mod with_incremental_snapshots {
bootstrap_config,
rpc_peers,
);
retain_known_peer_snapshot_hashes(&known_snapshot_hashes, &mut peer_snapshot_hashes);
retain_peer_snapshot_hashes_that_match_known_snapshot_hashes(
&known_snapshot_hashes,
&mut peer_snapshot_hashes,
);
retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(&mut peer_snapshot_hashes);
retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(
&mut peer_snapshot_hashes,
@ -1125,78 +1129,169 @@ mod with_incremental_snapshots {
peer_snapshot_hashes
}
/// Get the snapshot hashes from known peers.
/// Map full snapshot hashes to a set of incremental snapshot hashes. Each full snapshot hash
/// is treated as the base for its set of incremental snapshot hashes.
type KnownSnapshotHashes = HashMap<(Slot, Hash), HashSet<(Slot, Hash)>>;
/// Get the snapshot hashes from known validators.
///
/// The hashes are put into a map from full snapshot hash to a set of incremental snapshot
/// hashes. This map will be used as the golden hashes; when peers are queried for their
/// individual snapshot hashes, their results will be checked against this map to verify
/// correctness.
/// The snapshot hashes are put into a map from full snapshot hash to a set of incremental
/// snapshot hashes. This map will be used as the "known snapshot hashes"; when peers are
/// queried for their individual snapshot hashes, their results will be checked against this
/// map to verify correctness.
///
/// NOTE: Only a single snashot hash is allowed per slot. If somehow two known peers have a
/// snapshot hash with the same slot and _different_ hashes, the second will be skipped. This
/// applies to both full and incremental snapshot hashes.
fn get_known_snapshot_hashes(
/// NOTE: Only a single snashot hash is allowed per slot. If somehow two known validators have
/// a snapshot hash with the same slot and _different_ hashes, the second will be skipped.
/// This applies to both full and incremental snapshot hashes.
fn get_snapshot_hashes_from_known_validators(
cluster_info: &ClusterInfo,
validator_config: &ValidatorConfig,
) -> HashMap<(Slot, Hash), HashSet<(Slot, Hash)>> {
if validator_config.known_validators.is_none() {
trace!("no known validators, so no known snapshot hashes");
return HashMap::new();
}
let known_validators = validator_config.known_validators.as_ref().unwrap();
let mut known_snapshot_hashes: HashMap<(Slot, Hash), HashSet<(Slot, Hash)>> =
HashMap::new();
known_validators
.iter()
.for_each(|known_validator| {
// First get the Crds::SnapshotHashes for each known validator and add them as
// the keys in the known snapshot hashes HashMap.
let mut full_snapshot_hashes = Vec::new();
cluster_info.get_snapshot_hash_for_node(known_validator, |snapshot_hashes| {
full_snapshot_hashes = snapshot_hashes.clone();
});
full_snapshot_hashes.into_iter().for_each(|full_snapshot_hash| {
// Do not add this hash if there's already a full snapshot hash with the same
// slot but with a _different_ hash.
// NOTE: There's no good reason for known validators to produce snapshots at
// the same slot with different hashes, so this should not happen.
if !known_snapshot_hashes.keys().any(|known_snapshot_hash| {
known_snapshot_hash.0 == full_snapshot_hash.0 && known_snapshot_hash.1 != full_snapshot_hash.1
}) {
known_snapshot_hashes.insert(full_snapshot_hash, HashSet::new());
} else {
info!("Ignoring full snapshot hash from known validator {} with a slot we've already seen (slot: {}), but a different hash.", known_validator, full_snapshot_hash.0);
}
});
// Then get the Crds::IncrementalSnapshotHashes for each known validator and add
// them as the values in the known snapshot hashes HashMap.
if let Some(crds_value::IncrementalSnapshotHashes {base: full_snapshot_hash, hashes: incremental_snapshot_hashes, ..}) = cluster_info.get_incremental_snapshot_hashes_for_node(known_validator) {
if let Some(hashes) = known_snapshot_hashes.get_mut(&full_snapshot_hash) {
// Do not add this hash if there's already an incremental snapshot hash
// with the same slot, but with a _different_ hash.
// NOTE: There's no good reason for known validators to produce snapshots
// at the same slot with different hashes, so this should not happen.
for incremental_snapshot_hash in incremental_snapshot_hashes {
if !hashes.iter().any(|(slot, hash)| slot == &incremental_snapshot_hash.0 && hash != &incremental_snapshot_hash.1) {
hashes.insert(incremental_snapshot_hash);
} else {
info!("Ignoring incremental snapshot hash from known validator {} with a slot we've already seen (slot: {}), but a different hash.", known_validator, incremental_snapshot_hash.0);
}
}
} else {
// Since incremental snapshots *must* have a valid base (i.e. full)
// snapshot, if .get() returned None, then that can only happen if there
// already is a full snapshot hash in the known snapshot hashes with the
// same slot but _different_ a hash. Assert that below. If the assert
// ever fails, there is a programmer bug.
assert!(known_snapshot_hashes.keys().any(|(slot, hash)| slot == &full_snapshot_hash.0 && hash != &full_snapshot_hash.1),
"There must exist a full snapshot hash already in known snapshot hashes with the same slot but a different hash");
info!("Ignoring incremental snapshot hashes from known validator {} with a base slot we've already seen (base slot: {}), but a different base hash.", known_validator, full_snapshot_hash.0);
}
}
) -> KnownSnapshotHashes {
// Get the full snapshot hashes for a node from CRDS
let get_full_snapshot_hashes_for_node = |node| {
let mut full_snapshot_hashes = Vec::new();
cluster_info.get_snapshot_hash_for_node(node, |snapshot_hashes| {
full_snapshot_hashes = snapshot_hashes.clone();
});
full_snapshot_hashes
};
// Get the incremental snapshot hashes for a node from CRDS
let get_incremental_snapshot_hashes_for_node = |node| {
cluster_info
.get_incremental_snapshot_hashes_for_node(node)
.map(|hashes| (hashes.base, hashes.hashes))
};
validator_config
.known_validators
.as_ref()
.map(|known_validators| {
build_known_snapshot_hashes(
known_validators,
get_full_snapshot_hashes_for_node,
get_incremental_snapshot_hashes_for_node,
)
})
.unwrap_or_else(|| {
trace!("No known validators, so no known snapshot hashes");
KnownSnapshotHashes::new()
})
}
/// Build the known snapshot hashes from a set of nodes.
///
/// The `get_full_snapshot_hashes_for_node` and `get_incremental_snapshot_hashes_for_node`
/// parameters are Fns that map a pubkey to its respective full and incremental snapshot
/// hashes. These parameters exist to provide a way to test the inner algorithm without
/// needing runtime information such as the ClusterInfo or ValidatorConfig.
fn build_known_snapshot_hashes<'a, F1, F2>(
nodes: impl IntoIterator<Item = &'a Pubkey>,
get_full_snapshot_hashes_for_node: F1,
get_incremental_snapshot_hashes_for_node: F2,
) -> KnownSnapshotHashes
where
F1: Fn(&'a Pubkey) -> Vec<(Slot, Hash)>,
F2: Fn(&'a Pubkey) -> Option<((Slot, Hash), Vec<(Slot, Hash)>)>,
{
let mut known_snapshot_hashes = KnownSnapshotHashes::new();
/// Check to see if there exists another snapshot hash in the haystack with the *same* slot
/// but *different* hash as the needle.
fn is_any_same_slot_and_different_hash<'a>(
needle: &(Slot, Hash),
haystack: impl IntoIterator<Item = &'a (Slot, Hash)>,
) -> bool {
haystack
.into_iter()
.any(|hay| needle.0 == hay.0 && needle.1 != hay.1)
}
'outer: for node in nodes {
// First get the full snapshot hashes for each node and add them as the keys in the
// known snapshot hashes map.
let full_snapshot_hashes = get_full_snapshot_hashes_for_node(node);
for full_snapshot_hash in &full_snapshot_hashes {
// Do not add this snapshot hash if there's already a full snapshot hash with the
// same slot but with a _different_ hash.
// NOTE: Nodes should not produce snapshots at the same slot with _different_
// hashes. So if it happens, keep the first and ignore the rest.
if !is_any_same_slot_and_different_hash(
full_snapshot_hash,
known_snapshot_hashes.keys(),
) {
// Insert a new full snapshot hash into the known snapshot hashes IFF an entry
// doesn't already exist. This is to ensure we don't overwrite existing
// incremental snapshot hashes that may be present for this full snapshot hash.
known_snapshot_hashes
.entry(*full_snapshot_hash)
.or_default();
} else {
warn!(
"Ignoring all snapshot hashes from node {} since we've seen a different full snapshot hash with this slot. full snapshot hash: {:?}",
node,
full_snapshot_hash,
);
continue 'outer;
}
}
// Then get the incremental snapshot hashes for each node and add them as the values in the
// known snapshot hashes map.
if let Some((base_snapshot_hash, incremental_snapshot_hashes)) =
get_incremental_snapshot_hashes_for_node(node)
{
// Incremental snapshots must be based off a valid full snapshot. Ensure the node
// has a full snapshot hash that matches its base snapshot hash.
if !full_snapshot_hashes.contains(&base_snapshot_hash) {
warn!(
"Ignoring all incremental snapshot hashes from node {} since its base snapshot hash does not match any of its full snapshot hashes. base snapshot hash: {:?}, full snapshot hashes: {:?}",
node,
base_snapshot_hash,
full_snapshot_hashes
);
continue 'outer;
}
if let Some(known_incremental_snapshot_hashes) =
known_snapshot_hashes.get_mut(&base_snapshot_hash)
{
// Do not add this snapshot hash if there's already an incremental snapshot
// hash with the same slot, but with a _different_ hash.
// NOTE: Nodes should not produce snapshots at the same slot with _different_
// hashes. So if it happens, keep the first and ignore the rest.
for incremental_snapshot_hash in &incremental_snapshot_hashes {
if !is_any_same_slot_and_different_hash(
incremental_snapshot_hash,
known_incremental_snapshot_hashes.iter(),
) {
known_incremental_snapshot_hashes.insert(*incremental_snapshot_hash);
} else {
warn!(
"Ignoring incremental snapshot hash from node {} since we've seen a different incremental snapshot hash with this slot. incremental snapshot hash: {:?}",
node,
incremental_snapshot_hash,
);
}
}
} else {
// Since incremental snapshots *must* have a valid base (i.e. full)
// snapshot, if .get() returned None, then that can only happen if there
// already is a full snapshot hash in the known snapshot hashes with the
// same slot but _different_ a hash. Assert that below. If the assert
// ever fails, there is a programmer bug.
assert!(
is_any_same_slot_and_different_hash(&base_snapshot_hash, known_snapshot_hashes.keys()),
"There must exist a full snapshot hash already in the known snapshot hashes with the same slot but a different hash!",
);
debug!(
"Ignoring incremental snapshot hashes from node {} since we've seen a different base snapshot hash with this slot. base snapshot hash: {:?}",
node,
base_snapshot_hash,
);
}
}
}
trace!("known snapshot hashes: {:?}", &known_snapshot_hashes);
known_snapshot_hashes
@ -1244,9 +1339,9 @@ mod with_incremental_snapshots {
peer_snapshot_hashes
}
/// Retain the peer snapshot hashes that match a hash from the known snapshot hashes
fn retain_known_peer_snapshot_hashes(
known_snapshot_hashes: &HashMap<(Slot, Hash), HashSet<(Slot, Hash)>>,
/// Retain the peer snapshot hashes that match a snapshot hash from the known snapshot hashes
fn retain_peer_snapshot_hashes_that_match_known_snapshot_hashes(
known_snapshot_hashes: &KnownSnapshotHashes,
peer_snapshot_hashes: &mut Vec<PeerSnapshotHash>,
) {
peer_snapshot_hashes.retain(|peer_snapshot_hash| {
@ -1266,7 +1361,7 @@ mod with_incremental_snapshots {
});
trace!(
"retain known peer snapshot hashes: {:?}",
"retain peer snapshot hashes that match known snapshot hashes: {:?}",
&peer_snapshot_hashes
);
}
@ -1634,8 +1729,151 @@ mod with_incremental_snapshots {
}
#[test]
fn test_retain_known_peer_snapshot_hashes() {
let known_snapshot_hashes: HashMap<(Slot, Hash), HashSet<(Slot, Hash)>> = [
fn test_build_known_snapshot_hashes() {
let full_snapshot_hashes1 = vec![
(100_000, Hash::new_unique()),
(200_000, Hash::new_unique()),
(300_000, Hash::new_unique()),
(400_000, Hash::new_unique()),
];
let full_snapshot_hashes2 = vec![
(100_000, Hash::new_unique()),
(200_000, Hash::new_unique()),
(300_000, Hash::new_unique()),
(400_000, Hash::new_unique()),
];
let base_snapshot_hash1 = full_snapshot_hashes1.last().unwrap();
let base_snapshot_hash2 = full_snapshot_hashes2.last().unwrap();
let incremental_snapshot_hashes1 = vec![
(400_500, Hash::new_unique()),
(400_600, Hash::new_unique()),
(400_700, Hash::new_unique()),
(400_800, Hash::new_unique()),
];
let incremental_snapshot_hashes2 = vec![
(400_500, Hash::new_unique()),
(400_600, Hash::new_unique()),
(400_700, Hash::new_unique()),
(400_800, Hash::new_unique()),
];
#[allow(clippy::type_complexity)]
let mut oracle: HashMap<
Pubkey,
(Vec<(Slot, Hash)>, Option<((Slot, Hash), Vec<(Slot, Hash)>)>),
> = HashMap::new();
// no snapshots at all
oracle.insert(Pubkey::new_unique(), (vec![], None));
// just full snapshots
oracle.insert(Pubkey::new_unique(), (full_snapshot_hashes1.clone(), None));
// just full snapshots, with different hashes
oracle.insert(Pubkey::new_unique(), (full_snapshot_hashes2.clone(), None));
// full and incremental snapshots
oracle.insert(
Pubkey::new_unique(),
(
full_snapshot_hashes1.clone(),
Some((*base_snapshot_hash1, incremental_snapshot_hashes1.clone())),
),
);
// full and incremental snapshots, but base hash is wrong
oracle.insert(
Pubkey::new_unique(),
(
full_snapshot_hashes1.clone(),
Some((*base_snapshot_hash2, incremental_snapshot_hashes1.clone())),
),
);
// full and incremental snapshots, with different hashes
oracle.insert(
Pubkey::new_unique(),
(
full_snapshot_hashes2.clone(),
Some((*base_snapshot_hash2, incremental_snapshot_hashes2.clone())),
),
);
// full and incremental snapshots, but base hash is wrong
oracle.insert(
Pubkey::new_unique(),
(
full_snapshot_hashes2.clone(),
Some((*base_snapshot_hash1, incremental_snapshot_hashes2.clone())),
),
);
// handle duplicates as well
oracle.insert(
Pubkey::new_unique(),
(
full_snapshot_hashes1.clone(),
Some((*base_snapshot_hash1, incremental_snapshot_hashes1.clone())),
),
);
// handle duplicates, with different hashes
oracle.insert(
Pubkey::new_unique(),
(
full_snapshot_hashes2.clone(),
Some((*base_snapshot_hash2, incremental_snapshot_hashes2.clone())),
),
);
let node_to_full_snapshot_hashes = |node| oracle.get(node).unwrap().clone().0;
let node_to_incremental_snapshot_hashes = |node| oracle.get(node).unwrap().clone().1;
let known_snapshot_hashes = build_known_snapshot_hashes(
oracle.keys(),
node_to_full_snapshot_hashes,
node_to_incremental_snapshot_hashes,
);
let mut known_full_snapshot_hashes: Vec<_> =
known_snapshot_hashes.keys().copied().collect();
known_full_snapshot_hashes.sort_unstable();
let known_base_snapshot_hash = known_full_snapshot_hashes.last().unwrap();
let mut known_incremental_snapshot_hashes: Vec<_> = known_snapshot_hashes
.get(known_base_snapshot_hash)
.unwrap()
.iter()
.copied()
.collect();
known_incremental_snapshot_hashes.sort_unstable();
assert!(
known_full_snapshot_hashes == full_snapshot_hashes1
|| known_full_snapshot_hashes == full_snapshot_hashes2
);
if known_full_snapshot_hashes == full_snapshot_hashes1 {
assert_eq!(known_base_snapshot_hash, base_snapshot_hash1);
assert_eq!(
known_incremental_snapshot_hashes,
incremental_snapshot_hashes1
);
} else {
assert_eq!(known_base_snapshot_hash, base_snapshot_hash2);
assert_eq!(
known_incremental_snapshot_hashes,
incremental_snapshot_hashes2
);
}
}
#[test]
fn test_retain_peer_snapshot_hashes_that_match_known_snapshot_hashes() {
let known_snapshot_hashes: KnownSnapshotHashes = [
(
(200_000, Hash::new_unique()),
[
@ -1709,7 +1947,10 @@ mod with_incremental_snapshots {
),
];
let mut actual = peer_snapshot_hashes;
retain_known_peer_snapshot_hashes(&known_snapshot_hashes, &mut actual);
retain_peer_snapshot_hashes_that_match_known_snapshot_hashes(
&known_snapshot_hashes,
&mut actual,
);
assert_eq!(expected, actual);
}