Uses strong types for snapshot hashes in SnapshotPackagerService (#30603)
This commit is contained in:
parent
82288d4457
commit
70c6c7e1f7
|
@ -10,7 +10,7 @@ use {
|
||||||
snapshot_config::SnapshotConfig,
|
snapshot_config::SnapshotConfig,
|
||||||
snapshot_hash::{
|
snapshot_hash::{
|
||||||
FullSnapshotHash, FullSnapshotHashes, IncrementalSnapshotHash,
|
FullSnapshotHash, FullSnapshotHashes, IncrementalSnapshotHash,
|
||||||
IncrementalSnapshotHashes, StartingSnapshotHashes,
|
IncrementalSnapshotHashes, SnapshotHash, StartingSnapshotHashes,
|
||||||
},
|
},
|
||||||
snapshot_package::{self, retain_max_n_elements, SnapshotPackage, SnapshotType},
|
snapshot_package::{self, retain_max_n_elements, SnapshotPackage, SnapshotType},
|
||||||
snapshot_utils,
|
snapshot_utils,
|
||||||
|
@ -58,17 +58,13 @@ impl SnapshotPackagerService {
|
||||||
.name("solSnapshotPkgr".to_string())
|
.name("solSnapshotPkgr".to_string())
|
||||||
.spawn(move || {
|
.spawn(move || {
|
||||||
renice_this_thread(snapshot_config.packager_thread_niceness_adj).unwrap();
|
renice_this_thread(snapshot_config.packager_thread_niceness_adj).unwrap();
|
||||||
let mut snapshot_gossip_manager = if enable_gossip_push {
|
let mut snapshot_gossip_manager = enable_gossip_push.then(||
|
||||||
Some(SnapshotGossipManager {
|
SnapshotGossipManager::new(
|
||||||
cluster_info,
|
cluster_info,
|
||||||
max_full_snapshot_hashes,
|
max_full_snapshot_hashes,
|
||||||
max_incremental_snapshot_hashes,
|
max_incremental_snapshot_hashes,
|
||||||
full_snapshot_hashes: FullSnapshotHashes::default(),
|
)
|
||||||
incremental_snapshot_hashes: IncrementalSnapshotHashes::default(),
|
);
|
||||||
})
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
};
|
|
||||||
if let Some(snapshot_gossip_manager) = snapshot_gossip_manager.as_mut() {
|
if let Some(snapshot_gossip_manager) = snapshot_gossip_manager.as_mut() {
|
||||||
snapshot_gossip_manager.push_starting_snapshot_hashes(starting_snapshot_hashes);
|
snapshot_gossip_manager.push_starting_snapshot_hashes(starting_snapshot_hashes);
|
||||||
}
|
}
|
||||||
|
@ -105,7 +101,7 @@ impl SnapshotPackagerService {
|
||||||
if let Some(snapshot_gossip_manager) = snapshot_gossip_manager.as_mut() {
|
if let Some(snapshot_gossip_manager) = snapshot_gossip_manager.as_mut() {
|
||||||
snapshot_gossip_manager.push_snapshot_hash(
|
snapshot_gossip_manager.push_snapshot_hash(
|
||||||
snapshot_package.snapshot_type,
|
snapshot_package.snapshot_type,
|
||||||
(snapshot_package.slot(), snapshot_package.hash().0),
|
(snapshot_package.slot(), *snapshot_package.hash()),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -201,6 +197,26 @@ struct SnapshotGossipManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SnapshotGossipManager {
|
impl SnapshotGossipManager {
|
||||||
|
/// Construct a new SnapshotGossipManager with empty snapshot hashes
|
||||||
|
fn new(
|
||||||
|
cluster_info: Arc<ClusterInfo>,
|
||||||
|
max_full_snapshot_hashes: usize,
|
||||||
|
max_incremental_snapshot_hashes: usize,
|
||||||
|
) -> Self {
|
||||||
|
SnapshotGossipManager {
|
||||||
|
cluster_info,
|
||||||
|
max_full_snapshot_hashes,
|
||||||
|
max_incremental_snapshot_hashes,
|
||||||
|
full_snapshot_hashes: FullSnapshotHashes {
|
||||||
|
hashes: Vec::default(),
|
||||||
|
},
|
||||||
|
incremental_snapshot_hashes: IncrementalSnapshotHashes {
|
||||||
|
base: (Slot::default(), SnapshotHash(Hash::default())),
|
||||||
|
hashes: Vec::default(),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// If there were starting snapshot hashes, add those to their respective vectors, then push
|
/// If there were starting snapshot hashes, add those to their respective vectors, then push
|
||||||
/// those vectors to the cluster via CRDS.
|
/// those vectors to the cluster via CRDS.
|
||||||
fn push_starting_snapshot_hashes(
|
fn push_starting_snapshot_hashes(
|
||||||
|
@ -219,7 +235,11 @@ impl SnapshotGossipManager {
|
||||||
|
|
||||||
/// Add `snapshot_hash` to its respective vector of hashes, then push that vector to the
|
/// Add `snapshot_hash` to its respective vector of hashes, then push that vector to the
|
||||||
/// cluster via CRDS.
|
/// cluster via CRDS.
|
||||||
fn push_snapshot_hash(&mut self, snapshot_type: SnapshotType, snapshot_hash: (Slot, Hash)) {
|
fn push_snapshot_hash(
|
||||||
|
&mut self,
|
||||||
|
snapshot_type: SnapshotType,
|
||||||
|
snapshot_hash: (Slot, SnapshotHash),
|
||||||
|
) {
|
||||||
match snapshot_type {
|
match snapshot_type {
|
||||||
SnapshotType::FullSnapshot => {
|
SnapshotType::FullSnapshot => {
|
||||||
self.push_full_snapshot_hash(FullSnapshotHash {
|
self.push_full_snapshot_hash(FullSnapshotHash {
|
||||||
|
@ -254,7 +274,9 @@ impl SnapshotGossipManager {
|
||||||
);
|
);
|
||||||
|
|
||||||
self.cluster_info
|
self.cluster_info
|
||||||
.push_snapshot_hashes(self.full_snapshot_hashes.hashes.clone());
|
.push_snapshot_hashes(Self::clone_hashes_for_crds(
|
||||||
|
&self.full_snapshot_hashes.hashes,
|
||||||
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Add `incremental_snapshot_hash` to the vector of incremental snapshot hashes, then push
|
/// Add `incremental_snapshot_hash` to the vector of incremental snapshot hashes, then push
|
||||||
|
@ -287,14 +309,24 @@ impl SnapshotGossipManager {
|
||||||
// error condition here.
|
// error condition here.
|
||||||
self.cluster_info
|
self.cluster_info
|
||||||
.push_incremental_snapshot_hashes(
|
.push_incremental_snapshot_hashes(
|
||||||
self.incremental_snapshot_hashes.base,
|
Self::clone_hash_for_crds(&self.incremental_snapshot_hashes.base),
|
||||||
self.incremental_snapshot_hashes.hashes.clone(),
|
Self::clone_hashes_for_crds(&self.incremental_snapshot_hashes.hashes),
|
||||||
)
|
)
|
||||||
.expect(
|
.expect(
|
||||||
"Bug! The programmer contract has changed for push_incremental_snapshot_hashes() \
|
"Bug! The programmer contract has changed for push_incremental_snapshot_hashes() \
|
||||||
and a new error case has been added, which has not been handled here.",
|
and a new error case has been added, which has not been handled here.",
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Clones and maps snapshot hashes into what CRDS expects
|
||||||
|
fn clone_hashes_for_crds(hashes: &[(Slot, SnapshotHash)]) -> Vec<(Slot, Hash)> {
|
||||||
|
hashes.iter().map(Self::clone_hash_for_crds).collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Clones and maps a snapshot hash into what CRDS expects
|
||||||
|
fn clone_hash_for_crds(hash: &(Slot, SnapshotHash)) -> (Slot, Hash) {
|
||||||
|
(hash.0, hash.1 .0)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|
|
@ -230,7 +230,7 @@ fn bank_forks_from_snapshot(
|
||||||
let full_snapshot_hash = FullSnapshotHash {
|
let full_snapshot_hash = FullSnapshotHash {
|
||||||
hash: (
|
hash: (
|
||||||
full_snapshot_archive_info.slot(),
|
full_snapshot_archive_info.slot(),
|
||||||
full_snapshot_archive_info.hash().0,
|
*full_snapshot_archive_info.hash(),
|
||||||
),
|
),
|
||||||
};
|
};
|
||||||
let starting_incremental_snapshot_hash =
|
let starting_incremental_snapshot_hash =
|
||||||
|
@ -239,7 +239,7 @@ fn bank_forks_from_snapshot(
|
||||||
base: full_snapshot_hash.hash,
|
base: full_snapshot_hash.hash,
|
||||||
hash: (
|
hash: (
|
||||||
incremental_snapshot_archive_info.slot(),
|
incremental_snapshot_archive_info.slot(),
|
||||||
incremental_snapshot_archive_info.hash().0,
|
*incremental_snapshot_archive_info.hash(),
|
||||||
),
|
),
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
|
@ -11,7 +11,7 @@ use {
|
||||||
/// SnapshotPackagerService, which is in charge of pushing the hashes to CRDS. This struct wraps
|
/// SnapshotPackagerService, which is in charge of pushing the hashes to CRDS. This struct wraps
|
||||||
/// up those values make it easier to pass from bank_forks_utils, through validator, to
|
/// up those values make it easier to pass from bank_forks_utils, through validator, to
|
||||||
/// SnapshotPackagerService.
|
/// SnapshotPackagerService.
|
||||||
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
|
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||||
pub struct StartingSnapshotHashes {
|
pub struct StartingSnapshotHashes {
|
||||||
pub full: FullSnapshotHash,
|
pub full: FullSnapshotHash,
|
||||||
pub incremental: Option<IncrementalSnapshotHash>,
|
pub incremental: Option<IncrementalSnapshotHash>,
|
||||||
|
@ -19,34 +19,34 @@ pub struct StartingSnapshotHashes {
|
||||||
|
|
||||||
/// Used by SnapshotPackagerService and SnapshotGossipManager, this struct adds type safety to
|
/// Used by SnapshotPackagerService and SnapshotGossipManager, this struct adds type safety to
|
||||||
/// ensure a full snapshot hash is pushed to the right CRDS.
|
/// ensure a full snapshot hash is pushed to the right CRDS.
|
||||||
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
|
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||||
pub struct FullSnapshotHash {
|
pub struct FullSnapshotHash {
|
||||||
pub hash: (Slot, Hash),
|
pub hash: (Slot, SnapshotHash),
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Used by SnapshotPackagerService and SnapshotGossipManager, this struct adds type safety to
|
/// Used by SnapshotPackagerService and SnapshotGossipManager, this struct adds type safety to
|
||||||
/// ensure an incremental snapshot hash is pushed to the right CRDS. `base` is the (full) snapshot
|
/// ensure an incremental snapshot hash is pushed to the right CRDS. `base` is the (full) snapshot
|
||||||
/// this incremental snapshot (`hash`) is based on.
|
/// this incremental snapshot (`hash`) is based on.
|
||||||
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
|
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||||
pub struct IncrementalSnapshotHash {
|
pub struct IncrementalSnapshotHash {
|
||||||
pub base: (Slot, Hash),
|
pub base: (Slot, SnapshotHash),
|
||||||
pub hash: (Slot, Hash),
|
pub hash: (Slot, SnapshotHash),
|
||||||
}
|
}
|
||||||
|
|
||||||
/// FullSnapshotHashes is used by SnapshotPackagerService to collect the snapshot hashes from full
|
/// FullSnapshotHashes is used by SnapshotPackagerService to collect the snapshot hashes from full
|
||||||
/// snapshots and then push those hashes to CRDS.
|
/// snapshots and then push those hashes to CRDS.
|
||||||
#[derive(Debug, Default, Clone, PartialEq, Eq)]
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
pub struct FullSnapshotHashes {
|
pub struct FullSnapshotHashes {
|
||||||
pub hashes: Vec<(Slot, Hash)>,
|
pub hashes: Vec<(Slot, SnapshotHash)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// IncrementalSnapshotHashes is used by SnapshotPackagerService to collect the snapshot hashes
|
/// IncrementalSnapshotHashes is used by SnapshotPackagerService to collect the snapshot hashes
|
||||||
/// from incremental snapshots and then push those hashes to CRDS. `base` is the (full) snapshot
|
/// from incremental snapshots and then push those hashes to CRDS. `base` is the (full) snapshot
|
||||||
/// all the incremental snapshots (`hashes`) are based on.
|
/// all the incremental snapshots (`hashes`) are based on.
|
||||||
#[derive(Debug, Default, Clone, PartialEq, Eq)]
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
pub struct IncrementalSnapshotHashes {
|
pub struct IncrementalSnapshotHashes {
|
||||||
pub base: (Slot, Hash),
|
pub base: (Slot, SnapshotHash),
|
||||||
pub hashes: Vec<(Slot, Hash)>,
|
pub hashes: Vec<(Slot, SnapshotHash)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The hash used for snapshot archives
|
/// The hash used for snapshot archives
|
||||||
|
|
Loading…
Reference in New Issue