refactor untar_snapshot_in to push parallelism deeper for further refactoring (#18310)

This commit is contained in:
Jeff Washington (jwash) 2021-06-29 18:26:15 -05:00 committed by GitHub
parent 52fd10ce03
commit ce53b84cdc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed file with 74 additions and 43 deletions

View File

@@ -599,6 +599,9 @@ pub struct BankFromArchiveTimings {
pub verify_snapshot_bank_us: u64,
}
/// Default number of parallel readers used when untarring a snapshot archive.
/// From testing, 4 seems to be a sweet spot for ranges of 60M-360M accounts and 16-64 cores. This may need to be tuned later.
pub const PARALLEL_UNTAR_READERS_DEFAULT: usize = 4;
#[allow(clippy::too_many_arguments)]
pub fn bank_from_archive<P: AsRef<Path> + std::marker::Sync>(
account_paths: &[PathBuf],
@@ -620,26 +623,17 @@ pub fn bank_from_archive<P: AsRef<Path> + std::marker::Sync>(
.tempdir_in(snapshot_path)?;
let mut untar = Measure::start("snapshot untar");
// From testing, 4 seems to be a sweet spot for ranges of 60M-360M accounts and 16-64 cores. This may need to be tuned later.
let divisions = std::cmp::min(4, std::cmp::max(1, num_cpus::get() / 4));
// create 'divisions' # of parallel workers, each responsible for 1/divisions of all the files to extract.
let all_unpacked_append_vec_map = (0..divisions)
.into_par_iter()
.map(|index| {
untar_snapshot_in(
&snapshot_tar,
unpack_dir.as_ref(),
account_paths,
archive_format,
Some(ParallelSelector { index, divisions }),
)
})
.collect::<Vec<_>>();
let mut unpacked_append_vec_map = UnpackedAppendVecMap::new();
for h in all_unpacked_append_vec_map {
unpacked_append_vec_map.extend(h?);
}
let divisions = std::cmp::min(
PARALLEL_UNTAR_READERS_DEFAULT,
std::cmp::max(1, num_cpus::get() / 4),
);
let unpacked_append_vec_map = untar_snapshot_in(
&snapshot_tar,
unpack_dir.as_ref(),
account_paths,
archive_format,
divisions,
)?;
untar.stop();
info!("{}", untar);
@@ -783,35 +777,72 @@ pub fn purge_old_snapshot_archives<P: AsRef<Path>>(
}
}
/// Extracts a snapshot archive into `ledger_dir` using `parallel_archivers`
/// worker threads.
///
/// `reader` is a factory producing a fresh, independent reader over the whole
/// archive; each worker gets its own reader and a `ParallelSelector` so that
/// it extracts only its 1/`parallel_archivers` share of the files. The
/// per-worker maps are merged into a single `UnpackedAppendVecMap`.
///
/// Returns the first extraction error encountered, if any.
///
/// # Panics
/// Panics if `parallel_archivers` is 0.
fn unpack_snapshot_local<T: 'static + Read + std::marker::Send, F: Fn() -> T>(
    reader: F,
    ledger_dir: &Path,
    account_paths: &[PathBuf],
    parallel_archivers: usize,
) -> Result<UnpackedAppendVecMap> {
    assert!(parallel_archivers > 0);
    // One independent reader per worker; a Range is already an iterator, so no
    // `.into_iter()` is needed here (clippy: useless_conversion).
    let readers = (0..parallel_archivers)
        .map(|_| reader())
        .collect::<Vec<_>>();

    // create 'parallel_archivers' # of parallel workers, each responsible for
    // 1/parallel_archivers of all the files to extract.
    let all_unpacked_append_vec_map = readers
        .into_par_iter()
        .enumerate()
        .map(|(index, reader)| {
            let parallel_selector = Some(ParallelSelector {
                index,
                divisions: parallel_archivers,
            });
            let mut archive = Archive::new(reader);
            unpack_snapshot(&mut archive, ledger_dir, account_paths, parallel_selector)
        })
        .collect::<Vec<_>>();

    // Merge the per-worker results, propagating the first error via `?`.
    let mut unpacked_append_vec_map = UnpackedAppendVecMap::new();
    for h in all_unpacked_append_vec_map {
        unpacked_append_vec_map.extend(h?);
    }
    Ok(unpacked_append_vec_map)
}
/// Opens the archive at `snapshot_tar` and extracts it into `unpack_dir`
/// using `parallel_divisions` parallel readers.
///
/// The `archive_format` selects which decompressor wraps each re-opened file
/// handle; actual extraction is delegated to `unpack_snapshot_local`, which
/// fans the work out across `parallel_divisions` workers.
///
/// NOTE(review): the diff view interleaved the removed single-reader match
/// arms (which referenced the deleted `parallel_selector` parameter) with the
/// new ones; this body is the reconstructed post-commit version with only the
/// new arms, so it compiles against the new signature.
fn untar_snapshot_in<P: AsRef<Path>>(
    snapshot_tar: P,
    unpack_dir: &Path,
    account_paths: &[PathBuf],
    archive_format: ArchiveFormat,
    parallel_divisions: usize,
) -> Result<UnpackedAppendVecMap> {
    // Each parallel worker needs its own independent file handle, so hand the
    // unpacker a factory rather than a single opened file.
    // NOTE(review): `.unwrap()` panics if the archive disappears between
    // opens; the closure contract (`Fn() -> T where T: Read`) cannot return a
    // Result, so a missing file is treated as a fatal invariant violation.
    let open_file = || File::open(&snapshot_tar).unwrap();
    let account_paths_map = match archive_format {
        ArchiveFormat::TarBzip2 => unpack_snapshot_local(
            || BzDecoder::new(BufReader::new(open_file())),
            unpack_dir,
            account_paths,
            parallel_divisions,
        )?,
        ArchiveFormat::TarGzip => unpack_snapshot_local(
            || GzDecoder::new(BufReader::new(open_file())),
            unpack_dir,
            account_paths,
            parallel_divisions,
        )?,
        ArchiveFormat::TarZstd => unpack_snapshot_local(
            || zstd::stream::read::Decoder::new(BufReader::new(open_file())).unwrap(),
            unpack_dir,
            account_paths,
            parallel_divisions,
        )?,
        ArchiveFormat::Tar => unpack_snapshot_local(
            || BufReader::new(open_file()),
            unpack_dir,
            account_paths,
            parallel_divisions,
        )?,
    };
    Ok(account_paths_map)
}
@@ -929,7 +960,7 @@ pub fn verify_snapshot_archive<P, Q, R>(
unpack_dir,
&[unpack_dir.to_path_buf()],
archive_format,
None,
1,
)
.unwrap();