use crate::snapshot_package::SnapshotPackage; use bincode::{deserialize_from, serialize_into}; use bzip2::bufread::BzDecoder; use fs_extra::dir::CopyOptions; use log::*; use solana_measure::measure::Measure; use solana_runtime::{bank::Bank, status_cache::SlotDelta}; use solana_sdk::{clock::Slot, transaction}; use std::{ cmp::Ordering, fs, fs::File, io::{BufReader, BufWriter, Error as IOError, ErrorKind}, path::{Path, PathBuf}, }; use tar::Archive; use thiserror::Error; pub const SNAPSHOT_STATUS_CACHE_FILE_NAME: &str = "status_cache"; pub const TAR_SNAPSHOTS_DIR: &str = "snapshots"; pub const TAR_ACCOUNTS_DIR: &str = "accounts"; #[derive(PartialEq, Ord, Eq, Debug)] pub struct SlotSnapshotPaths { pub slot: Slot, pub snapshot_file_path: PathBuf, } #[derive(Error, Debug)] pub enum SnapshotError { #[error("I/O error")] IO(#[from] std::io::Error), #[error("serialization error")] Serialize(#[from] Box), #[error("file system error")] FsExtra(#[from] fs_extra::error::Error), } pub type Result = std::result::Result; impl PartialOrd for SlotSnapshotPaths { fn partial_cmp(&self, other: &Self) -> Option { Some(self.slot.cmp(&other.slot)) } } impl SlotSnapshotPaths { fn copy_snapshot_directory>(&self, snapshot_hardlink_dir: P) -> Result<()> { // Create a new directory in snapshot_hardlink_dir let new_slot_hardlink_dir = snapshot_hardlink_dir.as_ref().join(self.slot.to_string()); let _ = fs::remove_dir_all(&new_slot_hardlink_dir); fs::create_dir_all(&new_slot_hardlink_dir)?; // Copy the snapshot fs::copy( &self.snapshot_file_path, &new_slot_hardlink_dir.join(self.slot.to_string()), )?; Ok(()) } } pub fn package_snapshot, Q: AsRef>( bank: &Bank, snapshot_files: &SlotSnapshotPaths, snapshot_package_output_file: P, snapshot_path: Q, slots_to_snapshot: &[Slot], ) -> Result { // Hard link all the snapshots we need for this package let snapshot_hard_links_dir = tempfile::tempdir_in(snapshot_path)?; // Get a reference to all the relevant AccountStorageEntries let account_storage_entries: Vec<_> = bank .rc .get_rooted_storage_entries() .into_iter() .filter(|x| x.slot_id() <= bank.slot()) .collect(); // Create a snapshot package info!( "Snapshot for bank: {} has {} account storage entries", bank.slot(), account_storage_entries.len() ); // Any errors from this point on will cause the above SnapshotPackage to drop, clearing // any temporary state created for the SnapshotPackage (like the snapshot_hard_links_dir) snapshot_files.copy_snapshot_directory(snapshot_hard_links_dir.path())?; let package = SnapshotPackage::new( bank.slot(), bank.src.slot_deltas(slots_to_snapshot), snapshot_hard_links_dir, account_storage_entries, snapshot_package_output_file.as_ref().to_path_buf(), ); Ok(package) } pub fn get_snapshot_paths>(snapshot_path: P) -> Vec where P: std::fmt::Debug, { match fs::read_dir(&snapshot_path) { Ok(paths) => { let mut names = paths .filter_map(|entry| { entry.ok().and_then(|e| { e.path() .file_name() .and_then(|n| n.to_str().map(|s| s.parse::().ok())) .unwrap_or(None) }) }) .map(|slot| { let snapshot_path = snapshot_path.as_ref().join(slot.to_string()); SlotSnapshotPaths { slot, snapshot_file_path: snapshot_path.join(get_snapshot_file_name(slot)), } }) .collect::>(); names.sort(); names } Err(err) => { info!( "Unable to read snapshot directory {:?}: {}", snapshot_path, err ); vec![] } } } pub fn add_snapshot>(snapshot_path: P, bank: &Bank) -> Result<()> { bank.purge_zero_lamport_accounts(); let slot = bank.slot(); // snapshot_path/slot let slot_snapshot_dir = get_bank_snapshot_dir(snapshot_path, slot); fs::create_dir_all(slot_snapshot_dir.clone())?; // the snapshot is stored as snapshot_path/slot/slot let snapshot_file_path = slot_snapshot_dir.join(get_snapshot_file_name(slot)); info!( "creating snapshot {}, path: {:?}", bank.slot(), snapshot_file_path, ); let snapshot_file = File::create(&snapshot_file_path)?; // snapshot writer let mut snapshot_stream = BufWriter::new(snapshot_file); // Create the snapshot serialize_into(&mut snapshot_stream, &*bank)?; let mut bank_rc_serialize = Measure::start("create snapshot"); serialize_into(&mut snapshot_stream, &bank.rc)?; bank_rc_serialize.stop(); inc_new_counter_info!("bank-rc-serialize-ms", bank_rc_serialize.as_ms() as usize); info!( "{} for slot {} at {:?}", bank_rc_serialize, bank.slot(), snapshot_file_path, ); Ok(()) } pub fn remove_snapshot>(slot: Slot, snapshot_path: P) -> Result<()> { let slot_snapshot_dir = get_bank_snapshot_dir(&snapshot_path, slot); // Remove the snapshot directory for this slot fs::remove_dir_all(slot_snapshot_dir)?; Ok(()) } pub fn bank_slot_from_archive>(snapshot_tar: P) -> Result { let tempdir = tempfile::TempDir::new()?; untar_snapshot_in(&snapshot_tar, &tempdir)?; let unpacked_snapshots_dir = tempdir.path().join(TAR_SNAPSHOTS_DIR); let snapshot_paths = get_snapshot_paths(&unpacked_snapshots_dir); let last_root_paths = snapshot_paths .last() .ok_or_else(|| get_io_error("No snapshots found in snapshots directory"))?; let file = File::open(&last_root_paths.snapshot_file_path)?; let mut stream = BufReader::new(file); let bank: Bank = deserialize_from(&mut stream)?; Ok(bank.slot()) } pub fn bank_from_archive>( account_paths: &[PathBuf], snapshot_path: &PathBuf, snapshot_tar: P, ) -> Result { // Untar the snapshot into a temp directory under `snapshot_config.snapshot_path()` let unpack_dir = tempfile::tempdir_in(snapshot_path)?; untar_snapshot_in(&snapshot_tar, &unpack_dir)?; let mut measure = Measure::start("bank rebuild from snapshot"); let unpacked_accounts_dir = unpack_dir.as_ref().join(TAR_ACCOUNTS_DIR); let unpacked_snapshots_dir = unpack_dir.as_ref().join(TAR_SNAPSHOTS_DIR); let bank = rebuild_bank_from_snapshots( account_paths, &unpacked_snapshots_dir, unpacked_accounts_dir, )?; if !bank.verify_snapshot_bank() { panic!("Snapshot bank failed to verify"); } measure.stop(); info!("{}", measure); // Move the unpacked snapshots into `snapshot_path` let dir_files = fs::read_dir(&unpacked_snapshots_dir).unwrap_or_else(|err| { panic!( "Invalid snapshot path {:?}: {}", unpacked_snapshots_dir, err ) }); let paths: Vec = dir_files .filter_map(|entry| entry.ok().map(|e| e.path())) .collect(); let mut copy_options = CopyOptions::new(); copy_options.overwrite = true; fs_extra::move_items(&paths, &snapshot_path, ©_options)?; Ok(bank) } pub fn get_snapshot_tar_path>(snapshot_output_dir: P) -> PathBuf { snapshot_output_dir.as_ref().join("snapshot.tar.bz2") } pub fn untar_snapshot_in, Q: AsRef>( snapshot_tar: P, unpack_dir: Q, ) -> Result<()> { let mut measure = Measure::start("snapshot untar"); let tar_bz2 = File::open(snapshot_tar)?; let tar = BzDecoder::new(BufReader::new(tar_bz2)); let mut archive = Archive::new(tar); archive.unpack(&unpack_dir)?; measure.stop(); info!("{}", measure); Ok(()) } fn rebuild_bank_from_snapshots

