support loading snapshots with > 1 append vec per slot (#30570)

* support loading snapshots with > 1 append vec per slot

* pr feedback

* drain to into_values

* unwrap() -> expect()

* loop to find non-conflicting file and add test
This commit is contained in:
Jeff Washington (jwash) 2023-03-06 08:08:57 -06:00 committed by GitHub
parent ec7b1c8bda
commit b26d470b02
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 164 additions and 21 deletions

View File

@ -95,7 +95,7 @@ use {
std::{
borrow::{Borrow, Cow},
boxed::Box,
collections::{BTreeSet, HashMap, HashSet},
collections::{hash_map, BTreeSet, HashMap, HashSet},
hash::{Hash as StdHash, Hasher as StdHasher},
io::{Error as IoError, Result as IoResult},
ops::{Range, RangeBounds},
@ -6531,6 +6531,51 @@ impl AccountsDb {
self.flush_slot_cache_with_clean(&[slot], None::<&mut fn(&_, &_) -> bool>, None)
}
/// 1.13 and some 1.14 could produce legal snapshots with more than 1 append vec per slot.
/// This is now illegal at runtime in the validator.
/// However, there is a clear path to be able to support this.
/// So, combine all accounts from 'slot_stores' into a new storage and return it.
/// This runs prior to the storages being put in AccountsDb.storage
pub(crate) fn combine_multiple_slots_into_one_at_startup(
    path: &Path,
    id: AppendVecId,
    slot: Slot,
    slot_stores: &HashMap<AppendVecId, Arc<AccountStorageEntry>>,
) -> Arc<AccountStorageEntry> {
    // Size the combined storage to hold everything the inputs could hold.
    let combined_capacity = slot_stores
        .values()
        .map(|storage| storage.capacity())
        .sum();
    let combined_storage = AccountStorageEntry::new(path, slot, id, combined_capacity);

    // Deduplicate by pubkey, keeping only the most recent version of each
    // account (the one with the highest write_version).
    let mut latest_by_pubkey = HashMap::<Pubkey, StoredAccountMeta<'_>>::default();
    for store in slot_stores.values() {
        for loaded_account in store.accounts.account_iter() {
            match latest_by_pubkey.entry(*loaded_account.pubkey()) {
                hash_map::Entry::Occupied(mut entry) => {
                    if loaded_account.write_version() > entry.get().write_version() {
                        entry.insert(loaded_account);
                    }
                }
                hash_map::Entry::Vacant(entry) => {
                    entry.insert(loaded_account);
                }
            }
        }
    }

    // Append every surviving (unique, most recent) account into the new storage.
    let unique_accounts = latest_by_pubkey.values().collect::<Vec<_>>();
    let to_store = (
        slot,
        &unique_accounts[..],
        INCLUDE_SLOT_IN_HASH_IRRELEVANT_APPEND_VEC_OPERATION,
    );
    let storable =
        StorableAccountsWithHashesAndWriteVersions::<'_, '_, _, _, &Hash>::new(&to_store);
    combined_storage.accounts.append_accounts(&storable, 0);
    Arc::new(combined_storage)
}
/// `should_flush_f` is an optional closure that determines whether a given
/// account should be flushed. Passing `None` will by default flush all
/// accounts
@ -7099,7 +7144,7 @@ impl AccountsDb {
let file_name = {
let mut load_from_cache = true;
let mut hasher = std::collections::hash_map::DefaultHasher::new();
let mut hasher = hash_map::DefaultHasher::new();
bin_range.start.hash(&mut hasher);
bin_range.end.hash(&mut hasher);
let is_first_scan_pass = bin_range.start == 0;
@ -9971,6 +10016,33 @@ pub mod tests {
}
}
#[test]
fn test_combine_multiple_slots_into_one_at_startup() {
    solana_logger::setup();
    // Build a db holding storages for two consecutive slots.
    let (db, slot1) = create_db_with_storages_and_index(false, 2, None);
    let slot2 = slot1 + 1;
    let expected_accounts = get_all_accounts(&db, slot1..(slot2 + 1));
    let tf = TempDir::new().unwrap();

    // Gather every slot's storage keyed by its append vec id, mimicking what a
    // snapshot rebuilder would hand to the combiner.
    let mut stores = HashMap::new();
    for slot in db.storage.all_slots() {
        let storage = db.storage.get_slot_storage_entry(slot).unwrap();
        stores.insert(storage.append_vec_id(), storage);
    }

    // Combining multiple storages into one must preserve the full account set.
    let combined =
        AccountsDb::combine_multiple_slots_into_one_at_startup(tf.path(), 1000, slot1, &stores);
    compare_all_accounts(
        &expected_accounts,
        &get_all_accounts_from_storages(std::iter::once(&combined)),
    );
}
#[test]
fn test_accountsdb_scan_snapshot_stores() {
solana_logger::setup();
@ -16867,7 +16939,7 @@ pub mod tests {
#[test]
fn test_hash_storage_info() {
{
let mut hasher = std::collections::hash_map::DefaultHasher::new();
let mut hasher = hash_map::DefaultHasher::new();
let storages = None;
let slot = 1;
let load = AccountsDb::hash_storage_info(&mut hasher, storages, slot);
@ -16876,7 +16948,7 @@ pub mod tests {
assert!(load);
}
{
let mut hasher = std::collections::hash_map::DefaultHasher::new();
let mut hasher = hash_map::DefaultHasher::new();
let slot: Slot = 0;
let tf = crate::append_vec::test_utils::get_append_vec_path(
"test_accountsdb_scan_account_storage_no_bank",
@ -16892,13 +16964,13 @@ pub mod tests {
// can't assert hash here - it is a function of mod date
assert!(load);
let slot = 2; // changed this
let mut hasher = std::collections::hash_map::DefaultHasher::new();
let mut hasher = hash_map::DefaultHasher::new();
let load = AccountsDb::hash_storage_info(&mut hasher, Some(&storage), slot);
let hash2 = hasher.finish();
assert_ne!(hash, hash2); // slot changed, these should be different
// can't assert hash here - it is a function of mod date
assert!(load);
let mut hasher = std::collections::hash_map::DefaultHasher::new();
let mut hasher = hash_map::DefaultHasher::new();
append_sample_data_to_storage(
&storage,
&solana_sdk::pubkey::new_rand(),
@ -16911,7 +16983,7 @@ pub mod tests {
assert_ne!(hash2, hash3); // moddate and written size changed
// can't assert hash here - it is a function of mod date
assert!(load);
let mut hasher = std::collections::hash_map::DefaultHasher::new();
let mut hasher = hash_map::DefaultHasher::new();
let load = AccountsDb::hash_storage_info(&mut hasher, Some(&storage), slot);
let hash4 = hasher.finish();
assert_eq!(hash4, hash3); // same
@ -17276,6 +17348,20 @@ pub mod tests {
}
}
/// Collect a `(pubkey, account)` pair for every account stored in every
/// supplied storage, in storage iteration order.
fn get_all_accounts_from_storages<'a>(
    storages: impl Iterator<Item = &'a Arc<AccountStorageEntry>>,
) -> Vec<(Pubkey, AccountSharedData)> {
    let mut accounts = Vec::new();
    for storage in storages {
        for account in storage.accounts.account_iter() {
            accounts.push((*account.pubkey(), account.to_account_shared_data()));
        }
    }
    accounts
}
pub(crate) fn get_all_accounts(
db: &AccountsDb,
slots: Range<Slot>,
@ -17283,13 +17369,7 @@ pub mod tests {
slots
.filter_map(|slot| {
let storage = db.storage.get_slot_storage_entry(slot);
storage.map(|storage| {
storage
.accounts
.account_iter()
.map(|account| (*account.pubkey(), account.to_account_shared_data()))
.collect::<Vec<_>>()
})
storage.map(|storage| get_all_accounts_from_storages(std::iter::once(&storage)))
})
.flatten()
.collect::<Vec<_>>()

View File

@ -4,7 +4,8 @@ use {
super::{get_io_error, snapshot_version_from_file, SnapshotError, SnapshotVersion},
crate::{
account_storage::{AccountStorageMap, AccountStorageReference},
accounts_db::{AccountStorageEntry, AppendVecId, AtomicAppendVecId},
accounts_db::{AccountStorageEntry, AccountsDb, AppendVecId, AtomicAppendVecId},
append_vec::AppendVec,
serde_snapshot::{
self, remap_and_reconstruct_single_storage, snapshot_storage_lengths_from_fields,
SerdeStyle, SerializedAppendVecId,
@ -23,7 +24,7 @@ use {
collections::HashMap,
fs::File,
io::BufReader,
path::PathBuf,
path::{Path, PathBuf},
sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex,
@ -292,7 +293,7 @@ impl SnapshotStorageRebuilder {
let slot_storage_paths = self.storage_paths.get(&slot).unwrap();
let lock = slot_storage_paths.lock().unwrap();
let mut slot_stores = lock
let slot_stores = lock
.iter()
.map(|path| {
let filename = path.file_name().unwrap().to_str().unwrap();
@ -317,13 +318,54 @@ impl SnapshotStorageRebuilder {
})
.collect::<Result<HashMap<AppendVecId, Arc<AccountStorageEntry>>, std::io::Error>>()?;
assert_eq!(slot_stores.len(), 1);
let (id, storage) = slot_stores.drain().next().unwrap();
self.storage
.insert(slot, AccountStorageReference { id, storage });
let storage = if slot_stores.len() > 1 {
let remapped_append_vec_folder = lock.first().unwrap().parent().unwrap();
let remapped_append_vec_id = Self::get_unique_append_vec_id(
&self.next_append_vec_id,
remapped_append_vec_folder,
slot,
);
AccountsDb::combine_multiple_slots_into_one_at_startup(
remapped_append_vec_folder,
remapped_append_vec_id,
slot,
&slot_stores,
)
} else {
slot_stores
.into_values()
.next()
.expect("at least 1 storage per slot required")
};
self.storage.insert(
slot,
AccountStorageReference {
id: storage.append_vec_id(),
storage,
},
);
Ok(())
}
/// increment `next_append_vec_id` until there is no file in `parent_folder` with this id and slot
/// return the id
fn get_unique_append_vec_id(
    next_append_vec_id: &Arc<AtomicAppendVecId>,
    parent_folder: &Path,
    slot: Slot,
) -> AppendVecId {
    loop {
        let candidate_id = next_append_vec_id.fetch_add(1, Ordering::AcqRel);
        let candidate_path = parent_folder.join(AppendVec::file_name(slot, candidate_id));
        // `Path::exists()` is `fs::metadata(..).is_ok()` — a missing file means
        // this id is free to use for (slot, id).
        if !candidate_path.exists() {
            return candidate_id;
        }
    }
}
/// Wait for the completion of the rebuilding threads
fn wait_for_completion(
&self,
@ -405,6 +447,27 @@ pub(crate) fn get_slot_and_append_vec_id(filename: &str) -> (Slot, usize) {
mod tests {
use {super::*, crate::append_vec::AppendVec};
#[test]
fn test_get_unique_append_vec_id() {
    let tmp = tempfile::TempDir::new().unwrap();
    let folder = tmp.path();
    let next_id = Arc::default();
    let slot = 1;

    // Nothing exists on disk yet, so the first candidate id (0) is accepted.
    let append_vec_id =
        SnapshotStorageRebuilder::get_unique_append_vec_id(&next_id, folder, slot);
    assert_eq!(append_vec_id, 0);

    // Place a file that collides with id 0, then rewind the counter.
    let colliding_path = folder.join(AppendVec::file_name(slot, append_vec_id));
    _ = File::create(colliding_path).unwrap();
    next_id.store(0, Ordering::Release);

    // The loop must detect the conflict at id 0 and settle on id 1.
    let append_vec_id =
        SnapshotStorageRebuilder::get_unique_append_vec_id(&next_id, folder, slot);
    assert_eq!(append_vec_id, 1);
}
#[test]
fn test_get_snapshot_file_kind() {
assert_eq!(None, get_snapshot_file_kind("file.txt"));