Remove some AccountStorage Serialization (#6047)

* Remove serialization of AccountStorageEntry fields

* Add metric for evaluating BankRc serialization time

* Serialize AppendVec current len

* Add dashboard metrics

* Move flush of AppendVecs to packaging thread
This commit is contained in:
carllin 2019-09-25 18:07:41 -07:00 committed by GitHub
parent 56f6ee84f1
commit 701d90a41d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 171 additions and 87 deletions

View File

@ -111,6 +111,7 @@ impl SnapshotPackagerService {
// Add the AppendVecs into the compressible list
for storage in &snapshot_package.storage_entries {
storage.flush()?;
let storage_path = storage.get_path();
let output_path = staging_accounts_dir.join(
storage_path

View File

@ -5,6 +5,7 @@ use crate::snapshot_package::{TAR_ACCOUNTS_DIR, TAR_SNAPSHOTS_DIR};
use bincode::{deserialize_from, serialize_into};
use bzip2::bufread::BzDecoder;
use fs_extra::dir::CopyOptions;
use solana_measure::measure::Measure;
use solana_runtime::bank::Bank;
use solana_runtime::status_cache::SlotDelta;
use solana_sdk::transaction;
@ -140,7 +141,11 @@ pub fn add_snapshot<P: AsRef<Path>>(snapshot_path: P, bank: &Bank) -> Result<()>
let mut snapshot_stream = BufWriter::new(snapshot_file);
// Create the snapshot
serialize_into(&mut snapshot_stream, &*bank).map_err(|e| get_io_error(&e.to_string()))?;
let mut bank_rc_serialize = Measure::start("bank_rc_serialize-ms");
serialize_into(&mut snapshot_stream, &bank.rc).map_err(|e| get_io_error(&e.to_string()))?;
bank_rc_serialize.stop();
inc_new_counter_info!("bank-rc-serialize-ms", bank_rc_serialize.as_ms() as usize);
info!(
"successfully created snapshot {}, path: {:?}",
bank.slot(),

View File

@ -5693,6 +5693,123 @@
],
"tags": []
},
{
"groupBy": [
{
"params": [
"$__interval"
],
"type": "time"
},
{
"params": [
"null"
],
"type": "fill"
}
],
"hide": false,
"measurement": "cluster_info-vote-count",
"orderByTime": "ASC",
"policy": "autogen",
"query": "SELECT mean(\"count\") AS \"serialize_bank_rc\" FROM \"$testnet\".\"autogen\".\"bank-rc-serialize-ms\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)",
"rawQuery": true,
"refId": "C",
"resultFormat": "time_series",
"select": [
[
{
"params": [
"count"
],
"type": "field"
},
{
"params": [],
"type": "sum"
}
]
],
"tags": []
},
{
"groupBy": [
{
"params": [
"$__interval"
],
"type": "time"
},
{
"params": [
"null"
],
"type": "fill"
}
],
"hide": false,
"measurement": "cluster_info-vote-count",
"orderByTime": "ASC",
"policy": "autogen",
"query": "SELECT mean(\"count\") AS \"add_snapshot_ms\" FROM \"$testnet\".\"autogen\".\"add-snapshot-ms\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)",
"rawQuery": true,
"refId": "C",
"resultFormat": "time_series",
"select": [
[
{
"params": [
"count"
],
"type": "field"
},
{
"params": [],
"type": "sum"
}
]
],
"tags": []
},
{
"groupBy": [
{
"params": [
"$__interval"
],
"type": "time"
},
{
"params": [
"null"
],
"type": "fill"
}
],
"hide": false,
"measurement": "cluster_info-vote-count",
"orderByTime": "ASC",
"policy": "autogen",
"query": "SELECT mean(\"duration\") AS \"serialize_account_storage\" FROM \"$testnet\".\"autogen\".\"serialize_account_storage_ms\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)",
"rawQuery": true,
"refId": "C",
"resultFormat": "time_series",
"select": [
[
{
"params": [
"count"
],
"type": "field"
},
{
"params": [],
"type": "sum"
}
]
],
"tags": []
},
{
"groupBy": [
{

View File

@ -84,20 +84,6 @@ pub type AppendVecId = usize;
// Each fork has a set of storage entries.
type ForkStores = HashMap<usize, Arc<AccountStorageEntry>>;
#[derive(Clone, Default, Debug)]
pub struct AccountStorage(pub HashMap<Fork, ForkStores>);
pub struct AccountStorageSerialize<'a> {
account_storage: &'a AccountStorage,
slot: u64,
}
impl<'a> AccountStorageSerialize<'a> {
pub fn new(account_storage: &'a AccountStorage, slot: u64) -> Self {
Self {
account_storage,
slot,
}
}
}
struct AccountStorageVisitor;
impl<'de> Visitor<'de> for AccountStorageVisitor {
@ -113,38 +99,51 @@ impl<'de> Visitor<'de> for AccountStorageVisitor {
M: MapAccess<'de>,
{
let mut map = HashMap::new();
while let Some((storage_id, storage_entry)) = access.next_entry()? {
let storage_entry: AccountStorageEntry = storage_entry;
let storage_fork_map = map
.entry(storage_entry.fork_id)
.or_insert_with(HashMap::new);
storage_fork_map.insert(storage_id, Arc::new(storage_entry));
while let Some((fork_id, storage_entries)) = access.next_entry()? {
let storage_entries: Vec<AccountStorageEntry> = storage_entries;
let storage_fork_map = map.entry(fork_id).or_insert_with(HashMap::new);
for mut storage in storage_entries {
storage.fork_id = fork_id;
storage_fork_map.insert(storage.id, Arc::new(storage));
}
}
Ok(AccountStorage(map))
}
}
pub struct AccountStorageSerialize<'a> {
account_storage: &'a AccountStorage,
slot: u64,
}
impl<'a> AccountStorageSerialize<'a> {
pub fn new(account_storage: &'a AccountStorage, slot: u64) -> Self {
Self {
account_storage,
slot,
}
}
}
impl<'a> Serialize for AccountStorageSerialize<'a> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut len: usize = 0;
for (fork_id, storage) in &self.account_storage.0 {
for fork_id in self.account_storage.0.keys() {
if *fork_id <= self.slot {
len += storage.len();
len += 1;
}
}
let mut map = serializer.serialize_map(Some(len))?;
let mut count = 0;
let mut serialize_account_storage_timer = Measure::start("serialize_account_storage_ms");
for fork_storage in self.account_storage.0.values() {
for (storage_id, account_storage_entry) in fork_storage {
if account_storage_entry.fork_id <= self.slot {
map.serialize_entry(storage_id, &**account_storage_entry)?;
count += 1;
}
for (fork_id, fork_storage) in &self.account_storage.0 {
if *fork_id <= self.slot {
let storage_entries: Vec<_> = fork_storage.values().collect();
map.serialize_entry(&fork_id, &storage_entries)?;
count += fork_storage.len();
}
}
serialize_account_storage_timer.stop();
@ -157,6 +156,8 @@ impl<'a> Serialize for AccountStorageSerialize<'a> {
}
}
#[derive(Clone, Default, Debug)]
pub struct AccountStorage(pub HashMap<Fork, ForkStores>);
impl<'de> Deserialize<'de> for AccountStorage {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
@ -174,10 +175,11 @@ pub enum AccountStorageStatus {
}
/// Persistent storage structure holding the accounts
#[derive(Debug, Deserialize, Serialize)]
#[derive(Debug, Serialize, Deserialize)]
pub struct AccountStorageEntry {
id: AppendVecId,
#[serde(skip)]
fork_id: Fork,
/// storage holding the accounts
@ -242,6 +244,10 @@ impl AccountStorageEntry {
self.id
}
pub fn flush(&self) -> Result<(), IOError> {
self.accounts.flush()
}
fn add_account(&self) {
let mut count_and_status = self.count_and_status.write().unwrap();
*count_and_status = (count_and_status.0 + 1, count_and_status.1);

View File

@ -1,4 +1,4 @@
use bincode::{deserialize_from, serialize_into, serialized_size};
use bincode::{deserialize_from, serialize_into};
use memmap::MmapMut;
use serde::{Deserialize, Serialize};
use solana_sdk::{account::Account, clock::Epoch, hash::Hash, pubkey::Pubkey};
@ -74,6 +74,7 @@ pub struct AppendVec {
path: PathBuf,
map: MmapMut,
// This mutex forces append to be single threaded, but concurrent with reads
#[allow(clippy::mutex_atomic)]
append_offset: Mutex<usize>,
current_len: AtomicUsize,
file_size: u64,
@ -137,6 +138,10 @@ impl AppendVec {
}
}
pub fn flush(&self) -> io::Result<()> {
self.map.flush()
}
#[allow(clippy::mutex_atomic)]
pub fn reset(&self) {
// This mutex forces append to be single threaded, but concurrent with reads
@ -167,6 +172,7 @@ impl AppendVec {
PathBuf::from(&format!("{}.{}", fork_id, id))
}
#[allow(clippy::mutex_atomic)]
pub fn set_file<P: AsRef<Path>>(&mut self, path: P) -> io::Result<()> {
self.path = path.as_ref().to_path_buf();
let data = OpenOptions::new()
@ -381,19 +387,11 @@ impl Serialize for AppendVec {
S: serde::ser::Serializer,
{
use serde::ser::Error;
let len = serialized_size(&self.path).unwrap()
+ std::mem::size_of::<u64>() as u64
+ std::mem::size_of::<u64>() as u64
+ std::mem::size_of::<usize>() as u64;
let len = std::mem::size_of::<usize>() as u64;
let mut buf = vec![0u8; len as usize];
let mut wr = Cursor::new(&mut buf[..]);
self.map.flush().map_err(Error::custom)?;
serialize_into(&mut wr, &self.path).map_err(Error::custom)?;
serialize_into(&mut wr, &(self.current_len.load(Ordering::Relaxed) as u64))
.map_err(Error::custom)?;
serialize_into(&mut wr, &self.file_size).map_err(Error::custom)?;
let offset = *self.append_offset.lock().unwrap();
serialize_into(&mut wr, &offset).map_err(Error::custom)?;
let len = wr.position() as usize;
serializer.serialize_bytes(&wr.into_inner()[..len])
}
@ -417,19 +415,14 @@ impl<'a> serde::de::Visitor<'a> for AppendVecVisitor {
{
use serde::de::Error;
let mut rd = Cursor::new(&data[..]);
// TODO: this path does not need to be serialized, can remove
let path: PathBuf = deserialize_from(&mut rd).map_err(Error::custom)?;
let current_len: u64 = deserialize_from(&mut rd).map_err(Error::custom)?;
let file_size: u64 = deserialize_from(&mut rd).map_err(Error::custom)?;
let offset: usize = deserialize_from(&mut rd).map_err(Error::custom)?;
let current_len: usize = deserialize_from(&mut rd).map_err(Error::custom)?;
let map = MmapMut::map_anon(1).map_err(|e| Error::custom(e.to_string()))?;
Ok(AppendVec {
path,
path: PathBuf::from(String::default()),
map,
append_offset: Mutex::new(offset),
current_len: AtomicUsize::new(current_len as usize),
file_size,
append_offset: Mutex::new(current_len),
current_len: AtomicUsize::new(current_len),
file_size: current_len as u64,
})
}
}
@ -519,44 +512,6 @@ pub mod tests {
);
}
#[test]
fn test_append_vec_serialize() {
let path = get_append_vec_path("test_append_serialize");
let av: AppendVec = AppendVec::new(&Path::new(&path.path).join("0"), true, 1024 * 1024);
let account1 = create_test_account(1);
let index1 = av.append_account_test(&account1).unwrap();
assert_eq!(index1, 0);
assert_eq!(av.get_account_test(index1).unwrap(), account1);
let account2 = create_test_account(2);
let index2 = av.append_account_test(&account2).unwrap();
assert_eq!(av.get_account_test(index2).unwrap(), account2);
assert_eq!(av.get_account_test(index1).unwrap(), account1);
let append_vec_path = &av.path;
// Serialize the AppendVec
let mut writer = Cursor::new(vec![]);
serialize_into(&mut writer, &av).unwrap();
// Deserialize the AppendVec
let buf = writer.into_inner();
let mut reader = Cursor::new(&buf[..]);
let mut dav: AppendVec = deserialize_from(&mut reader).unwrap();
// Set the AppendVec path
dav.set_file(append_vec_path).unwrap();
assert_eq!(dav.get_account_test(index2).unwrap(), account2);
assert_eq!(dav.get_account_test(index1).unwrap(), account1);
drop(dav);
// Dropping dav above blows away underlying file's directory entry, so
// trying to set the file will fail
let mut reader = Cursor::new(&buf[..]);
let mut dav: AppendVec = deserialize_from(&mut reader).unwrap();
assert!(dav.set_file(append_vec_path).is_err());
}
#[test]
fn test_relative_path() {
let relative_path = AppendVec::new_relative_path(0, 2);