diff --git a/Cargo.lock b/Cargo.lock index 22e86737ba..310faf2d20 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4005,6 +4005,7 @@ dependencies = [ "solana-sdk 0.23.0", "solana-stake-program 0.23.0", "solana-vote-program 0.23.0", + "symlink 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "sys-info 0.5.8 (registry+https://github.com/rust-lang/crates.io-index)", "tar 0.4.26 (registry+https://github.com/rust-lang/crates.io-index)", "tempfile 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/core/src/snapshot_packager_service.rs b/core/src/snapshot_packager_service.rs index 8546a815a2..f36150f313 100644 --- a/core/src/snapshot_packager_service.rs +++ b/core/src/snapshot_packager_service.rs @@ -1,15 +1,7 @@ use solana_ledger::{ - snapshot_package::{SnapshotPackage, SnapshotPackageReceiver}, - snapshot_utils::{ - serialize_status_cache, SnapshotError, TAR_ACCOUNTS_DIR, TAR_SNAPSHOTS_DIR, - TAR_VERSION_FILE, - }, + snapshot_package::SnapshotPackageReceiver, snapshot_utils::archive_snapshot_package, }; -use solana_measure::measure::Measure; -use solana_metrics::datapoint_info; use std::{ - fs, - process::ExitStatus, sync::{ atomic::{AtomicBool, Ordering}, mpsc::RecvTimeoutError, @@ -18,32 +10,6 @@ use std::{ thread::{self, Builder, JoinHandle}, time::Duration, }; -use symlink; -use tempfile::TempDir; -use thiserror::Error; - -#[derive(Error, Debug)] -pub enum SnapshotServiceError { - #[error("I/O error")] - IO(#[from] std::io::Error), - - #[error("serialization error")] - Serialize(#[from] Box), - - #[error("receive timeout error")] - RecvTimeoutError(#[from] RecvTimeoutError), - - #[error("snapshot error")] - SnapshotError(#[from] SnapshotError), - - #[error("archive generation failure {0}")] - ArchiveGenerationFailure(ExitStatus), - - #[error("storage path symlink is invalid")] - StoragePathSymlinkInvalid, -} - -type Result = std::result::Result; pub struct SnapshotPackagerService { t_snapshot_packager: JoinHandle<()>, @@ -58,14 +24,19 @@ impl SnapshotPackagerService { if exit.load(Ordering::Relaxed) { break; } - if let Err(e) = Self::run(&snapshot_package_receiver) { - match e { - SnapshotServiceError::RecvTimeoutError(RecvTimeoutError::Disconnected) => { - break + + match snapshot_package_receiver.recv_timeout(Duration::from_secs(1)) { + Ok(mut snapshot_package) => { + // Only package the latest + while let Ok(new_snapshot_package) = snapshot_package_receiver.try_recv() { + snapshot_package = new_snapshot_package; + } + if let Err(err) = archive_snapshot_package(&snapshot_package) { + warn!("Failed to create snapshot archive: {}", err); } - SnapshotServiceError::RecvTimeoutError(RecvTimeoutError::Timeout) => (), - _ => info!("Error from package_snapshots: {:?}", e), } + Err(RecvTimeoutError::Disconnected) => break, + Err(RecvTimeoutError::Timeout) => (), } }) .unwrap(); @@ -74,123 +45,6 @@ impl SnapshotPackagerService { } } - pub fn package_snapshots(snapshot_package: &SnapshotPackage) -> Result<()> { - info!( - "Generating snapshot tarball for root {}", - snapshot_package.root - ); - - serialize_status_cache( - snapshot_package.root, - &snapshot_package.slot_deltas, - &snapshot_package.snapshot_links, - )?; - - let mut timer = Measure::start("snapshot_package-package_snapshots"); - let tar_dir = snapshot_package - .tar_output_file - .parent() - .expect("Tar output path is invalid"); - - fs::create_dir_all(tar_dir)?; - - // Create the staging directories - let staging_dir = TempDir::new()?; - let staging_accounts_dir = staging_dir.path().join(TAR_ACCOUNTS_DIR); - let staging_snapshots_dir = staging_dir.path().join(TAR_SNAPSHOTS_DIR); - let staging_version_file = staging_dir.path().join(TAR_VERSION_FILE); - fs::create_dir_all(&staging_accounts_dir)?; - - // Add the snapshots to the staging directory - symlink::symlink_dir( - snapshot_package.snapshot_links.path(), - &staging_snapshots_dir, - )?; - - // Add the AppendVecs into the compressible list - for storage in &snapshot_package.storage_entries { - storage.flush()?; - let storage_path = storage.get_path(); - let output_path = staging_accounts_dir.join( - storage_path - .file_name() - .expect("Invalid AppendVec file path"), - ); - - // `storage_path` - The file path where the AppendVec itself is located - // `output_path` - The directory where the AppendVec will be placed in the staging directory. - let storage_path = - fs::canonicalize(storage_path).expect("Could not get absolute path for accounts"); - symlink::symlink_dir(storage_path, &output_path)?; - if !output_path.is_file() { - return Err(SnapshotServiceError::StoragePathSymlinkInvalid); - } - } - - // Write version file - { - use std::io::Write; - let snapshot_version = format!("{}\n", env!("CARGO_PKG_VERSION")); - let mut f = std::fs::File::create(staging_version_file)?; - //f.write_all(&snapshot_version.to_string().into_bytes())?; - f.write_all(&snapshot_version.into_bytes())?; - } - - // Tar the staging directory into the archive at `archive_path` - let archive_path = tar_dir.join("new_state.tar.bz2"); - let args = vec![ - "jcfhS", - archive_path.to_str().unwrap(), - "-C", - staging_dir.path().to_str().unwrap(), - TAR_ACCOUNTS_DIR, - TAR_SNAPSHOTS_DIR, - TAR_VERSION_FILE, - ]; - - let output = std::process::Command::new("tar").args(&args).output()?; - if !output.status.success() { - warn!("tar command failed with exit code: {}", output.status); - use std::str::from_utf8; - info!("tar stdout: {}", from_utf8(&output.stdout).unwrap_or("?")); - info!("tar stderr: {}", from_utf8(&output.stderr).unwrap_or("?")); - - return Err(SnapshotServiceError::ArchiveGenerationFailure( - output.status, - )); - } - - // Once everything is successful, overwrite the previous tarball so that other validators - // can fetch this newly packaged snapshot - let metadata = fs::metadata(&archive_path)?; - fs::rename(&archive_path, &snapshot_package.tar_output_file)?; - - timer.stop(); - info!( - "Successfully created tarball. slot: {}, elapsed ms: {}, size={}", - snapshot_package.root, - timer.as_ms(), - metadata.len() - ); - datapoint_info!( - "snapshot-package", - ("slot", snapshot_package.root, i64), - ("duration_ms", timer.as_ms(), i64), - ("size", metadata.len(), i64) - ); - Ok(()) - } - - fn run(snapshot_receiver: &SnapshotPackageReceiver) -> Result<()> { - let mut snapshot_package = snapshot_receiver.recv_timeout(Duration::from_secs(1))?; - // Only package the latest - while let Ok(new_snapshot_package) = snapshot_receiver.try_recv() { - snapshot_package = new_snapshot_package; - } - Self::package_snapshots(&snapshot_package)?; - Ok(()) - } - pub fn join(self) -> thread::Result<()> { self.t_snapshot_packager.join() } @@ -200,14 +54,17 @@ impl SnapshotPackagerService { mod tests { use super::*; use bincode::serialize_into; - use solana_ledger::snapshot_utils::{self, SNAPSHOT_STATUS_CACHE_FILE_NAME}; + use solana_ledger::{ + snapshot_package::SnapshotPackage, + snapshot_utils::{self, SNAPSHOT_STATUS_CACHE_FILE_NAME}, + }; use solana_runtime::{ accounts_db::AccountStorageEntry, bank::MAX_SNAPSHOT_DATA_FILE_SIZE, status_cache::SlotDelta, }; - use solana_sdk::transaction::Result as TransactionResult; + use solana_sdk::transaction; use std::{ - fs::{remove_dir_all, OpenOptions}, + fs::{self, remove_dir_all, OpenOptions}, io::Write, path::{Path, PathBuf}, }; @@ -286,7 +143,7 @@ mod tests { ); // Make tarball from packageable snapshot - SnapshotPackagerService::package_snapshots(&snapshot_package).unwrap(); + snapshot_utils::archive_snapshot_package(&snapshot_package).unwrap(); // before we compare, stick an empty status_cache in this dir so that the package comparision works // This is needed since the status_cache is added by the packager and is not collected from @@ -302,7 +159,7 @@ mod tests { ) .unwrap(); - // Check tarball is correct - snapshot_utils::verify_snapshot_tar(output_tar_path, snapshots_dir, accounts_dir); + // Check archive is correct + snapshot_utils::verify_snapshot_archive(output_tar_path, snapshots_dir, accounts_dir); } } diff --git a/core/tests/bank_forks.rs b/core/tests/bank_forks.rs index da301a6fa5..c3a3653c89 100644 --- a/core/tests/bank_forks.rs +++ b/core/tests/bank_forks.rs @@ -140,7 +140,8 @@ mod tests { &last_bank.src.roots(), ) .unwrap(); - SnapshotPackagerService::package_snapshots(&snapshot_package).unwrap(); + + snapshot_utils::archive_snapshot_package(&snapshot_package).unwrap(); restore_from_snapshot(bank_forks, vec![accounts_dir.path().to_path_buf()]); } @@ -322,7 +323,7 @@ mod tests { ) .unwrap(); - snapshot_utils::verify_snapshot_tar( + snapshot_utils::verify_snapshot_archive( saved_tar, saved_snapshots_dir.path(), saved_accounts_dir diff --git a/ledger/Cargo.toml b/ledger/Cargo.toml index 80e23462f6..c13f452797 100644 --- a/ledger/Cargo.toml +++ b/ledger/Cargo.toml @@ -43,6 +43,7 @@ solana-sdk = { path = "../sdk", version = "0.23.0" } solana-stake-program = { path = "../programs/stake", version = "0.23.0" } solana-vote-program = { path = "../programs/vote", version = "0.23.0" } sys-info = "0.5.8" +symlink = "0.1.0" tar = "0.4.26" thiserror = "1.0" tempfile = "3.1.0" diff --git a/ledger/src/snapshot_utils.rs b/ledger/src/snapshot_utils.rs index 9bd099255f..b414c955a6 100644 --- a/ledger/src/snapshot_utils.rs +++ b/ledger/src/snapshot_utils.rs @@ -12,10 +12,10 @@ use solana_sdk::transaction::Result as TransactionResult; use solana_sdk::{clock::Slot, transaction}; use std::{ cmp::Ordering, - fs, - fs::File, + fs::{self, File}, io::{BufReader, BufWriter, Error as IOError, ErrorKind, Read, Seek, SeekFrom, Write}, path::{Path, PathBuf}, + process::ExitStatus, }; use tar::Archive; use tempfile::TempDir; @@ -42,6 +42,12 @@ pub enum SnapshotError { #[error("file system error")] FsExtra(#[from] fs_extra::error::Error), + + #[error("archive generation failure {0}")] + ArchiveGenerationFailure(ExitStatus), + + #[error("storage path symlink is invalid")] + StoragePathSymlinkInvalid, } pub type Result = std::result::Result; @@ -107,6 +113,110 @@ pub fn package_snapshot, Q: AsRef>( Ok(package) } +pub fn archive_snapshot_package(snapshot_package: &SnapshotPackage) -> Result<()> { + info!( + "Generating snapshot archive for slot {}", + snapshot_package.root + ); + + serialize_status_cache( + snapshot_package.root, + &snapshot_package.slot_deltas, + &snapshot_package.snapshot_links, + )?; + + let mut timer = Measure::start("snapshot_package-package_snapshots"); + let tar_dir = snapshot_package + .tar_output_file + .parent() + .expect("Tar output path is invalid"); + + fs::create_dir_all(tar_dir)?; + + // Create the staging directories + let staging_dir = TempDir::new()?; + let staging_accounts_dir = staging_dir.path().join(TAR_ACCOUNTS_DIR); + let staging_snapshots_dir = staging_dir.path().join(TAR_SNAPSHOTS_DIR); + let staging_version_file = staging_dir.path().join(TAR_VERSION_FILE); + fs::create_dir_all(&staging_accounts_dir)?; + + // Add the snapshots to the staging directory + symlink::symlink_dir( + snapshot_package.snapshot_links.path(), + &staging_snapshots_dir, + )?; + + // Add the AppendVecs into the compressible list + for storage in &snapshot_package.storage_entries { + storage.flush()?; + let storage_path = storage.get_path(); + let output_path = staging_accounts_dir.join( + storage_path + .file_name() + .expect("Invalid AppendVec file path"), + ); + + // `storage_path` - The file path where the AppendVec itself is located + // `output_path` - The directory where the AppendVec will be placed in the staging directory. + let storage_path = + fs::canonicalize(storage_path).expect("Could not get absolute path for accounts"); + symlink::symlink_dir(storage_path, &output_path)?; + if !output_path.is_file() { + return Err(SnapshotError::StoragePathSymlinkInvalid); + } + } + + // Write version file + { + let snapshot_version = format!("{}\n", env!("CARGO_PKG_VERSION")); + let mut f = std::fs::File::create(staging_version_file)?; + //f.write_all(&snapshot_version.to_string().into_bytes())?; + f.write_all(&snapshot_version.into_bytes())?; + } + + // Tar the staging directory into the archive at `archive_path` + let archive_path = tar_dir.join("new_state.tar.bz2"); + let args = vec![ + "jcfhS", + archive_path.to_str().unwrap(), + "-C", + staging_dir.path().to_str().unwrap(), + TAR_ACCOUNTS_DIR, + TAR_SNAPSHOTS_DIR, + TAR_VERSION_FILE, + ]; + + let output = std::process::Command::new("tar").args(&args).output()?; + if !output.status.success() { + warn!("tar command failed with exit code: {}", output.status); + use std::str::from_utf8; + info!("tar stdout: {}", from_utf8(&output.stdout).unwrap_or("?")); + info!("tar stderr: {}", from_utf8(&output.stderr).unwrap_or("?")); + + return Err(SnapshotError::ArchiveGenerationFailure(output.status)); + } + + // Once everything is successful, overwrite the previous tarball so that other validators + // can fetch this newly packaged snapshot + let metadata = fs::metadata(&archive_path)?; + fs::rename(&archive_path, &snapshot_package.tar_output_file)?; + + timer.stop(); + info!( + "Successfully created tarball. slot: {}, elapsed ms: {}, size={}", + snapshot_package.root, + timer.as_ms(), + metadata.len() + ); + datapoint_info!( + "snapshot-package", + ("slot", snapshot_package.root, i64), + ("duration_ms", timer.as_ms(), i64), + ("size", metadata.len(), i64) + ); + Ok(()) +} + pub fn get_snapshot_paths>(snapshot_path: P) -> Vec where P: std::fmt::Debug, @@ -469,8 +579,11 @@ fn get_io_error(error: &str) -> SnapshotError { SnapshotError::IO(IOError::new(ErrorKind::Other, error)) } -pub fn verify_snapshot_tar(snapshot_tar: P, snapshots_to_verify: Q, storages_to_verify: R) -where +pub fn verify_snapshot_archive( + snapshot_tar: P, + snapshots_to_verify: Q, + storages_to_verify: R, +) where P: AsRef, Q: AsRef, R: AsRef,