Snapshot optimizations (#5518)
* Limit slots_since_snapshot size, only package latest snapshot, refactor tests * Add test checking status_cache.roots == bank_forks.slots_since_snapshot after bank_forks.set_root()
This commit is contained in:
parent
802537564b
commit
d791c70d90
|
@ -222,6 +222,10 @@ impl BankForks {
|
|||
.skip(1);
|
||||
self.slots_since_snapshot.extend(new_rooted_path);
|
||||
self.slots_since_snapshot.push(root);
|
||||
if self.slots_since_snapshot.len() > MAX_CACHE_ENTRIES {
|
||||
let num_to_remove = self.slots_since_snapshot.len() - MAX_CACHE_ENTRIES;
|
||||
self.slots_since_snapshot.drain(0..num_to_remove);
|
||||
}
|
||||
}
|
||||
|
||||
root_bank.squash();
|
||||
|
@ -500,7 +504,7 @@ mod tests {
|
|||
);
|
||||
}
|
||||
|
||||
fn restore_from_snapshot(old_bank_forks: BankForks, account_paths: String) {
|
||||
fn restore_from_snapshot(old_bank_forks: &BankForks, account_paths: String) {
|
||||
let (snapshot_path, snapshot_package_output_path) = old_bank_forks
|
||||
.snapshot_config
|
||||
.as_ref()
|
||||
|
@ -537,26 +541,14 @@ mod tests {
|
|||
F: Fn(&mut Bank, &Keypair),
|
||||
{
|
||||
solana_logger::setup();
|
||||
let accounts_dir = TempDir::new().unwrap();
|
||||
let snapshot_dir = TempDir::new().unwrap();
|
||||
let snapshot_output_path = TempDir::new().unwrap();
|
||||
let GenesisBlockInfo {
|
||||
genesis_block,
|
||||
mint_keypair,
|
||||
..
|
||||
} = create_genesis_block(10_000);
|
||||
let bank0 = Bank::new_with_paths(
|
||||
&genesis_block,
|
||||
Some(accounts_dir.path().to_str().unwrap().to_string()),
|
||||
);
|
||||
bank0.freeze();
|
||||
let mut bank_forks = BankForks::new(0, bank0);
|
||||
let snapshot_config = SnapshotConfig::new(
|
||||
PathBuf::from(snapshot_dir.path()),
|
||||
PathBuf::from(snapshot_output_path.path()),
|
||||
1,
|
||||
);
|
||||
bank_forks.set_snapshot_config(snapshot_config.clone());
|
||||
// Set up snapshotting config
|
||||
let mut snapshot_test_config = setup_snapshot_test(1);
|
||||
|
||||
let bank_forks = &mut snapshot_test_config.bank_forks;
|
||||
let accounts_dir = &snapshot_test_config.accounts_dir;
|
||||
let snapshot_config = &snapshot_test_config.snapshot_config;
|
||||
let mint_keypair = &snapshot_test_config.genesis_block_info.mint_keypair;
|
||||
|
||||
let (s, _r) = channel();
|
||||
let sender = Some(s);
|
||||
for slot in 0..last_slot {
|
||||
|
@ -658,35 +650,25 @@ mod tests {
|
|||
#[test]
|
||||
fn test_concurrent_snapshot_packaging() {
|
||||
solana_logger::setup();
|
||||
let accounts_dir = TempDir::new().unwrap();
|
||||
let snapshots_dir = TempDir::new().unwrap();
|
||||
let snapshot_output_path = TempDir::new().unwrap();
|
||||
let GenesisBlockInfo {
|
||||
genesis_block,
|
||||
mint_keypair,
|
||||
..
|
||||
} = create_genesis_block(10_000);
|
||||
let (sender, receiver) = channel();
|
||||
let (fake_sender, _fake_receiver) = channel();
|
||||
let bank0 = Bank::new_with_paths(
|
||||
&genesis_block,
|
||||
Some(accounts_dir.path().to_str().unwrap().to_string()),
|
||||
);
|
||||
bank0.freeze();
|
||||
|
||||
// Set up bank forks
|
||||
let mut bank_forks = BankForks::new(0, bank0);
|
||||
let snapshot_config = SnapshotConfig::new(
|
||||
PathBuf::from(snapshots_dir.path()),
|
||||
PathBuf::from(snapshot_output_path.path()),
|
||||
1,
|
||||
);
|
||||
bank_forks.set_snapshot_config(snapshot_config.clone());
|
||||
// Set up snapshotting config
|
||||
let mut snapshot_test_config = setup_snapshot_test(1);
|
||||
|
||||
let bank_forks = &mut snapshot_test_config.bank_forks;
|
||||
let accounts_dir = &snapshot_test_config.accounts_dir;
|
||||
let snapshots_dir = &snapshot_test_config.snapshot_dir;
|
||||
let snapshot_config = &snapshot_test_config.snapshot_config;
|
||||
let mint_keypair = &snapshot_test_config.genesis_block_info.mint_keypair;
|
||||
let genesis_block = &snapshot_test_config.genesis_block_info.genesis_block;
|
||||
|
||||
// Take snapshot of zeroth bank
|
||||
let bank0 = bank_forks.get(0).unwrap();
|
||||
snapshot_utils::add_snapshot(&snapshot_config.snapshot_path, bank0, &vec![]).unwrap();
|
||||
|
||||
// Set up snapshotting channels
|
||||
let (sender, receiver) = channel();
|
||||
let (fake_sender, _fake_receiver) = channel();
|
||||
|
||||
// Create next MAX_CACHE_ENTRIES + 2 banks and snapshots. Every bank will get snapshotted
|
||||
// and the snapshot purging logic will run on every snapshot taken. This means the three
|
||||
// (including snapshot for bank0 created above) earliest snapshots will get purged by the
|
||||
|
@ -742,7 +724,7 @@ mod tests {
|
|||
|
||||
if slot == saved_slot as u64 {
|
||||
let options = CopyOptions::new();
|
||||
fs_extra::dir::copy(&accounts_dir, &saved_accounts_dir, &options).unwrap();
|
||||
fs_extra::dir::copy(accounts_dir, &saved_accounts_dir, &options).unwrap();
|
||||
let snapshot_paths: Vec<_> = fs::read_dir(&snapshot_config.snapshot_path)
|
||||
.unwrap()
|
||||
.filter_map(|entry| {
|
||||
|
@ -804,45 +786,79 @@ mod tests {
|
|||
fn test_slots_since_snapshot() {
|
||||
solana_logger::setup();
|
||||
for add_root_interval in 1..10 {
|
||||
let (s, _r) = channel();
|
||||
let accounts_dir = TempDir::new().unwrap();
|
||||
let snapshot_dir = TempDir::new().unwrap();
|
||||
let snapshot_output_path = TempDir::new().unwrap();
|
||||
let GenesisBlockInfo { genesis_block, .. } = create_genesis_block(10_000);
|
||||
let bank0 = Bank::new_with_paths(
|
||||
&genesis_block,
|
||||
Some(accounts_dir.path().to_str().unwrap().to_string()),
|
||||
);
|
||||
bank0.freeze();
|
||||
let mut bank_forks = BankForks::new(0, bank0);
|
||||
|
||||
// We will call bank_forks.set_root() every `add_root_interval` banks,
|
||||
// a `num_set_roots` number of times.
|
||||
let num_set_roots = 10;
|
||||
|
||||
let snapshot_config = SnapshotConfig::new(
|
||||
PathBuf::from(snapshot_dir.path()),
|
||||
PathBuf::from(snapshot_output_path.path()),
|
||||
add_root_interval * num_set_roots * 2,
|
||||
);
|
||||
bank_forks.set_snapshot_config(snapshot_config.clone());
|
||||
let mut current_bank = bank_forks[0].clone();
|
||||
let sender = Some(s);
|
||||
let (snapshot_sender, _snapshot_receiver) = channel();
|
||||
let num_set_roots = MAX_CACHE_ENTRIES * 5;
|
||||
// Make sure this test never clears bank.slots_since_snapshot
|
||||
let mut snapshot_test_config =
|
||||
setup_snapshot_test(add_root_interval * num_set_roots * 2);
|
||||
let mut current_bank = snapshot_test_config.bank_forks[0].clone();
|
||||
let snapshot_sender = Some(snapshot_sender);
|
||||
for _ in 0..num_set_roots {
|
||||
for _ in 0..add_root_interval {
|
||||
let new_slot = current_bank.slot() + 1;
|
||||
let new_bank =
|
||||
Bank::new_from_parent(¤t_bank, &Pubkey::default(), new_slot);
|
||||
bank_forks.insert(new_bank);
|
||||
current_bank = bank_forks[new_slot].clone();
|
||||
snapshot_test_config.bank_forks.insert(new_bank);
|
||||
current_bank = snapshot_test_config.bank_forks[new_slot].clone();
|
||||
}
|
||||
bank_forks.set_root(current_bank.slot(), &sender);
|
||||
snapshot_test_config
|
||||
.bank_forks
|
||||
.set_root(current_bank.slot(), &snapshot_sender);
|
||||
|
||||
let slots_since_snapshot_hashset: HashSet<_> = snapshot_test_config
|
||||
.bank_forks
|
||||
.slots_since_snapshot
|
||||
.iter()
|
||||
.cloned()
|
||||
.collect();
|
||||
assert_eq!(slots_since_snapshot_hashset, current_bank.src.roots());
|
||||
}
|
||||
|
||||
let expected_slots_since_snapshot =
|
||||
(0..=num_set_roots as u64 * add_root_interval as u64).collect_vec();
|
||||
let num_old_slots = expected_slots_since_snapshot.len() - MAX_CACHE_ENTRIES;
|
||||
|
||||
assert_eq!(
|
||||
bank_forks.slots_since_snapshot(),
|
||||
&((0..=num_set_roots as u64 * add_root_interval as u64).collect_vec()[..])
|
||||
snapshot_test_config.bank_forks.slots_since_snapshot(),
|
||||
&expected_slots_since_snapshot[num_old_slots..],
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
struct SnapshotTestConfig {
|
||||
accounts_dir: TempDir,
|
||||
snapshot_dir: TempDir,
|
||||
_snapshot_output_path: TempDir,
|
||||
snapshot_config: SnapshotConfig,
|
||||
bank_forks: BankForks,
|
||||
genesis_block_info: GenesisBlockInfo,
|
||||
}
|
||||
|
||||
fn setup_snapshot_test(snapshot_interval: usize) -> SnapshotTestConfig {
|
||||
let accounts_dir = TempDir::new().unwrap();
|
||||
let snapshot_dir = TempDir::new().unwrap();
|
||||
let snapshot_output_path = TempDir::new().unwrap();
|
||||
let genesis_block_info = create_genesis_block(10_000);
|
||||
let bank0 = Bank::new_with_paths(
|
||||
&genesis_block_info.genesis_block,
|
||||
Some(accounts_dir.path().to_str().unwrap().to_string()),
|
||||
);
|
||||
bank0.freeze();
|
||||
let mut bank_forks = BankForks::new(0, bank0);
|
||||
|
||||
let snapshot_config = SnapshotConfig::new(
|
||||
PathBuf::from(snapshot_dir.path()),
|
||||
PathBuf::from(snapshot_output_path.path()),
|
||||
snapshot_interval,
|
||||
);
|
||||
bank_forks.set_snapshot_config(snapshot_config.clone());
|
||||
SnapshotTestConfig {
|
||||
accounts_dir,
|
||||
snapshot_dir,
|
||||
_snapshot_output_path: snapshot_output_path,
|
||||
snapshot_config,
|
||||
bank_forks,
|
||||
genesis_block_info,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -119,7 +119,11 @@ impl SnapshotPackagerService {
|
|||
}
|
||||
|
||||
fn run(snapshot_receiver: &SnapshotPackageReceiver) -> Result<()> {
|
||||
let snapshot_package = snapshot_receiver.recv_timeout(Duration::from_secs(1))?;
|
||||
let mut snapshot_package = snapshot_receiver.recv_timeout(Duration::from_secs(1))?;
|
||||
// Only package the latest
|
||||
while let Ok(new_snapshot_package) = snapshot_receiver.recv() {
|
||||
snapshot_package = new_snapshot_package;
|
||||
}
|
||||
Self::package_snapshots(&snapshot_package)?;
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -45,7 +45,7 @@ use solana_sdk::{
|
|||
timing::{duration_as_ns, get_segment_from_slot, Epoch, Slot, MAX_RECENT_BLOCKHASHES},
|
||||
transaction::{Result, Transaction, TransactionError},
|
||||
};
|
||||
use std::collections::HashMap;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::io::{BufReader, Cursor, Error as IOError, Read};
|
||||
use std::path::Path;
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
|
@ -124,7 +124,7 @@ impl Serialize for BankRc {
|
|||
pub struct StatusCacheRc {
|
||||
/// where all the Accounts are stored
|
||||
/// A cache of signature statuses
|
||||
status_cache: Arc<RwLock<BankStatusCache>>,
|
||||
pub status_cache: Arc<RwLock<BankStatusCache>>,
|
||||
}
|
||||
|
||||
impl StatusCacheRc {
|
||||
|
@ -133,6 +133,10 @@ impl StatusCacheRc {
|
|||
sc.slot_deltas(slots)
|
||||
}
|
||||
|
||||
pub fn roots(&self) -> HashSet<u64> {
|
||||
self.status_cache.read().unwrap().roots().clone()
|
||||
}
|
||||
|
||||
pub fn append(&self, slot_deltas: &[SlotDelta<Result<()>>]) {
|
||||
let mut sc = self.status_cache.write().unwrap();
|
||||
sc.append(slot_deltas);
|
||||
|
|
|
@ -124,6 +124,10 @@ impl<T: Serialize + Clone> StatusCache<T> {
|
|||
self.purge_roots();
|
||||
}
|
||||
|
||||
pub fn roots(&self) -> &HashSet<u64> {
|
||||
&self.roots
|
||||
}
|
||||
|
||||
/// Insert a new signature for a specific slot.
|
||||
pub fn insert(&mut self, transaction_blockhash: &Hash, sig: &Signature, slot: Slot, res: T) {
|
||||
let sig_index: usize;
|
||||
|
|
Loading…
Reference in New Issue