Only push latest snapshot hashes in SnapshotGossipManager (#31154)
This commit is contained in:
parent
f2e3a0d821
commit
944b9d574a
|
@ -2,8 +2,7 @@ use {
|
|||
solana_gossip::cluster_info::ClusterInfo,
|
||||
solana_runtime::{
|
||||
snapshot_hash::{
|
||||
FullSnapshotHash, FullSnapshotHashes, IncrementalSnapshotHash,
|
||||
IncrementalSnapshotHashes, SnapshotHash, StartingSnapshotHashes,
|
||||
FullSnapshotHash, FullSnapshotHashes, SnapshotHash, StartingSnapshotHashes,
|
||||
},
|
||||
snapshot_package::{retain_max_n_elements, SnapshotType},
|
||||
},
|
||||
|
@ -14,10 +13,9 @@ use {
|
|||
/// Manage pushing snapshot hash information to gossip
|
||||
pub struct SnapshotGossipManager {
|
||||
cluster_info: Arc<ClusterInfo>,
|
||||
latest_snapshot_hashes: Option<LatestSnapshotHashes>,
|
||||
max_full_snapshot_hashes: usize,
|
||||
max_incremental_snapshot_hashes: usize,
|
||||
full_snapshot_hashes: FullSnapshotHashes,
|
||||
incremental_snapshot_hashes: IncrementalSnapshotHashes,
|
||||
legacy_full_snapshot_hashes: FullSnapshotHashes,
|
||||
}
|
||||
|
||||
impl SnapshotGossipManager {
|
||||
|
@ -26,40 +24,42 @@ impl SnapshotGossipManager {
|
|||
pub fn new(
|
||||
cluster_info: Arc<ClusterInfo>,
|
||||
max_full_snapshot_hashes: usize,
|
||||
max_incremental_snapshot_hashes: usize,
|
||||
_max_incremental_snapshot_hashes: usize,
|
||||
) -> Self {
|
||||
SnapshotGossipManager {
|
||||
cluster_info,
|
||||
latest_snapshot_hashes: None,
|
||||
max_full_snapshot_hashes,
|
||||
max_incremental_snapshot_hashes,
|
||||
full_snapshot_hashes: FullSnapshotHashes {
|
||||
hashes: Vec::default(),
|
||||
},
|
||||
incremental_snapshot_hashes: IncrementalSnapshotHashes {
|
||||
base: (Slot::default(), SnapshotHash(Hash::default())),
|
||||
legacy_full_snapshot_hashes: FullSnapshotHashes {
|
||||
hashes: Vec::default(),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// If there were starting snapshot hashes, add those to their respective vectors, then push
|
||||
/// those vectors to the cluster via CRDS.
|
||||
/// Push any starting snapshot hashes to the cluster via CRDS
|
||||
pub fn push_starting_snapshot_hashes(
|
||||
&mut self,
|
||||
starting_snapshot_hashes: Option<StartingSnapshotHashes>,
|
||||
) {
|
||||
if let Some(starting_snapshot_hashes) = starting_snapshot_hashes {
|
||||
let starting_full_snapshot_hash = starting_snapshot_hashes.full;
|
||||
self.push_full_snapshot_hash(starting_full_snapshot_hash);
|
||||
let Some(starting_snapshot_hashes) = starting_snapshot_hashes else {
|
||||
return;
|
||||
};
|
||||
|
||||
if let Some(starting_incremental_snapshot_hash) = starting_snapshot_hashes.incremental {
|
||||
self.push_incremental_snapshot_hash(starting_incremental_snapshot_hash);
|
||||
};
|
||||
self.update_latest_full_snapshot_hash(starting_snapshot_hashes.full.hash);
|
||||
if let Some(starting_incremental_snapshot_hash) = starting_snapshot_hashes.incremental {
|
||||
self.update_latest_incremental_snapshot_hash(
|
||||
starting_incremental_snapshot_hash.hash,
|
||||
starting_incremental_snapshot_hash.base.0,
|
||||
);
|
||||
}
|
||||
self.push_latest_snapshot_hashes_to_cluster();
|
||||
|
||||
// Handle legacy snapshot hashes here too
|
||||
// Once LegacySnapshotHashes are removed from CRDS, also remove them here
|
||||
self.push_legacy_full_snapshot_hash(starting_snapshot_hashes.full);
|
||||
}
|
||||
|
||||
/// Add `snapshot_hash` to its respective vector of hashes, then push that vector to the
|
||||
/// cluster via CRDS.
|
||||
/// Push new snapshot hash to the cluster via CRDS
|
||||
pub fn push_snapshot_hash(
|
||||
&mut self,
|
||||
snapshot_type: SnapshotType,
|
||||
|
@ -67,79 +67,110 @@ impl SnapshotGossipManager {
|
|||
) {
|
||||
match snapshot_type {
|
||||
SnapshotType::FullSnapshot => {
|
||||
self.push_full_snapshot_hash(FullSnapshotHash {
|
||||
hash: snapshot_hash,
|
||||
});
|
||||
self.push_full_snapshot_hash(snapshot_hash);
|
||||
}
|
||||
SnapshotType::IncrementalSnapshot(base_slot) => {
|
||||
let latest_full_snapshot_hash = *self.full_snapshot_hashes.hashes.last().unwrap();
|
||||
assert_eq!(
|
||||
base_slot, latest_full_snapshot_hash.0,
|
||||
"the incremental snapshot's base slot ({}) must match the latest full snapshot hash's slot ({})",
|
||||
base_slot, latest_full_snapshot_hash.0,
|
||||
);
|
||||
self.push_incremental_snapshot_hash(IncrementalSnapshotHash {
|
||||
base: latest_full_snapshot_hash,
|
||||
hash: snapshot_hash,
|
||||
});
|
||||
self.push_incremental_snapshot_hash(snapshot_hash, base_slot);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Push new full snapshot hash to the cluster via CRDS
|
||||
fn push_full_snapshot_hash(&mut self, full_snapshot_hash: (Slot, SnapshotHash)) {
|
||||
self.update_latest_full_snapshot_hash(full_snapshot_hash);
|
||||
self.push_latest_snapshot_hashes_to_cluster();
|
||||
|
||||
// Handle legacy snapshot hashes here too
|
||||
// Once LegacySnapshotHashes are removed from CRDS, also remove them here
|
||||
self.push_legacy_full_snapshot_hash(FullSnapshotHash {
|
||||
hash: full_snapshot_hash,
|
||||
});
|
||||
}
|
||||
|
||||
/// Push new incremental snapshot hash to the cluster via CRDS
|
||||
fn push_incremental_snapshot_hash(
|
||||
&mut self,
|
||||
incremental_snapshot_hash: (Slot, SnapshotHash),
|
||||
base_slot: Slot,
|
||||
) {
|
||||
self.update_latest_incremental_snapshot_hash(incremental_snapshot_hash, base_slot);
|
||||
self.push_latest_snapshot_hashes_to_cluster();
|
||||
}
|
||||
|
||||
/// Update the latest snapshot hashes with a new full snapshot
|
||||
fn update_latest_full_snapshot_hash(&mut self, full_snapshot_hash: (Slot, SnapshotHash)) {
|
||||
self.latest_snapshot_hashes = Some(LatestSnapshotHashes {
|
||||
full: full_snapshot_hash,
|
||||
incremental: None,
|
||||
});
|
||||
}
|
||||
|
||||
/// Update the latest snapshot hashes with a new incremental snapshot
|
||||
fn update_latest_incremental_snapshot_hash(
|
||||
&mut self,
|
||||
incremental_snapshot_hash: (Slot, SnapshotHash),
|
||||
base_slot: Slot,
|
||||
) {
|
||||
let latest_snapshot_hashes = self
|
||||
.latest_snapshot_hashes
|
||||
.as_mut()
|
||||
.expect("there must already be a full snapshot hash");
|
||||
assert_eq!(
|
||||
base_slot, latest_snapshot_hashes.full.0,
|
||||
"the incremental snapshot's base slot ({}) must match the latest full snapshot's slot ({})",
|
||||
base_slot, latest_snapshot_hashes.full.0,
|
||||
);
|
||||
latest_snapshot_hashes.incremental = Some(incremental_snapshot_hash);
|
||||
}
|
||||
|
||||
/// Push the latest snapshot hashes to the cluster via CRDS
|
||||
fn push_latest_snapshot_hashes_to_cluster(&self) {
|
||||
let Some(latest_snapshot_hashes) = self.latest_snapshot_hashes.as_ref() else {
|
||||
return;
|
||||
};
|
||||
|
||||
// Pushing snapshot hashes to the cluster should never fail. The only error case is when
|
||||
// the length of the incremental hashes is too big, (and we send a maximum of one here).
|
||||
// If this call ever does error, it's a programmer bug! Check to see what changed in
|
||||
// `push_snapshot_hashes()` and handle the new error condition here.
|
||||
self.cluster_info
|
||||
.push_snapshot_hashes(
|
||||
clone_hash_for_crds(&latest_snapshot_hashes.full),
|
||||
latest_snapshot_hashes
|
||||
.incremental
|
||||
.iter()
|
||||
.map(clone_hash_for_crds)
|
||||
.collect(),
|
||||
)
|
||||
.expect(
|
||||
"Bug! The programmer contract has changed for push_snapshot_hashes() \
|
||||
and a new error case has been added that has not been handled here.",
|
||||
);
|
||||
}
|
||||
|
||||
/// Add `full_snapshot_hash` to the vector of full snapshot hashes, then push that vector to
|
||||
/// the cluster via CRDS.
|
||||
fn push_full_snapshot_hash(&mut self, full_snapshot_hash: FullSnapshotHash) {
|
||||
self.full_snapshot_hashes
|
||||
fn push_legacy_full_snapshot_hash(&mut self, full_snapshot_hash: FullSnapshotHash) {
|
||||
self.legacy_full_snapshot_hashes
|
||||
.hashes
|
||||
.push(full_snapshot_hash.hash);
|
||||
|
||||
retain_max_n_elements(
|
||||
&mut self.full_snapshot_hashes.hashes,
|
||||
&mut self.legacy_full_snapshot_hashes.hashes,
|
||||
self.max_full_snapshot_hashes,
|
||||
);
|
||||
|
||||
self.cluster_info
|
||||
.push_legacy_snapshot_hashes(clone_hashes_for_crds(&self.full_snapshot_hashes.hashes));
|
||||
.push_legacy_snapshot_hashes(clone_hashes_for_crds(
|
||||
&self.legacy_full_snapshot_hashes.hashes,
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
/// Add `incremental_snapshot_hash` to the vector of incremental snapshot hashes, then push
|
||||
/// that vector to the cluster via CRDS.
|
||||
fn push_incremental_snapshot_hash(
|
||||
&mut self,
|
||||
incremental_snapshot_hash: IncrementalSnapshotHash,
|
||||
) {
|
||||
// If the base snapshot hash is different from the one in IncrementalSnapshotHashes, then
|
||||
// that means the old incremental snapshot hashes are no longer valid, so clear them all
|
||||
// out.
|
||||
if incremental_snapshot_hash.base != self.incremental_snapshot_hashes.base {
|
||||
self.incremental_snapshot_hashes.hashes.clear();
|
||||
self.incremental_snapshot_hashes.base = incremental_snapshot_hash.base;
|
||||
}
|
||||
|
||||
self.incremental_snapshot_hashes
|
||||
.hashes
|
||||
.push(incremental_snapshot_hash.hash);
|
||||
|
||||
retain_max_n_elements(
|
||||
&mut self.incremental_snapshot_hashes.hashes,
|
||||
self.max_incremental_snapshot_hashes,
|
||||
);
|
||||
|
||||
// Pushing incremental snapshot hashes to the cluster should never fail. The only error
|
||||
// case is when the length of the hashes is too big, but we account for that with
|
||||
// `max_incremental_snapshot_hashes`. If this call ever does error, it's a programmer bug!
|
||||
// Check to see what changed in `push_snapshot_hashes()` and handle the new
|
||||
// error condition here.
|
||||
self.cluster_info
|
||||
.push_snapshot_hashes(
|
||||
clone_hash_for_crds(&self.incremental_snapshot_hashes.base),
|
||||
clone_hashes_for_crds(&self.incremental_snapshot_hashes.hashes),
|
||||
)
|
||||
.expect(
|
||||
"Bug! The programmer contract has changed for push_snapshot_hashes() \
|
||||
and a new error case has been added, which has not been handled here.",
|
||||
);
|
||||
}
|
||||
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
|
||||
struct LatestSnapshotHashes {
|
||||
full: (Slot, SnapshotHash),
|
||||
incremental: Option<(Slot, SnapshotHash)>,
|
||||
}
|
||||
|
||||
/// Clones and maps snapshot hashes into what CRDS expects
|
||||
|
|
Loading…
Reference in New Issue