Add helper fn to create shared buffer - simplify archive format matching (#26674)
This commit is contained in:
parent
84c8cfe901
commit
e7cd6daebe
|
@ -1492,15 +1492,13 @@ pub fn purge_old_snapshot_archives(
|
|||
}
|
||||
}
|
||||
|
||||
fn unpack_snapshot_local<T: 'static + Read + std::marker::Send, F: Fn() -> T>(
|
||||
reader: F,
|
||||
fn unpack_snapshot_local(
|
||||
shared_buffer: SharedBuffer,
|
||||
ledger_dir: &Path,
|
||||
account_paths: &[PathBuf],
|
||||
parallel_divisions: usize,
|
||||
) -> Result<UnpackedAppendVecMap> {
|
||||
assert!(parallel_divisions > 0);
|
||||
// a shared 'reader' that reads the decompressed stream once, keeps some history, and acts as a reader for multiple parallel archive readers
|
||||
let shared_buffer = SharedBuffer::new(reader());
|
||||
|
||||
// allocate all readers before any readers start reading
|
||||
let readers = (0..parallel_divisions)
|
||||
|
@ -1530,47 +1528,22 @@ fn unpack_snapshot_local<T: 'static + Read + std::marker::Send, F: Fn() -> T>(
|
|||
Ok(unpacked_append_vec_map)
|
||||
}
|
||||
|
||||
fn untar_snapshot_file(
|
||||
fn untar_snapshot_create_shared_buffer(
|
||||
snapshot_tar: &Path,
|
||||
unpack_dir: &Path,
|
||||
account_paths: &[PathBuf],
|
||||
archive_format: ArchiveFormat,
|
||||
parallel_divisions: usize,
|
||||
) -> Result<UnpackedAppendVecMap> {
|
||||
) -> Result<SharedBuffer> {
|
||||
let open_file = || File::open(&snapshot_tar).unwrap();
|
||||
let account_paths_map = match archive_format {
|
||||
ArchiveFormat::TarBzip2 => unpack_snapshot_local(
|
||||
|| BzDecoder::new(BufReader::new(open_file())),
|
||||
unpack_dir,
|
||||
account_paths,
|
||||
parallel_divisions,
|
||||
)?,
|
||||
ArchiveFormat::TarGzip => unpack_snapshot_local(
|
||||
|| GzDecoder::new(BufReader::new(open_file())),
|
||||
unpack_dir,
|
||||
account_paths,
|
||||
parallel_divisions,
|
||||
)?,
|
||||
ArchiveFormat::TarZstd => unpack_snapshot_local(
|
||||
|| zstd::stream::read::Decoder::new(BufReader::new(open_file())).unwrap(),
|
||||
unpack_dir,
|
||||
account_paths,
|
||||
parallel_divisions,
|
||||
)?,
|
||||
ArchiveFormat::TarLz4 => unpack_snapshot_local(
|
||||
|| lz4::Decoder::new(BufReader::new(open_file())).unwrap(),
|
||||
unpack_dir,
|
||||
account_paths,
|
||||
parallel_divisions,
|
||||
)?,
|
||||
ArchiveFormat::Tar => unpack_snapshot_local(
|
||||
|| BufReader::new(open_file()),
|
||||
unpack_dir,
|
||||
account_paths,
|
||||
parallel_divisions,
|
||||
)?,
|
||||
};
|
||||
Ok(account_paths_map)
|
||||
Ok(match archive_format {
|
||||
ArchiveFormat::TarBzip2 => SharedBuffer::new(BzDecoder::new(BufReader::new(open_file()))),
|
||||
ArchiveFormat::TarGzip => SharedBuffer::new(GzDecoder::new(BufReader::new(open_file()))),
|
||||
ArchiveFormat::TarZstd => SharedBuffer::new(
|
||||
zstd::stream::read::Decoder::new(BufReader::new(open_file())).unwrap(),
|
||||
),
|
||||
ArchiveFormat::TarLz4 => {
|
||||
SharedBuffer::new(lz4::Decoder::new(BufReader::new(open_file())).unwrap())
|
||||
}
|
||||
ArchiveFormat::Tar => SharedBuffer::new(BufReader::new(open_file())),
|
||||
})
|
||||
}
|
||||
|
||||
fn untar_snapshot_in<P: AsRef<Path>>(
|
||||
|
@ -1580,13 +1553,8 @@ fn untar_snapshot_in<P: AsRef<Path>>(
|
|||
archive_format: ArchiveFormat,
|
||||
parallel_divisions: usize,
|
||||
) -> Result<UnpackedAppendVecMap> {
|
||||
untar_snapshot_file(
|
||||
snapshot_tar.as_ref(),
|
||||
unpack_dir,
|
||||
account_paths,
|
||||
archive_format,
|
||||
parallel_divisions,
|
||||
)
|
||||
let shared_buffer = untar_snapshot_create_shared_buffer(snapshot_tar.as_ref(), archive_format)?;
|
||||
unpack_snapshot_local(shared_buffer, unpack_dir, account_paths, parallel_divisions)
|
||||
}
|
||||
|
||||
fn verify_unpacked_snapshots_dir_and_version(
|
||||
|
|
Loading…
Reference in New Issue