( local_account_paths: &[PathBuf], unpacked_snapshots_dir: &PathBuf, append_vecs_path: P, ) -> Result where P: AsRef, { let mut snapshot_paths = get_snapshot_paths(&unpacked_snapshots_dir); if snapshot_paths.len() > 1 { return Err(get_io_error("invalid snapshot format")); } let root_paths = snapshot_paths .pop() .ok_or_else(|| get_io_error("No snapshots found in snapshots directory"))?; // Rebuild the root bank info!("Loading from {:?}", &root_paths.snapshot_file_path); let file = File::open(&root_paths.snapshot_file_path)?; let mut stream = BufReader::new(file); let bank: Bank = deserialize_from(&mut stream)?; // Rebuild accounts bank.rc .accounts_from_stream(&mut stream, local_account_paths, append_vecs_path)?; // Rebuild status cache let status_cache_path = unpacked_snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILE_NAME); let status_cache = File::open(status_cache_path)?; let mut stream = BufReader::new(status_cache); let slot_deltas: Vec>> = deserialize_from(&mut stream).unwrap_or_default(); bank.src.append(&slot_deltas); Ok(bank) } fn get_snapshot_file_name(slot: Slot) -> String { slot.to_string() } fn get_bank_snapshot_dir>(path: P, slot: Slot) -> PathBuf { path.as_ref().join(slot.to_string()) } fn get_io_error(error: &str) -> SnapshotError { warn!("Snapshot Error: {:?}", error); SnapshotError::IO(IOError::new(ErrorKind::Other, error)) } pub fn verify_snapshot_tar(snapshot_tar: P, snapshots_to_verify: Q, storages_to_verify: R) where P: AsRef, Q: AsRef, R: AsRef, { let temp_dir = tempfile::TempDir::new().unwrap(); let unpack_dir = temp_dir.path(); untar_snapshot_in(snapshot_tar, &unpack_dir).unwrap(); // Check snapshots are the same let unpacked_snapshots = unpack_dir.join(&TAR_SNAPSHOTS_DIR); assert!(!dir_diff::is_different(&snapshots_to_verify, unpacked_snapshots).unwrap()); // Check the account entries are the same let unpacked_accounts = unpack_dir.join(&TAR_ACCOUNTS_DIR); assert!(!dir_diff::is_different(&storages_to_verify, unpacked_accounts).unwrap()); }