From 6cb2040a1b97b71415eb602a9279b806c21cea69 Mon Sep 17 00:00:00 2001 From: carllin Date: Wed, 31 Jul 2019 17:58:10 -0700 Subject: [PATCH] Snapshot Packaging Service (#5262) * Snapshot serialization and packaging --- Cargo.lock | 38 ++ core/Cargo.toml | 6 + core/src/bank_forks.rs | 549 +++++++++++++++++------------ core/src/lib.rs | 7 + core/src/replay_stage.rs | 10 +- core/src/result.rs | 6 + core/src/snapshot_package.rs | 195 ++++++++++ core/src/snapshot_utils.rs | 255 ++++++++++++++ core/src/tvu.rs | 18 + core/src/validator.rs | 78 ++-- multinode-demo/bootstrap-leader.sh | 1 + multinode-demo/validator.sh | 1 + run.sh | 1 + runtime/src/accounts_db.rs | 32 +- runtime/src/append_vec.rs | 4 + runtime/src/bank.rs | 14 + runtime/src/status_cache.rs | 22 +- validator/src/main.rs | 27 +- 18 files changed, 990 insertions(+), 274 deletions(-) create mode 100644 core/src/snapshot_package.rs create mode 100644 core/src/snapshot_utils.rs diff --git a/Cargo.lock b/Cargo.lock index 4c5a862c3..8d7bf49f2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -828,6 +828,14 @@ dependencies = [ "generic-array 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "dir-diff" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "walkdir 2.2.9 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "dirs" version = "1.0.5" @@ -1059,6 +1067,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "crc32fast 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.60 (registry+https://github.com/rust-lang/crates.io-index)", + "miniz-sys 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", "miniz_oxide_c_api 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -1080,6 +1089,11 @@ name = "foreign-types-shared" version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = 
"fs_extra" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "fuchsia-cprng" version = "0.1.1" @@ -1789,6 +1803,15 @@ dependencies = [ "unicase 1.4.2 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "miniz-sys" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "cc 1.0.38 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.60 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "miniz_oxide" version = "0.3.0" @@ -3030,6 +3053,9 @@ dependencies = [ "core_affinity 0.5.9 (registry+https://github.com/rust-lang/crates.io-index)", "crc 1.8.1 (registry+https://github.com/rust-lang/crates.io-index)", "crossbeam-channel 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", + "dir-diff 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "flate2 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)", + "fs_extra 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "hashbrown 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", "hex-literal 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "indexmap 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -3075,7 +3101,10 @@ dependencies = [ "solana-storage-program 0.18.0-pre0", "solana-vote-api 0.18.0-pre0", "solana-vote-signer 0.18.0-pre0", + "symlink 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "sys-info 0.5.7 (registry+https://github.com/rust-lang/crates.io-index)", + "tar 0.4.26 (registry+https://github.com/rust-lang/crates.io-index)", + "tempfile 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-codec 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-fs 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", @@ -4349,6 
+4378,11 @@ name = "subtle" version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "symlink" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "syn" version = "0.11.11" @@ -5189,6 +5223,7 @@ dependencies = [ "checksum difference 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "524cbf6897b527295dff137cec09ecf3a05f4fddffd7dfcd1585403449e74198" "checksum digest 0.7.6 (registry+https://github.com/rust-lang/crates.io-index)" = "03b072242a8cbaf9c145665af9d250c59af3b958f83ed6824e13533cf76d5b90" "checksum digest 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "f3d0c8c8752312f9713efd397ff63acb9f85585afbf179282e720e7704954dd5" +"checksum dir-diff 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "1cce6e50ca36311e494793f7629014dc78cd963ba85cd05968ae06a63b867f0b" "checksum dirs 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "3fd78930633bd1c6e35c4b42b1df7b0cbc6bc191146e512bb3bedf243fcc3901" "checksum dirs 2.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "13aea89a5c93364a98e9b37b2fa237effbb694d5cfe01c5b70941f7eb087d5e3" "checksum dirs-sys 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "afa0b23de8fd801745c471deffa6e12d248f962c9fd4b4c33787b055599bde7b" @@ -5217,6 +5252,7 @@ dependencies = [ "checksum fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)" = "2fad85553e09a6f881f739c29f0b00b0f01357c743266d478b68951ce23285f3" "checksum foreign-types 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" "checksum foreign-types-shared 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" +"checksum fs_extra 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = 
"5f2a4a2034423744d2cc7ca2068453168dcdb82c438419e639a26bd87839c674" "checksum fuchsia-cprng 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba" "checksum fuchsia-zircon 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82" "checksum fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" @@ -5297,6 +5333,7 @@ dependencies = [ "checksum mime 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)" = "ba626b8a6de5da682e1caa06bdb42a335aee5a84db8e5046a3e8ab17ba0a3ae0" "checksum mime 0.3.13 (registry+https://github.com/rust-lang/crates.io-index)" = "3e27ca21f40a310bd06d9031785f4801710d566c184a6e15bad4f1d9b65f9425" "checksum mime_guess 2.0.0-alpha.6 (registry+https://github.com/rust-lang/crates.io-index)" = "30de2e4613efcba1ec63d8133f344076952090c122992a903359be5a4f99c3ed" +"checksum miniz-sys 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)" = "1e9e3ae51cea1576ceba0dde3d484d30e6e5b86dee0b2d412fe3a16a15c98202" "checksum miniz_oxide 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c061edee74a88eb35d876ce88b94d77a0448a201de111c244b70d047f5820516" "checksum miniz_oxide_c_api 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "6c675792957b0d19933816c4e1d56663c341dd9bfa31cb2140ff2267c1d8ecf4" "checksum mio 0.6.19 (registry+https://github.com/rust-lang/crates.io-index)" = "83f51996a3ed004ef184e16818edc51fadffe8e7ca68be67f9dee67d84d0ff23" @@ -5468,6 +5505,7 @@ dependencies = [ "checksum structopt-derive 0.2.18 (registry+https://github.com/rust-lang/crates.io-index)" = "53010261a84b37689f9ed7d395165029f9cc7abb9f56bbfe86bee2597ed25107" "checksum subtle 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2d67a5a62ba6e01cb2192ff309324cb4875d0c451d55fe2319433abe7a05a8ee" 
"checksum subtle 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "01dca13cf6c3b179864ab3292bd794e757618d35a7766b7c46050c614ba00829" +"checksum symlink 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a7973cce6668464ea31f176d85b13c7ab3bba2cb3b77a2ed26abd7801688010a" "checksum syn 0.11.11 (registry+https://github.com/rust-lang/crates.io-index)" = "d3b891b9015c88c576343b9b3e41c2c11a51c219ef067b264bd9c8aa9b441dad" "checksum syn 0.15.42 (registry+https://github.com/rust-lang/crates.io-index)" = "eadc09306ca51a40555dd6fc2b415538e9e18bc9f870e47b1a524a79fe2dcf5e" "checksum synom 0.11.3 (registry+https://github.com/rust-lang/crates.io-index)" = "a393066ed9010ebaed60b9eafa373d4b1baac186dd7e008555b0f702b51945b6" diff --git a/core/Cargo.toml b/core/Cargo.toml index de693a7be..be05f6c37 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -25,6 +25,9 @@ chrono = { version = "0.4.7", features = ["serde"] } core_affinity = "0.5.9" crc = { version = "1.8.1", optional = true } crossbeam-channel = "0.3" +dir-diff = "0.3.1" +flate2 = "1.0.9" +fs_extra = "1.1.0" hashbrown = "0.2.0" indexmap = "1.0" itertools = "0.8.0" @@ -65,7 +68,10 @@ solana-storage-api = { path = "../programs/storage_api", version = "0.18.0-pre0" solana-storage-program = { path = "../programs/storage_program", version = "0.18.0-pre0" } solana-vote-api = { path = "../programs/vote_api", version = "0.18.0-pre0" } solana-vote-signer = { path = "../vote-signer", version = "0.18.0-pre0" } +symlink = "0.1.0" sys-info = "0.5.7" +tar = "0.4.26" +tempfile = "3.1.0" tokio = "0.1" tokio-codec = "0.1" tokio-fs = "0.1" diff --git a/core/src/bank_forks.rs b/core/src/bank_forks.rs index 520cdb99d..bed44de32 100644 --- a/core/src/bank_forks.rs +++ b/core/src/bank_forks.rs @@ -1,25 +1,61 @@ //! 
The `bank_forks` module implments BankForks a DAG of checkpointed Banks -use bincode::{deserialize_from, serialize_into}; +use crate::result::{Error, Result}; +use crate::snapshot_package::SnapshotPackageSender; +use crate::snapshot_utils; +use solana_measure::measure::Measure; use solana_metrics::inc_new_counter_info; use solana_runtime::bank::{Bank, BankRc, StatusCacheRc}; +use solana_runtime::status_cache::MAX_CACHE_ENTRIES; use solana_sdk::genesis_block::GenesisBlock; use solana_sdk::timing; use std::collections::{HashMap, HashSet}; use std::fs; -use std::fs::File; -use std::io::{BufReader, BufWriter, Error, ErrorKind}; +use std::io::{Error as IOError, ErrorKind}; use std::ops::Index; use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Instant; +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct SnapshotConfig { + snapshot_path: PathBuf, + snapshot_package_output_path: PathBuf, + snapshot_interval_slots: usize, +} + +impl SnapshotConfig { + pub fn new( + snapshot_path: PathBuf, + snapshot_package_output_path: PathBuf, + snapshot_interval_slots: usize, + ) -> Self { + Self { + snapshot_path, + snapshot_package_output_path, + snapshot_interval_slots, + } + } + + pub fn snapshot_path(&self) -> &Path { + self.snapshot_path.as_path() + } + + pub fn snapshot_package_output_path(&self) -> &Path { + &self.snapshot_package_output_path.as_path() + } + + pub fn snapshot_interval_slots(&self) -> usize { + self.snapshot_interval_slots + } +} + pub struct BankForks { banks: HashMap>, working_bank: Arc, root: u64, - slots: HashSet, - snapshot_path: Option, + snapshot_config: Option, + last_snapshot: u64, confidence: HashMap, } @@ -71,8 +107,8 @@ impl BankForks { banks, working_bank, root: 0, - slots: HashSet::new(), - snapshot_path: None, + snapshot_config: None, + last_snapshot: 0, confidence: HashMap::new(), } } @@ -136,8 +172,8 @@ impl BankForks { root, banks, working_bank, - slots: HashSet::new(), - snapshot_path: None, + snapshot_config: None, + 
last_snapshot: 0, confidence: HashMap::new(), } } @@ -156,7 +192,7 @@ impl BankForks { self.working_bank.clone() } - pub fn set_root(&mut self, root: u64) { + pub fn set_root(&mut self, root: u64, snapshot_package_sender: &Option) { self.root = root; let set_root_start = Instant::now(); let root_bank = self @@ -172,6 +208,33 @@ impl BankForks { let new_tx_count = root_bank.transaction_count(); self.prune_non_root(root); + // Generate a snapshot if snapshots are configured and it's been an appropriate number + // of banks since the last snapshot + if self.snapshot_config.is_some() { + let config = self + .snapshot_config + .as_ref() + .expect("Called package_snapshot without a snapshot configuration"); + if root - self.last_snapshot >= config.snapshot_interval_slots as u64 { + let mut snapshot_time = Measure::start("total-snapshot-ms"); + let r = self.generate_snapshot( + root, + snapshot_package_sender.as_ref().unwrap(), + snapshot_utils::get_snapshot_tar_path(&config.snapshot_package_output_path), + ); + if r.is_err() { + warn!("Error generating snapshot for bank: {}, err: {:?}", root, r); + } else { + self.last_snapshot = root; + } + + // Cleanup outdated snapshots + self.purge_old_snapshots(); + snapshot_time.stop(); + inc_new_counter_info!("total-snapshot-setup-ms", snapshot_time.as_ms() as usize); + } + } + inc_new_counter_info!( "bank-forks_set_root_ms", timing::duration_as_ms(&set_root_start.elapsed()) as usize @@ -186,30 +249,60 @@ impl BankForks { self.root } - fn prune_non_root(&mut self, root: u64) { - let slots: HashSet = self - .banks - .iter() - .filter(|(_, b)| b.is_frozen()) - .map(|(k, _)| *k) - .collect(); - let descendants = self.descendants(); - self.banks - .retain(|slot, _| descendants[&root].contains(slot)); - self.confidence - .retain(|slot, _| slot == &root || descendants[&root].contains(slot)); - if let Some(snapshot_path) = &self.snapshot_path { - let diff: HashSet<_> = slots.symmetric_difference(&self.slots).collect(); - trace!("prune 
non root {} - {:?}", root, diff); - for slot in diff.iter() { - if **slot > root { - let _ = self.add_snapshot(**slot, root); - } else { - BankForks::remove_snapshot(**slot, &snapshot_path); - } + fn purge_old_snapshots(&self) { + // Remove outdated snapshots + let config = self.snapshot_config.as_ref().unwrap(); + let names = snapshot_utils::get_snapshot_names(&config.snapshot_path); + let num_to_remove = names.len().saturating_sub(MAX_CACHE_ENTRIES); + for old_root in &names[..num_to_remove] { + let r = snapshot_utils::remove_snapshot(*old_root, &config.snapshot_path); + if r.is_err() { + warn!("Couldn't remove snapshot at: {:?}", config.snapshot_path); } } - self.slots = slots.clone(); + } + + fn generate_snapshot>( + &self, + root: u64, + snapshot_package_sender: &SnapshotPackageSender, + tar_output_file: P, + ) -> Result<()> { + let config = self.snapshot_config.as_ref().unwrap(); + + // Add a snapshot for the new root + let bank = self + .get(root) + .cloned() + .expect("root must exist in BankForks"); + snapshot_utils::add_snapshot(&config.snapshot_path, &bank, root)?; + + // Package the relevant snapshots + let names = snapshot_utils::get_snapshot_names(&config.snapshot_path); + + // We only care about the last MAX_CACHE_ENTRIES snapshots of roots because + // the status cache of anything older is thrown away by the bank in + // status_cache.prune_roots() + let start = names.len().saturating_sub(MAX_CACHE_ENTRIES); + let package = snapshot_utils::package_snapshot( + &bank, + &names[start..], + &config.snapshot_path, + tar_output_file, + )?; + + // Send the package to the packaging thread + snapshot_package_sender.send(package)?; + + Ok(()) + } + + fn prune_non_root(&mut self, root: u64) { + let descendants = self.descendants(); + self.banks + .retain(|slot, _| slot == &root || descendants[&root].contains(slot)); + self.confidence + .retain(|slot, _| slot == &root || descendants[&root].contains(slot)); } pub fn cache_fork_confidence( @@ -247,109 +340,20 @@ 
impl BankForks { self.confidence.get(&fork) } - fn get_io_error(error: &str) -> Error { - warn!("BankForks error: {:?}", error); - Error::new(ErrorKind::Other, error) + pub fn set_snapshot_config(&mut self, snapshot_config: SnapshotConfig) { + self.snapshot_config = Some(snapshot_config); } - pub fn add_snapshot(&self, slot: u64, root: u64) -> Result<(), Error> { - let path = self.snapshot_path.as_ref().expect("no snapshot_path"); - fs::create_dir_all(path.clone())?; - let bank_file = format!("{}", slot); - let bank_file_path = path.join(bank_file); - trace!("path: {:?}", bank_file_path); - let file = File::create(bank_file_path)?; - let mut stream = BufWriter::new(file); - let bank_slot = self.get(slot); - if bank_slot.is_none() { - return Err(BankForks::get_io_error("bank_forks get error")); - } - let bank = bank_slot.unwrap().clone(); - serialize_into(&mut stream, &*bank) - .map_err(|_| BankForks::get_io_error("serialize bank error"))?; - let mut parent_slot: u64 = 0; - if let Some(parent_bank) = bank.parent() { - parent_slot = parent_bank.slot(); - } - serialize_into(&mut stream, &parent_slot) - .map_err(|_| BankForks::get_io_error("serialize bank parent error"))?; - serialize_into(&mut stream, &root) - .map_err(|_| BankForks::get_io_error("serialize root error"))?; - serialize_into(&mut stream, &bank.src) - .map_err(|_| BankForks::get_io_error("serialize bank status cache error"))?; - serialize_into(&mut stream, &bank.rc) - .map_err(|_| BankForks::get_io_error("serialize bank accounts error"))?; - Ok(()) - } - - pub fn remove_snapshot(slot: u64, path: &Path) { - let bank_file = format!("{}", slot); - let bank_file_path = path.join(bank_file); - let _ = fs::remove_file(bank_file_path); - } - - pub fn set_snapshot_path(&mut self, snapshot_path: &Path) { - self.snapshot_path = Some(snapshot_path.to_path_buf()); - } - - fn load_snapshots( - names: &[u64], - bank0: &mut Bank, - bank_maps: &mut Vec<(u64, u64, Bank)>, - status_cache_rc: &StatusCacheRc, - 
snapshot_path: &Path, - ) -> Option { - let mut bank_root: Option = None; - - for bank_slot in names.iter().rev() { - let bank_path = format!("{}", bank_slot); - let bank_file_path = snapshot_path.join(bank_path.clone()); - info!("Load from {:?}", bank_file_path); - let file = File::open(bank_file_path); - if file.is_err() { - warn!("Snapshot file open failed for {}", bank_slot); - continue; - } - let file = file.unwrap(); - let mut stream = BufReader::new(file); - let bank: Result = deserialize_from(&mut stream) - .map_err(|_| BankForks::get_io_error("deserialize bank error")); - let slot: Result = deserialize_from(&mut stream) - .map_err(|_| BankForks::get_io_error("deserialize bank parent error")); - let parent_slot = if slot.is_ok() { slot.unwrap() } else { 0 }; - let root: Result = deserialize_from(&mut stream) - .map_err(|_| BankForks::get_io_error("deserialize root error")); - let status_cache: Result = deserialize_from(&mut stream) - .map_err(|_| BankForks::get_io_error("deserialize bank status cache error")); - if bank_root.is_none() && bank0.rc.update_from_stream(&mut stream).is_ok() { - bank_root = Some(root.unwrap()); - } - if bank_root.is_some() { - match bank { - Ok(v) => { - if status_cache.is_ok() { - status_cache_rc.append(&status_cache.unwrap()); - } - bank_maps.push((*bank_slot, parent_slot, v)); - } - Err(_) => warn!("Load snapshot failed for {}", bank_slot), - } - } else { - BankForks::remove_snapshot(*bank_slot, snapshot_path); - warn!("Load snapshot rc failed for {}", bank_slot); - } - } - - bank_root + pub fn snapshot_config(&self) -> &Option { + &self.snapshot_config } fn setup_banks( bank_maps: &mut Vec<(u64, u64, Bank)>, bank_rc: &BankRc, status_cache_rc: &StatusCacheRc, - ) -> (HashMap>, HashSet, u64) { + ) -> (HashMap>, u64) { let mut banks = HashMap::new(); - let mut slots = HashSet::new(); let (last_slot, last_parent_slot, mut last_bank) = bank_maps.remove(0); last_bank.set_bank_rc(&bank_rc, &status_cache_rc); @@ -362,7 +366,6 @@ impl 
BankForks { } if slot > 0 { banks.insert(slot, Arc::new(bank)); - slots.insert(slot); } } if last_parent_slot != 0 { @@ -371,56 +374,54 @@ impl BankForks { } } banks.insert(last_slot, Arc::new(last_bank)); - slots.insert(last_slot); - (banks, slots, last_slot) + (banks, last_slot) } pub fn load_from_snapshot( genesis_block: &GenesisBlock, account_paths: Option, - snapshot_path: &Path, - ) -> Result { - let paths = fs::read_dir(snapshot_path)?; - let mut names = paths - .filter_map(|entry| { - entry.ok().and_then(|e| { - e.path() - .file_name() - .and_then(|n| n.to_str().map(|s| s.parse::().unwrap())) - }) - }) - .collect::>(); - - names.sort(); + snapshot_config: &SnapshotConfig, + ) -> Result { + fs::create_dir_all(&snapshot_config.snapshot_path)?; + let names = snapshot_utils::get_snapshot_names(&snapshot_config.snapshot_path); + if names.is_empty() { + return Err(Error::IO(IOError::new( + ErrorKind::Other, + "no snapshots found", + ))); + } let mut bank_maps = vec![]; let status_cache_rc = StatusCacheRc::default(); let id = (names[names.len() - 1] + 1) as usize; let mut bank0 = Bank::create_with_genesis(&genesis_block, account_paths.clone(), &status_cache_rc, id); bank0.freeze(); - let bank_root = BankForks::load_snapshots( + let bank_root = snapshot_utils::load_snapshots( &names, &mut bank0, &mut bank_maps, &status_cache_rc, - snapshot_path, + &snapshot_config.snapshot_path, ); if bank_maps.is_empty() || bank_root.is_none() { - BankForks::remove_snapshot(0, snapshot_path); - return Err(Error::new(ErrorKind::Other, "no snapshots loaded")); + return Err(Error::IO(IOError::new( + ErrorKind::Other, + "no snapshots loaded", + ))); } let root = bank_root.unwrap(); - let (banks, slots, last_slot) = + let (banks, last_slot) = BankForks::setup_banks(&mut bank_maps, &bank0.rc, &status_cache_rc); let working_bank = banks[&last_slot].clone(); + Ok(BankForks { banks, working_bank, root, - slots, - snapshot_path: Some(snapshot_path.to_path_buf()), + snapshot_config: None, + 
last_snapshot: *names.last().unwrap(), confidence: HashMap::new(), }) } @@ -430,12 +431,17 @@ impl BankForks { mod tests { use super::*; use crate::genesis_utils::{create_genesis_block, GenesisBlockInfo}; + use crate::service::Service; + use crate::snapshot_package::SnapshotPackagerService; + use fs_extra::dir::CopyOptions; + use itertools::Itertools; use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::system_transaction; - use std::env; - use std::fs::remove_dir_all; + use std::sync::atomic::AtomicBool; + use std::sync::mpsc::channel; + use tempfile::TempDir; #[test] fn test_bank_forks() { @@ -545,73 +551,25 @@ mod tests { ); } - struct TempPaths { - pub paths: String, - } - - impl TempPaths { - fn remove_all(&self) { - let paths: Vec = self.paths.split(',').map(|s| s.to_string()).collect(); - paths.iter().for_each(|p| { - let _ignored = remove_dir_all(p); - }); - } - } - - #[macro_export] - macro_rules! tmp_bank_accounts_name { - () => { - &format!("{}-{}", file!(), line!()) - }; - } - - #[macro_export] - macro_rules! 
get_tmp_bank_accounts_path { - () => { - get_tmp_bank_accounts_path(tmp_bank_accounts_name!()) - }; - } - - impl Drop for TempPaths { - fn drop(&mut self) { - self.remove_all() - } - } - - fn get_paths_vec(paths: &str) -> Vec { - paths.split(',').map(|s| s.to_string()).collect() - } - - fn get_tmp_snapshots_path() -> TempPaths { - let out_dir = env::var("FARF_DIR").unwrap_or_else(|_| "farf".to_string()); - let path = format!("{}/snapshots", out_dir); - TempPaths { - paths: path.to_string(), - } - } - - fn get_tmp_bank_accounts_path(paths: &str) -> TempPaths { - let vpaths = get_paths_vec(paths); - let out_dir = env::var("FARF_DIR").unwrap_or_else(|_| "farf".to_string()); - let vpaths: Vec<_> = vpaths - .iter() - .map(|path| format!("{}/{}", out_dir, path)) - .collect(); - TempPaths { - paths: vpaths.join(","), - } - } - fn restore_from_snapshot( genesis_block: &GenesisBlock, bank_forks: BankForks, account_paths: Option, last_slot: u64, ) { - let snapshot_path = bank_forks.snapshot_path.as_ref().unwrap(); + let snapshot_path = bank_forks + .snapshot_config + .as_ref() + .map(|c| &c.snapshot_path) + .unwrap(); + + let new = BankForks::load_from_snapshot( + &genesis_block, + account_paths, + bank_forks.snapshot_config.as_ref().unwrap(), + ) + .unwrap(); - let new = - BankForks::load_from_snapshot(&genesis_block, account_paths, snapshot_path).unwrap(); for (slot, _) in new.banks.iter() { if *slot > 0 { let bank = bank_forks.banks.get(slot).unwrap().clone(); @@ -619,31 +577,39 @@ mod tests { bank.compare_bank(&new_bank); } } + assert_eq!(new.working_bank().slot(), last_slot); for (slot, _) in new.banks.iter() { - BankForks::remove_snapshot(*slot, snapshot_path); + snapshot_utils::remove_snapshot(*slot, snapshot_path).unwrap(); } } #[test] fn test_bank_forks_snapshot_n() { solana_logger::setup(); - let path = get_tmp_bank_accounts_path!(); - let spath = get_tmp_snapshots_path(); + let accounts_dir = TempDir::new().unwrap(); + let snapshot_dir = TempDir::new().unwrap(); + 
let snapshot_output_path = TempDir::new().unwrap(); let GenesisBlockInfo { genesis_block, mint_keypair, .. } = create_genesis_block(10_000); - path.remove_all(); - spath.remove_all(); for index in 0..10 { - let bank0 = Bank::new_with_paths(&genesis_block, Some(path.paths.clone())); + let bank0 = Bank::new_with_paths( + &genesis_block, + Some(accounts_dir.path().to_str().unwrap().to_string()), + ); bank0.freeze(); - let slot = bank0.slot(); let mut bank_forks = BankForks::new(0, bank0); - bank_forks.set_snapshot_path(&PathBuf::from(&spath.paths)); - bank_forks.add_snapshot(slot, 0).unwrap(); + let snapshot_config = SnapshotConfig::new( + PathBuf::from(snapshot_dir.path()), + PathBuf::from(snapshot_output_path.path()), + 100, + ); + bank_forks.set_snapshot_config(snapshot_config.clone()); + let bank0 = bank_forks.get(0).unwrap(); + snapshot_utils::add_snapshot(&snapshot_config.snapshot_path, bank0, 0).unwrap(); for forks in 0..index { let bank = Bank::new_from_parent(&bank_forks[forks], &Pubkey::default(), forks + 1); let key1 = Keypair::new().pubkey(); @@ -655,11 +621,146 @@ mod tests { ); assert_eq!(bank.process_transaction(&tx), Ok(())); bank.freeze(); - let slot = bank.slot(); + snapshot_utils::add_snapshot(&snapshot_config.snapshot_path, &bank, 0).unwrap(); bank_forks.insert(bank); - bank_forks.add_snapshot(slot, 0).unwrap(); } - restore_from_snapshot(&genesis_block, bank_forks, Some(path.paths.clone()), index); + restore_from_snapshot( + &genesis_block, + bank_forks, + Some(accounts_dir.path().to_str().unwrap().to_string()), + index, + ); } } + + #[test] + fn test_concurrent_snapshot_packaging() { + solana_logger::setup(); + let accounts_dir = TempDir::new().unwrap(); + let snapshots_dir = TempDir::new().unwrap(); + let snapshot_output_path = TempDir::new().unwrap(); + let GenesisBlockInfo { + genesis_block, + mint_keypair, + .. 
+ } = create_genesis_block(10_000); + let (sender, receiver) = channel(); + let (fake_sender, _fake_receiver) = channel(); + let bank0 = Bank::new_with_paths( + &genesis_block, + Some(accounts_dir.path().to_str().unwrap().to_string()), + ); + bank0.freeze(); + + // Set up bank forks + let mut bank_forks = BankForks::new(0, bank0); + let snapshot_config = SnapshotConfig::new( + PathBuf::from(snapshots_dir.path()), + PathBuf::from(snapshot_output_path.path()), + 1, + ); + bank_forks.set_snapshot_config(snapshot_config.clone()); + + // Take snapshot of zeroth bank + let bank0 = bank_forks.get(0).unwrap(); + snapshot_utils::add_snapshot(&snapshot_config.snapshot_path, bank0, 0).unwrap(); + + // Create next MAX_CACHE_ENTRIES + 2 banks and snapshots. Every bank will get snapshotted + // and the snapshot purging logic will run on every snapshot taken. This means the three + // (including snapshot for bank0 created above) earliest snapshots will get purged by the + // time this loop is done. + + // Also, make a saved copy of the state of the snapshot for a bank with + // bank.slot == saved_slot, so we can use it for a correctness check later. 
+ let saved_snapshots_dir = TempDir::new().unwrap(); + let saved_accounts_dir = TempDir::new().unwrap(); + let saved_slot = 4; + let saved_tar = snapshot_config + .snapshot_package_output_path + .join(saved_slot.to_string()); + for forks in 0..MAX_CACHE_ENTRIES + 2 { + let bank = Bank::new_from_parent( + &bank_forks[forks as u64], + &Pubkey::default(), + (forks + 1) as u64, + ); + let slot = bank.slot(); + let key1 = Keypair::new().pubkey(); + let tx = system_transaction::create_user_account( + &mint_keypair, + &key1, + 1, + genesis_block.hash(), + ); + assert_eq!(bank.process_transaction(&tx), Ok(())); + bank.freeze(); + bank_forks.insert(bank); + + let package_sender = { + if slot == saved_slot as u64 { + // Only send one package on the real sender so that the packaging service + // doesn't take forever to run the packaging logic on all MAX_CACHE_ENTRIES + // later + &sender + } else { + &fake_sender + } + }; + + bank_forks + .generate_snapshot( + slot, + &package_sender, + snapshot_config + .snapshot_package_output_path + .join(slot.to_string()), + ) + .unwrap(); + + if slot == saved_slot as u64 { + let options = CopyOptions::new(); + fs_extra::dir::copy(&accounts_dir, &saved_accounts_dir, &options).unwrap(); + fs_extra::dir::copy(&snapshots_dir, &saved_snapshots_dir, &options).unwrap(); + } + } + + // Purge all the outdated snapshots, including the ones needed to generate the package + // currently sitting in the channel + bank_forks.purge_old_snapshots(); + let mut snapshot_names = snapshot_utils::get_snapshot_names(&snapshots_dir); + snapshot_names.sort(); + assert_eq!( + snapshot_names, + (3..=MAX_CACHE_ENTRIES as u64 + 2).collect_vec() + ); + + // Create a SnapshotPackagerService to create tarballs from all the pending + // SnapshotPackage's on the channel. By the time this service starts, we have already + // purged the first two snapshots, which are needed by every snapshot other than + // the last two snapshots.
However, the packaging service should still be able to + // correctly construct the earlier snapshots because the SnapshotPackage's on the + // channel hold hard links to these deleted snapshots. We verify this is the case below. + let exit = Arc::new(AtomicBool::new(false)); + let snapshot_packager_service = SnapshotPackagerService::new(receiver, &exit); + + // Close the channel so that the package service will exit after reading all the + // packages off the channel + drop(sender); + + // Wait for service to finish + snapshot_packager_service + .join() + .expect("SnapshotPackagerService exited with error"); + + // Check the tar we cached the state for earlier was generated correctly + snapshot_utils::tests::verify_snapshot_tar( + saved_tar, + saved_snapshots_dir + .path() + .join(snapshots_dir.path().file_name().unwrap()), + saved_accounts_dir + .path() + .join(accounts_dir.path().file_name().unwrap()), + ); + } } diff --git a/core/src/lib.rs b/core/src/lib.rs index 9b83c84d5..3bf0b6c19 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -63,6 +63,8 @@ pub mod rpc_subscriptions; pub mod service; pub mod sigverify; pub mod sigverify_stage; +pub mod snapshot_package; +pub mod snapshot_utils; pub mod staking_utils; pub mod storage_stage; pub mod streamer; @@ -101,3 +103,8 @@ extern crate solana_metrics; extern crate matches; extern crate crossbeam_channel; +extern crate dir_diff; +extern crate flate2; +extern crate fs_extra; +extern crate tar; +extern crate tempfile; diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index b4e5b5806..1c820c19a 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -12,6 +12,7 @@ use crate::poh_recorder::PohRecorder; use crate::result::{Error, Result}; use crate::rpc_subscriptions::RpcSubscriptions; use crate::service::Service; +use crate::snapshot_package::SnapshotPackageSender; use solana_metrics::{datapoint_warn, inc_new_counter_info}; use solana_runtime::bank::Bank; use solana_sdk::hash::Hash; @@ 
-41,6 +42,7 @@ impl Finalizer { Finalizer { exit_sender } } } + // Implement a destructor for Finalizer. impl Drop for Finalizer { fn drop(&mut self) { @@ -90,6 +92,7 @@ impl ReplayStage { poh_recorder: &Arc>, leader_schedule_cache: &Arc, slot_full_senders: Vec>, + snapshot_package_sender: Option, ) -> (Self, Receiver>>) where T: 'static + KeypairUtil + Send + Sync, @@ -179,6 +182,7 @@ impl ReplayStage { &root_bank_sender, lockouts, &lockouts_sender, + &snapshot_package_sender, )?; Self::reset_poh_recorder( @@ -396,6 +400,7 @@ impl ReplayStage { root_bank_sender: &Sender>>, lockouts: HashMap, lockouts_sender: &Sender, + snapshot_package_sender: &Option, ) -> Result<()> where T: 'static + KeypairUtil + Send + Sync, @@ -419,7 +424,10 @@ impl ReplayStage { // is consumed by repair_service to update gossip, so we don't want to get blobs for // repair on gossip before we update leader schedule, otherwise they may get dropped. leader_schedule_cache.set_root(rooted_banks.last().unwrap()); - bank_forks.write().unwrap().set_root(new_root); + bank_forks + .write() + .unwrap() + .set_root(new_root, snapshot_package_sender); Self::handle_new_root(&bank_forks, progress); trace!("new root {}", new_root); if let Err(e) = root_bank_sender.send(rooted_banks) { diff --git a/core/src/result.rs b/core/src/result.rs index 0f9ce2a8b..aab1e890e 100644 --- a/core/src/result.rs +++ b/core/src/result.rs @@ -30,6 +30,7 @@ pub enum Error { SendError, PohRecorderError(poh_recorder::PohRecorderError), BlocktreeError(blocktree::BlocktreeError), + FsExtra(fs_extra::error::Error), } pub type Result = std::result::Result; @@ -102,6 +103,11 @@ impl std::convert::From for Error { Error::IO(e) } } +impl std::convert::From for Error { + fn from(e: fs_extra::error::Error) -> Error { + Error::FsExtra(e) + } +} impl std::convert::From for Error { fn from(e: serde_json::Error) -> Error { Error::JSON(e) diff --git a/core/src/snapshot_package.rs b/core/src/snapshot_package.rs new file mode 100644 index 
000000000..9c2eac57c --- /dev/null +++ b/core/src/snapshot_package.rs @@ -0,0 +1,195 @@ +use crate::result::{Error, Result}; +use crate::service::Service; +use flate2::write::GzEncoder; +use flate2::Compression; +use solana_runtime::accounts_db::AccountStorageEntry; +use std::fs; +use std::path::Path; +use std::path::PathBuf; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender}; +use std::sync::Arc; +use std::thread::{self, Builder, JoinHandle}; +use std::time::Duration; + +pub type SnapshotPackageSender = Sender; +pub type SnapshotPackageReceiver = Receiver; + +pub const TAR_SNAPSHOT_DIR: &str = "snapshots"; +pub const TAR_ACCOUNTS_DIR: &str = "accounts"; + +pub struct SnapshotPackage { + snapshot_path: PathBuf, + storage_entries: Vec>, + tar_output_file: PathBuf, +} + +impl SnapshotPackage { + pub fn new( + snapshot_path: PathBuf, + storage_entries: Vec>, + tar_output_file: PathBuf, + ) -> Self { + Self { + snapshot_path, + storage_entries, + tar_output_file, + } + } +} + +impl Drop for SnapshotPackage { + fn drop(&mut self) { + let _ = fs::remove_dir_all(&self.snapshot_path); + } +} + +pub struct SnapshotPackagerService { + t_snapshot_packager: JoinHandle<()>, +} + +impl SnapshotPackagerService { + pub fn new(snapshot_package_receiver: SnapshotPackageReceiver, exit: &Arc) -> Self { + let exit = exit.clone(); + let t_snapshot_packager = Builder::new() + .name("solana-snapshot-packager".to_string()) + .spawn(move || loop { + if exit.load(Ordering::Relaxed) { + break; + } + if let Err(e) = Self::package_snapshots(&snapshot_package_receiver) { + match e { + Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, + Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), + _ => info!("Error from package_snapshots: {:?}", e), + } + } + }) + .unwrap(); + Self { + t_snapshot_packager, + } + } + + pub fn package_snapshots(snapshot_receiver: &SnapshotPackageReceiver) -> Result<()> { + let snapshot_package = 
snapshot_receiver.recv_timeout(Duration::from_secs(1))?; + + // Create the tar builder + let tar_gz = tempfile::Builder::new() + .prefix("new_state") + .suffix(".tgz") + .tempfile()?; + let temp_tar_path = tar_gz.path(); + let enc = GzEncoder::new(&tar_gz, Compression::default()); + let mut tar = tar::Builder::new(enc); + + // Create the list of paths to compress, starting with the snapshots + let tar_output_snapshots_dir = Path::new(&TAR_SNAPSHOT_DIR); + + // Add the snapshots to the tarball and delete the directory of hardlinks to the snapshots + // that was created to persist those snapshots while this package was being created + let res = tar.append_dir_all( + tar_output_snapshots_dir, + snapshot_package.snapshot_path.as_path(), + ); + let _ = fs::remove_dir_all(snapshot_package.snapshot_path.as_path()); + res?; + + // Add the AppendVecs into the compressible list + let tar_output_accounts_dir = Path::new(&TAR_ACCOUNTS_DIR); + for storage in &snapshot_package.storage_entries { + let storage_path = storage.get_path(); + let output_path = tar_output_accounts_dir.join( + storage_path + .file_name() + .expect("Invalid AppendVec file path"), + ); + + // `output_path` - The directory where the AppendVec will be placed in the tarball. 
+ // `storage_path` - The file path where the AppendVec itself is located + tar.append_path_with_name(storage_path, output_path)?; + } + + // Once everything is successful, overwrite the previous tarball so that other validators + // can rsync this newly packaged snapshot + tar.finish()?; + let _ = fs::remove_file(&snapshot_package.tar_output_file); + fs::hard_link(&temp_tar_path, &snapshot_package.tar_output_file)?; + Ok(()) + } +} + +impl Service for SnapshotPackagerService { + type JoinReturnType = (); + + fn join(self) -> thread::Result<()> { + self.t_snapshot_packager.join() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::snapshot_utils; + use std::fs::OpenOptions; + use std::io::Write; + use std::sync::mpsc::channel; + use tempfile::TempDir; + + #[test] + fn test_package_snapshots() { + // Create temporary placeholder directory for all test files + let temp_dir = TempDir::new().unwrap(); + let (sender, receiver) = channel(); + let accounts_dir = temp_dir.path().join("accounts"); + let snapshots_dir = temp_dir.path().join("snapshots"); + let snapshot_package_output_path = temp_dir.path().join("snapshots_output"); + fs::create_dir_all(&snapshot_package_output_path).unwrap(); + + // Create some storage entries + let storage_entries: Vec<_> = (0..5) + .map(|i| Arc::new(AccountStorageEntry::new(&accounts_dir, 0, i, 10))) + .collect(); + + // Create some fake snapshots + fs::create_dir_all(&snapshots_dir).unwrap(); + let snapshots_paths: Vec<_> = (0..5) + .map(|i| { + let fake_snapshot_path = snapshots_dir.join(format!("fake_snapshot_{}", i)); + let mut fake_snapshot_file = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .open(&fake_snapshot_path) + .unwrap(); + + fake_snapshot_file.write_all(b"Hello, world!").unwrap(); + fake_snapshot_path + }) + .collect(); + + // Create directory of hard links for snapshots + let link_snapshots_dir = temp_dir.path().join("link_snapshots"); + fs::create_dir_all(&link_snapshots_dir).unwrap(); + 
for snapshots_path in snapshots_paths { + let snapshot_file_name = snapshots_path.file_name().unwrap(); + let link_path = link_snapshots_dir.join(snapshot_file_name); + fs::hard_link(&snapshots_path, &link_path).unwrap(); + } + + // Create a packageable snapshot + let output_tar_path = snapshot_utils::get_snapshot_tar_path(&snapshot_package_output_path); + let snapshot_package = SnapshotPackage::new( + link_snapshots_dir, + storage_entries.clone(), + output_tar_path.clone(), + ); + sender.send(snapshot_package).unwrap(); + + // Make tarball from packageable snapshot + SnapshotPackagerService::package_snapshots(&receiver).unwrap(); + + // Check tarball is correct + snapshot_utils::tests::verify_snapshot_tar(output_tar_path, snapshots_dir, accounts_dir); + } +} diff --git a/core/src/snapshot_utils.rs b/core/src/snapshot_utils.rs new file mode 100644 index 000000000..126aaa28b --- /dev/null +++ b/core/src/snapshot_utils.rs @@ -0,0 +1,255 @@ +use crate::result::{Error, Result}; +use crate::snapshot_package::SnapshotPackage; +use bincode::{deserialize_from, serialize_into}; +use flate2::read::GzDecoder; +use solana_runtime::bank::{Bank, StatusCacheRc}; +use std::fs; +use std::fs::File; +use std::io::{BufReader, BufWriter, Error as IOError, ErrorKind}; +use std::path::{Path, PathBuf}; +use tar::Archive; + +pub fn package_snapshot, Q: AsRef>( + bank: &Bank, + snapshot_names: &[u64], + snapshot_dir: P, + snapshot_package_output_file: Q, +) -> Result { + let slot = bank.slot(); + + // Hard link all the snapshots we need for this package + let snapshot_hard_links_dir = get_snapshots_hardlink_dir_for_package( + snapshot_package_output_file + .as_ref() + .parent() + .expect("Invalid output path for tar"), + slot, + ); + + let _ = fs::remove_dir_all(&snapshot_hard_links_dir); + fs::create_dir_all(&snapshot_hard_links_dir)?; + + // Get a reference to all the relevant AccountStorageEntries + let account_storage_entries = bank.rc.get_storage_entries(); + + // Create a snapshot 
package + trace!( + "Snapshot for bank: {} has {} account storage entries", + slot, + account_storage_entries.len() + ); + let package = SnapshotPackage::new( + snapshot_hard_links_dir.clone(), + account_storage_entries, + snapshot_package_output_file.as_ref().to_path_buf(), + ); + + // Any errors from this point on will cause the above SnapshotPackage to drop, clearing + // any temporary state created for the SnapshotPackage (like the snapshot_hard_links_dir) + for name in snapshot_names { + hardlink_snapshot_directory(&snapshot_dir, &snapshot_hard_links_dir, *name)?; + } + + Ok(package) +} + +pub fn get_snapshot_names>(snapshot_path: P) -> Vec { + let paths = fs::read_dir(snapshot_path).expect("Invalid snapshot path"); + let mut names = paths + .filter_map(|entry| { + entry.ok().and_then(|e| { + e.path() + .file_name() + .and_then(|n| n.to_str().map(|s| s.parse::().unwrap())) + }) + }) + .collect::>(); + + names.sort(); + names +} + +pub fn add_snapshot>(snapshot_path: P, bank: &Bank, root: u64) -> Result<()> { + let slot = bank.slot(); + let slot_snapshot_dir = get_bank_snapshot_dir(snapshot_path, slot); + fs::create_dir_all(slot_snapshot_dir.clone()).map_err(Error::from)?; + + let snapshot_file_path = slot_snapshot_dir.join(get_snapshot_file_name(slot)); + trace!( + "creating snapshot {}, path: {:?}", + bank.slot(), + snapshot_file_path + ); + let file = File::create(&snapshot_file_path)?; + let mut stream = BufWriter::new(file); + + // Create the snapshot + serialize_into(&mut stream, &*bank).map_err(|_| get_io_error("serialize bank error"))?; + let mut parent_slot: u64 = 0; + if let Some(parent_bank) = bank.parent() { + parent_slot = parent_bank.slot(); + } + serialize_into(&mut stream, &parent_slot) + .map_err(|_| get_io_error("serialize bank parent error"))?; + serialize_into(&mut stream, &root).map_err(|_| get_io_error("serialize root error"))?; + serialize_into(&mut stream, &bank.src) + .map_err(|_| get_io_error("serialize bank status cache error"))?; + 
serialize_into(&mut stream, &bank.rc) + .map_err(|_| get_io_error("serialize bank accounts error"))?; + + trace!( + "successfully created snapshot {}, path: {:?}", + bank.slot(), + snapshot_file_path + ); + Ok(()) +} + +pub fn remove_snapshot>(slot: u64, snapshot_path: P) -> Result<()> { + let slot_snapshot_dir = get_bank_snapshot_dir(&snapshot_path, slot); + // Remove the snapshot directory for this slot + fs::remove_dir_all(slot_snapshot_dir)?; + Ok(()) +} + +pub fn load_snapshots>( + names: &[u64], + bank0: &mut Bank, + bank_maps: &mut Vec<(u64, u64, Bank)>, + status_cache_rc: &StatusCacheRc, + snapshot_path: P, +) -> Option { + let mut bank_root: Option = None; + + for (i, bank_slot) in names.iter().rev().enumerate() { + let snapshot_file_name = get_snapshot_file_name(*bank_slot); + let snapshot_dir = get_bank_snapshot_dir(&snapshot_path, *bank_slot); + let snapshot_file_path = snapshot_dir.join(snapshot_file_name.clone()); + trace!("Load from {:?}", snapshot_file_path); + let file = File::open(snapshot_file_path); + if file.is_err() { + warn!("Snapshot file open failed for {}", bank_slot); + continue; + } + let file = file.unwrap(); + let mut stream = BufReader::new(file); + let bank: Result = + deserialize_from(&mut stream).map_err(|_| get_io_error("deserialize bank error")); + let slot: Result = deserialize_from(&mut stream) + .map_err(|_| get_io_error("deserialize bank parent error")); + let parent_slot = if slot.is_ok() { slot.unwrap() } else { 0 }; + let root: Result = + deserialize_from(&mut stream).map_err(|_| get_io_error("deserialize root error")); + let status_cache: Result = deserialize_from(&mut stream) + .map_err(|_| get_io_error("deserialize bank status cache error")); + if bank_root.is_none() && bank0.rc.update_from_stream(&mut stream).is_ok() { + bank_root = Some(root.unwrap()); + } + if bank_root.is_some() { + match bank { + Ok(v) => { + if status_cache.is_ok() { + let status_cache = status_cache.unwrap(); + 
status_cache_rc.append(&status_cache); + // On the last snapshot, purge all outdated status cache + // entries + if i == names.len() - 1 { + status_cache_rc.purge_roots(); + } + } + + bank_maps.push((*bank_slot, parent_slot, v)); + } + Err(_) => warn!("Load snapshot failed for {}", bank_slot), + } + } else { + warn!("Load snapshot rc failed for {}", bank_slot); + } + } + + bank_root +} + +pub fn get_snapshot_tar_path>(snapshot_output_dir: P) -> PathBuf { + snapshot_output_dir.as_ref().join("state.tgz") +} + +pub fn untar_snapshot_in, Q: AsRef>( + snapshot_tar: P, + unpack_dir: Q, +) -> Result<()> { + let tar_gz = File::open(snapshot_tar)?; + let tar = GzDecoder::new(tar_gz); + let mut archive = Archive::new(tar); + archive.unpack(&unpack_dir)?; + Ok(()) +} + +fn hardlink_snapshot_directory, Q: AsRef>( + snapshot_dir: P, + snapshot_hardlink_dir: Q, + slot: u64, +) -> Result<()> { + // Create a new directory in snapshot_hardlink_dir + let new_slot_hardlink_dir = snapshot_hardlink_dir.as_ref().join(slot.to_string()); + let _ = fs::remove_dir_all(&new_slot_hardlink_dir); + fs::create_dir_all(&new_slot_hardlink_dir)?; + + // Hardlink the contents of the directory + let snapshot_file = snapshot_dir + .as_ref() + .join(slot.to_string()) + .join(slot.to_string()); + fs::hard_link( + &snapshot_file, + &new_slot_hardlink_dir.join(slot.to_string()), + )?; + Ok(()) +} + +fn get_snapshot_file_name(slot: u64) -> String { + slot.to_string() +} + +fn get_bank_snapshot_dir>(path: P, slot: u64) -> PathBuf { + path.as_ref().join(slot.to_string()) +} + +fn get_snapshots_hardlink_dir_for_package>(parent_dir: P, slot: u64) -> PathBuf { + let file_name = format!("snapshot_{}_hard_links", slot); + parent_dir.as_ref().join(file_name) +} + +fn get_io_error(error: &str) -> Error { + warn!("BankForks error: {:?}", error); + Error::IO(IOError::new(ErrorKind::Other, error)) +} + +#[cfg(test)] +pub mod tests { + use super::*; + use crate::snapshot_package::{TAR_ACCOUNTS_DIR, TAR_SNAPSHOT_DIR}; + 
use tempfile::TempDir; + + pub fn verify_snapshot_tar( + snapshot_tar: P, + snapshots_to_verify: Q, + storages_to_verify: R, + ) where + P: AsRef, + Q: AsRef, + R: AsRef, + { + let temp_dir = TempDir::new().unwrap(); + let unpack_dir = temp_dir.path(); + untar_snapshot_in(snapshot_tar, &unpack_dir).unwrap(); + + // Check snapshots are the same + let unpacked_snapshots = unpack_dir.join(&TAR_SNAPSHOT_DIR); + assert!(!dir_diff::is_different(&snapshots_to_verify, unpacked_snapshots).unwrap()); + + // Check the account entries are the same + let unpacked_accounts = unpack_dir.join(&TAR_ACCOUNTS_DIR); + assert!(!dir_diff::is_different(&storages_to_verify, unpacked_accounts).unwrap()); + } +} diff --git a/core/src/tvu.rs b/core/src/tvu.rs index ad2ca18b6..cacf9fe73 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -24,6 +24,7 @@ use crate::replay_stage::ReplayStage; use crate::retransmit_stage::RetransmitStage; use crate::rpc_subscriptions::RpcSubscriptions; use crate::service::Service; +use crate::snapshot_package::SnapshotPackagerService; use crate::storage_stage::{StorageStage, StorageState}; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil}; @@ -41,6 +42,7 @@ pub struct Tvu { blockstream_service: Option, ledger_cleanup_service: Option, storage_stage: StorageStage, + snapshot_packager_service: Option, } pub struct Sockets { @@ -116,6 +118,17 @@ impl Tvu { let (blockstream_slot_sender, blockstream_slot_receiver) = channel(); let (ledger_cleanup_slot_sender, ledger_cleanup_slot_receiver) = channel(); + let (snapshot_packager_service, snapshot_package_sender) = { + let snapshot_config = { bank_forks.read().unwrap().snapshot_config().clone() }; + if snapshot_config.is_some() { + // Start a snapshot packaging service + let (sender, receiver) = channel(); + let snapshot_packager_service = SnapshotPackagerService::new(receiver, exit); + (Some(snapshot_packager_service), Some(sender)) + } else { + (None, None) + } + }; let (replay_stage, 
root_bank_receiver) = ReplayStage::new( &keypair.pubkey(), @@ -130,6 +143,7 @@ impl Tvu { poh_recorder, leader_schedule_cache, vec![blockstream_slot_sender, ledger_cleanup_slot_sender], + snapshot_package_sender, ); let blockstream_service = if blockstream_unix_socket.is_some() { @@ -171,6 +185,7 @@ impl Tvu { blockstream_service, ledger_cleanup_service, storage_stage, + snapshot_packager_service, } } } @@ -189,6 +204,9 @@ impl Service for Tvu { self.ledger_cleanup_service.unwrap().join()?; } self.replay_stage.join()?; + if let Some(s) = self.snapshot_packager_service { + s.join()?; + } Ok(()) } } diff --git a/core/src/validator.rs b/core/src/validator.rs index 9aff3c208..20459a2c7 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -1,6 +1,6 @@ //! The `fullnode` module hosts all the fullnode microservices. -use crate::bank_forks::BankForks; +use crate::bank_forks::{BankForks, SnapshotConfig}; use crate::blocktree::{Blocktree, CompletedSlotsReceiver}; use crate::blocktree_processor::{self, BankForksInfo}; use crate::broadcast_stage::BroadcastStageType; @@ -40,7 +40,7 @@ pub struct ValidatorConfig { pub storage_slots_per_turn: u64, pub account_paths: Option, pub rpc_config: JsonRpcConfig, - pub snapshot_path: Option, + pub snapshot_config: Option, pub max_ledger_slots: Option, pub broadcast_stage_type: BroadcastStageType, pub erasure_config: ErasureConfig, @@ -56,7 +56,7 @@ impl Default for ValidatorConfig { max_ledger_slots: None, account_paths: None, rpc_config: JsonRpcConfig::default(), - snapshot_path: None, + snapshot_config: None, broadcast_stage_type: BroadcastStageType::Standard, erasure_config: ErasureConfig::default(), } @@ -105,7 +105,7 @@ impl Validator { ) = new_banks_from_blocktree( ledger_path, config.account_paths.clone(), - config.snapshot_path.as_ref(), + config.snapshot_config.clone(), verify_ledger, ); @@ -134,7 +134,7 @@ impl Validator { &leader_schedule_cache, &poh_config, ); - if config.snapshot_path.is_some() { + if 
config.snapshot_config.is_some() { poh_recorder.set_bank(&bank); } @@ -308,43 +308,57 @@ fn get_bank_forks( genesis_block: &GenesisBlock, blocktree: &Blocktree, account_paths: Option, - snapshot_path: Option<&PathBuf>, + snapshot_config: Option, verify_ledger: bool, ) -> (BankForks, Vec, LeaderScheduleCache) { - if let Some(snapshot_path) = snapshot_path { - let bank_forks = - BankForks::load_from_snapshot(&genesis_block, account_paths.clone(), snapshot_path); - match bank_forks { - Ok(v) => { - let bank = &v.working_bank(); - let fork_info = BankForksInfo { - bank_slot: bank.slot(), - entry_height: bank.tick_height(), - }; - return (v, vec![fork_info], LeaderScheduleCache::new_from_bank(bank)); + let (mut bank_forks, bank_forks_info, leader_schedule_cache) = { + let mut result = None; + if snapshot_config.is_some() { + let bank_forks = BankForks::load_from_snapshot( + &genesis_block, + account_paths.clone(), + snapshot_config.as_ref().unwrap(), + ); + match bank_forks { + Ok(v) => { + let bank = &v.working_bank(); + let fork_info = BankForksInfo { + bank_slot: bank.slot(), + entry_height: bank.tick_height(), + }; + result = Some((v, vec![fork_info], LeaderScheduleCache::new_from_bank(bank))); + } + Err(_) => warn!("Failed to load from snapshot, fallback to load from ledger"), } - Err(_) => warn!("Failed to load from snapshot, fallback to load from ledger"), } + + // If loading from a snapshot failed/snapshot didn't exist + if result.is_none() { + result = Some( + blocktree_processor::process_blocktree( + &genesis_block, + &blocktree, + account_paths, + verify_ledger, + ) + .expect("process_blocktree failed"), + ); + } + + result.unwrap() + }; + + if snapshot_config.is_some() { + bank_forks.set_snapshot_config(snapshot_config.unwrap()); } - let (mut bank_forks, bank_forks_info, leader_schedule_cache) = - blocktree_processor::process_blocktree( - &genesis_block, - &blocktree, - account_paths, - verify_ledger, - ) - .expect("process_blocktree failed"); - if let 
Some(snapshot_path) = snapshot_path { - bank_forks.set_snapshot_path(snapshot_path); - let _ = bank_forks.add_snapshot(0, 0); - } + (bank_forks, bank_forks_info, leader_schedule_cache) } pub fn new_banks_from_blocktree( blocktree_path: &Path, account_paths: Option, - snapshot_path: Option<&PathBuf>, + snapshot_config: Option, verify_ledger: bool, ) -> ( BankForks, @@ -366,7 +380,7 @@ pub fn new_banks_from_blocktree( &genesis_block, &blocktree, account_paths, - snapshot_path, + snapshot_config, verify_ledger, ); diff --git a/multinode-demo/bootstrap-leader.sh b/multinode-demo/bootstrap-leader.sh index 31cb6285b..9d7087098 100755 --- a/multinode-demo/bootstrap-leader.sh +++ b/multinode-demo/bootstrap-leader.sh @@ -62,6 +62,7 @@ args+=( --ledger "$ledger_dir" --rpc-port 8899 --snapshot-path "$SOLANA_CONFIG_DIR"/bootstrap-leader/snapshots + --snapshot-interval-slots 100 --storage-keypair "$storage_keypair" --voting-keypair "$vote_keypair" --rpc-drone-address 127.0.0.1:9900 diff --git a/multinode-demo/validator.sh b/multinode-demo/validator.sh index 58efbb6db..65dec5922 100755 --- a/multinode-demo/validator.sh +++ b/multinode-demo/validator.sh @@ -263,6 +263,7 @@ default_arg --storage-keypair "$storage_keypair_path" default_arg --ledger "$ledger_config_dir" default_arg --accounts "$accounts_config_dir" default_arg --snapshot-path "$snapshot_config_dir" +default_arg --snapshot-interval-slots 100 if [[ -n $SOLANA_CUDA ]]; then program=$solana_validator_cuda diff --git a/run.sh b/run.sh index ce8817fc3..271ba3c0e 100755 --- a/run.sh +++ b/run.sh @@ -94,6 +94,7 @@ args=( --rpc-drone-address 127.0.0.1:9900 --accounts "$dataDir"/accounts --snapshot-path "$dataDir"/snapshots + --snapshot-interval-slots 100 ) if [[ -n $blockstreamSocket ]]; then args+=(--blockstream "$blockstreamSocket") diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 1116059e0..3a8a020f6 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -36,6 +36,7 @@ use 
std::fmt; use std::fs::remove_dir_all; use std::io::{BufReader, Cursor, Error, ErrorKind, Read}; use std::path::Path; +use std::path::PathBuf; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, RwLock}; use sys_info; @@ -81,8 +82,8 @@ pub type InstructionLoaders = Vec>; // Each fork has a set of storage entries. type ForkStores = HashMap>; -#[derive(Default, Debug)] -pub struct AccountStorage(HashMap); +#[derive(Clone, Default, Debug)] +pub struct AccountStorage(pub HashMap); struct AccountStorageVisitor; @@ -122,11 +123,20 @@ impl Serialize for AccountStorage { len += storage.len(); } let mut map = serializer.serialize_map(Some(len))?; + let mut count = 0; + let mut serialize_account_storage_timer = Measure::start("serialize_account_storage_ms"); for fork_storage in self.0.values() { for (storage_id, account_storage_entry) in fork_storage { map.serialize_entry(storage_id, &**account_storage_entry)?; + count += 1; } } + serialize_account_storage_timer.stop(); + datapoint_info!( + "serialize_account_storage_ms", + ("duration", serialize_account_storage_timer.as_ms(), i64), + ("num_entries", count, i64), + ); map.end() } } @@ -166,7 +176,7 @@ pub struct AccountStorageEntry { } impl AccountStorageEntry { - pub fn new(path: &str, fork_id: Fork, id: usize, file_size: u64) -> Self { + pub fn new(path: &Path, fork_id: Fork, id: usize, file_size: u64) -> Self { let tail = format!("{}.{}", fork_id, id); let path = Path::new(path).join(&tail); let accounts = AppendVec::new(&path, true, file_size as usize); @@ -208,6 +218,14 @@ impl AccountStorageEntry { self.count_and_status.read().unwrap().0 } + pub fn fork_id(&self) -> Fork { + self.fork_id + } + + pub fn append_vec_id(&self) -> AppendVecId { + self.id + } + fn add_account(&self) { let mut count_and_status = self.count_and_status.write().unwrap(); *count_and_status = (count_and_status.0 + 1, count_and_status.1); @@ -252,6 +270,10 @@ impl AccountStorageEntry { } count_and_status.0 } + + pub fn 
get_path(&self) -> PathBuf { + self.accounts.get_path() + } } pub fn get_paths_vec(paths: &str) -> Vec { @@ -409,7 +431,7 @@ impl AccountsDB { Ok(()) } - fn new_storage_entry(&self, fork_id: Fork, path: &str, size: u64) -> AccountStorageEntry { + fn new_storage_entry(&self, fork_id: Fork, path: &Path, size: u64) -> AccountStorageEntry { AccountStorageEntry::new( path, fork_id, @@ -568,7 +590,7 @@ impl AccountsDB { ) -> Arc { let paths = self.paths.read().unwrap(); let path_index = thread_rng().gen_range(0, paths.len()); - let store = Arc::new(self.new_storage_entry(fork_id, &paths[path_index], size)); + let store = Arc::new(self.new_storage_entry(fork_id, &Path::new(&paths[path_index]), size)); fork_storage.insert(store.id, store.clone()); store } diff --git a/runtime/src/append_vec.rs b/runtime/src/append_vec.rs index 694d021e6..5bedc3e73 100644 --- a/runtime/src/append_vec.rs +++ b/runtime/src/append_vec.rs @@ -235,6 +235,10 @@ impl AppendVec { Some((meta, stored.0.clone_account())) } + pub fn get_path(&self) -> PathBuf { + self.path.clone() + } + pub fn accounts<'a>(&'a self, mut start: usize) -> Vec> { let mut accounts = vec![]; while let Some((account, next)) = self.get_account(start) { diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index ea358e5c4..e65d8fe56 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -3,6 +3,7 @@ //! on behalf of the caller, and a low-level API for when they have //! already been signed and verified. 
use crate::accounts::Accounts; +use crate::accounts_db::AccountStorageEntry; use crate::accounts_db::{ AppendVecId, ErrorCounters, InstructionAccounts, InstructionCredits, InstructionLoaders, }; @@ -83,6 +84,15 @@ impl BankRc { self.accounts.update_from_stream(stream) } + pub fn get_storage_entries(&self) -> Vec> { + let r_storage = self.accounts.accounts_db.storage.read().unwrap(); + r_storage + .0 + .values() + .flat_map(|fork_store| fork_store.values().cloned()) + .collect() + } + fn get_io_error(error: &str) -> std::io::Error { warn!("BankRc error: {:?}", error); std::io::Error::new(std::io::ErrorKind::Other, error) @@ -167,6 +177,10 @@ impl StatusCacheRc { let sc = status_cache_rc.status_cache.write().unwrap(); self.status_cache.write().unwrap().append(&sc); } + + pub fn purge_roots(&self) { + self.status_cache.write().unwrap().purge_roots(); + } } /// Manager for the state of all accounts and programs after processing its entries. diff --git a/runtime/src/status_cache.rs b/runtime/src/status_cache.rs index 71b390cfe..1d3b209e1 100644 --- a/runtime/src/status_cache.rs +++ b/runtime/src/status_cache.rs @@ -6,7 +6,7 @@ use solana_sdk::hash::Hash; use solana_sdk::signature::Signature; use std::collections::{HashMap, HashSet}; -const MAX_CACHE_ENTRIES: usize = solana_sdk::timing::MAX_HASH_AGE_IN_SECONDS; +pub const MAX_CACHE_ENTRIES: usize = solana_sdk::timing::MAX_HASH_AGE_IN_SECONDS; const CACHED_SIGNATURE_SIZE: usize = 20; // Store forks in a single chunk of memory to avoid another lookup. @@ -104,14 +104,7 @@ impl StatusCache { /// After MAX_CACHE_ENTRIES, roots are removed, and any old signatures are cleared. 
pub fn add_root(&mut self, fork: ForkId) { self.roots.insert(fork); - if self.roots.len() > MAX_CACHE_ENTRIES { - if let Some(min) = self.roots.iter().min().cloned() { - self.roots.remove(&min); - for cache in self.cache.iter_mut() { - cache.retain(|_, (fork, _, _)| *fork > min); - } - } - } + self.purge_roots(); } /// Insert a new signature for a specific fork. @@ -136,6 +129,17 @@ impl StatusCache { sig_forks.push((fork, res)); } + pub fn purge_roots(&mut self) { + if self.roots.len() > MAX_CACHE_ENTRIES { + if let Some(min) = self.roots.iter().min().cloned() { + self.roots.remove(&min); + for cache in self.cache.iter_mut() { + cache.retain(|_, (fork, _, _)| *fork > min); + } + } + } + } + fn insert_entry( &mut self, transaction_blockhash: &Hash, diff --git a/validator/src/main.rs b/validator/src/main.rs index 603f9bb78..09ca077e3 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1,5 +1,6 @@ use clap::{crate_description, crate_name, crate_version, App, Arg}; use log::*; +use solana::bank_forks::SnapshotConfig; use solana::cluster_info::{Node, FULLNODE_PORT_RANGE}; use solana::contact_info::ContactInfo; use solana::ledger_cleanup_service::DEFAULT_MAX_LEDGER_SLOTS; @@ -158,10 +159,19 @@ fn main() { .arg( clap::Arg::with_name("snapshot_path") .long("snapshot-path") - .value_name("PATHS") + .value_name("SNAPSHOT_PATHS") .takes_value(true) + .requires("snapshot_interval_slots") .help("Snapshot path"), ) + .arg( + clap::Arg::with_name("snapshot_interval_slots") + .long("snapshot-interval-slots") + .value_name("SNAPSHOT_INTERVAL_SLOTS") + .takes_value(true) + .requires("snapshot_path") + .help("Number of slots between generating snapshots"), + ) .arg( clap::Arg::with_name("limit_ledger_size") .long("limit-ledger-size") @@ -233,8 +243,19 @@ fn main() { ), ); - validator_config.account_paths = matches.value_of("accounts").map(ToString::to_string); - validator_config.snapshot_path = matches.value_of("snapshot_path").map(PathBuf::from); + if let Some(paths) 
= matches.value_of("accounts") { + validator_config.account_paths = Some(paths.to_string()); + } + if let Some(snapshot_path) = matches.value_of("snapshot_path").map(PathBuf::from) { + let snapshot_interval = matches.value_of("snapshot_interval_slots").unwrap(); + validator_config.snapshot_config = Some(SnapshotConfig::new( + snapshot_path, + ledger_path.clone(), + snapshot_interval.parse::().unwrap(), + )); + } else { + validator_config.snapshot_config = None; + } if matches.is_present("limit_ledger_size") { validator_config.max_ledger_slots = Some(DEFAULT_MAX_LEDGER_SLOTS); }