From f9986c66b862faedabfebf4ecdf00dabe81d752b Mon Sep 17 00:00:00 2001 From: Brooks Prumo Date: Tue, 17 Aug 2021 13:01:59 -0500 Subject: [PATCH] Make SnapshotPackagerService aware of Incremental Snapshots (#19254) Add a field to SnapshotPackage that is an enum for SnapshotType, so archive_snapshot_package() will do the right thing. Fixes #19166 --- core/src/accounts_hash_verifier.rs | 11 ++++++--- core/src/snapshot_packager_service.rs | 28 ++++++++++++---------- core/src/tvu.rs | 2 +- core/src/validator.rs | 3 ++- core/tests/snapshots.rs | 4 ++-- runtime/src/snapshot_package.rs | 34 ++++++++++++++++++++++++++- runtime/src/snapshot_utils.rs | 18 ++++++++++---- 7 files changed, 74 insertions(+), 26 deletions(-) diff --git a/core/src/accounts_hash_verifier.rs b/core/src/accounts_hash_verifier.rs index 61026a500a..832152b9bf 100644 --- a/core/src/accounts_hash_verifier.rs +++ b/core/src/accounts_hash_verifier.rs @@ -4,14 +4,15 @@ // hash on gossip. Monitor gossip for messages from validators in the --trusted-validators // set and halt the node if a mismatch is detected. 
-use crate::snapshot_packager_service::PendingSnapshotPackage; use rayon::ThreadPool; use solana_gossip::cluster_info::{ClusterInfo, MAX_SNAPSHOT_HASHES}; use solana_runtime::{ accounts_db, snapshot_archive_info::SnapshotArchiveInfoGetter, snapshot_config::SnapshotConfig, - snapshot_package::{AccountsPackage, AccountsPackageReceiver, SnapshotPackage}, + snapshot_package::{ + AccountsPackage, AccountsPackageReceiver, PendingSnapshotPackage, SnapshotPackage, + }, snapshot_utils, }; use solana_sdk::{clock::Slot, hash::Hash, pubkey::Pubkey}; @@ -222,7 +223,10 @@ impl AccountsHashVerifier { mod tests { use super::*; use solana_gossip::{cluster_info::make_accounts_hashes_message, contact_info::ContactInfo}; - use solana_runtime::snapshot_utils::{ArchiveFormat, SnapshotVersion}; + use solana_runtime::{ + snapshot_package::SnapshotType, + snapshot_utils::{ArchiveFormat, SnapshotVersion}, + }; use solana_sdk::{ hash::hash, signature::{Keypair, Signer}, @@ -314,6 +318,7 @@ mod tests { hash, archive_format, snapshot_version, + SnapshotType::FullSnapshot, ); AccountsHashVerifier::process_snapshot_package( diff --git a/core/src/snapshot_packager_service.rs b/core/src/snapshot_packager_service.rs index b5e379219f..29a39e0363 100644 --- a/core/src/snapshot_packager_service.rs +++ b/core/src/snapshot_packager_service.rs @@ -1,20 +1,18 @@ use solana_gossip::cluster_info::{ClusterInfo, MAX_SNAPSHOT_HASHES}; use solana_runtime::{ - snapshot_archive_info::SnapshotArchiveInfoGetter, snapshot_package::SnapshotPackage, + snapshot_archive_info::SnapshotArchiveInfoGetter, snapshot_package::PendingSnapshotPackage, snapshot_utils, }; use solana_sdk::{clock::Slot, hash::Hash}; use std::{ sync::{ atomic::{AtomicBool, Ordering}, - Arc, Mutex, + Arc, }, thread::{self, Builder, JoinHandle}, time::Duration, }; -pub type PendingSnapshotPackage = Arc<Mutex<Option<SnapshotPackage>>>; - pub struct SnapshotPackagerService { t_snapshot_packager: JoinHandle<()>, } @@ -45,18 +43,21 @@ impl SnapshotPackagerService { let
snapshot_package = pending_snapshot_package.lock().unwrap().take(); if let Some(snapshot_package) = snapshot_package { - if let Err(err) = snapshot_utils::archive_snapshot_package( + match snapshot_utils::archive_snapshot_package( &snapshot_package, maximum_snapshots_to_retain, ) { - warn!("Failed to create snapshot archive: {}", err); - } else { - hashes.push((snapshot_package.slot(), *snapshot_package.hash())); - while hashes.len() > MAX_SNAPSHOT_HASHES { - hashes.remove(0); + Ok(_) => { + hashes.push((snapshot_package.slot(), *snapshot_package.hash())); + while hashes.len() > MAX_SNAPSHOT_HASHES { + hashes.remove(0); + } + cluster_info.push_snapshot_hashes(hashes.clone()); } - cluster_info.push_snapshot_hashes(hashes.clone()); - } + Err(err) => { + warn!("Failed to create snapshot archive: {}", err); + } + }; } else { std::thread::sleep(Duration::from_millis(100)); } @@ -81,7 +82,7 @@ mod tests { use solana_runtime::{ accounts_db::AccountStorageEntry, bank::BankSlotDelta, - snapshot_package::SnapshotPackage, + snapshot_package::{SnapshotPackage, SnapshotType}, snapshot_utils::{self, ArchiveFormat, SnapshotVersion, SNAPSHOT_STATUS_CACHE_FILE_NAME}, }; use solana_sdk::hash::Hash; @@ -175,6 +176,7 @@ mod tests { Hash::default(), ArchiveFormat::TarBzip2, SnapshotVersion::default(), + SnapshotType::FullSnapshot, ); // Make tarball from packageable snapshot diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 9e1737d3ff..379063ac26 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -21,7 +21,6 @@ use crate::{ shred_fetch_stage::ShredFetchStage, sigverify_shreds::ShredSigVerifier, sigverify_stage::SigVerifyStage, - snapshot_packager_service::PendingSnapshotPackage, tower_storage::TowerStorage, voting_service::VotingService, }; @@ -45,6 +44,7 @@ use solana_runtime::{ bank_forks::BankForks, commitment::BlockCommitmentCache, snapshot_config::SnapshotConfig, + snapshot_package::PendingSnapshotPackage, vote_sender_types::ReplayVoteSender, }; use 
solana_sdk::{pubkey::Pubkey, signature::Keypair}; diff --git a/core/src/validator.rs b/core/src/validator.rs index c328b1cc75..c9c42809e6 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -13,7 +13,7 @@ use { serve_repair::ServeRepair, serve_repair_service::ServeRepairService, sigverify, - snapshot_packager_service::{PendingSnapshotPackage, SnapshotPackagerService}, + snapshot_packager_service::SnapshotPackagerService, tower_storage::TowerStorage, tpu::{Tpu, DEFAULT_TPU_COALESCE_MS}, tvu::{Sockets, Tvu, TvuConfig}, @@ -65,6 +65,7 @@ use { hardened_unpack::{open_genesis_config, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE}, snapshot_archive_info::SnapshotArchiveInfoGetter, snapshot_config::SnapshotConfig, + snapshot_package::PendingSnapshotPackage, snapshot_utils, }, solana_sdk::{ diff --git a/core/tests/snapshots.rs b/core/tests/snapshots.rs index 9e291469fd..967effff73 100644 --- a/core/tests/snapshots.rs +++ b/core/tests/snapshots.rs @@ -52,7 +52,7 @@ mod tests { use log::{info, trace}; use solana_core::{ accounts_hash_verifier::AccountsHashVerifier, - snapshot_packager_service::{PendingSnapshotPackage, SnapshotPackagerService}, + snapshot_packager_service::SnapshotPackagerService, }; use solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo}; use solana_runtime::{ @@ -66,7 +66,7 @@ mod tests { genesis_utils::{create_genesis_config, GenesisConfigInfo}, snapshot_archive_info::FullSnapshotArchiveInfo, snapshot_config::SnapshotConfig, - snapshot_package::AccountsPackage, + snapshot_package::{AccountsPackage, PendingSnapshotPackage}, snapshot_utils::{ self, ArchiveFormat, SnapshotVersion, DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN, }, diff --git a/runtime/src/snapshot_package.rs b/runtime/src/snapshot_package.rs index 21b4c5adc7..2038e65d78 100644 --- a/runtime/src/snapshot_package.rs +++ b/runtime/src/snapshot_package.rs @@ -16,14 +16,26 @@ use solana_sdk::hash::Hash; use std::{ fs, path::{Path, PathBuf}, - sync::mpsc::{Receiver, SendError, 
Sender}, + sync::{ + mpsc::{Receiver, SendError, Sender}, + Arc, Mutex, + }, }; use tempfile::TempDir; +/// The sender side of the AccountsPackage channel, used by AccountsBackgroundService pub type AccountsPackageSender = Sender<AccountsPackage>; + +/// The receiver side of the AccountsPackage channel, used by AccountsHashVerifier pub type AccountsPackageReceiver = Receiver<AccountsPackage>; + +/// The error type when sending an AccountsPackage over the channel fails pub type AccountsPackageSendError = SendError<AccountsPackage>; +/// The PendingSnapshotPackage passes a SnapshotPackage from AccountsHashVerifier to +/// SnapshotPackagerService for archiving +pub type PendingSnapshotPackage = Arc<Mutex<Option<SnapshotPackage>>>; + #[derive(Debug)] pub struct AccountsPackage { pub slot: Slot, @@ -176,9 +188,11 @@ pub struct SnapshotPackage { pub snapshot_links: TempDir, pub storages: SnapshotStorages, pub snapshot_version: SnapshotVersion, + pub snapshot_type: SnapshotType, } impl SnapshotPackage { + #[allow(clippy::too_many_arguments)] pub fn new( slot: Slot, block_height: u64, @@ -189,6 +203,7 @@ impl SnapshotPackage { hash: Hash, archive_format: ArchiveFormat, snapshot_version: SnapshotVersion, + snapshot_type: SnapshotType, ) -> Self { Self { snapshot_archive_info: SnapshotArchiveInfo { @@ -202,6 +217,7 @@ impl SnapshotPackage { snapshot_links, storages, snapshot_version, + snapshot_type, } } } @@ -211,3 +227,19 @@ impl SnapshotArchiveInfoGetter for SnapshotPackage { &self.snapshot_archive_info } } + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum SnapshotType { + FullSnapshot, + IncrementalSnapshot, +} + +impl SnapshotType { + /// Get the string prefix of the snapshot type + pub fn to_prefix(&self) -> &'static str { + match self { + SnapshotType::FullSnapshot => TMP_FULL_SNAPSHOT_PREFIX, + SnapshotType::IncrementalSnapshot => TMP_INCREMENTAL_SNAPSHOT_PREFIX, + } + } +} diff --git a/runtime/src/snapshot_utils.rs b/runtime/src/snapshot_utils.rs index 6100009d52..140820fa03 100644 --- a/runtime/src/snapshot_utils.rs +++
b/runtime/src/snapshot_utils.rs @@ -14,6 +14,7 @@ use { }, snapshot_package::{ AccountsPackage, AccountsPackageSendError, AccountsPackageSender, SnapshotPackage, + SnapshotType, }, sorted_storages::SortedStorages, }, @@ -239,10 +240,10 @@ pub fn remove_tmp_snapshot_archives(snapshot_archives_dir: &Path) { } } -/// Make a full snapshot archive out of the snapshot package +/// Make a snapshot archive out of the snapshot package pub fn archive_snapshot_package( snapshot_package: &SnapshotPackage, - maximum_snapshots_to_retain: usize, + maximum_snapshot_archives_to_retain: usize, ) -> Result<()> { info!( "Generating snapshot archive for slot {}", @@ -268,10 +269,11 @@ pub fn archive_snapshot_package( .map_err(|e| SnapshotError::IoWithSource(e, "create archive path"))?; // Create the staging directories + let staging_dir_prefix = snapshot_package.snapshot_type.to_prefix(); let staging_dir = tempfile::Builder::new() .prefix(&format!( "{}{}-", - TMP_FULL_SNAPSHOT_PREFIX, + staging_dir_prefix, snapshot_package.slot() )) .tempdir_in(tar_dir) @@ -323,7 +325,7 @@ pub fn archive_snapshot_package( // Tar the staging directory into the archive at `archive_path` let archive_path = tar_dir.join(format!( "{}{}.{}", - TMP_FULL_SNAPSHOT_PREFIX, + staging_dir_prefix, snapshot_package.slot(), file_ext )); @@ -371,7 +373,7 @@ pub fn archive_snapshot_package( fs::rename(&archive_path, &snapshot_package.path()) .map_err(|e| SnapshotError::IoWithSource(e, "archive path rename"))?; - purge_old_snapshot_archives(tar_dir, maximum_snapshots_to_retain); + purge_old_snapshot_archives(tar_dir, maximum_snapshot_archives_to_retain); timer.stop(); info!( @@ -1794,6 +1796,11 @@ pub fn process_accounts_package( ), }; + let snapshot_type = match incremental_snapshot_base_slot { + None => SnapshotType::FullSnapshot, + Some(_) => SnapshotType::IncrementalSnapshot, + }; + SnapshotPackage::new( accounts_package.slot, accounts_package.block_height, @@ -1804,6 +1811,7 @@ pub fn process_accounts_package( 
hash, accounts_package.archive_format, accounts_package.snapshot_version, + snapshot_type, ) }