From 31ec986ea141bdc9bc0448a5c049935fda1bc0f0 Mon Sep 17 00:00:00 2001 From: "Jeff Washington (jwash)" <75863576+jeffwashington@users.noreply.github.com> Date: Thu, 24 Jun 2021 17:29:49 -0500 Subject: [PATCH] untar in parallel (#18184) * untar in parallel * make enum for 'ignore' return value --- runtime/src/hardened_unpack.rs | 55 +++++++++++++++++++++++++++------- runtime/src/snapshot_utils.rs | 48 ++++++++++++++++++----------- 2 files changed, 76 insertions(+), 27 deletions(-) diff --git a/runtime/src/hardened_unpack.rs b/runtime/src/hardened_unpack.rs index 03f213417e..fdaa424f6c 100644 --- a/runtime/src/hardened_unpack.rs +++ b/runtime/src/hardened_unpack.rs @@ -84,6 +84,12 @@ fn check_unpack_result(unpack_result: bool, path: String) -> Result<()> { Ok(()) } +pub enum UnpackPath<'a> { + Valid(&'a Path), + Ignore, + Invalid, +} + fn unpack_archive<'a, A: Read, C>( archive: &mut Archive, apparent_limit_size: u64, @@ -92,7 +98,7 @@ fn unpack_archive<'a, A: Read, C>( mut entry_checker: C, ) -> Result<()> where - C: FnMut(&[&str], tar::EntryType) -> Option<&'a Path>, + C: FnMut(&[&str], tar::EntryType) -> UnpackPath<'a>, { let mut apparent_total_size: u64 = 0; let mut actual_total_size: u64 = 0; @@ -130,14 +136,17 @@ where let parts: Vec<_> = parts.map(|p| p.unwrap()).collect(); let unpack_dir = match entry_checker(parts.as_slice(), kind) { - None => { + UnpackPath::Invalid => { return Err(UnpackError::Archive(format!( "extra entry found: {:?} {:?}", path_str, entry.header().entry_type(), ))); } - Some(unpack_dir) => unpack_dir, + UnpackPath::Ignore => { + continue; + } + UnpackPath::Valid(unpack_dir) => unpack_dir, }; apparent_total_size = checked_total_size_sum( @@ -193,13 +202,27 @@ where /// Map from AppendVec file name to unpacked file system location pub type UnpackedAppendVecMap = HashMap; +// select/choose only 'index' out of each # of 'divisions' of total items. 
+pub struct ParallelSelector { + pub index: usize, + pub divisions: usize, +} + +impl ParallelSelector { + pub fn select_index(&self, index: usize) -> bool { + index % self.divisions == self.index + } +} + pub fn unpack_snapshot( archive: &mut Archive, ledger_dir: &Path, account_paths: &[PathBuf], + parallel_selector: Option, ) -> Result { assert!(!account_paths.is_empty()); let mut unpacked_append_vec_map = UnpackedAppendVecMap::new(); + let mut i = 0; unpack_archive( archive, @@ -208,19 +231,31 @@ pub fn unpack_snapshot( MAX_SNAPSHOT_ARCHIVE_UNPACKED_COUNT, |parts, kind| { if is_valid_snapshot_archive_entry(parts, kind) { + i += 1; + match &parallel_selector { + Some(parallel_selector) => { + if !parallel_selector.select_index(i - 1) { + return UnpackPath::Ignore; + } + } + None => {} + }; if let ["accounts", file] = parts { // Randomly distribute the accounts files about the available `account_paths`, let path_index = thread_rng().gen_range(0, account_paths.len()); - account_paths.get(path_index).map(|path_buf| { + match account_paths.get(path_index).map(|path_buf| { unpacked_append_vec_map .insert(file.to_string(), path_buf.join("accounts").join(file)); path_buf.as_path() - }) + }) { + Some(path) => UnpackPath::Valid(path), + None => UnpackPath::Invalid, + } } else { - Some(ledger_dir) + UnpackPath::Valid(ledger_dir) } } else { - None + UnpackPath::Invalid } }, ) @@ -337,9 +372,9 @@ fn unpack_genesis( MAX_GENESIS_ARCHIVE_UNPACKED_COUNT, |p, k| { if is_valid_genesis_archive_entry(p, k) { - Some(unpack_dir) + UnpackPath::Valid(unpack_dir) } else { - None + UnpackPath::Invalid } }, ) @@ -530,7 +565,7 @@ mod tests { fn finalize_and_unpack_snapshot(archive: tar::Builder>) -> Result<()> { with_finalize_and_unpack(archive, |a, b| { - unpack_snapshot(a, b, &[PathBuf::new()]).map(|_| ()) + unpack_snapshot(a, b, &[PathBuf::new()], None).map(|_| ()) }) } diff --git a/runtime/src/snapshot_utils.rs b/runtime/src/snapshot_utils.rs index 62f49c27ed..29896bb561 100644 --- 
a/runtime/src/snapshot_utils.rs +++ b/runtime/src/snapshot_utils.rs @@ -4,7 +4,7 @@ use { accounts_index::AccountSecondaryIndexes, bank::{Bank, BankSlotDelta, Builtins}, bank_forks::ArchiveFormat, - hardened_unpack::{unpack_snapshot, UnpackError, UnpackedAppendVecMap}, + hardened_unpack::{unpack_snapshot, ParallelSelector, UnpackError, UnpackedAppendVecMap}, serde_snapshot::{ bank_from_stream, bank_to_stream, SerdeStyle, SnapshotStorage, SnapshotStorages, }, @@ -17,7 +17,7 @@ use { bzip2::bufread::BzDecoder, flate2::read::GzDecoder, log::*, - rayon::ThreadPool, + rayon::{prelude::*, ThreadPool}, regex::Regex, solana_measure::measure::Measure, solana_sdk::{clock::Slot, genesis_config::GenesisConfig, hash::Hash, pubkey::Pubkey}, @@ -600,7 +600,7 @@ pub struct BankFromArchiveTimings { } #[allow(clippy::too_many_arguments)] -pub fn bank_from_archive>( +pub fn bank_from_archive + std::marker::Sync>( account_paths: &[PathBuf], frozen_account_pubkeys: &[Pubkey], snapshot_path: &Path, @@ -619,14 +619,29 @@ pub fn bank_from_archive>( .prefix(TMP_SNAPSHOT_PREFIX) .tempdir_in(snapshot_path)?; - let mut untar = Measure::start("untar"); - let unpacked_append_vec_map = untar_snapshot_in( - &snapshot_tar, - unpack_dir.as_ref(), - account_paths, - archive_format, - )?; + let mut untar = Measure::start("snapshot untar"); + // From testing, 4 seems to be a sweet spot for ranges of 60M-360M accounts and 16-64 cores. This may need to be tuned later. + let divisions = std::cmp::min(4, std::cmp::max(1, num_cpus::get() / 4)); + // create 'divisions' # of parallel workers, each responsible for 1/divisions of all the files to extract. 
+ let all_unpacked_append_vec_map = (0..divisions) + .into_par_iter() + .map(|index| { + untar_snapshot_in( + &snapshot_tar, + unpack_dir.as_ref(), + account_paths, + archive_format, + Some(ParallelSelector { index, divisions }), + ) + }) + .collect::>(); + let mut unpacked_append_vec_map = UnpackedAppendVecMap::new(); + for h in all_unpacked_append_vec_map { + unpacked_append_vec_map.extend(h?); + } + untar.stop(); + info!("{}", untar); let mut measure = Measure::start("bank rebuild from snapshot"); let unpacked_snapshots_dir = unpack_dir.as_ref().join("snapshots"); @@ -773,33 +788,31 @@ fn untar_snapshot_in>( unpack_dir: &Path, account_paths: &[PathBuf], archive_format: ArchiveFormat, + parallel_selector: Option, ) -> Result { - let mut measure = Measure::start("snapshot untar"); let tar_name = File::open(&snapshot_tar)?; let account_paths_map = match archive_format { ArchiveFormat::TarBzip2 => { let tar = BzDecoder::new(BufReader::new(tar_name)); let mut archive = Archive::new(tar); - unpack_snapshot(&mut archive, unpack_dir, account_paths)? + unpack_snapshot(&mut archive, unpack_dir, account_paths, parallel_selector)? } ArchiveFormat::TarGzip => { let tar = GzDecoder::new(BufReader::new(tar_name)); let mut archive = Archive::new(tar); - unpack_snapshot(&mut archive, unpack_dir, account_paths)? + unpack_snapshot(&mut archive, unpack_dir, account_paths, parallel_selector)? } ArchiveFormat::TarZstd => { let tar = zstd::stream::read::Decoder::new(BufReader::new(tar_name))?; let mut archive = Archive::new(tar); - unpack_snapshot(&mut archive, unpack_dir, account_paths)? + unpack_snapshot(&mut archive, unpack_dir, account_paths, parallel_selector)? } ArchiveFormat::Tar => { let tar = BufReader::new(tar_name); let mut archive = Archive::new(tar); - unpack_snapshot(&mut archive, unpack_dir, account_paths)? + unpack_snapshot(&mut archive, unpack_dir, account_paths, parallel_selector)? 
} }; - measure.stop(); - info!("{}", measure); Ok(account_paths_map) } @@ -916,6 +929,7 @@ pub fn verify_snapshot_archive( unpack_dir, &[unpack_dir.to_path_buf()], archive_format, + None, ) .unwrap();