stop copying Blooms (#3379)

* stop copying Blooms

* fixup

* clippy
This commit is contained in:
Rob Walker 2019-03-20 11:06:39 -07:00 committed by GitHub
parent 13c9d3d4e1
commit df9fd2bc0b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 115 additions and 43 deletions

View File

@ -130,14 +130,13 @@ pub fn process_blocktree(
warn!("entry0 not present"); warn!("entry0 not present");
return Err(BlocktreeProcessorError::LedgerVerificationFailed); return Err(BlocktreeProcessorError::LedgerVerificationFailed);
} }
let entry0 = &entries[0]; let entry0 = entries.remove(0);
if !(entry0.is_tick() && entry0.verify(&last_entry_hash)) { if !(entry0.is_tick() && entry0.verify(&last_entry_hash)) {
warn!("Ledger proof of history failed at entry0"); warn!("Ledger proof of history failed at entry0");
return Err(BlocktreeProcessorError::LedgerVerificationFailed); return Err(BlocktreeProcessorError::LedgerVerificationFailed);
} }
last_entry_hash = entry0.hash; last_entry_hash = entry0.hash;
entry_height += 1; entry_height += 1;
entries = entries.drain(1..).collect();
} }
if !entries.is_empty() { if !entries.is_empty() {

BIN
runtime/grow Normal file

Binary file not shown.

View File

@ -247,6 +247,7 @@ impl Bank {
// freeze is a one-way trip, idempotent // freeze is a one-way trip, idempotent
*hash = self.hash_internal_state(); *hash = self.hash_internal_state();
} }
self.status_cache.write().unwrap().freeze();
} }
/// squash the parent's state up into this Bank, /// squash the parent's state up into this Bank,

View File

