Bootstrap falls back to getting highest full snapshot (#21124)
This commit is contained in:
parent
b4ff488a0d
commit
7508c36209
|
@ -811,7 +811,7 @@ mod with_incremental_snapshots {
|
|||
|
||||
/// A snapshot hash. In this context (bootstrap *with* incremental snapshots), a snapshot hash
|
||||
/// is _both_ a full snapshot hash and an (optional) incremental snapshot hash.
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
|
||||
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
|
||||
struct SnapshotHash {
|
||||
full: (Slot, Hash),
|
||||
incr: Option<(Slot, Hash)>,
|
||||
|
@ -1111,7 +1111,21 @@ mod with_incremental_snapshots {
|
|||
bootstrap_config: &RpcBootstrapConfig,
|
||||
rpc_peers: &[ContactInfo],
|
||||
) -> Vec<PeerSnapshotHash> {
|
||||
let trusted_snapshot_hashes = get_trusted_snapshot_hashes(cluster_info, validator_config);
|
||||
// Which strategy to use for getting the peer snapshot hashes? The standard way is what's
|
||||
// described in the function's documentation. However, if there are no trusted peers that
|
||||
// have enabled incremental snapshots, it may be possible that there are no snapshot hashes
|
||||
// with a slot that is a multiple of our full snapshot archive interval. If that happens,
|
||||
// we retry with the "fallback" strategy that *does not* filter based on full snapshot
|
||||
// archive interval.
|
||||
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
|
||||
enum Strategy {
|
||||
Standard,
|
||||
Fallback,
|
||||
}
|
||||
|
||||
for strategy in [Strategy::Standard, Strategy::Fallback] {
|
||||
let trusted_snapshot_hashes =
|
||||
get_trusted_snapshot_hashes(cluster_info, validator_config);
|
||||
|
||||
let mut peer_snapshot_hashes = get_trusted_peer_snapshot_hashes(
|
||||
cluster_info,
|
||||
|
@ -1119,7 +1133,14 @@ mod with_incremental_snapshots {
|
|||
bootstrap_config,
|
||||
rpc_peers,
|
||||
);
|
||||
retain_trusted_peer_snapshot_hashes(&trusted_snapshot_hashes, &mut peer_snapshot_hashes);
|
||||
retain_trusted_peer_snapshot_hashes(
|
||||
&trusted_snapshot_hashes,
|
||||
&mut peer_snapshot_hashes,
|
||||
);
|
||||
|
||||
if strategy == Strategy::Standard {
|
||||
// The standard strategy is to retain only the peer snapshot hashes with a multiple
|
||||
// of our full snapshot archive interval
|
||||
retain_peer_snapshot_hashes_with_a_multiple_of_full_snapshot_archive_interval(
|
||||
validator_config
|
||||
.snapshot_config
|
||||
|
@ -1128,12 +1149,29 @@ mod with_incremental_snapshots {
|
|||
.full_snapshot_archive_interval_slots,
|
||||
&mut peer_snapshot_hashes,
|
||||
);
|
||||
|
||||
// However, if at this point peer_snasphot_hashes is empty, then retry from the
|
||||
// beginning with the "fallback" strategy and *do not* filter based on full
|
||||
// snapshot archive interval.
|
||||
if peer_snapshot_hashes.is_empty() {
|
||||
info!(
|
||||
"No peer snapshot hashes found with a slot that is a multiple of our \
|
||||
full snapshot archive interval. Retrying, but without filtering based \
|
||||
on full snapshot archive interval."
|
||||
);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(&mut peer_snapshot_hashes);
|
||||
retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(
|
||||
&mut peer_snapshot_hashes,
|
||||
);
|
||||
|
||||
peer_snapshot_hashes
|
||||
return peer_snapshot_hashes;
|
||||
}
|
||||
|
||||
unreachable!("for-loop above is guaranteed to return");
|
||||
}
|
||||
|
||||
/// Get the snapshot hashes from trusted peers.
|
||||
|
@ -1143,9 +1181,9 @@ mod with_incremental_snapshots {
|
|||
/// individual snapshot hashes, their results will be checked against this map to verify
|
||||
/// correctness.
|
||||
///
|
||||
/// NOTE: Only a single full snashot hash is allowed per slot. If somehow two trusted peers
|
||||
/// have a full snapshot hash with the same slot and _different_ hashes, the second will be
|
||||
/// skipped, and its incremental snapshot hashes will not be added to the map.
|
||||
/// NOTE: Only a single snashot hash is allowed per slot. If somehow two trusted peers have a
|
||||
/// snapshot hash with the same slot and _different_ hashes, the second will be skipped. This
|
||||
/// applies to both full and incremental snapshot hashes.
|
||||
fn get_trusted_snapshot_hashes(
|
||||
cluster_info: &ClusterInfo,
|
||||
validator_config: &ValidatorConfig,
|
||||
|
@ -1161,14 +1199,34 @@ mod with_incremental_snapshots {
|
|||
trusted_validators
|
||||
.iter()
|
||||
.for_each(|trusted_validator| {
|
||||
// First get the Crds::SnapshotHashes for each trusted validator and add them as
|
||||
// the keys in the trusted snapshot hashes HashMap.
|
||||
let mut full_snapshot_hashes = Vec::new();
|
||||
cluster_info.get_snapshot_hash_for_node(trusted_validator, |snapshot_hashes| {
|
||||
full_snapshot_hashes = snapshot_hashes.clone();
|
||||
});
|
||||
full_snapshot_hashes.into_iter().for_each(|full_snapshot_hash| {
|
||||
// Do not add this hash if there's already a full snapshot hash with the same
|
||||
// slot but with a _different_ hash.
|
||||
// NOTE: There's no good reason for trusted validators to produce snapshots at
|
||||
// the same slot with different hashes, so this should not happen.
|
||||
if !trusted_snapshot_hashes.keys().any(|trusted_snapshot_hash| {
|
||||
trusted_snapshot_hash.0 == full_snapshot_hash.0 && trusted_snapshot_hash.1 != full_snapshot_hash.1
|
||||
}) {
|
||||
trusted_snapshot_hashes.insert(full_snapshot_hash, HashSet::new());
|
||||
} else {
|
||||
info!("Ignoring full snapshot hash from trusted validator {} with a slot we've already seen (slot: {}), but a different hash.", trusted_validator, full_snapshot_hash.0);
|
||||
}
|
||||
});
|
||||
|
||||
// Then get the Crds::IncrementalSnapshotHashes for each trusted validator and add
|
||||
// them as the values in the trusted snapshot hashes HashMap.
|
||||
if let Some(crds_value::IncrementalSnapshotHashes {base: full_snapshot_hash, hashes: incremental_snapshot_hashes, ..}) = cluster_info.get_incremental_snapshot_hashes_for_node(trusted_validator) {
|
||||
match trusted_snapshot_hashes.get_mut(&full_snapshot_hash) {
|
||||
Some(hashes) => {
|
||||
// Do not add these hashes if there's already an incremental
|
||||
// snapshot hash with this same slot, but with a _different_ hash.
|
||||
// NOTE: There's no good reason for trusted validators to
|
||||
// produce incremental snapshots at the same slot with
|
||||
// different hashes, so this should not happen.
|
||||
if let Some(hashes) = trusted_snapshot_hashes.get_mut(&full_snapshot_hash) {
|
||||
// Do not add this hash if there's already an incremental snapshot hash
|
||||
// with the same slot, but with a _different_ hash.
|
||||
// NOTE: There's no good reason for trusted validators to produce snapshots
|
||||
// at the same slot with different hashes, so this should not happen.
|
||||
for incremental_snapshot_hash in incremental_snapshot_hashes {
|
||||
if !hashes.iter().any(|(slot, hash)| slot == &incremental_snapshot_hash.0 && hash != &incremental_snapshot_hash.1) {
|
||||
hashes.insert(incremental_snapshot_hash);
|
||||
|
@ -1176,27 +1234,15 @@ mod with_incremental_snapshots {
|
|||
info!("Ignoring incremental snapshot hash from trusted validator {} with a slot we've already seen (slot: {}), but a different hash.", trusted_validator, incremental_snapshot_hash.0);
|
||||
}
|
||||
}
|
||||
}
|
||||
None => {
|
||||
// Do not add these hashes if there's already a full snapshot hash
|
||||
// with the same slot but with a _different_ hash.
|
||||
// NOTE: There's no good reason for trusted validators to
|
||||
// produce full snapshots at the same slot with different
|
||||
// hashes, so this should not happen.
|
||||
if !trusted_snapshot_hashes.keys().any(
|
||||
|(slot, hash)| {
|
||||
slot == &full_snapshot_hash.0
|
||||
&& hash != &full_snapshot_hash.1
|
||||
},
|
||||
) {
|
||||
let mut hashes = HashSet::new();
|
||||
hashes.extend(incremental_snapshot_hashes);
|
||||
trusted_snapshot_hashes
|
||||
.insert(full_snapshot_hash, hashes);
|
||||
} else {
|
||||
info!("Ignoring full snapshot hashes from trusted validator {} with a slot we've already seen (slot: {}), but a different hash.", trusted_validator, full_snapshot_hash.0);
|
||||
}
|
||||
}
|
||||
// Since incremental snapshots *must* have a valid base (i.e. full)
|
||||
// snapshot, if .get() returned None, then that can only happen if there
|
||||
// already is a full snapshot hash in the trusted snapshot hashes with the
|
||||
// same slot but _different_ a hash. Assert that below. If the assert
|
||||
// ever fails, there is a programmer bug.
|
||||
assert!(trusted_snapshot_hashes.keys().any(|(slot, hash)| slot == &full_snapshot_hash.0 && hash != &full_snapshot_hash.1),
|
||||
"There must exist a full snapshot hash already in trusted snapshot hashes with the same slot but a different hash");
|
||||
info!("Ignoring incremental snapshot hashes from trusted validator {} with a base slot we've already seen (base slot: {}), but a different base hash.", trusted_validator, full_snapshot_hash.0);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
@ -1219,47 +1265,30 @@ mod with_incremental_snapshots {
|
|||
if bootstrap_config.no_untrusted_rpc
|
||||
&& !is_trusted_validator(&rpc_peer.id, &validator_config.trusted_validators)
|
||||
{
|
||||
// Ignore all untrusted peers
|
||||
// We were told to ignore untrusted peers
|
||||
continue;
|
||||
}
|
||||
|
||||
cluster_info
|
||||
.get_incremental_snapshot_hashes_for_node(&rpc_peer.id)
|
||||
.and_then(
|
||||
|crds_value::IncrementalSnapshotHashes { base, hashes, .. }| {
|
||||
// Newer hashes are pushed to the end of `hashes`, so the last element should
|
||||
// be the newest (i.e. have the highest slot).
|
||||
//
|
||||
// NOTE: It is important that the result of `last().map()` is the return value
|
||||
// from `and_then()`, so that the `or_else()` can run in _both_ scenarios where
|
||||
// either (1) the peer does not have incremental snapshots enabled, or (2) the
|
||||
// peer has not generated any incremental snapshots yet.
|
||||
hashes.last().map(|incremental_snapshot_hash| {
|
||||
peer_snapshot_hashes.push(PeerSnapshotHash {
|
||||
rpc_contact_info: rpc_peer.clone(),
|
||||
snapshot_hash: SnapshotHash {
|
||||
full: base,
|
||||
incr: Some(*incremental_snapshot_hash),
|
||||
},
|
||||
})
|
||||
})
|
||||
},
|
||||
)
|
||||
// If the peer does not have any incremental snapshot hashes, then get its highest full
|
||||
// snapshot hash instead.
|
||||
.or_else(|| {
|
||||
cluster_info.get_snapshot_hash_for_node(&rpc_peer.id, |hashes| {
|
||||
if let Some(full_snapshot_hash) = hashes.last() {
|
||||
peer_snapshot_hashes.push(PeerSnapshotHash {
|
||||
rpc_contact_info: rpc_peer.clone(),
|
||||
snapshot_hash: SnapshotHash {
|
||||
full: *full_snapshot_hash,
|
||||
incr: None,
|
||||
},
|
||||
})
|
||||
// Get the highest incremental snapshot hash for this peer.
|
||||
let mut highest_snapshot_hash =
|
||||
get_highest_incremental_snapshot_hash_for_peer(cluster_info, &rpc_peer.id);
|
||||
|
||||
// Get this peer's highest (full) snapshot hash. We need to get these snapshot hashes
|
||||
// (instead of just the IncrementalSnapshotHashes) in case the peer is either (1) not
|
||||
// taking incremental snapshots, or (2) if the last snapshot taken was a full snapshot,
|
||||
// which would get pushed to CRDS here (i.e. `crds_value::SnapshotHashes`) first.
|
||||
let snapshot_hash = get_highest_full_snapshot_hash_for_peer(cluster_info, &rpc_peer.id);
|
||||
if snapshot_hash > highest_snapshot_hash {
|
||||
highest_snapshot_hash = snapshot_hash;
|
||||
}
|
||||
})
|
||||
|
||||
// ...and add the highest snapshot hash to our return vector!
|
||||
if let Some(snapshot_hash) = highest_snapshot_hash {
|
||||
peer_snapshot_hashes.push(PeerSnapshotHash {
|
||||
rpc_contact_info: rpc_peer.clone(),
|
||||
snapshot_hash,
|
||||
});
|
||||
};
|
||||
}
|
||||
|
||||
trace!("peer snapshot hashes: {:?}", &peer_snapshot_hashes);
|
||||
|
@ -1596,6 +1625,42 @@ mod with_incremental_snapshots {
|
|||
}
|
||||
}
|
||||
|
||||
/// Get the highest full snapshot hash for a peer from CRDS
|
||||
fn get_highest_full_snapshot_hash_for_peer(
|
||||
cluster_info: &ClusterInfo,
|
||||
peer: &Pubkey,
|
||||
) -> Option<SnapshotHash> {
|
||||
let mut full_snapshot_hashes = Vec::new();
|
||||
cluster_info.get_snapshot_hash_for_node(peer, |snapshot_hashes| {
|
||||
full_snapshot_hashes = snapshot_hashes.clone()
|
||||
});
|
||||
full_snapshot_hashes
|
||||
.into_iter()
|
||||
.max()
|
||||
.map(|full_snapshot_hash| SnapshotHash {
|
||||
full: full_snapshot_hash,
|
||||
incr: None,
|
||||
})
|
||||
}
|
||||
|
||||
/// Get the highest incremental snapshot hash for a peer from CRDS
|
||||
fn get_highest_incremental_snapshot_hash_for_peer(
|
||||
cluster_info: &ClusterInfo,
|
||||
peer: &Pubkey,
|
||||
) -> Option<SnapshotHash> {
|
||||
cluster_info
|
||||
.get_incremental_snapshot_hashes_for_node(peer)
|
||||
.map(
|
||||
|crds_value::IncrementalSnapshotHashes { base, hashes, .. }| {
|
||||
let highest_incremental_snapshot_hash = hashes.into_iter().max();
|
||||
SnapshotHash {
|
||||
full: base,
|
||||
incr: highest_incremental_snapshot_hash,
|
||||
}
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
|
Loading…
Reference in New Issue