Adapt local-cluster/

This commit is contained in:
Michael Vines 2020-02-26 19:04:18 -07:00
parent fcabc6f799
commit 0d4cb252c4
6 changed files with 102 additions and 87 deletions

View File

@ -67,13 +67,19 @@ mod tests {
} }
} }
fn restore_from_snapshot(old_bank_forks: &BankForks, account_paths: Vec<PathBuf>) { fn restore_from_snapshot(
old_bank_forks: &BankForks,
old_last_slot: Slot,
account_paths: Vec<PathBuf>,
) {
let (snapshot_path, snapshot_package_output_path) = old_bank_forks let (snapshot_path, snapshot_package_output_path) = old_bank_forks
.snapshot_config .snapshot_config
.as_ref() .as_ref()
.map(|c| (&c.snapshot_path, &c.snapshot_package_output_path)) .map(|c| (&c.snapshot_path, &c.snapshot_package_output_path))
.unwrap(); .unwrap();
let old_last_bank = old_bank_forks.get(old_last_slot).unwrap();
let deserialized_bank = snapshot_utils::bank_from_archive( let deserialized_bank = snapshot_utils::bank_from_archive(
&account_paths, &account_paths,
&old_bank_forks &old_bank_forks
@ -81,7 +87,10 @@ mod tests {
.as_ref() .as_ref()
.unwrap() .unwrap()
.snapshot_path, .snapshot_path,
snapshot_utils::get_snapshot_archive_path(snapshot_package_output_path), snapshot_utils::get_snapshot_archive_path(
snapshot_package_output_path,
&(old_last_bank.slot(), old_last_bank.get_accounts_hash()),
),
) )
.unwrap(); .unwrap();
@ -139,18 +148,20 @@ mod tests {
slot_snapshot_paths slot_snapshot_paths
.last() .last()
.expect("no snapshots found in path"), .expect("no snapshots found in path"),
snapshot_utils::get_snapshot_archive_path(
&snapshot_config.snapshot_package_output_path,
),
&snapshot_config.snapshot_path, &snapshot_config.snapshot_path,
&last_bank.src.roots(), &last_bank.src.roots(),
&snapshot_config.snapshot_package_output_path,
storages, storages,
) )
.unwrap(); .unwrap();
snapshot_utils::archive_snapshot_package(&snapshot_package).unwrap(); snapshot_utils::archive_snapshot_package(&snapshot_package).unwrap();
restore_from_snapshot(bank_forks, vec![accounts_dir.path().to_path_buf()]); restore_from_snapshot(
bank_forks,
last_slot,
vec![accounts_dir.path().to_path_buf()],
);
} }
#[test] #[test]
@ -249,14 +260,7 @@ mod tests {
}; };
bank_forks bank_forks
.generate_snapshot( .generate_snapshot(slot, &vec![], &package_sender)
slot,
&vec![],
&package_sender,
snapshot_config
.snapshot_package_output_path
.join(slot.to_string()),
)
.unwrap(); .unwrap();
if slot == saved_slot as u64 { if slot == saved_slot as u64 {

View File

@ -915,12 +915,9 @@ fn main() {
snapshot_utils::package_snapshot( snapshot_utils::package_snapshot(
&bank, &bank,
&slot_snapshot_paths, &slot_snapshot_paths,
snapshot_utils::get_snapshot_archive_path(
output_directory,
&(bank.slot(), bank.hash()),
),
&temp_dir, &temp_dir,
&bank.src.roots(), &bank.src.roots(),
output_directory,
storages, storages,
) )
}) })

View File