@ -1,11 +1,13 @@
use crate::bloom::{Bloom, BloomHashIndex}; use crate::bloom::{Bloom, BloomHashIndex};
use hashbrown::HashMap; use hashbrown::HashMap;
use log::*;
use solana_sdk::hash::Hash; use solana_sdk::hash::Hash;
use solana_sdk::signature::Signature; use solana_sdk::signature::Signature;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::ops::Deref; use std::ops::Deref;
#[cfg(test)] #[cfg(test)]
use std::ops::DerefMut; use std::ops::DerefMut;
use std::sync::Arc;
/// Each cache entry is designed to span ~1 second of signatures /// Each cache entry is designed to span ~1 second of signatures
const MAX_CACHE_ENTRIES: usize = solana_sdk::timing::MAX_HASH_AGE_IN_SECONDS; const MAX_CACHE_ENTRIES: usize = solana_sdk::timing::MAX_HASH_AGE_IN_SECONDS;
@ -13,15 +15,51 @@ const MAX_CACHE_ENTRIES: usize = solana_sdk::timing::MAX_HASH_AGE_IN_SECONDS;
type FailureMap<T> = HashMap<Signature, T>; type FailureMap<T> = HashMap<Signature, T>;
#[derive(Clone)] #[derive(Clone)]
pub struct StatusCache<T> { struct Status<T> {
/// all signatures seen at this checkpoint /// all signatures seen during a hash period
signatures: Bloom<Signature>, signatures: Bloom<Signature>,
/// failures /// failures
failures: FailureMap<T>, failures: FailureMap<T>,
}
/// Merges are empty unless this is the root checkpoint which cannot be unrolled impl<T: Clone> Status<T> {
merges: VecDeque<StatusCache<T>>, fn new(blockhash: &Hash) -> Self {
let keys = (0..27).map(|i| blockhash.hash_at_index(i)).collect();
Status {
signatures: Bloom::new(38_340_234, keys),
failures: HashMap::default(),
}
}
fn has_signature(&self, sig: &Signature) -> bool {
self.signatures.contains(&sig)
}
fn add(&mut self, sig: &Signature) {
self.signatures.add(&sig);
}
fn clear(&mut self) {
self.failures.clear();
self.signatures.clear();
}
pub fn get_signature_status(&self, sig: &Signature) -> Option<Result<(), T>> {
if let Some(res) = self.failures.get(sig) {
return Some(Err(res.clone()));
} else if self.signatures.contains(sig) {
return Some(Ok(()));
}
None
}
}
#[derive(Clone)]
pub struct StatusCache<T> {
/// currently active status
active: Option<Status<T>>,
/// merges cover previous periods, and are read-only
merges: VecDeque<Arc<Status<T>>>,
} }
impl<T: Clone> Default for StatusCache<T> { impl<T: Clone> Default for StatusCache<T> {
@ -32,11 +70,9 @@ impl<T: Clone> Default for StatusCache<T> {
impl<T: Clone> StatusCache<T> { impl<T: Clone> StatusCache<T> {
pub fn new(blockhash: &Hash) -> Self { pub fn new(blockhash: &Hash) -> Self {
let keys = (0..27).map(|i| blockhash.hash_at_index(i)).collect();
Self { Self {
signatures: Bloom::new(38_340_234, keys), active: Some(Status::new(blockhash)),
failures: HashMap::new(), merges: VecDeque::default(),
merges: VecDeque::new(),
} }
} }
fn has_signature_merged(&self, sig: &Signature) -> bool { fn has_signature_merged(&self, sig: &Signature) -> bool {
@ -49,23 +85,40 @@ impl<T: Clone> StatusCache<T> {
} }
/// test if a signature is known /// test if a signature is known
pub fn has_signature(&self, sig: &Signature) -> bool { pub fn has_signature(&self, sig: &Signature) -> bool {
self.signatures.contains(&sig) || self.has_signature_merged(sig) self.active
.as_ref()
.map_or(false, |active| active.has_signature(&sig))
|| self.has_signature_merged(sig)
} }
/// add a signature /// add a signature
pub fn add(&mut self, sig: &Signature) { pub fn add(&mut self, sig: &Signature) {
self.signatures.add(&sig) if let Some(active) = self.active.as_mut() {
active.add(&sig);
}
} }
/// Save an error status for a signature /// Save an error status for a signature
pub fn save_failure_status(&mut self, sig: &Signature, err: T) { pub fn save_failure_status(&mut self, sig: &Signature, err: T) {
assert!(self.has_signature(sig), "sig not found"); assert!(
self.failures.insert(*sig, err); self.active
.as_ref()
.map_or(false, |active| active.has_signature(sig)),
"sig not found"
);
self.active
.as_mut()
.map(|active| active.failures.insert(*sig, err));
} }
/// Forget all signatures. Useful for benchmarking. /// Forget all signatures. Useful for benchmarking.
pub fn clear(&mut self) { pub fn clear(&mut self) {
self.failures.clear(); if let Some(active) = self.active.as_mut() {
self.signatures.clear(); active.clear();
}
self.merges = VecDeque::new(); self.merges = VecDeque::new();
} }
fn get_signature_status_merged(&self, sig: &Signature) -> Option<Result<(), T>> { fn get_signature_status_merged(&self, sig: &Signature) -> Option<Result<(), T>> {
for c in &self.merges { for c in &self.merges {
if c.has_signature(sig) { if c.has_signature(sig) {
@ -75,32 +128,30 @@ impl<T: Clone> StatusCache<T> {
None None
} }
pub fn get_signature_status(&self, sig: &Signature) -> Option<Result<(), T>> { pub fn get_signature_status(&self, sig: &Signature) -> Option<Result<(), T>> {
if let Some(res) = self.failures.get(sig) { self.active
return Some(Err(res.clone())); .as_ref()
} else if self.signatures.contains(sig) { .and_then(|active| active.get_signature_status(sig))
return Some(Ok(())); .or_else(|| self.get_signature_status_merged(sig))
}
self.get_signature_status_merged(sig)
} }
fn squash_parent_is_full(&mut self, parent: &Self) -> bool { fn squash_parent_is_full(&mut self, parent: &Self) -> bool {
// flatten and squash the parent and its merges into self.merges, // flatten and squash the parent and its merges into self.merges,
// returns true if self is full // returns true if self is full
if parent.active.is_some() {
self.merges.push_back(StatusCache { warn!("=========== FIXME: squash() on an active parent! ================");
signatures: parent.signatures.clone(),
failures: parent.failures.clone(),
merges: VecDeque::new(),
});
for merge in &parent.merges {
self.merges.push_back(StatusCache {
signatures: merge.signatures.clone(),
failures: merge.failures.clone(),
merges: VecDeque::new(),
});
} }
self.merges.truncate(MAX_CACHE_ENTRIES); // TODO: put this assert back in
//assert!(parent.active.is_none());
if self.merges.len() < MAX_CACHE_ENTRIES {
for merge in parent
.merges
.iter()
.take(MAX_CACHE_ENTRIES - self.merges.len())
{
self.merges.push_back(merge.clone());
}
}
self.merges.len() == MAX_CACHE_ENTRIES self.merges.len() == MAX_CACHE_ENTRIES
} }
@ -119,15 +170,21 @@ impl<T: Clone> StatusCache<T> {
/// Crate a new cache, pushing the old cache into the merged queue /// Crate a new cache, pushing the old cache into the merged queue
pub fn new_cache(&mut self, blockhash: &Hash) { pub fn new_cache(&mut self, blockhash: &Hash) {
let mut old = Self::new(blockhash); assert!(self.active.is_some());
std::mem::swap(&mut old.signatures, &mut self.signatures); let merge = self.active.replace(Status::new(blockhash));
std::mem::swap(&mut old.failures, &mut self.failures);
assert!(old.merges.is_empty()); self.merges.push_front(Arc::new(merge.unwrap()));
self.merges.push_front(old);
if self.merges.len() > MAX_CACHE_ENTRIES { if self.merges.len() > MAX_CACHE_ENTRIES {
self.merges.pop_back(); self.merges.pop_back();
} }
} }
pub fn freeze(&mut self) {
if let Some(active) = self.active.take() {
self.merges.push_front(Arc::new(active));
}
}
pub fn get_signature_status_all<U>( pub fn get_signature_status_all<U>(
checkpoints: &[U], checkpoints: &[U],
signature: &Signature, signature: &Signature,
@ -246,7 +303,7 @@ mod tests {
let blockhash = hash(blockhash.as_ref()); let blockhash = hash(blockhash.as_ref());
let mut second = BankStatusCache::new(&blockhash); let mut second = BankStatusCache::new(&blockhash);
first.freeze();
second.squash(&[&first]); second.squash(&[&first]);
assert_eq!(second.get_signature_status(&sig), Some(Ok(()))); assert_eq!(second.get_signature_status(&sig), Some(Ok(())));
@ -254,7 +311,6 @@ mod tests {
} }
#[test] #[test]
#[ignore] // takes a lot of time or RAM or both..
fn test_status_cache_squash_overflow() { fn test_status_cache_squash_overflow() {
let mut blockhash = hash(Hash::default().as_ref()); let mut blockhash = hash(Hash::default().as_ref());
let mut cache = BankStatusCache::new(&blockhash); let mut cache = BankStatusCache::new(&blockhash);
@ -263,7 +319,9 @@ mod tests {
.map(|_| { .map(|_| {
blockhash = hash(blockhash.as_ref()); blockhash = hash(blockhash.as_ref());
BankStatusCache::new(&blockhash) let mut p = BankStatusCache::new(&blockhash);
p.freeze();
p
}) })
.collect(); .collect();
@ -332,4 +390,18 @@ mod tests {
false false
); );
} }
#[test]
fn test_status_cache_freeze() {
let sig = Signature::default();
let blockhash = hash(Hash::default().as_ref());
let mut cache: StatusCache<()> = StatusCache::new(&blockhash);
cache.freeze();
cache.freeze();
cache.add(&sig);
assert_eq!(cache.has_signature(&sig), false);
}
} }