From d238371b0ce3f409719d35ccf5518b1714bda2f8 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Fri, 21 Feb 2020 15:27:55 +0900 Subject: [PATCH] Correct missing entry handling to avoid bad warns (#8339) * Correct missing entry handling to avoid bad warns * Pass storage entries to AccountStorageSerialize * Fix CI..... * Add tests and reorder condition for cheapest first * Remove unneeded reference --- core/src/snapshot_packager_service.rs | 2 +- core/tests/bank_forks.rs | 5 +- ledger-tool/src/main.rs | 4 +- ledger/src/bank_forks.rs | 4 +- ledger/src/snapshot_package.rs | 13 +-- ledger/src/snapshot_utils.rs | 37 ++++--- runtime/src/accounts.rs | 6 +- runtime/src/accounts_db.rs | 141 +++++++++++++++++++------- runtime/src/bank.rs | 38 +++++-- 9 files changed, 176 insertions(+), 74 deletions(-) diff --git a/core/src/snapshot_packager_service.rs b/core/src/snapshot_packager_service.rs index 986c96f03..6a164b19a 100644 --- a/core/src/snapshot_packager_service.rs +++ b/core/src/snapshot_packager_service.rs @@ -156,7 +156,7 @@ mod tests { 5, vec![], link_snapshots_dir, - storage_entries.clone(), + vec![storage_entries], output_tar_path.clone(), Hash::default(), ); diff --git a/core/tests/bank_forks.rs b/core/tests/bank_forks.rs index 1cdeabb2e..8458b26f4 100644 --- a/core/tests/bank_forks.rs +++ b/core/tests/bank_forks.rs @@ -131,6 +131,7 @@ mod tests { } // Generate a snapshot package for last bank let last_bank = bank_forks.get(last_slot).unwrap(); + let storages: Vec<_> = last_bank.get_snapshot_storages(); let slot_snapshot_paths = snapshot_utils::get_snapshot_paths(&snapshot_config.snapshot_path); let snapshot_package = snapshot_utils::package_snapshot( @@ -143,6 +144,7 @@ mod tests { ), &snapshot_config.snapshot_path, &last_bank.src.roots(), + storages, ) .unwrap(); @@ -202,7 +204,8 @@ mod tests { // Take snapshot of zeroth bank let bank0 = bank_forks.get(0).unwrap(); - snapshot_utils::add_snapshot(&snapshot_config.snapshot_path, bank0).unwrap(); + let storages: Vec<_> = bank0.get_snapshot_storages(); + snapshot_utils::add_snapshot(&snapshot_config.snapshot_path, bank0, &storages).unwrap(); // Set up snapshotting channels let (sender, receiver) = channel(); diff --git a/ledger-tool/src/main.rs b/ledger-tool/src/main.rs index ca40ac5fd..4aebff057 100644 --- a/ledger-tool/src/main.rs +++ b/ledger-tool/src/main.rs @@ -903,7 +903,8 @@ fn main() { exit(1); }); - snapshot_utils::add_snapshot(&temp_dir, &bank) + let storages: Vec<_> = bank.get_snapshot_storages(); + snapshot_utils::add_snapshot(&temp_dir, &bank, &storages) .and_then(|slot_snapshot_paths| { snapshot_utils::package_snapshot( &bank, @@ -911,6 +912,7 @@ fn main() { snapshot_utils::get_snapshot_archive_path(output_directory), &temp_dir, &bank.src.roots(), + storages, ) }) .and_then(|package| { diff --git a/ledger/src/bank_forks.rs b/ledger/src/bank_forks.rs index 9ceca079c..b60dd3eae 100644 --- a/ledger/src/bank_forks.rs +++ b/ledger/src/bank_forks.rs @@ -251,8 +251,9 @@ impl BankForks { .cloned() .expect("root must exist in BankForks"); + let storages: Vec<_> = bank.get_snapshot_storages(); let mut add_snapshot_time = Measure::start("add-snapshot-ms"); - snapshot_utils::add_snapshot(&config.snapshot_path, &bank)?; + snapshot_utils::add_snapshot(&config.snapshot_path, &bank, &storages)?; add_snapshot_time.stop(); inc_new_counter_info!("add-snapshot-ms", add_snapshot_time.as_ms() as usize); @@ -269,6 +270,7 @@ impl BankForks { tar_output_file, &config.snapshot_path, slots_to_snapshot, + storages, )?; // Send the package to the packaging thread diff --git a/ledger/src/snapshot_package.rs b/ledger/src/snapshot_package.rs index fd894ae33..dd875900e 100644 --- a/ledger/src/snapshot_package.rs +++ b/ledger/src/snapshot_package.rs @@ -1,12 +1,9 @@ -use solana_runtime::{accounts_db::AccountStorageEntry, bank::BankSlotDelta}; +use solana_runtime::{accounts_db::SnapshotStorages, bank::BankSlotDelta}; use solana_sdk::clock::Slot; use solana_sdk::hash::Hash; use std::{ path::PathBuf, - sync::{ - mpsc::{Receiver, SendError, Sender}, - Arc, - }, + sync::mpsc::{Receiver, SendError, Sender}, }; use tempfile::TempDir; @@ -19,7 +16,7 @@ pub struct SnapshotPackage { pub root: Slot, pub slot_deltas: Vec, pub snapshot_links: TempDir, - pub storage_entries: Vec>, + pub storages: SnapshotStorages, pub tar_output_file: PathBuf, pub hash: Hash, } @@ -29,7 +26,7 @@ impl SnapshotPackage { root: Slot, slot_deltas: Vec, snapshot_links: TempDir, - storage_entries: Vec>, + storages: SnapshotStorages, tar_output_file: PathBuf, hash: Hash, ) -> Self { @@ -37,7 +34,7 @@ impl SnapshotPackage { root, slot_deltas, snapshot_links, - storage_entries, + storages, tar_output_file, hash, } diff --git a/ledger/src/snapshot_utils.rs b/ledger/src/snapshot_utils.rs index 175c51ffa..f4705a5f0 100644 --- a/ledger/src/snapshot_utils.rs +++ b/ledger/src/snapshot_utils.rs @@ -4,8 +4,12 @@ use bzip2::bufread::BzDecoder; use fs_extra::dir::CopyOptions; use log::*; use solana_measure::measure::Measure; -use solana_runtime::bank::{ - self, deserialize_from_snapshot, Bank, BankSlotDelta, MAX_SNAPSHOT_DATA_FILE_SIZE, +use solana_runtime::{ + accounts_db::{SnapshotStorage, SnapshotStorages}, + bank::{ + self, deserialize_from_snapshot, Bank, BankRcSerialize, BankSlotDelta, + MAX_SNAPSHOT_DATA_FILE_SIZE, + }, }; use solana_sdk::clock::Slot; use std::{ @@ -78,23 +82,16 @@ pub fn package_snapshot, Q: AsRef>( snapshot_package_output_file: P, snapshot_path: Q, slots_to_snapshot: &[Slot], + snapshot_storages: SnapshotStorages, ) -> Result { // Hard link all the snapshots we need for this package let snapshot_hard_links_dir = tempfile::tempdir_in(snapshot_path)?; - // Get a reference to all the relevant AccountStorageEntries - let account_storage_entries: Vec<_> = bank - .rc - .get_rooted_storage_entries() - .into_iter() - .filter(|x| x.slot_id() <= bank.slot()) - .collect(); - // Create a snapshot package info!( "Snapshot for bank: {} has {} account storage entries", bank.slot(), - account_storage_entries.len() + snapshot_storages.len() ); // Any errors from this point on will cause the above SnapshotPackage to drop, clearing @@ -105,7 +102,7 @@ pub fn package_snapshot, Q: AsRef>( bank.slot(), bank.src.slot_deltas(slots_to_snapshot), snapshot_hard_links_dir, - account_storage_entries, + snapshot_storages, snapshot_package_output_file.as_ref().to_path_buf(), bank.hash(), ); @@ -147,7 +144,7 @@ pub fn archive_snapshot_package(snapshot_package: &SnapshotPackage) -> Result<() )?; // Add the AppendVecs into the compressible list - for storage in &snapshot_package.storage_entries { + for storage in snapshot_package.storages.iter().flatten() { storage.flush()?; let storage_path = storage.get_path(); let output_path = staging_accounts_dir.join( @@ -321,7 +318,11 @@ where Ok(ret) } -pub fn add_snapshot>(snapshot_path: P, bank: &Bank) -> Result { +pub fn add_snapshot>( + snapshot_path: P, + bank: &Bank, + snapshot_storages: &[SnapshotStorage], +) -> Result { bank.purge_zero_lamport_accounts(); let slot = bank.slot(); // snapshot_path/slot @@ -341,7 +342,13 @@ pub fn add_snapshot>(snapshot_path: P, bank: &Bank) -> Result>; +pub type SnapshotStorages = Vec; // Each slot has a set of storage entries. type SlotStores = HashMap>; @@ -119,17 +121,8 @@ impl<'de> Visitor<'de> for AccountStorageVisitor { } } -pub struct AccountStorageSerialize<'a> { - account_storage: &'a AccountStorage, - slot: Slot, -} -impl<'a> AccountStorageSerialize<'a> { - pub fn new(account_storage: &'a AccountStorage, slot: Slot) -> Self { - Self { - account_storage, - slot, - } - } +struct AccountStorageSerialize<'a> { + account_storage_entries: &'a [SnapshotStorage], } impl<'a> Serialize for AccountStorageSerialize<'a> { @@ -137,21 +130,12 @@ impl<'a> Serialize for AccountStorageSerialize<'a> { where S: Serializer, { - let mut len: usize = 0; - for slot_id in self.account_storage.0.keys() { - if *slot_id <= self.slot { - len += 1; - } - } - let mut map = serializer.serialize_map(Some(len))?; + let mut map = serializer.serialize_map(Some(self.account_storage_entries.len()))?; let mut count = 0; let mut serialize_account_storage_timer = Measure::start("serialize_account_storage_ms"); - for (slot_id, slot_storage) in &self.account_storage.0 { - if *slot_id <= self.slot { - let storage_entries: Vec<_> = slot_storage.values().collect(); - map.serialize_entry(&slot_id, &storage_entries)?; - count += slot_storage.len(); - } + for storage_entries in self.account_storage_entries { + map.serialize_entry(&storage_entries.first().unwrap().slot_id, storage_entries)?; + count += storage_entries.len(); } serialize_account_storage_timer.stop(); datapoint_info!( @@ -327,27 +311,37 @@ pub fn get_temp_accounts_paths(count: u32) -> IOResult<(Vec, Vec { +pub struct AccountsDBSerialize<'a, 'b> { accounts_db: &'a AccountsDB, slot: Slot, + account_storage_entries: &'b [SnapshotStorage], } -impl<'a> AccountsDBSerialize<'a> { - pub fn new(accounts_db: &'a AccountsDB, slot: Slot) -> Self { - Self { accounts_db, slot } +impl<'a, 'b> AccountsDBSerialize<'a, 'b> { + pub fn new( + accounts_db: &'a AccountsDB, + slot: Slot, + account_storage_entries: &'b [SnapshotStorage], + ) -> Self { + Self { + accounts_db, + slot, + account_storage_entries, + } } } -impl<'a> Serialize for AccountsDBSerialize<'a> { +impl<'a, 'b> Serialize for AccountsDBSerialize<'a, 'b> { fn serialize(&self, serializer: S) -> std::result::Result where S: serde::ser::Serializer, { use serde::ser::Error; - let storage = self.accounts_db.storage.read().unwrap(); let mut wr = Cursor::new(vec![]); let version: u64 = self.accounts_db.write_version.load(Ordering::Relaxed) as u64; - let account_storage_serialize = AccountStorageSerialize::new(&*storage, self.slot); + let account_storage_serialize = AccountStorageSerialize { + account_storage_entries: self.account_storage_entries, + }; serialize_into(&mut wr, &account_storage_serialize).map_err(Error::custom)?; serialize_into(&mut wr, &version).map_err(Error::custom)?; let bank_hashes = self.accounts_db.bank_hashes.read().unwrap(); @@ -1240,14 +1234,16 @@ impl AccountsDB { self.accounts_index.write().unwrap().add_root(slot) } - pub fn get_rooted_storage_entries(&self) -> Vec> { + pub fn get_snapshot_storages(&self, snapshot_slot: Slot) -> SnapshotStorages { let accounts_index = self.accounts_index.read().unwrap(); let r_storage = self.storage.read().unwrap(); r_storage .0 - .values() - .flat_map(|slot_store| slot_store.values().cloned()) - .filter(|store| accounts_index.is_root(store.slot_id)) + .iter() + .filter(|(slot, storage)| { + **slot <= snapshot_slot && !storage.is_empty() && accounts_index.is_root(**slot) + }) + .map(|(_, slot_store)| slot_store.values().cloned().collect()) .collect() } @@ -1938,7 +1934,12 @@ pub mod tests { fn reconstruct_accounts_db_via_serialization(accounts: &AccountsDB, slot: Slot) -> AccountsDB { let mut writer = Cursor::new(vec![]); - serialize_into(&mut writer, &AccountsDBSerialize::new(&accounts, slot)).unwrap(); + let snapshot_storages = accounts.get_snapshot_storages(slot); + serialize_into( + &mut writer, + &AccountsDBSerialize::new(&accounts, slot, &snapshot_storages), + ) + .unwrap(); let buf = writer.into_inner(); let mut reader = BufReader::new(&buf[..]); @@ -2281,8 +2282,8 @@ pub mod tests { accounts_db: &AccountsDB, output_dir: P, ) -> IOResult<()> { - let storage_entries = accounts_db.get_rooted_storage_entries(); - for storage in storage_entries { + let storage_entries = accounts_db.get_snapshot_storages(Slot::max_value()); + for storage in storage_entries.iter().flatten() { let storage_path = storage.get_path(); let output_path = output_dir.as_ref().join( storage_path @@ -2488,4 +2489,68 @@ pub mod tests { } } } + + #[test] + fn test_get_snapshot_storages_empty() { + let db = AccountsDB::new(Vec::new()); + assert!(db.get_snapshot_storages(0).is_empty()); + } + + #[test] + fn test_get_snapshot_storages_only_older_than_or_equal_to_snapshot_slot() { + let db = AccountsDB::new(Vec::new()); + + let key = Pubkey::default(); + let account = Account::new(1, 0, &key); + let before_slot = 0; + let base_slot = before_slot + 1; + let after_slot = base_slot + 1; + + db.add_root(base_slot); + db.store(base_slot, &[(&key, &account)]); + assert!(db.get_snapshot_storages(before_slot).is_empty()); + + assert_eq!(1, db.get_snapshot_storages(base_slot).len()); + assert_eq!(1, db.get_snapshot_storages(after_slot).len()); + } + + #[test] + fn test_get_snapshot_storages_only_non_empty() { + let db = AccountsDB::new(Vec::new()); + + let key = Pubkey::default(); + let account = Account::new(1, 0, &key); + let base_slot = 0; + let after_slot = base_slot + 1; + + db.store(base_slot, &[(&key, &account)]); + db.storage + .write() + .unwrap() + .0 + .get_mut(&base_slot) + .unwrap() + .clear(); + db.add_root(base_slot); + assert!(db.get_snapshot_storages(after_slot).is_empty()); + + db.store(base_slot, &[(&key, &account)]); + assert_eq!(1, db.get_snapshot_storages(after_slot).len()); + } + + #[test] + fn test_get_snapshot_storages_only_roots() { + let db = AccountsDB::new(Vec::new()); + + let key = Pubkey::default(); + let account = Account::new(1, 0, &key); + let base_slot = 0; + let after_slot = base_slot + 1; + + db.store(base_slot, &[(&key, &account)]); + assert!(db.get_snapshot_storages(after_slot).is_empty()); + + db.add_root(base_slot); + assert_eq!(1, db.get_snapshot_storages(after_slot).len()); + } } diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 03342cb0f..35bb5655d 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -4,7 +4,9 @@ //! already been signed and verified. use crate::{ accounts::{Accounts, TransactionAccounts, TransactionLoadResult, TransactionLoaders}, - accounts_db::{AccountStorageEntry, AccountsDBSerialize, AppendVecId, ErrorCounters}, + accounts_db::{ + AccountsDBSerialize, AppendVecId, ErrorCounters, SnapshotStorage, SnapshotStorages, + }, blockhash_queue::BlockhashQueue, hard_forks::HardForks, message_processor::{MessageProcessor, ProcessInstruction}, @@ -111,8 +113,8 @@ impl BankRc { Ok(()) } - pub fn get_rooted_storage_entries(&self) -> Vec> { - self.accounts.accounts_db.get_rooted_storage_entries() + pub fn get_snapshot_storages(&self, slot: Slot) -> SnapshotStorages { + self.accounts.accounts_db.get_snapshot_storages(slot) } fn get_io_error(error: &str) -> IOError { @@ -121,15 +123,23 @@ impl BankRc { } } -impl Serialize for BankRc { +pub struct BankRcSerialize<'a, 'b> { + pub bank_rc: &'a BankRc, + pub snapshot_storages: &'b [SnapshotStorage], +} + +impl<'a, 'b> Serialize for BankRcSerialize<'a, 'b> { fn serialize(&self, serializer: S) -> std::result::Result where S: serde::ser::Serializer, { use serde::ser::Error; let mut wr = Cursor::new(Vec::new()); - let accounts_db_serialize = - AccountsDBSerialize::new(&*self.accounts.accounts_db, self.slot); + let accounts_db_serialize = AccountsDBSerialize::new( + &*self.bank_rc.accounts.accounts_db, + self.bank_rc.slot, + self.snapshot_storages, + ); serialize_into(&mut wr, &accounts_db_serialize).map_err(Error::custom)?; let len = wr.position() as usize; serializer.serialize_bytes(&wr.into_inner()[..len]) @@ -1857,6 +1867,13 @@ impl Bank { .verify_bank_hash(self.slot(), &self.ancestors) } + pub fn get_snapshot_storages(&self) -> SnapshotStorages { + self.rc + .get_snapshot_storages(self.slot()) + .into_iter() + .collect() + } + #[must_use] fn verify_hash(&self) -> bool { assert!(self.is_frozen()); @@ -4691,11 +4708,16 @@ mod tests { bank2.squash(); - let len = serialized_size(&bank2).unwrap() + serialized_size(&bank2.rc).unwrap(); + let snapshot_storages = bank2.get_snapshot_storages(); + let rc_serialize = BankRcSerialize { + bank_rc: &bank2.rc, + snapshot_storages: &snapshot_storages, + }; + let len = serialized_size(&bank2).unwrap() + serialized_size(&rc_serialize).unwrap(); let mut buf = vec![0u8; len as usize]; let mut writer = Cursor::new(&mut buf[..]); serialize_into(&mut writer, &bank2).unwrap(); - serialize_into(&mut writer, &bank2.rc).unwrap(); + serialize_into(&mut writer, &rc_serialize).unwrap(); let mut rdr = Cursor::new(&buf[..]); let mut dbank: Bank = deserialize_from_snapshot(&mut rdr).unwrap();