Refactors the Full/Incremental SnapshotHash types (#31186)
This commit is contained in:
parent
875df9600d
commit
d43e19bb03
|
@ -2,7 +2,7 @@ use {
|
|||
solana_gossip::cluster_info::ClusterInfo,
|
||||
solana_runtime::{
|
||||
snapshot_hash::{
|
||||
FullSnapshotHash, FullSnapshotHashes, SnapshotHash, StartingSnapshotHashes,
|
||||
FullSnapshotHash, IncrementalSnapshotHash, SnapshotHash, StartingSnapshotHashes,
|
||||
},
|
||||
snapshot_package::{retain_max_n_elements, SnapshotType},
|
||||
},
|
||||
|
@ -14,8 +14,8 @@ use {
|
|||
pub struct SnapshotGossipManager {
|
||||
cluster_info: Arc<ClusterInfo>,
|
||||
latest_snapshot_hashes: Option<LatestSnapshotHashes>,
|
||||
max_full_snapshot_hashes: usize,
|
||||
legacy_full_snapshot_hashes: FullSnapshotHashes,
|
||||
max_legacy_full_snapshot_hashes: usize,
|
||||
legacy_full_snapshot_hashes: Vec<FullSnapshotHash>,
|
||||
}
|
||||
|
||||
impl SnapshotGossipManager {
|
||||
|
@ -24,16 +24,14 @@ impl SnapshotGossipManager {
|
|||
#[must_use]
|
||||
pub fn new(
|
||||
cluster_info: Arc<ClusterInfo>,
|
||||
max_full_snapshot_hashes: usize,
|
||||
max_legacy_full_snapshot_hashes: usize,
|
||||
starting_snapshot_hashes: Option<StartingSnapshotHashes>,
|
||||
) -> Self {
|
||||
let mut this = SnapshotGossipManager {
|
||||
cluster_info,
|
||||
latest_snapshot_hashes: None,
|
||||
max_full_snapshot_hashes,
|
||||
legacy_full_snapshot_hashes: FullSnapshotHashes {
|
||||
hashes: Vec::default(),
|
||||
},
|
||||
max_legacy_full_snapshot_hashes,
|
||||
legacy_full_snapshot_hashes: Vec::default(),
|
||||
};
|
||||
if let Some(starting_snapshot_hashes) = starting_snapshot_hashes {
|
||||
this.push_starting_snapshot_hashes(starting_snapshot_hashes);
|
||||
|
@ -43,11 +41,11 @@ impl SnapshotGossipManager {
|
|||
|
||||
/// Push starting snapshot hashes to the cluster via CRDS
|
||||
fn push_starting_snapshot_hashes(&mut self, starting_snapshot_hashes: StartingSnapshotHashes) {
|
||||
self.update_latest_full_snapshot_hash(starting_snapshot_hashes.full.hash);
|
||||
self.update_latest_full_snapshot_hash(starting_snapshot_hashes.full);
|
||||
if let Some(starting_incremental_snapshot_hash) = starting_snapshot_hashes.incremental {
|
||||
self.update_latest_incremental_snapshot_hash(
|
||||
starting_incremental_snapshot_hash.hash,
|
||||
starting_snapshot_hashes.full.hash.0,
|
||||
starting_incremental_snapshot_hash,
|
||||
starting_snapshot_hashes.full.0 .0,
|
||||
);
|
||||
}
|
||||
self.push_latest_snapshot_hashes_to_cluster();
|
||||
|
@ -65,30 +63,31 @@ impl SnapshotGossipManager {
|
|||
) {
|
||||
match snapshot_type {
|
||||
SnapshotType::FullSnapshot => {
|
||||
self.push_full_snapshot_hash(snapshot_hash);
|
||||
self.push_full_snapshot_hash(FullSnapshotHash(snapshot_hash));
|
||||
}
|
||||
SnapshotType::IncrementalSnapshot(base_slot) => {
|
||||
self.push_incremental_snapshot_hash(snapshot_hash, base_slot);
|
||||
self.push_incremental_snapshot_hash(
|
||||
IncrementalSnapshotHash(snapshot_hash),
|
||||
base_slot,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Push new full snapshot hash to the cluster via CRDS
|
||||
fn push_full_snapshot_hash(&mut self, full_snapshot_hash: (Slot, SnapshotHash)) {
|
||||
fn push_full_snapshot_hash(&mut self, full_snapshot_hash: FullSnapshotHash) {
|
||||
self.update_latest_full_snapshot_hash(full_snapshot_hash);
|
||||
self.push_latest_snapshot_hashes_to_cluster();
|
||||
|
||||
// Handle legacy snapshot hashes here too
|
||||
// Once LegacySnapshotHashes are removed from CRDS, also remove them here
|
||||
self.push_legacy_full_snapshot_hash(FullSnapshotHash {
|
||||
hash: full_snapshot_hash,
|
||||
});
|
||||
self.push_legacy_full_snapshot_hash(full_snapshot_hash);
|
||||
}
|
||||
|
||||
/// Push new incremental snapshot hash to the cluster via CRDS
|
||||
fn push_incremental_snapshot_hash(
|
||||
&mut self,
|
||||
incremental_snapshot_hash: (Slot, SnapshotHash),
|
||||
incremental_snapshot_hash: IncrementalSnapshotHash,
|
||||
base_slot: Slot,
|
||||
) {
|
||||
self.update_latest_incremental_snapshot_hash(incremental_snapshot_hash, base_slot);
|
||||
|
@ -96,7 +95,7 @@ impl SnapshotGossipManager {
|
|||
}
|
||||
|
||||
/// Update the latest snapshot hashes with a new full snapshot
|
||||
fn update_latest_full_snapshot_hash(&mut self, full_snapshot_hash: (Slot, SnapshotHash)) {
|
||||
fn update_latest_full_snapshot_hash(&mut self, full_snapshot_hash: FullSnapshotHash) {
|
||||
self.latest_snapshot_hashes = Some(LatestSnapshotHashes {
|
||||
full: full_snapshot_hash,
|
||||
// If we've gotten a new full snapshot, we know there cannot be any
|
||||
|
@ -108,7 +107,7 @@ impl SnapshotGossipManager {
|
|||
/// Update the latest snapshot hashes with a new incremental snapshot
|
||||
fn update_latest_incremental_snapshot_hash(
|
||||
&mut self,
|
||||
incremental_snapshot_hash: (Slot, SnapshotHash),
|
||||
incremental_snapshot_hash: IncrementalSnapshotHash,
|
||||
base_slot: Slot,
|
||||
) {
|
||||
let latest_snapshot_hashes = self
|
||||
|
@ -116,9 +115,9 @@ impl SnapshotGossipManager {
|
|||
.as_mut()
|
||||
.expect("there must already be a full snapshot hash");
|
||||
assert_eq!(
|
||||
base_slot, latest_snapshot_hashes.full.0,
|
||||
base_slot, latest_snapshot_hashes.full.0.0,
|
||||
"the incremental snapshot's base slot ({}) must match the latest full snapshot's slot ({})",
|
||||
base_slot, latest_snapshot_hashes.full.0,
|
||||
base_slot, latest_snapshot_hashes.full.0.0,
|
||||
);
|
||||
latest_snapshot_hashes.incremental = Some(incremental_snapshot_hash);
|
||||
}
|
||||
|
@ -135,11 +134,11 @@ impl SnapshotGossipManager {
|
|||
// `push_snapshot_hashes()` and handle the new error condition here.
|
||||
self.cluster_info
|
||||
.push_snapshot_hashes(
|
||||
clone_hash_for_crds(&latest_snapshot_hashes.full),
|
||||
latest_snapshot_hashes.full.clone_for_crds(),
|
||||
latest_snapshot_hashes
|
||||
.incremental
|
||||
.iter()
|
||||
.map(clone_hash_for_crds)
|
||||
.map(AsSnapshotHash::clone_for_crds)
|
||||
.collect(),
|
||||
)
|
||||
.expect(
|
||||
|
@ -151,34 +150,49 @@ impl SnapshotGossipManager {
|
|||
/// Add `full_snapshot_hash` to the vector of full snapshot hashes, then push that vector to
|
||||
/// the cluster via CRDS.
|
||||
fn push_legacy_full_snapshot_hash(&mut self, full_snapshot_hash: FullSnapshotHash) {
|
||||
self.legacy_full_snapshot_hashes
|
||||
.hashes
|
||||
.push(full_snapshot_hash.hash);
|
||||
self.legacy_full_snapshot_hashes.push(full_snapshot_hash);
|
||||
|
||||
retain_max_n_elements(
|
||||
&mut self.legacy_full_snapshot_hashes.hashes,
|
||||
self.max_full_snapshot_hashes,
|
||||
&mut self.legacy_full_snapshot_hashes,
|
||||
self.max_legacy_full_snapshot_hashes,
|
||||
);
|
||||
|
||||
self.cluster_info
|
||||
.push_legacy_snapshot_hashes(clone_hashes_for_crds(
|
||||
&self.legacy_full_snapshot_hashes.hashes,
|
||||
self.legacy_full_snapshot_hashes.as_slice(),
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
|
||||
struct LatestSnapshotHashes {
|
||||
full: (Slot, SnapshotHash),
|
||||
incremental: Option<(Slot, SnapshotHash)>,
|
||||
full: FullSnapshotHash,
|
||||
incremental: Option<IncrementalSnapshotHash>,
|
||||
}
|
||||
|
||||
trait AsSnapshotHash {
|
||||
fn as_snapshot_hash(&self) -> &(Slot, SnapshotHash);
|
||||
|
||||
/// Clones and maps a into what CRDS expects
|
||||
fn clone_for_crds(&self) -> (Slot, Hash) {
|
||||
let (slot, snapshot_hash) = self.as_snapshot_hash();
|
||||
(*slot, snapshot_hash.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl AsSnapshotHash for FullSnapshotHash {
|
||||
fn as_snapshot_hash(&self) -> &(Slot, SnapshotHash) {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl AsSnapshotHash for IncrementalSnapshotHash {
|
||||
fn as_snapshot_hash(&self) -> &(Slot, SnapshotHash) {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
/// Clones and maps snapshot hashes into what CRDS expects
|
||||
fn clone_hashes_for_crds(hashes: &[(Slot, SnapshotHash)]) -> Vec<(Slot, Hash)> {
|
||||
hashes.iter().map(clone_hash_for_crds).collect()
|
||||
}
|
||||
|
||||
/// Clones and maps a snapshot hash into what CRDS expects
|
||||
fn clone_hash_for_crds(hash: &(Slot, SnapshotHash)) -> (Slot, Hash) {
|
||||
(hash.0, hash.1 .0)
|
||||
fn clone_hashes_for_crds(hashes: &[impl AsSnapshotHash]) -> Vec<(Slot, Hash)> {
|
||||
hashes.iter().map(AsSnapshotHash::clone_for_crds).collect()
|
||||
}
|
||||
|
|
|
@ -731,7 +731,7 @@ impl Validator {
|
|||
let pruned_banks_request_handler = PrunedBanksRequestHandler {
|
||||
pruned_banks_receiver,
|
||||
};
|
||||
let last_full_snapshot_slot = starting_snapshot_hashes.map(|x| x.full.hash.0);
|
||||
let last_full_snapshot_slot = starting_snapshot_hashes.map(|x| x.full.0 .0);
|
||||
let accounts_background_service = AccountsBackgroundService::new(
|
||||
bank_forks.clone(),
|
||||
&exit,
|
||||
|
|
|
@ -3449,7 +3449,7 @@ fn main() {
|
|||
eprintln!("Unable to create incremental snapshot without a base full snapshot");
|
||||
exit(1);
|
||||
}
|
||||
let full_snapshot_slot = starting_snapshot_hashes.unwrap().full.hash.0;
|
||||
let full_snapshot_slot = starting_snapshot_hashes.unwrap().full.0 .0;
|
||||
if bank.slot() <= full_snapshot_slot {
|
||||
eprintln!(
|
||||
"Unable to create incremental snapshot: Slot must be greater than full snapshot slot. slot: {}, full snapshot slot: {}",
|
||||
|
|
|
@ -227,20 +227,16 @@ fn bank_forks_from_snapshot(
|
|||
deserialized_bank.set_shrink_paths(shrink_paths);
|
||||
}
|
||||
|
||||
let full_snapshot_hash = FullSnapshotHash {
|
||||
hash: (
|
||||
full_snapshot_archive_info.slot(),
|
||||
*full_snapshot_archive_info.hash(),
|
||||
),
|
||||
};
|
||||
let full_snapshot_hash = FullSnapshotHash((
|
||||
full_snapshot_archive_info.slot(),
|
||||
*full_snapshot_archive_info.hash(),
|
||||
));
|
||||
let starting_incremental_snapshot_hash =
|
||||
incremental_snapshot_archive_info.map(|incremental_snapshot_archive_info| {
|
||||
IncrementalSnapshotHash {
|
||||
hash: (
|
||||
incremental_snapshot_archive_info.slot(),
|
||||
*incremental_snapshot_archive_info.hash(),
|
||||
),
|
||||
}
|
||||
IncrementalSnapshotHash((
|
||||
incremental_snapshot_archive_info.slot(),
|
||||
*incremental_snapshot_archive_info.hash(),
|
||||
))
|
||||
});
|
||||
let starting_snapshot_hashes = StartingSnapshotHashes {
|
||||
full: full_snapshot_hash,
|
||||
|
|
|
@ -20,32 +20,12 @@ pub struct StartingSnapshotHashes {
|
|||
/// Used by SnapshotPackagerService and SnapshotGossipManager, this struct adds type safety to
|
||||
/// ensure a full snapshot hash is pushed to the right CRDS.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub struct FullSnapshotHash {
|
||||
pub hash: (Slot, SnapshotHash),
|
||||
}
|
||||
pub struct FullSnapshotHash(pub (Slot, SnapshotHash));
|
||||
|
||||
/// Used by SnapshotPackagerService and SnapshotGossipManager, this struct adds type safety to
|
||||
/// ensure an incremental snapshot hash is pushed to the right CRDS.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub struct IncrementalSnapshotHash {
|
||||
pub hash: (Slot, SnapshotHash),
|
||||
}
|
||||
|
||||
/// FullSnapshotHashes is used by SnapshotPackagerService to collect the snapshot hashes from full
|
||||
/// snapshots and then push those hashes to CRDS.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct FullSnapshotHashes {
|
||||
pub hashes: Vec<(Slot, SnapshotHash)>,
|
||||
}
|
||||
|
||||
/// IncrementalSnapshotHashes is used by SnapshotPackagerService to collect the snapshot hashes
|
||||
/// from incremental snapshots and then push those hashes to CRDS. `base` is the (full) snapshot
|
||||
/// all the incremental snapshots (`hashes`) are based on.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct IncrementalSnapshotHashes {
|
||||
pub base: (Slot, SnapshotHash),
|
||||
pub hashes: Vec<(Slot, SnapshotHash)>,
|
||||
}
|
||||
pub struct IncrementalSnapshotHash(pub (Slot, SnapshotHash));
|
||||
|
||||
/// The hash used for snapshot archives
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
|
||||
|
|
Loading…
Reference in New Issue