move package_snapshots to AccountsPackagePre ctors (#18997)
This PR solves #18815. Note that I had to make the snapshot prefix constants inside `snapshot_utils.rs` public at the crate level in order to make this work. I'm not sure whether or not introducing this dependency is entirely good, either way the `snapshot_utils.rs` file needs a lot of rework so things will move around, I believe this does the work in the meantime. Any feedback will be greatly appreciated.
This commit is contained in:
parent
f7be47b8b9
commit
06e08c4840
|
@ -55,6 +55,7 @@ mod tests {
|
|||
bank_forks::BankForks,
|
||||
genesis_utils::{create_genesis_config, GenesisConfigInfo},
|
||||
snapshot_config::SnapshotConfig,
|
||||
snapshot_package::AccountsPackagePre,
|
||||
snapshot_utils::{
|
||||
self, ArchiveFormat, SnapshotVersion, DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN,
|
||||
},
|
||||
|
@ -254,7 +255,7 @@ mod tests {
|
|||
let snapshot_path = &snapshot_config.snapshot_path;
|
||||
let last_bank_snapshot_info = snapshot_utils::get_highest_bank_snapshot_info(snapshot_path)
|
||||
.expect("no snapshots found in path");
|
||||
let snapshot_package = snapshot_utils::package_full_snapshot(
|
||||
let snapshot_package = AccountsPackagePre::new_full_snapshot_package(
|
||||
last_bank,
|
||||
&last_bank_snapshot_info,
|
||||
snapshot_path,
|
||||
|
|
|
@ -1,10 +1,18 @@
|
|||
use crate::snapshot_utils::{ArchiveFormat, SnapshotVersion};
|
||||
use crate::{accounts_db::SnapshotStorages, bank::BankSlotDelta};
|
||||
use crate::snapshot_utils::{
|
||||
ArchiveFormat, BankSnapshotInfo, Result, SnapshotVersion, TMP_FULL_SNAPSHOT_PREFIX,
|
||||
TMP_INCREMENTAL_SNAPSHOT_PREFIX,
|
||||
};
|
||||
use crate::{
|
||||
accounts_db::SnapshotStorages,
|
||||
bank::{Bank, BankSlotDelta},
|
||||
};
|
||||
use log::*;
|
||||
use solana_sdk::clock::Slot;
|
||||
use solana_sdk::genesis_config::ClusterType;
|
||||
use solana_sdk::hash::Hash;
|
||||
use std::{
|
||||
path::PathBuf,
|
||||
fs,
|
||||
path::{Path, PathBuf},
|
||||
sync::mpsc::{Receiver, SendError, Sender},
|
||||
};
|
||||
use tempfile::TempDir;
|
||||
|
@ -60,6 +68,144 @@ impl AccountsPackagePre {
|
|||
cluster_type,
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a snapshot package
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn new_snapshot_package<P>(
|
||||
bank: &Bank,
|
||||
bank_snapshot_info: &BankSnapshotInfo,
|
||||
status_cache_slot_deltas: Vec<BankSlotDelta>,
|
||||
snapshot_package_output_path: P,
|
||||
snapshot_storages: SnapshotStorages,
|
||||
archive_format: ArchiveFormat,
|
||||
snapshot_version: SnapshotVersion,
|
||||
hash_for_testing: Option<Hash>,
|
||||
snapshot_tmpdir: TempDir,
|
||||
) -> Result<Self>
|
||||
where
|
||||
P: AsRef<Path>,
|
||||
{
|
||||
// Hard link the snapshot into a tmpdir, to ensure its not removed prior to packaging.
|
||||
{
|
||||
let snapshot_hardlink_dir = snapshot_tmpdir
|
||||
.as_ref()
|
||||
.join(bank_snapshot_info.slot.to_string());
|
||||
fs::create_dir_all(&snapshot_hardlink_dir)?;
|
||||
fs::hard_link(
|
||||
&bank_snapshot_info.snapshot_path,
|
||||
&snapshot_hardlink_dir.join(bank_snapshot_info.slot.to_string()),
|
||||
)?;
|
||||
}
|
||||
|
||||
Ok(Self::new(
|
||||
bank.slot(),
|
||||
bank.block_height(),
|
||||
status_cache_slot_deltas,
|
||||
snapshot_tmpdir,
|
||||
snapshot_storages,
|
||||
bank.get_accounts_hash(),
|
||||
archive_format,
|
||||
snapshot_version,
|
||||
snapshot_package_output_path.as_ref().to_path_buf(),
|
||||
bank.capitalization(),
|
||||
hash_for_testing,
|
||||
bank.cluster_type(),
|
||||
))
|
||||
}
|
||||
|
||||
/// Package up bank snapshot files, snapshot storages, and slot deltas for a full snapshot.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new_full_snapshot_package<P, Q>(
|
||||
bank: &Bank,
|
||||
bank_snapshot_info: &BankSnapshotInfo,
|
||||
snapshots_dir: P,
|
||||
status_cache_slot_deltas: Vec<BankSlotDelta>,
|
||||
snapshot_package_output_path: Q,
|
||||
snapshot_storages: SnapshotStorages,
|
||||
archive_format: ArchiveFormat,
|
||||
snapshot_version: SnapshotVersion,
|
||||
hash_for_testing: Option<Hash>,
|
||||
) -> Result<Self>
|
||||
where
|
||||
P: AsRef<Path>,
|
||||
Q: AsRef<Path>,
|
||||
{
|
||||
info!(
|
||||
"Package full snapshot for bank: {} has {} account storage entries",
|
||||
bank.slot(),
|
||||
snapshot_storages.len()
|
||||
);
|
||||
|
||||
let snapshot_tmpdir = tempfile::Builder::new()
|
||||
.prefix(&format!("{}{}-", TMP_FULL_SNAPSHOT_PREFIX, bank.slot()))
|
||||
.tempdir_in(snapshots_dir)?;
|
||||
|
||||
Self::new_snapshot_package(
|
||||
bank,
|
||||
bank_snapshot_info,
|
||||
status_cache_slot_deltas,
|
||||
snapshot_package_output_path,
|
||||
snapshot_storages,
|
||||
archive_format,
|
||||
snapshot_version,
|
||||
hash_for_testing,
|
||||
snapshot_tmpdir,
|
||||
)
|
||||
}
|
||||
|
||||
/// Package up bank snapshot files, snapshot storages, and slot deltas for an incremental snapshot.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new_incremental_snapshot_package<P, Q>(
|
||||
bank: &Bank,
|
||||
incremental_snapshot_base_slot: Slot,
|
||||
bank_snapshot_info: &BankSnapshotInfo,
|
||||
snapshots_dir: P,
|
||||
status_cache_slot_deltas: Vec<BankSlotDelta>,
|
||||
snapshot_package_output_path: Q,
|
||||
snapshot_storages: SnapshotStorages,
|
||||
archive_format: ArchiveFormat,
|
||||
snapshot_version: SnapshotVersion,
|
||||
hash_for_testing: Option<Hash>,
|
||||
) -> Result<Self>
|
||||
where
|
||||
P: AsRef<Path>,
|
||||
Q: AsRef<Path>,
|
||||
{
|
||||
info!(
|
||||
"Package incremental snapshot for bank {} (from base slot {}) has {} account storage entries",
|
||||
bank.slot(),
|
||||
incremental_snapshot_base_slot,
|
||||
snapshot_storages.len()
|
||||
);
|
||||
|
||||
assert!(
|
||||
snapshot_storages.iter().all(|storage| storage
|
||||
.iter()
|
||||
.all(|entry| entry.slot() > incremental_snapshot_base_slot)),
|
||||
"Incremental snapshot package must only contain storage entries where slot > incremental snapshot base slot (i.e. full snapshot slot)!"
|
||||
);
|
||||
|
||||
let snapshot_tmpdir = tempfile::Builder::new()
|
||||
.prefix(&format!(
|
||||
"{}{}-{}-",
|
||||
TMP_INCREMENTAL_SNAPSHOT_PREFIX,
|
||||
incremental_snapshot_base_slot,
|
||||
bank.slot()
|
||||
))
|
||||
.tempdir_in(snapshots_dir)?;
|
||||
|
||||
Self::new_snapshot_package(
|
||||
bank,
|
||||
bank_snapshot_info,
|
||||
status_cache_slot_deltas,
|
||||
snapshot_package_output_path,
|
||||
snapshot_storages,
|
||||
archive_format,
|
||||
snapshot_version,
|
||||
hash_for_testing,
|
||||
snapshot_tmpdir,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct AccountsPackage {
|
||||
|
|
|
@ -176,8 +176,8 @@ pub const MAX_BANK_SNAPSHOTS: usize = 8; // Save some snapshots but not too many
|
|||
const MAX_SNAPSHOT_DATA_FILE_SIZE: u64 = 32 * 1024 * 1024 * 1024; // 32 GiB
|
||||
const VERSION_STRING_V1_2_0: &str = "1.2.0";
|
||||
const DEFAULT_SNAPSHOT_VERSION: SnapshotVersion = SnapshotVersion::V1_2_0;
|
||||
const TMP_FULL_SNAPSHOT_PREFIX: &str = "tmp-snapshot-";
|
||||
const TMP_INCREMENTAL_SNAPSHOT_PREFIX: &str = "tmp-incremental-snapshot-";
|
||||
pub(crate) const TMP_FULL_SNAPSHOT_PREFIX: &str = "tmp-snapshot-";
|
||||
pub(crate) const TMP_INCREMENTAL_SNAPSHOT_PREFIX: &str = "tmp-incremental-snapshot-";
|
||||
pub const DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN: usize = 2;
|
||||
pub const FULL_SNAPSHOT_ARCHIVE_FILENAME_REGEX: &str = r"^snapshot-(?P<slot>[[:digit:]]+)-(?P<hash>[[:alnum:]]+)\.(?P<ext>tar|tar\.bz2|tar\.zst|tar\.gz)$";
|
||||
pub const INCREMENTAL_SNAPSHOT_ARCHIVE_FILENAME_REGEX: &str = r"^incremental-snapshot-(?P<base>[[:digit:]]+)-(?P<slot>[[:digit:]]+)-(?P<hash>[[:alnum:]]+)\.(?P<ext>tar|tar\.bz2|tar\.zst|tar\.gz)$";
|
||||
|
@ -327,144 +327,6 @@ pub enum SnapshotError {
|
|||
}
|
||||
pub type Result<T> = std::result::Result<T, SnapshotError>;
|
||||
|
||||
/// Package up bank snapshot files, snapshot storages, and slot deltas for a full snapshot.
|
||||
pub fn package_full_snapshot<P, Q>(
|
||||
bank: &Bank,
|
||||
bank_snapshot_info: &BankSnapshotInfo,
|
||||
snapshots_dir: P,
|
||||
status_cache_slot_deltas: Vec<BankSlotDelta>,
|
||||
snapshot_package_output_path: Q,
|
||||
snapshot_storages: SnapshotStorages,
|
||||
archive_format: ArchiveFormat,
|
||||
snapshot_version: SnapshotVersion,
|
||||
hash_for_testing: Option<Hash>,
|
||||
) -> Result<AccountsPackagePre>
|
||||
where
|
||||
P: AsRef<Path>,
|
||||
Q: AsRef<Path>,
|
||||
{
|
||||
info!(
|
||||
"Package full snapshot for bank: {} has {} account storage entries",
|
||||
bank.slot(),
|
||||
snapshot_storages.len()
|
||||
);
|
||||
|
||||
let snapshot_tmpdir = tempfile::Builder::new()
|
||||
.prefix(&format!("{}{}-", TMP_FULL_SNAPSHOT_PREFIX, bank.slot()))
|
||||
.tempdir_in(snapshots_dir)?;
|
||||
|
||||
do_package_snapshot(
|
||||
bank,
|
||||
bank_snapshot_info,
|
||||
status_cache_slot_deltas,
|
||||
snapshot_package_output_path,
|
||||
snapshot_storages,
|
||||
archive_format,
|
||||
snapshot_version,
|
||||
hash_for_testing,
|
||||
snapshot_tmpdir,
|
||||
)
|
||||
}
|
||||
|
||||
/// Package up bank snapshot files, snapshot storages, and slot deltas for an incremental snapshot.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn package_incremental_snapshot<P, Q>(
|
||||
bank: &Bank,
|
||||
incremental_snapshot_base_slot: Slot,
|
||||
bank_snapshot_info: &BankSnapshotInfo,
|
||||
snapshots_dir: P,
|
||||
status_cache_slot_deltas: Vec<BankSlotDelta>,
|
||||
snapshot_package_output_path: Q,
|
||||
snapshot_storages: SnapshotStorages,
|
||||
archive_format: ArchiveFormat,
|
||||
snapshot_version: SnapshotVersion,
|
||||
hash_for_testing: Option<Hash>,
|
||||
) -> Result<AccountsPackagePre>
|
||||
where
|
||||
P: AsRef<Path>,
|
||||
Q: AsRef<Path>,
|
||||
{
|
||||
info!(
|
||||
"Package incremental snapshot for bank {} (from base slot {}) has {} account storage entries",
|
||||
bank.slot(),
|
||||
incremental_snapshot_base_slot,
|
||||
snapshot_storages.len()
|
||||
);
|
||||
|
||||
assert!(
|
||||
snapshot_storages.iter().all(|storage| storage
|
||||
.iter()
|
||||
.all(|entry| entry.slot() > incremental_snapshot_base_slot)),
|
||||
"Incremental snapshot package must only contain storage entries where slot > incremental snapshot base slot (i.e. full snapshot slot)!"
|
||||
);
|
||||
|
||||
let snapshot_tmpdir = tempfile::Builder::new()
|
||||
.prefix(&format!(
|
||||
"{}{}-{}-",
|
||||
TMP_INCREMENTAL_SNAPSHOT_PREFIX,
|
||||
incremental_snapshot_base_slot,
|
||||
bank.slot()
|
||||
))
|
||||
.tempdir_in(snapshots_dir)?;
|
||||
|
||||
do_package_snapshot(
|
||||
bank,
|
||||
bank_snapshot_info,
|
||||
status_cache_slot_deltas,
|
||||
snapshot_package_output_path,
|
||||
snapshot_storages,
|
||||
archive_format,
|
||||
snapshot_version,
|
||||
hash_for_testing,
|
||||
snapshot_tmpdir,
|
||||
)
|
||||
}
|
||||
|
||||
/// Create a snapshot package
|
||||
fn do_package_snapshot<P>(
|
||||
bank: &Bank,
|
||||
bank_snapshot_info: &BankSnapshotInfo,
|
||||
status_cache_slot_deltas: Vec<BankSlotDelta>,
|
||||
snapshot_package_output_path: P,
|
||||
snapshot_storages: SnapshotStorages,
|
||||
archive_format: ArchiveFormat,
|
||||
snapshot_version: SnapshotVersion,
|
||||
hash_for_testing: Option<Hash>,
|
||||
snapshot_tmpdir: TempDir,
|
||||
) -> Result<AccountsPackagePre>
|
||||
where
|
||||
P: AsRef<Path>,
|
||||
{
|
||||
// Hard link the snapshot into a tmpdir, to ensure its not removed prior to packaging.
|
||||
{
|
||||
let snapshot_hardlink_dir = snapshot_tmpdir
|
||||
.as_ref()
|
||||
.join(bank_snapshot_info.slot.to_string());
|
||||
fs::create_dir_all(&snapshot_hardlink_dir)?;
|
||||
fs::hard_link(
|
||||
&bank_snapshot_info.snapshot_path,
|
||||
&snapshot_hardlink_dir.join(bank_snapshot_info.slot.to_string()),
|
||||
)?;
|
||||
}
|
||||
|
||||
let package = AccountsPackagePre::new(
|
||||
bank.slot(),
|
||||
bank.block_height(),
|
||||
status_cache_slot_deltas,
|
||||
snapshot_tmpdir,
|
||||
snapshot_storages,
|
||||
bank.get_accounts_hash(),
|
||||
archive_format,
|
||||
snapshot_version,
|
||||
snapshot_package_output_path.as_ref().to_path_buf(),
|
||||
bank.capitalization(),
|
||||
hash_for_testing,
|
||||
bank.cluster_type(),
|
||||
);
|
||||
|
||||
Ok(package)
|
||||
}
|
||||
|
||||
fn get_archive_ext(archive_format: ArchiveFormat) -> &'static str {
|
||||
match archive_format {
|
||||
ArchiveFormat::TarBzip2 => "tar.bz2",
|
||||
|
@ -1735,7 +1597,7 @@ pub fn snapshot_bank(
|
|||
let highest_bank_snapshot_info = get_highest_bank_snapshot_info(snapshots_dir)
|
||||
.expect("no snapshots found in config snapshots_dir");
|
||||
|
||||
let package = package_full_snapshot(
|
||||
let package = AccountsPackagePre::new_full_snapshot_package(
|
||||
root_bank,
|
||||
&highest_bank_snapshot_info,
|
||||
snapshots_dir,
|
||||
|
@ -1848,7 +1710,7 @@ pub fn package_process_and_archive_full_snapshot(
|
|||
thread_pool: Option<&ThreadPool>,
|
||||
maximum_snapshots_to_retain: usize,
|
||||
) -> Result<PathBuf> {
|
||||
let package = package_full_snapshot(
|
||||
let package = AccountsPackagePre::new_full_snapshot_package(
|
||||
bank,
|
||||
bank_snapshot_info,
|
||||
snapshots_dir,
|
||||
|
@ -1882,7 +1744,7 @@ pub fn package_process_and_archive_incremental_snapshot(
|
|||
thread_pool: Option<&ThreadPool>,
|
||||
maximum_snapshots_to_retain: usize,
|
||||
) -> Result<PathBuf> {
|
||||
let package = package_incremental_snapshot(
|
||||
let package = AccountsPackagePre::new_incremental_snapshot_package(
|
||||
bank,
|
||||
incremental_snapshot_base_slot,
|
||||
bank_snapshot_info,
|
||||
|
|
Loading…
Reference in New Issue