Fix hardlinking across filesystem boundaries (#5449)
* Fix hardlinking across filesystem boundaries * create output dir for snapshot tar
This commit is contained in:
parent
2a17e90b7b
commit
8aa7a851ca
|
@ -293,6 +293,7 @@ impl BankForks {
|
|||
&bank,
|
||||
&slot_snapshot_paths[start..],
|
||||
tar_output_file,
|
||||
&config.snapshot_path,
|
||||
)?;
|
||||
|
||||
// Send the package to the packaging thread
|
||||
|
@ -606,6 +607,7 @@ mod tests {
|
|||
last_bank,
|
||||
&slot_snapshot_paths,
|
||||
snapshot_utils::get_snapshot_tar_path(&snapshot_config.snapshot_package_output_path),
|
||||
&snapshot_config.snapshot_path,
|
||||
)
|
||||
.unwrap();
|
||||
SnapshotPackagerService::package_snapshots(&snapshot_package).unwrap();
|
||||
|
@ -738,7 +740,7 @@ mod tests {
|
|||
|
||||
let package_sender = {
|
||||
if slot == saved_slot as u64 {
|
||||
// Only send one package on the real sende so that the packaging service
|
||||
// Only send one package on the real sender so that the packaging service
|
||||
// doesn't take forever to run the packaging logic on all MAX_CACHE_ENTRIES
|
||||
// later
|
||||
&sender
|
||||
|
@ -761,7 +763,22 @@ mod tests {
|
|||
if slot == saved_slot as u64 {
|
||||
let options = CopyOptions::new();
|
||||
fs_extra::dir::copy(&accounts_dir, &saved_accounts_dir, &options).unwrap();
|
||||
fs_extra::dir::copy(&snapshots_dir, &saved_snapshots_dir, &options).unwrap();
|
||||
let snapshot_paths: Vec<_> = fs::read_dir(&snapshot_config.snapshot_path)
|
||||
.unwrap()
|
||||
.filter_map(|entry| {
|
||||
let e = entry.unwrap();
|
||||
let file_path = e.path();
|
||||
let file_name = file_path.file_name().unwrap();
|
||||
file_name
|
||||
.to_str()
|
||||
.map(|s| s.parse::<u64>().ok().map(|_| file_path.clone()))
|
||||
.unwrap_or(None)
|
||||
})
|
||||
.collect();
|
||||
|
||||
for snapshot_path in snapshot_paths {
|
||||
fs_extra::dir::copy(&snapshot_path, &saved_snapshots_dir, &options).unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -796,9 +813,7 @@ mod tests {
|
|||
// Check the tar we cached the state for earlier was generated correctly
|
||||
snapshot_utils::tests::verify_snapshot_tar(
|
||||
saved_tar,
|
||||
saved_snapshots_dir
|
||||
.path()
|
||||
.join(snapshots_dir.path().file_name().unwrap()),
|
||||
saved_snapshots_dir.path(),
|
||||
saved_accounts_dir
|
||||
.path()
|
||||
.join(accounts_dir.path().file_name().unwrap()),
|
||||
|
|
|
@ -11,6 +11,7 @@ use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender};
|
|||
use std::sync::Arc;
|
||||
use std::thread::{self, Builder, JoinHandle};
|
||||
use std::time::Duration;
|
||||
use tempfile::TempDir;
|
||||
|
||||
pub type SnapshotPackageSender = Sender<SnapshotPackage>;
|
||||
pub type SnapshotPackageReceiver = Receiver<SnapshotPackage>;
|
||||
|
@ -19,31 +20,25 @@ pub const TAR_SNAPSHOTS_DIR: &str = "snapshots";
|
|||
pub const TAR_ACCOUNTS_DIR: &str = "accounts";
|
||||
|
||||
pub struct SnapshotPackage {
|
||||
snapshot_path: PathBuf,
|
||||
snapshot_links: TempDir,
|
||||
storage_entries: Vec<Arc<AccountStorageEntry>>,
|
||||
tar_output_file: PathBuf,
|
||||
}
|
||||
|
||||
impl SnapshotPackage {
|
||||
pub fn new(
|
||||
snapshot_path: PathBuf,
|
||||
snapshot_links: TempDir,
|
||||
storage_entries: Vec<Arc<AccountStorageEntry>>,
|
||||
tar_output_file: PathBuf,
|
||||
) -> Self {
|
||||
Self {
|
||||
snapshot_path,
|
||||
snapshot_links,
|
||||
storage_entries,
|
||||
tar_output_file,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for SnapshotPackage {
|
||||
fn drop(&mut self) {
|
||||
let _ = fs::remove_dir_all(&self.snapshot_path);
|
||||
}
|
||||
}
|
||||
|
||||
pub struct SnapshotPackagerService {
|
||||
t_snapshot_packager: JoinHandle<()>,
|
||||
}
|
||||
|
@ -72,11 +67,19 @@ impl SnapshotPackagerService {
|
|||
}
|
||||
|
||||
pub fn package_snapshots(snapshot_package: &SnapshotPackage) -> Result<()> {
|
||||
let tar_dir = snapshot_package
|
||||
.tar_output_file
|
||||
.parent()
|
||||
.expect("Tar output path is invalid");
|
||||
|
||||
fs::create_dir_all(tar_dir)?;
|
||||
|
||||
// Create the tar builder
|
||||
let tar_gz = tempfile::Builder::new()
|
||||
.prefix("new_state")
|
||||
.suffix(".tgz")
|
||||
.tempfile()?;
|
||||
.tempfile_in(tar_dir)?;
|
||||
|
||||
let temp_tar_path = tar_gz.path();
|
||||
let enc = GzEncoder::new(&tar_gz, Compression::default());
|
||||
let mut tar = tar::Builder::new(enc);
|
||||
|
@ -88,9 +91,8 @@ impl SnapshotPackagerService {
|
|||
// that was created to persist those snapshots while this package was being created
|
||||
let res = tar.append_dir_all(
|
||||
tar_output_snapshots_dir,
|
||||
snapshot_package.snapshot_path.as_path(),
|
||||
snapshot_package.snapshot_links.path(),
|
||||
);
|
||||
let _ = fs::remove_dir_all(snapshot_package.snapshot_path.as_path());
|
||||
res?;
|
||||
|
||||
// Add the AppendVecs into the compressible list
|
||||
|
@ -171,11 +173,10 @@ mod tests {
|
|||
.collect();
|
||||
|
||||
// Create directory of hard links for snapshots
|
||||
let link_snapshots_dir = temp_dir.path().join("link_snapshots");
|
||||
fs::create_dir_all(&link_snapshots_dir).unwrap();
|
||||
let link_snapshots_dir = tempfile::tempdir_in(temp_dir.path()).unwrap();
|
||||
for snapshots_path in snapshots_paths {
|
||||
let snapshot_file_name = snapshots_path.file_name().unwrap();
|
||||
let link_path = link_snapshots_dir.join(snapshot_file_name);
|
||||
let link_path = link_snapshots_dir.path().join(snapshot_file_name);
|
||||
fs::hard_link(&snapshots_path, &link_path).unwrap();
|
||||
}
|
||||
|
||||
|
|
|
@ -48,24 +48,16 @@ impl SlotSnapshotPaths {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn package_snapshot<Q: AsRef<Path>>(
|
||||
pub fn package_snapshot<P: AsRef<Path>, Q: AsRef<Path>>(
|
||||
bank: &Bank,
|
||||
snapshot_files: &[SlotSnapshotPaths],
|
||||
snapshot_package_output_file: Q,
|
||||
snapshot_package_output_file: P,
|
||||
snapshot_path: Q,
|
||||
) -> Result<SnapshotPackage> {
|
||||
let slot = bank.slot();
|
||||
|
||||
// Hard link all the snapshots we need for this package
|
||||
let snapshot_hard_links_dir = get_snapshots_hardlink_dir_for_package(
|
||||
snapshot_package_output_file
|
||||
.as_ref()
|
||||
.parent()
|
||||
.expect("Invalid output path for tar"),
|
||||
slot,
|
||||
);
|
||||
|
||||
let _ = fs::remove_dir_all(&snapshot_hard_links_dir);
|
||||
fs::create_dir_all(&snapshot_hard_links_dir)?;
|
||||
let snapshot_hard_links_dir = tempfile::tempdir_in(snapshot_path)?;
|
||||
|
||||
// Get a reference to all the relevant AccountStorageEntries
|
||||
let account_storage_entries = bank.rc.get_storage_entries();
|
||||
|
@ -76,18 +68,19 @@ pub fn package_snapshot<Q: AsRef<Path>>(
|
|||
slot,
|
||||
account_storage_entries.len()
|
||||
);
|
||||
let package = SnapshotPackage::new(
|
||||
snapshot_hard_links_dir.clone(),
|
||||
account_storage_entries,
|
||||
snapshot_package_output_file.as_ref().to_path_buf(),
|
||||
);
|
||||
|
||||
// Any errors from this point on will cause the above SnapshotPackage to drop, clearing
|
||||
// any temporary state created for the SnapshotPackage (like the snapshot_hard_links_dir)
|
||||
for files in snapshot_files {
|
||||
files.hardlink_snapshot_directory(&snapshot_hard_links_dir)?;
|
||||
files.hardlink_snapshot_directory(snapshot_hard_links_dir.path())?;
|
||||
}
|
||||
|
||||
let package = SnapshotPackage::new(
|
||||
snapshot_hard_links_dir,
|
||||
account_storage_entries,
|
||||
snapshot_package_output_file.as_ref().to_path_buf(),
|
||||
);
|
||||
|
||||
Ok(package)
|
||||
}
|
||||
|
||||
|
@ -228,11 +221,6 @@ fn get_bank_snapshot_dir<P: AsRef<Path>>(path: P, slot: u64) -> PathBuf {
|
|||
path.as_ref().join(slot.to_string())
|
||||
}
|
||||
|
||||
fn get_snapshots_hardlink_dir_for_package<P: AsRef<Path>>(parent_dir: P, slot: u64) -> PathBuf {
|
||||
let file_name = format!("snapshot_{}_hard_links", slot);
|
||||
parent_dir.as_ref().join(file_name)
|
||||
}
|
||||
|
||||
fn get_io_error(error: &str) -> Error {
|
||||
warn!("BankForks error: {:?}", error);
|
||||
Error::IO(IOError::new(ErrorKind::Other, error))
|
||||
|
|
Loading…
Reference in New Issue