Make SnapshotPackagerService aware of Incremental Snapshots (#19254)

Add a field to SnapshotPackage that is an enum for SnapshotType, so archive_snapshot_package() will do the right thing.

Fixes #19166
This commit is contained in:
Brooks Prumo 2021-08-17 13:01:59 -05:00 committed by GitHub
parent 7481d9b66b
commit f9986c66b8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 74 additions and 26 deletions

View File

@ -4,14 +4,15 @@
// hash on gossip. Monitor gossip for messages from validators in the --trusted-validators // hash on gossip. Monitor gossip for messages from validators in the --trusted-validators
// set and halt the node if a mismatch is detected. // set and halt the node if a mismatch is detected.
use crate::snapshot_packager_service::PendingSnapshotPackage;
use rayon::ThreadPool; use rayon::ThreadPool;
use solana_gossip::cluster_info::{ClusterInfo, MAX_SNAPSHOT_HASHES}; use solana_gossip::cluster_info::{ClusterInfo, MAX_SNAPSHOT_HASHES};
use solana_runtime::{ use solana_runtime::{
accounts_db, accounts_db,
snapshot_archive_info::SnapshotArchiveInfoGetter, snapshot_archive_info::SnapshotArchiveInfoGetter,
snapshot_config::SnapshotConfig, snapshot_config::SnapshotConfig,
snapshot_package::{AccountsPackage, AccountsPackageReceiver, SnapshotPackage}, snapshot_package::{
AccountsPackage, AccountsPackageReceiver, PendingSnapshotPackage, SnapshotPackage,
},
snapshot_utils, snapshot_utils,
}; };
use solana_sdk::{clock::Slot, hash::Hash, pubkey::Pubkey}; use solana_sdk::{clock::Slot, hash::Hash, pubkey::Pubkey};
@ -222,7 +223,10 @@ impl AccountsHashVerifier {
mod tests { mod tests {
use super::*; use super::*;
use solana_gossip::{cluster_info::make_accounts_hashes_message, contact_info::ContactInfo}; use solana_gossip::{cluster_info::make_accounts_hashes_message, contact_info::ContactInfo};
use solana_runtime::snapshot_utils::{ArchiveFormat, SnapshotVersion}; use solana_runtime::{
snapshot_package::SnapshotType,
snapshot_utils::{ArchiveFormat, SnapshotVersion},
};
use solana_sdk::{ use solana_sdk::{
hash::hash, hash::hash,
signature::{Keypair, Signer}, signature::{Keypair, Signer},
@ -314,6 +318,7 @@ mod tests {
hash, hash,
archive_format, archive_format,
snapshot_version, snapshot_version,
SnapshotType::FullSnapshot,
); );
AccountsHashVerifier::process_snapshot_package( AccountsHashVerifier::process_snapshot_package(

View File

@ -1,20 +1,18 @@
use solana_gossip::cluster_info::{ClusterInfo, MAX_SNAPSHOT_HASHES}; use solana_gossip::cluster_info::{ClusterInfo, MAX_SNAPSHOT_HASHES};
use solana_runtime::{ use solana_runtime::{
snapshot_archive_info::SnapshotArchiveInfoGetter, snapshot_package::SnapshotPackage, snapshot_archive_info::SnapshotArchiveInfoGetter, snapshot_package::PendingSnapshotPackage,
snapshot_utils, snapshot_utils,
}; };
use solana_sdk::{clock::Slot, hash::Hash}; use solana_sdk::{clock::Slot, hash::Hash};
use std::{ use std::{
sync::{ sync::{
atomic::{AtomicBool, Ordering}, atomic::{AtomicBool, Ordering},
Arc, Mutex, Arc,
}, },
thread::{self, Builder, JoinHandle}, thread::{self, Builder, JoinHandle},
time::Duration, time::Duration,
}; };
pub type PendingSnapshotPackage = Arc<Mutex<Option<SnapshotPackage>>>;
pub struct SnapshotPackagerService { pub struct SnapshotPackagerService {
t_snapshot_packager: JoinHandle<()>, t_snapshot_packager: JoinHandle<()>,
} }
@ -45,18 +43,21 @@ impl SnapshotPackagerService {
let snapshot_package = pending_snapshot_package.lock().unwrap().take(); let snapshot_package = pending_snapshot_package.lock().unwrap().take();
if let Some(snapshot_package) = snapshot_package { if let Some(snapshot_package) = snapshot_package {
if let Err(err) = snapshot_utils::archive_snapshot_package( match snapshot_utils::archive_snapshot_package(
&snapshot_package, &snapshot_package,
maximum_snapshots_to_retain, maximum_snapshots_to_retain,
) { ) {
warn!("Failed to create snapshot archive: {}", err); Ok(_) => {
} else {
hashes.push((snapshot_package.slot(), *snapshot_package.hash())); hashes.push((snapshot_package.slot(), *snapshot_package.hash()));
while hashes.len() > MAX_SNAPSHOT_HASHES { while hashes.len() > MAX_SNAPSHOT_HASHES {
hashes.remove(0); hashes.remove(0);
} }
cluster_info.push_snapshot_hashes(hashes.clone()); cluster_info.push_snapshot_hashes(hashes.clone());
} }
Err(err) => {
warn!("Failed to create snapshot archive: {}", err);
}
};
} else { } else {
std::thread::sleep(Duration::from_millis(100)); std::thread::sleep(Duration::from_millis(100));
} }
@ -81,7 +82,7 @@ mod tests {
use solana_runtime::{ use solana_runtime::{
accounts_db::AccountStorageEntry, accounts_db::AccountStorageEntry,
bank::BankSlotDelta, bank::BankSlotDelta,
snapshot_package::SnapshotPackage, snapshot_package::{SnapshotPackage, SnapshotType},
snapshot_utils::{self, ArchiveFormat, SnapshotVersion, SNAPSHOT_STATUS_CACHE_FILE_NAME}, snapshot_utils::{self, ArchiveFormat, SnapshotVersion, SNAPSHOT_STATUS_CACHE_FILE_NAME},
}; };
use solana_sdk::hash::Hash; use solana_sdk::hash::Hash;
@ -175,6 +176,7 @@ mod tests {
Hash::default(), Hash::default(),
ArchiveFormat::TarBzip2, ArchiveFormat::TarBzip2,
SnapshotVersion::default(), SnapshotVersion::default(),
SnapshotType::FullSnapshot,
); );
// Make tarball from packageable snapshot // Make tarball from packageable snapshot

View File

@ -21,7 +21,6 @@ use crate::{
shred_fetch_stage::ShredFetchStage, shred_fetch_stage::ShredFetchStage,
sigverify_shreds::ShredSigVerifier, sigverify_shreds::ShredSigVerifier,
sigverify_stage::SigVerifyStage, sigverify_stage::SigVerifyStage,
snapshot_packager_service::PendingSnapshotPackage,
tower_storage::TowerStorage, tower_storage::TowerStorage,
voting_service::VotingService, voting_service::VotingService,
}; };
@ -45,6 +44,7 @@ use solana_runtime::{
bank_forks::BankForks, bank_forks::BankForks,
commitment::BlockCommitmentCache, commitment::BlockCommitmentCache,
snapshot_config::SnapshotConfig, snapshot_config::SnapshotConfig,
snapshot_package::PendingSnapshotPackage,
vote_sender_types::ReplayVoteSender, vote_sender_types::ReplayVoteSender,
}; };
use solana_sdk::{pubkey::Pubkey, signature::Keypair}; use solana_sdk::{pubkey::Pubkey, signature::Keypair};

View File

@ -13,7 +13,7 @@ use {
serve_repair::ServeRepair, serve_repair::ServeRepair,
serve_repair_service::ServeRepairService, serve_repair_service::ServeRepairService,
sigverify, sigverify,
snapshot_packager_service::{PendingSnapshotPackage, SnapshotPackagerService}, snapshot_packager_service::SnapshotPackagerService,
tower_storage::TowerStorage, tower_storage::TowerStorage,
tpu::{Tpu, DEFAULT_TPU_COALESCE_MS}, tpu::{Tpu, DEFAULT_TPU_COALESCE_MS},
tvu::{Sockets, Tvu, TvuConfig}, tvu::{Sockets, Tvu, TvuConfig},
@ -65,6 +65,7 @@ use {
hardened_unpack::{open_genesis_config, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE}, hardened_unpack::{open_genesis_config, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE},
snapshot_archive_info::SnapshotArchiveInfoGetter, snapshot_archive_info::SnapshotArchiveInfoGetter,
snapshot_config::SnapshotConfig, snapshot_config::SnapshotConfig,
snapshot_package::PendingSnapshotPackage,
snapshot_utils, snapshot_utils,
}, },
solana_sdk::{ solana_sdk::{

View File

@ -52,7 +52,7 @@ mod tests {
use log::{info, trace}; use log::{info, trace};
use solana_core::{ use solana_core::{
accounts_hash_verifier::AccountsHashVerifier, accounts_hash_verifier::AccountsHashVerifier,
snapshot_packager_service::{PendingSnapshotPackage, SnapshotPackagerService}, snapshot_packager_service::SnapshotPackagerService,
}; };
use solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo}; use solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo};
use solana_runtime::{ use solana_runtime::{
@ -66,7 +66,7 @@ mod tests {
genesis_utils::{create_genesis_config, GenesisConfigInfo}, genesis_utils::{create_genesis_config, GenesisConfigInfo},
snapshot_archive_info::FullSnapshotArchiveInfo, snapshot_archive_info::FullSnapshotArchiveInfo,
snapshot_config::SnapshotConfig, snapshot_config::SnapshotConfig,
snapshot_package::AccountsPackage, snapshot_package::{AccountsPackage, PendingSnapshotPackage},
snapshot_utils::{ snapshot_utils::{
self, ArchiveFormat, SnapshotVersion, DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN, self, ArchiveFormat, SnapshotVersion, DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN,
}, },

View File

@ -16,14 +16,26 @@ use solana_sdk::hash::Hash;
use std::{ use std::{
fs, fs,
path::{Path, PathBuf}, path::{Path, PathBuf},
sync::mpsc::{Receiver, SendError, Sender}, sync::{
mpsc::{Receiver, SendError, Sender},
Arc, Mutex,
},
}; };
use tempfile::TempDir; use tempfile::TempDir;
/// The sender side of the AccountsPackage channel, used by AccountsBackgroundService
pub type AccountsPackageSender = Sender<AccountsPackage>; pub type AccountsPackageSender = Sender<AccountsPackage>;
/// The receiver side of the AccountsPackage channel, used by AccountsHashVerifier
pub type AccountsPackageReceiver = Receiver<AccountsPackage>; pub type AccountsPackageReceiver = Receiver<AccountsPackage>;
/// The error type when sending an AccountsPackage over the channel fails
pub type AccountsPackageSendError = SendError<AccountsPackage>; pub type AccountsPackageSendError = SendError<AccountsPackage>;
/// The PendingSnapshotPackage passes a SnapshotPackage from AccountsHashVerifier to
/// SnapshotPackagerService for archiving
pub type PendingSnapshotPackage = Arc<Mutex<Option<SnapshotPackage>>>;
#[derive(Debug)] #[derive(Debug)]
pub struct AccountsPackage { pub struct AccountsPackage {
pub slot: Slot, pub slot: Slot,
@ -176,9 +188,11 @@ pub struct SnapshotPackage {
pub snapshot_links: TempDir, pub snapshot_links: TempDir,
pub storages: SnapshotStorages, pub storages: SnapshotStorages,
pub snapshot_version: SnapshotVersion, pub snapshot_version: SnapshotVersion,
pub snapshot_type: SnapshotType,
} }
impl SnapshotPackage { impl SnapshotPackage {
#[allow(clippy::too_many_arguments)]
pub fn new( pub fn new(
slot: Slot, slot: Slot,
block_height: u64, block_height: u64,
@ -189,6 +203,7 @@ impl SnapshotPackage {
hash: Hash, hash: Hash,
archive_format: ArchiveFormat, archive_format: ArchiveFormat,
snapshot_version: SnapshotVersion, snapshot_version: SnapshotVersion,
snapshot_type: SnapshotType,
) -> Self { ) -> Self {
Self { Self {
snapshot_archive_info: SnapshotArchiveInfo { snapshot_archive_info: SnapshotArchiveInfo {
@ -202,6 +217,7 @@ impl SnapshotPackage {
snapshot_links, snapshot_links,
storages, storages,
snapshot_version, snapshot_version,
snapshot_type,
} }
} }
} }
@ -211,3 +227,19 @@ impl SnapshotArchiveInfoGetter for SnapshotPackage {
&self.snapshot_archive_info &self.snapshot_archive_info
} }
} }
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SnapshotType {
FullSnapshot,
IncrementalSnapshot,
}
impl SnapshotType {
/// Get the string prefix of the snapshot type
pub fn to_prefix(&self) -> &'static str {
match self {
SnapshotType::FullSnapshot => TMP_FULL_SNAPSHOT_PREFIX,
SnapshotType::IncrementalSnapshot => TMP_INCREMENTAL_SNAPSHOT_PREFIX,
}
}
}

View File

@ -14,6 +14,7 @@ use {
}, },
snapshot_package::{ snapshot_package::{
AccountsPackage, AccountsPackageSendError, AccountsPackageSender, SnapshotPackage, AccountsPackage, AccountsPackageSendError, AccountsPackageSender, SnapshotPackage,
SnapshotType,
}, },
sorted_storages::SortedStorages, sorted_storages::SortedStorages,
}, },
@ -239,10 +240,10 @@ pub fn remove_tmp_snapshot_archives(snapshot_archives_dir: &Path) {
} }
} }
/// Make a full snapshot archive out of the snapshot package /// Make a snapshot archive out of the snapshot package
pub fn archive_snapshot_package( pub fn archive_snapshot_package(
snapshot_package: &SnapshotPackage, snapshot_package: &SnapshotPackage,
maximum_snapshots_to_retain: usize, maximum_snapshot_archives_to_retain: usize,
) -> Result<()> { ) -> Result<()> {
info!( info!(
"Generating snapshot archive for slot {}", "Generating snapshot archive for slot {}",
@ -268,10 +269,11 @@ pub fn archive_snapshot_package(
.map_err(|e| SnapshotError::IoWithSource(e, "create archive path"))?; .map_err(|e| SnapshotError::IoWithSource(e, "create archive path"))?;
// Create the staging directories // Create the staging directories
let staging_dir_prefix = snapshot_package.snapshot_type.to_prefix();
let staging_dir = tempfile::Builder::new() let staging_dir = tempfile::Builder::new()
.prefix(&format!( .prefix(&format!(
"{}{}-", "{}{}-",
TMP_FULL_SNAPSHOT_PREFIX, staging_dir_prefix,
snapshot_package.slot() snapshot_package.slot()
)) ))
.tempdir_in(tar_dir) .tempdir_in(tar_dir)
@ -323,7 +325,7 @@ pub fn archive_snapshot_package(
// Tar the staging directory into the archive at `archive_path` // Tar the staging directory into the archive at `archive_path`
let archive_path = tar_dir.join(format!( let archive_path = tar_dir.join(format!(
"{}{}.{}", "{}{}.{}",
TMP_FULL_SNAPSHOT_PREFIX, staging_dir_prefix,
snapshot_package.slot(), snapshot_package.slot(),
file_ext file_ext
)); ));
@ -371,7 +373,7 @@ pub fn archive_snapshot_package(
fs::rename(&archive_path, &snapshot_package.path()) fs::rename(&archive_path, &snapshot_package.path())
.map_err(|e| SnapshotError::IoWithSource(e, "archive path rename"))?; .map_err(|e| SnapshotError::IoWithSource(e, "archive path rename"))?;
purge_old_snapshot_archives(tar_dir, maximum_snapshots_to_retain); purge_old_snapshot_archives(tar_dir, maximum_snapshot_archives_to_retain);
timer.stop(); timer.stop();
info!( info!(
@ -1794,6 +1796,11 @@ pub fn process_accounts_package(
), ),
}; };
let snapshot_type = match incremental_snapshot_base_slot {
None => SnapshotType::FullSnapshot,
Some(_) => SnapshotType::IncrementalSnapshot,
};
SnapshotPackage::new( SnapshotPackage::new(
accounts_package.slot, accounts_package.slot,
accounts_package.block_height, accounts_package.block_height,
@ -1804,6 +1811,7 @@ pub fn process_accounts_package(
hash, hash,
accounts_package.archive_format, accounts_package.archive_format,
accounts_package.snapshot_version, accounts_package.snapshot_version,
snapshot_type,
) )
} }