@ -10,7 +10,7 @@ use solana_sdk::{clock::Slot, timing};
use std::{ use std::{
collections::{HashMap, HashSet}, collections::{HashMap, HashSet},
ops::Index, ops::Index,
path::{Path, PathBuf}, path::PathBuf,
sync::Arc, sync::Arc,
time::Instant, time::Instant,
}; };
@ -192,10 +192,6 @@ impl BankForks {
root, root,
&root_bank.src.roots(), &root_bank.src.roots(),
snapshot_package_sender.as_ref().unwrap(), snapshot_package_sender.as_ref().unwrap(),
snapshot_utils::get_snapshot_archive_path(
&config.snapshot_package_output_path,
&(root_bank.slot(), root_bank.hash()),
),
); );
if r.is_err() { if r.is_err() {
warn!("Error generating snapshot for bank: {}, err: {:?}", root, r); warn!("Error generating snapshot for bank: {}, err: {:?}", root, r);
@ -239,12 +235,11 @@ impl BankForks {
} }
} }
pub fn generate_snapshot<P: AsRef<Path>>( pub fn generate_snapshot(
&self, &self,
root: Slot, root: Slot,
slots_to_snapshot: &[Slot], slots_to_snapshot: &[Slot],
snapshot_package_sender: &SnapshotPackageSender, snapshot_package_sender: &SnapshotPackageSender,
tar_output_file: P,
) -> Result<()> { ) -> Result<()> {
let config = self.snapshot_config.as_ref().unwrap(); let config = self.snapshot_config.as_ref().unwrap();
@ -270,9 +265,9 @@ impl BankForks {
let package = snapshot_utils::package_snapshot( let package = snapshot_utils::package_snapshot(
&bank, &bank,
latest_slot_snapshot_paths, latest_slot_snapshot_paths,
tar_output_file,
&config.snapshot_path, &config.snapshot_path,
slots_to_snapshot, slots_to_snapshot,
&config.snapshot_package_output_path,
storages, storages,
)?; )?;

View File

@ -10,7 +10,7 @@ use crate::{
}; };
use log::*; use log::*;
use solana_sdk::{clock::Slot, genesis_config::GenesisConfig, hash::Hash}; use solana_sdk::{clock::Slot, genesis_config::GenesisConfig, hash::Hash};
use std::{fs, path::PathBuf, result, sync::Arc}; use std::{fs, path::PathBuf, process, result, sync::Arc};
pub type LoadResult = result::Result< pub type LoadResult = result::Result<
( (
@ -55,39 +55,45 @@ pub fn load(
match snapshot_utils::get_highest_snapshot_archive_path( match snapshot_utils::get_highest_snapshot_archive_path(
&snapshot_config.snapshot_package_output_path, &snapshot_config.snapshot_package_output_path,
) { ) {
Some(tar) => { Some((archive_filename, archive_snapshot_hash)) => {
if tar.exists() { info!("Loading snapshot package: {:?}", archive_filename);
info!("Loading snapshot package: {:?}", tar); // Fail hard here if snapshot fails to load, don't silently continue
// Fail hard here if snapshot fails to load, don't silently continue
if account_paths.is_empty() { if account_paths.is_empty() {
panic!("Account paths not present when booting from snapshot") error!("Account paths not present when booting from snapshot");
} process::exit(1);
let deserialized_bank = snapshot_utils::bank_from_archive(
&account_paths,
&snapshot_config.snapshot_path,
&tar,
)
.expect("Load from snapshot failed");
let snapshot_hash = (
deserialized_bank.slot(),
deserialized_bank.get_accounts_hash(),
);
return to_loadresult(
blockstore_processor::process_blockstore_from_root(
genesis_config,
blockstore,
Arc::new(deserialized_bank),
&process_options,
&VerifyRecyclers::default(),
),
Some(snapshot_hash),
);
} else {
info!("Snapshot package does not exist: {:?}", tar);
} }
let deserialized_bank = snapshot_utils::bank_from_archive(
&account_paths,
&snapshot_config.snapshot_path,
&archive_filename,
)
.expect("Load from snapshot failed");
let deserialized_snapshot_hash = (
deserialized_bank.slot(),
deserialized_bank.get_accounts_hash(),
);
if deserialized_snapshot_hash != archive_snapshot_hash {
error!(
"Snapshot has mismatch:\narchive: {:?}\ndeserialized: {:?}",
archive_snapshot_hash, deserialized_snapshot_hash
);
process::exit(1);
}
return to_loadresult(
blockstore_processor::process_blockstore_from_root(
genesis_config,
blockstore,
Arc::new(deserialized_bank),
&process_options,
&VerifyRecyclers::default(),
),
Some(deserialized_snapshot_hash),
);
} }
None => info!("No snapshot package available"), None => info!("No snapshot package available"),
} }

View File

@ -82,9 +82,9 @@ impl SlotSnapshotPaths {
pub fn package_snapshot<P: AsRef<Path>, Q: AsRef<Path>>( pub fn package_snapshot<P: AsRef<Path>, Q: AsRef<Path>>(
bank: &Bank, bank: &Bank,
snapshot_files: &SlotSnapshotPaths, snapshot_files: &SlotSnapshotPaths,
snapshot_package_output_file: P,
snapshot_path: Q, snapshot_path: Q,
slots_to_snapshot: &[Slot], slots_to_snapshot: &[Slot],
snapshot_package_output_path: P,
snapshot_storages: SnapshotStorages, snapshot_storages: SnapshotStorages,
) -> Result<SnapshotPackage> { ) -> Result<SnapshotPackage> {
// Hard link all the snapshots we need for this package // Hard link all the snapshots we need for this package
@ -101,12 +101,17 @@ pub fn package_snapshot<P: AsRef<Path>, Q: AsRef<Path>>(
// any temporary state created for the SnapshotPackage (like the snapshot_hard_links_dir) // any temporary state created for the SnapshotPackage (like the snapshot_hard_links_dir)
snapshot_files.copy_snapshot_directory(snapshot_hard_links_dir.path())?; snapshot_files.copy_snapshot_directory(snapshot_hard_links_dir.path())?;
let snapshot_package_output_file = get_snapshot_archive_path(
&snapshot_package_output_path,
&(bank.slot(), bank.get_accounts_hash()),
);
let package = SnapshotPackage::new( let package = SnapshotPackage::new(
bank.slot(), bank.slot(),
bank.src.slot_deltas(slots_to_snapshot), bank.src.slot_deltas(slots_to_snapshot),
snapshot_hard_links_dir, snapshot_hard_links_dir,
snapshot_storages, snapshot_storages,
snapshot_package_output_file.as_ref().to_path_buf(), snapshot_package_output_file,
bank.get_accounts_hash(), bank.get_accounts_hash(),
); );
@ -555,9 +560,9 @@ fn get_snapshot_archives<P: AsRef<Path>>(snapshot_output_dir: P) -> Vec<(PathBuf
pub fn get_highest_snapshot_archive_path<P: AsRef<Path>>( pub fn get_highest_snapshot_archive_path<P: AsRef<Path>>(
snapshot_output_dir: P, snapshot_output_dir: P,
) -> Option<PathBuf> { ) -> Option<(PathBuf, (Slot, Hash))> {
let archives = get_snapshot_archives(snapshot_output_dir); let archives = get_snapshot_archives(snapshot_output_dir);
archives.into_iter().next().map(|archive| archive.0) archives.into_iter().next()
} }
pub fn untar_snapshot_in<P: AsRef<Path>, Q: AsRef<Path>>( pub fn untar_snapshot_in<P: AsRef<Path>, Q: AsRef<Path>>(

View File

@ -18,10 +18,11 @@ use solana_local_cluster::{
}; };
use solana_sdk::{ use solana_sdk::{
client::SyncClient, client::SyncClient,
clock, clock::{self, Slot},
commitment_config::CommitmentConfig, commitment_config::CommitmentConfig,
epoch_schedule::{EpochSchedule, MINIMUM_SLOTS_PER_EPOCH}, epoch_schedule::{EpochSchedule, MINIMUM_SLOTS_PER_EPOCH},
genesis_config::OperatingMode, genesis_config::OperatingMode,
hash::Hash,
poh_config::PohConfig, poh_config::PohConfig,
signature::{Keypair, Signer}, signature::{Keypair, Signer},
}; };
@ -648,14 +649,16 @@ fn test_snapshot_restart_tower() {
.as_ref() .as_ref()
.unwrap() .unwrap()
.snapshot_package_output_path; .snapshot_package_output_path;
let tar = snapshot_utils::get_snapshot_archive_path(&snapshot_package_output_path);
wait_for_next_snapshot(&cluster, &tar);
// Copy tar to validator's snapshot output directory let (archive_filename, archive_snapshot_hash) =
let validator_tar_path = snapshot_utils::get_snapshot_archive_path( wait_for_next_snapshot(&cluster, &snapshot_package_output_path);
// Copy archive to validator's snapshot output directory
let validator_archive_path = snapshot_utils::get_snapshot_archive_path(
&validator_snapshot_test_config.snapshot_output_path, &validator_snapshot_test_config.snapshot_output_path,
&archive_snapshot_hash,
); );
fs::hard_link(tar, &validator_tar_path).unwrap(); fs::hard_link(archive_filename, &validator_archive_path).unwrap();
// Restart validator from snapshot, the validator's tower state in this snapshot // Restart validator from snapshot, the validator's tower state in this snapshot
// will contain slots < the root bank of the snapshot. Validator should not panic. // will contain slots < the root bank of the snapshot. Validator should not panic.
@ -704,21 +707,23 @@ fn test_snapshots_blockstore_floor() {
trace!("Waiting for snapshot tar to be generated with slot",); trace!("Waiting for snapshot tar to be generated with slot",);
let tar = snapshot_utils::get_snapshot_archive_path(&snapshot_package_output_path); let (archive_filename, (archive_slot, archive_hash)) = loop {
loop { let archive =
if tar.exists() { snapshot_utils::get_highest_snapshot_archive_path(&snapshot_package_output_path);
trace!("snapshot tar exists"); if archive.is_some() {
break; trace!("snapshot exists");
break archive.unwrap();
} }
sleep(Duration::from_millis(5000)); sleep(Duration::from_millis(5000));
} };
// Copy tar to validator's snapshot output directory // Copy archive to validator's snapshot output directory
let validator_tar_path = snapshot_utils::get_snapshot_archive_path( let validator_archive_path = snapshot_utils::get_snapshot_archive_path(
&validator_snapshot_test_config.snapshot_output_path, &validator_snapshot_test_config.snapshot_output_path,
&(archive_slot, archive_hash),
); );
fs::hard_link(tar, &validator_tar_path).unwrap(); fs::hard_link(archive_filename, &validator_archive_path).unwrap();
let slot_floor = snapshot_utils::bank_slot_from_archive(&validator_tar_path).unwrap(); let slot_floor = archive_slot;
// Start up a new node from a snapshot // Start up a new node from a snapshot
let validator_stake = 5; let validator_stake = 5;
@ -814,8 +819,7 @@ fn test_snapshots_restart_validity() {
expected_balances.extend(new_balances); expected_balances.extend(new_balances);
let tar = snapshot_utils::get_snapshot_archive_path(&snapshot_package_output_path); wait_for_next_snapshot(&cluster, &snapshot_package_output_path);
wait_for_next_snapshot(&cluster, &tar);
// Create new account paths since validator exit is not guaranteed to cleanup RPC threads, // Create new account paths since validator exit is not guaranteed to cleanup RPC threads,
// which may delete the old accounts on exit at any point // which may delete the old accounts on exit at any point
@ -952,7 +956,10 @@ fn test_no_voting() {
} }
} }
fn wait_for_next_snapshot<P: AsRef<Path>>(cluster: &LocalCluster, tar: P) { fn wait_for_next_snapshot(
cluster: &LocalCluster,
snapshot_package_output_path: &Path,
) -> (PathBuf, (Slot, Hash)) {
// Get slot after which this was generated // Get slot after which this was generated
let client = cluster let client = cluster
.get_validator_client(&cluster.entry_point_info.id) .get_validator_client(&cluster.entry_point_info.id)
@ -964,17 +971,18 @@ fn wait_for_next_snapshot<P: AsRef<Path>>(cluster: &LocalCluster, tar: P) {
// Wait for a snapshot for a bank >= last_slot to be made so we know that the snapshot // Wait for a snapshot for a bank >= last_slot to be made so we know that the snapshot
// must include the transactions just pushed // must include the transactions just pushed
trace!( trace!(
"Waiting for snapshot tar to be generated with slot > {}", "Waiting for snapshot archive to be generated with slot > {}",
last_slot last_slot
); );
loop { loop {
if tar.as_ref().exists() { if let Some((filename, (slot, hash))) =
trace!("snapshot tar exists"); snapshot_utils::get_highest_snapshot_archive_path(snapshot_package_output_path)
let slot = snapshot_utils::bank_slot_from_archive(&tar).unwrap(); {
trace!("snapshot for slot {} exists", slot);
if slot >= last_slot { if slot >= last_slot {
break; return (filename, (slot, hash));
} }
trace!("snapshot tar slot {} < last_slot {}", slot, last_slot); trace!("snapshot slot {} < last_slot {}", slot, last_slot);
} }
sleep(Duration::from_millis(5000)); sleep(Duration::from_millis(5000));
} }