From ce53b84cdc42ec1ac7cfecb7b32fd6d96af751b2 Mon Sep 17 00:00:00 2001
From: "Jeff Washington (jwash)" <75863576+jeffwashington@users.noreply.github.com>
Date: Tue, 29 Jun 2021 18:26:15 -0500
Subject: [PATCH] refactor untar_snapshot_in to push parallelism deeper for
 further refactoring (#18310)

---
 runtime/src/snapshot_utils.rs | 117 +++++++++++++++++++++-------------
 1 file changed, 74 insertions(+), 43 deletions(-)

diff --git a/runtime/src/snapshot_utils.rs b/runtime/src/snapshot_utils.rs
index 29896bb561..eb0551ff82 100644
--- a/runtime/src/snapshot_utils.rs
+++ b/runtime/src/snapshot_utils.rs
@@ -599,6 +599,9 @@ pub struct BankFromArchiveTimings {
     pub verify_snapshot_bank_us: u64,
 }
 
+// From testing, 4 seems to be a sweet spot for ranges of 60M-360M accounts and 16-64 cores. This may need to be tuned later.
+pub const PARALLEL_UNTAR_READERS_DEFAULT: usize = 4;
+
 #[allow(clippy::too_many_arguments)]
 pub fn bank_from_archive<P: AsRef<Path> + std::marker::Sync>(
     account_paths: &[PathBuf],
@@ -620,26 +623,17 @@ pub fn bank_from_archive<P: AsRef<Path> + std::marker::Sync>(
         .tempdir_in(snapshot_path)?;
 
     let mut untar = Measure::start("snapshot untar");
-    // From testing, 4 seems to be a sweet spot for ranges of 60M-360M accounts and 16-64 cores. This may need to be tuned later.
-    let divisions = std::cmp::min(4, std::cmp::max(1, num_cpus::get() / 4));
-    // create 'divisions' # of parallel workers, each responsible for 1/divisions of all the files to extract.
-    let all_unpacked_append_vec_map = (0..divisions)
-        .into_par_iter()
-        .map(|index| {
-            untar_snapshot_in(
-                &snapshot_tar,
-                unpack_dir.as_ref(),
-                account_paths,
-                archive_format,
-                Some(ParallelSelector { index, divisions }),
-            )
-        })
-        .collect::<Vec<_>>();
-    let mut unpacked_append_vec_map = UnpackedAppendVecMap::new();
-    for h in all_unpacked_append_vec_map {
-        unpacked_append_vec_map.extend(h?);
-    }
-
+    let divisions = std::cmp::min(
+        PARALLEL_UNTAR_READERS_DEFAULT,
+        std::cmp::max(1, num_cpus::get() / 4),
+    );
+    let unpacked_append_vec_map = untar_snapshot_in(
+        &snapshot_tar,
+        unpack_dir.as_ref(),
+        account_paths,
+        archive_format,
+        divisions,
+    )?;
     untar.stop();
     info!("{}", untar);
 
@@ -783,35 +777,72 @@ pub fn purge_old_snapshot_archives<P: AsRef<Path>>(
     }
 }
 
+fn unpack_snapshot_local<T: 'static + Read + Send, F: Fn() -> T>(
+    reader: F,
+    ledger_dir: &Path,
+    account_paths: &[PathBuf],
+    parallel_archivers: usize,
+) -> Result<UnpackedAppendVecMap> {
+    assert!(parallel_archivers > 0);
+    let readers = (0..parallel_archivers)
+        .into_iter()
+        .map(|_| reader())
+        .collect::<Vec<_>>();
+
+    // create 'parallel_archivers' # of parallel workers, each responsible for 1/parallel_archivers of all the files to extract.
+    let all_unpacked_append_vec_map = readers
+        .into_par_iter()
+        .enumerate()
+        .map(|(index, reader)| {
+            let parallel_selector = Some(ParallelSelector {
+                index,
+                divisions: parallel_archivers,
+            });
+            let mut archive = Archive::new(reader);
+            unpack_snapshot(&mut archive, ledger_dir, account_paths, parallel_selector)
+        })
+        .collect::<Vec<_>>();
+    let mut unpacked_append_vec_map = UnpackedAppendVecMap::new();
+    for h in all_unpacked_append_vec_map {
+        unpacked_append_vec_map.extend(h?);
+    }
+
+    Ok(unpacked_append_vec_map)
+}
+
 fn untar_snapshot_in<P: AsRef<Path>>(
     snapshot_tar: P,
     unpack_dir: &Path,
     account_paths: &[PathBuf],
     archive_format: ArchiveFormat,
-    parallel_selector: Option<ParallelSelector>,
+    parallel_divisions: usize,
 ) -> Result<UnpackedAppendVecMap> {
-    let tar_name = File::open(&snapshot_tar)?;
+    let open_file = || File::open(&snapshot_tar).unwrap();
     let account_paths_map = match archive_format {
-        ArchiveFormat::TarBzip2 => {
-            let tar = BzDecoder::new(BufReader::new(tar_name));
-            let mut archive = Archive::new(tar);
-            unpack_snapshot(&mut archive, unpack_dir, account_paths, parallel_selector)?
-        }
-        ArchiveFormat::TarGzip => {
-            let tar = GzDecoder::new(BufReader::new(tar_name));
-            let mut archive = Archive::new(tar);
-            unpack_snapshot(&mut archive, unpack_dir, account_paths, parallel_selector)?
-        }
-        ArchiveFormat::TarZstd => {
-            let tar = zstd::stream::read::Decoder::new(BufReader::new(tar_name))?;
-            let mut archive = Archive::new(tar);
-            unpack_snapshot(&mut archive, unpack_dir, account_paths, parallel_selector)?
-        }
-        ArchiveFormat::Tar => {
-            let tar = BufReader::new(tar_name);
-            let mut archive = Archive::new(tar);
-            unpack_snapshot(&mut archive, unpack_dir, account_paths, parallel_selector)?
-        }
+        ArchiveFormat::TarBzip2 => unpack_snapshot_local(
+            || BzDecoder::new(BufReader::new(open_file())),
+            unpack_dir,
+            account_paths,
+            parallel_divisions,
+        )?,
+        ArchiveFormat::TarGzip => unpack_snapshot_local(
+            || GzDecoder::new(BufReader::new(open_file())),
+            unpack_dir,
+            account_paths,
+            parallel_divisions,
+        )?,
+        ArchiveFormat::TarZstd => unpack_snapshot_local(
+            || zstd::stream::read::Decoder::new(BufReader::new(open_file())).unwrap(),
+            unpack_dir,
+            account_paths,
+            parallel_divisions,
+        )?,
+        ArchiveFormat::Tar => unpack_snapshot_local(
+            || BufReader::new(open_file()),
+            unpack_dir,
+            account_paths,
+            parallel_divisions,
+        )?,
     };
     Ok(account_paths_map)
 }
@@ -929,7 +960,7 @@ pub fn verify_snapshot_archive<P, Q, R>(
         unpack_dir,
         &[unpack_dir.to_path_buf()],
         archive_format,
-        None,
+        1,
     )
     .unwrap();
 
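
Reviewer note (not part of the patch): the refactor keeps the pre-existing work split and only moves it from bank_from_archive down into untar_snapshot_in. Each of the parallel_divisions workers now gets its own freshly opened, independently decompressed reader over the same archive plus a ParallelSelector { index, divisions } telling unpack_snapshot which slice of entries it owns, and the per-worker UnpackedAppendVecMap results are merged with extend. Below is a minimal standalone sketch of that splitting idea, assuming the selector assigns entry i to worker i % divisions; unpack_in_parallel, entries, and the toy "extraction" are hypothetical stand-ins for illustration, not the crate's API.

// Standalone sketch only: mirrors the modulo-style work split assumed above.
use rayon::prelude::*;
use std::collections::HashMap;

struct ParallelSelector {
    index: usize,
    divisions: usize,
}

impl ParallelSelector {
    // Worker `index` claims every entry whose position passes the modulo test (assumed rule).
    fn select_index(&self, entry_index: usize) -> bool {
        entry_index % self.divisions == self.index
    }
}

fn unpack_in_parallel(entries: &[&str], divisions: usize) -> HashMap<String, usize> {
    assert!(divisions > 0);
    (0..divisions)
        .into_par_iter()
        .map(|index| {
            let selector = ParallelSelector { index, divisions };
            // Each worker scans the full entry list but "extracts" only its share,
            // so the archive is effectively read `divisions` times in parallel.
            entries
                .iter()
                .enumerate()
                .filter(|(i, _)| selector.select_index(*i))
                .map(|(_, name)| (name.to_string(), index)) // record which worker handled it
                .collect::<HashMap<_, _>>()
        })
        .reduce(HashMap::new, |mut merged, part| {
            merged.extend(part); // merge per-worker maps, like UnpackedAppendVecMap::extend above
            merged
        })
}

fn main() {
    let entries = ["accounts/1.0", "accounts/2.0", "snapshots/100/100", "version"];
    let merged = unpack_in_parallel(&entries, 2);
    assert_eq!(merged.len(), entries.len()); // every entry is handled exactly once
    println!("{:?}", merged);
}

The trade-off this mirrors: every worker re-opens and re-decompresses the whole archive (see the per-arm open_file() closures above) and simply skips entries it does not own, which costs redundant decompression but avoids any coordination around a shared reader.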