Snapshot Packaging Service (#5262)

* Snapshot serialization and packaging
This commit is contained in:
carllin 2019-07-31 17:58:10 -07:00 committed by GitHub
parent 937f9ad049
commit 6cb2040a1b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 990 additions and 274 deletions

38
Cargo.lock generated
View File

@ -828,6 +828,14 @@ dependencies = [
"generic-array 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "dir-diff"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"walkdir 2.2.9 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "dirs"
version = "1.0.5"
@ -1059,6 +1067,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"crc32fast 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.60 (registry+https://github.com/rust-lang/crates.io-index)",
"miniz-sys 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)",
"miniz_oxide_c_api 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)",
]
@ -1080,6 +1089,11 @@ name = "foreign-types-shared"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "fs_extra"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "fuchsia-cprng"
version = "0.1.1"
@ -1789,6 +1803,15 @@ dependencies = [
"unicase 1.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "miniz-sys"
version = "0.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"cc 1.0.38 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.60 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "miniz_oxide"
version = "0.3.0"
@ -3030,6 +3053,9 @@ dependencies = [
"core_affinity 0.5.9 (registry+https://github.com/rust-lang/crates.io-index)",
"crc 1.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-channel 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
"dir-diff 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"flate2 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)",
"fs_extra 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"hashbrown 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
"hex-literal 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"indexmap 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
@ -3075,7 +3101,10 @@ dependencies = [
"solana-storage-program 0.18.0-pre0",
"solana-vote-api 0.18.0-pre0",
"solana-vote-signer 0.18.0-pre0",
"symlink 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"sys-info 0.5.7 (registry+https://github.com/rust-lang/crates.io-index)",
"tar 0.4.26 (registry+https://github.com/rust-lang/crates.io-index)",
"tempfile 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-codec 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-fs 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
@ -4349,6 +4378,11 @@ name = "subtle"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "symlink"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "syn"
version = "0.11.11"
@ -5189,6 +5223,7 @@ dependencies = [
"checksum difference 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "524cbf6897b527295dff137cec09ecf3a05f4fddffd7dfcd1585403449e74198"
"checksum digest 0.7.6 (registry+https://github.com/rust-lang/crates.io-index)" = "03b072242a8cbaf9c145665af9d250c59af3b958f83ed6824e13533cf76d5b90"
"checksum digest 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "f3d0c8c8752312f9713efd397ff63acb9f85585afbf179282e720e7704954dd5"
"checksum dir-diff 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "1cce6e50ca36311e494793f7629014dc78cd963ba85cd05968ae06a63b867f0b"
"checksum dirs 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "3fd78930633bd1c6e35c4b42b1df7b0cbc6bc191146e512bb3bedf243fcc3901"
"checksum dirs 2.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "13aea89a5c93364a98e9b37b2fa237effbb694d5cfe01c5b70941f7eb087d5e3"
"checksum dirs-sys 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "afa0b23de8fd801745c471deffa6e12d248f962c9fd4b4c33787b055599bde7b"
@ -5217,6 +5252,7 @@ dependencies = [
"checksum fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)" = "2fad85553e09a6f881f739c29f0b00b0f01357c743266d478b68951ce23285f3"
"checksum foreign-types 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1"
"checksum foreign-types-shared 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b"
"checksum fs_extra 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5f2a4a2034423744d2cc7ca2068453168dcdb82c438419e639a26bd87839c674"
"checksum fuchsia-cprng 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba"
"checksum fuchsia-zircon 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82"
"checksum fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7"
@ -5297,6 +5333,7 @@ dependencies = [
"checksum mime 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)" = "ba626b8a6de5da682e1caa06bdb42a335aee5a84db8e5046a3e8ab17ba0a3ae0"
"checksum mime 0.3.13 (registry+https://github.com/rust-lang/crates.io-index)" = "3e27ca21f40a310bd06d9031785f4801710d566c184a6e15bad4f1d9b65f9425"
"checksum mime_guess 2.0.0-alpha.6 (registry+https://github.com/rust-lang/crates.io-index)" = "30de2e4613efcba1ec63d8133f344076952090c122992a903359be5a4f99c3ed"
"checksum miniz-sys 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)" = "1e9e3ae51cea1576ceba0dde3d484d30e6e5b86dee0b2d412fe3a16a15c98202"
"checksum miniz_oxide 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c061edee74a88eb35d876ce88b94d77a0448a201de111c244b70d047f5820516"
"checksum miniz_oxide_c_api 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "6c675792957b0d19933816c4e1d56663c341dd9bfa31cb2140ff2267c1d8ecf4"
"checksum mio 0.6.19 (registry+https://github.com/rust-lang/crates.io-index)" = "83f51996a3ed004ef184e16818edc51fadffe8e7ca68be67f9dee67d84d0ff23"
@ -5468,6 +5505,7 @@ dependencies = [
"checksum structopt-derive 0.2.18 (registry+https://github.com/rust-lang/crates.io-index)" = "53010261a84b37689f9ed7d395165029f9cc7abb9f56bbfe86bee2597ed25107"
"checksum subtle 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2d67a5a62ba6e01cb2192ff309324cb4875d0c451d55fe2319433abe7a05a8ee"
"checksum subtle 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "01dca13cf6c3b179864ab3292bd794e757618d35a7766b7c46050c614ba00829"
"checksum symlink 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a7973cce6668464ea31f176d85b13c7ab3bba2cb3b77a2ed26abd7801688010a"
"checksum syn 0.11.11 (registry+https://github.com/rust-lang/crates.io-index)" = "d3b891b9015c88c576343b9b3e41c2c11a51c219ef067b264bd9c8aa9b441dad"
"checksum syn 0.15.42 (registry+https://github.com/rust-lang/crates.io-index)" = "eadc09306ca51a40555dd6fc2b415538e9e18bc9f870e47b1a524a79fe2dcf5e"
"checksum synom 0.11.3 (registry+https://github.com/rust-lang/crates.io-index)" = "a393066ed9010ebaed60b9eafa373d4b1baac186dd7e008555b0f702b51945b6"

View File

@ -25,6 +25,9 @@ chrono = { version = "0.4.7", features = ["serde"] }
core_affinity = "0.5.9"
crc = { version = "1.8.1", optional = true }
crossbeam-channel = "0.3"
dir-diff = "0.3.1"
flate2 = "1.0.9"
fs_extra = "1.1.0"
hashbrown = "0.2.0"
indexmap = "1.0"
itertools = "0.8.0"
@ -65,7 +68,10 @@ solana-storage-api = { path = "../programs/storage_api", version = "0.18.0-pre0"
solana-storage-program = { path = "../programs/storage_program", version = "0.18.0-pre0" }
solana-vote-api = { path = "../programs/vote_api", version = "0.18.0-pre0" }
solana-vote-signer = { path = "../vote-signer", version = "0.18.0-pre0" }
symlink = "0.1.0"
sys-info = "0.5.7"
tar = "0.4.26"
tempfile = "3.1.0"
tokio = "0.1"
tokio-codec = "0.1"
tokio-fs = "0.1"

View File

@ -1,25 +1,61 @@
//! The `bank_forks` module implements BankForks, a DAG of checkpointed Banks
use bincode::{deserialize_from, serialize_into};
use crate::result::{Error, Result};
use crate::snapshot_package::SnapshotPackageSender;
use crate::snapshot_utils;
use solana_measure::measure::Measure;
use solana_metrics::inc_new_counter_info;
use solana_runtime::bank::{Bank, BankRc, StatusCacheRc};
use solana_runtime::status_cache::MAX_CACHE_ENTRIES;
use solana_sdk::genesis_block::GenesisBlock;
use solana_sdk::timing;
use std::collections::{HashMap, HashSet};
use std::fs;
use std::fs::File;
use std::io::{BufReader, BufWriter, Error, ErrorKind};
use std::io::{Error as IOError, ErrorKind};
use std::ops::Index;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Instant;
/// Settings that control where bank snapshots are written and how often they
/// are taken.
///
/// `BankForks` holds one of these (optionally) and consults it whenever a new
/// root is set.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct SnapshotConfig {
    /// Directory handed to `snapshot_utils` for adding, listing and removing
    /// per-slot snapshots.
    snapshot_path: PathBuf,
    /// Path handed to `snapshot_utils::get_snapshot_tar_path` to derive the
    /// location of the packaged snapshot archive.
    snapshot_package_output_path: PathBuf,
    /// Minimum number of slots between the last snapshot and a new root
    /// before another snapshot is generated.
    snapshot_interval_slots: usize,
}

impl SnapshotConfig {
    /// Builds a configuration from its three components.
    pub fn new(
        snapshot_path: PathBuf,
        snapshot_package_output_path: PathBuf,
        snapshot_interval_slots: usize,
    ) -> Self {
        Self {
            snapshot_path,
            snapshot_package_output_path,
            snapshot_interval_slots,
        }
    }

    /// Returns the snapshot directory.
    pub fn snapshot_path(&self) -> &Path {
        self.snapshot_path.as_path()
    }

    /// Returns the output path used for packaged snapshots.
    pub fn snapshot_package_output_path(&self) -> &Path {
        // `as_path()` already yields `&Path`; no additional borrow is needed.
        self.snapshot_package_output_path.as_path()
    }

    /// Returns the snapshot interval, in slots.
    pub fn snapshot_interval_slots(&self) -> usize {
        self.snapshot_interval_slots
    }
}
pub struct BankForks {
banks: HashMap<u64, Arc<Bank>>,
working_bank: Arc<Bank>,
root: u64,
slots: HashSet<u64>,
snapshot_path: Option<PathBuf>,
snapshot_config: Option<SnapshotConfig>,
last_snapshot: u64,
confidence: HashMap<u64, Confidence>,
}
@ -71,8 +107,8 @@ impl BankForks {
banks,
working_bank,
root: 0,
slots: HashSet::new(),
snapshot_path: None,
snapshot_config: None,
last_snapshot: 0,
confidence: HashMap::new(),
}
}
@ -136,8 +172,8 @@ impl BankForks {
root,
banks,
working_bank,
slots: HashSet::new(),
snapshot_path: None,
snapshot_config: None,
last_snapshot: 0,
confidence: HashMap::new(),
}
}
@ -156,7 +192,7 @@ impl BankForks {
self.working_bank.clone()
}
pub fn set_root(&mut self, root: u64) {
pub fn set_root(&mut self, root: u64, snapshot_package_sender: &Option<SnapshotPackageSender>) {
self.root = root;
let set_root_start = Instant::now();
let root_bank = self
@ -172,6 +208,33 @@ impl BankForks {
let new_tx_count = root_bank.transaction_count();
self.prune_non_root(root);
// Generate a snapshot if snapshots are configured and it's been an appropriate number
// of banks since the last snapshot
if self.snapshot_config.is_some() {
let config = self
.snapshot_config
.as_ref()
.expect("Called package_snapshot without a snapshot configuration");
if root - self.last_snapshot >= config.snapshot_interval_slots as u64 {
let mut snapshot_time = Measure::start("total-snapshot-ms");
let r = self.generate_snapshot(
root,
snapshot_package_sender.as_ref().unwrap(),
snapshot_utils::get_snapshot_tar_path(&config.snapshot_package_output_path),
);
if r.is_err() {
warn!("Error generating snapshot for bank: {}, err: {:?}", root, r);
} else {
self.last_snapshot = root;
}
// Cleanup outdated snapshots
self.purge_old_snapshots();
snapshot_time.stop();
inc_new_counter_info!("total-snapshot-setup-ms", snapshot_time.as_ms() as usize);
}
}
inc_new_counter_info!(
"bank-forks_set_root_ms",
timing::duration_as_ms(&set_root_start.elapsed()) as usize
@ -186,30 +249,60 @@ impl BankForks {
self.root
}
fn prune_non_root(&mut self, root: u64) {
let slots: HashSet<u64> = self
.banks
.iter()
.filter(|(_, b)| b.is_frozen())
.map(|(k, _)| *k)
.collect();
let descendants = self.descendants();
self.banks
.retain(|slot, _| descendants[&root].contains(slot));
self.confidence
.retain(|slot, _| slot == &root || descendants[&root].contains(slot));
if let Some(snapshot_path) = &self.snapshot_path {
let diff: HashSet<_> = slots.symmetric_difference(&self.slots).collect();
trace!("prune non root {} - {:?}", root, diff);
for slot in diff.iter() {
if **slot > root {
let _ = self.add_snapshot(**slot, root);
} else {
BankForks::remove_snapshot(**slot, &snapshot_path);
}
fn purge_old_snapshots(&self) {
// Remove outdated snapshots
let config = self.snapshot_config.as_ref().unwrap();
let names = snapshot_utils::get_snapshot_names(&config.snapshot_path);
let num_to_remove = names.len().saturating_sub(MAX_CACHE_ENTRIES);
for old_root in &names[..num_to_remove] {
let r = snapshot_utils::remove_snapshot(*old_root, &config.snapshot_path);
if r.is_err() {
warn!("Couldn't remove snapshot at: {:?}", config.snapshot_path);
}
}
self.slots = slots.clone();
}
/// Snapshots the bank at `root`, packages it together with the most recent
/// on-disk snapshots, and sends the package over `snapshot_package_sender`.
///
/// `tar_output_file` is forwarded unchanged to
/// `snapshot_utils::package_snapshot`.
///
/// # Errors
/// Propagates any error from adding the snapshot, building the package, or
/// sending it on the channel.
///
/// # Panics
/// Panics if no snapshot configuration has been set, or if `root` is not a
/// bank known to this `BankForks`.
fn generate_snapshot<P: AsRef<Path>>(
    &self,
    root: u64,
    snapshot_package_sender: &SnapshotPackageSender,
    tar_output_file: P,
) -> Result<()> {
    let snapshot_dir = &self.snapshot_config.as_ref().unwrap().snapshot_path;

    // Persist a snapshot of the freshly rooted bank first
    let root_bank = self
        .get(root)
        .cloned()
        .expect("root must exist in BankForks");
    snapshot_utils::add_snapshot(snapshot_dir, &root_bank, root)?;

    // Only the trailing MAX_CACHE_ENTRIES root snapshots are worth packaging:
    // the bank discards the status cache of anything older in
    // status_cache.prune_roots()
    let all_names = snapshot_utils::get_snapshot_names(snapshot_dir);
    let first_relevant = all_names.len().saturating_sub(MAX_CACHE_ENTRIES);
    let relevant_names = &all_names[first_relevant..];

    let package = snapshot_utils::package_snapshot(
        &root_bank,
        relevant_names,
        snapshot_dir,
        tar_output_file,
    )?;

    // Hand the package off to the packaging thread
    snapshot_package_sender.send(package)?;
    Ok(())
}
/// Drops every bank and every cached confidence entry whose slot is neither
/// `root` itself nor one of `root`'s descendants.
fn prune_non_root(&mut self, root: u64) {
    let descendants = self.descendants();
    // Shared predicate: keep the root and anything that descends from it.
    let is_kept = |slot: &u64| *slot == root || descendants[&root].contains(slot);

    self.banks.retain(|slot, _| is_kept(slot));
    self.confidence.retain(|slot, _| is_kept(slot));
}
pub fn cache_fork_confidence(
@ -247,109 +340,20 @@ impl BankForks {
self.confidence.get(&fork)
}
fn get_io_error(error: &str) -> Error {
warn!("BankForks error: {:?}", error);
Error::new(ErrorKind::Other, error)
/// Installs `snapshot_config`, replacing any configuration set previously.
/// `set_root` only attempts to generate snapshots when a configuration is present.
pub fn set_snapshot_config(&mut self, snapshot_config: SnapshotConfig) {
self.snapshot_config = Some(snapshot_config);
}
pub fn add_snapshot(&self, slot: u64, root: u64) -> Result<(), Error> {
let path = self.snapshot_path.as_ref().expect("no snapshot_path");
fs::create_dir_all(path.clone())?;
let bank_file = format!("{}", slot);
let bank_file_path = path.join(bank_file);
trace!("path: {:?}", bank_file_path);
let file = File::create(bank_file_path)?;
let mut stream = BufWriter::new(file);
let bank_slot = self.get(slot);
if bank_slot.is_none() {
return Err(BankForks::get_io_error("bank_forks get error"));
}
let bank = bank_slot.unwrap().clone();
serialize_into(&mut stream, &*bank)
.map_err(|_| BankForks::get_io_error("serialize bank error"))?;
let mut parent_slot: u64 = 0;
if let Some(parent_bank) = bank.parent() {
parent_slot = parent_bank.slot();
}
serialize_into(&mut stream, &parent_slot)
.map_err(|_| BankForks::get_io_error("serialize bank parent error"))?;
serialize_into(&mut stream, &root)
.map_err(|_| BankForks::get_io_error("serialize root error"))?;
serialize_into(&mut stream, &bank.src)
.map_err(|_| BankForks::get_io_error("serialize bank status cache error"))?;
serialize_into(&mut stream, &bank.rc)
.map_err(|_| BankForks::get_io_error("serialize bank accounts error"))?;
Ok(())
}
pub fn remove_snapshot(slot: u64, path: &Path) {
let bank_file = format!("{}", slot);
let bank_file_path = path.join(bank_file);
let _ = fs::remove_file(bank_file_path);
}
pub fn set_snapshot_path(&mut self, snapshot_path: &Path) {
self.snapshot_path = Some(snapshot_path.to_path_buf());
}
fn load_snapshots(
names: &[u64],
bank0: &mut Bank,
bank_maps: &mut Vec<(u64, u64, Bank)>,
status_cache_rc: &StatusCacheRc,
snapshot_path: &Path,
) -> Option<u64> {
let mut bank_root: Option<u64> = None;
for bank_slot in names.iter().rev() {
let bank_path = format!("{}", bank_slot);
let bank_file_path = snapshot_path.join(bank_path.clone());
info!("Load from {:?}", bank_file_path);
let file = File::open(bank_file_path);
if file.is_err() {
warn!("Snapshot file open failed for {}", bank_slot);
continue;
}
let file = file.unwrap();
let mut stream = BufReader::new(file);
let bank: Result<Bank, std::io::Error> = deserialize_from(&mut stream)
.map_err(|_| BankForks::get_io_error("deserialize bank error"));
let slot: Result<u64, std::io::Error> = deserialize_from(&mut stream)
.map_err(|_| BankForks::get_io_error("deserialize bank parent error"));
let parent_slot = if slot.is_ok() { slot.unwrap() } else { 0 };
let root: Result<u64, std::io::Error> = deserialize_from(&mut stream)
.map_err(|_| BankForks::get_io_error("deserialize root error"));
let status_cache: Result<StatusCacheRc, std::io::Error> = deserialize_from(&mut stream)
.map_err(|_| BankForks::get_io_error("deserialize bank status cache error"));
if bank_root.is_none() && bank0.rc.update_from_stream(&mut stream).is_ok() {
bank_root = Some(root.unwrap());
}
if bank_root.is_some() {
match bank {
Ok(v) => {
if status_cache.is_ok() {
status_cache_rc.append(&status_cache.unwrap());
}
bank_maps.push((*bank_slot, parent_slot, v));
}
Err(_) => warn!("Load snapshot failed for {}", bank_slot),
}
} else {
BankForks::remove_snapshot(*bank_slot, snapshot_path);
warn!("Load snapshot rc failed for {}", bank_slot);
}
}
bank_root
/// Returns the current snapshot configuration, or `None` if none has been set.
pub fn snapshot_config(&self) -> &Option<SnapshotConfig> {
&self.snapshot_config
}
fn setup_banks(
bank_maps: &mut Vec<(u64, u64, Bank)>,
bank_rc: &BankRc,
status_cache_rc: &StatusCacheRc,
) -> (HashMap<u64, Arc<Bank>>, HashSet<u64>, u64) {
) -> (HashMap<u64, Arc<Bank>>, u64) {
let mut banks = HashMap::new();
let mut slots = HashSet::new();
let (last_slot, last_parent_slot, mut last_bank) = bank_maps.remove(0);
last_bank.set_bank_rc(&bank_rc, &status_cache_rc);
@ -362,7 +366,6 @@ impl BankForks {
}
if slot > 0 {
banks.insert(slot, Arc::new(bank));
slots.insert(slot);
}
}
if last_parent_slot != 0 {
@ -371,56 +374,54 @@ impl BankForks {
}
}
banks.insert(last_slot, Arc::new(last_bank));
slots.insert(last_slot);
(banks, slots, last_slot)
(banks, last_slot)
}
pub fn load_from_snapshot(
genesis_block: &GenesisBlock,
account_paths: Option<String>,
snapshot_path: &Path,
) -> Result<Self, Error> {
let paths = fs::read_dir(snapshot_path)?;
let mut names = paths
.filter_map(|entry| {
entry.ok().and_then(|e| {
e.path()
.file_name()
.and_then(|n| n.to_str().map(|s| s.parse::<u64>().unwrap()))
})
})
.collect::<Vec<u64>>();
names.sort();
snapshot_config: &SnapshotConfig,
) -> Result<Self> {
fs::create_dir_all(&snapshot_config.snapshot_path)?;
let names = snapshot_utils::get_snapshot_names(&snapshot_config.snapshot_path);
if names.is_empty() {
return Err(Error::IO(IOError::new(
ErrorKind::Other,
"no snapshots found",
)));
}
let mut bank_maps = vec![];
let status_cache_rc = StatusCacheRc::default();
let id = (names[names.len() - 1] + 1) as usize;
let mut bank0 =
Bank::create_with_genesis(&genesis_block, account_paths.clone(), &status_cache_rc, id);
bank0.freeze();
let bank_root = BankForks::load_snapshots(
let bank_root = snapshot_utils::load_snapshots(
&names,
&mut bank0,
&mut bank_maps,
&status_cache_rc,
snapshot_path,
&snapshot_config.snapshot_path,
);
if bank_maps.is_empty() || bank_root.is_none() {
BankForks::remove_snapshot(0, snapshot_path);
return Err(Error::new(ErrorKind::Other, "no snapshots loaded"));
return Err(Error::IO(IOError::new(
ErrorKind::Other,
"no snapshots loaded",
)));
}
let root = bank_root.unwrap();
let (banks, slots, last_slot) =
let (banks, last_slot) =
BankForks::setup_banks(&mut bank_maps, &bank0.rc, &status_cache_rc);
let working_bank = banks[&last_slot].clone();
Ok(BankForks {
banks,
working_bank,
root,
slots,
snapshot_path: Some(snapshot_path.to_path_buf()),
snapshot_config: None,
last_snapshot: *names.last().unwrap(),
confidence: HashMap::new(),
})
}
@ -430,12 +431,17 @@ impl BankForks {
mod tests {
use super::*;
use crate::genesis_utils::{create_genesis_block, GenesisBlockInfo};
use crate::service::Service;
use crate::snapshot_package::SnapshotPackagerService;
use fs_extra::dir::CopyOptions;
use itertools::Itertools;
use solana_sdk::hash::Hash;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::system_transaction;
use std::env;
use std::fs::remove_dir_all;
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::channel;
use tempfile::TempDir;
#[test]
fn test_bank_forks() {
@ -545,73 +551,25 @@ mod tests {
);
}
struct TempPaths {
pub paths: String,
}
impl TempPaths {
fn remove_all(&self) {
let paths: Vec<String> = self.paths.split(',').map(|s| s.to_string()).collect();
paths.iter().for_each(|p| {
let _ignored = remove_dir_all(p);
});
}
}
#[macro_export]
macro_rules! tmp_bank_accounts_name {
() => {
&format!("{}-{}", file!(), line!())
};
}
#[macro_export]
macro_rules! get_tmp_bank_accounts_path {
() => {
get_tmp_bank_accounts_path(tmp_bank_accounts_name!())
};
}
impl Drop for TempPaths {
fn drop(&mut self) {
self.remove_all()
}
}
fn get_paths_vec(paths: &str) -> Vec<String> {
paths.split(',').map(|s| s.to_string()).collect()
}
fn get_tmp_snapshots_path() -> TempPaths {
let out_dir = env::var("FARF_DIR").unwrap_or_else(|_| "farf".to_string());
let path = format!("{}/snapshots", out_dir);
TempPaths {
paths: path.to_string(),
}
}
fn get_tmp_bank_accounts_path(paths: &str) -> TempPaths {
let vpaths = get_paths_vec(paths);
let out_dir = env::var("FARF_DIR").unwrap_or_else(|_| "farf".to_string());
let vpaths: Vec<_> = vpaths
.iter()
.map(|path| format!("{}/{}", out_dir, path))
.collect();
TempPaths {
paths: vpaths.join(","),
}
}
fn restore_from_snapshot(
genesis_block: &GenesisBlock,
bank_forks: BankForks,
account_paths: Option<String>,
last_slot: u64,
) {
let snapshot_path = bank_forks.snapshot_path.as_ref().unwrap();
let snapshot_path = bank_forks
.snapshot_config
.as_ref()
.map(|c| &c.snapshot_path)
.unwrap();
let new = BankForks::load_from_snapshot(
&genesis_block,
account_paths,
bank_forks.snapshot_config.as_ref().unwrap(),
)
.unwrap();
let new =
BankForks::load_from_snapshot(&genesis_block, account_paths, snapshot_path).unwrap();
for (slot, _) in new.banks.iter() {
if *slot > 0 {
let bank = bank_forks.banks.get(slot).unwrap().clone();
@ -619,31 +577,39 @@ mod tests {
bank.compare_bank(&new_bank);
}
}
assert_eq!(new.working_bank().slot(), last_slot);
for (slot, _) in new.banks.iter() {
BankForks::remove_snapshot(*slot, snapshot_path);
snapshot_utils::remove_snapshot(*slot, snapshot_path).unwrap();
}
}
#[test]
fn test_bank_forks_snapshot_n() {
solana_logger::setup();
let path = get_tmp_bank_accounts_path!();
let spath = get_tmp_snapshots_path();
let accounts_dir = TempDir::new().unwrap();
let snapshot_dir = TempDir::new().unwrap();
let snapshot_output_path = TempDir::new().unwrap();
let GenesisBlockInfo {
genesis_block,
mint_keypair,
..
} = create_genesis_block(10_000);
path.remove_all();
spath.remove_all();
for index in 0..10 {
let bank0 = Bank::new_with_paths(&genesis_block, Some(path.paths.clone()));
let bank0 = Bank::new_with_paths(
&genesis_block,
Some(accounts_dir.path().to_str().unwrap().to_string()),
);
bank0.freeze();
let slot = bank0.slot();
let mut bank_forks = BankForks::new(0, bank0);
bank_forks.set_snapshot_path(&PathBuf::from(&spath.paths));
bank_forks.add_snapshot(slot, 0).unwrap();
let snapshot_config = SnapshotConfig::new(
PathBuf::from(snapshot_dir.path()),
PathBuf::from(snapshot_output_path.path()),
100,
);
bank_forks.set_snapshot_config(snapshot_config.clone());
let bank0 = bank_forks.get(0).unwrap();
snapshot_utils::add_snapshot(&snapshot_config.snapshot_path, bank0, 0).unwrap();
for forks in 0..index {
let bank = Bank::new_from_parent(&bank_forks[forks], &Pubkey::default(), forks + 1);
let key1 = Keypair::new().pubkey();
@ -655,11 +621,146 @@ mod tests {
);
assert_eq!(bank.process_transaction(&tx), Ok(()));
bank.freeze();
let slot = bank.slot();
snapshot_utils::add_snapshot(&snapshot_config.snapshot_path, &bank, 0).unwrap();
bank_forks.insert(bank);
bank_forks.add_snapshot(slot, 0).unwrap();
}
restore_from_snapshot(&genesis_block, bank_forks, Some(path.paths.clone()), index);
restore_from_snapshot(
&genesis_block,
bank_forks,
Some(accounts_dir.path().to_str().unwrap().to_string()),
index,
);
}
}
#[test]
fn test_concurrent_snapshot_packaging() {
solana_logger::setup();
let accounts_dir = TempDir::new().unwrap();
let snapshots_dir = TempDir::new().unwrap();
let snapshot_output_path = TempDir::new().unwrap();
let GenesisBlockInfo {
genesis_block,
mint_keypair,
..
} = create_genesis_block(10_000);
let (sender, receiver) = channel();
let (fake_sender, _fake_receiver) = channel();
let bank0 = Bank::new_with_paths(
&genesis_block,
Some(accounts_dir.path().to_str().unwrap().to_string()),
);
bank0.freeze();
// Set up bank forks
let mut bank_forks = BankForks::new(0, bank0);
let snapshot_config = SnapshotConfig::new(
PathBuf::from(snapshots_dir.path()),
PathBuf::from(snapshot_output_path.path()),
1,
);
bank_forks.set_snapshot_config(snapshot_config.clone());
// Take snapshot of zeroth bank
let bank0 = bank_forks.get(0).unwrap();
snapshot_utils::add_snapshot(&snapshot_config.snapshot_path, bank0, 0).unwrap();
// Create next MAX_CACHE_ENTRIES + 2 banks and snapshots. Every bank will get snapshotted
// and the snapshot purging logic will run on every snapshot taken. This means the three
// (including snapshot for bank0 created above) earliest snapshots will get purged by the
// time this loop is done.
// Also, make a saved copy of the state of the snapshot for a bank with
// bank.slot == saved_slot, so we can use it for a correctness check later.
let saved_snapshots_dir = TempDir::new().unwrap();
let saved_accounts_dir = TempDir::new().unwrap();
let saved_slot = 4;
let saved_tar = snapshot_config
.snapshot_package_output_path
.join(saved_slot.to_string());
for forks in 0..MAX_CACHE_ENTRIES + 2 {
let bank = Bank::new_from_parent(
&bank_forks[forks as u64],
&Pubkey::default(),
(forks + 1) as u64,
);
let slot = bank.slot();
let key1 = Keypair::new().pubkey();
let tx = system_transaction::create_user_account(
&mint_keypair,
&key1,
1,
genesis_block.hash(),
);
assert_eq!(bank.process_transaction(&tx), Ok(()));
bank.freeze();
bank_forks.insert(bank);
let package_sender = {
if slot == saved_slot as u64 {
// Only send one package on the real sender so that the packaging service
// doesn't take forever to run the packaging logic on all MAX_CACHE_ENTRIES
// later
&sender
} else {
&fake_sender
}
};
bank_forks
.generate_snapshot(
slot,
&package_sender,
snapshot_config
.snapshot_package_output_path
.join(slot.to_string()),
)
.unwrap();
if slot == saved_slot as u64 {
let options = CopyOptions::new();
fs_extra::dir::copy(&accounts_dir, &saved_accounts_dir, &options).unwrap();
fs_extra::dir::copy(&snapshots_dir, &saved_snapshots_dir, &options).unwrap();
}
}
// Purge all the outdated snapshots, including the ones needed to generate the package
// currently sitting in the channel
bank_forks.purge_old_snapshots();
let mut snapshot_names = snapshot_utils::get_snapshot_names(&snapshots_dir);
snapshot_names.sort();
assert_eq!(
snapshot_names,
(3..=MAX_CACHE_ENTRIES as u64 + 2).collect_vec()
);
// Create a SnapshotPackagerService to create tarballs from all the pending
// SnapshotPackage's on the channel. By the time this service starts, we have already
// purged the first two snapshots, which are needed by every snapshot other than
// the last two snapshots. However, the packaging service should still be able to
// correctly construct the earlier snapshots because the SnapshotPackage's on the
// channel hold hard links to these deleted snapshots. We verify this is the case below.
let exit = Arc::new(AtomicBool::new(false));
let snapshot_packager_service = SnapshotPackagerService::new(receiver, &exit);
// Close the channel so that the package service will exit after reading all the
// packages off the channel
drop(sender);
// Wait for service to finish
snapshot_packager_service
.join()
.expect("SnapshotPackagerService exited with error");
// Check the tar we cached the state for earlier was generated correctly
snapshot_utils::tests::verify_snapshot_tar(
saved_tar,
saved_snapshots_dir
.path()
.join(snapshots_dir.path().file_name().unwrap()),
saved_accounts_dir
.path()
.join(accounts_dir.path().file_name().unwrap()),
);
}
}

View File

@ -63,6 +63,8 @@ pub mod rpc_subscriptions;
pub mod service;
pub mod sigverify;
pub mod sigverify_stage;
pub mod snapshot_package;
pub mod snapshot_utils;
pub mod staking_utils;
pub mod storage_stage;
pub mod streamer;
@ -101,3 +103,8 @@ extern crate solana_metrics;
extern crate matches;
extern crate crossbeam_channel;
extern crate dir_diff;
extern crate flate2;
extern crate fs_extra;
extern crate tar;
extern crate tempfile;

View File

@ -12,6 +12,7 @@ use crate::poh_recorder::PohRecorder;
use crate::result::{Error, Result};
use crate::rpc_subscriptions::RpcSubscriptions;
use crate::service::Service;
use crate::snapshot_package::SnapshotPackageSender;
use solana_metrics::{datapoint_warn, inc_new_counter_info};
use solana_runtime::bank::Bank;
use solana_sdk::hash::Hash;
@ -41,6 +42,7 @@ impl Finalizer {
Finalizer { exit_sender }
}
}
// Implement a destructor for Finalizer.
impl Drop for Finalizer {
fn drop(&mut self) {
@ -90,6 +92,7 @@ impl ReplayStage {
poh_recorder: &Arc<Mutex<PohRecorder>>,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
slot_full_senders: Vec<Sender<(u64, Pubkey)>>,
snapshot_package_sender: Option<SnapshotPackageSender>,
) -> (Self, Receiver<Vec<Arc<Bank>>>)
where
T: 'static + KeypairUtil + Send + Sync,
@ -179,6 +182,7 @@ impl ReplayStage {
&root_bank_sender,
lockouts,
&lockouts_sender,
&snapshot_package_sender,
)?;
Self::reset_poh_recorder(
@ -396,6 +400,7 @@ impl ReplayStage {
root_bank_sender: &Sender<Vec<Arc<Bank>>>,
lockouts: HashMap<u64, StakeLockout>,
lockouts_sender: &Sender<LockoutAggregationData>,
snapshot_package_sender: &Option<SnapshotPackageSender>,
) -> Result<()>
where
T: 'static + KeypairUtil + Send + Sync,
@ -419,7 +424,10 @@ impl ReplayStage {
// is consumed by repair_service to update gossip, so we don't want to get blobs for
// repair on gossip before we update leader schedule, otherwise they may get dropped.
leader_schedule_cache.set_root(rooted_banks.last().unwrap());
bank_forks.write().unwrap().set_root(new_root);
bank_forks
.write()
.unwrap()
.set_root(new_root, snapshot_package_sender);
Self::handle_new_root(&bank_forks, progress);
trace!("new root {}", new_root);
if let Err(e) = root_bank_sender.send(rooted_banks) {

View File

@ -30,6 +30,7 @@ pub enum Error {
SendError,
PohRecorderError(poh_recorder::PohRecorderError),
BlocktreeError(blocktree::BlocktreeError),
FsExtra(fs_extra::error::Error),
}
pub type Result<T> = std::result::Result<T, Error>;
@ -102,6 +103,11 @@ impl std::convert::From<std::io::Error> for Error {
Error::IO(e)
}
}
impl std::convert::From<fs_extra::error::Error> for Error {
fn from(e: fs_extra::error::Error) -> Error {
Error::FsExtra(e)
}
}
impl std::convert::From<serde_json::Error> for Error {
fn from(e: serde_json::Error) -> Error {
Error::JSON(e)

View File

@ -0,0 +1,195 @@
use crate::result::{Error, Result};
use crate::service::Service;
use flate2::write::GzEncoder;
use flate2::Compression;
use solana_runtime::accounts_db::AccountStorageEntry;
use std::fs;
use std::path::Path;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender};
use std::sync::Arc;
use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
/// Sending half of the channel that carries `SnapshotPackage`s.
pub type SnapshotPackageSender = Sender<SnapshotPackage>;
/// Receiving half of the channel that carries `SnapshotPackage`s; consumed by
/// `SnapshotPackagerService`.
pub type SnapshotPackageReceiver = Receiver<SnapshotPackage>;
/// Name of the directory inside the tarball that holds the snapshot files.
pub const TAR_SNAPSHOT_DIR: &str = "snapshots";
/// Name of the directory inside the tarball that holds the AppendVec files.
pub const TAR_ACCOUNTS_DIR: &str = "accounts";
/// Everything the packager needs to build one snapshot tarball.
///
/// The directory at `snapshot_path` is removed (best effort) when the value is
/// dropped, see the `Drop` impl.
pub struct SnapshotPackage {
    // Directory whose contents are appended to the tarball under `TAR_SNAPSHOT_DIR`.
    snapshot_path: PathBuf,
    // Storage entries whose backing files are appended under `TAR_ACCOUNTS_DIR`.
    storage_entries: Vec<Arc<AccountStorageEntry>>,
    // Final location the finished tarball is linked to.
    tar_output_file: PathBuf,
}
impl SnapshotPackage {
pub fn new(
snapshot_path: PathBuf,
storage_entries: Vec<Arc<AccountStorageEntry>>,
tar_output_file: PathBuf,
) -> Self {
Self {
snapshot_path,
storage_entries,
tar_output_file,
}
}
}
impl Drop for SnapshotPackage {
    /// Removes the directory at `snapshot_path`.
    fn drop(&mut self) {
        // Best effort only: the directory may already be gone (the packager
        // deletes it itself once it has been archived), so the result is ignored.
        let _ = fs::remove_dir_all(self.snapshot_path.as_path());
    }
}
/// Service that owns the background thread turning received `SnapshotPackage`s
/// into tarballs.
pub struct SnapshotPackagerService {
    // Handle of the "solana-snapshot-packager" thread; joined via `Service::join`.
    t_snapshot_packager: JoinHandle<()>,
}
impl SnapshotPackagerService {
    /// Spawns the `solana-snapshot-packager` thread.
    ///
    /// The thread repeatedly calls [`Self::package_snapshots`] until either
    /// `exit` is set or the sending side of `snapshot_package_receiver` is
    /// dropped. Receive timeouts are ignored; every other error is logged and
    /// the loop continues.
    ///
    /// # Panics
    /// Panics if the thread cannot be spawned.
    pub fn new(snapshot_package_receiver: SnapshotPackageReceiver, exit: &Arc<AtomicBool>) -> Self {
        let exit = exit.clone();
        let t_snapshot_packager = Builder::new()
            .name("solana-snapshot-packager".to_string())
            .spawn(move || loop {
                if exit.load(Ordering::Relaxed) {
                    break;
                }
                if let Err(e) = Self::package_snapshots(&snapshot_package_receiver) {
                    match e {
                        Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
                        Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
                        _ => info!("Error from package_snapshots: {:?}", e),
                    }
                }
            })
            .unwrap();
        Self {
            t_snapshot_packager,
        }
    }

    /// Waits up to one second for a `SnapshotPackage` and, if one arrives,
    /// archives it into a gzip-compressed tarball at its `tar_output_file`.
    ///
    /// The tarball contains the package's snapshot directory under
    /// `TAR_SNAPSHOT_DIR` and one file per storage entry under
    /// `TAR_ACCOUNTS_DIR`.
    ///
    /// # Errors
    /// Returns the receive error on timeout/disconnect, or the I/O error of
    /// whichever archiving step failed. A previously published tarball is only
    /// replaced after the new one has been written completely.
    ///
    /// # Panics
    /// Panics if a storage entry's path has no file name component.
    pub fn package_snapshots(snapshot_receiver: &SnapshotPackageReceiver) -> Result<()> {
        let snapshot_package = snapshot_receiver.recv_timeout(Duration::from_secs(1))?;

        // Build the tarball next to its final location: the result is published
        // with a hard link, and hard links cannot cross filesystems, so a temp
        // file in the system temp directory may be impossible to link.
        let tar_dir = snapshot_package
            .tar_output_file
            .parent()
            .filter(|dir| !dir.as_os_str().is_empty())
            .unwrap_or_else(|| Path::new("."));

        // Create the tar builder
        let tar_gz = tempfile::Builder::new()
            .prefix("new_state")
            .suffix(".tgz")
            .tempfile_in(tar_dir)?;
        let temp_tar_path = tar_gz.path();
        let enc = GzEncoder::new(&tar_gz, Compression::default());
        let mut tar = tar::Builder::new(enc);

        // Create the list of paths to compress, starting with the snapshots
        let tar_output_snapshots_dir = Path::new(&TAR_SNAPSHOT_DIR);

        // Add the snapshots to the tarball and delete the directory of hardlinks to the snapshots
        // that was created to persist those snapshots while this package was being created.
        // The directory is removed whether or not archiving it succeeded.
        let res = tar.append_dir_all(
            tar_output_snapshots_dir,
            snapshot_package.snapshot_path.as_path(),
        );
        let _ = fs::remove_dir_all(snapshot_package.snapshot_path.as_path());
        res?;

        // Add the AppendVecs into the compressible list
        let tar_output_accounts_dir = Path::new(&TAR_ACCOUNTS_DIR);
        for storage in &snapshot_package.storage_entries {
            let storage_path = storage.get_path();
            let output_path = tar_output_accounts_dir.join(
                storage_path
                    .file_name()
                    .expect("Invalid AppendVec file path"),
            );

            // `output_path` - The directory where the AppendVec will be placed in the tarball.
            // `storage_path` - The file path where the AppendVec itself is located
            tar.append_path_with_name(storage_path, output_path)?;
        }

        // Finish the tar stream AND the gzip stream before publishing. Leaving
        // the encoder to be finished on drop would write the gzip trailer only
        // after the file had been linked into place and would hide any error.
        let enc = tar.into_inner()?;
        enc.finish()?;

        // Once everything is successful, overwrite the previous tarball so that other validators
        // can rsync this newly packaged snapshot
        let _ = fs::remove_file(&snapshot_package.tar_output_file);
        fs::hard_link(temp_tar_path, &snapshot_package.tar_output_file)?;
        Ok(())
    }
}
impl Service for SnapshotPackagerService {
    type JoinReturnType = ();

    /// Blocks until the packager thread has exited and returns its join result.
    fn join(self) -> thread::Result<()> {
        let packager_thread = self.t_snapshot_packager;
        packager_thread.join()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::snapshot_utils;
    use std::fs::OpenOptions;
    use std::io::Write;
    use std::sync::mpsc::channel;
    use tempfile::TempDir;

    /// Packages five storage entries and five fake snapshot files through a
    /// single `package_snapshots` call and checks that the resulting tarball
    /// unpacks to the same directory contents.
    #[test]
    fn test_package_snapshots() {
        // Create temporary placeholder directory for all test files
        let temp_dir = TempDir::new().unwrap();
        let (sender, receiver) = channel();
        let accounts_dir = temp_dir.path().join("accounts");
        let snapshots_dir = temp_dir.path().join("snapshots");
        let snapshot_package_output_path = temp_dir.path().join("snapshots_output");
        fs::create_dir_all(&snapshot_package_output_path).unwrap();

        // Create some storage entries
        let storage_entries: Vec<_> = (0..5)
            .map(|i| Arc::new(AccountStorageEntry::new(&accounts_dir, 0, i, 10)))
            .collect();

        // Create some fake snapshot
        fs::create_dir_all(&snapshots_dir).unwrap();
        let snapshots_paths: Vec<_> = (0..5)
            .map(|i| {
                let fake_snapshot_path = snapshots_dir.join(format!("fake_snapshot_{}", i));
                let mut fake_snapshot_file = OpenOptions::new()
                    .read(true)
                    .write(true)
                    .create(true)
                    .open(&fake_snapshot_path)
                    .unwrap();
                fake_snapshot_file.write_all(b"Hello, world!").unwrap();
                fake_snapshot_path
            })
            .collect();

        // Create directory of hard links for snapshots. The packager deletes the
        // directory it is handed, so the originals in `snapshots_dir` stay
        // available for the comparison at the end.
        let link_snapshots_dir = temp_dir.path().join("link_snapshots");
        fs::create_dir_all(&link_snapshots_dir).unwrap();
        for snapshots_path in snapshots_paths {
            let snapshot_file_name = snapshots_path.file_name().unwrap();
            let link_path = link_snapshots_dir.join(snapshot_file_name);
            fs::hard_link(&snapshots_path, &link_path).unwrap();
        }

        // Create a packageable snapshot
        let output_tar_path = snapshot_utils::get_snapshot_tar_path(&snapshot_package_output_path);
        let snapshot_package = SnapshotPackage::new(
            link_snapshots_dir,
            storage_entries.clone(),
            output_tar_path.clone(),
        );
        sender.send(snapshot_package).unwrap();

        // Make tarball from packageable snapshot
        SnapshotPackagerService::package_snapshots(&receiver).unwrap();

        // Check tarball is correct
        snapshot_utils::tests::verify_snapshot_tar(output_tar_path, snapshots_dir, accounts_dir);
    }
}

255
core/src/snapshot_utils.rs Normal file
View File

@ -0,0 +1,255 @@
use crate::result::{Error, Result};
use crate::snapshot_package::SnapshotPackage;
use bincode::{deserialize_from, serialize_into};
use flate2::read::GzDecoder;
use solana_runtime::bank::{Bank, StatusCacheRc};
use std::fs;
use std::fs::File;
use std::io::{BufReader, BufWriter, Error as IOError, ErrorKind};
use std::path::{Path, PathBuf};
use tar::Archive;
/// Prepares a `SnapshotPackage` for `bank`.
///
/// A fresh `snapshot_<slot>_hard_links` directory is created next to
/// `snapshot_package_output_file`, and the snapshot file of every slot in
/// `snapshot_names` is hard-linked into it from `snapshot_dir`. The returned
/// package also carries the bank's current account storage entries.
///
/// # Errors
/// Returns an error if the hard-link directory cannot be created or a snapshot
/// cannot be linked. In the latter case the package built so far is dropped,
/// which removes the hard-link directory again.
///
/// # Panics
/// Panics if `snapshot_package_output_file` has no parent directory.
pub fn package_snapshot<P: AsRef<Path>, Q: AsRef<Path>>(
    bank: &Bank,
    snapshot_names: &[u64],
    snapshot_dir: P,
    snapshot_package_output_file: Q,
) -> Result<SnapshotPackage> {
    let slot = bank.slot();
    let output_file = snapshot_package_output_file.as_ref();

    // Start from an empty directory that will hold the hard links for this package
    let output_dir = output_file.parent().expect("Invalid output path for tar");
    let snapshot_hard_links_dir = get_snapshots_hardlink_dir_for_package(output_dir, slot);
    let _ = fs::remove_dir_all(&snapshot_hard_links_dir);
    fs::create_dir_all(&snapshot_hard_links_dir)?;

    // Grab references to the AccountStorageEntries that belong in the package
    let account_storage_entries = bank.rc.get_storage_entries();
    trace!(
        "Snapshot for bank: {} has {} account storage entries",
        slot,
        account_storage_entries.len()
    );

    // Build the package before linking: if linking fails below, dropping the
    // package cleans up `snapshot_hard_links_dir`.
    let package = SnapshotPackage::new(
        snapshot_hard_links_dir.clone(),
        account_storage_entries,
        output_file.to_path_buf(),
    );
    snapshot_names.iter().try_for_each(|name| {
        hardlink_snapshot_directory(&snapshot_dir, &snapshot_hard_links_dir, *name)
    })?;

    Ok(package)
}
/// Returns the slot numbers of the snapshots found directly under
/// `snapshot_path`, in ascending order.
///
/// A directory entry counts as a snapshot when its name parses as a `u64`.
/// Entries that cannot be read, whose names are not valid UTF-8, or whose
/// names are not numeric (stray files, temporary directories, ...) are skipped
/// instead of aborting the process.
///
/// # Panics
/// Panics if `snapshot_path` cannot be read as a directory.
pub fn get_snapshot_names<P: AsRef<Path>>(snapshot_path: P) -> Vec<u64> {
    let paths = fs::read_dir(snapshot_path).expect("Invalid snapshot path");
    let mut names: Vec<u64> = paths
        .filter_map(|entry| entry.ok()?.file_name().to_str()?.parse::<u64>().ok())
        .collect();
    names.sort_unstable();
    names
}
/// Serializes `bank` into `<snapshot_path>/<slot>/<slot>`.
///
/// The file contains, in order: the bank, its parent's slot (0 when the bank
/// has no parent), `root`, the bank's status cache and the bank's accounts.
///
/// # Errors
/// Returns an error if the directory or file cannot be created, if any part
/// fails to serialize, or if the buffered data cannot be written out.
pub fn add_snapshot<P: AsRef<Path>>(snapshot_path: P, bank: &Bank, root: u64) -> Result<()> {
    let slot = bank.slot();
    let slot_snapshot_dir = get_bank_snapshot_dir(snapshot_path, slot);
    fs::create_dir_all(&slot_snapshot_dir)?;

    let snapshot_file_path = slot_snapshot_dir.join(get_snapshot_file_name(slot));
    trace!(
        "creating snapshot {}, path: {:?}",
        bank.slot(),
        snapshot_file_path
    );
    let file = File::create(&snapshot_file_path)?;
    let mut stream = BufWriter::new(file);

    // Create the snapshot
    serialize_into(&mut stream, &*bank).map_err(|_| get_io_error("serialize bank error"))?;
    let parent_slot: u64 = bank.parent().map(|parent| parent.slot()).unwrap_or(0);
    serialize_into(&mut stream, &parent_slot)
        .map_err(|_| get_io_error("serialize bank parent error"))?;
    serialize_into(&mut stream, &root).map_err(|_| get_io_error("serialize root error"))?;
    serialize_into(&mut stream, &bank.src)
        .map_err(|_| get_io_error("serialize bank status cache error"))?;
    serialize_into(&mut stream, &bank.rc)
        .map_err(|_| get_io_error("serialize bank accounts error"))?;

    // Flush explicitly: a `BufWriter` that is merely dropped ignores write
    // errors, which would report a truncated snapshot as a success.
    std::io::Write::flush(&mut stream)?;

    trace!(
        "successfully created snapshot {}, path: {:?}",
        bank.slot(),
        snapshot_file_path
    );
    Ok(())
}
pub fn remove_snapshot<P: AsRef<Path>>(slot: u64, snapshot_path: P) -> Result<()> {
let slot_snapshot_dir = get_bank_snapshot_dir(&snapshot_path, slot);
// Remove the snapshot directory for this slot
fs::remove_dir_all(slot_snapshot_dir)?;
Ok(())
}
/// Loads the snapshots named in `names` from `snapshot_path`, visiting them in
/// reverse order.
///
/// Each snapshot file is read in the order it was written: bank, parent slot,
/// root, status cache, accounts. The accounts section is applied to
/// `bank0.rc` only until one snapshot applies successfully; that snapshot's
/// root becomes the return value. From then on every snapshot whose bank
/// deserializes is pushed onto `bank_maps` as `(slot, parent_slot, bank)` and
/// its status cache (if readable) is appended to `status_cache_rc`.
///
/// Returns `None` when no snapshot's accounts could be applied.
///
/// # Panics
/// Panics if a snapshot's accounts apply cleanly but its root failed to
/// deserialize.
pub fn load_snapshots<P: AsRef<Path>>(
    names: &[u64],
    bank0: &mut Bank,
    bank_maps: &mut Vec<(u64, u64, Bank)>,
    status_cache_rc: &StatusCacheRc,
    snapshot_path: P,
) -> Option<u64> {
    let mut bank_root: Option<u64> = None;

    for (i, bank_slot) in names.iter().rev().enumerate() {
        let snapshot_file_path = get_bank_snapshot_dir(&snapshot_path, *bank_slot)
            .join(get_snapshot_file_name(*bank_slot));
        trace!("Load from {:?}", snapshot_file_path);

        let file = match File::open(snapshot_file_path) {
            Ok(file) => file,
            Err(_) => {
                warn!("Snapshot file open failed for {}", bank_slot);
                continue;
            }
        };
        let mut stream = BufReader::new(file);

        // The sections must be consumed in exactly this order, even when an
        // earlier one failed, because they share a single stream.
        let bank: Result<Bank> =
            deserialize_from(&mut stream).map_err(|_| get_io_error("deserialize bank error"));
        let slot: Result<u64> = deserialize_from(&mut stream)
            .map_err(|_| get_io_error("deserialize bank parent error"));
        let parent_slot = slot.unwrap_or(0);
        let root: Result<u64> =
            deserialize_from(&mut stream).map_err(|_| get_io_error("deserialize root error"));
        let status_cache: Result<StatusCacheRc> = deserialize_from(&mut stream)
            .map_err(|_| get_io_error("deserialize bank status cache error"));

        // Only the first snapshot whose accounts load cleanly defines the root
        if bank_root.is_none() && bank0.rc.update_from_stream(&mut stream).is_ok() {
            bank_root = Some(root.unwrap());
        }
        if bank_root.is_none() {
            warn!("Load snapshot rc failed for {}", bank_slot);
            continue;
        }

        let bank = match bank {
            Ok(bank) => bank,
            Err(_) => {
                warn!("Load snapshot failed for {}", bank_slot);
                continue;
            }
        };
        if let Ok(status_cache) = status_cache {
            status_cache_rc.append(&status_cache);
            // On the final iteration (the first entry of `names`), purge all
            // outdated status cache entries
            if i + 1 == names.len() {
                status_cache_rc.purge_roots();
            }
        }
        bank_maps.push((*bank_slot, parent_slot, bank));
    }

    bank_root
}
/// Returns the path of the snapshot tarball, `state.tgz`, inside
/// `snapshot_output_dir`.
pub fn get_snapshot_tar_path<P: AsRef<Path>>(snapshot_output_dir: P) -> PathBuf {
    let mut tar_path = snapshot_output_dir.as_ref().to_path_buf();
    tar_path.push("state.tgz");
    tar_path
}
/// Unpacks the gzip-compressed tarball `snapshot_tar` into `unpack_dir`.
///
/// # Errors
/// Returns the I/O error if the tarball cannot be opened or unpacked.
pub fn untar_snapshot_in<P: AsRef<Path>, Q: AsRef<Path>>(
    snapshot_tar: P,
    unpack_dir: Q,
) -> Result<()> {
    let decoder = GzDecoder::new(File::open(snapshot_tar)?);
    Archive::new(decoder).unpack(&unpack_dir)?;
    Ok(())
}
/// Hard-links the snapshot file of `slot` from `snapshot_dir` into
/// `snapshot_hardlink_dir`, keeping the `<slot>/<slot>` layout.
///
/// Any existing `<snapshot_hardlink_dir>/<slot>` directory is replaced by a
/// fresh one first.
///
/// # Errors
/// Returns the I/O error if the directory cannot be created or the link
/// cannot be made (for instance because the source file does not exist).
fn hardlink_snapshot_directory<P: AsRef<Path>, Q: AsRef<Path>>(
    snapshot_dir: P,
    snapshot_hardlink_dir: Q,
    slot: u64,
) -> Result<()> {
    // The slot number names both the per-slot directory and the file inside it
    let slot_name = slot.to_string();

    // Start from an empty per-slot directory in the hard-link area
    let link_dir = snapshot_hardlink_dir.as_ref().join(&slot_name);
    let _ = fs::remove_dir_all(&link_dir);
    fs::create_dir_all(&link_dir)?;

    // Link the snapshot file itself
    let source_file = snapshot_dir.as_ref().join(&slot_name).join(&slot_name);
    let link_file = link_dir.join(&slot_name);
    fs::hard_link(&source_file, &link_file)?;
    Ok(())
}
/// Returns the file name used for the snapshot of `slot`: the slot number in
/// decimal.
fn get_snapshot_file_name(slot: u64) -> String {
    format!("{}", slot)
}
/// Returns the directory that holds the snapshot of `slot` under `path`, i.e.
/// `<path>/<slot>`.
fn get_bank_snapshot_dir<P: AsRef<Path>>(path: P, slot: u64) -> PathBuf {
    let mut slot_dir = path.as_ref().to_path_buf();
    slot_dir.push(slot.to_string());
    slot_dir
}
/// Returns the directory under `parent_dir` that holds the hard links for the
/// package of `slot`: `<parent_dir>/snapshot_<slot>_hard_links`.
fn get_snapshots_hardlink_dir_for_package<P: AsRef<Path>>(parent_dir: P, slot: u64) -> PathBuf {
    parent_dir
        .as_ref()
        .join(format!("snapshot_{}_hard_links", slot))
}
fn get_io_error(error: &str) -> Error {
warn!("BankForks error: {:?}", error);
Error::IO(IOError::new(ErrorKind::Other, error))
}
#[cfg(test)]
pub mod tests {
    use super::*;
    use crate::snapshot_package::{TAR_ACCOUNTS_DIR, TAR_SNAPSHOT_DIR};
    use tempfile::TempDir;

    /// Test helper: unpacks `snapshot_tar` into a temporary directory and
    /// asserts that its `TAR_SNAPSHOT_DIR` and `TAR_ACCOUNTS_DIR` subdirectories
    /// do not differ from `snapshots_to_verify` and `storages_to_verify`.
    ///
    /// Panics (failing the calling test) if unpacking or comparing fails, or if
    /// either pair of directories differs.
    pub fn verify_snapshot_tar<P, Q, R>(
        snapshot_tar: P,
        snapshots_to_verify: Q,
        storages_to_verify: R,
    ) where
        P: AsRef<Path>,
        Q: AsRef<Path>,
        R: AsRef<Path>,
    {
        let temp_dir = TempDir::new().unwrap();
        let unpack_dir = temp_dir.path();
        untar_snapshot_in(snapshot_tar, &unpack_dir).unwrap();

        // Check snapshots are the same
        let unpacked_snapshots = unpack_dir.join(&TAR_SNAPSHOT_DIR);
        assert!(!dir_diff::is_different(&snapshots_to_verify, unpacked_snapshots).unwrap());

        // Check the account entries are the same
        let unpacked_accounts = unpack_dir.join(&TAR_ACCOUNTS_DIR);
        assert!(!dir_diff::is_different(&storages_to_verify, unpacked_accounts).unwrap());
    }
}

View File

@ -24,6 +24,7 @@ use crate::replay_stage::ReplayStage;
use crate::retransmit_stage::RetransmitStage;
use crate::rpc_subscriptions::RpcSubscriptions;
use crate::service::Service;
use crate::snapshot_package::SnapshotPackagerService;
use crate::storage_stage::{StorageStage, StorageState};
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, KeypairUtil};
@ -41,6 +42,7 @@ pub struct Tvu {
blockstream_service: Option<BlockstreamService>,
ledger_cleanup_service: Option<LedgerCleanupService>,
storage_stage: StorageStage,
snapshot_packager_service: Option<SnapshotPackagerService>,
}
pub struct Sockets {
@ -116,6 +118,17 @@ impl Tvu {
let (blockstream_slot_sender, blockstream_slot_receiver) = channel();
let (ledger_cleanup_slot_sender, ledger_cleanup_slot_receiver) = channel();
let (snapshot_packager_service, snapshot_package_sender) = {
let snapshot_config = { bank_forks.read().unwrap().snapshot_config().clone() };
if snapshot_config.is_some() {
// Start a snapshot packaging service
let (sender, receiver) = channel();
let snapshot_packager_service = SnapshotPackagerService::new(receiver, exit);
(Some(snapshot_packager_service), Some(sender))
} else {
(None, None)
}
};
let (replay_stage, root_bank_receiver) = ReplayStage::new(
&keypair.pubkey(),
@ -130,6 +143,7 @@ impl Tvu {
poh_recorder,
leader_schedule_cache,
vec![blockstream_slot_sender, ledger_cleanup_slot_sender],
snapshot_package_sender,
);
let blockstream_service = if blockstream_unix_socket.is_some() {
@ -171,6 +185,7 @@ impl Tvu {
blockstream_service,
ledger_cleanup_service,
storage_stage,
snapshot_packager_service,
}
}
}
@ -189,6 +204,9 @@ impl Service for Tvu {
self.ledger_cleanup_service.unwrap().join()?;
}
self.replay_stage.join()?;
if let Some(s) = self.snapshot_packager_service {
s.join()?;
}
Ok(())
}
}

View File

@ -1,6 +1,6 @@
//! The `fullnode` module hosts all the fullnode microservices.
use crate::bank_forks::BankForks;
use crate::bank_forks::{BankForks, SnapshotConfig};
use crate::blocktree::{Blocktree, CompletedSlotsReceiver};
use crate::blocktree_processor::{self, BankForksInfo};
use crate::broadcast_stage::BroadcastStageType;
@ -40,7 +40,7 @@ pub struct ValidatorConfig {
pub storage_slots_per_turn: u64,
pub account_paths: Option<String>,
pub rpc_config: JsonRpcConfig,
pub snapshot_path: Option<PathBuf>,
pub snapshot_config: Option<SnapshotConfig>,
pub max_ledger_slots: Option<u64>,
pub broadcast_stage_type: BroadcastStageType,
pub erasure_config: ErasureConfig,
@ -56,7 +56,7 @@ impl Default for ValidatorConfig {
max_ledger_slots: None,
account_paths: None,
rpc_config: JsonRpcConfig::default(),
snapshot_path: None,
snapshot_config: None,
broadcast_stage_type: BroadcastStageType::Standard,
erasure_config: ErasureConfig::default(),
}
@ -105,7 +105,7 @@ impl Validator {
) = new_banks_from_blocktree(
ledger_path,
config.account_paths.clone(),
config.snapshot_path.as_ref(),
config.snapshot_config.clone(),
verify_ledger,
);
@ -134,7 +134,7 @@ impl Validator {
&leader_schedule_cache,
&poh_config,
);
if config.snapshot_path.is_some() {
if config.snapshot_config.is_some() {
poh_recorder.set_bank(&bank);
}
@ -308,43 +308,57 @@ fn get_bank_forks(
genesis_block: &GenesisBlock,
blocktree: &Blocktree,
account_paths: Option<String>,
snapshot_path: Option<&PathBuf>,
snapshot_config: Option<SnapshotConfig>,
verify_ledger: bool,
) -> (BankForks, Vec<BankForksInfo>, LeaderScheduleCache) {
if let Some(snapshot_path) = snapshot_path {
let bank_forks =
BankForks::load_from_snapshot(&genesis_block, account_paths.clone(), snapshot_path);
match bank_forks {
Ok(v) => {
let bank = &v.working_bank();
let fork_info = BankForksInfo {
bank_slot: bank.slot(),
entry_height: bank.tick_height(),
};
return (v, vec![fork_info], LeaderScheduleCache::new_from_bank(bank));
let (mut bank_forks, bank_forks_info, leader_schedule_cache) = {
let mut result = None;
if snapshot_config.is_some() {
let bank_forks = BankForks::load_from_snapshot(
&genesis_block,
account_paths.clone(),
snapshot_config.as_ref().unwrap(),
);
match bank_forks {
Ok(v) => {
let bank = &v.working_bank();
let fork_info = BankForksInfo {
bank_slot: bank.slot(),
entry_height: bank.tick_height(),
};
result = Some((v, vec![fork_info], LeaderScheduleCache::new_from_bank(bank)));
}
Err(_) => warn!("Failed to load from snapshot, fallback to load from ledger"),
}
Err(_) => warn!("Failed to load from snapshot, fallback to load from ledger"),
}
// If loading from a snapshot failed/snapshot didn't exist
if result.is_none() {
result = Some(
blocktree_processor::process_blocktree(
&genesis_block,
&blocktree,
account_paths,
verify_ledger,
)
.expect("process_blocktree failed"),
);
}
result.unwrap()
};
if snapshot_config.is_some() {
bank_forks.set_snapshot_config(snapshot_config.unwrap());
}
let (mut bank_forks, bank_forks_info, leader_schedule_cache) =
blocktree_processor::process_blocktree(
&genesis_block,
&blocktree,
account_paths,
verify_ledger,
)
.expect("process_blocktree failed");
if let Some(snapshot_path) = snapshot_path {
bank_forks.set_snapshot_path(snapshot_path);
let _ = bank_forks.add_snapshot(0, 0);
}
(bank_forks, bank_forks_info, leader_schedule_cache)
}
pub fn new_banks_from_blocktree(
blocktree_path: &Path,
account_paths: Option<String>,
snapshot_path: Option<&PathBuf>,
snapshot_config: Option<SnapshotConfig>,
verify_ledger: bool,
) -> (
BankForks,
@ -366,7 +380,7 @@ pub fn new_banks_from_blocktree(
&genesis_block,
&blocktree,
account_paths,
snapshot_path,
snapshot_config,
verify_ledger,
);

View File

@ -62,6 +62,7 @@ args+=(
--ledger "$ledger_dir"
--rpc-port 8899
--snapshot-path "$SOLANA_CONFIG_DIR"/bootstrap-leader/snapshots
--snapshot-interval-slots 100
--storage-keypair "$storage_keypair"
--voting-keypair "$vote_keypair"
--rpc-drone-address 127.0.0.1:9900

View File

@ -263,6 +263,7 @@ default_arg --storage-keypair "$storage_keypair_path"
default_arg --ledger "$ledger_config_dir"
default_arg --accounts "$accounts_config_dir"
default_arg --snapshot-path "$snapshot_config_dir"
default_arg --snapshot-interval-slots 100
if [[ -n $SOLANA_CUDA ]]; then
program=$solana_validator_cuda

1
run.sh
View File

@ -94,6 +94,7 @@ args=(
--rpc-drone-address 127.0.0.1:9900
--accounts "$dataDir"/accounts
--snapshot-path "$dataDir"/snapshots
--snapshot-interval-slots 100
)
if [[ -n $blockstreamSocket ]]; then
args+=(--blockstream "$blockstreamSocket")

View File

@ -36,6 +36,7 @@ use std::fmt;
use std::fs::remove_dir_all;
use std::io::{BufReader, Cursor, Error, ErrorKind, Read};
use std::path::Path;
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use sys_info;
@ -81,8 +82,8 @@ pub type InstructionLoaders = Vec<Vec<(Pubkey, Account)>>;
// Each fork has a set of storage entries.
type ForkStores = HashMap<usize, Arc<AccountStorageEntry>>;
#[derive(Default, Debug)]
pub struct AccountStorage(HashMap<Fork, ForkStores>);
#[derive(Clone, Default, Debug)]
pub struct AccountStorage(pub HashMap<Fork, ForkStores>);
struct AccountStorageVisitor;
@ -122,11 +123,20 @@ impl Serialize for AccountStorage {
len += storage.len();
}
let mut map = serializer.serialize_map(Some(len))?;
let mut count = 0;
let mut serialize_account_storage_timer = Measure::start("serialize_account_storage_ms");
for fork_storage in self.0.values() {
for (storage_id, account_storage_entry) in fork_storage {
map.serialize_entry(storage_id, &**account_storage_entry)?;
count += 1;
}
}
serialize_account_storage_timer.stop();
datapoint_info!(
"serialize_account_storage_ms",
("duration", serialize_account_storage_timer.as_ms(), i64),
("num_entries", count, i64),
);
map.end()
}
}
@ -166,7 +176,7 @@ pub struct AccountStorageEntry {
}
impl AccountStorageEntry {
pub fn new(path: &str, fork_id: Fork, id: usize, file_size: u64) -> Self {
pub fn new(path: &Path, fork_id: Fork, id: usize, file_size: u64) -> Self {
let tail = format!("{}.{}", fork_id, id);
let path = Path::new(path).join(&tail);
let accounts = AppendVec::new(&path, true, file_size as usize);
@ -208,6 +218,14 @@ impl AccountStorageEntry {
self.count_and_status.read().unwrap().0
}
/// Returns the fork this storage entry belongs to.
pub fn fork_id(&self) -> Fork {
    self.fork_id
}
/// Returns this storage entry's `id`.
pub fn append_vec_id(&self) -> AppendVecId {
    self.id
}
fn add_account(&self) {
let mut count_and_status = self.count_and_status.write().unwrap();
*count_and_status = (count_and_status.0 + 1, count_and_status.1);
@ -252,6 +270,10 @@ impl AccountStorageEntry {
}
count_and_status.0
}
/// Returns the path reported by the underlying `accounts` store.
pub fn get_path(&self) -> PathBuf {
    self.accounts.get_path()
}
}
pub fn get_paths_vec(paths: &str) -> Vec<String> {
@ -409,7 +431,7 @@ impl AccountsDB {
Ok(())
}
fn new_storage_entry(&self, fork_id: Fork, path: &str, size: u64) -> AccountStorageEntry {
fn new_storage_entry(&self, fork_id: Fork, path: &Path, size: u64) -> AccountStorageEntry {
AccountStorageEntry::new(
path,
fork_id,
@ -568,7 +590,7 @@ impl AccountsDB {
) -> Arc<AccountStorageEntry> {
let paths = self.paths.read().unwrap();
let path_index = thread_rng().gen_range(0, paths.len());
let store = Arc::new(self.new_storage_entry(fork_id, &paths[path_index], size));
let store = Arc::new(self.new_storage_entry(fork_id, &Path::new(&paths[path_index]), size));
fork_storage.insert(store.id, store.clone());
store
}

View File

@ -235,6 +235,10 @@ impl AppendVec {
Some((meta, stored.0.clone_account()))
}
/// Returns an owned copy of this AppendVec's `path`.
pub fn get_path(&self) -> PathBuf {
    self.path.clone()
}
pub fn accounts<'a>(&'a self, mut start: usize) -> Vec<StoredAccount<'a>> {
let mut accounts = vec![];
while let Some((account, next)) = self.get_account(start) {

View File

@ -3,6 +3,7 @@
//! on behalf of the caller, and a low-level API for when they have
//! already been signed and verified.
use crate::accounts::Accounts;
use crate::accounts_db::AccountStorageEntry;
use crate::accounts_db::{
AppendVecId, ErrorCounters, InstructionAccounts, InstructionCredits, InstructionLoaders,
};
@ -83,6 +84,15 @@ impl BankRc {
self.accounts.update_from_stream(stream)
}
/// Collects a shared handle to every account storage entry of every fork.
///
/// The storage read lock is held while the handles are cloned.
///
/// # Panics
/// Panics if the storage lock is poisoned.
pub fn get_storage_entries(&self) -> Vec<Arc<AccountStorageEntry>> {
    let storage = self.accounts.accounts_db.storage.read().unwrap();
    let mut entries = Vec::new();
    for fork_store in storage.0.values() {
        entries.extend(fork_store.values().cloned());
    }
    entries
}
fn get_io_error(error: &str) -> std::io::Error {
warn!("BankRc error: {:?}", error);
std::io::Error::new(std::io::ErrorKind::Other, error)
@ -167,6 +177,10 @@ impl StatusCacheRc {
let sc = status_cache_rc.status_cache.write().unwrap();
self.status_cache.write().unwrap().append(&sc);
}
/// Takes the write lock on the wrapped status cache and calls its
/// `purge_roots`.
pub fn purge_roots(&self) {
    self.status_cache.write().unwrap().purge_roots();
}
}
/// Manager for the state of all accounts and programs after processing its entries.

View File

@ -6,7 +6,7 @@ use solana_sdk::hash::Hash;
use solana_sdk::signature::Signature;
use std::collections::{HashMap, HashSet};
const MAX_CACHE_ENTRIES: usize = solana_sdk::timing::MAX_HASH_AGE_IN_SECONDS;
pub const MAX_CACHE_ENTRIES: usize = solana_sdk::timing::MAX_HASH_AGE_IN_SECONDS;
const CACHED_SIGNATURE_SIZE: usize = 20;
// Store forks in a single chunk of memory to avoid another lookup.
@ -104,14 +104,7 @@ impl<T: Serialize + Clone> StatusCache<T> {
/// After MAX_CACHE_ENTRIES, roots are removed, and any old signatures are cleared.
pub fn add_root(&mut self, fork: ForkId) {
self.roots.insert(fork);
if self.roots.len() > MAX_CACHE_ENTRIES {
if let Some(min) = self.roots.iter().min().cloned() {
self.roots.remove(&min);
for cache in self.cache.iter_mut() {
cache.retain(|_, (fork, _, _)| *fork > min);
}
}
}
self.purge_roots();
}
/// Insert a new signature for a specific fork.
@ -136,6 +129,17 @@ impl<T: Serialize + Clone> StatusCache<T> {
sig_forks.push((fork, res));
}
/// When more than `MAX_CACHE_ENTRIES` roots are tracked, drops the smallest
/// root and every cached entry whose fork is not greater than it.
///
/// At most one root is removed per call.
pub fn purge_roots(&mut self) {
    if self.roots.len() <= MAX_CACHE_ENTRIES {
        return;
    }
    let min = match self.roots.iter().min().cloned() {
        Some(min) => min,
        None => return,
    };
    self.roots.remove(&min);
    for cache in self.cache.iter_mut() {
        cache.retain(|_, (fork, _, _)| *fork > min);
    }
}
fn insert_entry(
&mut self,
transaction_blockhash: &Hash,

View File

@ -1,5 +1,6 @@
use clap::{crate_description, crate_name, crate_version, App, Arg};
use log::*;
use solana::bank_forks::SnapshotConfig;
use solana::cluster_info::{Node, FULLNODE_PORT_RANGE};
use solana::contact_info::ContactInfo;
use solana::ledger_cleanup_service::DEFAULT_MAX_LEDGER_SLOTS;
@ -158,10 +159,19 @@ fn main() {
.arg(
clap::Arg::with_name("snapshot_path")
.long("snapshot-path")
.value_name("PATHS")
.value_name("SNAPSHOT_PATHS")
.takes_value(true)
.requires("snapshot_interval_slots")
.help("Snapshot path"),
)
.arg(
clap::Arg::with_name("snapshot_interval_slots")
.long("snapshot-interval-slots")
.value_name("SNAPSHOT_INTERVAL_SLOTS")
.takes_value(true)
.requires("snapshot_path")
.help("Number of slots between generating snapshots"),
)
.arg(
clap::Arg::with_name("limit_ledger_size")
.long("limit-ledger-size")
@ -233,8 +243,19 @@ fn main() {
),
);
validator_config.account_paths = matches.value_of("accounts").map(ToString::to_string);
validator_config.snapshot_path = matches.value_of("snapshot_path").map(PathBuf::from);
if let Some(paths) = matches.value_of("accounts") {
validator_config.account_paths = Some(paths.to_string());
}
if let Some(snapshot_path) = matches.value_of("snapshot_path").map(PathBuf::from) {
let snapshot_interval = matches.value_of("snapshot_interval_slots").unwrap();
validator_config.snapshot_config = Some(SnapshotConfig::new(
snapshot_path,
ledger_path.clone(),
snapshot_interval.parse::<usize>().unwrap(),
));
} else {
validator_config.snapshot_config = None;
}
if matches.is_present("limit_ledger_size") {
validator_config.max_ledger_slots = Some(DEFAULT_MAX_LEDGER_SLOTS);
}