From 701d90a41d8d00a94d6621a03a053b0c26d5e61f Mon Sep 17 00:00:00 2001 From: carllin Date: Wed, 25 Sep 2019 18:07:41 -0700 Subject: [PATCH] Remove some AccountStorage Serialization (#6047) * Remove serialization of AccountStorageEntry fields * Add metric for evaluating BankRc serialization time * Serialize AppendVec current len * Add dashboard metrics * Move flush of AppendVecs to packaging thread --- core/src/snapshot_package.rs | 1 + core/src/snapshot_utils.rs | 5 + .../dashboards/testnet-monitor.json | 117 ++++++++++++++++++ runtime/src/accounts_db.rs | 64 +++++----- runtime/src/append_vec.rs | 71 ++--------- 5 files changed, 171 insertions(+), 87 deletions(-) diff --git a/core/src/snapshot_package.rs b/core/src/snapshot_package.rs index 9bd7e00115..7d1822b552 100644 --- a/core/src/snapshot_package.rs +++ b/core/src/snapshot_package.rs @@ -111,6 +111,7 @@ impl SnapshotPackagerService { // Add the AppendVecs into the compressible list for storage in &snapshot_package.storage_entries { + storage.flush()?; let storage_path = storage.get_path(); let output_path = staging_accounts_dir.join( storage_path diff --git a/core/src/snapshot_utils.rs b/core/src/snapshot_utils.rs index 710140c740..ec5858badf 100644 --- a/core/src/snapshot_utils.rs +++ b/core/src/snapshot_utils.rs @@ -5,6 +5,7 @@ use crate::snapshot_package::{TAR_ACCOUNTS_DIR, TAR_SNAPSHOTS_DIR}; use bincode::{deserialize_from, serialize_into}; use bzip2::bufread::BzDecoder; use fs_extra::dir::CopyOptions; +use solana_measure::measure::Measure; use solana_runtime::bank::Bank; use solana_runtime::status_cache::SlotDelta; use solana_sdk::transaction; @@ -140,7 +141,11 @@ pub fn add_snapshot>(snapshot_path: P, bank: &Bank) -> Result<()> let mut snapshot_stream = BufWriter::new(snapshot_file); // Create the snapshot serialize_into(&mut snapshot_stream, &*bank).map_err(|e| get_io_error(&e.to_string()))?; + let mut bank_rc_serialize = Measure::start("bank_rc_serialize-ms"); serialize_into(&mut snapshot_stream, &bank.rc).map_err(|e| get_io_error(&e.to_string()))?; + bank_rc_serialize.stop(); + inc_new_counter_info!("bank-rc-serialize-ms", bank_rc_serialize.as_ms() as usize); + info!( "successfully created snapshot {}, path: {:?}", bank.slot(), diff --git a/metrics/scripts/grafana-provisioning/dashboards/testnet-monitor.json b/metrics/scripts/grafana-provisioning/dashboards/testnet-monitor.json index 52c49a7029..770a5f6fb0 100644 --- a/metrics/scripts/grafana-provisioning/dashboards/testnet-monitor.json +++ b/metrics/scripts/grafana-provisioning/dashboards/testnet-monitor.json @@ -5693,6 +5693,123 @@ ], "tags": [] }, + { + "groupBy": [ + { + "params": [ + "$__interval" + ], + "type": "time" + }, + { + "params": [ + "null" + ], + "type": "fill" + } + ], + "hide": false, + "measurement": "cluster_info-vote-count", + "orderByTime": "ASC", + "policy": "autogen", + "query": "SELECT mean(\"count\") AS \"serialize_bank_rc\" FROM \"$testnet\".\"autogen\".\"bank-rc-serialize-ms\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)", + "rawQuery": true, + "refId": "C", + "resultFormat": "time_series", + "select": [ + [ + { + "params": [ + "count" + ], + "type": "field" + }, + { + "params": [], + "type": "sum" + } + ] + ], + "tags": [] + }, + { + "groupBy": [ + { + "params": [ + "$__interval" + ], + "type": "time" + }, + { + "params": [ + "null" + ], + "type": "fill" + } + ], + "hide": false, + "measurement": "cluster_info-vote-count", + "orderByTime": "ASC", + "policy": "autogen", + "query": "SELECT mean(\"count\") AS \"add_snapshot_ms\" FROM \"$testnet\".\"autogen\".\"add-snapshot-ms\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)", + "rawQuery": true, + "refId": "C", + "resultFormat": "time_series", + "select": [ + [ + { + "params": [ + "count" + ], + "type": "field" + }, + { + "params": [], + "type": "sum" + } + ] + ], + "tags": [] + }, + { + "groupBy": [ + { + "params": [ + "$__interval" + ], + "type": "time" + }, + { + "params": [ + "null" + ], + "type": "fill" + } + ], + "hide": false, + "measurement": "cluster_info-vote-count", + "orderByTime": "ASC", + "policy": "autogen", + "query": "SELECT mean(\"duration\") AS \"serialize_account_storage\" FROM \"$testnet\".\"autogen\".\"serialize_account_storage_ms\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)", + "rawQuery": true, + "refId": "C", + "resultFormat": "time_series", + "select": [ + [ + { + "params": [ + "count" + ], + "type": "field" + }, + { + "params": [], + "type": "sum" + } + ] + ], + "tags": [] + }, { "groupBy": [ { diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 96c292922d..8e070dfe18 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -84,20 +84,6 @@ pub type AppendVecId = usize; // Each fork has a set of storage entries. type ForkStores = HashMap>; -#[derive(Clone, Default, Debug)] -pub struct AccountStorage(pub HashMap); -pub struct AccountStorageSerialize<'a> { - account_storage: &'a AccountStorage, - slot: u64, -} -impl<'a> AccountStorageSerialize<'a> { - pub fn new(account_storage: &'a AccountStorage, slot: u64) -> Self { - Self { - account_storage, - slot, - } - } -} struct AccountStorageVisitor; impl<'de> Visitor<'de> for AccountStorageVisitor { @@ -113,38 +99,51 @@ impl<'de> Visitor<'de> for AccountStorageVisitor { M: MapAccess<'de>, { let mut map = HashMap::new(); - while let Some((storage_id, storage_entry)) = access.next_entry()? { - let storage_entry: AccountStorageEntry = storage_entry; - let storage_fork_map = map - .entry(storage_entry.fork_id) - .or_insert_with(HashMap::new); - storage_fork_map.insert(storage_id, Arc::new(storage_entry)); + while let Some((fork_id, storage_entries)) = access.next_entry()? { + let storage_entries: Vec = storage_entries; + let storage_fork_map = map.entry(fork_id).or_insert_with(HashMap::new); + for mut storage in storage_entries { + storage.fork_id = fork_id; + storage_fork_map.insert(storage.id, Arc::new(storage)); + } } Ok(AccountStorage(map)) } } +pub struct AccountStorageSerialize<'a> { + account_storage: &'a AccountStorage, + slot: u64, +} +impl<'a> AccountStorageSerialize<'a> { + pub fn new(account_storage: &'a AccountStorage, slot: u64) -> Self { + Self { + account_storage, + slot, + } + } +} + impl<'a> Serialize for AccountStorageSerialize<'a> { fn serialize(&self, serializer: S) -> Result where S: Serializer, { let mut len: usize = 0; - for (fork_id, storage) in &self.account_storage.0 { + for fork_id in self.account_storage.0.keys() { if *fork_id <= self.slot { - len += storage.len(); + len += 1; } } let mut map = serializer.serialize_map(Some(len))?; let mut count = 0; let mut serialize_account_storage_timer = Measure::start("serialize_account_storage_ms"); - for fork_storage in self.account_storage.0.values() { - for (storage_id, account_storage_entry) in fork_storage { - if account_storage_entry.fork_id <= self.slot { - map.serialize_entry(storage_id, &**account_storage_entry)?; - count += 1; - } + for (fork_id, fork_storage) in &self.account_storage.0 { + if *fork_id <= self.slot { + let storage_entries: Vec<_> = fork_storage.values().collect(); + map.serialize_entry(&fork_id, &storage_entries)?; + count += fork_storage.len(); } } serialize_account_storage_timer.stop(); @@ -157,6 +156,8 @@ impl<'a> Serialize for AccountStorageSerialize<'a> { } } +#[derive(Clone, Default, Debug)] +pub struct AccountStorage(pub HashMap); impl<'de> Deserialize<'de> for AccountStorage { fn deserialize(deserializer: D) -> Result where @@ -174,10 +175,11 @@ pub enum AccountStorageStatus { } /// Persistent storage structure holding the accounts -#[derive(Debug, Deserialize, Serialize)] +#[derive(Debug, Serialize, Deserialize)] pub struct AccountStorageEntry { id: AppendVecId, + #[serde(skip)] fork_id: Fork, /// storage holding the accounts @@ -242,6 +244,10 @@ impl AccountStorageEntry { self.id } + pub fn flush(&self) -> Result<(), IOError> { + self.accounts.flush() + } + fn add_account(&self) { let mut count_and_status = self.count_and_status.write().unwrap(); *count_and_status = (count_and_status.0 + 1, count_and_status.1); diff --git a/runtime/src/append_vec.rs b/runtime/src/append_vec.rs index 6d68850411..674b44e2d1 100644 --- a/runtime/src/append_vec.rs +++ b/runtime/src/append_vec.rs @@ -1,4 +1,4 @@ -use bincode::{deserialize_from, serialize_into, serialized_size}; +use bincode::{deserialize_from, serialize_into}; use memmap::MmapMut; use serde::{Deserialize, Serialize}; use solana_sdk::{account::Account, clock::Epoch, hash::Hash, pubkey::Pubkey}; @@ -74,6 +74,7 @@ pub struct AppendVec { path: PathBuf, map: MmapMut, // This mutex forces append to be single threaded, but concurrent with reads + #[allow(clippy::mutex_atomic)] append_offset: Mutex, current_len: AtomicUsize, file_size: u64, @@ -137,6 +138,10 @@ impl AppendVec { } } + pub fn flush(&self) -> io::Result<()> { + self.map.flush() + } + #[allow(clippy::mutex_atomic)] pub fn reset(&self) { // This mutex forces append to be single threaded, but concurrent with reads @@ -167,6 +172,7 @@ impl AppendVec { PathBuf::from(&format!("{}.{}", fork_id, id)) } + #[allow(clippy::mutex_atomic)] pub fn set_file>(&mut self, path: P) -> io::Result<()> { self.path = path.as_ref().to_path_buf(); let data = OpenOptions::new() @@ -381,19 +387,11 @@ impl Serialize for AppendVec { S: serde::ser::Serializer, { use serde::ser::Error; - let len = serialized_size(&self.path).unwrap() - + std::mem::size_of::() as u64 - + std::mem::size_of::() as u64 - + std::mem::size_of::() as u64; + let len = std::mem::size_of::() as u64; let mut buf = vec![0u8; len as usize]; let mut wr = Cursor::new(&mut buf[..]); - self.map.flush().map_err(Error::custom)?; - serialize_into(&mut wr, &self.path).map_err(Error::custom)?; serialize_into(&mut wr, &(self.current_len.load(Ordering::Relaxed) as u64)) .map_err(Error::custom)?; - serialize_into(&mut wr, &self.file_size).map_err(Error::custom)?; - let offset = *self.append_offset.lock().unwrap(); - serialize_into(&mut wr, &offset).map_err(Error::custom)?; let len = wr.position() as usize; serializer.serialize_bytes(&wr.into_inner()[..len]) } @@ -417,19 +415,14 @@ impl<'a> serde::de::Visitor<'a> for AppendVecVisitor { { use serde::de::Error; let mut rd = Cursor::new(&data[..]); - // TODO: this path does not need to be serialized, can remove - let path: PathBuf = deserialize_from(&mut rd).map_err(Error::custom)?; - let current_len: u64 = deserialize_from(&mut rd).map_err(Error::custom)?; - let file_size: u64 = deserialize_from(&mut rd).map_err(Error::custom)?; - let offset: usize = deserialize_from(&mut rd).map_err(Error::custom)?; - + let current_len: usize = deserialize_from(&mut rd).map_err(Error::custom)?; let map = MmapMut::map_anon(1).map_err(|e| Error::custom(e.to_string()))?; Ok(AppendVec { - path, + path: PathBuf::from(String::default()), map, - append_offset: Mutex::new(offset), - current_len: AtomicUsize::new(current_len as usize), - file_size, + append_offset: Mutex::new(current_len), + current_len: AtomicUsize::new(current_len), + file_size: current_len as u64, }) } } @@ -519,44 +512,6 @@ pub mod tests { ); } - #[test] - fn test_append_vec_serialize() { - let path = get_append_vec_path("test_append_serialize"); - let av: AppendVec = AppendVec::new(&Path::new(&path.path).join("0"), true, 1024 * 1024); - let account1 = create_test_account(1); - let index1 = av.append_account_test(&account1).unwrap(); - assert_eq!(index1, 0); - assert_eq!(av.get_account_test(index1).unwrap(), account1); - - let account2 = create_test_account(2); - let index2 = av.append_account_test(&account2).unwrap(); - assert_eq!(av.get_account_test(index2).unwrap(), account2); - assert_eq!(av.get_account_test(index1).unwrap(), account1); - - let append_vec_path = &av.path; - - // Serialize the AppendVec - let mut writer = Cursor::new(vec![]); - serialize_into(&mut writer, &av).unwrap(); - - // Deserialize the AppendVec - let buf = writer.into_inner(); - let mut reader = Cursor::new(&buf[..]); - let mut dav: AppendVec = deserialize_from(&mut reader).unwrap(); - - // Set the AppendVec path - dav.set_file(append_vec_path).unwrap(); - assert_eq!(dav.get_account_test(index2).unwrap(), account2); - assert_eq!(dav.get_account_test(index1).unwrap(), account1); - drop(dav); - - // Dropping dav above blows away underlying file's directory entry, so - // trying to set the file will fail - let mut reader = Cursor::new(&buf[..]); - let mut dav: AppendVec = deserialize_from(&mut reader).unwrap(); - assert!(dav.set_file(append_vec_path).is_err()); - } - #[test] fn test_relative_path() { let relative_path = AppendVec::new_relative_path(0, 2);