Move status cache serialization to the Snapshot Packager service (#6081)

* Move status cache serialization to the Snapshot Packager service

* Minor comment updates

* use ok_or_else instead of ok_or

* satus cache

* Remove assert when snapshot format is wrong

* Fix compile

* Remove slots_to_snapshot from bank forks

* Address review comment

* Remove unused imports
This commit is contained in:
Sagar Dhawan 2019-09-25 13:42:19 -07:00 committed by GitHub
parent 093b5b5267
commit e987d0094f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 159 additions and 129 deletions

1
Cargo.lock generated
View File

@ -3690,6 +3690,7 @@ dependencies = [
"byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)",
"fs_extra 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "fs_extra 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"itertools 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.62 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.62 (registry+https://github.com/rust-lang/crates.io-index)",
"libloading 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)", "libloading 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)",

View File

@ -31,7 +31,7 @@ pub struct BankForks {
working_bank: Arc<Bank>, working_bank: Arc<Bank>,
root: u64, root: u64,
snapshot_config: Option<SnapshotConfig>, snapshot_config: Option<SnapshotConfig>,
slots_since_snapshot: Vec<u64>, last_snapshot_slot: u64,
} }
impl Index<u64> for BankForks { impl Index<u64> for BankForks {
@ -51,7 +51,7 @@ impl BankForks {
working_bank, working_bank,
root: 0, root: 0,
snapshot_config: None, snapshot_config: None,
slots_since_snapshot: vec![bank_slot], last_snapshot_slot: bank_slot,
} }
} }
@ -126,13 +126,13 @@ impl BankForks {
banks.insert(parent.slot(), parent.clone()); banks.insert(parent.slot(), parent.clone());
} }
} }
let root = *rooted_path.last().unwrap();
Self { Self {
root: *rooted_path.last().unwrap(), root,
banks, banks,
working_bank, working_bank,
snapshot_config: None, snapshot_config: None,
slots_since_snapshot: rooted_path, last_snapshot_slot: root,
} }
} }
@ -163,21 +163,6 @@ impl BankForks {
.map(|bank| bank.transaction_count()) .map(|bank| bank.transaction_count())
.unwrap_or(0); .unwrap_or(0);
if self.snapshot_config.is_some() && snapshot_package_sender.is_some() {
let new_rooted_path = root_bank
.parents()
.into_iter()
.map(|p| p.slot())
.rev()
.skip(1);
self.slots_since_snapshot.extend(new_rooted_path);
self.slots_since_snapshot.push(root);
if self.slots_since_snapshot.len() > MAX_CACHE_ENTRIES {
let num_to_remove = self.slots_since_snapshot.len() - MAX_CACHE_ENTRIES;
self.slots_since_snapshot.drain(0..num_to_remove);
}
}
root_bank.squash(); root_bank.squash();
let new_tx_count = root_bank.transaction_count(); let new_tx_count = root_bank.transaction_count();
@ -186,18 +171,18 @@ impl BankForks {
if self.snapshot_config.is_some() && snapshot_package_sender.is_some() { if self.snapshot_config.is_some() && snapshot_package_sender.is_some() {
let config = self.snapshot_config.as_ref().unwrap(); let config = self.snapshot_config.as_ref().unwrap();
info!("setting snapshot root: {}", root); info!("setting snapshot root: {}", root);
if root - self.slots_since_snapshot[0] >= config.snapshot_interval_slots as u64 { if root - self.last_snapshot_slot >= config.snapshot_interval_slots as u64 {
let mut snapshot_time = Measure::start("total-snapshot-ms"); let mut snapshot_time = Measure::start("total-snapshot-ms");
let r = self.generate_snapshot( let r = self.generate_snapshot(
root, root,
&self.slots_since_snapshot[1..], &root_bank.src.roots(),
snapshot_package_sender.as_ref().unwrap(), snapshot_package_sender.as_ref().unwrap(),
snapshot_utils::get_snapshot_tar_path(&config.snapshot_package_output_path), snapshot_utils::get_snapshot_tar_path(&config.snapshot_package_output_path),
); );
if r.is_err() { if r.is_err() {
warn!("Error generating snapshot for bank: {}, err: {:?}", root, r); warn!("Error generating snapshot for bank: {}, err: {:?}", root, r);
} else { } else {
self.slots_since_snapshot = vec![root]; self.last_snapshot_slot = root;
} }
// Cleanup outdated snapshots // Cleanup outdated snapshots
@ -223,10 +208,6 @@ impl BankForks {
self.root self.root
} }
pub fn slots_since_snapshot(&self) -> &[u64] {
&self.slots_since_snapshot
}
fn purge_old_snapshots(&self) { fn purge_old_snapshots(&self) {
// Remove outdated snapshots // Remove outdated snapshots
let config = self.snapshot_config.as_ref().unwrap(); let config = self.snapshot_config.as_ref().unwrap();
@ -243,7 +224,7 @@ impl BankForks {
fn generate_snapshot<P: AsRef<Path>>( fn generate_snapshot<P: AsRef<Path>>(
&self, &self,
root: u64, root: u64,
slots_since_snapshot: &[u64], slots_to_snapshot: &[u64],
snapshot_package_sender: &SnapshotPackageSender, snapshot_package_sender: &SnapshotPackageSender,
tar_output_file: P, tar_output_file: P,
) -> Result<()> { ) -> Result<()> {
@ -256,22 +237,23 @@ impl BankForks {
.expect("root must exist in BankForks"); .expect("root must exist in BankForks");
let mut add_snapshot_time = Measure::start("add-snapshot-ms"); let mut add_snapshot_time = Measure::start("add-snapshot-ms");
snapshot_utils::add_snapshot(&config.snapshot_path, &bank, slots_since_snapshot)?; snapshot_utils::add_snapshot(&config.snapshot_path, &bank)?;
add_snapshot_time.stop(); add_snapshot_time.stop();
inc_new_counter_info!("add-snapshot-ms", add_snapshot_time.as_ms() as usize); inc_new_counter_info!("add-snapshot-ms", add_snapshot_time.as_ms() as usize);
// Package the relevant snapshots // Package the relevant snapshots
let slot_snapshot_paths = snapshot_utils::get_snapshot_paths(&config.snapshot_path); let slot_snapshot_paths = snapshot_utils::get_snapshot_paths(&config.snapshot_path);
let latest_slot_snapshot_paths = slot_snapshot_paths
// We only care about the last MAX_CACHE_ENTRIES snapshots of roots because .last()
// the status cache of anything older is thrown away by the bank in .expect("no snapshots found in config snapshot_path");
// status_cache.prune_roots() // We only care about the last bank's snapshot.
let start = slot_snapshot_paths.len().saturating_sub(MAX_CACHE_ENTRIES); // We'll ask the bank for MAX_CACHE_ENTRIES (on the rooted path) worth of statuses
let package = snapshot_utils::package_snapshot( let package = snapshot_utils::package_snapshot(
&bank, &bank,
&slot_snapshot_paths[start..], latest_slot_snapshot_paths,
tar_output_file, tar_output_file,
&config.snapshot_path, &config.snapshot_path,
slots_to_snapshot,
)?; )?;
// Send the package to the packaging thread // Send the package to the packaging thread
@ -301,13 +283,18 @@ mod tests {
use crate::genesis_utils::{create_genesis_block, GenesisBlockInfo}; use crate::genesis_utils::{create_genesis_block, GenesisBlockInfo};
use crate::service::Service; use crate::service::Service;
use crate::snapshot_package::SnapshotPackagerService; use crate::snapshot_package::SnapshotPackagerService;
use bincode::serialize_into;
use fs_extra::dir::CopyOptions; use fs_extra::dir::CopyOptions;
use itertools::Itertools; use itertools::Itertools;
use solana_runtime::status_cache::SlotDelta;
use solana_sdk::hash::{hashv, Hash}; use solana_sdk::hash::{hashv, Hash};
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::system_transaction; use solana_sdk::system_transaction;
use solana_sdk::transaction::Result as TransactionResult;
use std::fs; use std::fs;
use std::fs::File;
use std::io::{BufWriter, Write};
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::sync::mpsc::channel; use std::sync::mpsc::channel;
use tempfile::TempDir; use tempfile::TempDir;
@ -444,9 +431,12 @@ mod tests {
snapshot_utils::get_snapshot_paths(&snapshot_config.snapshot_path); snapshot_utils::get_snapshot_paths(&snapshot_config.snapshot_path);
let snapshot_package = snapshot_utils::package_snapshot( let snapshot_package = snapshot_utils::package_snapshot(
last_bank, last_bank,
&slot_snapshot_paths, slot_snapshot_paths
.last()
.expect("no snapshots found in path"),
snapshot_utils::get_snapshot_tar_path(&snapshot_config.snapshot_package_output_path), snapshot_utils::get_snapshot_tar_path(&snapshot_config.snapshot_package_output_path),
&snapshot_config.snapshot_path, &snapshot_config.snapshot_path,
&last_bank.src.roots(),
) )
.unwrap(); .unwrap();
SnapshotPackagerService::package_snapshots(&snapshot_package).unwrap(); SnapshotPackagerService::package_snapshots(&snapshot_package).unwrap();
@ -539,7 +529,7 @@ mod tests {
// Take snapshot of zeroth bank // Take snapshot of zeroth bank
let bank0 = bank_forks.get(0).unwrap(); let bank0 = bank_forks.get(0).unwrap();
snapshot_utils::add_snapshot(&snapshot_config.snapshot_path, bank0, &vec![]).unwrap(); snapshot_utils::add_snapshot(&snapshot_config.snapshot_path, bank0).unwrap();
// Set up snapshotting channels // Set up snapshotting channels
let (sender, receiver) = channel(); let (sender, receiver) = channel();
@ -612,11 +602,15 @@ mod tests {
.map(|s| s.parse::<u64>().ok().map(|_| file_path.clone())) .map(|s| s.parse::<u64>().ok().map(|_| file_path.clone()))
.unwrap_or(None) .unwrap_or(None)
}) })
.sorted()
.collect(); .collect();
// only save off the snapshot of this slot, we don't need the others.
for snapshot_path in snapshot_paths { fs_extra::dir::copy(
fs_extra::dir::copy(&snapshot_path, &saved_snapshots_dir, &options).unwrap(); &snapshot_paths.last().unwrap(),
} &saved_snapshots_dir,
&options,
)
.unwrap();
} }
} }
@ -649,6 +643,17 @@ mod tests {
.expect("SnapshotPackagerService exited with error"); .expect("SnapshotPackagerService exited with error");
// Check the tar we cached the state for earlier was generated correctly // Check the tar we cached the state for earlier was generated correctly
// before we compare, stick an empty status_cache in this dir so that the package comparision works
// This is needed since the status_cache is added by the packager and is not collected from
// the source dir for snapshots
let slot_deltas: Vec<SlotDelta<TransactionResult<()>>> = vec![];
let dummy_status_cache =
File::create(saved_snapshots_dir.path().join("status_cache")).unwrap();
let mut status_cache_stream = BufWriter::new(dummy_status_cache);
serialize_into(&mut status_cache_stream, &slot_deltas).unwrap();
status_cache_stream.flush().unwrap();
snapshot_utils::tests::verify_snapshot_tar( snapshot_utils::tests::verify_snapshot_tar(
saved_tar, saved_tar,
saved_snapshots_dir.path(), saved_snapshots_dir.path(),
@ -659,7 +664,7 @@ mod tests {
} }
#[test] #[test]
fn test_slots_since_snapshot() { fn test_slots_to_snapshot() {
solana_logger::setup(); solana_logger::setup();
for add_root_interval in 1..10 { for add_root_interval in 1..10 {
let (snapshot_sender, _snapshot_receiver) = channel(); let (snapshot_sender, _snapshot_receiver) = channel();
@ -680,24 +685,19 @@ mod tests {
snapshot_test_config snapshot_test_config
.bank_forks .bank_forks
.set_root(current_bank.slot(), &snapshot_sender); .set_root(current_bank.slot(), &snapshot_sender);
let slots_since_snapshot_hashset: HashSet<_> = snapshot_test_config
.bank_forks
.slots_since_snapshot
.iter()
.cloned()
.collect();
assert_eq!(slots_since_snapshot_hashset, current_bank.src.roots());
} }
let expected_slots_since_snapshot = let num_old_slots = num_set_roots * add_root_interval - MAX_CACHE_ENTRIES + 1;
(0..=num_set_roots as u64 * add_root_interval as u64).collect_vec(); let expected_slots_to_snapshot = (num_old_slots as u64
let num_old_slots = expected_slots_since_snapshot.len() - MAX_CACHE_ENTRIES; ..=num_set_roots as u64 * add_root_interval as u64)
.collect_vec();
assert_eq!( let rooted_bank = snapshot_test_config
snapshot_test_config.bank_forks.slots_since_snapshot(), .bank_forks
&expected_slots_since_snapshot[num_old_slots..], .get(snapshot_test_config.bank_forks.root())
); .unwrap();
let slots_to_snapshot = rooted_bank.src.roots();
assert_eq!(slots_to_snapshot, expected_slots_to_snapshot);
} }
} }

View File

@ -1569,12 +1569,6 @@ pub mod tests {
bank_slot: 6, // The head of the fork is slot 6 bank_slot: 6, // The head of the fork is slot 6
} }
); );
// slots_since_snapshot should contain everything on the rooted path
assert_eq!(
bank_forks.slots_since_snapshot().to_vec(),
vec![1, 2, 3, 4, 5]
);
assert_eq!(bank_forks.root(), 5); assert_eq!(bank_forks.root(), 5);
// Verify the parents of the head of the fork // Verify the parents of the head of the fork

View File

@ -1,10 +1,15 @@
use crate::result::{Error, Result}; use crate::result::{Error, Result};
use crate::service::Service; use crate::service::Service;
use crate::snapshot_utils;
use bincode::serialize_into;
use solana_measure::measure::Measure; use solana_measure::measure::Measure;
use solana_metrics::datapoint_info; use solana_metrics::datapoint_info;
use solana_runtime::accounts_db::AccountStorageEntry; use solana_runtime::accounts_db::AccountStorageEntry;
use solana_runtime::status_cache::SlotDelta;
use solana_sdk::transaction::Result as TransactionResult;
use std::fs; use std::fs;
use std::io::{Error as IOError, ErrorKind}; use std::fs::File;
use std::io::{BufWriter, Error as IOError, ErrorKind};
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender}; use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender};
@ -22,6 +27,7 @@ pub const TAR_ACCOUNTS_DIR: &str = "accounts";
pub struct SnapshotPackage { pub struct SnapshotPackage {
root: u64, root: u64,
slot_deltas: Vec<SlotDelta<TransactionResult<()>>>,
snapshot_links: TempDir, snapshot_links: TempDir,
storage_entries: Vec<Arc<AccountStorageEntry>>, storage_entries: Vec<Arc<AccountStorageEntry>>,
tar_output_file: PathBuf, tar_output_file: PathBuf,
@ -30,12 +36,14 @@ pub struct SnapshotPackage {
impl SnapshotPackage { impl SnapshotPackage {
pub fn new( pub fn new(
root: u64, root: u64,
slot_deltas: Vec<SlotDelta<TransactionResult<()>>>,
snapshot_links: TempDir, snapshot_links: TempDir,
storage_entries: Vec<Arc<AccountStorageEntry>>, storage_entries: Vec<Arc<AccountStorageEntry>>,
tar_output_file: PathBuf, tar_output_file: PathBuf,
) -> Self { ) -> Self {
Self { Self {
root, root,
slot_deltas,
snapshot_links, snapshot_links,
storage_entries, storage_entries,
tar_output_file, tar_output_file,
@ -75,6 +83,12 @@ impl SnapshotPackagerService {
"Generating snapshot tarball for root {}", "Generating snapshot tarball for root {}",
snapshot_package.root snapshot_package.root
); );
Self::serialize_status_cache(
&snapshot_package.slot_deltas,
&snapshot_package.snapshot_links,
)?;
let mut timer = Measure::start("snapshot_package-package_snapshots"); let mut timer = Measure::start("snapshot_package-package_snapshots");
let tar_dir = snapshot_package let tar_dir = snapshot_package
.tar_output_file .tar_output_file
@ -167,6 +181,31 @@ impl SnapshotPackagerService {
warn!("Snapshot Packaging Error: {:?}", error); warn!("Snapshot Packaging Error: {:?}", error);
Error::IO(IOError::new(ErrorKind::Other, error)) Error::IO(IOError::new(ErrorKind::Other, error))
} }
fn serialize_status_cache(
slot_deltas: &[SlotDelta<TransactionResult<()>>],
snapshot_links: &TempDir,
) -> Result<()> {
// the status cache is stored as snapshot_path/status_cache
let snapshot_status_cache_file_path = snapshot_links
.path()
.join(snapshot_utils::SNAPSHOT_STATUS_CACHE_FILE_NAME);
let status_cache = File::create(&snapshot_status_cache_file_path)?;
// status cache writer
let mut status_cache_stream = BufWriter::new(status_cache);
let mut status_cache_serialize = Measure::start("status_cache_serialize-ms");
// write the status cache
serialize_into(&mut status_cache_stream, slot_deltas)
.map_err(|_| Self::get_io_error("serialize status cache error"))?;
status_cache_serialize.stop();
inc_new_counter_info!(
"serialize-status-cache-ms",
status_cache_serialize.as_ms() as usize
);
Ok(())
}
} }
impl Service for SnapshotPackagerService { impl Service for SnapshotPackagerService {
@ -228,6 +267,7 @@ mod tests {
let output_tar_path = snapshot_utils::get_snapshot_tar_path(&snapshot_package_output_path); let output_tar_path = snapshot_utils::get_snapshot_tar_path(&snapshot_package_output_path);
let snapshot_package = SnapshotPackage::new( let snapshot_package = SnapshotPackage::new(
5, 5,
vec![],
link_snapshots_dir, link_snapshots_dir,
storage_entries.clone(), storage_entries.clone(),
output_tar_path.clone(), output_tar_path.clone(),
@ -236,6 +276,15 @@ mod tests {
// Make tarball from packageable snapshot // Make tarball from packageable snapshot
SnapshotPackagerService::package_snapshots(&snapshot_package).unwrap(); SnapshotPackagerService::package_snapshots(&snapshot_package).unwrap();
// before we compare, stick an empty status_cache in this dir so that the package comparision works
// This is needed since the status_cache is added by the packager and is not collected from
// the source dir for snapshots
let slot_deltas: Vec<SlotDelta<TransactionResult<()>>> = vec![];
let dummy_status_cache = File::create(snapshots_dir.join("status_cache")).unwrap();
let mut status_cache_stream = BufWriter::new(dummy_status_cache);
serialize_into(&mut status_cache_stream, &slot_deltas).unwrap();
status_cache_stream.flush().unwrap();
// Check tarball is correct // Check tarball is correct
snapshot_utils::tests::verify_snapshot_tar(output_tar_path, snapshots_dir, accounts_dir); snapshot_utils::tests::verify_snapshot_tar(output_tar_path, snapshots_dir, accounts_dir);
} }

View File

@ -5,7 +5,6 @@ use crate::snapshot_package::{TAR_ACCOUNTS_DIR, TAR_SNAPSHOTS_DIR};
use bincode::{deserialize_from, serialize_into}; use bincode::{deserialize_from, serialize_into};
use bzip2::bufread::BzDecoder; use bzip2::bufread::BzDecoder;
use fs_extra::dir::CopyOptions; use fs_extra::dir::CopyOptions;
use solana_measure::measure::Measure;
use solana_runtime::bank::Bank; use solana_runtime::bank::Bank;
use solana_runtime::status_cache::SlotDelta; use solana_runtime::status_cache::SlotDelta;
use solana_sdk::transaction; use solana_sdk::transaction;
@ -16,13 +15,12 @@ use std::io::{BufReader, BufWriter, Error as IOError, ErrorKind};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use tar::Archive; use tar::Archive;
const SNAPSHOT_STATUS_CACHE_FILE_NAME: &str = "status_cache"; pub const SNAPSHOT_STATUS_CACHE_FILE_NAME: &str = "status_cache";
#[derive(PartialEq, Ord, Eq, Debug)] #[derive(PartialEq, Ord, Eq, Debug)]
pub struct SlotSnapshotPaths { pub struct SlotSnapshotPaths {
pub slot: u64, pub slot: u64,
pub snapshot_file_path: PathBuf, pub snapshot_file_path: PathBuf,
pub snapshot_status_cache_path: PathBuf,
} }
impl PartialOrd for SlotSnapshotPaths { impl PartialOrd for SlotSnapshotPaths {
@ -43,20 +41,16 @@ impl SlotSnapshotPaths {
&self.snapshot_file_path, &self.snapshot_file_path,
&new_slot_hardlink_dir.join(self.slot.to_string()), &new_slot_hardlink_dir.join(self.slot.to_string()),
)?; )?;
// Copy the status cache
fs::copy(
&self.snapshot_status_cache_path,
&new_slot_hardlink_dir.join(SNAPSHOT_STATUS_CACHE_FILE_NAME),
)?;
Ok(()) Ok(())
} }
} }
pub fn package_snapshot<P: AsRef<Path>, Q: AsRef<Path>>( pub fn package_snapshot<P: AsRef<Path>, Q: AsRef<Path>>(
bank: &Bank, bank: &Bank,
snapshot_files: &[SlotSnapshotPaths], snapshot_files: &SlotSnapshotPaths,
snapshot_package_output_file: P, snapshot_package_output_file: P,
snapshot_path: Q, snapshot_path: Q,
slots_to_snapshot: &[u64],
) -> Result<SnapshotPackage> { ) -> Result<SnapshotPackage> {
// Hard link all the snapshots we need for this package // Hard link all the snapshots we need for this package
let snapshot_hard_links_dir = tempfile::tempdir_in(snapshot_path)?; let snapshot_hard_links_dir = tempfile::tempdir_in(snapshot_path)?;
@ -78,12 +72,11 @@ pub fn package_snapshot<P: AsRef<Path>, Q: AsRef<Path>>(
// Any errors from this point on will cause the above SnapshotPackage to drop, clearing // Any errors from this point on will cause the above SnapshotPackage to drop, clearing
// any temporary state created for the SnapshotPackage (like the snapshot_hard_links_dir) // any temporary state created for the SnapshotPackage (like the snapshot_hard_links_dir)
for files in snapshot_files { snapshot_files.copy_snapshot_directory(snapshot_hard_links_dir.path())?;
files.copy_snapshot_directory(snapshot_hard_links_dir.path())?;
}
let package = SnapshotPackage::new( let package = SnapshotPackage::new(
bank.slot(), bank.slot(),
bank.src.slot_deltas(slots_to_snapshot),
snapshot_hard_links_dir, snapshot_hard_links_dir,
account_storage_entries, account_storage_entries,
snapshot_package_output_file.as_ref().to_path_buf(), snapshot_package_output_file.as_ref().to_path_buf(),
@ -112,8 +105,6 @@ where
SlotSnapshotPaths { SlotSnapshotPaths {
slot, slot,
snapshot_file_path: snapshot_path.join(get_snapshot_file_name(slot)), snapshot_file_path: snapshot_path.join(get_snapshot_file_name(slot)),
snapshot_status_cache_path: snapshot_path
.join(SNAPSHOT_STATUS_CACHE_FILE_NAME),
} }
}) })
.collect::<Vec<SlotSnapshotPaths>>(); .collect::<Vec<SlotSnapshotPaths>>();
@ -131,11 +122,7 @@ where
} }
} }
pub fn add_snapshot<P: AsRef<Path>>( pub fn add_snapshot<P: AsRef<Path>>(snapshot_path: P, bank: &Bank) -> Result<()> {
snapshot_path: P,
bank: &Bank,
slots_since_snapshot: &[u64],
) -> Result<()> {
let slot = bank.slot(); let slot = bank.slot();
// snapshot_path/slot // snapshot_path/slot
let slot_snapshot_dir = get_bank_snapshot_dir(snapshot_path, slot); let slot_snapshot_dir = get_bank_snapshot_dir(snapshot_path, slot);
@ -143,43 +130,21 @@ pub fn add_snapshot<P: AsRef<Path>>(
// the snapshot is stored as snapshot_path/slot/slot // the snapshot is stored as snapshot_path/slot/slot
let snapshot_file_path = slot_snapshot_dir.join(get_snapshot_file_name(slot)); let snapshot_file_path = slot_snapshot_dir.join(get_snapshot_file_name(slot));
// the status cache is stored as snapshot_path/slot/slot_satus_cache
let snapshot_status_cache_file_path = slot_snapshot_dir.join(SNAPSHOT_STATUS_CACHE_FILE_NAME);
info!( info!(
"creating snapshot {}, path: {:?} status_cache: {:?}", "creating snapshot {}, path: {:?}",
bank.slot(), bank.slot(),
snapshot_file_path, snapshot_file_path,
snapshot_status_cache_file_path
); );
let snapshot_file = File::create(&snapshot_file_path)?; let snapshot_file = File::create(&snapshot_file_path)?;
// snapshot writer // snapshot writer
let mut snapshot_stream = BufWriter::new(snapshot_file); let mut snapshot_stream = BufWriter::new(snapshot_file);
let status_cache = File::create(&snapshot_status_cache_file_path)?;
// status cache writer
let mut status_cache_stream = BufWriter::new(status_cache);
// Create the snapshot // Create the snapshot
serialize_into(&mut snapshot_stream, &*bank).map_err(|e| get_io_error(&e.to_string()))?; serialize_into(&mut snapshot_stream, &*bank).map_err(|e| get_io_error(&e.to_string()))?;
serialize_into(&mut snapshot_stream, &bank.rc).map_err(|e| get_io_error(&e.to_string()))?; serialize_into(&mut snapshot_stream, &bank.rc).map_err(|e| get_io_error(&e.to_string()))?;
let mut status_cache_serialize = Measure::start("status_cache_serialize-ms");
// write the status cache
serialize_into(
&mut status_cache_stream,
&bank.src.slot_deltas(slots_since_snapshot),
)
.map_err(|_| get_io_error("serialize bank status cache error"))?;
status_cache_serialize.stop();
inc_new_counter_info!(
"serialize-status-cache-ms",
status_cache_serialize.as_ms() as usize
);
info!( info!(
"successfully created snapshot {}, path: {:?} status_cache: {:?}", "successfully created snapshot {}, path: {:?}",
bank.slot(), bank.slot(),
snapshot_file_path, snapshot_file_path,
snapshot_status_cache_file_path
); );
Ok(()) Ok(())
@ -217,8 +182,11 @@ pub fn bank_from_archive<P: AsRef<Path>>(
let unpacked_accounts_dir = unpack_dir.as_ref().join(TAR_ACCOUNTS_DIR); let unpacked_accounts_dir = unpack_dir.as_ref().join(TAR_ACCOUNTS_DIR);
let unpacked_snapshots_dir = unpack_dir.as_ref().join(TAR_SNAPSHOTS_DIR); let unpacked_snapshots_dir = unpack_dir.as_ref().join(TAR_SNAPSHOTS_DIR);
let snapshot_paths = get_snapshot_paths(&unpacked_snapshots_dir); let bank = rebuild_bank_from_snapshots(
let bank = rebuild_bank_from_snapshots(account_paths, &snapshot_paths, unpacked_accounts_dir)?; account_paths,
&unpacked_snapshots_dir,
unpacked_accounts_dir,
)?;
if !bank.verify_hash_internal_state() { if !bank.verify_hash_internal_state() {
warn!("Invalid snapshot hash value!"); warn!("Invalid snapshot hash value!");
@ -260,18 +228,23 @@ pub fn untar_snapshot_in<P: AsRef<Path>, Q: AsRef<Path>>(
fn rebuild_bank_from_snapshots<P>( fn rebuild_bank_from_snapshots<P>(
local_account_paths: String, local_account_paths: String,
snapshot_paths: &[SlotSnapshotPaths], unpacked_snapshots_dir: &PathBuf,
append_vecs_path: P, append_vecs_path: P,
) -> Result<Bank> ) -> Result<Bank>
where where
P: AsRef<Path>, P: AsRef<Path>,
{ {
// Rebuild the last root bank let mut snapshot_paths = get_snapshot_paths(&unpacked_snapshots_dir);
let last_root_paths = snapshot_paths if snapshot_paths.len() > 1 {
.last() return Err(get_io_error("invalid snapshot format"));
}
let root_paths = snapshot_paths
.pop()
.ok_or_else(|| get_io_error("No snapshots found in snapshots directory"))?; .ok_or_else(|| get_io_error("No snapshots found in snapshots directory"))?;
info!("Loading from {:?}", &last_root_paths.snapshot_file_path);
let file = File::open(&last_root_paths.snapshot_file_path)?; // Rebuild the root bank
info!("Loading from {:?}", &root_paths.snapshot_file_path);
let file = File::open(&root_paths.snapshot_file_path)?;
let mut stream = BufReader::new(file); let mut stream = BufReader::new(file);
let bank: Bank = deserialize_from(&mut stream).map_err(|e| get_io_error(&e.to_string()))?; let bank: Bank = deserialize_from(&mut stream).map_err(|e| get_io_error(&e.to_string()))?;
@ -279,16 +252,15 @@ where
bank.rc bank.rc
.accounts_from_stream(&mut stream, local_account_paths, append_vecs_path)?; .accounts_from_stream(&mut stream, local_account_paths, append_vecs_path)?;
// merge the status caches from all previous banks // Rebuild status cache
for slot_paths in snapshot_paths.iter().rev() { let status_cache_path = unpacked_snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILE_NAME);
let status_cache = File::open(&slot_paths.snapshot_status_cache_path)?; let status_cache = File::open(status_cache_path)?;
let mut stream = BufReader::new(status_cache); let mut stream = BufReader::new(status_cache);
let slot_deltas: Vec<SlotDelta<transaction::Result<()>>> = deserialize_from(&mut stream) let slot_deltas: Vec<SlotDelta<transaction::Result<()>>> = deserialize_from(&mut stream)
.map_err(|_| get_io_error("deserialize root error")) .map_err(|_| get_io_error("deserialize root error"))
.unwrap_or_default(); .unwrap_or_default();
bank.src.append(&slot_deltas); bank.src.append(&slot_deltas);
}
Ok(bank) Ok(bank)
} }

View File

@ -38,6 +38,7 @@ solana-vote-program = { path = "../programs/vote_program", version = "0.20.0" }
sys-info = "0.5.8" sys-info = "0.5.8"
tempfile = "3.1.0" tempfile = "3.1.0"
solana-rayon-threadlimit = { path = "../rayon-threadlimit", version = "0.20.0" } solana-rayon-threadlimit = { path = "../rayon-threadlimit", version = "0.20.0" }
itertools = "0.8.0"
[lib] [lib]
crate-type = ["lib"] crate-type = ["lib"]

View File

@ -23,6 +23,7 @@ use crate::{
}; };
use bincode::{deserialize_from, serialize_into}; use bincode::{deserialize_from, serialize_into};
use byteorder::{ByteOrder, LittleEndian}; use byteorder::{ByteOrder, LittleEndian};
use itertools::Itertools;
use log::*; use log::*;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use solana_measure::measure::Measure; use solana_measure::measure::Measure;
@ -48,7 +49,7 @@ use solana_sdk::{
timing::duration_as_ns, timing::duration_as_ns,
transaction::{Result, Transaction, TransactionError}, transaction::{Result, Transaction, TransactionError},
}; };
use std::collections::{HashMap, HashSet}; use std::collections::HashMap;
use std::io::{BufReader, Cursor, Error as IOError, Read}; use std::io::{BufReader, Cursor, Error as IOError, Read};
use std::path::Path; use std::path::Path;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
@ -136,8 +137,15 @@ impl StatusCacheRc {
sc.slot_deltas(slots) sc.slot_deltas(slots)
} }
pub fn roots(&self) -> HashSet<u64> { pub fn roots(&self) -> Vec<u64> {
self.status_cache.read().unwrap().roots().clone() self.status_cache
.read()
.unwrap()
.roots()
.iter()
.cloned()
.sorted()
.collect()
} }
pub fn append(&self, slot_deltas: &[SlotDelta<Result<()>>]) { pub fn append(&self, slot_deltas: &[SlotDelta<Result<()>>]) {

View File

@ -1,13 +1,13 @@
use log::*; use log::*;
use rand::{thread_rng, Rng}; use rand::{thread_rng, Rng};
use serde::Serialize; use serde::Serialize;
use solana_sdk::clock::{Slot, MAX_HASH_AGE_IN_SECONDS}; use solana_sdk::clock::{Slot, MAX_RECENT_BLOCKHASHES};
use solana_sdk::hash::Hash; use solana_sdk::hash::Hash;
use solana_sdk::signature::Signature; use solana_sdk::signature::Signature;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
pub const MAX_CACHE_ENTRIES: usize = MAX_HASH_AGE_IN_SECONDS; pub const MAX_CACHE_ENTRIES: usize = MAX_RECENT_BLOCKHASHES;
const CACHED_SIGNATURE_SIZE: usize = 20; const CACHED_SIGNATURE_SIZE: usize = 20;
// Store forks in a single chunk of memory to avoid another lookup. // Store forks in a single chunk of memory to avoid another lookup.
@ -430,4 +430,9 @@ mod tests {
assert_eq!(cache, status_cache); assert_eq!(cache, status_cache);
} }
#[test]
fn test_age_sanity() {
assert!(MAX_CACHE_ENTRIES <= MAX_RECENT_BLOCKHASHES);
}
} }