Correct missing entry handling to avoid bad warns (#8339)

* Correct missing entry handling to avoid bad warns

* Pass storage entries to AccountStorageSerialize

* Fix CI.....

* Add tests and reorder condition for cheapest first

* Remove unneeded reference
This commit is contained in:
Ryo Onodera 2020-02-21 15:27:55 +09:00 committed by GitHub
parent 0b7e8d0162
commit d238371b0c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 176 additions and 74 deletions

View File

@ -156,7 +156,7 @@ mod tests {
5,
vec![],
link_snapshots_dir,
storage_entries.clone(),
vec![storage_entries],
output_tar_path.clone(),
Hash::default(),
);

View File

@ -131,6 +131,7 @@ mod tests {
}
// Generate a snapshot package for last bank
let last_bank = bank_forks.get(last_slot).unwrap();
let storages: Vec<_> = last_bank.get_snapshot_storages();
let slot_snapshot_paths =
snapshot_utils::get_snapshot_paths(&snapshot_config.snapshot_path);
let snapshot_package = snapshot_utils::package_snapshot(
@ -143,6 +144,7 @@ mod tests {
),
&snapshot_config.snapshot_path,
&last_bank.src.roots(),
storages,
)
.unwrap();
@ -202,7 +204,8 @@ mod tests {
// Take snapshot of zeroth bank
let bank0 = bank_forks.get(0).unwrap();
snapshot_utils::add_snapshot(&snapshot_config.snapshot_path, bank0).unwrap();
let storages: Vec<_> = bank0.get_snapshot_storages();
snapshot_utils::add_snapshot(&snapshot_config.snapshot_path, bank0, &storages).unwrap();
// Set up snapshotting channels
let (sender, receiver) = channel();

View File

@ -903,7 +903,8 @@ fn main() {
exit(1);
});
snapshot_utils::add_snapshot(&temp_dir, &bank)
let storages: Vec<_> = bank.get_snapshot_storages();
snapshot_utils::add_snapshot(&temp_dir, &bank, &storages)
.and_then(|slot_snapshot_paths| {
snapshot_utils::package_snapshot(
&bank,
@ -911,6 +912,7 @@ fn main() {
snapshot_utils::get_snapshot_archive_path(output_directory),
&temp_dir,
&bank.src.roots(),
storages,
)
})
.and_then(|package| {

View File

@ -251,8 +251,9 @@ impl BankForks {
.cloned()
.expect("root must exist in BankForks");
let storages: Vec<_> = bank.get_snapshot_storages();
let mut add_snapshot_time = Measure::start("add-snapshot-ms");
snapshot_utils::add_snapshot(&config.snapshot_path, &bank)?;
snapshot_utils::add_snapshot(&config.snapshot_path, &bank, &storages)?;
add_snapshot_time.stop();
inc_new_counter_info!("add-snapshot-ms", add_snapshot_time.as_ms() as usize);
@ -269,6 +270,7 @@ impl BankForks {
tar_output_file,
&config.snapshot_path,
slots_to_snapshot,
storages,
)?;
// Send the package to the packaging thread

View File

@ -1,12 +1,9 @@
use solana_runtime::{accounts_db::AccountStorageEntry, bank::BankSlotDelta};
use solana_runtime::{accounts_db::SnapshotStorages, bank::BankSlotDelta};
use solana_sdk::clock::Slot;
use solana_sdk::hash::Hash;
use std::{
path::PathBuf,
sync::{
mpsc::{Receiver, SendError, Sender},
Arc,
},
sync::mpsc::{Receiver, SendError, Sender},
};
use tempfile::TempDir;
@ -19,7 +16,7 @@ pub struct SnapshotPackage {
pub root: Slot,
pub slot_deltas: Vec<BankSlotDelta>,
pub snapshot_links: TempDir,
pub storage_entries: Vec<Arc<AccountStorageEntry>>,
pub storages: SnapshotStorages,
pub tar_output_file: PathBuf,
pub hash: Hash,
}
@ -29,7 +26,7 @@ impl SnapshotPackage {
root: Slot,
slot_deltas: Vec<BankSlotDelta>,
snapshot_links: TempDir,
storage_entries: Vec<Arc<AccountStorageEntry>>,
storages: SnapshotStorages,
tar_output_file: PathBuf,
hash: Hash,
) -> Self {
@ -37,7 +34,7 @@ impl SnapshotPackage {
root,
slot_deltas,
snapshot_links,
storage_entries,
storages,
tar_output_file,
hash,
}

View File

@ -4,8 +4,12 @@ use bzip2::bufread::BzDecoder;
use fs_extra::dir::CopyOptions;
use log::*;
use solana_measure::measure::Measure;
use solana_runtime::bank::{
self, deserialize_from_snapshot, Bank, BankSlotDelta, MAX_SNAPSHOT_DATA_FILE_SIZE,
use solana_runtime::{
accounts_db::{SnapshotStorage, SnapshotStorages},
bank::{
self, deserialize_from_snapshot, Bank, BankRcSerialize, BankSlotDelta,
MAX_SNAPSHOT_DATA_FILE_SIZE,
},
};
use solana_sdk::clock::Slot;
use std::{
@ -78,23 +82,16 @@ pub fn package_snapshot<P: AsRef<Path>, Q: AsRef<Path>>(
snapshot_package_output_file: P,
snapshot_path: Q,
slots_to_snapshot: &[Slot],
snapshot_storages: SnapshotStorages,
) -> Result<SnapshotPackage> {
// Hard link all the snapshots we need for this package
let snapshot_hard_links_dir = tempfile::tempdir_in(snapshot_path)?;
// Get a reference to all the relevant AccountStorageEntries
let account_storage_entries: Vec<_> = bank
.rc
.get_rooted_storage_entries()
.into_iter()
.filter(|x| x.slot_id() <= bank.slot())
.collect();
// Create a snapshot package
info!(
"Snapshot for bank: {} has {} account storage entries",
bank.slot(),
account_storage_entries.len()
snapshot_storages.len()
);
// Any errors from this point on will cause the above SnapshotPackage to drop, clearing
@ -105,7 +102,7 @@ pub fn package_snapshot<P: AsRef<Path>, Q: AsRef<Path>>(
bank.slot(),
bank.src.slot_deltas(slots_to_snapshot),
snapshot_hard_links_dir,
account_storage_entries,
snapshot_storages,
snapshot_package_output_file.as_ref().to_path_buf(),
bank.hash(),
);
@ -147,7 +144,7 @@ pub fn archive_snapshot_package(snapshot_package: &SnapshotPackage) -> Result<()
)?;
// Add the AppendVecs into the compressible list
for storage in &snapshot_package.storage_entries {
for storage in snapshot_package.storages.iter().flatten() {
storage.flush()?;
let storage_path = storage.get_path();
let output_path = staging_accounts_dir.join(
@ -321,7 +318,11 @@ where
Ok(ret)
}
pub fn add_snapshot<P: AsRef<Path>>(snapshot_path: P, bank: &Bank) -> Result<SlotSnapshotPaths> {
pub fn add_snapshot<P: AsRef<Path>>(
snapshot_path: P,
bank: &Bank,
snapshot_storages: &[SnapshotStorage],
) -> Result<SlotSnapshotPaths> {
bank.purge_zero_lamport_accounts();
let slot = bank.slot();
// snapshot_path/slot
@ -341,7 +342,13 @@ pub fn add_snapshot<P: AsRef<Path>>(snapshot_path: P, bank: &Bank) -> Result<Slo
MAX_SNAPSHOT_DATA_FILE_SIZE,
|stream| {
serialize_into(stream.by_ref(), &*bank)?;
serialize_into(stream.by_ref(), &bank.rc)?;
serialize_into(
stream.by_ref(),
&BankRcSerialize {
bank_rc: &bank.rc,
snapshot_storages,
},
)?;
Ok(())
},
)?;

View File

@ -1343,7 +1343,11 @@ mod tests {
let mut writer = Cursor::new(vec![]);
serialize_into(
&mut writer,
&AccountsDBSerialize::new(&*accounts.accounts_db, 0),
&AccountsDBSerialize::new(
&*accounts.accounts_db,
0,
&accounts.accounts_db.get_snapshot_storages(0),
),
)
.unwrap();

View File

@ -87,6 +87,8 @@ pub struct AccountInfo {
}
/// An offset into the AccountsDB::storage vector
pub type AppendVecId = usize;
pub type SnapshotStorage = Vec<Arc<AccountStorageEntry>>;
pub type SnapshotStorages = Vec<SnapshotStorage>;
// Each slot has a set of storage entries.
type SlotStores = HashMap<usize, Arc<AccountStorageEntry>>;
@ -119,17 +121,8 @@ impl<'de> Visitor<'de> for AccountStorageVisitor {
}
}
pub struct AccountStorageSerialize<'a> {
account_storage: &'a AccountStorage,
slot: Slot,
}
impl<'a> AccountStorageSerialize<'a> {
pub fn new(account_storage: &'a AccountStorage, slot: Slot) -> Self {
Self {
account_storage,
slot,
}
}
struct AccountStorageSerialize<'a> {
account_storage_entries: &'a [SnapshotStorage],
}
impl<'a> Serialize for AccountStorageSerialize<'a> {
@ -137,21 +130,12 @@ impl<'a> Serialize for AccountStorageSerialize<'a> {
where
S: Serializer,
{
let mut len: usize = 0;
for slot_id in self.account_storage.0.keys() {
if *slot_id <= self.slot {
len += 1;
}
}
let mut map = serializer.serialize_map(Some(len))?;
let mut map = serializer.serialize_map(Some(self.account_storage_entries.len()))?;
let mut count = 0;
let mut serialize_account_storage_timer = Measure::start("serialize_account_storage_ms");
for (slot_id, slot_storage) in &self.account_storage.0 {
if *slot_id <= self.slot {
let storage_entries: Vec<_> = slot_storage.values().collect();
map.serialize_entry(&slot_id, &storage_entries)?;
count += slot_storage.len();
}
for storage_entries in self.account_storage_entries {
map.serialize_entry(&storage_entries.first().unwrap().slot_id, storage_entries)?;
count += storage_entries.len();
}
serialize_account_storage_timer.stop();
datapoint_info!(
@ -327,27 +311,37 @@ pub fn get_temp_accounts_paths(count: u32) -> IOResult<(Vec<TempDir>, Vec<PathBu
Ok((temp_dirs, paths))
}
pub struct AccountsDBSerialize<'a> {
pub struct AccountsDBSerialize<'a, 'b> {
accounts_db: &'a AccountsDB,
slot: Slot,
account_storage_entries: &'b [SnapshotStorage],
}
impl<'a> AccountsDBSerialize<'a> {
pub fn new(accounts_db: &'a AccountsDB, slot: Slot) -> Self {
Self { accounts_db, slot }
impl<'a, 'b> AccountsDBSerialize<'a, 'b> {
pub fn new(
accounts_db: &'a AccountsDB,
slot: Slot,
account_storage_entries: &'b [SnapshotStorage],
) -> Self {
Self {
accounts_db,
slot,
account_storage_entries,
}
}
}
impl<'a> Serialize for AccountsDBSerialize<'a> {
impl<'a, 'b> Serialize for AccountsDBSerialize<'a, 'b> {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::ser::Serializer,
{
use serde::ser::Error;
let storage = self.accounts_db.storage.read().unwrap();
let mut wr = Cursor::new(vec![]);
let version: u64 = self.accounts_db.write_version.load(Ordering::Relaxed) as u64;
let account_storage_serialize = AccountStorageSerialize::new(&*storage, self.slot);
let account_storage_serialize = AccountStorageSerialize {
account_storage_entries: self.account_storage_entries,
};
serialize_into(&mut wr, &account_storage_serialize).map_err(Error::custom)?;
serialize_into(&mut wr, &version).map_err(Error::custom)?;
let bank_hashes = self.accounts_db.bank_hashes.read().unwrap();
@ -1240,14 +1234,16 @@ impl AccountsDB {
self.accounts_index.write().unwrap().add_root(slot)
}
pub fn get_rooted_storage_entries(&self) -> Vec<Arc<AccountStorageEntry>> {
pub fn get_snapshot_storages(&self, snapshot_slot: Slot) -> SnapshotStorages {
let accounts_index = self.accounts_index.read().unwrap();
let r_storage = self.storage.read().unwrap();
r_storage
.0
.values()
.flat_map(|slot_store| slot_store.values().cloned())
.filter(|store| accounts_index.is_root(store.slot_id))
.iter()
.filter(|(slot, storage)| {
**slot <= snapshot_slot && !storage.is_empty() && accounts_index.is_root(**slot)
})
.map(|(_, slot_store)| slot_store.values().cloned().collect())
.collect()
}
@ -1938,7 +1934,12 @@ pub mod tests {
fn reconstruct_accounts_db_via_serialization(accounts: &AccountsDB, slot: Slot) -> AccountsDB {
let mut writer = Cursor::new(vec![]);
serialize_into(&mut writer, &AccountsDBSerialize::new(&accounts, slot)).unwrap();
let snapshot_storages = accounts.get_snapshot_storages(slot);
serialize_into(
&mut writer,
&AccountsDBSerialize::new(&accounts, slot, &snapshot_storages),
)
.unwrap();
let buf = writer.into_inner();
let mut reader = BufReader::new(&buf[..]);
@ -2281,8 +2282,8 @@ pub mod tests {
accounts_db: &AccountsDB,
output_dir: P,
) -> IOResult<()> {
let storage_entries = accounts_db.get_rooted_storage_entries();
for storage in storage_entries {
let storage_entries = accounts_db.get_snapshot_storages(Slot::max_value());
for storage in storage_entries.iter().flatten() {
let storage_path = storage.get_path();
let output_path = output_dir.as_ref().join(
storage_path
@ -2488,4 +2489,68 @@ pub mod tests {
}
}
}
#[test]
fn test_get_snapshot_storages_empty() {
let db = AccountsDB::new(Vec::new());
assert!(db.get_snapshot_storages(0).is_empty());
}
#[test]
fn test_get_snapshot_storages_only_older_than_or_equal_to_snapshot_slot() {
let db = AccountsDB::new(Vec::new());
let key = Pubkey::default();
let account = Account::new(1, 0, &key);
let before_slot = 0;
let base_slot = before_slot + 1;
let after_slot = base_slot + 1;
db.add_root(base_slot);
db.store(base_slot, &[(&key, &account)]);
assert!(db.get_snapshot_storages(before_slot).is_empty());
assert_eq!(1, db.get_snapshot_storages(base_slot).len());
assert_eq!(1, db.get_snapshot_storages(after_slot).len());
}
#[test]
fn test_get_snapshot_storages_only_non_empty() {
let db = AccountsDB::new(Vec::new());
let key = Pubkey::default();
let account = Account::new(1, 0, &key);
let base_slot = 0;
let after_slot = base_slot + 1;
db.store(base_slot, &[(&key, &account)]);
db.storage
.write()
.unwrap()
.0
.get_mut(&base_slot)
.unwrap()
.clear();
db.add_root(base_slot);
assert!(db.get_snapshot_storages(after_slot).is_empty());
db.store(base_slot, &[(&key, &account)]);
assert_eq!(1, db.get_snapshot_storages(after_slot).len());
}
#[test]
fn test_get_snapshot_storages_only_roots() {
let db = AccountsDB::new(Vec::new());
let key = Pubkey::default();
let account = Account::new(1, 0, &key);
let base_slot = 0;
let after_slot = base_slot + 1;
db.store(base_slot, &[(&key, &account)]);
assert!(db.get_snapshot_storages(after_slot).is_empty());
db.add_root(base_slot);
assert_eq!(1, db.get_snapshot_storages(after_slot).len());
}
}

View File

@ -4,7 +4,9 @@
//! already been signed and verified.
use crate::{
accounts::{Accounts, TransactionAccounts, TransactionLoadResult, TransactionLoaders},
accounts_db::{AccountStorageEntry, AccountsDBSerialize, AppendVecId, ErrorCounters},
accounts_db::{
AccountsDBSerialize, AppendVecId, ErrorCounters, SnapshotStorage, SnapshotStorages,
},
blockhash_queue::BlockhashQueue,
hard_forks::HardForks,
message_processor::{MessageProcessor, ProcessInstruction},
@ -111,8 +113,8 @@ impl BankRc {
Ok(())
}
pub fn get_rooted_storage_entries(&self) -> Vec<Arc<AccountStorageEntry>> {
self.accounts.accounts_db.get_rooted_storage_entries()
pub fn get_snapshot_storages(&self, slot: Slot) -> SnapshotStorages {
self.accounts.accounts_db.get_snapshot_storages(slot)
}
fn get_io_error(error: &str) -> IOError {
@ -121,15 +123,23 @@ impl BankRc {
}
}
impl Serialize for BankRc {
pub struct BankRcSerialize<'a, 'b> {
pub bank_rc: &'a BankRc,
pub snapshot_storages: &'b [SnapshotStorage],
}
impl<'a, 'b> Serialize for BankRcSerialize<'a, 'b> {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::ser::Serializer,
{
use serde::ser::Error;
let mut wr = Cursor::new(Vec::new());
let accounts_db_serialize =
AccountsDBSerialize::new(&*self.accounts.accounts_db, self.slot);
let accounts_db_serialize = AccountsDBSerialize::new(
&*self.bank_rc.accounts.accounts_db,
self.bank_rc.slot,
self.snapshot_storages,
);
serialize_into(&mut wr, &accounts_db_serialize).map_err(Error::custom)?;
let len = wr.position() as usize;
serializer.serialize_bytes(&wr.into_inner()[..len])
@ -1857,6 +1867,13 @@ impl Bank {
.verify_bank_hash(self.slot(), &self.ancestors)
}
pub fn get_snapshot_storages(&self) -> SnapshotStorages {
self.rc
.get_snapshot_storages(self.slot())
.into_iter()
.collect()
}
#[must_use]
fn verify_hash(&self) -> bool {
assert!(self.is_frozen());
@ -4691,11 +4708,16 @@ mod tests {
bank2.squash();
let len = serialized_size(&bank2).unwrap() + serialized_size(&bank2.rc).unwrap();
let snapshot_storages = bank2.get_snapshot_storages();
let rc_serialize = BankRcSerialize {
bank_rc: &bank2.rc,
snapshot_storages: &snapshot_storages,
};
let len = serialized_size(&bank2).unwrap() + serialized_size(&rc_serialize).unwrap();
let mut buf = vec![0u8; len as usize];
let mut writer = Cursor::new(&mut buf[..]);
serialize_into(&mut writer, &bank2).unwrap();
serialize_into(&mut writer, &bank2.rc).unwrap();
serialize_into(&mut writer, &rc_serialize).unwrap();
let mut rdr = Cursor::new(&buf[..]);
let mut dbank: Bank = deserialize_from_snapshot(&mut rdr).unwrap();