Removes support for loading snapshots with > 1 append vec per slot (#474)
This commit is contained in:
parent
f36a45a971
commit
d5c0c0b1c2
|
@ -6318,49 +6318,6 @@ impl AccountsDb {
|
|||
self.flush_slot_cache_with_clean(slot, None::<&mut fn(&_, &_) -> bool>, None)
|
||||
}
|
||||
|
||||
/// 1.13 and some 1.14 could produce legal snapshots with more than 1 append vec per slot.
|
||||
/// This is now illegal at runtime in the validator.
|
||||
/// However, there is a clear path to be able to support this.
|
||||
/// So, combine all accounts from 'slot_stores' into a new storage and return it.
|
||||
/// This runs prior to the storages being put in AccountsDb.storage
|
||||
pub fn combine_multiple_slots_into_one_at_startup(
|
||||
path: &Path,
|
||||
id: AccountsFileId,
|
||||
slot: Slot,
|
||||
slot_stores: &HashMap<AccountsFileId, Arc<AccountStorageEntry>>,
|
||||
) -> Arc<AccountStorageEntry> {
|
||||
let size = slot_stores.values().map(|storage| storage.capacity()).sum();
|
||||
let storage = AccountStorageEntry::new(path, slot, id, size);
|
||||
|
||||
// get unique accounts, most recent version by write_version
|
||||
let mut accum = HashMap::<Pubkey, StoredAccountMeta<'_>>::default();
|
||||
slot_stores.iter().for_each(|(_id, store)| {
|
||||
store.accounts.account_iter().for_each(|loaded_account| {
|
||||
match accum.entry(*loaded_account.pubkey()) {
|
||||
hash_map::Entry::Occupied(mut occupied_entry) => {
|
||||
if loaded_account.write_version() > occupied_entry.get().write_version() {
|
||||
occupied_entry.insert(loaded_account);
|
||||
}
|
||||
}
|
||||
hash_map::Entry::Vacant(vacant_entry) => {
|
||||
vacant_entry.insert(loaded_account);
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
// store all unique accounts into new storage
|
||||
let accounts = accum.values().collect::<Vec<_>>();
|
||||
let to_store = (slot, &accounts[..]);
|
||||
let storable =
|
||||
StorableAccountsWithHashesAndWriteVersions::<'_, '_, _, _, &AccountHash>::new(
|
||||
&to_store,
|
||||
);
|
||||
storage.accounts.append_accounts(&storable, 0);
|
||||
|
||||
Arc::new(storage)
|
||||
}
|
||||
|
||||
/// `should_flush_f` is an optional closure that determines whether a given
|
||||
/// account should be flushed. Passing `None` will by default flush all
|
||||
/// accounts
|
||||
|
@ -10107,33 +10064,6 @@ pub mod tests {
|
|||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_combine_multiple_slots_into_one_at_startup() {
|
||||
solana_logger::setup();
|
||||
let (db, slot1) = create_db_with_storages_and_index(false, 2, None);
|
||||
let slot2 = slot1 + 1;
|
||||
|
||||
let initial_accounts = get_all_accounts(&db, slot1..(slot2 + 1));
|
||||
|
||||
let tf = TempDir::new().unwrap();
|
||||
let stores = db
|
||||
.storage
|
||||
.all_slots()
|
||||
.into_iter()
|
||||
.map(|slot| {
|
||||
let storage = db.storage.get_slot_storage_entry(slot).unwrap();
|
||||
(storage.append_vec_id(), storage)
|
||||
})
|
||||
.collect::<HashMap<_, _>>();
|
||||
let new_storage =
|
||||
AccountsDb::combine_multiple_slots_into_one_at_startup(tf.path(), 1000, slot1, &stores);
|
||||
|
||||
compare_all_accounts(
|
||||
&initial_accounts,
|
||||
&get_all_accounts_from_storages(std::iter::once(&new_storage)),
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_accountsdb_scan_snapshot_stores_hash_not_stored() {
|
||||
let accounts_db = AccountsDb::new_single_for_tests();
|
||||
|
|
|
@ -356,6 +356,9 @@ pub enum SnapshotError {
|
|||
|
||||
#[error("failed to archive snapshot package: {0}")]
|
||||
ArchiveSnapshotPackage(#[from] ArchiveSnapshotPackageError),
|
||||
|
||||
#[error("failed to rebuild snapshot storages: {0}")]
|
||||
RebuildStorages(String),
|
||||
}
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
|
|
|
@ -16,15 +16,14 @@ use {
|
|||
regex::Regex,
|
||||
solana_accounts_db::{
|
||||
account_storage::{AccountStorageMap, AccountStorageReference},
|
||||
accounts_db::{AccountStorageEntry, AccountsDb, AccountsFileId, AtomicAccountsFileId},
|
||||
append_vec::AppendVec,
|
||||
accounts_db::{AccountStorageEntry, AccountsFileId, AtomicAccountsFileId},
|
||||
},
|
||||
solana_sdk::clock::Slot,
|
||||
std::{
|
||||
collections::HashMap,
|
||||
fs::File,
|
||||
io::{BufReader, Error as IoError},
|
||||
path::{Path, PathBuf},
|
||||
path::PathBuf,
|
||||
str::FromStr as _,
|
||||
sync::{
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
|
@ -333,52 +332,19 @@ impl SnapshotStorageRebuilder {
|
|||
.collect::<Result<HashMap<AccountsFileId, Arc<AccountStorageEntry>>, SnapshotError>>(
|
||||
)?;
|
||||
|
||||
let storage = if slot_stores.len() > 1 {
|
||||
let remapped_append_vec_folder = lock.first().unwrap().parent().unwrap();
|
||||
let remapped_append_vec_id = Self::get_unique_append_vec_id(
|
||||
&self.next_append_vec_id,
|
||||
remapped_append_vec_folder,
|
||||
slot,
|
||||
);
|
||||
AccountsDb::combine_multiple_slots_into_one_at_startup(
|
||||
remapped_append_vec_folder,
|
||||
remapped_append_vec_id,
|
||||
slot,
|
||||
&slot_stores,
|
||||
)
|
||||
} else {
|
||||
slot_stores
|
||||
.into_values()
|
||||
.next()
|
||||
.expect("at least 1 storage per slot required")
|
||||
};
|
||||
|
||||
self.storage.insert(
|
||||
slot,
|
||||
AccountStorageReference {
|
||||
id: storage.append_vec_id(),
|
||||
storage,
|
||||
},
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// increment `next_append_vec_id` until there is no file in `parent_folder` with this id and slot
|
||||
/// return the id
|
||||
fn get_unique_append_vec_id(
|
||||
next_append_vec_id: &Arc<AtomicAccountsFileId>,
|
||||
parent_folder: &Path,
|
||||
slot: Slot,
|
||||
) -> AccountsFileId {
|
||||
loop {
|
||||
let remapped_append_vec_id = next_append_vec_id.fetch_add(1, Ordering::AcqRel);
|
||||
let remapped_file_name = AppendVec::file_name(slot, remapped_append_vec_id);
|
||||
let remapped_append_vec_path = parent_folder.join(remapped_file_name);
|
||||
if std::fs::metadata(&remapped_append_vec_path).is_err() {
|
||||
// getting an err here means that there is no existing file here
|
||||
return remapped_append_vec_id;
|
||||
}
|
||||
if slot_stores.len() != 1 {
|
||||
return Err(SnapshotError::RebuildStorages(format!(
|
||||
"there must be exactly one storage per slot, but slot {slot} has {} storages",
|
||||
slot_stores.len()
|
||||
)));
|
||||
}
|
||||
// SAFETY: The check above guarantees there is one item in slot_stores,
|
||||
// so `.next()` will always return `Some`
|
||||
let (id, storage) = slot_stores.into_iter().next().unwrap();
|
||||
|
||||
self.storage
|
||||
.insert(slot, AccountStorageReference { id, storage });
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Wait for the completion of the rebuilding threads
|
||||
|
@ -462,27 +428,6 @@ mod tests {
|
|||
solana_accounts_db::append_vec::AppendVec,
|
||||
};
|
||||
|
||||
#[test]
|
||||
fn test_get_unique_append_vec_id() {
|
||||
let folder = tempfile::TempDir::new().unwrap();
|
||||
let folder = folder.path();
|
||||
let next_id = Arc::default();
|
||||
let slot = 1;
|
||||
let append_vec_id =
|
||||
SnapshotStorageRebuilder::get_unique_append_vec_id(&next_id, folder, slot);
|
||||
assert_eq!(append_vec_id, 0);
|
||||
let file_name = AppendVec::file_name(slot, append_vec_id);
|
||||
let append_vec_path = folder.join(file_name);
|
||||
|
||||
// create a file at this path
|
||||
_ = File::create(append_vec_path).unwrap();
|
||||
next_id.store(0, Ordering::Release);
|
||||
let append_vec_id =
|
||||
SnapshotStorageRebuilder::get_unique_append_vec_id(&next_id, folder, slot);
|
||||
// should have found a conflict with 0
|
||||
assert_eq!(append_vec_id, 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_get_snapshot_file_kind() {
|
||||
assert_eq!(None, get_snapshot_file_kind("file.txt"));
|
||||
|
|
Loading…
Reference in New Issue